use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
12
/// Interactive debugger for data-pipeline executions.
///
/// All shared state lives behind `Arc<Mutex<…>>` so it can be shared across
/// threads; lock-poisoning is surfaced as an error rather than a panic.
pub struct DataflowDebugger {
    /// Registered pipeline stages (initialized but not yet read by the visible code).
    #[allow(dead_code)] pipeline_stages: Arc<Mutex<Vec<PipelineStage>>>,

    /// Breakpoints keyed by stage id.
    breakpoints: Arc<Mutex<HashMap<String, Breakpoint>>>,

    /// Captured data snapshots keyed by stage id.
    materialized_data: Arc<Mutex<HashMap<String, MaterializedFrame>>>,

    /// Debugger configuration (immutable after construction).
    config: DataflowConfig,

    /// Rolling event log, trimmed to `config.max_history_events`.
    execution_history: Arc<Mutex<VecDeque<ExecutionEvent>>>,

    /// Per-stage performance metrics keyed by stage id.
    stage_metrics: Arc<Mutex<HashMap<String, StageMetrics>>>,

    /// State of the current debug session.
    session_state: Arc<Mutex<SessionState>>,
}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
/// Tunable settings for a `DataflowDebugger`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataflowConfig {
    /// Cap on rows kept per stage when materializing.
    /// NOTE(review): not yet consulted by `materialize_stage` — confirm intent.
    pub max_rows_per_stage: usize,

    /// Automatically materialize each stage's output on completion.
    pub auto_materialize: bool,

    /// Cap on the rolling execution-history log; older events are evicted.
    pub max_history_events: usize,

    /// Enable performance profiling (not read by the visible code yet).
    pub enable_profiling: bool,

    /// Per-stage timeout in milliseconds (not enforced by the visible code yet).
    pub stage_timeout_ms: u64,

    /// Track per-stage memory usage (not read by the visible code yet).
    pub track_memory: bool,

    /// Automatically compute stage diffs (not read by the visible code yet).
    pub compute_diffs: bool,

    /// Data sampling rate — presumably a fraction in `0.0..=1.0`; confirm.
    pub sample_rate: f64,
}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
/// One stage of a data pipeline as tracked by the debugger.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineStage {
    /// Unique identifier; keys breakpoints, metrics, and materializations.
    pub stage_id: String,

    /// Human-readable name.
    pub stage_name: String,

    /// Kind of operation this stage performs.
    pub stage_type: StageType,

    /// Current lifecycle status.
    pub status: StageStatus,

    /// Schema of incoming data, if known.
    pub input_schema: Option<DataSchema>,

    /// Schema of outgoing data, if known.
    pub output_schema: Option<DataSchema>,

    /// Wall-clock duration of the last execution, once completed.
    pub execution_time: Option<Duration>,

    /// Memory used by the stage in bytes, if tracked.
    pub memory_usage: Option<usize>,

    /// Rows processed by the last execution, if tracked.
    pub rows_processed: Option<usize>,

    /// Free-form stage metadata.
    pub metadata: HashMap<String, String>,
}
99
100#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
/// Category of operation a pipeline stage performs.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum StageType {
    /// Data ingestion / source load.
    Load,
    /// Row filtering.
    Filter,
    /// Column projection.
    Select,
    /// Per-row transformation.
    Map,
    /// Grouping prior to aggregation.
    GroupBy,
    /// Aggregate computation.
    Aggregate,
    /// Join of two inputs.
    Join,
    /// Row ordering.
    Sort,
    /// Windowed computation.
    Window,
    /// Concatenation of inputs.
    Union,
    /// User-defined stage identified by name.
    Custom(String),
}
126
127#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
/// Lifecycle status of a pipeline stage.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum StageStatus {
    /// Not yet started.
    Pending,
    /// Currently executing.
    Running,
    /// Finished successfully.
    Completed,
    /// Failed, with an error message.
    Failed(String),
    /// Cancelled before completion.
    Cancelled,
    /// Paused at a breakpoint.
    Paused,
}
143
144#[derive(Debug, Clone, Serialize, Deserialize)]
/// A breakpoint attached to a pipeline stage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Breakpoint {
    /// Stage this breakpoint is attached to (also its map key in the debugger).
    pub stage_id: String,

    /// Firing condition; `None` behaves like `Always` (see `should_break`).
    pub condition: Option<BreakpointCondition>,

    /// Inactive breakpoints never fire.
    pub active: bool,

    /// Number of times this breakpoint has fired.
    pub hit_count: usize,

    /// Actions executed when the breakpoint fires.
    pub actions: Vec<BreakpointAction>,
}
162
163#[derive(Debug, Clone, Serialize, Deserialize)]
/// Condition controlling when a breakpoint fires.
///
/// NOTE(review): apart from `Always`, evaluation in `should_break` is
/// currently stubbed (RowCount always fires, the rest never do).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BreakpointCondition {
    /// Fire on every execution of the stage.
    Always,
    /// Fire when the stage's row count compares true against `value`.
    RowCount { operator: ComparisonOp, value: usize },
    /// Fire when execution time exceeds `threshold_ms`.
    ExecutionTime { threshold_ms: u64 },
    /// Fire when memory usage exceeds `threshold_mb`.
    MemoryUsage { threshold_mb: usize },
    /// Fire when `column` takes the given value.
    DataValue { column: String, value: DataValue },
    /// Fire when the stage errors.
    OnError,
}
179
180#[derive(Debug, Clone, Serialize, Deserialize)]
/// Comparison operator used by [`BreakpointCondition::RowCount`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ComparisonOp {
    Equal,
    NotEqual,
    GreaterThan,
    GreaterThanOrEqual,
    LessThan,
    LessThanOrEqual,
}
190
191#[derive(Debug, Clone, Serialize, Deserialize)]
/// Action run when a breakpoint fires.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BreakpointAction {
    /// Pause execution (the pause itself is signalled to the caller via
    /// `StageExecutionResult::Paused`).
    Pause,
    /// Print the given message to stdout.
    Print(String),
    /// Materialize the current stage's data.
    Materialize,
    /// Compute a diff (currently a no-op in the hit handler).
    ComputeDiff,
    /// Export data in `format` to `path` (currently a no-op in the hit handler).
    Export { format: ExportFormat, path: String },
}
205
206#[derive(Debug, Clone, Serialize, Deserialize)]
/// Snapshot of a stage's output captured for inspection.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MaterializedFrame {
    /// Stage the snapshot came from.
    pub stage_id: String,

    /// Schema of the captured data.
    pub schema: DataSchema,

    /// Sampled rows (not necessarily all rows — compare `total_rows`).
    pub sample_data: Vec<DataRow>,

    /// Total rows in the full, unsampled output.
    pub total_rows: usize,

    /// When the snapshot was taken.
    pub timestamp: std::time::SystemTime,

    /// Approximate in-memory size of the snapshot, in bytes.
    pub memory_size: usize,
}
227
228#[derive(Debug, Clone, Serialize, Deserialize)]
/// Column layout of a data frame.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataSchema {
    /// Ordered column definitions.
    pub columns: Vec<ColumnDef>,

    /// Hash used for cheap schema-equality checks (see `StageDiff::schema_changed`).
    pub schema_hash: u64,
}
237
238#[derive(Debug, Clone, Serialize, Deserialize)]
/// Definition of a single column in a [`DataSchema`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnDef {
    /// Column name.
    pub name: String,

    /// Logical data type.
    pub data_type: DataType,

    /// Whether the column may contain nulls.
    pub nullable: bool,
}
250
251#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
/// Logical data types supported by the debugger.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum DataType {
    Boolean,
    Integer,
    Float,
    String,
    DateTime,
    /// Homogeneous array of the inner type.
    Array(Box<DataType>),
    /// Named, typed fields.
    Struct(Vec<(String, DataType)>),
}
262
263#[derive(Debug, Clone, Serialize, Deserialize)]
/// One row of data; values are positional, matching the schema's column order.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataRow {
    /// Cell values, one per column.
    pub values: Vec<DataValue>,
}
269
270#[derive(Debug, Clone, Serialize, Deserialize)]
/// A single cell value.
///
/// Note: there is no `DateTime` variant even though [`DataType`] has one —
/// presumably datetimes are carried as `String`; confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DataValue {
    Boolean(bool),
    Integer(i64),
    Float(f64),
    String(String),
    Null,
    Array(Vec<DataValue>),
}
280
281#[derive(Debug, Clone, Serialize, Deserialize)]
/// Performance metrics captured for one stage execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageMetrics {
    /// Wall-clock execution time.
    pub execution_time: Duration,

    /// Peak memory usage in bytes (currently filled with a placeholder constant).
    pub peak_memory: usize,

    /// Rows read by the stage.
    pub input_rows: usize,

    /// Rows emitted by the stage.
    pub output_rows: usize,

    /// CPU time (currently mirrors wall-clock time).
    pub cpu_time: Duration,

    /// Number of I/O operations (currently a placeholder constant).
    pub io_operations: usize,

    /// Cache hit ratio, when known — presumably a fraction in `0.0..=1.0`.
    pub cache_hit_ratio: Option<f64>,
}
305
306#[derive(Debug, Clone, Serialize, Deserialize)]
/// One entry in the debugger's rolling event log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionEvent {
    /// When the event occurred.
    pub timestamp: std::time::SystemTime,

    /// What kind of event this is.
    pub event_type: EventType,

    /// Stage the event relates to ("session" for session-level events).
    pub stage_id: String,

    /// Free-form event payload (e.g. "duration_ms", "rows", "hit_count").
    pub data: HashMap<String, String>,
}
321
322#[derive(Debug, Clone, Serialize, Deserialize)]
/// Kind of an [`ExecutionEvent`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EventType {
    /// A stage (or the session) began executing.
    StageStarted,
    /// A stage finished successfully.
    StageCompleted,
    /// A stage failed.
    StageFailed,
    /// A breakpoint fired.
    BreakpointHit,
    /// Stage data was materialized.
    DataMaterialized,
    /// A stage diff was computed.
    DiffComputed,
    /// A performance threshold was crossed (not emitted by the visible code yet).
    PerformanceAlert,
}
333
334#[derive(Debug, Clone, Serialize, Deserialize)]
/// Mutable state of one debug session.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionState {
    /// Whether a session is currently active.
    pub active: bool,

    /// Stage currently executing, if any.
    pub current_stage: Option<String>,

    /// Wall-clock time the session started.
    pub session_start: std::time::SystemTime,

    /// Accumulated execution time across stages.
    /// NOTE(review): never updated by the visible code — confirm.
    pub total_execution_time: Duration,

    /// Number of breakpoint hits during this session.
    pub breakpoints_hit: usize,

    /// Free-form session metadata.
    pub metadata: HashMap<String, String>,
}
355
356#[derive(Debug, Clone, Serialize, Deserialize)]
/// Output formats accepted by `DataflowDebugger::export_debug_data`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExportFormat {
    /// Comma-separated values (not yet implemented by the exporter).
    Csv,
    /// Pretty-printed JSON.
    Json,
    /// Apache Parquet (not yet implemented by the exporter).
    Parquet,
    /// Rust `{:#?}` debug formatting.
    Debug,
}
368
369impl Default for DataflowConfig {
370 fn default() -> Self {
371 Self {
372 max_rows_per_stage: 1000,
373 auto_materialize: false,
374 max_history_events: 10000,
375 enable_profiling: true,
376 stage_timeout_ms: 30000, track_memory: true,
378 compute_diffs: false,
379 sample_rate: 1.0, }
381 }
382}
383
384impl DataflowDebugger {
385 pub fn new(config: DataflowConfig) -> Self {
387 Self {
388 pipeline_stages: Arc::new(Mutex::new(Vec::new())),
389 breakpoints: Arc::new(Mutex::new(HashMap::new())),
390 materialized_data: Arc::new(Mutex::new(HashMap::new())),
391 config,
392 execution_history: Arc::new(Mutex::new(VecDeque::new())),
393 stage_metrics: Arc::new(Mutex::new(HashMap::new())),
394 session_state: Arc::new(Mutex::new(SessionState::default())),
395 }
396 }
397
398 pub fn start_session(&self) -> Result<()> {
400 let mut state = self.session_state
401 .lock()
402 .map_err(|_| anyhow::anyhow!("Failed to acquire session state lock"))?;
403
404 state.active = true;
405 state.session_start = std::time::SystemTime::now();
406 state.total_execution_time = Duration::from_secs(0);
407 state.breakpoints_hit = 0;
408 state.current_stage = None;
409
410 self.record_event(EventType::StageStarted, "session".to_string(), HashMap::new())?;
411 Ok(())
412 }
413
414 pub fn add_breakpoint(&self, breakpoint: Breakpoint) -> Result<()> {
416 let mut breakpoints = self.breakpoints
417 .lock()
418 .map_err(|_| anyhow::anyhow!("Failed to acquire breakpoints lock"))?;
419
420 breakpoints.insert(breakpoint.stage_id.clone(), breakpoint);
421 Ok(())
422 }
423
424 pub fn remove_breakpoint(&self, stage_id: &str) -> Result<bool> {
426 let mut breakpoints = self.breakpoints
427 .lock()
428 .map_err(|_| anyhow::anyhow!("Failed to acquire breakpoints lock"))?;
429
430 Ok(breakpoints.remove(stage_id).is_some())
431 }
432
433 pub fn execute_stage(&self, pipeline_stage: &mut PipelineStage) -> Result<StageExecutionResult> {
435 let start_time = Instant::now();
436 pipeline_stage.status = StageStatus::Running;
437
438 {
440 let mut state = self.session_state
441 .lock()
442 .map_err(|_| anyhow::anyhow!("Failed to acquire session state lock"))?;
443 state.current_stage = Some(pipeline_stage.stage_id.clone());
444 }
445
446 if let Some(breakpoint) = self.check_breakpoint(&pipeline_stage.stage_id)? {
448 if self.should_break(pipeline_stage, &breakpoint)? {
449 pipeline_stage.status = StageStatus::Paused;
450 self.handle_breakpoint_hit(&pipeline_stage.stage_id, &breakpoint)?;
451 return Ok(StageExecutionResult::Paused);
452 }
453 }
454
455 std::thread::sleep(Duration::from_millis(10)); let execution_time = start_time.elapsed();
460 pipeline_stage.execution_time = Some(execution_time);
461 pipeline_stage.status = StageStatus::Completed;
462
463 let metrics = StageMetrics {
465 execution_time,
466 peak_memory: 1024 * 1024, input_rows: pipeline_stage.rows_processed.unwrap_or(0),
468 output_rows: pipeline_stage.rows_processed.unwrap_or(0),
469 cpu_time: execution_time,
470 io_operations: 1,
471 cache_hit_ratio: Some(0.85),
472 };
473
474 let mut stage_metrics = self.stage_metrics
475 .lock()
476 .map_err(|_| anyhow::anyhow!("Failed to acquire stage metrics lock"))?;
477 stage_metrics.insert(pipeline_stage.stage_id.clone(), metrics);
478
479 if self.config.auto_materialize {
481 self.materialize_stage(&pipeline_stage.stage_id)?;
482 }
483
484 self.record_event(
485 EventType::StageCompleted,
486 pipeline_stage.stage_id.clone(),
487 HashMap::from([("duration_ms".to_string(), execution_time.as_millis().to_string())])
488 )?;
489
490 Ok(StageExecutionResult::Completed)
491 }
492
493 pub fn materialize_stage(&self, stage_id: &str) -> Result<MaterializedFrame> {
495 let materialized = MaterializedFrame {
497 stage_id: stage_id.to_string(),
498 schema: DataSchema {
499 columns: vec![
500 ColumnDef {
501 name: "id".to_string(),
502 data_type: DataType::Integer,
503 nullable: false,
504 },
505 ColumnDef {
506 name: "name".to_string(),
507 data_type: DataType::String,
508 nullable: true,
509 },
510 ],
511 schema_hash: 12345,
512 },
513 sample_data: vec![
514 DataRow {
515 values: vec![DataValue::Integer(1), DataValue::String("Alice".to_string())],
516 },
517 DataRow {
518 values: vec![DataValue::Integer(2), DataValue::String("Bob".to_string())],
519 },
520 ],
521 total_rows: 1000,
522 timestamp: std::time::SystemTime::now(),
523 memory_size: 1024 * 50, };
525
526 let mut materialized_data = self.materialized_data
527 .lock()
528 .map_err(|_| anyhow::anyhow!("Failed to acquire materialized data lock"))?;
529
530 materialized_data.insert(stage_id.to_string(), materialized.clone());
531
532 self.record_event(
533 EventType::DataMaterialized,
534 stage_id.to_string(),
535 HashMap::from([("rows".to_string(), materialized.total_rows.to_string())])
536 )?;
537
538 Ok(materialized)
539 }
540
541 pub fn compute_stage_diff(&self, stage1_id: &str, stage2_id: &str) -> Result<StageDiff> {
543 let materialized_data = self.materialized_data
544 .lock()
545 .map_err(|_| anyhow::anyhow!("Failed to acquire materialized data lock"))?;
546
547 let stage1_data = materialized_data.get(stage1_id)
548 .ok_or_else(|| anyhow::anyhow!("Stage {} not materialized", stage1_id))?;
549 let stage2_data = materialized_data.get(stage2_id)
550 .ok_or_else(|| anyhow::anyhow!("Stage {} not materialized", stage2_id))?;
551
552 let row_count_diff = stage2_data.total_rows as i64 - stage1_data.total_rows as i64;
554 let schema_changed = stage1_data.schema.schema_hash != stage2_data.schema.schema_hash;
555
556 let diff = StageDiff {
557 stage1_id: stage1_id.to_string(),
558 stage2_id: stage2_id.to_string(),
559 row_count_diff,
560 schema_changed,
561 column_changes: self.compute_column_changes(&stage1_data.schema, &stage2_data.schema),
562 data_changes: self.compute_data_changes(&stage1_data.sample_data, &stage2_data.sample_data),
563 };
564
565 self.record_event(
566 EventType::DiffComputed,
567 format!("{stage1_id}:{stage2_id}"),
568 HashMap::from([("row_diff".to_string(), row_count_diff.to_string())])
569 )?;
570
571 Ok(diff)
572 }
573
574 pub fn get_session_status(&self) -> Result<SessionState> {
576 let state = self.session_state
577 .lock()
578 .map_err(|_| anyhow::anyhow!("Failed to acquire session state lock"))?;
579
580 Ok(state.clone())
581 }
582
583 pub fn get_execution_history(&self) -> Result<Vec<ExecutionEvent>> {
585 let history = self.execution_history
586 .lock()
587 .map_err(|_| anyhow::anyhow!("Failed to acquire execution history lock"))?;
588
589 Ok(history.iter().cloned().collect())
590 }
591
592 pub fn get_stage_metrics(&self) -> Result<HashMap<String, StageMetrics>> {
594 let metrics = self.stage_metrics
595 .lock()
596 .map_err(|_| anyhow::anyhow!("Failed to acquire stage metrics lock"))?;
597
598 Ok(metrics.clone())
599 }
600
601 pub fn export_debug_data(&self, format: ExportFormat, output_path: &str) -> Result<()> {
603 let history = self.get_execution_history()?;
604 let metrics = self.get_stage_metrics()?;
605 let session_status = self.get_session_status()?;
606
607 let debug_data = DebugExport {
608 session_status,
609 execution_history: history,
610 stage_metrics: metrics,
611 materialized_data: {
612 let data = self.materialized_data
613 .lock()
614 .map_err(|_| anyhow::anyhow!("Failed to acquire materialized data lock"))?;
615 data.clone()
616 },
617 };
618
619 match format {
620 ExportFormat::Json => {
621 let json_data = serde_json::to_string_pretty(&debug_data)?;
622 std::fs::write(output_path, json_data)?;
623 }
624 ExportFormat::Debug => {
625 let debug_str = format!("{debug_data:#?}");
626 std::fs::write(output_path, debug_str)?;
627 }
628 _ => {
629 return Err(anyhow::anyhow!("Export format {:?} not yet implemented", format));
630 }
631 }
632
633 Ok(())
634 }
635
636 fn check_breakpoint(&self, stage_id: &str) -> Result<Option<Breakpoint>> {
639 let breakpoints = self.breakpoints
640 .lock()
641 .map_err(|_| anyhow::anyhow!("Failed to acquire breakpoints lock"))?;
642
643 Ok(breakpoints.get(stage_id).cloned())
644 }
645
646 fn should_break(&self, _stage: &PipelineStage, breakpoint: &Breakpoint) -> Result<bool> {
647 if !breakpoint.active {
648 return Ok(false);
649 }
650
651 match &breakpoint.condition {
652 None | Some(BreakpointCondition::Always) => Ok(true),
653 Some(BreakpointCondition::RowCount { operator: _, value: _ }) => {
654 Ok(true)
656 }
657 Some(BreakpointCondition::ExecutionTime { threshold_ms: _ }) => {
658 Ok(false)
660 }
661 Some(BreakpointCondition::MemoryUsage { threshold_mb: _ }) => {
662 Ok(false)
664 }
665 Some(BreakpointCondition::DataValue { column: _, value: _ }) => {
666 Ok(false)
668 }
669 Some(BreakpointCondition::OnError) => Ok(false),
670 }
671 }
672
673 fn handle_breakpoint_hit(&self, stage_id: &str, breakpoint: &Breakpoint) -> Result<()> {
674 let mut breakpoints = self.breakpoints
676 .lock()
677 .map_err(|_| anyhow::anyhow!("Failed to acquire breakpoints lock"))?;
678
679 if let Some(bp) = breakpoints.get_mut(stage_id) {
680 bp.hit_count += 1;
681 }
682
683 {
685 let mut state = self.session_state
686 .lock()
687 .map_err(|_| anyhow::anyhow!("Failed to acquire session state lock"))?;
688 state.breakpoints_hit += 1;
689 }
690
691 for action in &breakpoint.actions {
693 match action {
694 BreakpointAction::Pause => {
695 }
697 BreakpointAction::Print(message) => {
698 println!("Breakpoint: {message}");
699 }
700 BreakpointAction::Materialize => {
701 self.materialize_stage(stage_id)?;
702 }
703 BreakpointAction::ComputeDiff => {
704 }
706 BreakpointAction::Export { format: _, path: _ } => {
707 }
709 }
710 }
711
712 self.record_event(
713 EventType::BreakpointHit,
714 stage_id.to_string(),
715 HashMap::from([("hit_count".to_string(), breakpoint.hit_count.to_string())])
716 )?;
717
718 Ok(())
719 }
720
721 fn record_event(&self, event_type: EventType, stage_id: String, data: HashMap<String, String>) -> Result<()> {
722 let event = ExecutionEvent {
723 timestamp: std::time::SystemTime::now(),
724 event_type,
725 stage_id,
726 data,
727 };
728
729 let mut history = self.execution_history
730 .lock()
731 .map_err(|_| anyhow::anyhow!("Failed to acquire execution history lock"))?;
732
733 history.push_back(event);
734
735 while history.len() > self.config.max_history_events {
737 history.pop_front();
738 }
739
740 Ok(())
741 }
742
743 fn compute_column_changes(&self, schema1: &DataSchema, schema2: &DataSchema) -> Vec<ColumnChange> {
744 let mut changes = Vec::new();
745
746 let cols1: Vec<&str> = schema1.columns.iter().map(|c| c.name.as_str()).collect();
748 let cols2: Vec<&str> = schema2.columns.iter().map(|c| c.name.as_str()).collect();
749
750 for col in &cols2 {
751 if !cols1.contains(col) {
752 changes.push(ColumnChange::Added((*col).to_string()));
753 }
754 }
755
756 for col in &cols1 {
757 if !cols2.contains(col) {
758 changes.push(ColumnChange::Removed((*col).to_string()));
759 }
760 }
761
762 changes
763 }
764
765 fn compute_data_changes(&self, _data1: &[DataRow], _data2: &[DataRow]) -> Vec<DataChange> {
766 vec![DataChange::RowCountChanged]
768 }
769}
770
/// Outcome of `DataflowDebugger::execute_stage`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StageExecutionResult {
    /// Stage ran to completion.
    Completed,
    /// Execution stopped at a breakpoint.
    Paused,
    /// Stage failed with the given error message
    /// (not produced by the visible code yet).
    Failed(String),
}
781
/// Comparison between two materialized stages.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageDiff {
    /// Id of the first (baseline) stage.
    pub stage1_id: String,

    /// Id of the second (comparison) stage.
    pub stage2_id: String,

    /// `stage2.total_rows - stage1.total_rows`; negative when rows were lost.
    pub row_count_diff: i64,

    /// Whether the two schema hashes differ.
    pub schema_changed: bool,

    /// Column-level schema differences.
    pub column_changes: Vec<ColumnChange>,

    /// Data-level differences (currently a coarse placeholder).
    pub data_changes: Vec<DataChange>,
}
803
/// A single column-level schema difference.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ColumnChange {
    /// Column present only in the second schema.
    Added(String),
    /// Column present only in the first schema.
    Removed(String),
    /// Type change — presumably (name, old type, new type); not yet produced,
    /// confirm field order when implementing detection.
    TypeChanged(String, DataType, DataType),
    /// Rename — presumably (old name, new name); not yet produced.
    Renamed(String, String),
}
816
817#[derive(Debug, Clone, Serialize, Deserialize)]
819pub enum DataChange {
820 RowCountChanged,
822 RowsAdded(Vec<usize>),
824 RowsRemoved(Vec<usize>),
826 ValuesModified(Vec<(usize, usize)>), }
829
/// Bundle of all debugger state written by `DataflowDebugger::export_debug_data`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DebugExport {
    /// Session state at export time.
    pub session_status: SessionState,

    /// Full event log, oldest first.
    pub execution_history: Vec<ExecutionEvent>,

    /// Per-stage metrics keyed by stage id.
    pub stage_metrics: HashMap<String, StageMetrics>,

    /// Materialized snapshots keyed by stage id.
    pub materialized_data: HashMap<String, MaterializedFrame>,
}
845
846impl Default for SessionState {
847 fn default() -> Self {
848 Self {
849 active: false,
850 current_stage: None,
851 session_start: std::time::SystemTime::now(),
852 total_execution_time: Duration::from_secs(0),
853 breakpoints_hit: 0,
854 metadata: HashMap::new(),
855 }
856 }
857}
858
859impl fmt::Display for StageType {
860 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
861 match self {
862 Self::Load => write!(f, "Load"),
863 Self::Filter => write!(f, "Filter"),
864 Self::Select => write!(f, "Select"),
865 Self::Map => write!(f, "Map"),
866 Self::GroupBy => write!(f, "GroupBy"),
867 Self::Aggregate => write!(f, "Aggregate"),
868 Self::Join => write!(f, "Join"),
869 Self::Sort => write!(f, "Sort"),
870 Self::Window => write!(f, "Window"),
871 Self::Union => write!(f, "Union"),
872 Self::Custom(name) => write!(f, "Custom({name})"),
873 }
874 }
875}
876
877impl fmt::Display for StageStatus {
878 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
879 match self {
880 Self::Pending => write!(f, "Pending"),
881 Self::Running => write!(f, "Running"),
882 Self::Completed => write!(f, "Completed"),
883 Self::Failed(err) => write!(f, "Failed: {err}"),
884 Self::Cancelled => write!(f, "Cancelled"),
885 Self::Paused => write!(f, "Paused"),
886 }
887 }
888}
889
890impl fmt::Display for DataType {
891 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
892 match self {
893 Self::Boolean => write!(f, "Boolean"),
894 Self::Integer => write!(f, "Integer"),
895 Self::Float => write!(f, "Float"),
896 Self::String => write!(f, "String"),
897 Self::DateTime => write!(f, "DateTime"),
898 Self::Array(inner) => write!(f, "Array<{inner}>"),
899 Self::Struct(fields) => {
900 write!(f, "Struct{{")?;
901 for (i, (name, dtype)) in fields.iter().enumerate() {
902 if i > 0 { write!(f, ", ")?; }
903 write!(f, "{name}: {dtype}")?;
904 }
905 write!(f, "}}")
906 }
907 }
908 }
909}
910
911impl fmt::Display for DataValue {
912 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
913 match self {
914 Self::Boolean(b) => write!(f, "{b}"),
915 Self::Integer(i) => write!(f, "{i}"),
916 Self::Float(fl) => write!(f, "{fl}"),
917 Self::String(s) => write!(f, "\"{s}\""),
918 Self::Null => write!(f, "null"),
919 Self::Array(values) => {
920 write!(f, "[")?;
921 for (i, value) in values.iter().enumerate() {
922 if i > 0 { write!(f, ", ")?; }
923 write!(f, "{value}")?;
924 }
925 write!(f, "]")
926 }
927 }
928 }
929}