sql_cli/query_plan/
pipeline.rs

1//! Query preprocessing pipeline
2//!
3//! This module provides a structured, extensible pipeline for transforming SQL ASTs
4//! before execution. Transformers are applied in a defined order, with logging and
5//! debugging support.
6
7use crate::sql::parser::ast::SelectStatement;
8use anyhow::Result;
9use std::time::Instant;
10use tracing::{debug, info};
11
12/// Trait for AST transformers that can be added to the preprocessing pipeline
13pub trait ASTTransformer: Send + Sync {
14    /// Name of this transformer (for logging and debugging)
15    fn name(&self) -> &str;
16
17    /// Description of what this transformer does
18    fn description(&self) -> &str {
19        "No description provided"
20    }
21
22    /// Whether this transformer is enabled
23    fn enabled(&self) -> bool {
24        true
25    }
26
27    /// Transform the AST, returning the modified statement
28    ///
29    /// Transformers should be idempotent where possible and should not
30    /// modify the semantic meaning of the query.
31    fn transform(&mut self, stmt: SelectStatement) -> Result<SelectStatement>;
32
33    /// Called before transformation starts (for initialization)
34    fn begin(&mut self) -> Result<()> {
35        Ok(())
36    }
37
38    /// Called after transformation completes (for cleanup)
39    fn end(&mut self) -> Result<()> {
40        Ok(())
41    }
42}
43
/// Outcome record for one transformer run.
#[derive(Debug, Clone)]
pub struct TransformStats {
    /// Name reported by the transformer this record belongs to.
    pub transformer_name: String,
    /// Elapsed time attributed to the transformer, in microseconds.
    pub duration_micros: u64,
    /// Whether the transformer was actually run.
    pub applied: bool,
    /// Number of modifications detected; zero means no change was observed.
    pub modifications: usize,
}
52
53/// Complete preprocessing statistics
54#[derive(Debug, Clone, Default)]
55pub struct PreprocessingStats {
56    pub transformations: Vec<TransformStats>,
57    pub total_duration_micros: u64,
58    pub transformers_applied: usize,
59}
60
61impl PreprocessingStats {
62    pub fn add_transform(&mut self, stats: TransformStats) {
63        self.total_duration_micros += stats.duration_micros;
64        if stats.applied {
65            self.transformers_applied += 1;
66        }
67        self.transformations.push(stats);
68    }
69
70    pub fn summary(&self) -> String {
71        format!(
72            "{} transformer(s) applied in {:.2}ms",
73            self.transformers_applied,
74            self.total_duration_micros as f64 / 1000.0
75        )
76    }
77
78    /// Returns true if any transformer actually modified the AST
79    pub fn has_modifications(&self) -> bool {
80        self.transformations
81            .iter()
82            .any(|stats| stats.modifications > 0)
83    }
84}
85
/// Switches that control how the preprocessing pipeline runs and reports.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Master switch: when `false`, preprocessing is turned off entirely.
    pub enabled: bool,

    /// Log each transformation as it is applied.
    pub verbose_logging: bool,

    /// Collect detailed statistics.
    // NOTE(review): nothing in this file reads this flag — confirm whether
    // callers elsewhere rely on it.
    pub collect_stats: bool,

    /// Show the AST before and after each transformation (Rust `Debug` format).
    pub debug_ast_changes: bool,

    /// Show formatted SQL before and after each transformation.
    pub show_sql_transformations: bool,
}

impl Default for PipelineConfig {
    /// Preprocessing and statistics collection are on; every diagnostic
    /// output option is off.
    fn default() -> Self {
        PipelineConfig {
            // Switched on by default.
            enabled: true,
            collect_stats: true,
            // Diagnostics stay quiet unless requested.
            verbose_logging: false,
            debug_ast_changes: false,
            show_sql_transformations: false,
        }
    }
}
116
117/// The main preprocessing pipeline
118///
119/// Orchestrates the application of multiple AST transformers in sequence.
120/// Provides logging, debugging, and statistics collection.
121///
122/// # Example
123///
124/// ```ignore
125/// use sql_cli::query_plan::{PreprocessingPipeline, PipelineConfig};
126/// use sql_cli::query_plan::{CTEHoister, ExpressionLifter};
127///
128/// let mut pipeline = PreprocessingPipeline::new(PipelineConfig::default());
129/// pipeline.add_transformer(Box::new(CTEHoister::new()));
130/// pipeline.add_transformer(Box::new(ExpressionLifter::new()));
131///
132/// let transformed = pipeline.process(statement)?;
133/// ```
134pub struct PreprocessingPipeline {
135    transformers: Vec<Box<dyn ASTTransformer>>,
136    config: PipelineConfig,
137    stats: PreprocessingStats,
138}
139
140impl PreprocessingPipeline {
141    /// Create a new empty pipeline with the given configuration
142    pub fn new(config: PipelineConfig) -> Self {
143        Self {
144            transformers: Vec::new(),
145            config,
146            stats: PreprocessingStats::default(),
147        }
148    }
149
150    /// Add a transformer to the pipeline
151    ///
152    /// Transformers are applied in the order they are added.
153    pub fn add_transformer(&mut self, transformer: Box<dyn ASTTransformer>) {
154        self.transformers.push(transformer);
155    }
156
157    /// Get the collected statistics
158    pub fn stats(&self) -> &PreprocessingStats {
159        &self.stats
160    }
161
162    /// Reset statistics
163    pub fn reset_stats(&mut self) {
164        self.stats = PreprocessingStats::default();
165    }
166
167    /// Process a SQL statement through the pipeline
168    ///
169    /// Applies each enabled transformer in sequence, collecting statistics
170    /// and logging as configured.
171    pub fn process(&mut self, mut stmt: SelectStatement) -> Result<SelectStatement> {
172        if !self.config.enabled {
173            debug!("Preprocessing pipeline is disabled");
174            return Ok(stmt);
175        }
176
177        let pipeline_start = Instant::now();
178        self.reset_stats();
179
180        info!(
181            "Starting preprocessing pipeline with {} transformer(s)",
182            self.transformers.len()
183        );
184
185        for transformer in &mut self.transformers {
186            if !transformer.enabled() {
187                debug!("Transformer '{}' is disabled, skipping", transformer.name());
188                continue;
189            }
190
191            let transform_start = Instant::now();
192
193            if self.config.verbose_logging {
194                info!(
195                    "Applying transformer: {} - {}",
196                    transformer.name(),
197                    transformer.description()
198                );
199            }
200
201            // Store original for comparison if debugging
202            let original_ast = if self.config.debug_ast_changes {
203                Some(format!("{:#?}", stmt))
204            } else {
205                None
206            };
207
208            // Store original SQL for comparison if showing transformations
209            let original_sql = if self.config.show_sql_transformations {
210                Some(crate::sql::parser::ast_formatter::format_select_statement(
211                    &stmt,
212                ))
213            } else {
214                None
215            };
216
217            // Call begin hook
218            transformer.begin()?;
219
220            // Apply transformation
221            let transformed = transformer.transform(stmt)?;
222
223            // Call end hook
224            transformer.end()?;
225
226            let duration = transform_start.elapsed();
227
228            // Check if anything changed (AST debug format)
229            let mut modifications = 0;
230            if let Some(original_ast_str) = original_ast {
231                let new_ast_str = format!("{:#?}", transformed);
232                if original_ast_str != new_ast_str {
233                    if self.config.debug_ast_changes {
234                        debug!("AST changed by '{}'", transformer.name());
235                        debug!("Before:\n{}", original_ast_str);
236                        debug!("After:\n{}", new_ast_str);
237                    }
238                    modifications = 1;
239                }
240            }
241
242            // Show SQL transformations if enabled
243            if let Some(original_sql_str) = original_sql {
244                let new_sql_str =
245                    crate::sql::parser::ast_formatter::format_select_statement(&transformed);
246                if original_sql_str != new_sql_str {
247                    eprintln!(
248                        "\n╔════════════════════════════════════════════════════════════════╗"
249                    );
250                    eprintln!("║ Transformer: {:<51} ║", transformer.name());
251                    eprintln!("╠════════════════════════════════════════════════════════════════╣");
252                    eprintln!("║ BEFORE:                                                        ║");
253                    eprintln!("╠════════════════════════════════════════════════════════════════╣");
254                    for line in original_sql_str.lines() {
255                        eprintln!("  {}", line);
256                    }
257                    eprintln!("╠════════════════════════════════════════════════════════════════╣");
258                    eprintln!("║ AFTER:                                                         ║");
259                    eprintln!("╠════════════════════════════════════════════════════════════════╣");
260                    for line in new_sql_str.lines() {
261                        eprintln!("  {}", line);
262                    }
263                    eprintln!(
264                        "╚════════════════════════════════════════════════════════════════╝\n"
265                    );
266                    modifications = 1;
267                }
268            }
269
270            // Record statistics
271            let stats = TransformStats {
272                transformer_name: transformer.name().to_string(),
273                duration_micros: duration.as_micros() as u64,
274                applied: true,
275                modifications,
276            };
277
278            self.stats.add_transform(stats);
279
280            stmt = transformed;
281        }
282
283        let total_duration = pipeline_start.elapsed();
284        self.stats.total_duration_micros = total_duration.as_micros() as u64;
285
286        if self.config.verbose_logging {
287            info!("Preprocessing complete: {}", self.stats.summary());
288        }
289
290        Ok(stmt)
291    }
292
293    /// Get a summary of what transformers are in the pipeline
294    pub fn transformer_summary(&self) -> String {
295        let enabled_count = self.transformers.iter().filter(|t| t.enabled()).count();
296        let total_count = self.transformers.len();
297
298        let names: Vec<String> = self
299            .transformers
300            .iter()
301            .map(|t| {
302                let status = if t.enabled() { "✓" } else { "✗" };
303                format!("{} {}", status, t.name())
304            })
305            .collect();
306
307        format!(
308            "{}/{} transformers enabled:\n{}",
309            enabled_count,
310            total_count,
311            names.join("\n")
312        )
313    }
314}
315
316impl Default for PreprocessingPipeline {
317    fn default() -> Self {
318        Self::new(PipelineConfig::default())
319    }
320}
321
322/// Builder for creating a preprocessing pipeline with common transformers
323pub struct PipelineBuilder {
324    pipeline: PreprocessingPipeline,
325}
326
327impl PipelineBuilder {
328    /// Start building a new pipeline
329    pub fn new() -> Self {
330        Self {
331            pipeline: PreprocessingPipeline::default(),
332        }
333    }
334
335    /// Start with a specific configuration
336    pub fn with_config(config: PipelineConfig) -> Self {
337        Self {
338            pipeline: PreprocessingPipeline::new(config),
339        }
340    }
341
342    /// Enable verbose logging
343    pub fn verbose(mut self) -> Self {
344        self.pipeline.config.verbose_logging = true;
345        self
346    }
347
348    /// Enable AST change debugging
349    pub fn debug_ast(mut self) -> Self {
350        self.pipeline.config.debug_ast_changes = true;
351        self
352    }
353
354    /// Add a custom transformer to the pipeline
355    pub fn with_transformer(mut self, transformer: Box<dyn ASTTransformer>) -> Self {
356        self.pipeline.add_transformer(transformer);
357        self
358    }
359
360    /// Build the final pipeline
361    pub fn build(self) -> PreprocessingPipeline {
362        self.pipeline
363    }
364}
365
366impl Default for PipelineBuilder {
367    fn default() -> Self {
368        Self::new()
369    }
370}
371
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sql::parser::ast::SelectStatement;

    /// Pass-through transformer used to exercise the pipeline plumbing.
    struct NoOpTransformer {
        name: String,
        enabled: bool,
    }

    impl NoOpTransformer {
        /// Builds a boxed no-op transformer ready to hand to the pipeline.
        fn boxed(name: &str, enabled: bool) -> Box<dyn ASTTransformer> {
            Box::new(NoOpTransformer {
                name: name.to_string(),
                enabled,
            })
        }
    }

    impl ASTTransformer for NoOpTransformer {
        fn name(&self) -> &str {
            &self.name
        }

        fn description(&self) -> &str {
            "Test transformer that does nothing"
        }

        fn enabled(&self) -> bool {
            self.enabled
        }

        fn transform(&mut self, stmt: SelectStatement) -> Result<SelectStatement> {
            Ok(stmt)
        }
    }

    /// A pipeline without transformers still processes successfully.
    #[test]
    fn test_empty_pipeline() {
        let mut pipeline = PreprocessingPipeline::default();
        assert!(pipeline.process(SelectStatement::default()).is_ok());
    }

    /// A disabled pipeline succeeds without applying its transformers.
    #[test]
    fn test_disabled_pipeline() {
        let config = PipelineConfig {
            enabled: false,
            ..PipelineConfig::default()
        };

        let mut pipeline = PreprocessingPipeline::new(config);
        pipeline.add_transformer(NoOpTransformer::boxed("test", true));

        assert!(pipeline.process(SelectStatement::default()).is_ok());
        assert_eq!(pipeline.stats().transformers_applied, 0);
    }

    /// A transformer reporting itself as disabled is skipped.
    #[test]
    fn test_disabled_transformer() {
        let mut pipeline = PreprocessingPipeline::default();
        pipeline.add_transformer(NoOpTransformer::boxed("disabled", false));

        assert!(pipeline.process(SelectStatement::default()).is_ok());
        assert_eq!(pipeline.stats().transformers_applied, 0);
    }

    /// Every enabled transformer contributes one statistics record.
    #[test]
    fn test_stats_collection() {
        let mut pipeline = PreprocessingPipeline::default();
        for name in ["test1", "test2"] {
            pipeline.add_transformer(NoOpTransformer::boxed(name, true));
        }

        assert!(pipeline.process(SelectStatement::default()).is_ok());

        let stats = pipeline.stats();
        assert_eq!(stats.transformers_applied, 2);
        assert_eq!(stats.transformations.len(), 2);
    }

    /// Builder options and transformers end up on the built pipeline.
    #[test]
    fn test_builder() {
        let pipeline = PipelineBuilder::new()
            .verbose()
            .debug_ast()
            .with_transformer(NoOpTransformer::boxed("test", true))
            .build();

        assert!(pipeline.config.verbose_logging);
        assert!(pipeline.config.debug_ast_changes);
        assert_eq!(pipeline.transformers.len(), 1);
    }
}