1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::row_expanders::RowExpanderRegistry;
19use crate::data::subquery_executor::SubqueryExecutor;
20use crate::data::temp_table_registry::TempTableRegistry;
21use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
22use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
23use crate::sql::parser::ast::ColumnRef;
24use crate::sql::parser::ast::SetOperation;
25use crate::sql::parser::ast::TableSource;
26use crate::sql::parser::ast::WindowSpec;
27use crate::sql::recursive_parser::{
28 CTEType, OrderByItem, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
29 TableFunction,
30};
31
32fn resolve_cte<'a>(
37 context: &'a HashMap<String, Arc<DataView>>,
38 name: &str,
39) -> Option<&'a Arc<DataView>> {
40 if let Some(v) = context.get(name) {
41 return Some(v);
42 }
43 let lower = name.to_lowercase();
44 context
45 .iter()
46 .find(|(k, _)| k.to_lowercase() == lower)
47 .map(|(_, v)| v)
48}
49
/// Alias bookkeeping used while resolving column references.
///
/// The context stores which table name each alias was registered for, so a
/// reference such as `alias.column` can be translated back to
/// `table.column` before it is looked up in a table.
#[derive(Clone, Debug)]
pub struct ExecutionContext {
    /// Maps an alias to the table name it was registered with.
    alias_map: HashMap<String, String>,
}
57
58impl ExecutionContext {
59 pub fn new() -> Self {
61 Self {
62 alias_map: HashMap::new(),
63 }
64 }
65
66 pub fn register_alias(&mut self, alias: String, table_name: String) {
68 debug!("Registering alias: {} -> {}", alias, table_name);
69 self.alias_map.insert(alias, table_name);
70 }
71
72 pub fn resolve_alias(&self, name: &str) -> String {
75 self.alias_map
76 .get(name)
77 .cloned()
78 .unwrap_or_else(|| name.to_string())
79 }
80
81 pub fn is_alias(&self, name: &str) -> bool {
83 self.alias_map.contains_key(name)
84 }
85
86 pub fn get_aliases(&self) -> HashMap<String, String> {
88 self.alias_map.clone()
89 }
90
91 pub fn resolve_column_index(&self, table: &DataTable, column_ref: &ColumnRef) -> Result<usize> {
106 if let Some(table_prefix) = &column_ref.table_prefix {
107 let actual_table = self.resolve_alias(table_prefix);
109
110 let qualified_name = format!("{}.{}", actual_table, column_ref.name);
112 if let Some(idx) = table.find_column_by_qualified_name(&qualified_name) {
113 debug!(
114 "Resolved {}.{} -> qualified column '{}' at index {}",
115 table_prefix, column_ref.name, qualified_name, idx
116 );
117 return Ok(idx);
118 }
119
120 if let Some(idx) = table.get_column_index(&column_ref.name) {
122 debug!(
123 "Resolved {}.{} -> unqualified column '{}' at index {}",
124 table_prefix, column_ref.name, column_ref.name, idx
125 );
126 return Ok(idx);
127 }
128
129 Err(anyhow!(
131 "Column '{}' not found. Table '{}' may not support qualified column names",
132 qualified_name,
133 actual_table
134 ))
135 } else {
136 if let Some(idx) = table.get_column_index(&column_ref.name) {
138 debug!(
139 "Resolved unqualified column '{}' at index {}",
140 column_ref.name, idx
141 );
142 return Ok(idx);
143 }
144
145 if column_ref.name.contains('.') {
147 if let Some(idx) = table.find_column_by_qualified_name(&column_ref.name) {
148 debug!(
149 "Resolved '{}' as qualified column at index {}",
150 column_ref.name, idx
151 );
152 return Ok(idx);
153 }
154 }
155
156 let suggestion = self.find_similar_column(table, &column_ref.name);
158 match suggestion {
159 Some(similar) => Err(anyhow!(
160 "Column '{}' not found. Did you mean '{}'?",
161 column_ref.name,
162 similar
163 )),
164 None => Err(anyhow!("Column '{}' not found", column_ref.name)),
165 }
166 }
167 }
168
169 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
171 let columns = table.column_names();
172 let mut best_match: Option<(String, usize)> = None;
173
174 for col in columns {
175 let distance = edit_distance(name, &col);
176 if distance <= 2 {
177 match best_match {
179 Some((_, best_dist)) if distance < best_dist => {
180 best_match = Some((col.clone(), distance));
181 }
182 None => {
183 best_match = Some((col.clone(), distance));
184 }
185 _ => {}
186 }
187 }
188 }
189
190 best_match.map(|(name, _)| name)
191 }
192}
193
194impl Default for ExecutionContext {
195 fn default() -> Self {
196 Self::new()
197 }
198}
199
/// Computes the Levenshtein edit distance between `a` and `b`.
///
/// The distance is counted in `char`s (not bytes): the minimum number of
/// single-character insertions, deletions and substitutions that turn `a`
/// into `b`.
fn edit_distance(a: &str, b: &str) -> usize {
    let source: Vec<char> = a.chars().collect();
    let target: Vec<char> = b.chars().collect();

    // Turning an empty string into the other one takes one insertion per char.
    if source.is_empty() {
        return target.len();
    }
    if target.is_empty() {
        return source.len();
    }

    // Only two rows of the DP table are ever needed at once.
    // `previous[j]` is the distance between the processed prefix of `source`
    // and the first `j` chars of `target`.
    let mut previous: Vec<usize> = (0..=target.len()).collect();
    let mut current: Vec<usize> = vec![0; target.len() + 1];

    for (row, &from) in source.iter().enumerate() {
        current[0] = row + 1;
        for (col, &to) in target.iter().enumerate() {
            let substitution = previous[col] + usize::from(from != to);
            let deletion = previous[col + 1] + 1;
            let insertion = current[col] + 1;
            current[col + 1] = min(min(deletion, insertion), substitution);
        }
        std::mem::swap(&mut previous, &mut current);
    }

    // After the final swap the last completed row lives in `previous`.
    previous[target.len()]
}
240
241#[derive(Clone)]
243pub struct QueryEngine {
244 case_insensitive: bool,
245 date_notation: String,
246 _behavior_config: Option<BehaviorConfig>,
247}
248
249impl Default for QueryEngine {
250 fn default() -> Self {
251 Self::new()
252 }
253}
254
255impl QueryEngine {
256 #[must_use]
257 pub fn new() -> Self {
258 Self {
259 case_insensitive: false,
260 date_notation: get_date_notation(),
261 _behavior_config: None,
262 }
263 }
264
265 #[must_use]
266 pub fn with_behavior_config(config: BehaviorConfig) -> Self {
267 let case_insensitive = config.case_insensitive_default;
268 let date_notation = get_date_notation();
270 Self {
271 case_insensitive,
272 date_notation,
273 _behavior_config: Some(config),
274 }
275 }
276
277 #[must_use]
278 pub fn with_date_notation(_date_notation: String) -> Self {
279 Self {
280 case_insensitive: false,
281 date_notation: get_date_notation(), _behavior_config: None,
283 }
284 }
285
286 #[must_use]
287 pub fn with_case_insensitive(case_insensitive: bool) -> Self {
288 Self {
289 case_insensitive,
290 date_notation: get_date_notation(),
291 _behavior_config: None,
292 }
293 }
294
295 #[must_use]
296 pub fn with_case_insensitive_and_date_notation(
297 case_insensitive: bool,
298 _date_notation: String, ) -> Self {
300 Self {
301 case_insensitive,
302 date_notation: get_date_notation(), _behavior_config: None,
304 }
305 }
306
307 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
309 let columns = table.column_names();
310 let mut best_match: Option<(String, usize)> = None;
311
312 for col in columns {
313 let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
314 let max_distance = if name.len() > 10 { 3 } else { 2 };
317 if distance <= max_distance {
318 match &best_match {
319 None => best_match = Some((col, distance)),
320 Some((_, best_dist)) if distance < *best_dist => {
321 best_match = Some((col, distance));
322 }
323 _ => {}
324 }
325 }
326 }
327
328 best_match.map(|(name, _)| name)
329 }
330
331 fn edit_distance(&self, s1: &str, s2: &str) -> usize {
333 let len1 = s1.len();
334 let len2 = s2.len();
335 let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
336
337 for i in 0..=len1 {
338 matrix[i][0] = i;
339 }
340 for j in 0..=len2 {
341 matrix[0][j] = j;
342 }
343
344 for (i, c1) in s1.chars().enumerate() {
345 for (j, c2) in s2.chars().enumerate() {
346 let cost = usize::from(c1 != c2);
347 matrix[i + 1][j + 1] = std::cmp::min(
348 matrix[i][j + 1] + 1, std::cmp::min(
350 matrix[i + 1][j] + 1, matrix[i][j] + cost, ),
353 );
354 }
355 }
356
357 matrix[len1][len2]
358 }
359
360 fn contains_unnest(expr: &SqlExpression) -> bool {
362 match expr {
363 SqlExpression::Unnest { .. } => true,
365 SqlExpression::FunctionCall { name, args, .. } => {
366 if name.to_uppercase() == "UNNEST" {
367 return true;
368 }
369 args.iter().any(Self::contains_unnest)
371 }
372 SqlExpression::BinaryOp { left, right, .. } => {
373 Self::contains_unnest(left) || Self::contains_unnest(right)
374 }
375 SqlExpression::Not { expr } => Self::contains_unnest(expr),
376 SqlExpression::CaseExpression {
377 when_branches,
378 else_branch,
379 } => {
380 when_branches.iter().any(|branch| {
381 Self::contains_unnest(&branch.condition)
382 || Self::contains_unnest(&branch.result)
383 }) || else_branch
384 .as_ref()
385 .map_or(false, |e| Self::contains_unnest(e))
386 }
387 SqlExpression::SimpleCaseExpression {
388 expr,
389 when_branches,
390 else_branch,
391 } => {
392 Self::contains_unnest(expr)
393 || when_branches.iter().any(|branch| {
394 Self::contains_unnest(&branch.value)
395 || Self::contains_unnest(&branch.result)
396 })
397 || else_branch
398 .as_ref()
399 .map_or(false, |e| Self::contains_unnest(e))
400 }
401 SqlExpression::InList { expr, values } => {
402 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
403 }
404 SqlExpression::NotInList { expr, values } => {
405 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
406 }
407 SqlExpression::Between { expr, lower, upper } => {
408 Self::contains_unnest(expr)
409 || Self::contains_unnest(lower)
410 || Self::contains_unnest(upper)
411 }
412 SqlExpression::InSubquery { expr, .. } => Self::contains_unnest(expr),
413 SqlExpression::NotInSubquery { expr, .. } => Self::contains_unnest(expr),
414 SqlExpression::ScalarSubquery { .. } => false, SqlExpression::WindowFunction { args, .. } => args.iter().any(Self::contains_unnest),
416 SqlExpression::MethodCall { args, .. } => args.iter().any(Self::contains_unnest),
417 SqlExpression::ChainedMethodCall { base, args, .. } => {
418 Self::contains_unnest(base) || args.iter().any(Self::contains_unnest)
419 }
420 _ => false,
421 }
422 }
423
424 fn collect_window_specs(expr: &SqlExpression, specs: &mut Vec<WindowSpec>) {
426 match expr {
427 SqlExpression::WindowFunction {
428 window_spec, args, ..
429 } => {
430 specs.push(window_spec.clone());
432 for arg in args {
434 Self::collect_window_specs(arg, specs);
435 }
436 }
437 SqlExpression::BinaryOp { left, right, .. } => {
438 Self::collect_window_specs(left, specs);
439 Self::collect_window_specs(right, specs);
440 }
441 SqlExpression::Not { expr } => {
442 Self::collect_window_specs(expr, specs);
443 }
444 SqlExpression::FunctionCall { args, .. } => {
445 for arg in args {
446 Self::collect_window_specs(arg, specs);
447 }
448 }
449 SqlExpression::CaseExpression {
450 when_branches,
451 else_branch,
452 } => {
453 for branch in when_branches {
454 Self::collect_window_specs(&branch.condition, specs);
455 Self::collect_window_specs(&branch.result, specs);
456 }
457 if let Some(else_expr) = else_branch {
458 Self::collect_window_specs(else_expr, specs);
459 }
460 }
461 SqlExpression::SimpleCaseExpression {
462 expr,
463 when_branches,
464 else_branch,
465 } => {
466 Self::collect_window_specs(expr, specs);
467 for branch in when_branches {
468 Self::collect_window_specs(&branch.value, specs);
469 Self::collect_window_specs(&branch.result, specs);
470 }
471 if let Some(else_expr) = else_branch {
472 Self::collect_window_specs(else_expr, specs);
473 }
474 }
475 SqlExpression::InList { expr, values, .. } => {
476 Self::collect_window_specs(expr, specs);
477 for item in values {
478 Self::collect_window_specs(item, specs);
479 }
480 }
481 SqlExpression::ChainedMethodCall { base, args, .. } => {
482 Self::collect_window_specs(base, specs);
483 for arg in args {
484 Self::collect_window_specs(arg, specs);
485 }
486 }
487 SqlExpression::Column(_)
489 | SqlExpression::NumberLiteral(_)
490 | SqlExpression::StringLiteral(_)
491 | SqlExpression::BooleanLiteral(_)
492 | SqlExpression::Null
493 | SqlExpression::DateTimeToday { .. }
494 | SqlExpression::DateTimeConstructor { .. }
495 | SqlExpression::MethodCall { .. } => {}
496 _ => {}
498 }
499 }
500
501 fn contains_window_function(expr: &SqlExpression) -> bool {
503 match expr {
504 SqlExpression::WindowFunction { .. } => true,
505 SqlExpression::BinaryOp { left, right, .. } => {
506 Self::contains_window_function(left) || Self::contains_window_function(right)
507 }
508 SqlExpression::Not { expr } => Self::contains_window_function(expr),
509 SqlExpression::FunctionCall { args, .. } => {
510 args.iter().any(Self::contains_window_function)
511 }
512 SqlExpression::CaseExpression {
513 when_branches,
514 else_branch,
515 } => {
516 when_branches.iter().any(|branch| {
517 Self::contains_window_function(&branch.condition)
518 || Self::contains_window_function(&branch.result)
519 }) || else_branch
520 .as_ref()
521 .map_or(false, |e| Self::contains_window_function(e))
522 }
523 SqlExpression::SimpleCaseExpression {
524 expr,
525 when_branches,
526 else_branch,
527 } => {
528 Self::contains_window_function(expr)
529 || when_branches.iter().any(|branch| {
530 Self::contains_window_function(&branch.value)
531 || Self::contains_window_function(&branch.result)
532 })
533 || else_branch
534 .as_ref()
535 .map_or(false, |e| Self::contains_window_function(e))
536 }
537 SqlExpression::InList { expr, values } => {
538 Self::contains_window_function(expr)
539 || values.iter().any(Self::contains_window_function)
540 }
541 SqlExpression::NotInList { expr, values } => {
542 Self::contains_window_function(expr)
543 || values.iter().any(Self::contains_window_function)
544 }
545 SqlExpression::Between { expr, lower, upper } => {
546 Self::contains_window_function(expr)
547 || Self::contains_window_function(lower)
548 || Self::contains_window_function(upper)
549 }
550 SqlExpression::InSubquery { expr, .. } => Self::contains_window_function(expr),
551 SqlExpression::NotInSubquery { expr, .. } => Self::contains_window_function(expr),
552 SqlExpression::MethodCall { args, .. } => {
553 args.iter().any(Self::contains_window_function)
554 }
555 SqlExpression::ChainedMethodCall { base, args, .. } => {
556 Self::contains_window_function(base)
557 || args.iter().any(Self::contains_window_function)
558 }
559 _ => false,
560 }
561 }
562
563 fn extract_window_specs(
565 items: &[SelectItem],
566 ) -> Vec<crate::data::batch_window_evaluator::WindowFunctionSpec> {
567 let mut specs = Vec::new();
568 for (idx, item) in items.iter().enumerate() {
569 if let SelectItem::Expression { expr, .. } = item {
570 Self::collect_window_function_specs(expr, idx, &mut specs);
571 }
572 }
573 specs
574 }
575
576 fn collect_window_function_specs(
578 expr: &SqlExpression,
579 output_column_index: usize,
580 specs: &mut Vec<crate::data::batch_window_evaluator::WindowFunctionSpec>,
581 ) {
582 match expr {
583 SqlExpression::WindowFunction {
584 name,
585 args,
586 window_spec,
587 } => {
588 specs.push(crate::data::batch_window_evaluator::WindowFunctionSpec {
589 spec: window_spec.clone(),
590 function_name: name.clone(),
591 args: args.clone(),
592 output_column_index,
593 });
594 }
595 SqlExpression::BinaryOp { left, right, .. } => {
596 Self::collect_window_function_specs(left, output_column_index, specs);
597 Self::collect_window_function_specs(right, output_column_index, specs);
598 }
599 SqlExpression::Not { expr } => {
600 Self::collect_window_function_specs(expr, output_column_index, specs);
601 }
602 SqlExpression::FunctionCall { args, .. } => {
603 for arg in args {
604 Self::collect_window_function_specs(arg, output_column_index, specs);
605 }
606 }
607 SqlExpression::CaseExpression {
608 when_branches,
609 else_branch,
610 } => {
611 for branch in when_branches {
612 Self::collect_window_function_specs(
613 &branch.condition,
614 output_column_index,
615 specs,
616 );
617 Self::collect_window_function_specs(&branch.result, output_column_index, specs);
618 }
619 if let Some(e) = else_branch {
620 Self::collect_window_function_specs(e, output_column_index, specs);
621 }
622 }
623 SqlExpression::SimpleCaseExpression {
624 expr,
625 when_branches,
626 else_branch,
627 } => {
628 Self::collect_window_function_specs(expr, output_column_index, specs);
629 for branch in when_branches {
630 Self::collect_window_function_specs(&branch.value, output_column_index, specs);
631 Self::collect_window_function_specs(&branch.result, output_column_index, specs);
632 }
633 if let Some(e) = else_branch {
634 Self::collect_window_function_specs(e, output_column_index, specs);
635 }
636 }
637 SqlExpression::InList { expr, values } => {
638 Self::collect_window_function_specs(expr, output_column_index, specs);
639 for val in values {
640 Self::collect_window_function_specs(val, output_column_index, specs);
641 }
642 }
643 SqlExpression::NotInList { expr, values } => {
644 Self::collect_window_function_specs(expr, output_column_index, specs);
645 for val in values {
646 Self::collect_window_function_specs(val, output_column_index, specs);
647 }
648 }
649 SqlExpression::Between { expr, lower, upper } => {
650 Self::collect_window_function_specs(expr, output_column_index, specs);
651 Self::collect_window_function_specs(lower, output_column_index, specs);
652 Self::collect_window_function_specs(upper, output_column_index, specs);
653 }
654 SqlExpression::InSubquery { expr, .. } => {
655 Self::collect_window_function_specs(expr, output_column_index, specs);
656 }
657 SqlExpression::NotInSubquery { expr, .. } => {
658 Self::collect_window_function_specs(expr, output_column_index, specs);
659 }
660 SqlExpression::MethodCall { args, .. } => {
661 for arg in args {
662 Self::collect_window_function_specs(arg, output_column_index, specs);
663 }
664 }
665 SqlExpression::ChainedMethodCall { base, args, .. } => {
666 Self::collect_window_function_specs(base, output_column_index, specs);
667 for arg in args {
668 Self::collect_window_function_specs(arg, output_column_index, specs);
669 }
670 }
671 _ => {} }
673 }
674
675 pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
677 let (view, _plan) = self.execute_with_plan(table, sql)?;
678 Ok(view)
679 }
680
681 pub fn execute_with_temp_tables(
683 &self,
684 table: Arc<DataTable>,
685 sql: &str,
686 temp_tables: Option<&TempTableRegistry>,
687 ) -> Result<DataView> {
688 let (view, _plan) = self.execute_with_plan_and_temp_tables(table, sql, temp_tables)?;
689 Ok(view)
690 }
691
692 pub fn execute_statement(
694 &self,
695 table: Arc<DataTable>,
696 statement: SelectStatement,
697 ) -> Result<DataView> {
698 self.execute_statement_with_temp_tables(table, statement, None)
699 }
700
701 pub fn execute_statement_with_temp_tables(
703 &self,
704 table: Arc<DataTable>,
705 statement: SelectStatement,
706 temp_tables: Option<&TempTableRegistry>,
707 ) -> Result<DataView> {
708 let mut cte_context = HashMap::new();
710
711 if let Some(temp_registry) = temp_tables {
713 for table_name in temp_registry.list_tables() {
714 if let Some(temp_table) = temp_registry.get(&table_name) {
715 debug!("Adding temp table {} to CTE context", table_name);
716 let view = DataView::new(temp_table);
717 cte_context.insert(table_name, Arc::new(view));
718 }
719 }
720 }
721
722 for cte in &statement.ctes {
723 debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
724 let cte_result = match &cte.cte_type {
726 CTEType::Standard(query) => {
727 let view = self.build_view_with_context(
729 table.clone(),
730 query.clone(),
731 &mut cte_context,
732 )?;
733
734 let mut materialized = self.materialize_view(view)?;
736
737 for column in materialized.columns_mut() {
739 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
740 column.source_table = Some(cte.name.clone());
741 }
742
743 DataView::new(Arc::new(materialized))
744 }
745 CTEType::Web(web_spec) => {
746 use crate::web::http_fetcher::WebDataFetcher;
748
749 let fetcher = WebDataFetcher::new()?;
750 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
752
753 for column in data_table.columns_mut() {
755 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
756 column.source_table = Some(cte.name.clone());
757 }
758
759 DataView::new(Arc::new(data_table))
761 }
762 CTEType::File(file_spec) => {
763 let mut data_table =
764 crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
765
766 for column in data_table.columns_mut() {
767 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
768 column.source_table = Some(cte.name.clone());
769 }
770
771 DataView::new(Arc::new(data_table))
772 }
773 };
774 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
776 debug!(
777 "QueryEngine: CTE '{}' pre-processed, stored in context",
778 cte.name
779 );
780 }
781
782 let mut subquery_executor =
784 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
785 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
786
787 self.build_view_with_context(table, processed_statement, &mut cte_context)
789 }
790
791 pub fn execute_statement_with_cte_context(
793 &self,
794 table: Arc<DataTable>,
795 statement: SelectStatement,
796 cte_context: &HashMap<String, Arc<DataView>>,
797 ) -> Result<DataView> {
798 let mut local_context = cte_context.clone();
800
801 for cte in &statement.ctes {
803 debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
804 let cte_result = match &cte.cte_type {
805 CTEType::Standard(query) => {
806 let view = self.build_view_with_context(
807 table.clone(),
808 query.clone(),
809 &mut local_context,
810 )?;
811
812 let mut materialized = self.materialize_view(view)?;
814
815 for column in materialized.columns_mut() {
817 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
818 column.source_table = Some(cte.name.clone());
819 }
820
821 DataView::new(Arc::new(materialized))
822 }
823 CTEType::Web(web_spec) => {
824 use crate::web::http_fetcher::WebDataFetcher;
826
827 let fetcher = WebDataFetcher::new()?;
828 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
830
831 for column in data_table.columns_mut() {
833 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
834 column.source_table = Some(cte.name.clone());
835 }
836
837 DataView::new(Arc::new(data_table))
839 }
840 CTEType::File(file_spec) => {
841 let mut data_table =
842 crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
843
844 for column in data_table.columns_mut() {
845 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
846 column.source_table = Some(cte.name.clone());
847 }
848
849 DataView::new(Arc::new(data_table))
850 }
851 };
852 local_context.insert(cte.name.clone(), Arc::new(cte_result));
853 }
854
855 let mut subquery_executor =
857 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
858 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
859
860 self.build_view_with_context(table, processed_statement, &mut local_context)
862 }
863
864 pub fn execute_with_plan(
866 &self,
867 table: Arc<DataTable>,
868 sql: &str,
869 ) -> Result<(DataView, ExecutionPlan)> {
870 self.execute_with_plan_and_temp_tables(table, sql, None)
871 }
872
873 pub fn execute_with_plan_and_temp_tables(
875 &self,
876 table: Arc<DataTable>,
877 sql: &str,
878 temp_tables: Option<&TempTableRegistry>,
879 ) -> Result<(DataView, ExecutionPlan)> {
880 let mut plan_builder = ExecutionPlanBuilder::new();
881 let start_time = Instant::now();
882
883 plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
885 plan_builder.add_detail(format!("Query: {}", sql));
886 let mut parser = Parser::new(sql);
887 let statement = parser
888 .parse()
889 .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
890 plan_builder.add_detail(format!("Parsed successfully"));
891 if let Some(ref from_source) = statement.from_source {
892 match from_source {
893 TableSource::Table(name) => {
894 plan_builder.add_detail(format!("FROM: {}", name));
895 }
896 TableSource::DerivedTable { alias, .. } => {
897 plan_builder.add_detail(format!("FROM: derived table (alias: {})", alias));
898 }
899 TableSource::Pivot { .. } => {
900 plan_builder.add_detail("FROM: PIVOT".to_string());
901 }
902 }
903 }
904 if statement.where_clause.is_some() {
905 plan_builder.add_detail("WHERE clause present".to_string());
906 }
907 plan_builder.end_step();
908
909 let mut cte_context = HashMap::new();
911
912 if let Some(temp_registry) = temp_tables {
914 for table_name in temp_registry.list_tables() {
915 if let Some(temp_table) = temp_registry.get(&table_name) {
916 debug!("Adding temp table {} to CTE context", table_name);
917 let view = DataView::new(temp_table);
918 cte_context.insert(table_name, Arc::new(view));
919 }
920 }
921 }
922
923 if !statement.ctes.is_empty() {
924 plan_builder.begin_step(
925 StepType::CTE,
926 format!("Process {} CTEs", statement.ctes.len()),
927 );
928
929 for cte in &statement.ctes {
930 let cte_start = Instant::now();
931 plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
932
933 let cte_result = match &cte.cte_type {
934 CTEType::Standard(query) => {
935 if let Some(ref from_source) = query.from_source {
937 match from_source {
938 TableSource::Table(name) => {
939 plan_builder.add_detail(format!("Source: {}", name));
940 }
941 TableSource::DerivedTable { alias, .. } => {
942 plan_builder
943 .add_detail(format!("Source: derived table ({})", alias));
944 }
945 TableSource::Pivot { .. } => {
946 plan_builder.add_detail("Source: PIVOT".to_string());
947 }
948 }
949 }
950 if query.where_clause.is_some() {
951 plan_builder.add_detail("Has WHERE clause".to_string());
952 }
953 if query.group_by.is_some() {
954 plan_builder.add_detail("Has GROUP BY".to_string());
955 }
956
957 debug!(
958 "QueryEngine: Processing CTE '{}' with existing context: {:?}",
959 cte.name,
960 cte_context.keys().collect::<Vec<_>>()
961 );
962
963 let mut subquery_executor = SubqueryExecutor::with_cte_context(
966 self.clone(),
967 table.clone(),
968 cte_context.clone(),
969 );
970 let processed_query = subquery_executor.execute_subqueries(query)?;
971
972 let view = self.build_view_with_context(
973 table.clone(),
974 processed_query,
975 &mut cte_context,
976 )?;
977
978 let mut materialized = self.materialize_view(view)?;
980
981 for column in materialized.columns_mut() {
983 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
984 column.source_table = Some(cte.name.clone());
985 }
986
987 DataView::new(Arc::new(materialized))
988 }
989 CTEType::Web(web_spec) => {
990 plan_builder.add_detail(format!("URL: {}", web_spec.url));
991 if let Some(format) = &web_spec.format {
992 plan_builder.add_detail(format!("Format: {:?}", format));
993 }
994 if let Some(cache) = web_spec.cache_seconds {
995 plan_builder.add_detail(format!("Cache: {} seconds", cache));
996 }
997
998 use crate::web::http_fetcher::WebDataFetcher;
1000
1001 let fetcher = WebDataFetcher::new()?;
1002 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
1004
1005 for column in data_table.columns_mut() {
1007 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1008 column.source_table = Some(cte.name.clone());
1009 }
1010
1011 DataView::new(Arc::new(data_table))
1013 }
1014 CTEType::File(file_spec) => {
1015 plan_builder.add_detail(format!("PATH: {}", file_spec.path));
1016 if file_spec.recursive {
1017 plan_builder.add_detail("RECURSIVE".to_string());
1018 }
1019 if let Some(ref g) = file_spec.glob {
1020 plan_builder.add_detail(format!("GLOB: {}", g));
1021 }
1022 if let Some(d) = file_spec.max_depth {
1023 plan_builder.add_detail(format!("MAX_DEPTH: {}", d));
1024 }
1025
1026 let mut data_table =
1027 crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
1028
1029 for column in data_table.columns_mut() {
1030 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1031 column.source_table = Some(cte.name.clone());
1032 }
1033
1034 DataView::new(Arc::new(data_table))
1035 }
1036 };
1037
1038 plan_builder.set_rows_out(cte_result.row_count());
1040 plan_builder.add_detail(format!(
1041 "Result: {} rows, {} columns",
1042 cte_result.row_count(),
1043 cte_result.column_count()
1044 ));
1045 plan_builder.add_detail(format!(
1046 "Execution time: {:.3}ms",
1047 cte_start.elapsed().as_secs_f64() * 1000.0
1048 ));
1049
1050 debug!(
1051 "QueryEngine: Storing CTE '{}' in context with {} rows",
1052 cte.name,
1053 cte_result.row_count()
1054 );
1055 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1056 plan_builder.end_step();
1057 }
1058
1059 plan_builder.add_detail(format!(
1060 "All {} CTEs cached in context",
1061 statement.ctes.len()
1062 ));
1063 plan_builder.end_step();
1064 }
1065
1066 plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
1068 let mut subquery_executor =
1069 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
1070
1071 let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
1073 format!("{:?}", w).contains("Subquery")
1075 });
1076
1077 if has_subqueries {
1078 plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
1079 }
1080
1081 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
1082
1083 if has_subqueries {
1084 plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
1085 } else {
1086 plan_builder.add_detail("No subqueries to process".to_string());
1087 }
1088
1089 plan_builder.end_step();
1090 let result = self.build_view_with_context_and_plan(
1091 table,
1092 processed_statement,
1093 &mut cte_context,
1094 &mut plan_builder,
1095 )?;
1096
1097 let total_duration = start_time.elapsed();
1098 info!(
1099 "Query execution complete: total={:?}, rows={}",
1100 total_duration,
1101 result.row_count()
1102 );
1103
1104 let plan = plan_builder.build();
1105 Ok((result, plan))
1106 }
1107
1108 fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
1110 let mut cte_context = HashMap::new();
1111 self.build_view_with_context(table, statement, &mut cte_context)
1112 }
1113
1114 fn build_view_with_context(
1116 &self,
1117 table: Arc<DataTable>,
1118 statement: SelectStatement,
1119 cte_context: &mut HashMap<String, Arc<DataView>>,
1120 ) -> Result<DataView> {
1121 let mut dummy_plan = ExecutionPlanBuilder::new();
1122 let mut exec_context = ExecutionContext::new();
1123 self.build_view_with_context_and_plan_and_exec(
1124 table,
1125 statement,
1126 cte_context,
1127 &mut dummy_plan,
1128 &mut exec_context,
1129 )
1130 }
1131
1132 fn build_view_with_context_and_plan(
1134 &self,
1135 table: Arc<DataTable>,
1136 statement: SelectStatement,
1137 cte_context: &mut HashMap<String, Arc<DataView>>,
1138 plan: &mut ExecutionPlanBuilder,
1139 ) -> Result<DataView> {
1140 let mut exec_context = ExecutionContext::new();
1141 self.build_view_with_context_and_plan_and_exec(
1142 table,
1143 statement,
1144 cte_context,
1145 plan,
1146 &mut exec_context,
1147 )
1148 }
1149
1150 fn build_view_with_context_and_plan_and_exec(
1152 &self,
1153 table: Arc<DataTable>,
1154 statement: SelectStatement,
1155 cte_context: &mut HashMap<String, Arc<DataView>>,
1156 plan: &mut ExecutionPlanBuilder,
1157 exec_context: &mut ExecutionContext,
1158 ) -> Result<DataView> {
1159 for cte in &statement.ctes {
1161 if cte_context.contains_key(&cte.name) {
1163 debug!(
1164 "QueryEngine: CTE '{}' already in context, skipping",
1165 cte.name
1166 );
1167 continue;
1168 }
1169
1170 debug!("QueryEngine: Processing CTE '{}'...", cte.name);
1171 debug!(
1172 "QueryEngine: Available CTEs for '{}': {:?}",
1173 cte.name,
1174 cte_context.keys().collect::<Vec<_>>()
1175 );
1176
1177 let cte_result = match &cte.cte_type {
1179 CTEType::Standard(query) => {
1180 let view =
1181 self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
1182
1183 let mut materialized = self.materialize_view(view)?;
1185
1186 for column in materialized.columns_mut() {
1188 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1189 column.source_table = Some(cte.name.clone());
1190 }
1191
1192 DataView::new(Arc::new(materialized))
1193 }
1194 CTEType::Web(_web_spec) => {
1195 return Err(anyhow!(
1197 "Web CTEs should be processed in execute_select method"
1198 ));
1199 }
1200 CTEType::File(_file_spec) => {
1201 return Err(anyhow!(
1203 "FILE CTEs should be processed in execute_select method"
1204 ));
1205 }
1206 };
1207
1208 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1210 debug!(
1211 "QueryEngine: CTE '{}' processed, stored in context",
1212 cte.name
1213 );
1214 }
1215
1216 let source_table = if let Some(ref from_source) = statement.from_source {
1218 match from_source {
1219 TableSource::Table(table_name) => {
1220 if let Some(cte_view) = resolve_cte(cte_context, table_name) {
1222 debug!("QueryEngine: Using CTE '{}' as source table", table_name);
1223 let mut materialized = self.materialize_view((**cte_view).clone())?;
1225
1226 #[allow(deprecated)]
1228 if let Some(ref alias) = statement.from_alias {
1229 debug!(
1230 "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1231 alias, table_name
1232 );
1233 for column in materialized.columns_mut() {
1234 if let Some(ref qualified_name) = column.qualified_name {
1236 if qualified_name.starts_with(&format!("{}.", table_name)) {
1237 column.qualified_name = Some(qualified_name.replace(
1238 &format!("{}.", table_name),
1239 &format!("{}.", alias),
1240 ));
1241 }
1242 }
1243 if column.source_table.as_ref() == Some(table_name) {
1245 column.source_table = Some(alias.clone());
1246 }
1247 }
1248 }
1249
1250 Arc::new(materialized)
1251 } else {
1252 table.clone()
1254 }
1255 }
1256 TableSource::DerivedTable { query, alias } => {
1257 debug!(
1259 "QueryEngine: Processing FROM derived table (alias: {})",
1260 alias
1261 );
1262 let subquery_result =
1263 self.build_view_with_context(table.clone(), *query.clone(), cte_context)?;
1264
1265 let mut materialized = self.materialize_view(subquery_result)?;
1268
1269 for column in materialized.columns_mut() {
1273 column.source_table = Some(alias.clone());
1274 }
1275
1276 Arc::new(materialized)
1277 }
1278 TableSource::Pivot { .. } => {
1279 return Err(anyhow!(
1281 "PIVOT in FROM clause should have been expanded by preprocessing pipeline"
1282 ));
1283 }
1284 }
1285 } else {
1286 #[allow(deprecated)]
1288 if let Some(ref table_func) = statement.from_function {
1289 debug!("QueryEngine: Processing table function (deprecated field)...");
1291 match table_func {
1292 TableFunction::Generator { name, args } => {
1293 use crate::sql::generators::GeneratorRegistry;
1295
1296 let registry = GeneratorRegistry::new();
1298
1299 if let Some(generator) = registry.get(name) {
1300 let mut evaluator = ArithmeticEvaluator::with_date_notation(
1302 &table,
1303 self.date_notation.clone(),
1304 );
1305 let dummy_row = 0;
1306
1307 let mut evaluated_args = Vec::new();
1308 for arg in args {
1309 evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
1310 }
1311
1312 generator.generate(evaluated_args)?
1314 } else {
1315 return Err(anyhow!("Unknown generator function: {}", name));
1316 }
1317 }
1318 }
1319 } else {
1320 #[allow(deprecated)]
1321 if let Some(ref subquery) = statement.from_subquery {
1322 debug!("QueryEngine: Processing FROM subquery (deprecated field)...");
1324 let subquery_result = self.build_view_with_context(
1325 table.clone(),
1326 *subquery.clone(),
1327 cte_context,
1328 )?;
1329
1330 let materialized = self.materialize_view(subquery_result)?;
1333 Arc::new(materialized)
1334 } else {
1335 #[allow(deprecated)]
1336 if let Some(ref table_name) = statement.from_table {
1337 if let Some(cte_view) = resolve_cte(cte_context, table_name) {
1339 debug!(
1340 "QueryEngine: Using CTE '{}' as source table (deprecated field)",
1341 table_name
1342 );
1343 let mut materialized = self.materialize_view((**cte_view).clone())?;
1345
1346 #[allow(deprecated)]
1348 if let Some(ref alias) = statement.from_alias {
1349 debug!(
1350 "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1351 alias, table_name
1352 );
1353 for column in materialized.columns_mut() {
1354 if let Some(ref qualified_name) = column.qualified_name {
1356 if qualified_name.starts_with(&format!("{}.", table_name)) {
1357 column.qualified_name = Some(qualified_name.replace(
1358 &format!("{}.", table_name),
1359 &format!("{}.", alias),
1360 ));
1361 }
1362 }
1363 if column.source_table.as_ref() == Some(table_name) {
1365 column.source_table = Some(alias.clone());
1366 }
1367 }
1368 }
1369
1370 Arc::new(materialized)
1371 } else {
1372 table.clone()
1374 }
1375 } else {
1376 table.clone()
1378 }
1379 }
1380 }
1381 };
1382
1383 #[allow(deprecated)]
1385 if let Some(ref alias) = statement.from_alias {
1386 #[allow(deprecated)]
1387 if let Some(ref table_name) = statement.from_table {
1388 exec_context.register_alias(alias.clone(), table_name.clone());
1389 }
1390 }
1391
1392 let final_table = if !statement.joins.is_empty() {
1394 plan.begin_step(
1395 StepType::Join,
1396 format!("Process {} JOINs", statement.joins.len()),
1397 );
1398 plan.set_rows_in(source_table.row_count());
1399
1400 let join_executor = HashJoinExecutor::new(self.case_insensitive);
1401 let mut current_table = source_table;
1402
1403 for (idx, join_clause) in statement.joins.iter().enumerate() {
1404 let join_start = Instant::now();
1405 plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
1406 plan.add_detail(format!("Type: {:?}", join_clause.join_type));
1407 plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
1408 plan.add_detail(format!(
1409 "Executing {:?} JOIN on {} condition(s)",
1410 join_clause.join_type,
1411 join_clause.condition.conditions.len()
1412 ));
1413
1414 let right_table = match &join_clause.table {
1416 TableSource::Table(name) => {
1417 if let Some(cte_view) = resolve_cte(cte_context, name) {
1419 let mut materialized = self.materialize_view((**cte_view).clone())?;
1420
1421 if let Some(ref alias) = join_clause.alias {
1423 debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
1424 for column in materialized.columns_mut() {
1425 if let Some(ref qualified_name) = column.qualified_name {
1427 if qualified_name.starts_with(&format!("{}.", name)) {
1428 column.qualified_name = Some(qualified_name.replace(
1429 &format!("{}.", name),
1430 &format!("{}.", alias),
1431 ));
1432 }
1433 }
1434 if column.source_table.as_ref() == Some(name) {
1436 column.source_table = Some(alias.clone());
1437 }
1438 }
1439 }
1440
1441 Arc::new(materialized)
1442 } else {
1443 return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
1446 }
1447 }
1448 TableSource::DerivedTable { query, alias: _ } => {
1449 let subquery_result = self.build_view_with_context(
1451 table.clone(),
1452 *query.clone(),
1453 cte_context,
1454 )?;
1455 let materialized = self.materialize_view(subquery_result)?;
1456 Arc::new(materialized)
1457 }
1458 TableSource::Pivot { .. } => {
1459 return Err(anyhow!("PIVOT in JOIN clause is not yet supported"));
1461 }
1462 };
1463
1464 let joined = join_executor.execute_join(
1466 current_table.clone(),
1467 join_clause,
1468 right_table.clone(),
1469 )?;
1470
1471 plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
1472 plan.set_rows_out(joined.row_count());
1473 plan.add_detail(format!("Result: {} rows", joined.row_count()));
1474 plan.add_detail(format!(
1475 "Join time: {:.3}ms",
1476 join_start.elapsed().as_secs_f64() * 1000.0
1477 ));
1478 plan.end_step();
1479
1480 current_table = Arc::new(joined);
1481 }
1482
1483 plan.set_rows_out(current_table.row_count());
1484 plan.add_detail(format!(
1485 "Final result after all joins: {} rows",
1486 current_table.row_count()
1487 ));
1488 plan.end_step();
1489 current_table
1490 } else {
1491 source_table
1492 };
1493
1494 self.build_view_internal_with_plan_and_exec(
1496 final_table,
1497 statement,
1498 plan,
1499 Some(exec_context),
1500 )
1501 }
1502
1503 pub fn materialize_view(&self, view: DataView) -> Result<DataTable> {
1505 let source = view.source();
1506 let mut result_table = DataTable::new("derived");
1507
1508 let visible_cols = view.visible_column_indices().to_vec();
1510
1511 for col_idx in &visible_cols {
1513 let col = &source.columns[*col_idx];
1514 let new_col = DataColumn {
1515 name: col.name.clone(),
1516 data_type: col.data_type.clone(),
1517 nullable: col.nullable,
1518 unique_values: col.unique_values,
1519 null_count: col.null_count,
1520 metadata: col.metadata.clone(),
1521 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), };
1524 result_table.add_column(new_col);
1525 }
1526
1527 for row_idx in view.visible_row_indices() {
1529 let source_row = &source.rows[*row_idx];
1530 let mut new_row = DataRow { values: Vec::new() };
1531
1532 for col_idx in &visible_cols {
1533 new_row.values.push(source_row.values[*col_idx].clone());
1534 }
1535
1536 result_table.add_row(new_row);
1537 }
1538
1539 Ok(result_table)
1540 }
1541
1542 fn build_view_internal(
1543 &self,
1544 table: Arc<DataTable>,
1545 statement: SelectStatement,
1546 ) -> Result<DataView> {
1547 let mut dummy_plan = ExecutionPlanBuilder::new();
1548 self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
1549 }
1550
1551 fn build_view_internal_with_plan(
1552 &self,
1553 table: Arc<DataTable>,
1554 statement: SelectStatement,
1555 plan: &mut ExecutionPlanBuilder,
1556 ) -> Result<DataView> {
1557 self.build_view_internal_with_plan_and_exec(table, statement, plan, None)
1558 }
1559
1560 fn build_view_internal_with_plan_and_exec(
1561 &self,
1562 table: Arc<DataTable>,
1563 statement: SelectStatement,
1564 plan: &mut ExecutionPlanBuilder,
1565 exec_context: Option<&ExecutionContext>,
1566 ) -> Result<DataView> {
1567 debug!(
1568 "QueryEngine::build_view - select_items: {:?}",
1569 statement.select_items
1570 );
1571 debug!(
1572 "QueryEngine::build_view - where_clause: {:?}",
1573 statement.where_clause
1574 );
1575
1576 let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
1578
1579 if let Some(where_clause) = &statement.where_clause {
1581 let total_rows = table.row_count();
1582 debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
1583 debug!("QueryEngine: WHERE clause = {:?}", where_clause);
1584
1585 plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
1586 plan.set_rows_in(total_rows);
1587 plan.add_detail(format!("Input: {} rows", total_rows));
1588
1589 for condition in &where_clause.conditions {
1591 plan.add_detail(format!("Condition: {:?}", condition.expr));
1592 }
1593
1594 let filter_start = Instant::now();
1595 let mut eval_context = EvaluationContext::new(self.case_insensitive);
1597
1598 let mut evaluator = if let Some(exec_ctx) = exec_context {
1600 RecursiveWhereEvaluator::with_both_contexts(&table, &mut eval_context, exec_ctx)
1602 } else {
1603 RecursiveWhereEvaluator::with_context(&table, &mut eval_context)
1604 };
1605
1606 let mut filtered_rows = Vec::new();
1608 for row_idx in visible_rows {
1609 if row_idx < 3 {
1611 debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
1612 }
1613
1614 match evaluator.evaluate(where_clause, row_idx) {
1615 Ok(result) => {
1616 if row_idx < 3 {
1617 debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
1618 }
1619 if result {
1620 filtered_rows.push(row_idx);
1621 }
1622 }
1623 Err(e) => {
1624 if row_idx < 3 {
1625 debug!(
1626 "QueryEngine: WHERE evaluation error for row {}: {}",
1627 row_idx, e
1628 );
1629 }
1630 return Err(e);
1632 }
1633 }
1634 }
1635
1636 let (compilations, cache_hits) = eval_context.get_stats();
1638 if compilations > 0 || cache_hits > 0 {
1639 debug!(
1640 "LIKE pattern cache: {} compilations, {} cache hits",
1641 compilations, cache_hits
1642 );
1643 }
1644 visible_rows = filtered_rows;
1645 let filter_duration = filter_start.elapsed();
1646 info!(
1647 "WHERE clause filtering: {} rows -> {} rows in {:?}",
1648 total_rows,
1649 visible_rows.len(),
1650 filter_duration
1651 );
1652
1653 plan.set_rows_out(visible_rows.len());
1654 plan.add_detail(format!("Output: {} rows", visible_rows.len()));
1655 plan.add_detail(format!(
1656 "Filter time: {:.3}ms",
1657 filter_duration.as_secs_f64() * 1000.0
1658 ));
1659 plan.end_step();
1660 }
1661
1662 let mut view = DataView::new(table.clone());
1664 view = view.with_rows(visible_rows);
1665
1666 if let Some(group_by_exprs) = &statement.group_by {
1668 if !group_by_exprs.is_empty() {
1669 debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
1670
1671 plan.begin_step(
1672 StepType::GroupBy,
1673 format!("GROUP BY {} expressions", group_by_exprs.len()),
1674 );
1675 plan.set_rows_in(view.row_count());
1676 plan.add_detail(format!("Input: {} rows", view.row_count()));
1677 for expr in group_by_exprs {
1678 plan.add_detail(format!("Group by: {:?}", expr));
1679 }
1680
1681 let group_start = Instant::now();
1682 view = self.apply_group_by(
1683 view,
1684 group_by_exprs,
1685 &statement.select_items,
1686 statement.having.as_ref(),
1687 plan,
1688 )?;
1689
1690 use crate::query_plan::having_alias_transformer::HIDDEN_AGG_PREFIX;
1693 let hidden_indices: Vec<usize> = view
1694 .source()
1695 .columns
1696 .iter()
1697 .enumerate()
1698 .filter_map(|(i, c)| {
1699 if c.name.starts_with(HIDDEN_AGG_PREFIX) {
1700 Some(i)
1701 } else {
1702 None
1703 }
1704 })
1705 .collect();
1706 for &idx in hidden_indices.iter().rev() {
1707 view.hide_column(idx);
1708 }
1709
1710 plan.set_rows_out(view.row_count());
1711 plan.add_detail(format!("Output: {} groups", view.row_count()));
1712 plan.add_detail(format!(
1713 "Overall time: {:.3}ms",
1714 group_start.elapsed().as_secs_f64() * 1000.0
1715 ));
1716 plan.end_step();
1717 }
1718 } else {
1719 if !statement.select_items.is_empty() {
1721 let has_non_star_items = statement
1723 .select_items
1724 .iter()
1725 .any(|item| !matches!(item, SelectItem::Star { .. }));
1726
1727 if has_non_star_items || statement.select_items.len() > 1 {
1731 view = self.apply_select_items(
1732 view,
1733 &statement.select_items,
1734 &statement,
1735 exec_context,
1736 plan,
1737 )?;
1738 }
1739 } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
1741 debug!("QueryEngine: Using legacy columns path");
1742 let source_table = view.source();
1745 let column_indices =
1746 self.resolve_column_indices(source_table, &statement.columns)?;
1747 view = view.with_columns(column_indices);
1748 }
1749 }
1750
1751 if statement.distinct {
1753 plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
1754 plan.set_rows_in(view.row_count());
1755 plan.add_detail(format!("Input: {} rows", view.row_count()));
1756
1757 let distinct_start = Instant::now();
1758 view = self.apply_distinct(view)?;
1759
1760 plan.set_rows_out(view.row_count());
1761 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1762 plan.add_detail(format!(
1763 "Distinct time: {:.3}ms",
1764 distinct_start.elapsed().as_secs_f64() * 1000.0
1765 ));
1766 plan.end_step();
1767 }
1768
1769 if let Some(order_by_columns) = &statement.order_by {
1771 if !order_by_columns.is_empty() {
1772 plan.begin_step(
1773 StepType::Sort,
1774 format!("ORDER BY {} columns", order_by_columns.len()),
1775 );
1776 plan.set_rows_in(view.row_count());
1777 for col in order_by_columns {
1778 let expr_str = match &col.expr {
1780 SqlExpression::Column(col_ref) => col_ref.name.clone(),
1781 _ => "expr".to_string(),
1782 };
1783 plan.add_detail(format!("{} {:?}", expr_str, col.direction));
1784 }
1785
1786 let sort_start = Instant::now();
1787 view =
1788 self.apply_multi_order_by_with_context(view, order_by_columns, exec_context)?;
1789
1790 plan.add_detail(format!(
1791 "Sort time: {:.3}ms",
1792 sort_start.elapsed().as_secs_f64() * 1000.0
1793 ));
1794 plan.end_step();
1795 }
1796 }
1797
1798 {
1803 use crate::query_plan::order_by_alias_transformer::HIDDEN_ORDERBY_PREFIX;
1804 let hidden_indices: Vec<usize> = view
1805 .source()
1806 .columns
1807 .iter()
1808 .enumerate()
1809 .filter_map(|(i, c)| {
1810 if c.name.starts_with(HIDDEN_ORDERBY_PREFIX) {
1811 Some(i)
1812 } else {
1813 None
1814 }
1815 })
1816 .collect();
1817 for &idx in hidden_indices.iter().rev() {
1818 view.hide_column(idx);
1819 }
1820 }
1821
1822 if let Some(limit) = statement.limit {
1824 let offset = statement.offset.unwrap_or(0);
1825 plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1826 plan.set_rows_in(view.row_count());
1827 if offset > 0 {
1828 plan.add_detail(format!("OFFSET: {}", offset));
1829 }
1830 view = view.with_limit(limit, offset);
1831 plan.set_rows_out(view.row_count());
1832 plan.add_detail(format!("Output: {} rows", view.row_count()));
1833 plan.end_step();
1834 }
1835
1836 if !statement.set_operations.is_empty() {
1838 plan.begin_step(
1839 StepType::SetOperation,
1840 format!("Process {} set operations", statement.set_operations.len()),
1841 );
1842 plan.set_rows_in(view.row_count());
1843
1844 let mut combined_table = self.materialize_view(view)?;
1846 let first_columns = combined_table.column_names();
1847 let first_column_count = first_columns.len();
1848
1849 let mut needs_deduplication = false;
1851
1852 for (idx, (operation, next_statement)) in statement.set_operations.iter().enumerate() {
1854 let op_start = Instant::now();
1855 plan.begin_step(
1856 StepType::SetOperation,
1857 format!("{:?} operation #{}", operation, idx + 1),
1858 );
1859
1860 let next_view = if let Some(exec_ctx) = exec_context {
1863 self.build_view_internal_with_plan_and_exec(
1864 table.clone(),
1865 *next_statement.clone(),
1866 plan,
1867 Some(exec_ctx),
1868 )?
1869 } else {
1870 self.build_view_internal_with_plan(
1871 table.clone(),
1872 *next_statement.clone(),
1873 plan,
1874 )?
1875 };
1876
1877 let next_table = self.materialize_view(next_view)?;
1879 let next_columns = next_table.column_names();
1880 let next_column_count = next_columns.len();
1881
1882 if first_column_count != next_column_count {
1884 return Err(anyhow!(
1885 "UNION queries must have the same number of columns: first query has {} columns, but query #{} has {} columns",
1886 first_column_count,
1887 idx + 2,
1888 next_column_count
1889 ));
1890 }
1891
1892 for (col_idx, (first_col, next_col)) in
1894 first_columns.iter().zip(next_columns.iter()).enumerate()
1895 {
1896 if !first_col.eq_ignore_ascii_case(next_col) {
1897 debug!(
1898 "UNION column name mismatch at position {}: '{}' vs '{}' (using first query's name)",
1899 col_idx + 1,
1900 first_col,
1901 next_col
1902 );
1903 }
1904 }
1905
1906 plan.add_detail(format!("Left: {} rows", combined_table.row_count()));
1907 plan.add_detail(format!("Right: {} rows", next_table.row_count()));
1908
1909 match operation {
1911 SetOperation::UnionAll => {
1912 for row in next_table.rows.iter() {
1914 combined_table.add_row(row.clone());
1915 }
1916 plan.add_detail(format!(
1917 "Result: {} rows (no deduplication)",
1918 combined_table.row_count()
1919 ));
1920 }
1921 SetOperation::Union => {
1922 for row in next_table.rows.iter() {
1924 combined_table.add_row(row.clone());
1925 }
1926 needs_deduplication = true;
1927 plan.add_detail(format!(
1928 "Combined: {} rows (deduplication pending)",
1929 combined_table.row_count()
1930 ));
1931 }
1932 SetOperation::Intersect => {
1933 return Err(anyhow!("INTERSECT is not yet implemented"));
1936 }
1937 SetOperation::Except => {
1938 return Err(anyhow!("EXCEPT is not yet implemented"));
1941 }
1942 }
1943
1944 plan.add_detail(format!(
1945 "Operation time: {:.3}ms",
1946 op_start.elapsed().as_secs_f64() * 1000.0
1947 ));
1948 plan.set_rows_out(combined_table.row_count());
1949 plan.end_step();
1950 }
1951
1952 plan.set_rows_out(combined_table.row_count());
1953 plan.add_detail(format!(
1954 "Combined result: {} rows after {} operations",
1955 combined_table.row_count(),
1956 statement.set_operations.len()
1957 ));
1958 plan.end_step();
1959
1960 view = DataView::new(Arc::new(combined_table));
1962
1963 if needs_deduplication {
1965 plan.begin_step(
1966 StepType::Distinct,
1967 "UNION deduplication - remove duplicate rows".to_string(),
1968 );
1969 plan.set_rows_in(view.row_count());
1970 plan.add_detail(format!("Input: {} rows", view.row_count()));
1971
1972 let distinct_start = Instant::now();
1973 view = self.apply_distinct(view)?;
1974
1975 plan.set_rows_out(view.row_count());
1976 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1977 plan.add_detail(format!(
1978 "Deduplication time: {:.3}ms",
1979 distinct_start.elapsed().as_secs_f64() * 1000.0
1980 ));
1981 plan.end_step();
1982 }
1983 }
1984
1985 Ok(view)
1986 }
1987
1988 fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1990 let mut indices = Vec::new();
1991 let table_columns = table.column_names();
1992
1993 for col_name in columns {
1994 let index = table_columns
1995 .iter()
1996 .position(|c| c.eq_ignore_ascii_case(col_name))
1997 .ok_or_else(|| {
1998 let suggestion = self.find_similar_column(table, col_name);
1999 match suggestion {
2000 Some(similar) => anyhow::anyhow!(
2001 "Column '{}' not found. Did you mean '{}'?",
2002 col_name,
2003 similar
2004 ),
2005 None => anyhow::anyhow!("Column '{}' not found", col_name),
2006 }
2007 })?;
2008 indices.push(index);
2009 }
2010
2011 Ok(indices)
2012 }
2013
2014 fn apply_select_items(
2016 &self,
2017 view: DataView,
2018 select_items: &[SelectItem],
2019 _statement: &SelectStatement,
2020 exec_context: Option<&ExecutionContext>,
2021 plan: &mut ExecutionPlanBuilder,
2022 ) -> Result<DataView> {
2023 debug!(
2024 "QueryEngine::apply_select_items - items: {:?}",
2025 select_items
2026 );
2027 debug!(
2028 "QueryEngine::apply_select_items - input view has {} rows",
2029 view.row_count()
2030 );
2031
2032 let has_window_functions = select_items.iter().any(|item| match item {
2034 SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
2035 _ => false,
2036 });
2037
2038 let window_func_count: usize = select_items
2040 .iter()
2041 .filter(|item| match item {
2042 SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
2043 _ => false,
2044 })
2045 .count();
2046
2047 let window_start = if has_window_functions {
2049 debug!(
2050 "QueryEngine::apply_select_items - detected {} window functions",
2051 window_func_count
2052 );
2053
2054 let window_specs = Self::extract_window_specs(select_items);
2056 debug!("Extracted {} window function specs", window_specs.len());
2057
2058 Some(Instant::now())
2059 } else {
2060 None
2061 };
2062
2063 let has_unnest = select_items.iter().any(|item| match item {
2065 SelectItem::Expression { expr, .. } => Self::contains_unnest(expr),
2066 _ => false,
2067 });
2068
2069 if has_unnest {
2070 debug!("QueryEngine::apply_select_items - UNNEST detected, using row expansion");
2071 return self.apply_select_with_row_expansion(view, select_items);
2072 }
2073
2074 let has_aggregates = select_items.iter().any(|item| match item {
2078 SelectItem::Expression { expr, .. } => contains_aggregate(expr),
2079 SelectItem::Column { .. } => false,
2080 SelectItem::Star { .. } => false,
2081 SelectItem::StarExclude { .. } => false,
2082 });
2083
2084 let all_aggregate_compatible = select_items.iter().all(|item| match item {
2085 SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
2086 SelectItem::Column { .. } => false, SelectItem::Star { .. } => false, SelectItem::StarExclude { .. } => false, });
2090
2091 if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
2092 debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
2095 return self.apply_aggregate_select(view, select_items);
2096 }
2097
2098 let has_computed_expressions = select_items
2100 .iter()
2101 .any(|item| matches!(item, SelectItem::Expression { .. }));
2102
2103 debug!(
2104 "QueryEngine::apply_select_items - has_computed_expressions: {}",
2105 has_computed_expressions
2106 );
2107
2108 if !has_computed_expressions {
2109 let column_indices = self.resolve_select_columns(view.source(), select_items)?;
2111 return Ok(view.with_columns(column_indices));
2112 }
2113
2114 let source_table = view.source();
2119 let visible_rows = view.visible_row_indices();
2120
2121 let mut computed_table = DataTable::new("query_result");
2124
2125 let mut expanded_items = Vec::new();
2127 for item in select_items {
2128 match item {
2129 SelectItem::Star { table_prefix, .. } => {
2130 if let Some(prefix) = table_prefix {
2131 debug!("QueryEngine::apply_select_items - expanding {}.*", prefix);
2133 for col in &source_table.columns {
2134 if Self::column_matches_table(col, prefix) {
2135 expanded_items.push(SelectItem::Column {
2136 column: ColumnRef::unquoted(col.name.clone()),
2137 leading_comments: vec![],
2138 trailing_comment: None,
2139 });
2140 }
2141 }
2142 } else {
2143 debug!("QueryEngine::apply_select_items - expanding *");
2145 for col_name in source_table.column_names() {
2146 expanded_items.push(SelectItem::Column {
2147 column: ColumnRef::unquoted(col_name.to_string()),
2148 leading_comments: vec![],
2149 trailing_comment: None,
2150 });
2151 }
2152 }
2153 }
2154 _ => expanded_items.push(item.clone()),
2155 }
2156 }
2157
2158 let mut column_name_counts: std::collections::HashMap<String, usize> =
2160 std::collections::HashMap::new();
2161
2162 for item in &expanded_items {
2163 let base_name = match item {
2164 SelectItem::Column {
2165 column: col_ref, ..
2166 } => col_ref.name.clone(),
2167 SelectItem::Expression { alias, .. } => alias.clone(),
2168 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2169 SelectItem::StarExclude { .. } => {
2170 unreachable!("StarExclude should have been expanded")
2171 }
2172 };
2173
2174 let count = column_name_counts.entry(base_name.clone()).or_insert(0);
2176 let column_name = if *count == 0 {
2177 base_name.clone()
2179 } else {
2180 format!("{base_name}_{count}")
2182 };
2183 *count += 1;
2184
2185 computed_table.add_column(DataColumn::new(&column_name));
2186 }
2187
2188 let can_use_batch = expanded_items.iter().all(|item| {
2192 match item {
2193 SelectItem::Expression { expr, .. } => {
2194 matches!(expr, SqlExpression::WindowFunction { .. })
2197 || !Self::contains_window_function(expr)
2198 }
2199 _ => true, }
2201 });
2202
2203 let use_batch_evaluation = can_use_batch
2206 && std::env::var("SQL_CLI_BATCH_WINDOW")
2207 .map(|v| v != "0" && v.to_lowercase() != "false")
2208 .unwrap_or(true);
2209
2210 let batch_window_specs = if use_batch_evaluation && has_window_functions {
2212 debug!("BATCH window function evaluation flag is enabled");
2213 let specs = Self::extract_window_specs(&expanded_items);
2215 debug!(
2216 "Extracted {} window function specs for batch evaluation",
2217 specs.len()
2218 );
2219 Some(specs)
2220 } else {
2221 None
2222 };
2223
2224 let mut evaluator =
2226 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2227
2228 if let Some(exec_ctx) = exec_context {
2230 let aliases = exec_ctx.get_aliases();
2231 if !aliases.is_empty() {
2232 debug!(
2233 "Applying {} aliases to evaluator: {:?}",
2234 aliases.len(),
2235 aliases
2236 );
2237 evaluator = evaluator.with_table_aliases(aliases);
2238 }
2239 }
2240
2241 if has_window_functions {
2244 let preload_start = Instant::now();
2245
2246 let mut window_specs = Vec::new();
2248 for item in &expanded_items {
2249 if let SelectItem::Expression { expr, .. } = item {
2250 Self::collect_window_specs(expr, &mut window_specs);
2251 }
2252 }
2253
2254 for spec in &window_specs {
2256 let _ = evaluator.get_or_create_window_context(spec);
2257 }
2258
2259 debug!(
2260 "Pre-created {} WindowContext(s) in {:.2}ms",
2261 window_specs.len(),
2262 preload_start.elapsed().as_secs_f64() * 1000.0
2263 );
2264 }
2265
2266 if let Some(window_specs) = batch_window_specs {
2268 debug!("Starting batch window function evaluation");
2269 let batch_start = Instant::now();
2270
2271 let mut batch_results: Vec<Vec<DataValue>> =
2273 vec![vec![DataValue::Null; expanded_items.len()]; visible_rows.len()];
2274
2275 let detailed_window_specs = &window_specs;
2277
2278 let mut specs_by_window: HashMap<
2280 u64,
2281 Vec<&crate::data::batch_window_evaluator::WindowFunctionSpec>,
2282 > = HashMap::new();
2283 for spec in detailed_window_specs {
2284 let hash = spec.spec.compute_hash();
2285 specs_by_window
2286 .entry(hash)
2287 .or_insert_with(Vec::new)
2288 .push(spec);
2289 }
2290
2291 for (_window_hash, specs) in specs_by_window {
2293 let context = evaluator.get_or_create_window_context(&specs[0].spec)?;
2295
2296 for spec in specs {
2298 match spec.function_name.as_str() {
2299 "LAG" => {
2300 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2302 let column_name = col_ref.name.as_str();
2303 let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2304 spec.args.get(1)
2305 {
2306 n.parse::<i64>().unwrap_or(1)
2307 } else {
2308 1 };
2310
2311 let values = context.evaluate_lag_batch(
2312 visible_rows,
2313 column_name,
2314 offset,
2315 )?;
2316
2317 for (row_idx, value) in values.into_iter().enumerate() {
2319 batch_results[row_idx][spec.output_column_index] = value;
2320 }
2321 }
2322 }
2323 "LEAD" => {
2324 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2326 let column_name = col_ref.name.as_str();
2327 let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2328 spec.args.get(1)
2329 {
2330 n.parse::<i64>().unwrap_or(1)
2331 } else {
2332 1 };
2334
2335 let values = context.evaluate_lead_batch(
2336 visible_rows,
2337 column_name,
2338 offset,
2339 )?;
2340
2341 for (row_idx, value) in values.into_iter().enumerate() {
2343 batch_results[row_idx][spec.output_column_index] = value;
2344 }
2345 }
2346 }
2347 "ROW_NUMBER" => {
2348 let values = context.evaluate_row_number_batch(visible_rows)?;
2349
2350 for (row_idx, value) in values.into_iter().enumerate() {
2352 batch_results[row_idx][spec.output_column_index] = value;
2353 }
2354 }
2355 "RANK" => {
2356 let values = context.evaluate_rank_batch(visible_rows)?;
2357
2358 for (row_idx, value) in values.into_iter().enumerate() {
2360 batch_results[row_idx][spec.output_column_index] = value;
2361 }
2362 }
2363 "DENSE_RANK" => {
2364 let values = context.evaluate_dense_rank_batch(visible_rows)?;
2365
2366 for (row_idx, value) in values.into_iter().enumerate() {
2368 batch_results[row_idx][spec.output_column_index] = value;
2369 }
2370 }
2371 "SUM" => {
2372 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2373 let column_name = col_ref.name.as_str();
2374 let values =
2375 context.evaluate_sum_batch(visible_rows, column_name)?;
2376
2377 for (row_idx, value) in values.into_iter().enumerate() {
2378 batch_results[row_idx][spec.output_column_index] = value;
2379 }
2380 }
2381 }
2382 "AVG" => {
2383 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2384 let column_name = col_ref.name.as_str();
2385 let values =
2386 context.evaluate_avg_batch(visible_rows, column_name)?;
2387
2388 for (row_idx, value) in values.into_iter().enumerate() {
2389 batch_results[row_idx][spec.output_column_index] = value;
2390 }
2391 }
2392 }
2393 "MIN" => {
2394 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2395 let column_name = col_ref.name.as_str();
2396 let values =
2397 context.evaluate_min_batch(visible_rows, column_name)?;
2398
2399 for (row_idx, value) in values.into_iter().enumerate() {
2400 batch_results[row_idx][spec.output_column_index] = value;
2401 }
2402 }
2403 }
2404 "MAX" => {
2405 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2406 let column_name = col_ref.name.as_str();
2407 let values =
2408 context.evaluate_max_batch(visible_rows, column_name)?;
2409
2410 for (row_idx, value) in values.into_iter().enumerate() {
2411 batch_results[row_idx][spec.output_column_index] = value;
2412 }
2413 }
2414 }
2415 "COUNT" => {
2416 let column_name = match spec.args.get(0) {
2418 Some(SqlExpression::Column(col_ref)) => Some(col_ref.name.as_str()),
2419 Some(SqlExpression::StringLiteral(s)) if s == "*" => None,
2420 _ => None,
2421 };
2422
2423 let values = context.evaluate_count_batch(visible_rows, column_name)?;
2424
2425 for (row_idx, value) in values.into_iter().enumerate() {
2426 batch_results[row_idx][spec.output_column_index] = value;
2427 }
2428 }
2429 "FIRST_VALUE" => {
2430 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2431 let column_name = col_ref.name.as_str();
2432 let values = context
2433 .evaluate_first_value_batch(visible_rows, column_name)?;
2434
2435 for (row_idx, value) in values.into_iter().enumerate() {
2436 batch_results[row_idx][spec.output_column_index] = value;
2437 }
2438 }
2439 }
2440 "LAST_VALUE" => {
2441 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2442 let column_name = col_ref.name.as_str();
2443 let values =
2444 context.evaluate_last_value_batch(visible_rows, column_name)?;
2445
2446 for (row_idx, value) in values.into_iter().enumerate() {
2447 batch_results[row_idx][spec.output_column_index] = value;
2448 }
2449 }
2450 }
2451 _ => {
2452 debug!(
2454 "Window function {} not supported in batch mode, using per-row",
2455 spec.function_name
2456 );
2457 }
2458 }
2459 }
2460 }
2461
2462 for (result_row_idx, &source_row_idx) in visible_rows.iter().enumerate() {
2464 for (col_idx, item) in expanded_items.iter().enumerate() {
2465 if !matches!(batch_results[result_row_idx][col_idx], DataValue::Null) {
2467 continue;
2468 }
2469
2470 let value = match item {
2471 SelectItem::Column {
2472 column: col_ref, ..
2473 } => {
2474 match evaluator
2475 .evaluate(&SqlExpression::Column(col_ref.clone()), source_row_idx)
2476 {
2477 Ok(val) => val,
2478 Err(e) => {
2479 return Err(anyhow!(
2480 "Failed to evaluate column {}: {}",
2481 col_ref.to_sql(),
2482 e
2483 ));
2484 }
2485 }
2486 }
2487 SelectItem::Expression { expr, .. } => {
2488 if matches!(expr, SqlExpression::WindowFunction { .. }) {
2491 continue;
2493 }
2494 evaluator.evaluate(&expr, source_row_idx)?
2497 }
2498 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2499 SelectItem::StarExclude { .. } => {
2500 unreachable!("StarExclude should have been expanded")
2501 }
2502 };
2503 batch_results[result_row_idx][col_idx] = value;
2504 }
2505 }
2506
2507 for row_values in batch_results {
2509 computed_table
2510 .add_row(DataRow::new(row_values))
2511 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2512 }
2513
2514 debug!(
2515 "Batch window evaluation completed in {:.3}ms",
2516 batch_start.elapsed().as_secs_f64() * 1000.0
2517 );
2518 } else {
2519 for &row_idx in visible_rows {
2521 let mut row_values = Vec::new();
2522
2523 for item in &expanded_items {
2524 let value = match item {
2525 SelectItem::Column {
2526 column: col_ref, ..
2527 } => {
2528 match evaluator
2530 .evaluate(&SqlExpression::Column(col_ref.clone()), row_idx)
2531 {
2532 Ok(val) => val,
2533 Err(e) => {
2534 return Err(anyhow!(
2535 "Failed to evaluate column {}: {}",
2536 col_ref.to_sql(),
2537 e
2538 ));
2539 }
2540 }
2541 }
2542 SelectItem::Expression { expr, .. } => {
2543 evaluator.evaluate(&expr, row_idx)?
2545 }
2546 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2547 SelectItem::StarExclude { .. } => {
2548 unreachable!("StarExclude should have been expanded")
2549 }
2550 };
2551 row_values.push(value);
2552 }
2553
2554 computed_table
2555 .add_row(DataRow::new(row_values))
2556 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2557 }
2558 }
2559
2560 if let Some(start) = window_start {
2562 let window_duration = start.elapsed();
2563 info!(
2564 "Window function evaluation took {:.2}ms for {} rows ({} window functions)",
2565 window_duration.as_secs_f64() * 1000.0,
2566 visible_rows.len(),
2567 window_func_count
2568 );
2569
2570 plan.begin_step(
2572 StepType::WindowFunction,
2573 format!("Evaluate {} window function(s)", window_func_count),
2574 );
2575 plan.set_rows_in(visible_rows.len());
2576 plan.set_rows_out(visible_rows.len());
2577 plan.add_detail(format!("Input: {} rows", visible_rows.len()));
2578 plan.add_detail(format!("{} window functions evaluated", window_func_count));
2579 plan.add_detail(format!(
2580 "Evaluation time: {:.3}ms",
2581 window_duration.as_secs_f64() * 1000.0
2582 ));
2583 plan.end_step();
2584 }
2585
2586 Ok(DataView::new(Arc::new(computed_table)))
2589 }
2590
2591 fn apply_select_with_row_expansion(
2593 &self,
2594 view: DataView,
2595 select_items: &[SelectItem],
2596 ) -> Result<DataView> {
2597 debug!("QueryEngine::apply_select_with_row_expansion - expanding rows");
2598
2599 let source_table = view.source();
2600 let visible_rows = view.visible_row_indices();
2601 let expander_registry = RowExpanderRegistry::new();
2602
2603 let mut result_table = DataTable::new("unnest_result");
2605
2606 let mut expanded_items = Vec::new();
2608 for item in select_items {
2609 match item {
2610 SelectItem::Star { table_prefix, .. } => {
2611 if let Some(prefix) = table_prefix {
2612 debug!(
2614 "QueryEngine::apply_select_with_row_expansion - expanding {}.*",
2615 prefix
2616 );
2617 for col in &source_table.columns {
2618 if Self::column_matches_table(col, prefix) {
2619 expanded_items.push(SelectItem::Column {
2620 column: ColumnRef::unquoted(col.name.clone()),
2621 leading_comments: vec![],
2622 trailing_comment: None,
2623 });
2624 }
2625 }
2626 } else {
2627 debug!("QueryEngine::apply_select_with_row_expansion - expanding *");
2629 for col_name in source_table.column_names() {
2630 expanded_items.push(SelectItem::Column {
2631 column: ColumnRef::unquoted(col_name.to_string()),
2632 leading_comments: vec![],
2633 trailing_comment: None,
2634 });
2635 }
2636 }
2637 }
2638 _ => expanded_items.push(item.clone()),
2639 }
2640 }
2641
2642 for item in &expanded_items {
2644 let column_name = match item {
2645 SelectItem::Column {
2646 column: col_ref, ..
2647 } => col_ref.name.clone(),
2648 SelectItem::Expression { alias, .. } => alias.clone(),
2649 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2650 SelectItem::StarExclude { .. } => {
2651 unreachable!("StarExclude should have been expanded")
2652 }
2653 };
2654 result_table.add_column(DataColumn::new(&column_name));
2655 }
2656
2657 let mut evaluator =
2659 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2660
2661 for &row_idx in visible_rows {
2662 let mut unnest_expansions = Vec::new();
2664 let mut unnest_indices = Vec::new();
2665
2666 for (col_idx, item) in expanded_items.iter().enumerate() {
2667 if let SelectItem::Expression { expr, .. } = item {
2668 if let Some(expansion_result) = self.try_expand_unnest(
2669 &expr,
2670 source_table,
2671 row_idx,
2672 &mut evaluator,
2673 &expander_registry,
2674 )? {
2675 unnest_expansions.push(expansion_result);
2676 unnest_indices.push(col_idx);
2677 }
2678 }
2679 }
2680
2681 let expansion_count = if unnest_expansions.is_empty() {
2683 1 } else {
2685 unnest_expansions
2686 .iter()
2687 .map(|exp| exp.row_count())
2688 .max()
2689 .unwrap_or(1)
2690 };
2691
2692 for output_idx in 0..expansion_count {
2694 let mut row_values = Vec::new();
2695
2696 for (col_idx, item) in expanded_items.iter().enumerate() {
2697 let unnest_position = unnest_indices.iter().position(|&idx| idx == col_idx);
2699
2700 let value = if let Some(unnest_idx) = unnest_position {
2701 let expansion = &unnest_expansions[unnest_idx];
2703 expansion
2704 .values
2705 .get(output_idx)
2706 .cloned()
2707 .unwrap_or(DataValue::Null)
2708 } else {
2709 match item {
2711 SelectItem::Column {
2712 column: col_ref, ..
2713 } => {
2714 let col_idx =
2715 source_table.get_column_index(&col_ref.name).ok_or_else(
2716 || anyhow::anyhow!("Column '{}' not found", col_ref.name),
2717 )?;
2718 let row = source_table
2719 .get_row(row_idx)
2720 .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
2721 row.get(col_idx)
2722 .ok_or_else(|| {
2723 anyhow::anyhow!("Column {} not found in row", col_idx)
2724 })?
2725 .clone()
2726 }
2727 SelectItem::Expression { expr, .. } => {
2728 evaluator.evaluate(&expr, row_idx)?
2730 }
2731 SelectItem::Star { .. } => unreachable!(),
2732 SelectItem::StarExclude { .. } => {
2733 unreachable!("StarExclude should have been expanded")
2734 }
2735 }
2736 };
2737
2738 row_values.push(value);
2739 }
2740
2741 result_table
2742 .add_row(DataRow::new(row_values))
2743 .map_err(|e| anyhow::anyhow!("Failed to add expanded row: {}", e))?;
2744 }
2745 }
2746
2747 debug!(
2748 "QueryEngine::apply_select_with_row_expansion - input rows: {}, output rows: {}",
2749 visible_rows.len(),
2750 result_table.row_count()
2751 );
2752
2753 Ok(DataView::new(Arc::new(result_table)))
2754 }
2755
2756 fn try_expand_unnest(
2759 &self,
2760 expr: &SqlExpression,
2761 _source_table: &DataTable,
2762 row_idx: usize,
2763 evaluator: &mut ArithmeticEvaluator,
2764 expander_registry: &RowExpanderRegistry,
2765 ) -> Result<Option<crate::data::row_expanders::ExpansionResult>> {
2766 if let SqlExpression::Unnest { column, delimiter } = expr {
2768 let column_value = evaluator.evaluate(column, row_idx)?;
2770
2771 let delimiter_value = DataValue::String(delimiter.clone());
2773
2774 let expander = expander_registry
2776 .get("UNNEST")
2777 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2778
2779 let expansion = expander.expand(&column_value, &[delimiter_value])?;
2781 return Ok(Some(expansion));
2782 }
2783
2784 if let SqlExpression::FunctionCall { name, args, .. } = expr {
2786 if name.to_uppercase() == "UNNEST" {
2787 if args.len() != 2 {
2789 return Err(anyhow::anyhow!(
2790 "UNNEST requires exactly 2 arguments: UNNEST(column, delimiter)"
2791 ));
2792 }
2793
2794 let column_value = evaluator.evaluate(&args[0], row_idx)?;
2796
2797 let delimiter_value = evaluator.evaluate(&args[1], row_idx)?;
2799
2800 let expander = expander_registry
2802 .get("UNNEST")
2803 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2804
2805 let expansion = expander.expand(&column_value, &[delimiter_value])?;
2807 return Ok(Some(expansion));
2808 }
2809 }
2810
2811 Ok(None)
2812 }
2813
2814 fn apply_aggregate_select(
2816 &self,
2817 view: DataView,
2818 select_items: &[SelectItem],
2819 ) -> Result<DataView> {
2820 debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
2821
2822 let source_table = view.source();
2823 let mut result_table = DataTable::new("aggregate_result");
2824
2825 for item in select_items {
2827 let column_name = match item {
2828 SelectItem::Expression { alias, .. } => alias.clone(),
2829 _ => unreachable!("Should only have expressions in aggregate-only query"),
2830 };
2831 result_table.add_column(DataColumn::new(&column_name));
2832 }
2833
2834 let visible_rows = view.visible_row_indices().to_vec();
2836 let mut evaluator =
2837 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
2838 .with_visible_rows(visible_rows);
2839
2840 let mut row_values = Vec::new();
2842 for item in select_items {
2843 match item {
2844 SelectItem::Expression { expr, .. } => {
2845 let value = evaluator.evaluate(expr, 0)?;
2848 row_values.push(value);
2849 }
2850 _ => unreachable!("Should only have expressions in aggregate-only query"),
2851 }
2852 }
2853
2854 result_table
2856 .add_row(DataRow::new(row_values))
2857 .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
2858
2859 Ok(DataView::new(Arc::new(result_table)))
2860 }
2861
2862 fn column_matches_table(col: &DataColumn, table_name: &str) -> bool {
2874 if let Some(ref source) = col.source_table {
2876 if source == table_name || source.ends_with(&format!(".{}", table_name)) {
2878 return true;
2879 }
2880 }
2881
2882 if let Some(ref qualified) = col.qualified_name {
2884 if qualified.starts_with(&format!("{}.", table_name)) {
2886 return true;
2887 }
2888 }
2889
2890 false
2891 }
2892
2893 fn resolve_select_columns(
2895 &self,
2896 table: &DataTable,
2897 select_items: &[SelectItem],
2898 ) -> Result<Vec<usize>> {
2899 let mut indices = Vec::new();
2900 let table_columns = table.column_names();
2901
2902 for item in select_items {
2903 match item {
2904 SelectItem::Column {
2905 column: col_ref, ..
2906 } => {
2907 let index = if let Some(table_prefix) = &col_ref.table_prefix {
2909 let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
2911 table.find_column_by_qualified_name(&qualified_name)
2912 .ok_or_else(|| {
2913 let has_qualified = table.columns.iter()
2915 .any(|c| c.qualified_name.is_some());
2916 if !has_qualified {
2917 anyhow::anyhow!(
2918 "Column '{}' not found. Note: Table '{}' may not support qualified column names",
2919 qualified_name, table_prefix
2920 )
2921 } else {
2922 anyhow::anyhow!("Column '{}' not found", qualified_name)
2923 }
2924 })?
2925 } else {
2926 table_columns
2928 .iter()
2929 .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
2930 .ok_or_else(|| {
2931 let suggestion = self.find_similar_column(table, &col_ref.name);
2932 match suggestion {
2933 Some(similar) => anyhow::anyhow!(
2934 "Column '{}' not found. Did you mean '{}'?",
2935 col_ref.name,
2936 similar
2937 ),
2938 None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
2939 }
2940 })?
2941 };
2942 indices.push(index);
2943 }
2944 SelectItem::Star { table_prefix, .. } => {
2945 if let Some(prefix) = table_prefix {
2946 for (i, col) in table.columns.iter().enumerate() {
2948 if Self::column_matches_table(col, prefix) {
2949 indices.push(i);
2950 }
2951 }
2952 } else {
2953 for i in 0..table_columns.len() {
2955 indices.push(i);
2956 }
2957 }
2958 }
2959 SelectItem::StarExclude {
2960 table_prefix,
2961 excluded_columns,
2962 ..
2963 } => {
2964 if let Some(prefix) = table_prefix {
2966 for (i, col) in table.columns.iter().enumerate() {
2968 if Self::column_matches_table(col, prefix)
2969 && !excluded_columns.contains(&col.name)
2970 {
2971 indices.push(i);
2972 }
2973 }
2974 } else {
2975 for (i, col_name) in table_columns.iter().enumerate() {
2977 if !excluded_columns
2978 .iter()
2979 .any(|exc| exc.eq_ignore_ascii_case(col_name))
2980 {
2981 indices.push(i);
2982 }
2983 }
2984 }
2985 }
2986 SelectItem::Expression { .. } => {
2987 return Err(anyhow::anyhow!(
2988 "Computed expressions require new table creation"
2989 ));
2990 }
2991 }
2992 }
2993
2994 Ok(indices)
2995 }
2996
2997 fn apply_distinct(&self, view: DataView) -> Result<DataView> {
2999 use std::collections::HashSet;
3000
3001 let source = view.source();
3002 let visible_cols = view.visible_column_indices();
3003 let visible_rows = view.visible_row_indices();
3004
3005 let mut seen_rows = HashSet::new();
3007 let mut unique_row_indices = Vec::new();
3008
3009 for &row_idx in visible_rows {
3010 let mut row_key = Vec::new();
3012 for &col_idx in visible_cols {
3013 let value = source
3014 .get_value(row_idx, col_idx)
3015 .ok_or_else(|| anyhow!("Invalid cell reference"))?;
3016 row_key.push(format!("{:?}", value));
3018 }
3019
3020 if seen_rows.insert(row_key) {
3022 unique_row_indices.push(row_idx);
3024 }
3025 }
3026
3027 Ok(view.with_rows(unique_row_indices))
3029 }
3030
3031 fn apply_multi_order_by(
3033 &self,
3034 view: DataView,
3035 order_by_columns: &[OrderByItem],
3036 ) -> Result<DataView> {
3037 self.apply_multi_order_by_with_context(view, order_by_columns, None)
3038 }
3039
3040 fn apply_multi_order_by_with_context(
3042 &self,
3043 mut view: DataView,
3044 order_by_columns: &[OrderByItem],
3045 _exec_context: Option<&ExecutionContext>,
3046 ) -> Result<DataView> {
3047 let mut sort_columns = Vec::new();
3049
3050 for order_col in order_by_columns {
3051 let column_name = match &order_col.expr {
3053 SqlExpression::Column(col_ref) => col_ref.name.clone(),
3054 _ => {
3055 return Err(anyhow!(
3057 "ORDER BY expressions not yet supported - only simple columns allowed"
3058 ));
3059 }
3060 };
3061
3062 let col_index = if column_name.contains('.') {
3064 if let Some(dot_pos) = column_name.rfind('.') {
3066 let col_name = &column_name[dot_pos + 1..];
3067
3068 debug!(
3071 "ORDER BY: Extracting unqualified column '{}' from '{}'",
3072 col_name, column_name
3073 );
3074 view.source().get_column_index(col_name)
3075 } else {
3076 view.source().get_column_index(&column_name)
3077 }
3078 } else {
3079 view.source().get_column_index(&column_name)
3081 }
3082 .ok_or_else(|| {
3083 let suggestion = self.find_similar_column(view.source(), &column_name);
3085 match suggestion {
3086 Some(similar) => anyhow::anyhow!(
3087 "Column '{}' not found. Did you mean '{}'?",
3088 column_name,
3089 similar
3090 ),
3091 None => {
3092 let available_cols = view.source().column_names().join(", ");
3094 anyhow::anyhow!(
3095 "Column '{}' not found. Available columns: {}",
3096 column_name,
3097 available_cols
3098 )
3099 }
3100 }
3101 })?;
3102
3103 let ascending = matches!(order_col.direction, SortDirection::Asc);
3104 sort_columns.push((col_index, ascending));
3105 }
3106
3107 view.apply_multi_sort(&sort_columns)?;
3109 Ok(view)
3110 }
3111
3112 fn apply_group_by(
3114 &self,
3115 view: DataView,
3116 group_by_exprs: &[SqlExpression],
3117 select_items: &[SelectItem],
3118 having: Option<&SqlExpression>,
3119 plan: &mut ExecutionPlanBuilder,
3120 ) -> Result<DataView> {
3121 let (result_view, phase_info) = self.apply_group_by_expressions(
3123 view,
3124 group_by_exprs,
3125 select_items,
3126 having,
3127 self.case_insensitive,
3128 self.date_notation.clone(),
3129 )?;
3130
3131 plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
3133 plan.add_detail(format!(
3134 "Phase 1 - Group Building: {:.3}ms",
3135 phase_info.phase2_key_building.as_secs_f64() * 1000.0
3136 ));
3137 plan.add_detail(format!(
3138 " • Processing {} rows into {} groups",
3139 phase_info.total_rows, phase_info.num_groups
3140 ));
3141 plan.add_detail(format!(
3142 "Phase 2 - Aggregation: {:.3}ms",
3143 phase_info.phase4_aggregation.as_secs_f64() * 1000.0
3144 ));
3145 if phase_info.phase4_having_evaluation > Duration::ZERO {
3146 plan.add_detail(format!(
3147 "Phase 3 - HAVING Filter: {:.3}ms",
3148 phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
3149 ));
3150 plan.add_detail(format!(
3151 " • Filtered {} groups",
3152 phase_info.groups_filtered_by_having
3153 ));
3154 }
3155 plan.add_detail(format!(
3156 "Total GROUP BY time: {:.3}ms",
3157 phase_info.total_time.as_secs_f64() * 1000.0
3158 ));
3159
3160 Ok(result_view)
3161 }
3162
3163 pub fn estimate_group_cardinality(
3166 &self,
3167 view: &DataView,
3168 group_by_exprs: &[SqlExpression],
3169 ) -> usize {
3170 let row_count = view.get_visible_rows().len();
3172 if row_count <= 100 {
3173 return row_count;
3174 }
3175
3176 let sample_size = min(1000, row_count / 10).max(100);
3178 let mut seen = FxHashSet::default();
3179
3180 let visible_rows = view.get_visible_rows();
3181 for (i, &row_idx) in visible_rows.iter().enumerate() {
3182 if i >= sample_size {
3183 break;
3184 }
3185
3186 let mut key_values = Vec::new();
3188 for expr in group_by_exprs {
3189 let mut evaluator = ArithmeticEvaluator::new(view.source());
3190 let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
3191 key_values.push(value);
3192 }
3193
3194 seen.insert(key_values);
3195 }
3196
3197 let sample_cardinality = seen.len();
3199 let estimated = (sample_cardinality * row_count) / sample_size;
3200
3201 estimated.min(row_count).max(sample_cardinality)
3203 }
3204}
3205
3206#[cfg(test)]
3207mod tests {
3208 use super::*;
3209 use crate::data::datatable::{DataColumn, DataRow, DataValue};
3210
3211 fn create_test_table() -> Arc<DataTable> {
3212 let mut table = DataTable::new("test");
3213
3214 table.add_column(DataColumn::new("id"));
3216 table.add_column(DataColumn::new("name"));
3217 table.add_column(DataColumn::new("age"));
3218
3219 table
3221 .add_row(DataRow::new(vec![
3222 DataValue::Integer(1),
3223 DataValue::String("Alice".to_string()),
3224 DataValue::Integer(30),
3225 ]))
3226 .unwrap();
3227
3228 table
3229 .add_row(DataRow::new(vec![
3230 DataValue::Integer(2),
3231 DataValue::String("Bob".to_string()),
3232 DataValue::Integer(25),
3233 ]))
3234 .unwrap();
3235
3236 table
3237 .add_row(DataRow::new(vec![
3238 DataValue::Integer(3),
3239 DataValue::String("Charlie".to_string()),
3240 DataValue::Integer(35),
3241 ]))
3242 .unwrap();
3243
3244 Arc::new(table)
3245 }
3246
/// `SELECT *` returns every row and every column of the fixture.
#[test]
fn test_select_all() {
    let engine = QueryEngine::new();
    let fixture = create_test_table();

    let view = engine.execute(fixture, "SELECT * FROM users").unwrap();

    assert_eq!(view.row_count(), 3);
    assert_eq!(view.column_count(), 3);
}
3258
/// Selecting two named columns keeps all rows but only those two columns.
#[test]
fn test_select_columns() {
    let engine = QueryEngine::new();
    let fixture = create_test_table();

    let view = engine
        .execute(fixture, "SELECT name, age FROM users")
        .unwrap();

    assert_eq!(view.row_count(), 3);
    assert_eq!(view.column_count(), 2);
}
3270
/// `LIMIT 2` truncates the three-row fixture to two rows.
#[test]
fn test_select_with_limit() {
    let engine = QueryEngine::new();
    let fixture = create_test_table();

    let view = engine
        .execute(fixture, "SELECT * FROM users LIMIT 2")
        .unwrap();

    assert_eq!(view.row_count(), 2);
}
3281
/// `.Contains(...)` works on a string column and on a float column.
#[test]
fn test_type_coercion_contains() {
    // The result of installing the tracing subscriber is deliberately ignored.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "status", "price"] {
        table.add_column(DataColumn::new(column));
    }
    for (id, status, price) in [
        (1, "Pending", 99.99),
        (2, "Confirmed", 150.50),
        (3, "Pending", 75.00),
    ] {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(status.to_string()),
                DataValue::Float(price),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing WHERE clause with Contains ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let status = table.get_value(i, 1);
        println!("Row {i}: status = {status:?}");
    }

    println!("\n--- Test 1: status.Contains('pend') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE status.Contains('pend')",
        )
        .unwrap_or_else(|e| panic!("Query failed: {e}"));
    println!("SUCCESS: Found {} matching rows", view.row_count());
    // Both "Pending" rows are expected to match the lower-case needle.
    assert_eq!(view.row_count(), 2);

    println!("\n--- Test 2: price.Contains('9') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE price.Contains('9')",
        )
        .unwrap_or_else(|e| panic!("Numeric coercion query failed: {e}"));
    println!(
        "SUCCESS: Found {} matching rows with price containing '9'",
        view.row_count()
    );
    assert!(view.row_count() >= 1);

    println!("\n=== All tests passed! ===");
}
3367
/// `NOT IN ('CA')` keeps the two rows whose country is not `CA`.
#[test]
fn test_not_in_clause() {
    // The result of installing the tracing subscriber is deliberately ignored.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "country"] {
        table.add_column(DataColumn::new(column));
    }
    for (id, country) in [(1, "CA"), (2, "US"), (3, "UK")] {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(country.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing NOT IN clause ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let country = table.get_value(i, 1);
        println!("Row {i}: country = {country:?}");
    }

    println!("\n--- Test: country NOT IN ('CA') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE country NOT IN ('CA')",
        )
        .unwrap_or_else(|e| panic!("NOT IN query failed: {e}"));
    println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
    assert_eq!(view.row_count(), 2);

    println!("\n=== NOT IN test complete! ===");
}
3429
/// `IN` / `NOT IN` honour the engine's case-insensitivity setting.
#[test]
fn test_case_insensitive_in_and_not_in() {
    // The result of installing the tracing subscriber is deliberately ignored.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "country"] {
        table.add_column(DataColumn::new(column));
    }
    // Mixed letter case on purpose: "CA" and "UK" upper-case, "us" lower-case.
    for (id, country) in [(1, "CA"), (2, "us"), (3, "UK")] {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(country.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);

    println!("\n=== Testing Case-Insensitive IN clause ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let country = table.get_value(i, 1);
        println!("Row {i}: country = {country:?}");
    }

    println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
    let engine = QueryEngine::with_case_insensitive(true);
    let view = engine
        .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')")
        .unwrap_or_else(|e| panic!("Case-insensitive IN query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
        view.row_count()
    );
    assert_eq!(view.row_count(), 1);

    println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE country NOT IN ('ca')",
        )
        .unwrap_or_else(|e| panic!("Case-insensitive NOT IN query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
        view.row_count()
    );
    assert_eq!(view.row_count(), 2);

    println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
    let engine_case_sensitive = QueryEngine::new();
    let view = engine_case_sensitive
        .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')")
        .unwrap_or_else(|e| panic!("Case-sensitive IN query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
        view.row_count()
    );
    // The default engine is expected to find no lower-case 'ca'.
    assert_eq!(view.row_count(), 0);

    println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
}
3528
/// Parenthesised AND groups joined by OR select exactly the two matching rows.
#[test]
#[ignore = "Parentheses in WHERE clause not yet implemented"]
fn test_parentheses_in_where_clause() {
    // The result of installing the tracing subscriber is deliberately ignored.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "status", "priority"] {
        table.add_column(DataColumn::new(column));
    }
    // All four status/priority combinations, one per row.
    for (id, status, priority) in [
        (1, "Pending", "High"),
        (2, "Complete", "High"),
        (3, "Pending", "Low"),
        (4, "Complete", "Low"),
    ] {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(status.to_string()),
                DataValue::String(priority.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing Parentheses in WHERE clause ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let status = table.get_value(i, 1);
        let priority = table.get_value(i, 2);
        println!("Row {i}: status = {status:?}, priority = {priority:?}");
    }

    println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
        )
        .unwrap_or_else(|e| panic!("Parentheses query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with parenthetical logic",
        view.row_count()
    );
    // Rows 1 (Pending/High) and 4 (Complete/Low) satisfy the predicate.
    assert_eq!(view.row_count(), 2);

    println!("\n=== Parentheses test complete! ===");
}
3607
/// `.Contains(...)` on float and integer columns (currently ignored).
#[test]
#[ignore = "Numeric type coercion needs fixing"]
fn test_numeric_type_coercion() {
    // The result of installing the tracing subscriber is deliberately ignored.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "price", "quantity"] {
        table.add_column(DataColumn::new(column));
    }
    // The price column mixes floats with one integer value.
    let rows = vec![
        vec![
            DataValue::Integer(1),
            DataValue::Float(99.50),
            DataValue::Integer(100),
        ],
        vec![
            DataValue::Integer(2),
            DataValue::Float(150.0),
            DataValue::Integer(200),
        ],
        vec![
            DataValue::Integer(3),
            DataValue::Integer(75),
            DataValue::Integer(50),
        ],
    ];
    for row in rows {
        table.add_row(DataRow::new(row)).unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing Numeric Type Coercion ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let price = table.get_value(i, 1);
        let quantity = table.get_value(i, 2);
        println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
    }

    println!("\n--- Test: price.Contains('.') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE price.Contains('.')",
        )
        .unwrap_or_else(|e| panic!("Numeric Contains query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with decimal points in price",
        view.row_count()
    );
    assert_eq!(view.row_count(), 2);

    println!("\n--- Test: quantity.Contains('0') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE quantity.Contains('0')",
        )
        .unwrap_or_else(|e| panic!("Integer Contains query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with '0' in quantity",
        view.row_count()
    );
    assert_eq!(view.row_count(), 2);

    println!("\n=== Numeric type coercion test complete! ===");
}
3697
/// Checks that a string-typed date column can be compared with a
/// `DateTime(y,m,d)` literal in a `WHERE` clause.
#[test]
fn test_datetime_comparisons() {
    // The init result is discarded on purpose.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut builder = DataTable::new("test");
    for column_name in ["id", "created_date"] {
        builder.add_column(DataColumn::new(column_name));
    }

    // One date before the cut-off used in the query, two after it.
    let fixtures = [(1, "2024-12-15"), (2, "2025-01-15"), (3, "2025-02-15")];
    for (id, created) in fixtures {
        let cells = vec![
            DataValue::Integer(id),
            DataValue::String(created.to_string()),
        ];
        builder.add_row(DataRow::new(cells)).unwrap();
    }

    let table = Arc::new(builder);
    let engine = QueryEngine::new();

    println!("\n=== Testing DateTime Comparisons ===");
    println!("Table has {} rows", table.row_count());
    (0..table.row_count()).for_each(|i| {
        let date = table.get_value(i, 1);
        println!("Row {i}: created_date = {date:?}");
    });

    println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
    let sql = "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)";
    let view = engine
        .execute(Arc::clone(&table), sql)
        .unwrap_or_else(|e| panic!("DateTime comparison query failed: {e}"));
    println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
    // Only the two 2025 dates fall after 2025-01-01.
    assert_eq!(view.row_count(), 2);

    println!("\n=== DateTime comparison test complete! ===");
}
3759
/// Checks `NOT` applied to the `Contains` and `StartsWith` method calls,
/// using an engine built with `with_case_insensitive(true)`.
#[test]
fn test_not_with_method_calls() {
    // The init result is discarded on purpose.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut builder = DataTable::new("test");
    builder.add_column(DataColumn::new("id"));
    builder.add_column(DataColumn::new("status"));

    // Ids are assigned sequentially starting at 1.
    let statuses = ["Pending Review", "Complete", "Pending Approval"];
    for (id, status) in (1..).zip(statuses) {
        let cells = vec![
            DataValue::Integer(id),
            DataValue::String(status.to_string()),
        ];
        builder.add_row(DataRow::new(cells)).unwrap();
    }

    let table = Arc::new(builder);
    let engine = QueryEngine::with_case_insensitive(true);

    println!("\n=== Testing NOT with Method Calls ===");
    println!("Table has {} rows", table.row_count());
    (0..table.row_count()).for_each(|i| {
        let status = table.get_value(i, 1);
        println!("Row {i}: status = {status:?}");
    });

    println!("\n--- Test: NOT status.Contains('pend') ---");
    let view = engine
        .execute(
            Arc::clone(&table),
            "SELECT * FROM test WHERE NOT status.Contains('pend')",
        )
        .unwrap_or_else(|e| panic!("NOT Contains query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows NOT containing 'pend'",
        view.row_count()
    );
    // Only "Complete" is expected to survive the negated match.
    assert_eq!(view.row_count(), 1);

    println!("\n--- Test: NOT status.StartsWith('Pending') ---");
    let view = engine
        .execute(
            Arc::clone(&table),
            "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
        )
        .unwrap_or_else(|e| panic!("NOT StartsWith query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows NOT starting with 'Pending'",
        view.row_count()
    );
    // Again only "Complete" does not start with "Pending".
    assert_eq!(view.row_count(), 1);

    println!("\n=== NOT with method calls test complete! ===");
}
3843
/// Checks `AND` / `OR` / `NOT` combined with parenthesised sub-expressions.
///
/// Currently ignored; see the `#[ignore]` reason below.
#[test]
#[ignore = "Complex logical expressions with parentheses not yet implemented"]
fn test_complex_logical_expressions() {
    // The init result is discarded on purpose.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut builder = DataTable::new("test");
    for column_name in ["id", "status", "priority", "assigned"] {
        builder.add_column(DataColumn::new(column_name));
    }

    // (id, status, priority, assigned)
    let fixtures = [
        (1, "Pending", "High", "John"),
        (2, "Complete", "High", "Jane"),
        (3, "Pending", "Low", "John"),
        (4, "In Progress", "Medium", "Jane"),
    ];
    for (id, status, priority, assigned) in fixtures {
        let cells = vec![
            DataValue::Integer(id),
            DataValue::String(status.to_string()),
            DataValue::String(priority.to_string()),
            DataValue::String(assigned.to_string()),
        ];
        builder.add_row(DataRow::new(cells)).unwrap();
    }

    let table = Arc::new(builder);
    let engine = QueryEngine::new();

    println!("\n=== Testing Complex Logical Expressions ===");
    println!("Table has {} rows", table.row_count());
    (0..table.row_count()).for_each(|i| {
        let status = table.get_value(i, 1);
        let priority = table.get_value(i, 2);
        let assigned = table.get_value(i, 3);
        println!(
            "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
        );
    });

    println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
    let view = engine
        .execute(
            Arc::clone(&table),
            "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
        )
        .unwrap_or_else(|e| panic!("Complex logic query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with complex logic",
        view.row_count()
    );
    // Rows 1 and 3 are both Pending and assigned to John.
    assert_eq!(view.row_count(), 2);

    println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
    let view = engine
        .execute(
            Arc::clone(&table),
            "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
        )
        .unwrap_or_else(|e| panic!("NOT complex logic query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with NOT complex logic",
        view.row_count()
    );
    // Rows 1 and 4 are neither Complete nor Low priority.
    assert_eq!(view.row_count(), 2);

    println!("\n=== Complex logical expressions test complete! ===");
}
3949
/// Runs queries against a column holding strings, a float and a boolean,
/// alongside a column that contains `Null` values.
#[test]
fn test_mixed_data_types_and_edge_cases() {
    // The init result is discarded on purpose.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut builder = DataTable::new("test");
    for column_name in ["id", "value", "nullable_field"] {
        builder.add_column(DataColumn::new(column_name));
    }

    // (id, value, nullable_field): `value` deliberately mixes variants.
    let fixtures = vec![
        (
            1,
            DataValue::String("123.45".to_string()),
            DataValue::String("present".to_string()),
        ),
        (2, DataValue::Float(678.90), DataValue::Null),
        (
            3,
            DataValue::Boolean(true),
            DataValue::String("also present".to_string()),
        ),
        (4, DataValue::String("false".to_string()), DataValue::Null),
    ];
    for (id, value, nullable_field) in fixtures {
        let cells = vec![DataValue::Integer(id), value, nullable_field];
        builder.add_row(DataRow::new(cells)).unwrap();
    }

    let table = Arc::new(builder);
    let engine = QueryEngine::new();

    println!("\n=== Testing Mixed Data Types and Edge Cases ===");
    println!("Table has {} rows", table.row_count());
    (0..table.row_count()).for_each(|i| {
        let value = table.get_value(i, 1);
        let nullable = table.get_value(i, 2);
        println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
    });

    println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
    let view = engine
        .execute(
            Arc::clone(&table),
            "SELECT * FROM test WHERE value.Contains('true')",
        )
        .unwrap_or_else(|e| panic!("Boolean coercion query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with boolean coercion",
        view.row_count()
    );
    // Only the Boolean(true) row is expected to match.
    assert_eq!(view.row_count(), 1);

    println!("\n--- Test: id IN (1, 3) ---");
    let view = engine
        .execute(Arc::clone(&table), "SELECT * FROM test WHERE id IN (1, 3)")
        .unwrap_or_else(|e| panic!("Multiple IN values query failed: {e}"));
    println!("SUCCESS: Found {} rows with IN clause", view.row_count());
    assert_eq!(view.row_count(), 2);

    println!("\n=== Mixed data types test complete! ===");
}
4040
/// A SELECT list made up only of aggregates must collapse to a single row.
#[test]
fn test_aggregate_only_single_row() {
    let stock = create_test_stock_data();
    let engine = QueryEngine::new();

    let sql = "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock";
    let result = engine
        .execute(stock.clone(), sql)
        .expect("Query should succeed");

    assert_eq!(
        result.row_count(),
        1,
        "Aggregate-only query should return exactly 1 row"
    );
    assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");

    let source = result.source();
    let row = source.get_row(0).expect("Should have first row");

    // COUNT(*), MIN(close) and MAX(close) are compared exactly, in column order.
    let exact = [
        DataValue::Integer(5),
        DataValue::Float(99.5),
        DataValue::Float(105.0),
    ];
    for (idx, want) in exact.iter().enumerate() {
        assert_eq!(row.values[idx], *want);
    }

    // AVG(close) is a float, so it is compared with a tolerance.
    match &row.values[3] {
        DataValue::Float(avg) => assert!(
            (avg - 102.4).abs() < 0.01,
            "Average should be approximately 102.4, got {}",
            avg
        ),
        _ => panic!("AVG should return a Float value"),
    }
}
4086
/// A lone `COUNT(*)` must yield one row with one column holding the row count.
#[test]
fn test_single_aggregate_single_row() {
    let stock = create_test_stock_data();
    let engine = QueryEngine::new();

    let sql = "SELECT COUNT(*) FROM stock";
    let result = engine
        .execute(stock.clone(), sql)
        .expect("Query should succeed");

    // Shape of the result: exactly 1 x 1.
    assert_eq!(
        result.row_count(),
        1,
        "Single aggregate query should return exactly 1 row"
    );
    assert_eq!(result.column_count(), 1, "Should have 1 column");

    // The fixture holds five rows.
    let source = result.source();
    let first_row = source.get_row(0).expect("Should have first row");
    assert_eq!(first_row.values[0], DataValue::Integer(5));
}
4108
/// Aggregates combined with a `WHERE` filter must still produce a single row,
/// computed over the filtered rows only.
#[test]
fn test_aggregate_with_where_single_row() {
    let stock = create_test_stock_data();
    let engine = QueryEngine::new();

    let sql = "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0";
    let result = engine
        .execute(stock.clone(), sql)
        .expect("Query should succeed");

    assert_eq!(
        result.row_count(),
        1,
        "Filtered aggregate query should return exactly 1 row"
    );
    assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");

    let source = result.source();
    let row = source.get_row(0).expect("Should have first row");

    // Two fixture rows have close >= 103.0: 103.5 and 105.0.
    let expected = [
        DataValue::Integer(2),
        DataValue::Float(103.5),
        DataValue::Float(105.0),
    ];
    for (idx, want) in expected.iter().enumerate() {
        assert_eq!(row.values[idx], *want);
    }
}
4138
/// Smoke test: a `NOT IN` predicate must parse without error.
///
/// The parsed statement is only printed for inspection; nothing about its
/// structure is asserted.
#[test]
fn test_not_in_parsing() {
    let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
    println!("\n=== Testing NOT IN parsing ===");
    println!("Parsing query: {query}");

    let mut parser = crate::sql::recursive_parser::Parser::new(query);
    let statement = parser
        .parse()
        .unwrap_or_else(|e| panic!("Parse error: {e}"));

    println!("Parsed statement: {statement:#?}");
    if let Some(where_clause) = statement.where_clause {
        println!("WHERE conditions: {:#?}", where_clause.conditions);
        // Dump the first condition's expression, when there is one.
        if let Some(first_condition) = where_clause.conditions.first() {
            println!("First condition expression: {:#?}", first_condition.expr);
        }
    }
}
4163
4164 fn create_test_stock_data() -> Arc<DataTable> {
4166 let mut table = DataTable::new("stock");
4167
4168 table.add_column(DataColumn::new("symbol"));
4169 table.add_column(DataColumn::new("close"));
4170 table.add_column(DataColumn::new("volume"));
4171
4172 let test_data = vec![
4174 ("AAPL", 99.5, 1000),
4175 ("AAPL", 101.2, 1500),
4176 ("AAPL", 103.5, 2000),
4177 ("AAPL", 105.0, 1200),
4178 ("AAPL", 102.8, 1800),
4179 ];
4180
4181 for (symbol, close, volume) in test_data {
4182 table
4183 .add_row(DataRow::new(vec![
4184 DataValue::String(symbol.to_string()),
4185 DataValue::Float(close),
4186 DataValue::Integer(volume),
4187 ]))
4188 .expect("Should add row successfully");
4189 }
4190
4191 Arc::new(table)
4192 }
4193}
4194
4195#[cfg(test)]
4196#[path = "query_engine_tests.rs"]
4197mod query_engine_tests;