sql_cli/execution/
statement_executor.rs1use anyhow::Result;
8use std::sync::Arc;
9use std::time::Instant;
10
11use crate::data::data_view::DataView;
12use crate::data::datatable::DataTable;
13use crate::data::query_engine::QueryEngine;
14use crate::query_plan::{create_pipeline_with_config, IntoClauseRemover};
15use crate::sql::parser::ast::SelectStatement;
16
17use super::config::ExecutionConfig;
18use super::context::ExecutionContext;
19
20#[derive(Debug)]
22pub struct ExecutionResult {
23 pub dataview: DataView,
25
26 pub stats: ExecutionStats,
28
29 pub transformed_ast: Option<SelectStatement>,
31}
32
33#[derive(Debug, Clone)]
35pub struct ExecutionStats {
36 pub preprocessing_time_ms: f64,
38
39 pub execution_time_ms: f64,
41
42 pub total_time_ms: f64,
44
45 pub row_count: usize,
47
48 pub column_count: usize,
50
51 pub preprocessing_applied: bool,
53}
54
55impl ExecutionStats {
56 fn new() -> Self {
57 Self {
58 preprocessing_time_ms: 0.0,
59 execution_time_ms: 0.0,
60 total_time_ms: 0.0,
61 row_count: 0,
62 column_count: 0,
63 preprocessing_applied: false,
64 }
65 }
66}
67
68pub struct StatementExecutor {
77 config: ExecutionConfig,
78}
79
80impl StatementExecutor {
81 pub fn new() -> Self {
83 Self {
84 config: ExecutionConfig::default(),
85 }
86 }
87
88 pub fn with_config(config: ExecutionConfig) -> Self {
90 Self { config }
91 }
92
93 pub fn execute(
110 &self,
111 stmt: SelectStatement,
112 context: &mut ExecutionContext,
113 ) -> Result<ExecutionResult> {
114 let total_start = Instant::now();
115 let mut stats = ExecutionStats::new();
116
117 let into_table_name = stmt.into_table.as_ref().map(|it| it.name.clone());
119
120 let source_table = if let Some(ref from_table) = stmt.from_table {
122 context.resolve_table(from_table)
123 } else {
124 Arc::new(DataTable::dual())
126 };
127
128 let preprocess_start = Instant::now();
130 let (transformed_stmt, preprocessing_applied) = self.apply_preprocessing(stmt)?;
131 stats.preprocessing_time_ms = preprocess_start.elapsed().as_secs_f64() * 1000.0;
132 stats.preprocessing_applied = preprocessing_applied;
133
134 let exec_start = Instant::now();
136 let result_view = self.execute_ast(transformed_stmt.clone(), source_table, context)?;
137 stats.execution_time_ms = exec_start.elapsed().as_secs_f64() * 1000.0;
138
139 if let Some(table_name) = into_table_name {
141 let engine = QueryEngine::with_case_insensitive(self.config.case_insensitive);
143 let temp_table = engine.materialize_view(result_view.clone())?;
144
145 context.store_temp_table(table_name.clone(), Arc::new(temp_table))?;
147 tracing::debug!("Stored temp table: {}", table_name);
148 }
149
150 stats.total_time_ms = total_start.elapsed().as_secs_f64() * 1000.0;
152 stats.row_count = result_view.row_count();
153 stats.column_count = result_view.column_count();
154
155 Ok(ExecutionResult {
156 dataview: result_view,
157 stats,
158 transformed_ast: Some(transformed_stmt),
159 })
160 }
161
162 fn apply_preprocessing(&self, mut stmt: SelectStatement) -> Result<(SelectStatement, bool)> {
166 let has_from_clause = stmt.from_table.is_some()
169 || stmt.from_subquery.is_some()
170 || stmt.from_function.is_some();
171
172 if !has_from_clause {
173 return Ok((stmt, false));
175 }
176
177 let mut pipeline = create_pipeline_with_config(
179 self.config.show_preprocessing,
180 self.config.show_sql_transformations,
181 self.config.transformer_config.clone(),
182 );
183
184 match pipeline.process(stmt.clone()) {
186 Ok(transformed) => {
187 let final_stmt = if transformed.into_table.is_some() {
189 IntoClauseRemover::remove_into_clause(transformed)
190 } else {
191 transformed
192 };
193
194 Ok((final_stmt, true))
195 }
196 Err(e) => {
197 tracing::debug!("Preprocessing failed: {}, using original statement", e);
199
200 let fallback = if stmt.into_table.is_some() {
202 IntoClauseRemover::remove_into_clause(stmt)
203 } else {
204 stmt
205 };
206
207 Ok((fallback, false))
208 }
209 }
210 }
211
212 fn execute_ast(
217 &self,
218 stmt: SelectStatement,
219 source_table: Arc<DataTable>,
220 context: &ExecutionContext,
221 ) -> Result<DataView> {
222 let engine = QueryEngine::with_case_insensitive(self.config.case_insensitive);
224
225 engine.execute_statement_with_temp_tables(source_table, stmt, Some(&context.temp_tables))
228 }
229
230 pub fn config(&self) -> &ExecutionConfig {
232 &self.config
233 }
234
235 pub fn set_config(&mut self, config: ExecutionConfig) {
237 self.config = config;
238 }
239}
240
241impl Default for StatementExecutor {
242 fn default() -> Self {
243 Self::new()
244 }
245}
246
247#[cfg(test)]
248mod tests {
249 use super::*;
250 use crate::data::datatable::{DataColumn, DataRow, DataType, DataValue};
251 use crate::sql::recursive_parser::Parser;
252
253 fn create_test_table(name: &str, rows: usize) -> DataTable {
254 let mut table = DataTable::new(name);
255 table.add_column(DataColumn::new("id").with_type(DataType::Integer));
256 table.add_column(DataColumn::new("name").with_type(DataType::String));
257
258 for i in 0..rows {
259 let _ = table.add_row(DataRow {
260 values: vec![
261 DataValue::Integer(i as i64),
262 DataValue::String(format!("name_{}", i)),
263 ],
264 });
265 }
266
267 table
268 }
269
270 #[test]
271 fn test_new_executor() {
272 let executor = StatementExecutor::new();
273 assert!(!executor.config().case_insensitive);
274 assert!(!executor.config().show_preprocessing);
275 }
276
277 #[test]
278 fn test_executor_with_config() {
279 let config = ExecutionConfig::new()
280 .with_case_insensitive(true)
281 .with_show_preprocessing(true);
282
283 let executor = StatementExecutor::with_config(config);
284 assert!(executor.config().case_insensitive);
285 assert!(executor.config().show_preprocessing);
286 }
287
288 #[test]
289 fn test_execute_simple_select() {
290 let table = create_test_table("test", 10);
291 let mut context = ExecutionContext::new(Arc::new(table));
292 let executor = StatementExecutor::new();
293
294 let mut parser = Parser::new("SELECT id, name FROM test WHERE id < 5");
296 let stmt = parser.parse().unwrap();
297
298 let result = executor.execute(stmt, &mut context).unwrap();
299
300 assert_eq!(result.dataview.row_count(), 5);
301 assert_eq!(result.dataview.column_count(), 2);
302 assert!(result.stats.total_time_ms >= 0.0);
303 }
304
305 #[test]
306 fn test_execute_select_star() {
307 let table = create_test_table("test", 5);
308 let mut context = ExecutionContext::new(Arc::new(table));
309 let executor = StatementExecutor::new();
310
311 let mut parser = Parser::new("SELECT * FROM test");
312 let stmt = parser.parse().unwrap();
313
314 let result = executor.execute(stmt, &mut context).unwrap();
315
316 assert_eq!(result.dataview.row_count(), 5);
317 assert_eq!(result.dataview.column_count(), 2);
318 }
319
320 #[test]
321 fn test_execute_with_dual() {
322 let table = create_test_table("test", 5);
323 let mut context = ExecutionContext::new(Arc::new(table));
324 let executor = StatementExecutor::new();
325
326 let mut parser = Parser::new("SELECT 1+1 as result");
328 let stmt = parser.parse().unwrap();
329
330 let result = executor.execute(stmt, &mut context).unwrap();
331
332 assert_eq!(result.dataview.row_count(), 1);
333 assert_eq!(result.dataview.column_count(), 1);
334 }
335
336 #[test]
337 fn test_execute_with_temp_table() {
338 let base_table = create_test_table("base", 10);
339 let mut context = ExecutionContext::new(Arc::new(base_table));
340 let executor = StatementExecutor::new();
341
342 let temp_table = create_test_table("#temp", 3);
344 context
345 .store_temp_table("#temp".to_string(), Arc::new(temp_table))
346 .unwrap();
347
348 let mut parser = Parser::new("SELECT * FROM #temp");
350 let stmt = parser.parse().unwrap();
351
352 let result = executor.execute(stmt, &mut context).unwrap();
353
354 assert_eq!(result.dataview.row_count(), 3);
355 }
356
357 #[test]
358 fn test_preprocessing_applied_with_from() {
359 let table = create_test_table("test", 10);
360 let mut context = ExecutionContext::new(Arc::new(table));
361 let executor = StatementExecutor::new();
362
363 let mut parser = Parser::new("SELECT id FROM test WHERE id > 0");
365 let stmt = parser.parse().unwrap();
366
367 let result = executor.execute(stmt, &mut context).unwrap();
368
369 assert!(result.stats.preprocessing_time_ms >= 0.0);
371 }
372
373 #[test]
374 fn test_no_preprocessing_without_from() {
375 let table = create_test_table("test", 10);
376 let mut context = ExecutionContext::new(Arc::new(table));
377 let executor = StatementExecutor::new();
378
379 let mut parser = Parser::new("SELECT 42 as answer");
381 let stmt = parser.parse().unwrap();
382
383 let result = executor.execute(stmt, &mut context).unwrap();
384
385 assert!(!result.stats.preprocessing_applied);
387 }
388
389 #[test]
390 fn test_execution_stats() {
391 let table = create_test_table("test", 100);
392 let mut context = ExecutionContext::new(Arc::new(table));
393 let executor = StatementExecutor::new();
394
395 let mut parser = Parser::new("SELECT * FROM test WHERE id < 50");
396 let stmt = parser.parse().unwrap();
397
398 let result = executor.execute(stmt, &mut context).unwrap();
399
400 let stats = result.stats;
401 assert_eq!(stats.row_count, 50);
402 assert_eq!(stats.column_count, 2);
403 assert!(stats.total_time_ms >= 0.0);
404 assert!(stats.total_time_ms >= stats.preprocessing_time_ms);
405 assert!(stats.total_time_ms >= stats.execution_time_ms);
406 }
407
408 #[test]
409 fn test_case_insensitive_execution() {
410 let table = create_test_table("test", 10);
411 let mut context = ExecutionContext::new(Arc::new(table));
412
413 let config = ExecutionConfig::new().with_case_insensitive(true);
414 let executor = StatementExecutor::with_config(config);
415
416 let mut parser = Parser::new("SELECT ID FROM test");
418 let stmt = parser.parse().unwrap();
419
420 let result = executor.execute(stmt, &mut context);
421
422 assert!(result.is_ok());
424 }
425}