1use crate::{ExecutorResult, ExecutorSchema, FieldId};
2use llkv_column_map::ROW_ID_COLUMN_NAME;
3use llkv_expr::expr::{AggregateCall, Expr as LlkvExpr, Filter, Operator, ScalarExpr};
4use llkv_result::{Error, Result as LlkvResult};
5use llkv_table::ROW_ID_FIELD_ID;
6use std::ops::Bound;
7
8pub fn full_table_scan_filter(field_id: FieldId) -> LlkvExpr<'static, FieldId> {
9 LlkvExpr::Pred(Filter {
10 field_id,
11 op: Operator::Range {
12 lower: Bound::Unbounded,
13 upper: Bound::Unbounded,
14 },
15 })
16}
17
18pub fn translate_predicate<F>(
19 expr: LlkvExpr<'static, String>,
20 schema: &ExecutorSchema,
21 unknown_column: F,
22) -> ExecutorResult<LlkvExpr<'static, FieldId>>
23where
24 F: Fn(&str) -> Error + Copy,
25{
26 translate_predicate_with(expr, schema, unknown_column, unknown_column)
27}
28
29pub fn translate_predicate_with<F, G>(
30 expr: LlkvExpr<'static, String>,
31 schema: &ExecutorSchema,
32 unknown_column: F,
33 unknown_aggregate: G,
34) -> ExecutorResult<LlkvExpr<'static, FieldId>>
35where
36 F: Fn(&str) -> Error + Copy,
37 G: Fn(&str) -> Error + Copy,
38{
39 enum PredicateExitContext {
50 And(usize), Or(usize), Not,
53 }
54
55 enum OwnedFrame {
57 Enter(LlkvExpr<'static, String>),
58 Exit(PredicateExitContext),
59 Leaf(LlkvExpr<'static, FieldId>),
60 }
61
62 let mut owned_stack: Vec<OwnedFrame> = vec![OwnedFrame::Enter(expr)];
63 let mut result_stack: Vec<LlkvExpr<'static, FieldId>> = Vec::new();
64
65 while let Some(frame) = owned_stack.pop() {
66 match frame {
67 OwnedFrame::Enter(node) => match node {
68 LlkvExpr::And(children) => {
69 let count = children.len();
70 owned_stack.push(OwnedFrame::Exit(PredicateExitContext::And(count)));
71 for child in children.into_iter().rev() {
72 owned_stack.push(OwnedFrame::Enter(child));
73 }
74 }
75 LlkvExpr::Or(children) => {
76 let count = children.len();
77 owned_stack.push(OwnedFrame::Exit(PredicateExitContext::Or(count)));
78 for child in children.into_iter().rev() {
79 owned_stack.push(OwnedFrame::Enter(child));
80 }
81 }
82 LlkvExpr::Not(inner) => {
83 owned_stack.push(OwnedFrame::Exit(PredicateExitContext::Not));
84 owned_stack.push(OwnedFrame::Enter(*inner));
85 }
86 LlkvExpr::Pred(filter) => {
87 let field_id = resolve_field_id(schema, &filter.field_id, unknown_column)?;
88 owned_stack.push(OwnedFrame::Leaf(LlkvExpr::Pred(Filter {
89 field_id,
90 op: filter.op,
91 })));
92 }
93 LlkvExpr::Compare { left, op, right } => {
94 let left_expr =
95 translate_scalar_with(&left, schema, unknown_column, unknown_aggregate)?;
96 let right_expr =
97 translate_scalar_with(&right, schema, unknown_column, unknown_aggregate)?;
98 owned_stack.push(OwnedFrame::Leaf(LlkvExpr::Compare {
99 left: left_expr,
100 op,
101 right: right_expr,
102 }));
103 }
104 LlkvExpr::InList {
105 expr: target,
106 list,
107 negated,
108 } => {
109 let translated_target =
110 translate_scalar_with(&target, schema, unknown_column, unknown_aggregate)?;
111 let mut translated_list: Vec<ScalarExpr<FieldId>> =
112 Vec::with_capacity(list.len());
113 for value in list {
114 translated_list.push(translate_scalar_with(
115 &value,
116 schema,
117 unknown_column,
118 unknown_aggregate,
119 )?);
120 }
121 owned_stack.push(OwnedFrame::Leaf(LlkvExpr::InList {
122 expr: translated_target,
123 list: translated_list,
124 negated,
125 }));
126 }
127 LlkvExpr::IsNull {
128 expr: target,
129 negated,
130 } => {
131 let translated_target =
132 translate_scalar_with(&target, schema, unknown_column, unknown_aggregate)?;
133 owned_stack.push(OwnedFrame::Leaf(LlkvExpr::IsNull {
134 expr: translated_target,
135 negated,
136 }));
137 }
138 LlkvExpr::Literal(value) => {
139 owned_stack.push(OwnedFrame::Leaf(LlkvExpr::Literal(value)));
140 }
141 LlkvExpr::Exists(subquery) => {
142 owned_stack.push(OwnedFrame::Leaf(LlkvExpr::Exists(subquery)));
143 }
144 },
145 OwnedFrame::Leaf(translated) => {
146 result_stack.push(translated);
147 }
148 OwnedFrame::Exit(exit_context) => match exit_context {
149 PredicateExitContext::And(count) => {
150 let translated: Vec<_> =
151 result_stack.drain(result_stack.len() - count..).collect();
152 result_stack.push(LlkvExpr::And(translated));
153 }
154 PredicateExitContext::Or(count) => {
155 let translated: Vec<_> =
156 result_stack.drain(result_stack.len() - count..).collect();
157 result_stack.push(LlkvExpr::Or(translated));
158 }
159 PredicateExitContext::Not => {
160 let inner = result_stack.pop().ok_or_else(|| {
161 Error::Internal(
162 "translate_predicate_with: result stack underflow for Not".into(),
163 )
164 })?;
165 result_stack.push(LlkvExpr::Not(Box::new(inner)));
166 }
167 },
168 }
169 }
170
171 result_stack
172 .pop()
173 .ok_or_else(|| Error::Internal("translate_predicate_with: empty result stack".into()))
174}
175
176pub fn translate_scalar<F>(
177 expr: &ScalarExpr<String>,
178 schema: &ExecutorSchema,
179 unknown_column: F,
180) -> ExecutorResult<ScalarExpr<FieldId>>
181where
182 F: Fn(&str) -> Error + Copy,
183{
184 translate_scalar_with(expr, schema, unknown_column, unknown_column)
185}
186
187pub fn translate_scalar_with<F, G>(
188 expr: &ScalarExpr<String>,
189 schema: &ExecutorSchema,
190 unknown_column: F,
191 _unknown_aggregate: G,
192) -> ExecutorResult<ScalarExpr<FieldId>>
193where
194 F: Fn(&str) -> Error + Copy,
195 G: Fn(&str) -> Error + Copy,
196{
197 match expr {
198 ScalarExpr::Literal(lit) => Ok(ScalarExpr::Literal(lit.clone())),
199 ScalarExpr::Column(name) => {
200 let field_id = resolve_field_id(schema, name, unknown_column)?;
201 Ok(ScalarExpr::Column(field_id))
202 }
203 ScalarExpr::Binary { left, op, right } => Ok(ScalarExpr::Binary {
204 left: Box::new(translate_scalar_with(
205 left,
206 schema,
207 unknown_column,
208 _unknown_aggregate,
209 )?),
210 op: *op,
211 right: Box::new(translate_scalar_with(
212 right,
213 schema,
214 unknown_column,
215 _unknown_aggregate,
216 )?),
217 }),
218 ScalarExpr::Not(inner) => Ok(ScalarExpr::Not(Box::new(translate_scalar_with(
219 inner,
220 schema,
221 unknown_column,
222 _unknown_aggregate,
223 )?))),
224 ScalarExpr::IsNull { expr, negated } => Ok(ScalarExpr::IsNull {
225 expr: Box::new(translate_scalar_with(
226 expr,
227 schema,
228 unknown_column,
229 _unknown_aggregate,
230 )?),
231 negated: *negated,
232 }),
233 ScalarExpr::Compare { left, op, right } => Ok(ScalarExpr::Compare {
234 left: Box::new(translate_scalar_with(
235 left,
236 schema,
237 unknown_column,
238 _unknown_aggregate,
239 )?),
240 op: *op,
241 right: Box::new(translate_scalar_with(
242 right,
243 schema,
244 unknown_column,
245 _unknown_aggregate,
246 )?),
247 }),
248 ScalarExpr::Aggregate(agg) => {
249 let translated =
250 match agg {
251 AggregateCall::CountStar => AggregateCall::CountStar,
252 AggregateCall::Count { expr, distinct } => AggregateCall::Count {
253 expr: Box::new(translate_scalar_with(
254 expr,
255 schema,
256 unknown_column,
257 _unknown_aggregate,
258 )?),
259 distinct: *distinct,
260 },
261 AggregateCall::Sum { expr, distinct } => AggregateCall::Sum {
262 expr: Box::new(translate_scalar_with(
263 expr,
264 schema,
265 unknown_column,
266 _unknown_aggregate,
267 )?),
268 distinct: *distinct,
269 },
270 AggregateCall::Total { expr, distinct } => AggregateCall::Total {
271 expr: Box::new(translate_scalar_with(
272 expr,
273 schema,
274 unknown_column,
275 _unknown_aggregate,
276 )?),
277 distinct: *distinct,
278 },
279 AggregateCall::Avg { expr, distinct } => AggregateCall::Avg {
280 expr: Box::new(translate_scalar_with(
281 expr,
282 schema,
283 unknown_column,
284 _unknown_aggregate,
285 )?),
286 distinct: *distinct,
287 },
288 AggregateCall::Min(expr) => AggregateCall::Min(Box::new(
289 translate_scalar_with(expr, schema, unknown_column, _unknown_aggregate)?,
290 )),
291 AggregateCall::Max(expr) => AggregateCall::Max(Box::new(
292 translate_scalar_with(expr, schema, unknown_column, _unknown_aggregate)?,
293 )),
294 AggregateCall::CountNulls(expr) => AggregateCall::CountNulls(Box::new(
295 translate_scalar_with(expr, schema, unknown_column, _unknown_aggregate)?,
296 )),
297 AggregateCall::GroupConcat {
298 expr,
299 distinct,
300 separator,
301 } => AggregateCall::GroupConcat {
302 expr: Box::new(translate_scalar_with(
303 expr,
304 schema,
305 unknown_column,
306 _unknown_aggregate,
307 )?),
308 distinct: *distinct,
309 separator: separator.clone(),
310 },
311 };
312 Ok(ScalarExpr::Aggregate(translated))
313 }
314 ScalarExpr::GetField { base, field_name } => Ok(ScalarExpr::GetField {
315 base: Box::new(translate_scalar_with(
316 base,
317 schema,
318 unknown_column,
319 _unknown_aggregate,
320 )?),
321 field_name: field_name.clone(),
322 }),
323 ScalarExpr::Cast { expr, data_type } => Ok(ScalarExpr::Cast {
324 expr: Box::new(translate_scalar_with(
325 expr,
326 schema,
327 unknown_column,
328 _unknown_aggregate,
329 )?),
330 data_type: data_type.clone(),
331 }),
332 ScalarExpr::Case {
333 operand,
334 branches,
335 else_expr,
336 } => {
337 let translated_operand = match operand.as_deref() {
338 Some(inner) => Some(translate_scalar_with(
339 inner,
340 schema,
341 unknown_column,
342 _unknown_aggregate,
343 )?),
344 None => None,
345 };
346
347 let mut translated_branches = Vec::with_capacity(branches.len());
348 for (when_expr, then_expr) in branches {
349 let translated_when =
350 translate_scalar_with(when_expr, schema, unknown_column, _unknown_aggregate)?;
351 let translated_then =
352 translate_scalar_with(then_expr, schema, unknown_column, _unknown_aggregate)?;
353 translated_branches.push((translated_when, translated_then));
354 }
355
356 let translated_else = match else_expr.as_deref() {
357 Some(inner) => Some(translate_scalar_with(
358 inner,
359 schema,
360 unknown_column,
361 _unknown_aggregate,
362 )?),
363 None => None,
364 };
365
366 Ok(ScalarExpr::Case {
367 operand: translated_operand.map(Box::new),
368 branches: translated_branches,
369 else_expr: translated_else.map(Box::new),
370 })
371 }
372 ScalarExpr::Coalesce(items) => {
373 let mut translated_items = Vec::with_capacity(items.len());
374 for item in items {
375 translated_items.push(translate_scalar_with(
376 item,
377 schema,
378 unknown_column,
379 _unknown_aggregate,
380 )?);
381 }
382 Ok(ScalarExpr::Coalesce(translated_items))
383 }
384 ScalarExpr::Random => Ok(ScalarExpr::Random),
385 ScalarExpr::ScalarSubquery(subquery) => Ok(ScalarExpr::ScalarSubquery(subquery.clone())),
386 }
387}
388
389fn resolve_field_id<F>(
391 schema: &ExecutorSchema,
392 name: &str,
393 unknown_column: F,
394) -> ExecutorResult<FieldId>
395where
396 F: Fn(&str) -> Error,
397{
398 if name.eq_ignore_ascii_case(ROW_ID_COLUMN_NAME) {
400 return Ok(ROW_ID_FIELD_ID);
401 }
402
403 schema
404 .resolve(name)
405 .map(|column| column.field_id)
406 .ok_or_else(|| unknown_column(name))
407}
408
409pub fn resolve_field_id_from_schema(schema: &ExecutorSchema, name: &str) -> LlkvResult<FieldId> {
411 if name.eq_ignore_ascii_case(ROW_ID_COLUMN_NAME) {
412 return Ok(ROW_ID_FIELD_ID);
413 }
414
415 schema
416 .resolve(name)
417 .map(|column| column.field_id)
418 .ok_or_else(|| {
419 Error::InvalidArgumentError(format!(
420 "Binder Error: does not have a column named '{name}'"
421 ))
422 })
423}