1use std::sync::Arc;
19
20use arrow::datatypes::Field;
21use datafusion_common::{
22 exec_datafusion_err, internal_err, plan_datafusion_err, NullEquality,
23 RecursionUnnestOption, Result, ScalarValue, TableReference, UnnestOptions,
24};
25use datafusion_execution::registry::FunctionRegistry;
26use datafusion_expr::dml::InsertOp;
27use datafusion_expr::expr::{Alias, NullTreatment, Placeholder, Sort};
28use datafusion_expr::expr::{Unnest, WildcardOptions};
29use datafusion_expr::{
30 expr::{self, InList, WindowFunction},
31 logical_plan::{PlanType, StringifiedPlan},
32 Between, BinaryExpr, Case, Cast, Expr, GroupingSet,
33 GroupingSet::GroupingSets,
34 JoinConstraint, JoinType, Like, Operator, TryCast, WindowFrame, WindowFrameBound,
35 WindowFrameUnits,
36};
37use datafusion_expr::{ExprFunctionExt, WriteOp};
38use datafusion_proto_common::{from_proto::FromOptionalField, FromProtoError as Error};
39
40use crate::protobuf::plan_type::PlanTypeEnum::{
41 FinalPhysicalPlanWithSchema, InitialPhysicalPlanWithSchema,
42};
43use crate::protobuf::{
44 self,
45 plan_type::PlanTypeEnum::{
46 AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
47 FinalPhysicalPlan, FinalPhysicalPlanWithStats, InitialLogicalPlan,
48 InitialPhysicalPlan, InitialPhysicalPlanWithStats, OptimizedLogicalPlan,
49 OptimizedPhysicalPlan, PhysicalPlanError,
50 },
51 AnalyzedLogicalPlanType, CubeNode, GroupingSetNode, OptimizedLogicalPlanType,
52 OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
53};
54
55use super::LogicalExtensionCodec;
56
57impl From<&protobuf::UnnestOptions> for UnnestOptions {
58 fn from(opts: &protobuf::UnnestOptions) -> Self {
59 Self {
60 preserve_nulls: opts.preserve_nulls,
61 recursions: opts
62 .recursions
63 .iter()
64 .map(|r| RecursionUnnestOption {
65 input_column: r.input_column.as_ref().unwrap().into(),
66 output_column: r.output_column.as_ref().unwrap().into(),
67 depth: r.depth as usize,
68 })
69 .collect::<Vec<_>>(),
70 }
71 }
72}
73
74impl From<protobuf::WindowFrameUnits> for WindowFrameUnits {
75 fn from(units: protobuf::WindowFrameUnits) -> Self {
76 match units {
77 protobuf::WindowFrameUnits::Rows => Self::Rows,
78 protobuf::WindowFrameUnits::Range => Self::Range,
79 protobuf::WindowFrameUnits::Groups => Self::Groups,
80 }
81 }
82}
83
84impl TryFrom<protobuf::TableReference> for TableReference {
85 type Error = Error;
86
87 fn try_from(value: protobuf::TableReference) -> Result<Self, Self::Error> {
88 use protobuf::table_reference::TableReferenceEnum;
89 let table_reference_enum = value
90 .table_reference_enum
91 .ok_or_else(|| Error::required("table_reference_enum"))?;
92
93 match table_reference_enum {
94 TableReferenceEnum::Bare(protobuf::BareTableReference { table }) => {
95 Ok(TableReference::bare(table))
96 }
97 TableReferenceEnum::Partial(protobuf::PartialTableReference {
98 schema,
99 table,
100 }) => Ok(TableReference::partial(schema, table)),
101 TableReferenceEnum::Full(protobuf::FullTableReference {
102 catalog,
103 schema,
104 table,
105 }) => Ok(TableReference::full(catalog, schema, table)),
106 }
107 }
108}
109
110impl From<&protobuf::StringifiedPlan> for StringifiedPlan {
111 fn from(stringified_plan: &protobuf::StringifiedPlan) -> Self {
112 Self {
113 plan_type: match stringified_plan
114 .plan_type
115 .as_ref()
116 .and_then(|pt| pt.plan_type_enum.as_ref())
117 .unwrap_or_else(|| {
118 panic!(
119 "Cannot create protobuf::StringifiedPlan from {stringified_plan:?}"
120 )
121 }) {
122 InitialLogicalPlan(_) => PlanType::InitialLogicalPlan,
123 AnalyzedLogicalPlan(AnalyzedLogicalPlanType { analyzer_name }) => {
124 PlanType::AnalyzedLogicalPlan {
125 analyzer_name:analyzer_name.clone()
126 }
127 }
128 FinalAnalyzedLogicalPlan(_) => PlanType::FinalAnalyzedLogicalPlan,
129 OptimizedLogicalPlan(OptimizedLogicalPlanType { optimizer_name }) => {
130 PlanType::OptimizedLogicalPlan {
131 optimizer_name: optimizer_name.clone(),
132 }
133 }
134 FinalLogicalPlan(_) => PlanType::FinalLogicalPlan,
135 InitialPhysicalPlan(_) => PlanType::InitialPhysicalPlan,
136 InitialPhysicalPlanWithStats(_) => PlanType::InitialPhysicalPlanWithStats,
137 InitialPhysicalPlanWithSchema(_) => PlanType::InitialPhysicalPlanWithSchema,
138 OptimizedPhysicalPlan(OptimizedPhysicalPlanType { optimizer_name }) => {
139 PlanType::OptimizedPhysicalPlan {
140 optimizer_name: optimizer_name.clone(),
141 }
142 }
143 FinalPhysicalPlan(_) => PlanType::FinalPhysicalPlan,
144 FinalPhysicalPlanWithStats(_) => PlanType::FinalPhysicalPlanWithStats,
145 FinalPhysicalPlanWithSchema(_) => PlanType::FinalPhysicalPlanWithSchema,
146 PhysicalPlanError(_) => PlanType::PhysicalPlanError,
147 },
148 plan: Arc::new(stringified_plan.plan.clone()),
149 }
150 }
151}
152
153impl TryFrom<protobuf::WindowFrame> for WindowFrame {
154 type Error = Error;
155
156 fn try_from(window: protobuf::WindowFrame) -> Result<Self, Self::Error> {
157 let units = protobuf::WindowFrameUnits::try_from(window.window_frame_units)
158 .map_err(|_| Error::unknown("WindowFrameUnits", window.window_frame_units))?
159 .into();
160 let start_bound = window.start_bound.required("start_bound")?;
161 let end_bound = window
162 .end_bound
163 .map(|end_bound| match end_bound {
164 protobuf::window_frame::EndBound::Bound(end_bound) => {
165 end_bound.try_into()
166 }
167 })
168 .transpose()?
169 .unwrap_or(WindowFrameBound::CurrentRow);
170 Ok(WindowFrame::new_bounds(units, start_bound, end_bound))
171 }
172}
173
174impl TryFrom<protobuf::WindowFrameBound> for WindowFrameBound {
175 type Error = Error;
176
177 fn try_from(bound: protobuf::WindowFrameBound) -> Result<Self, Self::Error> {
178 let bound_type =
179 protobuf::WindowFrameBoundType::try_from(bound.window_frame_bound_type)
180 .map_err(|_| {
181 Error::unknown("WindowFrameBoundType", bound.window_frame_bound_type)
182 })?;
183 match bound_type {
184 protobuf::WindowFrameBoundType::CurrentRow => Ok(Self::CurrentRow),
185 protobuf::WindowFrameBoundType::Preceding => match bound.bound_value {
186 Some(x) => Ok(Self::Preceding(ScalarValue::try_from(&x)?)),
187 None => Ok(Self::Preceding(ScalarValue::UInt64(None))),
188 },
189 protobuf::WindowFrameBoundType::Following => match bound.bound_value {
190 Some(x) => Ok(Self::Following(ScalarValue::try_from(&x)?)),
191 None => Ok(Self::Following(ScalarValue::UInt64(None))),
192 },
193 }
194 }
195}
196
197impl From<protobuf::JoinType> for JoinType {
198 fn from(t: protobuf::JoinType) -> Self {
199 match t {
200 protobuf::JoinType::Inner => JoinType::Inner,
201 protobuf::JoinType::Left => JoinType::Left,
202 protobuf::JoinType::Right => JoinType::Right,
203 protobuf::JoinType::Full => JoinType::Full,
204 protobuf::JoinType::Leftsemi => JoinType::LeftSemi,
205 protobuf::JoinType::Rightsemi => JoinType::RightSemi,
206 protobuf::JoinType::Leftanti => JoinType::LeftAnti,
207 protobuf::JoinType::Rightanti => JoinType::RightAnti,
208 protobuf::JoinType::Leftmark => JoinType::LeftMark,
209 protobuf::JoinType::Rightmark => JoinType::RightMark,
210 }
211 }
212}
213
214impl From<protobuf::JoinConstraint> for JoinConstraint {
215 fn from(t: protobuf::JoinConstraint) -> Self {
216 match t {
217 protobuf::JoinConstraint::On => JoinConstraint::On,
218 protobuf::JoinConstraint::Using => JoinConstraint::Using,
219 }
220 }
221}
222
223impl From<protobuf::NullEquality> for NullEquality {
224 fn from(t: protobuf::NullEquality) -> Self {
225 match t {
226 protobuf::NullEquality::NullEqualsNothing => NullEquality::NullEqualsNothing,
227 protobuf::NullEquality::NullEqualsNull => NullEquality::NullEqualsNull,
228 }
229 }
230}
231
232impl From<protobuf::dml_node::Type> for WriteOp {
233 fn from(t: protobuf::dml_node::Type) -> Self {
234 match t {
235 protobuf::dml_node::Type::Update => WriteOp::Update,
236 protobuf::dml_node::Type::Delete => WriteOp::Delete,
237 protobuf::dml_node::Type::InsertAppend => WriteOp::Insert(InsertOp::Append),
238 protobuf::dml_node::Type::InsertOverwrite => {
239 WriteOp::Insert(InsertOp::Overwrite)
240 }
241 protobuf::dml_node::Type::InsertReplace => WriteOp::Insert(InsertOp::Replace),
242 protobuf::dml_node::Type::Ctas => WriteOp::Ctas,
243 }
244 }
245}
246
247impl From<protobuf::NullTreatment> for NullTreatment {
248 fn from(t: protobuf::NullTreatment) -> Self {
249 match t {
250 protobuf::NullTreatment::RespectNulls => NullTreatment::RespectNulls,
251 protobuf::NullTreatment::IgnoreNulls => NullTreatment::IgnoreNulls,
252 }
253 }
254}
255
256pub fn parse_expr(
257 proto: &protobuf::LogicalExprNode,
258 registry: &dyn FunctionRegistry,
259 codec: &dyn LogicalExtensionCodec,
260) -> Result<Expr, Error> {
261 use protobuf::{logical_expr_node::ExprType, window_expr_node};
262
263 let expr_type = proto
264 .expr_type
265 .as_ref()
266 .ok_or_else(|| Error::required("expr_type"))?;
267
268 match expr_type {
269 ExprType::BinaryExpr(binary_expr) => {
270 let op = from_proto_binary_op(&binary_expr.op)?;
271 let operands = parse_exprs(&binary_expr.operands, registry, codec)?;
272
273 if operands.len() < 2 {
274 return Err(proto_error(
275 "A binary expression must always have at least 2 operands",
276 ));
277 }
278
279 Ok(operands
282 .into_iter()
283 .reduce(|left, right| {
284 Expr::BinaryExpr(BinaryExpr::new(Box::new(left), op, Box::new(right)))
285 })
286 .expect("Binary expression could not be reduced to a single expression."))
287 }
288 ExprType::Column(column) => Ok(Expr::Column(column.into())),
289 ExprType::Literal(literal) => {
290 let scalar_value: ScalarValue = literal.try_into()?;
291 Ok(Expr::Literal(scalar_value, None))
292 }
293 ExprType::WindowExpr(expr) => {
294 let window_function = expr
295 .window_function
296 .as_ref()
297 .ok_or_else(|| Error::required("window_function"))?;
298 let partition_by = parse_exprs(&expr.partition_by, registry, codec)?;
299 let mut order_by = parse_sorts(&expr.order_by, registry, codec)?;
300 let window_frame = expr
301 .window_frame
302 .as_ref()
303 .map::<Result<WindowFrame, _>, _>(|window_frame| {
304 let window_frame: WindowFrame = window_frame.clone().try_into()?;
305 window_frame
306 .regularize_order_bys(&mut order_by)
307 .map(|_| window_frame)
308 })
309 .transpose()?
310 .ok_or_else(|| {
311 exec_datafusion_err!("missing window frame during deserialization")
312 })?;
313
314 let null_treatment = match expr.null_treatment {
315 Some(null_treatment) => {
316 let null_treatment = protobuf::NullTreatment::try_from(null_treatment)
317 .map_err(|_| {
318 proto_error(format!(
319 "Received a WindowExprNode message with unknown NullTreatment {null_treatment}",
320 ))
321 })?;
322 Some(NullTreatment::from(null_treatment))
323 }
324 None => None,
325 };
326
327 let agg_fn = match window_function {
328 window_expr_node::WindowFunction::Udaf(udaf_name) => {
329 let udaf_function = match &expr.fun_definition {
330 Some(buf) => codec.try_decode_udaf(udaf_name, buf)?,
331 None => registry
332 .udaf(udaf_name)
333 .or_else(|_| codec.try_decode_udaf(udaf_name, &[]))?,
334 };
335 expr::WindowFunctionDefinition::AggregateUDF(udaf_function)
336 }
337 window_expr_node::WindowFunction::Udwf(udwf_name) => {
338 let udwf_function = match &expr.fun_definition {
339 Some(buf) => codec.try_decode_udwf(udwf_name, buf)?,
340 None => registry
341 .udwf(udwf_name)
342 .or_else(|_| codec.try_decode_udwf(udwf_name, &[]))?,
343 };
344 expr::WindowFunctionDefinition::WindowUDF(udwf_function)
345 }
346 };
347
348 let args = parse_exprs(&expr.exprs, registry, codec)?;
349 let mut builder = Expr::from(WindowFunction::new(agg_fn, args))
350 .partition_by(partition_by)
351 .order_by(order_by)
352 .window_frame(window_frame)
353 .null_treatment(null_treatment);
354
355 if expr.distinct {
356 builder = builder.distinct();
357 };
358
359 if let Some(filter) =
360 parse_optional_expr(expr.filter.as_deref(), registry, codec)?
361 {
362 builder = builder.filter(filter);
363 }
364
365 builder.build().map_err(Error::DataFusionError)
366 }
367 ExprType::Alias(alias) => Ok(Expr::Alias(Alias::new(
368 parse_required_expr(alias.expr.as_deref(), registry, "expr", codec)?,
369 alias
370 .relation
371 .first()
372 .map(|r| TableReference::try_from(r.clone()))
373 .transpose()?,
374 alias.alias.clone(),
375 ))),
376 ExprType::IsNullExpr(is_null) => Ok(Expr::IsNull(Box::new(parse_required_expr(
377 is_null.expr.as_deref(),
378 registry,
379 "expr",
380 codec,
381 )?))),
382 ExprType::IsNotNullExpr(is_not_null) => Ok(Expr::IsNotNull(Box::new(
383 parse_required_expr(is_not_null.expr.as_deref(), registry, "expr", codec)?,
384 ))),
385 ExprType::NotExpr(not) => Ok(Expr::Not(Box::new(parse_required_expr(
386 not.expr.as_deref(),
387 registry,
388 "expr",
389 codec,
390 )?))),
391 ExprType::IsTrue(msg) => Ok(Expr::IsTrue(Box::new(parse_required_expr(
392 msg.expr.as_deref(),
393 registry,
394 "expr",
395 codec,
396 )?))),
397 ExprType::IsFalse(msg) => Ok(Expr::IsFalse(Box::new(parse_required_expr(
398 msg.expr.as_deref(),
399 registry,
400 "expr",
401 codec,
402 )?))),
403 ExprType::IsUnknown(msg) => Ok(Expr::IsUnknown(Box::new(parse_required_expr(
404 msg.expr.as_deref(),
405 registry,
406 "expr",
407 codec,
408 )?))),
409 ExprType::IsNotTrue(msg) => Ok(Expr::IsNotTrue(Box::new(parse_required_expr(
410 msg.expr.as_deref(),
411 registry,
412 "expr",
413 codec,
414 )?))),
415 ExprType::IsNotFalse(msg) => Ok(Expr::IsNotFalse(Box::new(parse_required_expr(
416 msg.expr.as_deref(),
417 registry,
418 "expr",
419 codec,
420 )?))),
421 ExprType::IsNotUnknown(msg) => Ok(Expr::IsNotUnknown(Box::new(
422 parse_required_expr(msg.expr.as_deref(), registry, "expr", codec)?,
423 ))),
424 ExprType::Between(between) => Ok(Expr::Between(Between::new(
425 Box::new(parse_required_expr(
426 between.expr.as_deref(),
427 registry,
428 "expr",
429 codec,
430 )?),
431 between.negated,
432 Box::new(parse_required_expr(
433 between.low.as_deref(),
434 registry,
435 "expr",
436 codec,
437 )?),
438 Box::new(parse_required_expr(
439 between.high.as_deref(),
440 registry,
441 "expr",
442 codec,
443 )?),
444 ))),
445 ExprType::Like(like) => Ok(Expr::Like(Like::new(
446 like.negated,
447 Box::new(parse_required_expr(
448 like.expr.as_deref(),
449 registry,
450 "expr",
451 codec,
452 )?),
453 Box::new(parse_required_expr(
454 like.pattern.as_deref(),
455 registry,
456 "pattern",
457 codec,
458 )?),
459 parse_escape_char(&like.escape_char)?,
460 false,
461 ))),
462 ExprType::Ilike(like) => Ok(Expr::Like(Like::new(
463 like.negated,
464 Box::new(parse_required_expr(
465 like.expr.as_deref(),
466 registry,
467 "expr",
468 codec,
469 )?),
470 Box::new(parse_required_expr(
471 like.pattern.as_deref(),
472 registry,
473 "pattern",
474 codec,
475 )?),
476 parse_escape_char(&like.escape_char)?,
477 true,
478 ))),
479 ExprType::SimilarTo(like) => Ok(Expr::SimilarTo(Like::new(
480 like.negated,
481 Box::new(parse_required_expr(
482 like.expr.as_deref(),
483 registry,
484 "expr",
485 codec,
486 )?),
487 Box::new(parse_required_expr(
488 like.pattern.as_deref(),
489 registry,
490 "pattern",
491 codec,
492 )?),
493 parse_escape_char(&like.escape_char)?,
494 false,
495 ))),
496 ExprType::Case(case) => {
497 let when_then_expr = case
498 .when_then_expr
499 .iter()
500 .map(|e| {
501 let when_expr = parse_required_expr(
502 e.when_expr.as_ref(),
503 registry,
504 "when_expr",
505 codec,
506 )?;
507 let then_expr = parse_required_expr(
508 e.then_expr.as_ref(),
509 registry,
510 "then_expr",
511 codec,
512 )?;
513 Ok((Box::new(when_expr), Box::new(then_expr)))
514 })
515 .collect::<Result<Vec<(Box<Expr>, Box<Expr>)>, Error>>()?;
516 Ok(Expr::Case(Case::new(
517 parse_optional_expr(case.expr.as_deref(), registry, codec)?.map(Box::new),
518 when_then_expr,
519 parse_optional_expr(case.else_expr.as_deref(), registry, codec)?
520 .map(Box::new),
521 )))
522 }
523 ExprType::Cast(cast) => {
524 let expr = Box::new(parse_required_expr(
525 cast.expr.as_deref(),
526 registry,
527 "expr",
528 codec,
529 )?);
530 let data_type = cast.arrow_type.as_ref().required("arrow_type")?;
531 Ok(Expr::Cast(Cast::new(expr, data_type)))
532 }
533 ExprType::TryCast(cast) => {
534 let expr = Box::new(parse_required_expr(
535 cast.expr.as_deref(),
536 registry,
537 "expr",
538 codec,
539 )?);
540 let data_type = cast.arrow_type.as_ref().required("arrow_type")?;
541 Ok(Expr::TryCast(TryCast::new(expr, data_type)))
542 }
543 ExprType::Negative(negative) => Ok(Expr::Negative(Box::new(
544 parse_required_expr(negative.expr.as_deref(), registry, "expr", codec)?,
545 ))),
546 ExprType::Unnest(unnest) => {
547 let mut exprs = parse_exprs(&unnest.exprs, registry, codec)?;
548 if exprs.len() != 1 {
549 return Err(proto_error("Unnest must have exactly one expression"));
550 }
551 Ok(Expr::Unnest(Unnest::new(exprs.swap_remove(0))))
552 }
553 ExprType::InList(in_list) => Ok(Expr::InList(InList::new(
554 Box::new(parse_required_expr(
555 in_list.expr.as_deref(),
556 registry,
557 "expr",
558 codec,
559 )?),
560 parse_exprs(&in_list.list, registry, codec)?,
561 in_list.negated,
562 ))),
563 ExprType::Wildcard(protobuf::Wildcard { qualifier }) => {
564 let qualifier = qualifier.to_owned().map(|x| x.try_into()).transpose()?;
565 #[expect(deprecated)]
566 Ok(Expr::Wildcard {
567 qualifier,
568 options: Box::new(WildcardOptions::default()),
569 })
570 }
571 ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
572 fun_name,
573 args,
574 fun_definition,
575 }) => {
576 let scalar_fn = match fun_definition {
577 Some(buf) => codec.try_decode_udf(fun_name, buf)?,
578 None => registry
579 .udf(fun_name.as_str())
580 .or_else(|_| codec.try_decode_udf(fun_name, &[]))?,
581 };
582 Ok(Expr::ScalarFunction(expr::ScalarFunction::new_udf(
583 scalar_fn,
584 parse_exprs(args, registry, codec)?,
585 )))
586 }
587 ExprType::AggregateUdfExpr(pb) => {
588 let agg_fn = match &pb.fun_definition {
589 Some(buf) => codec.try_decode_udaf(&pb.fun_name, buf)?,
590 None => registry
591 .udaf(&pb.fun_name)
592 .or_else(|_| codec.try_decode_udaf(&pb.fun_name, &[]))?,
593 };
594 let null_treatment = match pb.null_treatment {
595 Some(null_treatment) => {
596 let null_treatment = protobuf::NullTreatment::try_from(null_treatment)
597 .map_err(|_| {
598 proto_error(format!(
599 "Received an AggregateUdfExprNode message with unknown NullTreatment {null_treatment}",
600 ))
601 })?;
602 Some(NullTreatment::from(null_treatment))
603 }
604 None => None,
605 };
606
607 Ok(Expr::AggregateFunction(expr::AggregateFunction::new_udf(
608 agg_fn,
609 parse_exprs(&pb.args, registry, codec)?,
610 pb.distinct,
611 parse_optional_expr(pb.filter.as_deref(), registry, codec)?.map(Box::new),
612 parse_sorts(&pb.order_by, registry, codec)?,
613 null_treatment,
614 )))
615 }
616
617 ExprType::GroupingSet(GroupingSetNode { expr }) => {
618 Ok(Expr::GroupingSet(GroupingSets(
619 expr.iter()
620 .map(|expr_list| parse_exprs(&expr_list.expr, registry, codec))
621 .collect::<Result<Vec<_>, Error>>()?,
622 )))
623 }
624 ExprType::Cube(CubeNode { expr }) => Ok(Expr::GroupingSet(GroupingSet::Cube(
625 parse_exprs(expr, registry, codec)?,
626 ))),
627 ExprType::Rollup(RollupNode { expr }) => Ok(Expr::GroupingSet(
628 GroupingSet::Rollup(parse_exprs(expr, registry, codec)?),
629 )),
630 ExprType::Placeholder(PlaceholderNode {
631 id,
632 data_type,
633 nullable,
634 metadata,
635 }) => match data_type {
636 None => Ok(Expr::Placeholder(Placeholder::new_with_field(
637 id.clone(),
638 None,
639 ))),
640 Some(data_type) => {
641 let field =
642 Field::new("", data_type.try_into()?, nullable.unwrap_or(true))
643 .with_metadata(metadata.clone());
644 Ok(Expr::Placeholder(Placeholder::new_with_field(
645 id.clone(),
646 Some(field.into()),
647 )))
648 }
649 },
650 }
651}
652
653pub fn parse_exprs<'a, I>(
655 protos: I,
656 registry: &dyn FunctionRegistry,
657 codec: &dyn LogicalExtensionCodec,
658) -> Result<Vec<Expr>, Error>
659where
660 I: IntoIterator<Item = &'a protobuf::LogicalExprNode>,
661{
662 let res = protos
663 .into_iter()
664 .map(|elem| {
665 parse_expr(elem, registry, codec).map_err(|e| plan_datafusion_err!("{}", e))
666 })
667 .collect::<Result<Vec<_>>>()?;
668 Ok(res)
669}
670
671pub fn parse_sorts<'a, I>(
672 protos: I,
673 registry: &dyn FunctionRegistry,
674 codec: &dyn LogicalExtensionCodec,
675) -> Result<Vec<Sort>, Error>
676where
677 I: IntoIterator<Item = &'a protobuf::SortExprNode>,
678{
679 protos
680 .into_iter()
681 .map(|sort| parse_sort(sort, registry, codec))
682 .collect::<Result<Vec<Sort>, Error>>()
683}
684
685pub fn parse_sort(
686 sort: &protobuf::SortExprNode,
687 registry: &dyn FunctionRegistry,
688 codec: &dyn LogicalExtensionCodec,
689) -> Result<Sort, Error> {
690 Ok(Sort::new(
691 parse_required_expr(sort.expr.as_ref(), registry, "expr", codec)?,
692 sort.asc,
693 sort.nulls_first,
694 ))
695}
696
697fn parse_escape_char(s: &str) -> Result<Option<char>> {
699 match s.len() {
700 0 => Ok(None),
701 1 => Ok(s.chars().next()),
702 _ => internal_err!("Invalid length for escape char"),
703 }
704}
705
706pub fn from_proto_binary_op(op: &str) -> Result<Operator, Error> {
707 match op {
708 "And" => Ok(Operator::And),
709 "Or" => Ok(Operator::Or),
710 "Eq" => Ok(Operator::Eq),
711 "NotEq" => Ok(Operator::NotEq),
712 "LtEq" => Ok(Operator::LtEq),
713 "Lt" => Ok(Operator::Lt),
714 "Gt" => Ok(Operator::Gt),
715 "GtEq" => Ok(Operator::GtEq),
716 "Plus" => Ok(Operator::Plus),
717 "Minus" => Ok(Operator::Minus),
718 "Multiply" => Ok(Operator::Multiply),
719 "Divide" => Ok(Operator::Divide),
720 "Modulo" => Ok(Operator::Modulo),
721 "IsDistinctFrom" => Ok(Operator::IsDistinctFrom),
722 "IsNotDistinctFrom" => Ok(Operator::IsNotDistinctFrom),
723 "BitwiseAnd" => Ok(Operator::BitwiseAnd),
724 "BitwiseOr" => Ok(Operator::BitwiseOr),
725 "BitwiseXor" => Ok(Operator::BitwiseXor),
726 "BitwiseShiftLeft" => Ok(Operator::BitwiseShiftLeft),
727 "BitwiseShiftRight" => Ok(Operator::BitwiseShiftRight),
728 "RegexIMatch" => Ok(Operator::RegexIMatch),
729 "RegexMatch" => Ok(Operator::RegexMatch),
730 "RegexNotIMatch" => Ok(Operator::RegexNotIMatch),
731 "RegexNotMatch" => Ok(Operator::RegexNotMatch),
732 "StringConcat" => Ok(Operator::StringConcat),
733 "AtArrow" => Ok(Operator::AtArrow),
734 "ArrowAt" => Ok(Operator::ArrowAt),
735 other => Err(proto_error(format!(
736 "Unsupported binary operator '{other:?}'"
737 ))),
738 }
739}
740
741fn parse_optional_expr(
742 p: Option<&protobuf::LogicalExprNode>,
743 registry: &dyn FunctionRegistry,
744 codec: &dyn LogicalExtensionCodec,
745) -> Result<Option<Expr>, Error> {
746 match p {
747 Some(expr) => parse_expr(expr, registry, codec).map(Some),
748 None => Ok(None),
749 }
750}
751
752fn parse_required_expr(
753 p: Option<&protobuf::LogicalExprNode>,
754 registry: &dyn FunctionRegistry,
755 field: impl Into<String>,
756 codec: &dyn LogicalExtensionCodec,
757) -> Result<Expr, Error> {
758 match p {
759 Some(expr) => parse_expr(expr, registry, codec),
760 None => Err(Error::required(field)),
761 }
762}
763
764fn proto_error<S: Into<String>>(message: S) -> Error {
765 Error::General(message.into())
766}