1use crate::data::data_view::DataView;
5use crate::data::datatable::{DataTable, DataValue};
6use crate::data::query_engine::QueryEngine;
7use crate::sql::parser::ast::{Condition, SelectItem, SelectStatement, SqlExpression, WhereClause};
8use anyhow::{anyhow, Result};
9use std::collections::{HashMap, HashSet};
10use std::sync::Arc;
11use tracing::{debug, info};
12
13#[derive(Debug, Clone)]
15pub enum SubqueryResult {
16 Scalar(DataValue),
18 ValueSet(HashSet<DataValue>),
20 Table(Arc<DataView>),
22}
23
24pub struct SubqueryExecutor {
26 query_engine: QueryEngine,
27 source_table: Arc<DataTable>,
28 cache: HashMap<String, SubqueryResult>,
30 cte_context: HashMap<String, Arc<DataView>>,
32}
33
34impl SubqueryExecutor {
35 pub fn new(query_engine: QueryEngine, source_table: Arc<DataTable>) -> Self {
37 Self {
38 query_engine,
39 source_table,
40 cache: HashMap::new(),
41 cte_context: HashMap::new(),
42 }
43 }
44
45 pub fn with_cte_context(
47 query_engine: QueryEngine,
48 source_table: Arc<DataTable>,
49 cte_context: HashMap<String, Arc<DataView>>,
50 ) -> Self {
51 Self {
52 query_engine,
53 source_table,
54 cache: HashMap::new(),
55 cte_context,
56 }
57 }
58
59 pub fn execute_subqueries(&mut self, statement: &SelectStatement) -> Result<SelectStatement> {
62 info!("SubqueryExecutor: Starting subquery execution pass");
63 info!(
64 "SubqueryExecutor: Available CTEs: {:?}",
65 self.cte_context.keys().collect::<Vec<_>>()
66 );
67
68 let mut modified_statement = statement.clone();
70
71 if let Some(ref where_clause) = statement.where_clause {
73 debug!("SubqueryExecutor: Processing WHERE clause for subqueries");
74 let mut new_conditions = Vec::new();
75 for condition in &where_clause.conditions {
76 new_conditions.push(Condition {
77 expr: self.process_expression(&condition.expr)?,
78 connector: condition.connector.clone(),
79 });
80 }
81 modified_statement.where_clause = Some(WhereClause {
82 conditions: new_conditions,
83 });
84 }
85
86 let mut new_select_items = Vec::new();
88 for item in &statement.select_items {
89 match item {
90 SelectItem::Column(col) => {
91 new_select_items.push(SelectItem::Column(col.clone()));
92 }
93 SelectItem::Expression { expr, alias } => {
94 new_select_items.push(SelectItem::Expression {
95 expr: self.process_expression(expr)?,
96 alias: alias.clone(),
97 });
98 }
99 SelectItem::Star => {
100 new_select_items.push(SelectItem::Star);
101 }
102 }
103 }
104 modified_statement.select_items = new_select_items;
105
106 if let Some(ref having) = statement.having {
108 debug!("SubqueryExecutor: Processing HAVING clause for subqueries");
109 modified_statement.having = Some(self.process_expression(having)?);
110 }
111
112 debug!("SubqueryExecutor: Subquery execution complete");
113 Ok(modified_statement)
114 }
115
116 fn process_expression(&mut self, expr: &SqlExpression) -> Result<SqlExpression> {
118 match expr {
119 SqlExpression::ScalarSubquery { query } => {
120 debug!("SubqueryExecutor: Executing scalar subquery");
121 let result = self.execute_scalar_subquery(query)?;
122 Ok(result)
123 }
124
125 SqlExpression::InSubquery { expr, subquery } => {
126 debug!("SubqueryExecutor: Executing IN subquery");
127 let values = self.execute_in_subquery(subquery)?;
128
129 Ok(SqlExpression::InList {
131 expr: Box::new(self.process_expression(expr)?),
132 values: values
133 .into_iter()
134 .map(|v| match v {
135 DataValue::Null => SqlExpression::Null,
136 DataValue::Integer(i) => SqlExpression::NumberLiteral(i.to_string()),
137 DataValue::Float(f) => SqlExpression::NumberLiteral(f.to_string()),
138 DataValue::String(s) => SqlExpression::StringLiteral(s),
139 DataValue::InternedString(s) => {
140 SqlExpression::StringLiteral(s.to_string())
141 }
142 DataValue::Boolean(b) => SqlExpression::BooleanLiteral(b),
143 DataValue::DateTime(dt) => SqlExpression::StringLiteral(dt),
144 })
145 .collect(),
146 })
147 }
148
149 SqlExpression::NotInSubquery { expr, subquery } => {
150 debug!("SubqueryExecutor: Executing NOT IN subquery");
151 let values = self.execute_in_subquery(subquery)?;
152
153 Ok(SqlExpression::NotInList {
155 expr: Box::new(self.process_expression(expr)?),
156 values: values
157 .into_iter()
158 .map(|v| match v {
159 DataValue::Null => SqlExpression::Null,
160 DataValue::Integer(i) => SqlExpression::NumberLiteral(i.to_string()),
161 DataValue::Float(f) => SqlExpression::NumberLiteral(f.to_string()),
162 DataValue::String(s) => SqlExpression::StringLiteral(s),
163 DataValue::InternedString(s) => {
164 SqlExpression::StringLiteral(s.to_string())
165 }
166 DataValue::Boolean(b) => SqlExpression::BooleanLiteral(b),
167 DataValue::DateTime(dt) => SqlExpression::StringLiteral(dt),
168 })
169 .collect(),
170 })
171 }
172
173 SqlExpression::BinaryOp { left, op, right } => Ok(SqlExpression::BinaryOp {
175 left: Box::new(self.process_expression(left)?),
176 op: op.clone(),
177 right: Box::new(self.process_expression(right)?),
178 }),
179
180 SqlExpression::Between { expr, lower, upper } => Ok(SqlExpression::Between {
183 expr: Box::new(self.process_expression(expr)?),
184 lower: Box::new(self.process_expression(lower)?),
185 upper: Box::new(self.process_expression(upper)?),
186 }),
187
188 SqlExpression::InList { expr, values } => Ok(SqlExpression::InList {
189 expr: Box::new(self.process_expression(expr)?),
190 values: values
191 .iter()
192 .map(|v| self.process_expression(v))
193 .collect::<Result<Vec<_>>>()?,
194 }),
195
196 SqlExpression::NotInList { expr, values } => Ok(SqlExpression::NotInList {
197 expr: Box::new(self.process_expression(expr)?),
198 values: values
199 .iter()
200 .map(|v| self.process_expression(v))
201 .collect::<Result<Vec<_>>>()?,
202 }),
203
204 SqlExpression::FunctionCall {
206 name,
207 args,
208 distinct,
209 } => Ok(SqlExpression::FunctionCall {
210 name: name.clone(),
211 args: args
212 .iter()
213 .map(|a| self.process_expression(a))
214 .collect::<Result<Vec<_>>>()?,
215 distinct: *distinct,
216 }),
217
218 _ => Ok(expr.clone()),
220 }
221 }
222
223 fn execute_scalar_subquery(&mut self, query: &SelectStatement) -> Result<SqlExpression> {
225 let cache_key = format!("scalar:{:?}", query);
226
227 if let Some(cached) = self.cache.get(&cache_key) {
229 debug!("SubqueryExecutor: Using cached scalar subquery result");
230 if let SubqueryResult::Scalar(value) = cached {
231 return Ok(self.datavalue_to_expression(value.clone()));
232 }
233 }
234
235 info!("SubqueryExecutor: Executing scalar subquery");
236
237 let result_view = self.query_engine.execute_statement_with_cte_context(
239 self.source_table.clone(),
240 query.clone(),
241 &self.cte_context,
242 )?;
243
244 if result_view.row_count() != 1 {
246 return Err(anyhow!(
247 "Scalar subquery returned {} rows, expected exactly 1",
248 result_view.row_count()
249 ));
250 }
251
252 if result_view.column_count() != 1 {
253 return Err(anyhow!(
254 "Scalar subquery returned {} columns, expected exactly 1",
255 result_view.column_count()
256 ));
257 }
258
259 let value = if let Some(row) = result_view.get_row(0) {
261 row.values.get(0).cloned().unwrap_or(DataValue::Null)
262 } else {
263 DataValue::Null
264 };
265
266 self.cache
268 .insert(cache_key, SubqueryResult::Scalar(value.clone()));
269
270 Ok(self.datavalue_to_expression(value))
271 }
272
273 fn execute_in_subquery(&mut self, query: &SelectStatement) -> Result<Vec<DataValue>> {
275 let cache_key = format!("in:{:?}", query);
276
277 if let Some(cached) = self.cache.get(&cache_key) {
279 debug!("SubqueryExecutor: Using cached IN subquery result");
280 if let SubqueryResult::ValueSet(values) = cached {
281 return Ok(values.iter().cloned().collect());
282 }
283 }
284
285 info!("SubqueryExecutor: Executing IN subquery");
286 debug!(
287 "SubqueryExecutor: Available CTEs in context: {:?}",
288 self.cte_context.keys().collect::<Vec<_>>()
289 );
290 debug!("SubqueryExecutor: Subquery: {:?}", query);
291
292 let result_view = self.query_engine.execute_statement_with_cte_context(
294 self.source_table.clone(),
295 query.clone(),
296 &self.cte_context,
297 )?;
298
299 debug!(
300 "SubqueryExecutor: IN subquery returned {} rows",
301 result_view.row_count()
302 );
303
304 if result_view.column_count() != 1 {
306 return Err(anyhow!(
307 "IN subquery returned {} columns, expected exactly 1",
308 result_view.column_count()
309 ));
310 }
311
312 let mut values = HashSet::new();
314 for row_idx in 0..result_view.row_count() {
315 if let Some(row) = result_view.get_row(row_idx) {
316 if let Some(value) = row.values.get(0) {
317 values.insert(value.clone());
318 }
319 }
320 }
321
322 self.cache
324 .insert(cache_key, SubqueryResult::ValueSet(values.clone()));
325
326 Ok(values.into_iter().collect())
327 }
328
329 fn datavalue_to_expression(&self, value: DataValue) -> SqlExpression {
331 match value {
332 DataValue::Null => SqlExpression::Null,
333 DataValue::Integer(i) => SqlExpression::NumberLiteral(i.to_string()),
334 DataValue::Float(f) => SqlExpression::NumberLiteral(f.to_string()),
335 DataValue::String(s) => SqlExpression::StringLiteral(s),
336 DataValue::InternedString(s) => SqlExpression::StringLiteral(s.to_string()),
337 DataValue::Boolean(b) => SqlExpression::BooleanLiteral(b),
338 DataValue::DateTime(dt) => SqlExpression::StringLiteral(dt),
339 }
340 }
341}