1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::row_expanders::RowExpanderRegistry;
19use crate::data::subquery_executor::SubqueryExecutor;
20use crate::data::temp_table_registry::TempTableRegistry;
21use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
22use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
23use crate::sql::parser::ast::ColumnRef;
24use crate::sql::parser::ast::SetOperation;
25use crate::sql::parser::ast::TableSource;
26use crate::sql::recursive_parser::{
27 CTEType, OrderByItem, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
28 TableFunction,
29};
30
/// Per-query state used to translate table aliases back to the table names
/// they were declared for.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    /// Maps an alias to the table name it was registered against
    /// (populated by `register_alias`).
    alias_map: HashMap<String, String>,
}
38
39impl ExecutionContext {
40 pub fn new() -> Self {
42 Self {
43 alias_map: HashMap::new(),
44 }
45 }
46
47 pub fn register_alias(&mut self, alias: String, table_name: String) {
49 debug!("Registering alias: {} -> {}", alias, table_name);
50 self.alias_map.insert(alias, table_name);
51 }
52
53 pub fn resolve_alias(&self, name: &str) -> String {
56 self.alias_map
57 .get(name)
58 .cloned()
59 .unwrap_or_else(|| name.to_string())
60 }
61
62 pub fn is_alias(&self, name: &str) -> bool {
64 self.alias_map.contains_key(name)
65 }
66
67 pub fn get_aliases(&self) -> HashMap<String, String> {
69 self.alias_map.clone()
70 }
71
72 pub fn resolve_column_index(&self, table: &DataTable, column_ref: &ColumnRef) -> Result<usize> {
87 if let Some(table_prefix) = &column_ref.table_prefix {
88 let actual_table = self.resolve_alias(table_prefix);
90
91 let qualified_name = format!("{}.{}", actual_table, column_ref.name);
93 if let Some(idx) = table.find_column_by_qualified_name(&qualified_name) {
94 debug!(
95 "Resolved {}.{} -> qualified column '{}' at index {}",
96 table_prefix, column_ref.name, qualified_name, idx
97 );
98 return Ok(idx);
99 }
100
101 if let Some(idx) = table.get_column_index(&column_ref.name) {
103 debug!(
104 "Resolved {}.{} -> unqualified column '{}' at index {}",
105 table_prefix, column_ref.name, column_ref.name, idx
106 );
107 return Ok(idx);
108 }
109
110 Err(anyhow!(
112 "Column '{}' not found. Table '{}' may not support qualified column names",
113 qualified_name,
114 actual_table
115 ))
116 } else {
117 if let Some(idx) = table.get_column_index(&column_ref.name) {
119 debug!(
120 "Resolved unqualified column '{}' at index {}",
121 column_ref.name, idx
122 );
123 return Ok(idx);
124 }
125
126 if column_ref.name.contains('.') {
128 if let Some(idx) = table.find_column_by_qualified_name(&column_ref.name) {
129 debug!(
130 "Resolved '{}' as qualified column at index {}",
131 column_ref.name, idx
132 );
133 return Ok(idx);
134 }
135 }
136
137 let suggestion = self.find_similar_column(table, &column_ref.name);
139 match suggestion {
140 Some(similar) => Err(anyhow!(
141 "Column '{}' not found. Did you mean '{}'?",
142 column_ref.name,
143 similar
144 )),
145 None => Err(anyhow!("Column '{}' not found", column_ref.name)),
146 }
147 }
148 }
149
150 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
152 let columns = table.column_names();
153 let mut best_match: Option<(String, usize)> = None;
154
155 for col in columns {
156 let distance = edit_distance(name, &col);
157 if distance <= 2 {
158 match best_match {
160 Some((_, best_dist)) if distance < best_dist => {
161 best_match = Some((col.clone(), distance));
162 }
163 None => {
164 best_match = Some((col.clone(), distance));
165 }
166 _ => {}
167 }
168 }
169 }
170
171 best_match.map(|(name, _)| name)
172 }
173}
174
175impl Default for ExecutionContext {
176 fn default() -> Self {
177 Self::new()
178 }
179}
180
/// Computes the Levenshtein distance between `a` and `b`, measured in `char`s:
/// the minimum number of single-character insertions, deletions and
/// substitutions needed to turn one string into the other.
fn edit_distance(a: &str, b: &str) -> usize {
    let source: Vec<char> = a.chars().collect();
    let target: Vec<char> = b.chars().collect();

    if source.is_empty() {
        return target.len();
    }
    if target.is_empty() {
        return source.len();
    }

    // Only two rows of the DP table are ever needed at once.
    // `previous[j]` is the distance between the source prefix handled so far
    // and the first `j` characters of `target`.
    let mut previous: Vec<usize> = (0..=target.len()).collect();
    let mut current: Vec<usize> = vec![0; target.len() + 1];

    for (i, &source_char) in source.iter().enumerate() {
        current[0] = i + 1;
        for (j, &target_char) in target.iter().enumerate() {
            let substitution = previous[j] + usize::from(source_char != target_char);
            let deletion = previous[j + 1] + 1;
            let insertion = current[j] + 1;
            current[j + 1] = min(min(deletion, insertion), substitution);
        }
        std::mem::swap(&mut previous, &mut current);
    }

    previous[target.len()]
}
221
/// Executes parsed or textual SQL SELECT statements against an in-memory
/// `DataTable`, producing `DataView`s.
#[derive(Clone)]
pub struct QueryEngine {
    /// Passed to the join executor and the WHERE evaluation context when
    /// they are constructed.
    case_insensitive: bool,
    /// Date notation handed to the arithmetic evaluator; every constructor
    /// in this file fills it from `get_date_notation()`.
    date_notation: String,
    /// Behavior configuration supplied via `with_behavior_config`, if any.
    _behavior_config: Option<BehaviorConfig>,
}
229
230impl Default for QueryEngine {
231 fn default() -> Self {
232 Self::new()
233 }
234}
235
236impl QueryEngine {
237 #[must_use]
238 pub fn new() -> Self {
239 Self {
240 case_insensitive: false,
241 date_notation: get_date_notation(),
242 _behavior_config: None,
243 }
244 }
245
246 #[must_use]
247 pub fn with_behavior_config(config: BehaviorConfig) -> Self {
248 let case_insensitive = config.case_insensitive_default;
249 let date_notation = get_date_notation();
251 Self {
252 case_insensitive,
253 date_notation,
254 _behavior_config: Some(config),
255 }
256 }
257
258 #[must_use]
259 pub fn with_date_notation(_date_notation: String) -> Self {
260 Self {
261 case_insensitive: false,
262 date_notation: get_date_notation(), _behavior_config: None,
264 }
265 }
266
267 #[must_use]
268 pub fn with_case_insensitive(case_insensitive: bool) -> Self {
269 Self {
270 case_insensitive,
271 date_notation: get_date_notation(),
272 _behavior_config: None,
273 }
274 }
275
276 #[must_use]
277 pub fn with_case_insensitive_and_date_notation(
278 case_insensitive: bool,
279 _date_notation: String, ) -> Self {
281 Self {
282 case_insensitive,
283 date_notation: get_date_notation(), _behavior_config: None,
285 }
286 }
287
288 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
290 let columns = table.column_names();
291 let mut best_match: Option<(String, usize)> = None;
292
293 for col in columns {
294 let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
295 let max_distance = if name.len() > 10 { 3 } else { 2 };
298 if distance <= max_distance {
299 match &best_match {
300 None => best_match = Some((col, distance)),
301 Some((_, best_dist)) if distance < *best_dist => {
302 best_match = Some((col, distance));
303 }
304 _ => {}
305 }
306 }
307 }
308
309 best_match.map(|(name, _)| name)
310 }
311
312 fn edit_distance(&self, s1: &str, s2: &str) -> usize {
314 let len1 = s1.len();
315 let len2 = s2.len();
316 let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
317
318 for i in 0..=len1 {
319 matrix[i][0] = i;
320 }
321 for j in 0..=len2 {
322 matrix[0][j] = j;
323 }
324
325 for (i, c1) in s1.chars().enumerate() {
326 for (j, c2) in s2.chars().enumerate() {
327 let cost = usize::from(c1 != c2);
328 matrix[i + 1][j + 1] = std::cmp::min(
329 matrix[i][j + 1] + 1, std::cmp::min(
331 matrix[i + 1][j] + 1, matrix[i][j] + cost, ),
334 );
335 }
336 }
337
338 matrix[len1][len2]
339 }
340
341 fn contains_unnest(expr: &SqlExpression) -> bool {
343 match expr {
344 SqlExpression::Unnest { .. } => true,
346 SqlExpression::FunctionCall { name, args, .. } => {
347 if name.to_uppercase() == "UNNEST" {
348 return true;
349 }
350 args.iter().any(Self::contains_unnest)
352 }
353 SqlExpression::BinaryOp { left, right, .. } => {
354 Self::contains_unnest(left) || Self::contains_unnest(right)
355 }
356 SqlExpression::Not { expr } => Self::contains_unnest(expr),
357 SqlExpression::CaseExpression {
358 when_branches,
359 else_branch,
360 } => {
361 when_branches.iter().any(|branch| {
362 Self::contains_unnest(&branch.condition)
363 || Self::contains_unnest(&branch.result)
364 }) || else_branch
365 .as_ref()
366 .map_or(false, |e| Self::contains_unnest(e))
367 }
368 SqlExpression::SimpleCaseExpression {
369 expr,
370 when_branches,
371 else_branch,
372 } => {
373 Self::contains_unnest(expr)
374 || when_branches.iter().any(|branch| {
375 Self::contains_unnest(&branch.value)
376 || Self::contains_unnest(&branch.result)
377 })
378 || else_branch
379 .as_ref()
380 .map_or(false, |e| Self::contains_unnest(e))
381 }
382 SqlExpression::InList { expr, values } => {
383 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
384 }
385 SqlExpression::NotInList { expr, values } => {
386 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
387 }
388 SqlExpression::Between { expr, lower, upper } => {
389 Self::contains_unnest(expr)
390 || Self::contains_unnest(lower)
391 || Self::contains_unnest(upper)
392 }
393 SqlExpression::InSubquery { expr, .. } => Self::contains_unnest(expr),
394 SqlExpression::NotInSubquery { expr, .. } => Self::contains_unnest(expr),
395 SqlExpression::ScalarSubquery { .. } => false, SqlExpression::WindowFunction { args, .. } => args.iter().any(Self::contains_unnest),
397 SqlExpression::MethodCall { args, .. } => args.iter().any(Self::contains_unnest),
398 SqlExpression::ChainedMethodCall { base, args, .. } => {
399 Self::contains_unnest(base) || args.iter().any(Self::contains_unnest)
400 }
401 _ => false,
402 }
403 }
404
405 pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
407 let (view, _plan) = self.execute_with_plan(table, sql)?;
408 Ok(view)
409 }
410
411 pub fn execute_with_temp_tables(
413 &self,
414 table: Arc<DataTable>,
415 sql: &str,
416 temp_tables: Option<&TempTableRegistry>,
417 ) -> Result<DataView> {
418 let (view, _plan) = self.execute_with_plan_and_temp_tables(table, sql, temp_tables)?;
419 Ok(view)
420 }
421
422 pub fn execute_statement(
424 &self,
425 table: Arc<DataTable>,
426 statement: SelectStatement,
427 ) -> Result<DataView> {
428 self.execute_statement_with_temp_tables(table, statement, None)
429 }
430
431 pub fn execute_statement_with_temp_tables(
433 &self,
434 table: Arc<DataTable>,
435 statement: SelectStatement,
436 temp_tables: Option<&TempTableRegistry>,
437 ) -> Result<DataView> {
438 let mut cte_context = HashMap::new();
440
441 if let Some(temp_registry) = temp_tables {
443 for table_name in temp_registry.list_tables() {
444 if let Some(temp_table) = temp_registry.get(&table_name) {
445 debug!("Adding temp table {} to CTE context", table_name);
446 let view = DataView::new(temp_table);
447 cte_context.insert(table_name, Arc::new(view));
448 }
449 }
450 }
451
452 for cte in &statement.ctes {
453 debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
454 let cte_result = match &cte.cte_type {
456 CTEType::Standard(query) => {
457 let view = self.build_view_with_context(
459 table.clone(),
460 query.clone(),
461 &mut cte_context,
462 )?;
463
464 let mut materialized = self.materialize_view(view)?;
466
467 for column in materialized.columns_mut() {
469 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
470 column.source_table = Some(cte.name.clone());
471 }
472
473 DataView::new(Arc::new(materialized))
474 }
475 CTEType::Web(web_spec) => {
476 use crate::web::http_fetcher::WebDataFetcher;
478
479 let fetcher = WebDataFetcher::new()?;
480 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
482
483 for column in data_table.columns_mut() {
485 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
486 column.source_table = Some(cte.name.clone());
487 }
488
489 DataView::new(Arc::new(data_table))
491 }
492 };
493 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
495 debug!(
496 "QueryEngine: CTE '{}' pre-processed, stored in context",
497 cte.name
498 );
499 }
500
501 let mut subquery_executor =
503 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
504 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
505
506 self.build_view_with_context(table, processed_statement, &mut cte_context)
508 }
509
510 pub fn execute_statement_with_cte_context(
512 &self,
513 table: Arc<DataTable>,
514 statement: SelectStatement,
515 cte_context: &HashMap<String, Arc<DataView>>,
516 ) -> Result<DataView> {
517 let mut local_context = cte_context.clone();
519
520 for cte in &statement.ctes {
522 debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
523 let cte_result = match &cte.cte_type {
524 CTEType::Standard(query) => {
525 let view = self.build_view_with_context(
526 table.clone(),
527 query.clone(),
528 &mut local_context,
529 )?;
530
531 let mut materialized = self.materialize_view(view)?;
533
534 for column in materialized.columns_mut() {
536 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
537 column.source_table = Some(cte.name.clone());
538 }
539
540 DataView::new(Arc::new(materialized))
541 }
542 CTEType::Web(web_spec) => {
543 use crate::web::http_fetcher::WebDataFetcher;
545
546 let fetcher = WebDataFetcher::new()?;
547 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
549
550 for column in data_table.columns_mut() {
552 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
553 column.source_table = Some(cte.name.clone());
554 }
555
556 DataView::new(Arc::new(data_table))
558 }
559 };
560 local_context.insert(cte.name.clone(), Arc::new(cte_result));
561 }
562
563 let mut subquery_executor =
565 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
566 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
567
568 self.build_view_with_context(table, processed_statement, &mut local_context)
570 }
571
572 pub fn execute_with_plan(
574 &self,
575 table: Arc<DataTable>,
576 sql: &str,
577 ) -> Result<(DataView, ExecutionPlan)> {
578 self.execute_with_plan_and_temp_tables(table, sql, None)
579 }
580
581 pub fn execute_with_plan_and_temp_tables(
583 &self,
584 table: Arc<DataTable>,
585 sql: &str,
586 temp_tables: Option<&TempTableRegistry>,
587 ) -> Result<(DataView, ExecutionPlan)> {
588 let mut plan_builder = ExecutionPlanBuilder::new();
589 let start_time = Instant::now();
590
591 plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
593 plan_builder.add_detail(format!("Query: {}", sql));
594 let mut parser = Parser::new(sql);
595 let statement = parser
596 .parse()
597 .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
598 plan_builder.add_detail(format!("Parsed successfully"));
599 if let Some(from) = &statement.from_table {
600 plan_builder.add_detail(format!("FROM: {}", from));
601 }
602 if statement.where_clause.is_some() {
603 plan_builder.add_detail("WHERE clause present".to_string());
604 }
605 plan_builder.end_step();
606
607 let mut cte_context = HashMap::new();
609
610 if let Some(temp_registry) = temp_tables {
612 for table_name in temp_registry.list_tables() {
613 if let Some(temp_table) = temp_registry.get(&table_name) {
614 debug!("Adding temp table {} to CTE context", table_name);
615 let view = DataView::new(temp_table);
616 cte_context.insert(table_name, Arc::new(view));
617 }
618 }
619 }
620
621 if !statement.ctes.is_empty() {
622 plan_builder.begin_step(
623 StepType::CTE,
624 format!("Process {} CTEs", statement.ctes.len()),
625 );
626
627 for cte in &statement.ctes {
628 let cte_start = Instant::now();
629 plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
630
631 let cte_result = match &cte.cte_type {
632 CTEType::Standard(query) => {
633 if let Some(from) = &query.from_table {
635 plan_builder.add_detail(format!("Source: {}", from));
636 }
637 if query.where_clause.is_some() {
638 plan_builder.add_detail("Has WHERE clause".to_string());
639 }
640 if query.group_by.is_some() {
641 plan_builder.add_detail("Has GROUP BY".to_string());
642 }
643
644 debug!(
645 "QueryEngine: Processing CTE '{}' with existing context: {:?}",
646 cte.name,
647 cte_context.keys().collect::<Vec<_>>()
648 );
649
650 let mut subquery_executor = SubqueryExecutor::with_cte_context(
653 self.clone(),
654 table.clone(),
655 cte_context.clone(),
656 );
657 let processed_query = subquery_executor.execute_subqueries(query)?;
658
659 let view = self.build_view_with_context(
660 table.clone(),
661 processed_query,
662 &mut cte_context,
663 )?;
664
665 let mut materialized = self.materialize_view(view)?;
667
668 for column in materialized.columns_mut() {
670 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
671 column.source_table = Some(cte.name.clone());
672 }
673
674 DataView::new(Arc::new(materialized))
675 }
676 CTEType::Web(web_spec) => {
677 plan_builder.add_detail(format!("URL: {}", web_spec.url));
678 if let Some(format) = &web_spec.format {
679 plan_builder.add_detail(format!("Format: {:?}", format));
680 }
681 if let Some(cache) = web_spec.cache_seconds {
682 plan_builder.add_detail(format!("Cache: {} seconds", cache));
683 }
684
685 use crate::web::http_fetcher::WebDataFetcher;
687
688 let fetcher = WebDataFetcher::new()?;
689 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
691
692 for column in data_table.columns_mut() {
694 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
695 column.source_table = Some(cte.name.clone());
696 }
697
698 DataView::new(Arc::new(data_table))
700 }
701 };
702
703 plan_builder.set_rows_out(cte_result.row_count());
705 plan_builder.add_detail(format!(
706 "Result: {} rows, {} columns",
707 cte_result.row_count(),
708 cte_result.column_count()
709 ));
710 plan_builder.add_detail(format!(
711 "Execution time: {:.3}ms",
712 cte_start.elapsed().as_secs_f64() * 1000.0
713 ));
714
715 debug!(
716 "QueryEngine: Storing CTE '{}' in context with {} rows",
717 cte.name,
718 cte_result.row_count()
719 );
720 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
721 plan_builder.end_step();
722 }
723
724 plan_builder.add_detail(format!(
725 "All {} CTEs cached in context",
726 statement.ctes.len()
727 ));
728 plan_builder.end_step();
729 }
730
731 plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
733 let mut subquery_executor =
734 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
735
736 let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
738 format!("{:?}", w).contains("Subquery")
740 });
741
742 if has_subqueries {
743 plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
744 }
745
746 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
747
748 if has_subqueries {
749 plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
750 } else {
751 plan_builder.add_detail("No subqueries to process".to_string());
752 }
753
754 plan_builder.end_step();
755 let result = self.build_view_with_context_and_plan(
756 table,
757 processed_statement,
758 &mut cte_context,
759 &mut plan_builder,
760 )?;
761
762 let total_duration = start_time.elapsed();
763 info!(
764 "Query execution complete: total={:?}, rows={}",
765 total_duration,
766 result.row_count()
767 );
768
769 let plan = plan_builder.build();
770 Ok((result, plan))
771 }
772
773 fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
775 let mut cte_context = HashMap::new();
776 self.build_view_with_context(table, statement, &mut cte_context)
777 }
778
779 fn build_view_with_context(
781 &self,
782 table: Arc<DataTable>,
783 statement: SelectStatement,
784 cte_context: &mut HashMap<String, Arc<DataView>>,
785 ) -> Result<DataView> {
786 let mut dummy_plan = ExecutionPlanBuilder::new();
787 let mut exec_context = ExecutionContext::new();
788 self.build_view_with_context_and_plan_and_exec(
789 table,
790 statement,
791 cte_context,
792 &mut dummy_plan,
793 &mut exec_context,
794 )
795 }
796
797 fn build_view_with_context_and_plan(
799 &self,
800 table: Arc<DataTable>,
801 statement: SelectStatement,
802 cte_context: &mut HashMap<String, Arc<DataView>>,
803 plan: &mut ExecutionPlanBuilder,
804 ) -> Result<DataView> {
805 let mut exec_context = ExecutionContext::new();
806 self.build_view_with_context_and_plan_and_exec(
807 table,
808 statement,
809 cte_context,
810 plan,
811 &mut exec_context,
812 )
813 }
814
815 fn build_view_with_context_and_plan_and_exec(
817 &self,
818 table: Arc<DataTable>,
819 statement: SelectStatement,
820 cte_context: &mut HashMap<String, Arc<DataView>>,
821 plan: &mut ExecutionPlanBuilder,
822 exec_context: &mut ExecutionContext,
823 ) -> Result<DataView> {
824 for cte in &statement.ctes {
826 if cte_context.contains_key(&cte.name) {
828 debug!(
829 "QueryEngine: CTE '{}' already in context, skipping",
830 cte.name
831 );
832 continue;
833 }
834
835 debug!("QueryEngine: Processing CTE '{}'...", cte.name);
836 debug!(
837 "QueryEngine: Available CTEs for '{}': {:?}",
838 cte.name,
839 cte_context.keys().collect::<Vec<_>>()
840 );
841
842 let cte_result = match &cte.cte_type {
844 CTEType::Standard(query) => {
845 let view =
846 self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
847
848 let mut materialized = self.materialize_view(view)?;
850
851 for column in materialized.columns_mut() {
853 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
854 column.source_table = Some(cte.name.clone());
855 }
856
857 DataView::new(Arc::new(materialized))
858 }
859 CTEType::Web(_web_spec) => {
860 return Err(anyhow!(
862 "Web CTEs should be processed in execute_select method"
863 ));
864 }
865 };
866
867 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
869 debug!(
870 "QueryEngine: CTE '{}' processed, stored in context",
871 cte.name
872 );
873 }
874
875 let source_table = if let Some(ref table_func) = statement.from_function {
877 debug!("QueryEngine: Processing table function...");
879 match table_func {
880 TableFunction::Generator { name, args } => {
881 use crate::sql::generators::GeneratorRegistry;
883
884 let registry = GeneratorRegistry::new();
886
887 if let Some(generator) = registry.get(name) {
888 let mut evaluator = ArithmeticEvaluator::with_date_notation(
890 &table,
891 self.date_notation.clone(),
892 );
893 let dummy_row = 0;
894
895 let mut evaluated_args = Vec::new();
896 for arg in args {
897 evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
898 }
899
900 generator.generate(evaluated_args)?
902 } else {
903 return Err(anyhow!("Unknown generator function: {}", name));
904 }
905 }
906 }
907 } else if let Some(ref subquery) = statement.from_subquery {
908 debug!("QueryEngine: Processing FROM subquery...");
910 let subquery_result =
911 self.build_view_with_context(table.clone(), *subquery.clone(), cte_context)?;
912
913 let materialized = self.materialize_view(subquery_result)?;
916 Arc::new(materialized)
917 } else if let Some(ref table_name) = statement.from_table {
918 if let Some(cte_view) = cte_context.get(table_name) {
920 debug!("QueryEngine: Using CTE '{}' as source table", table_name);
921 let mut materialized = self.materialize_view((**cte_view).clone())?;
923
924 if let Some(ref alias) = statement.from_alias {
926 debug!(
927 "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
928 alias, table_name
929 );
930 for column in materialized.columns_mut() {
931 if let Some(ref qualified_name) = column.qualified_name {
933 if qualified_name.starts_with(&format!("{}.", table_name)) {
934 column.qualified_name =
935 Some(qualified_name.replace(
936 &format!("{}.", table_name),
937 &format!("{}.", alias),
938 ));
939 }
940 }
941 if column.source_table.as_ref() == Some(table_name) {
943 column.source_table = Some(alias.clone());
944 }
945 }
946 }
947
948 Arc::new(materialized)
949 } else {
950 table.clone()
952 }
953 } else {
954 table.clone()
956 };
957
958 if let Some(ref alias) = statement.from_alias {
960 if let Some(ref table_name) = statement.from_table {
961 exec_context.register_alias(alias.clone(), table_name.clone());
962 }
963 }
964
965 let final_table = if !statement.joins.is_empty() {
967 plan.begin_step(
968 StepType::Join,
969 format!("Process {} JOINs", statement.joins.len()),
970 );
971 plan.set_rows_in(source_table.row_count());
972
973 let join_executor = HashJoinExecutor::new(self.case_insensitive);
974 let mut current_table = source_table;
975
976 for (idx, join_clause) in statement.joins.iter().enumerate() {
977 let join_start = Instant::now();
978 plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
979 plan.add_detail(format!("Type: {:?}", join_clause.join_type));
980 plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
981 plan.add_detail(format!(
982 "Executing {:?} JOIN on {} condition(s)",
983 join_clause.join_type,
984 join_clause.condition.conditions.len()
985 ));
986
987 let right_table = match &join_clause.table {
989 TableSource::Table(name) => {
990 if let Some(cte_view) = cte_context.get(name) {
992 let mut materialized = self.materialize_view((**cte_view).clone())?;
993
994 if let Some(ref alias) = join_clause.alias {
996 debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
997 for column in materialized.columns_mut() {
998 if let Some(ref qualified_name) = column.qualified_name {
1000 if qualified_name.starts_with(&format!("{}.", name)) {
1001 column.qualified_name = Some(qualified_name.replace(
1002 &format!("{}.", name),
1003 &format!("{}.", alias),
1004 ));
1005 }
1006 }
1007 if column.source_table.as_ref() == Some(name) {
1009 column.source_table = Some(alias.clone());
1010 }
1011 }
1012 }
1013
1014 Arc::new(materialized)
1015 } else {
1016 return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
1019 }
1020 }
1021 TableSource::DerivedTable { query, alias: _ } => {
1022 let subquery_result = self.build_view_with_context(
1024 table.clone(),
1025 *query.clone(),
1026 cte_context,
1027 )?;
1028 let materialized = self.materialize_view(subquery_result)?;
1029 Arc::new(materialized)
1030 }
1031 };
1032
1033 let joined = join_executor.execute_join(
1035 current_table.clone(),
1036 join_clause,
1037 right_table.clone(),
1038 )?;
1039
1040 plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
1041 plan.set_rows_out(joined.row_count());
1042 plan.add_detail(format!("Result: {} rows", joined.row_count()));
1043 plan.add_detail(format!(
1044 "Join time: {:.3}ms",
1045 join_start.elapsed().as_secs_f64() * 1000.0
1046 ));
1047 plan.end_step();
1048
1049 current_table = Arc::new(joined);
1050 }
1051
1052 plan.set_rows_out(current_table.row_count());
1053 plan.add_detail(format!(
1054 "Final result after all joins: {} rows",
1055 current_table.row_count()
1056 ));
1057 plan.end_step();
1058 current_table
1059 } else {
1060 source_table
1061 };
1062
1063 self.build_view_internal_with_plan_and_exec(
1065 final_table,
1066 statement,
1067 plan,
1068 Some(exec_context),
1069 )
1070 }
1071
1072 pub fn materialize_view(&self, view: DataView) -> Result<DataTable> {
1074 let source = view.source();
1075 let mut result_table = DataTable::new("derived");
1076
1077 let visible_cols = view.visible_column_indices().to_vec();
1079
1080 for col_idx in &visible_cols {
1082 let col = &source.columns[*col_idx];
1083 let new_col = DataColumn {
1084 name: col.name.clone(),
1085 data_type: col.data_type.clone(),
1086 nullable: col.nullable,
1087 unique_values: col.unique_values,
1088 null_count: col.null_count,
1089 metadata: col.metadata.clone(),
1090 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), };
1093 result_table.add_column(new_col);
1094 }
1095
1096 for row_idx in view.visible_row_indices() {
1098 let source_row = &source.rows[*row_idx];
1099 let mut new_row = DataRow { values: Vec::new() };
1100
1101 for col_idx in &visible_cols {
1102 new_row.values.push(source_row.values[*col_idx].clone());
1103 }
1104
1105 result_table.add_row(new_row);
1106 }
1107
1108 Ok(result_table)
1109 }
1110
1111 fn build_view_internal(
1112 &self,
1113 table: Arc<DataTable>,
1114 statement: SelectStatement,
1115 ) -> Result<DataView> {
1116 let mut dummy_plan = ExecutionPlanBuilder::new();
1117 self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
1118 }
1119
1120 fn build_view_internal_with_plan(
1121 &self,
1122 table: Arc<DataTable>,
1123 statement: SelectStatement,
1124 plan: &mut ExecutionPlanBuilder,
1125 ) -> Result<DataView> {
1126 self.build_view_internal_with_plan_and_exec(table, statement, plan, None)
1127 }
1128
1129 fn build_view_internal_with_plan_and_exec(
1130 &self,
1131 table: Arc<DataTable>,
1132 statement: SelectStatement,
1133 plan: &mut ExecutionPlanBuilder,
1134 exec_context: Option<&ExecutionContext>,
1135 ) -> Result<DataView> {
1136 debug!(
1137 "QueryEngine::build_view - select_items: {:?}",
1138 statement.select_items
1139 );
1140 debug!(
1141 "QueryEngine::build_view - where_clause: {:?}",
1142 statement.where_clause
1143 );
1144
1145 let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
1147
1148 if let Some(where_clause) = &statement.where_clause {
1150 let total_rows = table.row_count();
1151 debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
1152 debug!("QueryEngine: WHERE clause = {:?}", where_clause);
1153
1154 plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
1155 plan.set_rows_in(total_rows);
1156 plan.add_detail(format!("Input: {} rows", total_rows));
1157
1158 for condition in &where_clause.conditions {
1160 plan.add_detail(format!("Condition: {:?}", condition.expr));
1161 }
1162
1163 let filter_start = Instant::now();
1164 let mut eval_context = EvaluationContext::new(self.case_insensitive);
1166
1167 let mut evaluator = if let Some(exec_ctx) = exec_context {
1169 RecursiveWhereEvaluator::with_both_contexts(&table, &mut eval_context, exec_ctx)
1171 } else {
1172 RecursiveWhereEvaluator::with_context(&table, &mut eval_context)
1173 };
1174
1175 let mut filtered_rows = Vec::new();
1177 for row_idx in visible_rows {
1178 if row_idx < 3 {
1180 debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
1181 }
1182
1183 match evaluator.evaluate(where_clause, row_idx) {
1184 Ok(result) => {
1185 if row_idx < 3 {
1186 debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
1187 }
1188 if result {
1189 filtered_rows.push(row_idx);
1190 }
1191 }
1192 Err(e) => {
1193 if row_idx < 3 {
1194 debug!(
1195 "QueryEngine: WHERE evaluation error for row {}: {}",
1196 row_idx, e
1197 );
1198 }
1199 return Err(e);
1201 }
1202 }
1203 }
1204
1205 let (compilations, cache_hits) = eval_context.get_stats();
1207 if compilations > 0 || cache_hits > 0 {
1208 debug!(
1209 "LIKE pattern cache: {} compilations, {} cache hits",
1210 compilations, cache_hits
1211 );
1212 }
1213 visible_rows = filtered_rows;
1214 let filter_duration = filter_start.elapsed();
1215 info!(
1216 "WHERE clause filtering: {} rows -> {} rows in {:?}",
1217 total_rows,
1218 visible_rows.len(),
1219 filter_duration
1220 );
1221
1222 plan.set_rows_out(visible_rows.len());
1223 plan.add_detail(format!("Output: {} rows", visible_rows.len()));
1224 plan.add_detail(format!(
1225 "Filter time: {:.3}ms",
1226 filter_duration.as_secs_f64() * 1000.0
1227 ));
1228 plan.end_step();
1229 }
1230
1231 let mut view = DataView::new(table.clone());
1233 view = view.with_rows(visible_rows);
1234
1235 if let Some(group_by_exprs) = &statement.group_by {
1237 if !group_by_exprs.is_empty() {
1238 debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
1239
1240 plan.begin_step(
1241 StepType::GroupBy,
1242 format!("GROUP BY {} expressions", group_by_exprs.len()),
1243 );
1244 plan.set_rows_in(view.row_count());
1245 plan.add_detail(format!("Input: {} rows", view.row_count()));
1246 for expr in group_by_exprs {
1247 plan.add_detail(format!("Group by: {:?}", expr));
1248 }
1249
1250 let group_start = Instant::now();
1251 view = self.apply_group_by(
1252 view,
1253 group_by_exprs,
1254 &statement.select_items,
1255 statement.having.as_ref(),
1256 plan,
1257 )?;
1258
1259 plan.set_rows_out(view.row_count());
1260 plan.add_detail(format!("Output: {} groups", view.row_count()));
1261 plan.add_detail(format!(
1262 "Overall time: {:.3}ms",
1263 group_start.elapsed().as_secs_f64() * 1000.0
1264 ));
1265 plan.end_step();
1266 }
1267 } else {
1268 if !statement.select_items.is_empty() {
1270 let has_non_star_items = statement
1272 .select_items
1273 .iter()
1274 .any(|item| !matches!(item, SelectItem::Star { .. }));
1275
1276 if has_non_star_items || statement.select_items.len() > 1 {
1280 view = self.apply_select_items(
1281 view,
1282 &statement.select_items,
1283 &statement,
1284 exec_context,
1285 )?;
1286 }
1287 } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
1289 debug!("QueryEngine: Using legacy columns path");
1290 let source_table = view.source();
1293 let column_indices =
1294 self.resolve_column_indices(source_table, &statement.columns)?;
1295 view = view.with_columns(column_indices);
1296 }
1297 }
1298
1299 if statement.distinct {
1301 plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
1302 plan.set_rows_in(view.row_count());
1303 plan.add_detail(format!("Input: {} rows", view.row_count()));
1304
1305 let distinct_start = Instant::now();
1306 view = self.apply_distinct(view)?;
1307
1308 plan.set_rows_out(view.row_count());
1309 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1310 plan.add_detail(format!(
1311 "Distinct time: {:.3}ms",
1312 distinct_start.elapsed().as_secs_f64() * 1000.0
1313 ));
1314 plan.end_step();
1315 }
1316
1317 if let Some(order_by_columns) = &statement.order_by {
1319 if !order_by_columns.is_empty() {
1320 plan.begin_step(
1321 StepType::Sort,
1322 format!("ORDER BY {} columns", order_by_columns.len()),
1323 );
1324 plan.set_rows_in(view.row_count());
1325 for col in order_by_columns {
1326 let expr_str = match &col.expr {
1328 SqlExpression::Column(col_ref) => col_ref.name.clone(),
1329 _ => "expr".to_string(),
1330 };
1331 plan.add_detail(format!("{} {:?}", expr_str, col.direction));
1332 }
1333
1334 let sort_start = Instant::now();
1335 view =
1336 self.apply_multi_order_by_with_context(view, order_by_columns, exec_context)?;
1337
1338 plan.add_detail(format!(
1339 "Sort time: {:.3}ms",
1340 sort_start.elapsed().as_secs_f64() * 1000.0
1341 ));
1342 plan.end_step();
1343 }
1344 }
1345
1346 if let Some(limit) = statement.limit {
1348 let offset = statement.offset.unwrap_or(0);
1349 plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1350 plan.set_rows_in(view.row_count());
1351 if offset > 0 {
1352 plan.add_detail(format!("OFFSET: {}", offset));
1353 }
1354 view = view.with_limit(limit, offset);
1355 plan.set_rows_out(view.row_count());
1356 plan.add_detail(format!("Output: {} rows", view.row_count()));
1357 plan.end_step();
1358 }
1359
1360 if !statement.set_operations.is_empty() {
1362 plan.begin_step(
1363 StepType::SetOperation,
1364 format!("Process {} set operations", statement.set_operations.len()),
1365 );
1366 plan.set_rows_in(view.row_count());
1367
1368 let mut combined_table = self.materialize_view(view)?;
1370 let first_columns = combined_table.column_names();
1371 let first_column_count = first_columns.len();
1372
1373 let mut needs_deduplication = false;
1375
1376 for (idx, (operation, next_statement)) in statement.set_operations.iter().enumerate() {
1378 let op_start = Instant::now();
1379 plan.begin_step(
1380 StepType::SetOperation,
1381 format!("{:?} operation #{}", operation, idx + 1),
1382 );
1383
1384 let next_view = if let Some(exec_ctx) = exec_context {
1387 self.build_view_internal_with_plan_and_exec(
1388 table.clone(),
1389 *next_statement.clone(),
1390 plan,
1391 Some(exec_ctx),
1392 )?
1393 } else {
1394 self.build_view_internal_with_plan(
1395 table.clone(),
1396 *next_statement.clone(),
1397 plan,
1398 )?
1399 };
1400
1401 let next_table = self.materialize_view(next_view)?;
1403 let next_columns = next_table.column_names();
1404 let next_column_count = next_columns.len();
1405
1406 if first_column_count != next_column_count {
1408 return Err(anyhow!(
1409 "UNION queries must have the same number of columns: first query has {} columns, but query #{} has {} columns",
1410 first_column_count,
1411 idx + 2,
1412 next_column_count
1413 ));
1414 }
1415
1416 for (col_idx, (first_col, next_col)) in
1418 first_columns.iter().zip(next_columns.iter()).enumerate()
1419 {
1420 if !first_col.eq_ignore_ascii_case(next_col) {
1421 debug!(
1422 "UNION column name mismatch at position {}: '{}' vs '{}' (using first query's name)",
1423 col_idx + 1,
1424 first_col,
1425 next_col
1426 );
1427 }
1428 }
1429
1430 plan.add_detail(format!("Left: {} rows", combined_table.row_count()));
1431 plan.add_detail(format!("Right: {} rows", next_table.row_count()));
1432
1433 match operation {
1435 SetOperation::UnionAll => {
1436 for row in next_table.rows.iter() {
1438 combined_table.add_row(row.clone());
1439 }
1440 plan.add_detail(format!(
1441 "Result: {} rows (no deduplication)",
1442 combined_table.row_count()
1443 ));
1444 }
1445 SetOperation::Union => {
1446 for row in next_table.rows.iter() {
1448 combined_table.add_row(row.clone());
1449 }
1450 needs_deduplication = true;
1451 plan.add_detail(format!(
1452 "Combined: {} rows (deduplication pending)",
1453 combined_table.row_count()
1454 ));
1455 }
1456 SetOperation::Intersect => {
1457 return Err(anyhow!("INTERSECT is not yet implemented"));
1460 }
1461 SetOperation::Except => {
1462 return Err(anyhow!("EXCEPT is not yet implemented"));
1465 }
1466 }
1467
1468 plan.add_detail(format!(
1469 "Operation time: {:.3}ms",
1470 op_start.elapsed().as_secs_f64() * 1000.0
1471 ));
1472 plan.set_rows_out(combined_table.row_count());
1473 plan.end_step();
1474 }
1475
1476 plan.set_rows_out(combined_table.row_count());
1477 plan.add_detail(format!(
1478 "Combined result: {} rows after {} operations",
1479 combined_table.row_count(),
1480 statement.set_operations.len()
1481 ));
1482 plan.end_step();
1483
1484 view = DataView::new(Arc::new(combined_table));
1486
1487 if needs_deduplication {
1489 plan.begin_step(
1490 StepType::Distinct,
1491 "UNION deduplication - remove duplicate rows".to_string(),
1492 );
1493 plan.set_rows_in(view.row_count());
1494 plan.add_detail(format!("Input: {} rows", view.row_count()));
1495
1496 let distinct_start = Instant::now();
1497 view = self.apply_distinct(view)?;
1498
1499 plan.set_rows_out(view.row_count());
1500 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1501 plan.add_detail(format!(
1502 "Deduplication time: {:.3}ms",
1503 distinct_start.elapsed().as_secs_f64() * 1000.0
1504 ));
1505 plan.end_step();
1506 }
1507 }
1508
1509 Ok(view)
1510 }
1511
1512 fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1514 let mut indices = Vec::new();
1515 let table_columns = table.column_names();
1516
1517 for col_name in columns {
1518 let index = table_columns
1519 .iter()
1520 .position(|c| c.eq_ignore_ascii_case(col_name))
1521 .ok_or_else(|| {
1522 let suggestion = self.find_similar_column(table, col_name);
1523 match suggestion {
1524 Some(similar) => anyhow::anyhow!(
1525 "Column '{}' not found. Did you mean '{}'?",
1526 col_name,
1527 similar
1528 ),
1529 None => anyhow::anyhow!("Column '{}' not found", col_name),
1530 }
1531 })?;
1532 indices.push(index);
1533 }
1534
1535 Ok(indices)
1536 }
1537
1538 fn apply_select_items(
1540 &self,
1541 view: DataView,
1542 select_items: &[SelectItem],
1543 _statement: &SelectStatement,
1544 exec_context: Option<&ExecutionContext>,
1545 ) -> Result<DataView> {
1546 debug!(
1547 "QueryEngine::apply_select_items - items: {:?}",
1548 select_items
1549 );
1550 debug!(
1551 "QueryEngine::apply_select_items - input view has {} rows",
1552 view.row_count()
1553 );
1554
1555 let has_unnest = select_items.iter().any(|item| match item {
1557 SelectItem::Expression { expr, .. } => Self::contains_unnest(expr),
1558 _ => false,
1559 });
1560
1561 if has_unnest {
1562 debug!("QueryEngine::apply_select_items - UNNEST detected, using row expansion");
1563 return self.apply_select_with_row_expansion(view, select_items);
1564 }
1565
1566 let has_aggregates = select_items.iter().any(|item| match item {
1570 SelectItem::Expression { expr, .. } => contains_aggregate(expr),
1571 SelectItem::Column { .. } => false,
1572 SelectItem::Star { .. } => false,
1573 SelectItem::StarExclude { .. } => false,
1574 });
1575
1576 let all_aggregate_compatible = select_items.iter().all(|item| match item {
1577 SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
1578 SelectItem::Column { .. } => false, SelectItem::Star { .. } => false, SelectItem::StarExclude { .. } => false, });
1582
1583 if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
1584 debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
1587 return self.apply_aggregate_select(view, select_items);
1588 }
1589
1590 let has_computed_expressions = select_items
1592 .iter()
1593 .any(|item| matches!(item, SelectItem::Expression { .. }));
1594
1595 debug!(
1596 "QueryEngine::apply_select_items - has_computed_expressions: {}",
1597 has_computed_expressions
1598 );
1599
1600 if !has_computed_expressions {
1601 let column_indices = self.resolve_select_columns(view.source(), select_items)?;
1603 return Ok(view.with_columns(column_indices));
1604 }
1605
1606 let source_table = view.source();
1611 let visible_rows = view.visible_row_indices();
1612
1613 let mut computed_table = DataTable::new("query_result");
1616
1617 let mut expanded_items = Vec::new();
1619 for item in select_items {
1620 match item {
1621 SelectItem::Star { table_prefix, .. } => {
1622 if let Some(prefix) = table_prefix {
1623 debug!("QueryEngine::apply_select_items - expanding {}.*", prefix);
1625 for col in &source_table.columns {
1626 if Self::column_matches_table(col, prefix) {
1627 expanded_items.push(SelectItem::Column {
1628 column: ColumnRef::unquoted(col.name.clone()),
1629 leading_comments: vec![],
1630 trailing_comment: None,
1631 });
1632 }
1633 }
1634 } else {
1635 debug!("QueryEngine::apply_select_items - expanding *");
1637 for col_name in source_table.column_names() {
1638 expanded_items.push(SelectItem::Column {
1639 column: ColumnRef::unquoted(col_name.to_string()),
1640 leading_comments: vec![],
1641 trailing_comment: None,
1642 });
1643 }
1644 }
1645 }
1646 _ => expanded_items.push(item.clone()),
1647 }
1648 }
1649
1650 let mut column_name_counts: std::collections::HashMap<String, usize> =
1652 std::collections::HashMap::new();
1653
1654 for item in &expanded_items {
1655 let base_name = match item {
1656 SelectItem::Column {
1657 column: col_ref, ..
1658 } => col_ref.name.clone(),
1659 SelectItem::Expression { alias, .. } => alias.clone(),
1660 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
1661 SelectItem::StarExclude { .. } => {
1662 unreachable!("StarExclude should have been expanded")
1663 }
1664 };
1665
1666 let count = column_name_counts.entry(base_name.clone()).or_insert(0);
1668 let column_name = if *count == 0 {
1669 base_name.clone()
1671 } else {
1672 format!("{base_name}_{count}")
1674 };
1675 *count += 1;
1676
1677 computed_table.add_column(DataColumn::new(&column_name));
1678 }
1679
1680 let mut evaluator =
1682 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1683
1684 if let Some(exec_ctx) = exec_context {
1686 let aliases = exec_ctx.get_aliases();
1687 if !aliases.is_empty() {
1688 debug!(
1689 "Applying {} aliases to evaluator: {:?}",
1690 aliases.len(),
1691 aliases
1692 );
1693 evaluator = evaluator.with_table_aliases(aliases);
1694 }
1695 }
1696
1697 for &row_idx in visible_rows {
1698 let mut row_values = Vec::new();
1699
1700 for item in &expanded_items {
1701 let value = match item {
1702 SelectItem::Column {
1703 column: col_ref, ..
1704 } => {
1705 match evaluator.evaluate(&SqlExpression::Column(col_ref.clone()), row_idx) {
1707 Ok(val) => val,
1708 Err(e) => {
1709 return Err(anyhow!(
1710 "Failed to evaluate column {}: {}",
1711 col_ref.to_sql(),
1712 e
1713 ));
1714 }
1715 }
1716 }
1717 SelectItem::Expression { expr, .. } => {
1718 evaluator.evaluate(&expr, row_idx)?
1720 }
1721 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
1722 SelectItem::StarExclude { .. } => {
1723 unreachable!("StarExclude should have been expanded")
1724 }
1725 };
1726 row_values.push(value);
1727 }
1728
1729 computed_table
1730 .add_row(DataRow::new(row_values))
1731 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
1732 }
1733
1734 Ok(DataView::new(Arc::new(computed_table)))
1737 }
1738
1739 fn apply_select_with_row_expansion(
1741 &self,
1742 view: DataView,
1743 select_items: &[SelectItem],
1744 ) -> Result<DataView> {
1745 debug!("QueryEngine::apply_select_with_row_expansion - expanding rows");
1746
1747 let source_table = view.source();
1748 let visible_rows = view.visible_row_indices();
1749 let expander_registry = RowExpanderRegistry::new();
1750
1751 let mut result_table = DataTable::new("unnest_result");
1753
1754 let mut expanded_items = Vec::new();
1756 for item in select_items {
1757 match item {
1758 SelectItem::Star { table_prefix, .. } => {
1759 if let Some(prefix) = table_prefix {
1760 debug!(
1762 "QueryEngine::apply_select_with_row_expansion - expanding {}.*",
1763 prefix
1764 );
1765 for col in &source_table.columns {
1766 if Self::column_matches_table(col, prefix) {
1767 expanded_items.push(SelectItem::Column {
1768 column: ColumnRef::unquoted(col.name.clone()),
1769 leading_comments: vec![],
1770 trailing_comment: None,
1771 });
1772 }
1773 }
1774 } else {
1775 debug!("QueryEngine::apply_select_with_row_expansion - expanding *");
1777 for col_name in source_table.column_names() {
1778 expanded_items.push(SelectItem::Column {
1779 column: ColumnRef::unquoted(col_name.to_string()),
1780 leading_comments: vec![],
1781 trailing_comment: None,
1782 });
1783 }
1784 }
1785 }
1786 _ => expanded_items.push(item.clone()),
1787 }
1788 }
1789
1790 for item in &expanded_items {
1792 let column_name = match item {
1793 SelectItem::Column {
1794 column: col_ref, ..
1795 } => col_ref.name.clone(),
1796 SelectItem::Expression { alias, .. } => alias.clone(),
1797 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
1798 SelectItem::StarExclude { .. } => {
1799 unreachable!("StarExclude should have been expanded")
1800 }
1801 };
1802 result_table.add_column(DataColumn::new(&column_name));
1803 }
1804
1805 let mut evaluator =
1807 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1808
1809 for &row_idx in visible_rows {
1810 let mut unnest_expansions = Vec::new();
1812 let mut unnest_indices = Vec::new();
1813
1814 for (col_idx, item) in expanded_items.iter().enumerate() {
1815 if let SelectItem::Expression { expr, .. } = item {
1816 if let Some(expansion_result) = self.try_expand_unnest(
1817 &expr,
1818 source_table,
1819 row_idx,
1820 &mut evaluator,
1821 &expander_registry,
1822 )? {
1823 unnest_expansions.push(expansion_result);
1824 unnest_indices.push(col_idx);
1825 }
1826 }
1827 }
1828
1829 let expansion_count = if unnest_expansions.is_empty() {
1831 1 } else {
1833 unnest_expansions
1834 .iter()
1835 .map(|exp| exp.row_count())
1836 .max()
1837 .unwrap_or(1)
1838 };
1839
1840 for output_idx in 0..expansion_count {
1842 let mut row_values = Vec::new();
1843
1844 for (col_idx, item) in expanded_items.iter().enumerate() {
1845 let unnest_position = unnest_indices.iter().position(|&idx| idx == col_idx);
1847
1848 let value = if let Some(unnest_idx) = unnest_position {
1849 let expansion = &unnest_expansions[unnest_idx];
1851 expansion
1852 .values
1853 .get(output_idx)
1854 .cloned()
1855 .unwrap_or(DataValue::Null)
1856 } else {
1857 match item {
1859 SelectItem::Column {
1860 column: col_ref, ..
1861 } => {
1862 let col_idx =
1863 source_table.get_column_index(&col_ref.name).ok_or_else(
1864 || anyhow::anyhow!("Column '{}' not found", col_ref.name),
1865 )?;
1866 let row = source_table
1867 .get_row(row_idx)
1868 .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
1869 row.get(col_idx)
1870 .ok_or_else(|| {
1871 anyhow::anyhow!("Column {} not found in row", col_idx)
1872 })?
1873 .clone()
1874 }
1875 SelectItem::Expression { expr, .. } => {
1876 evaluator.evaluate(&expr, row_idx)?
1878 }
1879 SelectItem::Star { .. } => unreachable!(),
1880 SelectItem::StarExclude { .. } => {
1881 unreachable!("StarExclude should have been expanded")
1882 }
1883 }
1884 };
1885
1886 row_values.push(value);
1887 }
1888
1889 result_table
1890 .add_row(DataRow::new(row_values))
1891 .map_err(|e| anyhow::anyhow!("Failed to add expanded row: {}", e))?;
1892 }
1893 }
1894
1895 debug!(
1896 "QueryEngine::apply_select_with_row_expansion - input rows: {}, output rows: {}",
1897 visible_rows.len(),
1898 result_table.row_count()
1899 );
1900
1901 Ok(DataView::new(Arc::new(result_table)))
1902 }
1903
1904 fn try_expand_unnest(
1907 &self,
1908 expr: &SqlExpression,
1909 _source_table: &DataTable,
1910 row_idx: usize,
1911 evaluator: &mut ArithmeticEvaluator,
1912 expander_registry: &RowExpanderRegistry,
1913 ) -> Result<Option<crate::data::row_expanders::ExpansionResult>> {
1914 if let SqlExpression::Unnest { column, delimiter } = expr {
1916 let column_value = evaluator.evaluate(column, row_idx)?;
1918
1919 let delimiter_value = DataValue::String(delimiter.clone());
1921
1922 let expander = expander_registry
1924 .get("UNNEST")
1925 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
1926
1927 let expansion = expander.expand(&column_value, &[delimiter_value])?;
1929 return Ok(Some(expansion));
1930 }
1931
1932 if let SqlExpression::FunctionCall { name, args, .. } = expr {
1934 if name.to_uppercase() == "UNNEST" {
1935 if args.len() != 2 {
1937 return Err(anyhow::anyhow!(
1938 "UNNEST requires exactly 2 arguments: UNNEST(column, delimiter)"
1939 ));
1940 }
1941
1942 let column_value = evaluator.evaluate(&args[0], row_idx)?;
1944
1945 let delimiter_value = evaluator.evaluate(&args[1], row_idx)?;
1947
1948 let expander = expander_registry
1950 .get("UNNEST")
1951 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
1952
1953 let expansion = expander.expand(&column_value, &[delimiter_value])?;
1955 return Ok(Some(expansion));
1956 }
1957 }
1958
1959 Ok(None)
1960 }
1961
1962 fn apply_aggregate_select(
1964 &self,
1965 view: DataView,
1966 select_items: &[SelectItem],
1967 ) -> Result<DataView> {
1968 debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
1969
1970 let source_table = view.source();
1971 let mut result_table = DataTable::new("aggregate_result");
1972
1973 for item in select_items {
1975 let column_name = match item {
1976 SelectItem::Expression { alias, .. } => alias.clone(),
1977 _ => unreachable!("Should only have expressions in aggregate-only query"),
1978 };
1979 result_table.add_column(DataColumn::new(&column_name));
1980 }
1981
1982 let visible_rows = view.visible_row_indices().to_vec();
1984 let mut evaluator =
1985 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
1986 .with_visible_rows(visible_rows);
1987
1988 let mut row_values = Vec::new();
1990 for item in select_items {
1991 match item {
1992 SelectItem::Expression { expr, .. } => {
1993 let value = evaluator.evaluate(expr, 0)?;
1996 row_values.push(value);
1997 }
1998 _ => unreachable!("Should only have expressions in aggregate-only query"),
1999 }
2000 }
2001
2002 result_table
2004 .add_row(DataRow::new(row_values))
2005 .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
2006
2007 Ok(DataView::new(Arc::new(result_table)))
2008 }
2009
2010 fn column_matches_table(col: &DataColumn, table_name: &str) -> bool {
2022 if let Some(ref source) = col.source_table {
2024 if source == table_name || source.ends_with(&format!(".{}", table_name)) {
2026 return true;
2027 }
2028 }
2029
2030 if let Some(ref qualified) = col.qualified_name {
2032 if qualified.starts_with(&format!("{}.", table_name)) {
2034 return true;
2035 }
2036 }
2037
2038 false
2039 }
2040
2041 fn resolve_select_columns(
2043 &self,
2044 table: &DataTable,
2045 select_items: &[SelectItem],
2046 ) -> Result<Vec<usize>> {
2047 let mut indices = Vec::new();
2048 let table_columns = table.column_names();
2049
2050 for item in select_items {
2051 match item {
2052 SelectItem::Column {
2053 column: col_ref, ..
2054 } => {
2055 let index = if let Some(table_prefix) = &col_ref.table_prefix {
2057 let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
2059 table.find_column_by_qualified_name(&qualified_name)
2060 .ok_or_else(|| {
2061 let has_qualified = table.columns.iter()
2063 .any(|c| c.qualified_name.is_some());
2064 if !has_qualified {
2065 anyhow::anyhow!(
2066 "Column '{}' not found. Note: Table '{}' may not support qualified column names",
2067 qualified_name, table_prefix
2068 )
2069 } else {
2070 anyhow::anyhow!("Column '{}' not found", qualified_name)
2071 }
2072 })?
2073 } else {
2074 table_columns
2076 .iter()
2077 .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
2078 .ok_or_else(|| {
2079 let suggestion = self.find_similar_column(table, &col_ref.name);
2080 match suggestion {
2081 Some(similar) => anyhow::anyhow!(
2082 "Column '{}' not found. Did you mean '{}'?",
2083 col_ref.name,
2084 similar
2085 ),
2086 None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
2087 }
2088 })?
2089 };
2090 indices.push(index);
2091 }
2092 SelectItem::Star { table_prefix, .. } => {
2093 if let Some(prefix) = table_prefix {
2094 for (i, col) in table.columns.iter().enumerate() {
2096 if Self::column_matches_table(col, prefix) {
2097 indices.push(i);
2098 }
2099 }
2100 } else {
2101 for i in 0..table_columns.len() {
2103 indices.push(i);
2104 }
2105 }
2106 }
2107 SelectItem::StarExclude {
2108 table_prefix,
2109 excluded_columns,
2110 ..
2111 } => {
2112 if let Some(prefix) = table_prefix {
2114 for (i, col) in table.columns.iter().enumerate() {
2116 if Self::column_matches_table(col, prefix)
2117 && !excluded_columns.contains(&col.name)
2118 {
2119 indices.push(i);
2120 }
2121 }
2122 } else {
2123 for (i, col_name) in table_columns.iter().enumerate() {
2125 if !excluded_columns
2126 .iter()
2127 .any(|exc| exc.eq_ignore_ascii_case(col_name))
2128 {
2129 indices.push(i);
2130 }
2131 }
2132 }
2133 }
2134 SelectItem::Expression { .. } => {
2135 return Err(anyhow::anyhow!(
2136 "Computed expressions require new table creation"
2137 ));
2138 }
2139 }
2140 }
2141
2142 Ok(indices)
2143 }
2144
2145 fn apply_distinct(&self, view: DataView) -> Result<DataView> {
2147 use std::collections::HashSet;
2148
2149 let source = view.source();
2150 let visible_cols = view.visible_column_indices();
2151 let visible_rows = view.visible_row_indices();
2152
2153 let mut seen_rows = HashSet::new();
2155 let mut unique_row_indices = Vec::new();
2156
2157 for &row_idx in visible_rows {
2158 let mut row_key = Vec::new();
2160 for &col_idx in visible_cols {
2161 let value = source
2162 .get_value(row_idx, col_idx)
2163 .ok_or_else(|| anyhow!("Invalid cell reference"))?;
2164 row_key.push(format!("{:?}", value));
2166 }
2167
2168 if seen_rows.insert(row_key) {
2170 unique_row_indices.push(row_idx);
2172 }
2173 }
2174
2175 Ok(view.with_rows(unique_row_indices))
2177 }
2178
2179 fn apply_multi_order_by(
2181 &self,
2182 view: DataView,
2183 order_by_columns: &[OrderByItem],
2184 ) -> Result<DataView> {
2185 self.apply_multi_order_by_with_context(view, order_by_columns, None)
2186 }
2187
2188 fn apply_multi_order_by_with_context(
2190 &self,
2191 mut view: DataView,
2192 order_by_columns: &[OrderByItem],
2193 _exec_context: Option<&ExecutionContext>,
2194 ) -> Result<DataView> {
2195 let mut sort_columns = Vec::new();
2197
2198 for order_col in order_by_columns {
2199 let column_name = match &order_col.expr {
2201 SqlExpression::Column(col_ref) => col_ref.name.clone(),
2202 _ => {
2203 return Err(anyhow!(
2205 "ORDER BY expressions not yet supported - only simple columns allowed"
2206 ));
2207 }
2208 };
2209
2210 let col_index = if column_name.contains('.') {
2212 if let Some(dot_pos) = column_name.rfind('.') {
2214 let col_name = &column_name[dot_pos + 1..];
2215
2216 debug!(
2219 "ORDER BY: Extracting unqualified column '{}' from '{}'",
2220 col_name, column_name
2221 );
2222 view.source().get_column_index(col_name)
2223 } else {
2224 view.source().get_column_index(&column_name)
2225 }
2226 } else {
2227 view.source().get_column_index(&column_name)
2229 }
2230 .ok_or_else(|| {
2231 let suggestion = self.find_similar_column(view.source(), &column_name);
2233 match suggestion {
2234 Some(similar) => anyhow::anyhow!(
2235 "Column '{}' not found. Did you mean '{}'?",
2236 column_name,
2237 similar
2238 ),
2239 None => {
2240 let available_cols = view.source().column_names().join(", ");
2242 anyhow::anyhow!(
2243 "Column '{}' not found. Available columns: {}",
2244 column_name,
2245 available_cols
2246 )
2247 }
2248 }
2249 })?;
2250
2251 let ascending = matches!(order_col.direction, SortDirection::Asc);
2252 sort_columns.push((col_index, ascending));
2253 }
2254
2255 view.apply_multi_sort(&sort_columns)?;
2257 Ok(view)
2258 }
2259
2260 fn apply_group_by(
2262 &self,
2263 view: DataView,
2264 group_by_exprs: &[SqlExpression],
2265 select_items: &[SelectItem],
2266 having: Option<&SqlExpression>,
2267 plan: &mut ExecutionPlanBuilder,
2268 ) -> Result<DataView> {
2269 let (result_view, phase_info) = self.apply_group_by_expressions(
2271 view,
2272 group_by_exprs,
2273 select_items,
2274 having,
2275 self.case_insensitive,
2276 self.date_notation.clone(),
2277 )?;
2278
2279 plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
2281 plan.add_detail(format!(
2282 "Phase 1 - Group Building: {:.3}ms",
2283 phase_info.phase2_key_building.as_secs_f64() * 1000.0
2284 ));
2285 plan.add_detail(format!(
2286 " • Processing {} rows into {} groups",
2287 phase_info.total_rows, phase_info.num_groups
2288 ));
2289 plan.add_detail(format!(
2290 "Phase 2 - Aggregation: {:.3}ms",
2291 phase_info.phase4_aggregation.as_secs_f64() * 1000.0
2292 ));
2293 if phase_info.phase4_having_evaluation > Duration::ZERO {
2294 plan.add_detail(format!(
2295 "Phase 3 - HAVING Filter: {:.3}ms",
2296 phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
2297 ));
2298 plan.add_detail(format!(
2299 " • Filtered {} groups",
2300 phase_info.groups_filtered_by_having
2301 ));
2302 }
2303 plan.add_detail(format!(
2304 "Total GROUP BY time: {:.3}ms",
2305 phase_info.total_time.as_secs_f64() * 1000.0
2306 ));
2307
2308 Ok(result_view)
2309 }
2310
2311 pub fn estimate_group_cardinality(
2314 &self,
2315 view: &DataView,
2316 group_by_exprs: &[SqlExpression],
2317 ) -> usize {
2318 let row_count = view.get_visible_rows().len();
2320 if row_count <= 100 {
2321 return row_count;
2322 }
2323
2324 let sample_size = min(1000, row_count / 10).max(100);
2326 let mut seen = FxHashSet::default();
2327
2328 let visible_rows = view.get_visible_rows();
2329 for (i, &row_idx) in visible_rows.iter().enumerate() {
2330 if i >= sample_size {
2331 break;
2332 }
2333
2334 let mut key_values = Vec::new();
2336 for expr in group_by_exprs {
2337 let mut evaluator = ArithmeticEvaluator::new(view.source());
2338 let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
2339 key_values.push(value);
2340 }
2341
2342 seen.insert(key_values);
2343 }
2344
2345 let sample_cardinality = seen.len();
2347 let estimated = (sample_cardinality * row_count) / sample_size;
2348
2349 estimated.min(row_count).max(sample_cardinality)
2351 }
2352}
2353
2354#[cfg(test)]
2355mod tests {
2356 use super::*;
2357 use crate::data::datatable::{DataColumn, DataRow, DataValue};
2358
2359 fn create_test_table() -> Arc<DataTable> {
2360 let mut table = DataTable::new("test");
2361
2362 table.add_column(DataColumn::new("id"));
2364 table.add_column(DataColumn::new("name"));
2365 table.add_column(DataColumn::new("age"));
2366
2367 table
2369 .add_row(DataRow::new(vec![
2370 DataValue::Integer(1),
2371 DataValue::String("Alice".to_string()),
2372 DataValue::Integer(30),
2373 ]))
2374 .unwrap();
2375
2376 table
2377 .add_row(DataRow::new(vec![
2378 DataValue::Integer(2),
2379 DataValue::String("Bob".to_string()),
2380 DataValue::Integer(25),
2381 ]))
2382 .unwrap();
2383
2384 table
2385 .add_row(DataRow::new(vec![
2386 DataValue::Integer(3),
2387 DataValue::String("Charlie".to_string()),
2388 DataValue::Integer(35),
2389 ]))
2390 .unwrap();
2391
2392 Arc::new(table)
2393 }
2394
/// `SELECT *` returns every row and every column of the fixture.
#[test]
fn test_select_all() {
    let fixture = create_test_table();
    let engine = QueryEngine::new();

    let everything = engine.execute(fixture, "SELECT * FROM users").unwrap();

    assert_eq!(everything.row_count(), 3);
    assert_eq!(everything.column_count(), 3);
}
2406
/// Selecting two named columns keeps all rows but narrows the view to two
/// columns.
#[test]
fn test_select_columns() {
    let fixture = create_test_table();
    let engine = QueryEngine::new();

    let projected = engine
        .execute(fixture, "SELECT name, age FROM users")
        .unwrap();

    assert_eq!(projected.row_count(), 3);
    assert_eq!(projected.column_count(), 2);
}
2418
/// `LIMIT 2` on the three-row fixture yields exactly two rows.
#[test]
fn test_select_with_limit() {
    let fixture = create_test_table();
    let engine = QueryEngine::new();

    let limited = engine
        .execute(fixture, "SELECT * FROM users LIMIT 2")
        .unwrap();

    assert_eq!(limited.row_count(), 2);
}
2429
/// `.Contains(...)` in a WHERE clause works on a string column with a
/// differently-cased needle and on a float column.
#[test]
fn test_type_coercion_contains() {
    // The init result is discarded, so a failed initialisation does not fail
    // the test (presumably this happens when a subscriber is already set).
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut orders = DataTable::new("test");
    for column in ["id", "status", "price"] {
        orders.add_column(DataColumn::new(column));
    }

    let fixture_rows = [
        (1, "Pending", 99.99),
        (2, "Confirmed", 150.50),
        (3, "Pending", 75.00),
    ];
    for (id, status, price) in fixture_rows {
        let row = DataRow::new(vec![
            DataValue::Integer(id),
            DataValue::String(status.to_string()),
            DataValue::Float(price),
        ]);
        orders.add_row(row).unwrap();
    }

    let table = Arc::new(orders);
    let engine = QueryEngine::new();

    println!("\n=== Testing WHERE clause with Contains ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let status = table.get_value(i, 1);
        println!("Row {i}: status = {status:?}");
    }

    println!("\n--- Test 1: status.Contains('pend') ---");
    let pending = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE status.Contains('pend')",
        )
        .unwrap_or_else(|e| panic!("Query failed: {e}"));
    println!("SUCCESS: Found {} matching rows", pending.row_count());
    assert_eq!(pending.row_count(), 2);

    println!("\n--- Test 2: price.Contains('9') ---");
    let with_nine = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE price.Contains('9')",
        )
        .unwrap_or_else(|e| panic!("Numeric coercion query failed: {e}"));
    println!(
        "SUCCESS: Found {} matching rows with price containing '9'",
        with_nine.row_count()
    );
    assert!(with_nine.row_count() >= 1);

    println!("\n=== All tests passed! ===");
}
2515
2516 #[test]
2517 fn test_not_in_clause() {
2518 let _ = tracing_subscriber::fmt()
2520 .with_max_level(tracing::Level::DEBUG)
2521 .try_init();
2522
2523 let mut table = DataTable::new("test");
2524 table.add_column(DataColumn::new("id"));
2525 table.add_column(DataColumn::new("country"));
2526
2527 table
2529 .add_row(DataRow::new(vec![
2530 DataValue::Integer(1),
2531 DataValue::String("CA".to_string()),
2532 ]))
2533 .unwrap();
2534
2535 table
2536 .add_row(DataRow::new(vec![
2537 DataValue::Integer(2),
2538 DataValue::String("US".to_string()),
2539 ]))
2540 .unwrap();
2541
2542 table
2543 .add_row(DataRow::new(vec![
2544 DataValue::Integer(3),
2545 DataValue::String("UK".to_string()),
2546 ]))
2547 .unwrap();
2548
2549 let table = Arc::new(table);
2550 let engine = QueryEngine::new();
2551
2552 println!("\n=== Testing NOT IN clause ===");
2553 println!("Table has {} rows", table.row_count());
2554 for i in 0..table.row_count() {
2555 let country = table.get_value(i, 1);
2556 println!("Row {i}: country = {country:?}");
2557 }
2558
2559 println!("\n--- Test: country NOT IN ('CA') ---");
2561 let result = engine.execute(
2562 table.clone(),
2563 "SELECT * FROM test WHERE country NOT IN ('CA')",
2564 );
2565 match result {
2566 Ok(view) => {
2567 println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
2568 assert_eq!(view.row_count(), 2); }
2570 Err(e) => {
2571 panic!("NOT IN query failed: {e}");
2572 }
2573 }
2574
2575 println!("\n=== NOT IN test complete! ===");
2576 }
2577
2578 #[test]
2579 fn test_case_insensitive_in_and_not_in() {
2580 let _ = tracing_subscriber::fmt()
2582 .with_max_level(tracing::Level::DEBUG)
2583 .try_init();
2584
2585 let mut table = DataTable::new("test");
2586 table.add_column(DataColumn::new("id"));
2587 table.add_column(DataColumn::new("country"));
2588
2589 table
2591 .add_row(DataRow::new(vec![
2592 DataValue::Integer(1),
2593 DataValue::String("CA".to_string()), ]))
2595 .unwrap();
2596
2597 table
2598 .add_row(DataRow::new(vec![
2599 DataValue::Integer(2),
2600 DataValue::String("us".to_string()), ]))
2602 .unwrap();
2603
2604 table
2605 .add_row(DataRow::new(vec![
2606 DataValue::Integer(3),
2607 DataValue::String("UK".to_string()), ]))
2609 .unwrap();
2610
2611 let table = Arc::new(table);
2612
2613 println!("\n=== Testing Case-Insensitive IN clause ===");
2614 println!("Table has {} rows", table.row_count());
2615 for i in 0..table.row_count() {
2616 let country = table.get_value(i, 1);
2617 println!("Row {i}: country = {country:?}");
2618 }
2619
2620 println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
2622 let engine = QueryEngine::with_case_insensitive(true);
2623 let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
2624 match result {
2625 Ok(view) => {
2626 println!(
2627 "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
2628 view.row_count()
2629 );
2630 assert_eq!(view.row_count(), 1); }
2632 Err(e) => {
2633 panic!("Case-insensitive IN query failed: {e}");
2634 }
2635 }
2636
2637 println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
2639 let result = engine.execute(
2640 table.clone(),
2641 "SELECT * FROM test WHERE country NOT IN ('ca')",
2642 );
2643 match result {
2644 Ok(view) => {
2645 println!(
2646 "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
2647 view.row_count()
2648 );
2649 assert_eq!(view.row_count(), 2); }
2651 Err(e) => {
2652 panic!("Case-insensitive NOT IN query failed: {e}");
2653 }
2654 }
2655
2656 println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
2658 let engine_case_sensitive = QueryEngine::new(); let result = engine_case_sensitive
2660 .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
2661 match result {
2662 Ok(view) => {
2663 println!(
2664 "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
2665 view.row_count()
2666 );
2667 assert_eq!(view.row_count(), 0); }
2669 Err(e) => {
2670 panic!("Case-sensitive IN query failed: {e}");
2671 }
2672 }
2673
2674 println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
2675 }
2676
2677 #[test]
2678 #[ignore = "Parentheses in WHERE clause not yet implemented"]
2679 fn test_parentheses_in_where_clause() {
2680 let _ = tracing_subscriber::fmt()
2682 .with_max_level(tracing::Level::DEBUG)
2683 .try_init();
2684
2685 let mut table = DataTable::new("test");
2686 table.add_column(DataColumn::new("id"));
2687 table.add_column(DataColumn::new("status"));
2688 table.add_column(DataColumn::new("priority"));
2689
2690 table
2692 .add_row(DataRow::new(vec![
2693 DataValue::Integer(1),
2694 DataValue::String("Pending".to_string()),
2695 DataValue::String("High".to_string()),
2696 ]))
2697 .unwrap();
2698
2699 table
2700 .add_row(DataRow::new(vec![
2701 DataValue::Integer(2),
2702 DataValue::String("Complete".to_string()),
2703 DataValue::String("High".to_string()),
2704 ]))
2705 .unwrap();
2706
2707 table
2708 .add_row(DataRow::new(vec![
2709 DataValue::Integer(3),
2710 DataValue::String("Pending".to_string()),
2711 DataValue::String("Low".to_string()),
2712 ]))
2713 .unwrap();
2714
2715 table
2716 .add_row(DataRow::new(vec![
2717 DataValue::Integer(4),
2718 DataValue::String("Complete".to_string()),
2719 DataValue::String("Low".to_string()),
2720 ]))
2721 .unwrap();
2722
2723 let table = Arc::new(table);
2724 let engine = QueryEngine::new();
2725
2726 println!("\n=== Testing Parentheses in WHERE clause ===");
2727 println!("Table has {} rows", table.row_count());
2728 for i in 0..table.row_count() {
2729 let status = table.get_value(i, 1);
2730 let priority = table.get_value(i, 2);
2731 println!("Row {i}: status = {status:?}, priority = {priority:?}");
2732 }
2733
2734 println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
2736 let result = engine.execute(
2737 table.clone(),
2738 "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
2739 );
2740 match result {
2741 Ok(view) => {
2742 println!(
2743 "SUCCESS: Found {} rows with parenthetical logic",
2744 view.row_count()
2745 );
2746 assert_eq!(view.row_count(), 2); }
2748 Err(e) => {
2749 panic!("Parentheses query failed: {e}");
2750 }
2751 }
2752
2753 println!("\n=== Parentheses test complete! ===");
2754 }
2755
2756 #[test]
2757 #[ignore = "Numeric type coercion needs fixing"]
2758 fn test_numeric_type_coercion() {
2759 let _ = tracing_subscriber::fmt()
2761 .with_max_level(tracing::Level::DEBUG)
2762 .try_init();
2763
2764 let mut table = DataTable::new("test");
2765 table.add_column(DataColumn::new("id"));
2766 table.add_column(DataColumn::new("price"));
2767 table.add_column(DataColumn::new("quantity"));
2768
2769 table
2771 .add_row(DataRow::new(vec![
2772 DataValue::Integer(1),
2773 DataValue::Float(99.50), DataValue::Integer(100),
2775 ]))
2776 .unwrap();
2777
2778 table
2779 .add_row(DataRow::new(vec![
2780 DataValue::Integer(2),
2781 DataValue::Float(150.0), DataValue::Integer(200),
2783 ]))
2784 .unwrap();
2785
2786 table
2787 .add_row(DataRow::new(vec![
2788 DataValue::Integer(3),
2789 DataValue::Integer(75), DataValue::Integer(50),
2791 ]))
2792 .unwrap();
2793
2794 let table = Arc::new(table);
2795 let engine = QueryEngine::new();
2796
2797 println!("\n=== Testing Numeric Type Coercion ===");
2798 println!("Table has {} rows", table.row_count());
2799 for i in 0..table.row_count() {
2800 let price = table.get_value(i, 1);
2801 let quantity = table.get_value(i, 2);
2802 println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
2803 }
2804
2805 println!("\n--- Test: price.Contains('.') ---");
2807 let result = engine.execute(
2808 table.clone(),
2809 "SELECT * FROM test WHERE price.Contains('.')",
2810 );
2811 match result {
2812 Ok(view) => {
2813 println!(
2814 "SUCCESS: Found {} rows with decimal points in price",
2815 view.row_count()
2816 );
2817 assert_eq!(view.row_count(), 2); }
2819 Err(e) => {
2820 panic!("Numeric Contains query failed: {e}");
2821 }
2822 }
2823
2824 println!("\n--- Test: quantity.Contains('0') ---");
2826 let result = engine.execute(
2827 table.clone(),
2828 "SELECT * FROM test WHERE quantity.Contains('0')",
2829 );
2830 match result {
2831 Ok(view) => {
2832 println!(
2833 "SUCCESS: Found {} rows with '0' in quantity",
2834 view.row_count()
2835 );
2836 assert_eq!(view.row_count(), 2); }
2838 Err(e) => {
2839 panic!("Integer Contains query failed: {e}");
2840 }
2841 }
2842
2843 println!("\n=== Numeric type coercion test complete! ===");
2844 }
2845
2846 #[test]
2847 fn test_datetime_comparisons() {
2848 let _ = tracing_subscriber::fmt()
2850 .with_max_level(tracing::Level::DEBUG)
2851 .try_init();
2852
2853 let mut table = DataTable::new("test");
2854 table.add_column(DataColumn::new("id"));
2855 table.add_column(DataColumn::new("created_date"));
2856
2857 table
2859 .add_row(DataRow::new(vec![
2860 DataValue::Integer(1),
2861 DataValue::String("2024-12-15".to_string()),
2862 ]))
2863 .unwrap();
2864
2865 table
2866 .add_row(DataRow::new(vec![
2867 DataValue::Integer(2),
2868 DataValue::String("2025-01-15".to_string()),
2869 ]))
2870 .unwrap();
2871
2872 table
2873 .add_row(DataRow::new(vec![
2874 DataValue::Integer(3),
2875 DataValue::String("2025-02-15".to_string()),
2876 ]))
2877 .unwrap();
2878
2879 let table = Arc::new(table);
2880 let engine = QueryEngine::new();
2881
2882 println!("\n=== Testing DateTime Comparisons ===");
2883 println!("Table has {} rows", table.row_count());
2884 for i in 0..table.row_count() {
2885 let date = table.get_value(i, 1);
2886 println!("Row {i}: created_date = {date:?}");
2887 }
2888
2889 println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
2891 let result = engine.execute(
2892 table.clone(),
2893 "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
2894 );
2895 match result {
2896 Ok(view) => {
2897 println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
2898 assert_eq!(view.row_count(), 2); }
2900 Err(e) => {
2901 panic!("DateTime comparison query failed: {e}");
2902 }
2903 }
2904
2905 println!("\n=== DateTime comparison test complete! ===");
2906 }
2907
2908 #[test]
2909 fn test_not_with_method_calls() {
2910 let _ = tracing_subscriber::fmt()
2912 .with_max_level(tracing::Level::DEBUG)
2913 .try_init();
2914
2915 let mut table = DataTable::new("test");
2916 table.add_column(DataColumn::new("id"));
2917 table.add_column(DataColumn::new("status"));
2918
2919 table
2921 .add_row(DataRow::new(vec![
2922 DataValue::Integer(1),
2923 DataValue::String("Pending Review".to_string()),
2924 ]))
2925 .unwrap();
2926
2927 table
2928 .add_row(DataRow::new(vec![
2929 DataValue::Integer(2),
2930 DataValue::String("Complete".to_string()),
2931 ]))
2932 .unwrap();
2933
2934 table
2935 .add_row(DataRow::new(vec![
2936 DataValue::Integer(3),
2937 DataValue::String("Pending Approval".to_string()),
2938 ]))
2939 .unwrap();
2940
2941 let table = Arc::new(table);
2942 let engine = QueryEngine::with_case_insensitive(true);
2943
2944 println!("\n=== Testing NOT with Method Calls ===");
2945 println!("Table has {} rows", table.row_count());
2946 for i in 0..table.row_count() {
2947 let status = table.get_value(i, 1);
2948 println!("Row {i}: status = {status:?}");
2949 }
2950
2951 println!("\n--- Test: NOT status.Contains('pend') ---");
2953 let result = engine.execute(
2954 table.clone(),
2955 "SELECT * FROM test WHERE NOT status.Contains('pend')",
2956 );
2957 match result {
2958 Ok(view) => {
2959 println!(
2960 "SUCCESS: Found {} rows NOT containing 'pend'",
2961 view.row_count()
2962 );
2963 assert_eq!(view.row_count(), 1); }
2965 Err(e) => {
2966 panic!("NOT Contains query failed: {e}");
2967 }
2968 }
2969
2970 println!("\n--- Test: NOT status.StartsWith('Pending') ---");
2972 let result = engine.execute(
2973 table.clone(),
2974 "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
2975 );
2976 match result {
2977 Ok(view) => {
2978 println!(
2979 "SUCCESS: Found {} rows NOT starting with 'Pending'",
2980 view.row_count()
2981 );
2982 assert_eq!(view.row_count(), 1); }
2984 Err(e) => {
2985 panic!("NOT StartsWith query failed: {e}");
2986 }
2987 }
2988
2989 println!("\n=== NOT with method calls test complete! ===");
2990 }
2991
2992 #[test]
2993 #[ignore = "Complex logical expressions with parentheses not yet implemented"]
2994 fn test_complex_logical_expressions() {
2995 let _ = tracing_subscriber::fmt()
2997 .with_max_level(tracing::Level::DEBUG)
2998 .try_init();
2999
3000 let mut table = DataTable::new("test");
3001 table.add_column(DataColumn::new("id"));
3002 table.add_column(DataColumn::new("status"));
3003 table.add_column(DataColumn::new("priority"));
3004 table.add_column(DataColumn::new("assigned"));
3005
3006 table
3008 .add_row(DataRow::new(vec![
3009 DataValue::Integer(1),
3010 DataValue::String("Pending".to_string()),
3011 DataValue::String("High".to_string()),
3012 DataValue::String("John".to_string()),
3013 ]))
3014 .unwrap();
3015
3016 table
3017 .add_row(DataRow::new(vec![
3018 DataValue::Integer(2),
3019 DataValue::String("Complete".to_string()),
3020 DataValue::String("High".to_string()),
3021 DataValue::String("Jane".to_string()),
3022 ]))
3023 .unwrap();
3024
3025 table
3026 .add_row(DataRow::new(vec![
3027 DataValue::Integer(3),
3028 DataValue::String("Pending".to_string()),
3029 DataValue::String("Low".to_string()),
3030 DataValue::String("John".to_string()),
3031 ]))
3032 .unwrap();
3033
3034 table
3035 .add_row(DataRow::new(vec![
3036 DataValue::Integer(4),
3037 DataValue::String("In Progress".to_string()),
3038 DataValue::String("Medium".to_string()),
3039 DataValue::String("Jane".to_string()),
3040 ]))
3041 .unwrap();
3042
3043 let table = Arc::new(table);
3044 let engine = QueryEngine::new();
3045
3046 println!("\n=== Testing Complex Logical Expressions ===");
3047 println!("Table has {} rows", table.row_count());
3048 for i in 0..table.row_count() {
3049 let status = table.get_value(i, 1);
3050 let priority = table.get_value(i, 2);
3051 let assigned = table.get_value(i, 3);
3052 println!(
3053 "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
3054 );
3055 }
3056
3057 println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
3059 let result = engine.execute(
3060 table.clone(),
3061 "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
3062 );
3063 match result {
3064 Ok(view) => {
3065 println!(
3066 "SUCCESS: Found {} rows with complex logic",
3067 view.row_count()
3068 );
3069 assert_eq!(view.row_count(), 2); }
3071 Err(e) => {
3072 panic!("Complex logic query failed: {e}");
3073 }
3074 }
3075
3076 println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
3078 let result = engine.execute(
3079 table.clone(),
3080 "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
3081 );
3082 match result {
3083 Ok(view) => {
3084 println!(
3085 "SUCCESS: Found {} rows with NOT complex logic",
3086 view.row_count()
3087 );
3088 assert_eq!(view.row_count(), 2); }
3090 Err(e) => {
3091 panic!("NOT complex logic query failed: {e}");
3092 }
3093 }
3094
3095 println!("\n=== Complex logical expressions test complete! ===");
3096 }
3097
3098 #[test]
3099 fn test_mixed_data_types_and_edge_cases() {
3100 let _ = tracing_subscriber::fmt()
3102 .with_max_level(tracing::Level::DEBUG)
3103 .try_init();
3104
3105 let mut table = DataTable::new("test");
3106 table.add_column(DataColumn::new("id"));
3107 table.add_column(DataColumn::new("value"));
3108 table.add_column(DataColumn::new("nullable_field"));
3109
3110 table
3112 .add_row(DataRow::new(vec![
3113 DataValue::Integer(1),
3114 DataValue::String("123.45".to_string()),
3115 DataValue::String("present".to_string()),
3116 ]))
3117 .unwrap();
3118
3119 table
3120 .add_row(DataRow::new(vec![
3121 DataValue::Integer(2),
3122 DataValue::Float(678.90),
3123 DataValue::Null,
3124 ]))
3125 .unwrap();
3126
3127 table
3128 .add_row(DataRow::new(vec![
3129 DataValue::Integer(3),
3130 DataValue::Boolean(true),
3131 DataValue::String("also present".to_string()),
3132 ]))
3133 .unwrap();
3134
3135 table
3136 .add_row(DataRow::new(vec![
3137 DataValue::Integer(4),
3138 DataValue::String("false".to_string()),
3139 DataValue::Null,
3140 ]))
3141 .unwrap();
3142
3143 let table = Arc::new(table);
3144 let engine = QueryEngine::new();
3145
3146 println!("\n=== Testing Mixed Data Types and Edge Cases ===");
3147 println!("Table has {} rows", table.row_count());
3148 for i in 0..table.row_count() {
3149 let value = table.get_value(i, 1);
3150 let nullable = table.get_value(i, 2);
3151 println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
3152 }
3153
3154 println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
3156 let result = engine.execute(
3157 table.clone(),
3158 "SELECT * FROM test WHERE value.Contains('true')",
3159 );
3160 match result {
3161 Ok(view) => {
3162 println!(
3163 "SUCCESS: Found {} rows with boolean coercion",
3164 view.row_count()
3165 );
3166 assert_eq!(view.row_count(), 1); }
3168 Err(e) => {
3169 panic!("Boolean coercion query failed: {e}");
3170 }
3171 }
3172
3173 println!("\n--- Test: id IN (1, 3) ---");
3175 let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
3176 match result {
3177 Ok(view) => {
3178 println!("SUCCESS: Found {} rows with IN clause", view.row_count());
3179 assert_eq!(view.row_count(), 2); }
3181 Err(e) => {
3182 panic!("Multiple IN values query failed: {e}");
3183 }
3184 }
3185
3186 println!("\n=== Mixed data types test complete! ===");
3187 }
3188
3189 #[test]
3191 fn test_aggregate_only_single_row() {
3192 let table = create_test_stock_data();
3193 let engine = QueryEngine::new();
3194
3195 let result = engine
3197 .execute(
3198 table.clone(),
3199 "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
3200 )
3201 .expect("Query should succeed");
3202
3203 assert_eq!(
3204 result.row_count(),
3205 1,
3206 "Aggregate-only query should return exactly 1 row"
3207 );
3208 assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
3209
3210 let source = result.source();
3212 let row = source.get_row(0).expect("Should have first row");
3213
3214 assert_eq!(row.values[0], DataValue::Integer(5));
3216
3217 assert_eq!(row.values[1], DataValue::Float(99.5));
3219
3220 assert_eq!(row.values[2], DataValue::Float(105.0));
3222
3223 if let DataValue::Float(avg) = &row.values[3] {
3225 assert!(
3226 (avg - 102.4).abs() < 0.01,
3227 "Average should be approximately 102.4, got {}",
3228 avg
3229 );
3230 } else {
3231 panic!("AVG should return a Float value");
3232 }
3233 }
3234
3235 #[test]
3237 fn test_single_aggregate_single_row() {
3238 let table = create_test_stock_data();
3239 let engine = QueryEngine::new();
3240
3241 let result = engine
3242 .execute(table.clone(), "SELECT COUNT(*) FROM stock")
3243 .expect("Query should succeed");
3244
3245 assert_eq!(
3246 result.row_count(),
3247 1,
3248 "Single aggregate query should return exactly 1 row"
3249 );
3250 assert_eq!(result.column_count(), 1, "Should have 1 column");
3251
3252 let source = result.source();
3253 let row = source.get_row(0).expect("Should have first row");
3254 assert_eq!(row.values[0], DataValue::Integer(5));
3255 }
3256
3257 #[test]
3259 fn test_aggregate_with_where_single_row() {
3260 let table = create_test_stock_data();
3261 let engine = QueryEngine::new();
3262
3263 let result = engine
3265 .execute(
3266 table.clone(),
3267 "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
3268 )
3269 .expect("Query should succeed");
3270
3271 assert_eq!(
3272 result.row_count(),
3273 1,
3274 "Filtered aggregate query should return exactly 1 row"
3275 );
3276 assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
3277
3278 let source = result.source();
3279 let row = source.get_row(0).expect("Should have first row");
3280
3281 assert_eq!(row.values[0], DataValue::Integer(2));
3283 assert_eq!(row.values[1], DataValue::Float(103.5)); assert_eq!(row.values[2], DataValue::Float(105.0)); }
3286
3287 #[test]
3288 fn test_not_in_parsing() {
3289 use crate::sql::recursive_parser::Parser;
3290
3291 let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
3292 println!("\n=== Testing NOT IN parsing ===");
3293 println!("Parsing query: {query}");
3294
3295 let mut parser = Parser::new(query);
3296 match parser.parse() {
3297 Ok(statement) => {
3298 println!("Parsed statement: {statement:#?}");
3299 if let Some(where_clause) = statement.where_clause {
3300 println!("WHERE conditions: {:#?}", where_clause.conditions);
3301 if let Some(first_condition) = where_clause.conditions.first() {
3302 println!("First condition expression: {:#?}", first_condition.expr);
3303 }
3304 }
3305 }
3306 Err(e) => {
3307 panic!("Parse error: {e}");
3308 }
3309 }
3310 }
3311
3312 fn create_test_stock_data() -> Arc<DataTable> {
3314 let mut table = DataTable::new("stock");
3315
3316 table.add_column(DataColumn::new("symbol"));
3317 table.add_column(DataColumn::new("close"));
3318 table.add_column(DataColumn::new("volume"));
3319
3320 let test_data = vec![
3322 ("AAPL", 99.5, 1000),
3323 ("AAPL", 101.2, 1500),
3324 ("AAPL", 103.5, 2000),
3325 ("AAPL", 105.0, 1200),
3326 ("AAPL", 102.8, 1800),
3327 ];
3328
3329 for (symbol, close, volume) in test_data {
3330 table
3331 .add_row(DataRow::new(vec![
3332 DataValue::String(symbol.to_string()),
3333 DataValue::Float(close),
3334 DataValue::Integer(volume),
3335 ]))
3336 .expect("Should add row successfully");
3337 }
3338
3339 Arc::new(table)
3340 }
3341}
3342
// Test-only module whose source is loaded from the file named in `#[path]`
// rather than from the default module location.
#[cfg(test)]
#[path = "query_engine_tests.rs"]
mod query_engine_tests;