1use sqlparser::ast::{self, Select};
4
5use crate::error::{Result, SqlError};
6use crate::functions::registry::FunctionRegistry;
7use crate::parser::normalize::normalize_ident;
8use crate::resolver::columns::TableScope;
9use crate::resolver::expr::convert_expr;
10use crate::types::*;
11
12pub fn plan_join_from_select(
14 select: &Select,
15 scope: &TableScope,
16 catalog: &dyn SqlCatalog,
17 functions: &FunctionRegistry,
18) -> Result<Option<SqlPlan>> {
19 let from = &select.from[0];
20 let left_table = scope
21 .tables
22 .values()
23 .find(|t| {
24 let rel_name =
25 crate::parser::normalize::table_name_from_factor(&from.relation).map(|(n, _)| n);
26 rel_name.as_deref() == Some(&t.name) || rel_name.as_deref() == t.alias.as_deref()
27 })
28 .ok_or_else(|| SqlError::Unsupported {
29 detail: "cannot resolve left table in JOIN".into(),
30 })?;
31
32 let left_plan = SqlPlan::Scan {
34 collection: left_table.name.clone(),
35 alias: left_table.alias.clone(),
36 engine: left_table.info.engine,
37 filters: Vec::new(),
38 projection: Vec::new(),
39 sort_keys: Vec::new(),
40 limit: None,
41 offset: 0,
42 distinct: false,
43 window_functions: Vec::new(),
44 };
45
46 let mut current_plan = left_plan;
47
48 for join_item in &from.joins {
49 let (right_name, right_alias) = crate::parser::normalize::table_name_from_factor(
50 &join_item.relation,
51 )
52 .ok_or_else(|| SqlError::Unsupported {
53 detail: "non-table JOIN target".into(),
54 })?;
55 let right_table = scope
56 .tables
57 .values()
58 .find(|t| t.name == right_name)
59 .ok_or_else(|| SqlError::UnknownTable {
60 name: right_name.clone(),
61 })?;
62
63 let right_plan = SqlPlan::Scan {
64 collection: right_table.name.clone(),
65 alias: right_alias,
66 engine: right_table.info.engine,
67 filters: Vec::new(),
68 projection: Vec::new(),
69 sort_keys: Vec::new(),
70 limit: None,
71 offset: 0,
72 distinct: false,
73 window_functions: Vec::new(),
74 };
75
76 let (join_type, on_keys, condition) = extract_join_spec(&join_item.join_operator)?;
77
78 current_plan = SqlPlan::Join {
79 left: Box::new(current_plan),
80 right: Box::new(right_plan),
81 on: on_keys,
82 join_type,
83 condition,
84 limit: 10000,
85 projection: Vec::new(),
86 filters: Vec::new(),
87 };
88 }
89
90 let (subquery_joins, effective_where) = if let Some(expr) = &select.selection {
92 let extraction = super::subquery::extract_subqueries(expr, catalog, functions)?;
93 (extraction.joins, extraction.remaining_where)
94 } else {
95 (Vec::new(), None)
96 };
97
98 let projection = super::select::convert_projection(&select.projection)?;
100 let filters = match &effective_where {
101 Some(expr) => super::select::convert_where_to_filters(expr)?,
102 None => Vec::new(),
103 };
104
105 for sq in subquery_joins {
107 current_plan = SqlPlan::Join {
108 left: Box::new(current_plan),
109 right: Box::new(sq.inner_plan),
110 on: vec![(sq.outer_column, sq.inner_column)],
111 join_type: sq.join_type,
112 condition: None,
113 limit: 10000,
114 projection: Vec::new(),
115 filters: Vec::new(),
116 };
117 }
118
119 let group_by_non_empty = match &select.group_by {
121 ast::GroupByExpr::All(_) => true,
122 ast::GroupByExpr::Expressions(exprs, _) => !exprs.is_empty(),
123 };
124 if super::select::convert_projection(&select.projection).is_ok() && group_by_non_empty {
125 let aggregates =
126 super::aggregate::extract_aggregates_from_projection(&select.projection, functions)?;
127 let group_by = super::aggregate::convert_group_by(&select.group_by)?;
128 let having = match &select.having {
129 Some(expr) => super::select::convert_where_to_filters(expr)?,
130 None => Vec::new(),
131 };
132 return Ok(Some(SqlPlan::Aggregate {
133 input: Box::new(current_plan),
134 group_by,
135 aggregates,
136 having,
137 limit: 10000,
138 }));
139 }
140
141 if let SqlPlan::Join {
143 projection: ref mut proj,
144 filters: ref mut filt,
145 ..
146 } = current_plan
147 {
148 *proj = projection;
149 *filt = filters;
150 }
151 Ok(Some(current_plan))
152}
153
154type JoinSpec = (JoinType, Vec<(String, String)>, Option<SqlExpr>);
156
157fn extract_join_spec(op: &ast::JoinOperator) -> Result<JoinSpec> {
159 match op {
160 ast::JoinOperator::Inner(constraint) | ast::JoinOperator::Join(constraint) => {
161 let (keys, cond) = extract_join_constraint(constraint)?;
162 Ok((JoinType::Inner, keys, cond))
163 }
164 ast::JoinOperator::Left(constraint) | ast::JoinOperator::LeftOuter(constraint) => {
165 let (keys, cond) = extract_join_constraint(constraint)?;
166 Ok((JoinType::Left, keys, cond))
167 }
168 ast::JoinOperator::Right(constraint) | ast::JoinOperator::RightOuter(constraint) => {
169 let (keys, cond) = extract_join_constraint(constraint)?;
170 Ok((JoinType::Right, keys, cond))
171 }
172 ast::JoinOperator::FullOuter(constraint) => {
173 let (keys, cond) = extract_join_constraint(constraint)?;
174 Ok((JoinType::Full, keys, cond))
175 }
176 ast::JoinOperator::CrossJoin(constraint) => {
177 let (keys, cond) = extract_join_constraint(constraint)?;
178 Ok((JoinType::Cross, keys, cond))
179 }
180 ast::JoinOperator::Semi(constraint) | ast::JoinOperator::LeftSemi(constraint) => {
181 let (keys, cond) = extract_join_constraint(constraint)?;
182 Ok((JoinType::Semi, keys, cond))
183 }
184 ast::JoinOperator::Anti(constraint) | ast::JoinOperator::LeftAnti(constraint) => {
185 let (keys, cond) = extract_join_constraint(constraint)?;
186 Ok((JoinType::Anti, keys, cond))
187 }
188 _ => Err(SqlError::Unsupported {
189 detail: format!("join type: {op:?}"),
190 }),
191 }
192}
193
194type JoinConstraintResult = (Vec<(String, String)>, Option<SqlExpr>);
196
197fn extract_join_constraint(constraint: &ast::JoinConstraint) -> Result<JoinConstraintResult> {
199 match constraint {
200 ast::JoinConstraint::On(expr) => {
201 let mut keys = Vec::new();
202 let mut non_equi = Vec::new();
203 extract_equi_keys(expr, &mut keys, &mut non_equi)?;
204 let cond = if non_equi.is_empty() {
205 None
206 } else {
207 Some(convert_expr(non_equi.first().unwrap())?)
208 };
209 Ok((keys, cond))
210 }
211 ast::JoinConstraint::Using(columns) => {
212 let keys = columns
213 .iter()
214 .map(|c| {
215 let name = crate::parser::normalize::normalize_object_name(c);
216 (name.clone(), name)
217 })
218 .collect();
219 Ok((keys, None))
220 }
221 ast::JoinConstraint::Natural => Ok((Vec::new(), None)),
222 ast::JoinConstraint::None => Ok((Vec::new(), None)),
223 }
224}
225
226fn extract_equi_keys(
228 expr: &ast::Expr,
229 keys: &mut Vec<(String, String)>,
230 non_equi: &mut Vec<ast::Expr>,
231) -> Result<()> {
232 match expr {
233 ast::Expr::BinaryOp {
234 left,
235 op: ast::BinaryOperator::And,
236 right,
237 } => {
238 extract_equi_keys(left, keys, non_equi)?;
239 extract_equi_keys(right, keys, non_equi)?;
240 }
241 ast::Expr::BinaryOp {
242 left,
243 op: ast::BinaryOperator::Eq,
244 right,
245 } => {
246 if let (Some(l), Some(r)) = (extract_col_ref(left), extract_col_ref(right)) {
247 keys.push((l, r));
248 } else {
249 non_equi.push(expr.clone());
250 }
251 }
252 _ => {
253 non_equi.push(expr.clone());
254 }
255 }
256 Ok(())
257}
258
259fn extract_col_ref(expr: &ast::Expr) -> Option<String> {
261 match expr {
262 ast::Expr::Identifier(ident) => Some(normalize_ident(ident)),
263 ast::Expr::CompoundIdentifier(parts) if !parts.is_empty() => Some(
264 parts
265 .iter()
266 .map(normalize_ident)
267 .collect::<Vec<_>>()
268 .join("."),
269 ),
270 _ => None,
271 }
272}