1use crate::data::data_view::DataView;
5use crate::data::datatable::{DataTable, DataValue};
6use crate::data::query_engine::QueryEngine;
7use crate::sql::parser::ast::{Condition, SelectItem, SelectStatement, SqlExpression, WhereClause};
8use anyhow::{anyhow, Result};
9use std::collections::{HashMap, HashSet};
10use std::sync::Arc;
11use tracing::{debug, info};
12
13#[derive(Debug, Clone)]
15pub enum SubqueryResult {
16 Scalar(DataValue),
18 ValueSet(HashSet<DataValue>),
20 Table(Arc<DataView>),
22}
23
24pub struct SubqueryExecutor {
26 query_engine: QueryEngine,
27 source_table: Arc<DataTable>,
28 cache: HashMap<String, SubqueryResult>,
30 cte_context: HashMap<String, Arc<DataView>>,
32}
33
34impl SubqueryExecutor {
35 pub fn new(query_engine: QueryEngine, source_table: Arc<DataTable>) -> Self {
37 Self {
38 query_engine,
39 source_table,
40 cache: HashMap::new(),
41 cte_context: HashMap::new(),
42 }
43 }
44
45 pub fn with_cte_context(
47 query_engine: QueryEngine,
48 source_table: Arc<DataTable>,
49 cte_context: HashMap<String, Arc<DataView>>,
50 ) -> Self {
51 Self {
52 query_engine,
53 source_table,
54 cache: HashMap::new(),
55 cte_context,
56 }
57 }
58
59 pub fn execute_subqueries(&mut self, statement: &SelectStatement) -> Result<SelectStatement> {
62 info!("SubqueryExecutor: Starting subquery execution pass");
63 info!(
64 "SubqueryExecutor: Available CTEs: {:?}",
65 self.cte_context.keys().collect::<Vec<_>>()
66 );
67
68 let mut modified_statement = statement.clone();
70
71 if let Some(ref where_clause) = statement.where_clause {
73 debug!("SubqueryExecutor: Processing WHERE clause for subqueries");
74 let mut new_conditions = Vec::new();
75 for condition in &where_clause.conditions {
76 new_conditions.push(Condition {
77 expr: self.process_expression(&condition.expr)?,
78 connector: condition.connector.clone(),
79 });
80 }
81 modified_statement.where_clause = Some(WhereClause {
82 conditions: new_conditions,
83 });
84 }
85
86 let mut new_select_items = Vec::new();
88 for item in &statement.select_items {
89 match item {
90 SelectItem::Column {
91 column: col,
92 leading_comments,
93 trailing_comment,
94 } => {
95 new_select_items.push(SelectItem::Column {
96 column: col.clone(),
97 leading_comments: leading_comments.clone(),
98 trailing_comment: trailing_comment.clone(),
99 });
100 }
101 SelectItem::Expression {
102 expr,
103 alias,
104 leading_comments,
105 trailing_comment,
106 } => {
107 new_select_items.push(SelectItem::Expression {
108 expr: self.process_expression(expr)?,
109 alias: alias.clone(),
110 leading_comments: leading_comments.clone(),
111 trailing_comment: trailing_comment.clone(),
112 });
113 }
114 SelectItem::Star {
115 table_prefix,
116 leading_comments,
117 trailing_comment,
118 } => {
119 new_select_items.push(SelectItem::Star {
120 table_prefix: table_prefix.clone(),
121 leading_comments: leading_comments.clone(),
122 trailing_comment: trailing_comment.clone(),
123 });
124 }
125 }
126 }
127 modified_statement.select_items = new_select_items;
128
129 if let Some(ref having) = statement.having {
131 debug!("SubqueryExecutor: Processing HAVING clause for subqueries");
132 modified_statement.having = Some(self.process_expression(having)?);
133 }
134
135 debug!("SubqueryExecutor: Subquery execution complete");
136 Ok(modified_statement)
137 }
138
139 fn process_expression(&mut self, expr: &SqlExpression) -> Result<SqlExpression> {
141 match expr {
142 SqlExpression::ScalarSubquery { query } => {
143 debug!("SubqueryExecutor: Executing scalar subquery");
144 let result = self.execute_scalar_subquery(query)?;
145 Ok(result)
146 }
147
148 SqlExpression::InSubquery { expr, subquery } => {
149 debug!("SubqueryExecutor: Executing IN subquery");
150 let values = self.execute_in_subquery(subquery)?;
151
152 Ok(SqlExpression::InList {
154 expr: Box::new(self.process_expression(expr)?),
155 values: values
156 .into_iter()
157 .map(|v| match v {
158 DataValue::Null => SqlExpression::Null,
159 DataValue::Integer(i) => SqlExpression::NumberLiteral(i.to_string()),
160 DataValue::Float(f) => SqlExpression::NumberLiteral(f.to_string()),
161 DataValue::String(s) => SqlExpression::StringLiteral(s),
162 DataValue::InternedString(s) => {
163 SqlExpression::StringLiteral(s.to_string())
164 }
165 DataValue::Boolean(b) => SqlExpression::BooleanLiteral(b),
166 DataValue::DateTime(dt) => SqlExpression::StringLiteral(dt),
167 })
168 .collect(),
169 })
170 }
171
172 SqlExpression::NotInSubquery { expr, subquery } => {
173 debug!("SubqueryExecutor: Executing NOT IN subquery");
174 let values = self.execute_in_subquery(subquery)?;
175
176 Ok(SqlExpression::NotInList {
178 expr: Box::new(self.process_expression(expr)?),
179 values: values
180 .into_iter()
181 .map(|v| match v {
182 DataValue::Null => SqlExpression::Null,
183 DataValue::Integer(i) => SqlExpression::NumberLiteral(i.to_string()),
184 DataValue::Float(f) => SqlExpression::NumberLiteral(f.to_string()),
185 DataValue::String(s) => SqlExpression::StringLiteral(s),
186 DataValue::InternedString(s) => {
187 SqlExpression::StringLiteral(s.to_string())
188 }
189 DataValue::Boolean(b) => SqlExpression::BooleanLiteral(b),
190 DataValue::DateTime(dt) => SqlExpression::StringLiteral(dt),
191 })
192 .collect(),
193 })
194 }
195
196 SqlExpression::BinaryOp { left, op, right } => Ok(SqlExpression::BinaryOp {
198 left: Box::new(self.process_expression(left)?),
199 op: op.clone(),
200 right: Box::new(self.process_expression(right)?),
201 }),
202
203 SqlExpression::Between { expr, lower, upper } => Ok(SqlExpression::Between {
206 expr: Box::new(self.process_expression(expr)?),
207 lower: Box::new(self.process_expression(lower)?),
208 upper: Box::new(self.process_expression(upper)?),
209 }),
210
211 SqlExpression::InList { expr, values } => Ok(SqlExpression::InList {
212 expr: Box::new(self.process_expression(expr)?),
213 values: values
214 .iter()
215 .map(|v| self.process_expression(v))
216 .collect::<Result<Vec<_>>>()?,
217 }),
218
219 SqlExpression::NotInList { expr, values } => Ok(SqlExpression::NotInList {
220 expr: Box::new(self.process_expression(expr)?),
221 values: values
222 .iter()
223 .map(|v| self.process_expression(v))
224 .collect::<Result<Vec<_>>>()?,
225 }),
226
227 SqlExpression::FunctionCall {
229 name,
230 args,
231 distinct,
232 } => Ok(SqlExpression::FunctionCall {
233 name: name.clone(),
234 args: args
235 .iter()
236 .map(|a| self.process_expression(a))
237 .collect::<Result<Vec<_>>>()?,
238 distinct: *distinct,
239 }),
240
241 _ => Ok(expr.clone()),
243 }
244 }
245
246 fn execute_scalar_subquery(&mut self, query: &SelectStatement) -> Result<SqlExpression> {
248 let cache_key = format!("scalar:{:?}", query);
249
250 if let Some(cached) = self.cache.get(&cache_key) {
252 debug!("SubqueryExecutor: Using cached scalar subquery result");
253 if let SubqueryResult::Scalar(value) = cached {
254 return Ok(self.datavalue_to_expression(value.clone()));
255 }
256 }
257
258 info!("SubqueryExecutor: Executing scalar subquery");
259
260 let result_view = self.query_engine.execute_statement_with_cte_context(
262 self.source_table.clone(),
263 query.clone(),
264 &self.cte_context,
265 )?;
266
267 if result_view.row_count() != 1 {
269 return Err(anyhow!(
270 "Scalar subquery returned {} rows, expected exactly 1",
271 result_view.row_count()
272 ));
273 }
274
275 if result_view.column_count() != 1 {
276 return Err(anyhow!(
277 "Scalar subquery returned {} columns, expected exactly 1",
278 result_view.column_count()
279 ));
280 }
281
282 let value = if let Some(row) = result_view.get_row(0) {
284 row.values.get(0).cloned().unwrap_or(DataValue::Null)
285 } else {
286 DataValue::Null
287 };
288
289 self.cache
291 .insert(cache_key, SubqueryResult::Scalar(value.clone()));
292
293 Ok(self.datavalue_to_expression(value))
294 }
295
296 fn execute_in_subquery(&mut self, query: &SelectStatement) -> Result<Vec<DataValue>> {
298 let cache_key = format!("in:{:?}", query);
299
300 if let Some(cached) = self.cache.get(&cache_key) {
302 debug!("SubqueryExecutor: Using cached IN subquery result");
303 if let SubqueryResult::ValueSet(values) = cached {
304 return Ok(values.iter().cloned().collect());
305 }
306 }
307
308 info!("SubqueryExecutor: Executing IN subquery");
309 debug!(
310 "SubqueryExecutor: Available CTEs in context: {:?}",
311 self.cte_context.keys().collect::<Vec<_>>()
312 );
313 debug!("SubqueryExecutor: Subquery: {:?}", query);
314
315 let result_view = self.query_engine.execute_statement_with_cte_context(
317 self.source_table.clone(),
318 query.clone(),
319 &self.cte_context,
320 )?;
321
322 debug!(
323 "SubqueryExecutor: IN subquery returned {} rows",
324 result_view.row_count()
325 );
326
327 if result_view.column_count() != 1 {
329 return Err(anyhow!(
330 "IN subquery returned {} columns, expected exactly 1",
331 result_view.column_count()
332 ));
333 }
334
335 let mut values = HashSet::new();
337 for row_idx in 0..result_view.row_count() {
338 if let Some(row) = result_view.get_row(row_idx) {
339 if let Some(value) = row.values.get(0) {
340 values.insert(value.clone());
341 }
342 }
343 }
344
345 self.cache
347 .insert(cache_key, SubqueryResult::ValueSet(values.clone()));
348
349 Ok(values.into_iter().collect())
350 }
351
352 fn datavalue_to_expression(&self, value: DataValue) -> SqlExpression {
354 match value {
355 DataValue::Null => SqlExpression::Null,
356 DataValue::Integer(i) => SqlExpression::NumberLiteral(i.to_string()),
357 DataValue::Float(f) => SqlExpression::NumberLiteral(f.to_string()),
358 DataValue::String(s) => SqlExpression::StringLiteral(s),
359 DataValue::InternedString(s) => SqlExpression::StringLiteral(s.to_string()),
360 DataValue::Boolean(b) => SqlExpression::BooleanLiteral(b),
361 DataValue::DateTime(dt) => SqlExpression::StringLiteral(dt),
362 }
363 }
364}