1use std::sync::Arc;
19
20use datafusion::execution::registry::FunctionRegistry;
21use datafusion_common::{
22 exec_datafusion_err, internal_err, plan_datafusion_err, RecursionUnnestOption,
23 Result, ScalarValue, TableReference, UnnestOptions,
24};
25use datafusion_expr::dml::InsertOp;
26use datafusion_expr::expr::{Alias, Placeholder, Sort};
27use datafusion_expr::expr::{Unnest, WildcardOptions};
28use datafusion_expr::{
29 expr::{self, InList, WindowFunction},
30 logical_plan::{PlanType, StringifiedPlan},
31 Between, BinaryExpr, Case, Cast, Expr, GroupingSet,
32 GroupingSet::GroupingSets,
33 JoinConstraint, JoinType, Like, Operator, TryCast, WindowFrame, WindowFrameBound,
34 WindowFrameUnits,
35};
36use datafusion_expr::{ExprFunctionExt, WriteOp};
37use datafusion_proto_common::{from_proto::FromOptionalField, FromProtoError as Error};
38
39use crate::protobuf::plan_type::PlanTypeEnum::{
40 FinalPhysicalPlanWithSchema, InitialPhysicalPlanWithSchema,
41};
42use crate::protobuf::{
43 self,
44 plan_type::PlanTypeEnum::{
45 AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
46 FinalPhysicalPlan, FinalPhysicalPlanWithStats, InitialLogicalPlan,
47 InitialPhysicalPlan, InitialPhysicalPlanWithStats, OptimizedLogicalPlan,
48 OptimizedPhysicalPlan, PhysicalPlanError,
49 },
50 AnalyzedLogicalPlanType, CubeNode, GroupingSetNode, OptimizedLogicalPlanType,
51 OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
52};
53
54use super::LogicalExtensionCodec;
55
56impl From<&protobuf::UnnestOptions> for UnnestOptions {
57 fn from(opts: &protobuf::UnnestOptions) -> Self {
58 Self {
59 preserve_nulls: opts.preserve_nulls,
60 recursions: opts
61 .recursions
62 .iter()
63 .map(|r| RecursionUnnestOption {
64 input_column: r.input_column.as_ref().unwrap().into(),
65 output_column: r.output_column.as_ref().unwrap().into(),
66 depth: r.depth as usize,
67 })
68 .collect::<Vec<_>>(),
69 }
70 }
71}
72
73impl From<protobuf::WindowFrameUnits> for WindowFrameUnits {
74 fn from(units: protobuf::WindowFrameUnits) -> Self {
75 match units {
76 protobuf::WindowFrameUnits::Rows => Self::Rows,
77 protobuf::WindowFrameUnits::Range => Self::Range,
78 protobuf::WindowFrameUnits::Groups => Self::Groups,
79 }
80 }
81}
82
83impl TryFrom<protobuf::TableReference> for TableReference {
84 type Error = Error;
85
86 fn try_from(value: protobuf::TableReference) -> Result<Self, Self::Error> {
87 use protobuf::table_reference::TableReferenceEnum;
88 let table_reference_enum = value
89 .table_reference_enum
90 .ok_or_else(|| Error::required("table_reference_enum"))?;
91
92 match table_reference_enum {
93 TableReferenceEnum::Bare(protobuf::BareTableReference { table }) => {
94 Ok(TableReference::bare(table))
95 }
96 TableReferenceEnum::Partial(protobuf::PartialTableReference {
97 schema,
98 table,
99 }) => Ok(TableReference::partial(schema, table)),
100 TableReferenceEnum::Full(protobuf::FullTableReference {
101 catalog,
102 schema,
103 table,
104 }) => Ok(TableReference::full(catalog, schema, table)),
105 }
106 }
107}
108
109impl From<&protobuf::StringifiedPlan> for StringifiedPlan {
110 fn from(stringified_plan: &protobuf::StringifiedPlan) -> Self {
111 Self {
112 plan_type: match stringified_plan
113 .plan_type
114 .as_ref()
115 .and_then(|pt| pt.plan_type_enum.as_ref())
116 .unwrap_or_else(|| {
117 panic!(
118 "Cannot create protobuf::StringifiedPlan from {stringified_plan:?}"
119 )
120 }) {
121 InitialLogicalPlan(_) => PlanType::InitialLogicalPlan,
122 AnalyzedLogicalPlan(AnalyzedLogicalPlanType { analyzer_name }) => {
123 PlanType::AnalyzedLogicalPlan {
124 analyzer_name:analyzer_name.clone()
125 }
126 }
127 FinalAnalyzedLogicalPlan(_) => PlanType::FinalAnalyzedLogicalPlan,
128 OptimizedLogicalPlan(OptimizedLogicalPlanType { optimizer_name }) => {
129 PlanType::OptimizedLogicalPlan {
130 optimizer_name: optimizer_name.clone(),
131 }
132 }
133 FinalLogicalPlan(_) => PlanType::FinalLogicalPlan,
134 InitialPhysicalPlan(_) => PlanType::InitialPhysicalPlan,
135 InitialPhysicalPlanWithStats(_) => PlanType::InitialPhysicalPlanWithStats,
136 InitialPhysicalPlanWithSchema(_) => PlanType::InitialPhysicalPlanWithSchema,
137 OptimizedPhysicalPlan(OptimizedPhysicalPlanType { optimizer_name }) => {
138 PlanType::OptimizedPhysicalPlan {
139 optimizer_name: optimizer_name.clone(),
140 }
141 }
142 FinalPhysicalPlan(_) => PlanType::FinalPhysicalPlan,
143 FinalPhysicalPlanWithStats(_) => PlanType::FinalPhysicalPlanWithStats,
144 FinalPhysicalPlanWithSchema(_) => PlanType::FinalPhysicalPlanWithSchema,
145 PhysicalPlanError(_) => PlanType::PhysicalPlanError,
146 },
147 plan: Arc::new(stringified_plan.plan.clone()),
148 }
149 }
150}
151
152impl TryFrom<protobuf::WindowFrame> for WindowFrame {
153 type Error = Error;
154
155 fn try_from(window: protobuf::WindowFrame) -> Result<Self, Self::Error> {
156 let units = protobuf::WindowFrameUnits::try_from(window.window_frame_units)
157 .map_err(|_| Error::unknown("WindowFrameUnits", window.window_frame_units))?
158 .into();
159 let start_bound = window.start_bound.required("start_bound")?;
160 let end_bound = window
161 .end_bound
162 .map(|end_bound| match end_bound {
163 protobuf::window_frame::EndBound::Bound(end_bound) => {
164 end_bound.try_into()
165 }
166 })
167 .transpose()?
168 .unwrap_or(WindowFrameBound::CurrentRow);
169 Ok(WindowFrame::new_bounds(units, start_bound, end_bound))
170 }
171}
172
173impl TryFrom<protobuf::WindowFrameBound> for WindowFrameBound {
174 type Error = Error;
175
176 fn try_from(bound: protobuf::WindowFrameBound) -> Result<Self, Self::Error> {
177 let bound_type =
178 protobuf::WindowFrameBoundType::try_from(bound.window_frame_bound_type)
179 .map_err(|_| {
180 Error::unknown("WindowFrameBoundType", bound.window_frame_bound_type)
181 })?;
182 match bound_type {
183 protobuf::WindowFrameBoundType::CurrentRow => Ok(Self::CurrentRow),
184 protobuf::WindowFrameBoundType::Preceding => match bound.bound_value {
185 Some(x) => Ok(Self::Preceding(ScalarValue::try_from(&x)?)),
186 None => Ok(Self::Preceding(ScalarValue::UInt64(None))),
187 },
188 protobuf::WindowFrameBoundType::Following => match bound.bound_value {
189 Some(x) => Ok(Self::Following(ScalarValue::try_from(&x)?)),
190 None => Ok(Self::Following(ScalarValue::UInt64(None))),
191 },
192 }
193 }
194}
195
196impl From<protobuf::JoinType> for JoinType {
197 fn from(t: protobuf::JoinType) -> Self {
198 match t {
199 protobuf::JoinType::Inner => JoinType::Inner,
200 protobuf::JoinType::Left => JoinType::Left,
201 protobuf::JoinType::Right => JoinType::Right,
202 protobuf::JoinType::Full => JoinType::Full,
203 protobuf::JoinType::Leftsemi => JoinType::LeftSemi,
204 protobuf::JoinType::Rightsemi => JoinType::RightSemi,
205 protobuf::JoinType::Leftanti => JoinType::LeftAnti,
206 protobuf::JoinType::Rightanti => JoinType::RightAnti,
207 protobuf::JoinType::Leftmark => JoinType::LeftMark,
208 }
209 }
210}
211
212impl From<protobuf::JoinConstraint> for JoinConstraint {
213 fn from(t: protobuf::JoinConstraint) -> Self {
214 match t {
215 protobuf::JoinConstraint::On => JoinConstraint::On,
216 protobuf::JoinConstraint::Using => JoinConstraint::Using,
217 }
218 }
219}
220
221impl From<protobuf::dml_node::Type> for WriteOp {
222 fn from(t: protobuf::dml_node::Type) -> Self {
223 match t {
224 protobuf::dml_node::Type::Update => WriteOp::Update,
225 protobuf::dml_node::Type::Delete => WriteOp::Delete,
226 protobuf::dml_node::Type::InsertAppend => WriteOp::Insert(InsertOp::Append),
227 protobuf::dml_node::Type::InsertOverwrite => {
228 WriteOp::Insert(InsertOp::Overwrite)
229 }
230 protobuf::dml_node::Type::InsertReplace => WriteOp::Insert(InsertOp::Replace),
231 protobuf::dml_node::Type::Ctas => WriteOp::Ctas,
232 }
233 }
234}
235
236pub fn parse_expr(
237 proto: &protobuf::LogicalExprNode,
238 registry: &dyn FunctionRegistry,
239 codec: &dyn LogicalExtensionCodec,
240) -> Result<Expr, Error> {
241 use protobuf::{logical_expr_node::ExprType, window_expr_node};
242
243 let expr_type = proto
244 .expr_type
245 .as_ref()
246 .ok_or_else(|| Error::required("expr_type"))?;
247
248 match expr_type {
249 ExprType::BinaryExpr(binary_expr) => {
250 let op = from_proto_binary_op(&binary_expr.op)?;
251 let operands = parse_exprs(&binary_expr.operands, registry, codec)?;
252
253 if operands.len() < 2 {
254 return Err(proto_error(
255 "A binary expression must always have at least 2 operands",
256 ));
257 }
258
259 Ok(operands
262 .into_iter()
263 .reduce(|left, right| {
264 Expr::BinaryExpr(BinaryExpr::new(Box::new(left), op, Box::new(right)))
265 })
266 .expect("Binary expression could not be reduced to a single expression."))
267 }
268 ExprType::Column(column) => Ok(Expr::Column(column.into())),
269 ExprType::Literal(literal) => {
270 let scalar_value: ScalarValue = literal.try_into()?;
271 Ok(Expr::Literal(scalar_value, None))
272 }
273 ExprType::WindowExpr(expr) => {
274 let window_function = expr
275 .window_function
276 .as_ref()
277 .ok_or_else(|| Error::required("window_function"))?;
278 let partition_by = parse_exprs(&expr.partition_by, registry, codec)?;
279 let mut order_by = parse_sorts(&expr.order_by, registry, codec)?;
280 let window_frame = expr
281 .window_frame
282 .as_ref()
283 .map::<Result<WindowFrame, _>, _>(|window_frame| {
284 let window_frame: WindowFrame = window_frame.clone().try_into()?;
285 window_frame
286 .regularize_order_bys(&mut order_by)
287 .map(|_| window_frame)
288 })
289 .transpose()?
290 .ok_or_else(|| {
291 exec_datafusion_err!("missing window frame during deserialization")
292 })?;
293
294 match window_function {
296 window_expr_node::WindowFunction::Udaf(udaf_name) => {
297 let udaf_function = match &expr.fun_definition {
298 Some(buf) => codec.try_decode_udaf(udaf_name, buf)?,
299 None => registry
300 .udaf(udaf_name)
301 .or_else(|_| codec.try_decode_udaf(udaf_name, &[]))?,
302 };
303
304 let args = parse_exprs(&expr.exprs, registry, codec)?;
305 Expr::from(WindowFunction::new(
306 expr::WindowFunctionDefinition::AggregateUDF(udaf_function),
307 args,
308 ))
309 .partition_by(partition_by)
310 .order_by(order_by)
311 .window_frame(window_frame)
312 .build()
313 .map_err(Error::DataFusionError)
314 }
315 window_expr_node::WindowFunction::Udwf(udwf_name) => {
316 let udwf_function = match &expr.fun_definition {
317 Some(buf) => codec.try_decode_udwf(udwf_name, buf)?,
318 None => registry
319 .udwf(udwf_name)
320 .or_else(|_| codec.try_decode_udwf(udwf_name, &[]))?,
321 };
322
323 let args = parse_exprs(&expr.exprs, registry, codec)?;
324 Expr::from(WindowFunction::new(
325 expr::WindowFunctionDefinition::WindowUDF(udwf_function),
326 args,
327 ))
328 .partition_by(partition_by)
329 .order_by(order_by)
330 .window_frame(window_frame)
331 .build()
332 .map_err(Error::DataFusionError)
333 }
334 }
335 }
336 ExprType::Alias(alias) => Ok(Expr::Alias(Alias::new(
337 parse_required_expr(alias.expr.as_deref(), registry, "expr", codec)?,
338 alias
339 .relation
340 .first()
341 .map(|r| TableReference::try_from(r.clone()))
342 .transpose()?,
343 alias.alias.clone(),
344 ))),
345 ExprType::IsNullExpr(is_null) => Ok(Expr::IsNull(Box::new(parse_required_expr(
346 is_null.expr.as_deref(),
347 registry,
348 "expr",
349 codec,
350 )?))),
351 ExprType::IsNotNullExpr(is_not_null) => Ok(Expr::IsNotNull(Box::new(
352 parse_required_expr(is_not_null.expr.as_deref(), registry, "expr", codec)?,
353 ))),
354 ExprType::NotExpr(not) => Ok(Expr::Not(Box::new(parse_required_expr(
355 not.expr.as_deref(),
356 registry,
357 "expr",
358 codec,
359 )?))),
360 ExprType::IsTrue(msg) => Ok(Expr::IsTrue(Box::new(parse_required_expr(
361 msg.expr.as_deref(),
362 registry,
363 "expr",
364 codec,
365 )?))),
366 ExprType::IsFalse(msg) => Ok(Expr::IsFalse(Box::new(parse_required_expr(
367 msg.expr.as_deref(),
368 registry,
369 "expr",
370 codec,
371 )?))),
372 ExprType::IsUnknown(msg) => Ok(Expr::IsUnknown(Box::new(parse_required_expr(
373 msg.expr.as_deref(),
374 registry,
375 "expr",
376 codec,
377 )?))),
378 ExprType::IsNotTrue(msg) => Ok(Expr::IsNotTrue(Box::new(parse_required_expr(
379 msg.expr.as_deref(),
380 registry,
381 "expr",
382 codec,
383 )?))),
384 ExprType::IsNotFalse(msg) => Ok(Expr::IsNotFalse(Box::new(parse_required_expr(
385 msg.expr.as_deref(),
386 registry,
387 "expr",
388 codec,
389 )?))),
390 ExprType::IsNotUnknown(msg) => Ok(Expr::IsNotUnknown(Box::new(
391 parse_required_expr(msg.expr.as_deref(), registry, "expr", codec)?,
392 ))),
393 ExprType::Between(between) => Ok(Expr::Between(Between::new(
394 Box::new(parse_required_expr(
395 between.expr.as_deref(),
396 registry,
397 "expr",
398 codec,
399 )?),
400 between.negated,
401 Box::new(parse_required_expr(
402 between.low.as_deref(),
403 registry,
404 "expr",
405 codec,
406 )?),
407 Box::new(parse_required_expr(
408 between.high.as_deref(),
409 registry,
410 "expr",
411 codec,
412 )?),
413 ))),
414 ExprType::Like(like) => Ok(Expr::Like(Like::new(
415 like.negated,
416 Box::new(parse_required_expr(
417 like.expr.as_deref(),
418 registry,
419 "expr",
420 codec,
421 )?),
422 Box::new(parse_required_expr(
423 like.pattern.as_deref(),
424 registry,
425 "pattern",
426 codec,
427 )?),
428 parse_escape_char(&like.escape_char)?,
429 false,
430 ))),
431 ExprType::Ilike(like) => Ok(Expr::Like(Like::new(
432 like.negated,
433 Box::new(parse_required_expr(
434 like.expr.as_deref(),
435 registry,
436 "expr",
437 codec,
438 )?),
439 Box::new(parse_required_expr(
440 like.pattern.as_deref(),
441 registry,
442 "pattern",
443 codec,
444 )?),
445 parse_escape_char(&like.escape_char)?,
446 true,
447 ))),
448 ExprType::SimilarTo(like) => Ok(Expr::SimilarTo(Like::new(
449 like.negated,
450 Box::new(parse_required_expr(
451 like.expr.as_deref(),
452 registry,
453 "expr",
454 codec,
455 )?),
456 Box::new(parse_required_expr(
457 like.pattern.as_deref(),
458 registry,
459 "pattern",
460 codec,
461 )?),
462 parse_escape_char(&like.escape_char)?,
463 false,
464 ))),
465 ExprType::Case(case) => {
466 let when_then_expr = case
467 .when_then_expr
468 .iter()
469 .map(|e| {
470 let when_expr = parse_required_expr(
471 e.when_expr.as_ref(),
472 registry,
473 "when_expr",
474 codec,
475 )?;
476 let then_expr = parse_required_expr(
477 e.then_expr.as_ref(),
478 registry,
479 "then_expr",
480 codec,
481 )?;
482 Ok((Box::new(when_expr), Box::new(then_expr)))
483 })
484 .collect::<Result<Vec<(Box<Expr>, Box<Expr>)>, Error>>()?;
485 Ok(Expr::Case(Case::new(
486 parse_optional_expr(case.expr.as_deref(), registry, codec)?.map(Box::new),
487 when_then_expr,
488 parse_optional_expr(case.else_expr.as_deref(), registry, codec)?
489 .map(Box::new),
490 )))
491 }
492 ExprType::Cast(cast) => {
493 let expr = Box::new(parse_required_expr(
494 cast.expr.as_deref(),
495 registry,
496 "expr",
497 codec,
498 )?);
499 let data_type = cast.arrow_type.as_ref().required("arrow_type")?;
500 Ok(Expr::Cast(Cast::new(expr, data_type)))
501 }
502 ExprType::TryCast(cast) => {
503 let expr = Box::new(parse_required_expr(
504 cast.expr.as_deref(),
505 registry,
506 "expr",
507 codec,
508 )?);
509 let data_type = cast.arrow_type.as_ref().required("arrow_type")?;
510 Ok(Expr::TryCast(TryCast::new(expr, data_type)))
511 }
512 ExprType::Negative(negative) => Ok(Expr::Negative(Box::new(
513 parse_required_expr(negative.expr.as_deref(), registry, "expr", codec)?,
514 ))),
515 ExprType::Unnest(unnest) => {
516 let mut exprs = parse_exprs(&unnest.exprs, registry, codec)?;
517 if exprs.len() != 1 {
518 return Err(proto_error("Unnest must have exactly one expression"));
519 }
520 Ok(Expr::Unnest(Unnest::new(exprs.swap_remove(0))))
521 }
522 ExprType::InList(in_list) => Ok(Expr::InList(InList::new(
523 Box::new(parse_required_expr(
524 in_list.expr.as_deref(),
525 registry,
526 "expr",
527 codec,
528 )?),
529 parse_exprs(&in_list.list, registry, codec)?,
530 in_list.negated,
531 ))),
532 ExprType::Wildcard(protobuf::Wildcard { qualifier }) => {
533 let qualifier = qualifier.to_owned().map(|x| x.try_into()).transpose()?;
534 #[expect(deprecated)]
535 Ok(Expr::Wildcard {
536 qualifier,
537 options: Box::new(WildcardOptions::default()),
538 })
539 }
540 ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
541 fun_name,
542 args,
543 fun_definition,
544 }) => {
545 let scalar_fn = match fun_definition {
546 Some(buf) => codec.try_decode_udf(fun_name, buf)?,
547 None => registry
548 .udf(fun_name.as_str())
549 .or_else(|_| codec.try_decode_udf(fun_name, &[]))?,
550 };
551 Ok(Expr::ScalarFunction(expr::ScalarFunction::new_udf(
552 scalar_fn,
553 parse_exprs(args, registry, codec)?,
554 )))
555 }
556 ExprType::AggregateUdfExpr(pb) => {
557 let agg_fn = match &pb.fun_definition {
558 Some(buf) => codec.try_decode_udaf(&pb.fun_name, buf)?,
559 None => registry
560 .udaf(&pb.fun_name)
561 .or_else(|_| codec.try_decode_udaf(&pb.fun_name, &[]))?,
562 };
563
564 Ok(Expr::AggregateFunction(expr::AggregateFunction::new_udf(
565 agg_fn,
566 parse_exprs(&pb.args, registry, codec)?,
567 pb.distinct,
568 parse_optional_expr(pb.filter.as_deref(), registry, codec)?.map(Box::new),
569 match pb.order_by.len() {
570 0 => None,
571 _ => Some(parse_sorts(&pb.order_by, registry, codec)?),
572 },
573 None,
574 )))
575 }
576
577 ExprType::GroupingSet(GroupingSetNode { expr }) => {
578 Ok(Expr::GroupingSet(GroupingSets(
579 expr.iter()
580 .map(|expr_list| parse_exprs(&expr_list.expr, registry, codec))
581 .collect::<Result<Vec<_>, Error>>()?,
582 )))
583 }
584 ExprType::Cube(CubeNode { expr }) => Ok(Expr::GroupingSet(GroupingSet::Cube(
585 parse_exprs(expr, registry, codec)?,
586 ))),
587 ExprType::Rollup(RollupNode { expr }) => Ok(Expr::GroupingSet(
588 GroupingSet::Rollup(parse_exprs(expr, registry, codec)?),
589 )),
590 ExprType::Placeholder(PlaceholderNode { id, data_type }) => match data_type {
591 None => Ok(Expr::Placeholder(Placeholder::new(id.clone(), None))),
592 Some(data_type) => Ok(Expr::Placeholder(Placeholder::new(
593 id.clone(),
594 Some(data_type.try_into()?),
595 ))),
596 },
597 }
598}
599
600pub fn parse_exprs<'a, I>(
602 protos: I,
603 registry: &dyn FunctionRegistry,
604 codec: &dyn LogicalExtensionCodec,
605) -> Result<Vec<Expr>, Error>
606where
607 I: IntoIterator<Item = &'a protobuf::LogicalExprNode>,
608{
609 let res = protos
610 .into_iter()
611 .map(|elem| {
612 parse_expr(elem, registry, codec).map_err(|e| plan_datafusion_err!("{}", e))
613 })
614 .collect::<Result<Vec<_>>>()?;
615 Ok(res)
616}
617
618pub fn parse_sorts<'a, I>(
619 protos: I,
620 registry: &dyn FunctionRegistry,
621 codec: &dyn LogicalExtensionCodec,
622) -> Result<Vec<Sort>, Error>
623where
624 I: IntoIterator<Item = &'a protobuf::SortExprNode>,
625{
626 protos
627 .into_iter()
628 .map(|sort| parse_sort(sort, registry, codec))
629 .collect::<Result<Vec<Sort>, Error>>()
630}
631
632pub fn parse_sort(
633 sort: &protobuf::SortExprNode,
634 registry: &dyn FunctionRegistry,
635 codec: &dyn LogicalExtensionCodec,
636) -> Result<Sort, Error> {
637 Ok(Sort::new(
638 parse_required_expr(sort.expr.as_ref(), registry, "expr", codec)?,
639 sort.asc,
640 sort.nulls_first,
641 ))
642}
643
644fn parse_escape_char(s: &str) -> Result<Option<char>> {
646 match s.len() {
647 0 => Ok(None),
648 1 => Ok(s.chars().next()),
649 _ => internal_err!("Invalid length for escape char"),
650 }
651}
652
653pub fn from_proto_binary_op(op: &str) -> Result<Operator, Error> {
654 match op {
655 "And" => Ok(Operator::And),
656 "Or" => Ok(Operator::Or),
657 "Eq" => Ok(Operator::Eq),
658 "NotEq" => Ok(Operator::NotEq),
659 "LtEq" => Ok(Operator::LtEq),
660 "Lt" => Ok(Operator::Lt),
661 "Gt" => Ok(Operator::Gt),
662 "GtEq" => Ok(Operator::GtEq),
663 "Plus" => Ok(Operator::Plus),
664 "Minus" => Ok(Operator::Minus),
665 "Multiply" => Ok(Operator::Multiply),
666 "Divide" => Ok(Operator::Divide),
667 "Modulo" => Ok(Operator::Modulo),
668 "IsDistinctFrom" => Ok(Operator::IsDistinctFrom),
669 "IsNotDistinctFrom" => Ok(Operator::IsNotDistinctFrom),
670 "BitwiseAnd" => Ok(Operator::BitwiseAnd),
671 "BitwiseOr" => Ok(Operator::BitwiseOr),
672 "BitwiseXor" => Ok(Operator::BitwiseXor),
673 "BitwiseShiftLeft" => Ok(Operator::BitwiseShiftLeft),
674 "BitwiseShiftRight" => Ok(Operator::BitwiseShiftRight),
675 "RegexIMatch" => Ok(Operator::RegexIMatch),
676 "RegexMatch" => Ok(Operator::RegexMatch),
677 "RegexNotIMatch" => Ok(Operator::RegexNotIMatch),
678 "RegexNotMatch" => Ok(Operator::RegexNotMatch),
679 "StringConcat" => Ok(Operator::StringConcat),
680 "AtArrow" => Ok(Operator::AtArrow),
681 "ArrowAt" => Ok(Operator::ArrowAt),
682 other => Err(proto_error(format!(
683 "Unsupported binary operator '{other:?}'"
684 ))),
685 }
686}
687
688fn parse_optional_expr(
689 p: Option<&protobuf::LogicalExprNode>,
690 registry: &dyn FunctionRegistry,
691 codec: &dyn LogicalExtensionCodec,
692) -> Result<Option<Expr>, Error> {
693 match p {
694 Some(expr) => parse_expr(expr, registry, codec).map(Some),
695 None => Ok(None),
696 }
697}
698
699fn parse_required_expr(
700 p: Option<&protobuf::LogicalExprNode>,
701 registry: &dyn FunctionRegistry,
702 field: impl Into<String>,
703 codec: &dyn LogicalExtensionCodec,
704) -> Result<Expr, Error> {
705 match p {
706 Some(expr) => parse_expr(expr, registry, codec),
707 None => Err(Error::required(field)),
708 }
709}
710
711fn proto_error<S: Into<String>>(message: S) -> Error {
712 Error::General(message.into())
713}