1use crate::data::data_view::DataView;
5use crate::data::datatable::{DataTable, DataValue};
6use crate::data::query_engine::QueryEngine;
7use crate::sql::parser::ast::{Condition, SelectItem, SelectStatement, SqlExpression, WhereClause};
8use anyhow::{anyhow, Result};
9use std::collections::{HashMap, HashSet};
10use std::sync::Arc;
11use tracing::{debug, info};
12
13#[derive(Debug, Clone)]
15pub enum SubqueryResult {
16 Scalar(DataValue),
18 ValueSet(HashSet<DataValue>),
20 Table(Arc<DataView>),
22}
23
24pub struct SubqueryExecutor {
26 query_engine: QueryEngine,
27 source_table: Arc<DataTable>,
28 cache: HashMap<String, SubqueryResult>,
30 cte_context: HashMap<String, Arc<DataView>>,
32}
33
34impl SubqueryExecutor {
35 pub fn new(query_engine: QueryEngine, source_table: Arc<DataTable>) -> Self {
37 Self {
38 query_engine,
39 source_table,
40 cache: HashMap::new(),
41 cte_context: HashMap::new(),
42 }
43 }
44
45 pub fn with_cte_context(
47 query_engine: QueryEngine,
48 source_table: Arc<DataTable>,
49 cte_context: HashMap<String, Arc<DataView>>,
50 ) -> Self {
51 Self {
52 query_engine,
53 source_table,
54 cache: HashMap::new(),
55 cte_context,
56 }
57 }
58
59 pub fn execute_subqueries(&mut self, statement: &SelectStatement) -> Result<SelectStatement> {
62 debug!("SubqueryExecutor: Starting subquery execution pass");
63
64 let mut modified_statement = statement.clone();
66
67 if let Some(ref where_clause) = statement.where_clause {
69 debug!("SubqueryExecutor: Processing WHERE clause for subqueries");
70 let mut new_conditions = Vec::new();
71 for condition in &where_clause.conditions {
72 new_conditions.push(Condition {
73 expr: self.process_expression(&condition.expr)?,
74 connector: condition.connector.clone(),
75 });
76 }
77 modified_statement.where_clause = Some(WhereClause {
78 conditions: new_conditions,
79 });
80 }
81
82 let mut new_select_items = Vec::new();
84 for item in &statement.select_items {
85 match item {
86 SelectItem::Column(col) => {
87 new_select_items.push(SelectItem::Column(col.clone()));
88 }
89 SelectItem::Expression { expr, alias } => {
90 new_select_items.push(SelectItem::Expression {
91 expr: self.process_expression(expr)?,
92 alias: alias.clone(),
93 });
94 }
95 SelectItem::Star => {
96 new_select_items.push(SelectItem::Star);
97 }
98 }
99 }
100 modified_statement.select_items = new_select_items;
101
102 if let Some(ref having) = statement.having {
104 debug!("SubqueryExecutor: Processing HAVING clause for subqueries");
105 modified_statement.having = Some(self.process_expression(having)?);
106 }
107
108 debug!("SubqueryExecutor: Subquery execution complete");
109 Ok(modified_statement)
110 }
111
112 fn process_expression(&mut self, expr: &SqlExpression) -> Result<SqlExpression> {
114 match expr {
115 SqlExpression::ScalarSubquery { query } => {
116 debug!("SubqueryExecutor: Executing scalar subquery");
117 let result = self.execute_scalar_subquery(query)?;
118 Ok(result)
119 }
120
121 SqlExpression::InSubquery { expr, subquery } => {
122 debug!("SubqueryExecutor: Executing IN subquery");
123 let values = self.execute_in_subquery(subquery)?;
124
125 Ok(SqlExpression::InList {
127 expr: Box::new(self.process_expression(expr)?),
128 values: values
129 .into_iter()
130 .map(|v| match v {
131 DataValue::Null => SqlExpression::Null,
132 DataValue::Integer(i) => SqlExpression::NumberLiteral(i.to_string()),
133 DataValue::Float(f) => SqlExpression::NumberLiteral(f.to_string()),
134 DataValue::String(s) => SqlExpression::StringLiteral(s),
135 DataValue::InternedString(s) => {
136 SqlExpression::StringLiteral(s.to_string())
137 }
138 DataValue::Boolean(b) => SqlExpression::BooleanLiteral(b),
139 DataValue::DateTime(dt) => SqlExpression::StringLiteral(dt),
140 })
141 .collect(),
142 })
143 }
144
145 SqlExpression::NotInSubquery { expr, subquery } => {
146 debug!("SubqueryExecutor: Executing NOT IN subquery");
147 let values = self.execute_in_subquery(subquery)?;
148
149 Ok(SqlExpression::NotInList {
151 expr: Box::new(self.process_expression(expr)?),
152 values: values
153 .into_iter()
154 .map(|v| match v {
155 DataValue::Null => SqlExpression::Null,
156 DataValue::Integer(i) => SqlExpression::NumberLiteral(i.to_string()),
157 DataValue::Float(f) => SqlExpression::NumberLiteral(f.to_string()),
158 DataValue::String(s) => SqlExpression::StringLiteral(s),
159 DataValue::InternedString(s) => {
160 SqlExpression::StringLiteral(s.to_string())
161 }
162 DataValue::Boolean(b) => SqlExpression::BooleanLiteral(b),
163 DataValue::DateTime(dt) => SqlExpression::StringLiteral(dt),
164 })
165 .collect(),
166 })
167 }
168
169 SqlExpression::BinaryOp { left, op, right } => Ok(SqlExpression::BinaryOp {
171 left: Box::new(self.process_expression(left)?),
172 op: op.clone(),
173 right: Box::new(self.process_expression(right)?),
174 }),
175
176 SqlExpression::Between { expr, lower, upper } => Ok(SqlExpression::Between {
179 expr: Box::new(self.process_expression(expr)?),
180 lower: Box::new(self.process_expression(lower)?),
181 upper: Box::new(self.process_expression(upper)?),
182 }),
183
184 SqlExpression::InList { expr, values } => Ok(SqlExpression::InList {
185 expr: Box::new(self.process_expression(expr)?),
186 values: values
187 .iter()
188 .map(|v| self.process_expression(v))
189 .collect::<Result<Vec<_>>>()?,
190 }),
191
192 SqlExpression::NotInList { expr, values } => Ok(SqlExpression::NotInList {
193 expr: Box::new(self.process_expression(expr)?),
194 values: values
195 .iter()
196 .map(|v| self.process_expression(v))
197 .collect::<Result<Vec<_>>>()?,
198 }),
199
200 SqlExpression::FunctionCall {
202 name,
203 args,
204 distinct,
205 } => Ok(SqlExpression::FunctionCall {
206 name: name.clone(),
207 args: args
208 .iter()
209 .map(|a| self.process_expression(a))
210 .collect::<Result<Vec<_>>>()?,
211 distinct: *distinct,
212 }),
213
214 _ => Ok(expr.clone()),
216 }
217 }
218
219 fn execute_scalar_subquery(&mut self, query: &SelectStatement) -> Result<SqlExpression> {
221 let cache_key = format!("scalar:{:?}", query);
222
223 if let Some(cached) = self.cache.get(&cache_key) {
225 debug!("SubqueryExecutor: Using cached scalar subquery result");
226 if let SubqueryResult::Scalar(value) = cached {
227 return Ok(self.datavalue_to_expression(value.clone()));
228 }
229 }
230
231 info!("SubqueryExecutor: Executing scalar subquery");
232
233 let result_view = self.query_engine.execute_statement_with_cte_context(
235 self.source_table.clone(),
236 query.clone(),
237 &self.cte_context,
238 )?;
239
240 if result_view.row_count() != 1 {
242 return Err(anyhow!(
243 "Scalar subquery returned {} rows, expected exactly 1",
244 result_view.row_count()
245 ));
246 }
247
248 if result_view.column_count() != 1 {
249 return Err(anyhow!(
250 "Scalar subquery returned {} columns, expected exactly 1",
251 result_view.column_count()
252 ));
253 }
254
255 let value = if let Some(row) = result_view.get_row(0) {
257 row.values.get(0).cloned().unwrap_or(DataValue::Null)
258 } else {
259 DataValue::Null
260 };
261
262 self.cache
264 .insert(cache_key, SubqueryResult::Scalar(value.clone()));
265
266 Ok(self.datavalue_to_expression(value))
267 }
268
269 fn execute_in_subquery(&mut self, query: &SelectStatement) -> Result<Vec<DataValue>> {
271 let cache_key = format!("in:{:?}", query);
272
273 if let Some(cached) = self.cache.get(&cache_key) {
275 debug!("SubqueryExecutor: Using cached IN subquery result");
276 if let SubqueryResult::ValueSet(values) = cached {
277 return Ok(values.iter().cloned().collect());
278 }
279 }
280
281 info!("SubqueryExecutor: Executing IN subquery");
282
283 let result_view = self.query_engine.execute_statement_with_cte_context(
285 self.source_table.clone(),
286 query.clone(),
287 &self.cte_context,
288 )?;
289
290 if result_view.column_count() != 1 {
292 return Err(anyhow!(
293 "IN subquery returned {} columns, expected exactly 1",
294 result_view.column_count()
295 ));
296 }
297
298 let mut values = HashSet::new();
300 for row_idx in 0..result_view.row_count() {
301 if let Some(row) = result_view.get_row(row_idx) {
302 if let Some(value) = row.values.get(0) {
303 values.insert(value.clone());
304 }
305 }
306 }
307
308 self.cache
310 .insert(cache_key, SubqueryResult::ValueSet(values.clone()));
311
312 Ok(values.into_iter().collect())
313 }
314
315 fn datavalue_to_expression(&self, value: DataValue) -> SqlExpression {
317 match value {
318 DataValue::Null => SqlExpression::Null,
319 DataValue::Integer(i) => SqlExpression::NumberLiteral(i.to_string()),
320 DataValue::Float(f) => SqlExpression::NumberLiteral(f.to_string()),
321 DataValue::String(s) => SqlExpression::StringLiteral(s),
322 DataValue::InternedString(s) => SqlExpression::StringLiteral(s.to_string()),
323 DataValue::Boolean(b) => SqlExpression::BooleanLiteral(b),
324 DataValue::DateTime(dt) => SqlExpression::StringLiteral(dt),
325 }
326 }
327}