1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::row_expanders::RowExpanderRegistry;
19use crate::data::subquery_executor::SubqueryExecutor;
20use crate::data::temp_table_registry::TempTableRegistry;
21use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
22use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
23use crate::sql::parser::ast::ColumnRef;
24use crate::sql::parser::ast::SetOperation;
25use crate::sql::parser::ast::TableSource;
26use crate::sql::parser::ast::WindowSpec;
27use crate::sql::recursive_parser::{
28 CTEType, OrderByItem, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
29 TableFunction,
30};
31
/// Alias bookkeeping used while resolving column references.
///
/// Holds a map from table alias to the table name the alias stands for.
/// Entries are added with `register_alias` and consulted by `resolve_alias`
/// and `resolve_column_index`.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    /// Alias -> table name. A name with no entry is treated as "not an alias".
    alias_map: HashMap<String, String>,
}
39
40impl ExecutionContext {
41 pub fn new() -> Self {
43 Self {
44 alias_map: HashMap::new(),
45 }
46 }
47
48 pub fn register_alias(&mut self, alias: String, table_name: String) {
50 debug!("Registering alias: {} -> {}", alias, table_name);
51 self.alias_map.insert(alias, table_name);
52 }
53
54 pub fn resolve_alias(&self, name: &str) -> String {
57 self.alias_map
58 .get(name)
59 .cloned()
60 .unwrap_or_else(|| name.to_string())
61 }
62
63 pub fn is_alias(&self, name: &str) -> bool {
65 self.alias_map.contains_key(name)
66 }
67
68 pub fn get_aliases(&self) -> HashMap<String, String> {
70 self.alias_map.clone()
71 }
72
73 pub fn resolve_column_index(&self, table: &DataTable, column_ref: &ColumnRef) -> Result<usize> {
88 if let Some(table_prefix) = &column_ref.table_prefix {
89 let actual_table = self.resolve_alias(table_prefix);
91
92 let qualified_name = format!("{}.{}", actual_table, column_ref.name);
94 if let Some(idx) = table.find_column_by_qualified_name(&qualified_name) {
95 debug!(
96 "Resolved {}.{} -> qualified column '{}' at index {}",
97 table_prefix, column_ref.name, qualified_name, idx
98 );
99 return Ok(idx);
100 }
101
102 if let Some(idx) = table.get_column_index(&column_ref.name) {
104 debug!(
105 "Resolved {}.{} -> unqualified column '{}' at index {}",
106 table_prefix, column_ref.name, column_ref.name, idx
107 );
108 return Ok(idx);
109 }
110
111 Err(anyhow!(
113 "Column '{}' not found. Table '{}' may not support qualified column names",
114 qualified_name,
115 actual_table
116 ))
117 } else {
118 if let Some(idx) = table.get_column_index(&column_ref.name) {
120 debug!(
121 "Resolved unqualified column '{}' at index {}",
122 column_ref.name, idx
123 );
124 return Ok(idx);
125 }
126
127 if column_ref.name.contains('.') {
129 if let Some(idx) = table.find_column_by_qualified_name(&column_ref.name) {
130 debug!(
131 "Resolved '{}' as qualified column at index {}",
132 column_ref.name, idx
133 );
134 return Ok(idx);
135 }
136 }
137
138 let suggestion = self.find_similar_column(table, &column_ref.name);
140 match suggestion {
141 Some(similar) => Err(anyhow!(
142 "Column '{}' not found. Did you mean '{}'?",
143 column_ref.name,
144 similar
145 )),
146 None => Err(anyhow!("Column '{}' not found", column_ref.name)),
147 }
148 }
149 }
150
151 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
153 let columns = table.column_names();
154 let mut best_match: Option<(String, usize)> = None;
155
156 for col in columns {
157 let distance = edit_distance(name, &col);
158 if distance <= 2 {
159 match best_match {
161 Some((_, best_dist)) if distance < best_dist => {
162 best_match = Some((col.clone(), distance));
163 }
164 None => {
165 best_match = Some((col.clone(), distance));
166 }
167 _ => {}
168 }
169 }
170 }
171
172 best_match.map(|(name, _)| name)
173 }
174}
175
176impl Default for ExecutionContext {
177 fn default() -> Self {
178 Self::new()
179 }
180}
181
/// Computes the Levenshtein distance between `a` and `b`.
///
/// The distance is counted in `char`s (Unicode scalar values), with unit cost
/// for insertion, deletion and substitution.
fn edit_distance(a: &str, b: &str) -> usize {
    let source: Vec<char> = a.chars().collect();
    let target: Vec<char> = b.chars().collect();

    if source.is_empty() {
        return target.len();
    }
    if target.is_empty() {
        return source.len();
    }

    // `previous[j]` is the distance between the processed prefix of `source`
    // and the first `j` chars of `target`; only one earlier row is ever read.
    let mut previous: Vec<usize> = (0..=target.len()).collect();

    for (row, &from) in source.iter().enumerate() {
        let mut current = Vec::with_capacity(target.len() + 1);
        current.push(row + 1);

        for (col, &to) in target.iter().enumerate() {
            let deletion = previous[col + 1] + 1;
            let insertion = current[col] + 1;
            let substitution = previous[col] + usize::from(from != to);
            current.push(min(min(deletion, insertion), substitution));
        }

        previous = current;
    }

    previous[target.len()]
}
222
/// SQL query engine that executes statements against a `DataTable`.
///
/// The engine is `Clone`; the execution methods clone it when handing a copy
/// to the subquery executor.
#[derive(Clone)]
pub struct QueryEngine {
    /// Case-insensitivity flag chosen by the constructor
    /// (`false` unless a constructor sets it explicitly).
    case_insensitive: bool,
    /// Date notation string; every constructor visible here fills it from
    /// `get_date_notation()`.
    date_notation: String,
    /// Behavior configuration, kept only when the engine was built with
    /// `with_behavior_config`.
    _behavior_config: Option<BehaviorConfig>,
}
230
231impl Default for QueryEngine {
232 fn default() -> Self {
233 Self::new()
234 }
235}
236
237impl QueryEngine {
238 #[must_use]
239 pub fn new() -> Self {
240 Self {
241 case_insensitive: false,
242 date_notation: get_date_notation(),
243 _behavior_config: None,
244 }
245 }
246
247 #[must_use]
248 pub fn with_behavior_config(config: BehaviorConfig) -> Self {
249 let case_insensitive = config.case_insensitive_default;
250 let date_notation = get_date_notation();
252 Self {
253 case_insensitive,
254 date_notation,
255 _behavior_config: Some(config),
256 }
257 }
258
259 #[must_use]
260 pub fn with_date_notation(_date_notation: String) -> Self {
261 Self {
262 case_insensitive: false,
263 date_notation: get_date_notation(), _behavior_config: None,
265 }
266 }
267
268 #[must_use]
269 pub fn with_case_insensitive(case_insensitive: bool) -> Self {
270 Self {
271 case_insensitive,
272 date_notation: get_date_notation(),
273 _behavior_config: None,
274 }
275 }
276
277 #[must_use]
278 pub fn with_case_insensitive_and_date_notation(
279 case_insensitive: bool,
280 _date_notation: String, ) -> Self {
282 Self {
283 case_insensitive,
284 date_notation: get_date_notation(), _behavior_config: None,
286 }
287 }
288
289 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
291 let columns = table.column_names();
292 let mut best_match: Option<(String, usize)> = None;
293
294 for col in columns {
295 let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
296 let max_distance = if name.len() > 10 { 3 } else { 2 };
299 if distance <= max_distance {
300 match &best_match {
301 None => best_match = Some((col, distance)),
302 Some((_, best_dist)) if distance < *best_dist => {
303 best_match = Some((col, distance));
304 }
305 _ => {}
306 }
307 }
308 }
309
310 best_match.map(|(name, _)| name)
311 }
312
313 fn edit_distance(&self, s1: &str, s2: &str) -> usize {
315 let len1 = s1.len();
316 let len2 = s2.len();
317 let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
318
319 for i in 0..=len1 {
320 matrix[i][0] = i;
321 }
322 for j in 0..=len2 {
323 matrix[0][j] = j;
324 }
325
326 for (i, c1) in s1.chars().enumerate() {
327 for (j, c2) in s2.chars().enumerate() {
328 let cost = usize::from(c1 != c2);
329 matrix[i + 1][j + 1] = std::cmp::min(
330 matrix[i][j + 1] + 1, std::cmp::min(
332 matrix[i + 1][j] + 1, matrix[i][j] + cost, ),
335 );
336 }
337 }
338
339 matrix[len1][len2]
340 }
341
342 fn contains_unnest(expr: &SqlExpression) -> bool {
344 match expr {
345 SqlExpression::Unnest { .. } => true,
347 SqlExpression::FunctionCall { name, args, .. } => {
348 if name.to_uppercase() == "UNNEST" {
349 return true;
350 }
351 args.iter().any(Self::contains_unnest)
353 }
354 SqlExpression::BinaryOp { left, right, .. } => {
355 Self::contains_unnest(left) || Self::contains_unnest(right)
356 }
357 SqlExpression::Not { expr } => Self::contains_unnest(expr),
358 SqlExpression::CaseExpression {
359 when_branches,
360 else_branch,
361 } => {
362 when_branches.iter().any(|branch| {
363 Self::contains_unnest(&branch.condition)
364 || Self::contains_unnest(&branch.result)
365 }) || else_branch
366 .as_ref()
367 .map_or(false, |e| Self::contains_unnest(e))
368 }
369 SqlExpression::SimpleCaseExpression {
370 expr,
371 when_branches,
372 else_branch,
373 } => {
374 Self::contains_unnest(expr)
375 || when_branches.iter().any(|branch| {
376 Self::contains_unnest(&branch.value)
377 || Self::contains_unnest(&branch.result)
378 })
379 || else_branch
380 .as_ref()
381 .map_or(false, |e| Self::contains_unnest(e))
382 }
383 SqlExpression::InList { expr, values } => {
384 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
385 }
386 SqlExpression::NotInList { expr, values } => {
387 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
388 }
389 SqlExpression::Between { expr, lower, upper } => {
390 Self::contains_unnest(expr)
391 || Self::contains_unnest(lower)
392 || Self::contains_unnest(upper)
393 }
394 SqlExpression::InSubquery { expr, .. } => Self::contains_unnest(expr),
395 SqlExpression::NotInSubquery { expr, .. } => Self::contains_unnest(expr),
396 SqlExpression::ScalarSubquery { .. } => false, SqlExpression::WindowFunction { args, .. } => args.iter().any(Self::contains_unnest),
398 SqlExpression::MethodCall { args, .. } => args.iter().any(Self::contains_unnest),
399 SqlExpression::ChainedMethodCall { base, args, .. } => {
400 Self::contains_unnest(base) || args.iter().any(Self::contains_unnest)
401 }
402 _ => false,
403 }
404 }
405
406 fn collect_window_specs(expr: &SqlExpression, specs: &mut Vec<WindowSpec>) {
408 match expr {
409 SqlExpression::WindowFunction {
410 window_spec, args, ..
411 } => {
412 specs.push(window_spec.clone());
414 for arg in args {
416 Self::collect_window_specs(arg, specs);
417 }
418 }
419 SqlExpression::BinaryOp { left, right, .. } => {
420 Self::collect_window_specs(left, specs);
421 Self::collect_window_specs(right, specs);
422 }
423 SqlExpression::Not { expr } => {
424 Self::collect_window_specs(expr, specs);
425 }
426 SqlExpression::FunctionCall { args, .. } => {
427 for arg in args {
428 Self::collect_window_specs(arg, specs);
429 }
430 }
431 SqlExpression::CaseExpression {
432 when_branches,
433 else_branch,
434 } => {
435 for branch in when_branches {
436 Self::collect_window_specs(&branch.condition, specs);
437 Self::collect_window_specs(&branch.result, specs);
438 }
439 if let Some(else_expr) = else_branch {
440 Self::collect_window_specs(else_expr, specs);
441 }
442 }
443 SqlExpression::SimpleCaseExpression {
444 expr,
445 when_branches,
446 else_branch,
447 } => {
448 Self::collect_window_specs(expr, specs);
449 for branch in when_branches {
450 Self::collect_window_specs(&branch.value, specs);
451 Self::collect_window_specs(&branch.result, specs);
452 }
453 if let Some(else_expr) = else_branch {
454 Self::collect_window_specs(else_expr, specs);
455 }
456 }
457 SqlExpression::InList { expr, values, .. } => {
458 Self::collect_window_specs(expr, specs);
459 for item in values {
460 Self::collect_window_specs(item, specs);
461 }
462 }
463 SqlExpression::ChainedMethodCall { base, args, .. } => {
464 Self::collect_window_specs(base, specs);
465 for arg in args {
466 Self::collect_window_specs(arg, specs);
467 }
468 }
469 SqlExpression::Column(_)
471 | SqlExpression::NumberLiteral(_)
472 | SqlExpression::StringLiteral(_)
473 | SqlExpression::BooleanLiteral(_)
474 | SqlExpression::Null
475 | SqlExpression::DateTimeToday { .. }
476 | SqlExpression::DateTimeConstructor { .. }
477 | SqlExpression::MethodCall { .. } => {}
478 _ => {}
480 }
481 }
482
483 fn contains_window_function(expr: &SqlExpression) -> bool {
485 match expr {
486 SqlExpression::WindowFunction { .. } => true,
487 SqlExpression::BinaryOp { left, right, .. } => {
488 Self::contains_window_function(left) || Self::contains_window_function(right)
489 }
490 SqlExpression::Not { expr } => Self::contains_window_function(expr),
491 SqlExpression::FunctionCall { args, .. } => {
492 args.iter().any(Self::contains_window_function)
493 }
494 SqlExpression::CaseExpression {
495 when_branches,
496 else_branch,
497 } => {
498 when_branches.iter().any(|branch| {
499 Self::contains_window_function(&branch.condition)
500 || Self::contains_window_function(&branch.result)
501 }) || else_branch
502 .as_ref()
503 .map_or(false, |e| Self::contains_window_function(e))
504 }
505 SqlExpression::SimpleCaseExpression {
506 expr,
507 when_branches,
508 else_branch,
509 } => {
510 Self::contains_window_function(expr)
511 || when_branches.iter().any(|branch| {
512 Self::contains_window_function(&branch.value)
513 || Self::contains_window_function(&branch.result)
514 })
515 || else_branch
516 .as_ref()
517 .map_or(false, |e| Self::contains_window_function(e))
518 }
519 SqlExpression::InList { expr, values } => {
520 Self::contains_window_function(expr)
521 || values.iter().any(Self::contains_window_function)
522 }
523 SqlExpression::NotInList { expr, values } => {
524 Self::contains_window_function(expr)
525 || values.iter().any(Self::contains_window_function)
526 }
527 SqlExpression::Between { expr, lower, upper } => {
528 Self::contains_window_function(expr)
529 || Self::contains_window_function(lower)
530 || Self::contains_window_function(upper)
531 }
532 SqlExpression::InSubquery { expr, .. } => Self::contains_window_function(expr),
533 SqlExpression::NotInSubquery { expr, .. } => Self::contains_window_function(expr),
534 SqlExpression::MethodCall { args, .. } => {
535 args.iter().any(Self::contains_window_function)
536 }
537 SqlExpression::ChainedMethodCall { base, args, .. } => {
538 Self::contains_window_function(base)
539 || args.iter().any(Self::contains_window_function)
540 }
541 _ => false,
542 }
543 }
544
545 fn extract_window_specs(
547 items: &[SelectItem],
548 ) -> Vec<crate::data::batch_window_evaluator::WindowFunctionSpec> {
549 let mut specs = Vec::new();
550 for (idx, item) in items.iter().enumerate() {
551 if let SelectItem::Expression { expr, .. } = item {
552 Self::collect_window_function_specs(expr, idx, &mut specs);
553 }
554 }
555 specs
556 }
557
558 fn collect_window_function_specs(
560 expr: &SqlExpression,
561 output_column_index: usize,
562 specs: &mut Vec<crate::data::batch_window_evaluator::WindowFunctionSpec>,
563 ) {
564 match expr {
565 SqlExpression::WindowFunction {
566 name,
567 args,
568 window_spec,
569 } => {
570 specs.push(crate::data::batch_window_evaluator::WindowFunctionSpec {
571 spec: window_spec.clone(),
572 function_name: name.clone(),
573 args: args.clone(),
574 output_column_index,
575 });
576 }
577 SqlExpression::BinaryOp { left, right, .. } => {
578 Self::collect_window_function_specs(left, output_column_index, specs);
579 Self::collect_window_function_specs(right, output_column_index, specs);
580 }
581 SqlExpression::Not { expr } => {
582 Self::collect_window_function_specs(expr, output_column_index, specs);
583 }
584 SqlExpression::FunctionCall { args, .. } => {
585 for arg in args {
586 Self::collect_window_function_specs(arg, output_column_index, specs);
587 }
588 }
589 SqlExpression::CaseExpression {
590 when_branches,
591 else_branch,
592 } => {
593 for branch in when_branches {
594 Self::collect_window_function_specs(
595 &branch.condition,
596 output_column_index,
597 specs,
598 );
599 Self::collect_window_function_specs(&branch.result, output_column_index, specs);
600 }
601 if let Some(e) = else_branch {
602 Self::collect_window_function_specs(e, output_column_index, specs);
603 }
604 }
605 SqlExpression::SimpleCaseExpression {
606 expr,
607 when_branches,
608 else_branch,
609 } => {
610 Self::collect_window_function_specs(expr, output_column_index, specs);
611 for branch in when_branches {
612 Self::collect_window_function_specs(&branch.value, output_column_index, specs);
613 Self::collect_window_function_specs(&branch.result, output_column_index, specs);
614 }
615 if let Some(e) = else_branch {
616 Self::collect_window_function_specs(e, output_column_index, specs);
617 }
618 }
619 SqlExpression::InList { expr, values } => {
620 Self::collect_window_function_specs(expr, output_column_index, specs);
621 for val in values {
622 Self::collect_window_function_specs(val, output_column_index, specs);
623 }
624 }
625 SqlExpression::NotInList { expr, values } => {
626 Self::collect_window_function_specs(expr, output_column_index, specs);
627 for val in values {
628 Self::collect_window_function_specs(val, output_column_index, specs);
629 }
630 }
631 SqlExpression::Between { expr, lower, upper } => {
632 Self::collect_window_function_specs(expr, output_column_index, specs);
633 Self::collect_window_function_specs(lower, output_column_index, specs);
634 Self::collect_window_function_specs(upper, output_column_index, specs);
635 }
636 SqlExpression::InSubquery { expr, .. } => {
637 Self::collect_window_function_specs(expr, output_column_index, specs);
638 }
639 SqlExpression::NotInSubquery { expr, .. } => {
640 Self::collect_window_function_specs(expr, output_column_index, specs);
641 }
642 SqlExpression::MethodCall { args, .. } => {
643 for arg in args {
644 Self::collect_window_function_specs(arg, output_column_index, specs);
645 }
646 }
647 SqlExpression::ChainedMethodCall { base, args, .. } => {
648 Self::collect_window_function_specs(base, output_column_index, specs);
649 for arg in args {
650 Self::collect_window_function_specs(arg, output_column_index, specs);
651 }
652 }
653 _ => {} }
655 }
656
657 pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
659 let (view, _plan) = self.execute_with_plan(table, sql)?;
660 Ok(view)
661 }
662
663 pub fn execute_with_temp_tables(
665 &self,
666 table: Arc<DataTable>,
667 sql: &str,
668 temp_tables: Option<&TempTableRegistry>,
669 ) -> Result<DataView> {
670 let (view, _plan) = self.execute_with_plan_and_temp_tables(table, sql, temp_tables)?;
671 Ok(view)
672 }
673
674 pub fn execute_statement(
676 &self,
677 table: Arc<DataTable>,
678 statement: SelectStatement,
679 ) -> Result<DataView> {
680 self.execute_statement_with_temp_tables(table, statement, None)
681 }
682
    /// Executes an already-parsed `statement` against `table`, optionally
    /// exposing temp tables to the query.
    ///
    /// Steps, in order:
    /// 1. Every table in `temp_tables` is wrapped in a `DataView` and placed in
    ///    a fresh CTE context under its table name.
    /// 2. Each CTE of the statement is evaluated and stored in the context
    ///    under the CTE name. Standard CTEs are built and materialized, Web
    ///    CTEs are fetched through `WebDataFetcher`, and File CTEs are produced
    ///    by `walk_filesystem`. In all three cases every column gets
    ///    `qualified_name = "<cte>.<column>"` and `source_table = <cte>`.
    /// 3. Subqueries in the statement are processed by a `SubqueryExecutor`
    ///    that receives a clone of the context.
    /// 4. The processed statement is built into the final view.
    ///
    /// # Errors
    ///
    /// Propagates errors from building or materializing a CTE, from the web
    /// fetcher or filesystem walker, from subquery processing, and from
    /// building the final view.
    pub fn execute_statement_with_temp_tables(
        &self,
        table: Arc<DataTable>,
        statement: SelectStatement,
        temp_tables: Option<&TempTableRegistry>,
    ) -> Result<DataView> {
        // Context shared by all later steps: name -> materialized view.
        let mut cte_context = HashMap::new();

        // Temp tables are exposed through the same lookup as CTEs.
        if let Some(temp_registry) = temp_tables {
            for table_name in temp_registry.list_tables() {
                if let Some(temp_table) = temp_registry.get(&table_name) {
                    debug!("Adding temp table {} to CTE context", table_name);
                    let view = DataView::new(temp_table);
                    cte_context.insert(table_name, Arc::new(view));
                }
            }
        }

        // CTEs are evaluated in declaration order; each one sees the context
        // built so far, so a later CTE can read an earlier one.
        for cte in &statement.ctes {
            debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
            let cte_result = match &cte.cte_type {
                CTEType::Standard(query) => {
                    let view = self.build_view_with_context(
                        table.clone(),
                        query.clone(),
                        &mut cte_context,
                    )?;

                    let mut materialized = self.materialize_view(view)?;

                    // Tag columns with the CTE name so `cte.column` references resolve.
                    for column in materialized.columns_mut() {
                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
                        column.source_table = Some(cte.name.clone());
                    }

                    DataView::new(Arc::new(materialized))
                }
                CTEType::Web(web_spec) => {
                    use crate::web::http_fetcher::WebDataFetcher;

                    let fetcher = WebDataFetcher::new()?;
                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;

                    // Tag columns with the CTE name so `cte.column` references resolve.
                    for column in data_table.columns_mut() {
                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
                        column.source_table = Some(cte.name.clone());
                    }

                    DataView::new(Arc::new(data_table))
                }
                CTEType::File(file_spec) => {
                    let mut data_table =
                        crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;

                    // Tag columns with the CTE name so `cte.column` references resolve.
                    for column in data_table.columns_mut() {
                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
                        column.source_table = Some(cte.name.clone());
                    }

                    DataView::new(Arc::new(data_table))
                }
            };
            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
            debug!(
                "QueryEngine: CTE '{}' pre-processed, stored in context",
                cte.name
            );
        }

        // The subquery executor works on a snapshot (clone) of the context.
        let mut subquery_executor =
            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
        let processed_statement = subquery_executor.execute_subqueries(&statement)?;

        // CTEs already present in the context are skipped by the view builder.
        self.build_view_with_context(table, processed_statement, &mut cte_context)
    }
772
    /// Executes an already-parsed `statement` against `table`, starting from a
    /// CTE context supplied by the caller.
    ///
    /// The caller's `cte_context` is cloned and never modified. The statement's
    /// own CTEs are evaluated into the local copy (Standard CTEs are built and
    /// materialized, Web CTEs fetched through `WebDataFetcher`, File CTEs
    /// produced by `walk_filesystem`), with every column tagged
    /// `qualified_name = "<cte>.<column>"` and `source_table = <cte>`. A CTE
    /// whose name already exists in the inherited context replaces that entry
    /// in the local copy. Subqueries are then processed and the final view is
    /// built.
    ///
    /// # Errors
    ///
    /// Propagates errors from building or materializing a CTE, from the web
    /// fetcher or filesystem walker, from subquery processing, and from
    /// building the final view.
    pub fn execute_statement_with_cte_context(
        &self,
        table: Arc<DataTable>,
        statement: SelectStatement,
        cte_context: &HashMap<String, Arc<DataView>>,
    ) -> Result<DataView> {
        // Work on a private copy so the caller's context stays untouched.
        let mut local_context = cte_context.clone();

        // CTEs are evaluated in declaration order against the growing context.
        for cte in &statement.ctes {
            debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
            let cte_result = match &cte.cte_type {
                CTEType::Standard(query) => {
                    let view = self.build_view_with_context(
                        table.clone(),
                        query.clone(),
                        &mut local_context,
                    )?;

                    let mut materialized = self.materialize_view(view)?;

                    // Tag columns with the CTE name so `cte.column` references resolve.
                    for column in materialized.columns_mut() {
                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
                        column.source_table = Some(cte.name.clone());
                    }

                    DataView::new(Arc::new(materialized))
                }
                CTEType::Web(web_spec) => {
                    use crate::web::http_fetcher::WebDataFetcher;

                    let fetcher = WebDataFetcher::new()?;
                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;

                    // Tag columns with the CTE name so `cte.column` references resolve.
                    for column in data_table.columns_mut() {
                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
                        column.source_table = Some(cte.name.clone());
                    }

                    DataView::new(Arc::new(data_table))
                }
                CTEType::File(file_spec) => {
                    let mut data_table =
                        crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;

                    // Tag columns with the CTE name so `cte.column` references resolve.
                    for column in data_table.columns_mut() {
                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
                        column.source_table = Some(cte.name.clone());
                    }

                    DataView::new(Arc::new(data_table))
                }
            };
            local_context.insert(cte.name.clone(), Arc::new(cte_result));
        }

        // The subquery executor works on a snapshot (clone) of the context.
        let mut subquery_executor =
            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
        let processed_statement = subquery_executor.execute_subqueries(&statement)?;

        self.build_view_with_context(table, processed_statement, &mut local_context)
    }
845
846 pub fn execute_with_plan(
848 &self,
849 table: Arc<DataTable>,
850 sql: &str,
851 ) -> Result<(DataView, ExecutionPlan)> {
852 self.execute_with_plan_and_temp_tables(table, sql, None)
853 }
854
855 pub fn execute_with_plan_and_temp_tables(
857 &self,
858 table: Arc<DataTable>,
859 sql: &str,
860 temp_tables: Option<&TempTableRegistry>,
861 ) -> Result<(DataView, ExecutionPlan)> {
862 let mut plan_builder = ExecutionPlanBuilder::new();
863 let start_time = Instant::now();
864
865 plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
867 plan_builder.add_detail(format!("Query: {}", sql));
868 let mut parser = Parser::new(sql);
869 let statement = parser
870 .parse()
871 .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
872 plan_builder.add_detail(format!("Parsed successfully"));
873 if let Some(ref from_source) = statement.from_source {
874 match from_source {
875 TableSource::Table(name) => {
876 plan_builder.add_detail(format!("FROM: {}", name));
877 }
878 TableSource::DerivedTable { alias, .. } => {
879 plan_builder.add_detail(format!("FROM: derived table (alias: {})", alias));
880 }
881 TableSource::Pivot { .. } => {
882 plan_builder.add_detail("FROM: PIVOT".to_string());
883 }
884 }
885 }
886 if statement.where_clause.is_some() {
887 plan_builder.add_detail("WHERE clause present".to_string());
888 }
889 plan_builder.end_step();
890
891 let mut cte_context = HashMap::new();
893
894 if let Some(temp_registry) = temp_tables {
896 for table_name in temp_registry.list_tables() {
897 if let Some(temp_table) = temp_registry.get(&table_name) {
898 debug!("Adding temp table {} to CTE context", table_name);
899 let view = DataView::new(temp_table);
900 cte_context.insert(table_name, Arc::new(view));
901 }
902 }
903 }
904
905 if !statement.ctes.is_empty() {
906 plan_builder.begin_step(
907 StepType::CTE,
908 format!("Process {} CTEs", statement.ctes.len()),
909 );
910
911 for cte in &statement.ctes {
912 let cte_start = Instant::now();
913 plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
914
915 let cte_result = match &cte.cte_type {
916 CTEType::Standard(query) => {
917 if let Some(ref from_source) = query.from_source {
919 match from_source {
920 TableSource::Table(name) => {
921 plan_builder.add_detail(format!("Source: {}", name));
922 }
923 TableSource::DerivedTable { alias, .. } => {
924 plan_builder
925 .add_detail(format!("Source: derived table ({})", alias));
926 }
927 TableSource::Pivot { .. } => {
928 plan_builder.add_detail("Source: PIVOT".to_string());
929 }
930 }
931 }
932 if query.where_clause.is_some() {
933 plan_builder.add_detail("Has WHERE clause".to_string());
934 }
935 if query.group_by.is_some() {
936 plan_builder.add_detail("Has GROUP BY".to_string());
937 }
938
939 debug!(
940 "QueryEngine: Processing CTE '{}' with existing context: {:?}",
941 cte.name,
942 cte_context.keys().collect::<Vec<_>>()
943 );
944
945 let mut subquery_executor = SubqueryExecutor::with_cte_context(
948 self.clone(),
949 table.clone(),
950 cte_context.clone(),
951 );
952 let processed_query = subquery_executor.execute_subqueries(query)?;
953
954 let view = self.build_view_with_context(
955 table.clone(),
956 processed_query,
957 &mut cte_context,
958 )?;
959
960 let mut materialized = self.materialize_view(view)?;
962
963 for column in materialized.columns_mut() {
965 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
966 column.source_table = Some(cte.name.clone());
967 }
968
969 DataView::new(Arc::new(materialized))
970 }
971 CTEType::Web(web_spec) => {
972 plan_builder.add_detail(format!("URL: {}", web_spec.url));
973 if let Some(format) = &web_spec.format {
974 plan_builder.add_detail(format!("Format: {:?}", format));
975 }
976 if let Some(cache) = web_spec.cache_seconds {
977 plan_builder.add_detail(format!("Cache: {} seconds", cache));
978 }
979
980 use crate::web::http_fetcher::WebDataFetcher;
982
983 let fetcher = WebDataFetcher::new()?;
984 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
986
987 for column in data_table.columns_mut() {
989 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
990 column.source_table = Some(cte.name.clone());
991 }
992
993 DataView::new(Arc::new(data_table))
995 }
996 CTEType::File(file_spec) => {
997 plan_builder.add_detail(format!("PATH: {}", file_spec.path));
998 if file_spec.recursive {
999 plan_builder.add_detail("RECURSIVE".to_string());
1000 }
1001 if let Some(ref g) = file_spec.glob {
1002 plan_builder.add_detail(format!("GLOB: {}", g));
1003 }
1004 if let Some(d) = file_spec.max_depth {
1005 plan_builder.add_detail(format!("MAX_DEPTH: {}", d));
1006 }
1007
1008 let mut data_table =
1009 crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
1010
1011 for column in data_table.columns_mut() {
1012 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1013 column.source_table = Some(cte.name.clone());
1014 }
1015
1016 DataView::new(Arc::new(data_table))
1017 }
1018 };
1019
1020 plan_builder.set_rows_out(cte_result.row_count());
1022 plan_builder.add_detail(format!(
1023 "Result: {} rows, {} columns",
1024 cte_result.row_count(),
1025 cte_result.column_count()
1026 ));
1027 plan_builder.add_detail(format!(
1028 "Execution time: {:.3}ms",
1029 cte_start.elapsed().as_secs_f64() * 1000.0
1030 ));
1031
1032 debug!(
1033 "QueryEngine: Storing CTE '{}' in context with {} rows",
1034 cte.name,
1035 cte_result.row_count()
1036 );
1037 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1038 plan_builder.end_step();
1039 }
1040
1041 plan_builder.add_detail(format!(
1042 "All {} CTEs cached in context",
1043 statement.ctes.len()
1044 ));
1045 plan_builder.end_step();
1046 }
1047
1048 plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
1050 let mut subquery_executor =
1051 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
1052
1053 let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
1055 format!("{:?}", w).contains("Subquery")
1057 });
1058
1059 if has_subqueries {
1060 plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
1061 }
1062
1063 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
1064
1065 if has_subqueries {
1066 plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
1067 } else {
1068 plan_builder.add_detail("No subqueries to process".to_string());
1069 }
1070
1071 plan_builder.end_step();
1072 let result = self.build_view_with_context_and_plan(
1073 table,
1074 processed_statement,
1075 &mut cte_context,
1076 &mut plan_builder,
1077 )?;
1078
1079 let total_duration = start_time.elapsed();
1080 info!(
1081 "Query execution complete: total={:?}, rows={}",
1082 total_duration,
1083 result.row_count()
1084 );
1085
1086 let plan = plan_builder.build();
1087 Ok((result, plan))
1088 }
1089
1090 fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
1092 let mut cte_context = HashMap::new();
1093 self.build_view_with_context(table, statement, &mut cte_context)
1094 }
1095
1096 fn build_view_with_context(
1098 &self,
1099 table: Arc<DataTable>,
1100 statement: SelectStatement,
1101 cte_context: &mut HashMap<String, Arc<DataView>>,
1102 ) -> Result<DataView> {
1103 let mut dummy_plan = ExecutionPlanBuilder::new();
1104 let mut exec_context = ExecutionContext::new();
1105 self.build_view_with_context_and_plan_and_exec(
1106 table,
1107 statement,
1108 cte_context,
1109 &mut dummy_plan,
1110 &mut exec_context,
1111 )
1112 }
1113
1114 fn build_view_with_context_and_plan(
1116 &self,
1117 table: Arc<DataTable>,
1118 statement: SelectStatement,
1119 cte_context: &mut HashMap<String, Arc<DataView>>,
1120 plan: &mut ExecutionPlanBuilder,
1121 ) -> Result<DataView> {
1122 let mut exec_context = ExecutionContext::new();
1123 self.build_view_with_context_and_plan_and_exec(
1124 table,
1125 statement,
1126 cte_context,
1127 plan,
1128 &mut exec_context,
1129 )
1130 }
1131
1132 fn build_view_with_context_and_plan_and_exec(
1134 &self,
1135 table: Arc<DataTable>,
1136 statement: SelectStatement,
1137 cte_context: &mut HashMap<String, Arc<DataView>>,
1138 plan: &mut ExecutionPlanBuilder,
1139 exec_context: &mut ExecutionContext,
1140 ) -> Result<DataView> {
1141 for cte in &statement.ctes {
1143 if cte_context.contains_key(&cte.name) {
1145 debug!(
1146 "QueryEngine: CTE '{}' already in context, skipping",
1147 cte.name
1148 );
1149 continue;
1150 }
1151
1152 debug!("QueryEngine: Processing CTE '{}'...", cte.name);
1153 debug!(
1154 "QueryEngine: Available CTEs for '{}': {:?}",
1155 cte.name,
1156 cte_context.keys().collect::<Vec<_>>()
1157 );
1158
1159 let cte_result = match &cte.cte_type {
1161 CTEType::Standard(query) => {
1162 let view =
1163 self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
1164
1165 let mut materialized = self.materialize_view(view)?;
1167
1168 for column in materialized.columns_mut() {
1170 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1171 column.source_table = Some(cte.name.clone());
1172 }
1173
1174 DataView::new(Arc::new(materialized))
1175 }
1176 CTEType::Web(_web_spec) => {
1177 return Err(anyhow!(
1179 "Web CTEs should be processed in execute_select method"
1180 ));
1181 }
1182 CTEType::File(_file_spec) => {
1183 return Err(anyhow!(
1185 "FILE CTEs should be processed in execute_select method"
1186 ));
1187 }
1188 };
1189
1190 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1192 debug!(
1193 "QueryEngine: CTE '{}' processed, stored in context",
1194 cte.name
1195 );
1196 }
1197
1198 let source_table = if let Some(ref from_source) = statement.from_source {
1200 match from_source {
1201 TableSource::Table(table_name) => {
1202 if let Some(cte_view) = cte_context.get(table_name) {
1204 debug!("QueryEngine: Using CTE '{}' as source table", table_name);
1205 let mut materialized = self.materialize_view((**cte_view).clone())?;
1207
1208 #[allow(deprecated)]
1210 if let Some(ref alias) = statement.from_alias {
1211 debug!(
1212 "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1213 alias, table_name
1214 );
1215 for column in materialized.columns_mut() {
1216 if let Some(ref qualified_name) = column.qualified_name {
1218 if qualified_name.starts_with(&format!("{}.", table_name)) {
1219 column.qualified_name = Some(qualified_name.replace(
1220 &format!("{}.", table_name),
1221 &format!("{}.", alias),
1222 ));
1223 }
1224 }
1225 if column.source_table.as_ref() == Some(table_name) {
1227 column.source_table = Some(alias.clone());
1228 }
1229 }
1230 }
1231
1232 Arc::new(materialized)
1233 } else {
1234 table.clone()
1236 }
1237 }
1238 TableSource::DerivedTable { query, alias } => {
1239 debug!(
1241 "QueryEngine: Processing FROM derived table (alias: {})",
1242 alias
1243 );
1244 let subquery_result =
1245 self.build_view_with_context(table.clone(), *query.clone(), cte_context)?;
1246
1247 let mut materialized = self.materialize_view(subquery_result)?;
1250
1251 for column in materialized.columns_mut() {
1255 column.source_table = Some(alias.clone());
1256 }
1257
1258 Arc::new(materialized)
1259 }
1260 TableSource::Pivot { .. } => {
1261 return Err(anyhow!(
1263 "PIVOT in FROM clause should have been expanded by preprocessing pipeline"
1264 ));
1265 }
1266 }
1267 } else {
1268 #[allow(deprecated)]
1270 if let Some(ref table_func) = statement.from_function {
1271 debug!("QueryEngine: Processing table function (deprecated field)...");
1273 match table_func {
1274 TableFunction::Generator { name, args } => {
1275 use crate::sql::generators::GeneratorRegistry;
1277
1278 let registry = GeneratorRegistry::new();
1280
1281 if let Some(generator) = registry.get(name) {
1282 let mut evaluator = ArithmeticEvaluator::with_date_notation(
1284 &table,
1285 self.date_notation.clone(),
1286 );
1287 let dummy_row = 0;
1288
1289 let mut evaluated_args = Vec::new();
1290 for arg in args {
1291 evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
1292 }
1293
1294 generator.generate(evaluated_args)?
1296 } else {
1297 return Err(anyhow!("Unknown generator function: {}", name));
1298 }
1299 }
1300 }
1301 } else {
1302 #[allow(deprecated)]
1303 if let Some(ref subquery) = statement.from_subquery {
1304 debug!("QueryEngine: Processing FROM subquery (deprecated field)...");
1306 let subquery_result = self.build_view_with_context(
1307 table.clone(),
1308 *subquery.clone(),
1309 cte_context,
1310 )?;
1311
1312 let materialized = self.materialize_view(subquery_result)?;
1315 Arc::new(materialized)
1316 } else {
1317 #[allow(deprecated)]
1318 if let Some(ref table_name) = statement.from_table {
1319 if let Some(cte_view) = cte_context.get(table_name) {
1321 debug!(
1322 "QueryEngine: Using CTE '{}' as source table (deprecated field)",
1323 table_name
1324 );
1325 let mut materialized = self.materialize_view((**cte_view).clone())?;
1327
1328 #[allow(deprecated)]
1330 if let Some(ref alias) = statement.from_alias {
1331 debug!(
1332 "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1333 alias, table_name
1334 );
1335 for column in materialized.columns_mut() {
1336 if let Some(ref qualified_name) = column.qualified_name {
1338 if qualified_name.starts_with(&format!("{}.", table_name)) {
1339 column.qualified_name = Some(qualified_name.replace(
1340 &format!("{}.", table_name),
1341 &format!("{}.", alias),
1342 ));
1343 }
1344 }
1345 if column.source_table.as_ref() == Some(table_name) {
1347 column.source_table = Some(alias.clone());
1348 }
1349 }
1350 }
1351
1352 Arc::new(materialized)
1353 } else {
1354 table.clone()
1356 }
1357 } else {
1358 table.clone()
1360 }
1361 }
1362 }
1363 };
1364
1365 #[allow(deprecated)]
1367 if let Some(ref alias) = statement.from_alias {
1368 #[allow(deprecated)]
1369 if let Some(ref table_name) = statement.from_table {
1370 exec_context.register_alias(alias.clone(), table_name.clone());
1371 }
1372 }
1373
1374 let final_table = if !statement.joins.is_empty() {
1376 plan.begin_step(
1377 StepType::Join,
1378 format!("Process {} JOINs", statement.joins.len()),
1379 );
1380 plan.set_rows_in(source_table.row_count());
1381
1382 let join_executor = HashJoinExecutor::new(self.case_insensitive);
1383 let mut current_table = source_table;
1384
1385 for (idx, join_clause) in statement.joins.iter().enumerate() {
1386 let join_start = Instant::now();
1387 plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
1388 plan.add_detail(format!("Type: {:?}", join_clause.join_type));
1389 plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
1390 plan.add_detail(format!(
1391 "Executing {:?} JOIN on {} condition(s)",
1392 join_clause.join_type,
1393 join_clause.condition.conditions.len()
1394 ));
1395
1396 let right_table = match &join_clause.table {
1398 TableSource::Table(name) => {
1399 if let Some(cte_view) = cte_context.get(name) {
1401 let mut materialized = self.materialize_view((**cte_view).clone())?;
1402
1403 if let Some(ref alias) = join_clause.alias {
1405 debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
1406 for column in materialized.columns_mut() {
1407 if let Some(ref qualified_name) = column.qualified_name {
1409 if qualified_name.starts_with(&format!("{}.", name)) {
1410 column.qualified_name = Some(qualified_name.replace(
1411 &format!("{}.", name),
1412 &format!("{}.", alias),
1413 ));
1414 }
1415 }
1416 if column.source_table.as_ref() == Some(name) {
1418 column.source_table = Some(alias.clone());
1419 }
1420 }
1421 }
1422
1423 Arc::new(materialized)
1424 } else {
1425 return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
1428 }
1429 }
1430 TableSource::DerivedTable { query, alias: _ } => {
1431 let subquery_result = self.build_view_with_context(
1433 table.clone(),
1434 *query.clone(),
1435 cte_context,
1436 )?;
1437 let materialized = self.materialize_view(subquery_result)?;
1438 Arc::new(materialized)
1439 }
1440 TableSource::Pivot { .. } => {
1441 return Err(anyhow!("PIVOT in JOIN clause is not yet supported"));
1443 }
1444 };
1445
1446 let joined = join_executor.execute_join(
1448 current_table.clone(),
1449 join_clause,
1450 right_table.clone(),
1451 )?;
1452
1453 plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
1454 plan.set_rows_out(joined.row_count());
1455 plan.add_detail(format!("Result: {} rows", joined.row_count()));
1456 plan.add_detail(format!(
1457 "Join time: {:.3}ms",
1458 join_start.elapsed().as_secs_f64() * 1000.0
1459 ));
1460 plan.end_step();
1461
1462 current_table = Arc::new(joined);
1463 }
1464
1465 plan.set_rows_out(current_table.row_count());
1466 plan.add_detail(format!(
1467 "Final result after all joins: {} rows",
1468 current_table.row_count()
1469 ));
1470 plan.end_step();
1471 current_table
1472 } else {
1473 source_table
1474 };
1475
1476 self.build_view_internal_with_plan_and_exec(
1478 final_table,
1479 statement,
1480 plan,
1481 Some(exec_context),
1482 )
1483 }
1484
1485 pub fn materialize_view(&self, view: DataView) -> Result<DataTable> {
1487 let source = view.source();
1488 let mut result_table = DataTable::new("derived");
1489
1490 let visible_cols = view.visible_column_indices().to_vec();
1492
1493 for col_idx in &visible_cols {
1495 let col = &source.columns[*col_idx];
1496 let new_col = DataColumn {
1497 name: col.name.clone(),
1498 data_type: col.data_type.clone(),
1499 nullable: col.nullable,
1500 unique_values: col.unique_values,
1501 null_count: col.null_count,
1502 metadata: col.metadata.clone(),
1503 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), };
1506 result_table.add_column(new_col);
1507 }
1508
1509 for row_idx in view.visible_row_indices() {
1511 let source_row = &source.rows[*row_idx];
1512 let mut new_row = DataRow { values: Vec::new() };
1513
1514 for col_idx in &visible_cols {
1515 new_row.values.push(source_row.values[*col_idx].clone());
1516 }
1517
1518 result_table.add_row(new_row);
1519 }
1520
1521 Ok(result_table)
1522 }
1523
1524 fn build_view_internal(
1525 &self,
1526 table: Arc<DataTable>,
1527 statement: SelectStatement,
1528 ) -> Result<DataView> {
1529 let mut dummy_plan = ExecutionPlanBuilder::new();
1530 self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
1531 }
1532
1533 fn build_view_internal_with_plan(
1534 &self,
1535 table: Arc<DataTable>,
1536 statement: SelectStatement,
1537 plan: &mut ExecutionPlanBuilder,
1538 ) -> Result<DataView> {
1539 self.build_view_internal_with_plan_and_exec(table, statement, plan, None)
1540 }
1541
1542 fn build_view_internal_with_plan_and_exec(
1543 &self,
1544 table: Arc<DataTable>,
1545 statement: SelectStatement,
1546 plan: &mut ExecutionPlanBuilder,
1547 exec_context: Option<&ExecutionContext>,
1548 ) -> Result<DataView> {
1549 debug!(
1550 "QueryEngine::build_view - select_items: {:?}",
1551 statement.select_items
1552 );
1553 debug!(
1554 "QueryEngine::build_view - where_clause: {:?}",
1555 statement.where_clause
1556 );
1557
1558 let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
1560
1561 if let Some(where_clause) = &statement.where_clause {
1563 let total_rows = table.row_count();
1564 debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
1565 debug!("QueryEngine: WHERE clause = {:?}", where_clause);
1566
1567 plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
1568 plan.set_rows_in(total_rows);
1569 plan.add_detail(format!("Input: {} rows", total_rows));
1570
1571 for condition in &where_clause.conditions {
1573 plan.add_detail(format!("Condition: {:?}", condition.expr));
1574 }
1575
1576 let filter_start = Instant::now();
1577 let mut eval_context = EvaluationContext::new(self.case_insensitive);
1579
1580 let mut evaluator = if let Some(exec_ctx) = exec_context {
1582 RecursiveWhereEvaluator::with_both_contexts(&table, &mut eval_context, exec_ctx)
1584 } else {
1585 RecursiveWhereEvaluator::with_context(&table, &mut eval_context)
1586 };
1587
1588 let mut filtered_rows = Vec::new();
1590 for row_idx in visible_rows {
1591 if row_idx < 3 {
1593 debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
1594 }
1595
1596 match evaluator.evaluate(where_clause, row_idx) {
1597 Ok(result) => {
1598 if row_idx < 3 {
1599 debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
1600 }
1601 if result {
1602 filtered_rows.push(row_idx);
1603 }
1604 }
1605 Err(e) => {
1606 if row_idx < 3 {
1607 debug!(
1608 "QueryEngine: WHERE evaluation error for row {}: {}",
1609 row_idx, e
1610 );
1611 }
1612 return Err(e);
1614 }
1615 }
1616 }
1617
1618 let (compilations, cache_hits) = eval_context.get_stats();
1620 if compilations > 0 || cache_hits > 0 {
1621 debug!(
1622 "LIKE pattern cache: {} compilations, {} cache hits",
1623 compilations, cache_hits
1624 );
1625 }
1626 visible_rows = filtered_rows;
1627 let filter_duration = filter_start.elapsed();
1628 info!(
1629 "WHERE clause filtering: {} rows -> {} rows in {:?}",
1630 total_rows,
1631 visible_rows.len(),
1632 filter_duration
1633 );
1634
1635 plan.set_rows_out(visible_rows.len());
1636 plan.add_detail(format!("Output: {} rows", visible_rows.len()));
1637 plan.add_detail(format!(
1638 "Filter time: {:.3}ms",
1639 filter_duration.as_secs_f64() * 1000.0
1640 ));
1641 plan.end_step();
1642 }
1643
1644 let mut view = DataView::new(table.clone());
1646 view = view.with_rows(visible_rows);
1647
1648 if let Some(group_by_exprs) = &statement.group_by {
1650 if !group_by_exprs.is_empty() {
1651 debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
1652
1653 plan.begin_step(
1654 StepType::GroupBy,
1655 format!("GROUP BY {} expressions", group_by_exprs.len()),
1656 );
1657 plan.set_rows_in(view.row_count());
1658 plan.add_detail(format!("Input: {} rows", view.row_count()));
1659 for expr in group_by_exprs {
1660 plan.add_detail(format!("Group by: {:?}", expr));
1661 }
1662
1663 let group_start = Instant::now();
1664 view = self.apply_group_by(
1665 view,
1666 group_by_exprs,
1667 &statement.select_items,
1668 statement.having.as_ref(),
1669 plan,
1670 )?;
1671
1672 use crate::query_plan::having_alias_transformer::HIDDEN_AGG_PREFIX;
1675 let hidden_indices: Vec<usize> = view
1676 .source()
1677 .columns
1678 .iter()
1679 .enumerate()
1680 .filter_map(|(i, c)| {
1681 if c.name.starts_with(HIDDEN_AGG_PREFIX) {
1682 Some(i)
1683 } else {
1684 None
1685 }
1686 })
1687 .collect();
1688 for &idx in hidden_indices.iter().rev() {
1689 view.hide_column(idx);
1690 }
1691
1692 plan.set_rows_out(view.row_count());
1693 plan.add_detail(format!("Output: {} groups", view.row_count()));
1694 plan.add_detail(format!(
1695 "Overall time: {:.3}ms",
1696 group_start.elapsed().as_secs_f64() * 1000.0
1697 ));
1698 plan.end_step();
1699 }
1700 } else {
1701 if !statement.select_items.is_empty() {
1703 let has_non_star_items = statement
1705 .select_items
1706 .iter()
1707 .any(|item| !matches!(item, SelectItem::Star { .. }));
1708
1709 if has_non_star_items || statement.select_items.len() > 1 {
1713 view = self.apply_select_items(
1714 view,
1715 &statement.select_items,
1716 &statement,
1717 exec_context,
1718 plan,
1719 )?;
1720 }
1721 } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
1723 debug!("QueryEngine: Using legacy columns path");
1724 let source_table = view.source();
1727 let column_indices =
1728 self.resolve_column_indices(source_table, &statement.columns)?;
1729 view = view.with_columns(column_indices);
1730 }
1731 }
1732
1733 if statement.distinct {
1735 plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
1736 plan.set_rows_in(view.row_count());
1737 plan.add_detail(format!("Input: {} rows", view.row_count()));
1738
1739 let distinct_start = Instant::now();
1740 view = self.apply_distinct(view)?;
1741
1742 plan.set_rows_out(view.row_count());
1743 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1744 plan.add_detail(format!(
1745 "Distinct time: {:.3}ms",
1746 distinct_start.elapsed().as_secs_f64() * 1000.0
1747 ));
1748 plan.end_step();
1749 }
1750
1751 if let Some(order_by_columns) = &statement.order_by {
1753 if !order_by_columns.is_empty() {
1754 plan.begin_step(
1755 StepType::Sort,
1756 format!("ORDER BY {} columns", order_by_columns.len()),
1757 );
1758 plan.set_rows_in(view.row_count());
1759 for col in order_by_columns {
1760 let expr_str = match &col.expr {
1762 SqlExpression::Column(col_ref) => col_ref.name.clone(),
1763 _ => "expr".to_string(),
1764 };
1765 plan.add_detail(format!("{} {:?}", expr_str, col.direction));
1766 }
1767
1768 let sort_start = Instant::now();
1769 view =
1770 self.apply_multi_order_by_with_context(view, order_by_columns, exec_context)?;
1771
1772 plan.add_detail(format!(
1773 "Sort time: {:.3}ms",
1774 sort_start.elapsed().as_secs_f64() * 1000.0
1775 ));
1776 plan.end_step();
1777 }
1778 }
1779
1780 if let Some(limit) = statement.limit {
1782 let offset = statement.offset.unwrap_or(0);
1783 plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1784 plan.set_rows_in(view.row_count());
1785 if offset > 0 {
1786 plan.add_detail(format!("OFFSET: {}", offset));
1787 }
1788 view = view.with_limit(limit, offset);
1789 plan.set_rows_out(view.row_count());
1790 plan.add_detail(format!("Output: {} rows", view.row_count()));
1791 plan.end_step();
1792 }
1793
1794 if !statement.set_operations.is_empty() {
1796 plan.begin_step(
1797 StepType::SetOperation,
1798 format!("Process {} set operations", statement.set_operations.len()),
1799 );
1800 plan.set_rows_in(view.row_count());
1801
1802 let mut combined_table = self.materialize_view(view)?;
1804 let first_columns = combined_table.column_names();
1805 let first_column_count = first_columns.len();
1806
1807 let mut needs_deduplication = false;
1809
1810 for (idx, (operation, next_statement)) in statement.set_operations.iter().enumerate() {
1812 let op_start = Instant::now();
1813 plan.begin_step(
1814 StepType::SetOperation,
1815 format!("{:?} operation #{}", operation, idx + 1),
1816 );
1817
1818 let next_view = if let Some(exec_ctx) = exec_context {
1821 self.build_view_internal_with_plan_and_exec(
1822 table.clone(),
1823 *next_statement.clone(),
1824 plan,
1825 Some(exec_ctx),
1826 )?
1827 } else {
1828 self.build_view_internal_with_plan(
1829 table.clone(),
1830 *next_statement.clone(),
1831 plan,
1832 )?
1833 };
1834
1835 let next_table = self.materialize_view(next_view)?;
1837 let next_columns = next_table.column_names();
1838 let next_column_count = next_columns.len();
1839
1840 if first_column_count != next_column_count {
1842 return Err(anyhow!(
1843 "UNION queries must have the same number of columns: first query has {} columns, but query #{} has {} columns",
1844 first_column_count,
1845 idx + 2,
1846 next_column_count
1847 ));
1848 }
1849
1850 for (col_idx, (first_col, next_col)) in
1852 first_columns.iter().zip(next_columns.iter()).enumerate()
1853 {
1854 if !first_col.eq_ignore_ascii_case(next_col) {
1855 debug!(
1856 "UNION column name mismatch at position {}: '{}' vs '{}' (using first query's name)",
1857 col_idx + 1,
1858 first_col,
1859 next_col
1860 );
1861 }
1862 }
1863
1864 plan.add_detail(format!("Left: {} rows", combined_table.row_count()));
1865 plan.add_detail(format!("Right: {} rows", next_table.row_count()));
1866
1867 match operation {
1869 SetOperation::UnionAll => {
1870 for row in next_table.rows.iter() {
1872 combined_table.add_row(row.clone());
1873 }
1874 plan.add_detail(format!(
1875 "Result: {} rows (no deduplication)",
1876 combined_table.row_count()
1877 ));
1878 }
1879 SetOperation::Union => {
1880 for row in next_table.rows.iter() {
1882 combined_table.add_row(row.clone());
1883 }
1884 needs_deduplication = true;
1885 plan.add_detail(format!(
1886 "Combined: {} rows (deduplication pending)",
1887 combined_table.row_count()
1888 ));
1889 }
1890 SetOperation::Intersect => {
1891 return Err(anyhow!("INTERSECT is not yet implemented"));
1894 }
1895 SetOperation::Except => {
1896 return Err(anyhow!("EXCEPT is not yet implemented"));
1899 }
1900 }
1901
1902 plan.add_detail(format!(
1903 "Operation time: {:.3}ms",
1904 op_start.elapsed().as_secs_f64() * 1000.0
1905 ));
1906 plan.set_rows_out(combined_table.row_count());
1907 plan.end_step();
1908 }
1909
1910 plan.set_rows_out(combined_table.row_count());
1911 plan.add_detail(format!(
1912 "Combined result: {} rows after {} operations",
1913 combined_table.row_count(),
1914 statement.set_operations.len()
1915 ));
1916 plan.end_step();
1917
1918 view = DataView::new(Arc::new(combined_table));
1920
1921 if needs_deduplication {
1923 plan.begin_step(
1924 StepType::Distinct,
1925 "UNION deduplication - remove duplicate rows".to_string(),
1926 );
1927 plan.set_rows_in(view.row_count());
1928 plan.add_detail(format!("Input: {} rows", view.row_count()));
1929
1930 let distinct_start = Instant::now();
1931 view = self.apply_distinct(view)?;
1932
1933 plan.set_rows_out(view.row_count());
1934 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1935 plan.add_detail(format!(
1936 "Deduplication time: {:.3}ms",
1937 distinct_start.elapsed().as_secs_f64() * 1000.0
1938 ));
1939 plan.end_step();
1940 }
1941 }
1942
1943 Ok(view)
1944 }
1945
1946 fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1948 let mut indices = Vec::new();
1949 let table_columns = table.column_names();
1950
1951 for col_name in columns {
1952 let index = table_columns
1953 .iter()
1954 .position(|c| c.eq_ignore_ascii_case(col_name))
1955 .ok_or_else(|| {
1956 let suggestion = self.find_similar_column(table, col_name);
1957 match suggestion {
1958 Some(similar) => anyhow::anyhow!(
1959 "Column '{}' not found. Did you mean '{}'?",
1960 col_name,
1961 similar
1962 ),
1963 None => anyhow::anyhow!("Column '{}' not found", col_name),
1964 }
1965 })?;
1966 indices.push(index);
1967 }
1968
1969 Ok(indices)
1970 }
1971
1972 fn apply_select_items(
1974 &self,
1975 view: DataView,
1976 select_items: &[SelectItem],
1977 _statement: &SelectStatement,
1978 exec_context: Option<&ExecutionContext>,
1979 plan: &mut ExecutionPlanBuilder,
1980 ) -> Result<DataView> {
1981 debug!(
1982 "QueryEngine::apply_select_items - items: {:?}",
1983 select_items
1984 );
1985 debug!(
1986 "QueryEngine::apply_select_items - input view has {} rows",
1987 view.row_count()
1988 );
1989
1990 let has_window_functions = select_items.iter().any(|item| match item {
1992 SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
1993 _ => false,
1994 });
1995
1996 let window_func_count: usize = select_items
1998 .iter()
1999 .filter(|item| match item {
2000 SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
2001 _ => false,
2002 })
2003 .count();
2004
2005 let window_start = if has_window_functions {
2007 debug!(
2008 "QueryEngine::apply_select_items - detected {} window functions",
2009 window_func_count
2010 );
2011
2012 let window_specs = Self::extract_window_specs(select_items);
2014 debug!("Extracted {} window function specs", window_specs.len());
2015
2016 Some(Instant::now())
2017 } else {
2018 None
2019 };
2020
2021 let has_unnest = select_items.iter().any(|item| match item {
2023 SelectItem::Expression { expr, .. } => Self::contains_unnest(expr),
2024 _ => false,
2025 });
2026
2027 if has_unnest {
2028 debug!("QueryEngine::apply_select_items - UNNEST detected, using row expansion");
2029 return self.apply_select_with_row_expansion(view, select_items);
2030 }
2031
2032 let has_aggregates = select_items.iter().any(|item| match item {
2036 SelectItem::Expression { expr, .. } => contains_aggregate(expr),
2037 SelectItem::Column { .. } => false,
2038 SelectItem::Star { .. } => false,
2039 SelectItem::StarExclude { .. } => false,
2040 });
2041
2042 let all_aggregate_compatible = select_items.iter().all(|item| match item {
2043 SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
2044 SelectItem::Column { .. } => false, SelectItem::Star { .. } => false, SelectItem::StarExclude { .. } => false, });
2048
2049 if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
2050 debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
2053 return self.apply_aggregate_select(view, select_items);
2054 }
2055
2056 let has_computed_expressions = select_items
2058 .iter()
2059 .any(|item| matches!(item, SelectItem::Expression { .. }));
2060
2061 debug!(
2062 "QueryEngine::apply_select_items - has_computed_expressions: {}",
2063 has_computed_expressions
2064 );
2065
2066 if !has_computed_expressions {
2067 let column_indices = self.resolve_select_columns(view.source(), select_items)?;
2069 return Ok(view.with_columns(column_indices));
2070 }
2071
2072 let source_table = view.source();
2077 let visible_rows = view.visible_row_indices();
2078
2079 let mut computed_table = DataTable::new("query_result");
2082
2083 let mut expanded_items = Vec::new();
2085 for item in select_items {
2086 match item {
2087 SelectItem::Star { table_prefix, .. } => {
2088 if let Some(prefix) = table_prefix {
2089 debug!("QueryEngine::apply_select_items - expanding {}.*", prefix);
2091 for col in &source_table.columns {
2092 if Self::column_matches_table(col, prefix) {
2093 expanded_items.push(SelectItem::Column {
2094 column: ColumnRef::unquoted(col.name.clone()),
2095 leading_comments: vec![],
2096 trailing_comment: None,
2097 });
2098 }
2099 }
2100 } else {
2101 debug!("QueryEngine::apply_select_items - expanding *");
2103 for col_name in source_table.column_names() {
2104 expanded_items.push(SelectItem::Column {
2105 column: ColumnRef::unquoted(col_name.to_string()),
2106 leading_comments: vec![],
2107 trailing_comment: None,
2108 });
2109 }
2110 }
2111 }
2112 _ => expanded_items.push(item.clone()),
2113 }
2114 }
2115
2116 let mut column_name_counts: std::collections::HashMap<String, usize> =
2118 std::collections::HashMap::new();
2119
2120 for item in &expanded_items {
2121 let base_name = match item {
2122 SelectItem::Column {
2123 column: col_ref, ..
2124 } => col_ref.name.clone(),
2125 SelectItem::Expression { alias, .. } => alias.clone(),
2126 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2127 SelectItem::StarExclude { .. } => {
2128 unreachable!("StarExclude should have been expanded")
2129 }
2130 };
2131
2132 let count = column_name_counts.entry(base_name.clone()).or_insert(0);
2134 let column_name = if *count == 0 {
2135 base_name.clone()
2137 } else {
2138 format!("{base_name}_{count}")
2140 };
2141 *count += 1;
2142
2143 computed_table.add_column(DataColumn::new(&column_name));
2144 }
2145
2146 let can_use_batch = expanded_items.iter().all(|item| {
2150 match item {
2151 SelectItem::Expression { expr, .. } => {
2152 matches!(expr, SqlExpression::WindowFunction { .. })
2155 || !Self::contains_window_function(expr)
2156 }
2157 _ => true, }
2159 });
2160
2161 let use_batch_evaluation = can_use_batch
2164 && std::env::var("SQL_CLI_BATCH_WINDOW")
2165 .map(|v| v != "0" && v.to_lowercase() != "false")
2166 .unwrap_or(true);
2167
2168 let batch_window_specs = if use_batch_evaluation && has_window_functions {
2170 debug!("BATCH window function evaluation flag is enabled");
2171 let specs = Self::extract_window_specs(&expanded_items);
2173 debug!(
2174 "Extracted {} window function specs for batch evaluation",
2175 specs.len()
2176 );
2177 Some(specs)
2178 } else {
2179 None
2180 };
2181
2182 let mut evaluator =
2184 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2185
2186 if let Some(exec_ctx) = exec_context {
2188 let aliases = exec_ctx.get_aliases();
2189 if !aliases.is_empty() {
2190 debug!(
2191 "Applying {} aliases to evaluator: {:?}",
2192 aliases.len(),
2193 aliases
2194 );
2195 evaluator = evaluator.with_table_aliases(aliases);
2196 }
2197 }
2198
2199 if has_window_functions {
2202 let preload_start = Instant::now();
2203
2204 let mut window_specs = Vec::new();
2206 for item in &expanded_items {
2207 if let SelectItem::Expression { expr, .. } = item {
2208 Self::collect_window_specs(expr, &mut window_specs);
2209 }
2210 }
2211
2212 for spec in &window_specs {
2214 let _ = evaluator.get_or_create_window_context(spec);
2215 }
2216
2217 debug!(
2218 "Pre-created {} WindowContext(s) in {:.2}ms",
2219 window_specs.len(),
2220 preload_start.elapsed().as_secs_f64() * 1000.0
2221 );
2222 }
2223
2224 if let Some(window_specs) = batch_window_specs {
2226 debug!("Starting batch window function evaluation");
2227 let batch_start = Instant::now();
2228
2229 let mut batch_results: Vec<Vec<DataValue>> =
2231 vec![vec![DataValue::Null; expanded_items.len()]; visible_rows.len()];
2232
2233 let detailed_window_specs = &window_specs;
2235
2236 let mut specs_by_window: HashMap<
2238 u64,
2239 Vec<&crate::data::batch_window_evaluator::WindowFunctionSpec>,
2240 > = HashMap::new();
2241 for spec in detailed_window_specs {
2242 let hash = spec.spec.compute_hash();
2243 specs_by_window
2244 .entry(hash)
2245 .or_insert_with(Vec::new)
2246 .push(spec);
2247 }
2248
2249 for (_window_hash, specs) in specs_by_window {
2251 let context = evaluator.get_or_create_window_context(&specs[0].spec)?;
2253
2254 for spec in specs {
2256 match spec.function_name.as_str() {
2257 "LAG" => {
2258 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2260 let column_name = col_ref.name.as_str();
2261 let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2262 spec.args.get(1)
2263 {
2264 n.parse::<i64>().unwrap_or(1)
2265 } else {
2266 1 };
2268
2269 let values = context.evaluate_lag_batch(
2270 visible_rows,
2271 column_name,
2272 offset,
2273 )?;
2274
2275 for (row_idx, value) in values.into_iter().enumerate() {
2277 batch_results[row_idx][spec.output_column_index] = value;
2278 }
2279 }
2280 }
2281 "LEAD" => {
2282 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2284 let column_name = col_ref.name.as_str();
2285 let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2286 spec.args.get(1)
2287 {
2288 n.parse::<i64>().unwrap_or(1)
2289 } else {
2290 1 };
2292
2293 let values = context.evaluate_lead_batch(
2294 visible_rows,
2295 column_name,
2296 offset,
2297 )?;
2298
2299 for (row_idx, value) in values.into_iter().enumerate() {
2301 batch_results[row_idx][spec.output_column_index] = value;
2302 }
2303 }
2304 }
2305 "ROW_NUMBER" => {
2306 let values = context.evaluate_row_number_batch(visible_rows)?;
2307
2308 for (row_idx, value) in values.into_iter().enumerate() {
2310 batch_results[row_idx][spec.output_column_index] = value;
2311 }
2312 }
2313 "RANK" => {
2314 let values = context.evaluate_rank_batch(visible_rows)?;
2315
2316 for (row_idx, value) in values.into_iter().enumerate() {
2318 batch_results[row_idx][spec.output_column_index] = value;
2319 }
2320 }
2321 "DENSE_RANK" => {
2322 let values = context.evaluate_dense_rank_batch(visible_rows)?;
2323
2324 for (row_idx, value) in values.into_iter().enumerate() {
2326 batch_results[row_idx][spec.output_column_index] = value;
2327 }
2328 }
2329 "SUM" => {
2330 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2331 let column_name = col_ref.name.as_str();
2332 let values =
2333 context.evaluate_sum_batch(visible_rows, column_name)?;
2334
2335 for (row_idx, value) in values.into_iter().enumerate() {
2336 batch_results[row_idx][spec.output_column_index] = value;
2337 }
2338 }
2339 }
2340 "AVG" => {
2341 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2342 let column_name = col_ref.name.as_str();
2343 let values =
2344 context.evaluate_avg_batch(visible_rows, column_name)?;
2345
2346 for (row_idx, value) in values.into_iter().enumerate() {
2347 batch_results[row_idx][spec.output_column_index] = value;
2348 }
2349 }
2350 }
2351 "MIN" => {
2352 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2353 let column_name = col_ref.name.as_str();
2354 let values =
2355 context.evaluate_min_batch(visible_rows, column_name)?;
2356
2357 for (row_idx, value) in values.into_iter().enumerate() {
2358 batch_results[row_idx][spec.output_column_index] = value;
2359 }
2360 }
2361 }
2362 "MAX" => {
2363 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2364 let column_name = col_ref.name.as_str();
2365 let values =
2366 context.evaluate_max_batch(visible_rows, column_name)?;
2367
2368 for (row_idx, value) in values.into_iter().enumerate() {
2369 batch_results[row_idx][spec.output_column_index] = value;
2370 }
2371 }
2372 }
2373 "COUNT" => {
2374 let column_name = match spec.args.get(0) {
2376 Some(SqlExpression::Column(col_ref)) => Some(col_ref.name.as_str()),
2377 Some(SqlExpression::StringLiteral(s)) if s == "*" => None,
2378 _ => None,
2379 };
2380
2381 let values = context.evaluate_count_batch(visible_rows, column_name)?;
2382
2383 for (row_idx, value) in values.into_iter().enumerate() {
2384 batch_results[row_idx][spec.output_column_index] = value;
2385 }
2386 }
2387 "FIRST_VALUE" => {
2388 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2389 let column_name = col_ref.name.as_str();
2390 let values = context
2391 .evaluate_first_value_batch(visible_rows, column_name)?;
2392
2393 for (row_idx, value) in values.into_iter().enumerate() {
2394 batch_results[row_idx][spec.output_column_index] = value;
2395 }
2396 }
2397 }
2398 "LAST_VALUE" => {
2399 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2400 let column_name = col_ref.name.as_str();
2401 let values =
2402 context.evaluate_last_value_batch(visible_rows, column_name)?;
2403
2404 for (row_idx, value) in values.into_iter().enumerate() {
2405 batch_results[row_idx][spec.output_column_index] = value;
2406 }
2407 }
2408 }
2409 _ => {
2410 debug!(
2412 "Window function {} not supported in batch mode, using per-row",
2413 spec.function_name
2414 );
2415 }
2416 }
2417 }
2418 }
2419
2420 for (result_row_idx, &source_row_idx) in visible_rows.iter().enumerate() {
2422 for (col_idx, item) in expanded_items.iter().enumerate() {
2423 if !matches!(batch_results[result_row_idx][col_idx], DataValue::Null) {
2425 continue;
2426 }
2427
2428 let value = match item {
2429 SelectItem::Column {
2430 column: col_ref, ..
2431 } => {
2432 match evaluator
2433 .evaluate(&SqlExpression::Column(col_ref.clone()), source_row_idx)
2434 {
2435 Ok(val) => val,
2436 Err(e) => {
2437 return Err(anyhow!(
2438 "Failed to evaluate column {}: {}",
2439 col_ref.to_sql(),
2440 e
2441 ));
2442 }
2443 }
2444 }
2445 SelectItem::Expression { expr, .. } => {
2446 if matches!(expr, SqlExpression::WindowFunction { .. }) {
2449 continue;
2451 }
2452 evaluator.evaluate(&expr, source_row_idx)?
2455 }
2456 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2457 SelectItem::StarExclude { .. } => {
2458 unreachable!("StarExclude should have been expanded")
2459 }
2460 };
2461 batch_results[result_row_idx][col_idx] = value;
2462 }
2463 }
2464
2465 for row_values in batch_results {
2467 computed_table
2468 .add_row(DataRow::new(row_values))
2469 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2470 }
2471
2472 debug!(
2473 "Batch window evaluation completed in {:.3}ms",
2474 batch_start.elapsed().as_secs_f64() * 1000.0
2475 );
2476 } else {
2477 for &row_idx in visible_rows {
2479 let mut row_values = Vec::new();
2480
2481 for item in &expanded_items {
2482 let value = match item {
2483 SelectItem::Column {
2484 column: col_ref, ..
2485 } => {
2486 match evaluator
2488 .evaluate(&SqlExpression::Column(col_ref.clone()), row_idx)
2489 {
2490 Ok(val) => val,
2491 Err(e) => {
2492 return Err(anyhow!(
2493 "Failed to evaluate column {}: {}",
2494 col_ref.to_sql(),
2495 e
2496 ));
2497 }
2498 }
2499 }
2500 SelectItem::Expression { expr, .. } => {
2501 evaluator.evaluate(&expr, row_idx)?
2503 }
2504 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2505 SelectItem::StarExclude { .. } => {
2506 unreachable!("StarExclude should have been expanded")
2507 }
2508 };
2509 row_values.push(value);
2510 }
2511
2512 computed_table
2513 .add_row(DataRow::new(row_values))
2514 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2515 }
2516 }
2517
2518 if let Some(start) = window_start {
2520 let window_duration = start.elapsed();
2521 info!(
2522 "Window function evaluation took {:.2}ms for {} rows ({} window functions)",
2523 window_duration.as_secs_f64() * 1000.0,
2524 visible_rows.len(),
2525 window_func_count
2526 );
2527
2528 plan.begin_step(
2530 StepType::WindowFunction,
2531 format!("Evaluate {} window function(s)", window_func_count),
2532 );
2533 plan.set_rows_in(visible_rows.len());
2534 plan.set_rows_out(visible_rows.len());
2535 plan.add_detail(format!("Input: {} rows", visible_rows.len()));
2536 plan.add_detail(format!("{} window functions evaluated", window_func_count));
2537 plan.add_detail(format!(
2538 "Evaluation time: {:.3}ms",
2539 window_duration.as_secs_f64() * 1000.0
2540 ));
2541 plan.end_step();
2542 }
2543
2544 Ok(DataView::new(Arc::new(computed_table)))
2547 }
2548
2549 fn apply_select_with_row_expansion(
2551 &self,
2552 view: DataView,
2553 select_items: &[SelectItem],
2554 ) -> Result<DataView> {
2555 debug!("QueryEngine::apply_select_with_row_expansion - expanding rows");
2556
2557 let source_table = view.source();
2558 let visible_rows = view.visible_row_indices();
2559 let expander_registry = RowExpanderRegistry::new();
2560
2561 let mut result_table = DataTable::new("unnest_result");
2563
2564 let mut expanded_items = Vec::new();
2566 for item in select_items {
2567 match item {
2568 SelectItem::Star { table_prefix, .. } => {
2569 if let Some(prefix) = table_prefix {
2570 debug!(
2572 "QueryEngine::apply_select_with_row_expansion - expanding {}.*",
2573 prefix
2574 );
2575 for col in &source_table.columns {
2576 if Self::column_matches_table(col, prefix) {
2577 expanded_items.push(SelectItem::Column {
2578 column: ColumnRef::unquoted(col.name.clone()),
2579 leading_comments: vec![],
2580 trailing_comment: None,
2581 });
2582 }
2583 }
2584 } else {
2585 debug!("QueryEngine::apply_select_with_row_expansion - expanding *");
2587 for col_name in source_table.column_names() {
2588 expanded_items.push(SelectItem::Column {
2589 column: ColumnRef::unquoted(col_name.to_string()),
2590 leading_comments: vec![],
2591 trailing_comment: None,
2592 });
2593 }
2594 }
2595 }
2596 _ => expanded_items.push(item.clone()),
2597 }
2598 }
2599
2600 for item in &expanded_items {
2602 let column_name = match item {
2603 SelectItem::Column {
2604 column: col_ref, ..
2605 } => col_ref.name.clone(),
2606 SelectItem::Expression { alias, .. } => alias.clone(),
2607 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2608 SelectItem::StarExclude { .. } => {
2609 unreachable!("StarExclude should have been expanded")
2610 }
2611 };
2612 result_table.add_column(DataColumn::new(&column_name));
2613 }
2614
2615 let mut evaluator =
2617 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2618
2619 for &row_idx in visible_rows {
2620 let mut unnest_expansions = Vec::new();
2622 let mut unnest_indices = Vec::new();
2623
2624 for (col_idx, item) in expanded_items.iter().enumerate() {
2625 if let SelectItem::Expression { expr, .. } = item {
2626 if let Some(expansion_result) = self.try_expand_unnest(
2627 &expr,
2628 source_table,
2629 row_idx,
2630 &mut evaluator,
2631 &expander_registry,
2632 )? {
2633 unnest_expansions.push(expansion_result);
2634 unnest_indices.push(col_idx);
2635 }
2636 }
2637 }
2638
2639 let expansion_count = if unnest_expansions.is_empty() {
2641 1 } else {
2643 unnest_expansions
2644 .iter()
2645 .map(|exp| exp.row_count())
2646 .max()
2647 .unwrap_or(1)
2648 };
2649
2650 for output_idx in 0..expansion_count {
2652 let mut row_values = Vec::new();
2653
2654 for (col_idx, item) in expanded_items.iter().enumerate() {
2655 let unnest_position = unnest_indices.iter().position(|&idx| idx == col_idx);
2657
2658 let value = if let Some(unnest_idx) = unnest_position {
2659 let expansion = &unnest_expansions[unnest_idx];
2661 expansion
2662 .values
2663 .get(output_idx)
2664 .cloned()
2665 .unwrap_or(DataValue::Null)
2666 } else {
2667 match item {
2669 SelectItem::Column {
2670 column: col_ref, ..
2671 } => {
2672 let col_idx =
2673 source_table.get_column_index(&col_ref.name).ok_or_else(
2674 || anyhow::anyhow!("Column '{}' not found", col_ref.name),
2675 )?;
2676 let row = source_table
2677 .get_row(row_idx)
2678 .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
2679 row.get(col_idx)
2680 .ok_or_else(|| {
2681 anyhow::anyhow!("Column {} not found in row", col_idx)
2682 })?
2683 .clone()
2684 }
2685 SelectItem::Expression { expr, .. } => {
2686 evaluator.evaluate(&expr, row_idx)?
2688 }
2689 SelectItem::Star { .. } => unreachable!(),
2690 SelectItem::StarExclude { .. } => {
2691 unreachable!("StarExclude should have been expanded")
2692 }
2693 }
2694 };
2695
2696 row_values.push(value);
2697 }
2698
2699 result_table
2700 .add_row(DataRow::new(row_values))
2701 .map_err(|e| anyhow::anyhow!("Failed to add expanded row: {}", e))?;
2702 }
2703 }
2704
2705 debug!(
2706 "QueryEngine::apply_select_with_row_expansion - input rows: {}, output rows: {}",
2707 visible_rows.len(),
2708 result_table.row_count()
2709 );
2710
2711 Ok(DataView::new(Arc::new(result_table)))
2712 }
2713
2714 fn try_expand_unnest(
2717 &self,
2718 expr: &SqlExpression,
2719 _source_table: &DataTable,
2720 row_idx: usize,
2721 evaluator: &mut ArithmeticEvaluator,
2722 expander_registry: &RowExpanderRegistry,
2723 ) -> Result<Option<crate::data::row_expanders::ExpansionResult>> {
2724 if let SqlExpression::Unnest { column, delimiter } = expr {
2726 let column_value = evaluator.evaluate(column, row_idx)?;
2728
2729 let delimiter_value = DataValue::String(delimiter.clone());
2731
2732 let expander = expander_registry
2734 .get("UNNEST")
2735 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2736
2737 let expansion = expander.expand(&column_value, &[delimiter_value])?;
2739 return Ok(Some(expansion));
2740 }
2741
2742 if let SqlExpression::FunctionCall { name, args, .. } = expr {
2744 if name.to_uppercase() == "UNNEST" {
2745 if args.len() != 2 {
2747 return Err(anyhow::anyhow!(
2748 "UNNEST requires exactly 2 arguments: UNNEST(column, delimiter)"
2749 ));
2750 }
2751
2752 let column_value = evaluator.evaluate(&args[0], row_idx)?;
2754
2755 let delimiter_value = evaluator.evaluate(&args[1], row_idx)?;
2757
2758 let expander = expander_registry
2760 .get("UNNEST")
2761 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2762
2763 let expansion = expander.expand(&column_value, &[delimiter_value])?;
2765 return Ok(Some(expansion));
2766 }
2767 }
2768
2769 Ok(None)
2770 }
2771
2772 fn apply_aggregate_select(
2774 &self,
2775 view: DataView,
2776 select_items: &[SelectItem],
2777 ) -> Result<DataView> {
2778 debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
2779
2780 let source_table = view.source();
2781 let mut result_table = DataTable::new("aggregate_result");
2782
2783 for item in select_items {
2785 let column_name = match item {
2786 SelectItem::Expression { alias, .. } => alias.clone(),
2787 _ => unreachable!("Should only have expressions in aggregate-only query"),
2788 };
2789 result_table.add_column(DataColumn::new(&column_name));
2790 }
2791
2792 let visible_rows = view.visible_row_indices().to_vec();
2794 let mut evaluator =
2795 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
2796 .with_visible_rows(visible_rows);
2797
2798 let mut row_values = Vec::new();
2800 for item in select_items {
2801 match item {
2802 SelectItem::Expression { expr, .. } => {
2803 let value = evaluator.evaluate(expr, 0)?;
2806 row_values.push(value);
2807 }
2808 _ => unreachable!("Should only have expressions in aggregate-only query"),
2809 }
2810 }
2811
2812 result_table
2814 .add_row(DataRow::new(row_values))
2815 .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
2816
2817 Ok(DataView::new(Arc::new(result_table)))
2818 }
2819
2820 fn column_matches_table(col: &DataColumn, table_name: &str) -> bool {
2832 if let Some(ref source) = col.source_table {
2834 if source == table_name || source.ends_with(&format!(".{}", table_name)) {
2836 return true;
2837 }
2838 }
2839
2840 if let Some(ref qualified) = col.qualified_name {
2842 if qualified.starts_with(&format!("{}.", table_name)) {
2844 return true;
2845 }
2846 }
2847
2848 false
2849 }
2850
2851 fn resolve_select_columns(
2853 &self,
2854 table: &DataTable,
2855 select_items: &[SelectItem],
2856 ) -> Result<Vec<usize>> {
2857 let mut indices = Vec::new();
2858 let table_columns = table.column_names();
2859
2860 for item in select_items {
2861 match item {
2862 SelectItem::Column {
2863 column: col_ref, ..
2864 } => {
2865 let index = if let Some(table_prefix) = &col_ref.table_prefix {
2867 let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
2869 table.find_column_by_qualified_name(&qualified_name)
2870 .ok_or_else(|| {
2871 let has_qualified = table.columns.iter()
2873 .any(|c| c.qualified_name.is_some());
2874 if !has_qualified {
2875 anyhow::anyhow!(
2876 "Column '{}' not found. Note: Table '{}' may not support qualified column names",
2877 qualified_name, table_prefix
2878 )
2879 } else {
2880 anyhow::anyhow!("Column '{}' not found", qualified_name)
2881 }
2882 })?
2883 } else {
2884 table_columns
2886 .iter()
2887 .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
2888 .ok_or_else(|| {
2889 let suggestion = self.find_similar_column(table, &col_ref.name);
2890 match suggestion {
2891 Some(similar) => anyhow::anyhow!(
2892 "Column '{}' not found. Did you mean '{}'?",
2893 col_ref.name,
2894 similar
2895 ),
2896 None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
2897 }
2898 })?
2899 };
2900 indices.push(index);
2901 }
2902 SelectItem::Star { table_prefix, .. } => {
2903 if let Some(prefix) = table_prefix {
2904 for (i, col) in table.columns.iter().enumerate() {
2906 if Self::column_matches_table(col, prefix) {
2907 indices.push(i);
2908 }
2909 }
2910 } else {
2911 for i in 0..table_columns.len() {
2913 indices.push(i);
2914 }
2915 }
2916 }
2917 SelectItem::StarExclude {
2918 table_prefix,
2919 excluded_columns,
2920 ..
2921 } => {
2922 if let Some(prefix) = table_prefix {
2924 for (i, col) in table.columns.iter().enumerate() {
2926 if Self::column_matches_table(col, prefix)
2927 && !excluded_columns.contains(&col.name)
2928 {
2929 indices.push(i);
2930 }
2931 }
2932 } else {
2933 for (i, col_name) in table_columns.iter().enumerate() {
2935 if !excluded_columns
2936 .iter()
2937 .any(|exc| exc.eq_ignore_ascii_case(col_name))
2938 {
2939 indices.push(i);
2940 }
2941 }
2942 }
2943 }
2944 SelectItem::Expression { .. } => {
2945 return Err(anyhow::anyhow!(
2946 "Computed expressions require new table creation"
2947 ));
2948 }
2949 }
2950 }
2951
2952 Ok(indices)
2953 }
2954
2955 fn apply_distinct(&self, view: DataView) -> Result<DataView> {
2957 use std::collections::HashSet;
2958
2959 let source = view.source();
2960 let visible_cols = view.visible_column_indices();
2961 let visible_rows = view.visible_row_indices();
2962
2963 let mut seen_rows = HashSet::new();
2965 let mut unique_row_indices = Vec::new();
2966
2967 for &row_idx in visible_rows {
2968 let mut row_key = Vec::new();
2970 for &col_idx in visible_cols {
2971 let value = source
2972 .get_value(row_idx, col_idx)
2973 .ok_or_else(|| anyhow!("Invalid cell reference"))?;
2974 row_key.push(format!("{:?}", value));
2976 }
2977
2978 if seen_rows.insert(row_key) {
2980 unique_row_indices.push(row_idx);
2982 }
2983 }
2984
2985 Ok(view.with_rows(unique_row_indices))
2987 }
2988
2989 fn apply_multi_order_by(
2991 &self,
2992 view: DataView,
2993 order_by_columns: &[OrderByItem],
2994 ) -> Result<DataView> {
2995 self.apply_multi_order_by_with_context(view, order_by_columns, None)
2996 }
2997
2998 fn apply_multi_order_by_with_context(
3000 &self,
3001 mut view: DataView,
3002 order_by_columns: &[OrderByItem],
3003 _exec_context: Option<&ExecutionContext>,
3004 ) -> Result<DataView> {
3005 let mut sort_columns = Vec::new();
3007
3008 for order_col in order_by_columns {
3009 let column_name = match &order_col.expr {
3011 SqlExpression::Column(col_ref) => col_ref.name.clone(),
3012 _ => {
3013 return Err(anyhow!(
3015 "ORDER BY expressions not yet supported - only simple columns allowed"
3016 ));
3017 }
3018 };
3019
3020 let col_index = if column_name.contains('.') {
3022 if let Some(dot_pos) = column_name.rfind('.') {
3024 let col_name = &column_name[dot_pos + 1..];
3025
3026 debug!(
3029 "ORDER BY: Extracting unqualified column '{}' from '{}'",
3030 col_name, column_name
3031 );
3032 view.source().get_column_index(col_name)
3033 } else {
3034 view.source().get_column_index(&column_name)
3035 }
3036 } else {
3037 view.source().get_column_index(&column_name)
3039 }
3040 .ok_or_else(|| {
3041 let suggestion = self.find_similar_column(view.source(), &column_name);
3043 match suggestion {
3044 Some(similar) => anyhow::anyhow!(
3045 "Column '{}' not found. Did you mean '{}'?",
3046 column_name,
3047 similar
3048 ),
3049 None => {
3050 let available_cols = view.source().column_names().join(", ");
3052 anyhow::anyhow!(
3053 "Column '{}' not found. Available columns: {}",
3054 column_name,
3055 available_cols
3056 )
3057 }
3058 }
3059 })?;
3060
3061 let ascending = matches!(order_col.direction, SortDirection::Asc);
3062 sort_columns.push((col_index, ascending));
3063 }
3064
3065 view.apply_multi_sort(&sort_columns)?;
3067 Ok(view)
3068 }
3069
3070 fn apply_group_by(
3072 &self,
3073 view: DataView,
3074 group_by_exprs: &[SqlExpression],
3075 select_items: &[SelectItem],
3076 having: Option<&SqlExpression>,
3077 plan: &mut ExecutionPlanBuilder,
3078 ) -> Result<DataView> {
3079 let (result_view, phase_info) = self.apply_group_by_expressions(
3081 view,
3082 group_by_exprs,
3083 select_items,
3084 having,
3085 self.case_insensitive,
3086 self.date_notation.clone(),
3087 )?;
3088
3089 plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
3091 plan.add_detail(format!(
3092 "Phase 1 - Group Building: {:.3}ms",
3093 phase_info.phase2_key_building.as_secs_f64() * 1000.0
3094 ));
3095 plan.add_detail(format!(
3096 " • Processing {} rows into {} groups",
3097 phase_info.total_rows, phase_info.num_groups
3098 ));
3099 plan.add_detail(format!(
3100 "Phase 2 - Aggregation: {:.3}ms",
3101 phase_info.phase4_aggregation.as_secs_f64() * 1000.0
3102 ));
3103 if phase_info.phase4_having_evaluation > Duration::ZERO {
3104 plan.add_detail(format!(
3105 "Phase 3 - HAVING Filter: {:.3}ms",
3106 phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
3107 ));
3108 plan.add_detail(format!(
3109 " • Filtered {} groups",
3110 phase_info.groups_filtered_by_having
3111 ));
3112 }
3113 plan.add_detail(format!(
3114 "Total GROUP BY time: {:.3}ms",
3115 phase_info.total_time.as_secs_f64() * 1000.0
3116 ));
3117
3118 Ok(result_view)
3119 }
3120
3121 pub fn estimate_group_cardinality(
3124 &self,
3125 view: &DataView,
3126 group_by_exprs: &[SqlExpression],
3127 ) -> usize {
3128 let row_count = view.get_visible_rows().len();
3130 if row_count <= 100 {
3131 return row_count;
3132 }
3133
3134 let sample_size = min(1000, row_count / 10).max(100);
3136 let mut seen = FxHashSet::default();
3137
3138 let visible_rows = view.get_visible_rows();
3139 for (i, &row_idx) in visible_rows.iter().enumerate() {
3140 if i >= sample_size {
3141 break;
3142 }
3143
3144 let mut key_values = Vec::new();
3146 for expr in group_by_exprs {
3147 let mut evaluator = ArithmeticEvaluator::new(view.source());
3148 let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
3149 key_values.push(value);
3150 }
3151
3152 seen.insert(key_values);
3153 }
3154
3155 let sample_cardinality = seen.len();
3157 let estimated = (sample_cardinality * row_count) / sample_size;
3158
3159 estimated.min(row_count).max(sample_cardinality)
3161 }
3162}
3163
3164#[cfg(test)]
3165mod tests {
3166 use super::*;
3167 use crate::data::datatable::{DataColumn, DataRow, DataValue};
3168
3169 fn create_test_table() -> Arc<DataTable> {
3170 let mut table = DataTable::new("test");
3171
3172 table.add_column(DataColumn::new("id"));
3174 table.add_column(DataColumn::new("name"));
3175 table.add_column(DataColumn::new("age"));
3176
3177 table
3179 .add_row(DataRow::new(vec![
3180 DataValue::Integer(1),
3181 DataValue::String("Alice".to_string()),
3182 DataValue::Integer(30),
3183 ]))
3184 .unwrap();
3185
3186 table
3187 .add_row(DataRow::new(vec![
3188 DataValue::Integer(2),
3189 DataValue::String("Bob".to_string()),
3190 DataValue::Integer(25),
3191 ]))
3192 .unwrap();
3193
3194 table
3195 .add_row(DataRow::new(vec![
3196 DataValue::Integer(3),
3197 DataValue::String("Charlie".to_string()),
3198 DataValue::Integer(35),
3199 ]))
3200 .unwrap();
3201
3202 Arc::new(table)
3203 }
3204
3205 #[test]
3206 fn test_select_all() {
3207 let table = create_test_table();
3208 let engine = QueryEngine::new();
3209
3210 let view = engine
3211 .execute(table.clone(), "SELECT * FROM users")
3212 .unwrap();
3213 assert_eq!(view.row_count(), 3);
3214 assert_eq!(view.column_count(), 3);
3215 }
3216
3217 #[test]
3218 fn test_select_columns() {
3219 let table = create_test_table();
3220 let engine = QueryEngine::new();
3221
3222 let view = engine
3223 .execute(table.clone(), "SELECT name, age FROM users")
3224 .unwrap();
3225 assert_eq!(view.row_count(), 3);
3226 assert_eq!(view.column_count(), 2);
3227 }
3228
3229 #[test]
3230 fn test_select_with_limit() {
3231 let table = create_test_table();
3232 let engine = QueryEngine::new();
3233
3234 let view = engine
3235 .execute(table.clone(), "SELECT * FROM users LIMIT 2")
3236 .unwrap();
3237 assert_eq!(view.row_count(), 2);
3238 }
3239
3240 #[test]
3241 fn test_type_coercion_contains() {
3242 let _ = tracing_subscriber::fmt()
3244 .with_max_level(tracing::Level::DEBUG)
3245 .try_init();
3246
3247 let mut table = DataTable::new("test");
3248 table.add_column(DataColumn::new("id"));
3249 table.add_column(DataColumn::new("status"));
3250 table.add_column(DataColumn::new("price"));
3251
3252 table
3254 .add_row(DataRow::new(vec![
3255 DataValue::Integer(1),
3256 DataValue::String("Pending".to_string()),
3257 DataValue::Float(99.99),
3258 ]))
3259 .unwrap();
3260
3261 table
3262 .add_row(DataRow::new(vec![
3263 DataValue::Integer(2),
3264 DataValue::String("Confirmed".to_string()),
3265 DataValue::Float(150.50),
3266 ]))
3267 .unwrap();
3268
3269 table
3270 .add_row(DataRow::new(vec![
3271 DataValue::Integer(3),
3272 DataValue::String("Pending".to_string()),
3273 DataValue::Float(75.00),
3274 ]))
3275 .unwrap();
3276
3277 let table = Arc::new(table);
3278 let engine = QueryEngine::new();
3279
3280 println!("\n=== Testing WHERE clause with Contains ===");
3281 println!("Table has {} rows", table.row_count());
3282 for i in 0..table.row_count() {
3283 let status = table.get_value(i, 1);
3284 println!("Row {i}: status = {status:?}");
3285 }
3286
3287 println!("\n--- Test 1: status.Contains('pend') ---");
3289 let result = engine.execute(
3290 table.clone(),
3291 "SELECT * FROM test WHERE status.Contains('pend')",
3292 );
3293 match result {
3294 Ok(view) => {
3295 println!("SUCCESS: Found {} matching rows", view.row_count());
3296 assert_eq!(view.row_count(), 2); }
3298 Err(e) => {
3299 panic!("Query failed: {e}");
3300 }
3301 }
3302
3303 println!("\n--- Test 2: price.Contains('9') ---");
3305 let result = engine.execute(
3306 table.clone(),
3307 "SELECT * FROM test WHERE price.Contains('9')",
3308 );
3309 match result {
3310 Ok(view) => {
3311 println!(
3312 "SUCCESS: Found {} matching rows with price containing '9'",
3313 view.row_count()
3314 );
3315 assert!(view.row_count() >= 1);
3317 }
3318 Err(e) => {
3319 panic!("Numeric coercion query failed: {e}");
3320 }
3321 }
3322
3323 println!("\n=== All tests passed! ===");
3324 }
3325
3326 #[test]
3327 fn test_not_in_clause() {
3328 let _ = tracing_subscriber::fmt()
3330 .with_max_level(tracing::Level::DEBUG)
3331 .try_init();
3332
3333 let mut table = DataTable::new("test");
3334 table.add_column(DataColumn::new("id"));
3335 table.add_column(DataColumn::new("country"));
3336
3337 table
3339 .add_row(DataRow::new(vec![
3340 DataValue::Integer(1),
3341 DataValue::String("CA".to_string()),
3342 ]))
3343 .unwrap();
3344
3345 table
3346 .add_row(DataRow::new(vec![
3347 DataValue::Integer(2),
3348 DataValue::String("US".to_string()),
3349 ]))
3350 .unwrap();
3351
3352 table
3353 .add_row(DataRow::new(vec![
3354 DataValue::Integer(3),
3355 DataValue::String("UK".to_string()),
3356 ]))
3357 .unwrap();
3358
3359 let table = Arc::new(table);
3360 let engine = QueryEngine::new();
3361
3362 println!("\n=== Testing NOT IN clause ===");
3363 println!("Table has {} rows", table.row_count());
3364 for i in 0..table.row_count() {
3365 let country = table.get_value(i, 1);
3366 println!("Row {i}: country = {country:?}");
3367 }
3368
3369 println!("\n--- Test: country NOT IN ('CA') ---");
3371 let result = engine.execute(
3372 table.clone(),
3373 "SELECT * FROM test WHERE country NOT IN ('CA')",
3374 );
3375 match result {
3376 Ok(view) => {
3377 println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
3378 assert_eq!(view.row_count(), 2); }
3380 Err(e) => {
3381 panic!("NOT IN query failed: {e}");
3382 }
3383 }
3384
3385 println!("\n=== NOT IN test complete! ===");
3386 }
3387
3388 #[test]
3389 fn test_case_insensitive_in_and_not_in() {
3390 let _ = tracing_subscriber::fmt()
3392 .with_max_level(tracing::Level::DEBUG)
3393 .try_init();
3394
3395 let mut table = DataTable::new("test");
3396 table.add_column(DataColumn::new("id"));
3397 table.add_column(DataColumn::new("country"));
3398
3399 table
3401 .add_row(DataRow::new(vec![
3402 DataValue::Integer(1),
3403 DataValue::String("CA".to_string()), ]))
3405 .unwrap();
3406
3407 table
3408 .add_row(DataRow::new(vec![
3409 DataValue::Integer(2),
3410 DataValue::String("us".to_string()), ]))
3412 .unwrap();
3413
3414 table
3415 .add_row(DataRow::new(vec![
3416 DataValue::Integer(3),
3417 DataValue::String("UK".to_string()), ]))
3419 .unwrap();
3420
3421 let table = Arc::new(table);
3422
3423 println!("\n=== Testing Case-Insensitive IN clause ===");
3424 println!("Table has {} rows", table.row_count());
3425 for i in 0..table.row_count() {
3426 let country = table.get_value(i, 1);
3427 println!("Row {i}: country = {country:?}");
3428 }
3429
3430 println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
3432 let engine = QueryEngine::with_case_insensitive(true);
3433 let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3434 match result {
3435 Ok(view) => {
3436 println!(
3437 "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
3438 view.row_count()
3439 );
3440 assert_eq!(view.row_count(), 1); }
3442 Err(e) => {
3443 panic!("Case-insensitive IN query failed: {e}");
3444 }
3445 }
3446
3447 println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
3449 let result = engine.execute(
3450 table.clone(),
3451 "SELECT * FROM test WHERE country NOT IN ('ca')",
3452 );
3453 match result {
3454 Ok(view) => {
3455 println!(
3456 "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
3457 view.row_count()
3458 );
3459 assert_eq!(view.row_count(), 2); }
3461 Err(e) => {
3462 panic!("Case-insensitive NOT IN query failed: {e}");
3463 }
3464 }
3465
3466 println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
3468 let engine_case_sensitive = QueryEngine::new(); let result = engine_case_sensitive
3470 .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3471 match result {
3472 Ok(view) => {
3473 println!(
3474 "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
3475 view.row_count()
3476 );
3477 assert_eq!(view.row_count(), 0); }
3479 Err(e) => {
3480 panic!("Case-sensitive IN query failed: {e}");
3481 }
3482 }
3483
3484 println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
3485 }
3486
3487 #[test]
3488 #[ignore = "Parentheses in WHERE clause not yet implemented"]
3489 fn test_parentheses_in_where_clause() {
3490 let _ = tracing_subscriber::fmt()
3492 .with_max_level(tracing::Level::DEBUG)
3493 .try_init();
3494
3495 let mut table = DataTable::new("test");
3496 table.add_column(DataColumn::new("id"));
3497 table.add_column(DataColumn::new("status"));
3498 table.add_column(DataColumn::new("priority"));
3499
3500 table
3502 .add_row(DataRow::new(vec![
3503 DataValue::Integer(1),
3504 DataValue::String("Pending".to_string()),
3505 DataValue::String("High".to_string()),
3506 ]))
3507 .unwrap();
3508
3509 table
3510 .add_row(DataRow::new(vec![
3511 DataValue::Integer(2),
3512 DataValue::String("Complete".to_string()),
3513 DataValue::String("High".to_string()),
3514 ]))
3515 .unwrap();
3516
3517 table
3518 .add_row(DataRow::new(vec![
3519 DataValue::Integer(3),
3520 DataValue::String("Pending".to_string()),
3521 DataValue::String("Low".to_string()),
3522 ]))
3523 .unwrap();
3524
3525 table
3526 .add_row(DataRow::new(vec![
3527 DataValue::Integer(4),
3528 DataValue::String("Complete".to_string()),
3529 DataValue::String("Low".to_string()),
3530 ]))
3531 .unwrap();
3532
3533 let table = Arc::new(table);
3534 let engine = QueryEngine::new();
3535
3536 println!("\n=== Testing Parentheses in WHERE clause ===");
3537 println!("Table has {} rows", table.row_count());
3538 for i in 0..table.row_count() {
3539 let status = table.get_value(i, 1);
3540 let priority = table.get_value(i, 2);
3541 println!("Row {i}: status = {status:?}, priority = {priority:?}");
3542 }
3543
3544 println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
3546 let result = engine.execute(
3547 table.clone(),
3548 "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
3549 );
3550 match result {
3551 Ok(view) => {
3552 println!(
3553 "SUCCESS: Found {} rows with parenthetical logic",
3554 view.row_count()
3555 );
3556 assert_eq!(view.row_count(), 2); }
3558 Err(e) => {
3559 panic!("Parentheses query failed: {e}");
3560 }
3561 }
3562
3563 println!("\n=== Parentheses test complete! ===");
3564 }
3565
3566 #[test]
3567 #[ignore = "Numeric type coercion needs fixing"]
3568 fn test_numeric_type_coercion() {
3569 let _ = tracing_subscriber::fmt()
3571 .with_max_level(tracing::Level::DEBUG)
3572 .try_init();
3573
3574 let mut table = DataTable::new("test");
3575 table.add_column(DataColumn::new("id"));
3576 table.add_column(DataColumn::new("price"));
3577 table.add_column(DataColumn::new("quantity"));
3578
3579 table
3581 .add_row(DataRow::new(vec![
3582 DataValue::Integer(1),
3583 DataValue::Float(99.50), DataValue::Integer(100),
3585 ]))
3586 .unwrap();
3587
3588 table
3589 .add_row(DataRow::new(vec![
3590 DataValue::Integer(2),
3591 DataValue::Float(150.0), DataValue::Integer(200),
3593 ]))
3594 .unwrap();
3595
3596 table
3597 .add_row(DataRow::new(vec![
3598 DataValue::Integer(3),
3599 DataValue::Integer(75), DataValue::Integer(50),
3601 ]))
3602 .unwrap();
3603
3604 let table = Arc::new(table);
3605 let engine = QueryEngine::new();
3606
3607 println!("\n=== Testing Numeric Type Coercion ===");
3608 println!("Table has {} rows", table.row_count());
3609 for i in 0..table.row_count() {
3610 let price = table.get_value(i, 1);
3611 let quantity = table.get_value(i, 2);
3612 println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
3613 }
3614
3615 println!("\n--- Test: price.Contains('.') ---");
3617 let result = engine.execute(
3618 table.clone(),
3619 "SELECT * FROM test WHERE price.Contains('.')",
3620 );
3621 match result {
3622 Ok(view) => {
3623 println!(
3624 "SUCCESS: Found {} rows with decimal points in price",
3625 view.row_count()
3626 );
3627 assert_eq!(view.row_count(), 2); }
3629 Err(e) => {
3630 panic!("Numeric Contains query failed: {e}");
3631 }
3632 }
3633
3634 println!("\n--- Test: quantity.Contains('0') ---");
3636 let result = engine.execute(
3637 table.clone(),
3638 "SELECT * FROM test WHERE quantity.Contains('0')",
3639 );
3640 match result {
3641 Ok(view) => {
3642 println!(
3643 "SUCCESS: Found {} rows with '0' in quantity",
3644 view.row_count()
3645 );
3646 assert_eq!(view.row_count(), 2); }
3648 Err(e) => {
3649 panic!("Integer Contains query failed: {e}");
3650 }
3651 }
3652
3653 println!("\n=== Numeric type coercion test complete! ===");
3654 }
3655
/// Checks that a string-typed date column can be compared against a
/// `DateTime(y,m,d)` literal inside a WHERE clause.
#[test]
fn test_datetime_comparisons() {
    // Logger setup is best-effort: the result of `try_init` is discarded.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    // Fixture: one date in 2024 and two in 2025.
    let mut raw = DataTable::new("test");
    for column in ["id", "created_date"] {
        raw.add_column(DataColumn::new(column));
    }
    let dated_rows = [(1, "2024-12-15"), (2, "2025-01-15"), (3, "2025-02-15")];
    for (id, created) in dated_rows {
        raw.add_row(DataRow::new(vec![
            DataValue::Integer(id),
            DataValue::String(created.to_string()),
        ]))
        .unwrap();
    }

    let table = Arc::new(raw);
    let engine = QueryEngine::new();

    println!("\n=== Testing DateTime Comparisons ===");
    println!("Table has {} rows", table.row_count());
    for row_idx in 0..table.row_count() {
        let created = table.get_value(row_idx, 1);
        println!("Row {row_idx}: created_date = {created:?}");
    }

    println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
    let view = engine
        .execute(
            Arc::clone(&table),
            "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
        )
        .unwrap_or_else(|e| panic!("DateTime comparison query failed: {e}"));
    println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
    // Only the two 2025 rows fall after the cutoff.
    assert_eq!(view.row_count(), 2);

    println!("\n=== DateTime comparison test complete! ===");
}
3717
/// Verifies that `NOT` can prefix a method-call predicate
/// (`Contains`, `StartsWith`) in a WHERE clause.
#[test]
fn test_not_with_method_calls() {
    // Logger setup is best-effort: the result of `try_init` is discarded.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut raw = DataTable::new("test");
    for column in ["id", "status"] {
        raw.add_column(DataColumn::new(column));
    }
    let status_rows = [
        (1, "Pending Review"),
        (2, "Complete"),
        (3, "Pending Approval"),
    ];
    for (id, status) in status_rows {
        raw.add_row(DataRow::new(vec![
            DataValue::Integer(id),
            DataValue::String(status.to_string()),
        ]))
        .unwrap();
    }

    let table = Arc::new(raw);
    // The engine is built case-insensitive; the first assertion below expects the
    // lowercase 'pend' pattern to match both "Pending ..." statuses.
    let engine = QueryEngine::with_case_insensitive(true);

    println!("\n=== Testing NOT with Method Calls ===");
    println!("Table has {} rows", table.row_count());
    for row_idx in 0..table.row_count() {
        let status = table.get_value(row_idx, 1);
        println!("Row {row_idx}: status = {status:?}");
    }

    println!("\n--- Test: NOT status.Contains('pend') ---");
    let not_contains = engine
        .execute(
            Arc::clone(&table),
            "SELECT * FROM test WHERE NOT status.Contains('pend')",
        )
        .unwrap_or_else(|e| panic!("NOT Contains query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows NOT containing 'pend'",
        not_contains.row_count()
    );
    assert_eq!(not_contains.row_count(), 1);

    println!("\n--- Test: NOT status.StartsWith('Pending') ---");
    let not_starts_with = engine
        .execute(
            Arc::clone(&table),
            "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
        )
        .unwrap_or_else(|e| panic!("NOT StartsWith query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows NOT starting with 'Pending'",
        not_starts_with.row_count()
    );
    assert_eq!(not_starts_with.row_count(), 1);

    println!("\n=== NOT with method calls test complete! ===");
}
3801
/// Exercises parenthesised AND/OR groups and a negated group in WHERE clauses.
/// Currently ignored; the reason is given in the `#[ignore]` attribute.
#[test]
#[ignore = "Complex logical expressions with parentheses not yet implemented"]
fn test_complex_logical_expressions() {
    // Logger setup is best-effort: the result of `try_init` is discarded.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut raw = DataTable::new("test");
    for column in ["id", "status", "priority", "assigned"] {
        raw.add_column(DataColumn::new(column));
    }
    // (id, status, priority, assigned)
    let tickets = [
        (1, "Pending", "High", "John"),
        (2, "Complete", "High", "Jane"),
        (3, "Pending", "Low", "John"),
        (4, "In Progress", "Medium", "Jane"),
    ];
    for (id, status, priority, assigned) in tickets {
        raw.add_row(DataRow::new(vec![
            DataValue::Integer(id),
            DataValue::String(status.to_string()),
            DataValue::String(priority.to_string()),
            DataValue::String(assigned.to_string()),
        ]))
        .unwrap();
    }

    let table = Arc::new(raw);
    let engine = QueryEngine::new();

    println!("\n=== Testing Complex Logical Expressions ===");
    println!("Table has {} rows", table.row_count());
    for row_idx in 0..table.row_count() {
        let status = table.get_value(row_idx, 1);
        let priority = table.get_value(row_idx, 2);
        let assigned = table.get_value(row_idx, 3);
        println!(
            "Row {row_idx}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
        );
    }

    println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
    let grouped = engine
        .execute(
            Arc::clone(&table),
            "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
        )
        .unwrap_or_else(|e| panic!("Complex logic query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with complex logic",
        grouped.row_count()
    );
    // Both 'Pending' rows (ids 1 and 3) satisfy the parenthesised OR.
    assert_eq!(grouped.row_count(), 2);

    println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
    let negated = engine
        .execute(
            Arc::clone(&table),
            "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
        )
        .unwrap_or_else(|e| panic!("NOT complex logic query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with NOT complex logic",
        negated.row_count()
    );
    // Rows 2 (Complete) and 3 (Low) are excluded, leaving ids 1 and 4.
    assert_eq!(negated.row_count(), 2);

    println!("\n=== Complex logical expressions test complete! ===");
}
3907
/// Runs queries over a column that mixes string, float and boolean values,
/// next to a column that contains nulls.
#[test]
fn test_mixed_data_types_and_edge_cases() {
    // Logger setup is best-effort: the result of `try_init` is discarded.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut raw = DataTable::new("test");
    for column in ["id", "value", "nullable_field"] {
        raw.add_column(DataColumn::new(column));
    }

    // Each inner vector is one row: (id, value, nullable_field).
    let fixture_rows = [
        vec![
            DataValue::Integer(1),
            DataValue::String(String::from("123.45")),
            DataValue::String(String::from("present")),
        ],
        vec![
            DataValue::Integer(2),
            DataValue::Float(678.90),
            DataValue::Null,
        ],
        vec![
            DataValue::Integer(3),
            DataValue::Boolean(true),
            DataValue::String(String::from("also present")),
        ],
        vec![
            DataValue::Integer(4),
            DataValue::String(String::from("false")),
            DataValue::Null,
        ],
    ];
    for values in fixture_rows {
        raw.add_row(DataRow::new(values)).unwrap();
    }

    let table = Arc::new(raw);
    let engine = QueryEngine::new();

    println!("\n=== Testing Mixed Data Types and Edge Cases ===");
    println!("Table has {} rows", table.row_count());
    for row_idx in 0..table.row_count() {
        let value = table.get_value(row_idx, 1);
        let nullable = table.get_value(row_idx, 2);
        println!("Row {row_idx}: value = {value:?}, nullable_field = {nullable:?}");
    }

    println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
    let coerced = engine
        .execute(
            Arc::clone(&table),
            "SELECT * FROM test WHERE value.Contains('true')",
        )
        .unwrap_or_else(|e| panic!("Boolean coercion query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with boolean coercion",
        coerced.row_count()
    );
    // Exactly one match is expected for the 'true' pattern.
    assert_eq!(coerced.row_count(), 1);

    println!("\n--- Test: id IN (1, 3) ---");
    let in_list = engine
        .execute(Arc::clone(&table), "SELECT * FROM test WHERE id IN (1, 3)")
        .unwrap_or_else(|e| panic!("Multiple IN values query failed: {e}"));
    println!("SUCCESS: Found {} rows with IN clause", in_list.row_count());
    assert_eq!(in_list.row_count(), 2);

    println!("\n=== Mixed data types test complete! ===");
}
3998
/// A SELECT made up solely of aggregates must collapse to a single row.
/// Expected values are derived from the `create_test_stock_data` fixture.
#[test]
fn test_aggregate_only_single_row() {
    let stock = create_test_stock_data();
    let engine = QueryEngine::new();

    let sql = "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock";
    let result = engine
        .execute(Arc::clone(&stock), sql)
        .expect("Query should succeed");

    assert_eq!(
        result.row_count(),
        1,
        "Aggregate-only query should return exactly 1 row"
    );
    assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");

    let source = result.source();
    let row = source.get_row(0).expect("Should have first row");

    // COUNT(*), MIN(close), MAX(close) in column order.
    assert_eq!(row.values[0], DataValue::Integer(5));
    assert_eq!(row.values[1], DataValue::Float(99.5));
    assert_eq!(row.values[2], DataValue::Float(105.0));

    // AVG is compared with a tolerance rather than exact float equality.
    match &row.values[3] {
        DataValue::Float(avg) => assert!(
            (avg - 102.4).abs() < 0.01,
            "Average should be approximately 102.4, got {avg}"
        ),
        _ => panic!("AVG should return a Float value"),
    }
}
4044
/// A lone `COUNT(*)` must yield one row with one column holding the row count
/// of the `create_test_stock_data` fixture.
#[test]
fn test_single_aggregate_single_row() {
    let stock = create_test_stock_data();
    let engine = QueryEngine::new();

    let outcome = engine.execute(Arc::clone(&stock), "SELECT COUNT(*) FROM stock");
    let result = outcome.expect("Query should succeed");

    assert_eq!(
        result.row_count(),
        1,
        "Single aggregate query should return exactly 1 row"
    );
    assert_eq!(result.column_count(), 1, "Should have 1 column");

    let source = result.source();
    let first_row = source.get_row(0).expect("Should have first row");
    assert_eq!(first_row.values[0], DataValue::Integer(5));
}
4066
/// Aggregates combined with a WHERE filter must still collapse to one row,
/// computed over the filtered rows only.
#[test]
fn test_aggregate_with_where_single_row() {
    let stock = create_test_stock_data();
    let engine = QueryEngine::new();

    let sql = "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0";
    let result = engine
        .execute(Arc::clone(&stock), sql)
        .expect("Query should succeed");

    assert_eq!(
        result.row_count(),
        1,
        "Filtered aggregate query should return exactly 1 row"
    );
    assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");

    let source = result.source();
    let row = source.get_row(0).expect("Should have first row");

    // Expected COUNT, MIN and MAX, in column order, for the closes at or above 103.0.
    let expected = [
        DataValue::Integer(2),
        DataValue::Float(103.5),
        DataValue::Float(105.0),
    ];
    for (idx, want) in expected.iter().enumerate() {
        assert_eq!(row.values[idx], *want);
    }
}
4096
/// Smoke test: parses a `NOT IN` query and dumps the resulting AST.
/// The only failure condition is the parser returning an error.
#[test]
fn test_not_in_parsing() {
    use crate::sql::recursive_parser::Parser;

    let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
    println!("\n=== Testing NOT IN parsing ===");
    println!("Parsing query: {query}");

    let mut parser = Parser::new(query);
    let statement = parser
        .parse()
        .unwrap_or_else(|e| panic!("Parse error: {e}"));

    println!("Parsed statement: {statement:#?}");
    // The remaining output is diagnostic only; nothing below asserts on the AST.
    if let Some(clause) = statement.where_clause {
        println!("WHERE conditions: {:#?}", clause.conditions);
        if let Some(head) = clause.conditions.first() {
            println!("First condition expression: {:#?}", head.expr);
        }
    }
}
4121
4122 fn create_test_stock_data() -> Arc<DataTable> {
4124 let mut table = DataTable::new("stock");
4125
4126 table.add_column(DataColumn::new("symbol"));
4127 table.add_column(DataColumn::new("close"));
4128 table.add_column(DataColumn::new("volume"));
4129
4130 let test_data = vec![
4132 ("AAPL", 99.5, 1000),
4133 ("AAPL", 101.2, 1500),
4134 ("AAPL", 103.5, 2000),
4135 ("AAPL", 105.0, 1200),
4136 ("AAPL", 102.8, 1800),
4137 ];
4138
4139 for (symbol, close, volume) in test_data {
4140 table
4141 .add_row(DataRow::new(vec![
4142 DataValue::String(symbol.to_string()),
4143 DataValue::Float(close),
4144 DataValue::Integer(volume),
4145 ]))
4146 .expect("Should add row successfully");
4147 }
4148
4149 Arc::new(table)
4150 }
4151}
4152
4153#[cfg(test)]
4154#[path = "query_engine_tests.rs"]
4155mod query_engine_tests;