sql_cli/query_plan/
pipeline.rs1use crate::sql::parser::ast::SelectStatement;
8use anyhow::Result;
9use std::time::Instant;
10use tracing::{debug, info};
11
12pub trait ASTTransformer: Send + Sync {
14 fn name(&self) -> &str;
16
17 fn description(&self) -> &str {
19 "No description provided"
20 }
21
22 fn enabled(&self) -> bool {
24 true
25 }
26
27 fn transform(&mut self, stmt: SelectStatement) -> Result<SelectStatement>;
32
33 fn begin(&mut self) -> Result<()> {
35 Ok(())
36 }
37
38 fn end(&mut self) -> Result<()> {
40 Ok(())
41 }
42}
43
/// Outcome record for a single transformer run.
///
/// Derives `PartialEq`/`Eq` so that records can be compared directly, e.g.
/// in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransformStats {
    /// Name reported by the transformer that produced this record.
    pub transformer_name: String,
    /// Elapsed time of the run, in microseconds.
    pub duration_micros: u64,
    /// Whether the transformer was actually run. `PreprocessingPipeline::process`
    /// sets this to `true` for every record it creates.
    pub applied: bool,
    /// Number of modifications detected. `PreprocessingPipeline::process` stores
    /// `1` when a before/after comparison found a difference and `0` otherwise.
    pub modifications: usize,
}
52
53#[derive(Debug, Clone, Default)]
55pub struct PreprocessingStats {
56 pub transformations: Vec<TransformStats>,
57 pub total_duration_micros: u64,
58 pub transformers_applied: usize,
59}
60
61impl PreprocessingStats {
62 pub fn add_transform(&mut self, stats: TransformStats) {
63 self.total_duration_micros += stats.duration_micros;
64 if stats.applied {
65 self.transformers_applied += 1;
66 }
67 self.transformations.push(stats);
68 }
69
70 pub fn summary(&self) -> String {
71 format!(
72 "{} transformer(s) applied in {:.2}ms",
73 self.transformers_applied,
74 self.total_duration_micros as f64 / 1000.0
75 )
76 }
77
78 pub fn has_modifications(&self) -> bool {
80 self.transformations
81 .iter()
82 .any(|stats| stats.modifications > 0)
83 }
84}
85
/// Switches controlling how the preprocessing pipeline runs and what it
/// reports.
///
/// Derives `PartialEq`/`Eq` so that configurations can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PipelineConfig {
    /// Master switch. When `false`, `PreprocessingPipeline::process` returns
    /// its input without running any transformer.
    pub enabled: bool,

    /// Emit an `info!` line per transformer and a final summary line.
    pub verbose_logging: bool,

    /// NOTE(review): no code in this file reads this flag; `process` records
    /// statistics regardless of its value. Confirm the intended use.
    pub collect_stats: bool,

    /// Snapshot the statement's pretty `Debug` form before and after each
    /// transformer and log both via `debug!` when they differ.
    pub debug_ast_changes: bool,

    /// Format the statement as SQL before and after each transformer and
    /// print both to stderr when they differ.
    pub show_sql_transformations: bool,
}
104
105impl Default for PipelineConfig {
106 fn default() -> Self {
107 Self {
108 enabled: true,
109 verbose_logging: false,
110 collect_stats: true,
111 debug_ast_changes: false,
112 show_sql_transformations: false,
113 }
114 }
115}
116
117pub struct PreprocessingPipeline {
135 transformers: Vec<Box<dyn ASTTransformer>>,
136 config: PipelineConfig,
137 stats: PreprocessingStats,
138}
139
140impl PreprocessingPipeline {
141 pub fn new(config: PipelineConfig) -> Self {
143 Self {
144 transformers: Vec::new(),
145 config,
146 stats: PreprocessingStats::default(),
147 }
148 }
149
150 pub fn add_transformer(&mut self, transformer: Box<dyn ASTTransformer>) {
154 self.transformers.push(transformer);
155 }
156
157 pub fn stats(&self) -> &PreprocessingStats {
159 &self.stats
160 }
161
162 pub fn reset_stats(&mut self) {
164 self.stats = PreprocessingStats::default();
165 }
166
167 pub fn process(&mut self, mut stmt: SelectStatement) -> Result<SelectStatement> {
172 if !self.config.enabled {
173 debug!("Preprocessing pipeline is disabled");
174 return Ok(stmt);
175 }
176
177 let pipeline_start = Instant::now();
178 self.reset_stats();
179
180 info!(
181 "Starting preprocessing pipeline with {} transformer(s)",
182 self.transformers.len()
183 );
184
185 for transformer in &mut self.transformers {
186 if !transformer.enabled() {
187 debug!("Transformer '{}' is disabled, skipping", transformer.name());
188 continue;
189 }
190
191 let transform_start = Instant::now();
192
193 if self.config.verbose_logging {
194 info!(
195 "Applying transformer: {} - {}",
196 transformer.name(),
197 transformer.description()
198 );
199 }
200
201 let original_ast = if self.config.debug_ast_changes {
203 Some(format!("{:#?}", stmt))
204 } else {
205 None
206 };
207
208 let original_sql = if self.config.show_sql_transformations {
210 Some(crate::sql::parser::ast_formatter::format_select_statement(
211 &stmt,
212 ))
213 } else {
214 None
215 };
216
217 transformer.begin()?;
219
220 let transformed = transformer.transform(stmt)?;
222
223 transformer.end()?;
225
226 let duration = transform_start.elapsed();
227
228 let mut modifications = 0;
230 if let Some(original_ast_str) = original_ast {
231 let new_ast_str = format!("{:#?}", transformed);
232 if original_ast_str != new_ast_str {
233 if self.config.debug_ast_changes {
234 debug!("AST changed by '{}'", transformer.name());
235 debug!("Before:\n{}", original_ast_str);
236 debug!("After:\n{}", new_ast_str);
237 }
238 modifications = 1;
239 }
240 }
241
242 if let Some(original_sql_str) = original_sql {
244 let new_sql_str =
245 crate::sql::parser::ast_formatter::format_select_statement(&transformed);
246 if original_sql_str != new_sql_str {
247 eprintln!(
248 "\n╔════════════════════════════════════════════════════════════════╗"
249 );
250 eprintln!("║ Transformer: {:<51} ║", transformer.name());
251 eprintln!("╠════════════════════════════════════════════════════════════════╣");
252 eprintln!("║ BEFORE: ║");
253 eprintln!("╠════════════════════════════════════════════════════════════════╣");
254 for line in original_sql_str.lines() {
255 eprintln!(" {}", line);
256 }
257 eprintln!("╠════════════════════════════════════════════════════════════════╣");
258 eprintln!("║ AFTER: ║");
259 eprintln!("╠════════════════════════════════════════════════════════════════╣");
260 for line in new_sql_str.lines() {
261 eprintln!(" {}", line);
262 }
263 eprintln!(
264 "╚════════════════════════════════════════════════════════════════╝\n"
265 );
266 modifications = 1;
267 }
268 }
269
270 let stats = TransformStats {
272 transformer_name: transformer.name().to_string(),
273 duration_micros: duration.as_micros() as u64,
274 applied: true,
275 modifications,
276 };
277
278 self.stats.add_transform(stats);
279
280 stmt = transformed;
281 }
282
283 let total_duration = pipeline_start.elapsed();
284 self.stats.total_duration_micros = total_duration.as_micros() as u64;
285
286 if self.config.verbose_logging {
287 info!("Preprocessing complete: {}", self.stats.summary());
288 }
289
290 Ok(stmt)
291 }
292
293 pub fn transformer_summary(&self) -> String {
295 let enabled_count = self.transformers.iter().filter(|t| t.enabled()).count();
296 let total_count = self.transformers.len();
297
298 let names: Vec<String> = self
299 .transformers
300 .iter()
301 .map(|t| {
302 let status = if t.enabled() { "✓" } else { "✗" };
303 format!("{} {}", status, t.name())
304 })
305 .collect();
306
307 format!(
308 "{}/{} transformers enabled:\n{}",
309 enabled_count,
310 total_count,
311 names.join("\n")
312 )
313 }
314}
315
316impl Default for PreprocessingPipeline {
317 fn default() -> Self {
318 Self::new(PipelineConfig::default())
319 }
320}
321
322pub struct PipelineBuilder {
324 pipeline: PreprocessingPipeline,
325}
326
327impl PipelineBuilder {
328 pub fn new() -> Self {
330 Self {
331 pipeline: PreprocessingPipeline::default(),
332 }
333 }
334
335 pub fn with_config(config: PipelineConfig) -> Self {
337 Self {
338 pipeline: PreprocessingPipeline::new(config),
339 }
340 }
341
342 pub fn verbose(mut self) -> Self {
344 self.pipeline.config.verbose_logging = true;
345 self
346 }
347
348 pub fn debug_ast(mut self) -> Self {
350 self.pipeline.config.debug_ast_changes = true;
351 self
352 }
353
354 pub fn with_transformer(mut self, transformer: Box<dyn ASTTransformer>) -> Self {
356 self.pipeline.add_transformer(transformer);
357 self
358 }
359
360 pub fn build(self) -> PreprocessingPipeline {
362 self.pipeline
363 }
364}
365
366impl Default for PipelineBuilder {
367 fn default() -> Self {
368 Self::new()
369 }
370}
371
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sql::parser::ast::SelectStatement;

    /// Transformer that hands the statement back unchanged; only its name
    /// and enabled flag vary between tests.
    struct NoOpTransformer {
        name: String,
        enabled: bool,
    }

    impl NoOpTransformer {
        /// Boxed instance ready to be added to a pipeline or builder.
        fn boxed(name: &str, enabled: bool) -> Box<dyn ASTTransformer> {
            Box::new(Self {
                name: name.to_string(),
                enabled,
            })
        }
    }

    impl ASTTransformer for NoOpTransformer {
        fn name(&self) -> &str {
            &self.name
        }

        fn description(&self) -> &str {
            "Test transformer that does nothing"
        }

        fn enabled(&self) -> bool {
            self.enabled
        }

        fn transform(&mut self, stmt: SelectStatement) -> Result<SelectStatement> {
            Ok(stmt)
        }
    }

    #[test]
    fn test_empty_pipeline() {
        let mut pipeline = PreprocessingPipeline::default();
        let result = pipeline.process(SelectStatement::default());
        assert!(result.is_ok());
        assert!(pipeline.stats().transformations.is_empty());
    }

    #[test]
    fn test_disabled_pipeline() {
        // Struct-update syntax instead of mutating a default value.
        let config = PipelineConfig {
            enabled: false,
            ..PipelineConfig::default()
        };

        let mut pipeline = PreprocessingPipeline::new(config);
        pipeline.add_transformer(NoOpTransformer::boxed("test", true));

        let result = pipeline.process(SelectStatement::default());
        assert!(result.is_ok());
        assert_eq!(pipeline.stats().transformers_applied, 0);
        assert!(pipeline.stats().transformations.is_empty());
    }

    #[test]
    fn test_disabled_transformer() {
        let mut pipeline = PreprocessingPipeline::default();
        pipeline.add_transformer(NoOpTransformer::boxed("disabled", false));

        let result = pipeline.process(SelectStatement::default());
        assert!(result.is_ok());
        assert_eq!(pipeline.stats().transformers_applied, 0);
        // Skipped transformers leave no record at all.
        assert!(pipeline.stats().transformations.is_empty());
    }

    #[test]
    fn test_stats_collection() {
        let mut pipeline = PreprocessingPipeline::default();
        pipeline.add_transformer(NoOpTransformer::boxed("test1", true));
        pipeline.add_transformer(NoOpTransformer::boxed("test2", true));

        let result = pipeline.process(SelectStatement::default());
        assert!(result.is_ok());

        let stats = pipeline.stats();
        assert_eq!(stats.transformers_applied, 2);
        assert_eq!(stats.transformations.len(), 2);
        // Records follow insertion order.
        assert_eq!(stats.transformations[0].transformer_name, "test1");
        assert_eq!(stats.transformations[1].transformer_name, "test2");
        // With both diagnostics off no change detection runs.
        assert!(!stats.has_modifications());
    }

    #[test]
    fn test_transformer_summary() {
        let mut pipeline = PreprocessingPipeline::default();
        pipeline.add_transformer(NoOpTransformer::boxed("on", true));
        pipeline.add_transformer(NoOpTransformer::boxed("off", false));

        assert_eq!(
            pipeline.transformer_summary(),
            "1/2 transformers enabled:\n✓ on\n✗ off"
        );
    }

    #[test]
    fn test_builder() {
        let pipeline = PipelineBuilder::new()
            .verbose()
            .debug_ast()
            .with_transformer(NoOpTransformer::boxed("test", true))
            .build();

        assert!(pipeline.config.verbose_logging);
        assert!(pipeline.config.debug_ast_changes);
        assert_eq!(pipeline.transformers.len(), 1);
    }
}