1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::row_expanders::RowExpanderRegistry;
19use crate::data::subquery_executor::SubqueryExecutor;
20use crate::data::temp_table_registry::TempTableRegistry;
21use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
22use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
23use crate::sql::parser::ast::ColumnRef;
24use crate::sql::parser::ast::SetOperation;
25use crate::sql::parser::ast::TableSource;
26use crate::sql::parser::ast::WindowSpec;
27use crate::sql::recursive_parser::{
28 CTEType, OrderByItem, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
29 TableFunction,
30};
31
32fn resolve_cte<'a>(
37 context: &'a HashMap<String, Arc<DataView>>,
38 name: &str,
39) -> Option<&'a Arc<DataView>> {
40 if let Some(v) = context.get(name) {
41 return Some(v);
42 }
43 let lower = name.to_lowercase();
44 context
45 .iter()
46 .find(|(k, _)| k.to_lowercase() == lower)
47 .map(|(_, v)| v)
48}
49
/// Per-query state consulted while resolving column references.
///
/// At present it only records table aliases. For example, an alias `o`
/// registered for table `orders` lets a qualified reference such as `o.id` be
/// mapped back to the underlying table name.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    // alias name -> real table name
    alias_map: HashMap<String, String>,
}
57
58impl ExecutionContext {
59 pub fn new() -> Self {
61 Self {
62 alias_map: HashMap::new(),
63 }
64 }
65
66 pub fn register_alias(&mut self, alias: String, table_name: String) {
68 debug!("Registering alias: {} -> {}", alias, table_name);
69 self.alias_map.insert(alias, table_name);
70 }
71
72 pub fn resolve_alias(&self, name: &str) -> String {
75 self.alias_map
76 .get(name)
77 .cloned()
78 .unwrap_or_else(|| name.to_string())
79 }
80
81 pub fn is_alias(&self, name: &str) -> bool {
83 self.alias_map.contains_key(name)
84 }
85
86 pub fn get_aliases(&self) -> HashMap<String, String> {
88 self.alias_map.clone()
89 }
90
91 pub fn resolve_column_index(&self, table: &DataTable, column_ref: &ColumnRef) -> Result<usize> {
106 if let Some(table_prefix) = &column_ref.table_prefix {
107 let actual_table = self.resolve_alias(table_prefix);
109
110 let qualified_name = format!("{}.{}", actual_table, column_ref.name);
112 if let Some(idx) = table.find_column_by_qualified_name(&qualified_name) {
113 debug!(
114 "Resolved {}.{} -> qualified column '{}' at index {}",
115 table_prefix, column_ref.name, qualified_name, idx
116 );
117 return Ok(idx);
118 }
119
120 if let Some(idx) = table.get_column_index(&column_ref.name) {
122 debug!(
123 "Resolved {}.{} -> unqualified column '{}' at index {}",
124 table_prefix, column_ref.name, column_ref.name, idx
125 );
126 return Ok(idx);
127 }
128
129 Err(anyhow!(
131 "Column '{}' not found. Table '{}' may not support qualified column names",
132 qualified_name,
133 actual_table
134 ))
135 } else {
136 if let Some(idx) = table.get_column_index(&column_ref.name) {
138 debug!(
139 "Resolved unqualified column '{}' at index {}",
140 column_ref.name, idx
141 );
142 return Ok(idx);
143 }
144
145 if column_ref.name.contains('.') {
147 if let Some(idx) = table.find_column_by_qualified_name(&column_ref.name) {
148 debug!(
149 "Resolved '{}' as qualified column at index {}",
150 column_ref.name, idx
151 );
152 return Ok(idx);
153 }
154 }
155
156 let suggestion = self.find_similar_column(table, &column_ref.name);
158 match suggestion {
159 Some(similar) => Err(anyhow!(
160 "Column '{}' not found. Did you mean '{}'?",
161 column_ref.name,
162 similar
163 )),
164 None => Err(anyhow!("Column '{}' not found", column_ref.name)),
165 }
166 }
167 }
168
169 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
171 let columns = table.column_names();
172 let mut best_match: Option<(String, usize)> = None;
173
174 for col in columns {
175 let distance = edit_distance(name, &col);
176 if distance <= 2 {
177 match best_match {
179 Some((_, best_dist)) if distance < best_dist => {
180 best_match = Some((col.clone(), distance));
181 }
182 None => {
183 best_match = Some((col.clone(), distance));
184 }
185 _ => {}
186 }
187 }
188 }
189
190 best_match.map(|(name, _)| name)
191 }
192}
193
194impl Default for ExecutionContext {
195 fn default() -> Self {
196 Self::new()
197 }
198}
199
/// Levenshtein distance between `a` and `b`, counted in `char`s.
///
/// Insertions, deletions and substitutions of a single character each cost 1.
/// Used to suggest near-miss column names in error messages.
fn edit_distance(a: &str, b: &str) -> usize {
    let left: Vec<char> = a.chars().collect();
    let right: Vec<char> = b.chars().collect();

    if left.is_empty() {
        return right.len();
    }
    if right.is_empty() {
        return left.len();
    }

    // Only the previous DP row is needed to compute the next one.
    // `prev[j]` = distance between the first `i` chars of `left` and the first `j` of `right`.
    let mut prev: Vec<usize> = (0..=right.len()).collect();
    let mut curr = vec![0usize; right.len() + 1];

    for (i, &lc) in left.iter().enumerate() {
        curr[0] = i + 1;
        for (j, &rc) in right.iter().enumerate() {
            let deletion = prev[j + 1] + 1;
            let insertion = curr[j] + 1;
            let substitution = prev[j] + usize::from(lc != rc);
            curr[j + 1] = min(min(deletion, insertion), substitution);
        }
        std::mem::swap(&mut prev, &mut curr);
    }

    prev[right.len()]
}
240
241#[derive(Clone)]
243pub struct QueryEngine {
244 case_insensitive: bool,
245 date_notation: String,
246 _behavior_config: Option<BehaviorConfig>,
247}
248
249impl Default for QueryEngine {
250 fn default() -> Self {
251 Self::new()
252 }
253}
254
255impl QueryEngine {
256 #[must_use]
257 pub fn new() -> Self {
258 Self {
259 case_insensitive: false,
260 date_notation: get_date_notation(),
261 _behavior_config: None,
262 }
263 }
264
265 #[must_use]
266 pub fn with_behavior_config(config: BehaviorConfig) -> Self {
267 let case_insensitive = config.case_insensitive_default;
268 let date_notation = get_date_notation();
270 Self {
271 case_insensitive,
272 date_notation,
273 _behavior_config: Some(config),
274 }
275 }
276
277 #[must_use]
278 pub fn with_date_notation(_date_notation: String) -> Self {
279 Self {
280 case_insensitive: false,
281 date_notation: get_date_notation(), _behavior_config: None,
283 }
284 }
285
286 #[must_use]
287 pub fn with_case_insensitive(case_insensitive: bool) -> Self {
288 Self {
289 case_insensitive,
290 date_notation: get_date_notation(),
291 _behavior_config: None,
292 }
293 }
294
295 #[must_use]
296 pub fn with_case_insensitive_and_date_notation(
297 case_insensitive: bool,
298 _date_notation: String, ) -> Self {
300 Self {
301 case_insensitive,
302 date_notation: get_date_notation(), _behavior_config: None,
304 }
305 }
306
307 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
309 let columns = table.column_names();
310 let mut best_match: Option<(String, usize)> = None;
311
312 for col in columns {
313 let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
314 let max_distance = if name.len() > 10 { 3 } else { 2 };
317 if distance <= max_distance {
318 match &best_match {
319 None => best_match = Some((col, distance)),
320 Some((_, best_dist)) if distance < *best_dist => {
321 best_match = Some((col, distance));
322 }
323 _ => {}
324 }
325 }
326 }
327
328 best_match.map(|(name, _)| name)
329 }
330
331 fn edit_distance(&self, s1: &str, s2: &str) -> usize {
333 let len1 = s1.len();
334 let len2 = s2.len();
335 let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
336
337 for i in 0..=len1 {
338 matrix[i][0] = i;
339 }
340 for j in 0..=len2 {
341 matrix[0][j] = j;
342 }
343
344 for (i, c1) in s1.chars().enumerate() {
345 for (j, c2) in s2.chars().enumerate() {
346 let cost = usize::from(c1 != c2);
347 matrix[i + 1][j + 1] = std::cmp::min(
348 matrix[i][j + 1] + 1, std::cmp::min(
350 matrix[i + 1][j] + 1, matrix[i][j] + cost, ),
353 );
354 }
355 }
356
357 matrix[len1][len2]
358 }
359
360 fn contains_unnest(expr: &SqlExpression) -> bool {
362 match expr {
363 SqlExpression::Unnest { .. } => true,
365 SqlExpression::FunctionCall { name, args, .. } => {
366 if name.to_uppercase() == "UNNEST" {
367 return true;
368 }
369 args.iter().any(Self::contains_unnest)
371 }
372 SqlExpression::BinaryOp { left, right, .. } => {
373 Self::contains_unnest(left) || Self::contains_unnest(right)
374 }
375 SqlExpression::Not { expr } => Self::contains_unnest(expr),
376 SqlExpression::CaseExpression {
377 when_branches,
378 else_branch,
379 } => {
380 when_branches.iter().any(|branch| {
381 Self::contains_unnest(&branch.condition)
382 || Self::contains_unnest(&branch.result)
383 }) || else_branch
384 .as_ref()
385 .map_or(false, |e| Self::contains_unnest(e))
386 }
387 SqlExpression::SimpleCaseExpression {
388 expr,
389 when_branches,
390 else_branch,
391 } => {
392 Self::contains_unnest(expr)
393 || when_branches.iter().any(|branch| {
394 Self::contains_unnest(&branch.value)
395 || Self::contains_unnest(&branch.result)
396 })
397 || else_branch
398 .as_ref()
399 .map_or(false, |e| Self::contains_unnest(e))
400 }
401 SqlExpression::InList { expr, values } => {
402 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
403 }
404 SqlExpression::NotInList { expr, values } => {
405 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
406 }
407 SqlExpression::Between { expr, lower, upper } => {
408 Self::contains_unnest(expr)
409 || Self::contains_unnest(lower)
410 || Self::contains_unnest(upper)
411 }
412 SqlExpression::InSubquery { expr, .. } => Self::contains_unnest(expr),
413 SqlExpression::NotInSubquery { expr, .. } => Self::contains_unnest(expr),
414 SqlExpression::ScalarSubquery { .. } => false, SqlExpression::WindowFunction { args, .. } => args.iter().any(Self::contains_unnest),
416 SqlExpression::MethodCall { args, .. } => args.iter().any(Self::contains_unnest),
417 SqlExpression::ChainedMethodCall { base, args, .. } => {
418 Self::contains_unnest(base) || args.iter().any(Self::contains_unnest)
419 }
420 _ => false,
421 }
422 }
423
424 fn collect_window_specs(expr: &SqlExpression, specs: &mut Vec<WindowSpec>) {
426 match expr {
427 SqlExpression::WindowFunction {
428 window_spec, args, ..
429 } => {
430 specs.push(window_spec.clone());
432 for arg in args {
434 Self::collect_window_specs(arg, specs);
435 }
436 }
437 SqlExpression::BinaryOp { left, right, .. } => {
438 Self::collect_window_specs(left, specs);
439 Self::collect_window_specs(right, specs);
440 }
441 SqlExpression::Not { expr } => {
442 Self::collect_window_specs(expr, specs);
443 }
444 SqlExpression::FunctionCall { args, .. } => {
445 for arg in args {
446 Self::collect_window_specs(arg, specs);
447 }
448 }
449 SqlExpression::CaseExpression {
450 when_branches,
451 else_branch,
452 } => {
453 for branch in when_branches {
454 Self::collect_window_specs(&branch.condition, specs);
455 Self::collect_window_specs(&branch.result, specs);
456 }
457 if let Some(else_expr) = else_branch {
458 Self::collect_window_specs(else_expr, specs);
459 }
460 }
461 SqlExpression::SimpleCaseExpression {
462 expr,
463 when_branches,
464 else_branch,
465 } => {
466 Self::collect_window_specs(expr, specs);
467 for branch in when_branches {
468 Self::collect_window_specs(&branch.value, specs);
469 Self::collect_window_specs(&branch.result, specs);
470 }
471 if let Some(else_expr) = else_branch {
472 Self::collect_window_specs(else_expr, specs);
473 }
474 }
475 SqlExpression::InList { expr, values, .. } => {
476 Self::collect_window_specs(expr, specs);
477 for item in values {
478 Self::collect_window_specs(item, specs);
479 }
480 }
481 SqlExpression::ChainedMethodCall { base, args, .. } => {
482 Self::collect_window_specs(base, specs);
483 for arg in args {
484 Self::collect_window_specs(arg, specs);
485 }
486 }
487 SqlExpression::Column(_)
489 | SqlExpression::NumberLiteral(_)
490 | SqlExpression::StringLiteral(_)
491 | SqlExpression::BooleanLiteral(_)
492 | SqlExpression::Null
493 | SqlExpression::DateTimeToday { .. }
494 | SqlExpression::DateTimeConstructor { .. }
495 | SqlExpression::MethodCall { .. } => {}
496 _ => {}
498 }
499 }
500
501 fn contains_window_function(expr: &SqlExpression) -> bool {
503 match expr {
504 SqlExpression::WindowFunction { .. } => true,
505 SqlExpression::BinaryOp { left, right, .. } => {
506 Self::contains_window_function(left) || Self::contains_window_function(right)
507 }
508 SqlExpression::Not { expr } => Self::contains_window_function(expr),
509 SqlExpression::FunctionCall { args, .. } => {
510 args.iter().any(Self::contains_window_function)
511 }
512 SqlExpression::CaseExpression {
513 when_branches,
514 else_branch,
515 } => {
516 when_branches.iter().any(|branch| {
517 Self::contains_window_function(&branch.condition)
518 || Self::contains_window_function(&branch.result)
519 }) || else_branch
520 .as_ref()
521 .map_or(false, |e| Self::contains_window_function(e))
522 }
523 SqlExpression::SimpleCaseExpression {
524 expr,
525 when_branches,
526 else_branch,
527 } => {
528 Self::contains_window_function(expr)
529 || when_branches.iter().any(|branch| {
530 Self::contains_window_function(&branch.value)
531 || Self::contains_window_function(&branch.result)
532 })
533 || else_branch
534 .as_ref()
535 .map_or(false, |e| Self::contains_window_function(e))
536 }
537 SqlExpression::InList { expr, values } => {
538 Self::contains_window_function(expr)
539 || values.iter().any(Self::contains_window_function)
540 }
541 SqlExpression::NotInList { expr, values } => {
542 Self::contains_window_function(expr)
543 || values.iter().any(Self::contains_window_function)
544 }
545 SqlExpression::Between { expr, lower, upper } => {
546 Self::contains_window_function(expr)
547 || Self::contains_window_function(lower)
548 || Self::contains_window_function(upper)
549 }
550 SqlExpression::InSubquery { expr, .. } => Self::contains_window_function(expr),
551 SqlExpression::NotInSubquery { expr, .. } => Self::contains_window_function(expr),
552 SqlExpression::MethodCall { args, .. } => {
553 args.iter().any(Self::contains_window_function)
554 }
555 SqlExpression::ChainedMethodCall { base, args, .. } => {
556 Self::contains_window_function(base)
557 || args.iter().any(Self::contains_window_function)
558 }
559 _ => false,
560 }
561 }
562
563 fn extract_window_specs(
565 items: &[SelectItem],
566 ) -> Vec<crate::data::batch_window_evaluator::WindowFunctionSpec> {
567 let mut specs = Vec::new();
568 for (idx, item) in items.iter().enumerate() {
569 if let SelectItem::Expression { expr, .. } = item {
570 Self::collect_window_function_specs(expr, idx, &mut specs);
571 }
572 }
573 specs
574 }
575
576 fn collect_window_function_specs(
578 expr: &SqlExpression,
579 output_column_index: usize,
580 specs: &mut Vec<crate::data::batch_window_evaluator::WindowFunctionSpec>,
581 ) {
582 match expr {
583 SqlExpression::WindowFunction {
584 name,
585 args,
586 window_spec,
587 } => {
588 specs.push(crate::data::batch_window_evaluator::WindowFunctionSpec {
589 spec: window_spec.clone(),
590 function_name: name.clone(),
591 args: args.clone(),
592 output_column_index,
593 });
594 }
595 SqlExpression::BinaryOp { left, right, .. } => {
596 Self::collect_window_function_specs(left, output_column_index, specs);
597 Self::collect_window_function_specs(right, output_column_index, specs);
598 }
599 SqlExpression::Not { expr } => {
600 Self::collect_window_function_specs(expr, output_column_index, specs);
601 }
602 SqlExpression::FunctionCall { args, .. } => {
603 for arg in args {
604 Self::collect_window_function_specs(arg, output_column_index, specs);
605 }
606 }
607 SqlExpression::CaseExpression {
608 when_branches,
609 else_branch,
610 } => {
611 for branch in when_branches {
612 Self::collect_window_function_specs(
613 &branch.condition,
614 output_column_index,
615 specs,
616 );
617 Self::collect_window_function_specs(&branch.result, output_column_index, specs);
618 }
619 if let Some(e) = else_branch {
620 Self::collect_window_function_specs(e, output_column_index, specs);
621 }
622 }
623 SqlExpression::SimpleCaseExpression {
624 expr,
625 when_branches,
626 else_branch,
627 } => {
628 Self::collect_window_function_specs(expr, output_column_index, specs);
629 for branch in when_branches {
630 Self::collect_window_function_specs(&branch.value, output_column_index, specs);
631 Self::collect_window_function_specs(&branch.result, output_column_index, specs);
632 }
633 if let Some(e) = else_branch {
634 Self::collect_window_function_specs(e, output_column_index, specs);
635 }
636 }
637 SqlExpression::InList { expr, values } => {
638 Self::collect_window_function_specs(expr, output_column_index, specs);
639 for val in values {
640 Self::collect_window_function_specs(val, output_column_index, specs);
641 }
642 }
643 SqlExpression::NotInList { expr, values } => {
644 Self::collect_window_function_specs(expr, output_column_index, specs);
645 for val in values {
646 Self::collect_window_function_specs(val, output_column_index, specs);
647 }
648 }
649 SqlExpression::Between { expr, lower, upper } => {
650 Self::collect_window_function_specs(expr, output_column_index, specs);
651 Self::collect_window_function_specs(lower, output_column_index, specs);
652 Self::collect_window_function_specs(upper, output_column_index, specs);
653 }
654 SqlExpression::InSubquery { expr, .. } => {
655 Self::collect_window_function_specs(expr, output_column_index, specs);
656 }
657 SqlExpression::NotInSubquery { expr, .. } => {
658 Self::collect_window_function_specs(expr, output_column_index, specs);
659 }
660 SqlExpression::MethodCall { args, .. } => {
661 for arg in args {
662 Self::collect_window_function_specs(arg, output_column_index, specs);
663 }
664 }
665 SqlExpression::ChainedMethodCall { base, args, .. } => {
666 Self::collect_window_function_specs(base, output_column_index, specs);
667 for arg in args {
668 Self::collect_window_function_specs(arg, output_column_index, specs);
669 }
670 }
671 _ => {} }
673 }
674
675 pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
677 let (view, _plan) = self.execute_with_plan(table, sql)?;
678 Ok(view)
679 }
680
681 pub fn execute_with_temp_tables(
683 &self,
684 table: Arc<DataTable>,
685 sql: &str,
686 temp_tables: Option<&TempTableRegistry>,
687 ) -> Result<DataView> {
688 let (view, _plan) = self.execute_with_plan_and_temp_tables(table, sql, temp_tables)?;
689 Ok(view)
690 }
691
692 pub fn execute_statement(
694 &self,
695 table: Arc<DataTable>,
696 statement: SelectStatement,
697 ) -> Result<DataView> {
698 self.execute_statement_with_temp_tables(table, statement, None)
699 }
700
701 pub fn execute_statement_with_temp_tables(
703 &self,
704 table: Arc<DataTable>,
705 statement: SelectStatement,
706 temp_tables: Option<&TempTableRegistry>,
707 ) -> Result<DataView> {
708 let mut cte_context = HashMap::new();
710
711 if let Some(temp_registry) = temp_tables {
713 for table_name in temp_registry.list_tables() {
714 if let Some(temp_table) = temp_registry.get(&table_name) {
715 debug!("Adding temp table {} to CTE context", table_name);
716 let view = DataView::new(temp_table);
717 cte_context.insert(table_name, Arc::new(view));
718 }
719 }
720 }
721
722 for cte in &statement.ctes {
723 debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
724 let cte_result = match &cte.cte_type {
726 CTEType::Standard(query) => {
727 let view = self.build_view_with_context(
729 table.clone(),
730 query.clone(),
731 &mut cte_context,
732 )?;
733
734 let mut materialized = self.materialize_view(view)?;
736
737 for column in materialized.columns_mut() {
739 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
740 column.source_table = Some(cte.name.clone());
741 }
742
743 DataView::new(Arc::new(materialized))
744 }
745 CTEType::Web(web_spec) => {
746 use crate::web::http_fetcher::WebDataFetcher;
748
749 let fetcher = WebDataFetcher::new()?;
750 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
752
753 for column in data_table.columns_mut() {
755 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
756 column.source_table = Some(cte.name.clone());
757 }
758
759 DataView::new(Arc::new(data_table))
761 }
762 CTEType::File(file_spec) => {
763 let mut data_table =
764 crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
765
766 for column in data_table.columns_mut() {
767 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
768 column.source_table = Some(cte.name.clone());
769 }
770
771 DataView::new(Arc::new(data_table))
772 }
773 };
774 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
776 debug!(
777 "QueryEngine: CTE '{}' pre-processed, stored in context",
778 cte.name
779 );
780 }
781
782 let mut subquery_executor =
784 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
785 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
786
787 self.build_view_with_context(table, processed_statement, &mut cte_context)
789 }
790
791 pub fn execute_statement_with_cte_context(
793 &self,
794 table: Arc<DataTable>,
795 statement: SelectStatement,
796 cte_context: &HashMap<String, Arc<DataView>>,
797 ) -> Result<DataView> {
798 let mut local_context = cte_context.clone();
800
801 for cte in &statement.ctes {
803 debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
804 let cte_result = match &cte.cte_type {
805 CTEType::Standard(query) => {
806 let view = self.build_view_with_context(
807 table.clone(),
808 query.clone(),
809 &mut local_context,
810 )?;
811
812 let mut materialized = self.materialize_view(view)?;
814
815 for column in materialized.columns_mut() {
817 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
818 column.source_table = Some(cte.name.clone());
819 }
820
821 DataView::new(Arc::new(materialized))
822 }
823 CTEType::Web(web_spec) => {
824 use crate::web::http_fetcher::WebDataFetcher;
826
827 let fetcher = WebDataFetcher::new()?;
828 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
830
831 for column in data_table.columns_mut() {
833 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
834 column.source_table = Some(cte.name.clone());
835 }
836
837 DataView::new(Arc::new(data_table))
839 }
840 CTEType::File(file_spec) => {
841 let mut data_table =
842 crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
843
844 for column in data_table.columns_mut() {
845 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
846 column.source_table = Some(cte.name.clone());
847 }
848
849 DataView::new(Arc::new(data_table))
850 }
851 };
852 local_context.insert(cte.name.clone(), Arc::new(cte_result));
853 }
854
855 let mut subquery_executor =
857 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
858 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
859
860 self.build_view_with_context(table, processed_statement, &mut local_context)
862 }
863
864 pub fn execute_with_plan(
866 &self,
867 table: Arc<DataTable>,
868 sql: &str,
869 ) -> Result<(DataView, ExecutionPlan)> {
870 self.execute_with_plan_and_temp_tables(table, sql, None)
871 }
872
873 pub fn execute_with_plan_and_temp_tables(
875 &self,
876 table: Arc<DataTable>,
877 sql: &str,
878 temp_tables: Option<&TempTableRegistry>,
879 ) -> Result<(DataView, ExecutionPlan)> {
880 let mut plan_builder = ExecutionPlanBuilder::new();
881 let start_time = Instant::now();
882
883 plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
885 plan_builder.add_detail(format!("Query: {}", sql));
886 let mut parser = Parser::new(sql);
887 let statement = parser
888 .parse()
889 .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
890 plan_builder.add_detail(format!("Parsed successfully"));
891 if let Some(ref from_source) = statement.from_source {
892 match from_source {
893 TableSource::Table(name) => {
894 plan_builder.add_detail(format!("FROM: {}", name));
895 }
896 TableSource::DerivedTable { alias, .. } => {
897 plan_builder.add_detail(format!("FROM: derived table (alias: {})", alias));
898 }
899 TableSource::Pivot { .. } => {
900 plan_builder.add_detail("FROM: PIVOT".to_string());
901 }
902 }
903 }
904 if statement.where_clause.is_some() {
905 plan_builder.add_detail("WHERE clause present".to_string());
906 }
907 plan_builder.end_step();
908
909 let mut cte_context = HashMap::new();
911
912 if let Some(temp_registry) = temp_tables {
914 for table_name in temp_registry.list_tables() {
915 if let Some(temp_table) = temp_registry.get(&table_name) {
916 debug!("Adding temp table {} to CTE context", table_name);
917 let view = DataView::new(temp_table);
918 cte_context.insert(table_name, Arc::new(view));
919 }
920 }
921 }
922
923 if !statement.ctes.is_empty() {
924 plan_builder.begin_step(
925 StepType::CTE,
926 format!("Process {} CTEs", statement.ctes.len()),
927 );
928
929 for cte in &statement.ctes {
930 let cte_start = Instant::now();
931 plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
932
933 let cte_result = match &cte.cte_type {
934 CTEType::Standard(query) => {
935 if let Some(ref from_source) = query.from_source {
937 match from_source {
938 TableSource::Table(name) => {
939 plan_builder.add_detail(format!("Source: {}", name));
940 }
941 TableSource::DerivedTable { alias, .. } => {
942 plan_builder
943 .add_detail(format!("Source: derived table ({})", alias));
944 }
945 TableSource::Pivot { .. } => {
946 plan_builder.add_detail("Source: PIVOT".to_string());
947 }
948 }
949 }
950 if query.where_clause.is_some() {
951 plan_builder.add_detail("Has WHERE clause".to_string());
952 }
953 if query.group_by.is_some() {
954 plan_builder.add_detail("Has GROUP BY".to_string());
955 }
956
957 debug!(
958 "QueryEngine: Processing CTE '{}' with existing context: {:?}",
959 cte.name,
960 cte_context.keys().collect::<Vec<_>>()
961 );
962
963 let mut subquery_executor = SubqueryExecutor::with_cte_context(
966 self.clone(),
967 table.clone(),
968 cte_context.clone(),
969 );
970 let processed_query = subquery_executor.execute_subqueries(query)?;
971
972 let view = self.build_view_with_context(
973 table.clone(),
974 processed_query,
975 &mut cte_context,
976 )?;
977
978 let mut materialized = self.materialize_view(view)?;
980
981 for column in materialized.columns_mut() {
983 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
984 column.source_table = Some(cte.name.clone());
985 }
986
987 DataView::new(Arc::new(materialized))
988 }
989 CTEType::Web(web_spec) => {
990 plan_builder.add_detail(format!("URL: {}", web_spec.url));
991 if let Some(format) = &web_spec.format {
992 plan_builder.add_detail(format!("Format: {:?}", format));
993 }
994 if let Some(cache) = web_spec.cache_seconds {
995 plan_builder.add_detail(format!("Cache: {} seconds", cache));
996 }
997
998 use crate::web::http_fetcher::WebDataFetcher;
1000
1001 let fetcher = WebDataFetcher::new()?;
1002 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
1004
1005 for column in data_table.columns_mut() {
1007 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1008 column.source_table = Some(cte.name.clone());
1009 }
1010
1011 DataView::new(Arc::new(data_table))
1013 }
1014 CTEType::File(file_spec) => {
1015 plan_builder.add_detail(format!("PATH: {}", file_spec.path));
1016 if file_spec.recursive {
1017 plan_builder.add_detail("RECURSIVE".to_string());
1018 }
1019 if let Some(ref g) = file_spec.glob {
1020 plan_builder.add_detail(format!("GLOB: {}", g));
1021 }
1022 if let Some(d) = file_spec.max_depth {
1023 plan_builder.add_detail(format!("MAX_DEPTH: {}", d));
1024 }
1025
1026 let mut data_table =
1027 crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
1028
1029 for column in data_table.columns_mut() {
1030 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1031 column.source_table = Some(cte.name.clone());
1032 }
1033
1034 DataView::new(Arc::new(data_table))
1035 }
1036 };
1037
1038 plan_builder.set_rows_out(cte_result.row_count());
1040 plan_builder.add_detail(format!(
1041 "Result: {} rows, {} columns",
1042 cte_result.row_count(),
1043 cte_result.column_count()
1044 ));
1045 plan_builder.add_detail(format!(
1046 "Execution time: {:.3}ms",
1047 cte_start.elapsed().as_secs_f64() * 1000.0
1048 ));
1049
1050 debug!(
1051 "QueryEngine: Storing CTE '{}' in context with {} rows",
1052 cte.name,
1053 cte_result.row_count()
1054 );
1055 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1056 plan_builder.end_step();
1057 }
1058
1059 plan_builder.add_detail(format!(
1060 "All {} CTEs cached in context",
1061 statement.ctes.len()
1062 ));
1063 plan_builder.end_step();
1064 }
1065
1066 plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
1068 let mut subquery_executor =
1069 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
1070
1071 let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
1073 format!("{:?}", w).contains("Subquery")
1075 });
1076
1077 if has_subqueries {
1078 plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
1079 }
1080
1081 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
1082
1083 if has_subqueries {
1084 plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
1085 } else {
1086 plan_builder.add_detail("No subqueries to process".to_string());
1087 }
1088
1089 plan_builder.end_step();
1090 let result = self.build_view_with_context_and_plan(
1091 table,
1092 processed_statement,
1093 &mut cte_context,
1094 &mut plan_builder,
1095 )?;
1096
1097 let total_duration = start_time.elapsed();
1098 info!(
1099 "Query execution complete: total={:?}, rows={}",
1100 total_duration,
1101 result.row_count()
1102 );
1103
1104 let plan = plan_builder.build();
1105 Ok((result, plan))
1106 }
1107
1108 fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
1110 let mut cte_context = HashMap::new();
1111 self.build_view_with_context(table, statement, &mut cte_context)
1112 }
1113
1114 fn build_view_with_context(
1116 &self,
1117 table: Arc<DataTable>,
1118 statement: SelectStatement,
1119 cte_context: &mut HashMap<String, Arc<DataView>>,
1120 ) -> Result<DataView> {
1121 let mut dummy_plan = ExecutionPlanBuilder::new();
1122 let mut exec_context = ExecutionContext::new();
1123 self.build_view_with_context_and_plan_and_exec(
1124 table,
1125 statement,
1126 cte_context,
1127 &mut dummy_plan,
1128 &mut exec_context,
1129 )
1130 }
1131
1132 fn build_view_with_context_and_plan(
1134 &self,
1135 table: Arc<DataTable>,
1136 statement: SelectStatement,
1137 cte_context: &mut HashMap<String, Arc<DataView>>,
1138 plan: &mut ExecutionPlanBuilder,
1139 ) -> Result<DataView> {
1140 let mut exec_context = ExecutionContext::new();
1141 self.build_view_with_context_and_plan_and_exec(
1142 table,
1143 statement,
1144 cte_context,
1145 plan,
1146 &mut exec_context,
1147 )
1148 }
1149
1150 fn build_view_with_context_and_plan_and_exec(
1152 &self,
1153 table: Arc<DataTable>,
1154 statement: SelectStatement,
1155 cte_context: &mut HashMap<String, Arc<DataView>>,
1156 plan: &mut ExecutionPlanBuilder,
1157 exec_context: &mut ExecutionContext,
1158 ) -> Result<DataView> {
1159 for cte in &statement.ctes {
1161 if cte_context.contains_key(&cte.name) {
1163 debug!(
1164 "QueryEngine: CTE '{}' already in context, skipping",
1165 cte.name
1166 );
1167 continue;
1168 }
1169
1170 debug!("QueryEngine: Processing CTE '{}'...", cte.name);
1171 debug!(
1172 "QueryEngine: Available CTEs for '{}': {:?}",
1173 cte.name,
1174 cte_context.keys().collect::<Vec<_>>()
1175 );
1176
1177 let cte_result = match &cte.cte_type {
1179 CTEType::Standard(query) => {
1180 let view =
1181 self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
1182
1183 let mut materialized = self.materialize_view(view)?;
1185
1186 for column in materialized.columns_mut() {
1188 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1189 column.source_table = Some(cte.name.clone());
1190 }
1191
1192 DataView::new(Arc::new(materialized))
1193 }
1194 CTEType::Web(_web_spec) => {
1195 return Err(anyhow!(
1197 "Web CTEs should be processed in execute_select method"
1198 ));
1199 }
1200 CTEType::File(_file_spec) => {
1201 return Err(anyhow!(
1203 "FILE CTEs should be processed in execute_select method"
1204 ));
1205 }
1206 };
1207
1208 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1210 debug!(
1211 "QueryEngine: CTE '{}' processed, stored in context",
1212 cte.name
1213 );
1214 }
1215
1216 let source_table = if let Some(ref from_source) = statement.from_source {
1218 match from_source {
1219 TableSource::Table(table_name) => {
1220 if let Some(cte_view) = resolve_cte(cte_context, table_name) {
1222 debug!("QueryEngine: Using CTE '{}' as source table", table_name);
1223 let mut materialized = self.materialize_view((**cte_view).clone())?;
1225
1226 #[allow(deprecated)]
1228 if let Some(ref alias) = statement.from_alias {
1229 debug!(
1230 "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1231 alias, table_name
1232 );
1233 for column in materialized.columns_mut() {
1234 if let Some(ref qualified_name) = column.qualified_name {
1236 if qualified_name.starts_with(&format!("{}.", table_name)) {
1237 column.qualified_name = Some(qualified_name.replace(
1238 &format!("{}.", table_name),
1239 &format!("{}.", alias),
1240 ));
1241 }
1242 }
1243 if column.source_table.as_ref() == Some(table_name) {
1245 column.source_table = Some(alias.clone());
1246 }
1247 }
1248 }
1249
1250 Arc::new(materialized)
1251 } else {
1252 table.clone()
1254 }
1255 }
1256 TableSource::DerivedTable { query, alias } => {
1257 debug!(
1259 "QueryEngine: Processing FROM derived table (alias: {})",
1260 alias
1261 );
1262 let subquery_result =
1263 self.build_view_with_context(table.clone(), *query.clone(), cte_context)?;
1264
1265 let mut materialized = self.materialize_view(subquery_result)?;
1268
1269 for column in materialized.columns_mut() {
1273 column.source_table = Some(alias.clone());
1274 }
1275
1276 Arc::new(materialized)
1277 }
1278 TableSource::Pivot { .. } => {
1279 return Err(anyhow!(
1281 "PIVOT in FROM clause should have been expanded by preprocessing pipeline"
1282 ));
1283 }
1284 }
1285 } else {
1286 #[allow(deprecated)]
1288 if let Some(ref table_func) = statement.from_function {
1289 debug!("QueryEngine: Processing table function (deprecated field)...");
1291 match table_func {
1292 TableFunction::Generator { name, args } => {
1293 use crate::sql::generators::GeneratorRegistry;
1295
1296 let registry = GeneratorRegistry::new();
1298
1299 if let Some(generator) = registry.get(name) {
1300 let mut evaluator = ArithmeticEvaluator::with_date_notation(
1302 &table,
1303 self.date_notation.clone(),
1304 );
1305 let dummy_row = 0;
1306
1307 let mut evaluated_args = Vec::new();
1308 for arg in args {
1309 evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
1310 }
1311
1312 generator.generate(evaluated_args)?
1314 } else {
1315 return Err(anyhow!("Unknown generator function: {}", name));
1316 }
1317 }
1318 }
1319 } else {
1320 #[allow(deprecated)]
1321 if let Some(ref subquery) = statement.from_subquery {
1322 debug!("QueryEngine: Processing FROM subquery (deprecated field)...");
1324 let subquery_result = self.build_view_with_context(
1325 table.clone(),
1326 *subquery.clone(),
1327 cte_context,
1328 )?;
1329
1330 let materialized = self.materialize_view(subquery_result)?;
1333 Arc::new(materialized)
1334 } else {
1335 #[allow(deprecated)]
1336 if let Some(ref table_name) = statement.from_table {
1337 if let Some(cte_view) = resolve_cte(cte_context, table_name) {
1339 debug!(
1340 "QueryEngine: Using CTE '{}' as source table (deprecated field)",
1341 table_name
1342 );
1343 let mut materialized = self.materialize_view((**cte_view).clone())?;
1345
1346 #[allow(deprecated)]
1348 if let Some(ref alias) = statement.from_alias {
1349 debug!(
1350 "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1351 alias, table_name
1352 );
1353 for column in materialized.columns_mut() {
1354 if let Some(ref qualified_name) = column.qualified_name {
1356 if qualified_name.starts_with(&format!("{}.", table_name)) {
1357 column.qualified_name = Some(qualified_name.replace(
1358 &format!("{}.", table_name),
1359 &format!("{}.", alias),
1360 ));
1361 }
1362 }
1363 if column.source_table.as_ref() == Some(table_name) {
1365 column.source_table = Some(alias.clone());
1366 }
1367 }
1368 }
1369
1370 Arc::new(materialized)
1371 } else {
1372 table.clone()
1374 }
1375 } else {
1376 table.clone()
1378 }
1379 }
1380 }
1381 };
1382
1383 #[allow(deprecated)]
1385 if let Some(ref alias) = statement.from_alias {
1386 #[allow(deprecated)]
1387 if let Some(ref table_name) = statement.from_table {
1388 exec_context.register_alias(alias.clone(), table_name.clone());
1389 }
1390 }
1391
1392 let final_table = if !statement.joins.is_empty() {
1394 plan.begin_step(
1395 StepType::Join,
1396 format!("Process {} JOINs", statement.joins.len()),
1397 );
1398 plan.set_rows_in(source_table.row_count());
1399
1400 let join_executor = HashJoinExecutor::new(self.case_insensitive);
1401 let mut current_table = source_table;
1402
1403 for (idx, join_clause) in statement.joins.iter().enumerate() {
1404 let join_start = Instant::now();
1405 plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
1406 plan.add_detail(format!("Type: {:?}", join_clause.join_type));
1407 plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
1408 plan.add_detail(format!(
1409 "Executing {:?} JOIN on {} condition(s)",
1410 join_clause.join_type,
1411 join_clause.condition.conditions.len()
1412 ));
1413
1414 let right_table = match &join_clause.table {
1416 TableSource::Table(name) => {
1417 if let Some(cte_view) = resolve_cte(cte_context, name) {
1419 let mut materialized = self.materialize_view((**cte_view).clone())?;
1420
1421 if let Some(ref alias) = join_clause.alias {
1423 debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
1424 for column in materialized.columns_mut() {
1425 if let Some(ref qualified_name) = column.qualified_name {
1427 if qualified_name.starts_with(&format!("{}.", name)) {
1428 column.qualified_name = Some(qualified_name.replace(
1429 &format!("{}.", name),
1430 &format!("{}.", alias),
1431 ));
1432 }
1433 }
1434 if column.source_table.as_ref() == Some(name) {
1436 column.source_table = Some(alias.clone());
1437 }
1438 }
1439 }
1440
1441 Arc::new(materialized)
1442 } else {
1443 return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
1446 }
1447 }
1448 TableSource::DerivedTable { query, alias: _ } => {
1449 let subquery_result = self.build_view_with_context(
1451 table.clone(),
1452 *query.clone(),
1453 cte_context,
1454 )?;
1455 let materialized = self.materialize_view(subquery_result)?;
1456 Arc::new(materialized)
1457 }
1458 TableSource::Pivot { .. } => {
1459 return Err(anyhow!("PIVOT in JOIN clause is not yet supported"));
1461 }
1462 };
1463
1464 let joined = join_executor.execute_join(
1466 current_table.clone(),
1467 join_clause,
1468 right_table.clone(),
1469 )?;
1470
1471 plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
1472 plan.set_rows_out(joined.row_count());
1473 plan.add_detail(format!("Result: {} rows", joined.row_count()));
1474 plan.add_detail(format!(
1475 "Join time: {:.3}ms",
1476 join_start.elapsed().as_secs_f64() * 1000.0
1477 ));
1478 plan.end_step();
1479
1480 current_table = Arc::new(joined);
1481 }
1482
1483 plan.set_rows_out(current_table.row_count());
1484 plan.add_detail(format!(
1485 "Final result after all joins: {} rows",
1486 current_table.row_count()
1487 ));
1488 plan.end_step();
1489 current_table
1490 } else {
1491 source_table
1492 };
1493
1494 self.build_view_internal_with_plan_and_exec(
1496 final_table,
1497 statement,
1498 plan,
1499 Some(exec_context),
1500 )
1501 }
1502
1503 pub fn materialize_view(&self, view: DataView) -> Result<DataTable> {
1505 let source = view.source();
1506 let mut result_table = DataTable::new("derived");
1507
1508 let visible_cols = view.visible_column_indices().to_vec();
1510
1511 for col_idx in &visible_cols {
1513 let col = &source.columns[*col_idx];
1514 let new_col = DataColumn {
1515 name: col.name.clone(),
1516 data_type: col.data_type.clone(),
1517 nullable: col.nullable,
1518 unique_values: col.unique_values,
1519 null_count: col.null_count,
1520 metadata: col.metadata.clone(),
1521 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), };
1524 result_table.add_column(new_col);
1525 }
1526
1527 for row_idx in view.visible_row_indices() {
1529 let source_row = &source.rows[*row_idx];
1530 let mut new_row = DataRow { values: Vec::new() };
1531
1532 for col_idx in &visible_cols {
1533 new_row.values.push(source_row.values[*col_idx].clone());
1534 }
1535
1536 result_table.add_row(new_row);
1537 }
1538
1539 Ok(result_table)
1540 }
1541
1542 fn build_view_internal(
1543 &self,
1544 table: Arc<DataTable>,
1545 statement: SelectStatement,
1546 ) -> Result<DataView> {
1547 let mut dummy_plan = ExecutionPlanBuilder::new();
1548 self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
1549 }
1550
1551 fn build_view_internal_with_plan(
1552 &self,
1553 table: Arc<DataTable>,
1554 statement: SelectStatement,
1555 plan: &mut ExecutionPlanBuilder,
1556 ) -> Result<DataView> {
1557 self.build_view_internal_with_plan_and_exec(table, statement, plan, None)
1558 }
1559
1560 fn build_view_internal_with_plan_and_exec(
1561 &self,
1562 table: Arc<DataTable>,
1563 statement: SelectStatement,
1564 plan: &mut ExecutionPlanBuilder,
1565 exec_context: Option<&ExecutionContext>,
1566 ) -> Result<DataView> {
1567 debug!(
1568 "QueryEngine::build_view - select_items: {:?}",
1569 statement.select_items
1570 );
1571 debug!(
1572 "QueryEngine::build_view - where_clause: {:?}",
1573 statement.where_clause
1574 );
1575
1576 let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
1578
1579 if let Some(where_clause) = &statement.where_clause {
1581 let total_rows = table.row_count();
1582 debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
1583 debug!("QueryEngine: WHERE clause = {:?}", where_clause);
1584
1585 plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
1586 plan.set_rows_in(total_rows);
1587 plan.add_detail(format!("Input: {} rows", total_rows));
1588
1589 for condition in &where_clause.conditions {
1591 plan.add_detail(format!("Condition: {:?}", condition.expr));
1592 }
1593
1594 let filter_start = Instant::now();
1595 let mut eval_context = EvaluationContext::new(self.case_insensitive);
1597
1598 let mut evaluator = if let Some(exec_ctx) = exec_context {
1600 RecursiveWhereEvaluator::with_both_contexts(&table, &mut eval_context, exec_ctx)
1602 } else {
1603 RecursiveWhereEvaluator::with_context(&table, &mut eval_context)
1604 };
1605
1606 let mut filtered_rows = Vec::new();
1608 for row_idx in visible_rows {
1609 if row_idx < 3 {
1611 debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
1612 }
1613
1614 match evaluator.evaluate(where_clause, row_idx) {
1615 Ok(result) => {
1616 if row_idx < 3 {
1617 debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
1618 }
1619 if result {
1620 filtered_rows.push(row_idx);
1621 }
1622 }
1623 Err(e) => {
1624 if row_idx < 3 {
1625 debug!(
1626 "QueryEngine: WHERE evaluation error for row {}: {}",
1627 row_idx, e
1628 );
1629 }
1630 return Err(e);
1632 }
1633 }
1634 }
1635
1636 let (compilations, cache_hits) = eval_context.get_stats();
1638 if compilations > 0 || cache_hits > 0 {
1639 debug!(
1640 "LIKE pattern cache: {} compilations, {} cache hits",
1641 compilations, cache_hits
1642 );
1643 }
1644 visible_rows = filtered_rows;
1645 let filter_duration = filter_start.elapsed();
1646 info!(
1647 "WHERE clause filtering: {} rows -> {} rows in {:?}",
1648 total_rows,
1649 visible_rows.len(),
1650 filter_duration
1651 );
1652
1653 plan.set_rows_out(visible_rows.len());
1654 plan.add_detail(format!("Output: {} rows", visible_rows.len()));
1655 plan.add_detail(format!(
1656 "Filter time: {:.3}ms",
1657 filter_duration.as_secs_f64() * 1000.0
1658 ));
1659 plan.end_step();
1660 }
1661
1662 let mut view = DataView::new(table.clone());
1664 view = view.with_rows(visible_rows);
1665
1666 if let Some(group_by_exprs) = &statement.group_by {
1668 if !group_by_exprs.is_empty() {
1669 debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
1670
1671 plan.begin_step(
1672 StepType::GroupBy,
1673 format!("GROUP BY {} expressions", group_by_exprs.len()),
1674 );
1675 plan.set_rows_in(view.row_count());
1676 plan.add_detail(format!("Input: {} rows", view.row_count()));
1677 for expr in group_by_exprs {
1678 plan.add_detail(format!("Group by: {:?}", expr));
1679 }
1680
1681 let group_start = Instant::now();
1682 view = self.apply_group_by(
1683 view,
1684 group_by_exprs,
1685 &statement.select_items,
1686 statement.having.as_ref(),
1687 plan,
1688 )?;
1689
1690 use crate::query_plan::having_alias_transformer::HIDDEN_AGG_PREFIX;
1693 let hidden_indices: Vec<usize> = view
1694 .source()
1695 .columns
1696 .iter()
1697 .enumerate()
1698 .filter_map(|(i, c)| {
1699 if c.name.starts_with(HIDDEN_AGG_PREFIX) {
1700 Some(i)
1701 } else {
1702 None
1703 }
1704 })
1705 .collect();
1706 for &idx in hidden_indices.iter().rev() {
1707 view.hide_column(idx);
1708 }
1709
1710 plan.set_rows_out(view.row_count());
1711 plan.add_detail(format!("Output: {} groups", view.row_count()));
1712 plan.add_detail(format!(
1713 "Overall time: {:.3}ms",
1714 group_start.elapsed().as_secs_f64() * 1000.0
1715 ));
1716 plan.end_step();
1717 }
1718 } else {
1719 if !statement.select_items.is_empty() {
1721 let has_non_star_items = statement
1723 .select_items
1724 .iter()
1725 .any(|item| !matches!(item, SelectItem::Star { .. }));
1726
1727 if has_non_star_items || statement.select_items.len() > 1 {
1731 view = self.apply_select_items(
1732 view,
1733 &statement.select_items,
1734 &statement,
1735 exec_context,
1736 plan,
1737 )?;
1738 }
1739 } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
1741 debug!("QueryEngine: Using legacy columns path");
1742 let source_table = view.source();
1745 let column_indices =
1746 self.resolve_column_indices(source_table, &statement.columns)?;
1747 view = view.with_columns(column_indices);
1748 }
1749 }
1750
1751 if statement.distinct {
1753 plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
1754 plan.set_rows_in(view.row_count());
1755 plan.add_detail(format!("Input: {} rows", view.row_count()));
1756
1757 let distinct_start = Instant::now();
1758 view = self.apply_distinct(view)?;
1759
1760 plan.set_rows_out(view.row_count());
1761 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1762 plan.add_detail(format!(
1763 "Distinct time: {:.3}ms",
1764 distinct_start.elapsed().as_secs_f64() * 1000.0
1765 ));
1766 plan.end_step();
1767 }
1768
1769 if let Some(order_by_columns) = &statement.order_by {
1771 if !order_by_columns.is_empty() {
1772 plan.begin_step(
1773 StepType::Sort,
1774 format!("ORDER BY {} columns", order_by_columns.len()),
1775 );
1776 plan.set_rows_in(view.row_count());
1777 for col in order_by_columns {
1778 let expr_str = match &col.expr {
1780 SqlExpression::Column(col_ref) => col_ref.name.clone(),
1781 _ => "expr".to_string(),
1782 };
1783 plan.add_detail(format!("{} {:?}", expr_str, col.direction));
1784 }
1785
1786 let sort_start = Instant::now();
1787 view =
1788 self.apply_multi_order_by_with_context(view, order_by_columns, exec_context)?;
1789
1790 plan.add_detail(format!(
1791 "Sort time: {:.3}ms",
1792 sort_start.elapsed().as_secs_f64() * 1000.0
1793 ));
1794 plan.end_step();
1795 }
1796 }
1797
1798 if let Some(limit) = statement.limit {
1800 let offset = statement.offset.unwrap_or(0);
1801 plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1802 plan.set_rows_in(view.row_count());
1803 if offset > 0 {
1804 plan.add_detail(format!("OFFSET: {}", offset));
1805 }
1806 view = view.with_limit(limit, offset);
1807 plan.set_rows_out(view.row_count());
1808 plan.add_detail(format!("Output: {} rows", view.row_count()));
1809 plan.end_step();
1810 }
1811
1812 if !statement.set_operations.is_empty() {
1814 plan.begin_step(
1815 StepType::SetOperation,
1816 format!("Process {} set operations", statement.set_operations.len()),
1817 );
1818 plan.set_rows_in(view.row_count());
1819
1820 let mut combined_table = self.materialize_view(view)?;
1822 let first_columns = combined_table.column_names();
1823 let first_column_count = first_columns.len();
1824
1825 let mut needs_deduplication = false;
1827
1828 for (idx, (operation, next_statement)) in statement.set_operations.iter().enumerate() {
1830 let op_start = Instant::now();
1831 plan.begin_step(
1832 StepType::SetOperation,
1833 format!("{:?} operation #{}", operation, idx + 1),
1834 );
1835
1836 let next_view = if let Some(exec_ctx) = exec_context {
1839 self.build_view_internal_with_plan_and_exec(
1840 table.clone(),
1841 *next_statement.clone(),
1842 plan,
1843 Some(exec_ctx),
1844 )?
1845 } else {
1846 self.build_view_internal_with_plan(
1847 table.clone(),
1848 *next_statement.clone(),
1849 plan,
1850 )?
1851 };
1852
1853 let next_table = self.materialize_view(next_view)?;
1855 let next_columns = next_table.column_names();
1856 let next_column_count = next_columns.len();
1857
1858 if first_column_count != next_column_count {
1860 return Err(anyhow!(
1861 "UNION queries must have the same number of columns: first query has {} columns, but query #{} has {} columns",
1862 first_column_count,
1863 idx + 2,
1864 next_column_count
1865 ));
1866 }
1867
1868 for (col_idx, (first_col, next_col)) in
1870 first_columns.iter().zip(next_columns.iter()).enumerate()
1871 {
1872 if !first_col.eq_ignore_ascii_case(next_col) {
1873 debug!(
1874 "UNION column name mismatch at position {}: '{}' vs '{}' (using first query's name)",
1875 col_idx + 1,
1876 first_col,
1877 next_col
1878 );
1879 }
1880 }
1881
1882 plan.add_detail(format!("Left: {} rows", combined_table.row_count()));
1883 plan.add_detail(format!("Right: {} rows", next_table.row_count()));
1884
1885 match operation {
1887 SetOperation::UnionAll => {
1888 for row in next_table.rows.iter() {
1890 combined_table.add_row(row.clone());
1891 }
1892 plan.add_detail(format!(
1893 "Result: {} rows (no deduplication)",
1894 combined_table.row_count()
1895 ));
1896 }
1897 SetOperation::Union => {
1898 for row in next_table.rows.iter() {
1900 combined_table.add_row(row.clone());
1901 }
1902 needs_deduplication = true;
1903 plan.add_detail(format!(
1904 "Combined: {} rows (deduplication pending)",
1905 combined_table.row_count()
1906 ));
1907 }
1908 SetOperation::Intersect => {
1909 return Err(anyhow!("INTERSECT is not yet implemented"));
1912 }
1913 SetOperation::Except => {
1914 return Err(anyhow!("EXCEPT is not yet implemented"));
1917 }
1918 }
1919
1920 plan.add_detail(format!(
1921 "Operation time: {:.3}ms",
1922 op_start.elapsed().as_secs_f64() * 1000.0
1923 ));
1924 plan.set_rows_out(combined_table.row_count());
1925 plan.end_step();
1926 }
1927
1928 plan.set_rows_out(combined_table.row_count());
1929 plan.add_detail(format!(
1930 "Combined result: {} rows after {} operations",
1931 combined_table.row_count(),
1932 statement.set_operations.len()
1933 ));
1934 plan.end_step();
1935
1936 view = DataView::new(Arc::new(combined_table));
1938
1939 if needs_deduplication {
1941 plan.begin_step(
1942 StepType::Distinct,
1943 "UNION deduplication - remove duplicate rows".to_string(),
1944 );
1945 plan.set_rows_in(view.row_count());
1946 plan.add_detail(format!("Input: {} rows", view.row_count()));
1947
1948 let distinct_start = Instant::now();
1949 view = self.apply_distinct(view)?;
1950
1951 plan.set_rows_out(view.row_count());
1952 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1953 plan.add_detail(format!(
1954 "Deduplication time: {:.3}ms",
1955 distinct_start.elapsed().as_secs_f64() * 1000.0
1956 ));
1957 plan.end_step();
1958 }
1959 }
1960
1961 Ok(view)
1962 }
1963
1964 fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1966 let mut indices = Vec::new();
1967 let table_columns = table.column_names();
1968
1969 for col_name in columns {
1970 let index = table_columns
1971 .iter()
1972 .position(|c| c.eq_ignore_ascii_case(col_name))
1973 .ok_or_else(|| {
1974 let suggestion = self.find_similar_column(table, col_name);
1975 match suggestion {
1976 Some(similar) => anyhow::anyhow!(
1977 "Column '{}' not found. Did you mean '{}'?",
1978 col_name,
1979 similar
1980 ),
1981 None => anyhow::anyhow!("Column '{}' not found", col_name),
1982 }
1983 })?;
1984 indices.push(index);
1985 }
1986
1987 Ok(indices)
1988 }
1989
1990 fn apply_select_items(
1992 &self,
1993 view: DataView,
1994 select_items: &[SelectItem],
1995 _statement: &SelectStatement,
1996 exec_context: Option<&ExecutionContext>,
1997 plan: &mut ExecutionPlanBuilder,
1998 ) -> Result<DataView> {
1999 debug!(
2000 "QueryEngine::apply_select_items - items: {:?}",
2001 select_items
2002 );
2003 debug!(
2004 "QueryEngine::apply_select_items - input view has {} rows",
2005 view.row_count()
2006 );
2007
2008 let has_window_functions = select_items.iter().any(|item| match item {
2010 SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
2011 _ => false,
2012 });
2013
2014 let window_func_count: usize = select_items
2016 .iter()
2017 .filter(|item| match item {
2018 SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
2019 _ => false,
2020 })
2021 .count();
2022
2023 let window_start = if has_window_functions {
2025 debug!(
2026 "QueryEngine::apply_select_items - detected {} window functions",
2027 window_func_count
2028 );
2029
2030 let window_specs = Self::extract_window_specs(select_items);
2032 debug!("Extracted {} window function specs", window_specs.len());
2033
2034 Some(Instant::now())
2035 } else {
2036 None
2037 };
2038
2039 let has_unnest = select_items.iter().any(|item| match item {
2041 SelectItem::Expression { expr, .. } => Self::contains_unnest(expr),
2042 _ => false,
2043 });
2044
2045 if has_unnest {
2046 debug!("QueryEngine::apply_select_items - UNNEST detected, using row expansion");
2047 return self.apply_select_with_row_expansion(view, select_items);
2048 }
2049
2050 let has_aggregates = select_items.iter().any(|item| match item {
2054 SelectItem::Expression { expr, .. } => contains_aggregate(expr),
2055 SelectItem::Column { .. } => false,
2056 SelectItem::Star { .. } => false,
2057 SelectItem::StarExclude { .. } => false,
2058 });
2059
2060 let all_aggregate_compatible = select_items.iter().all(|item| match item {
2061 SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
2062 SelectItem::Column { .. } => false, SelectItem::Star { .. } => false, SelectItem::StarExclude { .. } => false, });
2066
2067 if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
2068 debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
2071 return self.apply_aggregate_select(view, select_items);
2072 }
2073
2074 let has_computed_expressions = select_items
2076 .iter()
2077 .any(|item| matches!(item, SelectItem::Expression { .. }));
2078
2079 debug!(
2080 "QueryEngine::apply_select_items - has_computed_expressions: {}",
2081 has_computed_expressions
2082 );
2083
2084 if !has_computed_expressions {
2085 let column_indices = self.resolve_select_columns(view.source(), select_items)?;
2087 return Ok(view.with_columns(column_indices));
2088 }
2089
2090 let source_table = view.source();
2095 let visible_rows = view.visible_row_indices();
2096
2097 let mut computed_table = DataTable::new("query_result");
2100
2101 let mut expanded_items = Vec::new();
2103 for item in select_items {
2104 match item {
2105 SelectItem::Star { table_prefix, .. } => {
2106 if let Some(prefix) = table_prefix {
2107 debug!("QueryEngine::apply_select_items - expanding {}.*", prefix);
2109 for col in &source_table.columns {
2110 if Self::column_matches_table(col, prefix) {
2111 expanded_items.push(SelectItem::Column {
2112 column: ColumnRef::unquoted(col.name.clone()),
2113 leading_comments: vec![],
2114 trailing_comment: None,
2115 });
2116 }
2117 }
2118 } else {
2119 debug!("QueryEngine::apply_select_items - expanding *");
2121 for col_name in source_table.column_names() {
2122 expanded_items.push(SelectItem::Column {
2123 column: ColumnRef::unquoted(col_name.to_string()),
2124 leading_comments: vec![],
2125 trailing_comment: None,
2126 });
2127 }
2128 }
2129 }
2130 _ => expanded_items.push(item.clone()),
2131 }
2132 }
2133
2134 let mut column_name_counts: std::collections::HashMap<String, usize> =
2136 std::collections::HashMap::new();
2137
2138 for item in &expanded_items {
2139 let base_name = match item {
2140 SelectItem::Column {
2141 column: col_ref, ..
2142 } => col_ref.name.clone(),
2143 SelectItem::Expression { alias, .. } => alias.clone(),
2144 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2145 SelectItem::StarExclude { .. } => {
2146 unreachable!("StarExclude should have been expanded")
2147 }
2148 };
2149
2150 let count = column_name_counts.entry(base_name.clone()).or_insert(0);
2152 let column_name = if *count == 0 {
2153 base_name.clone()
2155 } else {
2156 format!("{base_name}_{count}")
2158 };
2159 *count += 1;
2160
2161 computed_table.add_column(DataColumn::new(&column_name));
2162 }
2163
2164 let can_use_batch = expanded_items.iter().all(|item| {
2168 match item {
2169 SelectItem::Expression { expr, .. } => {
2170 matches!(expr, SqlExpression::WindowFunction { .. })
2173 || !Self::contains_window_function(expr)
2174 }
2175 _ => true, }
2177 });
2178
2179 let use_batch_evaluation = can_use_batch
2182 && std::env::var("SQL_CLI_BATCH_WINDOW")
2183 .map(|v| v != "0" && v.to_lowercase() != "false")
2184 .unwrap_or(true);
2185
2186 let batch_window_specs = if use_batch_evaluation && has_window_functions {
2188 debug!("BATCH window function evaluation flag is enabled");
2189 let specs = Self::extract_window_specs(&expanded_items);
2191 debug!(
2192 "Extracted {} window function specs for batch evaluation",
2193 specs.len()
2194 );
2195 Some(specs)
2196 } else {
2197 None
2198 };
2199
2200 let mut evaluator =
2202 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2203
2204 if let Some(exec_ctx) = exec_context {
2206 let aliases = exec_ctx.get_aliases();
2207 if !aliases.is_empty() {
2208 debug!(
2209 "Applying {} aliases to evaluator: {:?}",
2210 aliases.len(),
2211 aliases
2212 );
2213 evaluator = evaluator.with_table_aliases(aliases);
2214 }
2215 }
2216
2217 if has_window_functions {
2220 let preload_start = Instant::now();
2221
2222 let mut window_specs = Vec::new();
2224 for item in &expanded_items {
2225 if let SelectItem::Expression { expr, .. } = item {
2226 Self::collect_window_specs(expr, &mut window_specs);
2227 }
2228 }
2229
2230 for spec in &window_specs {
2232 let _ = evaluator.get_or_create_window_context(spec);
2233 }
2234
2235 debug!(
2236 "Pre-created {} WindowContext(s) in {:.2}ms",
2237 window_specs.len(),
2238 preload_start.elapsed().as_secs_f64() * 1000.0
2239 );
2240 }
2241
2242 if let Some(window_specs) = batch_window_specs {
2244 debug!("Starting batch window function evaluation");
2245 let batch_start = Instant::now();
2246
2247 let mut batch_results: Vec<Vec<DataValue>> =
2249 vec![vec![DataValue::Null; expanded_items.len()]; visible_rows.len()];
2250
2251 let detailed_window_specs = &window_specs;
2253
2254 let mut specs_by_window: HashMap<
2256 u64,
2257 Vec<&crate::data::batch_window_evaluator::WindowFunctionSpec>,
2258 > = HashMap::new();
2259 for spec in detailed_window_specs {
2260 let hash = spec.spec.compute_hash();
2261 specs_by_window
2262 .entry(hash)
2263 .or_insert_with(Vec::new)
2264 .push(spec);
2265 }
2266
2267 for (_window_hash, specs) in specs_by_window {
2269 let context = evaluator.get_or_create_window_context(&specs[0].spec)?;
2271
2272 for spec in specs {
2274 match spec.function_name.as_str() {
2275 "LAG" => {
2276 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2278 let column_name = col_ref.name.as_str();
2279 let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2280 spec.args.get(1)
2281 {
2282 n.parse::<i64>().unwrap_or(1)
2283 } else {
2284 1 };
2286
2287 let values = context.evaluate_lag_batch(
2288 visible_rows,
2289 column_name,
2290 offset,
2291 )?;
2292
2293 for (row_idx, value) in values.into_iter().enumerate() {
2295 batch_results[row_idx][spec.output_column_index] = value;
2296 }
2297 }
2298 }
2299 "LEAD" => {
2300 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2302 let column_name = col_ref.name.as_str();
2303 let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2304 spec.args.get(1)
2305 {
2306 n.parse::<i64>().unwrap_or(1)
2307 } else {
2308 1 };
2310
2311 let values = context.evaluate_lead_batch(
2312 visible_rows,
2313 column_name,
2314 offset,
2315 )?;
2316
2317 for (row_idx, value) in values.into_iter().enumerate() {
2319 batch_results[row_idx][spec.output_column_index] = value;
2320 }
2321 }
2322 }
2323 "ROW_NUMBER" => {
2324 let values = context.evaluate_row_number_batch(visible_rows)?;
2325
2326 for (row_idx, value) in values.into_iter().enumerate() {
2328 batch_results[row_idx][spec.output_column_index] = value;
2329 }
2330 }
2331 "RANK" => {
2332 let values = context.evaluate_rank_batch(visible_rows)?;
2333
2334 for (row_idx, value) in values.into_iter().enumerate() {
2336 batch_results[row_idx][spec.output_column_index] = value;
2337 }
2338 }
2339 "DENSE_RANK" => {
2340 let values = context.evaluate_dense_rank_batch(visible_rows)?;
2341
2342 for (row_idx, value) in values.into_iter().enumerate() {
2344 batch_results[row_idx][spec.output_column_index] = value;
2345 }
2346 }
2347 "SUM" => {
2348 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2349 let column_name = col_ref.name.as_str();
2350 let values =
2351 context.evaluate_sum_batch(visible_rows, column_name)?;
2352
2353 for (row_idx, value) in values.into_iter().enumerate() {
2354 batch_results[row_idx][spec.output_column_index] = value;
2355 }
2356 }
2357 }
2358 "AVG" => {
2359 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2360 let column_name = col_ref.name.as_str();
2361 let values =
2362 context.evaluate_avg_batch(visible_rows, column_name)?;
2363
2364 for (row_idx, value) in values.into_iter().enumerate() {
2365 batch_results[row_idx][spec.output_column_index] = value;
2366 }
2367 }
2368 }
2369 "MIN" => {
2370 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2371 let column_name = col_ref.name.as_str();
2372 let values =
2373 context.evaluate_min_batch(visible_rows, column_name)?;
2374
2375 for (row_idx, value) in values.into_iter().enumerate() {
2376 batch_results[row_idx][spec.output_column_index] = value;
2377 }
2378 }
2379 }
2380 "MAX" => {
2381 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2382 let column_name = col_ref.name.as_str();
2383 let values =
2384 context.evaluate_max_batch(visible_rows, column_name)?;
2385
2386 for (row_idx, value) in values.into_iter().enumerate() {
2387 batch_results[row_idx][spec.output_column_index] = value;
2388 }
2389 }
2390 }
2391 "COUNT" => {
2392 let column_name = match spec.args.get(0) {
2394 Some(SqlExpression::Column(col_ref)) => Some(col_ref.name.as_str()),
2395 Some(SqlExpression::StringLiteral(s)) if s == "*" => None,
2396 _ => None,
2397 };
2398
2399 let values = context.evaluate_count_batch(visible_rows, column_name)?;
2400
2401 for (row_idx, value) in values.into_iter().enumerate() {
2402 batch_results[row_idx][spec.output_column_index] = value;
2403 }
2404 }
2405 "FIRST_VALUE" => {
2406 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2407 let column_name = col_ref.name.as_str();
2408 let values = context
2409 .evaluate_first_value_batch(visible_rows, column_name)?;
2410
2411 for (row_idx, value) in values.into_iter().enumerate() {
2412 batch_results[row_idx][spec.output_column_index] = value;
2413 }
2414 }
2415 }
2416 "LAST_VALUE" => {
2417 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2418 let column_name = col_ref.name.as_str();
2419 let values =
2420 context.evaluate_last_value_batch(visible_rows, column_name)?;
2421
2422 for (row_idx, value) in values.into_iter().enumerate() {
2423 batch_results[row_idx][spec.output_column_index] = value;
2424 }
2425 }
2426 }
2427 _ => {
2428 debug!(
2430 "Window function {} not supported in batch mode, using per-row",
2431 spec.function_name
2432 );
2433 }
2434 }
2435 }
2436 }
2437
2438 for (result_row_idx, &source_row_idx) in visible_rows.iter().enumerate() {
2440 for (col_idx, item) in expanded_items.iter().enumerate() {
2441 if !matches!(batch_results[result_row_idx][col_idx], DataValue::Null) {
2443 continue;
2444 }
2445
2446 let value = match item {
2447 SelectItem::Column {
2448 column: col_ref, ..
2449 } => {
2450 match evaluator
2451 .evaluate(&SqlExpression::Column(col_ref.clone()), source_row_idx)
2452 {
2453 Ok(val) => val,
2454 Err(e) => {
2455 return Err(anyhow!(
2456 "Failed to evaluate column {}: {}",
2457 col_ref.to_sql(),
2458 e
2459 ));
2460 }
2461 }
2462 }
2463 SelectItem::Expression { expr, .. } => {
2464 if matches!(expr, SqlExpression::WindowFunction { .. }) {
2467 continue;
2469 }
2470 evaluator.evaluate(&expr, source_row_idx)?
2473 }
2474 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2475 SelectItem::StarExclude { .. } => {
2476 unreachable!("StarExclude should have been expanded")
2477 }
2478 };
2479 batch_results[result_row_idx][col_idx] = value;
2480 }
2481 }
2482
2483 for row_values in batch_results {
2485 computed_table
2486 .add_row(DataRow::new(row_values))
2487 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2488 }
2489
2490 debug!(
2491 "Batch window evaluation completed in {:.3}ms",
2492 batch_start.elapsed().as_secs_f64() * 1000.0
2493 );
2494 } else {
2495 for &row_idx in visible_rows {
2497 let mut row_values = Vec::new();
2498
2499 for item in &expanded_items {
2500 let value = match item {
2501 SelectItem::Column {
2502 column: col_ref, ..
2503 } => {
2504 match evaluator
2506 .evaluate(&SqlExpression::Column(col_ref.clone()), row_idx)
2507 {
2508 Ok(val) => val,
2509 Err(e) => {
2510 return Err(anyhow!(
2511 "Failed to evaluate column {}: {}",
2512 col_ref.to_sql(),
2513 e
2514 ));
2515 }
2516 }
2517 }
2518 SelectItem::Expression { expr, .. } => {
2519 evaluator.evaluate(&expr, row_idx)?
2521 }
2522 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2523 SelectItem::StarExclude { .. } => {
2524 unreachable!("StarExclude should have been expanded")
2525 }
2526 };
2527 row_values.push(value);
2528 }
2529
2530 computed_table
2531 .add_row(DataRow::new(row_values))
2532 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2533 }
2534 }
2535
2536 if let Some(start) = window_start {
2538 let window_duration = start.elapsed();
2539 info!(
2540 "Window function evaluation took {:.2}ms for {} rows ({} window functions)",
2541 window_duration.as_secs_f64() * 1000.0,
2542 visible_rows.len(),
2543 window_func_count
2544 );
2545
2546 plan.begin_step(
2548 StepType::WindowFunction,
2549 format!("Evaluate {} window function(s)", window_func_count),
2550 );
2551 plan.set_rows_in(visible_rows.len());
2552 plan.set_rows_out(visible_rows.len());
2553 plan.add_detail(format!("Input: {} rows", visible_rows.len()));
2554 plan.add_detail(format!("{} window functions evaluated", window_func_count));
2555 plan.add_detail(format!(
2556 "Evaluation time: {:.3}ms",
2557 window_duration.as_secs_f64() * 1000.0
2558 ));
2559 plan.end_step();
2560 }
2561
2562 Ok(DataView::new(Arc::new(computed_table)))
2565 }
2566
2567 fn apply_select_with_row_expansion(
2569 &self,
2570 view: DataView,
2571 select_items: &[SelectItem],
2572 ) -> Result<DataView> {
2573 debug!("QueryEngine::apply_select_with_row_expansion - expanding rows");
2574
2575 let source_table = view.source();
2576 let visible_rows = view.visible_row_indices();
2577 let expander_registry = RowExpanderRegistry::new();
2578
2579 let mut result_table = DataTable::new("unnest_result");
2581
2582 let mut expanded_items = Vec::new();
2584 for item in select_items {
2585 match item {
2586 SelectItem::Star { table_prefix, .. } => {
2587 if let Some(prefix) = table_prefix {
2588 debug!(
2590 "QueryEngine::apply_select_with_row_expansion - expanding {}.*",
2591 prefix
2592 );
2593 for col in &source_table.columns {
2594 if Self::column_matches_table(col, prefix) {
2595 expanded_items.push(SelectItem::Column {
2596 column: ColumnRef::unquoted(col.name.clone()),
2597 leading_comments: vec![],
2598 trailing_comment: None,
2599 });
2600 }
2601 }
2602 } else {
2603 debug!("QueryEngine::apply_select_with_row_expansion - expanding *");
2605 for col_name in source_table.column_names() {
2606 expanded_items.push(SelectItem::Column {
2607 column: ColumnRef::unquoted(col_name.to_string()),
2608 leading_comments: vec![],
2609 trailing_comment: None,
2610 });
2611 }
2612 }
2613 }
2614 _ => expanded_items.push(item.clone()),
2615 }
2616 }
2617
2618 for item in &expanded_items {
2620 let column_name = match item {
2621 SelectItem::Column {
2622 column: col_ref, ..
2623 } => col_ref.name.clone(),
2624 SelectItem::Expression { alias, .. } => alias.clone(),
2625 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2626 SelectItem::StarExclude { .. } => {
2627 unreachable!("StarExclude should have been expanded")
2628 }
2629 };
2630 result_table.add_column(DataColumn::new(&column_name));
2631 }
2632
2633 let mut evaluator =
2635 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2636
2637 for &row_idx in visible_rows {
2638 let mut unnest_expansions = Vec::new();
2640 let mut unnest_indices = Vec::new();
2641
2642 for (col_idx, item) in expanded_items.iter().enumerate() {
2643 if let SelectItem::Expression { expr, .. } = item {
2644 if let Some(expansion_result) = self.try_expand_unnest(
2645 &expr,
2646 source_table,
2647 row_idx,
2648 &mut evaluator,
2649 &expander_registry,
2650 )? {
2651 unnest_expansions.push(expansion_result);
2652 unnest_indices.push(col_idx);
2653 }
2654 }
2655 }
2656
2657 let expansion_count = if unnest_expansions.is_empty() {
2659 1 } else {
2661 unnest_expansions
2662 .iter()
2663 .map(|exp| exp.row_count())
2664 .max()
2665 .unwrap_or(1)
2666 };
2667
2668 for output_idx in 0..expansion_count {
2670 let mut row_values = Vec::new();
2671
2672 for (col_idx, item) in expanded_items.iter().enumerate() {
2673 let unnest_position = unnest_indices.iter().position(|&idx| idx == col_idx);
2675
2676 let value = if let Some(unnest_idx) = unnest_position {
2677 let expansion = &unnest_expansions[unnest_idx];
2679 expansion
2680 .values
2681 .get(output_idx)
2682 .cloned()
2683 .unwrap_or(DataValue::Null)
2684 } else {
2685 match item {
2687 SelectItem::Column {
2688 column: col_ref, ..
2689 } => {
2690 let col_idx =
2691 source_table.get_column_index(&col_ref.name).ok_or_else(
2692 || anyhow::anyhow!("Column '{}' not found", col_ref.name),
2693 )?;
2694 let row = source_table
2695 .get_row(row_idx)
2696 .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
2697 row.get(col_idx)
2698 .ok_or_else(|| {
2699 anyhow::anyhow!("Column {} not found in row", col_idx)
2700 })?
2701 .clone()
2702 }
2703 SelectItem::Expression { expr, .. } => {
2704 evaluator.evaluate(&expr, row_idx)?
2706 }
2707 SelectItem::Star { .. } => unreachable!(),
2708 SelectItem::StarExclude { .. } => {
2709 unreachable!("StarExclude should have been expanded")
2710 }
2711 }
2712 };
2713
2714 row_values.push(value);
2715 }
2716
2717 result_table
2718 .add_row(DataRow::new(row_values))
2719 .map_err(|e| anyhow::anyhow!("Failed to add expanded row: {}", e))?;
2720 }
2721 }
2722
2723 debug!(
2724 "QueryEngine::apply_select_with_row_expansion - input rows: {}, output rows: {}",
2725 visible_rows.len(),
2726 result_table.row_count()
2727 );
2728
2729 Ok(DataView::new(Arc::new(result_table)))
2730 }
2731
2732 fn try_expand_unnest(
2735 &self,
2736 expr: &SqlExpression,
2737 _source_table: &DataTable,
2738 row_idx: usize,
2739 evaluator: &mut ArithmeticEvaluator,
2740 expander_registry: &RowExpanderRegistry,
2741 ) -> Result<Option<crate::data::row_expanders::ExpansionResult>> {
2742 if let SqlExpression::Unnest { column, delimiter } = expr {
2744 let column_value = evaluator.evaluate(column, row_idx)?;
2746
2747 let delimiter_value = DataValue::String(delimiter.clone());
2749
2750 let expander = expander_registry
2752 .get("UNNEST")
2753 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2754
2755 let expansion = expander.expand(&column_value, &[delimiter_value])?;
2757 return Ok(Some(expansion));
2758 }
2759
2760 if let SqlExpression::FunctionCall { name, args, .. } = expr {
2762 if name.to_uppercase() == "UNNEST" {
2763 if args.len() != 2 {
2765 return Err(anyhow::anyhow!(
2766 "UNNEST requires exactly 2 arguments: UNNEST(column, delimiter)"
2767 ));
2768 }
2769
2770 let column_value = evaluator.evaluate(&args[0], row_idx)?;
2772
2773 let delimiter_value = evaluator.evaluate(&args[1], row_idx)?;
2775
2776 let expander = expander_registry
2778 .get("UNNEST")
2779 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2780
2781 let expansion = expander.expand(&column_value, &[delimiter_value])?;
2783 return Ok(Some(expansion));
2784 }
2785 }
2786
2787 Ok(None)
2788 }
2789
2790 fn apply_aggregate_select(
2792 &self,
2793 view: DataView,
2794 select_items: &[SelectItem],
2795 ) -> Result<DataView> {
2796 debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
2797
2798 let source_table = view.source();
2799 let mut result_table = DataTable::new("aggregate_result");
2800
2801 for item in select_items {
2803 let column_name = match item {
2804 SelectItem::Expression { alias, .. } => alias.clone(),
2805 _ => unreachable!("Should only have expressions in aggregate-only query"),
2806 };
2807 result_table.add_column(DataColumn::new(&column_name));
2808 }
2809
2810 let visible_rows = view.visible_row_indices().to_vec();
2812 let mut evaluator =
2813 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
2814 .with_visible_rows(visible_rows);
2815
2816 let mut row_values = Vec::new();
2818 for item in select_items {
2819 match item {
2820 SelectItem::Expression { expr, .. } => {
2821 let value = evaluator.evaluate(expr, 0)?;
2824 row_values.push(value);
2825 }
2826 _ => unreachable!("Should only have expressions in aggregate-only query"),
2827 }
2828 }
2829
2830 result_table
2832 .add_row(DataRow::new(row_values))
2833 .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
2834
2835 Ok(DataView::new(Arc::new(result_table)))
2836 }
2837
2838 fn column_matches_table(col: &DataColumn, table_name: &str) -> bool {
2850 if let Some(ref source) = col.source_table {
2852 if source == table_name || source.ends_with(&format!(".{}", table_name)) {
2854 return true;
2855 }
2856 }
2857
2858 if let Some(ref qualified) = col.qualified_name {
2860 if qualified.starts_with(&format!("{}.", table_name)) {
2862 return true;
2863 }
2864 }
2865
2866 false
2867 }
2868
2869 fn resolve_select_columns(
2871 &self,
2872 table: &DataTable,
2873 select_items: &[SelectItem],
2874 ) -> Result<Vec<usize>> {
2875 let mut indices = Vec::new();
2876 let table_columns = table.column_names();
2877
2878 for item in select_items {
2879 match item {
2880 SelectItem::Column {
2881 column: col_ref, ..
2882 } => {
2883 let index = if let Some(table_prefix) = &col_ref.table_prefix {
2885 let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
2887 table.find_column_by_qualified_name(&qualified_name)
2888 .ok_or_else(|| {
2889 let has_qualified = table.columns.iter()
2891 .any(|c| c.qualified_name.is_some());
2892 if !has_qualified {
2893 anyhow::anyhow!(
2894 "Column '{}' not found. Note: Table '{}' may not support qualified column names",
2895 qualified_name, table_prefix
2896 )
2897 } else {
2898 anyhow::anyhow!("Column '{}' not found", qualified_name)
2899 }
2900 })?
2901 } else {
2902 table_columns
2904 .iter()
2905 .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
2906 .ok_or_else(|| {
2907 let suggestion = self.find_similar_column(table, &col_ref.name);
2908 match suggestion {
2909 Some(similar) => anyhow::anyhow!(
2910 "Column '{}' not found. Did you mean '{}'?",
2911 col_ref.name,
2912 similar
2913 ),
2914 None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
2915 }
2916 })?
2917 };
2918 indices.push(index);
2919 }
2920 SelectItem::Star { table_prefix, .. } => {
2921 if let Some(prefix) = table_prefix {
2922 for (i, col) in table.columns.iter().enumerate() {
2924 if Self::column_matches_table(col, prefix) {
2925 indices.push(i);
2926 }
2927 }
2928 } else {
2929 for i in 0..table_columns.len() {
2931 indices.push(i);
2932 }
2933 }
2934 }
2935 SelectItem::StarExclude {
2936 table_prefix,
2937 excluded_columns,
2938 ..
2939 } => {
2940 if let Some(prefix) = table_prefix {
2942 for (i, col) in table.columns.iter().enumerate() {
2944 if Self::column_matches_table(col, prefix)
2945 && !excluded_columns.contains(&col.name)
2946 {
2947 indices.push(i);
2948 }
2949 }
2950 } else {
2951 for (i, col_name) in table_columns.iter().enumerate() {
2953 if !excluded_columns
2954 .iter()
2955 .any(|exc| exc.eq_ignore_ascii_case(col_name))
2956 {
2957 indices.push(i);
2958 }
2959 }
2960 }
2961 }
2962 SelectItem::Expression { .. } => {
2963 return Err(anyhow::anyhow!(
2964 "Computed expressions require new table creation"
2965 ));
2966 }
2967 }
2968 }
2969
2970 Ok(indices)
2971 }
2972
2973 fn apply_distinct(&self, view: DataView) -> Result<DataView> {
2975 use std::collections::HashSet;
2976
2977 let source = view.source();
2978 let visible_cols = view.visible_column_indices();
2979 let visible_rows = view.visible_row_indices();
2980
2981 let mut seen_rows = HashSet::new();
2983 let mut unique_row_indices = Vec::new();
2984
2985 for &row_idx in visible_rows {
2986 let mut row_key = Vec::new();
2988 for &col_idx in visible_cols {
2989 let value = source
2990 .get_value(row_idx, col_idx)
2991 .ok_or_else(|| anyhow!("Invalid cell reference"))?;
2992 row_key.push(format!("{:?}", value));
2994 }
2995
2996 if seen_rows.insert(row_key) {
2998 unique_row_indices.push(row_idx);
3000 }
3001 }
3002
3003 Ok(view.with_rows(unique_row_indices))
3005 }
3006
3007 fn apply_multi_order_by(
3009 &self,
3010 view: DataView,
3011 order_by_columns: &[OrderByItem],
3012 ) -> Result<DataView> {
3013 self.apply_multi_order_by_with_context(view, order_by_columns, None)
3014 }
3015
3016 fn apply_multi_order_by_with_context(
3018 &self,
3019 mut view: DataView,
3020 order_by_columns: &[OrderByItem],
3021 _exec_context: Option<&ExecutionContext>,
3022 ) -> Result<DataView> {
3023 let mut sort_columns = Vec::new();
3025
3026 for order_col in order_by_columns {
3027 let column_name = match &order_col.expr {
3029 SqlExpression::Column(col_ref) => col_ref.name.clone(),
3030 _ => {
3031 return Err(anyhow!(
3033 "ORDER BY expressions not yet supported - only simple columns allowed"
3034 ));
3035 }
3036 };
3037
3038 let col_index = if column_name.contains('.') {
3040 if let Some(dot_pos) = column_name.rfind('.') {
3042 let col_name = &column_name[dot_pos + 1..];
3043
3044 debug!(
3047 "ORDER BY: Extracting unqualified column '{}' from '{}'",
3048 col_name, column_name
3049 );
3050 view.source().get_column_index(col_name)
3051 } else {
3052 view.source().get_column_index(&column_name)
3053 }
3054 } else {
3055 view.source().get_column_index(&column_name)
3057 }
3058 .ok_or_else(|| {
3059 let suggestion = self.find_similar_column(view.source(), &column_name);
3061 match suggestion {
3062 Some(similar) => anyhow::anyhow!(
3063 "Column '{}' not found. Did you mean '{}'?",
3064 column_name,
3065 similar
3066 ),
3067 None => {
3068 let available_cols = view.source().column_names().join(", ");
3070 anyhow::anyhow!(
3071 "Column '{}' not found. Available columns: {}",
3072 column_name,
3073 available_cols
3074 )
3075 }
3076 }
3077 })?;
3078
3079 let ascending = matches!(order_col.direction, SortDirection::Asc);
3080 sort_columns.push((col_index, ascending));
3081 }
3082
3083 view.apply_multi_sort(&sort_columns)?;
3085 Ok(view)
3086 }
3087
3088 fn apply_group_by(
3090 &self,
3091 view: DataView,
3092 group_by_exprs: &[SqlExpression],
3093 select_items: &[SelectItem],
3094 having: Option<&SqlExpression>,
3095 plan: &mut ExecutionPlanBuilder,
3096 ) -> Result<DataView> {
3097 let (result_view, phase_info) = self.apply_group_by_expressions(
3099 view,
3100 group_by_exprs,
3101 select_items,
3102 having,
3103 self.case_insensitive,
3104 self.date_notation.clone(),
3105 )?;
3106
3107 plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
3109 plan.add_detail(format!(
3110 "Phase 1 - Group Building: {:.3}ms",
3111 phase_info.phase2_key_building.as_secs_f64() * 1000.0
3112 ));
3113 plan.add_detail(format!(
3114 " • Processing {} rows into {} groups",
3115 phase_info.total_rows, phase_info.num_groups
3116 ));
3117 plan.add_detail(format!(
3118 "Phase 2 - Aggregation: {:.3}ms",
3119 phase_info.phase4_aggregation.as_secs_f64() * 1000.0
3120 ));
3121 if phase_info.phase4_having_evaluation > Duration::ZERO {
3122 plan.add_detail(format!(
3123 "Phase 3 - HAVING Filter: {:.3}ms",
3124 phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
3125 ));
3126 plan.add_detail(format!(
3127 " • Filtered {} groups",
3128 phase_info.groups_filtered_by_having
3129 ));
3130 }
3131 plan.add_detail(format!(
3132 "Total GROUP BY time: {:.3}ms",
3133 phase_info.total_time.as_secs_f64() * 1000.0
3134 ));
3135
3136 Ok(result_view)
3137 }
3138
3139 pub fn estimate_group_cardinality(
3142 &self,
3143 view: &DataView,
3144 group_by_exprs: &[SqlExpression],
3145 ) -> usize {
3146 let row_count = view.get_visible_rows().len();
3148 if row_count <= 100 {
3149 return row_count;
3150 }
3151
3152 let sample_size = min(1000, row_count / 10).max(100);
3154 let mut seen = FxHashSet::default();
3155
3156 let visible_rows = view.get_visible_rows();
3157 for (i, &row_idx) in visible_rows.iter().enumerate() {
3158 if i >= sample_size {
3159 break;
3160 }
3161
3162 let mut key_values = Vec::new();
3164 for expr in group_by_exprs {
3165 let mut evaluator = ArithmeticEvaluator::new(view.source());
3166 let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
3167 key_values.push(value);
3168 }
3169
3170 seen.insert(key_values);
3171 }
3172
3173 let sample_cardinality = seen.len();
3175 let estimated = (sample_cardinality * row_count) / sample_size;
3176
3177 estimated.min(row_count).max(sample_cardinality)
3179 }
3180}
3181
3182#[cfg(test)]
3183mod tests {
3184 use super::*;
3185 use crate::data::datatable::{DataColumn, DataRow, DataValue};
3186
3187 fn create_test_table() -> Arc<DataTable> {
3188 let mut table = DataTable::new("test");
3189
3190 table.add_column(DataColumn::new("id"));
3192 table.add_column(DataColumn::new("name"));
3193 table.add_column(DataColumn::new("age"));
3194
3195 table
3197 .add_row(DataRow::new(vec![
3198 DataValue::Integer(1),
3199 DataValue::String("Alice".to_string()),
3200 DataValue::Integer(30),
3201 ]))
3202 .unwrap();
3203
3204 table
3205 .add_row(DataRow::new(vec![
3206 DataValue::Integer(2),
3207 DataValue::String("Bob".to_string()),
3208 DataValue::Integer(25),
3209 ]))
3210 .unwrap();
3211
3212 table
3213 .add_row(DataRow::new(vec![
3214 DataValue::Integer(3),
3215 DataValue::String("Charlie".to_string()),
3216 DataValue::Integer(35),
3217 ]))
3218 .unwrap();
3219
3220 Arc::new(table)
3221 }
3222
3223 #[test]
3224 fn test_select_all() {
3225 let table = create_test_table();
3226 let engine = QueryEngine::new();
3227
3228 let view = engine
3229 .execute(table.clone(), "SELECT * FROM users")
3230 .unwrap();
3231 assert_eq!(view.row_count(), 3);
3232 assert_eq!(view.column_count(), 3);
3233 }
3234
3235 #[test]
3236 fn test_select_columns() {
3237 let table = create_test_table();
3238 let engine = QueryEngine::new();
3239
3240 let view = engine
3241 .execute(table.clone(), "SELECT name, age FROM users")
3242 .unwrap();
3243 assert_eq!(view.row_count(), 3);
3244 assert_eq!(view.column_count(), 2);
3245 }
3246
3247 #[test]
3248 fn test_select_with_limit() {
3249 let table = create_test_table();
3250 let engine = QueryEngine::new();
3251
3252 let view = engine
3253 .execute(table.clone(), "SELECT * FROM users LIMIT 2")
3254 .unwrap();
3255 assert_eq!(view.row_count(), 2);
3256 }
3257
3258 #[test]
3259 fn test_type_coercion_contains() {
3260 let _ = tracing_subscriber::fmt()
3262 .with_max_level(tracing::Level::DEBUG)
3263 .try_init();
3264
3265 let mut table = DataTable::new("test");
3266 table.add_column(DataColumn::new("id"));
3267 table.add_column(DataColumn::new("status"));
3268 table.add_column(DataColumn::new("price"));
3269
3270 table
3272 .add_row(DataRow::new(vec![
3273 DataValue::Integer(1),
3274 DataValue::String("Pending".to_string()),
3275 DataValue::Float(99.99),
3276 ]))
3277 .unwrap();
3278
3279 table
3280 .add_row(DataRow::new(vec![
3281 DataValue::Integer(2),
3282 DataValue::String("Confirmed".to_string()),
3283 DataValue::Float(150.50),
3284 ]))
3285 .unwrap();
3286
3287 table
3288 .add_row(DataRow::new(vec![
3289 DataValue::Integer(3),
3290 DataValue::String("Pending".to_string()),
3291 DataValue::Float(75.00),
3292 ]))
3293 .unwrap();
3294
3295 let table = Arc::new(table);
3296 let engine = QueryEngine::new();
3297
3298 println!("\n=== Testing WHERE clause with Contains ===");
3299 println!("Table has {} rows", table.row_count());
3300 for i in 0..table.row_count() {
3301 let status = table.get_value(i, 1);
3302 println!("Row {i}: status = {status:?}");
3303 }
3304
3305 println!("\n--- Test 1: status.Contains('pend') ---");
3307 let result = engine.execute(
3308 table.clone(),
3309 "SELECT * FROM test WHERE status.Contains('pend')",
3310 );
3311 match result {
3312 Ok(view) => {
3313 println!("SUCCESS: Found {} matching rows", view.row_count());
3314 assert_eq!(view.row_count(), 2); }
3316 Err(e) => {
3317 panic!("Query failed: {e}");
3318 }
3319 }
3320
3321 println!("\n--- Test 2: price.Contains('9') ---");
3323 let result = engine.execute(
3324 table.clone(),
3325 "SELECT * FROM test WHERE price.Contains('9')",
3326 );
3327 match result {
3328 Ok(view) => {
3329 println!(
3330 "SUCCESS: Found {} matching rows with price containing '9'",
3331 view.row_count()
3332 );
3333 assert!(view.row_count() >= 1);
3335 }
3336 Err(e) => {
3337 panic!("Numeric coercion query failed: {e}");
3338 }
3339 }
3340
3341 println!("\n=== All tests passed! ===");
3342 }
3343
3344 #[test]
3345 fn test_not_in_clause() {
3346 let _ = tracing_subscriber::fmt()
3348 .with_max_level(tracing::Level::DEBUG)
3349 .try_init();
3350
3351 let mut table = DataTable::new("test");
3352 table.add_column(DataColumn::new("id"));
3353 table.add_column(DataColumn::new("country"));
3354
3355 table
3357 .add_row(DataRow::new(vec![
3358 DataValue::Integer(1),
3359 DataValue::String("CA".to_string()),
3360 ]))
3361 .unwrap();
3362
3363 table
3364 .add_row(DataRow::new(vec![
3365 DataValue::Integer(2),
3366 DataValue::String("US".to_string()),
3367 ]))
3368 .unwrap();
3369
3370 table
3371 .add_row(DataRow::new(vec![
3372 DataValue::Integer(3),
3373 DataValue::String("UK".to_string()),
3374 ]))
3375 .unwrap();
3376
3377 let table = Arc::new(table);
3378 let engine = QueryEngine::new();
3379
3380 println!("\n=== Testing NOT IN clause ===");
3381 println!("Table has {} rows", table.row_count());
3382 for i in 0..table.row_count() {
3383 let country = table.get_value(i, 1);
3384 println!("Row {i}: country = {country:?}");
3385 }
3386
3387 println!("\n--- Test: country NOT IN ('CA') ---");
3389 let result = engine.execute(
3390 table.clone(),
3391 "SELECT * FROM test WHERE country NOT IN ('CA')",
3392 );
3393 match result {
3394 Ok(view) => {
3395 println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
3396 assert_eq!(view.row_count(), 2); }
3398 Err(e) => {
3399 panic!("NOT IN query failed: {e}");
3400 }
3401 }
3402
3403 println!("\n=== NOT IN test complete! ===");
3404 }
3405
3406 #[test]
3407 fn test_case_insensitive_in_and_not_in() {
3408 let _ = tracing_subscriber::fmt()
3410 .with_max_level(tracing::Level::DEBUG)
3411 .try_init();
3412
3413 let mut table = DataTable::new("test");
3414 table.add_column(DataColumn::new("id"));
3415 table.add_column(DataColumn::new("country"));
3416
3417 table
3419 .add_row(DataRow::new(vec![
3420 DataValue::Integer(1),
3421 DataValue::String("CA".to_string()), ]))
3423 .unwrap();
3424
3425 table
3426 .add_row(DataRow::new(vec![
3427 DataValue::Integer(2),
3428 DataValue::String("us".to_string()), ]))
3430 .unwrap();
3431
3432 table
3433 .add_row(DataRow::new(vec![
3434 DataValue::Integer(3),
3435 DataValue::String("UK".to_string()), ]))
3437 .unwrap();
3438
3439 let table = Arc::new(table);
3440
3441 println!("\n=== Testing Case-Insensitive IN clause ===");
3442 println!("Table has {} rows", table.row_count());
3443 for i in 0..table.row_count() {
3444 let country = table.get_value(i, 1);
3445 println!("Row {i}: country = {country:?}");
3446 }
3447
3448 println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
3450 let engine = QueryEngine::with_case_insensitive(true);
3451 let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3452 match result {
3453 Ok(view) => {
3454 println!(
3455 "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
3456 view.row_count()
3457 );
3458 assert_eq!(view.row_count(), 1); }
3460 Err(e) => {
3461 panic!("Case-insensitive IN query failed: {e}");
3462 }
3463 }
3464
3465 println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
3467 let result = engine.execute(
3468 table.clone(),
3469 "SELECT * FROM test WHERE country NOT IN ('ca')",
3470 );
3471 match result {
3472 Ok(view) => {
3473 println!(
3474 "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
3475 view.row_count()
3476 );
3477 assert_eq!(view.row_count(), 2); }
3479 Err(e) => {
3480 panic!("Case-insensitive NOT IN query failed: {e}");
3481 }
3482 }
3483
3484 println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
3486 let engine_case_sensitive = QueryEngine::new(); let result = engine_case_sensitive
3488 .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3489 match result {
3490 Ok(view) => {
3491 println!(
3492 "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
3493 view.row_count()
3494 );
3495 assert_eq!(view.row_count(), 0); }
3497 Err(e) => {
3498 panic!("Case-sensitive IN query failed: {e}");
3499 }
3500 }
3501
3502 println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
3503 }
3504
3505 #[test]
3506 #[ignore = "Parentheses in WHERE clause not yet implemented"]
3507 fn test_parentheses_in_where_clause() {
3508 let _ = tracing_subscriber::fmt()
3510 .with_max_level(tracing::Level::DEBUG)
3511 .try_init();
3512
3513 let mut table = DataTable::new("test");
3514 table.add_column(DataColumn::new("id"));
3515 table.add_column(DataColumn::new("status"));
3516 table.add_column(DataColumn::new("priority"));
3517
3518 table
3520 .add_row(DataRow::new(vec![
3521 DataValue::Integer(1),
3522 DataValue::String("Pending".to_string()),
3523 DataValue::String("High".to_string()),
3524 ]))
3525 .unwrap();
3526
3527 table
3528 .add_row(DataRow::new(vec![
3529 DataValue::Integer(2),
3530 DataValue::String("Complete".to_string()),
3531 DataValue::String("High".to_string()),
3532 ]))
3533 .unwrap();
3534
3535 table
3536 .add_row(DataRow::new(vec![
3537 DataValue::Integer(3),
3538 DataValue::String("Pending".to_string()),
3539 DataValue::String("Low".to_string()),
3540 ]))
3541 .unwrap();
3542
3543 table
3544 .add_row(DataRow::new(vec![
3545 DataValue::Integer(4),
3546 DataValue::String("Complete".to_string()),
3547 DataValue::String("Low".to_string()),
3548 ]))
3549 .unwrap();
3550
3551 let table = Arc::new(table);
3552 let engine = QueryEngine::new();
3553
3554 println!("\n=== Testing Parentheses in WHERE clause ===");
3555 println!("Table has {} rows", table.row_count());
3556 for i in 0..table.row_count() {
3557 let status = table.get_value(i, 1);
3558 let priority = table.get_value(i, 2);
3559 println!("Row {i}: status = {status:?}, priority = {priority:?}");
3560 }
3561
3562 println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
3564 let result = engine.execute(
3565 table.clone(),
3566 "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
3567 );
3568 match result {
3569 Ok(view) => {
3570 println!(
3571 "SUCCESS: Found {} rows with parenthetical logic",
3572 view.row_count()
3573 );
3574 assert_eq!(view.row_count(), 2); }
3576 Err(e) => {
3577 panic!("Parentheses query failed: {e}");
3578 }
3579 }
3580
3581 println!("\n=== Parentheses test complete! ===");
3582 }
3583
3584 #[test]
3585 #[ignore = "Numeric type coercion needs fixing"]
3586 fn test_numeric_type_coercion() {
3587 let _ = tracing_subscriber::fmt()
3589 .with_max_level(tracing::Level::DEBUG)
3590 .try_init();
3591
3592 let mut table = DataTable::new("test");
3593 table.add_column(DataColumn::new("id"));
3594 table.add_column(DataColumn::new("price"));
3595 table.add_column(DataColumn::new("quantity"));
3596
3597 table
3599 .add_row(DataRow::new(vec![
3600 DataValue::Integer(1),
3601 DataValue::Float(99.50), DataValue::Integer(100),
3603 ]))
3604 .unwrap();
3605
3606 table
3607 .add_row(DataRow::new(vec![
3608 DataValue::Integer(2),
3609 DataValue::Float(150.0), DataValue::Integer(200),
3611 ]))
3612 .unwrap();
3613
3614 table
3615 .add_row(DataRow::new(vec![
3616 DataValue::Integer(3),
3617 DataValue::Integer(75), DataValue::Integer(50),
3619 ]))
3620 .unwrap();
3621
3622 let table = Arc::new(table);
3623 let engine = QueryEngine::new();
3624
3625 println!("\n=== Testing Numeric Type Coercion ===");
3626 println!("Table has {} rows", table.row_count());
3627 for i in 0..table.row_count() {
3628 let price = table.get_value(i, 1);
3629 let quantity = table.get_value(i, 2);
3630 println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
3631 }
3632
3633 println!("\n--- Test: price.Contains('.') ---");
3635 let result = engine.execute(
3636 table.clone(),
3637 "SELECT * FROM test WHERE price.Contains('.')",
3638 );
3639 match result {
3640 Ok(view) => {
3641 println!(
3642 "SUCCESS: Found {} rows with decimal points in price",
3643 view.row_count()
3644 );
3645 assert_eq!(view.row_count(), 2); }
3647 Err(e) => {
3648 panic!("Numeric Contains query failed: {e}");
3649 }
3650 }
3651
3652 println!("\n--- Test: quantity.Contains('0') ---");
3654 let result = engine.execute(
3655 table.clone(),
3656 "SELECT * FROM test WHERE quantity.Contains('0')",
3657 );
3658 match result {
3659 Ok(view) => {
3660 println!(
3661 "SUCCESS: Found {} rows with '0' in quantity",
3662 view.row_count()
3663 );
3664 assert_eq!(view.row_count(), 2); }
3666 Err(e) => {
3667 panic!("Integer Contains query failed: {e}");
3668 }
3669 }
3670
3671 println!("\n=== Numeric type coercion test complete! ===");
3672 }
3673
3674 #[test]
3675 fn test_datetime_comparisons() {
3676 let _ = tracing_subscriber::fmt()
3678 .with_max_level(tracing::Level::DEBUG)
3679 .try_init();
3680
3681 let mut table = DataTable::new("test");
3682 table.add_column(DataColumn::new("id"));
3683 table.add_column(DataColumn::new("created_date"));
3684
3685 table
3687 .add_row(DataRow::new(vec![
3688 DataValue::Integer(1),
3689 DataValue::String("2024-12-15".to_string()),
3690 ]))
3691 .unwrap();
3692
3693 table
3694 .add_row(DataRow::new(vec![
3695 DataValue::Integer(2),
3696 DataValue::String("2025-01-15".to_string()),
3697 ]))
3698 .unwrap();
3699
3700 table
3701 .add_row(DataRow::new(vec![
3702 DataValue::Integer(3),
3703 DataValue::String("2025-02-15".to_string()),
3704 ]))
3705 .unwrap();
3706
3707 let table = Arc::new(table);
3708 let engine = QueryEngine::new();
3709
3710 println!("\n=== Testing DateTime Comparisons ===");
3711 println!("Table has {} rows", table.row_count());
3712 for i in 0..table.row_count() {
3713 let date = table.get_value(i, 1);
3714 println!("Row {i}: created_date = {date:?}");
3715 }
3716
3717 println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
3719 let result = engine.execute(
3720 table.clone(),
3721 "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
3722 );
3723 match result {
3724 Ok(view) => {
3725 println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
3726 assert_eq!(view.row_count(), 2); }
3728 Err(e) => {
3729 panic!("DateTime comparison query failed: {e}");
3730 }
3731 }
3732
3733 println!("\n=== DateTime comparison test complete! ===");
3734 }
3735
3736 #[test]
3737 fn test_not_with_method_calls() {
3738 let _ = tracing_subscriber::fmt()
3740 .with_max_level(tracing::Level::DEBUG)
3741 .try_init();
3742
3743 let mut table = DataTable::new("test");
3744 table.add_column(DataColumn::new("id"));
3745 table.add_column(DataColumn::new("status"));
3746
3747 table
3749 .add_row(DataRow::new(vec![
3750 DataValue::Integer(1),
3751 DataValue::String("Pending Review".to_string()),
3752 ]))
3753 .unwrap();
3754
3755 table
3756 .add_row(DataRow::new(vec![
3757 DataValue::Integer(2),
3758 DataValue::String("Complete".to_string()),
3759 ]))
3760 .unwrap();
3761
3762 table
3763 .add_row(DataRow::new(vec![
3764 DataValue::Integer(3),
3765 DataValue::String("Pending Approval".to_string()),
3766 ]))
3767 .unwrap();
3768
3769 let table = Arc::new(table);
3770 let engine = QueryEngine::with_case_insensitive(true);
3771
3772 println!("\n=== Testing NOT with Method Calls ===");
3773 println!("Table has {} rows", table.row_count());
3774 for i in 0..table.row_count() {
3775 let status = table.get_value(i, 1);
3776 println!("Row {i}: status = {status:?}");
3777 }
3778
3779 println!("\n--- Test: NOT status.Contains('pend') ---");
3781 let result = engine.execute(
3782 table.clone(),
3783 "SELECT * FROM test WHERE NOT status.Contains('pend')",
3784 );
3785 match result {
3786 Ok(view) => {
3787 println!(
3788 "SUCCESS: Found {} rows NOT containing 'pend'",
3789 view.row_count()
3790 );
3791 assert_eq!(view.row_count(), 1); }
3793 Err(e) => {
3794 panic!("NOT Contains query failed: {e}");
3795 }
3796 }
3797
3798 println!("\n--- Test: NOT status.StartsWith('Pending') ---");
3800 let result = engine.execute(
3801 table.clone(),
3802 "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
3803 );
3804 match result {
3805 Ok(view) => {
3806 println!(
3807 "SUCCESS: Found {} rows NOT starting with 'Pending'",
3808 view.row_count()
3809 );
3810 assert_eq!(view.row_count(), 1); }
3812 Err(e) => {
3813 panic!("NOT StartsWith query failed: {e}");
3814 }
3815 }
3816
3817 println!("\n=== NOT with method calls test complete! ===");
3818 }
3819
3820 #[test]
3821 #[ignore = "Complex logical expressions with parentheses not yet implemented"]
3822 fn test_complex_logical_expressions() {
3823 let _ = tracing_subscriber::fmt()
3825 .with_max_level(tracing::Level::DEBUG)
3826 .try_init();
3827
3828 let mut table = DataTable::new("test");
3829 table.add_column(DataColumn::new("id"));
3830 table.add_column(DataColumn::new("status"));
3831 table.add_column(DataColumn::new("priority"));
3832 table.add_column(DataColumn::new("assigned"));
3833
3834 table
3836 .add_row(DataRow::new(vec![
3837 DataValue::Integer(1),
3838 DataValue::String("Pending".to_string()),
3839 DataValue::String("High".to_string()),
3840 DataValue::String("John".to_string()),
3841 ]))
3842 .unwrap();
3843
3844 table
3845 .add_row(DataRow::new(vec![
3846 DataValue::Integer(2),
3847 DataValue::String("Complete".to_string()),
3848 DataValue::String("High".to_string()),
3849 DataValue::String("Jane".to_string()),
3850 ]))
3851 .unwrap();
3852
3853 table
3854 .add_row(DataRow::new(vec![
3855 DataValue::Integer(3),
3856 DataValue::String("Pending".to_string()),
3857 DataValue::String("Low".to_string()),
3858 DataValue::String("John".to_string()),
3859 ]))
3860 .unwrap();
3861
3862 table
3863 .add_row(DataRow::new(vec![
3864 DataValue::Integer(4),
3865 DataValue::String("In Progress".to_string()),
3866 DataValue::String("Medium".to_string()),
3867 DataValue::String("Jane".to_string()),
3868 ]))
3869 .unwrap();
3870
3871 let table = Arc::new(table);
3872 let engine = QueryEngine::new();
3873
3874 println!("\n=== Testing Complex Logical Expressions ===");
3875 println!("Table has {} rows", table.row_count());
3876 for i in 0..table.row_count() {
3877 let status = table.get_value(i, 1);
3878 let priority = table.get_value(i, 2);
3879 let assigned = table.get_value(i, 3);
3880 println!(
3881 "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
3882 );
3883 }
3884
3885 println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
3887 let result = engine.execute(
3888 table.clone(),
3889 "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
3890 );
3891 match result {
3892 Ok(view) => {
3893 println!(
3894 "SUCCESS: Found {} rows with complex logic",
3895 view.row_count()
3896 );
3897 assert_eq!(view.row_count(), 2); }
3899 Err(e) => {
3900 panic!("Complex logic query failed: {e}");
3901 }
3902 }
3903
3904 println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
3906 let result = engine.execute(
3907 table.clone(),
3908 "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
3909 );
3910 match result {
3911 Ok(view) => {
3912 println!(
3913 "SUCCESS: Found {} rows with NOT complex logic",
3914 view.row_count()
3915 );
3916 assert_eq!(view.row_count(), 2); }
3918 Err(e) => {
3919 panic!("NOT complex logic query failed: {e}");
3920 }
3921 }
3922
3923 println!("\n=== Complex logical expressions test complete! ===");
3924 }
3925
3926 #[test]
3927 fn test_mixed_data_types_and_edge_cases() {
3928 let _ = tracing_subscriber::fmt()
3930 .with_max_level(tracing::Level::DEBUG)
3931 .try_init();
3932
3933 let mut table = DataTable::new("test");
3934 table.add_column(DataColumn::new("id"));
3935 table.add_column(DataColumn::new("value"));
3936 table.add_column(DataColumn::new("nullable_field"));
3937
3938 table
3940 .add_row(DataRow::new(vec![
3941 DataValue::Integer(1),
3942 DataValue::String("123.45".to_string()),
3943 DataValue::String("present".to_string()),
3944 ]))
3945 .unwrap();
3946
3947 table
3948 .add_row(DataRow::new(vec![
3949 DataValue::Integer(2),
3950 DataValue::Float(678.90),
3951 DataValue::Null,
3952 ]))
3953 .unwrap();
3954
3955 table
3956 .add_row(DataRow::new(vec![
3957 DataValue::Integer(3),
3958 DataValue::Boolean(true),
3959 DataValue::String("also present".to_string()),
3960 ]))
3961 .unwrap();
3962
3963 table
3964 .add_row(DataRow::new(vec![
3965 DataValue::Integer(4),
3966 DataValue::String("false".to_string()),
3967 DataValue::Null,
3968 ]))
3969 .unwrap();
3970
3971 let table = Arc::new(table);
3972 let engine = QueryEngine::new();
3973
3974 println!("\n=== Testing Mixed Data Types and Edge Cases ===");
3975 println!("Table has {} rows", table.row_count());
3976 for i in 0..table.row_count() {
3977 let value = table.get_value(i, 1);
3978 let nullable = table.get_value(i, 2);
3979 println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
3980 }
3981
3982 println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
3984 let result = engine.execute(
3985 table.clone(),
3986 "SELECT * FROM test WHERE value.Contains('true')",
3987 );
3988 match result {
3989 Ok(view) => {
3990 println!(
3991 "SUCCESS: Found {} rows with boolean coercion",
3992 view.row_count()
3993 );
3994 assert_eq!(view.row_count(), 1); }
3996 Err(e) => {
3997 panic!("Boolean coercion query failed: {e}");
3998 }
3999 }
4000
4001 println!("\n--- Test: id IN (1, 3) ---");
4003 let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
4004 match result {
4005 Ok(view) => {
4006 println!("SUCCESS: Found {} rows with IN clause", view.row_count());
4007 assert_eq!(view.row_count(), 2); }
4009 Err(e) => {
4010 panic!("Multiple IN values query failed: {e}");
4011 }
4012 }
4013
4014 println!("\n=== Mixed data types test complete! ===");
4015 }
4016
4017 #[test]
4019 fn test_aggregate_only_single_row() {
4020 let table = create_test_stock_data();
4021 let engine = QueryEngine::new();
4022
4023 let result = engine
4025 .execute(
4026 table.clone(),
4027 "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
4028 )
4029 .expect("Query should succeed");
4030
4031 assert_eq!(
4032 result.row_count(),
4033 1,
4034 "Aggregate-only query should return exactly 1 row"
4035 );
4036 assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
4037
4038 let source = result.source();
4040 let row = source.get_row(0).expect("Should have first row");
4041
4042 assert_eq!(row.values[0], DataValue::Integer(5));
4044
4045 assert_eq!(row.values[1], DataValue::Float(99.5));
4047
4048 assert_eq!(row.values[2], DataValue::Float(105.0));
4050
4051 if let DataValue::Float(avg) = &row.values[3] {
4053 assert!(
4054 (avg - 102.4).abs() < 0.01,
4055 "Average should be approximately 102.4, got {}",
4056 avg
4057 );
4058 } else {
4059 panic!("AVG should return a Float value");
4060 }
4061 }
4062
4063 #[test]
4065 fn test_single_aggregate_single_row() {
4066 let table = create_test_stock_data();
4067 let engine = QueryEngine::new();
4068
4069 let result = engine
4070 .execute(table.clone(), "SELECT COUNT(*) FROM stock")
4071 .expect("Query should succeed");
4072
4073 assert_eq!(
4074 result.row_count(),
4075 1,
4076 "Single aggregate query should return exactly 1 row"
4077 );
4078 assert_eq!(result.column_count(), 1, "Should have 1 column");
4079
4080 let source = result.source();
4081 let row = source.get_row(0).expect("Should have first row");
4082 assert_eq!(row.values[0], DataValue::Integer(5));
4083 }
4084
4085 #[test]
4087 fn test_aggregate_with_where_single_row() {
4088 let table = create_test_stock_data();
4089 let engine = QueryEngine::new();
4090
4091 let result = engine
4093 .execute(
4094 table.clone(),
4095 "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
4096 )
4097 .expect("Query should succeed");
4098
4099 assert_eq!(
4100 result.row_count(),
4101 1,
4102 "Filtered aggregate query should return exactly 1 row"
4103 );
4104 assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
4105
4106 let source = result.source();
4107 let row = source.get_row(0).expect("Should have first row");
4108
4109 assert_eq!(row.values[0], DataValue::Integer(2));
4111 assert_eq!(row.values[1], DataValue::Float(103.5)); assert_eq!(row.values[2], DataValue::Float(105.0)); }
4114
4115 #[test]
4116 fn test_not_in_parsing() {
4117 use crate::sql::recursive_parser::Parser;
4118
4119 let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
4120 println!("\n=== Testing NOT IN parsing ===");
4121 println!("Parsing query: {query}");
4122
4123 let mut parser = Parser::new(query);
4124 match parser.parse() {
4125 Ok(statement) => {
4126 println!("Parsed statement: {statement:#?}");
4127 if let Some(where_clause) = statement.where_clause {
4128 println!("WHERE conditions: {:#?}", where_clause.conditions);
4129 if let Some(first_condition) = where_clause.conditions.first() {
4130 println!("First condition expression: {:#?}", first_condition.expr);
4131 }
4132 }
4133 }
4134 Err(e) => {
4135 panic!("Parse error: {e}");
4136 }
4137 }
4138 }
4139
4140 fn create_test_stock_data() -> Arc<DataTable> {
4142 let mut table = DataTable::new("stock");
4143
4144 table.add_column(DataColumn::new("symbol"));
4145 table.add_column(DataColumn::new("close"));
4146 table.add_column(DataColumn::new("volume"));
4147
4148 let test_data = vec![
4150 ("AAPL", 99.5, 1000),
4151 ("AAPL", 101.2, 1500),
4152 ("AAPL", 103.5, 2000),
4153 ("AAPL", 105.0, 1200),
4154 ("AAPL", 102.8, 1800),
4155 ];
4156
4157 for (symbol, close, volume) in test_data {
4158 table
4159 .add_row(DataRow::new(vec![
4160 DataValue::String(symbol.to_string()),
4161 DataValue::Float(close),
4162 DataValue::Integer(volume),
4163 ]))
4164 .expect("Should add row successfully");
4165 }
4166
4167 Arc::new(table)
4168 }
4169}
4170
// An additional test module, compiled only for test builds (`#[cfg(test)]`).
// Its source is loaded from `query_engine_tests.rs` via the `#[path]`
// attribute instead of the default module-path lookup.
#[cfg(test)]
#[path = "query_engine_tests.rs"]
mod query_engine_tests;