1use crate::data::data_view::DataView;
5use crate::data::datatable::{DataTable, DataValue};
6use crate::data::query_engine::QueryEngine;
7use crate::sql::parser::ast::{Condition, SelectItem, SelectStatement, SqlExpression, WhereClause};
8use anyhow::{anyhow, Result};
9use std::collections::{HashMap, HashSet};
10use std::sync::Arc;
11use tracing::{debug, info};
12
13#[derive(Debug, Clone)]
15pub enum SubqueryResult {
16 Scalar(DataValue),
18 ValueSet(HashSet<DataValue>),
20 Table(Arc<DataView>),
22}
23
24pub struct SubqueryExecutor {
26 query_engine: QueryEngine,
27 source_table: Arc<DataTable>,
28 cache: HashMap<String, SubqueryResult>,
30 cte_context: HashMap<String, Arc<DataView>>,
32}
33
34impl SubqueryExecutor {
35 pub fn new(query_engine: QueryEngine, source_table: Arc<DataTable>) -> Self {
37 Self {
38 query_engine,
39 source_table,
40 cache: HashMap::new(),
41 cte_context: HashMap::new(),
42 }
43 }
44
45 pub fn with_cte_context(
47 query_engine: QueryEngine,
48 source_table: Arc<DataTable>,
49 cte_context: HashMap<String, Arc<DataView>>,
50 ) -> Self {
51 Self {
52 query_engine,
53 source_table,
54 cache: HashMap::new(),
55 cte_context,
56 }
57 }
58
59 pub fn execute_subqueries(&mut self, statement: &SelectStatement) -> Result<SelectStatement> {
62 info!("SubqueryExecutor: Starting subquery execution pass");
63 info!(
64 "SubqueryExecutor: Available CTEs: {:?}",
65 self.cte_context.keys().collect::<Vec<_>>()
66 );
67
68 let mut modified_statement = statement.clone();
70
71 if let Some(ref where_clause) = statement.where_clause {
73 debug!("SubqueryExecutor: Processing WHERE clause for subqueries");
74 let mut new_conditions = Vec::new();
75 for condition in &where_clause.conditions {
76 new_conditions.push(Condition {
77 expr: self.process_expression(&condition.expr)?,
78 connector: condition.connector.clone(),
79 });
80 }
81 modified_statement.where_clause = Some(WhereClause {
82 conditions: new_conditions,
83 });
84 }
85
86 let mut new_select_items = Vec::new();
88 for item in &statement.select_items {
89 match item {
90 SelectItem::Column {
91 column: col,
92 leading_comments,
93 trailing_comment,
94 } => {
95 new_select_items.push(SelectItem::Column {
96 column: col.clone(),
97 leading_comments: leading_comments.clone(),
98 trailing_comment: trailing_comment.clone(),
99 });
100 }
101 SelectItem::Expression {
102 expr,
103 alias,
104 leading_comments,
105 trailing_comment,
106 } => {
107 new_select_items.push(SelectItem::Expression {
108 expr: self.process_expression(expr)?,
109 alias: alias.clone(),
110 leading_comments: leading_comments.clone(),
111 trailing_comment: trailing_comment.clone(),
112 });
113 }
114 SelectItem::Star {
115 table_prefix,
116 leading_comments,
117 trailing_comment,
118 } => {
119 new_select_items.push(SelectItem::Star {
120 table_prefix: table_prefix.clone(),
121 leading_comments: leading_comments.clone(),
122 trailing_comment: trailing_comment.clone(),
123 });
124 }
125 SelectItem::StarExclude {
126 table_prefix,
127 excluded_columns,
128 leading_comments,
129 trailing_comment,
130 } => {
131 new_select_items.push(SelectItem::StarExclude {
132 table_prefix: table_prefix.clone(),
133 excluded_columns: excluded_columns.clone(),
134 leading_comments: leading_comments.clone(),
135 trailing_comment: trailing_comment.clone(),
136 });
137 }
138 }
139 }
140 modified_statement.select_items = new_select_items;
141
142 if let Some(ref having) = statement.having {
144 debug!("SubqueryExecutor: Processing HAVING clause for subqueries");
145 modified_statement.having = Some(self.process_expression(having)?);
146 }
147
148 debug!("SubqueryExecutor: Subquery execution complete");
149 Ok(modified_statement)
150 }
151
152 fn process_expression(&mut self, expr: &SqlExpression) -> Result<SqlExpression> {
154 match expr {
155 SqlExpression::ScalarSubquery { query } => {
156 debug!("SubqueryExecutor: Executing scalar subquery");
157 let result = self.execute_scalar_subquery(query)?;
158 Ok(result)
159 }
160
161 SqlExpression::InSubquery { expr, subquery } => {
162 debug!("SubqueryExecutor: Executing IN subquery");
163 let values = self.execute_in_subquery(subquery)?;
164
165 Ok(SqlExpression::InList {
167 expr: Box::new(self.process_expression(expr)?),
168 values: values
169 .into_iter()
170 .map(|v| match v {
171 DataValue::Null => SqlExpression::Null,
172 DataValue::Integer(i) => SqlExpression::NumberLiteral(i.to_string()),
173 DataValue::Float(f) => SqlExpression::NumberLiteral(f.to_string()),
174 DataValue::String(s) => SqlExpression::StringLiteral(s),
175 DataValue::InternedString(s) => {
176 SqlExpression::StringLiteral(s.to_string())
177 }
178 DataValue::Boolean(b) => SqlExpression::BooleanLiteral(b),
179 DataValue::DateTime(dt) => SqlExpression::StringLiteral(dt),
180 DataValue::Vector(v) => {
181 let components: Vec<String> =
182 v.iter().map(|f| f.to_string()).collect();
183 SqlExpression::StringLiteral(format!("[{}]", components.join(",")))
184 }
185 })
186 .collect(),
187 })
188 }
189
190 SqlExpression::NotInSubquery { expr, subquery } => {
191 debug!("SubqueryExecutor: Executing NOT IN subquery");
192 let values = self.execute_in_subquery(subquery)?;
193
194 Ok(SqlExpression::NotInList {
196 expr: Box::new(self.process_expression(expr)?),
197 values: values
198 .into_iter()
199 .map(|v| match v {
200 DataValue::Null => SqlExpression::Null,
201 DataValue::Integer(i) => SqlExpression::NumberLiteral(i.to_string()),
202 DataValue::Float(f) => SqlExpression::NumberLiteral(f.to_string()),
203 DataValue::String(s) => SqlExpression::StringLiteral(s),
204 DataValue::InternedString(s) => {
205 SqlExpression::StringLiteral(s.to_string())
206 }
207 DataValue::Boolean(b) => SqlExpression::BooleanLiteral(b),
208 DataValue::DateTime(dt) => SqlExpression::StringLiteral(dt),
209 DataValue::Vector(v) => {
210 let components: Vec<String> =
211 v.iter().map(|f| f.to_string()).collect();
212 SqlExpression::StringLiteral(format!("[{}]", components.join(",")))
213 }
214 })
215 .collect(),
216 })
217 }
218
219 SqlExpression::BinaryOp { left, op, right } => Ok(SqlExpression::BinaryOp {
221 left: Box::new(self.process_expression(left)?),
222 op: op.clone(),
223 right: Box::new(self.process_expression(right)?),
224 }),
225
226 SqlExpression::Between { expr, lower, upper } => Ok(SqlExpression::Between {
229 expr: Box::new(self.process_expression(expr)?),
230 lower: Box::new(self.process_expression(lower)?),
231 upper: Box::new(self.process_expression(upper)?),
232 }),
233
234 SqlExpression::InList { expr, values } => Ok(SqlExpression::InList {
235 expr: Box::new(self.process_expression(expr)?),
236 values: values
237 .iter()
238 .map(|v| self.process_expression(v))
239 .collect::<Result<Vec<_>>>()?,
240 }),
241
242 SqlExpression::NotInList { expr, values } => Ok(SqlExpression::NotInList {
243 expr: Box::new(self.process_expression(expr)?),
244 values: values
245 .iter()
246 .map(|v| self.process_expression(v))
247 .collect::<Result<Vec<_>>>()?,
248 }),
249
250 SqlExpression::FunctionCall {
252 name,
253 args,
254 distinct,
255 } => Ok(SqlExpression::FunctionCall {
256 name: name.clone(),
257 args: args
258 .iter()
259 .map(|a| self.process_expression(a))
260 .collect::<Result<Vec<_>>>()?,
261 distinct: *distinct,
262 }),
263
264 _ => Ok(expr.clone()),
266 }
267 }
268
269 fn execute_scalar_subquery(&mut self, query: &SelectStatement) -> Result<SqlExpression> {
271 let cache_key = format!("scalar:{:?}", query);
272
273 if let Some(cached) = self.cache.get(&cache_key) {
275 debug!("SubqueryExecutor: Using cached scalar subquery result");
276 if let SubqueryResult::Scalar(value) = cached {
277 return Ok(self.datavalue_to_expression(value.clone()));
278 }
279 }
280
281 info!("SubqueryExecutor: Executing scalar subquery");
282
283 let result_view = self.query_engine.execute_statement_with_cte_context(
285 self.source_table.clone(),
286 query.clone(),
287 &self.cte_context,
288 )?;
289
290 if result_view.row_count() != 1 {
292 return Err(anyhow!(
293 "Scalar subquery returned {} rows, expected exactly 1",
294 result_view.row_count()
295 ));
296 }
297
298 if result_view.column_count() != 1 {
299 return Err(anyhow!(
300 "Scalar subquery returned {} columns, expected exactly 1",
301 result_view.column_count()
302 ));
303 }
304
305 let value = if let Some(row) = result_view.get_row(0) {
307 row.values.get(0).cloned().unwrap_or(DataValue::Null)
308 } else {
309 DataValue::Null
310 };
311
312 self.cache
314 .insert(cache_key, SubqueryResult::Scalar(value.clone()));
315
316 Ok(self.datavalue_to_expression(value))
317 }
318
319 fn execute_in_subquery(&mut self, query: &SelectStatement) -> Result<Vec<DataValue>> {
321 let cache_key = format!("in:{:?}", query);
322
323 if let Some(cached) = self.cache.get(&cache_key) {
325 debug!("SubqueryExecutor: Using cached IN subquery result");
326 if let SubqueryResult::ValueSet(values) = cached {
327 return Ok(values.iter().cloned().collect());
328 }
329 }
330
331 info!("SubqueryExecutor: Executing IN subquery");
332 debug!(
333 "SubqueryExecutor: Available CTEs in context: {:?}",
334 self.cte_context.keys().collect::<Vec<_>>()
335 );
336 debug!("SubqueryExecutor: Subquery: {:?}", query);
337
338 let result_view = self.query_engine.execute_statement_with_cte_context(
340 self.source_table.clone(),
341 query.clone(),
342 &self.cte_context,
343 )?;
344
345 debug!(
346 "SubqueryExecutor: IN subquery returned {} rows",
347 result_view.row_count()
348 );
349
350 if result_view.column_count() != 1 {
352 return Err(anyhow!(
353 "IN subquery returned {} columns, expected exactly 1",
354 result_view.column_count()
355 ));
356 }
357
358 let mut values = HashSet::new();
360 for row_idx in 0..result_view.row_count() {
361 if let Some(row) = result_view.get_row(row_idx) {
362 if let Some(value) = row.values.get(0) {
363 values.insert(value.clone());
364 }
365 }
366 }
367
368 self.cache
370 .insert(cache_key, SubqueryResult::ValueSet(values.clone()));
371
372 Ok(values.into_iter().collect())
373 }
374
375 fn datavalue_to_expression(&self, value: DataValue) -> SqlExpression {
377 match value {
378 DataValue::Null => SqlExpression::Null,
379 DataValue::Integer(i) => SqlExpression::NumberLiteral(i.to_string()),
380 DataValue::Float(f) => SqlExpression::NumberLiteral(f.to_string()),
381 DataValue::String(s) => SqlExpression::StringLiteral(s),
382 DataValue::InternedString(s) => SqlExpression::StringLiteral(s.to_string()),
383 DataValue::Boolean(b) => SqlExpression::BooleanLiteral(b),
384 DataValue::DateTime(dt) => SqlExpression::StringLiteral(dt),
385 DataValue::Vector(v) => {
386 let components: Vec<String> = v.iter().map(|f| f.to_string()).collect();
387 SqlExpression::StringLiteral(format!("[{}]", components.join(",")))
388 }
389 }
390 }
391}