1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::row_expanders::RowExpanderRegistry;
19use crate::data::subquery_executor::SubqueryExecutor;
20use crate::data::temp_table_registry::TempTableRegistry;
21use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
22use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
23use crate::sql::parser::ast::ColumnRef;
24use crate::sql::parser::ast::SetOperation;
25use crate::sql::parser::ast::TableSource;
26use crate::sql::parser::ast::WindowSpec;
27use crate::sql::recursive_parser::{
28 CTEType, OrderByItem, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
29 TableFunction,
30};
31
32fn resolve_cte<'a>(
37 context: &'a HashMap<String, Arc<DataView>>,
38 name: &str,
39) -> Option<&'a Arc<DataView>> {
40 if let Some(v) = context.get(name) {
41 return Some(v);
42 }
43 let lower = name.to_lowercase();
44 context
45 .iter()
46 .find(|(k, _)| k.to_lowercase() == lower)
47 .map(|(_, v)| v)
48}
49
50#[derive(Debug, Clone)]
52pub struct ExecutionContext {
53 alias_map: HashMap<String, String>,
56}
57
58impl ExecutionContext {
59 pub fn new() -> Self {
61 Self {
62 alias_map: HashMap::new(),
63 }
64 }
65
66 pub fn register_alias(&mut self, alias: String, table_name: String) {
68 debug!("Registering alias: {} -> {}", alias, table_name);
69 self.alias_map.insert(alias, table_name);
70 }
71
72 pub fn resolve_alias(&self, name: &str) -> String {
75 self.alias_map
76 .get(name)
77 .cloned()
78 .unwrap_or_else(|| name.to_string())
79 }
80
81 pub fn is_alias(&self, name: &str) -> bool {
83 self.alias_map.contains_key(name)
84 }
85
86 pub fn get_aliases(&self) -> HashMap<String, String> {
88 self.alias_map.clone()
89 }
90
91 pub fn resolve_column_index(&self, table: &DataTable, column_ref: &ColumnRef) -> Result<usize> {
106 if let Some(table_prefix) = &column_ref.table_prefix {
107 let actual_table = self.resolve_alias(table_prefix);
109
110 let qualified_name = format!("{}.{}", actual_table, column_ref.name);
112 if let Some(idx) = table.find_column_by_qualified_name(&qualified_name) {
113 debug!(
114 "Resolved {}.{} -> qualified column '{}' at index {}",
115 table_prefix, column_ref.name, qualified_name, idx
116 );
117 return Ok(idx);
118 }
119
120 if let Some(idx) = table.get_column_index(&column_ref.name) {
122 debug!(
123 "Resolved {}.{} -> unqualified column '{}' at index {}",
124 table_prefix, column_ref.name, column_ref.name, idx
125 );
126 return Ok(idx);
127 }
128
129 Err(anyhow!(
131 "Column '{}' not found. Table '{}' may not support qualified column names",
132 qualified_name,
133 actual_table
134 ))
135 } else {
136 if let Some(idx) = table.get_column_index(&column_ref.name) {
138 debug!(
139 "Resolved unqualified column '{}' at index {}",
140 column_ref.name, idx
141 );
142 return Ok(idx);
143 }
144
145 if column_ref.name.contains('.') {
147 if let Some(idx) = table.find_column_by_qualified_name(&column_ref.name) {
148 debug!(
149 "Resolved '{}' as qualified column at index {}",
150 column_ref.name, idx
151 );
152 return Ok(idx);
153 }
154 }
155
156 let suggestion = self.find_similar_column(table, &column_ref.name);
158 match suggestion {
159 Some(similar) => Err(anyhow!(
160 "Column '{}' not found. Did you mean '{}'?",
161 column_ref.name,
162 similar
163 )),
164 None => Err(anyhow!("Column '{}' not found", column_ref.name)),
165 }
166 }
167 }
168
169 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
171 let columns = table.column_names();
172 let mut best_match: Option<(String, usize)> = None;
173
174 for col in columns {
175 let distance = edit_distance(name, &col);
176 if distance <= 2 {
177 match best_match {
179 Some((_, best_dist)) if distance < best_dist => {
180 best_match = Some((col.clone(), distance));
181 }
182 None => {
183 best_match = Some((col.clone(), distance));
184 }
185 _ => {}
186 }
187 }
188 }
189
190 best_match.map(|(name, _)| name)
191 }
192}
193
194impl Default for ExecutionContext {
195 fn default() -> Self {
196 Self::new()
197 }
198}
199
/// Levenshtein distance between `a` and `b`, counted in `char`s (Unicode scalar values).
///
/// Insertions, deletions and substitutions each cost 1. Only two DP rows are kept alive:
/// `prev` is the row for the prefix of `a` consumed so far, `curr` is the row being filled.
fn edit_distance(a: &str, b: &str) -> usize {
    let target: Vec<char> = b.chars().collect();

    // Row 0: turning "" into target[..j] costs j insertions.
    let mut prev: Vec<usize> = (0..=target.len()).collect();
    let mut curr = vec![0; target.len() + 1];

    for (i, ca) in a.chars().enumerate() {
        // Turning a[..=i] into "" costs i + 1 deletions.
        curr[0] = i + 1;
        for (j, &cb) in target.iter().enumerate() {
            let substitution = prev[j] + usize::from(ca != cb);
            let deletion = prev[j + 1] + 1;
            let insertion = curr[j] + 1;
            curr[j + 1] = substitution.min(deletion).min(insertion);
        }
        std::mem::swap(&mut prev, &mut curr);
    }

    prev[target.len()]
}
240
241#[derive(Clone)]
243pub struct QueryEngine {
244 case_insensitive: bool,
245 date_notation: String,
246 _behavior_config: Option<BehaviorConfig>,
247}
248
249impl Default for QueryEngine {
250 fn default() -> Self {
251 Self::new()
252 }
253}
254
255impl QueryEngine {
256 #[must_use]
257 pub fn new() -> Self {
258 Self {
259 case_insensitive: false,
260 date_notation: get_date_notation(),
261 _behavior_config: None,
262 }
263 }
264
265 #[must_use]
266 pub fn with_behavior_config(config: BehaviorConfig) -> Self {
267 let case_insensitive = config.case_insensitive_default;
268 let date_notation = get_date_notation();
270 Self {
271 case_insensitive,
272 date_notation,
273 _behavior_config: Some(config),
274 }
275 }
276
277 #[must_use]
278 pub fn with_date_notation(_date_notation: String) -> Self {
279 Self {
280 case_insensitive: false,
281 date_notation: get_date_notation(), _behavior_config: None,
283 }
284 }
285
286 #[must_use]
287 pub fn with_case_insensitive(case_insensitive: bool) -> Self {
288 Self {
289 case_insensitive,
290 date_notation: get_date_notation(),
291 _behavior_config: None,
292 }
293 }
294
295 #[must_use]
296 pub fn with_case_insensitive_and_date_notation(
297 case_insensitive: bool,
298 _date_notation: String, ) -> Self {
300 Self {
301 case_insensitive,
302 date_notation: get_date_notation(), _behavior_config: None,
304 }
305 }
306
307 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
309 let columns = table.column_names();
310 let mut best_match: Option<(String, usize)> = None;
311
312 for col in columns {
313 let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
314 let max_distance = if name.len() > 10 { 3 } else { 2 };
317 if distance <= max_distance {
318 match &best_match {
319 None => best_match = Some((col, distance)),
320 Some((_, best_dist)) if distance < *best_dist => {
321 best_match = Some((col, distance));
322 }
323 _ => {}
324 }
325 }
326 }
327
328 best_match.map(|(name, _)| name)
329 }
330
331 fn edit_distance(&self, s1: &str, s2: &str) -> usize {
333 let len1 = s1.len();
334 let len2 = s2.len();
335 let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
336
337 for i in 0..=len1 {
338 matrix[i][0] = i;
339 }
340 for j in 0..=len2 {
341 matrix[0][j] = j;
342 }
343
344 for (i, c1) in s1.chars().enumerate() {
345 for (j, c2) in s2.chars().enumerate() {
346 let cost = usize::from(c1 != c2);
347 matrix[i + 1][j + 1] = std::cmp::min(
348 matrix[i][j + 1] + 1, std::cmp::min(
350 matrix[i + 1][j] + 1, matrix[i][j] + cost, ),
353 );
354 }
355 }
356
357 matrix[len1][len2]
358 }
359
360 fn contains_unnest(expr: &SqlExpression) -> bool {
362 match expr {
363 SqlExpression::Unnest { .. } => true,
365 SqlExpression::FunctionCall { name, args, .. } => {
366 if name.to_uppercase() == "UNNEST" {
367 return true;
368 }
369 args.iter().any(Self::contains_unnest)
371 }
372 SqlExpression::BinaryOp { left, right, .. } => {
373 Self::contains_unnest(left) || Self::contains_unnest(right)
374 }
375 SqlExpression::Not { expr } => Self::contains_unnest(expr),
376 SqlExpression::CaseExpression {
377 when_branches,
378 else_branch,
379 } => {
380 when_branches.iter().any(|branch| {
381 Self::contains_unnest(&branch.condition)
382 || Self::contains_unnest(&branch.result)
383 }) || else_branch
384 .as_ref()
385 .map_or(false, |e| Self::contains_unnest(e))
386 }
387 SqlExpression::SimpleCaseExpression {
388 expr,
389 when_branches,
390 else_branch,
391 } => {
392 Self::contains_unnest(expr)
393 || when_branches.iter().any(|branch| {
394 Self::contains_unnest(&branch.value)
395 || Self::contains_unnest(&branch.result)
396 })
397 || else_branch
398 .as_ref()
399 .map_or(false, |e| Self::contains_unnest(e))
400 }
401 SqlExpression::InList { expr, values } => {
402 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
403 }
404 SqlExpression::NotInList { expr, values } => {
405 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
406 }
407 SqlExpression::Between { expr, lower, upper } => {
408 Self::contains_unnest(expr)
409 || Self::contains_unnest(lower)
410 || Self::contains_unnest(upper)
411 }
412 SqlExpression::InSubquery { expr, .. } => Self::contains_unnest(expr),
413 SqlExpression::NotInSubquery { expr, .. } => Self::contains_unnest(expr),
414 SqlExpression::ScalarSubquery { .. } => false, SqlExpression::WindowFunction { args, .. } => args.iter().any(Self::contains_unnest),
416 SqlExpression::MethodCall { args, .. } => args.iter().any(Self::contains_unnest),
417 SqlExpression::ChainedMethodCall { base, args, .. } => {
418 Self::contains_unnest(base) || args.iter().any(Self::contains_unnest)
419 }
420 _ => false,
421 }
422 }
423
424 fn collect_window_specs(expr: &SqlExpression, specs: &mut Vec<WindowSpec>) {
426 match expr {
427 SqlExpression::WindowFunction {
428 window_spec, args, ..
429 } => {
430 specs.push(window_spec.clone());
432 for arg in args {
434 Self::collect_window_specs(arg, specs);
435 }
436 }
437 SqlExpression::BinaryOp { left, right, .. } => {
438 Self::collect_window_specs(left, specs);
439 Self::collect_window_specs(right, specs);
440 }
441 SqlExpression::Not { expr } => {
442 Self::collect_window_specs(expr, specs);
443 }
444 SqlExpression::FunctionCall { args, .. } => {
445 for arg in args {
446 Self::collect_window_specs(arg, specs);
447 }
448 }
449 SqlExpression::CaseExpression {
450 when_branches,
451 else_branch,
452 } => {
453 for branch in when_branches {
454 Self::collect_window_specs(&branch.condition, specs);
455 Self::collect_window_specs(&branch.result, specs);
456 }
457 if let Some(else_expr) = else_branch {
458 Self::collect_window_specs(else_expr, specs);
459 }
460 }
461 SqlExpression::SimpleCaseExpression {
462 expr,
463 when_branches,
464 else_branch,
465 } => {
466 Self::collect_window_specs(expr, specs);
467 for branch in when_branches {
468 Self::collect_window_specs(&branch.value, specs);
469 Self::collect_window_specs(&branch.result, specs);
470 }
471 if let Some(else_expr) = else_branch {
472 Self::collect_window_specs(else_expr, specs);
473 }
474 }
475 SqlExpression::InList { expr, values, .. } => {
476 Self::collect_window_specs(expr, specs);
477 for item in values {
478 Self::collect_window_specs(item, specs);
479 }
480 }
481 SqlExpression::ChainedMethodCall { base, args, .. } => {
482 Self::collect_window_specs(base, specs);
483 for arg in args {
484 Self::collect_window_specs(arg, specs);
485 }
486 }
487 SqlExpression::Column(_)
489 | SqlExpression::NumberLiteral(_)
490 | SqlExpression::StringLiteral(_)
491 | SqlExpression::BooleanLiteral(_)
492 | SqlExpression::Null
493 | SqlExpression::DateTimeToday { .. }
494 | SqlExpression::DateTimeConstructor { .. }
495 | SqlExpression::MethodCall { .. } => {}
496 _ => {}
498 }
499 }
500
501 fn contains_window_function(expr: &SqlExpression) -> bool {
503 match expr {
504 SqlExpression::WindowFunction { .. } => true,
505 SqlExpression::BinaryOp { left, right, .. } => {
506 Self::contains_window_function(left) || Self::contains_window_function(right)
507 }
508 SqlExpression::Not { expr } => Self::contains_window_function(expr),
509 SqlExpression::FunctionCall { args, .. } => {
510 args.iter().any(Self::contains_window_function)
511 }
512 SqlExpression::CaseExpression {
513 when_branches,
514 else_branch,
515 } => {
516 when_branches.iter().any(|branch| {
517 Self::contains_window_function(&branch.condition)
518 || Self::contains_window_function(&branch.result)
519 }) || else_branch
520 .as_ref()
521 .map_or(false, |e| Self::contains_window_function(e))
522 }
523 SqlExpression::SimpleCaseExpression {
524 expr,
525 when_branches,
526 else_branch,
527 } => {
528 Self::contains_window_function(expr)
529 || when_branches.iter().any(|branch| {
530 Self::contains_window_function(&branch.value)
531 || Self::contains_window_function(&branch.result)
532 })
533 || else_branch
534 .as_ref()
535 .map_or(false, |e| Self::contains_window_function(e))
536 }
537 SqlExpression::InList { expr, values } => {
538 Self::contains_window_function(expr)
539 || values.iter().any(Self::contains_window_function)
540 }
541 SqlExpression::NotInList { expr, values } => {
542 Self::contains_window_function(expr)
543 || values.iter().any(Self::contains_window_function)
544 }
545 SqlExpression::Between { expr, lower, upper } => {
546 Self::contains_window_function(expr)
547 || Self::contains_window_function(lower)
548 || Self::contains_window_function(upper)
549 }
550 SqlExpression::InSubquery { expr, .. } => Self::contains_window_function(expr),
551 SqlExpression::NotInSubquery { expr, .. } => Self::contains_window_function(expr),
552 SqlExpression::MethodCall { args, .. } => {
553 args.iter().any(Self::contains_window_function)
554 }
555 SqlExpression::ChainedMethodCall { base, args, .. } => {
556 Self::contains_window_function(base)
557 || args.iter().any(Self::contains_window_function)
558 }
559 _ => false,
560 }
561 }
562
563 fn extract_window_specs(
565 items: &[SelectItem],
566 ) -> Vec<crate::data::batch_window_evaluator::WindowFunctionSpec> {
567 let mut specs = Vec::new();
568 for (idx, item) in items.iter().enumerate() {
569 if let SelectItem::Expression { expr, .. } = item {
570 Self::collect_window_function_specs(expr, idx, &mut specs);
571 }
572 }
573 specs
574 }
575
576 fn collect_window_function_specs(
578 expr: &SqlExpression,
579 output_column_index: usize,
580 specs: &mut Vec<crate::data::batch_window_evaluator::WindowFunctionSpec>,
581 ) {
582 match expr {
583 SqlExpression::WindowFunction {
584 name,
585 args,
586 window_spec,
587 } => {
588 specs.push(crate::data::batch_window_evaluator::WindowFunctionSpec {
589 spec: window_spec.clone(),
590 function_name: name.clone(),
591 args: args.clone(),
592 output_column_index,
593 });
594 }
595 SqlExpression::BinaryOp { left, right, .. } => {
596 Self::collect_window_function_specs(left, output_column_index, specs);
597 Self::collect_window_function_specs(right, output_column_index, specs);
598 }
599 SqlExpression::Not { expr } => {
600 Self::collect_window_function_specs(expr, output_column_index, specs);
601 }
602 SqlExpression::FunctionCall { args, .. } => {
603 for arg in args {
604 Self::collect_window_function_specs(arg, output_column_index, specs);
605 }
606 }
607 SqlExpression::CaseExpression {
608 when_branches,
609 else_branch,
610 } => {
611 for branch in when_branches {
612 Self::collect_window_function_specs(
613 &branch.condition,
614 output_column_index,
615 specs,
616 );
617 Self::collect_window_function_specs(&branch.result, output_column_index, specs);
618 }
619 if let Some(e) = else_branch {
620 Self::collect_window_function_specs(e, output_column_index, specs);
621 }
622 }
623 SqlExpression::SimpleCaseExpression {
624 expr,
625 when_branches,
626 else_branch,
627 } => {
628 Self::collect_window_function_specs(expr, output_column_index, specs);
629 for branch in when_branches {
630 Self::collect_window_function_specs(&branch.value, output_column_index, specs);
631 Self::collect_window_function_specs(&branch.result, output_column_index, specs);
632 }
633 if let Some(e) = else_branch {
634 Self::collect_window_function_specs(e, output_column_index, specs);
635 }
636 }
637 SqlExpression::InList { expr, values } => {
638 Self::collect_window_function_specs(expr, output_column_index, specs);
639 for val in values {
640 Self::collect_window_function_specs(val, output_column_index, specs);
641 }
642 }
643 SqlExpression::NotInList { expr, values } => {
644 Self::collect_window_function_specs(expr, output_column_index, specs);
645 for val in values {
646 Self::collect_window_function_specs(val, output_column_index, specs);
647 }
648 }
649 SqlExpression::Between { expr, lower, upper } => {
650 Self::collect_window_function_specs(expr, output_column_index, specs);
651 Self::collect_window_function_specs(lower, output_column_index, specs);
652 Self::collect_window_function_specs(upper, output_column_index, specs);
653 }
654 SqlExpression::InSubquery { expr, .. } => {
655 Self::collect_window_function_specs(expr, output_column_index, specs);
656 }
657 SqlExpression::NotInSubquery { expr, .. } => {
658 Self::collect_window_function_specs(expr, output_column_index, specs);
659 }
660 SqlExpression::MethodCall { args, .. } => {
661 for arg in args {
662 Self::collect_window_function_specs(arg, output_column_index, specs);
663 }
664 }
665 SqlExpression::ChainedMethodCall { base, args, .. } => {
666 Self::collect_window_function_specs(base, output_column_index, specs);
667 for arg in args {
668 Self::collect_window_function_specs(arg, output_column_index, specs);
669 }
670 }
671 _ => {} }
673 }
674
675 pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
677 let (view, _plan) = self.execute_with_plan(table, sql)?;
678 Ok(view)
679 }
680
681 pub fn execute_with_temp_tables(
683 &self,
684 table: Arc<DataTable>,
685 sql: &str,
686 temp_tables: Option<&TempTableRegistry>,
687 ) -> Result<DataView> {
688 let (view, _plan) = self.execute_with_plan_and_temp_tables(table, sql, temp_tables)?;
689 Ok(view)
690 }
691
692 pub fn execute_statement(
694 &self,
695 table: Arc<DataTable>,
696 statement: SelectStatement,
697 ) -> Result<DataView> {
698 self.execute_statement_with_temp_tables(table, statement, None)
699 }
700
701 pub fn execute_statement_with_temp_tables(
703 &self,
704 table: Arc<DataTable>,
705 statement: SelectStatement,
706 temp_tables: Option<&TempTableRegistry>,
707 ) -> Result<DataView> {
708 let mut cte_context = HashMap::new();
710
711 if let Some(temp_registry) = temp_tables {
713 for table_name in temp_registry.list_tables() {
714 if let Some(temp_table) = temp_registry.get(&table_name) {
715 debug!("Adding temp table {} to CTE context", table_name);
716 let view = DataView::new(temp_table);
717 cte_context.insert(table_name, Arc::new(view));
718 }
719 }
720 }
721
722 for cte in &statement.ctes {
723 debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
724 let cte_result = match &cte.cte_type {
726 CTEType::Standard(query) => {
727 let view = self.build_view_with_context(
729 table.clone(),
730 query.clone(),
731 &mut cte_context,
732 )?;
733
734 let mut materialized = self.materialize_view(view)?;
736
737 for column in materialized.columns_mut() {
739 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
740 column.source_table = Some(cte.name.clone());
741 }
742
743 DataView::new(Arc::new(materialized))
744 }
745 CTEType::Web(web_spec) => {
746 use crate::web::http_fetcher::WebDataFetcher;
748
749 let fetcher = WebDataFetcher::new()?;
750 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
752
753 for column in data_table.columns_mut() {
755 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
756 column.source_table = Some(cte.name.clone());
757 }
758
759 DataView::new(Arc::new(data_table))
761 }
762 CTEType::File(file_spec) => {
763 let mut data_table =
764 crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
765
766 for column in data_table.columns_mut() {
767 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
768 column.source_table = Some(cte.name.clone());
769 }
770
771 DataView::new(Arc::new(data_table))
772 }
773 };
774 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
776 debug!(
777 "QueryEngine: CTE '{}' pre-processed, stored in context",
778 cte.name
779 );
780 }
781
782 let mut subquery_executor =
784 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
785 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
786
787 self.build_view_with_context(table, processed_statement, &mut cte_context)
789 }
790
791 pub fn execute_statement_with_cte_context(
793 &self,
794 table: Arc<DataTable>,
795 statement: SelectStatement,
796 cte_context: &HashMap<String, Arc<DataView>>,
797 ) -> Result<DataView> {
798 let mut local_context = cte_context.clone();
800
801 for cte in &statement.ctes {
803 debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
804 let cte_result = match &cte.cte_type {
805 CTEType::Standard(query) => {
806 let view = self.build_view_with_context(
807 table.clone(),
808 query.clone(),
809 &mut local_context,
810 )?;
811
812 let mut materialized = self.materialize_view(view)?;
814
815 for column in materialized.columns_mut() {
817 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
818 column.source_table = Some(cte.name.clone());
819 }
820
821 DataView::new(Arc::new(materialized))
822 }
823 CTEType::Web(web_spec) => {
824 use crate::web::http_fetcher::WebDataFetcher;
826
827 let fetcher = WebDataFetcher::new()?;
828 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
830
831 for column in data_table.columns_mut() {
833 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
834 column.source_table = Some(cte.name.clone());
835 }
836
837 DataView::new(Arc::new(data_table))
839 }
840 CTEType::File(file_spec) => {
841 let mut data_table =
842 crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
843
844 for column in data_table.columns_mut() {
845 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
846 column.source_table = Some(cte.name.clone());
847 }
848
849 DataView::new(Arc::new(data_table))
850 }
851 };
852 local_context.insert(cte.name.clone(), Arc::new(cte_result));
853 }
854
855 let mut subquery_executor =
857 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
858 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
859
860 self.build_view_with_context(table, processed_statement, &mut local_context)
862 }
863
864 pub fn execute_with_plan(
866 &self,
867 table: Arc<DataTable>,
868 sql: &str,
869 ) -> Result<(DataView, ExecutionPlan)> {
870 self.execute_with_plan_and_temp_tables(table, sql, None)
871 }
872
873 pub fn execute_with_plan_and_temp_tables(
875 &self,
876 table: Arc<DataTable>,
877 sql: &str,
878 temp_tables: Option<&TempTableRegistry>,
879 ) -> Result<(DataView, ExecutionPlan)> {
880 let mut plan_builder = ExecutionPlanBuilder::new();
881 let start_time = Instant::now();
882
883 plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
885 plan_builder.add_detail(format!("Query: {}", sql));
886 let mut parser = Parser::new(sql);
887 let statement = parser
888 .parse()
889 .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
890 plan_builder.add_detail(format!("Parsed successfully"));
891 if let Some(ref from_source) = statement.from_source {
892 match from_source {
893 TableSource::Table(name) => {
894 plan_builder.add_detail(format!("FROM: {}", name));
895 }
896 TableSource::DerivedTable { alias, .. } => {
897 plan_builder.add_detail(format!("FROM: derived table (alias: {})", alias));
898 }
899 TableSource::Pivot { .. } => {
900 plan_builder.add_detail("FROM: PIVOT".to_string());
901 }
902 }
903 }
904 if statement.where_clause.is_some() {
905 plan_builder.add_detail("WHERE clause present".to_string());
906 }
907 plan_builder.end_step();
908
909 let mut cte_context = HashMap::new();
911
912 if let Some(temp_registry) = temp_tables {
914 for table_name in temp_registry.list_tables() {
915 if let Some(temp_table) = temp_registry.get(&table_name) {
916 debug!("Adding temp table {} to CTE context", table_name);
917 let view = DataView::new(temp_table);
918 cte_context.insert(table_name, Arc::new(view));
919 }
920 }
921 }
922
923 if !statement.ctes.is_empty() {
924 plan_builder.begin_step(
925 StepType::CTE,
926 format!("Process {} CTEs", statement.ctes.len()),
927 );
928
929 for cte in &statement.ctes {
930 let cte_start = Instant::now();
931 plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
932
933 let cte_result = match &cte.cte_type {
934 CTEType::Standard(query) => {
935 if let Some(ref from_source) = query.from_source {
937 match from_source {
938 TableSource::Table(name) => {
939 plan_builder.add_detail(format!("Source: {}", name));
940 }
941 TableSource::DerivedTable { alias, .. } => {
942 plan_builder
943 .add_detail(format!("Source: derived table ({})", alias));
944 }
945 TableSource::Pivot { .. } => {
946 plan_builder.add_detail("Source: PIVOT".to_string());
947 }
948 }
949 }
950 if query.where_clause.is_some() {
951 plan_builder.add_detail("Has WHERE clause".to_string());
952 }
953 if query.group_by.is_some() {
954 plan_builder.add_detail("Has GROUP BY".to_string());
955 }
956
957 debug!(
958 "QueryEngine: Processing CTE '{}' with existing context: {:?}",
959 cte.name,
960 cte_context.keys().collect::<Vec<_>>()
961 );
962
963 let mut subquery_executor = SubqueryExecutor::with_cte_context(
966 self.clone(),
967 table.clone(),
968 cte_context.clone(),
969 );
970 let processed_query = subquery_executor.execute_subqueries(query)?;
971
972 let view = self.build_view_with_context(
973 table.clone(),
974 processed_query,
975 &mut cte_context,
976 )?;
977
978 let mut materialized = self.materialize_view(view)?;
980
981 for column in materialized.columns_mut() {
983 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
984 column.source_table = Some(cte.name.clone());
985 }
986
987 DataView::new(Arc::new(materialized))
988 }
989 CTEType::Web(web_spec) => {
990 plan_builder.add_detail(format!("URL: {}", web_spec.url));
991 if let Some(format) = &web_spec.format {
992 plan_builder.add_detail(format!("Format: {:?}", format));
993 }
994 if let Some(cache) = web_spec.cache_seconds {
995 plan_builder.add_detail(format!("Cache: {} seconds", cache));
996 }
997
998 use crate::web::http_fetcher::WebDataFetcher;
1000
1001 let fetcher = WebDataFetcher::new()?;
1002 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
1004
1005 for column in data_table.columns_mut() {
1007 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1008 column.source_table = Some(cte.name.clone());
1009 }
1010
1011 DataView::new(Arc::new(data_table))
1013 }
1014 CTEType::File(file_spec) => {
1015 plan_builder.add_detail(format!("PATH: {}", file_spec.path));
1016 if file_spec.recursive {
1017 plan_builder.add_detail("RECURSIVE".to_string());
1018 }
1019 if let Some(ref g) = file_spec.glob {
1020 plan_builder.add_detail(format!("GLOB: {}", g));
1021 }
1022 if let Some(d) = file_spec.max_depth {
1023 plan_builder.add_detail(format!("MAX_DEPTH: {}", d));
1024 }
1025
1026 let mut data_table =
1027 crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
1028
1029 for column in data_table.columns_mut() {
1030 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1031 column.source_table = Some(cte.name.clone());
1032 }
1033
1034 DataView::new(Arc::new(data_table))
1035 }
1036 };
1037
1038 plan_builder.set_rows_out(cte_result.row_count());
1040 plan_builder.add_detail(format!(
1041 "Result: {} rows, {} columns",
1042 cte_result.row_count(),
1043 cte_result.column_count()
1044 ));
1045 plan_builder.add_detail(format!(
1046 "Execution time: {:.3}ms",
1047 cte_start.elapsed().as_secs_f64() * 1000.0
1048 ));
1049
1050 debug!(
1051 "QueryEngine: Storing CTE '{}' in context with {} rows",
1052 cte.name,
1053 cte_result.row_count()
1054 );
1055 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1056 plan_builder.end_step();
1057 }
1058
1059 plan_builder.add_detail(format!(
1060 "All {} CTEs cached in context",
1061 statement.ctes.len()
1062 ));
1063 plan_builder.end_step();
1064 }
1065
1066 plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
1068 let mut subquery_executor =
1069 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
1070
1071 let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
1073 format!("{:?}", w).contains("Subquery")
1075 });
1076
1077 if has_subqueries {
1078 plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
1079 }
1080
1081 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
1082
1083 if has_subqueries {
1084 plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
1085 } else {
1086 plan_builder.add_detail("No subqueries to process".to_string());
1087 }
1088
1089 plan_builder.end_step();
1090 let result = self.build_view_with_context_and_plan(
1091 table,
1092 processed_statement,
1093 &mut cte_context,
1094 &mut plan_builder,
1095 )?;
1096
1097 let total_duration = start_time.elapsed();
1098 info!(
1099 "Query execution complete: total={:?}, rows={}",
1100 total_duration,
1101 result.row_count()
1102 );
1103
1104 let plan = plan_builder.build();
1105 Ok((result, plan))
1106 }
1107
1108 fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
1110 let mut cte_context = HashMap::new();
1111 self.build_view_with_context(table, statement, &mut cte_context)
1112 }
1113
1114 fn build_view_with_context(
1116 &self,
1117 table: Arc<DataTable>,
1118 statement: SelectStatement,
1119 cte_context: &mut HashMap<String, Arc<DataView>>,
1120 ) -> Result<DataView> {
1121 let mut dummy_plan = ExecutionPlanBuilder::new();
1122 let mut exec_context = ExecutionContext::new();
1123 self.build_view_with_context_and_plan_and_exec(
1124 table,
1125 statement,
1126 cte_context,
1127 &mut dummy_plan,
1128 &mut exec_context,
1129 )
1130 }
1131
1132 fn build_view_with_context_and_plan(
1134 &self,
1135 table: Arc<DataTable>,
1136 statement: SelectStatement,
1137 cte_context: &mut HashMap<String, Arc<DataView>>,
1138 plan: &mut ExecutionPlanBuilder,
1139 ) -> Result<DataView> {
1140 let mut exec_context = ExecutionContext::new();
1141 self.build_view_with_context_and_plan_and_exec(
1142 table,
1143 statement,
1144 cte_context,
1145 plan,
1146 &mut exec_context,
1147 )
1148 }
1149
1150 fn build_view_with_context_and_plan_and_exec(
1152 &self,
1153 table: Arc<DataTable>,
1154 statement: SelectStatement,
1155 cte_context: &mut HashMap<String, Arc<DataView>>,
1156 plan: &mut ExecutionPlanBuilder,
1157 exec_context: &mut ExecutionContext,
1158 ) -> Result<DataView> {
1159 for cte in &statement.ctes {
1161 if cte_context.contains_key(&cte.name) {
1163 debug!(
1164 "QueryEngine: CTE '{}' already in context, skipping",
1165 cte.name
1166 );
1167 continue;
1168 }
1169
1170 debug!("QueryEngine: Processing CTE '{}'...", cte.name);
1171 debug!(
1172 "QueryEngine: Available CTEs for '{}': {:?}",
1173 cte.name,
1174 cte_context.keys().collect::<Vec<_>>()
1175 );
1176
1177 let cte_result = match &cte.cte_type {
1179 CTEType::Standard(query) => {
1180 let view =
1181 self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
1182
1183 let mut materialized = self.materialize_view(view)?;
1185
1186 for column in materialized.columns_mut() {
1188 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1189 column.source_table = Some(cte.name.clone());
1190 }
1191
1192 DataView::new(Arc::new(materialized))
1193 }
1194 CTEType::Web(_web_spec) => {
1195 return Err(anyhow!(
1197 "Web CTEs should be processed in execute_select method"
1198 ));
1199 }
1200 CTEType::File(_file_spec) => {
1201 return Err(anyhow!(
1203 "FILE CTEs should be processed in execute_select method"
1204 ));
1205 }
1206 };
1207
1208 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1210 debug!(
1211 "QueryEngine: CTE '{}' processed, stored in context",
1212 cte.name
1213 );
1214 }
1215
1216 let source_table = if let Some(ref from_source) = statement.from_source {
1218 match from_source {
1219 TableSource::Table(table_name) => {
1220 if let Some(cte_view) = resolve_cte(cte_context, table_name) {
1222 debug!("QueryEngine: Using CTE '{}' as source table", table_name);
1223 let mut materialized = self.materialize_view((**cte_view).clone())?;
1225
1226 #[allow(deprecated)]
1228 if let Some(ref alias) = statement.from_alias {
1229 debug!(
1230 "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1231 alias, table_name
1232 );
1233 for column in materialized.columns_mut() {
1234 if let Some(ref qualified_name) = column.qualified_name {
1236 if qualified_name.starts_with(&format!("{}.", table_name)) {
1237 column.qualified_name = Some(qualified_name.replace(
1238 &format!("{}.", table_name),
1239 &format!("{}.", alias),
1240 ));
1241 }
1242 }
1243 if column.source_table.as_ref() == Some(table_name) {
1245 column.source_table = Some(alias.clone());
1246 }
1247 }
1248 }
1249
1250 Arc::new(materialized)
1251 } else {
1252 table.clone()
1254 }
1255 }
1256 TableSource::DerivedTable { query, alias } => {
1257 debug!(
1259 "QueryEngine: Processing FROM derived table (alias: {})",
1260 alias
1261 );
1262 let subquery_result =
1263 self.build_view_with_context(table.clone(), *query.clone(), cte_context)?;
1264
1265 let mut materialized = self.materialize_view(subquery_result)?;
1268
1269 for column in materialized.columns_mut() {
1273 column.source_table = Some(alias.clone());
1274 }
1275
1276 Arc::new(materialized)
1277 }
1278 TableSource::Pivot { .. } => {
1279 return Err(anyhow!(
1281 "PIVOT in FROM clause should have been expanded by preprocessing pipeline"
1282 ));
1283 }
1284 }
1285 } else {
1286 #[allow(deprecated)]
1288 if let Some(ref table_func) = statement.from_function {
1289 debug!("QueryEngine: Processing table function (deprecated field)...");
1291 match table_func {
1292 TableFunction::Generator { name, args } => {
1293 use crate::sql::generators::GeneratorRegistry;
1295
1296 let registry = GeneratorRegistry::new();
1298
1299 if let Some(generator) = registry.get(name) {
1300 let mut evaluator = ArithmeticEvaluator::with_date_notation(
1302 &table,
1303 self.date_notation.clone(),
1304 );
1305 let dummy_row = 0;
1306
1307 let mut evaluated_args = Vec::new();
1308 for arg in args {
1309 evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
1310 }
1311
1312 generator.generate(evaluated_args)?
1314 } else {
1315 return Err(anyhow!("Unknown generator function: {}", name));
1316 }
1317 }
1318 }
1319 } else {
1320 #[allow(deprecated)]
1321 if let Some(ref subquery) = statement.from_subquery {
1322 debug!("QueryEngine: Processing FROM subquery (deprecated field)...");
1324 let subquery_result = self.build_view_with_context(
1325 table.clone(),
1326 *subquery.clone(),
1327 cte_context,
1328 )?;
1329
1330 let materialized = self.materialize_view(subquery_result)?;
1333 Arc::new(materialized)
1334 } else {
1335 #[allow(deprecated)]
1336 if let Some(ref table_name) = statement.from_table {
1337 if let Some(cte_view) = resolve_cte(cte_context, table_name) {
1339 debug!(
1340 "QueryEngine: Using CTE '{}' as source table (deprecated field)",
1341 table_name
1342 );
1343 let mut materialized = self.materialize_view((**cte_view).clone())?;
1345
1346 #[allow(deprecated)]
1348 if let Some(ref alias) = statement.from_alias {
1349 debug!(
1350 "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1351 alias, table_name
1352 );
1353 for column in materialized.columns_mut() {
1354 if let Some(ref qualified_name) = column.qualified_name {
1356 if qualified_name.starts_with(&format!("{}.", table_name)) {
1357 column.qualified_name = Some(qualified_name.replace(
1358 &format!("{}.", table_name),
1359 &format!("{}.", alias),
1360 ));
1361 }
1362 }
1363 if column.source_table.as_ref() == Some(table_name) {
1365 column.source_table = Some(alias.clone());
1366 }
1367 }
1368 }
1369
1370 Arc::new(materialized)
1371 } else {
1372 table.clone()
1374 }
1375 } else {
1376 table.clone()
1378 }
1379 }
1380 }
1381 };
1382
1383 #[allow(deprecated)]
1385 if let Some(ref alias) = statement.from_alias {
1386 #[allow(deprecated)]
1387 if let Some(ref table_name) = statement.from_table {
1388 exec_context.register_alias(alias.clone(), table_name.clone());
1389 }
1390 }
1391
1392 let final_table = if !statement.joins.is_empty() {
1394 plan.begin_step(
1395 StepType::Join,
1396 format!("Process {} JOINs", statement.joins.len()),
1397 );
1398 plan.set_rows_in(source_table.row_count());
1399
1400 let join_executor = HashJoinExecutor::new(self.case_insensitive);
1401 let mut current_table = source_table;
1402
1403 for (idx, join_clause) in statement.joins.iter().enumerate() {
1404 let join_start = Instant::now();
1405 plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
1406 plan.add_detail(format!("Type: {:?}", join_clause.join_type));
1407 plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
1408 plan.add_detail(format!(
1409 "Executing {:?} JOIN on {} condition(s)",
1410 join_clause.join_type,
1411 join_clause.condition.conditions.len()
1412 ));
1413
1414 let right_table = match &join_clause.table {
1416 TableSource::Table(name) => {
1417 if let Some(cte_view) = resolve_cte(cte_context, name) {
1419 let mut materialized = self.materialize_view((**cte_view).clone())?;
1420
1421 if let Some(ref alias) = join_clause.alias {
1423 debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
1424 for column in materialized.columns_mut() {
1425 if let Some(ref qualified_name) = column.qualified_name {
1427 if qualified_name.starts_with(&format!("{}.", name)) {
1428 column.qualified_name = Some(qualified_name.replace(
1429 &format!("{}.", name),
1430 &format!("{}.", alias),
1431 ));
1432 }
1433 }
1434 if column.source_table.as_ref() == Some(name) {
1436 column.source_table = Some(alias.clone());
1437 }
1438 }
1439 }
1440
1441 Arc::new(materialized)
1442 } else {
1443 return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
1446 }
1447 }
1448 TableSource::DerivedTable { query, alias: _ } => {
1449 let subquery_result = self.build_view_with_context(
1451 table.clone(),
1452 *query.clone(),
1453 cte_context,
1454 )?;
1455 let materialized = self.materialize_view(subquery_result)?;
1456 Arc::new(materialized)
1457 }
1458 TableSource::Pivot { .. } => {
1459 return Err(anyhow!("PIVOT in JOIN clause is not yet supported"));
1461 }
1462 };
1463
1464 let joined = join_executor.execute_join(
1466 current_table.clone(),
1467 join_clause,
1468 right_table.clone(),
1469 )?;
1470
1471 plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
1472 plan.set_rows_out(joined.row_count());
1473 plan.add_detail(format!("Result: {} rows", joined.row_count()));
1474 plan.add_detail(format!(
1475 "Join time: {:.3}ms",
1476 join_start.elapsed().as_secs_f64() * 1000.0
1477 ));
1478 plan.end_step();
1479
1480 current_table = Arc::new(joined);
1481 }
1482
1483 plan.set_rows_out(current_table.row_count());
1484 plan.add_detail(format!(
1485 "Final result after all joins: {} rows",
1486 current_table.row_count()
1487 ));
1488 plan.end_step();
1489 current_table
1490 } else {
1491 source_table
1492 };
1493
1494 self.build_view_internal_with_plan_and_exec(
1496 final_table,
1497 statement,
1498 plan,
1499 Some(exec_context),
1500 )
1501 }
1502
1503 pub fn materialize_view(&self, view: DataView) -> Result<DataTable> {
1505 let source = view.source();
1506 let mut result_table = DataTable::new("derived");
1507
1508 let visible_cols = view.visible_column_indices().to_vec();
1510
1511 for col_idx in &visible_cols {
1513 let col = &source.columns[*col_idx];
1514 let new_col = DataColumn {
1515 name: col.name.clone(),
1516 data_type: col.data_type.clone(),
1517 nullable: col.nullable,
1518 unique_values: col.unique_values,
1519 null_count: col.null_count,
1520 metadata: col.metadata.clone(),
1521 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), };
1524 result_table.add_column(new_col);
1525 }
1526
1527 for row_idx in view.visible_row_indices() {
1529 let source_row = &source.rows[*row_idx];
1530 let mut new_row = DataRow { values: Vec::new() };
1531
1532 for col_idx in &visible_cols {
1533 new_row.values.push(source_row.values[*col_idx].clone());
1534 }
1535
1536 result_table.add_row(new_row);
1537 }
1538
1539 Ok(result_table)
1540 }
1541
1542 fn build_view_internal(
1543 &self,
1544 table: Arc<DataTable>,
1545 statement: SelectStatement,
1546 ) -> Result<DataView> {
1547 let mut dummy_plan = ExecutionPlanBuilder::new();
1548 self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
1549 }
1550
1551 fn build_view_internal_with_plan(
1552 &self,
1553 table: Arc<DataTable>,
1554 statement: SelectStatement,
1555 plan: &mut ExecutionPlanBuilder,
1556 ) -> Result<DataView> {
1557 self.build_view_internal_with_plan_and_exec(table, statement, plan, None)
1558 }
1559
1560 fn build_view_internal_with_plan_and_exec(
1561 &self,
1562 table: Arc<DataTable>,
1563 statement: SelectStatement,
1564 plan: &mut ExecutionPlanBuilder,
1565 exec_context: Option<&ExecutionContext>,
1566 ) -> Result<DataView> {
1567 debug!(
1568 "QueryEngine::build_view - select_items: {:?}",
1569 statement.select_items
1570 );
1571 debug!(
1572 "QueryEngine::build_view - where_clause: {:?}",
1573 statement.where_clause
1574 );
1575
1576 let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
1578
1579 if let Some(where_clause) = &statement.where_clause {
1581 let total_rows = table.row_count();
1582 debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
1583 debug!("QueryEngine: WHERE clause = {:?}", where_clause);
1584
1585 plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
1586 plan.set_rows_in(total_rows);
1587 plan.add_detail(format!("Input: {} rows", total_rows));
1588
1589 for condition in &where_clause.conditions {
1591 plan.add_detail(format!("Condition: {:?}", condition.expr));
1592 }
1593
1594 let filter_start = Instant::now();
1595 let mut eval_context = EvaluationContext::new(self.case_insensitive);
1597
1598 let mut evaluator = if let Some(exec_ctx) = exec_context {
1600 RecursiveWhereEvaluator::with_both_contexts(&table, &mut eval_context, exec_ctx)
1602 } else {
1603 RecursiveWhereEvaluator::with_context(&table, &mut eval_context)
1604 };
1605
1606 let mut filtered_rows = Vec::new();
1608 for row_idx in visible_rows {
1609 if row_idx < 3 {
1611 debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
1612 }
1613
1614 match evaluator.evaluate(where_clause, row_idx) {
1615 Ok(result) => {
1616 if row_idx < 3 {
1617 debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
1618 }
1619 if result {
1620 filtered_rows.push(row_idx);
1621 }
1622 }
1623 Err(e) => {
1624 if row_idx < 3 {
1625 debug!(
1626 "QueryEngine: WHERE evaluation error for row {}: {}",
1627 row_idx, e
1628 );
1629 }
1630 return Err(e);
1632 }
1633 }
1634 }
1635
1636 let (compilations, cache_hits) = eval_context.get_stats();
1638 if compilations > 0 || cache_hits > 0 {
1639 debug!(
1640 "LIKE pattern cache: {} compilations, {} cache hits",
1641 compilations, cache_hits
1642 );
1643 }
1644 visible_rows = filtered_rows;
1645 let filter_duration = filter_start.elapsed();
1646 info!(
1647 "WHERE clause filtering: {} rows -> {} rows in {:?}",
1648 total_rows,
1649 visible_rows.len(),
1650 filter_duration
1651 );
1652
1653 plan.set_rows_out(visible_rows.len());
1654 plan.add_detail(format!("Output: {} rows", visible_rows.len()));
1655 plan.add_detail(format!(
1656 "Filter time: {:.3}ms",
1657 filter_duration.as_secs_f64() * 1000.0
1658 ));
1659 plan.end_step();
1660 }
1661
1662 let mut view = DataView::new(table.clone());
1664 view = view.with_rows(visible_rows);
1665
1666 if let Some(group_by_exprs) = &statement.group_by {
1668 if !group_by_exprs.is_empty() {
1669 debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
1670
1671 plan.begin_step(
1672 StepType::GroupBy,
1673 format!("GROUP BY {} expressions", group_by_exprs.len()),
1674 );
1675 plan.set_rows_in(view.row_count());
1676 plan.add_detail(format!("Input: {} rows", view.row_count()));
1677 for expr in group_by_exprs {
1678 plan.add_detail(format!("Group by: {:?}", expr));
1679 }
1680
1681 let group_start = Instant::now();
1682 view = self.apply_group_by(
1683 view,
1684 group_by_exprs,
1685 &statement.select_items,
1686 statement.having.as_ref(),
1687 plan,
1688 )?;
1689
1690 use crate::query_plan::having_alias_transformer::HIDDEN_AGG_PREFIX;
1693 let hidden_indices: Vec<usize> = view
1694 .source()
1695 .columns
1696 .iter()
1697 .enumerate()
1698 .filter_map(|(i, c)| {
1699 if c.name.starts_with(HIDDEN_AGG_PREFIX) {
1700 Some(i)
1701 } else {
1702 None
1703 }
1704 })
1705 .collect();
1706 for &idx in hidden_indices.iter().rev() {
1707 view.hide_column(idx);
1708 }
1709
1710 plan.set_rows_out(view.row_count());
1711 plan.add_detail(format!("Output: {} groups", view.row_count()));
1712 plan.add_detail(format!(
1713 "Overall time: {:.3}ms",
1714 group_start.elapsed().as_secs_f64() * 1000.0
1715 ));
1716 plan.end_step();
1717 }
1718 } else {
1719 if !statement.select_items.is_empty() {
1721 let has_non_star_items = statement
1723 .select_items
1724 .iter()
1725 .any(|item| !matches!(item, SelectItem::Star { .. }));
1726
1727 if has_non_star_items || statement.select_items.len() > 1 {
1731 view = self.apply_select_items(
1732 view,
1733 &statement.select_items,
1734 &statement,
1735 exec_context,
1736 plan,
1737 )?;
1738 }
1739 } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
1741 debug!("QueryEngine: Using legacy columns path");
1742 let source_table = view.source();
1745 let column_indices =
1746 self.resolve_column_indices(source_table, &statement.columns)?;
1747 view = view.with_columns(column_indices);
1748 }
1749 }
1750
1751 if statement.distinct {
1753 plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
1754 plan.set_rows_in(view.row_count());
1755 plan.add_detail(format!("Input: {} rows", view.row_count()));
1756
1757 let distinct_start = Instant::now();
1758 view = self.apply_distinct(view)?;
1759
1760 plan.set_rows_out(view.row_count());
1761 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1762 plan.add_detail(format!(
1763 "Distinct time: {:.3}ms",
1764 distinct_start.elapsed().as_secs_f64() * 1000.0
1765 ));
1766 plan.end_step();
1767 }
1768
1769 if let Some(order_by_columns) = &statement.order_by {
1771 if !order_by_columns.is_empty() {
1772 plan.begin_step(
1773 StepType::Sort,
1774 format!("ORDER BY {} columns", order_by_columns.len()),
1775 );
1776 plan.set_rows_in(view.row_count());
1777 for col in order_by_columns {
1778 let expr_str = match &col.expr {
1780 SqlExpression::Column(col_ref) => col_ref.name.clone(),
1781 _ => "expr".to_string(),
1782 };
1783 plan.add_detail(format!("{} {:?}", expr_str, col.direction));
1784 }
1785
1786 let sort_start = Instant::now();
1787 view =
1788 self.apply_multi_order_by_with_context(view, order_by_columns, exec_context)?;
1789
1790 plan.add_detail(format!(
1791 "Sort time: {:.3}ms",
1792 sort_start.elapsed().as_secs_f64() * 1000.0
1793 ));
1794 plan.end_step();
1795 }
1796 }
1797
1798 {
1803 use crate::query_plan::order_by_alias_transformer::HIDDEN_ORDERBY_PREFIX;
1804 let hidden_indices: Vec<usize> = view
1805 .source()
1806 .columns
1807 .iter()
1808 .enumerate()
1809 .filter_map(|(i, c)| {
1810 if c.name.starts_with(HIDDEN_ORDERBY_PREFIX) {
1811 Some(i)
1812 } else {
1813 None
1814 }
1815 })
1816 .collect();
1817 for &idx in hidden_indices.iter().rev() {
1818 view.hide_column(idx);
1819 }
1820 }
1821
1822 if let Some(limit) = statement.limit {
1824 let offset = statement.offset.unwrap_or(0);
1825 plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1826 plan.set_rows_in(view.row_count());
1827 if offset > 0 {
1828 plan.add_detail(format!("OFFSET: {}", offset));
1829 }
1830 view = view.with_limit(limit, offset);
1831 plan.set_rows_out(view.row_count());
1832 plan.add_detail(format!("Output: {} rows", view.row_count()));
1833 plan.end_step();
1834 }
1835
1836 if !statement.set_operations.is_empty() {
1838 plan.begin_step(
1839 StepType::SetOperation,
1840 format!("Process {} set operations", statement.set_operations.len()),
1841 );
1842 plan.set_rows_in(view.row_count());
1843
1844 let mut combined_table = self.materialize_view(view)?;
1846 let first_columns = combined_table.column_names();
1847 let first_column_count = first_columns.len();
1848
1849 let mut needs_deduplication = false;
1851
1852 for (idx, (operation, next_statement)) in statement.set_operations.iter().enumerate() {
1854 let op_start = Instant::now();
1855 plan.begin_step(
1856 StepType::SetOperation,
1857 format!("{:?} operation #{}", operation, idx + 1),
1858 );
1859
1860 let next_view = if let Some(exec_ctx) = exec_context {
1863 self.build_view_internal_with_plan_and_exec(
1864 table.clone(),
1865 *next_statement.clone(),
1866 plan,
1867 Some(exec_ctx),
1868 )?
1869 } else {
1870 self.build_view_internal_with_plan(
1871 table.clone(),
1872 *next_statement.clone(),
1873 plan,
1874 )?
1875 };
1876
1877 let next_table = self.materialize_view(next_view)?;
1879 let next_columns = next_table.column_names();
1880 let next_column_count = next_columns.len();
1881
1882 if first_column_count != next_column_count {
1884 return Err(anyhow!(
1885 "UNION queries must have the same number of columns: first query has {} columns, but query #{} has {} columns",
1886 first_column_count,
1887 idx + 2,
1888 next_column_count
1889 ));
1890 }
1891
1892 for (col_idx, (first_col, next_col)) in
1894 first_columns.iter().zip(next_columns.iter()).enumerate()
1895 {
1896 if !first_col.eq_ignore_ascii_case(next_col) {
1897 debug!(
1898 "UNION column name mismatch at position {}: '{}' vs '{}' (using first query's name)",
1899 col_idx + 1,
1900 first_col,
1901 next_col
1902 );
1903 }
1904 }
1905
1906 plan.add_detail(format!("Left: {} rows", combined_table.row_count()));
1907 plan.add_detail(format!("Right: {} rows", next_table.row_count()));
1908
1909 match operation {
1911 SetOperation::UnionAll => {
1912 for row in next_table.rows.iter() {
1914 combined_table.add_row(row.clone());
1915 }
1916 plan.add_detail(format!(
1917 "Result: {} rows (no deduplication)",
1918 combined_table.row_count()
1919 ));
1920 }
1921 SetOperation::Union => {
1922 for row in next_table.rows.iter() {
1924 combined_table.add_row(row.clone());
1925 }
1926 needs_deduplication = true;
1927 plan.add_detail(format!(
1928 "Combined: {} rows (deduplication pending)",
1929 combined_table.row_count()
1930 ));
1931 }
1932 SetOperation::Intersect => {
1933 return Err(anyhow!("INTERSECT is not yet implemented"));
1936 }
1937 SetOperation::Except => {
1938 return Err(anyhow!("EXCEPT is not yet implemented"));
1941 }
1942 }
1943
1944 plan.add_detail(format!(
1945 "Operation time: {:.3}ms",
1946 op_start.elapsed().as_secs_f64() * 1000.0
1947 ));
1948 plan.set_rows_out(combined_table.row_count());
1949 plan.end_step();
1950 }
1951
1952 plan.set_rows_out(combined_table.row_count());
1953 plan.add_detail(format!(
1954 "Combined result: {} rows after {} operations",
1955 combined_table.row_count(),
1956 statement.set_operations.len()
1957 ));
1958 plan.end_step();
1959
1960 view = DataView::new(Arc::new(combined_table));
1962
1963 if needs_deduplication {
1965 plan.begin_step(
1966 StepType::Distinct,
1967 "UNION deduplication - remove duplicate rows".to_string(),
1968 );
1969 plan.set_rows_in(view.row_count());
1970 plan.add_detail(format!("Input: {} rows", view.row_count()));
1971
1972 let distinct_start = Instant::now();
1973 view = self.apply_distinct(view)?;
1974
1975 plan.set_rows_out(view.row_count());
1976 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1977 plan.add_detail(format!(
1978 "Deduplication time: {:.3}ms",
1979 distinct_start.elapsed().as_secs_f64() * 1000.0
1980 ));
1981 plan.end_step();
1982 }
1983 }
1984
1985 Ok(view)
1986 }
1987
1988 fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1990 let mut indices = Vec::new();
1991 let table_columns = table.column_names();
1992
1993 for col_name in columns {
1994 let index = table_columns
1995 .iter()
1996 .position(|c| c.eq_ignore_ascii_case(col_name))
1997 .ok_or_else(|| {
1998 let suggestion = self.find_similar_column(table, col_name);
1999 match suggestion {
2000 Some(similar) => anyhow::anyhow!(
2001 "Column '{}' not found. Did you mean '{}'?",
2002 col_name,
2003 similar
2004 ),
2005 None => anyhow::anyhow!("Column '{}' not found", col_name),
2006 }
2007 })?;
2008 indices.push(index);
2009 }
2010
2011 Ok(indices)
2012 }
2013
2014 fn apply_select_items(
2016 &self,
2017 view: DataView,
2018 select_items: &[SelectItem],
2019 _statement: &SelectStatement,
2020 exec_context: Option<&ExecutionContext>,
2021 plan: &mut ExecutionPlanBuilder,
2022 ) -> Result<DataView> {
2023 debug!(
2024 "QueryEngine::apply_select_items - items: {:?}",
2025 select_items
2026 );
2027 debug!(
2028 "QueryEngine::apply_select_items - input view has {} rows",
2029 view.row_count()
2030 );
2031
2032 let has_window_functions = select_items.iter().any(|item| match item {
2034 SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
2035 _ => false,
2036 });
2037
2038 let window_func_count: usize = select_items
2040 .iter()
2041 .filter(|item| match item {
2042 SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
2043 _ => false,
2044 })
2045 .count();
2046
2047 let window_start = if has_window_functions {
2049 debug!(
2050 "QueryEngine::apply_select_items - detected {} window functions",
2051 window_func_count
2052 );
2053
2054 let window_specs = Self::extract_window_specs(select_items);
2056 debug!("Extracted {} window function specs", window_specs.len());
2057
2058 Some(Instant::now())
2059 } else {
2060 None
2061 };
2062
2063 let has_unnest = select_items.iter().any(|item| match item {
2065 SelectItem::Expression { expr, .. } => Self::contains_unnest(expr),
2066 _ => false,
2067 });
2068
2069 if has_unnest {
2070 debug!("QueryEngine::apply_select_items - UNNEST detected, using row expansion");
2071 return self.apply_select_with_row_expansion(view, select_items);
2072 }
2073
2074 let has_aggregates = select_items.iter().any(|item| match item {
2078 SelectItem::Expression { expr, .. } => contains_aggregate(expr),
2079 SelectItem::Column { .. } => false,
2080 SelectItem::Star { .. } => false,
2081 SelectItem::StarExclude { .. } => false,
2082 });
2083
2084 let all_aggregate_compatible = select_items.iter().all(|item| match item {
2085 SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
2086 SelectItem::Column { .. } => false, SelectItem::Star { .. } => false, SelectItem::StarExclude { .. } => false, });
2090
2091 if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
2092 debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
2095 return self.apply_aggregate_select(view, select_items);
2096 }
2097
2098 let has_computed_expressions = select_items
2100 .iter()
2101 .any(|item| matches!(item, SelectItem::Expression { .. }));
2102
2103 debug!(
2104 "QueryEngine::apply_select_items - has_computed_expressions: {}",
2105 has_computed_expressions
2106 );
2107
2108 if !has_computed_expressions {
2109 let column_indices = self.resolve_select_columns(view.source(), select_items)?;
2111 return Ok(view.with_columns(column_indices));
2112 }
2113
2114 let source_table = view.source();
2119 let visible_rows = view.visible_row_indices();
2120
2121 let mut computed_table = DataTable::new("query_result");
2124
2125 let mut expanded_items = Vec::new();
2127 for item in select_items {
2128 match item {
2129 SelectItem::Star { table_prefix, .. } => {
2130 if let Some(prefix) = table_prefix {
2131 debug!("QueryEngine::apply_select_items - expanding {}.*", prefix);
2133 for col in &source_table.columns {
2134 if Self::column_matches_table(col, prefix) {
2135 expanded_items.push(SelectItem::Column {
2136 column: ColumnRef::unquoted(col.name.clone()),
2137 leading_comments: vec![],
2138 trailing_comment: None,
2139 });
2140 }
2141 }
2142 } else {
2143 debug!("QueryEngine::apply_select_items - expanding *");
2145 for col_name in source_table.column_names() {
2146 expanded_items.push(SelectItem::Column {
2147 column: ColumnRef::unquoted(col_name.to_string()),
2148 leading_comments: vec![],
2149 trailing_comment: None,
2150 });
2151 }
2152 }
2153 }
2154 _ => expanded_items.push(item.clone()),
2155 }
2156 }
2157
2158 let mut column_name_counts: std::collections::HashMap<String, usize> =
2160 std::collections::HashMap::new();
2161
2162 for item in &expanded_items {
2163 let base_name = match item {
2164 SelectItem::Column {
2165 column: col_ref, ..
2166 } => col_ref.name.clone(),
2167 SelectItem::Expression { alias, .. } => alias.clone(),
2168 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2169 SelectItem::StarExclude { .. } => {
2170 unreachable!("StarExclude should have been expanded")
2171 }
2172 };
2173
2174 let count = column_name_counts.entry(base_name.clone()).or_insert(0);
2176 let column_name = if *count == 0 {
2177 base_name.clone()
2179 } else {
2180 format!("{base_name}_{count}")
2182 };
2183 *count += 1;
2184
2185 computed_table.add_column(DataColumn::new(&column_name));
2186 }
2187
2188 let can_use_batch = expanded_items.iter().all(|item| {
2192 match item {
2193 SelectItem::Expression { expr, .. } => {
2194 matches!(expr, SqlExpression::WindowFunction { .. })
2197 || !Self::contains_window_function(expr)
2198 }
2199 _ => true, }
2201 });
2202
2203 let use_batch_evaluation = can_use_batch
2206 && std::env::var("SQL_CLI_BATCH_WINDOW")
2207 .map(|v| v != "0" && v.to_lowercase() != "false")
2208 .unwrap_or(true);
2209
2210 let batch_window_specs = if use_batch_evaluation && has_window_functions {
2212 debug!("BATCH window function evaluation flag is enabled");
2213 let specs = Self::extract_window_specs(&expanded_items);
2215 debug!(
2216 "Extracted {} window function specs for batch evaluation",
2217 specs.len()
2218 );
2219 Some(specs)
2220 } else {
2221 None
2222 };
2223
2224 let mut evaluator =
2226 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2227
2228 if let Some(exec_ctx) = exec_context {
2230 let aliases = exec_ctx.get_aliases();
2231 if !aliases.is_empty() {
2232 debug!(
2233 "Applying {} aliases to evaluator: {:?}",
2234 aliases.len(),
2235 aliases
2236 );
2237 evaluator = evaluator.with_table_aliases(aliases);
2238 }
2239 }
2240
2241 if has_window_functions {
2244 let preload_start = Instant::now();
2245
2246 let mut window_specs = Vec::new();
2248 for item in &expanded_items {
2249 if let SelectItem::Expression { expr, .. } = item {
2250 Self::collect_window_specs(expr, &mut window_specs);
2251 }
2252 }
2253
2254 for spec in &window_specs {
2256 let _ = evaluator.get_or_create_window_context(spec);
2257 }
2258
2259 debug!(
2260 "Pre-created {} WindowContext(s) in {:.2}ms",
2261 window_specs.len(),
2262 preload_start.elapsed().as_secs_f64() * 1000.0
2263 );
2264 }
2265
2266 if let Some(window_specs) = batch_window_specs {
2268 debug!("Starting batch window function evaluation");
2269 let batch_start = Instant::now();
2270
2271 let mut batch_results: Vec<Vec<DataValue>> =
2273 vec![vec![DataValue::Null; expanded_items.len()]; visible_rows.len()];
2274
2275 let detailed_window_specs = &window_specs;
2277
2278 let mut specs_by_window: HashMap<
2280 u64,
2281 Vec<&crate::data::batch_window_evaluator::WindowFunctionSpec>,
2282 > = HashMap::new();
2283 for spec in detailed_window_specs {
2284 let hash = spec.spec.compute_hash();
2285 specs_by_window
2286 .entry(hash)
2287 .or_insert_with(Vec::new)
2288 .push(spec);
2289 }
2290
2291 for (_window_hash, specs) in specs_by_window {
2293 let context = evaluator.get_or_create_window_context(&specs[0].spec)?;
2295
2296 for spec in specs {
2298 match spec.function_name.as_str() {
2299 "LAG" => {
2300 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2302 let column_name = col_ref.name.as_str();
2303 let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2304 spec.args.get(1)
2305 {
2306 n.parse::<i64>().unwrap_or(1)
2307 } else {
2308 1 };
2310
2311 let values = context.evaluate_lag_batch(
2312 visible_rows,
2313 column_name,
2314 offset,
2315 )?;
2316
2317 for (row_idx, value) in values.into_iter().enumerate() {
2319 batch_results[row_idx][spec.output_column_index] = value;
2320 }
2321 }
2322 }
2323 "LEAD" => {
2324 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2326 let column_name = col_ref.name.as_str();
2327 let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2328 spec.args.get(1)
2329 {
2330 n.parse::<i64>().unwrap_or(1)
2331 } else {
2332 1 };
2334
2335 let values = context.evaluate_lead_batch(
2336 visible_rows,
2337 column_name,
2338 offset,
2339 )?;
2340
2341 for (row_idx, value) in values.into_iter().enumerate() {
2343 batch_results[row_idx][spec.output_column_index] = value;
2344 }
2345 }
2346 }
2347 "ROW_NUMBER" => {
2348 let values = context.evaluate_row_number_batch(visible_rows)?;
2349
2350 for (row_idx, value) in values.into_iter().enumerate() {
2352 batch_results[row_idx][spec.output_column_index] = value;
2353 }
2354 }
2355 "RANK" => {
2356 let values = context.evaluate_rank_batch(visible_rows)?;
2357
2358 for (row_idx, value) in values.into_iter().enumerate() {
2360 batch_results[row_idx][spec.output_column_index] = value;
2361 }
2362 }
2363 "DENSE_RANK" => {
2364 let values = context.evaluate_dense_rank_batch(visible_rows)?;
2365
2366 for (row_idx, value) in values.into_iter().enumerate() {
2368 batch_results[row_idx][spec.output_column_index] = value;
2369 }
2370 }
2371 "SUM" => {
2372 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2373 let column_name = col_ref.name.as_str();
2374 let values =
2375 context.evaluate_sum_batch(visible_rows, column_name)?;
2376
2377 for (row_idx, value) in values.into_iter().enumerate() {
2378 batch_results[row_idx][spec.output_column_index] = value;
2379 }
2380 }
2381 }
2382 "AVG" => {
2383 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2384 let column_name = col_ref.name.as_str();
2385 let values =
2386 context.evaluate_avg_batch(visible_rows, column_name)?;
2387
2388 for (row_idx, value) in values.into_iter().enumerate() {
2389 batch_results[row_idx][spec.output_column_index] = value;
2390 }
2391 }
2392 }
2393 "MIN" => {
2394 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2395 let column_name = col_ref.name.as_str();
2396 let values =
2397 context.evaluate_min_batch(visible_rows, column_name)?;
2398
2399 for (row_idx, value) in values.into_iter().enumerate() {
2400 batch_results[row_idx][spec.output_column_index] = value;
2401 }
2402 }
2403 }
2404 "MAX" => {
2405 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2406 let column_name = col_ref.name.as_str();
2407 let values =
2408 context.evaluate_max_batch(visible_rows, column_name)?;
2409
2410 for (row_idx, value) in values.into_iter().enumerate() {
2411 batch_results[row_idx][spec.output_column_index] = value;
2412 }
2413 }
2414 }
2415 "COUNT" => {
2416 let column_name = match spec.args.get(0) {
2418 Some(SqlExpression::Column(col_ref)) => Some(col_ref.name.as_str()),
2419 Some(SqlExpression::StringLiteral(s)) if s == "*" => None,
2420 _ => None,
2421 };
2422
2423 let values = context.evaluate_count_batch(visible_rows, column_name)?;
2424
2425 for (row_idx, value) in values.into_iter().enumerate() {
2426 batch_results[row_idx][spec.output_column_index] = value;
2427 }
2428 }
2429 "FIRST_VALUE" => {
2430 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2431 let column_name = col_ref.name.as_str();
2432 let values = context
2433 .evaluate_first_value_batch(visible_rows, column_name)?;
2434
2435 for (row_idx, value) in values.into_iter().enumerate() {
2436 batch_results[row_idx][spec.output_column_index] = value;
2437 }
2438 }
2439 }
2440 "LAST_VALUE" => {
2441 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2442 let column_name = col_ref.name.as_str();
2443 let values =
2444 context.evaluate_last_value_batch(visible_rows, column_name)?;
2445
2446 for (row_idx, value) in values.into_iter().enumerate() {
2447 batch_results[row_idx][spec.output_column_index] = value;
2448 }
2449 }
2450 }
2451 _ => {
2452 debug!(
2454 "Window function {} not supported in batch mode, using per-row",
2455 spec.function_name
2456 );
2457 }
2458 }
2459 }
2460 }
2461
2462 for (result_row_idx, &source_row_idx) in visible_rows.iter().enumerate() {
2464 for (col_idx, item) in expanded_items.iter().enumerate() {
2465 if !matches!(batch_results[result_row_idx][col_idx], DataValue::Null) {
2467 continue;
2468 }
2469
2470 let value = match item {
2471 SelectItem::Column {
2472 column: col_ref, ..
2473 } => {
2474 match evaluator
2475 .evaluate(&SqlExpression::Column(col_ref.clone()), source_row_idx)
2476 {
2477 Ok(val) => val,
2478 Err(e) => {
2479 return Err(anyhow!(
2480 "Failed to evaluate column {}: {}",
2481 col_ref.to_sql(),
2482 e
2483 ));
2484 }
2485 }
2486 }
2487 SelectItem::Expression { expr, .. } => {
2488 if matches!(expr, SqlExpression::WindowFunction { .. }) {
2491 continue;
2493 }
2494 evaluator.evaluate(&expr, source_row_idx)?
2497 }
2498 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2499 SelectItem::StarExclude { .. } => {
2500 unreachable!("StarExclude should have been expanded")
2501 }
2502 };
2503 batch_results[result_row_idx][col_idx] = value;
2504 }
2505 }
2506
2507 for row_values in batch_results {
2509 computed_table
2510 .add_row(DataRow::new(row_values))
2511 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2512 }
2513
2514 debug!(
2515 "Batch window evaluation completed in {:.3}ms",
2516 batch_start.elapsed().as_secs_f64() * 1000.0
2517 );
2518 } else {
2519 for &row_idx in visible_rows {
2521 let mut row_values = Vec::new();
2522
2523 for item in &expanded_items {
2524 let value = match item {
2525 SelectItem::Column {
2526 column: col_ref, ..
2527 } => {
2528 match evaluator
2530 .evaluate(&SqlExpression::Column(col_ref.clone()), row_idx)
2531 {
2532 Ok(val) => val,
2533 Err(e) => {
2534 return Err(anyhow!(
2535 "Failed to evaluate column {}: {}",
2536 col_ref.to_sql(),
2537 e
2538 ));
2539 }
2540 }
2541 }
2542 SelectItem::Expression { expr, .. } => {
2543 evaluator.evaluate(&expr, row_idx)?
2545 }
2546 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2547 SelectItem::StarExclude { .. } => {
2548 unreachable!("StarExclude should have been expanded")
2549 }
2550 };
2551 row_values.push(value);
2552 }
2553
2554 computed_table
2555 .add_row(DataRow::new(row_values))
2556 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2557 }
2558 }
2559
2560 if let Some(start) = window_start {
2562 let window_duration = start.elapsed();
2563 info!(
2564 "Window function evaluation took {:.2}ms for {} rows ({} window functions)",
2565 window_duration.as_secs_f64() * 1000.0,
2566 visible_rows.len(),
2567 window_func_count
2568 );
2569
2570 plan.begin_step(
2572 StepType::WindowFunction,
2573 format!("Evaluate {} window function(s)", window_func_count),
2574 );
2575 plan.set_rows_in(visible_rows.len());
2576 plan.set_rows_out(visible_rows.len());
2577 plan.add_detail(format!("Input: {} rows", visible_rows.len()));
2578 plan.add_detail(format!("{} window functions evaluated", window_func_count));
2579 plan.add_detail(format!(
2580 "Evaluation time: {:.3}ms",
2581 window_duration.as_secs_f64() * 1000.0
2582 ));
2583 plan.end_step();
2584 }
2585
2586 Ok(DataView::new(Arc::new(computed_table)))
2589 }
2590
2591 fn apply_select_with_row_expansion(
2593 &self,
2594 view: DataView,
2595 select_items: &[SelectItem],
2596 ) -> Result<DataView> {
2597 debug!("QueryEngine::apply_select_with_row_expansion - expanding rows");
2598
2599 let source_table = view.source();
2600 let visible_rows = view.visible_row_indices();
2601 let expander_registry = RowExpanderRegistry::new();
2602
2603 let mut result_table = DataTable::new("unnest_result");
2605
2606 let mut expanded_items = Vec::new();
2608 for item in select_items {
2609 match item {
2610 SelectItem::Star { table_prefix, .. } => {
2611 if let Some(prefix) = table_prefix {
2612 debug!(
2614 "QueryEngine::apply_select_with_row_expansion - expanding {}.*",
2615 prefix
2616 );
2617 for col in &source_table.columns {
2618 if Self::column_matches_table(col, prefix) {
2619 expanded_items.push(SelectItem::Column {
2620 column: ColumnRef::unquoted(col.name.clone()),
2621 leading_comments: vec![],
2622 trailing_comment: None,
2623 });
2624 }
2625 }
2626 } else {
2627 debug!("QueryEngine::apply_select_with_row_expansion - expanding *");
2629 for col_name in source_table.column_names() {
2630 expanded_items.push(SelectItem::Column {
2631 column: ColumnRef::unquoted(col_name.to_string()),
2632 leading_comments: vec![],
2633 trailing_comment: None,
2634 });
2635 }
2636 }
2637 }
2638 _ => expanded_items.push(item.clone()),
2639 }
2640 }
2641
2642 for item in &expanded_items {
2644 let column_name = match item {
2645 SelectItem::Column {
2646 column: col_ref, ..
2647 } => col_ref.name.clone(),
2648 SelectItem::Expression { alias, .. } => alias.clone(),
2649 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2650 SelectItem::StarExclude { .. } => {
2651 unreachable!("StarExclude should have been expanded")
2652 }
2653 };
2654 result_table.add_column(DataColumn::new(&column_name));
2655 }
2656
2657 let mut evaluator =
2659 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2660
2661 for &row_idx in visible_rows {
2662 let mut unnest_expansions = Vec::new();
2664 let mut unnest_indices = Vec::new();
2665
2666 for (col_idx, item) in expanded_items.iter().enumerate() {
2667 if let SelectItem::Expression { expr, .. } = item {
2668 if let Some(expansion_result) = self.try_expand_unnest(
2669 &expr,
2670 source_table,
2671 row_idx,
2672 &mut evaluator,
2673 &expander_registry,
2674 )? {
2675 unnest_expansions.push(expansion_result);
2676 unnest_indices.push(col_idx);
2677 }
2678 }
2679 }
2680
2681 let expansion_count = if unnest_expansions.is_empty() {
2683 1 } else {
2685 unnest_expansions
2686 .iter()
2687 .map(|exp| exp.row_count())
2688 .max()
2689 .unwrap_or(1)
2690 };
2691
2692 for output_idx in 0..expansion_count {
2694 let mut row_values = Vec::new();
2695
2696 for (col_idx, item) in expanded_items.iter().enumerate() {
2697 let unnest_position = unnest_indices.iter().position(|&idx| idx == col_idx);
2699
2700 let value = if let Some(unnest_idx) = unnest_position {
2701 let expansion = &unnest_expansions[unnest_idx];
2703 expansion
2704 .values
2705 .get(output_idx)
2706 .cloned()
2707 .unwrap_or(DataValue::Null)
2708 } else {
2709 match item {
2711 SelectItem::Column {
2712 column: col_ref, ..
2713 } => {
2714 let col_idx =
2715 source_table.get_column_index(&col_ref.name).ok_or_else(
2716 || anyhow::anyhow!("Column '{}' not found", col_ref.name),
2717 )?;
2718 let row = source_table
2719 .get_row(row_idx)
2720 .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
2721 row.get(col_idx)
2722 .ok_or_else(|| {
2723 anyhow::anyhow!("Column {} not found in row", col_idx)
2724 })?
2725 .clone()
2726 }
2727 SelectItem::Expression { expr, .. } => {
2728 evaluator.evaluate(&expr, row_idx)?
2730 }
2731 SelectItem::Star { .. } => unreachable!(),
2732 SelectItem::StarExclude { .. } => {
2733 unreachable!("StarExclude should have been expanded")
2734 }
2735 }
2736 };
2737
2738 row_values.push(value);
2739 }
2740
2741 result_table
2742 .add_row(DataRow::new(row_values))
2743 .map_err(|e| anyhow::anyhow!("Failed to add expanded row: {}", e))?;
2744 }
2745 }
2746
2747 debug!(
2748 "QueryEngine::apply_select_with_row_expansion - input rows: {}, output rows: {}",
2749 visible_rows.len(),
2750 result_table.row_count()
2751 );
2752
2753 Ok(DataView::new(Arc::new(result_table)))
2754 }
2755
2756 fn try_expand_unnest(
2759 &self,
2760 expr: &SqlExpression,
2761 _source_table: &DataTable,
2762 row_idx: usize,
2763 evaluator: &mut ArithmeticEvaluator,
2764 expander_registry: &RowExpanderRegistry,
2765 ) -> Result<Option<crate::data::row_expanders::ExpansionResult>> {
2766 if let SqlExpression::Unnest { column, delimiter } = expr {
2768 let column_value = evaluator.evaluate(column, row_idx)?;
2770
2771 let delimiter_value = DataValue::String(delimiter.clone());
2773
2774 let expander = expander_registry
2776 .get("UNNEST")
2777 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2778
2779 let expansion = expander.expand(&column_value, &[delimiter_value])?;
2781 return Ok(Some(expansion));
2782 }
2783
2784 if let SqlExpression::FunctionCall { name, args, .. } = expr {
2786 if name.to_uppercase() == "UNNEST" {
2787 if args.len() != 2 {
2789 return Err(anyhow::anyhow!(
2790 "UNNEST requires exactly 2 arguments: UNNEST(column, delimiter)"
2791 ));
2792 }
2793
2794 let column_value = evaluator.evaluate(&args[0], row_idx)?;
2796
2797 let delimiter_value = evaluator.evaluate(&args[1], row_idx)?;
2799
2800 let expander = expander_registry
2802 .get("UNNEST")
2803 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2804
2805 let expansion = expander.expand(&column_value, &[delimiter_value])?;
2807 return Ok(Some(expansion));
2808 }
2809 }
2810
2811 Ok(None)
2812 }
2813
2814 fn apply_aggregate_select(
2816 &self,
2817 view: DataView,
2818 select_items: &[SelectItem],
2819 ) -> Result<DataView> {
2820 debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
2821
2822 let source_table = view.source();
2823 let mut result_table = DataTable::new("aggregate_result");
2824
2825 for item in select_items {
2827 let column_name = match item {
2828 SelectItem::Expression { alias, .. } => alias.clone(),
2829 _ => unreachable!("Should only have expressions in aggregate-only query"),
2830 };
2831 result_table.add_column(DataColumn::new(&column_name));
2832 }
2833
2834 let visible_rows = view.visible_row_indices().to_vec();
2836 let mut evaluator =
2837 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
2838 .with_visible_rows(visible_rows);
2839
2840 let mut row_values = Vec::new();
2842 for item in select_items {
2843 match item {
2844 SelectItem::Expression { expr, .. } => {
2845 let value = evaluator.evaluate(expr, 0)?;
2848 row_values.push(value);
2849 }
2850 _ => unreachable!("Should only have expressions in aggregate-only query"),
2851 }
2852 }
2853
2854 result_table
2856 .add_row(DataRow::new(row_values))
2857 .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
2858
2859 Ok(DataView::new(Arc::new(result_table)))
2860 }
2861
2862 fn column_matches_table(col: &DataColumn, table_name: &str) -> bool {
2874 if let Some(ref source) = col.source_table {
2876 if source == table_name || source.ends_with(&format!(".{}", table_name)) {
2878 return true;
2879 }
2880 }
2881
2882 if let Some(ref qualified) = col.qualified_name {
2884 if qualified.starts_with(&format!("{}.", table_name)) {
2886 return true;
2887 }
2888 }
2889
2890 false
2891 }
2892
2893 fn resolve_select_columns(
2895 &self,
2896 table: &DataTable,
2897 select_items: &[SelectItem],
2898 ) -> Result<Vec<usize>> {
2899 let mut indices = Vec::new();
2900 let table_columns = table.column_names();
2901
2902 for item in select_items {
2903 match item {
2904 SelectItem::Column {
2905 column: col_ref, ..
2906 } => {
2907 let index = if let Some(table_prefix) = &col_ref.table_prefix {
2909 let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
2918 table.find_column_by_qualified_name(&qualified_name)
2919 .or_else(|| {
2920 table_columns
2921 .iter()
2922 .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
2923 })
2924 .ok_or_else(|| {
2925 let has_qualified = table.columns.iter()
2927 .any(|c| c.qualified_name.is_some());
2928 if !has_qualified {
2929 anyhow::anyhow!(
2930 "Column '{}' not found. Note: Table '{}' may not support qualified column names",
2931 qualified_name, table_prefix
2932 )
2933 } else {
2934 anyhow::anyhow!("Column '{}' not found", qualified_name)
2935 }
2936 })?
2937 } else {
2938 table_columns
2940 .iter()
2941 .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
2942 .ok_or_else(|| {
2943 let suggestion = self.find_similar_column(table, &col_ref.name);
2944 match suggestion {
2945 Some(similar) => anyhow::anyhow!(
2946 "Column '{}' not found. Did you mean '{}'?",
2947 col_ref.name,
2948 similar
2949 ),
2950 None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
2951 }
2952 })?
2953 };
2954 indices.push(index);
2955 }
2956 SelectItem::Star { table_prefix, .. } => {
2957 if let Some(prefix) = table_prefix {
2958 for (i, col) in table.columns.iter().enumerate() {
2960 if Self::column_matches_table(col, prefix) {
2961 indices.push(i);
2962 }
2963 }
2964 } else {
2965 for i in 0..table_columns.len() {
2967 indices.push(i);
2968 }
2969 }
2970 }
2971 SelectItem::StarExclude {
2972 table_prefix,
2973 excluded_columns,
2974 ..
2975 } => {
2976 if let Some(prefix) = table_prefix {
2978 for (i, col) in table.columns.iter().enumerate() {
2980 if Self::column_matches_table(col, prefix)
2981 && !excluded_columns.contains(&col.name)
2982 {
2983 indices.push(i);
2984 }
2985 }
2986 } else {
2987 for (i, col_name) in table_columns.iter().enumerate() {
2989 if !excluded_columns
2990 .iter()
2991 .any(|exc| exc.eq_ignore_ascii_case(col_name))
2992 {
2993 indices.push(i);
2994 }
2995 }
2996 }
2997 }
2998 SelectItem::Expression { .. } => {
2999 return Err(anyhow::anyhow!(
3000 "Computed expressions require new table creation"
3001 ));
3002 }
3003 }
3004 }
3005
3006 Ok(indices)
3007 }
3008
3009 fn apply_distinct(&self, view: DataView) -> Result<DataView> {
3011 use std::collections::HashSet;
3012
3013 let source = view.source();
3014 let visible_cols = view.visible_column_indices();
3015 let visible_rows = view.visible_row_indices();
3016
3017 let mut seen_rows = HashSet::new();
3019 let mut unique_row_indices = Vec::new();
3020
3021 for &row_idx in visible_rows {
3022 let mut row_key = Vec::new();
3024 for &col_idx in visible_cols {
3025 let value = source
3026 .get_value(row_idx, col_idx)
3027 .ok_or_else(|| anyhow!("Invalid cell reference"))?;
3028 row_key.push(format!("{:?}", value));
3030 }
3031
3032 if seen_rows.insert(row_key) {
3034 unique_row_indices.push(row_idx);
3036 }
3037 }
3038
3039 Ok(view.with_rows(unique_row_indices))
3041 }
3042
3043 fn apply_multi_order_by(
3045 &self,
3046 view: DataView,
3047 order_by_columns: &[OrderByItem],
3048 ) -> Result<DataView> {
3049 self.apply_multi_order_by_with_context(view, order_by_columns, None)
3050 }
3051
3052 fn apply_multi_order_by_with_context(
3054 &self,
3055 mut view: DataView,
3056 order_by_columns: &[OrderByItem],
3057 _exec_context: Option<&ExecutionContext>,
3058 ) -> Result<DataView> {
3059 let mut sort_columns = Vec::new();
3061
3062 for order_col in order_by_columns {
3063 let column_name = match &order_col.expr {
3065 SqlExpression::Column(col_ref) => col_ref.name.clone(),
3066 _ => {
3067 return Err(anyhow!(
3069 "ORDER BY expressions not yet supported - only simple columns allowed"
3070 ));
3071 }
3072 };
3073
3074 let col_index = if column_name.contains('.') {
3076 if let Some(dot_pos) = column_name.rfind('.') {
3078 let col_name = &column_name[dot_pos + 1..];
3079
3080 debug!(
3083 "ORDER BY: Extracting unqualified column '{}' from '{}'",
3084 col_name, column_name
3085 );
3086 view.source().get_column_index(col_name)
3087 } else {
3088 view.source().get_column_index(&column_name)
3089 }
3090 } else {
3091 view.source().get_column_index(&column_name)
3093 }
3094 .ok_or_else(|| {
3095 let suggestion = self.find_similar_column(view.source(), &column_name);
3097 match suggestion {
3098 Some(similar) => anyhow::anyhow!(
3099 "Column '{}' not found. Did you mean '{}'?",
3100 column_name,
3101 similar
3102 ),
3103 None => {
3104 let available_cols = view.source().column_names().join(", ");
3106 anyhow::anyhow!(
3107 "Column '{}' not found. Available columns: {}",
3108 column_name,
3109 available_cols
3110 )
3111 }
3112 }
3113 })?;
3114
3115 let ascending = matches!(order_col.direction, SortDirection::Asc);
3116 sort_columns.push((col_index, ascending));
3117 }
3118
3119 view.apply_multi_sort(&sort_columns)?;
3121 Ok(view)
3122 }
3123
3124 fn apply_group_by(
3126 &self,
3127 view: DataView,
3128 group_by_exprs: &[SqlExpression],
3129 select_items: &[SelectItem],
3130 having: Option<&SqlExpression>,
3131 plan: &mut ExecutionPlanBuilder,
3132 ) -> Result<DataView> {
3133 let (result_view, phase_info) = self.apply_group_by_expressions(
3135 view,
3136 group_by_exprs,
3137 select_items,
3138 having,
3139 self.case_insensitive,
3140 self.date_notation.clone(),
3141 )?;
3142
3143 plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
3145 plan.add_detail(format!(
3146 "Phase 1 - Group Building: {:.3}ms",
3147 phase_info.phase2_key_building.as_secs_f64() * 1000.0
3148 ));
3149 plan.add_detail(format!(
3150 " • Processing {} rows into {} groups",
3151 phase_info.total_rows, phase_info.num_groups
3152 ));
3153 plan.add_detail(format!(
3154 "Phase 2 - Aggregation: {:.3}ms",
3155 phase_info.phase4_aggregation.as_secs_f64() * 1000.0
3156 ));
3157 if phase_info.phase4_having_evaluation > Duration::ZERO {
3158 plan.add_detail(format!(
3159 "Phase 3 - HAVING Filter: {:.3}ms",
3160 phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
3161 ));
3162 plan.add_detail(format!(
3163 " • Filtered {} groups",
3164 phase_info.groups_filtered_by_having
3165 ));
3166 }
3167 plan.add_detail(format!(
3168 "Total GROUP BY time: {:.3}ms",
3169 phase_info.total_time.as_secs_f64() * 1000.0
3170 ));
3171
3172 Ok(result_view)
3173 }
3174
3175 pub fn estimate_group_cardinality(
3178 &self,
3179 view: &DataView,
3180 group_by_exprs: &[SqlExpression],
3181 ) -> usize {
3182 let row_count = view.get_visible_rows().len();
3184 if row_count <= 100 {
3185 return row_count;
3186 }
3187
3188 let sample_size = min(1000, row_count / 10).max(100);
3190 let mut seen = FxHashSet::default();
3191
3192 let visible_rows = view.get_visible_rows();
3193 for (i, &row_idx) in visible_rows.iter().enumerate() {
3194 if i >= sample_size {
3195 break;
3196 }
3197
3198 let mut key_values = Vec::new();
3200 for expr in group_by_exprs {
3201 let mut evaluator = ArithmeticEvaluator::new(view.source());
3202 let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
3203 key_values.push(value);
3204 }
3205
3206 seen.insert(key_values);
3207 }
3208
3209 let sample_cardinality = seen.len();
3211 let estimated = (sample_cardinality * row_count) / sample_size;
3212
3213 estimated.min(row_count).max(sample_cardinality)
3215 }
3216}
3217
3218#[cfg(test)]
3219mod tests {
3220 use super::*;
3221 use crate::data::datatable::{DataColumn, DataRow, DataValue};
3222
3223 fn create_test_table() -> Arc<DataTable> {
3224 let mut table = DataTable::new("test");
3225
3226 table.add_column(DataColumn::new("id"));
3228 table.add_column(DataColumn::new("name"));
3229 table.add_column(DataColumn::new("age"));
3230
3231 table
3233 .add_row(DataRow::new(vec![
3234 DataValue::Integer(1),
3235 DataValue::String("Alice".to_string()),
3236 DataValue::Integer(30),
3237 ]))
3238 .unwrap();
3239
3240 table
3241 .add_row(DataRow::new(vec![
3242 DataValue::Integer(2),
3243 DataValue::String("Bob".to_string()),
3244 DataValue::Integer(25),
3245 ]))
3246 .unwrap();
3247
3248 table
3249 .add_row(DataRow::new(vec![
3250 DataValue::Integer(3),
3251 DataValue::String("Charlie".to_string()),
3252 DataValue::Integer(35),
3253 ]))
3254 .unwrap();
3255
3256 Arc::new(table)
3257 }
3258
3259 #[test]
3260 fn test_select_all() {
3261 let table = create_test_table();
3262 let engine = QueryEngine::new();
3263
3264 let view = engine
3265 .execute(table.clone(), "SELECT * FROM users")
3266 .unwrap();
3267 assert_eq!(view.row_count(), 3);
3268 assert_eq!(view.column_count(), 3);
3269 }
3270
3271 #[test]
3272 fn test_select_columns() {
3273 let table = create_test_table();
3274 let engine = QueryEngine::new();
3275
3276 let view = engine
3277 .execute(table.clone(), "SELECT name, age FROM users")
3278 .unwrap();
3279 assert_eq!(view.row_count(), 3);
3280 assert_eq!(view.column_count(), 2);
3281 }
3282
3283 #[test]
3284 fn test_select_with_limit() {
3285 let table = create_test_table();
3286 let engine = QueryEngine::new();
3287
3288 let view = engine
3289 .execute(table.clone(), "SELECT * FROM users LIMIT 2")
3290 .unwrap();
3291 assert_eq!(view.row_count(), 2);
3292 }
3293
3294 #[test]
3295 fn test_type_coercion_contains() {
3296 let _ = tracing_subscriber::fmt()
3298 .with_max_level(tracing::Level::DEBUG)
3299 .try_init();
3300
3301 let mut table = DataTable::new("test");
3302 table.add_column(DataColumn::new("id"));
3303 table.add_column(DataColumn::new("status"));
3304 table.add_column(DataColumn::new("price"));
3305
3306 table
3308 .add_row(DataRow::new(vec![
3309 DataValue::Integer(1),
3310 DataValue::String("Pending".to_string()),
3311 DataValue::Float(99.99),
3312 ]))
3313 .unwrap();
3314
3315 table
3316 .add_row(DataRow::new(vec![
3317 DataValue::Integer(2),
3318 DataValue::String("Confirmed".to_string()),
3319 DataValue::Float(150.50),
3320 ]))
3321 .unwrap();
3322
3323 table
3324 .add_row(DataRow::new(vec![
3325 DataValue::Integer(3),
3326 DataValue::String("Pending".to_string()),
3327 DataValue::Float(75.00),
3328 ]))
3329 .unwrap();
3330
3331 let table = Arc::new(table);
3332 let engine = QueryEngine::new();
3333
3334 println!("\n=== Testing WHERE clause with Contains ===");
3335 println!("Table has {} rows", table.row_count());
3336 for i in 0..table.row_count() {
3337 let status = table.get_value(i, 1);
3338 println!("Row {i}: status = {status:?}");
3339 }
3340
3341 println!("\n--- Test 1: status.Contains('pend') ---");
3343 let result = engine.execute(
3344 table.clone(),
3345 "SELECT * FROM test WHERE status.Contains('pend')",
3346 );
3347 match result {
3348 Ok(view) => {
3349 println!("SUCCESS: Found {} matching rows", view.row_count());
3350 assert_eq!(view.row_count(), 2); }
3352 Err(e) => {
3353 panic!("Query failed: {e}");
3354 }
3355 }
3356
3357 println!("\n--- Test 2: price.Contains('9') ---");
3359 let result = engine.execute(
3360 table.clone(),
3361 "SELECT * FROM test WHERE price.Contains('9')",
3362 );
3363 match result {
3364 Ok(view) => {
3365 println!(
3366 "SUCCESS: Found {} matching rows with price containing '9'",
3367 view.row_count()
3368 );
3369 assert!(view.row_count() >= 1);
3371 }
3372 Err(e) => {
3373 panic!("Numeric coercion query failed: {e}");
3374 }
3375 }
3376
3377 println!("\n=== All tests passed! ===");
3378 }
3379
3380 #[test]
3381 fn test_not_in_clause() {
3382 let _ = tracing_subscriber::fmt()
3384 .with_max_level(tracing::Level::DEBUG)
3385 .try_init();
3386
3387 let mut table = DataTable::new("test");
3388 table.add_column(DataColumn::new("id"));
3389 table.add_column(DataColumn::new("country"));
3390
3391 table
3393 .add_row(DataRow::new(vec![
3394 DataValue::Integer(1),
3395 DataValue::String("CA".to_string()),
3396 ]))
3397 .unwrap();
3398
3399 table
3400 .add_row(DataRow::new(vec![
3401 DataValue::Integer(2),
3402 DataValue::String("US".to_string()),
3403 ]))
3404 .unwrap();
3405
3406 table
3407 .add_row(DataRow::new(vec![
3408 DataValue::Integer(3),
3409 DataValue::String("UK".to_string()),
3410 ]))
3411 .unwrap();
3412
3413 let table = Arc::new(table);
3414 let engine = QueryEngine::new();
3415
3416 println!("\n=== Testing NOT IN clause ===");
3417 println!("Table has {} rows", table.row_count());
3418 for i in 0..table.row_count() {
3419 let country = table.get_value(i, 1);
3420 println!("Row {i}: country = {country:?}");
3421 }
3422
3423 println!("\n--- Test: country NOT IN ('CA') ---");
3425 let result = engine.execute(
3426 table.clone(),
3427 "SELECT * FROM test WHERE country NOT IN ('CA')",
3428 );
3429 match result {
3430 Ok(view) => {
3431 println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
3432 assert_eq!(view.row_count(), 2); }
3434 Err(e) => {
3435 panic!("NOT IN query failed: {e}");
3436 }
3437 }
3438
3439 println!("\n=== NOT IN test complete! ===");
3440 }
3441
3442 #[test]
3443 fn test_case_insensitive_in_and_not_in() {
3444 let _ = tracing_subscriber::fmt()
3446 .with_max_level(tracing::Level::DEBUG)
3447 .try_init();
3448
3449 let mut table = DataTable::new("test");
3450 table.add_column(DataColumn::new("id"));
3451 table.add_column(DataColumn::new("country"));
3452
3453 table
3455 .add_row(DataRow::new(vec![
3456 DataValue::Integer(1),
3457 DataValue::String("CA".to_string()), ]))
3459 .unwrap();
3460
3461 table
3462 .add_row(DataRow::new(vec![
3463 DataValue::Integer(2),
3464 DataValue::String("us".to_string()), ]))
3466 .unwrap();
3467
3468 table
3469 .add_row(DataRow::new(vec![
3470 DataValue::Integer(3),
3471 DataValue::String("UK".to_string()), ]))
3473 .unwrap();
3474
3475 let table = Arc::new(table);
3476
3477 println!("\n=== Testing Case-Insensitive IN clause ===");
3478 println!("Table has {} rows", table.row_count());
3479 for i in 0..table.row_count() {
3480 let country = table.get_value(i, 1);
3481 println!("Row {i}: country = {country:?}");
3482 }
3483
3484 println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
3486 let engine = QueryEngine::with_case_insensitive(true);
3487 let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3488 match result {
3489 Ok(view) => {
3490 println!(
3491 "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
3492 view.row_count()
3493 );
3494 assert_eq!(view.row_count(), 1); }
3496 Err(e) => {
3497 panic!("Case-insensitive IN query failed: {e}");
3498 }
3499 }
3500
3501 println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
3503 let result = engine.execute(
3504 table.clone(),
3505 "SELECT * FROM test WHERE country NOT IN ('ca')",
3506 );
3507 match result {
3508 Ok(view) => {
3509 println!(
3510 "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
3511 view.row_count()
3512 );
3513 assert_eq!(view.row_count(), 2); }
3515 Err(e) => {
3516 panic!("Case-insensitive NOT IN query failed: {e}");
3517 }
3518 }
3519
3520 println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
3522 let engine_case_sensitive = QueryEngine::new(); let result = engine_case_sensitive
3524 .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3525 match result {
3526 Ok(view) => {
3527 println!(
3528 "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
3529 view.row_count()
3530 );
3531 assert_eq!(view.row_count(), 0); }
3533 Err(e) => {
3534 panic!("Case-sensitive IN query failed: {e}");
3535 }
3536 }
3537
3538 println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
3539 }
3540
3541 #[test]
3542 #[ignore = "Parentheses in WHERE clause not yet implemented"]
3543 fn test_parentheses_in_where_clause() {
3544 let _ = tracing_subscriber::fmt()
3546 .with_max_level(tracing::Level::DEBUG)
3547 .try_init();
3548
3549 let mut table = DataTable::new("test");
3550 table.add_column(DataColumn::new("id"));
3551 table.add_column(DataColumn::new("status"));
3552 table.add_column(DataColumn::new("priority"));
3553
3554 table
3556 .add_row(DataRow::new(vec![
3557 DataValue::Integer(1),
3558 DataValue::String("Pending".to_string()),
3559 DataValue::String("High".to_string()),
3560 ]))
3561 .unwrap();
3562
3563 table
3564 .add_row(DataRow::new(vec![
3565 DataValue::Integer(2),
3566 DataValue::String("Complete".to_string()),
3567 DataValue::String("High".to_string()),
3568 ]))
3569 .unwrap();
3570
3571 table
3572 .add_row(DataRow::new(vec![
3573 DataValue::Integer(3),
3574 DataValue::String("Pending".to_string()),
3575 DataValue::String("Low".to_string()),
3576 ]))
3577 .unwrap();
3578
3579 table
3580 .add_row(DataRow::new(vec![
3581 DataValue::Integer(4),
3582 DataValue::String("Complete".to_string()),
3583 DataValue::String("Low".to_string()),
3584 ]))
3585 .unwrap();
3586
3587 let table = Arc::new(table);
3588 let engine = QueryEngine::new();
3589
3590 println!("\n=== Testing Parentheses in WHERE clause ===");
3591 println!("Table has {} rows", table.row_count());
3592 for i in 0..table.row_count() {
3593 let status = table.get_value(i, 1);
3594 let priority = table.get_value(i, 2);
3595 println!("Row {i}: status = {status:?}, priority = {priority:?}");
3596 }
3597
3598 println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
3600 let result = engine.execute(
3601 table.clone(),
3602 "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
3603 );
3604 match result {
3605 Ok(view) => {
3606 println!(
3607 "SUCCESS: Found {} rows with parenthetical logic",
3608 view.row_count()
3609 );
3610 assert_eq!(view.row_count(), 2); }
3612 Err(e) => {
3613 panic!("Parentheses query failed: {e}");
3614 }
3615 }
3616
3617 println!("\n=== Parentheses test complete! ===");
3618 }
3619
3620 #[test]
3621 #[ignore = "Numeric type coercion needs fixing"]
3622 fn test_numeric_type_coercion() {
3623 let _ = tracing_subscriber::fmt()
3625 .with_max_level(tracing::Level::DEBUG)
3626 .try_init();
3627
3628 let mut table = DataTable::new("test");
3629 table.add_column(DataColumn::new("id"));
3630 table.add_column(DataColumn::new("price"));
3631 table.add_column(DataColumn::new("quantity"));
3632
3633 table
3635 .add_row(DataRow::new(vec![
3636 DataValue::Integer(1),
3637 DataValue::Float(99.50), DataValue::Integer(100),
3639 ]))
3640 .unwrap();
3641
3642 table
3643 .add_row(DataRow::new(vec![
3644 DataValue::Integer(2),
3645 DataValue::Float(150.0), DataValue::Integer(200),
3647 ]))
3648 .unwrap();
3649
3650 table
3651 .add_row(DataRow::new(vec![
3652 DataValue::Integer(3),
3653 DataValue::Integer(75), DataValue::Integer(50),
3655 ]))
3656 .unwrap();
3657
3658 let table = Arc::new(table);
3659 let engine = QueryEngine::new();
3660
3661 println!("\n=== Testing Numeric Type Coercion ===");
3662 println!("Table has {} rows", table.row_count());
3663 for i in 0..table.row_count() {
3664 let price = table.get_value(i, 1);
3665 let quantity = table.get_value(i, 2);
3666 println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
3667 }
3668
3669 println!("\n--- Test: price.Contains('.') ---");
3671 let result = engine.execute(
3672 table.clone(),
3673 "SELECT * FROM test WHERE price.Contains('.')",
3674 );
3675 match result {
3676 Ok(view) => {
3677 println!(
3678 "SUCCESS: Found {} rows with decimal points in price",
3679 view.row_count()
3680 );
3681 assert_eq!(view.row_count(), 2); }
3683 Err(e) => {
3684 panic!("Numeric Contains query failed: {e}");
3685 }
3686 }
3687
3688 println!("\n--- Test: quantity.Contains('0') ---");
3690 let result = engine.execute(
3691 table.clone(),
3692 "SELECT * FROM test WHERE quantity.Contains('0')",
3693 );
3694 match result {
3695 Ok(view) => {
3696 println!(
3697 "SUCCESS: Found {} rows with '0' in quantity",
3698 view.row_count()
3699 );
3700 assert_eq!(view.row_count(), 2); }
3702 Err(e) => {
3703 panic!("Integer Contains query failed: {e}");
3704 }
3705 }
3706
3707 println!("\n=== Numeric type coercion test complete! ===");
3708 }
3709
3710 #[test]
3711 fn test_datetime_comparisons() {
3712 let _ = tracing_subscriber::fmt()
3714 .with_max_level(tracing::Level::DEBUG)
3715 .try_init();
3716
3717 let mut table = DataTable::new("test");
3718 table.add_column(DataColumn::new("id"));
3719 table.add_column(DataColumn::new("created_date"));
3720
3721 table
3723 .add_row(DataRow::new(vec![
3724 DataValue::Integer(1),
3725 DataValue::String("2024-12-15".to_string()),
3726 ]))
3727 .unwrap();
3728
3729 table
3730 .add_row(DataRow::new(vec![
3731 DataValue::Integer(2),
3732 DataValue::String("2025-01-15".to_string()),
3733 ]))
3734 .unwrap();
3735
3736 table
3737 .add_row(DataRow::new(vec![
3738 DataValue::Integer(3),
3739 DataValue::String("2025-02-15".to_string()),
3740 ]))
3741 .unwrap();
3742
3743 let table = Arc::new(table);
3744 let engine = QueryEngine::new();
3745
3746 println!("\n=== Testing DateTime Comparisons ===");
3747 println!("Table has {} rows", table.row_count());
3748 for i in 0..table.row_count() {
3749 let date = table.get_value(i, 1);
3750 println!("Row {i}: created_date = {date:?}");
3751 }
3752
3753 println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
3755 let result = engine.execute(
3756 table.clone(),
3757 "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
3758 );
3759 match result {
3760 Ok(view) => {
3761 println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
3762 assert_eq!(view.row_count(), 2); }
3764 Err(e) => {
3765 panic!("DateTime comparison query failed: {e}");
3766 }
3767 }
3768
3769 println!("\n=== DateTime comparison test complete! ===");
3770 }
3771
3772 #[test]
3773 fn test_not_with_method_calls() {
3774 let _ = tracing_subscriber::fmt()
3776 .with_max_level(tracing::Level::DEBUG)
3777 .try_init();
3778
3779 let mut table = DataTable::new("test");
3780 table.add_column(DataColumn::new("id"));
3781 table.add_column(DataColumn::new("status"));
3782
3783 table
3785 .add_row(DataRow::new(vec![
3786 DataValue::Integer(1),
3787 DataValue::String("Pending Review".to_string()),
3788 ]))
3789 .unwrap();
3790
3791 table
3792 .add_row(DataRow::new(vec![
3793 DataValue::Integer(2),
3794 DataValue::String("Complete".to_string()),
3795 ]))
3796 .unwrap();
3797
3798 table
3799 .add_row(DataRow::new(vec![
3800 DataValue::Integer(3),
3801 DataValue::String("Pending Approval".to_string()),
3802 ]))
3803 .unwrap();
3804
3805 let table = Arc::new(table);
3806 let engine = QueryEngine::with_case_insensitive(true);
3807
3808 println!("\n=== Testing NOT with Method Calls ===");
3809 println!("Table has {} rows", table.row_count());
3810 for i in 0..table.row_count() {
3811 let status = table.get_value(i, 1);
3812 println!("Row {i}: status = {status:?}");
3813 }
3814
3815 println!("\n--- Test: NOT status.Contains('pend') ---");
3817 let result = engine.execute(
3818 table.clone(),
3819 "SELECT * FROM test WHERE NOT status.Contains('pend')",
3820 );
3821 match result {
3822 Ok(view) => {
3823 println!(
3824 "SUCCESS: Found {} rows NOT containing 'pend'",
3825 view.row_count()
3826 );
3827 assert_eq!(view.row_count(), 1); }
3829 Err(e) => {
3830 panic!("NOT Contains query failed: {e}");
3831 }
3832 }
3833
3834 println!("\n--- Test: NOT status.StartsWith('Pending') ---");
3836 let result = engine.execute(
3837 table.clone(),
3838 "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
3839 );
3840 match result {
3841 Ok(view) => {
3842 println!(
3843 "SUCCESS: Found {} rows NOT starting with 'Pending'",
3844 view.row_count()
3845 );
3846 assert_eq!(view.row_count(), 1); }
3848 Err(e) => {
3849 panic!("NOT StartsWith query failed: {e}");
3850 }
3851 }
3852
3853 println!("\n=== NOT with method calls test complete! ===");
3854 }
3855
3856 #[test]
3857 #[ignore = "Complex logical expressions with parentheses not yet implemented"]
3858 fn test_complex_logical_expressions() {
3859 let _ = tracing_subscriber::fmt()
3861 .with_max_level(tracing::Level::DEBUG)
3862 .try_init();
3863
3864 let mut table = DataTable::new("test");
3865 table.add_column(DataColumn::new("id"));
3866 table.add_column(DataColumn::new("status"));
3867 table.add_column(DataColumn::new("priority"));
3868 table.add_column(DataColumn::new("assigned"));
3869
3870 table
3872 .add_row(DataRow::new(vec![
3873 DataValue::Integer(1),
3874 DataValue::String("Pending".to_string()),
3875 DataValue::String("High".to_string()),
3876 DataValue::String("John".to_string()),
3877 ]))
3878 .unwrap();
3879
3880 table
3881 .add_row(DataRow::new(vec![
3882 DataValue::Integer(2),
3883 DataValue::String("Complete".to_string()),
3884 DataValue::String("High".to_string()),
3885 DataValue::String("Jane".to_string()),
3886 ]))
3887 .unwrap();
3888
3889 table
3890 .add_row(DataRow::new(vec![
3891 DataValue::Integer(3),
3892 DataValue::String("Pending".to_string()),
3893 DataValue::String("Low".to_string()),
3894 DataValue::String("John".to_string()),
3895 ]))
3896 .unwrap();
3897
3898 table
3899 .add_row(DataRow::new(vec![
3900 DataValue::Integer(4),
3901 DataValue::String("In Progress".to_string()),
3902 DataValue::String("Medium".to_string()),
3903 DataValue::String("Jane".to_string()),
3904 ]))
3905 .unwrap();
3906
3907 let table = Arc::new(table);
3908 let engine = QueryEngine::new();
3909
3910 println!("\n=== Testing Complex Logical Expressions ===");
3911 println!("Table has {} rows", table.row_count());
3912 for i in 0..table.row_count() {
3913 let status = table.get_value(i, 1);
3914 let priority = table.get_value(i, 2);
3915 let assigned = table.get_value(i, 3);
3916 println!(
3917 "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
3918 );
3919 }
3920
3921 println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
3923 let result = engine.execute(
3924 table.clone(),
3925 "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
3926 );
3927 match result {
3928 Ok(view) => {
3929 println!(
3930 "SUCCESS: Found {} rows with complex logic",
3931 view.row_count()
3932 );
3933 assert_eq!(view.row_count(), 2); }
3935 Err(e) => {
3936 panic!("Complex logic query failed: {e}");
3937 }
3938 }
3939
3940 println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
3942 let result = engine.execute(
3943 table.clone(),
3944 "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
3945 );
3946 match result {
3947 Ok(view) => {
3948 println!(
3949 "SUCCESS: Found {} rows with NOT complex logic",
3950 view.row_count()
3951 );
3952 assert_eq!(view.row_count(), 2); }
3954 Err(e) => {
3955 panic!("NOT complex logic query failed: {e}");
3956 }
3957 }
3958
3959 println!("\n=== Complex logical expressions test complete! ===");
3960 }
3961
3962 #[test]
3963 fn test_mixed_data_types_and_edge_cases() {
3964 let _ = tracing_subscriber::fmt()
3966 .with_max_level(tracing::Level::DEBUG)
3967 .try_init();
3968
3969 let mut table = DataTable::new("test");
3970 table.add_column(DataColumn::new("id"));
3971 table.add_column(DataColumn::new("value"));
3972 table.add_column(DataColumn::new("nullable_field"));
3973
3974 table
3976 .add_row(DataRow::new(vec![
3977 DataValue::Integer(1),
3978 DataValue::String("123.45".to_string()),
3979 DataValue::String("present".to_string()),
3980 ]))
3981 .unwrap();
3982
3983 table
3984 .add_row(DataRow::new(vec![
3985 DataValue::Integer(2),
3986 DataValue::Float(678.90),
3987 DataValue::Null,
3988 ]))
3989 .unwrap();
3990
3991 table
3992 .add_row(DataRow::new(vec![
3993 DataValue::Integer(3),
3994 DataValue::Boolean(true),
3995 DataValue::String("also present".to_string()),
3996 ]))
3997 .unwrap();
3998
3999 table
4000 .add_row(DataRow::new(vec![
4001 DataValue::Integer(4),
4002 DataValue::String("false".to_string()),
4003 DataValue::Null,
4004 ]))
4005 .unwrap();
4006
4007 let table = Arc::new(table);
4008 let engine = QueryEngine::new();
4009
4010 println!("\n=== Testing Mixed Data Types and Edge Cases ===");
4011 println!("Table has {} rows", table.row_count());
4012 for i in 0..table.row_count() {
4013 let value = table.get_value(i, 1);
4014 let nullable = table.get_value(i, 2);
4015 println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
4016 }
4017
4018 println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
4020 let result = engine.execute(
4021 table.clone(),
4022 "SELECT * FROM test WHERE value.Contains('true')",
4023 );
4024 match result {
4025 Ok(view) => {
4026 println!(
4027 "SUCCESS: Found {} rows with boolean coercion",
4028 view.row_count()
4029 );
4030 assert_eq!(view.row_count(), 1); }
4032 Err(e) => {
4033 panic!("Boolean coercion query failed: {e}");
4034 }
4035 }
4036
4037 println!("\n--- Test: id IN (1, 3) ---");
4039 let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
4040 match result {
4041 Ok(view) => {
4042 println!("SUCCESS: Found {} rows with IN clause", view.row_count());
4043 assert_eq!(view.row_count(), 2); }
4045 Err(e) => {
4046 panic!("Multiple IN values query failed: {e}");
4047 }
4048 }
4049
4050 println!("\n=== Mixed data types test complete! ===");
4051 }
4052
4053 #[test]
4055 fn test_aggregate_only_single_row() {
4056 let table = create_test_stock_data();
4057 let engine = QueryEngine::new();
4058
4059 let result = engine
4061 .execute(
4062 table.clone(),
4063 "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
4064 )
4065 .expect("Query should succeed");
4066
4067 assert_eq!(
4068 result.row_count(),
4069 1,
4070 "Aggregate-only query should return exactly 1 row"
4071 );
4072 assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
4073
4074 let source = result.source();
4076 let row = source.get_row(0).expect("Should have first row");
4077
4078 assert_eq!(row.values[0], DataValue::Integer(5));
4080
4081 assert_eq!(row.values[1], DataValue::Float(99.5));
4083
4084 assert_eq!(row.values[2], DataValue::Float(105.0));
4086
4087 if let DataValue::Float(avg) = &row.values[3] {
4089 assert!(
4090 (avg - 102.4).abs() < 0.01,
4091 "Average should be approximately 102.4, got {}",
4092 avg
4093 );
4094 } else {
4095 panic!("AVG should return a Float value");
4096 }
4097 }
4098
4099 #[test]
4101 fn test_single_aggregate_single_row() {
4102 let table = create_test_stock_data();
4103 let engine = QueryEngine::new();
4104
4105 let result = engine
4106 .execute(table.clone(), "SELECT COUNT(*) FROM stock")
4107 .expect("Query should succeed");
4108
4109 assert_eq!(
4110 result.row_count(),
4111 1,
4112 "Single aggregate query should return exactly 1 row"
4113 );
4114 assert_eq!(result.column_count(), 1, "Should have 1 column");
4115
4116 let source = result.source();
4117 let row = source.get_row(0).expect("Should have first row");
4118 assert_eq!(row.values[0], DataValue::Integer(5));
4119 }
4120
4121 #[test]
4123 fn test_aggregate_with_where_single_row() {
4124 let table = create_test_stock_data();
4125 let engine = QueryEngine::new();
4126
4127 let result = engine
4129 .execute(
4130 table.clone(),
4131 "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
4132 )
4133 .expect("Query should succeed");
4134
4135 assert_eq!(
4136 result.row_count(),
4137 1,
4138 "Filtered aggregate query should return exactly 1 row"
4139 );
4140 assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
4141
4142 let source = result.source();
4143 let row = source.get_row(0).expect("Should have first row");
4144
4145 assert_eq!(row.values[0], DataValue::Integer(2));
4147 assert_eq!(row.values[1], DataValue::Float(103.5)); assert_eq!(row.values[2], DataValue::Float(105.0)); }
4150
4151 #[test]
4152 fn test_not_in_parsing() {
4153 use crate::sql::recursive_parser::Parser;
4154
4155 let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
4156 println!("\n=== Testing NOT IN parsing ===");
4157 println!("Parsing query: {query}");
4158
4159 let mut parser = Parser::new(query);
4160 match parser.parse() {
4161 Ok(statement) => {
4162 println!("Parsed statement: {statement:#?}");
4163 if let Some(where_clause) = statement.where_clause {
4164 println!("WHERE conditions: {:#?}", where_clause.conditions);
4165 if let Some(first_condition) = where_clause.conditions.first() {
4166 println!("First condition expression: {:#?}", first_condition.expr);
4167 }
4168 }
4169 }
4170 Err(e) => {
4171 panic!("Parse error: {e}");
4172 }
4173 }
4174 }
4175
4176 fn create_test_stock_data() -> Arc<DataTable> {
4178 let mut table = DataTable::new("stock");
4179
4180 table.add_column(DataColumn::new("symbol"));
4181 table.add_column(DataColumn::new("close"));
4182 table.add_column(DataColumn::new("volume"));
4183
4184 let test_data = vec![
4186 ("AAPL", 99.5, 1000),
4187 ("AAPL", 101.2, 1500),
4188 ("AAPL", 103.5, 2000),
4189 ("AAPL", 105.0, 1200),
4190 ("AAPL", 102.8, 1800),
4191 ];
4192
4193 for (symbol, close, volume) in test_data {
4194 table
4195 .add_row(DataRow::new(vec![
4196 DataValue::String(symbol.to_string()),
4197 DataValue::Float(close),
4198 DataValue::Integer(volume),
4199 ]))
4200 .expect("Should add row successfully");
4201 }
4202
4203 Arc::new(table)
4204 }
4205}
4206
// Additional QueryEngine tests, loaded from `query_engine_tests.rs` via the
// `#[path]` attribute and compiled only in test builds.
#[cfg(test)]
#[path = "query_engine_tests.rs"]
mod query_engine_tests;