1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::row_expanders::RowExpanderRegistry;
19use crate::data::subquery_executor::SubqueryExecutor;
20use crate::data::temp_table_registry::TempTableRegistry;
21use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
22use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
23use crate::sql::parser::ast::ColumnRef;
24use crate::sql::parser::ast::SetOperation;
25use crate::sql::parser::ast::TableSource;
26use crate::sql::parser::ast::WindowSpec;
27use crate::sql::recursive_parser::{
28 CTEType, OrderByItem, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
29 TableFunction,
30};
31
/// Per-query state that remembers which table name each alias stands for.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    /// Maps an alias (key) to the table name it was registered for (value).
    alias_map: HashMap<String, String>,
}
39
40impl ExecutionContext {
41 pub fn new() -> Self {
43 Self {
44 alias_map: HashMap::new(),
45 }
46 }
47
48 pub fn register_alias(&mut self, alias: String, table_name: String) {
50 debug!("Registering alias: {} -> {}", alias, table_name);
51 self.alias_map.insert(alias, table_name);
52 }
53
54 pub fn resolve_alias(&self, name: &str) -> String {
57 self.alias_map
58 .get(name)
59 .cloned()
60 .unwrap_or_else(|| name.to_string())
61 }
62
63 pub fn is_alias(&self, name: &str) -> bool {
65 self.alias_map.contains_key(name)
66 }
67
68 pub fn get_aliases(&self) -> HashMap<String, String> {
70 self.alias_map.clone()
71 }
72
73 pub fn resolve_column_index(&self, table: &DataTable, column_ref: &ColumnRef) -> Result<usize> {
88 if let Some(table_prefix) = &column_ref.table_prefix {
89 let actual_table = self.resolve_alias(table_prefix);
91
92 let qualified_name = format!("{}.{}", actual_table, column_ref.name);
94 if let Some(idx) = table.find_column_by_qualified_name(&qualified_name) {
95 debug!(
96 "Resolved {}.{} -> qualified column '{}' at index {}",
97 table_prefix, column_ref.name, qualified_name, idx
98 );
99 return Ok(idx);
100 }
101
102 if let Some(idx) = table.get_column_index(&column_ref.name) {
104 debug!(
105 "Resolved {}.{} -> unqualified column '{}' at index {}",
106 table_prefix, column_ref.name, column_ref.name, idx
107 );
108 return Ok(idx);
109 }
110
111 Err(anyhow!(
113 "Column '{}' not found. Table '{}' may not support qualified column names",
114 qualified_name,
115 actual_table
116 ))
117 } else {
118 if let Some(idx) = table.get_column_index(&column_ref.name) {
120 debug!(
121 "Resolved unqualified column '{}' at index {}",
122 column_ref.name, idx
123 );
124 return Ok(idx);
125 }
126
127 if column_ref.name.contains('.') {
129 if let Some(idx) = table.find_column_by_qualified_name(&column_ref.name) {
130 debug!(
131 "Resolved '{}' as qualified column at index {}",
132 column_ref.name, idx
133 );
134 return Ok(idx);
135 }
136 }
137
138 let suggestion = self.find_similar_column(table, &column_ref.name);
140 match suggestion {
141 Some(similar) => Err(anyhow!(
142 "Column '{}' not found. Did you mean '{}'?",
143 column_ref.name,
144 similar
145 )),
146 None => Err(anyhow!("Column '{}' not found", column_ref.name)),
147 }
148 }
149 }
150
151 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
153 let columns = table.column_names();
154 let mut best_match: Option<(String, usize)> = None;
155
156 for col in columns {
157 let distance = edit_distance(name, &col);
158 if distance <= 2 {
159 match best_match {
161 Some((_, best_dist)) if distance < best_dist => {
162 best_match = Some((col.clone(), distance));
163 }
164 None => {
165 best_match = Some((col.clone(), distance));
166 }
167 _ => {}
168 }
169 }
170 }
171
172 best_match.map(|(name, _)| name)
173 }
174}
175
176impl Default for ExecutionContext {
177 fn default() -> Self {
178 Self::new()
179 }
180}
181
/// Computes the Levenshtein distance between `a` and `b`, counting
/// insertions, deletions and substitutions of `char`s (not bytes).
fn edit_distance(a: &str, b: &str) -> usize {
    let source: Vec<char> = a.chars().collect();
    let target: Vec<char> = b.chars().collect();

    // Transforming to or from the empty string costs one edit per character.
    if source.is_empty() {
        return target.len();
    }
    if target.is_empty() {
        return source.len();
    }

    // Only the previous and the current row of the DP table are needed.
    let mut previous: Vec<usize> = (0..=target.len()).collect();
    let mut current = vec![0usize; target.len() + 1];

    for (row, &from) in source.iter().enumerate() {
        current[0] = row + 1;
        for (col, &to) in target.iter().enumerate() {
            let substitution = previous[col] + usize::from(from != to);
            let deletion = previous[col + 1] + 1;
            let insertion = current[col] + 1;
            current[col + 1] = min(min(deletion, insertion), substitution);
        }
        std::mem::swap(&mut previous, &mut current);
    }

    // After the final swap `previous` holds the last completed row.
    previous[target.len()]
}
222
223#[derive(Clone)]
225pub struct QueryEngine {
226 case_insensitive: bool,
227 date_notation: String,
228 _behavior_config: Option<BehaviorConfig>,
229}
230
231impl Default for QueryEngine {
232 fn default() -> Self {
233 Self::new()
234 }
235}
236
237impl QueryEngine {
238 #[must_use]
239 pub fn new() -> Self {
240 Self {
241 case_insensitive: false,
242 date_notation: get_date_notation(),
243 _behavior_config: None,
244 }
245 }
246
247 #[must_use]
248 pub fn with_behavior_config(config: BehaviorConfig) -> Self {
249 let case_insensitive = config.case_insensitive_default;
250 let date_notation = get_date_notation();
252 Self {
253 case_insensitive,
254 date_notation,
255 _behavior_config: Some(config),
256 }
257 }
258
259 #[must_use]
260 pub fn with_date_notation(_date_notation: String) -> Self {
261 Self {
262 case_insensitive: false,
263 date_notation: get_date_notation(), _behavior_config: None,
265 }
266 }
267
268 #[must_use]
269 pub fn with_case_insensitive(case_insensitive: bool) -> Self {
270 Self {
271 case_insensitive,
272 date_notation: get_date_notation(),
273 _behavior_config: None,
274 }
275 }
276
277 #[must_use]
278 pub fn with_case_insensitive_and_date_notation(
279 case_insensitive: bool,
280 _date_notation: String, ) -> Self {
282 Self {
283 case_insensitive,
284 date_notation: get_date_notation(), _behavior_config: None,
286 }
287 }
288
289 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
291 let columns = table.column_names();
292 let mut best_match: Option<(String, usize)> = None;
293
294 for col in columns {
295 let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
296 let max_distance = if name.len() > 10 { 3 } else { 2 };
299 if distance <= max_distance {
300 match &best_match {
301 None => best_match = Some((col, distance)),
302 Some((_, best_dist)) if distance < *best_dist => {
303 best_match = Some((col, distance));
304 }
305 _ => {}
306 }
307 }
308 }
309
310 best_match.map(|(name, _)| name)
311 }
312
313 fn edit_distance(&self, s1: &str, s2: &str) -> usize {
315 let len1 = s1.len();
316 let len2 = s2.len();
317 let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
318
319 for i in 0..=len1 {
320 matrix[i][0] = i;
321 }
322 for j in 0..=len2 {
323 matrix[0][j] = j;
324 }
325
326 for (i, c1) in s1.chars().enumerate() {
327 for (j, c2) in s2.chars().enumerate() {
328 let cost = usize::from(c1 != c2);
329 matrix[i + 1][j + 1] = std::cmp::min(
330 matrix[i][j + 1] + 1, std::cmp::min(
332 matrix[i + 1][j] + 1, matrix[i][j] + cost, ),
335 );
336 }
337 }
338
339 matrix[len1][len2]
340 }
341
342 fn contains_unnest(expr: &SqlExpression) -> bool {
344 match expr {
345 SqlExpression::Unnest { .. } => true,
347 SqlExpression::FunctionCall { name, args, .. } => {
348 if name.to_uppercase() == "UNNEST" {
349 return true;
350 }
351 args.iter().any(Self::contains_unnest)
353 }
354 SqlExpression::BinaryOp { left, right, .. } => {
355 Self::contains_unnest(left) || Self::contains_unnest(right)
356 }
357 SqlExpression::Not { expr } => Self::contains_unnest(expr),
358 SqlExpression::CaseExpression {
359 when_branches,
360 else_branch,
361 } => {
362 when_branches.iter().any(|branch| {
363 Self::contains_unnest(&branch.condition)
364 || Self::contains_unnest(&branch.result)
365 }) || else_branch
366 .as_ref()
367 .map_or(false, |e| Self::contains_unnest(e))
368 }
369 SqlExpression::SimpleCaseExpression {
370 expr,
371 when_branches,
372 else_branch,
373 } => {
374 Self::contains_unnest(expr)
375 || when_branches.iter().any(|branch| {
376 Self::contains_unnest(&branch.value)
377 || Self::contains_unnest(&branch.result)
378 })
379 || else_branch
380 .as_ref()
381 .map_or(false, |e| Self::contains_unnest(e))
382 }
383 SqlExpression::InList { expr, values } => {
384 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
385 }
386 SqlExpression::NotInList { expr, values } => {
387 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
388 }
389 SqlExpression::Between { expr, lower, upper } => {
390 Self::contains_unnest(expr)
391 || Self::contains_unnest(lower)
392 || Self::contains_unnest(upper)
393 }
394 SqlExpression::InSubquery { expr, .. } => Self::contains_unnest(expr),
395 SqlExpression::NotInSubquery { expr, .. } => Self::contains_unnest(expr),
396 SqlExpression::ScalarSubquery { .. } => false, SqlExpression::WindowFunction { args, .. } => args.iter().any(Self::contains_unnest),
398 SqlExpression::MethodCall { args, .. } => args.iter().any(Self::contains_unnest),
399 SqlExpression::ChainedMethodCall { base, args, .. } => {
400 Self::contains_unnest(base) || args.iter().any(Self::contains_unnest)
401 }
402 _ => false,
403 }
404 }
405
406 fn collect_window_specs(expr: &SqlExpression, specs: &mut Vec<WindowSpec>) {
408 match expr {
409 SqlExpression::WindowFunction {
410 window_spec, args, ..
411 } => {
412 specs.push(window_spec.clone());
414 for arg in args {
416 Self::collect_window_specs(arg, specs);
417 }
418 }
419 SqlExpression::BinaryOp { left, right, .. } => {
420 Self::collect_window_specs(left, specs);
421 Self::collect_window_specs(right, specs);
422 }
423 SqlExpression::Not { expr } => {
424 Self::collect_window_specs(expr, specs);
425 }
426 SqlExpression::FunctionCall { args, .. } => {
427 for arg in args {
428 Self::collect_window_specs(arg, specs);
429 }
430 }
431 SqlExpression::CaseExpression {
432 when_branches,
433 else_branch,
434 } => {
435 for branch in when_branches {
436 Self::collect_window_specs(&branch.condition, specs);
437 Self::collect_window_specs(&branch.result, specs);
438 }
439 if let Some(else_expr) = else_branch {
440 Self::collect_window_specs(else_expr, specs);
441 }
442 }
443 SqlExpression::SimpleCaseExpression {
444 expr,
445 when_branches,
446 else_branch,
447 } => {
448 Self::collect_window_specs(expr, specs);
449 for branch in when_branches {
450 Self::collect_window_specs(&branch.value, specs);
451 Self::collect_window_specs(&branch.result, specs);
452 }
453 if let Some(else_expr) = else_branch {
454 Self::collect_window_specs(else_expr, specs);
455 }
456 }
457 SqlExpression::InList { expr, values, .. } => {
458 Self::collect_window_specs(expr, specs);
459 for item in values {
460 Self::collect_window_specs(item, specs);
461 }
462 }
463 SqlExpression::ChainedMethodCall { base, args, .. } => {
464 Self::collect_window_specs(base, specs);
465 for arg in args {
466 Self::collect_window_specs(arg, specs);
467 }
468 }
469 SqlExpression::Column(_)
471 | SqlExpression::NumberLiteral(_)
472 | SqlExpression::StringLiteral(_)
473 | SqlExpression::BooleanLiteral(_)
474 | SqlExpression::Null
475 | SqlExpression::DateTimeToday { .. }
476 | SqlExpression::DateTimeConstructor { .. }
477 | SqlExpression::MethodCall { .. } => {}
478 _ => {}
480 }
481 }
482
483 fn contains_window_function(expr: &SqlExpression) -> bool {
485 match expr {
486 SqlExpression::WindowFunction { .. } => true,
487 SqlExpression::BinaryOp { left, right, .. } => {
488 Self::contains_window_function(left) || Self::contains_window_function(right)
489 }
490 SqlExpression::Not { expr } => Self::contains_window_function(expr),
491 SqlExpression::FunctionCall { args, .. } => {
492 args.iter().any(Self::contains_window_function)
493 }
494 SqlExpression::CaseExpression {
495 when_branches,
496 else_branch,
497 } => {
498 when_branches.iter().any(|branch| {
499 Self::contains_window_function(&branch.condition)
500 || Self::contains_window_function(&branch.result)
501 }) || else_branch
502 .as_ref()
503 .map_or(false, |e| Self::contains_window_function(e))
504 }
505 SqlExpression::SimpleCaseExpression {
506 expr,
507 when_branches,
508 else_branch,
509 } => {
510 Self::contains_window_function(expr)
511 || when_branches.iter().any(|branch| {
512 Self::contains_window_function(&branch.value)
513 || Self::contains_window_function(&branch.result)
514 })
515 || else_branch
516 .as_ref()
517 .map_or(false, |e| Self::contains_window_function(e))
518 }
519 SqlExpression::InList { expr, values } => {
520 Self::contains_window_function(expr)
521 || values.iter().any(Self::contains_window_function)
522 }
523 SqlExpression::NotInList { expr, values } => {
524 Self::contains_window_function(expr)
525 || values.iter().any(Self::contains_window_function)
526 }
527 SqlExpression::Between { expr, lower, upper } => {
528 Self::contains_window_function(expr)
529 || Self::contains_window_function(lower)
530 || Self::contains_window_function(upper)
531 }
532 SqlExpression::InSubquery { expr, .. } => Self::contains_window_function(expr),
533 SqlExpression::NotInSubquery { expr, .. } => Self::contains_window_function(expr),
534 SqlExpression::MethodCall { args, .. } => {
535 args.iter().any(Self::contains_window_function)
536 }
537 SqlExpression::ChainedMethodCall { base, args, .. } => {
538 Self::contains_window_function(base)
539 || args.iter().any(Self::contains_window_function)
540 }
541 _ => false,
542 }
543 }
544
545 fn extract_window_specs(
547 items: &[SelectItem],
548 ) -> Vec<crate::data::batch_window_evaluator::WindowFunctionSpec> {
549 let mut specs = Vec::new();
550 for (idx, item) in items.iter().enumerate() {
551 if let SelectItem::Expression { expr, .. } = item {
552 Self::collect_window_function_specs(expr, idx, &mut specs);
553 }
554 }
555 specs
556 }
557
558 fn collect_window_function_specs(
560 expr: &SqlExpression,
561 output_column_index: usize,
562 specs: &mut Vec<crate::data::batch_window_evaluator::WindowFunctionSpec>,
563 ) {
564 match expr {
565 SqlExpression::WindowFunction {
566 name,
567 args,
568 window_spec,
569 } => {
570 specs.push(crate::data::batch_window_evaluator::WindowFunctionSpec {
571 spec: window_spec.clone(),
572 function_name: name.clone(),
573 args: args.clone(),
574 output_column_index,
575 });
576 }
577 SqlExpression::BinaryOp { left, right, .. } => {
578 Self::collect_window_function_specs(left, output_column_index, specs);
579 Self::collect_window_function_specs(right, output_column_index, specs);
580 }
581 SqlExpression::Not { expr } => {
582 Self::collect_window_function_specs(expr, output_column_index, specs);
583 }
584 SqlExpression::FunctionCall { args, .. } => {
585 for arg in args {
586 Self::collect_window_function_specs(arg, output_column_index, specs);
587 }
588 }
589 SqlExpression::CaseExpression {
590 when_branches,
591 else_branch,
592 } => {
593 for branch in when_branches {
594 Self::collect_window_function_specs(
595 &branch.condition,
596 output_column_index,
597 specs,
598 );
599 Self::collect_window_function_specs(&branch.result, output_column_index, specs);
600 }
601 if let Some(e) = else_branch {
602 Self::collect_window_function_specs(e, output_column_index, specs);
603 }
604 }
605 SqlExpression::SimpleCaseExpression {
606 expr,
607 when_branches,
608 else_branch,
609 } => {
610 Self::collect_window_function_specs(expr, output_column_index, specs);
611 for branch in when_branches {
612 Self::collect_window_function_specs(&branch.value, output_column_index, specs);
613 Self::collect_window_function_specs(&branch.result, output_column_index, specs);
614 }
615 if let Some(e) = else_branch {
616 Self::collect_window_function_specs(e, output_column_index, specs);
617 }
618 }
619 SqlExpression::InList { expr, values } => {
620 Self::collect_window_function_specs(expr, output_column_index, specs);
621 for val in values {
622 Self::collect_window_function_specs(val, output_column_index, specs);
623 }
624 }
625 SqlExpression::NotInList { expr, values } => {
626 Self::collect_window_function_specs(expr, output_column_index, specs);
627 for val in values {
628 Self::collect_window_function_specs(val, output_column_index, specs);
629 }
630 }
631 SqlExpression::Between { expr, lower, upper } => {
632 Self::collect_window_function_specs(expr, output_column_index, specs);
633 Self::collect_window_function_specs(lower, output_column_index, specs);
634 Self::collect_window_function_specs(upper, output_column_index, specs);
635 }
636 SqlExpression::InSubquery { expr, .. } => {
637 Self::collect_window_function_specs(expr, output_column_index, specs);
638 }
639 SqlExpression::NotInSubquery { expr, .. } => {
640 Self::collect_window_function_specs(expr, output_column_index, specs);
641 }
642 SqlExpression::MethodCall { args, .. } => {
643 for arg in args {
644 Self::collect_window_function_specs(arg, output_column_index, specs);
645 }
646 }
647 SqlExpression::ChainedMethodCall { base, args, .. } => {
648 Self::collect_window_function_specs(base, output_column_index, specs);
649 for arg in args {
650 Self::collect_window_function_specs(arg, output_column_index, specs);
651 }
652 }
653 _ => {} }
655 }
656
657 pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
659 let (view, _plan) = self.execute_with_plan(table, sql)?;
660 Ok(view)
661 }
662
663 pub fn execute_with_temp_tables(
665 &self,
666 table: Arc<DataTable>,
667 sql: &str,
668 temp_tables: Option<&TempTableRegistry>,
669 ) -> Result<DataView> {
670 let (view, _plan) = self.execute_with_plan_and_temp_tables(table, sql, temp_tables)?;
671 Ok(view)
672 }
673
674 pub fn execute_statement(
676 &self,
677 table: Arc<DataTable>,
678 statement: SelectStatement,
679 ) -> Result<DataView> {
680 self.execute_statement_with_temp_tables(table, statement, None)
681 }
682
683 pub fn execute_statement_with_temp_tables(
685 &self,
686 table: Arc<DataTable>,
687 statement: SelectStatement,
688 temp_tables: Option<&TempTableRegistry>,
689 ) -> Result<DataView> {
690 let mut cte_context = HashMap::new();
692
693 if let Some(temp_registry) = temp_tables {
695 for table_name in temp_registry.list_tables() {
696 if let Some(temp_table) = temp_registry.get(&table_name) {
697 debug!("Adding temp table {} to CTE context", table_name);
698 let view = DataView::new(temp_table);
699 cte_context.insert(table_name, Arc::new(view));
700 }
701 }
702 }
703
704 for cte in &statement.ctes {
705 debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
706 let cte_result = match &cte.cte_type {
708 CTEType::Standard(query) => {
709 let view = self.build_view_with_context(
711 table.clone(),
712 query.clone(),
713 &mut cte_context,
714 )?;
715
716 let mut materialized = self.materialize_view(view)?;
718
719 for column in materialized.columns_mut() {
721 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
722 column.source_table = Some(cte.name.clone());
723 }
724
725 DataView::new(Arc::new(materialized))
726 }
727 CTEType::Web(web_spec) => {
728 use crate::web::http_fetcher::WebDataFetcher;
730
731 let fetcher = WebDataFetcher::new()?;
732 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
734
735 for column in data_table.columns_mut() {
737 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
738 column.source_table = Some(cte.name.clone());
739 }
740
741 DataView::new(Arc::new(data_table))
743 }
744 CTEType::File(file_spec) => {
745 let mut data_table =
746 crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
747
748 for column in data_table.columns_mut() {
749 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
750 column.source_table = Some(cte.name.clone());
751 }
752
753 DataView::new(Arc::new(data_table))
754 }
755 };
756 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
758 debug!(
759 "QueryEngine: CTE '{}' pre-processed, stored in context",
760 cte.name
761 );
762 }
763
764 let mut subquery_executor =
766 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
767 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
768
769 self.build_view_with_context(table, processed_statement, &mut cte_context)
771 }
772
773 pub fn execute_statement_with_cte_context(
775 &self,
776 table: Arc<DataTable>,
777 statement: SelectStatement,
778 cte_context: &HashMap<String, Arc<DataView>>,
779 ) -> Result<DataView> {
780 let mut local_context = cte_context.clone();
782
783 for cte in &statement.ctes {
785 debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
786 let cte_result = match &cte.cte_type {
787 CTEType::Standard(query) => {
788 let view = self.build_view_with_context(
789 table.clone(),
790 query.clone(),
791 &mut local_context,
792 )?;
793
794 let mut materialized = self.materialize_view(view)?;
796
797 for column in materialized.columns_mut() {
799 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
800 column.source_table = Some(cte.name.clone());
801 }
802
803 DataView::new(Arc::new(materialized))
804 }
805 CTEType::Web(web_spec) => {
806 use crate::web::http_fetcher::WebDataFetcher;
808
809 let fetcher = WebDataFetcher::new()?;
810 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
812
813 for column in data_table.columns_mut() {
815 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
816 column.source_table = Some(cte.name.clone());
817 }
818
819 DataView::new(Arc::new(data_table))
821 }
822 CTEType::File(file_spec) => {
823 let mut data_table =
824 crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
825
826 for column in data_table.columns_mut() {
827 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
828 column.source_table = Some(cte.name.clone());
829 }
830
831 DataView::new(Arc::new(data_table))
832 }
833 };
834 local_context.insert(cte.name.clone(), Arc::new(cte_result));
835 }
836
837 let mut subquery_executor =
839 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
840 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
841
842 self.build_view_with_context(table, processed_statement, &mut local_context)
844 }
845
846 pub fn execute_with_plan(
848 &self,
849 table: Arc<DataTable>,
850 sql: &str,
851 ) -> Result<(DataView, ExecutionPlan)> {
852 self.execute_with_plan_and_temp_tables(table, sql, None)
853 }
854
855 pub fn execute_with_plan_and_temp_tables(
857 &self,
858 table: Arc<DataTable>,
859 sql: &str,
860 temp_tables: Option<&TempTableRegistry>,
861 ) -> Result<(DataView, ExecutionPlan)> {
862 let mut plan_builder = ExecutionPlanBuilder::new();
863 let start_time = Instant::now();
864
865 plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
867 plan_builder.add_detail(format!("Query: {}", sql));
868 let mut parser = Parser::new(sql);
869 let statement = parser
870 .parse()
871 .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
872 plan_builder.add_detail(format!("Parsed successfully"));
873 if let Some(ref from_source) = statement.from_source {
874 match from_source {
875 TableSource::Table(name) => {
876 plan_builder.add_detail(format!("FROM: {}", name));
877 }
878 TableSource::DerivedTable { alias, .. } => {
879 plan_builder.add_detail(format!("FROM: derived table (alias: {})", alias));
880 }
881 TableSource::Pivot { .. } => {
882 plan_builder.add_detail("FROM: PIVOT".to_string());
883 }
884 }
885 }
886 if statement.where_clause.is_some() {
887 plan_builder.add_detail("WHERE clause present".to_string());
888 }
889 plan_builder.end_step();
890
891 let mut cte_context = HashMap::new();
893
894 if let Some(temp_registry) = temp_tables {
896 for table_name in temp_registry.list_tables() {
897 if let Some(temp_table) = temp_registry.get(&table_name) {
898 debug!("Adding temp table {} to CTE context", table_name);
899 let view = DataView::new(temp_table);
900 cte_context.insert(table_name, Arc::new(view));
901 }
902 }
903 }
904
905 if !statement.ctes.is_empty() {
906 plan_builder.begin_step(
907 StepType::CTE,
908 format!("Process {} CTEs", statement.ctes.len()),
909 );
910
911 for cte in &statement.ctes {
912 let cte_start = Instant::now();
913 plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
914
915 let cte_result = match &cte.cte_type {
916 CTEType::Standard(query) => {
917 if let Some(ref from_source) = query.from_source {
919 match from_source {
920 TableSource::Table(name) => {
921 plan_builder.add_detail(format!("Source: {}", name));
922 }
923 TableSource::DerivedTable { alias, .. } => {
924 plan_builder
925 .add_detail(format!("Source: derived table ({})", alias));
926 }
927 TableSource::Pivot { .. } => {
928 plan_builder.add_detail("Source: PIVOT".to_string());
929 }
930 }
931 }
932 if query.where_clause.is_some() {
933 plan_builder.add_detail("Has WHERE clause".to_string());
934 }
935 if query.group_by.is_some() {
936 plan_builder.add_detail("Has GROUP BY".to_string());
937 }
938
939 debug!(
940 "QueryEngine: Processing CTE '{}' with existing context: {:?}",
941 cte.name,
942 cte_context.keys().collect::<Vec<_>>()
943 );
944
945 let mut subquery_executor = SubqueryExecutor::with_cte_context(
948 self.clone(),
949 table.clone(),
950 cte_context.clone(),
951 );
952 let processed_query = subquery_executor.execute_subqueries(query)?;
953
954 let view = self.build_view_with_context(
955 table.clone(),
956 processed_query,
957 &mut cte_context,
958 )?;
959
960 let mut materialized = self.materialize_view(view)?;
962
963 for column in materialized.columns_mut() {
965 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
966 column.source_table = Some(cte.name.clone());
967 }
968
969 DataView::new(Arc::new(materialized))
970 }
971 CTEType::Web(web_spec) => {
972 plan_builder.add_detail(format!("URL: {}", web_spec.url));
973 if let Some(format) = &web_spec.format {
974 plan_builder.add_detail(format!("Format: {:?}", format));
975 }
976 if let Some(cache) = web_spec.cache_seconds {
977 plan_builder.add_detail(format!("Cache: {} seconds", cache));
978 }
979
980 use crate::web::http_fetcher::WebDataFetcher;
982
983 let fetcher = WebDataFetcher::new()?;
984 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
986
987 for column in data_table.columns_mut() {
989 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
990 column.source_table = Some(cte.name.clone());
991 }
992
993 DataView::new(Arc::new(data_table))
995 }
996 CTEType::File(file_spec) => {
997 plan_builder.add_detail(format!("PATH: {}", file_spec.path));
998 if file_spec.recursive {
999 plan_builder.add_detail("RECURSIVE".to_string());
1000 }
1001 if let Some(ref g) = file_spec.glob {
1002 plan_builder.add_detail(format!("GLOB: {}", g));
1003 }
1004 if let Some(d) = file_spec.max_depth {
1005 plan_builder.add_detail(format!("MAX_DEPTH: {}", d));
1006 }
1007
1008 let mut data_table =
1009 crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
1010
1011 for column in data_table.columns_mut() {
1012 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1013 column.source_table = Some(cte.name.clone());
1014 }
1015
1016 DataView::new(Arc::new(data_table))
1017 }
1018 };
1019
1020 plan_builder.set_rows_out(cte_result.row_count());
1022 plan_builder.add_detail(format!(
1023 "Result: {} rows, {} columns",
1024 cte_result.row_count(),
1025 cte_result.column_count()
1026 ));
1027 plan_builder.add_detail(format!(
1028 "Execution time: {:.3}ms",
1029 cte_start.elapsed().as_secs_f64() * 1000.0
1030 ));
1031
1032 debug!(
1033 "QueryEngine: Storing CTE '{}' in context with {} rows",
1034 cte.name,
1035 cte_result.row_count()
1036 );
1037 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1038 plan_builder.end_step();
1039 }
1040
1041 plan_builder.add_detail(format!(
1042 "All {} CTEs cached in context",
1043 statement.ctes.len()
1044 ));
1045 plan_builder.end_step();
1046 }
1047
1048 plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
1050 let mut subquery_executor =
1051 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
1052
1053 let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
1055 format!("{:?}", w).contains("Subquery")
1057 });
1058
1059 if has_subqueries {
1060 plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
1061 }
1062
1063 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
1064
1065 if has_subqueries {
1066 plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
1067 } else {
1068 plan_builder.add_detail("No subqueries to process".to_string());
1069 }
1070
1071 plan_builder.end_step();
1072 let result = self.build_view_with_context_and_plan(
1073 table,
1074 processed_statement,
1075 &mut cte_context,
1076 &mut plan_builder,
1077 )?;
1078
1079 let total_duration = start_time.elapsed();
1080 info!(
1081 "Query execution complete: total={:?}, rows={}",
1082 total_duration,
1083 result.row_count()
1084 );
1085
1086 let plan = plan_builder.build();
1087 Ok((result, plan))
1088 }
1089
1090 fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
1092 let mut cte_context = HashMap::new();
1093 self.build_view_with_context(table, statement, &mut cte_context)
1094 }
1095
1096 fn build_view_with_context(
1098 &self,
1099 table: Arc<DataTable>,
1100 statement: SelectStatement,
1101 cte_context: &mut HashMap<String, Arc<DataView>>,
1102 ) -> Result<DataView> {
1103 let mut dummy_plan = ExecutionPlanBuilder::new();
1104 let mut exec_context = ExecutionContext::new();
1105 self.build_view_with_context_and_plan_and_exec(
1106 table,
1107 statement,
1108 cte_context,
1109 &mut dummy_plan,
1110 &mut exec_context,
1111 )
1112 }
1113
1114 fn build_view_with_context_and_plan(
1116 &self,
1117 table: Arc<DataTable>,
1118 statement: SelectStatement,
1119 cte_context: &mut HashMap<String, Arc<DataView>>,
1120 plan: &mut ExecutionPlanBuilder,
1121 ) -> Result<DataView> {
1122 let mut exec_context = ExecutionContext::new();
1123 self.build_view_with_context_and_plan_and_exec(
1124 table,
1125 statement,
1126 cte_context,
1127 plan,
1128 &mut exec_context,
1129 )
1130 }
1131
1132 fn build_view_with_context_and_plan_and_exec(
1134 &self,
1135 table: Arc<DataTable>,
1136 statement: SelectStatement,
1137 cte_context: &mut HashMap<String, Arc<DataView>>,
1138 plan: &mut ExecutionPlanBuilder,
1139 exec_context: &mut ExecutionContext,
1140 ) -> Result<DataView> {
1141 for cte in &statement.ctes {
1143 if cte_context.contains_key(&cte.name) {
1145 debug!(
1146 "QueryEngine: CTE '{}' already in context, skipping",
1147 cte.name
1148 );
1149 continue;
1150 }
1151
1152 debug!("QueryEngine: Processing CTE '{}'...", cte.name);
1153 debug!(
1154 "QueryEngine: Available CTEs for '{}': {:?}",
1155 cte.name,
1156 cte_context.keys().collect::<Vec<_>>()
1157 );
1158
1159 let cte_result = match &cte.cte_type {
1161 CTEType::Standard(query) => {
1162 let view =
1163 self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
1164
1165 let mut materialized = self.materialize_view(view)?;
1167
1168 for column in materialized.columns_mut() {
1170 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1171 column.source_table = Some(cte.name.clone());
1172 }
1173
1174 DataView::new(Arc::new(materialized))
1175 }
1176 CTEType::Web(_web_spec) => {
1177 return Err(anyhow!(
1179 "Web CTEs should be processed in execute_select method"
1180 ));
1181 }
1182 CTEType::File(_file_spec) => {
1183 return Err(anyhow!(
1185 "FILE CTEs should be processed in execute_select method"
1186 ));
1187 }
1188 };
1189
1190 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1192 debug!(
1193 "QueryEngine: CTE '{}' processed, stored in context",
1194 cte.name
1195 );
1196 }
1197
1198 let source_table = if let Some(ref from_source) = statement.from_source {
1200 match from_source {
1201 TableSource::Table(table_name) => {
1202 if let Some(cte_view) = cte_context.get(table_name) {
1204 debug!("QueryEngine: Using CTE '{}' as source table", table_name);
1205 let mut materialized = self.materialize_view((**cte_view).clone())?;
1207
1208 #[allow(deprecated)]
1210 if let Some(ref alias) = statement.from_alias {
1211 debug!(
1212 "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1213 alias, table_name
1214 );
1215 for column in materialized.columns_mut() {
1216 if let Some(ref qualified_name) = column.qualified_name {
1218 if qualified_name.starts_with(&format!("{}.", table_name)) {
1219 column.qualified_name = Some(qualified_name.replace(
1220 &format!("{}.", table_name),
1221 &format!("{}.", alias),
1222 ));
1223 }
1224 }
1225 if column.source_table.as_ref() == Some(table_name) {
1227 column.source_table = Some(alias.clone());
1228 }
1229 }
1230 }
1231
1232 Arc::new(materialized)
1233 } else {
1234 table.clone()
1236 }
1237 }
1238 TableSource::DerivedTable { query, alias } => {
1239 debug!(
1241 "QueryEngine: Processing FROM derived table (alias: {})",
1242 alias
1243 );
1244 let subquery_result =
1245 self.build_view_with_context(table.clone(), *query.clone(), cte_context)?;
1246
1247 let mut materialized = self.materialize_view(subquery_result)?;
1250
1251 for column in materialized.columns_mut() {
1255 column.source_table = Some(alias.clone());
1256 }
1257
1258 Arc::new(materialized)
1259 }
1260 TableSource::Pivot { .. } => {
1261 return Err(anyhow!(
1263 "PIVOT in FROM clause should have been expanded by preprocessing pipeline"
1264 ));
1265 }
1266 }
1267 } else {
1268 #[allow(deprecated)]
1270 if let Some(ref table_func) = statement.from_function {
1271 debug!("QueryEngine: Processing table function (deprecated field)...");
1273 match table_func {
1274 TableFunction::Generator { name, args } => {
1275 use crate::sql::generators::GeneratorRegistry;
1277
1278 let registry = GeneratorRegistry::new();
1280
1281 if let Some(generator) = registry.get(name) {
1282 let mut evaluator = ArithmeticEvaluator::with_date_notation(
1284 &table,
1285 self.date_notation.clone(),
1286 );
1287 let dummy_row = 0;
1288
1289 let mut evaluated_args = Vec::new();
1290 for arg in args {
1291 evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
1292 }
1293
1294 generator.generate(evaluated_args)?
1296 } else {
1297 return Err(anyhow!("Unknown generator function: {}", name));
1298 }
1299 }
1300 }
1301 } else {
1302 #[allow(deprecated)]
1303 if let Some(ref subquery) = statement.from_subquery {
1304 debug!("QueryEngine: Processing FROM subquery (deprecated field)...");
1306 let subquery_result = self.build_view_with_context(
1307 table.clone(),
1308 *subquery.clone(),
1309 cte_context,
1310 )?;
1311
1312 let materialized = self.materialize_view(subquery_result)?;
1315 Arc::new(materialized)
1316 } else {
1317 #[allow(deprecated)]
1318 if let Some(ref table_name) = statement.from_table {
1319 if let Some(cte_view) = cte_context.get(table_name) {
1321 debug!(
1322 "QueryEngine: Using CTE '{}' as source table (deprecated field)",
1323 table_name
1324 );
1325 let mut materialized = self.materialize_view((**cte_view).clone())?;
1327
1328 #[allow(deprecated)]
1330 if let Some(ref alias) = statement.from_alias {
1331 debug!(
1332 "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1333 alias, table_name
1334 );
1335 for column in materialized.columns_mut() {
1336 if let Some(ref qualified_name) = column.qualified_name {
1338 if qualified_name.starts_with(&format!("{}.", table_name)) {
1339 column.qualified_name = Some(qualified_name.replace(
1340 &format!("{}.", table_name),
1341 &format!("{}.", alias),
1342 ));
1343 }
1344 }
1345 if column.source_table.as_ref() == Some(table_name) {
1347 column.source_table = Some(alias.clone());
1348 }
1349 }
1350 }
1351
1352 Arc::new(materialized)
1353 } else {
1354 table.clone()
1356 }
1357 } else {
1358 table.clone()
1360 }
1361 }
1362 }
1363 };
1364
1365 #[allow(deprecated)]
1367 if let Some(ref alias) = statement.from_alias {
1368 #[allow(deprecated)]
1369 if let Some(ref table_name) = statement.from_table {
1370 exec_context.register_alias(alias.clone(), table_name.clone());
1371 }
1372 }
1373
1374 let final_table = if !statement.joins.is_empty() {
1376 plan.begin_step(
1377 StepType::Join,
1378 format!("Process {} JOINs", statement.joins.len()),
1379 );
1380 plan.set_rows_in(source_table.row_count());
1381
1382 let join_executor = HashJoinExecutor::new(self.case_insensitive);
1383 let mut current_table = source_table;
1384
1385 for (idx, join_clause) in statement.joins.iter().enumerate() {
1386 let join_start = Instant::now();
1387 plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
1388 plan.add_detail(format!("Type: {:?}", join_clause.join_type));
1389 plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
1390 plan.add_detail(format!(
1391 "Executing {:?} JOIN on {} condition(s)",
1392 join_clause.join_type,
1393 join_clause.condition.conditions.len()
1394 ));
1395
1396 let right_table = match &join_clause.table {
1398 TableSource::Table(name) => {
1399 if let Some(cte_view) = cte_context.get(name) {
1401 let mut materialized = self.materialize_view((**cte_view).clone())?;
1402
1403 if let Some(ref alias) = join_clause.alias {
1405 debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
1406 for column in materialized.columns_mut() {
1407 if let Some(ref qualified_name) = column.qualified_name {
1409 if qualified_name.starts_with(&format!("{}.", name)) {
1410 column.qualified_name = Some(qualified_name.replace(
1411 &format!("{}.", name),
1412 &format!("{}.", alias),
1413 ));
1414 }
1415 }
1416 if column.source_table.as_ref() == Some(name) {
1418 column.source_table = Some(alias.clone());
1419 }
1420 }
1421 }
1422
1423 Arc::new(materialized)
1424 } else {
1425 return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
1428 }
1429 }
1430 TableSource::DerivedTable { query, alias: _ } => {
1431 let subquery_result = self.build_view_with_context(
1433 table.clone(),
1434 *query.clone(),
1435 cte_context,
1436 )?;
1437 let materialized = self.materialize_view(subquery_result)?;
1438 Arc::new(materialized)
1439 }
1440 TableSource::Pivot { .. } => {
1441 return Err(anyhow!("PIVOT in JOIN clause is not yet supported"));
1443 }
1444 };
1445
1446 let joined = join_executor.execute_join(
1448 current_table.clone(),
1449 join_clause,
1450 right_table.clone(),
1451 )?;
1452
1453 plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
1454 plan.set_rows_out(joined.row_count());
1455 plan.add_detail(format!("Result: {} rows", joined.row_count()));
1456 plan.add_detail(format!(
1457 "Join time: {:.3}ms",
1458 join_start.elapsed().as_secs_f64() * 1000.0
1459 ));
1460 plan.end_step();
1461
1462 current_table = Arc::new(joined);
1463 }
1464
1465 plan.set_rows_out(current_table.row_count());
1466 plan.add_detail(format!(
1467 "Final result after all joins: {} rows",
1468 current_table.row_count()
1469 ));
1470 plan.end_step();
1471 current_table
1472 } else {
1473 source_table
1474 };
1475
1476 self.build_view_internal_with_plan_and_exec(
1478 final_table,
1479 statement,
1480 plan,
1481 Some(exec_context),
1482 )
1483 }
1484
1485 pub fn materialize_view(&self, view: DataView) -> Result<DataTable> {
1487 let source = view.source();
1488 let mut result_table = DataTable::new("derived");
1489
1490 let visible_cols = view.visible_column_indices().to_vec();
1492
1493 for col_idx in &visible_cols {
1495 let col = &source.columns[*col_idx];
1496 let new_col = DataColumn {
1497 name: col.name.clone(),
1498 data_type: col.data_type.clone(),
1499 nullable: col.nullable,
1500 unique_values: col.unique_values,
1501 null_count: col.null_count,
1502 metadata: col.metadata.clone(),
1503 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), };
1506 result_table.add_column(new_col);
1507 }
1508
1509 for row_idx in view.visible_row_indices() {
1511 let source_row = &source.rows[*row_idx];
1512 let mut new_row = DataRow { values: Vec::new() };
1513
1514 for col_idx in &visible_cols {
1515 new_row.values.push(source_row.values[*col_idx].clone());
1516 }
1517
1518 result_table.add_row(new_row);
1519 }
1520
1521 Ok(result_table)
1522 }
1523
1524 fn build_view_internal(
1525 &self,
1526 table: Arc<DataTable>,
1527 statement: SelectStatement,
1528 ) -> Result<DataView> {
1529 let mut dummy_plan = ExecutionPlanBuilder::new();
1530 self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
1531 }
1532
1533 fn build_view_internal_with_plan(
1534 &self,
1535 table: Arc<DataTable>,
1536 statement: SelectStatement,
1537 plan: &mut ExecutionPlanBuilder,
1538 ) -> Result<DataView> {
1539 self.build_view_internal_with_plan_and_exec(table, statement, plan, None)
1540 }
1541
1542 fn build_view_internal_with_plan_and_exec(
1543 &self,
1544 table: Arc<DataTable>,
1545 statement: SelectStatement,
1546 plan: &mut ExecutionPlanBuilder,
1547 exec_context: Option<&ExecutionContext>,
1548 ) -> Result<DataView> {
1549 debug!(
1550 "QueryEngine::build_view - select_items: {:?}",
1551 statement.select_items
1552 );
1553 debug!(
1554 "QueryEngine::build_view - where_clause: {:?}",
1555 statement.where_clause
1556 );
1557
1558 let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
1560
1561 if let Some(where_clause) = &statement.where_clause {
1563 let total_rows = table.row_count();
1564 debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
1565 debug!("QueryEngine: WHERE clause = {:?}", where_clause);
1566
1567 plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
1568 plan.set_rows_in(total_rows);
1569 plan.add_detail(format!("Input: {} rows", total_rows));
1570
1571 for condition in &where_clause.conditions {
1573 plan.add_detail(format!("Condition: {:?}", condition.expr));
1574 }
1575
1576 let filter_start = Instant::now();
1577 let mut eval_context = EvaluationContext::new(self.case_insensitive);
1579
1580 let mut evaluator = if let Some(exec_ctx) = exec_context {
1582 RecursiveWhereEvaluator::with_both_contexts(&table, &mut eval_context, exec_ctx)
1584 } else {
1585 RecursiveWhereEvaluator::with_context(&table, &mut eval_context)
1586 };
1587
1588 let mut filtered_rows = Vec::new();
1590 for row_idx in visible_rows {
1591 if row_idx < 3 {
1593 debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
1594 }
1595
1596 match evaluator.evaluate(where_clause, row_idx) {
1597 Ok(result) => {
1598 if row_idx < 3 {
1599 debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
1600 }
1601 if result {
1602 filtered_rows.push(row_idx);
1603 }
1604 }
1605 Err(e) => {
1606 if row_idx < 3 {
1607 debug!(
1608 "QueryEngine: WHERE evaluation error for row {}: {}",
1609 row_idx, e
1610 );
1611 }
1612 return Err(e);
1614 }
1615 }
1616 }
1617
1618 let (compilations, cache_hits) = eval_context.get_stats();
1620 if compilations > 0 || cache_hits > 0 {
1621 debug!(
1622 "LIKE pattern cache: {} compilations, {} cache hits",
1623 compilations, cache_hits
1624 );
1625 }
1626 visible_rows = filtered_rows;
1627 let filter_duration = filter_start.elapsed();
1628 info!(
1629 "WHERE clause filtering: {} rows -> {} rows in {:?}",
1630 total_rows,
1631 visible_rows.len(),
1632 filter_duration
1633 );
1634
1635 plan.set_rows_out(visible_rows.len());
1636 plan.add_detail(format!("Output: {} rows", visible_rows.len()));
1637 plan.add_detail(format!(
1638 "Filter time: {:.3}ms",
1639 filter_duration.as_secs_f64() * 1000.0
1640 ));
1641 plan.end_step();
1642 }
1643
1644 let mut view = DataView::new(table.clone());
1646 view = view.with_rows(visible_rows);
1647
1648 if let Some(group_by_exprs) = &statement.group_by {
1650 if !group_by_exprs.is_empty() {
1651 debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
1652
1653 plan.begin_step(
1654 StepType::GroupBy,
1655 format!("GROUP BY {} expressions", group_by_exprs.len()),
1656 );
1657 plan.set_rows_in(view.row_count());
1658 plan.add_detail(format!("Input: {} rows", view.row_count()));
1659 for expr in group_by_exprs {
1660 plan.add_detail(format!("Group by: {:?}", expr));
1661 }
1662
1663 let group_start = Instant::now();
1664 view = self.apply_group_by(
1665 view,
1666 group_by_exprs,
1667 &statement.select_items,
1668 statement.having.as_ref(),
1669 plan,
1670 )?;
1671
1672 plan.set_rows_out(view.row_count());
1673 plan.add_detail(format!("Output: {} groups", view.row_count()));
1674 plan.add_detail(format!(
1675 "Overall time: {:.3}ms",
1676 group_start.elapsed().as_secs_f64() * 1000.0
1677 ));
1678 plan.end_step();
1679 }
1680 } else {
1681 if !statement.select_items.is_empty() {
1683 let has_non_star_items = statement
1685 .select_items
1686 .iter()
1687 .any(|item| !matches!(item, SelectItem::Star { .. }));
1688
1689 if has_non_star_items || statement.select_items.len() > 1 {
1693 view = self.apply_select_items(
1694 view,
1695 &statement.select_items,
1696 &statement,
1697 exec_context,
1698 plan,
1699 )?;
1700 }
1701 } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
1703 debug!("QueryEngine: Using legacy columns path");
1704 let source_table = view.source();
1707 let column_indices =
1708 self.resolve_column_indices(source_table, &statement.columns)?;
1709 view = view.with_columns(column_indices);
1710 }
1711 }
1712
1713 if statement.distinct {
1715 plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
1716 plan.set_rows_in(view.row_count());
1717 plan.add_detail(format!("Input: {} rows", view.row_count()));
1718
1719 let distinct_start = Instant::now();
1720 view = self.apply_distinct(view)?;
1721
1722 plan.set_rows_out(view.row_count());
1723 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1724 plan.add_detail(format!(
1725 "Distinct time: {:.3}ms",
1726 distinct_start.elapsed().as_secs_f64() * 1000.0
1727 ));
1728 plan.end_step();
1729 }
1730
1731 if let Some(order_by_columns) = &statement.order_by {
1733 if !order_by_columns.is_empty() {
1734 plan.begin_step(
1735 StepType::Sort,
1736 format!("ORDER BY {} columns", order_by_columns.len()),
1737 );
1738 plan.set_rows_in(view.row_count());
1739 for col in order_by_columns {
1740 let expr_str = match &col.expr {
1742 SqlExpression::Column(col_ref) => col_ref.name.clone(),
1743 _ => "expr".to_string(),
1744 };
1745 plan.add_detail(format!("{} {:?}", expr_str, col.direction));
1746 }
1747
1748 let sort_start = Instant::now();
1749 view =
1750 self.apply_multi_order_by_with_context(view, order_by_columns, exec_context)?;
1751
1752 plan.add_detail(format!(
1753 "Sort time: {:.3}ms",
1754 sort_start.elapsed().as_secs_f64() * 1000.0
1755 ));
1756 plan.end_step();
1757 }
1758 }
1759
1760 if let Some(limit) = statement.limit {
1762 let offset = statement.offset.unwrap_or(0);
1763 plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1764 plan.set_rows_in(view.row_count());
1765 if offset > 0 {
1766 plan.add_detail(format!("OFFSET: {}", offset));
1767 }
1768 view = view.with_limit(limit, offset);
1769 plan.set_rows_out(view.row_count());
1770 plan.add_detail(format!("Output: {} rows", view.row_count()));
1771 plan.end_step();
1772 }
1773
1774 if !statement.set_operations.is_empty() {
1776 plan.begin_step(
1777 StepType::SetOperation,
1778 format!("Process {} set operations", statement.set_operations.len()),
1779 );
1780 plan.set_rows_in(view.row_count());
1781
1782 let mut combined_table = self.materialize_view(view)?;
1784 let first_columns = combined_table.column_names();
1785 let first_column_count = first_columns.len();
1786
1787 let mut needs_deduplication = false;
1789
1790 for (idx, (operation, next_statement)) in statement.set_operations.iter().enumerate() {
1792 let op_start = Instant::now();
1793 plan.begin_step(
1794 StepType::SetOperation,
1795 format!("{:?} operation #{}", operation, idx + 1),
1796 );
1797
1798 let next_view = if let Some(exec_ctx) = exec_context {
1801 self.build_view_internal_with_plan_and_exec(
1802 table.clone(),
1803 *next_statement.clone(),
1804 plan,
1805 Some(exec_ctx),
1806 )?
1807 } else {
1808 self.build_view_internal_with_plan(
1809 table.clone(),
1810 *next_statement.clone(),
1811 plan,
1812 )?
1813 };
1814
1815 let next_table = self.materialize_view(next_view)?;
1817 let next_columns = next_table.column_names();
1818 let next_column_count = next_columns.len();
1819
1820 if first_column_count != next_column_count {
1822 return Err(anyhow!(
1823 "UNION queries must have the same number of columns: first query has {} columns, but query #{} has {} columns",
1824 first_column_count,
1825 idx + 2,
1826 next_column_count
1827 ));
1828 }
1829
1830 for (col_idx, (first_col, next_col)) in
1832 first_columns.iter().zip(next_columns.iter()).enumerate()
1833 {
1834 if !first_col.eq_ignore_ascii_case(next_col) {
1835 debug!(
1836 "UNION column name mismatch at position {}: '{}' vs '{}' (using first query's name)",
1837 col_idx + 1,
1838 first_col,
1839 next_col
1840 );
1841 }
1842 }
1843
1844 plan.add_detail(format!("Left: {} rows", combined_table.row_count()));
1845 plan.add_detail(format!("Right: {} rows", next_table.row_count()));
1846
1847 match operation {
1849 SetOperation::UnionAll => {
1850 for row in next_table.rows.iter() {
1852 combined_table.add_row(row.clone());
1853 }
1854 plan.add_detail(format!(
1855 "Result: {} rows (no deduplication)",
1856 combined_table.row_count()
1857 ));
1858 }
1859 SetOperation::Union => {
1860 for row in next_table.rows.iter() {
1862 combined_table.add_row(row.clone());
1863 }
1864 needs_deduplication = true;
1865 plan.add_detail(format!(
1866 "Combined: {} rows (deduplication pending)",
1867 combined_table.row_count()
1868 ));
1869 }
1870 SetOperation::Intersect => {
1871 return Err(anyhow!("INTERSECT is not yet implemented"));
1874 }
1875 SetOperation::Except => {
1876 return Err(anyhow!("EXCEPT is not yet implemented"));
1879 }
1880 }
1881
1882 plan.add_detail(format!(
1883 "Operation time: {:.3}ms",
1884 op_start.elapsed().as_secs_f64() * 1000.0
1885 ));
1886 plan.set_rows_out(combined_table.row_count());
1887 plan.end_step();
1888 }
1889
1890 plan.set_rows_out(combined_table.row_count());
1891 plan.add_detail(format!(
1892 "Combined result: {} rows after {} operations",
1893 combined_table.row_count(),
1894 statement.set_operations.len()
1895 ));
1896 plan.end_step();
1897
1898 view = DataView::new(Arc::new(combined_table));
1900
1901 if needs_deduplication {
1903 plan.begin_step(
1904 StepType::Distinct,
1905 "UNION deduplication - remove duplicate rows".to_string(),
1906 );
1907 plan.set_rows_in(view.row_count());
1908 plan.add_detail(format!("Input: {} rows", view.row_count()));
1909
1910 let distinct_start = Instant::now();
1911 view = self.apply_distinct(view)?;
1912
1913 plan.set_rows_out(view.row_count());
1914 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1915 plan.add_detail(format!(
1916 "Deduplication time: {:.3}ms",
1917 distinct_start.elapsed().as_secs_f64() * 1000.0
1918 ));
1919 plan.end_step();
1920 }
1921 }
1922
1923 Ok(view)
1924 }
1925
1926 fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1928 let mut indices = Vec::new();
1929 let table_columns = table.column_names();
1930
1931 for col_name in columns {
1932 let index = table_columns
1933 .iter()
1934 .position(|c| c.eq_ignore_ascii_case(col_name))
1935 .ok_or_else(|| {
1936 let suggestion = self.find_similar_column(table, col_name);
1937 match suggestion {
1938 Some(similar) => anyhow::anyhow!(
1939 "Column '{}' not found. Did you mean '{}'?",
1940 col_name,
1941 similar
1942 ),
1943 None => anyhow::anyhow!("Column '{}' not found", col_name),
1944 }
1945 })?;
1946 indices.push(index);
1947 }
1948
1949 Ok(indices)
1950 }
1951
1952 fn apply_select_items(
1954 &self,
1955 view: DataView,
1956 select_items: &[SelectItem],
1957 _statement: &SelectStatement,
1958 exec_context: Option<&ExecutionContext>,
1959 plan: &mut ExecutionPlanBuilder,
1960 ) -> Result<DataView> {
1961 debug!(
1962 "QueryEngine::apply_select_items - items: {:?}",
1963 select_items
1964 );
1965 debug!(
1966 "QueryEngine::apply_select_items - input view has {} rows",
1967 view.row_count()
1968 );
1969
1970 let has_window_functions = select_items.iter().any(|item| match item {
1972 SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
1973 _ => false,
1974 });
1975
1976 let window_func_count: usize = select_items
1978 .iter()
1979 .filter(|item| match item {
1980 SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
1981 _ => false,
1982 })
1983 .count();
1984
1985 let window_start = if has_window_functions {
1987 debug!(
1988 "QueryEngine::apply_select_items - detected {} window functions",
1989 window_func_count
1990 );
1991
1992 let window_specs = Self::extract_window_specs(select_items);
1994 debug!("Extracted {} window function specs", window_specs.len());
1995
1996 Some(Instant::now())
1997 } else {
1998 None
1999 };
2000
2001 let has_unnest = select_items.iter().any(|item| match item {
2003 SelectItem::Expression { expr, .. } => Self::contains_unnest(expr),
2004 _ => false,
2005 });
2006
2007 if has_unnest {
2008 debug!("QueryEngine::apply_select_items - UNNEST detected, using row expansion");
2009 return self.apply_select_with_row_expansion(view, select_items);
2010 }
2011
2012 let has_aggregates = select_items.iter().any(|item| match item {
2016 SelectItem::Expression { expr, .. } => contains_aggregate(expr),
2017 SelectItem::Column { .. } => false,
2018 SelectItem::Star { .. } => false,
2019 SelectItem::StarExclude { .. } => false,
2020 });
2021
2022 let all_aggregate_compatible = select_items.iter().all(|item| match item {
2023 SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
2024 SelectItem::Column { .. } => false, SelectItem::Star { .. } => false, SelectItem::StarExclude { .. } => false, });
2028
2029 if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
2030 debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
2033 return self.apply_aggregate_select(view, select_items);
2034 }
2035
2036 let has_computed_expressions = select_items
2038 .iter()
2039 .any(|item| matches!(item, SelectItem::Expression { .. }));
2040
2041 debug!(
2042 "QueryEngine::apply_select_items - has_computed_expressions: {}",
2043 has_computed_expressions
2044 );
2045
2046 if !has_computed_expressions {
2047 let column_indices = self.resolve_select_columns(view.source(), select_items)?;
2049 return Ok(view.with_columns(column_indices));
2050 }
2051
2052 let source_table = view.source();
2057 let visible_rows = view.visible_row_indices();
2058
2059 let mut computed_table = DataTable::new("query_result");
2062
2063 let mut expanded_items = Vec::new();
2065 for item in select_items {
2066 match item {
2067 SelectItem::Star { table_prefix, .. } => {
2068 if let Some(prefix) = table_prefix {
2069 debug!("QueryEngine::apply_select_items - expanding {}.*", prefix);
2071 for col in &source_table.columns {
2072 if Self::column_matches_table(col, prefix) {
2073 expanded_items.push(SelectItem::Column {
2074 column: ColumnRef::unquoted(col.name.clone()),
2075 leading_comments: vec![],
2076 trailing_comment: None,
2077 });
2078 }
2079 }
2080 } else {
2081 debug!("QueryEngine::apply_select_items - expanding *");
2083 for col_name in source_table.column_names() {
2084 expanded_items.push(SelectItem::Column {
2085 column: ColumnRef::unquoted(col_name.to_string()),
2086 leading_comments: vec![],
2087 trailing_comment: None,
2088 });
2089 }
2090 }
2091 }
2092 _ => expanded_items.push(item.clone()),
2093 }
2094 }
2095
2096 let mut column_name_counts: std::collections::HashMap<String, usize> =
2098 std::collections::HashMap::new();
2099
2100 for item in &expanded_items {
2101 let base_name = match item {
2102 SelectItem::Column {
2103 column: col_ref, ..
2104 } => col_ref.name.clone(),
2105 SelectItem::Expression { alias, .. } => alias.clone(),
2106 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2107 SelectItem::StarExclude { .. } => {
2108 unreachable!("StarExclude should have been expanded")
2109 }
2110 };
2111
2112 let count = column_name_counts.entry(base_name.clone()).or_insert(0);
2114 let column_name = if *count == 0 {
2115 base_name.clone()
2117 } else {
2118 format!("{base_name}_{count}")
2120 };
2121 *count += 1;
2122
2123 computed_table.add_column(DataColumn::new(&column_name));
2124 }
2125
2126 let can_use_batch = expanded_items.iter().all(|item| {
2130 match item {
2131 SelectItem::Expression { expr, .. } => {
2132 matches!(expr, SqlExpression::WindowFunction { .. })
2135 || !Self::contains_window_function(expr)
2136 }
2137 _ => true, }
2139 });
2140
2141 let use_batch_evaluation = can_use_batch
2144 && std::env::var("SQL_CLI_BATCH_WINDOW")
2145 .map(|v| v != "0" && v.to_lowercase() != "false")
2146 .unwrap_or(true);
2147
2148 let batch_window_specs = if use_batch_evaluation && has_window_functions {
2150 debug!("BATCH window function evaluation flag is enabled");
2151 let specs = Self::extract_window_specs(&expanded_items);
2153 debug!(
2154 "Extracted {} window function specs for batch evaluation",
2155 specs.len()
2156 );
2157 Some(specs)
2158 } else {
2159 None
2160 };
2161
2162 let mut evaluator =
2164 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2165
2166 if let Some(exec_ctx) = exec_context {
2168 let aliases = exec_ctx.get_aliases();
2169 if !aliases.is_empty() {
2170 debug!(
2171 "Applying {} aliases to evaluator: {:?}",
2172 aliases.len(),
2173 aliases
2174 );
2175 evaluator = evaluator.with_table_aliases(aliases);
2176 }
2177 }
2178
2179 if has_window_functions {
2182 let preload_start = Instant::now();
2183
2184 let mut window_specs = Vec::new();
2186 for item in &expanded_items {
2187 if let SelectItem::Expression { expr, .. } = item {
2188 Self::collect_window_specs(expr, &mut window_specs);
2189 }
2190 }
2191
2192 for spec in &window_specs {
2194 let _ = evaluator.get_or_create_window_context(spec);
2195 }
2196
2197 debug!(
2198 "Pre-created {} WindowContext(s) in {:.2}ms",
2199 window_specs.len(),
2200 preload_start.elapsed().as_secs_f64() * 1000.0
2201 );
2202 }
2203
2204 if let Some(window_specs) = batch_window_specs {
2206 debug!("Starting batch window function evaluation");
2207 let batch_start = Instant::now();
2208
2209 let mut batch_results: Vec<Vec<DataValue>> =
2211 vec![vec![DataValue::Null; expanded_items.len()]; visible_rows.len()];
2212
2213 let detailed_window_specs = &window_specs;
2215
2216 let mut specs_by_window: HashMap<
2218 u64,
2219 Vec<&crate::data::batch_window_evaluator::WindowFunctionSpec>,
2220 > = HashMap::new();
2221 for spec in detailed_window_specs {
2222 let hash = spec.spec.compute_hash();
2223 specs_by_window
2224 .entry(hash)
2225 .or_insert_with(Vec::new)
2226 .push(spec);
2227 }
2228
2229 for (_window_hash, specs) in specs_by_window {
2231 let context = evaluator.get_or_create_window_context(&specs[0].spec)?;
2233
2234 for spec in specs {
2236 match spec.function_name.as_str() {
2237 "LAG" => {
2238 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2240 let column_name = col_ref.name.as_str();
2241 let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2242 spec.args.get(1)
2243 {
2244 n.parse::<i64>().unwrap_or(1)
2245 } else {
2246 1 };
2248
2249 let values = context.evaluate_lag_batch(
2250 visible_rows,
2251 column_name,
2252 offset,
2253 )?;
2254
2255 for (row_idx, value) in values.into_iter().enumerate() {
2257 batch_results[row_idx][spec.output_column_index] = value;
2258 }
2259 }
2260 }
2261 "LEAD" => {
2262 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2264 let column_name = col_ref.name.as_str();
2265 let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2266 spec.args.get(1)
2267 {
2268 n.parse::<i64>().unwrap_or(1)
2269 } else {
2270 1 };
2272
2273 let values = context.evaluate_lead_batch(
2274 visible_rows,
2275 column_name,
2276 offset,
2277 )?;
2278
2279 for (row_idx, value) in values.into_iter().enumerate() {
2281 batch_results[row_idx][spec.output_column_index] = value;
2282 }
2283 }
2284 }
2285 "ROW_NUMBER" => {
2286 let values = context.evaluate_row_number_batch(visible_rows)?;
2287
2288 for (row_idx, value) in values.into_iter().enumerate() {
2290 batch_results[row_idx][spec.output_column_index] = value;
2291 }
2292 }
2293 "RANK" => {
2294 let values = context.evaluate_rank_batch(visible_rows)?;
2295
2296 for (row_idx, value) in values.into_iter().enumerate() {
2298 batch_results[row_idx][spec.output_column_index] = value;
2299 }
2300 }
2301 "DENSE_RANK" => {
2302 let values = context.evaluate_dense_rank_batch(visible_rows)?;
2303
2304 for (row_idx, value) in values.into_iter().enumerate() {
2306 batch_results[row_idx][spec.output_column_index] = value;
2307 }
2308 }
2309 "SUM" => {
2310 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2311 let column_name = col_ref.name.as_str();
2312 let values =
2313 context.evaluate_sum_batch(visible_rows, column_name)?;
2314
2315 for (row_idx, value) in values.into_iter().enumerate() {
2316 batch_results[row_idx][spec.output_column_index] = value;
2317 }
2318 }
2319 }
2320 "AVG" => {
2321 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2322 let column_name = col_ref.name.as_str();
2323 let values =
2324 context.evaluate_avg_batch(visible_rows, column_name)?;
2325
2326 for (row_idx, value) in values.into_iter().enumerate() {
2327 batch_results[row_idx][spec.output_column_index] = value;
2328 }
2329 }
2330 }
2331 "MIN" => {
2332 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2333 let column_name = col_ref.name.as_str();
2334 let values =
2335 context.evaluate_min_batch(visible_rows, column_name)?;
2336
2337 for (row_idx, value) in values.into_iter().enumerate() {
2338 batch_results[row_idx][spec.output_column_index] = value;
2339 }
2340 }
2341 }
2342 "MAX" => {
2343 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2344 let column_name = col_ref.name.as_str();
2345 let values =
2346 context.evaluate_max_batch(visible_rows, column_name)?;
2347
2348 for (row_idx, value) in values.into_iter().enumerate() {
2349 batch_results[row_idx][spec.output_column_index] = value;
2350 }
2351 }
2352 }
2353 "COUNT" => {
2354 let column_name = match spec.args.get(0) {
2356 Some(SqlExpression::Column(col_ref)) => Some(col_ref.name.as_str()),
2357 Some(SqlExpression::StringLiteral(s)) if s == "*" => None,
2358 _ => None,
2359 };
2360
2361 let values = context.evaluate_count_batch(visible_rows, column_name)?;
2362
2363 for (row_idx, value) in values.into_iter().enumerate() {
2364 batch_results[row_idx][spec.output_column_index] = value;
2365 }
2366 }
2367 "FIRST_VALUE" => {
2368 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2369 let column_name = col_ref.name.as_str();
2370 let values = context
2371 .evaluate_first_value_batch(visible_rows, column_name)?;
2372
2373 for (row_idx, value) in values.into_iter().enumerate() {
2374 batch_results[row_idx][spec.output_column_index] = value;
2375 }
2376 }
2377 }
2378 "LAST_VALUE" => {
2379 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2380 let column_name = col_ref.name.as_str();
2381 let values =
2382 context.evaluate_last_value_batch(visible_rows, column_name)?;
2383
2384 for (row_idx, value) in values.into_iter().enumerate() {
2385 batch_results[row_idx][spec.output_column_index] = value;
2386 }
2387 }
2388 }
2389 _ => {
2390 debug!(
2392 "Window function {} not supported in batch mode, using per-row",
2393 spec.function_name
2394 );
2395 }
2396 }
2397 }
2398 }
2399
2400 for (result_row_idx, &source_row_idx) in visible_rows.iter().enumerate() {
2402 for (col_idx, item) in expanded_items.iter().enumerate() {
2403 if !matches!(batch_results[result_row_idx][col_idx], DataValue::Null) {
2405 continue;
2406 }
2407
2408 let value = match item {
2409 SelectItem::Column {
2410 column: col_ref, ..
2411 } => {
2412 match evaluator
2413 .evaluate(&SqlExpression::Column(col_ref.clone()), source_row_idx)
2414 {
2415 Ok(val) => val,
2416 Err(e) => {
2417 return Err(anyhow!(
2418 "Failed to evaluate column {}: {}",
2419 col_ref.to_sql(),
2420 e
2421 ));
2422 }
2423 }
2424 }
2425 SelectItem::Expression { expr, .. } => {
2426 if matches!(expr, SqlExpression::WindowFunction { .. }) {
2429 continue;
2431 }
2432 evaluator.evaluate(&expr, source_row_idx)?
2435 }
2436 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2437 SelectItem::StarExclude { .. } => {
2438 unreachable!("StarExclude should have been expanded")
2439 }
2440 };
2441 batch_results[result_row_idx][col_idx] = value;
2442 }
2443 }
2444
2445 for row_values in batch_results {
2447 computed_table
2448 .add_row(DataRow::new(row_values))
2449 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2450 }
2451
2452 debug!(
2453 "Batch window evaluation completed in {:.3}ms",
2454 batch_start.elapsed().as_secs_f64() * 1000.0
2455 );
2456 } else {
2457 for &row_idx in visible_rows {
2459 let mut row_values = Vec::new();
2460
2461 for item in &expanded_items {
2462 let value = match item {
2463 SelectItem::Column {
2464 column: col_ref, ..
2465 } => {
2466 match evaluator
2468 .evaluate(&SqlExpression::Column(col_ref.clone()), row_idx)
2469 {
2470 Ok(val) => val,
2471 Err(e) => {
2472 return Err(anyhow!(
2473 "Failed to evaluate column {}: {}",
2474 col_ref.to_sql(),
2475 e
2476 ));
2477 }
2478 }
2479 }
2480 SelectItem::Expression { expr, .. } => {
2481 evaluator.evaluate(&expr, row_idx)?
2483 }
2484 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2485 SelectItem::StarExclude { .. } => {
2486 unreachable!("StarExclude should have been expanded")
2487 }
2488 };
2489 row_values.push(value);
2490 }
2491
2492 computed_table
2493 .add_row(DataRow::new(row_values))
2494 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2495 }
2496 }
2497
2498 if let Some(start) = window_start {
2500 let window_duration = start.elapsed();
2501 info!(
2502 "Window function evaluation took {:.2}ms for {} rows ({} window functions)",
2503 window_duration.as_secs_f64() * 1000.0,
2504 visible_rows.len(),
2505 window_func_count
2506 );
2507
2508 plan.begin_step(
2510 StepType::WindowFunction,
2511 format!("Evaluate {} window function(s)", window_func_count),
2512 );
2513 plan.set_rows_in(visible_rows.len());
2514 plan.set_rows_out(visible_rows.len());
2515 plan.add_detail(format!("Input: {} rows", visible_rows.len()));
2516 plan.add_detail(format!("{} window functions evaluated", window_func_count));
2517 plan.add_detail(format!(
2518 "Evaluation time: {:.3}ms",
2519 window_duration.as_secs_f64() * 1000.0
2520 ));
2521 plan.end_step();
2522 }
2523
2524 Ok(DataView::new(Arc::new(computed_table)))
2527 }
2528
2529 fn apply_select_with_row_expansion(
2531 &self,
2532 view: DataView,
2533 select_items: &[SelectItem],
2534 ) -> Result<DataView> {
2535 debug!("QueryEngine::apply_select_with_row_expansion - expanding rows");
2536
2537 let source_table = view.source();
2538 let visible_rows = view.visible_row_indices();
2539 let expander_registry = RowExpanderRegistry::new();
2540
2541 let mut result_table = DataTable::new("unnest_result");
2543
2544 let mut expanded_items = Vec::new();
2546 for item in select_items {
2547 match item {
2548 SelectItem::Star { table_prefix, .. } => {
2549 if let Some(prefix) = table_prefix {
2550 debug!(
2552 "QueryEngine::apply_select_with_row_expansion - expanding {}.*",
2553 prefix
2554 );
2555 for col in &source_table.columns {
2556 if Self::column_matches_table(col, prefix) {
2557 expanded_items.push(SelectItem::Column {
2558 column: ColumnRef::unquoted(col.name.clone()),
2559 leading_comments: vec![],
2560 trailing_comment: None,
2561 });
2562 }
2563 }
2564 } else {
2565 debug!("QueryEngine::apply_select_with_row_expansion - expanding *");
2567 for col_name in source_table.column_names() {
2568 expanded_items.push(SelectItem::Column {
2569 column: ColumnRef::unquoted(col_name.to_string()),
2570 leading_comments: vec![],
2571 trailing_comment: None,
2572 });
2573 }
2574 }
2575 }
2576 _ => expanded_items.push(item.clone()),
2577 }
2578 }
2579
2580 for item in &expanded_items {
2582 let column_name = match item {
2583 SelectItem::Column {
2584 column: col_ref, ..
2585 } => col_ref.name.clone(),
2586 SelectItem::Expression { alias, .. } => alias.clone(),
2587 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2588 SelectItem::StarExclude { .. } => {
2589 unreachable!("StarExclude should have been expanded")
2590 }
2591 };
2592 result_table.add_column(DataColumn::new(&column_name));
2593 }
2594
2595 let mut evaluator =
2597 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2598
2599 for &row_idx in visible_rows {
2600 let mut unnest_expansions = Vec::new();
2602 let mut unnest_indices = Vec::new();
2603
2604 for (col_idx, item) in expanded_items.iter().enumerate() {
2605 if let SelectItem::Expression { expr, .. } = item {
2606 if let Some(expansion_result) = self.try_expand_unnest(
2607 &expr,
2608 source_table,
2609 row_idx,
2610 &mut evaluator,
2611 &expander_registry,
2612 )? {
2613 unnest_expansions.push(expansion_result);
2614 unnest_indices.push(col_idx);
2615 }
2616 }
2617 }
2618
2619 let expansion_count = if unnest_expansions.is_empty() {
2621 1 } else {
2623 unnest_expansions
2624 .iter()
2625 .map(|exp| exp.row_count())
2626 .max()
2627 .unwrap_or(1)
2628 };
2629
2630 for output_idx in 0..expansion_count {
2632 let mut row_values = Vec::new();
2633
2634 for (col_idx, item) in expanded_items.iter().enumerate() {
2635 let unnest_position = unnest_indices.iter().position(|&idx| idx == col_idx);
2637
2638 let value = if let Some(unnest_idx) = unnest_position {
2639 let expansion = &unnest_expansions[unnest_idx];
2641 expansion
2642 .values
2643 .get(output_idx)
2644 .cloned()
2645 .unwrap_or(DataValue::Null)
2646 } else {
2647 match item {
2649 SelectItem::Column {
2650 column: col_ref, ..
2651 } => {
2652 let col_idx =
2653 source_table.get_column_index(&col_ref.name).ok_or_else(
2654 || anyhow::anyhow!("Column '{}' not found", col_ref.name),
2655 )?;
2656 let row = source_table
2657 .get_row(row_idx)
2658 .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
2659 row.get(col_idx)
2660 .ok_or_else(|| {
2661 anyhow::anyhow!("Column {} not found in row", col_idx)
2662 })?
2663 .clone()
2664 }
2665 SelectItem::Expression { expr, .. } => {
2666 evaluator.evaluate(&expr, row_idx)?
2668 }
2669 SelectItem::Star { .. } => unreachable!(),
2670 SelectItem::StarExclude { .. } => {
2671 unreachable!("StarExclude should have been expanded")
2672 }
2673 }
2674 };
2675
2676 row_values.push(value);
2677 }
2678
2679 result_table
2680 .add_row(DataRow::new(row_values))
2681 .map_err(|e| anyhow::anyhow!("Failed to add expanded row: {}", e))?;
2682 }
2683 }
2684
2685 debug!(
2686 "QueryEngine::apply_select_with_row_expansion - input rows: {}, output rows: {}",
2687 visible_rows.len(),
2688 result_table.row_count()
2689 );
2690
2691 Ok(DataView::new(Arc::new(result_table)))
2692 }
2693
2694 fn try_expand_unnest(
2697 &self,
2698 expr: &SqlExpression,
2699 _source_table: &DataTable,
2700 row_idx: usize,
2701 evaluator: &mut ArithmeticEvaluator,
2702 expander_registry: &RowExpanderRegistry,
2703 ) -> Result<Option<crate::data::row_expanders::ExpansionResult>> {
2704 if let SqlExpression::Unnest { column, delimiter } = expr {
2706 let column_value = evaluator.evaluate(column, row_idx)?;
2708
2709 let delimiter_value = DataValue::String(delimiter.clone());
2711
2712 let expander = expander_registry
2714 .get("UNNEST")
2715 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2716
2717 let expansion = expander.expand(&column_value, &[delimiter_value])?;
2719 return Ok(Some(expansion));
2720 }
2721
2722 if let SqlExpression::FunctionCall { name, args, .. } = expr {
2724 if name.to_uppercase() == "UNNEST" {
2725 if args.len() != 2 {
2727 return Err(anyhow::anyhow!(
2728 "UNNEST requires exactly 2 arguments: UNNEST(column, delimiter)"
2729 ));
2730 }
2731
2732 let column_value = evaluator.evaluate(&args[0], row_idx)?;
2734
2735 let delimiter_value = evaluator.evaluate(&args[1], row_idx)?;
2737
2738 let expander = expander_registry
2740 .get("UNNEST")
2741 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2742
2743 let expansion = expander.expand(&column_value, &[delimiter_value])?;
2745 return Ok(Some(expansion));
2746 }
2747 }
2748
2749 Ok(None)
2750 }
2751
2752 fn apply_aggregate_select(
2754 &self,
2755 view: DataView,
2756 select_items: &[SelectItem],
2757 ) -> Result<DataView> {
2758 debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
2759
2760 let source_table = view.source();
2761 let mut result_table = DataTable::new("aggregate_result");
2762
2763 for item in select_items {
2765 let column_name = match item {
2766 SelectItem::Expression { alias, .. } => alias.clone(),
2767 _ => unreachable!("Should only have expressions in aggregate-only query"),
2768 };
2769 result_table.add_column(DataColumn::new(&column_name));
2770 }
2771
2772 let visible_rows = view.visible_row_indices().to_vec();
2774 let mut evaluator =
2775 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
2776 .with_visible_rows(visible_rows);
2777
2778 let mut row_values = Vec::new();
2780 for item in select_items {
2781 match item {
2782 SelectItem::Expression { expr, .. } => {
2783 let value = evaluator.evaluate(expr, 0)?;
2786 row_values.push(value);
2787 }
2788 _ => unreachable!("Should only have expressions in aggregate-only query"),
2789 }
2790 }
2791
2792 result_table
2794 .add_row(DataRow::new(row_values))
2795 .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
2796
2797 Ok(DataView::new(Arc::new(result_table)))
2798 }
2799
2800 fn column_matches_table(col: &DataColumn, table_name: &str) -> bool {
2812 if let Some(ref source) = col.source_table {
2814 if source == table_name || source.ends_with(&format!(".{}", table_name)) {
2816 return true;
2817 }
2818 }
2819
2820 if let Some(ref qualified) = col.qualified_name {
2822 if qualified.starts_with(&format!("{}.", table_name)) {
2824 return true;
2825 }
2826 }
2827
2828 false
2829 }
2830
2831 fn resolve_select_columns(
2833 &self,
2834 table: &DataTable,
2835 select_items: &[SelectItem],
2836 ) -> Result<Vec<usize>> {
2837 let mut indices = Vec::new();
2838 let table_columns = table.column_names();
2839
2840 for item in select_items {
2841 match item {
2842 SelectItem::Column {
2843 column: col_ref, ..
2844 } => {
2845 let index = if let Some(table_prefix) = &col_ref.table_prefix {
2847 let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
2849 table.find_column_by_qualified_name(&qualified_name)
2850 .ok_or_else(|| {
2851 let has_qualified = table.columns.iter()
2853 .any(|c| c.qualified_name.is_some());
2854 if !has_qualified {
2855 anyhow::anyhow!(
2856 "Column '{}' not found. Note: Table '{}' may not support qualified column names",
2857 qualified_name, table_prefix
2858 )
2859 } else {
2860 anyhow::anyhow!("Column '{}' not found", qualified_name)
2861 }
2862 })?
2863 } else {
2864 table_columns
2866 .iter()
2867 .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
2868 .ok_or_else(|| {
2869 let suggestion = self.find_similar_column(table, &col_ref.name);
2870 match suggestion {
2871 Some(similar) => anyhow::anyhow!(
2872 "Column '{}' not found. Did you mean '{}'?",
2873 col_ref.name,
2874 similar
2875 ),
2876 None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
2877 }
2878 })?
2879 };
2880 indices.push(index);
2881 }
2882 SelectItem::Star { table_prefix, .. } => {
2883 if let Some(prefix) = table_prefix {
2884 for (i, col) in table.columns.iter().enumerate() {
2886 if Self::column_matches_table(col, prefix) {
2887 indices.push(i);
2888 }
2889 }
2890 } else {
2891 for i in 0..table_columns.len() {
2893 indices.push(i);
2894 }
2895 }
2896 }
2897 SelectItem::StarExclude {
2898 table_prefix,
2899 excluded_columns,
2900 ..
2901 } => {
2902 if let Some(prefix) = table_prefix {
2904 for (i, col) in table.columns.iter().enumerate() {
2906 if Self::column_matches_table(col, prefix)
2907 && !excluded_columns.contains(&col.name)
2908 {
2909 indices.push(i);
2910 }
2911 }
2912 } else {
2913 for (i, col_name) in table_columns.iter().enumerate() {
2915 if !excluded_columns
2916 .iter()
2917 .any(|exc| exc.eq_ignore_ascii_case(col_name))
2918 {
2919 indices.push(i);
2920 }
2921 }
2922 }
2923 }
2924 SelectItem::Expression { .. } => {
2925 return Err(anyhow::anyhow!(
2926 "Computed expressions require new table creation"
2927 ));
2928 }
2929 }
2930 }
2931
2932 Ok(indices)
2933 }
2934
2935 fn apply_distinct(&self, view: DataView) -> Result<DataView> {
2937 use std::collections::HashSet;
2938
2939 let source = view.source();
2940 let visible_cols = view.visible_column_indices();
2941 let visible_rows = view.visible_row_indices();
2942
2943 let mut seen_rows = HashSet::new();
2945 let mut unique_row_indices = Vec::new();
2946
2947 for &row_idx in visible_rows {
2948 let mut row_key = Vec::new();
2950 for &col_idx in visible_cols {
2951 let value = source
2952 .get_value(row_idx, col_idx)
2953 .ok_or_else(|| anyhow!("Invalid cell reference"))?;
2954 row_key.push(format!("{:?}", value));
2956 }
2957
2958 if seen_rows.insert(row_key) {
2960 unique_row_indices.push(row_idx);
2962 }
2963 }
2964
2965 Ok(view.with_rows(unique_row_indices))
2967 }
2968
2969 fn apply_multi_order_by(
2971 &self,
2972 view: DataView,
2973 order_by_columns: &[OrderByItem],
2974 ) -> Result<DataView> {
2975 self.apply_multi_order_by_with_context(view, order_by_columns, None)
2976 }
2977
2978 fn apply_multi_order_by_with_context(
2980 &self,
2981 mut view: DataView,
2982 order_by_columns: &[OrderByItem],
2983 _exec_context: Option<&ExecutionContext>,
2984 ) -> Result<DataView> {
2985 let mut sort_columns = Vec::new();
2987
2988 for order_col in order_by_columns {
2989 let column_name = match &order_col.expr {
2991 SqlExpression::Column(col_ref) => col_ref.name.clone(),
2992 _ => {
2993 return Err(anyhow!(
2995 "ORDER BY expressions not yet supported - only simple columns allowed"
2996 ));
2997 }
2998 };
2999
3000 let col_index = if column_name.contains('.') {
3002 if let Some(dot_pos) = column_name.rfind('.') {
3004 let col_name = &column_name[dot_pos + 1..];
3005
3006 debug!(
3009 "ORDER BY: Extracting unqualified column '{}' from '{}'",
3010 col_name, column_name
3011 );
3012 view.source().get_column_index(col_name)
3013 } else {
3014 view.source().get_column_index(&column_name)
3015 }
3016 } else {
3017 view.source().get_column_index(&column_name)
3019 }
3020 .ok_or_else(|| {
3021 let suggestion = self.find_similar_column(view.source(), &column_name);
3023 match suggestion {
3024 Some(similar) => anyhow::anyhow!(
3025 "Column '{}' not found. Did you mean '{}'?",
3026 column_name,
3027 similar
3028 ),
3029 None => {
3030 let available_cols = view.source().column_names().join(", ");
3032 anyhow::anyhow!(
3033 "Column '{}' not found. Available columns: {}",
3034 column_name,
3035 available_cols
3036 )
3037 }
3038 }
3039 })?;
3040
3041 let ascending = matches!(order_col.direction, SortDirection::Asc);
3042 sort_columns.push((col_index, ascending));
3043 }
3044
3045 view.apply_multi_sort(&sort_columns)?;
3047 Ok(view)
3048 }
3049
3050 fn apply_group_by(
3052 &self,
3053 view: DataView,
3054 group_by_exprs: &[SqlExpression],
3055 select_items: &[SelectItem],
3056 having: Option<&SqlExpression>,
3057 plan: &mut ExecutionPlanBuilder,
3058 ) -> Result<DataView> {
3059 let (result_view, phase_info) = self.apply_group_by_expressions(
3061 view,
3062 group_by_exprs,
3063 select_items,
3064 having,
3065 self.case_insensitive,
3066 self.date_notation.clone(),
3067 )?;
3068
3069 plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
3071 plan.add_detail(format!(
3072 "Phase 1 - Group Building: {:.3}ms",
3073 phase_info.phase2_key_building.as_secs_f64() * 1000.0
3074 ));
3075 plan.add_detail(format!(
3076 " • Processing {} rows into {} groups",
3077 phase_info.total_rows, phase_info.num_groups
3078 ));
3079 plan.add_detail(format!(
3080 "Phase 2 - Aggregation: {:.3}ms",
3081 phase_info.phase4_aggregation.as_secs_f64() * 1000.0
3082 ));
3083 if phase_info.phase4_having_evaluation > Duration::ZERO {
3084 plan.add_detail(format!(
3085 "Phase 3 - HAVING Filter: {:.3}ms",
3086 phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
3087 ));
3088 plan.add_detail(format!(
3089 " • Filtered {} groups",
3090 phase_info.groups_filtered_by_having
3091 ));
3092 }
3093 plan.add_detail(format!(
3094 "Total GROUP BY time: {:.3}ms",
3095 phase_info.total_time.as_secs_f64() * 1000.0
3096 ));
3097
3098 Ok(result_view)
3099 }
3100
3101 pub fn estimate_group_cardinality(
3104 &self,
3105 view: &DataView,
3106 group_by_exprs: &[SqlExpression],
3107 ) -> usize {
3108 let row_count = view.get_visible_rows().len();
3110 if row_count <= 100 {
3111 return row_count;
3112 }
3113
3114 let sample_size = min(1000, row_count / 10).max(100);
3116 let mut seen = FxHashSet::default();
3117
3118 let visible_rows = view.get_visible_rows();
3119 for (i, &row_idx) in visible_rows.iter().enumerate() {
3120 if i >= sample_size {
3121 break;
3122 }
3123
3124 let mut key_values = Vec::new();
3126 for expr in group_by_exprs {
3127 let mut evaluator = ArithmeticEvaluator::new(view.source());
3128 let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
3129 key_values.push(value);
3130 }
3131
3132 seen.insert(key_values);
3133 }
3134
3135 let sample_cardinality = seen.len();
3137 let estimated = (sample_cardinality * row_count) / sample_size;
3138
3139 estimated.min(row_count).max(sample_cardinality)
3141 }
3142}
3143
3144#[cfg(test)]
3145mod tests {
3146 use super::*;
3147 use crate::data::datatable::{DataColumn, DataRow, DataValue};
3148
3149 fn create_test_table() -> Arc<DataTable> {
3150 let mut table = DataTable::new("test");
3151
3152 table.add_column(DataColumn::new("id"));
3154 table.add_column(DataColumn::new("name"));
3155 table.add_column(DataColumn::new("age"));
3156
3157 table
3159 .add_row(DataRow::new(vec![
3160 DataValue::Integer(1),
3161 DataValue::String("Alice".to_string()),
3162 DataValue::Integer(30),
3163 ]))
3164 .unwrap();
3165
3166 table
3167 .add_row(DataRow::new(vec![
3168 DataValue::Integer(2),
3169 DataValue::String("Bob".to_string()),
3170 DataValue::Integer(25),
3171 ]))
3172 .unwrap();
3173
3174 table
3175 .add_row(DataRow::new(vec![
3176 DataValue::Integer(3),
3177 DataValue::String("Charlie".to_string()),
3178 DataValue::Integer(35),
3179 ]))
3180 .unwrap();
3181
3182 Arc::new(table)
3183 }
3184
/// `SELECT *` returns every row and every column of the fixture.
#[test]
fn test_select_all() {
    let engine = QueryEngine::new();
    let result = engine
        .execute(create_test_table(), "SELECT * FROM users")
        .unwrap();

    assert_eq!(result.row_count(), 3);
    assert_eq!(result.column_count(), 3);
}
3196
/// Selecting two named columns keeps all rows but only those two columns.
#[test]
fn test_select_columns() {
    let engine = QueryEngine::new();
    let result = engine
        .execute(create_test_table(), "SELECT name, age FROM users")
        .unwrap();

    assert_eq!(result.row_count(), 3);
    assert_eq!(result.column_count(), 2);
}
3208
/// `LIMIT 2` caps the result at two rows.
#[test]
fn test_select_with_limit() {
    let engine = QueryEngine::new();
    let result = engine
        .execute(create_test_table(), "SELECT * FROM users LIMIT 2")
        .unwrap();

    assert_eq!(result.row_count(), 2);
}
3219
/// `Contains` works on a string column and on a float column.
#[test]
fn test_type_coercion_contains() {
    // Ignore the error when another test already installed a subscriber.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "status", "price"] {
        table.add_column(DataColumn::new(column));
    }

    let orders = [
        (1, "Pending", 99.99),
        (2, "Confirmed", 150.50),
        (3, "Pending", 75.00),
    ];
    for (id, status, price) in orders {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(status.to_string()),
                DataValue::Float(price),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing WHERE clause with Contains ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let status = table.get_value(i, 1);
        println!("Row {i}: status = {status:?}");
    }

    println!("\n--- Test 1: status.Contains('pend') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE status.Contains('pend')",
        )
        .unwrap_or_else(|e| panic!("Query failed: {e}"));
    println!("SUCCESS: Found {} matching rows", view.row_count());
    assert_eq!(view.row_count(), 2);

    println!("\n--- Test 2: price.Contains('9') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE price.Contains('9')",
        )
        .unwrap_or_else(|e| panic!("Numeric coercion query failed: {e}"));
    println!(
        "SUCCESS: Found {} matching rows with price containing '9'",
        view.row_count()
    );
    assert!(view.row_count() >= 1);

    println!("\n=== All tests passed! ===");
}
3305
/// `NOT IN ('CA')` keeps the two rows whose country is not `CA`.
#[test]
fn test_not_in_clause() {
    // Ignore the error when another test already installed a subscriber.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "country"] {
        table.add_column(DataColumn::new(column));
    }

    for (id, country) in [(1, "CA"), (2, "US"), (3, "UK")] {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(country.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing NOT IN clause ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let country = table.get_value(i, 1);
        println!("Row {i}: country = {country:?}");
    }

    println!("\n--- Test: country NOT IN ('CA') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE country NOT IN ('CA')",
        )
        .unwrap_or_else(|e| panic!("NOT IN query failed: {e}"));
    println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
    assert_eq!(view.row_count(), 2);

    println!("\n=== NOT IN test complete! ===");
}
3367
/// IN / NOT IN honour the engine's case-insensitivity switch.
///
/// With case-insensitive matching `'ca'` matches the stored `CA`; with a default engine it
/// matches nothing.
#[test]
fn test_case_insensitive_in_and_not_in() {
    // Ignore the error when another test already installed a subscriber.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "country"] {
        table.add_column(DataColumn::new(column));
    }

    // Mixed casing on purpose: upper, lower, upper.
    for (id, country) in [(1, "CA"), (2, "us"), (3, "UK")] {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(country.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);

    println!("\n=== Testing Case-Insensitive IN clause ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let country = table.get_value(i, 1);
        println!("Row {i}: country = {country:?}");
    }

    println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
    let engine = QueryEngine::with_case_insensitive(true);
    let view = engine
        .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')")
        .unwrap_or_else(|e| panic!("Case-insensitive IN query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
        view.row_count()
    );
    assert_eq!(view.row_count(), 1);

    println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE country NOT IN ('ca')",
        )
        .unwrap_or_else(|e| panic!("Case-insensitive NOT IN query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
        view.row_count()
    );
    assert_eq!(view.row_count(), 2);

    println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
    let engine_case_sensitive = QueryEngine::new();
    let view = engine_case_sensitive
        .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')")
        .unwrap_or_else(|e| panic!("Case-sensitive IN query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
        view.row_count()
    );
    assert_eq!(view.row_count(), 0);

    println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
}
3466
/// Parenthesised AND groups joined by OR should select exactly two of the four rows.
#[test]
#[ignore = "Parentheses in WHERE clause not yet implemented"]
fn test_parentheses_in_where_clause() {
    // Ignore the error when another test already installed a subscriber.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "status", "priority"] {
        table.add_column(DataColumn::new(column));
    }

    let tickets = [
        (1, "Pending", "High"),
        (2, "Complete", "High"),
        (3, "Pending", "Low"),
        (4, "Complete", "Low"),
    ];
    for (id, status, priority) in tickets {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(status.to_string()),
                DataValue::String(priority.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing Parentheses in WHERE clause ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let status = table.get_value(i, 1);
        let priority = table.get_value(i, 2);
        println!("Row {i}: status = {status:?}, priority = {priority:?}");
    }

    println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
        )
        .unwrap_or_else(|e| panic!("Parentheses query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with parenthetical logic",
        view.row_count()
    );
    assert_eq!(view.row_count(), 2);

    println!("\n=== Parentheses test complete! ===");
}
3545
/// `Contains` on numeric columns: the price column mixes float and integer values, the
/// quantity column holds integers only.
#[test]
#[ignore = "Numeric type coercion needs fixing"]
fn test_numeric_type_coercion() {
    // Ignore the error when another test already installed a subscriber.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "price", "quantity"] {
        table.add_column(DataColumn::new(column));
    }

    // The third row stores its price as an integer rather than a float.
    let rows = [
        vec![
            DataValue::Integer(1),
            DataValue::Float(99.50),
            DataValue::Integer(100),
        ],
        vec![
            DataValue::Integer(2),
            DataValue::Float(150.0),
            DataValue::Integer(200),
        ],
        vec![
            DataValue::Integer(3),
            DataValue::Integer(75),
            DataValue::Integer(50),
        ],
    ];
    for row in rows {
        table.add_row(DataRow::new(row)).unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing Numeric Type Coercion ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let price = table.get_value(i, 1);
        let quantity = table.get_value(i, 2);
        println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
    }

    println!("\n--- Test: price.Contains('.') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE price.Contains('.')",
        )
        .unwrap_or_else(|e| panic!("Numeric Contains query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with decimal points in price",
        view.row_count()
    );
    assert_eq!(view.row_count(), 2);

    println!("\n--- Test: quantity.Contains('0') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE quantity.Contains('0')",
        )
        .unwrap_or_else(|e| panic!("Integer Contains query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with '0' in quantity",
        view.row_count()
    );
    assert_eq!(view.row_count(), 2);

    println!("\n=== Numeric type coercion test complete! ===");
}
3635
/// Date strings compare against `DateTime(2025,1,1)`: two of the three rows are later.
#[test]
fn test_datetime_comparisons() {
    // Ignore the error when another test already installed a subscriber.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "created_date"] {
        table.add_column(DataColumn::new(column));
    }

    for (id, created) in [(1, "2024-12-15"), (2, "2025-01-15"), (3, "2025-02-15")] {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(created.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing DateTime Comparisons ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let date = table.get_value(i, 1);
        println!("Row {i}: created_date = {date:?}");
    }

    println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
        )
        .unwrap_or_else(|e| panic!("DateTime comparison query failed: {e}"));
    println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
    assert_eq!(view.row_count(), 2);

    println!("\n=== DateTime comparison test complete! ===");
}
3697
/// Checks that `NOT` negates `Contains` / `StartsWith` method-call predicates
/// when the engine is built with `with_case_insensitive(true)`.
#[test]
fn test_not_with_method_calls() {
    // The init result is deliberately discarded; presumably a subscriber may
    // already have been installed by another test in the same process.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut builder = DataTable::new("test");
    builder.add_column(DataColumn::new("id"));
    builder.add_column(DataColumn::new("status"));

    // Two "Pending ..." rows and a single row that should survive the NOT filters.
    let fixture = [
        (1, "Pending Review"),
        (2, "Complete"),
        (3, "Pending Approval"),
    ];
    for (id, status) in fixture {
        let cells = vec![
            DataValue::Integer(id),
            DataValue::String(String::from(status)),
        ];
        builder.add_row(DataRow::new(cells)).unwrap();
    }

    let table = Arc::new(builder);
    let engine = QueryEngine::with_case_insensitive(true);

    println!("\n=== Testing NOT with Method Calls ===");
    println!("Table has {} rows", table.row_count());
    (0..table.row_count()).for_each(|i| {
        let status = table.get_value(i, 1);
        println!("Row {i}: status = {status:?}");
    });

    println!("\n--- Test: NOT status.Contains('pend') ---");
    let not_contains = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE NOT status.Contains('pend')",
        )
        .unwrap_or_else(|e| panic!("NOT Contains query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows NOT containing 'pend'",
        not_contains.row_count()
    );
    assert_eq!(not_contains.row_count(), 1);

    println!("\n--- Test: NOT status.StartsWith('Pending') ---");
    let not_starts_with = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
        )
        .unwrap_or_else(|e| panic!("NOT StartsWith query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows NOT starting with 'Pending'",
        not_starts_with.row_count()
    );
    assert_eq!(not_starts_with.row_count(), 1);

    println!("\n=== NOT with method calls test complete! ===");
}
3781
/// Exercises parenthesised AND/OR groups and a negated group in WHERE.
///
/// Currently skipped by default; see the `#[ignore]` reason below.
#[test]
#[ignore = "Complex logical expressions with parentheses not yet implemented"]
fn test_complex_logical_expressions() {
    // The init result is deliberately discarded; presumably a subscriber may
    // already have been installed by another test in the same process.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut builder = DataTable::new("test");
    for column in ["id", "status", "priority", "assigned"] {
        builder.add_column(DataColumn::new(column));
    }

    // (id, status, priority, assigned)
    let tickets = [
        (1, "Pending", "High", "John"),
        (2, "Complete", "High", "Jane"),
        (3, "Pending", "Low", "John"),
        (4, "In Progress", "Medium", "Jane"),
    ];
    for (id, state, level, owner) in tickets {
        let cells = vec![
            DataValue::Integer(id),
            DataValue::String(state.to_string()),
            DataValue::String(level.to_string()),
            DataValue::String(owner.to_string()),
        ];
        builder.add_row(DataRow::new(cells)).unwrap();
    }

    let table = Arc::new(builder);
    let engine = QueryEngine::new();

    println!("\n=== Testing Complex Logical Expressions ===");
    println!("Table has {} rows", table.row_count());
    (0..table.row_count()).for_each(|i| {
        let status = table.get_value(i, 1);
        let priority = table.get_value(i, 2);
        let assigned = table.get_value(i, 3);
        println!(
            "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
        );
    });

    // Rows 1 and 3 are Pending and satisfy the parenthesised OR.
    println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
    let grouped = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
        )
        .unwrap_or_else(|e| panic!("Complex logic query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with complex logic",
        grouped.row_count()
    );
    assert_eq!(grouped.row_count(), 2);

    // Rows 1 and 4 are neither Complete nor Low priority.
    println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
    let negated = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
        )
        .unwrap_or_else(|e| panic!("NOT complex logic query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with NOT complex logic",
        negated.row_count()
    );
    assert_eq!(negated.row_count(), 2);

    println!("\n=== Complex logical expressions test complete! ===");
}
3887
/// Runs `Contains` and `IN` filters over a column holding strings, a float,
/// a boolean and alongside a column containing nulls.
#[test]
fn test_mixed_data_types_and_edge_cases() {
    // The init result is deliberately discarded; presumably a subscriber may
    // already have been installed by another test in the same process.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut builder = DataTable::new("test");
    for column in ["id", "value", "nullable_field"] {
        builder.add_column(DataColumn::new(column));
    }

    // The `value` column intentionally mixes variants across rows.
    let fixture = [
        vec![
            DataValue::Integer(1),
            DataValue::String(String::from("123.45")),
            DataValue::String(String::from("present")),
        ],
        vec![
            DataValue::Integer(2),
            DataValue::Float(678.90),
            DataValue::Null,
        ],
        vec![
            DataValue::Integer(3),
            DataValue::Boolean(true),
            DataValue::String(String::from("also present")),
        ],
        vec![
            DataValue::Integer(4),
            DataValue::String(String::from("false")),
            DataValue::Null,
        ],
    ];
    for cells in fixture {
        builder.add_row(DataRow::new(cells)).unwrap();
    }

    let table = Arc::new(builder);
    let engine = QueryEngine::new();

    println!("\n=== Testing Mixed Data Types and Edge Cases ===");
    println!("Table has {} rows", table.row_count());
    (0..table.row_count()).for_each(|i| {
        let value = table.get_value(i, 1);
        let nullable = table.get_value(i, 2);
        println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
    });

    // Only the Boolean(true) row is expected to match.
    println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
    let coerced = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE value.Contains('true')",
        )
        .unwrap_or_else(|e| panic!("Boolean coercion query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with boolean coercion",
        coerced.row_count()
    );
    assert_eq!(coerced.row_count(), 1);

    println!("\n--- Test: id IN (1, 3) ---");
    let in_list = engine
        .execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)")
        .unwrap_or_else(|e| panic!("Multiple IN values query failed: {e}"));
    println!("SUCCESS: Found {} rows with IN clause", in_list.row_count());
    assert_eq!(in_list.row_count(), 2);

    println!("\n=== Mixed data types test complete! ===");
}
3978
/// A SELECT list made only of aggregates must collapse the five-row stock
/// fixture into one row holding COUNT, MIN, MAX and AVG of `close`.
#[test]
fn test_aggregate_only_single_row() {
    let stock = create_test_stock_data();
    let engine = QueryEngine::new();

    let sql = "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock";
    let result = engine
        .execute(stock.clone(), sql)
        .expect("Query should succeed");

    assert_eq!(
        result.row_count(),
        1,
        "Aggregate-only query should return exactly 1 row"
    );
    assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");

    let source = result.source();
    let row = source.get_row(0).expect("Should have first row");

    // COUNT(*), MIN(close), MAX(close) have exact expected values.
    let exact = [
        DataValue::Integer(5),
        DataValue::Float(99.5),
        DataValue::Float(105.0),
    ];
    for (idx, wanted) in exact.iter().enumerate() {
        assert_eq!(&row.values[idx], wanted);
    }

    // AVG is a computed float, so compare with a tolerance: 512.0 / 5 = 102.4.
    match &row.values[3] {
        DataValue::Float(avg) => assert!(
            (avg - 102.4).abs() < 0.01,
            "Average should be approximately 102.4, got {}",
            avg
        ),
        _ => panic!("AVG should return a Float value"),
    }
}
4024
/// A lone `COUNT(*)` over the stock fixture must yield a 1x1 result
/// containing the fixture's row count.
#[test]
fn test_single_aggregate_single_row() {
    let stock = create_test_stock_data();
    let engine = QueryEngine::new();

    let sql = "SELECT COUNT(*) FROM stock";
    let result = engine
        .execute(stock.clone(), sql)
        .expect("Query should succeed");

    let (rows, columns) = (result.row_count(), result.column_count());
    assert_eq!(
        rows, 1,
        "Single aggregate query should return exactly 1 row"
    );
    assert_eq!(columns, 1, "Should have 1 column");

    let source = result.source();
    let first_row = source.get_row(0).expect("Should have first row");
    let expected_count = DataValue::Integer(5);
    assert_eq!(first_row.values[0], expected_count);
}
4046
/// Aggregates combined with a WHERE filter must still produce exactly one
/// row, computed over only the rows that pass the filter.
#[test]
fn test_aggregate_with_where_single_row() {
    let stock = create_test_stock_data();
    let engine = QueryEngine::new();

    let sql = "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0";
    let result = engine
        .execute(stock.clone(), sql)
        .expect("Query should succeed");

    assert_eq!(
        result.row_count(),
        1,
        "Filtered aggregate query should return exactly 1 row"
    );
    assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");

    let source = result.source();
    let row = source.get_row(0).expect("Should have first row");

    // Only the 103.5 and 105.0 closes satisfy `close >= 103.0`.
    let expected = [
        DataValue::Integer(2),
        DataValue::Float(103.5),
        DataValue::Float(105.0),
    ];
    for (idx, wanted) in expected.iter().enumerate() {
        assert_eq!(&row.values[idx], wanted);
    }
}
4076
/// Parses a query with a `NOT IN` filter and checks that the parser keeps
/// the WHERE clause and at least one condition.
///
/// The parsed statement, its conditions and the first condition's expression
/// are printed for manual inspection.
///
/// # Panics
/// Fails the test when parsing returns an error, when the parsed statement
/// has no WHERE clause, or when the WHERE clause has no conditions. The
/// previous version skipped the last two cases silently, so the test could
/// pass even if the filter had been dropped.
#[test]
fn test_not_in_parsing() {
    use crate::sql::recursive_parser::Parser;

    let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
    println!("\n=== Testing NOT IN parsing ===");
    println!("Parsing query: {query}");

    let mut parser = Parser::new(query);
    let statement = parser
        .parse()
        .unwrap_or_else(|e| panic!("Parse error: {e}"));
    println!("Parsed statement: {statement:#?}");

    // A query that contains WHERE must not lose it during parsing.
    let where_clause = statement
        .where_clause
        .as_ref()
        .expect("NOT IN query should produce a WHERE clause");
    println!("WHERE conditions: {:#?}", where_clause.conditions);

    let first_condition = where_clause
        .conditions
        .first()
        .expect("WHERE clause should contain at least one condition");
    println!("First condition expression: {:#?}", first_condition.expr);
}
4101
4102 fn create_test_stock_data() -> Arc<DataTable> {
4104 let mut table = DataTable::new("stock");
4105
4106 table.add_column(DataColumn::new("symbol"));
4107 table.add_column(DataColumn::new("close"));
4108 table.add_column(DataColumn::new("volume"));
4109
4110 let test_data = vec![
4112 ("AAPL", 99.5, 1000),
4113 ("AAPL", 101.2, 1500),
4114 ("AAPL", 103.5, 2000),
4115 ("AAPL", 105.0, 1200),
4116 ("AAPL", 102.8, 1800),
4117 ];
4118
4119 for (symbol, close, volume) in test_data {
4120 table
4121 .add_row(DataRow::new(vec![
4122 DataValue::String(symbol.to_string()),
4123 DataValue::Float(close),
4124 DataValue::Integer(volume),
4125 ]))
4126 .expect("Should add row successfully");
4127 }
4128
4129 Arc::new(table)
4130 }
4131}
4132
4133#[cfg(test)]
4134#[path = "query_engine_tests.rs"]
4135mod query_engine_tests;