datafusion_proto/logical_plan/
from_proto.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use datafusion::execution::registry::FunctionRegistry;
21use datafusion_common::{
22    exec_datafusion_err, internal_err, plan_datafusion_err, RecursionUnnestOption,
23    Result, ScalarValue, TableReference, UnnestOptions,
24};
25use datafusion_expr::dml::InsertOp;
26use datafusion_expr::expr::{Alias, Placeholder, Sort};
27use datafusion_expr::expr::{Unnest, WildcardOptions};
28use datafusion_expr::{
29    expr::{self, InList, WindowFunction},
30    logical_plan::{PlanType, StringifiedPlan},
31    Between, BinaryExpr, Case, Cast, Expr, GroupingSet,
32    GroupingSet::GroupingSets,
33    JoinConstraint, JoinType, Like, Operator, TryCast, WindowFrame, WindowFrameBound,
34    WindowFrameUnits,
35};
36use datafusion_expr::{ExprFunctionExt, WriteOp};
37use datafusion_proto_common::{from_proto::FromOptionalField, FromProtoError as Error};
38
39use crate::protobuf::plan_type::PlanTypeEnum::{
40    FinalPhysicalPlanWithSchema, InitialPhysicalPlanWithSchema,
41};
42use crate::protobuf::{
43    self,
44    plan_type::PlanTypeEnum::{
45        AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
46        FinalPhysicalPlan, FinalPhysicalPlanWithStats, InitialLogicalPlan,
47        InitialPhysicalPlan, InitialPhysicalPlanWithStats, OptimizedLogicalPlan,
48        OptimizedPhysicalPlan, PhysicalPlanError,
49    },
50    AnalyzedLogicalPlanType, CubeNode, GroupingSetNode, OptimizedLogicalPlanType,
51    OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
52};
53
54use super::LogicalExtensionCodec;
55
56impl From<&protobuf::UnnestOptions> for UnnestOptions {
57    fn from(opts: &protobuf::UnnestOptions) -> Self {
58        Self {
59            preserve_nulls: opts.preserve_nulls,
60            recursions: opts
61                .recursions
62                .iter()
63                .map(|r| RecursionUnnestOption {
64                    input_column: r.input_column.as_ref().unwrap().into(),
65                    output_column: r.output_column.as_ref().unwrap().into(),
66                    depth: r.depth as usize,
67                })
68                .collect::<Vec<_>>(),
69        }
70    }
71}
72
73impl From<protobuf::WindowFrameUnits> for WindowFrameUnits {
74    fn from(units: protobuf::WindowFrameUnits) -> Self {
75        match units {
76            protobuf::WindowFrameUnits::Rows => Self::Rows,
77            protobuf::WindowFrameUnits::Range => Self::Range,
78            protobuf::WindowFrameUnits::Groups => Self::Groups,
79        }
80    }
81}
82
83impl TryFrom<protobuf::TableReference> for TableReference {
84    type Error = Error;
85
86    fn try_from(value: protobuf::TableReference) -> Result<Self, Self::Error> {
87        use protobuf::table_reference::TableReferenceEnum;
88        let table_reference_enum = value
89            .table_reference_enum
90            .ok_or_else(|| Error::required("table_reference_enum"))?;
91
92        match table_reference_enum {
93            TableReferenceEnum::Bare(protobuf::BareTableReference { table }) => {
94                Ok(TableReference::bare(table))
95            }
96            TableReferenceEnum::Partial(protobuf::PartialTableReference {
97                schema,
98                table,
99            }) => Ok(TableReference::partial(schema, table)),
100            TableReferenceEnum::Full(protobuf::FullTableReference {
101                catalog,
102                schema,
103                table,
104            }) => Ok(TableReference::full(catalog, schema, table)),
105        }
106    }
107}
108
109impl From<&protobuf::StringifiedPlan> for StringifiedPlan {
110    fn from(stringified_plan: &protobuf::StringifiedPlan) -> Self {
111        Self {
112            plan_type: match stringified_plan
113                .plan_type
114                .as_ref()
115                .and_then(|pt| pt.plan_type_enum.as_ref())
116                .unwrap_or_else(|| {
117                    panic!(
118                        "Cannot create protobuf::StringifiedPlan from {stringified_plan:?}"
119                    )
120                }) {
121                InitialLogicalPlan(_) => PlanType::InitialLogicalPlan,
122                AnalyzedLogicalPlan(AnalyzedLogicalPlanType { analyzer_name }) => {
123                    PlanType::AnalyzedLogicalPlan {
124                        analyzer_name:analyzer_name.clone()
125                    }
126                }
127                FinalAnalyzedLogicalPlan(_) => PlanType::FinalAnalyzedLogicalPlan,
128                OptimizedLogicalPlan(OptimizedLogicalPlanType { optimizer_name }) => {
129                    PlanType::OptimizedLogicalPlan {
130                        optimizer_name: optimizer_name.clone(),
131                    }
132                }
133                FinalLogicalPlan(_) => PlanType::FinalLogicalPlan,
134                InitialPhysicalPlan(_) => PlanType::InitialPhysicalPlan,
135                InitialPhysicalPlanWithStats(_) => PlanType::InitialPhysicalPlanWithStats,
136                InitialPhysicalPlanWithSchema(_) => PlanType::InitialPhysicalPlanWithSchema,
137                OptimizedPhysicalPlan(OptimizedPhysicalPlanType { optimizer_name }) => {
138                    PlanType::OptimizedPhysicalPlan {
139                        optimizer_name: optimizer_name.clone(),
140                    }
141                }
142                FinalPhysicalPlan(_) => PlanType::FinalPhysicalPlan,
143                FinalPhysicalPlanWithStats(_) => PlanType::FinalPhysicalPlanWithStats,
144                FinalPhysicalPlanWithSchema(_) => PlanType::FinalPhysicalPlanWithSchema,
145                PhysicalPlanError(_) => PlanType::PhysicalPlanError,
146            },
147            plan: Arc::new(stringified_plan.plan.clone()),
148        }
149    }
150}
151
152impl TryFrom<protobuf::WindowFrame> for WindowFrame {
153    type Error = Error;
154
155    fn try_from(window: protobuf::WindowFrame) -> Result<Self, Self::Error> {
156        let units = protobuf::WindowFrameUnits::try_from(window.window_frame_units)
157            .map_err(|_| Error::unknown("WindowFrameUnits", window.window_frame_units))?
158            .into();
159        let start_bound = window.start_bound.required("start_bound")?;
160        let end_bound = window
161            .end_bound
162            .map(|end_bound| match end_bound {
163                protobuf::window_frame::EndBound::Bound(end_bound) => {
164                    end_bound.try_into()
165                }
166            })
167            .transpose()?
168            .unwrap_or(WindowFrameBound::CurrentRow);
169        Ok(WindowFrame::new_bounds(units, start_bound, end_bound))
170    }
171}
172
173impl TryFrom<protobuf::WindowFrameBound> for WindowFrameBound {
174    type Error = Error;
175
176    fn try_from(bound: protobuf::WindowFrameBound) -> Result<Self, Self::Error> {
177        let bound_type =
178            protobuf::WindowFrameBoundType::try_from(bound.window_frame_bound_type)
179                .map_err(|_| {
180                    Error::unknown("WindowFrameBoundType", bound.window_frame_bound_type)
181                })?;
182        match bound_type {
183            protobuf::WindowFrameBoundType::CurrentRow => Ok(Self::CurrentRow),
184            protobuf::WindowFrameBoundType::Preceding => match bound.bound_value {
185                Some(x) => Ok(Self::Preceding(ScalarValue::try_from(&x)?)),
186                None => Ok(Self::Preceding(ScalarValue::UInt64(None))),
187            },
188            protobuf::WindowFrameBoundType::Following => match bound.bound_value {
189                Some(x) => Ok(Self::Following(ScalarValue::try_from(&x)?)),
190                None => Ok(Self::Following(ScalarValue::UInt64(None))),
191            },
192        }
193    }
194}
195
196impl From<protobuf::JoinType> for JoinType {
197    fn from(t: protobuf::JoinType) -> Self {
198        match t {
199            protobuf::JoinType::Inner => JoinType::Inner,
200            protobuf::JoinType::Left => JoinType::Left,
201            protobuf::JoinType::Right => JoinType::Right,
202            protobuf::JoinType::Full => JoinType::Full,
203            protobuf::JoinType::Leftsemi => JoinType::LeftSemi,
204            protobuf::JoinType::Rightsemi => JoinType::RightSemi,
205            protobuf::JoinType::Leftanti => JoinType::LeftAnti,
206            protobuf::JoinType::Rightanti => JoinType::RightAnti,
207            protobuf::JoinType::Leftmark => JoinType::LeftMark,
208        }
209    }
210}
211
212impl From<protobuf::JoinConstraint> for JoinConstraint {
213    fn from(t: protobuf::JoinConstraint) -> Self {
214        match t {
215            protobuf::JoinConstraint::On => JoinConstraint::On,
216            protobuf::JoinConstraint::Using => JoinConstraint::Using,
217        }
218    }
219}
220
221impl From<protobuf::dml_node::Type> for WriteOp {
222    fn from(t: protobuf::dml_node::Type) -> Self {
223        match t {
224            protobuf::dml_node::Type::Update => WriteOp::Update,
225            protobuf::dml_node::Type::Delete => WriteOp::Delete,
226            protobuf::dml_node::Type::InsertAppend => WriteOp::Insert(InsertOp::Append),
227            protobuf::dml_node::Type::InsertOverwrite => {
228                WriteOp::Insert(InsertOp::Overwrite)
229            }
230            protobuf::dml_node::Type::InsertReplace => WriteOp::Insert(InsertOp::Replace),
231            protobuf::dml_node::Type::Ctas => WriteOp::Ctas,
232        }
233    }
234}
235
236pub fn parse_expr(
237    proto: &protobuf::LogicalExprNode,
238    registry: &dyn FunctionRegistry,
239    codec: &dyn LogicalExtensionCodec,
240) -> Result<Expr, Error> {
241    use protobuf::{logical_expr_node::ExprType, window_expr_node};
242
243    let expr_type = proto
244        .expr_type
245        .as_ref()
246        .ok_or_else(|| Error::required("expr_type"))?;
247
248    match expr_type {
249        ExprType::BinaryExpr(binary_expr) => {
250            let op = from_proto_binary_op(&binary_expr.op)?;
251            let operands = parse_exprs(&binary_expr.operands, registry, codec)?;
252
253            if operands.len() < 2 {
254                return Err(proto_error(
255                    "A binary expression must always have at least 2 operands",
256                ));
257            }
258
259            // Reduce the linearized operands (ordered by left innermost to right
260            // outermost) into a single expression tree.
261            Ok(operands
262                .into_iter()
263                .reduce(|left, right| {
264                    Expr::BinaryExpr(BinaryExpr::new(Box::new(left), op, Box::new(right)))
265                })
266                .expect("Binary expression could not be reduced to a single expression."))
267        }
268        ExprType::Column(column) => Ok(Expr::Column(column.into())),
269        ExprType::Literal(literal) => {
270            let scalar_value: ScalarValue = literal.try_into()?;
271            Ok(Expr::Literal(scalar_value, None))
272        }
273        ExprType::WindowExpr(expr) => {
274            let window_function = expr
275                .window_function
276                .as_ref()
277                .ok_or_else(|| Error::required("window_function"))?;
278            let partition_by = parse_exprs(&expr.partition_by, registry, codec)?;
279            let mut order_by = parse_sorts(&expr.order_by, registry, codec)?;
280            let window_frame = expr
281                .window_frame
282                .as_ref()
283                .map::<Result<WindowFrame, _>, _>(|window_frame| {
284                    let window_frame: WindowFrame = window_frame.clone().try_into()?;
285                    window_frame
286                        .regularize_order_bys(&mut order_by)
287                        .map(|_| window_frame)
288                })
289                .transpose()?
290                .ok_or_else(|| {
291                    exec_datafusion_err!("missing window frame during deserialization")
292                })?;
293
294            // TODO: support proto for null treatment
295            match window_function {
296                window_expr_node::WindowFunction::Udaf(udaf_name) => {
297                    let udaf_function = match &expr.fun_definition {
298                        Some(buf) => codec.try_decode_udaf(udaf_name, buf)?,
299                        None => registry
300                            .udaf(udaf_name)
301                            .or_else(|_| codec.try_decode_udaf(udaf_name, &[]))?,
302                    };
303
304                    let args = parse_exprs(&expr.exprs, registry, codec)?;
305                    Expr::from(WindowFunction::new(
306                        expr::WindowFunctionDefinition::AggregateUDF(udaf_function),
307                        args,
308                    ))
309                    .partition_by(partition_by)
310                    .order_by(order_by)
311                    .window_frame(window_frame)
312                    .build()
313                    .map_err(Error::DataFusionError)
314                }
315                window_expr_node::WindowFunction::Udwf(udwf_name) => {
316                    let udwf_function = match &expr.fun_definition {
317                        Some(buf) => codec.try_decode_udwf(udwf_name, buf)?,
318                        None => registry
319                            .udwf(udwf_name)
320                            .or_else(|_| codec.try_decode_udwf(udwf_name, &[]))?,
321                    };
322
323                    let args = parse_exprs(&expr.exprs, registry, codec)?;
324                    Expr::from(WindowFunction::new(
325                        expr::WindowFunctionDefinition::WindowUDF(udwf_function),
326                        args,
327                    ))
328                    .partition_by(partition_by)
329                    .order_by(order_by)
330                    .window_frame(window_frame)
331                    .build()
332                    .map_err(Error::DataFusionError)
333                }
334            }
335        }
336        ExprType::Alias(alias) => Ok(Expr::Alias(Alias::new(
337            parse_required_expr(alias.expr.as_deref(), registry, "expr", codec)?,
338            alias
339                .relation
340                .first()
341                .map(|r| TableReference::try_from(r.clone()))
342                .transpose()?,
343            alias.alias.clone(),
344        ))),
345        ExprType::IsNullExpr(is_null) => Ok(Expr::IsNull(Box::new(parse_required_expr(
346            is_null.expr.as_deref(),
347            registry,
348            "expr",
349            codec,
350        )?))),
351        ExprType::IsNotNullExpr(is_not_null) => Ok(Expr::IsNotNull(Box::new(
352            parse_required_expr(is_not_null.expr.as_deref(), registry, "expr", codec)?,
353        ))),
354        ExprType::NotExpr(not) => Ok(Expr::Not(Box::new(parse_required_expr(
355            not.expr.as_deref(),
356            registry,
357            "expr",
358            codec,
359        )?))),
360        ExprType::IsTrue(msg) => Ok(Expr::IsTrue(Box::new(parse_required_expr(
361            msg.expr.as_deref(),
362            registry,
363            "expr",
364            codec,
365        )?))),
366        ExprType::IsFalse(msg) => Ok(Expr::IsFalse(Box::new(parse_required_expr(
367            msg.expr.as_deref(),
368            registry,
369            "expr",
370            codec,
371        )?))),
372        ExprType::IsUnknown(msg) => Ok(Expr::IsUnknown(Box::new(parse_required_expr(
373            msg.expr.as_deref(),
374            registry,
375            "expr",
376            codec,
377        )?))),
378        ExprType::IsNotTrue(msg) => Ok(Expr::IsNotTrue(Box::new(parse_required_expr(
379            msg.expr.as_deref(),
380            registry,
381            "expr",
382            codec,
383        )?))),
384        ExprType::IsNotFalse(msg) => Ok(Expr::IsNotFalse(Box::new(parse_required_expr(
385            msg.expr.as_deref(),
386            registry,
387            "expr",
388            codec,
389        )?))),
390        ExprType::IsNotUnknown(msg) => Ok(Expr::IsNotUnknown(Box::new(
391            parse_required_expr(msg.expr.as_deref(), registry, "expr", codec)?,
392        ))),
393        ExprType::Between(between) => Ok(Expr::Between(Between::new(
394            Box::new(parse_required_expr(
395                between.expr.as_deref(),
396                registry,
397                "expr",
398                codec,
399            )?),
400            between.negated,
401            Box::new(parse_required_expr(
402                between.low.as_deref(),
403                registry,
404                "expr",
405                codec,
406            )?),
407            Box::new(parse_required_expr(
408                between.high.as_deref(),
409                registry,
410                "expr",
411                codec,
412            )?),
413        ))),
414        ExprType::Like(like) => Ok(Expr::Like(Like::new(
415            like.negated,
416            Box::new(parse_required_expr(
417                like.expr.as_deref(),
418                registry,
419                "expr",
420                codec,
421            )?),
422            Box::new(parse_required_expr(
423                like.pattern.as_deref(),
424                registry,
425                "pattern",
426                codec,
427            )?),
428            parse_escape_char(&like.escape_char)?,
429            false,
430        ))),
431        ExprType::Ilike(like) => Ok(Expr::Like(Like::new(
432            like.negated,
433            Box::new(parse_required_expr(
434                like.expr.as_deref(),
435                registry,
436                "expr",
437                codec,
438            )?),
439            Box::new(parse_required_expr(
440                like.pattern.as_deref(),
441                registry,
442                "pattern",
443                codec,
444            )?),
445            parse_escape_char(&like.escape_char)?,
446            true,
447        ))),
448        ExprType::SimilarTo(like) => Ok(Expr::SimilarTo(Like::new(
449            like.negated,
450            Box::new(parse_required_expr(
451                like.expr.as_deref(),
452                registry,
453                "expr",
454                codec,
455            )?),
456            Box::new(parse_required_expr(
457                like.pattern.as_deref(),
458                registry,
459                "pattern",
460                codec,
461            )?),
462            parse_escape_char(&like.escape_char)?,
463            false,
464        ))),
465        ExprType::Case(case) => {
466            let when_then_expr = case
467                .when_then_expr
468                .iter()
469                .map(|e| {
470                    let when_expr = parse_required_expr(
471                        e.when_expr.as_ref(),
472                        registry,
473                        "when_expr",
474                        codec,
475                    )?;
476                    let then_expr = parse_required_expr(
477                        e.then_expr.as_ref(),
478                        registry,
479                        "then_expr",
480                        codec,
481                    )?;
482                    Ok((Box::new(when_expr), Box::new(then_expr)))
483                })
484                .collect::<Result<Vec<(Box<Expr>, Box<Expr>)>, Error>>()?;
485            Ok(Expr::Case(Case::new(
486                parse_optional_expr(case.expr.as_deref(), registry, codec)?.map(Box::new),
487                when_then_expr,
488                parse_optional_expr(case.else_expr.as_deref(), registry, codec)?
489                    .map(Box::new),
490            )))
491        }
492        ExprType::Cast(cast) => {
493            let expr = Box::new(parse_required_expr(
494                cast.expr.as_deref(),
495                registry,
496                "expr",
497                codec,
498            )?);
499            let data_type = cast.arrow_type.as_ref().required("arrow_type")?;
500            Ok(Expr::Cast(Cast::new(expr, data_type)))
501        }
502        ExprType::TryCast(cast) => {
503            let expr = Box::new(parse_required_expr(
504                cast.expr.as_deref(),
505                registry,
506                "expr",
507                codec,
508            )?);
509            let data_type = cast.arrow_type.as_ref().required("arrow_type")?;
510            Ok(Expr::TryCast(TryCast::new(expr, data_type)))
511        }
512        ExprType::Negative(negative) => Ok(Expr::Negative(Box::new(
513            parse_required_expr(negative.expr.as_deref(), registry, "expr", codec)?,
514        ))),
515        ExprType::Unnest(unnest) => {
516            let mut exprs = parse_exprs(&unnest.exprs, registry, codec)?;
517            if exprs.len() != 1 {
518                return Err(proto_error("Unnest must have exactly one expression"));
519            }
520            Ok(Expr::Unnest(Unnest::new(exprs.swap_remove(0))))
521        }
522        ExprType::InList(in_list) => Ok(Expr::InList(InList::new(
523            Box::new(parse_required_expr(
524                in_list.expr.as_deref(),
525                registry,
526                "expr",
527                codec,
528            )?),
529            parse_exprs(&in_list.list, registry, codec)?,
530            in_list.negated,
531        ))),
532        ExprType::Wildcard(protobuf::Wildcard { qualifier }) => {
533            let qualifier = qualifier.to_owned().map(|x| x.try_into()).transpose()?;
534            #[expect(deprecated)]
535            Ok(Expr::Wildcard {
536                qualifier,
537                options: Box::new(WildcardOptions::default()),
538            })
539        }
540        ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
541            fun_name,
542            args,
543            fun_definition,
544        }) => {
545            let scalar_fn = match fun_definition {
546                Some(buf) => codec.try_decode_udf(fun_name, buf)?,
547                None => registry
548                    .udf(fun_name.as_str())
549                    .or_else(|_| codec.try_decode_udf(fun_name, &[]))?,
550            };
551            Ok(Expr::ScalarFunction(expr::ScalarFunction::new_udf(
552                scalar_fn,
553                parse_exprs(args, registry, codec)?,
554            )))
555        }
556        ExprType::AggregateUdfExpr(pb) => {
557            let agg_fn = match &pb.fun_definition {
558                Some(buf) => codec.try_decode_udaf(&pb.fun_name, buf)?,
559                None => registry
560                    .udaf(&pb.fun_name)
561                    .or_else(|_| codec.try_decode_udaf(&pb.fun_name, &[]))?,
562            };
563
564            Ok(Expr::AggregateFunction(expr::AggregateFunction::new_udf(
565                agg_fn,
566                parse_exprs(&pb.args, registry, codec)?,
567                pb.distinct,
568                parse_optional_expr(pb.filter.as_deref(), registry, codec)?.map(Box::new),
569                match pb.order_by.len() {
570                    0 => None,
571                    _ => Some(parse_sorts(&pb.order_by, registry, codec)?),
572                },
573                None,
574            )))
575        }
576
577        ExprType::GroupingSet(GroupingSetNode { expr }) => {
578            Ok(Expr::GroupingSet(GroupingSets(
579                expr.iter()
580                    .map(|expr_list| parse_exprs(&expr_list.expr, registry, codec))
581                    .collect::<Result<Vec<_>, Error>>()?,
582            )))
583        }
584        ExprType::Cube(CubeNode { expr }) => Ok(Expr::GroupingSet(GroupingSet::Cube(
585            parse_exprs(expr, registry, codec)?,
586        ))),
587        ExprType::Rollup(RollupNode { expr }) => Ok(Expr::GroupingSet(
588            GroupingSet::Rollup(parse_exprs(expr, registry, codec)?),
589        )),
590        ExprType::Placeholder(PlaceholderNode { id, data_type }) => match data_type {
591            None => Ok(Expr::Placeholder(Placeholder::new(id.clone(), None))),
592            Some(data_type) => Ok(Expr::Placeholder(Placeholder::new(
593                id.clone(),
594                Some(data_type.try_into()?),
595            ))),
596        },
597    }
598}
599
600/// Parse a vector of `protobuf::LogicalExprNode`s.
601pub fn parse_exprs<'a, I>(
602    protos: I,
603    registry: &dyn FunctionRegistry,
604    codec: &dyn LogicalExtensionCodec,
605) -> Result<Vec<Expr>, Error>
606where
607    I: IntoIterator<Item = &'a protobuf::LogicalExprNode>,
608{
609    let res = protos
610        .into_iter()
611        .map(|elem| {
612            parse_expr(elem, registry, codec).map_err(|e| plan_datafusion_err!("{}", e))
613        })
614        .collect::<Result<Vec<_>>>()?;
615    Ok(res)
616}
617
618pub fn parse_sorts<'a, I>(
619    protos: I,
620    registry: &dyn FunctionRegistry,
621    codec: &dyn LogicalExtensionCodec,
622) -> Result<Vec<Sort>, Error>
623where
624    I: IntoIterator<Item = &'a protobuf::SortExprNode>,
625{
626    protos
627        .into_iter()
628        .map(|sort| parse_sort(sort, registry, codec))
629        .collect::<Result<Vec<Sort>, Error>>()
630}
631
632pub fn parse_sort(
633    sort: &protobuf::SortExprNode,
634    registry: &dyn FunctionRegistry,
635    codec: &dyn LogicalExtensionCodec,
636) -> Result<Sort, Error> {
637    Ok(Sort::new(
638        parse_required_expr(sort.expr.as_ref(), registry, "expr", codec)?,
639        sort.asc,
640        sort.nulls_first,
641    ))
642}
643
644/// Parse an optional escape_char for Like, ILike, SimilarTo
645fn parse_escape_char(s: &str) -> Result<Option<char>> {
646    match s.len() {
647        0 => Ok(None),
648        1 => Ok(s.chars().next()),
649        _ => internal_err!("Invalid length for escape char"),
650    }
651}
652
653pub fn from_proto_binary_op(op: &str) -> Result<Operator, Error> {
654    match op {
655        "And" => Ok(Operator::And),
656        "Or" => Ok(Operator::Or),
657        "Eq" => Ok(Operator::Eq),
658        "NotEq" => Ok(Operator::NotEq),
659        "LtEq" => Ok(Operator::LtEq),
660        "Lt" => Ok(Operator::Lt),
661        "Gt" => Ok(Operator::Gt),
662        "GtEq" => Ok(Operator::GtEq),
663        "Plus" => Ok(Operator::Plus),
664        "Minus" => Ok(Operator::Minus),
665        "Multiply" => Ok(Operator::Multiply),
666        "Divide" => Ok(Operator::Divide),
667        "Modulo" => Ok(Operator::Modulo),
668        "IsDistinctFrom" => Ok(Operator::IsDistinctFrom),
669        "IsNotDistinctFrom" => Ok(Operator::IsNotDistinctFrom),
670        "BitwiseAnd" => Ok(Operator::BitwiseAnd),
671        "BitwiseOr" => Ok(Operator::BitwiseOr),
672        "BitwiseXor" => Ok(Operator::BitwiseXor),
673        "BitwiseShiftLeft" => Ok(Operator::BitwiseShiftLeft),
674        "BitwiseShiftRight" => Ok(Operator::BitwiseShiftRight),
675        "RegexIMatch" => Ok(Operator::RegexIMatch),
676        "RegexMatch" => Ok(Operator::RegexMatch),
677        "RegexNotIMatch" => Ok(Operator::RegexNotIMatch),
678        "RegexNotMatch" => Ok(Operator::RegexNotMatch),
679        "StringConcat" => Ok(Operator::StringConcat),
680        "AtArrow" => Ok(Operator::AtArrow),
681        "ArrowAt" => Ok(Operator::ArrowAt),
682        other => Err(proto_error(format!(
683            "Unsupported binary operator '{other:?}'"
684        ))),
685    }
686}
687
688fn parse_optional_expr(
689    p: Option<&protobuf::LogicalExprNode>,
690    registry: &dyn FunctionRegistry,
691    codec: &dyn LogicalExtensionCodec,
692) -> Result<Option<Expr>, Error> {
693    match p {
694        Some(expr) => parse_expr(expr, registry, codec).map(Some),
695        None => Ok(None),
696    }
697}
698
699fn parse_required_expr(
700    p: Option<&protobuf::LogicalExprNode>,
701    registry: &dyn FunctionRegistry,
702    field: impl Into<String>,
703    codec: &dyn LogicalExtensionCodec,
704) -> Result<Expr, Error> {
705    match p {
706        Some(expr) => parse_expr(expr, registry, codec),
707        None => Err(Error::required(field)),
708    }
709}
710
711fn proto_error<S: Into<String>>(message: S) -> Error {
712    Error::General(message.into())
713}