llkv_executor/translation/
expression.rs

1use crate::{ExecutorResult, ExecutorSchema, FieldId};
2use llkv_column_map::ROW_ID_COLUMN_NAME;
3use llkv_expr::expr::{AggregateCall, Expr as LlkvExpr, Filter, Operator, ScalarExpr};
4use llkv_result::{Error, Result as LlkvResult};
5use llkv_table::ROW_ID_FIELD_ID;
6use std::ops::Bound;
7
8pub fn full_table_scan_filter(field_id: FieldId) -> LlkvExpr<'static, FieldId> {
9    LlkvExpr::Pred(Filter {
10        field_id,
11        op: Operator::Range {
12            lower: Bound::Unbounded,
13            upper: Bound::Unbounded,
14        },
15    })
16}
17
18pub fn translate_predicate<F>(
19    expr: LlkvExpr<'static, String>,
20    schema: &ExecutorSchema,
21    unknown_column: F,
22) -> ExecutorResult<LlkvExpr<'static, FieldId>>
23where
24    F: Fn(&str) -> Error + Copy,
25{
26    translate_predicate_with(expr, schema, unknown_column, unknown_column)
27}
28
29pub fn translate_predicate_with<F, G>(
30    expr: LlkvExpr<'static, String>,
31    schema: &ExecutorSchema,
32    unknown_column: F,
33    unknown_aggregate: G,
34) -> ExecutorResult<LlkvExpr<'static, FieldId>>
35where
36    F: Fn(&str) -> Error + Copy,
37    G: Fn(&str) -> Error + Copy,
38{
39    // Iterative postorder traversal using a Frame-based pattern.
40    // See llkv-plan::traversal module documentation for pattern details.
41    //
42    // This avoids stack overflow on deeply nested expressions (50k+ nodes) by using
43    // explicit work_stack and result_stack instead of recursion.
44    //
45    // Note: We use a manual OwnedFrame enum here instead of TransformFrame because
46    // this function takes ownership of the input expr, requiring owned values on the stack.
47
48    /// Context passed through Exit frames during predicate translation
49    enum PredicateExitContext {
50        And(usize), // child count
51        Or(usize),  // child count
52        Not,
53    }
54
55    /// Frame enum for owned value traversal
56    enum OwnedFrame {
57        Enter(LlkvExpr<'static, String>),
58        Exit(PredicateExitContext),
59        Leaf(LlkvExpr<'static, FieldId>),
60    }
61
62    let mut owned_stack: Vec<OwnedFrame> = vec![OwnedFrame::Enter(expr)];
63    let mut result_stack: Vec<LlkvExpr<'static, FieldId>> = Vec::new();
64
65    while let Some(frame) = owned_stack.pop() {
66        match frame {
67            OwnedFrame::Enter(node) => match node {
68                LlkvExpr::And(children) => {
69                    let count = children.len();
70                    owned_stack.push(OwnedFrame::Exit(PredicateExitContext::And(count)));
71                    for child in children.into_iter().rev() {
72                        owned_stack.push(OwnedFrame::Enter(child));
73                    }
74                }
75                LlkvExpr::Or(children) => {
76                    let count = children.len();
77                    owned_stack.push(OwnedFrame::Exit(PredicateExitContext::Or(count)));
78                    for child in children.into_iter().rev() {
79                        owned_stack.push(OwnedFrame::Enter(child));
80                    }
81                }
82                LlkvExpr::Not(inner) => {
83                    owned_stack.push(OwnedFrame::Exit(PredicateExitContext::Not));
84                    owned_stack.push(OwnedFrame::Enter(*inner));
85                }
86                LlkvExpr::Pred(filter) => {
87                    let field_id = resolve_field_id(schema, &filter.field_id, unknown_column)?;
88                    owned_stack.push(OwnedFrame::Leaf(LlkvExpr::Pred(Filter {
89                        field_id,
90                        op: filter.op,
91                    })));
92                }
93                LlkvExpr::Compare { left, op, right } => {
94                    let left_expr =
95                        translate_scalar_with(&left, schema, unknown_column, unknown_aggregate)?;
96                    let right_expr =
97                        translate_scalar_with(&right, schema, unknown_column, unknown_aggregate)?;
98                    owned_stack.push(OwnedFrame::Leaf(LlkvExpr::Compare {
99                        left: left_expr,
100                        op,
101                        right: right_expr,
102                    }));
103                }
104                LlkvExpr::InList {
105                    expr: target,
106                    list,
107                    negated,
108                } => {
109                    let translated_target =
110                        translate_scalar_with(&target, schema, unknown_column, unknown_aggregate)?;
111                    let mut translated_list: Vec<ScalarExpr<FieldId>> =
112                        Vec::with_capacity(list.len());
113                    for value in list {
114                        translated_list.push(translate_scalar_with(
115                            &value,
116                            schema,
117                            unknown_column,
118                            unknown_aggregate,
119                        )?);
120                    }
121                    owned_stack.push(OwnedFrame::Leaf(LlkvExpr::InList {
122                        expr: translated_target,
123                        list: translated_list,
124                        negated,
125                    }));
126                }
127                LlkvExpr::IsNull {
128                    expr: target,
129                    negated,
130                } => {
131                    let translated_target =
132                        translate_scalar_with(&target, schema, unknown_column, unknown_aggregate)?;
133                    owned_stack.push(OwnedFrame::Leaf(LlkvExpr::IsNull {
134                        expr: translated_target,
135                        negated,
136                    }));
137                }
138                LlkvExpr::Literal(value) => {
139                    owned_stack.push(OwnedFrame::Leaf(LlkvExpr::Literal(value)));
140                }
141                LlkvExpr::Exists(subquery) => {
142                    owned_stack.push(OwnedFrame::Leaf(LlkvExpr::Exists(subquery)));
143                }
144            },
145            OwnedFrame::Leaf(translated) => {
146                result_stack.push(translated);
147            }
148            OwnedFrame::Exit(exit_context) => match exit_context {
149                PredicateExitContext::And(count) => {
150                    let translated: Vec<_> =
151                        result_stack.drain(result_stack.len() - count..).collect();
152                    result_stack.push(LlkvExpr::And(translated));
153                }
154                PredicateExitContext::Or(count) => {
155                    let translated: Vec<_> =
156                        result_stack.drain(result_stack.len() - count..).collect();
157                    result_stack.push(LlkvExpr::Or(translated));
158                }
159                PredicateExitContext::Not => {
160                    let inner = result_stack.pop().ok_or_else(|| {
161                        Error::Internal(
162                            "translate_predicate_with: result stack underflow for Not".into(),
163                        )
164                    })?;
165                    result_stack.push(LlkvExpr::Not(Box::new(inner)));
166                }
167            },
168        }
169    }
170
171    result_stack
172        .pop()
173        .ok_or_else(|| Error::Internal("translate_predicate_with: empty result stack".into()))
174}
175
176pub fn translate_scalar<F>(
177    expr: &ScalarExpr<String>,
178    schema: &ExecutorSchema,
179    unknown_column: F,
180) -> ExecutorResult<ScalarExpr<FieldId>>
181where
182    F: Fn(&str) -> Error + Copy,
183{
184    translate_scalar_with(expr, schema, unknown_column, unknown_column)
185}
186
187pub fn translate_scalar_with<F, G>(
188    expr: &ScalarExpr<String>,
189    schema: &ExecutorSchema,
190    unknown_column: F,
191    _unknown_aggregate: G,
192) -> ExecutorResult<ScalarExpr<FieldId>>
193where
194    F: Fn(&str) -> Error + Copy,
195    G: Fn(&str) -> Error + Copy,
196{
197    match expr {
198        ScalarExpr::Literal(lit) => Ok(ScalarExpr::Literal(lit.clone())),
199        ScalarExpr::Column(name) => {
200            let field_id = resolve_field_id(schema, name, unknown_column)?;
201            Ok(ScalarExpr::Column(field_id))
202        }
203        ScalarExpr::Binary { left, op, right } => Ok(ScalarExpr::Binary {
204            left: Box::new(translate_scalar_with(
205                left,
206                schema,
207                unknown_column,
208                _unknown_aggregate,
209            )?),
210            op: *op,
211            right: Box::new(translate_scalar_with(
212                right,
213                schema,
214                unknown_column,
215                _unknown_aggregate,
216            )?),
217        }),
218        ScalarExpr::Not(inner) => Ok(ScalarExpr::Not(Box::new(translate_scalar_with(
219            inner,
220            schema,
221            unknown_column,
222            _unknown_aggregate,
223        )?))),
224        ScalarExpr::IsNull { expr, negated } => Ok(ScalarExpr::IsNull {
225            expr: Box::new(translate_scalar_with(
226                expr,
227                schema,
228                unknown_column,
229                _unknown_aggregate,
230            )?),
231            negated: *negated,
232        }),
233        ScalarExpr::Compare { left, op, right } => Ok(ScalarExpr::Compare {
234            left: Box::new(translate_scalar_with(
235                left,
236                schema,
237                unknown_column,
238                _unknown_aggregate,
239            )?),
240            op: *op,
241            right: Box::new(translate_scalar_with(
242                right,
243                schema,
244                unknown_column,
245                _unknown_aggregate,
246            )?),
247        }),
248        ScalarExpr::Aggregate(agg) => {
249            let translated =
250                match agg {
251                    AggregateCall::CountStar => AggregateCall::CountStar,
252                    AggregateCall::Count { expr, distinct } => AggregateCall::Count {
253                        expr: Box::new(translate_scalar_with(
254                            expr,
255                            schema,
256                            unknown_column,
257                            _unknown_aggregate,
258                        )?),
259                        distinct: *distinct,
260                    },
261                    AggregateCall::Sum { expr, distinct } => AggregateCall::Sum {
262                        expr: Box::new(translate_scalar_with(
263                            expr,
264                            schema,
265                            unknown_column,
266                            _unknown_aggregate,
267                        )?),
268                        distinct: *distinct,
269                    },
270                    AggregateCall::Total { expr, distinct } => AggregateCall::Total {
271                        expr: Box::new(translate_scalar_with(
272                            expr,
273                            schema,
274                            unknown_column,
275                            _unknown_aggregate,
276                        )?),
277                        distinct: *distinct,
278                    },
279                    AggregateCall::Avg { expr, distinct } => AggregateCall::Avg {
280                        expr: Box::new(translate_scalar_with(
281                            expr,
282                            schema,
283                            unknown_column,
284                            _unknown_aggregate,
285                        )?),
286                        distinct: *distinct,
287                    },
288                    AggregateCall::Min(expr) => AggregateCall::Min(Box::new(
289                        translate_scalar_with(expr, schema, unknown_column, _unknown_aggregate)?,
290                    )),
291                    AggregateCall::Max(expr) => AggregateCall::Max(Box::new(
292                        translate_scalar_with(expr, schema, unknown_column, _unknown_aggregate)?,
293                    )),
294                    AggregateCall::CountNulls(expr) => AggregateCall::CountNulls(Box::new(
295                        translate_scalar_with(expr, schema, unknown_column, _unknown_aggregate)?,
296                    )),
297                    AggregateCall::GroupConcat {
298                        expr,
299                        distinct,
300                        separator,
301                    } => AggregateCall::GroupConcat {
302                        expr: Box::new(translate_scalar_with(
303                            expr,
304                            schema,
305                            unknown_column,
306                            _unknown_aggregate,
307                        )?),
308                        distinct: *distinct,
309                        separator: separator.clone(),
310                    },
311                };
312            Ok(ScalarExpr::Aggregate(translated))
313        }
314        ScalarExpr::GetField { base, field_name } => Ok(ScalarExpr::GetField {
315            base: Box::new(translate_scalar_with(
316                base,
317                schema,
318                unknown_column,
319                _unknown_aggregate,
320            )?),
321            field_name: field_name.clone(),
322        }),
323        ScalarExpr::Cast { expr, data_type } => Ok(ScalarExpr::Cast {
324            expr: Box::new(translate_scalar_with(
325                expr,
326                schema,
327                unknown_column,
328                _unknown_aggregate,
329            )?),
330            data_type: data_type.clone(),
331        }),
332        ScalarExpr::Case {
333            operand,
334            branches,
335            else_expr,
336        } => {
337            let translated_operand = match operand.as_deref() {
338                Some(inner) => Some(translate_scalar_with(
339                    inner,
340                    schema,
341                    unknown_column,
342                    _unknown_aggregate,
343                )?),
344                None => None,
345            };
346
347            let mut translated_branches = Vec::with_capacity(branches.len());
348            for (when_expr, then_expr) in branches {
349                let translated_when =
350                    translate_scalar_with(when_expr, schema, unknown_column, _unknown_aggregate)?;
351                let translated_then =
352                    translate_scalar_with(then_expr, schema, unknown_column, _unknown_aggregate)?;
353                translated_branches.push((translated_when, translated_then));
354            }
355
356            let translated_else = match else_expr.as_deref() {
357                Some(inner) => Some(translate_scalar_with(
358                    inner,
359                    schema,
360                    unknown_column,
361                    _unknown_aggregate,
362                )?),
363                None => None,
364            };
365
366            Ok(ScalarExpr::Case {
367                operand: translated_operand.map(Box::new),
368                branches: translated_branches,
369                else_expr: translated_else.map(Box::new),
370            })
371        }
372        ScalarExpr::Coalesce(items) => {
373            let mut translated_items = Vec::with_capacity(items.len());
374            for item in items {
375                translated_items.push(translate_scalar_with(
376                    item,
377                    schema,
378                    unknown_column,
379                    _unknown_aggregate,
380                )?);
381            }
382            Ok(ScalarExpr::Coalesce(translated_items))
383        }
384        ScalarExpr::Random => Ok(ScalarExpr::Random),
385        ScalarExpr::ScalarSubquery(subquery) => Ok(ScalarExpr::ScalarSubquery(subquery.clone())),
386    }
387}
388
389// TODO: Move to `resolvers.rs`
390fn resolve_field_id<F>(
391    schema: &ExecutorSchema,
392    name: &str,
393    unknown_column: F,
394) -> ExecutorResult<FieldId>
395where
396    F: Fn(&str) -> Error,
397{
398    // Check for special rowid column
399    if name.eq_ignore_ascii_case(ROW_ID_COLUMN_NAME) {
400        return Ok(ROW_ID_FIELD_ID);
401    }
402
403    schema
404        .resolve(name)
405        .map(|column| column.field_id)
406        .ok_or_else(|| unknown_column(name))
407}
408
409// TODO: Move to `resolvers.rs`
410pub fn resolve_field_id_from_schema(schema: &ExecutorSchema, name: &str) -> LlkvResult<FieldId> {
411    if name.eq_ignore_ascii_case(ROW_ID_COLUMN_NAME) {
412        return Ok(ROW_ID_FIELD_ID);
413    }
414
415    schema
416        .resolve(name)
417        .map(|column| column.field_id)
418        .ok_or_else(|| {
419            Error::InvalidArgumentError(format!(
420                "Binder Error: does not have a column named '{name}'"
421            ))
422        })
423}