ruchy/runtime/
dataflow_debugger.rs

1//! Dataflow debugger for `DataFrame` pipeline debugging (RUCHY-0818)
2//!
3//! Provides comprehensive debugging capabilities for `DataFrame` operations,
4//! including breakpoints, materialization on demand, and stage-by-stage analysis.
5
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
12
/// Dataflow debugger for `DataFrame` pipeline analysis.
///
/// All mutable state is behind `Arc<Mutex<...>>` so a single debugger can be
/// shared across threads; each accessor locks only the field it needs.
pub struct DataflowDebugger {
    /// Pipeline execution stages
    #[allow(dead_code)] // Future feature for pipeline management
    pipeline_stages: Arc<Mutex<Vec<PipelineStage>>>,

    /// Active breakpoints, keyed by stage ID (one breakpoint per stage)
    breakpoints: Arc<Mutex<HashMap<String, Breakpoint>>>,

    /// Materialized data captured at breakpoints, keyed by stage ID
    materialized_data: Arc<Mutex<HashMap<String, MaterializedFrame>>>,

    /// Debugger configuration (immutable after construction)
    config: DataflowConfig,

    /// Execution history for analysis; bounded by `config.max_history_events`
    execution_history: Arc<Mutex<VecDeque<ExecutionEvent>>>,

    /// Performance metrics per stage, keyed by stage ID
    stage_metrics: Arc<Mutex<HashMap<String, StageMetrics>>>,

    /// Current debugging session state
    session_state: Arc<Mutex<SessionState>>,
}
37
/// Configuration for the dataflow debugger.
///
/// See `DataflowConfig::default()` for the default values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataflowConfig {
    /// Maximum number of rows to materialize at each stage
    pub max_rows_per_stage: usize,

    /// Enable automatic materialization after each completed stage
    pub auto_materialize: bool,

    /// Maximum execution history events to keep (older events are dropped)
    pub max_history_events: usize,

    /// Enable performance profiling
    pub enable_profiling: bool,

    /// Timeout for stage execution (in milliseconds)
    pub stage_timeout_ms: u64,

    /// Enable detailed memory tracking
    pub track_memory: bool,

    /// Enable diff computation between stages
    pub compute_diffs: bool,

    /// Sample rate for large datasets (0.0-1.0; 1.0 means no sampling)
    pub sample_rate: f64,
}
65
/// Individual stage in the `DataFrame` pipeline.
///
/// Execution-dependent fields (`execution_time`, `memory_usage`,
/// `rows_processed`, schemas) are `Option` because they are unknown until
/// the stage has actually run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineStage {
    /// Unique identifier for the stage
    pub stage_id: String,

    /// Human-readable name
    pub stage_name: String,

    /// Stage type (filter, map, `group_by`, etc.)
    pub stage_type: StageType,

    /// Stage execution status
    pub status: StageStatus,

    /// Input `DataFrame` schema
    pub input_schema: Option<DataSchema>,

    /// Output `DataFrame` schema
    pub output_schema: Option<DataSchema>,

    /// Stage execution time (set once the stage has run)
    pub execution_time: Option<Duration>,

    /// Memory usage for this stage
    pub memory_usage: Option<usize>,

    /// Number of rows processed
    pub rows_processed: Option<usize>,

    /// Stage-specific metadata (free-form key/value pairs)
    pub metadata: HashMap<String, String>,
}
99
/// Types of `DataFrame` operations.
///
/// `Custom` carries a user-supplied operation name for operations not
/// covered by the built-in variants.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum StageType {
    /// Data loading stage
    Load,
    /// Filtering operations
    Filter,
    /// Column selection/projection
    Select,
    /// Column transformations
    Map,
    /// Grouping operations
    GroupBy,
    /// Aggregation operations
    Aggregate,
    /// Join operations
    Join,
    /// Sorting operations
    Sort,
    /// Window functions
    Window,
    /// Union operations
    Union,
    /// Custom user-defined operations
    Custom(String),
}
126
/// Execution status of a pipeline stage.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum StageStatus {
    /// Stage not yet executed
    Pending,
    /// Stage currently executing
    Running,
    /// Stage completed successfully
    Completed,
    /// Stage failed; the payload is the error message
    Failed(String),
    /// Stage execution was cancelled
    Cancelled,
    /// Stage paused at breakpoint
    Paused,
}
143
/// Breakpoint configuration for pipeline debugging.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Breakpoint {
    /// Stage ID where breakpoint is set
    pub stage_id: String,

    /// Breakpoint condition; `None` behaves like `Always`
    pub condition: Option<BreakpointCondition>,

    /// Whether breakpoint is currently active (inactive breakpoints never fire)
    pub active: bool,

    /// Number of times this breakpoint has been hit
    pub hit_count: usize,

    /// Actions to perform, in order, when the breakpoint is hit
    pub actions: Vec<BreakpointAction>,
}
162
/// Conditions for triggering breakpoints.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BreakpointCondition {
    /// Always break at this stage
    Always,
    /// Break if row count meets criteria
    RowCount { operator: ComparisonOp, value: usize },
    /// Break if execution time exceeds threshold
    ExecutionTime { threshold_ms: u64 },
    /// Break if memory usage exceeds threshold
    MemoryUsage { threshold_mb: usize },
    /// Break on specific data values
    DataValue { column: String, value: DataValue },
    /// Break on error conditions
    OnError,
}
179
180/// Comparison operators for breakpoint conditions
181#[derive(Debug, Clone, Serialize, Deserialize)]
182pub enum ComparisonOp {
183    Equal,
184    NotEqual,
185    GreaterThan,
186    GreaterThanOrEqual,
187    LessThan,
188    LessThanOrEqual,
189}
190
/// Actions to perform when a breakpoint is triggered.
///
/// Actions are executed in the order they appear in `Breakpoint::actions`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BreakpointAction {
    /// Pause execution and wait for user input
    Pause,
    /// Print debug information (the payload is the message)
    Print(String),
    /// Materialize current `DataFrame`
    Materialize,
    /// Compute diff with previous stage
    ComputeDiff,
    /// Export data to file
    Export { format: ExportFormat, path: String },
}
205
/// Materialized `DataFrame` data for inspection.
///
/// `sample_data` holds only a bounded sample; `total_rows` reflects the
/// size of the full dataset the sample was drawn from.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MaterializedFrame {
    /// Stage ID where data was materialized
    pub stage_id: String,

    /// `DataFrame` schema
    pub schema: DataSchema,

    /// Sample of data rows (limited by `DataflowConfig::max_rows_per_stage`)
    pub sample_data: Vec<DataRow>,

    /// Total number of rows in full dataset
    pub total_rows: usize,

    /// Materialization timestamp
    pub timestamp: std::time::SystemTime,

    /// Memory footprint of materialized data, in bytes
    pub memory_size: usize,
}
227
/// `DataFrame` schema information.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataSchema {
    /// Column definitions, in column order
    pub columns: Vec<ColumnDef>,

    /// Schema hash for cheap change detection between stages
    pub schema_hash: u64,
}
237
/// Column definition in a `DataFrame` schema.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnDef {
    /// Column name
    pub name: String,

    /// Data type
    pub data_type: DataType,

    /// Whether column allows null values
    pub nullable: bool,
}
250
/// Supported data types in `DataFrames`.
///
/// `Array` and `Struct` are recursive: an array has an element type and a
/// struct has named, typed fields.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum DataType {
    Boolean,
    Integer,
    Float,
    String,
    DateTime,
    Array(Box<DataType>),
    Struct(Vec<(String, DataType)>),
}
262
/// Single row of data in a materialized `DataFrame`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataRow {
    /// Values for each column, in the schema's column order
    pub values: Vec<DataValue>,
}
269
/// Individual data value in a `DataFrame` cell.
///
/// Note: no `PartialEq`/`Eq` derives because `Float(f64)` cannot be `Eq`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DataValue {
    Boolean(bool),
    Integer(i64),
    Float(f64),
    String(String),
    Null,
    Array(Vec<DataValue>),
}
280
/// Performance metrics for a pipeline stage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageMetrics {
    /// Stage wall-clock execution time
    pub execution_time: Duration,

    /// Memory peak usage during stage, in bytes
    pub peak_memory: usize,

    /// Number of rows input to stage
    pub input_rows: usize,

    /// Number of rows output from stage
    pub output_rows: usize,

    /// CPU time spent in stage
    pub cpu_time: Duration,

    /// I/O operations performed
    pub io_operations: usize,

    /// Cache hit ratio in [0.0, 1.0], if applicable
    pub cache_hit_ratio: Option<f64>,
}
305
/// Execution event in the debugging session.
///
/// Events are appended to the debugger's bounded history queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionEvent {
    /// Event timestamp
    pub timestamp: std::time::SystemTime,

    /// Event type
    pub event_type: EventType,

    /// Stage ID associated with event
    pub stage_id: String,

    /// Additional event data (free-form key/value pairs)
    pub data: HashMap<String, String>,
}
321
322/// Types of execution events
323#[derive(Debug, Clone, Serialize, Deserialize)]
324pub enum EventType {
325    StageStarted,
326    StageCompleted,
327    StageFailed,
328    BreakpointHit,
329    DataMaterialized,
330    DiffComputed,
331    PerformanceAlert,
332}
333
/// Current state of the debugging session.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionState {
    /// Whether debugger is actively running
    pub active: bool,

    /// Current stage being executed or paused at (`None` between stages)
    pub current_stage: Option<String>,

    /// Session start time
    pub session_start: std::time::SystemTime,

    /// Total execution time so far
    pub total_execution_time: Duration,

    /// Number of breakpoints hit during this session
    pub breakpoints_hit: usize,

    /// Session metadata (free-form key/value pairs)
    pub metadata: HashMap<String, String>,
}
355
/// Export formats for dataflow debugging data.
///
/// Only `Json` and `Debug` are currently implemented by
/// `DataflowDebugger::export_debug_data`; the others return an error.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExportFormat {
    /// Comma-separated values
    Csv,
    /// JSON format
    Json,
    /// Parquet format
    Parquet,
    /// Debug text format (Rust `{:#?}` output)
    Debug,
}
368
369impl Default for DataflowConfig {
370    fn default() -> Self {
371        Self {
372            max_rows_per_stage: 1000,
373            auto_materialize: false,
374            max_history_events: 10000,
375            enable_profiling: true,
376            stage_timeout_ms: 30000, // 30 seconds
377            track_memory: true,
378            compute_diffs: false,
379            sample_rate: 1.0, // No sampling by default
380        }
381    }
382}
383
384impl DataflowDebugger {
385    /// Create a new dataflow debugger
386    pub fn new(config: DataflowConfig) -> Self {
387        Self {
388            pipeline_stages: Arc::new(Mutex::new(Vec::new())),
389            breakpoints: Arc::new(Mutex::new(HashMap::new())),
390            materialized_data: Arc::new(Mutex::new(HashMap::new())),
391            config,
392            execution_history: Arc::new(Mutex::new(VecDeque::new())),
393            stage_metrics: Arc::new(Mutex::new(HashMap::new())),
394            session_state: Arc::new(Mutex::new(SessionState::default())),
395        }
396    }
397    
398    /// Start a new debugging session
399    pub fn start_session(&self) -> Result<()> {
400        let mut state = self.session_state
401            .lock()
402            .map_err(|_| anyhow::anyhow!("Failed to acquire session state lock"))?;
403        
404        state.active = true;
405        state.session_start = std::time::SystemTime::now();
406        state.total_execution_time = Duration::from_secs(0);
407        state.breakpoints_hit = 0;
408        state.current_stage = None;
409        
410        self.record_event(EventType::StageStarted, "session".to_string(), HashMap::new())?;
411        Ok(())
412    }
413    
414    /// Add a breakpoint to the debugger
415    pub fn add_breakpoint(&self, breakpoint: Breakpoint) -> Result<()> {
416        let mut breakpoints = self.breakpoints
417            .lock()
418            .map_err(|_| anyhow::anyhow!("Failed to acquire breakpoints lock"))?;
419        
420        breakpoints.insert(breakpoint.stage_id.clone(), breakpoint);
421        Ok(())
422    }
423    
424    /// Remove a breakpoint by stage ID
425    pub fn remove_breakpoint(&self, stage_id: &str) -> Result<bool> {
426        let mut breakpoints = self.breakpoints
427            .lock()
428            .map_err(|_| anyhow::anyhow!("Failed to acquire breakpoints lock"))?;
429        
430        Ok(breakpoints.remove(stage_id).is_some())
431    }
432    
433    /// Execute a pipeline stage with debugging support
434    pub fn execute_stage(&self, pipeline_stage: &mut PipelineStage) -> Result<StageExecutionResult> {
435        let start_time = Instant::now();
436        pipeline_stage.status = StageStatus::Running;
437        
438        // Update session state
439        {
440            let mut state = self.session_state
441                .lock()
442                .map_err(|_| anyhow::anyhow!("Failed to acquire session state lock"))?;
443            state.current_stage = Some(pipeline_stage.stage_id.clone());
444        }
445        
446        // Check for breakpoints
447        if let Some(breakpoint) = self.check_breakpoint(&pipeline_stage.stage_id)? {
448            if self.should_break(pipeline_stage, &breakpoint)? {
449                pipeline_stage.status = StageStatus::Paused;
450                self.handle_breakpoint_hit(&pipeline_stage.stage_id, &breakpoint)?;
451                return Ok(StageExecutionResult::Paused);
452            }
453        }
454        
455        // Simulate stage execution (in real implementation, this would execute actual DataFrame operations)
456        std::thread::sleep(Duration::from_millis(10)); // Simulate work
457        
458        // Record execution metrics
459        let execution_time = start_time.elapsed();
460        pipeline_stage.execution_time = Some(execution_time);
461        pipeline_stage.status = StageStatus::Completed;
462        
463        // Store performance metrics
464        let metrics = StageMetrics {
465            execution_time,
466            peak_memory: 1024 * 1024, // 1MB simulated
467            input_rows: pipeline_stage.rows_processed.unwrap_or(0),
468            output_rows: pipeline_stage.rows_processed.unwrap_or(0),
469            cpu_time: execution_time,
470            io_operations: 1,
471            cache_hit_ratio: Some(0.85),
472        };
473        
474        let mut stage_metrics = self.stage_metrics
475            .lock()
476            .map_err(|_| anyhow::anyhow!("Failed to acquire stage metrics lock"))?;
477        stage_metrics.insert(pipeline_stage.stage_id.clone(), metrics);
478        
479        // Auto-materialize if configured
480        if self.config.auto_materialize {
481            self.materialize_stage(&pipeline_stage.stage_id)?;
482        }
483        
484        self.record_event(
485            EventType::StageCompleted,
486            pipeline_stage.stage_id.clone(),
487            HashMap::from([("duration_ms".to_string(), execution_time.as_millis().to_string())])
488        )?;
489        
490        Ok(StageExecutionResult::Completed)
491    }
492    
493    /// Materialize `DataFrame` data at a specific stage
494    pub fn materialize_stage(&self, stage_id: &str) -> Result<MaterializedFrame> {
495        // In a real implementation, this would materialize actual DataFrame data
496        let materialized = MaterializedFrame {
497            stage_id: stage_id.to_string(),
498            schema: DataSchema {
499                columns: vec![
500                    ColumnDef {
501                        name: "id".to_string(),
502                        data_type: DataType::Integer,
503                        nullable: false,
504                    },
505                    ColumnDef {
506                        name: "name".to_string(),
507                        data_type: DataType::String,
508                        nullable: true,
509                    },
510                ],
511                schema_hash: 12345,
512            },
513            sample_data: vec![
514                DataRow {
515                    values: vec![DataValue::Integer(1), DataValue::String("Alice".to_string())],
516                },
517                DataRow {
518                    values: vec![DataValue::Integer(2), DataValue::String("Bob".to_string())],
519                },
520            ],
521            total_rows: 1000,
522            timestamp: std::time::SystemTime::now(),
523            memory_size: 1024 * 50, // 50KB
524        };
525        
526        let mut materialized_data = self.materialized_data
527            .lock()
528            .map_err(|_| anyhow::anyhow!("Failed to acquire materialized data lock"))?;
529        
530        materialized_data.insert(stage_id.to_string(), materialized.clone());
531        
532        self.record_event(
533            EventType::DataMaterialized,
534            stage_id.to_string(),
535            HashMap::from([("rows".to_string(), materialized.total_rows.to_string())])
536        )?;
537        
538        Ok(materialized)
539    }
540    
541    /// Compute diff between two pipeline stages
542    pub fn compute_stage_diff(&self, stage1_id: &str, stage2_id: &str) -> Result<StageDiff> {
543        let materialized_data = self.materialized_data
544            .lock()
545            .map_err(|_| anyhow::anyhow!("Failed to acquire materialized data lock"))?;
546        
547        let stage1_data = materialized_data.get(stage1_id)
548            .ok_or_else(|| anyhow::anyhow!("Stage {} not materialized", stage1_id))?;
549        let stage2_data = materialized_data.get(stage2_id)
550            .ok_or_else(|| anyhow::anyhow!("Stage {} not materialized", stage2_id))?;
551        
552        // Compute basic diff metrics
553        let row_count_diff = stage2_data.total_rows as i64 - stage1_data.total_rows as i64;
554        let schema_changed = stage1_data.schema.schema_hash != stage2_data.schema.schema_hash;
555        
556        let diff = StageDiff {
557            stage1_id: stage1_id.to_string(),
558            stage2_id: stage2_id.to_string(),
559            row_count_diff,
560            schema_changed,
561            column_changes: self.compute_column_changes(&stage1_data.schema, &stage2_data.schema),
562            data_changes: self.compute_data_changes(&stage1_data.sample_data, &stage2_data.sample_data),
563        };
564        
565        self.record_event(
566            EventType::DiffComputed,
567            format!("{stage1_id}:{stage2_id}"),
568            HashMap::from([("row_diff".to_string(), row_count_diff.to_string())])
569        )?;
570        
571        Ok(diff)
572    }
573    
574    /// Get current debugging session status
575    pub fn get_session_status(&self) -> Result<SessionState> {
576        let state = self.session_state
577            .lock()
578            .map_err(|_| anyhow::anyhow!("Failed to acquire session state lock"))?;
579        
580        Ok(state.clone())
581    }
582    
583    /// Get execution history
584    pub fn get_execution_history(&self) -> Result<Vec<ExecutionEvent>> {
585        let history = self.execution_history
586            .lock()
587            .map_err(|_| anyhow::anyhow!("Failed to acquire execution history lock"))?;
588        
589        Ok(history.iter().cloned().collect())
590    }
591    
592    /// Get performance metrics for all stages
593    pub fn get_stage_metrics(&self) -> Result<HashMap<String, StageMetrics>> {
594        let metrics = self.stage_metrics
595            .lock()
596            .map_err(|_| anyhow::anyhow!("Failed to acquire stage metrics lock"))?;
597        
598        Ok(metrics.clone())
599    }
600    
601    /// Export debugging data to various formats
602    pub fn export_debug_data(&self, format: ExportFormat, output_path: &str) -> Result<()> {
603        let history = self.get_execution_history()?;
604        let metrics = self.get_stage_metrics()?;
605        let session_status = self.get_session_status()?;
606        
607        let debug_data = DebugExport {
608            session_status,
609            execution_history: history,
610            stage_metrics: metrics,
611            materialized_data: {
612                let data = self.materialized_data
613                    .lock()
614                    .map_err(|_| anyhow::anyhow!("Failed to acquire materialized data lock"))?;
615                data.clone()
616            },
617        };
618        
619        match format {
620            ExportFormat::Json => {
621                let json_data = serde_json::to_string_pretty(&debug_data)?;
622                std::fs::write(output_path, json_data)?;
623            }
624            ExportFormat::Debug => {
625                let debug_str = format!("{debug_data:#?}");
626                std::fs::write(output_path, debug_str)?;
627            }
628            _ => {
629                return Err(anyhow::anyhow!("Export format {:?} not yet implemented", format));
630            }
631        }
632        
633        Ok(())
634    }
635    
636    // Helper methods
637    
638    fn check_breakpoint(&self, stage_id: &str) -> Result<Option<Breakpoint>> {
639        let breakpoints = self.breakpoints
640            .lock()
641            .map_err(|_| anyhow::anyhow!("Failed to acquire breakpoints lock"))?;
642        
643        Ok(breakpoints.get(stage_id).cloned())
644    }
645    
646    fn should_break(&self, _stage: &PipelineStage, breakpoint: &Breakpoint) -> Result<bool> {
647        if !breakpoint.active {
648            return Ok(false);
649        }
650        
651        match &breakpoint.condition {
652            None | Some(BreakpointCondition::Always) => Ok(true),
653            Some(BreakpointCondition::RowCount { operator: _, value: _ }) => {
654                // In real implementation, would check actual row count
655                Ok(true)
656            }
657            Some(BreakpointCondition::ExecutionTime { threshold_ms: _ }) => {
658                // In real implementation, would check execution time
659                Ok(false)
660            }
661            Some(BreakpointCondition::MemoryUsage { threshold_mb: _ }) => {
662                // In real implementation, would check memory usage
663                Ok(false)
664            }
665            Some(BreakpointCondition::DataValue { column: _, value: _ }) => {
666                // In real implementation, would inspect actual data
667                Ok(false)
668            }
669            Some(BreakpointCondition::OnError) => Ok(false),
670        }
671    }
672    
673    fn handle_breakpoint_hit(&self, stage_id: &str, breakpoint: &Breakpoint) -> Result<()> {
674        // Update breakpoint hit count
675        let mut breakpoints = self.breakpoints
676            .lock()
677            .map_err(|_| anyhow::anyhow!("Failed to acquire breakpoints lock"))?;
678        
679        if let Some(bp) = breakpoints.get_mut(stage_id) {
680            bp.hit_count += 1;
681        }
682        
683        // Update session state
684        {
685            let mut state = self.session_state
686                .lock()
687                .map_err(|_| anyhow::anyhow!("Failed to acquire session state lock"))?;
688            state.breakpoints_hit += 1;
689        }
690        
691        // Execute breakpoint actions
692        for action in &breakpoint.actions {
693            match action {
694                BreakpointAction::Pause => {
695                    // In real implementation, would pause execution and wait for user input
696                }
697                BreakpointAction::Print(message) => {
698                    println!("Breakpoint: {message}");
699                }
700                BreakpointAction::Materialize => {
701                    self.materialize_stage(stage_id)?;
702                }
703                BreakpointAction::ComputeDiff => {
704                    // Would compute diff with previous stage if available
705                }
706                BreakpointAction::Export { format: _, path: _ } => {
707                    // Would export current data
708                }
709            }
710        }
711        
712        self.record_event(
713            EventType::BreakpointHit,
714            stage_id.to_string(),
715            HashMap::from([("hit_count".to_string(), breakpoint.hit_count.to_string())])
716        )?;
717        
718        Ok(())
719    }
720    
721    fn record_event(&self, event_type: EventType, stage_id: String, data: HashMap<String, String>) -> Result<()> {
722        let event = ExecutionEvent {
723            timestamp: std::time::SystemTime::now(),
724            event_type,
725            stage_id,
726            data,
727        };
728        
729        let mut history = self.execution_history
730            .lock()
731            .map_err(|_| anyhow::anyhow!("Failed to acquire execution history lock"))?;
732        
733        history.push_back(event);
734        
735        // Maintain history size limit
736        while history.len() > self.config.max_history_events {
737            history.pop_front();
738        }
739        
740        Ok(())
741    }
742    
743    fn compute_column_changes(&self, schema1: &DataSchema, schema2: &DataSchema) -> Vec<ColumnChange> {
744        let mut changes = Vec::new();
745        
746        // Find added/removed columns (simplified implementation)
747        let cols1: Vec<&str> = schema1.columns.iter().map(|c| c.name.as_str()).collect();
748        let cols2: Vec<&str> = schema2.columns.iter().map(|c| c.name.as_str()).collect();
749        
750        for col in &cols2 {
751            if !cols1.contains(col) {
752                changes.push(ColumnChange::Added((*col).to_string()));
753            }
754        }
755        
756        for col in &cols1 {
757            if !cols2.contains(col) {
758                changes.push(ColumnChange::Removed((*col).to_string()));
759            }
760        }
761        
762        changes
763    }
764    
765    fn compute_data_changes(&self, _data1: &[DataRow], _data2: &[DataRow]) -> Vec<DataChange> {
766        // Simplified implementation - in reality would compute detailed row-level diffs
767        vec![DataChange::RowCountChanged]
768    }
769}
770
/// Result of executing a pipeline stage.
///
/// Not serialized; returned directly from `DataflowDebugger::execute_stage`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StageExecutionResult {
    /// Stage completed successfully
    Completed,
    /// Stage paused at breakpoint
    Paused,
    /// Stage failed; the payload is the error message
    Failed(String),
}
781
/// Diff between two pipeline stages.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageDiff {
    /// First (earlier) stage ID
    pub stage1_id: String,

    /// Second (later) stage ID
    pub stage2_id: String,

    /// Difference in row count (stage2 - stage1); negative means rows dropped
    pub row_count_diff: i64,

    /// Whether schema changed between stages (by schema hash)
    pub schema_changed: bool,

    /// Specific column changes
    pub column_changes: Vec<ColumnChange>,

    /// Data-level changes
    pub data_changes: Vec<DataChange>,
}
803
/// Types of column changes between stages.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ColumnChange {
    /// Column was added (payload: column name)
    Added(String),
    /// Column was removed (payload: column name)
    Removed(String),
    /// Column type changed (payload: name, old type, new type)
    TypeChanged(String, DataType, DataType),
    /// Column renamed (payload: old name, new name)
    Renamed(String, String),
}
816
/// Types of data changes between stages.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DataChange {
    /// Row count changed
    RowCountChanged,
    /// Specific rows added (payload: row indices)
    RowsAdded(Vec<usize>),
    /// Specific rows removed (payload: row indices)
    RowsRemoved(Vec<usize>),
    /// Cell values modified
    ValuesModified(Vec<(usize, usize)>), // (row, col) indices
}
829
/// Complete debugging data export structure.
///
/// Serialized as a whole by `DataflowDebugger::export_debug_data`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DebugExport {
    /// Session status at export time
    pub session_status: SessionState,

    /// Execution history, oldest event first
    pub execution_history: Vec<ExecutionEvent>,

    /// Performance metrics, keyed by stage ID
    pub stage_metrics: HashMap<String, StageMetrics>,

    /// Materialized data, keyed by stage ID
    pub materialized_data: HashMap<String, MaterializedFrame>,
}
845
846impl Default for SessionState {
847    fn default() -> Self {
848        Self {
849            active: false,
850            current_stage: None,
851            session_start: std::time::SystemTime::now(),
852            total_execution_time: Duration::from_secs(0),
853            breakpoints_hit: 0,
854            metadata: HashMap::new(),
855        }
856    }
857}
858
859impl fmt::Display for StageType {
860    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
861        match self {
862            Self::Load => write!(f, "Load"),
863            Self::Filter => write!(f, "Filter"),
864            Self::Select => write!(f, "Select"),
865            Self::Map => write!(f, "Map"),
866            Self::GroupBy => write!(f, "GroupBy"),
867            Self::Aggregate => write!(f, "Aggregate"),
868            Self::Join => write!(f, "Join"),
869            Self::Sort => write!(f, "Sort"),
870            Self::Window => write!(f, "Window"),
871            Self::Union => write!(f, "Union"),
872            Self::Custom(name) => write!(f, "Custom({name})"),
873        }
874    }
875}
876
877impl fmt::Display for StageStatus {
878    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
879        match self {
880            Self::Pending => write!(f, "Pending"),
881            Self::Running => write!(f, "Running"),
882            Self::Completed => write!(f, "Completed"),
883            Self::Failed(err) => write!(f, "Failed: {err}"),
884            Self::Cancelled => write!(f, "Cancelled"),
885            Self::Paused => write!(f, "Paused"),
886        }
887    }
888}
889
890impl fmt::Display for DataType {
891    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
892        match self {
893            Self::Boolean => write!(f, "Boolean"),
894            Self::Integer => write!(f, "Integer"),
895            Self::Float => write!(f, "Float"),
896            Self::String => write!(f, "String"),
897            Self::DateTime => write!(f, "DateTime"),
898            Self::Array(inner) => write!(f, "Array<{inner}>"),
899            Self::Struct(fields) => {
900                write!(f, "Struct{{")?;
901                for (i, (name, dtype)) in fields.iter().enumerate() {
902                    if i > 0 { write!(f, ", ")?; }
903                    write!(f, "{name}: {dtype}")?;
904                }
905                write!(f, "}}")
906            }
907        }
908    }
909}
910
911impl fmt::Display for DataValue {
912    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
913        match self {
914            Self::Boolean(b) => write!(f, "{b}"),
915            Self::Integer(i) => write!(f, "{i}"),
916            Self::Float(fl) => write!(f, "{fl}"),
917            Self::String(s) => write!(f, "\"{s}\""),
918            Self::Null => write!(f, "null"),
919            Self::Array(values) => {
920                write!(f, "[")?;
921                for (i, value) in values.iter().enumerate() {
922                    if i > 0 { write!(f, ", ")?; }
923                    write!(f, "{value}")?;
924                }
925                write!(f, "]")
926            }
927        }
928    }
929}