1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::row_expanders::RowExpanderRegistry;
19use crate::data::subquery_executor::SubqueryExecutor;
20use crate::data::temp_table_registry::TempTableRegistry;
21use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
22use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
23use crate::sql::parser::ast::ColumnRef;
24use crate::sql::parser::ast::SetOperation;
25use crate::sql::parser::ast::TableSource;
26use crate::sql::parser::ast::WindowSpec;
27use crate::sql::recursive_parser::{
28 CTEType, OrderByItem, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
29 TableFunction,
30};
31
32#[derive(Debug, Clone)]
34pub struct ExecutionContext {
35 alias_map: HashMap<String, String>,
38}
39
40impl ExecutionContext {
41 pub fn new() -> Self {
43 Self {
44 alias_map: HashMap::new(),
45 }
46 }
47
48 pub fn register_alias(&mut self, alias: String, table_name: String) {
50 debug!("Registering alias: {} -> {}", alias, table_name);
51 self.alias_map.insert(alias, table_name);
52 }
53
54 pub fn resolve_alias(&self, name: &str) -> String {
57 self.alias_map
58 .get(name)
59 .cloned()
60 .unwrap_or_else(|| name.to_string())
61 }
62
63 pub fn is_alias(&self, name: &str) -> bool {
65 self.alias_map.contains_key(name)
66 }
67
68 pub fn get_aliases(&self) -> HashMap<String, String> {
70 self.alias_map.clone()
71 }
72
73 pub fn resolve_column_index(&self, table: &DataTable, column_ref: &ColumnRef) -> Result<usize> {
88 if let Some(table_prefix) = &column_ref.table_prefix {
89 let actual_table = self.resolve_alias(table_prefix);
91
92 let qualified_name = format!("{}.{}", actual_table, column_ref.name);
94 if let Some(idx) = table.find_column_by_qualified_name(&qualified_name) {
95 debug!(
96 "Resolved {}.{} -> qualified column '{}' at index {}",
97 table_prefix, column_ref.name, qualified_name, idx
98 );
99 return Ok(idx);
100 }
101
102 if let Some(idx) = table.get_column_index(&column_ref.name) {
104 debug!(
105 "Resolved {}.{} -> unqualified column '{}' at index {}",
106 table_prefix, column_ref.name, column_ref.name, idx
107 );
108 return Ok(idx);
109 }
110
111 Err(anyhow!(
113 "Column '{}' not found. Table '{}' may not support qualified column names",
114 qualified_name,
115 actual_table
116 ))
117 } else {
118 if let Some(idx) = table.get_column_index(&column_ref.name) {
120 debug!(
121 "Resolved unqualified column '{}' at index {}",
122 column_ref.name, idx
123 );
124 return Ok(idx);
125 }
126
127 if column_ref.name.contains('.') {
129 if let Some(idx) = table.find_column_by_qualified_name(&column_ref.name) {
130 debug!(
131 "Resolved '{}' as qualified column at index {}",
132 column_ref.name, idx
133 );
134 return Ok(idx);
135 }
136 }
137
138 let suggestion = self.find_similar_column(table, &column_ref.name);
140 match suggestion {
141 Some(similar) => Err(anyhow!(
142 "Column '{}' not found. Did you mean '{}'?",
143 column_ref.name,
144 similar
145 )),
146 None => Err(anyhow!("Column '{}' not found", column_ref.name)),
147 }
148 }
149 }
150
151 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
153 let columns = table.column_names();
154 let mut best_match: Option<(String, usize)> = None;
155
156 for col in columns {
157 let distance = edit_distance(name, &col);
158 if distance <= 2 {
159 match best_match {
161 Some((_, best_dist)) if distance < best_dist => {
162 best_match = Some((col.clone(), distance));
163 }
164 None => {
165 best_match = Some((col.clone(), distance));
166 }
167 _ => {}
168 }
169 }
170 }
171
172 best_match.map(|(name, _)| name)
173 }
174}
175
176impl Default for ExecutionContext {
177 fn default() -> Self {
178 Self::new()
179 }
180}
181
/// Levenshtein distance between `a` and `b`, counted in `char`s (Unicode
/// scalar values) rather than bytes.
///
/// Each insertion, deletion or substitution costs 1. Used by
/// `ExecutionContext::find_similar_column` for "did you mean" suggestions.
fn edit_distance(a: &str, b: &str) -> usize {
    let source: Vec<char> = a.chars().collect();
    let target: Vec<char> = b.chars().collect();

    // With one side empty, the distance is the length of the other side.
    if source.is_empty() {
        return target.len();
    }
    if target.is_empty() {
        return source.len();
    }

    // Only the previous DP row is needed to compute the next one.
    let mut previous: Vec<usize> = (0..=target.len()).collect();
    let mut current = vec![0; target.len() + 1];

    for (row, &sc) in source.iter().enumerate() {
        current[0] = row + 1;
        for (col, &tc) in target.iter().enumerate() {
            let substitution = previous[col] + usize::from(sc != tc);
            let deletion = previous[col + 1] + 1;
            let insertion = current[col] + 1;
            current[col + 1] = min(min(deletion, insertion), substitution);
        }
        std::mem::swap(&mut previous, &mut current);
    }

    // After the final swap, `previous` holds the last computed row.
    previous[target.len()]
}
222
223#[derive(Clone)]
225pub struct QueryEngine {
226 case_insensitive: bool,
227 date_notation: String,
228 _behavior_config: Option<BehaviorConfig>,
229}
230
231impl Default for QueryEngine {
232 fn default() -> Self {
233 Self::new()
234 }
235}
236
237impl QueryEngine {
238 #[must_use]
239 pub fn new() -> Self {
240 Self {
241 case_insensitive: false,
242 date_notation: get_date_notation(),
243 _behavior_config: None,
244 }
245 }
246
247 #[must_use]
248 pub fn with_behavior_config(config: BehaviorConfig) -> Self {
249 let case_insensitive = config.case_insensitive_default;
250 let date_notation = get_date_notation();
252 Self {
253 case_insensitive,
254 date_notation,
255 _behavior_config: Some(config),
256 }
257 }
258
259 #[must_use]
260 pub fn with_date_notation(_date_notation: String) -> Self {
261 Self {
262 case_insensitive: false,
263 date_notation: get_date_notation(), _behavior_config: None,
265 }
266 }
267
268 #[must_use]
269 pub fn with_case_insensitive(case_insensitive: bool) -> Self {
270 Self {
271 case_insensitive,
272 date_notation: get_date_notation(),
273 _behavior_config: None,
274 }
275 }
276
277 #[must_use]
278 pub fn with_case_insensitive_and_date_notation(
279 case_insensitive: bool,
280 _date_notation: String, ) -> Self {
282 Self {
283 case_insensitive,
284 date_notation: get_date_notation(), _behavior_config: None,
286 }
287 }
288
289 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
291 let columns = table.column_names();
292 let mut best_match: Option<(String, usize)> = None;
293
294 for col in columns {
295 let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
296 let max_distance = if name.len() > 10 { 3 } else { 2 };
299 if distance <= max_distance {
300 match &best_match {
301 None => best_match = Some((col, distance)),
302 Some((_, best_dist)) if distance < *best_dist => {
303 best_match = Some((col, distance));
304 }
305 _ => {}
306 }
307 }
308 }
309
310 best_match.map(|(name, _)| name)
311 }
312
313 fn edit_distance(&self, s1: &str, s2: &str) -> usize {
315 let len1 = s1.len();
316 let len2 = s2.len();
317 let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
318
319 for i in 0..=len1 {
320 matrix[i][0] = i;
321 }
322 for j in 0..=len2 {
323 matrix[0][j] = j;
324 }
325
326 for (i, c1) in s1.chars().enumerate() {
327 for (j, c2) in s2.chars().enumerate() {
328 let cost = usize::from(c1 != c2);
329 matrix[i + 1][j + 1] = std::cmp::min(
330 matrix[i][j + 1] + 1, std::cmp::min(
332 matrix[i + 1][j] + 1, matrix[i][j] + cost, ),
335 );
336 }
337 }
338
339 matrix[len1][len2]
340 }
341
342 fn contains_unnest(expr: &SqlExpression) -> bool {
344 match expr {
345 SqlExpression::Unnest { .. } => true,
347 SqlExpression::FunctionCall { name, args, .. } => {
348 if name.to_uppercase() == "UNNEST" {
349 return true;
350 }
351 args.iter().any(Self::contains_unnest)
353 }
354 SqlExpression::BinaryOp { left, right, .. } => {
355 Self::contains_unnest(left) || Self::contains_unnest(right)
356 }
357 SqlExpression::Not { expr } => Self::contains_unnest(expr),
358 SqlExpression::CaseExpression {
359 when_branches,
360 else_branch,
361 } => {
362 when_branches.iter().any(|branch| {
363 Self::contains_unnest(&branch.condition)
364 || Self::contains_unnest(&branch.result)
365 }) || else_branch
366 .as_ref()
367 .map_or(false, |e| Self::contains_unnest(e))
368 }
369 SqlExpression::SimpleCaseExpression {
370 expr,
371 when_branches,
372 else_branch,
373 } => {
374 Self::contains_unnest(expr)
375 || when_branches.iter().any(|branch| {
376 Self::contains_unnest(&branch.value)
377 || Self::contains_unnest(&branch.result)
378 })
379 || else_branch
380 .as_ref()
381 .map_or(false, |e| Self::contains_unnest(e))
382 }
383 SqlExpression::InList { expr, values } => {
384 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
385 }
386 SqlExpression::NotInList { expr, values } => {
387 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
388 }
389 SqlExpression::Between { expr, lower, upper } => {
390 Self::contains_unnest(expr)
391 || Self::contains_unnest(lower)
392 || Self::contains_unnest(upper)
393 }
394 SqlExpression::InSubquery { expr, .. } => Self::contains_unnest(expr),
395 SqlExpression::NotInSubquery { expr, .. } => Self::contains_unnest(expr),
396 SqlExpression::ScalarSubquery { .. } => false, SqlExpression::WindowFunction { args, .. } => args.iter().any(Self::contains_unnest),
398 SqlExpression::MethodCall { args, .. } => args.iter().any(Self::contains_unnest),
399 SqlExpression::ChainedMethodCall { base, args, .. } => {
400 Self::contains_unnest(base) || args.iter().any(Self::contains_unnest)
401 }
402 _ => false,
403 }
404 }
405
406 fn collect_window_specs(expr: &SqlExpression, specs: &mut Vec<WindowSpec>) {
408 match expr {
409 SqlExpression::WindowFunction {
410 window_spec, args, ..
411 } => {
412 specs.push(window_spec.clone());
414 for arg in args {
416 Self::collect_window_specs(arg, specs);
417 }
418 }
419 SqlExpression::BinaryOp { left, right, .. } => {
420 Self::collect_window_specs(left, specs);
421 Self::collect_window_specs(right, specs);
422 }
423 SqlExpression::Not { expr } => {
424 Self::collect_window_specs(expr, specs);
425 }
426 SqlExpression::FunctionCall { args, .. } => {
427 for arg in args {
428 Self::collect_window_specs(arg, specs);
429 }
430 }
431 SqlExpression::CaseExpression {
432 when_branches,
433 else_branch,
434 } => {
435 for branch in when_branches {
436 Self::collect_window_specs(&branch.condition, specs);
437 Self::collect_window_specs(&branch.result, specs);
438 }
439 if let Some(else_expr) = else_branch {
440 Self::collect_window_specs(else_expr, specs);
441 }
442 }
443 SqlExpression::SimpleCaseExpression {
444 expr,
445 when_branches,
446 else_branch,
447 } => {
448 Self::collect_window_specs(expr, specs);
449 for branch in when_branches {
450 Self::collect_window_specs(&branch.value, specs);
451 Self::collect_window_specs(&branch.result, specs);
452 }
453 if let Some(else_expr) = else_branch {
454 Self::collect_window_specs(else_expr, specs);
455 }
456 }
457 SqlExpression::InList { expr, values, .. } => {
458 Self::collect_window_specs(expr, specs);
459 for item in values {
460 Self::collect_window_specs(item, specs);
461 }
462 }
463 SqlExpression::ChainedMethodCall { base, args, .. } => {
464 Self::collect_window_specs(base, specs);
465 for arg in args {
466 Self::collect_window_specs(arg, specs);
467 }
468 }
469 SqlExpression::Column(_)
471 | SqlExpression::NumberLiteral(_)
472 | SqlExpression::StringLiteral(_)
473 | SqlExpression::BooleanLiteral(_)
474 | SqlExpression::Null
475 | SqlExpression::DateTimeToday { .. }
476 | SqlExpression::DateTimeConstructor { .. }
477 | SqlExpression::MethodCall { .. } => {}
478 _ => {}
480 }
481 }
482
483 fn contains_window_function(expr: &SqlExpression) -> bool {
485 match expr {
486 SqlExpression::WindowFunction { .. } => true,
487 SqlExpression::BinaryOp { left, right, .. } => {
488 Self::contains_window_function(left) || Self::contains_window_function(right)
489 }
490 SqlExpression::Not { expr } => Self::contains_window_function(expr),
491 SqlExpression::FunctionCall { args, .. } => {
492 args.iter().any(Self::contains_window_function)
493 }
494 SqlExpression::CaseExpression {
495 when_branches,
496 else_branch,
497 } => {
498 when_branches.iter().any(|branch| {
499 Self::contains_window_function(&branch.condition)
500 || Self::contains_window_function(&branch.result)
501 }) || else_branch
502 .as_ref()
503 .map_or(false, |e| Self::contains_window_function(e))
504 }
505 SqlExpression::SimpleCaseExpression {
506 expr,
507 when_branches,
508 else_branch,
509 } => {
510 Self::contains_window_function(expr)
511 || when_branches.iter().any(|branch| {
512 Self::contains_window_function(&branch.value)
513 || Self::contains_window_function(&branch.result)
514 })
515 || else_branch
516 .as_ref()
517 .map_or(false, |e| Self::contains_window_function(e))
518 }
519 SqlExpression::InList { expr, values } => {
520 Self::contains_window_function(expr)
521 || values.iter().any(Self::contains_window_function)
522 }
523 SqlExpression::NotInList { expr, values } => {
524 Self::contains_window_function(expr)
525 || values.iter().any(Self::contains_window_function)
526 }
527 SqlExpression::Between { expr, lower, upper } => {
528 Self::contains_window_function(expr)
529 || Self::contains_window_function(lower)
530 || Self::contains_window_function(upper)
531 }
532 SqlExpression::InSubquery { expr, .. } => Self::contains_window_function(expr),
533 SqlExpression::NotInSubquery { expr, .. } => Self::contains_window_function(expr),
534 SqlExpression::MethodCall { args, .. } => {
535 args.iter().any(Self::contains_window_function)
536 }
537 SqlExpression::ChainedMethodCall { base, args, .. } => {
538 Self::contains_window_function(base)
539 || args.iter().any(Self::contains_window_function)
540 }
541 _ => false,
542 }
543 }
544
545 fn extract_window_specs(
547 items: &[SelectItem],
548 ) -> Vec<crate::data::batch_window_evaluator::WindowFunctionSpec> {
549 let mut specs = Vec::new();
550 for (idx, item) in items.iter().enumerate() {
551 if let SelectItem::Expression { expr, .. } = item {
552 Self::collect_window_function_specs(expr, idx, &mut specs);
553 }
554 }
555 specs
556 }
557
558 fn collect_window_function_specs(
560 expr: &SqlExpression,
561 output_column_index: usize,
562 specs: &mut Vec<crate::data::batch_window_evaluator::WindowFunctionSpec>,
563 ) {
564 match expr {
565 SqlExpression::WindowFunction {
566 name,
567 args,
568 window_spec,
569 } => {
570 specs.push(crate::data::batch_window_evaluator::WindowFunctionSpec {
571 spec: window_spec.clone(),
572 function_name: name.clone(),
573 args: args.clone(),
574 output_column_index,
575 });
576 }
577 SqlExpression::BinaryOp { left, right, .. } => {
578 Self::collect_window_function_specs(left, output_column_index, specs);
579 Self::collect_window_function_specs(right, output_column_index, specs);
580 }
581 SqlExpression::Not { expr } => {
582 Self::collect_window_function_specs(expr, output_column_index, specs);
583 }
584 SqlExpression::FunctionCall { args, .. } => {
585 for arg in args {
586 Self::collect_window_function_specs(arg, output_column_index, specs);
587 }
588 }
589 SqlExpression::CaseExpression {
590 when_branches,
591 else_branch,
592 } => {
593 for branch in when_branches {
594 Self::collect_window_function_specs(
595 &branch.condition,
596 output_column_index,
597 specs,
598 );
599 Self::collect_window_function_specs(&branch.result, output_column_index, specs);
600 }
601 if let Some(e) = else_branch {
602 Self::collect_window_function_specs(e, output_column_index, specs);
603 }
604 }
605 SqlExpression::SimpleCaseExpression {
606 expr,
607 when_branches,
608 else_branch,
609 } => {
610 Self::collect_window_function_specs(expr, output_column_index, specs);
611 for branch in when_branches {
612 Self::collect_window_function_specs(&branch.value, output_column_index, specs);
613 Self::collect_window_function_specs(&branch.result, output_column_index, specs);
614 }
615 if let Some(e) = else_branch {
616 Self::collect_window_function_specs(e, output_column_index, specs);
617 }
618 }
619 SqlExpression::InList { expr, values } => {
620 Self::collect_window_function_specs(expr, output_column_index, specs);
621 for val in values {
622 Self::collect_window_function_specs(val, output_column_index, specs);
623 }
624 }
625 SqlExpression::NotInList { expr, values } => {
626 Self::collect_window_function_specs(expr, output_column_index, specs);
627 for val in values {
628 Self::collect_window_function_specs(val, output_column_index, specs);
629 }
630 }
631 SqlExpression::Between { expr, lower, upper } => {
632 Self::collect_window_function_specs(expr, output_column_index, specs);
633 Self::collect_window_function_specs(lower, output_column_index, specs);
634 Self::collect_window_function_specs(upper, output_column_index, specs);
635 }
636 SqlExpression::InSubquery { expr, .. } => {
637 Self::collect_window_function_specs(expr, output_column_index, specs);
638 }
639 SqlExpression::NotInSubquery { expr, .. } => {
640 Self::collect_window_function_specs(expr, output_column_index, specs);
641 }
642 SqlExpression::MethodCall { args, .. } => {
643 for arg in args {
644 Self::collect_window_function_specs(arg, output_column_index, specs);
645 }
646 }
647 SqlExpression::ChainedMethodCall { base, args, .. } => {
648 Self::collect_window_function_specs(base, output_column_index, specs);
649 for arg in args {
650 Self::collect_window_function_specs(arg, output_column_index, specs);
651 }
652 }
653 _ => {} }
655 }
656
657 pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
659 let (view, _plan) = self.execute_with_plan(table, sql)?;
660 Ok(view)
661 }
662
663 pub fn execute_with_temp_tables(
665 &self,
666 table: Arc<DataTable>,
667 sql: &str,
668 temp_tables: Option<&TempTableRegistry>,
669 ) -> Result<DataView> {
670 let (view, _plan) = self.execute_with_plan_and_temp_tables(table, sql, temp_tables)?;
671 Ok(view)
672 }
673
674 pub fn execute_statement(
676 &self,
677 table: Arc<DataTable>,
678 statement: SelectStatement,
679 ) -> Result<DataView> {
680 self.execute_statement_with_temp_tables(table, statement, None)
681 }
682
683 pub fn execute_statement_with_temp_tables(
685 &self,
686 table: Arc<DataTable>,
687 statement: SelectStatement,
688 temp_tables: Option<&TempTableRegistry>,
689 ) -> Result<DataView> {
690 let mut cte_context = HashMap::new();
692
693 if let Some(temp_registry) = temp_tables {
695 for table_name in temp_registry.list_tables() {
696 if let Some(temp_table) = temp_registry.get(&table_name) {
697 debug!("Adding temp table {} to CTE context", table_name);
698 let view = DataView::new(temp_table);
699 cte_context.insert(table_name, Arc::new(view));
700 }
701 }
702 }
703
704 for cte in &statement.ctes {
705 debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
706 let cte_result = match &cte.cte_type {
708 CTEType::Standard(query) => {
709 let view = self.build_view_with_context(
711 table.clone(),
712 query.clone(),
713 &mut cte_context,
714 )?;
715
716 let mut materialized = self.materialize_view(view)?;
718
719 for column in materialized.columns_mut() {
721 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
722 column.source_table = Some(cte.name.clone());
723 }
724
725 DataView::new(Arc::new(materialized))
726 }
727 CTEType::Web(web_spec) => {
728 use crate::web::http_fetcher::WebDataFetcher;
730
731 let fetcher = WebDataFetcher::new()?;
732 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
734
735 for column in data_table.columns_mut() {
737 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
738 column.source_table = Some(cte.name.clone());
739 }
740
741 DataView::new(Arc::new(data_table))
743 }
744 CTEType::File(_file_spec) => {
745 return Err(anyhow!(
747 "FILE CTE execution not yet implemented (parser-only milestone)"
748 ));
749 }
750 };
751 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
753 debug!(
754 "QueryEngine: CTE '{}' pre-processed, stored in context",
755 cte.name
756 );
757 }
758
759 let mut subquery_executor =
761 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
762 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
763
764 self.build_view_with_context(table, processed_statement, &mut cte_context)
766 }
767
768 pub fn execute_statement_with_cte_context(
770 &self,
771 table: Arc<DataTable>,
772 statement: SelectStatement,
773 cte_context: &HashMap<String, Arc<DataView>>,
774 ) -> Result<DataView> {
775 let mut local_context = cte_context.clone();
777
778 for cte in &statement.ctes {
780 debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
781 let cte_result = match &cte.cte_type {
782 CTEType::Standard(query) => {
783 let view = self.build_view_with_context(
784 table.clone(),
785 query.clone(),
786 &mut local_context,
787 )?;
788
789 let mut materialized = self.materialize_view(view)?;
791
792 for column in materialized.columns_mut() {
794 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
795 column.source_table = Some(cte.name.clone());
796 }
797
798 DataView::new(Arc::new(materialized))
799 }
800 CTEType::Web(web_spec) => {
801 use crate::web::http_fetcher::WebDataFetcher;
803
804 let fetcher = WebDataFetcher::new()?;
805 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
807
808 for column in data_table.columns_mut() {
810 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
811 column.source_table = Some(cte.name.clone());
812 }
813
814 DataView::new(Arc::new(data_table))
816 }
817 CTEType::File(_file_spec) => {
818 return Err(anyhow!(
820 "FILE CTE execution not yet implemented (parser-only milestone)"
821 ));
822 }
823 };
824 local_context.insert(cte.name.clone(), Arc::new(cte_result));
825 }
826
827 let mut subquery_executor =
829 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
830 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
831
832 self.build_view_with_context(table, processed_statement, &mut local_context)
834 }
835
836 pub fn execute_with_plan(
838 &self,
839 table: Arc<DataTable>,
840 sql: &str,
841 ) -> Result<(DataView, ExecutionPlan)> {
842 self.execute_with_plan_and_temp_tables(table, sql, None)
843 }
844
845 pub fn execute_with_plan_and_temp_tables(
847 &self,
848 table: Arc<DataTable>,
849 sql: &str,
850 temp_tables: Option<&TempTableRegistry>,
851 ) -> Result<(DataView, ExecutionPlan)> {
852 let mut plan_builder = ExecutionPlanBuilder::new();
853 let start_time = Instant::now();
854
855 plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
857 plan_builder.add_detail(format!("Query: {}", sql));
858 let mut parser = Parser::new(sql);
859 let statement = parser
860 .parse()
861 .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
862 plan_builder.add_detail(format!("Parsed successfully"));
863 if let Some(ref from_source) = statement.from_source {
864 match from_source {
865 TableSource::Table(name) => {
866 plan_builder.add_detail(format!("FROM: {}", name));
867 }
868 TableSource::DerivedTable { alias, .. } => {
869 plan_builder.add_detail(format!("FROM: derived table (alias: {})", alias));
870 }
871 TableSource::Pivot { .. } => {
872 plan_builder.add_detail("FROM: PIVOT".to_string());
873 }
874 }
875 }
876 if statement.where_clause.is_some() {
877 plan_builder.add_detail("WHERE clause present".to_string());
878 }
879 plan_builder.end_step();
880
881 let mut cte_context = HashMap::new();
883
884 if let Some(temp_registry) = temp_tables {
886 for table_name in temp_registry.list_tables() {
887 if let Some(temp_table) = temp_registry.get(&table_name) {
888 debug!("Adding temp table {} to CTE context", table_name);
889 let view = DataView::new(temp_table);
890 cte_context.insert(table_name, Arc::new(view));
891 }
892 }
893 }
894
895 if !statement.ctes.is_empty() {
896 plan_builder.begin_step(
897 StepType::CTE,
898 format!("Process {} CTEs", statement.ctes.len()),
899 );
900
901 for cte in &statement.ctes {
902 let cte_start = Instant::now();
903 plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
904
905 let cte_result = match &cte.cte_type {
906 CTEType::Standard(query) => {
907 if let Some(ref from_source) = query.from_source {
909 match from_source {
910 TableSource::Table(name) => {
911 plan_builder.add_detail(format!("Source: {}", name));
912 }
913 TableSource::DerivedTable { alias, .. } => {
914 plan_builder
915 .add_detail(format!("Source: derived table ({})", alias));
916 }
917 TableSource::Pivot { .. } => {
918 plan_builder.add_detail("Source: PIVOT".to_string());
919 }
920 }
921 }
922 if query.where_clause.is_some() {
923 plan_builder.add_detail("Has WHERE clause".to_string());
924 }
925 if query.group_by.is_some() {
926 plan_builder.add_detail("Has GROUP BY".to_string());
927 }
928
929 debug!(
930 "QueryEngine: Processing CTE '{}' with existing context: {:?}",
931 cte.name,
932 cte_context.keys().collect::<Vec<_>>()
933 );
934
935 let mut subquery_executor = SubqueryExecutor::with_cte_context(
938 self.clone(),
939 table.clone(),
940 cte_context.clone(),
941 );
942 let processed_query = subquery_executor.execute_subqueries(query)?;
943
944 let view = self.build_view_with_context(
945 table.clone(),
946 processed_query,
947 &mut cte_context,
948 )?;
949
950 let mut materialized = self.materialize_view(view)?;
952
953 for column in materialized.columns_mut() {
955 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
956 column.source_table = Some(cte.name.clone());
957 }
958
959 DataView::new(Arc::new(materialized))
960 }
961 CTEType::Web(web_spec) => {
962 plan_builder.add_detail(format!("URL: {}", web_spec.url));
963 if let Some(format) = &web_spec.format {
964 plan_builder.add_detail(format!("Format: {:?}", format));
965 }
966 if let Some(cache) = web_spec.cache_seconds {
967 plan_builder.add_detail(format!("Cache: {} seconds", cache));
968 }
969
970 use crate::web::http_fetcher::WebDataFetcher;
972
973 let fetcher = WebDataFetcher::new()?;
974 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
976
977 for column in data_table.columns_mut() {
979 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
980 column.source_table = Some(cte.name.clone());
981 }
982
983 DataView::new(Arc::new(data_table))
985 }
986 CTEType::File(file_spec) => {
987 plan_builder.add_detail(format!("PATH: {}", file_spec.path));
988 if file_spec.recursive {
989 plan_builder.add_detail("RECURSIVE".to_string());
990 }
991 if let Some(ref g) = file_spec.glob {
992 plan_builder.add_detail(format!("GLOB: {}", g));
993 }
994 if let Some(d) = file_spec.max_depth {
995 plan_builder.add_detail(format!("MAX_DEPTH: {}", d));
996 }
997 return Err(anyhow!(
999 "FILE CTE execution not yet implemented (parser-only milestone)"
1000 ));
1001 }
1002 };
1003
1004 plan_builder.set_rows_out(cte_result.row_count());
1006 plan_builder.add_detail(format!(
1007 "Result: {} rows, {} columns",
1008 cte_result.row_count(),
1009 cte_result.column_count()
1010 ));
1011 plan_builder.add_detail(format!(
1012 "Execution time: {:.3}ms",
1013 cte_start.elapsed().as_secs_f64() * 1000.0
1014 ));
1015
1016 debug!(
1017 "QueryEngine: Storing CTE '{}' in context with {} rows",
1018 cte.name,
1019 cte_result.row_count()
1020 );
1021 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1022 plan_builder.end_step();
1023 }
1024
1025 plan_builder.add_detail(format!(
1026 "All {} CTEs cached in context",
1027 statement.ctes.len()
1028 ));
1029 plan_builder.end_step();
1030 }
1031
1032 plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
1034 let mut subquery_executor =
1035 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
1036
1037 let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
1039 format!("{:?}", w).contains("Subquery")
1041 });
1042
1043 if has_subqueries {
1044 plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
1045 }
1046
1047 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
1048
1049 if has_subqueries {
1050 plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
1051 } else {
1052 plan_builder.add_detail("No subqueries to process".to_string());
1053 }
1054
1055 plan_builder.end_step();
1056 let result = self.build_view_with_context_and_plan(
1057 table,
1058 processed_statement,
1059 &mut cte_context,
1060 &mut plan_builder,
1061 )?;
1062
1063 let total_duration = start_time.elapsed();
1064 info!(
1065 "Query execution complete: total={:?}, rows={}",
1066 total_duration,
1067 result.row_count()
1068 );
1069
1070 let plan = plan_builder.build();
1071 Ok((result, plan))
1072 }
1073
1074 fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
1076 let mut cte_context = HashMap::new();
1077 self.build_view_with_context(table, statement, &mut cte_context)
1078 }
1079
1080 fn build_view_with_context(
1082 &self,
1083 table: Arc<DataTable>,
1084 statement: SelectStatement,
1085 cte_context: &mut HashMap<String, Arc<DataView>>,
1086 ) -> Result<DataView> {
1087 let mut dummy_plan = ExecutionPlanBuilder::new();
1088 let mut exec_context = ExecutionContext::new();
1089 self.build_view_with_context_and_plan_and_exec(
1090 table,
1091 statement,
1092 cte_context,
1093 &mut dummy_plan,
1094 &mut exec_context,
1095 )
1096 }
1097
1098 fn build_view_with_context_and_plan(
1100 &self,
1101 table: Arc<DataTable>,
1102 statement: SelectStatement,
1103 cte_context: &mut HashMap<String, Arc<DataView>>,
1104 plan: &mut ExecutionPlanBuilder,
1105 ) -> Result<DataView> {
1106 let mut exec_context = ExecutionContext::new();
1107 self.build_view_with_context_and_plan_and_exec(
1108 table,
1109 statement,
1110 cte_context,
1111 plan,
1112 &mut exec_context,
1113 )
1114 }
1115
1116 fn build_view_with_context_and_plan_and_exec(
1118 &self,
1119 table: Arc<DataTable>,
1120 statement: SelectStatement,
1121 cte_context: &mut HashMap<String, Arc<DataView>>,
1122 plan: &mut ExecutionPlanBuilder,
1123 exec_context: &mut ExecutionContext,
1124 ) -> Result<DataView> {
1125 for cte in &statement.ctes {
1127 if cte_context.contains_key(&cte.name) {
1129 debug!(
1130 "QueryEngine: CTE '{}' already in context, skipping",
1131 cte.name
1132 );
1133 continue;
1134 }
1135
1136 debug!("QueryEngine: Processing CTE '{}'...", cte.name);
1137 debug!(
1138 "QueryEngine: Available CTEs for '{}': {:?}",
1139 cte.name,
1140 cte_context.keys().collect::<Vec<_>>()
1141 );
1142
1143 let cte_result = match &cte.cte_type {
1145 CTEType::Standard(query) => {
1146 let view =
1147 self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
1148
1149 let mut materialized = self.materialize_view(view)?;
1151
1152 for column in materialized.columns_mut() {
1154 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1155 column.source_table = Some(cte.name.clone());
1156 }
1157
1158 DataView::new(Arc::new(materialized))
1159 }
1160 CTEType::Web(_web_spec) => {
1161 return Err(anyhow!(
1163 "Web CTEs should be processed in execute_select method"
1164 ));
1165 }
1166 CTEType::File(_file_spec) => {
1167 return Err(anyhow!(
1169 "FILE CTEs should be processed in execute_select method"
1170 ));
1171 }
1172 };
1173
1174 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1176 debug!(
1177 "QueryEngine: CTE '{}' processed, stored in context",
1178 cte.name
1179 );
1180 }
1181
1182 let source_table = if let Some(ref from_source) = statement.from_source {
1184 match from_source {
1185 TableSource::Table(table_name) => {
1186 if let Some(cte_view) = cte_context.get(table_name) {
1188 debug!("QueryEngine: Using CTE '{}' as source table", table_name);
1189 let mut materialized = self.materialize_view((**cte_view).clone())?;
1191
1192 #[allow(deprecated)]
1194 if let Some(ref alias) = statement.from_alias {
1195 debug!(
1196 "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1197 alias, table_name
1198 );
1199 for column in materialized.columns_mut() {
1200 if let Some(ref qualified_name) = column.qualified_name {
1202 if qualified_name.starts_with(&format!("{}.", table_name)) {
1203 column.qualified_name = Some(qualified_name.replace(
1204 &format!("{}.", table_name),
1205 &format!("{}.", alias),
1206 ));
1207 }
1208 }
1209 if column.source_table.as_ref() == Some(table_name) {
1211 column.source_table = Some(alias.clone());
1212 }
1213 }
1214 }
1215
1216 Arc::new(materialized)
1217 } else {
1218 table.clone()
1220 }
1221 }
1222 TableSource::DerivedTable { query, alias } => {
1223 debug!(
1225 "QueryEngine: Processing FROM derived table (alias: {})",
1226 alias
1227 );
1228 let subquery_result =
1229 self.build_view_with_context(table.clone(), *query.clone(), cte_context)?;
1230
1231 let mut materialized = self.materialize_view(subquery_result)?;
1234
1235 for column in materialized.columns_mut() {
1239 column.source_table = Some(alias.clone());
1240 }
1241
1242 Arc::new(materialized)
1243 }
1244 TableSource::Pivot { .. } => {
1245 return Err(anyhow!(
1247 "PIVOT in FROM clause should have been expanded by preprocessing pipeline"
1248 ));
1249 }
1250 }
1251 } else {
1252 #[allow(deprecated)]
1254 if let Some(ref table_func) = statement.from_function {
1255 debug!("QueryEngine: Processing table function (deprecated field)...");
1257 match table_func {
1258 TableFunction::Generator { name, args } => {
1259 use crate::sql::generators::GeneratorRegistry;
1261
1262 let registry = GeneratorRegistry::new();
1264
1265 if let Some(generator) = registry.get(name) {
1266 let mut evaluator = ArithmeticEvaluator::with_date_notation(
1268 &table,
1269 self.date_notation.clone(),
1270 );
1271 let dummy_row = 0;
1272
1273 let mut evaluated_args = Vec::new();
1274 for arg in args {
1275 evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
1276 }
1277
1278 generator.generate(evaluated_args)?
1280 } else {
1281 return Err(anyhow!("Unknown generator function: {}", name));
1282 }
1283 }
1284 }
1285 } else {
1286 #[allow(deprecated)]
1287 if let Some(ref subquery) = statement.from_subquery {
1288 debug!("QueryEngine: Processing FROM subquery (deprecated field)...");
1290 let subquery_result = self.build_view_with_context(
1291 table.clone(),
1292 *subquery.clone(),
1293 cte_context,
1294 )?;
1295
1296 let materialized = self.materialize_view(subquery_result)?;
1299 Arc::new(materialized)
1300 } else {
1301 #[allow(deprecated)]
1302 if let Some(ref table_name) = statement.from_table {
1303 if let Some(cte_view) = cte_context.get(table_name) {
1305 debug!(
1306 "QueryEngine: Using CTE '{}' as source table (deprecated field)",
1307 table_name
1308 );
1309 let mut materialized = self.materialize_view((**cte_view).clone())?;
1311
1312 #[allow(deprecated)]
1314 if let Some(ref alias) = statement.from_alias {
1315 debug!(
1316 "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1317 alias, table_name
1318 );
1319 for column in materialized.columns_mut() {
1320 if let Some(ref qualified_name) = column.qualified_name {
1322 if qualified_name.starts_with(&format!("{}.", table_name)) {
1323 column.qualified_name = Some(qualified_name.replace(
1324 &format!("{}.", table_name),
1325 &format!("{}.", alias),
1326 ));
1327 }
1328 }
1329 if column.source_table.as_ref() == Some(table_name) {
1331 column.source_table = Some(alias.clone());
1332 }
1333 }
1334 }
1335
1336 Arc::new(materialized)
1337 } else {
1338 table.clone()
1340 }
1341 } else {
1342 table.clone()
1344 }
1345 }
1346 }
1347 };
1348
1349 #[allow(deprecated)]
1351 if let Some(ref alias) = statement.from_alias {
1352 #[allow(deprecated)]
1353 if let Some(ref table_name) = statement.from_table {
1354 exec_context.register_alias(alias.clone(), table_name.clone());
1355 }
1356 }
1357
1358 let final_table = if !statement.joins.is_empty() {
1360 plan.begin_step(
1361 StepType::Join,
1362 format!("Process {} JOINs", statement.joins.len()),
1363 );
1364 plan.set_rows_in(source_table.row_count());
1365
1366 let join_executor = HashJoinExecutor::new(self.case_insensitive);
1367 let mut current_table = source_table;
1368
1369 for (idx, join_clause) in statement.joins.iter().enumerate() {
1370 let join_start = Instant::now();
1371 plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
1372 plan.add_detail(format!("Type: {:?}", join_clause.join_type));
1373 plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
1374 plan.add_detail(format!(
1375 "Executing {:?} JOIN on {} condition(s)",
1376 join_clause.join_type,
1377 join_clause.condition.conditions.len()
1378 ));
1379
1380 let right_table = match &join_clause.table {
1382 TableSource::Table(name) => {
1383 if let Some(cte_view) = cte_context.get(name) {
1385 let mut materialized = self.materialize_view((**cte_view).clone())?;
1386
1387 if let Some(ref alias) = join_clause.alias {
1389 debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
1390 for column in materialized.columns_mut() {
1391 if let Some(ref qualified_name) = column.qualified_name {
1393 if qualified_name.starts_with(&format!("{}.", name)) {
1394 column.qualified_name = Some(qualified_name.replace(
1395 &format!("{}.", name),
1396 &format!("{}.", alias),
1397 ));
1398 }
1399 }
1400 if column.source_table.as_ref() == Some(name) {
1402 column.source_table = Some(alias.clone());
1403 }
1404 }
1405 }
1406
1407 Arc::new(materialized)
1408 } else {
1409 return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
1412 }
1413 }
1414 TableSource::DerivedTable { query, alias: _ } => {
1415 let subquery_result = self.build_view_with_context(
1417 table.clone(),
1418 *query.clone(),
1419 cte_context,
1420 )?;
1421 let materialized = self.materialize_view(subquery_result)?;
1422 Arc::new(materialized)
1423 }
1424 TableSource::Pivot { .. } => {
1425 return Err(anyhow!("PIVOT in JOIN clause is not yet supported"));
1427 }
1428 };
1429
1430 let joined = join_executor.execute_join(
1432 current_table.clone(),
1433 join_clause,
1434 right_table.clone(),
1435 )?;
1436
1437 plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
1438 plan.set_rows_out(joined.row_count());
1439 plan.add_detail(format!("Result: {} rows", joined.row_count()));
1440 plan.add_detail(format!(
1441 "Join time: {:.3}ms",
1442 join_start.elapsed().as_secs_f64() * 1000.0
1443 ));
1444 plan.end_step();
1445
1446 current_table = Arc::new(joined);
1447 }
1448
1449 plan.set_rows_out(current_table.row_count());
1450 plan.add_detail(format!(
1451 "Final result after all joins: {} rows",
1452 current_table.row_count()
1453 ));
1454 plan.end_step();
1455 current_table
1456 } else {
1457 source_table
1458 };
1459
1460 self.build_view_internal_with_plan_and_exec(
1462 final_table,
1463 statement,
1464 plan,
1465 Some(exec_context),
1466 )
1467 }
1468
1469 pub fn materialize_view(&self, view: DataView) -> Result<DataTable> {
1471 let source = view.source();
1472 let mut result_table = DataTable::new("derived");
1473
1474 let visible_cols = view.visible_column_indices().to_vec();
1476
1477 for col_idx in &visible_cols {
1479 let col = &source.columns[*col_idx];
1480 let new_col = DataColumn {
1481 name: col.name.clone(),
1482 data_type: col.data_type.clone(),
1483 nullable: col.nullable,
1484 unique_values: col.unique_values,
1485 null_count: col.null_count,
1486 metadata: col.metadata.clone(),
1487 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), };
1490 result_table.add_column(new_col);
1491 }
1492
1493 for row_idx in view.visible_row_indices() {
1495 let source_row = &source.rows[*row_idx];
1496 let mut new_row = DataRow { values: Vec::new() };
1497
1498 for col_idx in &visible_cols {
1499 new_row.values.push(source_row.values[*col_idx].clone());
1500 }
1501
1502 result_table.add_row(new_row);
1503 }
1504
1505 Ok(result_table)
1506 }
1507
1508 fn build_view_internal(
1509 &self,
1510 table: Arc<DataTable>,
1511 statement: SelectStatement,
1512 ) -> Result<DataView> {
1513 let mut dummy_plan = ExecutionPlanBuilder::new();
1514 self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
1515 }
1516
1517 fn build_view_internal_with_plan(
1518 &self,
1519 table: Arc<DataTable>,
1520 statement: SelectStatement,
1521 plan: &mut ExecutionPlanBuilder,
1522 ) -> Result<DataView> {
1523 self.build_view_internal_with_plan_and_exec(table, statement, plan, None)
1524 }
1525
1526 fn build_view_internal_with_plan_and_exec(
1527 &self,
1528 table: Arc<DataTable>,
1529 statement: SelectStatement,
1530 plan: &mut ExecutionPlanBuilder,
1531 exec_context: Option<&ExecutionContext>,
1532 ) -> Result<DataView> {
1533 debug!(
1534 "QueryEngine::build_view - select_items: {:?}",
1535 statement.select_items
1536 );
1537 debug!(
1538 "QueryEngine::build_view - where_clause: {:?}",
1539 statement.where_clause
1540 );
1541
1542 let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
1544
1545 if let Some(where_clause) = &statement.where_clause {
1547 let total_rows = table.row_count();
1548 debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
1549 debug!("QueryEngine: WHERE clause = {:?}", where_clause);
1550
1551 plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
1552 plan.set_rows_in(total_rows);
1553 plan.add_detail(format!("Input: {} rows", total_rows));
1554
1555 for condition in &where_clause.conditions {
1557 plan.add_detail(format!("Condition: {:?}", condition.expr));
1558 }
1559
1560 let filter_start = Instant::now();
1561 let mut eval_context = EvaluationContext::new(self.case_insensitive);
1563
1564 let mut evaluator = if let Some(exec_ctx) = exec_context {
1566 RecursiveWhereEvaluator::with_both_contexts(&table, &mut eval_context, exec_ctx)
1568 } else {
1569 RecursiveWhereEvaluator::with_context(&table, &mut eval_context)
1570 };
1571
1572 let mut filtered_rows = Vec::new();
1574 for row_idx in visible_rows {
1575 if row_idx < 3 {
1577 debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
1578 }
1579
1580 match evaluator.evaluate(where_clause, row_idx) {
1581 Ok(result) => {
1582 if row_idx < 3 {
1583 debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
1584 }
1585 if result {
1586 filtered_rows.push(row_idx);
1587 }
1588 }
1589 Err(e) => {
1590 if row_idx < 3 {
1591 debug!(
1592 "QueryEngine: WHERE evaluation error for row {}: {}",
1593 row_idx, e
1594 );
1595 }
1596 return Err(e);
1598 }
1599 }
1600 }
1601
1602 let (compilations, cache_hits) = eval_context.get_stats();
1604 if compilations > 0 || cache_hits > 0 {
1605 debug!(
1606 "LIKE pattern cache: {} compilations, {} cache hits",
1607 compilations, cache_hits
1608 );
1609 }
1610 visible_rows = filtered_rows;
1611 let filter_duration = filter_start.elapsed();
1612 info!(
1613 "WHERE clause filtering: {} rows -> {} rows in {:?}",
1614 total_rows,
1615 visible_rows.len(),
1616 filter_duration
1617 );
1618
1619 plan.set_rows_out(visible_rows.len());
1620 plan.add_detail(format!("Output: {} rows", visible_rows.len()));
1621 plan.add_detail(format!(
1622 "Filter time: {:.3}ms",
1623 filter_duration.as_secs_f64() * 1000.0
1624 ));
1625 plan.end_step();
1626 }
1627
1628 let mut view = DataView::new(table.clone());
1630 view = view.with_rows(visible_rows);
1631
1632 if let Some(group_by_exprs) = &statement.group_by {
1634 if !group_by_exprs.is_empty() {
1635 debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
1636
1637 plan.begin_step(
1638 StepType::GroupBy,
1639 format!("GROUP BY {} expressions", group_by_exprs.len()),
1640 );
1641 plan.set_rows_in(view.row_count());
1642 plan.add_detail(format!("Input: {} rows", view.row_count()));
1643 for expr in group_by_exprs {
1644 plan.add_detail(format!("Group by: {:?}", expr));
1645 }
1646
1647 let group_start = Instant::now();
1648 view = self.apply_group_by(
1649 view,
1650 group_by_exprs,
1651 &statement.select_items,
1652 statement.having.as_ref(),
1653 plan,
1654 )?;
1655
1656 plan.set_rows_out(view.row_count());
1657 plan.add_detail(format!("Output: {} groups", view.row_count()));
1658 plan.add_detail(format!(
1659 "Overall time: {:.3}ms",
1660 group_start.elapsed().as_secs_f64() * 1000.0
1661 ));
1662 plan.end_step();
1663 }
1664 } else {
1665 if !statement.select_items.is_empty() {
1667 let has_non_star_items = statement
1669 .select_items
1670 .iter()
1671 .any(|item| !matches!(item, SelectItem::Star { .. }));
1672
1673 if has_non_star_items || statement.select_items.len() > 1 {
1677 view = self.apply_select_items(
1678 view,
1679 &statement.select_items,
1680 &statement,
1681 exec_context,
1682 plan,
1683 )?;
1684 }
1685 } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
1687 debug!("QueryEngine: Using legacy columns path");
1688 let source_table = view.source();
1691 let column_indices =
1692 self.resolve_column_indices(source_table, &statement.columns)?;
1693 view = view.with_columns(column_indices);
1694 }
1695 }
1696
1697 if statement.distinct {
1699 plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
1700 plan.set_rows_in(view.row_count());
1701 plan.add_detail(format!("Input: {} rows", view.row_count()));
1702
1703 let distinct_start = Instant::now();
1704 view = self.apply_distinct(view)?;
1705
1706 plan.set_rows_out(view.row_count());
1707 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1708 plan.add_detail(format!(
1709 "Distinct time: {:.3}ms",
1710 distinct_start.elapsed().as_secs_f64() * 1000.0
1711 ));
1712 plan.end_step();
1713 }
1714
1715 if let Some(order_by_columns) = &statement.order_by {
1717 if !order_by_columns.is_empty() {
1718 plan.begin_step(
1719 StepType::Sort,
1720 format!("ORDER BY {} columns", order_by_columns.len()),
1721 );
1722 plan.set_rows_in(view.row_count());
1723 for col in order_by_columns {
1724 let expr_str = match &col.expr {
1726 SqlExpression::Column(col_ref) => col_ref.name.clone(),
1727 _ => "expr".to_string(),
1728 };
1729 plan.add_detail(format!("{} {:?}", expr_str, col.direction));
1730 }
1731
1732 let sort_start = Instant::now();
1733 view =
1734 self.apply_multi_order_by_with_context(view, order_by_columns, exec_context)?;
1735
1736 plan.add_detail(format!(
1737 "Sort time: {:.3}ms",
1738 sort_start.elapsed().as_secs_f64() * 1000.0
1739 ));
1740 plan.end_step();
1741 }
1742 }
1743
1744 if let Some(limit) = statement.limit {
1746 let offset = statement.offset.unwrap_or(0);
1747 plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1748 plan.set_rows_in(view.row_count());
1749 if offset > 0 {
1750 plan.add_detail(format!("OFFSET: {}", offset));
1751 }
1752 view = view.with_limit(limit, offset);
1753 plan.set_rows_out(view.row_count());
1754 plan.add_detail(format!("Output: {} rows", view.row_count()));
1755 plan.end_step();
1756 }
1757
1758 if !statement.set_operations.is_empty() {
1760 plan.begin_step(
1761 StepType::SetOperation,
1762 format!("Process {} set operations", statement.set_operations.len()),
1763 );
1764 plan.set_rows_in(view.row_count());
1765
1766 let mut combined_table = self.materialize_view(view)?;
1768 let first_columns = combined_table.column_names();
1769 let first_column_count = first_columns.len();
1770
1771 let mut needs_deduplication = false;
1773
1774 for (idx, (operation, next_statement)) in statement.set_operations.iter().enumerate() {
1776 let op_start = Instant::now();
1777 plan.begin_step(
1778 StepType::SetOperation,
1779 format!("{:?} operation #{}", operation, idx + 1),
1780 );
1781
1782 let next_view = if let Some(exec_ctx) = exec_context {
1785 self.build_view_internal_with_plan_and_exec(
1786 table.clone(),
1787 *next_statement.clone(),
1788 plan,
1789 Some(exec_ctx),
1790 )?
1791 } else {
1792 self.build_view_internal_with_plan(
1793 table.clone(),
1794 *next_statement.clone(),
1795 plan,
1796 )?
1797 };
1798
1799 let next_table = self.materialize_view(next_view)?;
1801 let next_columns = next_table.column_names();
1802 let next_column_count = next_columns.len();
1803
1804 if first_column_count != next_column_count {
1806 return Err(anyhow!(
1807 "UNION queries must have the same number of columns: first query has {} columns, but query #{} has {} columns",
1808 first_column_count,
1809 idx + 2,
1810 next_column_count
1811 ));
1812 }
1813
1814 for (col_idx, (first_col, next_col)) in
1816 first_columns.iter().zip(next_columns.iter()).enumerate()
1817 {
1818 if !first_col.eq_ignore_ascii_case(next_col) {
1819 debug!(
1820 "UNION column name mismatch at position {}: '{}' vs '{}' (using first query's name)",
1821 col_idx + 1,
1822 first_col,
1823 next_col
1824 );
1825 }
1826 }
1827
1828 plan.add_detail(format!("Left: {} rows", combined_table.row_count()));
1829 plan.add_detail(format!("Right: {} rows", next_table.row_count()));
1830
1831 match operation {
1833 SetOperation::UnionAll => {
1834 for row in next_table.rows.iter() {
1836 combined_table.add_row(row.clone());
1837 }
1838 plan.add_detail(format!(
1839 "Result: {} rows (no deduplication)",
1840 combined_table.row_count()
1841 ));
1842 }
1843 SetOperation::Union => {
1844 for row in next_table.rows.iter() {
1846 combined_table.add_row(row.clone());
1847 }
1848 needs_deduplication = true;
1849 plan.add_detail(format!(
1850 "Combined: {} rows (deduplication pending)",
1851 combined_table.row_count()
1852 ));
1853 }
1854 SetOperation::Intersect => {
1855 return Err(anyhow!("INTERSECT is not yet implemented"));
1858 }
1859 SetOperation::Except => {
1860 return Err(anyhow!("EXCEPT is not yet implemented"));
1863 }
1864 }
1865
1866 plan.add_detail(format!(
1867 "Operation time: {:.3}ms",
1868 op_start.elapsed().as_secs_f64() * 1000.0
1869 ));
1870 plan.set_rows_out(combined_table.row_count());
1871 plan.end_step();
1872 }
1873
1874 plan.set_rows_out(combined_table.row_count());
1875 plan.add_detail(format!(
1876 "Combined result: {} rows after {} operations",
1877 combined_table.row_count(),
1878 statement.set_operations.len()
1879 ));
1880 plan.end_step();
1881
1882 view = DataView::new(Arc::new(combined_table));
1884
1885 if needs_deduplication {
1887 plan.begin_step(
1888 StepType::Distinct,
1889 "UNION deduplication - remove duplicate rows".to_string(),
1890 );
1891 plan.set_rows_in(view.row_count());
1892 plan.add_detail(format!("Input: {} rows", view.row_count()));
1893
1894 let distinct_start = Instant::now();
1895 view = self.apply_distinct(view)?;
1896
1897 plan.set_rows_out(view.row_count());
1898 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1899 plan.add_detail(format!(
1900 "Deduplication time: {:.3}ms",
1901 distinct_start.elapsed().as_secs_f64() * 1000.0
1902 ));
1903 plan.end_step();
1904 }
1905 }
1906
1907 Ok(view)
1908 }
1909
1910 fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1912 let mut indices = Vec::new();
1913 let table_columns = table.column_names();
1914
1915 for col_name in columns {
1916 let index = table_columns
1917 .iter()
1918 .position(|c| c.eq_ignore_ascii_case(col_name))
1919 .ok_or_else(|| {
1920 let suggestion = self.find_similar_column(table, col_name);
1921 match suggestion {
1922 Some(similar) => anyhow::anyhow!(
1923 "Column '{}' not found. Did you mean '{}'?",
1924 col_name,
1925 similar
1926 ),
1927 None => anyhow::anyhow!("Column '{}' not found", col_name),
1928 }
1929 })?;
1930 indices.push(index);
1931 }
1932
1933 Ok(indices)
1934 }
1935
1936 fn apply_select_items(
1938 &self,
1939 view: DataView,
1940 select_items: &[SelectItem],
1941 _statement: &SelectStatement,
1942 exec_context: Option<&ExecutionContext>,
1943 plan: &mut ExecutionPlanBuilder,
1944 ) -> Result<DataView> {
1945 debug!(
1946 "QueryEngine::apply_select_items - items: {:?}",
1947 select_items
1948 );
1949 debug!(
1950 "QueryEngine::apply_select_items - input view has {} rows",
1951 view.row_count()
1952 );
1953
1954 let has_window_functions = select_items.iter().any(|item| match item {
1956 SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
1957 _ => false,
1958 });
1959
1960 let window_func_count: usize = select_items
1962 .iter()
1963 .filter(|item| match item {
1964 SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
1965 _ => false,
1966 })
1967 .count();
1968
1969 let window_start = if has_window_functions {
1971 debug!(
1972 "QueryEngine::apply_select_items - detected {} window functions",
1973 window_func_count
1974 );
1975
1976 let window_specs = Self::extract_window_specs(select_items);
1978 debug!("Extracted {} window function specs", window_specs.len());
1979
1980 Some(Instant::now())
1981 } else {
1982 None
1983 };
1984
1985 let has_unnest = select_items.iter().any(|item| match item {
1987 SelectItem::Expression { expr, .. } => Self::contains_unnest(expr),
1988 _ => false,
1989 });
1990
1991 if has_unnest {
1992 debug!("QueryEngine::apply_select_items - UNNEST detected, using row expansion");
1993 return self.apply_select_with_row_expansion(view, select_items);
1994 }
1995
1996 let has_aggregates = select_items.iter().any(|item| match item {
2000 SelectItem::Expression { expr, .. } => contains_aggregate(expr),
2001 SelectItem::Column { .. } => false,
2002 SelectItem::Star { .. } => false,
2003 SelectItem::StarExclude { .. } => false,
2004 });
2005
2006 let all_aggregate_compatible = select_items.iter().all(|item| match item {
2007 SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
2008 SelectItem::Column { .. } => false, SelectItem::Star { .. } => false, SelectItem::StarExclude { .. } => false, });
2012
2013 if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
2014 debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
2017 return self.apply_aggregate_select(view, select_items);
2018 }
2019
2020 let has_computed_expressions = select_items
2022 .iter()
2023 .any(|item| matches!(item, SelectItem::Expression { .. }));
2024
2025 debug!(
2026 "QueryEngine::apply_select_items - has_computed_expressions: {}",
2027 has_computed_expressions
2028 );
2029
2030 if !has_computed_expressions {
2031 let column_indices = self.resolve_select_columns(view.source(), select_items)?;
2033 return Ok(view.with_columns(column_indices));
2034 }
2035
2036 let source_table = view.source();
2041 let visible_rows = view.visible_row_indices();
2042
2043 let mut computed_table = DataTable::new("query_result");
2046
2047 let mut expanded_items = Vec::new();
2049 for item in select_items {
2050 match item {
2051 SelectItem::Star { table_prefix, .. } => {
2052 if let Some(prefix) = table_prefix {
2053 debug!("QueryEngine::apply_select_items - expanding {}.*", prefix);
2055 for col in &source_table.columns {
2056 if Self::column_matches_table(col, prefix) {
2057 expanded_items.push(SelectItem::Column {
2058 column: ColumnRef::unquoted(col.name.clone()),
2059 leading_comments: vec![],
2060 trailing_comment: None,
2061 });
2062 }
2063 }
2064 } else {
2065 debug!("QueryEngine::apply_select_items - expanding *");
2067 for col_name in source_table.column_names() {
2068 expanded_items.push(SelectItem::Column {
2069 column: ColumnRef::unquoted(col_name.to_string()),
2070 leading_comments: vec![],
2071 trailing_comment: None,
2072 });
2073 }
2074 }
2075 }
2076 _ => expanded_items.push(item.clone()),
2077 }
2078 }
2079
2080 let mut column_name_counts: std::collections::HashMap<String, usize> =
2082 std::collections::HashMap::new();
2083
2084 for item in &expanded_items {
2085 let base_name = match item {
2086 SelectItem::Column {
2087 column: col_ref, ..
2088 } => col_ref.name.clone(),
2089 SelectItem::Expression { alias, .. } => alias.clone(),
2090 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2091 SelectItem::StarExclude { .. } => {
2092 unreachable!("StarExclude should have been expanded")
2093 }
2094 };
2095
2096 let count = column_name_counts.entry(base_name.clone()).or_insert(0);
2098 let column_name = if *count == 0 {
2099 base_name.clone()
2101 } else {
2102 format!("{base_name}_{count}")
2104 };
2105 *count += 1;
2106
2107 computed_table.add_column(DataColumn::new(&column_name));
2108 }
2109
2110 let can_use_batch = expanded_items.iter().all(|item| {
2114 match item {
2115 SelectItem::Expression { expr, .. } => {
2116 matches!(expr, SqlExpression::WindowFunction { .. })
2119 || !Self::contains_window_function(expr)
2120 }
2121 _ => true, }
2123 });
2124
2125 let use_batch_evaluation = can_use_batch
2128 && std::env::var("SQL_CLI_BATCH_WINDOW")
2129 .map(|v| v != "0" && v.to_lowercase() != "false")
2130 .unwrap_or(true);
2131
2132 let batch_window_specs = if use_batch_evaluation && has_window_functions {
2134 debug!("BATCH window function evaluation flag is enabled");
2135 let specs = Self::extract_window_specs(&expanded_items);
2137 debug!(
2138 "Extracted {} window function specs for batch evaluation",
2139 specs.len()
2140 );
2141 Some(specs)
2142 } else {
2143 None
2144 };
2145
2146 let mut evaluator =
2148 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2149
2150 if let Some(exec_ctx) = exec_context {
2152 let aliases = exec_ctx.get_aliases();
2153 if !aliases.is_empty() {
2154 debug!(
2155 "Applying {} aliases to evaluator: {:?}",
2156 aliases.len(),
2157 aliases
2158 );
2159 evaluator = evaluator.with_table_aliases(aliases);
2160 }
2161 }
2162
2163 if has_window_functions {
2166 let preload_start = Instant::now();
2167
2168 let mut window_specs = Vec::new();
2170 for item in &expanded_items {
2171 if let SelectItem::Expression { expr, .. } = item {
2172 Self::collect_window_specs(expr, &mut window_specs);
2173 }
2174 }
2175
2176 for spec in &window_specs {
2178 let _ = evaluator.get_or_create_window_context(spec);
2179 }
2180
2181 debug!(
2182 "Pre-created {} WindowContext(s) in {:.2}ms",
2183 window_specs.len(),
2184 preload_start.elapsed().as_secs_f64() * 1000.0
2185 );
2186 }
2187
2188 if let Some(window_specs) = batch_window_specs {
2190 debug!("Starting batch window function evaluation");
2191 let batch_start = Instant::now();
2192
2193 let mut batch_results: Vec<Vec<DataValue>> =
2195 vec![vec![DataValue::Null; expanded_items.len()]; visible_rows.len()];
2196
2197 let detailed_window_specs = &window_specs;
2199
2200 let mut specs_by_window: HashMap<
2202 u64,
2203 Vec<&crate::data::batch_window_evaluator::WindowFunctionSpec>,
2204 > = HashMap::new();
2205 for spec in detailed_window_specs {
2206 let hash = spec.spec.compute_hash();
2207 specs_by_window
2208 .entry(hash)
2209 .or_insert_with(Vec::new)
2210 .push(spec);
2211 }
2212
2213 for (_window_hash, specs) in specs_by_window {
2215 let context = evaluator.get_or_create_window_context(&specs[0].spec)?;
2217
2218 for spec in specs {
2220 match spec.function_name.as_str() {
2221 "LAG" => {
2222 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2224 let column_name = col_ref.name.as_str();
2225 let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2226 spec.args.get(1)
2227 {
2228 n.parse::<i64>().unwrap_or(1)
2229 } else {
2230 1 };
2232
2233 let values = context.evaluate_lag_batch(
2234 visible_rows,
2235 column_name,
2236 offset,
2237 )?;
2238
2239 for (row_idx, value) in values.into_iter().enumerate() {
2241 batch_results[row_idx][spec.output_column_index] = value;
2242 }
2243 }
2244 }
2245 "LEAD" => {
2246 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2248 let column_name = col_ref.name.as_str();
2249 let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2250 spec.args.get(1)
2251 {
2252 n.parse::<i64>().unwrap_or(1)
2253 } else {
2254 1 };
2256
2257 let values = context.evaluate_lead_batch(
2258 visible_rows,
2259 column_name,
2260 offset,
2261 )?;
2262
2263 for (row_idx, value) in values.into_iter().enumerate() {
2265 batch_results[row_idx][spec.output_column_index] = value;
2266 }
2267 }
2268 }
2269 "ROW_NUMBER" => {
2270 let values = context.evaluate_row_number_batch(visible_rows)?;
2271
2272 for (row_idx, value) in values.into_iter().enumerate() {
2274 batch_results[row_idx][spec.output_column_index] = value;
2275 }
2276 }
2277 "RANK" => {
2278 let values = context.evaluate_rank_batch(visible_rows)?;
2279
2280 for (row_idx, value) in values.into_iter().enumerate() {
2282 batch_results[row_idx][spec.output_column_index] = value;
2283 }
2284 }
2285 "DENSE_RANK" => {
2286 let values = context.evaluate_dense_rank_batch(visible_rows)?;
2287
2288 for (row_idx, value) in values.into_iter().enumerate() {
2290 batch_results[row_idx][spec.output_column_index] = value;
2291 }
2292 }
2293 "SUM" => {
2294 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2295 let column_name = col_ref.name.as_str();
2296 let values =
2297 context.evaluate_sum_batch(visible_rows, column_name)?;
2298
2299 for (row_idx, value) in values.into_iter().enumerate() {
2300 batch_results[row_idx][spec.output_column_index] = value;
2301 }
2302 }
2303 }
2304 "AVG" => {
2305 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2306 let column_name = col_ref.name.as_str();
2307 let values =
2308 context.evaluate_avg_batch(visible_rows, column_name)?;
2309
2310 for (row_idx, value) in values.into_iter().enumerate() {
2311 batch_results[row_idx][spec.output_column_index] = value;
2312 }
2313 }
2314 }
2315 "MIN" => {
2316 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2317 let column_name = col_ref.name.as_str();
2318 let values =
2319 context.evaluate_min_batch(visible_rows, column_name)?;
2320
2321 for (row_idx, value) in values.into_iter().enumerate() {
2322 batch_results[row_idx][spec.output_column_index] = value;
2323 }
2324 }
2325 }
2326 "MAX" => {
2327 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2328 let column_name = col_ref.name.as_str();
2329 let values =
2330 context.evaluate_max_batch(visible_rows, column_name)?;
2331
2332 for (row_idx, value) in values.into_iter().enumerate() {
2333 batch_results[row_idx][spec.output_column_index] = value;
2334 }
2335 }
2336 }
2337 "COUNT" => {
2338 let column_name = match spec.args.get(0) {
2340 Some(SqlExpression::Column(col_ref)) => Some(col_ref.name.as_str()),
2341 Some(SqlExpression::StringLiteral(s)) if s == "*" => None,
2342 _ => None,
2343 };
2344
2345 let values = context.evaluate_count_batch(visible_rows, column_name)?;
2346
2347 for (row_idx, value) in values.into_iter().enumerate() {
2348 batch_results[row_idx][spec.output_column_index] = value;
2349 }
2350 }
2351 "FIRST_VALUE" => {
2352 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2353 let column_name = col_ref.name.as_str();
2354 let values = context
2355 .evaluate_first_value_batch(visible_rows, column_name)?;
2356
2357 for (row_idx, value) in values.into_iter().enumerate() {
2358 batch_results[row_idx][spec.output_column_index] = value;
2359 }
2360 }
2361 }
2362 "LAST_VALUE" => {
2363 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2364 let column_name = col_ref.name.as_str();
2365 let values =
2366 context.evaluate_last_value_batch(visible_rows, column_name)?;
2367
2368 for (row_idx, value) in values.into_iter().enumerate() {
2369 batch_results[row_idx][spec.output_column_index] = value;
2370 }
2371 }
2372 }
2373 _ => {
2374 debug!(
2376 "Window function {} not supported in batch mode, using per-row",
2377 spec.function_name
2378 );
2379 }
2380 }
2381 }
2382 }
2383
2384 for (result_row_idx, &source_row_idx) in visible_rows.iter().enumerate() {
2386 for (col_idx, item) in expanded_items.iter().enumerate() {
2387 if !matches!(batch_results[result_row_idx][col_idx], DataValue::Null) {
2389 continue;
2390 }
2391
2392 let value = match item {
2393 SelectItem::Column {
2394 column: col_ref, ..
2395 } => {
2396 match evaluator
2397 .evaluate(&SqlExpression::Column(col_ref.clone()), source_row_idx)
2398 {
2399 Ok(val) => val,
2400 Err(e) => {
2401 return Err(anyhow!(
2402 "Failed to evaluate column {}: {}",
2403 col_ref.to_sql(),
2404 e
2405 ));
2406 }
2407 }
2408 }
2409 SelectItem::Expression { expr, .. } => {
2410 if matches!(expr, SqlExpression::WindowFunction { .. }) {
2413 continue;
2415 }
2416 evaluator.evaluate(&expr, source_row_idx)?
2419 }
2420 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2421 SelectItem::StarExclude { .. } => {
2422 unreachable!("StarExclude should have been expanded")
2423 }
2424 };
2425 batch_results[result_row_idx][col_idx] = value;
2426 }
2427 }
2428
2429 for row_values in batch_results {
2431 computed_table
2432 .add_row(DataRow::new(row_values))
2433 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2434 }
2435
2436 debug!(
2437 "Batch window evaluation completed in {:.3}ms",
2438 batch_start.elapsed().as_secs_f64() * 1000.0
2439 );
2440 } else {
2441 for &row_idx in visible_rows {
2443 let mut row_values = Vec::new();
2444
2445 for item in &expanded_items {
2446 let value = match item {
2447 SelectItem::Column {
2448 column: col_ref, ..
2449 } => {
2450 match evaluator
2452 .evaluate(&SqlExpression::Column(col_ref.clone()), row_idx)
2453 {
2454 Ok(val) => val,
2455 Err(e) => {
2456 return Err(anyhow!(
2457 "Failed to evaluate column {}: {}",
2458 col_ref.to_sql(),
2459 e
2460 ));
2461 }
2462 }
2463 }
2464 SelectItem::Expression { expr, .. } => {
2465 evaluator.evaluate(&expr, row_idx)?
2467 }
2468 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2469 SelectItem::StarExclude { .. } => {
2470 unreachable!("StarExclude should have been expanded")
2471 }
2472 };
2473 row_values.push(value);
2474 }
2475
2476 computed_table
2477 .add_row(DataRow::new(row_values))
2478 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2479 }
2480 }
2481
2482 if let Some(start) = window_start {
2484 let window_duration = start.elapsed();
2485 info!(
2486 "Window function evaluation took {:.2}ms for {} rows ({} window functions)",
2487 window_duration.as_secs_f64() * 1000.0,
2488 visible_rows.len(),
2489 window_func_count
2490 );
2491
2492 plan.begin_step(
2494 StepType::WindowFunction,
2495 format!("Evaluate {} window function(s)", window_func_count),
2496 );
2497 plan.set_rows_in(visible_rows.len());
2498 plan.set_rows_out(visible_rows.len());
2499 plan.add_detail(format!("Input: {} rows", visible_rows.len()));
2500 plan.add_detail(format!("{} window functions evaluated", window_func_count));
2501 plan.add_detail(format!(
2502 "Evaluation time: {:.3}ms",
2503 window_duration.as_secs_f64() * 1000.0
2504 ));
2505 plan.end_step();
2506 }
2507
2508 Ok(DataView::new(Arc::new(computed_table)))
2511 }
2512
2513 fn apply_select_with_row_expansion(
2515 &self,
2516 view: DataView,
2517 select_items: &[SelectItem],
2518 ) -> Result<DataView> {
2519 debug!("QueryEngine::apply_select_with_row_expansion - expanding rows");
2520
2521 let source_table = view.source();
2522 let visible_rows = view.visible_row_indices();
2523 let expander_registry = RowExpanderRegistry::new();
2524
2525 let mut result_table = DataTable::new("unnest_result");
2527
2528 let mut expanded_items = Vec::new();
2530 for item in select_items {
2531 match item {
2532 SelectItem::Star { table_prefix, .. } => {
2533 if let Some(prefix) = table_prefix {
2534 debug!(
2536 "QueryEngine::apply_select_with_row_expansion - expanding {}.*",
2537 prefix
2538 );
2539 for col in &source_table.columns {
2540 if Self::column_matches_table(col, prefix) {
2541 expanded_items.push(SelectItem::Column {
2542 column: ColumnRef::unquoted(col.name.clone()),
2543 leading_comments: vec![],
2544 trailing_comment: None,
2545 });
2546 }
2547 }
2548 } else {
2549 debug!("QueryEngine::apply_select_with_row_expansion - expanding *");
2551 for col_name in source_table.column_names() {
2552 expanded_items.push(SelectItem::Column {
2553 column: ColumnRef::unquoted(col_name.to_string()),
2554 leading_comments: vec![],
2555 trailing_comment: None,
2556 });
2557 }
2558 }
2559 }
2560 _ => expanded_items.push(item.clone()),
2561 }
2562 }
2563
2564 for item in &expanded_items {
2566 let column_name = match item {
2567 SelectItem::Column {
2568 column: col_ref, ..
2569 } => col_ref.name.clone(),
2570 SelectItem::Expression { alias, .. } => alias.clone(),
2571 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2572 SelectItem::StarExclude { .. } => {
2573 unreachable!("StarExclude should have been expanded")
2574 }
2575 };
2576 result_table.add_column(DataColumn::new(&column_name));
2577 }
2578
2579 let mut evaluator =
2581 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2582
2583 for &row_idx in visible_rows {
2584 let mut unnest_expansions = Vec::new();
2586 let mut unnest_indices = Vec::new();
2587
2588 for (col_idx, item) in expanded_items.iter().enumerate() {
2589 if let SelectItem::Expression { expr, .. } = item {
2590 if let Some(expansion_result) = self.try_expand_unnest(
2591 &expr,
2592 source_table,
2593 row_idx,
2594 &mut evaluator,
2595 &expander_registry,
2596 )? {
2597 unnest_expansions.push(expansion_result);
2598 unnest_indices.push(col_idx);
2599 }
2600 }
2601 }
2602
2603 let expansion_count = if unnest_expansions.is_empty() {
2605 1 } else {
2607 unnest_expansions
2608 .iter()
2609 .map(|exp| exp.row_count())
2610 .max()
2611 .unwrap_or(1)
2612 };
2613
2614 for output_idx in 0..expansion_count {
2616 let mut row_values = Vec::new();
2617
2618 for (col_idx, item) in expanded_items.iter().enumerate() {
2619 let unnest_position = unnest_indices.iter().position(|&idx| idx == col_idx);
2621
2622 let value = if let Some(unnest_idx) = unnest_position {
2623 let expansion = &unnest_expansions[unnest_idx];
2625 expansion
2626 .values
2627 .get(output_idx)
2628 .cloned()
2629 .unwrap_or(DataValue::Null)
2630 } else {
2631 match item {
2633 SelectItem::Column {
2634 column: col_ref, ..
2635 } => {
2636 let col_idx =
2637 source_table.get_column_index(&col_ref.name).ok_or_else(
2638 || anyhow::anyhow!("Column '{}' not found", col_ref.name),
2639 )?;
2640 let row = source_table
2641 .get_row(row_idx)
2642 .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
2643 row.get(col_idx)
2644 .ok_or_else(|| {
2645 anyhow::anyhow!("Column {} not found in row", col_idx)
2646 })?
2647 .clone()
2648 }
2649 SelectItem::Expression { expr, .. } => {
2650 evaluator.evaluate(&expr, row_idx)?
2652 }
2653 SelectItem::Star { .. } => unreachable!(),
2654 SelectItem::StarExclude { .. } => {
2655 unreachable!("StarExclude should have been expanded")
2656 }
2657 }
2658 };
2659
2660 row_values.push(value);
2661 }
2662
2663 result_table
2664 .add_row(DataRow::new(row_values))
2665 .map_err(|e| anyhow::anyhow!("Failed to add expanded row: {}", e))?;
2666 }
2667 }
2668
2669 debug!(
2670 "QueryEngine::apply_select_with_row_expansion - input rows: {}, output rows: {}",
2671 visible_rows.len(),
2672 result_table.row_count()
2673 );
2674
2675 Ok(DataView::new(Arc::new(result_table)))
2676 }
2677
2678 fn try_expand_unnest(
2681 &self,
2682 expr: &SqlExpression,
2683 _source_table: &DataTable,
2684 row_idx: usize,
2685 evaluator: &mut ArithmeticEvaluator,
2686 expander_registry: &RowExpanderRegistry,
2687 ) -> Result<Option<crate::data::row_expanders::ExpansionResult>> {
2688 if let SqlExpression::Unnest { column, delimiter } = expr {
2690 let column_value = evaluator.evaluate(column, row_idx)?;
2692
2693 let delimiter_value = DataValue::String(delimiter.clone());
2695
2696 let expander = expander_registry
2698 .get("UNNEST")
2699 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2700
2701 let expansion = expander.expand(&column_value, &[delimiter_value])?;
2703 return Ok(Some(expansion));
2704 }
2705
2706 if let SqlExpression::FunctionCall { name, args, .. } = expr {
2708 if name.to_uppercase() == "UNNEST" {
2709 if args.len() != 2 {
2711 return Err(anyhow::anyhow!(
2712 "UNNEST requires exactly 2 arguments: UNNEST(column, delimiter)"
2713 ));
2714 }
2715
2716 let column_value = evaluator.evaluate(&args[0], row_idx)?;
2718
2719 let delimiter_value = evaluator.evaluate(&args[1], row_idx)?;
2721
2722 let expander = expander_registry
2724 .get("UNNEST")
2725 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2726
2727 let expansion = expander.expand(&column_value, &[delimiter_value])?;
2729 return Ok(Some(expansion));
2730 }
2731 }
2732
2733 Ok(None)
2734 }
2735
2736 fn apply_aggregate_select(
2738 &self,
2739 view: DataView,
2740 select_items: &[SelectItem],
2741 ) -> Result<DataView> {
2742 debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
2743
2744 let source_table = view.source();
2745 let mut result_table = DataTable::new("aggregate_result");
2746
2747 for item in select_items {
2749 let column_name = match item {
2750 SelectItem::Expression { alias, .. } => alias.clone(),
2751 _ => unreachable!("Should only have expressions in aggregate-only query"),
2752 };
2753 result_table.add_column(DataColumn::new(&column_name));
2754 }
2755
2756 let visible_rows = view.visible_row_indices().to_vec();
2758 let mut evaluator =
2759 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
2760 .with_visible_rows(visible_rows);
2761
2762 let mut row_values = Vec::new();
2764 for item in select_items {
2765 match item {
2766 SelectItem::Expression { expr, .. } => {
2767 let value = evaluator.evaluate(expr, 0)?;
2770 row_values.push(value);
2771 }
2772 _ => unreachable!("Should only have expressions in aggregate-only query"),
2773 }
2774 }
2775
2776 result_table
2778 .add_row(DataRow::new(row_values))
2779 .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
2780
2781 Ok(DataView::new(Arc::new(result_table)))
2782 }
2783
2784 fn column_matches_table(col: &DataColumn, table_name: &str) -> bool {
2796 if let Some(ref source) = col.source_table {
2798 if source == table_name || source.ends_with(&format!(".{}", table_name)) {
2800 return true;
2801 }
2802 }
2803
2804 if let Some(ref qualified) = col.qualified_name {
2806 if qualified.starts_with(&format!("{}.", table_name)) {
2808 return true;
2809 }
2810 }
2811
2812 false
2813 }
2814
2815 fn resolve_select_columns(
2817 &self,
2818 table: &DataTable,
2819 select_items: &[SelectItem],
2820 ) -> Result<Vec<usize>> {
2821 let mut indices = Vec::new();
2822 let table_columns = table.column_names();
2823
2824 for item in select_items {
2825 match item {
2826 SelectItem::Column {
2827 column: col_ref, ..
2828 } => {
2829 let index = if let Some(table_prefix) = &col_ref.table_prefix {
2831 let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
2833 table.find_column_by_qualified_name(&qualified_name)
2834 .ok_or_else(|| {
2835 let has_qualified = table.columns.iter()
2837 .any(|c| c.qualified_name.is_some());
2838 if !has_qualified {
2839 anyhow::anyhow!(
2840 "Column '{}' not found. Note: Table '{}' may not support qualified column names",
2841 qualified_name, table_prefix
2842 )
2843 } else {
2844 anyhow::anyhow!("Column '{}' not found", qualified_name)
2845 }
2846 })?
2847 } else {
2848 table_columns
2850 .iter()
2851 .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
2852 .ok_or_else(|| {
2853 let suggestion = self.find_similar_column(table, &col_ref.name);
2854 match suggestion {
2855 Some(similar) => anyhow::anyhow!(
2856 "Column '{}' not found. Did you mean '{}'?",
2857 col_ref.name,
2858 similar
2859 ),
2860 None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
2861 }
2862 })?
2863 };
2864 indices.push(index);
2865 }
2866 SelectItem::Star { table_prefix, .. } => {
2867 if let Some(prefix) = table_prefix {
2868 for (i, col) in table.columns.iter().enumerate() {
2870 if Self::column_matches_table(col, prefix) {
2871 indices.push(i);
2872 }
2873 }
2874 } else {
2875 for i in 0..table_columns.len() {
2877 indices.push(i);
2878 }
2879 }
2880 }
2881 SelectItem::StarExclude {
2882 table_prefix,
2883 excluded_columns,
2884 ..
2885 } => {
2886 if let Some(prefix) = table_prefix {
2888 for (i, col) in table.columns.iter().enumerate() {
2890 if Self::column_matches_table(col, prefix)
2891 && !excluded_columns.contains(&col.name)
2892 {
2893 indices.push(i);
2894 }
2895 }
2896 } else {
2897 for (i, col_name) in table_columns.iter().enumerate() {
2899 if !excluded_columns
2900 .iter()
2901 .any(|exc| exc.eq_ignore_ascii_case(col_name))
2902 {
2903 indices.push(i);
2904 }
2905 }
2906 }
2907 }
2908 SelectItem::Expression { .. } => {
2909 return Err(anyhow::anyhow!(
2910 "Computed expressions require new table creation"
2911 ));
2912 }
2913 }
2914 }
2915
2916 Ok(indices)
2917 }
2918
2919 fn apply_distinct(&self, view: DataView) -> Result<DataView> {
2921 use std::collections::HashSet;
2922
2923 let source = view.source();
2924 let visible_cols = view.visible_column_indices();
2925 let visible_rows = view.visible_row_indices();
2926
2927 let mut seen_rows = HashSet::new();
2929 let mut unique_row_indices = Vec::new();
2930
2931 for &row_idx in visible_rows {
2932 let mut row_key = Vec::new();
2934 for &col_idx in visible_cols {
2935 let value = source
2936 .get_value(row_idx, col_idx)
2937 .ok_or_else(|| anyhow!("Invalid cell reference"))?;
2938 row_key.push(format!("{:?}", value));
2940 }
2941
2942 if seen_rows.insert(row_key) {
2944 unique_row_indices.push(row_idx);
2946 }
2947 }
2948
2949 Ok(view.with_rows(unique_row_indices))
2951 }
2952
2953 fn apply_multi_order_by(
2955 &self,
2956 view: DataView,
2957 order_by_columns: &[OrderByItem],
2958 ) -> Result<DataView> {
2959 self.apply_multi_order_by_with_context(view, order_by_columns, None)
2960 }
2961
2962 fn apply_multi_order_by_with_context(
2964 &self,
2965 mut view: DataView,
2966 order_by_columns: &[OrderByItem],
2967 _exec_context: Option<&ExecutionContext>,
2968 ) -> Result<DataView> {
2969 let mut sort_columns = Vec::new();
2971
2972 for order_col in order_by_columns {
2973 let column_name = match &order_col.expr {
2975 SqlExpression::Column(col_ref) => col_ref.name.clone(),
2976 _ => {
2977 return Err(anyhow!(
2979 "ORDER BY expressions not yet supported - only simple columns allowed"
2980 ));
2981 }
2982 };
2983
2984 let col_index = if column_name.contains('.') {
2986 if let Some(dot_pos) = column_name.rfind('.') {
2988 let col_name = &column_name[dot_pos + 1..];
2989
2990 debug!(
2993 "ORDER BY: Extracting unqualified column '{}' from '{}'",
2994 col_name, column_name
2995 );
2996 view.source().get_column_index(col_name)
2997 } else {
2998 view.source().get_column_index(&column_name)
2999 }
3000 } else {
3001 view.source().get_column_index(&column_name)
3003 }
3004 .ok_or_else(|| {
3005 let suggestion = self.find_similar_column(view.source(), &column_name);
3007 match suggestion {
3008 Some(similar) => anyhow::anyhow!(
3009 "Column '{}' not found. Did you mean '{}'?",
3010 column_name,
3011 similar
3012 ),
3013 None => {
3014 let available_cols = view.source().column_names().join(", ");
3016 anyhow::anyhow!(
3017 "Column '{}' not found. Available columns: {}",
3018 column_name,
3019 available_cols
3020 )
3021 }
3022 }
3023 })?;
3024
3025 let ascending = matches!(order_col.direction, SortDirection::Asc);
3026 sort_columns.push((col_index, ascending));
3027 }
3028
3029 view.apply_multi_sort(&sort_columns)?;
3031 Ok(view)
3032 }
3033
3034 fn apply_group_by(
3036 &self,
3037 view: DataView,
3038 group_by_exprs: &[SqlExpression],
3039 select_items: &[SelectItem],
3040 having: Option<&SqlExpression>,
3041 plan: &mut ExecutionPlanBuilder,
3042 ) -> Result<DataView> {
3043 let (result_view, phase_info) = self.apply_group_by_expressions(
3045 view,
3046 group_by_exprs,
3047 select_items,
3048 having,
3049 self.case_insensitive,
3050 self.date_notation.clone(),
3051 )?;
3052
3053 plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
3055 plan.add_detail(format!(
3056 "Phase 1 - Group Building: {:.3}ms",
3057 phase_info.phase2_key_building.as_secs_f64() * 1000.0
3058 ));
3059 plan.add_detail(format!(
3060 " • Processing {} rows into {} groups",
3061 phase_info.total_rows, phase_info.num_groups
3062 ));
3063 plan.add_detail(format!(
3064 "Phase 2 - Aggregation: {:.3}ms",
3065 phase_info.phase4_aggregation.as_secs_f64() * 1000.0
3066 ));
3067 if phase_info.phase4_having_evaluation > Duration::ZERO {
3068 plan.add_detail(format!(
3069 "Phase 3 - HAVING Filter: {:.3}ms",
3070 phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
3071 ));
3072 plan.add_detail(format!(
3073 " • Filtered {} groups",
3074 phase_info.groups_filtered_by_having
3075 ));
3076 }
3077 plan.add_detail(format!(
3078 "Total GROUP BY time: {:.3}ms",
3079 phase_info.total_time.as_secs_f64() * 1000.0
3080 ));
3081
3082 Ok(result_view)
3083 }
3084
3085 pub fn estimate_group_cardinality(
3088 &self,
3089 view: &DataView,
3090 group_by_exprs: &[SqlExpression],
3091 ) -> usize {
3092 let row_count = view.get_visible_rows().len();
3094 if row_count <= 100 {
3095 return row_count;
3096 }
3097
3098 let sample_size = min(1000, row_count / 10).max(100);
3100 let mut seen = FxHashSet::default();
3101
3102 let visible_rows = view.get_visible_rows();
3103 for (i, &row_idx) in visible_rows.iter().enumerate() {
3104 if i >= sample_size {
3105 break;
3106 }
3107
3108 let mut key_values = Vec::new();
3110 for expr in group_by_exprs {
3111 let mut evaluator = ArithmeticEvaluator::new(view.source());
3112 let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
3113 key_values.push(value);
3114 }
3115
3116 seen.insert(key_values);
3117 }
3118
3119 let sample_cardinality = seen.len();
3121 let estimated = (sample_cardinality * row_count) / sample_size;
3122
3123 estimated.min(row_count).max(sample_cardinality)
3125 }
3126}
3127
3128#[cfg(test)]
3129mod tests {
3130 use super::*;
3131 use crate::data::datatable::{DataColumn, DataRow, DataValue};
3132
3133 fn create_test_table() -> Arc<DataTable> {
3134 let mut table = DataTable::new("test");
3135
3136 table.add_column(DataColumn::new("id"));
3138 table.add_column(DataColumn::new("name"));
3139 table.add_column(DataColumn::new("age"));
3140
3141 table
3143 .add_row(DataRow::new(vec![
3144 DataValue::Integer(1),
3145 DataValue::String("Alice".to_string()),
3146 DataValue::Integer(30),
3147 ]))
3148 .unwrap();
3149
3150 table
3151 .add_row(DataRow::new(vec![
3152 DataValue::Integer(2),
3153 DataValue::String("Bob".to_string()),
3154 DataValue::Integer(25),
3155 ]))
3156 .unwrap();
3157
3158 table
3159 .add_row(DataRow::new(vec![
3160 DataValue::Integer(3),
3161 DataValue::String("Charlie".to_string()),
3162 DataValue::Integer(35),
3163 ]))
3164 .unwrap();
3165
3166 Arc::new(table)
3167 }
3168
3169 #[test]
3170 fn test_select_all() {
3171 let table = create_test_table();
3172 let engine = QueryEngine::new();
3173
3174 let view = engine
3175 .execute(table.clone(), "SELECT * FROM users")
3176 .unwrap();
3177 assert_eq!(view.row_count(), 3);
3178 assert_eq!(view.column_count(), 3);
3179 }
3180
3181 #[test]
3182 fn test_select_columns() {
3183 let table = create_test_table();
3184 let engine = QueryEngine::new();
3185
3186 let view = engine
3187 .execute(table.clone(), "SELECT name, age FROM users")
3188 .unwrap();
3189 assert_eq!(view.row_count(), 3);
3190 assert_eq!(view.column_count(), 2);
3191 }
3192
3193 #[test]
3194 fn test_select_with_limit() {
3195 let table = create_test_table();
3196 let engine = QueryEngine::new();
3197
3198 let view = engine
3199 .execute(table.clone(), "SELECT * FROM users LIMIT 2")
3200 .unwrap();
3201 assert_eq!(view.row_count(), 2);
3202 }
3203
3204 #[test]
3205 fn test_type_coercion_contains() {
3206 let _ = tracing_subscriber::fmt()
3208 .with_max_level(tracing::Level::DEBUG)
3209 .try_init();
3210
3211 let mut table = DataTable::new("test");
3212 table.add_column(DataColumn::new("id"));
3213 table.add_column(DataColumn::new("status"));
3214 table.add_column(DataColumn::new("price"));
3215
3216 table
3218 .add_row(DataRow::new(vec![
3219 DataValue::Integer(1),
3220 DataValue::String("Pending".to_string()),
3221 DataValue::Float(99.99),
3222 ]))
3223 .unwrap();
3224
3225 table
3226 .add_row(DataRow::new(vec![
3227 DataValue::Integer(2),
3228 DataValue::String("Confirmed".to_string()),
3229 DataValue::Float(150.50),
3230 ]))
3231 .unwrap();
3232
3233 table
3234 .add_row(DataRow::new(vec![
3235 DataValue::Integer(3),
3236 DataValue::String("Pending".to_string()),
3237 DataValue::Float(75.00),
3238 ]))
3239 .unwrap();
3240
3241 let table = Arc::new(table);
3242 let engine = QueryEngine::new();
3243
3244 println!("\n=== Testing WHERE clause with Contains ===");
3245 println!("Table has {} rows", table.row_count());
3246 for i in 0..table.row_count() {
3247 let status = table.get_value(i, 1);
3248 println!("Row {i}: status = {status:?}");
3249 }
3250
3251 println!("\n--- Test 1: status.Contains('pend') ---");
3253 let result = engine.execute(
3254 table.clone(),
3255 "SELECT * FROM test WHERE status.Contains('pend')",
3256 );
3257 match result {
3258 Ok(view) => {
3259 println!("SUCCESS: Found {} matching rows", view.row_count());
3260 assert_eq!(view.row_count(), 2); }
3262 Err(e) => {
3263 panic!("Query failed: {e}");
3264 }
3265 }
3266
3267 println!("\n--- Test 2: price.Contains('9') ---");
3269 let result = engine.execute(
3270 table.clone(),
3271 "SELECT * FROM test WHERE price.Contains('9')",
3272 );
3273 match result {
3274 Ok(view) => {
3275 println!(
3276 "SUCCESS: Found {} matching rows with price containing '9'",
3277 view.row_count()
3278 );
3279 assert!(view.row_count() >= 1);
3281 }
3282 Err(e) => {
3283 panic!("Numeric coercion query failed: {e}");
3284 }
3285 }
3286
3287 println!("\n=== All tests passed! ===");
3288 }
3289
3290 #[test]
3291 fn test_not_in_clause() {
3292 let _ = tracing_subscriber::fmt()
3294 .with_max_level(tracing::Level::DEBUG)
3295 .try_init();
3296
3297 let mut table = DataTable::new("test");
3298 table.add_column(DataColumn::new("id"));
3299 table.add_column(DataColumn::new("country"));
3300
3301 table
3303 .add_row(DataRow::new(vec![
3304 DataValue::Integer(1),
3305 DataValue::String("CA".to_string()),
3306 ]))
3307 .unwrap();
3308
3309 table
3310 .add_row(DataRow::new(vec![
3311 DataValue::Integer(2),
3312 DataValue::String("US".to_string()),
3313 ]))
3314 .unwrap();
3315
3316 table
3317 .add_row(DataRow::new(vec![
3318 DataValue::Integer(3),
3319 DataValue::String("UK".to_string()),
3320 ]))
3321 .unwrap();
3322
3323 let table = Arc::new(table);
3324 let engine = QueryEngine::new();
3325
3326 println!("\n=== Testing NOT IN clause ===");
3327 println!("Table has {} rows", table.row_count());
3328 for i in 0..table.row_count() {
3329 let country = table.get_value(i, 1);
3330 println!("Row {i}: country = {country:?}");
3331 }
3332
3333 println!("\n--- Test: country NOT IN ('CA') ---");
3335 let result = engine.execute(
3336 table.clone(),
3337 "SELECT * FROM test WHERE country NOT IN ('CA')",
3338 );
3339 match result {
3340 Ok(view) => {
3341 println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
3342 assert_eq!(view.row_count(), 2); }
3344 Err(e) => {
3345 panic!("NOT IN query failed: {e}");
3346 }
3347 }
3348
3349 println!("\n=== NOT IN test complete! ===");
3350 }
3351
3352 #[test]
3353 fn test_case_insensitive_in_and_not_in() {
3354 let _ = tracing_subscriber::fmt()
3356 .with_max_level(tracing::Level::DEBUG)
3357 .try_init();
3358
3359 let mut table = DataTable::new("test");
3360 table.add_column(DataColumn::new("id"));
3361 table.add_column(DataColumn::new("country"));
3362
3363 table
3365 .add_row(DataRow::new(vec![
3366 DataValue::Integer(1),
3367 DataValue::String("CA".to_string()), ]))
3369 .unwrap();
3370
3371 table
3372 .add_row(DataRow::new(vec![
3373 DataValue::Integer(2),
3374 DataValue::String("us".to_string()), ]))
3376 .unwrap();
3377
3378 table
3379 .add_row(DataRow::new(vec![
3380 DataValue::Integer(3),
3381 DataValue::String("UK".to_string()), ]))
3383 .unwrap();
3384
3385 let table = Arc::new(table);
3386
3387 println!("\n=== Testing Case-Insensitive IN clause ===");
3388 println!("Table has {} rows", table.row_count());
3389 for i in 0..table.row_count() {
3390 let country = table.get_value(i, 1);
3391 println!("Row {i}: country = {country:?}");
3392 }
3393
3394 println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
3396 let engine = QueryEngine::with_case_insensitive(true);
3397 let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3398 match result {
3399 Ok(view) => {
3400 println!(
3401 "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
3402 view.row_count()
3403 );
3404 assert_eq!(view.row_count(), 1); }
3406 Err(e) => {
3407 panic!("Case-insensitive IN query failed: {e}");
3408 }
3409 }
3410
3411 println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
3413 let result = engine.execute(
3414 table.clone(),
3415 "SELECT * FROM test WHERE country NOT IN ('ca')",
3416 );
3417 match result {
3418 Ok(view) => {
3419 println!(
3420 "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
3421 view.row_count()
3422 );
3423 assert_eq!(view.row_count(), 2); }
3425 Err(e) => {
3426 panic!("Case-insensitive NOT IN query failed: {e}");
3427 }
3428 }
3429
3430 println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
3432 let engine_case_sensitive = QueryEngine::new(); let result = engine_case_sensitive
3434 .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3435 match result {
3436 Ok(view) => {
3437 println!(
3438 "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
3439 view.row_count()
3440 );
3441 assert_eq!(view.row_count(), 0); }
3443 Err(e) => {
3444 panic!("Case-sensitive IN query failed: {e}");
3445 }
3446 }
3447
3448 println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
3449 }
3450
3451 #[test]
3452 #[ignore = "Parentheses in WHERE clause not yet implemented"]
3453 fn test_parentheses_in_where_clause() {
3454 let _ = tracing_subscriber::fmt()
3456 .with_max_level(tracing::Level::DEBUG)
3457 .try_init();
3458
3459 let mut table = DataTable::new("test");
3460 table.add_column(DataColumn::new("id"));
3461 table.add_column(DataColumn::new("status"));
3462 table.add_column(DataColumn::new("priority"));
3463
3464 table
3466 .add_row(DataRow::new(vec![
3467 DataValue::Integer(1),
3468 DataValue::String("Pending".to_string()),
3469 DataValue::String("High".to_string()),
3470 ]))
3471 .unwrap();
3472
3473 table
3474 .add_row(DataRow::new(vec![
3475 DataValue::Integer(2),
3476 DataValue::String("Complete".to_string()),
3477 DataValue::String("High".to_string()),
3478 ]))
3479 .unwrap();
3480
3481 table
3482 .add_row(DataRow::new(vec![
3483 DataValue::Integer(3),
3484 DataValue::String("Pending".to_string()),
3485 DataValue::String("Low".to_string()),
3486 ]))
3487 .unwrap();
3488
3489 table
3490 .add_row(DataRow::new(vec![
3491 DataValue::Integer(4),
3492 DataValue::String("Complete".to_string()),
3493 DataValue::String("Low".to_string()),
3494 ]))
3495 .unwrap();
3496
3497 let table = Arc::new(table);
3498 let engine = QueryEngine::new();
3499
3500 println!("\n=== Testing Parentheses in WHERE clause ===");
3501 println!("Table has {} rows", table.row_count());
3502 for i in 0..table.row_count() {
3503 let status = table.get_value(i, 1);
3504 let priority = table.get_value(i, 2);
3505 println!("Row {i}: status = {status:?}, priority = {priority:?}");
3506 }
3507
3508 println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
3510 let result = engine.execute(
3511 table.clone(),
3512 "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
3513 );
3514 match result {
3515 Ok(view) => {
3516 println!(
3517 "SUCCESS: Found {} rows with parenthetical logic",
3518 view.row_count()
3519 );
3520 assert_eq!(view.row_count(), 2); }
3522 Err(e) => {
3523 panic!("Parentheses query failed: {e}");
3524 }
3525 }
3526
3527 println!("\n=== Parentheses test complete! ===");
3528 }
3529
3530 #[test]
3531 #[ignore = "Numeric type coercion needs fixing"]
3532 fn test_numeric_type_coercion() {
3533 let _ = tracing_subscriber::fmt()
3535 .with_max_level(tracing::Level::DEBUG)
3536 .try_init();
3537
3538 let mut table = DataTable::new("test");
3539 table.add_column(DataColumn::new("id"));
3540 table.add_column(DataColumn::new("price"));
3541 table.add_column(DataColumn::new("quantity"));
3542
3543 table
3545 .add_row(DataRow::new(vec![
3546 DataValue::Integer(1),
3547 DataValue::Float(99.50), DataValue::Integer(100),
3549 ]))
3550 .unwrap();
3551
3552 table
3553 .add_row(DataRow::new(vec![
3554 DataValue::Integer(2),
3555 DataValue::Float(150.0), DataValue::Integer(200),
3557 ]))
3558 .unwrap();
3559
3560 table
3561 .add_row(DataRow::new(vec![
3562 DataValue::Integer(3),
3563 DataValue::Integer(75), DataValue::Integer(50),
3565 ]))
3566 .unwrap();
3567
3568 let table = Arc::new(table);
3569 let engine = QueryEngine::new();
3570
3571 println!("\n=== Testing Numeric Type Coercion ===");
3572 println!("Table has {} rows", table.row_count());
3573 for i in 0..table.row_count() {
3574 let price = table.get_value(i, 1);
3575 let quantity = table.get_value(i, 2);
3576 println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
3577 }
3578
3579 println!("\n--- Test: price.Contains('.') ---");
3581 let result = engine.execute(
3582 table.clone(),
3583 "SELECT * FROM test WHERE price.Contains('.')",
3584 );
3585 match result {
3586 Ok(view) => {
3587 println!(
3588 "SUCCESS: Found {} rows with decimal points in price",
3589 view.row_count()
3590 );
3591 assert_eq!(view.row_count(), 2); }
3593 Err(e) => {
3594 panic!("Numeric Contains query failed: {e}");
3595 }
3596 }
3597
3598 println!("\n--- Test: quantity.Contains('0') ---");
3600 let result = engine.execute(
3601 table.clone(),
3602 "SELECT * FROM test WHERE quantity.Contains('0')",
3603 );
3604 match result {
3605 Ok(view) => {
3606 println!(
3607 "SUCCESS: Found {} rows with '0' in quantity",
3608 view.row_count()
3609 );
3610 assert_eq!(view.row_count(), 2); }
3612 Err(e) => {
3613 panic!("Integer Contains query failed: {e}");
3614 }
3615 }
3616
3617 println!("\n=== Numeric type coercion test complete! ===");
3618 }
3619
3620 #[test]
3621 fn test_datetime_comparisons() {
3622 let _ = tracing_subscriber::fmt()
3624 .with_max_level(tracing::Level::DEBUG)
3625 .try_init();
3626
3627 let mut table = DataTable::new("test");
3628 table.add_column(DataColumn::new("id"));
3629 table.add_column(DataColumn::new("created_date"));
3630
3631 table
3633 .add_row(DataRow::new(vec![
3634 DataValue::Integer(1),
3635 DataValue::String("2024-12-15".to_string()),
3636 ]))
3637 .unwrap();
3638
3639 table
3640 .add_row(DataRow::new(vec![
3641 DataValue::Integer(2),
3642 DataValue::String("2025-01-15".to_string()),
3643 ]))
3644 .unwrap();
3645
3646 table
3647 .add_row(DataRow::new(vec![
3648 DataValue::Integer(3),
3649 DataValue::String("2025-02-15".to_string()),
3650 ]))
3651 .unwrap();
3652
3653 let table = Arc::new(table);
3654 let engine = QueryEngine::new();
3655
3656 println!("\n=== Testing DateTime Comparisons ===");
3657 println!("Table has {} rows", table.row_count());
3658 for i in 0..table.row_count() {
3659 let date = table.get_value(i, 1);
3660 println!("Row {i}: created_date = {date:?}");
3661 }
3662
3663 println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
3665 let result = engine.execute(
3666 table.clone(),
3667 "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
3668 );
3669 match result {
3670 Ok(view) => {
3671 println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
3672 assert_eq!(view.row_count(), 2); }
3674 Err(e) => {
3675 panic!("DateTime comparison query failed: {e}");
3676 }
3677 }
3678
3679 println!("\n=== DateTime comparison test complete! ===");
3680 }
3681
3682 #[test]
3683 fn test_not_with_method_calls() {
3684 let _ = tracing_subscriber::fmt()
3686 .with_max_level(tracing::Level::DEBUG)
3687 .try_init();
3688
3689 let mut table = DataTable::new("test");
3690 table.add_column(DataColumn::new("id"));
3691 table.add_column(DataColumn::new("status"));
3692
3693 table
3695 .add_row(DataRow::new(vec![
3696 DataValue::Integer(1),
3697 DataValue::String("Pending Review".to_string()),
3698 ]))
3699 .unwrap();
3700
3701 table
3702 .add_row(DataRow::new(vec![
3703 DataValue::Integer(2),
3704 DataValue::String("Complete".to_string()),
3705 ]))
3706 .unwrap();
3707
3708 table
3709 .add_row(DataRow::new(vec![
3710 DataValue::Integer(3),
3711 DataValue::String("Pending Approval".to_string()),
3712 ]))
3713 .unwrap();
3714
3715 let table = Arc::new(table);
3716 let engine = QueryEngine::with_case_insensitive(true);
3717
3718 println!("\n=== Testing NOT with Method Calls ===");
3719 println!("Table has {} rows", table.row_count());
3720 for i in 0..table.row_count() {
3721 let status = table.get_value(i, 1);
3722 println!("Row {i}: status = {status:?}");
3723 }
3724
3725 println!("\n--- Test: NOT status.Contains('pend') ---");
3727 let result = engine.execute(
3728 table.clone(),
3729 "SELECT * FROM test WHERE NOT status.Contains('pend')",
3730 );
3731 match result {
3732 Ok(view) => {
3733 println!(
3734 "SUCCESS: Found {} rows NOT containing 'pend'",
3735 view.row_count()
3736 );
3737 assert_eq!(view.row_count(), 1); }
3739 Err(e) => {
3740 panic!("NOT Contains query failed: {e}");
3741 }
3742 }
3743
3744 println!("\n--- Test: NOT status.StartsWith('Pending') ---");
3746 let result = engine.execute(
3747 table.clone(),
3748 "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
3749 );
3750 match result {
3751 Ok(view) => {
3752 println!(
3753 "SUCCESS: Found {} rows NOT starting with 'Pending'",
3754 view.row_count()
3755 );
3756 assert_eq!(view.row_count(), 1); }
3758 Err(e) => {
3759 panic!("NOT StartsWith query failed: {e}");
3760 }
3761 }
3762
3763 println!("\n=== NOT with method calls test complete! ===");
3764 }
3765
3766 #[test]
3767 #[ignore = "Complex logical expressions with parentheses not yet implemented"]
3768 fn test_complex_logical_expressions() {
3769 let _ = tracing_subscriber::fmt()
3771 .with_max_level(tracing::Level::DEBUG)
3772 .try_init();
3773
3774 let mut table = DataTable::new("test");
3775 table.add_column(DataColumn::new("id"));
3776 table.add_column(DataColumn::new("status"));
3777 table.add_column(DataColumn::new("priority"));
3778 table.add_column(DataColumn::new("assigned"));
3779
3780 table
3782 .add_row(DataRow::new(vec![
3783 DataValue::Integer(1),
3784 DataValue::String("Pending".to_string()),
3785 DataValue::String("High".to_string()),
3786 DataValue::String("John".to_string()),
3787 ]))
3788 .unwrap();
3789
3790 table
3791 .add_row(DataRow::new(vec![
3792 DataValue::Integer(2),
3793 DataValue::String("Complete".to_string()),
3794 DataValue::String("High".to_string()),
3795 DataValue::String("Jane".to_string()),
3796 ]))
3797 .unwrap();
3798
3799 table
3800 .add_row(DataRow::new(vec![
3801 DataValue::Integer(3),
3802 DataValue::String("Pending".to_string()),
3803 DataValue::String("Low".to_string()),
3804 DataValue::String("John".to_string()),
3805 ]))
3806 .unwrap();
3807
3808 table
3809 .add_row(DataRow::new(vec![
3810 DataValue::Integer(4),
3811 DataValue::String("In Progress".to_string()),
3812 DataValue::String("Medium".to_string()),
3813 DataValue::String("Jane".to_string()),
3814 ]))
3815 .unwrap();
3816
3817 let table = Arc::new(table);
3818 let engine = QueryEngine::new();
3819
3820 println!("\n=== Testing Complex Logical Expressions ===");
3821 println!("Table has {} rows", table.row_count());
3822 for i in 0..table.row_count() {
3823 let status = table.get_value(i, 1);
3824 let priority = table.get_value(i, 2);
3825 let assigned = table.get_value(i, 3);
3826 println!(
3827 "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
3828 );
3829 }
3830
3831 println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
3833 let result = engine.execute(
3834 table.clone(),
3835 "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
3836 );
3837 match result {
3838 Ok(view) => {
3839 println!(
3840 "SUCCESS: Found {} rows with complex logic",
3841 view.row_count()
3842 );
3843 assert_eq!(view.row_count(), 2); }
3845 Err(e) => {
3846 panic!("Complex logic query failed: {e}");
3847 }
3848 }
3849
3850 println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
3852 let result = engine.execute(
3853 table.clone(),
3854 "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
3855 );
3856 match result {
3857 Ok(view) => {
3858 println!(
3859 "SUCCESS: Found {} rows with NOT complex logic",
3860 view.row_count()
3861 );
3862 assert_eq!(view.row_count(), 2); }
3864 Err(e) => {
3865 panic!("NOT complex logic query failed: {e}");
3866 }
3867 }
3868
3869 println!("\n=== Complex logical expressions test complete! ===");
3870 }
3871
3872 #[test]
3873 fn test_mixed_data_types_and_edge_cases() {
3874 let _ = tracing_subscriber::fmt()
3876 .with_max_level(tracing::Level::DEBUG)
3877 .try_init();
3878
3879 let mut table = DataTable::new("test");
3880 table.add_column(DataColumn::new("id"));
3881 table.add_column(DataColumn::new("value"));
3882 table.add_column(DataColumn::new("nullable_field"));
3883
3884 table
3886 .add_row(DataRow::new(vec![
3887 DataValue::Integer(1),
3888 DataValue::String("123.45".to_string()),
3889 DataValue::String("present".to_string()),
3890 ]))
3891 .unwrap();
3892
3893 table
3894 .add_row(DataRow::new(vec![
3895 DataValue::Integer(2),
3896 DataValue::Float(678.90),
3897 DataValue::Null,
3898 ]))
3899 .unwrap();
3900
3901 table
3902 .add_row(DataRow::new(vec![
3903 DataValue::Integer(3),
3904 DataValue::Boolean(true),
3905 DataValue::String("also present".to_string()),
3906 ]))
3907 .unwrap();
3908
3909 table
3910 .add_row(DataRow::new(vec![
3911 DataValue::Integer(4),
3912 DataValue::String("false".to_string()),
3913 DataValue::Null,
3914 ]))
3915 .unwrap();
3916
3917 let table = Arc::new(table);
3918 let engine = QueryEngine::new();
3919
3920 println!("\n=== Testing Mixed Data Types and Edge Cases ===");
3921 println!("Table has {} rows", table.row_count());
3922 for i in 0..table.row_count() {
3923 let value = table.get_value(i, 1);
3924 let nullable = table.get_value(i, 2);
3925 println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
3926 }
3927
3928 println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
3930 let result = engine.execute(
3931 table.clone(),
3932 "SELECT * FROM test WHERE value.Contains('true')",
3933 );
3934 match result {
3935 Ok(view) => {
3936 println!(
3937 "SUCCESS: Found {} rows with boolean coercion",
3938 view.row_count()
3939 );
3940 assert_eq!(view.row_count(), 1); }
3942 Err(e) => {
3943 panic!("Boolean coercion query failed: {e}");
3944 }
3945 }
3946
3947 println!("\n--- Test: id IN (1, 3) ---");
3949 let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
3950 match result {
3951 Ok(view) => {
3952 println!("SUCCESS: Found {} rows with IN clause", view.row_count());
3953 assert_eq!(view.row_count(), 2); }
3955 Err(e) => {
3956 panic!("Multiple IN values query failed: {e}");
3957 }
3958 }
3959
3960 println!("\n=== Mixed data types test complete! ===");
3961 }
3962
3963 #[test]
3965 fn test_aggregate_only_single_row() {
3966 let table = create_test_stock_data();
3967 let engine = QueryEngine::new();
3968
3969 let result = engine
3971 .execute(
3972 table.clone(),
3973 "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
3974 )
3975 .expect("Query should succeed");
3976
3977 assert_eq!(
3978 result.row_count(),
3979 1,
3980 "Aggregate-only query should return exactly 1 row"
3981 );
3982 assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
3983
3984 let source = result.source();
3986 let row = source.get_row(0).expect("Should have first row");
3987
3988 assert_eq!(row.values[0], DataValue::Integer(5));
3990
3991 assert_eq!(row.values[1], DataValue::Float(99.5));
3993
3994 assert_eq!(row.values[2], DataValue::Float(105.0));
3996
3997 if let DataValue::Float(avg) = &row.values[3] {
3999 assert!(
4000 (avg - 102.4).abs() < 0.01,
4001 "Average should be approximately 102.4, got {}",
4002 avg
4003 );
4004 } else {
4005 panic!("AVG should return a Float value");
4006 }
4007 }
4008
4009 #[test]
4011 fn test_single_aggregate_single_row() {
4012 let table = create_test_stock_data();
4013 let engine = QueryEngine::new();
4014
4015 let result = engine
4016 .execute(table.clone(), "SELECT COUNT(*) FROM stock")
4017 .expect("Query should succeed");
4018
4019 assert_eq!(
4020 result.row_count(),
4021 1,
4022 "Single aggregate query should return exactly 1 row"
4023 );
4024 assert_eq!(result.column_count(), 1, "Should have 1 column");
4025
4026 let source = result.source();
4027 let row = source.get_row(0).expect("Should have first row");
4028 assert_eq!(row.values[0], DataValue::Integer(5));
4029 }
4030
4031 #[test]
4033 fn test_aggregate_with_where_single_row() {
4034 let table = create_test_stock_data();
4035 let engine = QueryEngine::new();
4036
4037 let result = engine
4039 .execute(
4040 table.clone(),
4041 "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
4042 )
4043 .expect("Query should succeed");
4044
4045 assert_eq!(
4046 result.row_count(),
4047 1,
4048 "Filtered aggregate query should return exactly 1 row"
4049 );
4050 assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
4051
4052 let source = result.source();
4053 let row = source.get_row(0).expect("Should have first row");
4054
4055 assert_eq!(row.values[0], DataValue::Integer(2));
4057 assert_eq!(row.values[1], DataValue::Float(103.5)); assert_eq!(row.values[2], DataValue::Float(105.0)); }
4060
4061 #[test]
4062 fn test_not_in_parsing() {
4063 use crate::sql::recursive_parser::Parser;
4064
4065 let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
4066 println!("\n=== Testing NOT IN parsing ===");
4067 println!("Parsing query: {query}");
4068
4069 let mut parser = Parser::new(query);
4070 match parser.parse() {
4071 Ok(statement) => {
4072 println!("Parsed statement: {statement:#?}");
4073 if let Some(where_clause) = statement.where_clause {
4074 println!("WHERE conditions: {:#?}", where_clause.conditions);
4075 if let Some(first_condition) = where_clause.conditions.first() {
4076 println!("First condition expression: {:#?}", first_condition.expr);
4077 }
4078 }
4079 }
4080 Err(e) => {
4081 panic!("Parse error: {e}");
4082 }
4083 }
4084 }
4085
4086 fn create_test_stock_data() -> Arc<DataTable> {
4088 let mut table = DataTable::new("stock");
4089
4090 table.add_column(DataColumn::new("symbol"));
4091 table.add_column(DataColumn::new("close"));
4092 table.add_column(DataColumn::new("volume"));
4093
4094 let test_data = vec![
4096 ("AAPL", 99.5, 1000),
4097 ("AAPL", 101.2, 1500),
4098 ("AAPL", 103.5, 2000),
4099 ("AAPL", 105.0, 1200),
4100 ("AAPL", 102.8, 1800),
4101 ];
4102
4103 for (symbol, close, volume) in test_data {
4104 table
4105 .add_row(DataRow::new(vec![
4106 DataValue::String(symbol.to_string()),
4107 DataValue::Float(close),
4108 DataValue::Integer(volume),
4109 ]))
4110 .expect("Should add row successfully");
4111 }
4112
4113 Arc::new(table)
4114 }
4115}
4116
// Test-only module whose source is the file named in the `#[path]` attribute
// (`query_engine_tests.rs`); it is compiled only under `cfg(test)`.
#[cfg(test)]
#[path = "query_engine_tests.rs"]
mod query_engine_tests;