1use crate::runtime::dataflow_debugger::{
7 DataflowDebugger, PipelineStage, SessionState, StageStatus,
8 MaterializedFrame,
9};
10use anyhow::Result;
11use std::collections::HashMap;
12use std::io::{self, Write};
13use std::sync::{Arc, Mutex};
14use std::time::{Duration, Instant};
15
16pub struct DataflowUI {
18 debugger: Arc<Mutex<DataflowDebugger>>,
20
21 config: UIConfig,
23
24 display_mode: DisplayMode,
26
27 #[allow(dead_code)] terminal_size: (u16, u16), refresh_interval: Duration,
33
34 last_refresh: Instant,
36
37 colors_enabled: bool,
39}
40
/// User-tunable display settings for [`DataflowUI`].
#[derive(Debug, Clone)]
pub struct UIConfig {
    /// Upper bound on the number of data rows shown in a preview.
    pub max_preview_rows: usize,

    /// Upper bound on the number of events listed in the history view.
    pub max_history_events: usize,

    /// Whether the interactive loop redraws automatically.
    pub auto_refresh: bool,

    /// Redraw interval in milliseconds.
    pub refresh_interval_ms: u64,

    /// Metrics display toggle.
    pub show_metrics: bool,

    /// Initial value of the UI's colour switch.
    pub enable_colors: bool,

    /// Compact layout toggle.
    pub compact_mode: bool,
}
65
/// The screen currently shown by the UI.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DisplayMode {
    /// Pipeline overview table.
    Overview,
    /// Details of one stage, identified by its stage id.
    StageDetail(String),
    /// Breakpoint management screen.
    Breakpoints,
    /// Materialized data preview for the stage with the given id.
    DataViewer(String),
    /// Per-stage performance metrics.
    Metrics,
    /// Execution event history.
    History,
    /// Comparison between two stages, given as `(first id, second id)`.
    Diff(String, String),
    /// Command reference.
    Help,
}
86
87impl Default for UIConfig {
88 fn default() -> Self {
89 Self {
90 max_preview_rows: 20,
91 max_history_events: 100,
92 auto_refresh: true,
93 refresh_interval_ms: 1000,
94 show_metrics: true,
95 enable_colors: true,
96 compact_mode: false,
97 }
98 }
99}
100
101impl DataflowUI {
102 pub fn new(debugger: Arc<Mutex<DataflowDebugger>>, config: UIConfig) -> Self {
104 Self {
105 debugger,
106 config: config.clone(),
107 display_mode: DisplayMode::Overview,
108 terminal_size: Self::get_terminal_size(),
109 refresh_interval: Duration::from_millis(config.refresh_interval_ms),
110 last_refresh: Instant::now(),
111 colors_enabled: config.enable_colors,
112 }
113 }
114
115 pub fn run_interactive(&mut self) -> Result<()> {
117 self.print_header()?;
118 self.print_help_hint()?;
119
120 loop {
121 self.refresh_display()?;
122
123 if let Some(command) = self.get_user_input()? {
124 match self.handle_command(&command)? {
125 UIAction::Continue => continue,
126 UIAction::Exit => break,
127 }
128 }
129
130 if self.config.auto_refresh && self.last_refresh.elapsed() >= self.refresh_interval {
132 self.refresh_display()?;
133 }
134
135 std::thread::sleep(Duration::from_millis(100));
136 }
137
138 Ok(())
139 }
140
141 pub fn refresh_display(&mut self) -> Result<()> {
143 self.clear_screen()?;
144
145 match &self.display_mode {
146 DisplayMode::Overview => self.render_overview()?,
147 DisplayMode::StageDetail(stage_id) => self.render_stage_detail(stage_id)?,
148 DisplayMode::Breakpoints => self.render_breakpoints()?,
149 DisplayMode::DataViewer(stage_id) => self.render_data_viewer(stage_id)?,
150 DisplayMode::Metrics => self.render_metrics()?,
151 DisplayMode::History => self.render_history()?,
152 DisplayMode::Diff(stage1, stage2) => self.render_diff(stage1, stage2)?,
153 DisplayMode::Help => self.render_help()?,
154 }
155
156 self.print_status_bar()?;
157 self.print_command_prompt()?;
158
159 self.last_refresh = Instant::now();
160 Ok(())
161 }
162
163 fn render_overview(&self) -> Result<()> {
165 self.print_title("📊 Dataflow Pipeline Overview")?;
166
167 let debugger = self.debugger
168 .lock()
169 .map_err(|_| anyhow::anyhow!("Failed to acquire debugger lock"))?;
170
171 let session_status = debugger.get_session_status()?;
172 self.render_session_info(&session_status)?;
173
174 let sample_stages = self.get_sample_stages();
176
177 println!();
178 self.print_section_header("Pipeline Stages")?;
179 self.print_separator()?;
180
181 if self.colors_enabled {
182 println!("{:<4} {:<20} {:<12} {:<10} {:<15} {:<10}",
183 "#", "Stage Name", "Type", "Status", "Rows", "Time");
184 } else {
185 println!("{:<4} {:<20} {:<12} {:<10} {:<15} {:<10}",
186 "#", "Stage Name", "Type", "Status", "Rows", "Time");
187 }
188
189 self.print_separator()?;
190
191 for (i, stage) in sample_stages.iter().enumerate() {
192 let status_color = if self.colors_enabled {
193 match stage.status {
194 StageStatus::Completed => "\x1b[32m", StageStatus::Running => "\x1b[33m", StageStatus::Failed(_) => "\x1b[31m", StageStatus::Paused => "\x1b[36m", _ => "\x1b[37m", }
200 } else {
201 ""
202 };
203
204 let reset_color = if self.colors_enabled { "\x1b[0m" } else { "" };
205
206 let time_str = stage.execution_time.map_or_else(|| "-".to_string(), |t| format!("{}ms", t.as_millis()));
207
208 let rows_str = stage.rows_processed.map_or_else(|| "-".to_string(), |r| format!("{r}"));
209
210 println!("{:<4} {:<20} {:<12} {}{:<10}{} {:<15} {:<10}",
211 i + 1,
212 stage.stage_name,
213 stage.stage_type,
214 status_color,
215 stage.status,
216 reset_color,
217 rows_str,
218 time_str);
219 }
220
221 Ok(())
222 }
223
224 fn render_stage_detail(&self, stage_id: &str) -> Result<()> {
226 self.print_title(&format!("🔍 Stage Detail: {stage_id}"))?;
227
228 let stage = self.get_sample_stage(stage_id);
230
231 self.print_section_header("Stage Information")?;
232 println!("ID: {}", stage.stage_id);
233 println!("Name: {}", stage.stage_name);
234 println!("Type: {}", stage.stage_type);
235 println!("Status: {}", stage.status);
236
237 if let Some(time) = stage.execution_time {
238 println!("Execution Time: {}ms", time.as_millis());
239 }
240
241 if let Some(rows) = stage.rows_processed {
242 println!("Rows Processed: {rows}");
243 }
244
245 if let Some(memory) = stage.memory_usage {
246 println!("Memory Usage: {}", self.format_bytes(memory));
247 }
248
249 if let Some(input_schema) = &stage.input_schema {
251 println!();
252 self.print_section_header("Input Schema")?;
253 for col in &input_schema.columns {
254 println!(" {}: {} (nullable: {})", col.name, col.data_type, col.nullable);
255 }
256 }
257
258 if let Some(output_schema) = &stage.output_schema {
259 println!();
260 self.print_section_header("Output Schema")?;
261 for col in &output_schema.columns {
262 println!(" {}: {} (nullable: {})", col.name, col.data_type, col.nullable);
263 }
264 }
265
266 if !stage.metadata.is_empty() {
268 println!();
269 self.print_section_header("Metadata")?;
270 for (key, value) in &stage.metadata {
271 println!(" {key}: {value}");
272 }
273 }
274
275 Ok(())
276 }
277
278 fn render_breakpoints(&self) -> Result<()> {
280 self.print_title("🔴 Breakpoint Management")?;
281
282 println!("Active Breakpoints:");
283 self.print_separator()?;
284 println!("{:<20} {:<10} {:<15} {:<8}", "Stage ID", "Condition", "Actions", "Hit Count");
285 self.print_separator()?;
286
287 println!("{:<20} {:<10} {:<15} {:<8}", "load_data", "Always", "Pause,Print", "3");
289 println!("{:<20} {:<10} {:<15} {:<8}", "filter_stage", "RowCount>1000", "Materialize", "1");
290
291 println!();
292 println!("Commands:");
293 println!(" add <stage_id> [condition] - Add breakpoint");
294 println!(" remove <stage_id> - Remove breakpoint");
295 println!(" toggle <stage_id> - Toggle breakpoint");
296 println!(" clear - Clear all breakpoints");
297
298 Ok(())
299 }
300
301 fn render_data_viewer(&self, stage_id: &str) -> Result<()> {
303 self.print_title(&format!("📋 Data Viewer: {stage_id}"))?;
304
305 let _debugger = self.debugger
306 .lock()
307 .map_err(|_| anyhow::anyhow!("Failed to acquire debugger lock"))?;
308
309 let sample_data = self.get_sample_materialized_data(stage_id);
311
312 self.print_section_header(&format!("Sample Data ({} rows)", sample_data.total_rows))?;
313
314 println!("Schema:");
316 for col in &sample_data.schema.columns {
317 println!(" {}: {}", col.name, col.data_type);
318 }
319
320 println!();
321 println!("Data Preview (showing {} of {} rows):",
322 sample_data.sample_data.len(), sample_data.total_rows);
323
324 self.print_separator()?;
325
326 for col in &sample_data.schema.columns {
328 print!("{:<15} ", col.name);
329 }
330 println!();
331
332 self.print_separator()?;
333
334 for row in &sample_data.sample_data {
336 for value in &row.values {
337 let value_str = format!("{value}");
338 let truncated = if value_str.len() > 14 {
339 format!("{}...", &value_str[..11])
340 } else {
341 value_str
342 };
343 print!("{truncated:<15} ");
344 }
345 println!();
346 }
347
348 println!();
349 println!("Memory Size: {}", self.format_bytes(sample_data.memory_size));
350 println!("Materialized: {}", self.format_timestamp(sample_data.timestamp));
351
352 Ok(())
353 }
354
355 fn render_metrics(&self) -> Result<()> {
357 self.print_title("⚡ Performance Metrics")?;
358
359 let debugger = self.debugger
360 .lock()
361 .map_err(|_| anyhow::anyhow!("Failed to acquire debugger lock"))?;
362
363 let metrics = debugger.get_stage_metrics()?;
364
365 if metrics.is_empty() {
366 println!("No performance metrics available yet.");
367 return Ok(());
368 }
369
370 self.print_section_header("Stage Performance")?;
371 self.print_separator()?;
372 println!("{:<20} {:<10} {:<12} {:<12} {:<10} {:<10}",
373 "Stage", "Time (ms)", "Memory (MB)", "Input Rows", "Output Rows", "Cache Hit");
374 self.print_separator()?;
375
376 for (stage_id, metric) in &metrics {
377 let cache_hit = metric.cache_hit_ratio.map_or_else(|| "-".to_string(), |r| format!("{:.1}%", r * 100.0));
378
379 println!("{:<20} {:<10} {:<12} {:<12} {:<10} {:<10}",
380 stage_id,
381 metric.execution_time.as_millis(),
382 metric.peak_memory / (1024 * 1024),
383 metric.input_rows,
384 metric.output_rows,
385 cache_hit);
386 }
387
388 let total_time: Duration = metrics.values().map(|m| m.execution_time).sum();
390 let total_memory: usize = metrics.values().map(|m| m.peak_memory).sum();
391 let total_rows: usize = metrics.values().map(|m| m.output_rows).sum();
392
393 println!();
394 self.print_section_header("Summary")?;
395 println!("Total Execution Time: {}ms", total_time.as_millis());
396 println!("Peak Memory Usage: {}", self.format_bytes(total_memory));
397 println!("Total Rows Processed: {total_rows}");
398
399 Ok(())
400 }
401
402 fn render_history(&self) -> Result<()> {
404 self.print_title("📜 Execution History")?;
405
406 let debugger = self.debugger
407 .lock()
408 .map_err(|_| anyhow::anyhow!("Failed to acquire debugger lock"))?;
409
410 let history = debugger.get_execution_history()?;
411 let recent_events = history.iter().rev().take(self.config.max_history_events);
412
413 self.print_separator()?;
414 println!("{:<20} {:<15} {:<20} {:<30}", "Timestamp", "Event Type", "Stage", "Details");
415 self.print_separator()?;
416
417 for event in recent_events {
418 let timestamp_str = self.format_timestamp(event.timestamp);
419 let details = event.data.iter()
420 .map(|(k, v)| format!("{k}:{v}"))
421 .collect::<Vec<_>>()
422 .join(", ");
423
424 println!("{:<20} {:<15} {:<20} {:<30}",
425 timestamp_str,
426 format!("{:?}", event.event_type),
427 event.stage_id,
428 details);
429 }
430
431 Ok(())
432 }
433
434 fn render_diff(&self, stage1: &str, stage2: &str) -> Result<()> {
436 self.print_title(&format!("🔄 Stage Diff: {stage1} → {stage2}"))?;
437
438 let debugger = self.debugger
439 .lock()
440 .map_err(|_| anyhow::anyhow!("Failed to acquire debugger lock"))?;
441
442 match debugger.compute_stage_diff(stage1, stage2) {
444 Ok(diff) => {
445 self.print_section_header("Diff Summary")?;
446 println!("Row Count Change: {}", diff.row_count_diff);
447 println!("Schema Changed: {}", diff.schema_changed);
448
449 if !diff.column_changes.is_empty() {
450 println!();
451 self.print_section_header("Column Changes")?;
452 for change in &diff.column_changes {
453 println!(" {change:?}");
454 }
455 }
456
457 if !diff.data_changes.is_empty() {
458 println!();
459 self.print_section_header("Data Changes")?;
460 for change in &diff.data_changes {
461 println!(" {change:?}");
462 }
463 }
464 }
465 Err(e) => {
466 println!("Error computing diff: {e}");
467 println!("Make sure both stages have materialized data.");
468 }
469 }
470
471 Ok(())
472 }
473
474 fn render_help(&self) -> Result<()> {
476 self.print_title("❓ Dataflow Debugger Help")?;
477
478 println!("Navigation Commands:");
479 println!(" overview - Show pipeline overview");
480 println!(" stage <id> - Show stage details");
481 println!(" breakpoints - Manage breakpoints");
482 println!(" data <stage_id> - View materialized data");
483 println!(" metrics - Show performance metrics");
484 println!(" history - Show execution history");
485 println!(" diff <stage1> <stage2> - Compare stages");
486 println!(" help - Show this help");
487 println!(" quit/exit - Exit debugger");
488
489 println!();
490 println!("Debugging Commands:");
491 println!(" materialize <stage_id> - Materialize stage data");
492 println!(" break <stage_id> - Add breakpoint");
493 println!(" continue - Continue execution");
494 println!(" step - Execute next stage");
495 println!(" export <format> <path> - Export debug data");
496
497 println!();
498 println!("Display Commands:");
499 println!(" refresh - Refresh current view");
500 println!(" colors on/off - Toggle color output");
501 println!(" compact on/off - Toggle compact mode");
502
503 Ok(())
504 }
505
506 fn handle_command(&mut self, command: &str) -> Result<UIAction> {
508 let parts: Vec<&str> = command.split_whitespace().collect();
509 if parts.is_empty() {
510 return Ok(UIAction::Continue);
511 }
512
513 match parts[0].to_lowercase().as_str() {
514 "quit" | "exit" | "q" => Ok(UIAction::Exit),
515 "help" | "h" => {
516 self.display_mode = DisplayMode::Help;
517 Ok(UIAction::Continue)
518 }
519 "overview" | "o" => {
520 self.display_mode = DisplayMode::Overview;
521 Ok(UIAction::Continue)
522 }
523 "stage" | "s" => {
524 if parts.len() > 1 {
525 self.display_mode = DisplayMode::StageDetail(parts[1].to_string());
526 } else {
527 println!("Usage: stage <stage_id>");
528 }
529 Ok(UIAction::Continue)
530 }
531 "breakpoints" | "bp" => {
532 self.display_mode = DisplayMode::Breakpoints;
533 Ok(UIAction::Continue)
534 }
535 "data" | "d" => {
536 if parts.len() > 1 {
537 self.display_mode = DisplayMode::DataViewer(parts[1].to_string());
538 } else {
539 println!("Usage: data <stage_id>");
540 }
541 Ok(UIAction::Continue)
542 }
543 "metrics" | "m" => {
544 self.display_mode = DisplayMode::Metrics;
545 Ok(UIAction::Continue)
546 }
547 "history" | "hist" => {
548 self.display_mode = DisplayMode::History;
549 Ok(UIAction::Continue)
550 }
551 "diff" => {
552 if parts.len() > 2 {
553 self.display_mode = DisplayMode::Diff(parts[1].to_string(), parts[2].to_string());
554 } else {
555 println!("Usage: diff <stage1> <stage2>");
556 }
557 Ok(UIAction::Continue)
558 }
559 "refresh" | "r" => {
560 self.last_refresh = Instant::now().checked_sub(self.refresh_interval).unwrap();
562 Ok(UIAction::Continue)
563 }
564 "colors" => {
565 if parts.len() > 1 {
566 match parts[1] {
567 "on" => self.colors_enabled = true,
568 "off" => self.colors_enabled = false,
569 _ => println!("Usage: colors on/off"),
570 }
571 }
572 Ok(UIAction::Continue)
573 }
574 "materialize" => {
575 if parts.len() > 1 {
576 let debugger = self.debugger
577 .lock()
578 .map_err(|_| anyhow::anyhow!("Failed to acquire debugger lock"))?;
579 match debugger.materialize_stage(parts[1]) {
580 Ok(_) => println!("Materialized data for stage: {}", parts[1]),
581 Err(e) => println!("Failed to materialize: {e}"),
582 }
583 } else {
584 println!("Usage: materialize <stage_id>");
585 }
586 Ok(UIAction::Continue)
587 }
588 _ => {
589 println!("Unknown command: {}. Type 'help' for available commands.", parts[0]);
590 Ok(UIAction::Continue)
591 }
592 }
593 }
594
595 fn get_user_input(&self) -> Result<Option<String>> {
597 print!("> ");
598 io::stdout().flush()?;
599
600 let mut input = String::new();
601 match io::stdin().read_line(&mut input) {
602 Ok(0) => Ok(None), Ok(_) => Ok(Some(input.trim().to_string())),
604 Err(e) => Err(anyhow::anyhow!("Failed to read input: {e}")),
605 }
606 }
607
608 fn clear_screen(&self) -> Result<()> {
611 print!("\x1b[2J\x1b[H");
612 io::stdout().flush()?;
613 Ok(())
614 }
615
616 fn print_header(&self) -> Result<()> {
617 if self.colors_enabled {
618 println!("\x1b[1;34m╔══════════════════════════════════════════════════════════════════════════════╗\x1b[0m");
619 println!("\x1b[1;34m║ RUCHY DATAFLOW DEBUGGER ║\x1b[0m");
620 println!("\x1b[1;34m╚══════════════════════════════════════════════════════════════════════════════╝\x1b[0m");
621 } else {
622 println!("===============================================================================");
623 println!(" RUCHY DATAFLOW DEBUGGER ");
624 println!("===============================================================================");
625 }
626 Ok(())
627 }
628
629 fn print_title(&self, title: &str) -> Result<()> {
630 println!();
631 if self.colors_enabled {
632 println!("\x1b[1;36m{title}\x1b[0m");
633 } else {
634 println!("{title}");
635 }
636 println!();
637 Ok(())
638 }
639
640 fn print_section_header(&self, header: &str) -> Result<()> {
641 if self.colors_enabled {
642 println!("\x1b[1;33m{header}:\x1b[0m");
643 } else {
644 println!("{header}:");
645 }
646 Ok(())
647 }
648
649 fn print_separator(&self) -> Result<()> {
650 println!("{}", "-".repeat(80));
651 Ok(())
652 }
653
654 fn print_help_hint(&self) -> Result<()> {
655 println!("Type 'help' for commands, 'quit' to exit");
656 println!();
657 Ok(())
658 }
659
660 fn print_status_bar(&self) -> Result<()> {
661 let status = format!("Mode: {:?} | Auto-refresh: {} | Colors: {}",
662 self.display_mode,
663 self.config.auto_refresh,
664 self.colors_enabled);
665
666 if self.colors_enabled {
667 println!("\n\x1b[7m{status:<80}\x1b[0m");
668 } else {
669 println!("\n{status}");
670 }
671 Ok(())
672 }
673
674 fn print_command_prompt(&self) -> Result<()> {
675 println!();
676 Ok(())
677 }
678
/// Returns the terminal size used by the UI.
///
/// The value is a fixed `(80, 24)`; the terminal is not queried.
/// Presumably `(columns, rows)` — TODO confirm.
fn get_terminal_size() -> (u16, u16) {
    const FIXED_SIZE: (u16, u16) = (80, 24);

    FIXED_SIZE
}
683
684 fn format_bytes(&self, bytes: usize) -> String {
685 const UNITS: &[&str] = &["B", "KB", "MB", "GB"];
686 let mut size = bytes as f64;
687 let mut unit_index = 0;
688
689 while size >= 1024.0 && unit_index < UNITS.len() - 1 {
690 size /= 1024.0;
691 unit_index += 1;
692 }
693
694 if size >= 100.0 {
695 format!("{:.0}{}", size, UNITS[unit_index])
696 } else if size >= 10.0 {
697 format!("{:.1}{}", size, UNITS[unit_index])
698 } else {
699 format!("{:.2}{}", size, UNITS[unit_index])
700 }
701 }
702
703 fn format_timestamp(&self, timestamp: std::time::SystemTime) -> String {
704 format!("{:?}", timestamp.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs())
706 }
707
708 fn render_session_info(&self, session: &SessionState) -> Result<()> {
709 self.print_section_header("Session Status")?;
710 println!("Active: {}", session.active);
711 if let Some(current) = &session.current_stage {
712 println!("Current Stage: {current}");
713 }
714 println!("Total Time: {}ms", session.total_execution_time.as_millis());
715 println!("Breakpoints Hit: {}", session.breakpoints_hit);
716 Ok(())
717 }
718
719 fn get_sample_stages(&self) -> Vec<PipelineStage> {
722 use crate::runtime::dataflow_debugger::{StageType, StageStatus};
723
724 vec![
725 PipelineStage {
726 stage_id: "load_data".to_string(),
727 stage_name: "Load CSV Data".to_string(),
728 stage_type: StageType::Load,
729 status: StageStatus::Completed,
730 input_schema: None,
731 output_schema: None,
732 execution_time: Some(Duration::from_millis(120)),
733 memory_usage: Some(1024 * 1024 * 5), rows_processed: Some(10000),
735 metadata: HashMap::new(),
736 },
737 PipelineStage {
738 stage_id: "filter_age".to_string(),
739 stage_name: "Filter by Age".to_string(),
740 stage_type: StageType::Filter,
741 status: StageStatus::Completed,
742 input_schema: None,
743 output_schema: None,
744 execution_time: Some(Duration::from_millis(45)),
745 memory_usage: Some(1024 * 1024 * 3), rows_processed: Some(7500),
747 metadata: HashMap::new(),
748 },
749 PipelineStage {
750 stage_id: "group_by_city".to_string(),
751 stage_name: "Group by City".to_string(),
752 stage_type: StageType::GroupBy,
753 status: StageStatus::Running,
754 input_schema: None,
755 output_schema: None,
756 execution_time: None,
757 memory_usage: None,
758 rows_processed: None,
759 metadata: HashMap::new(),
760 },
761 ]
762 }
763
764 fn get_sample_stage(&self, _stage_id: &str) -> PipelineStage {
765 use crate::runtime::dataflow_debugger::{StageType, StageStatus, DataSchema, ColumnDef, DataType};
766
767 PipelineStage {
768 stage_id: "load_data".to_string(),
769 stage_name: "Load CSV Data".to_string(),
770 stage_type: StageType::Load,
771 status: StageStatus::Completed,
772 input_schema: None,
773 output_schema: Some(DataSchema {
774 columns: vec![
775 ColumnDef {
776 name: "id".to_string(),
777 data_type: DataType::Integer,
778 nullable: false,
779 },
780 ColumnDef {
781 name: "name".to_string(),
782 data_type: DataType::String,
783 nullable: false,
784 },
785 ColumnDef {
786 name: "age".to_string(),
787 data_type: DataType::Integer,
788 nullable: false,
789 },
790 ColumnDef {
791 name: "city".to_string(),
792 data_type: DataType::String,
793 nullable: true,
794 },
795 ],
796 schema_hash: 12345,
797 }),
798 execution_time: Some(Duration::from_millis(120)),
799 memory_usage: Some(1024 * 1024 * 5), rows_processed: Some(10000),
801 metadata: HashMap::from([
802 ("file_path".to_string(), "/data/users.csv".to_string()),
803 ("encoding".to_string(), "UTF-8".to_string()),
804 ]),
805 }
806 }
807
808 fn get_sample_materialized_data(&self, _stage_id: &str) -> MaterializedFrame {
809 use crate::runtime::dataflow_debugger::{
810 MaterializedFrame, DataSchema, ColumnDef, DataType, DataRow, DataValue
811 };
812
813 MaterializedFrame {
814 stage_id: "load_data".to_string(),
815 schema: DataSchema {
816 columns: vec![
817 ColumnDef {
818 name: "id".to_string(),
819 data_type: DataType::Integer,
820 nullable: false,
821 },
822 ColumnDef {
823 name: "name".to_string(),
824 data_type: DataType::String,
825 nullable: false,
826 },
827 ColumnDef {
828 name: "age".to_string(),
829 data_type: DataType::Integer,
830 nullable: false,
831 },
832 ],
833 schema_hash: 12345,
834 },
835 sample_data: vec![
836 DataRow {
837 values: vec![
838 DataValue::Integer(1),
839 DataValue::String("Alice".to_string()),
840 DataValue::Integer(30),
841 ],
842 },
843 DataRow {
844 values: vec![
845 DataValue::Integer(2),
846 DataValue::String("Bob".to_string()),
847 DataValue::Integer(25),
848 ],
849 },
850 DataRow {
851 values: vec![
852 DataValue::Integer(3),
853 DataValue::String("Charlie".to_string()),
854 DataValue::Integer(35),
855 ],
856 },
857 ],
858 total_rows: 10000,
859 timestamp: std::time::SystemTime::now(),
860 memory_size: 1024 * 50, }
862 }
863}
864
/// What the interactive loop should do after a command has been handled.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UIAction {
    /// Keep running and redraw.
    Continue,
    /// Leave the interactive loop.
    Exit,
}
873
874#[cfg(test)]
875mod tests {
876 use super::*;
877 use crate::runtime::dataflow_debugger::{
878 DataflowDebugger, DataflowConfig
879 };
880 use std::sync::{Arc, Mutex};
881 use std::time::{Duration, Instant};
882
883 fn create_test_debugger() -> Arc<Mutex<DataflowDebugger>> {
885 let config = DataflowConfig::default();
886 Arc::new(Mutex::new(DataflowDebugger::new(config)))
887 }
888
889 fn create_test_ui_config() -> UIConfig {
890 UIConfig {
891 max_preview_rows: 5,
892 max_history_events: 10,
893 auto_refresh: false,
894 refresh_interval_ms: 500,
895 show_metrics: true,
896 enable_colors: false, compact_mode: true,
898 }
899 }
900
901 fn create_test_ui_with_config(config: UIConfig) -> DataflowUI {
902 let debugger = create_test_debugger();
903 DataflowUI::new(debugger, config)
904 }
905
906 fn create_test_ui() -> DataflowUI {
907 create_test_ui_with_config(create_test_ui_config())
908 }
909
910 fn create_test_pipeline_stage() -> PipelineStage {
911 use crate::runtime::dataflow_debugger::{StageType, StageStatus};
912
913 PipelineStage {
914 stage_id: "test_stage".to_string(),
915 stage_name: "Test Stage".to_string(),
916 stage_type: StageType::Filter,
917 status: StageStatus::Running,
918 input_schema: None,
919 output_schema: None,
920 execution_time: Some(Duration::from_millis(150)),
921 memory_usage: Some(1024 * 64), rows_processed: Some(500),
923 metadata: std::collections::HashMap::new(),
924 }
925 }
926
927 fn create_test_materialized_frame() -> MaterializedFrame {
928 use crate::runtime::dataflow_debugger::{DataSchema, ColumnDef, DataType, DataRow, DataValue};
929 use std::time::SystemTime;
930
931 MaterializedFrame {
932 stage_id: "test_stage".to_string(),
933 schema: DataSchema {
934 columns: vec![
935 ColumnDef {
936 name: "id".to_string(),
937 data_type: DataType::Integer,
938 nullable: false,
939 },
940 ColumnDef {
941 name: "name".to_string(),
942 data_type: DataType::String,
943 nullable: false,
944 },
945 ],
946 schema_hash: 54321,
947 },
948 sample_data: vec![
949 DataRow {
950 values: vec![
951 DataValue::Integer(1),
952 DataValue::String("Test User".to_string()),
953 ],
954 },
955 ],
956 total_rows: 100,
957 timestamp: SystemTime::now(),
958 memory_size: 1024 * 8, }
960 }
961
962 #[test]
965 fn test_ui_config_default() {
966 let config = UIConfig::default();
967 assert_eq!(config.max_preview_rows, 20);
968 assert_eq!(config.max_history_events, 100);
969 assert!(config.auto_refresh);
970 assert_eq!(config.refresh_interval_ms, 1000);
971 assert!(config.show_metrics);
972 assert!(config.enable_colors);
973 assert!(!config.compact_mode);
974 }
975
976 #[test]
977 fn test_ui_config_clone() {
978 let config1 = UIConfig::default();
979 let config2 = config1.clone();
980 assert_eq!(config1.max_preview_rows, config2.max_preview_rows);
981 assert_eq!(config1.auto_refresh, config2.auto_refresh);
982 }
983
984 #[test]
985 fn test_ui_config_debug() {
986 let config = UIConfig::default();
987 let debug_str = format!("{:?}", config);
988 assert!(debug_str.contains("UIConfig"));
989 assert!(debug_str.contains("max_preview_rows"));
990 assert!(debug_str.contains("auto_refresh"));
991 }
992
993 #[test]
996 fn test_display_mode_variants() {
997 let modes = vec![
998 DisplayMode::Overview,
999 DisplayMode::StageDetail("test_stage".to_string()),
1000 DisplayMode::Breakpoints,
1001 DisplayMode::DataViewer("test_data".to_string()),
1002 DisplayMode::Metrics,
1003 DisplayMode::History,
1004 DisplayMode::Diff("stage1".to_string(), "stage2".to_string()),
1005 DisplayMode::Help,
1006 ];
1007
1008 assert_eq!(modes.len(), 8);
1009 assert_eq!(modes[0], DisplayMode::Overview);
1010 }
1011
1012 #[test]
1013 fn test_display_mode_equality() {
1014 let mode1 = DisplayMode::StageDetail("test".to_string());
1015 let mode2 = DisplayMode::StageDetail("test".to_string());
1016 let mode3 = DisplayMode::StageDetail("other".to_string());
1017
1018 assert_eq!(mode1, mode2);
1019 assert_ne!(mode1, mode3);
1020 assert_ne!(mode1, DisplayMode::Overview);
1021 }
1022
1023 #[test]
1024 fn test_display_mode_clone() {
1025 let mode1 = DisplayMode::Diff("a".to_string(), "b".to_string());
1026 let mode2 = mode1.clone();
1027 assert_eq!(mode1, mode2);
1028 }
1029
1030 #[test]
1031 fn test_display_mode_debug() {
1032 let mode = DisplayMode::DataViewer("test_viewer".to_string());
1033 let debug_str = format!("{:?}", mode);
1034 assert!(debug_str.contains("DataViewer"));
1035 assert!(debug_str.contains("test_viewer"));
1036 }
1037
1038 #[test]
1041 fn test_ui_action_variants() {
1042 let actions = vec![UIAction::Continue, UIAction::Exit];
1043 assert_eq!(actions.len(), 2);
1044 assert_eq!(actions[0], UIAction::Continue);
1045 assert_eq!(actions[1], UIAction::Exit);
1046 }
1047
1048 #[test]
1049 fn test_ui_action_equality() {
1050 assert_eq!(UIAction::Continue, UIAction::Continue);
1051 assert_eq!(UIAction::Exit, UIAction::Exit);
1052 assert_ne!(UIAction::Continue, UIAction::Exit);
1053 }
1054
1055 #[test]
1056 fn test_ui_action_clone_debug() {
1057 let action = UIAction::Continue;
1058 let cloned = action.clone();
1059 assert_eq!(action, cloned);
1060
1061 let debug_str = format!("{:?}", action);
1062 assert!(debug_str.contains("Continue"));
1063 }
1064
1065 #[test]
1068 fn test_dataflow_ui_creation() {
1069 let debugger = create_test_debugger();
1070 let config = create_test_ui_config();
1071 let ui = DataflowUI::new(debugger, config.clone());
1072
1073 assert_eq!(ui.display_mode, DisplayMode::Overview);
1074 assert_eq!(ui.config.max_preview_rows, config.max_preview_rows);
1075 assert!(!ui.colors_enabled);
1076 }
1077
1078 #[test]
1079 fn test_dataflow_ui_with_default_config() {
1080 let debugger = create_test_debugger();
1081 let ui = DataflowUI::new(debugger, UIConfig::default());
1082
1083 assert_eq!(ui.display_mode, DisplayMode::Overview);
1084 assert_eq!(ui.config.max_preview_rows, 20);
1085 assert!(ui.colors_enabled);
1086 }
1087
1088 #[test]
1089 fn test_dataflow_ui_terminal_size() {
1090 let ui = create_test_ui();
1091 assert!(ui.terminal_size.0 > 0);
1093 assert!(ui.terminal_size.1 > 0);
1094 }
1095
1096 #[test]
1097 fn test_dataflow_ui_refresh_timing() {
1098 let ui = create_test_ui();
1099 let now = Instant::now();
1100 assert!(now.duration_since(ui.last_refresh) < Duration::from_secs(1));
1102 }
1103
1104 #[test]
1107 fn test_set_display_mode() {
1108 let mut ui = create_test_ui();
1109 assert_eq!(ui.display_mode, DisplayMode::Overview);
1110
1111 ui.display_mode = DisplayMode::Metrics;
1112 assert_eq!(ui.display_mode, DisplayMode::Metrics);
1113
1114 ui.display_mode = DisplayMode::StageDetail("test".to_string());
1115 assert_eq!(ui.display_mode, DisplayMode::StageDetail("test".to_string()));
1116 }
1117
1118 #[test]
1119 fn test_get_current_display_mode() {
1120 let mut ui = create_test_ui();
1121 assert_eq!(ui.display_mode, DisplayMode::Overview);
1122
1123 ui.display_mode = DisplayMode::Help;
1124 assert_eq!(ui.display_mode, DisplayMode::Help);
1125 }
1126
1127 #[test]
1128 fn test_toggle_colors() {
1129 let mut ui = create_test_ui();
1130 let initial_colors = ui.colors_enabled;
1131
1132 ui.colors_enabled = !ui.colors_enabled;
1133 assert_eq!(ui.colors_enabled, !initial_colors);
1134
1135 ui.colors_enabled = !ui.colors_enabled;
1136 assert_eq!(ui.colors_enabled, initial_colors);
1137 }
1138
1139 #[test]
1142 fn test_render_overview() {
1143 let ui = create_test_ui();
1144 let result = ui.render_overview();
1145 assert!(result.is_ok());
1146 }
1147
1148 #[test]
1149 fn test_render_stage_detail() {
1150 let ui = create_test_ui();
1151 let stage_id = "test_stage";
1152 let result = ui.render_stage_detail(stage_id);
1153 assert!(result.is_ok());
1154 }
1155
1156 #[test]
1157 fn test_render_breakpoints() {
1158 let ui = create_test_ui();
1159 let result = ui.render_breakpoints();
1160 assert!(result.is_ok());
1161 }
1162
1163 #[test]
1164 fn test_render_data_viewer() {
1165 let ui = create_test_ui();
1166 let stage_id = "test_stage";
1167 let result = ui.render_data_viewer(stage_id);
1168 assert!(result.is_ok());
1169 }
1170
1171 #[test]
1172 fn test_render_metrics() {
1173 let ui = create_test_ui();
1174 let result = ui.render_metrics();
1175 assert!(result.is_ok());
1176 }
1177
1178 #[test]
1179 fn test_render_history() {
1180 let ui = create_test_ui();
1181 let result = ui.render_history();
1182 assert!(result.is_ok());
1183 }
1184
1185 #[test]
1186 fn test_render_diff() {
1187 let ui = create_test_ui();
1188 let stage1 = "stage_a";
1189 let stage2 = "stage_b";
1190 let result = ui.render_diff(stage1, stage2);
1191 assert!(result.is_ok());
1192 }
1193
1194 #[test]
1195 fn test_render_help() {
1196 let ui = create_test_ui();
1197 let result = ui.render_help();
1198 assert!(result.is_ok());
1199 }
1200
1201 #[test]
1204 fn test_handle_command_navigation() {
1205 let mut ui = create_test_ui();
1206
1207 assert_eq!(ui.handle_command("overview").unwrap(), UIAction::Continue);
1209 assert_eq!(ui.display_mode, DisplayMode::Overview);
1210
1211 assert_eq!(ui.handle_command("metrics").unwrap(), UIAction::Continue);
1212 assert_eq!(ui.display_mode, DisplayMode::Metrics);
1213
1214 assert_eq!(ui.handle_command("history").unwrap(), UIAction::Continue);
1215 assert_eq!(ui.display_mode, DisplayMode::History);
1216
1217 assert_eq!(ui.handle_command("help").unwrap(), UIAction::Continue);
1218 assert_eq!(ui.display_mode, DisplayMode::Help);
1219 }
1220
/// `quit`, `exit` and `q` must all yield `UIAction::Exit`.
#[test]
fn test_handle_command_quit() {
    let mut ui = create_test_ui();

    for alias in ["quit", "exit", "q"] {
        assert_eq!(ui.handle_command(alias).unwrap(), UIAction::Exit);
    }
}
1228
/// The `breakpoints` command must continue the session and select the
/// breakpoints display mode.
#[test]
fn test_handle_command_breakpoints() {
    let mut ui = create_test_ui();

    let action = ui.handle_command("breakpoints").unwrap();

    assert_eq!(action, UIAction::Continue);
    assert_eq!(ui.display_mode, DisplayMode::Breakpoints);
}
1235
/// `colors on` / `colors off` must toggle the `colors_enabled` flag accordingly.
#[test]
fn test_handle_command_colors() {
    let mut ui = create_test_ui();

    for (command, expected_flag) in [("colors on", true), ("colors off", false)] {
        ui.handle_command(command).unwrap();
        assert_eq!(ui.colors_enabled, expected_flag);
    }
}
1246
/// The `refresh` command must report `UIAction::Continue`.
#[test]
fn test_handle_command_refresh() {
    let mut ui = create_test_ui();

    // Let a millisecond pass between constructing the UI and refreshing it.
    let pause = Duration::from_millis(1);
    std::thread::sleep(pause);

    let action = ui.handle_command("refresh").unwrap();
    assert_eq!(action, UIAction::Continue);
}
1257
/// An unrecognised command must continue the session and leave the display
/// mode untouched.
#[test]
fn test_handle_command_unknown() {
    let mut ui = create_test_ui();
    let mode_before = ui.display_mode.clone();

    let action = ui.handle_command("xyz").unwrap();

    assert_eq!(action, UIAction::Continue);
    assert_eq!(ui.display_mode, mode_before);
}
1266
/// The default test fixture must use a 500 ms refresh interval and a
/// `last_refresh` stamp taken less than one second ago.
#[test]
fn test_refresh_timing() {
    let ui = create_test_ui();

    let expected_interval = Duration::from_millis(500);
    assert_eq!(ui.refresh_interval, expected_interval);

    let since_last_refresh = Instant::now().duration_since(ui.last_refresh);
    assert!(since_last_refresh < Duration::from_secs(1));
}
1280
/// Auto-refresh settings supplied through the config must be kept on the UI,
/// with `refresh_interval` matching `refresh_interval_ms`.
#[test]
fn test_auto_refresh_config() {
    let config = UIConfig {
        auto_refresh: true,
        refresh_interval_ms: 2000,
        ..create_test_ui_config()
    };

    let ui = create_test_ui_with_config(config);

    assert!(ui.config.auto_refresh);
    assert_eq!(ui.refresh_interval, Duration::from_millis(2000));
}
1291
/// Both components of the initial `terminal_size` must be non-zero.
#[test]
fn test_terminal_size_initialization() {
    let ui = create_test_ui();
    let (first, second) = ui.terminal_size;

    assert!(first > 0);
    assert!(second > 0);
}
1299
/// `format_bytes` must pick the B / KB / MB / GB unit and format scaled
/// values with two decimals.
#[test]
fn test_format_bytes() {
    let ui = create_test_ui();

    let cases = [
        (512, "512B"),
        (1536, "1.50KB"),
        (1024 * 1024, "1.00MB"),
        (1024 * 1024 * 1024, "1.00GB"),
    ];

    for (byte_count, expected) in cases {
        assert_eq!(ui.format_bytes(byte_count), expected);
    }
}
1311
/// Formatting the current system time must yield a non-empty string that
/// parses as a `u64`.
#[test]
fn test_format_timestamp() {
    let ui = create_test_ui();

    let rendered = ui.format_timestamp(std::time::SystemTime::now());

    assert!(!rendered.is_empty());
    assert!(rendered.parse::<u64>().is_ok());
}
1323
/// The sample stage list must contain exactly the three expected stage ids,
/// in order.
#[test]
fn test_sample_data_creation() {
    let ui = create_test_ui();
    let stages = ui.get_sample_stages();

    let expected_ids = ["load_data", "filter_age", "group_by_city"];

    assert_eq!(stages.len(), 3);
    for (index, expected_id) in expected_ids.iter().enumerate() {
        assert_eq!(stages[index].stage_id, *expected_id);
    }
}
1334
/// The sample stage returned for `"any_id"` must be the `load_data` stage
/// with its timing, memory and row metrics populated.
#[test]
fn test_sample_stage_detail() {
    let ui = create_test_ui();
    let sample = ui.get_sample_stage("any_id");

    // Identity fields.
    assert_eq!(sample.stage_id, "load_data");
    assert_eq!(sample.stage_name, "Load CSV Data");

    // Optional metrics must all be present.
    assert!(sample.execution_time.is_some());
    assert!(sample.memory_usage.is_some());
    assert!(sample.rows_processed.is_some());
}
1346
/// The sample materialized frame returned for `"test_id"` must describe the
/// `load_data` stage with three columns, three sample rows and 10000 total rows.
#[test]
fn test_sample_materialized_data_creation() {
    let ui = create_test_ui();
    let sample_frame = ui.get_sample_materialized_data("test_id");

    assert_eq!(sample_frame.stage_id, "load_data");

    let column_count = sample_frame.schema.columns.len();
    assert_eq!(column_count, 3);

    let sample_row_count = sample_frame.sample_data.len();
    assert_eq!(sample_row_count, 3);

    assert_eq!(sample_frame.total_rows, 10000);
}
1357
/// The UI's `colors_enabled` flag must mirror `UIConfig::enable_colors` for
/// both possible values.
#[test]
fn test_colors_enabled() {
    // Same order as before: colors on first, then colors off.
    for enable_colors in [true, false] {
        let config = UIConfig {
            enable_colors,
            ..create_test_ui_config()
        };
        let ui = create_test_ui_with_config(config);
        assert_eq!(ui.colors_enabled, enable_colors);
    }
}
1370
/// The default test fixture must start with colors disabled, and the flag
/// must be settable directly.
#[test]
fn test_color_configuration() {
    let mut fixture = create_test_ui();

    // Initial state of the fixture.
    assert!(!fixture.colors_enabled);

    // Flip the flag by hand and read it back.
    fixture.colors_enabled = true;
    assert!(fixture.colors_enabled);
}
1381
/// Rendering the overview must return `Ok` with colors disabled and enabled.
#[test]
fn test_color_impact_on_rendering() {
    let mut ui = create_test_ui();

    // Colors off first, then on.
    for colors_enabled in [false, true] {
        ui.colors_enabled = colors_enabled;
        assert!(ui.render_overview().is_ok());
    }
}
1396
/// Every field of a fully specified `UIConfig` must be kept unchanged on the
/// constructed UI.
#[test]
fn test_ui_configuration_settings() {
    let config = UIConfig {
        max_preview_rows: 10,
        max_history_events: 50,
        auto_refresh: false,
        refresh_interval_ms: 2000,
        show_metrics: false,
        enable_colors: true,
        compact_mode: true,
    };

    // `config` is not used again, so it is moved rather than cloned.
    let ui = create_test_ui_with_config(config);

    assert_eq!(ui.config.max_preview_rows, 10);
    assert_eq!(ui.config.max_history_events, 50);
    assert!(!ui.config.auto_refresh);
    assert_eq!(ui.config.refresh_interval_ms, 2000);
    assert!(!ui.config.show_metrics);
    assert!(ui.config.enable_colors);
    assert!(ui.config.compact_mode);
}
1420
/// Every `DisplayMode` variant must be constructible and produce a non-empty
/// `Debug` representation.
#[test]
fn test_display_mode_variations() {
    // A fixed-size array is enough here; no heap-allocated `Vec` is needed
    // for a list that is only iterated once.
    let modes = [
        DisplayMode::Overview,
        DisplayMode::StageDetail("test".to_string()),
        DisplayMode::Breakpoints,
        DisplayMode::DataViewer("data_test".to_string()),
        DisplayMode::Metrics,
        DisplayMode::History,
        DisplayMode::Diff("a".to_string(), "b".to_string()),
        DisplayMode::Help,
    ];

    for mode in modes {
        assert!(!format!("{:?}", mode).is_empty());
    }
}
1440
/// `refresh_display` must return `Ok` for the overview, metrics, help and
/// history modes.
#[test]
fn test_refresh_display_all_modes() {
    let mut ui = create_test_ui();

    let modes = [
        DisplayMode::Overview,
        DisplayMode::Metrics,
        DisplayMode::Help,
        DisplayMode::History,
    ];

    for mode in modes {
        ui.display_mode = mode;
        assert!(ui.refresh_display().is_ok());
    }
}
1460
/// A metrics -> overview -> breakpoints sequence must continue the session
/// and switch modes each time; a final `quit` must yield `UIAction::Exit`.
#[test]
fn test_interactive_command_sequence() {
    let mut ui = create_test_ui();

    let session = [
        ("metrics", DisplayMode::Metrics),
        ("overview", DisplayMode::Overview),
        ("breakpoints", DisplayMode::Breakpoints),
    ];

    for (command, expected_mode) in session {
        assert_eq!(ui.handle_command(command).unwrap(), UIAction::Continue);
        assert_eq!(ui.display_mode, expected_mode);
    }

    assert_eq!(ui.handle_command("quit").unwrap(), UIAction::Exit);
}
1477
/// A compact, colorless, non-refreshing config layered over the defaults
/// must be kept on the constructed UI.
#[test]
fn test_config_variations() {
    let ui = create_test_ui_with_config(UIConfig {
        max_preview_rows: 2,
        compact_mode: true,
        enable_colors: false,
        auto_refresh: false,
        ..UIConfig::default()
    });

    assert_eq!(ui.config.max_preview_rows, 2);
    assert!(ui.config.compact_mode);
    assert!(!ui.colors_enabled);
    assert!(!ui.config.auto_refresh);
}
1496
/// Rendering with the ids `"nonexistent_stage"`, `"missing_data"`, and
/// `"stage1"` / `"stage2"` must still return `Ok`.
#[test]
fn test_error_handling_graceful() {
    let ui = create_test_ui();

    let stage_detail = ui.render_stage_detail("nonexistent_stage");
    assert!(stage_detail.is_ok());

    let data_viewer = ui.render_data_viewer("missing_data");
    assert!(data_viewer.is_ok());

    let diff = ui.render_diff("stage1", "stage2");
    assert!(diff.is_ok());
}
1508}