1use std::sync::Arc;
19
20use arrow::datatypes::Field;
21use datafusion_common::{
22 NullEquality, RecursionUnnestOption, Result, ScalarValue, TableReference,
23 UnnestOptions, exec_datafusion_err, internal_err, plan_datafusion_err,
24};
25use datafusion_execution::registry::FunctionRegistry;
26use datafusion_expr::dml::InsertOp;
27use datafusion_expr::expr::{Alias, NullTreatment, Placeholder, Sort};
28use datafusion_expr::expr::{Unnest, WildcardOptions};
29use datafusion_expr::{
30 Between, BinaryExpr, Case, Cast, Expr, GroupingSet,
31 GroupingSet::GroupingSets,
32 JoinConstraint, JoinType, Like, Operator, TryCast, WindowFrame, WindowFrameBound,
33 WindowFrameUnits,
34 expr::{self, InList, WindowFunction},
35 logical_plan::{PlanType, StringifiedPlan},
36};
37use datafusion_expr::{ExprFunctionExt, WriteOp};
38use datafusion_proto_common::{FromProtoError as Error, from_proto::FromOptionalField};
39
40use crate::protobuf::plan_type::PlanTypeEnum::{
41 FinalPhysicalPlanWithSchema, InitialPhysicalPlanWithSchema,
42};
43use crate::protobuf::{
44 self, AnalyzedLogicalPlanType, CubeNode, GroupingSetNode, OptimizedLogicalPlanType,
45 OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
46 plan_type::PlanTypeEnum::{
47 AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
48 FinalPhysicalPlan, FinalPhysicalPlanWithStats, InitialLogicalPlan,
49 InitialPhysicalPlan, InitialPhysicalPlanWithStats, OptimizedLogicalPlan,
50 OptimizedPhysicalPlan, PhysicalPlanError,
51 },
52};
53
54use super::LogicalExtensionCodec;
55
56impl From<&protobuf::UnnestOptions> for UnnestOptions {
57 fn from(opts: &protobuf::UnnestOptions) -> Self {
58 Self {
59 preserve_nulls: opts.preserve_nulls,
60 recursions: opts
61 .recursions
62 .iter()
63 .map(|r| RecursionUnnestOption {
64 input_column: r.input_column.as_ref().unwrap().into(),
65 output_column: r.output_column.as_ref().unwrap().into(),
66 depth: r.depth as usize,
67 })
68 .collect::<Vec<_>>(),
69 }
70 }
71}
72
73impl From<protobuf::WindowFrameUnits> for WindowFrameUnits {
74 fn from(units: protobuf::WindowFrameUnits) -> Self {
75 match units {
76 protobuf::WindowFrameUnits::Rows => Self::Rows,
77 protobuf::WindowFrameUnits::Range => Self::Range,
78 protobuf::WindowFrameUnits::Groups => Self::Groups,
79 }
80 }
81}
82
83impl TryFrom<protobuf::TableReference> for TableReference {
84 type Error = Error;
85
86 fn try_from(value: protobuf::TableReference) -> Result<Self, Self::Error> {
87 use protobuf::table_reference::TableReferenceEnum;
88 let table_reference_enum = value
89 .table_reference_enum
90 .ok_or_else(|| Error::required("table_reference_enum"))?;
91
92 match table_reference_enum {
93 TableReferenceEnum::Bare(protobuf::BareTableReference { table }) => {
94 Ok(TableReference::bare(table))
95 }
96 TableReferenceEnum::Partial(protobuf::PartialTableReference {
97 schema,
98 table,
99 }) => Ok(TableReference::partial(schema, table)),
100 TableReferenceEnum::Full(protobuf::FullTableReference {
101 catalog,
102 schema,
103 table,
104 }) => Ok(TableReference::full(catalog, schema, table)),
105 }
106 }
107}
108
109impl From<&protobuf::StringifiedPlan> for StringifiedPlan {
110 fn from(stringified_plan: &protobuf::StringifiedPlan) -> Self {
111 Self {
112 plan_type: match stringified_plan
113 .plan_type
114 .as_ref()
115 .and_then(|pt| pt.plan_type_enum.as_ref())
116 .unwrap_or_else(|| {
117 panic!(
118 "Cannot create protobuf::StringifiedPlan from {stringified_plan:?}"
119 )
120 }) {
121 InitialLogicalPlan(_) => PlanType::InitialLogicalPlan,
122 AnalyzedLogicalPlan(AnalyzedLogicalPlanType { analyzer_name }) => {
123 PlanType::AnalyzedLogicalPlan {
124 analyzer_name:analyzer_name.clone()
125 }
126 }
127 FinalAnalyzedLogicalPlan(_) => PlanType::FinalAnalyzedLogicalPlan,
128 OptimizedLogicalPlan(OptimizedLogicalPlanType { optimizer_name }) => {
129 PlanType::OptimizedLogicalPlan {
130 optimizer_name: optimizer_name.clone(),
131 }
132 }
133 FinalLogicalPlan(_) => PlanType::FinalLogicalPlan,
134 InitialPhysicalPlan(_) => PlanType::InitialPhysicalPlan,
135 InitialPhysicalPlanWithStats(_) => PlanType::InitialPhysicalPlanWithStats,
136 InitialPhysicalPlanWithSchema(_) => PlanType::InitialPhysicalPlanWithSchema,
137 OptimizedPhysicalPlan(OptimizedPhysicalPlanType { optimizer_name }) => {
138 PlanType::OptimizedPhysicalPlan {
139 optimizer_name: optimizer_name.clone(),
140 }
141 }
142 FinalPhysicalPlan(_) => PlanType::FinalPhysicalPlan,
143 FinalPhysicalPlanWithStats(_) => PlanType::FinalPhysicalPlanWithStats,
144 FinalPhysicalPlanWithSchema(_) => PlanType::FinalPhysicalPlanWithSchema,
145 PhysicalPlanError(_) => PlanType::PhysicalPlanError,
146 },
147 plan: Arc::new(stringified_plan.plan.clone()),
148 }
149 }
150}
151
152impl TryFrom<protobuf::WindowFrame> for WindowFrame {
153 type Error = Error;
154
155 fn try_from(window: protobuf::WindowFrame) -> Result<Self, Self::Error> {
156 let units = protobuf::WindowFrameUnits::try_from(window.window_frame_units)
157 .map_err(|_| Error::unknown("WindowFrameUnits", window.window_frame_units))?
158 .into();
159 let start_bound = window.start_bound.required("start_bound")?;
160 let end_bound = window
161 .end_bound
162 .map(|end_bound| match end_bound {
163 protobuf::window_frame::EndBound::Bound(end_bound) => {
164 end_bound.try_into()
165 }
166 })
167 .transpose()?
168 .unwrap_or(WindowFrameBound::CurrentRow);
169 Ok(WindowFrame::new_bounds(units, start_bound, end_bound))
170 }
171}
172
173impl TryFrom<protobuf::WindowFrameBound> for WindowFrameBound {
174 type Error = Error;
175
176 fn try_from(bound: protobuf::WindowFrameBound) -> Result<Self, Self::Error> {
177 let bound_type =
178 protobuf::WindowFrameBoundType::try_from(bound.window_frame_bound_type)
179 .map_err(|_| {
180 Error::unknown("WindowFrameBoundType", bound.window_frame_bound_type)
181 })?;
182 match bound_type {
183 protobuf::WindowFrameBoundType::CurrentRow => Ok(Self::CurrentRow),
184 protobuf::WindowFrameBoundType::Preceding => match bound.bound_value {
185 Some(x) => Ok(Self::Preceding(ScalarValue::try_from(&x)?)),
186 None => Ok(Self::Preceding(ScalarValue::UInt64(None))),
187 },
188 protobuf::WindowFrameBoundType::Following => match bound.bound_value {
189 Some(x) => Ok(Self::Following(ScalarValue::try_from(&x)?)),
190 None => Ok(Self::Following(ScalarValue::UInt64(None))),
191 },
192 }
193 }
194}
195
196impl From<protobuf::JoinType> for JoinType {
197 fn from(t: protobuf::JoinType) -> Self {
198 match t {
199 protobuf::JoinType::Inner => JoinType::Inner,
200 protobuf::JoinType::Left => JoinType::Left,
201 protobuf::JoinType::Right => JoinType::Right,
202 protobuf::JoinType::Full => JoinType::Full,
203 protobuf::JoinType::Leftsemi => JoinType::LeftSemi,
204 protobuf::JoinType::Rightsemi => JoinType::RightSemi,
205 protobuf::JoinType::Leftanti => JoinType::LeftAnti,
206 protobuf::JoinType::Rightanti => JoinType::RightAnti,
207 protobuf::JoinType::Leftmark => JoinType::LeftMark,
208 protobuf::JoinType::Rightmark => JoinType::RightMark,
209 }
210 }
211}
212
213impl From<protobuf::JoinConstraint> for JoinConstraint {
214 fn from(t: protobuf::JoinConstraint) -> Self {
215 match t {
216 protobuf::JoinConstraint::On => JoinConstraint::On,
217 protobuf::JoinConstraint::Using => JoinConstraint::Using,
218 }
219 }
220}
221
222impl From<protobuf::NullEquality> for NullEquality {
223 fn from(t: protobuf::NullEquality) -> Self {
224 match t {
225 protobuf::NullEquality::NullEqualsNothing => NullEquality::NullEqualsNothing,
226 protobuf::NullEquality::NullEqualsNull => NullEquality::NullEqualsNull,
227 }
228 }
229}
230
231impl From<protobuf::dml_node::Type> for WriteOp {
232 fn from(t: protobuf::dml_node::Type) -> Self {
233 match t {
234 protobuf::dml_node::Type::Update => WriteOp::Update,
235 protobuf::dml_node::Type::Delete => WriteOp::Delete,
236 protobuf::dml_node::Type::InsertAppend => WriteOp::Insert(InsertOp::Append),
237 protobuf::dml_node::Type::InsertOverwrite => {
238 WriteOp::Insert(InsertOp::Overwrite)
239 }
240 protobuf::dml_node::Type::InsertReplace => WriteOp::Insert(InsertOp::Replace),
241 protobuf::dml_node::Type::Ctas => WriteOp::Ctas,
242 }
243 }
244}
245
246impl From<protobuf::NullTreatment> for NullTreatment {
247 fn from(t: protobuf::NullTreatment) -> Self {
248 match t {
249 protobuf::NullTreatment::RespectNulls => NullTreatment::RespectNulls,
250 protobuf::NullTreatment::IgnoreNulls => NullTreatment::IgnoreNulls,
251 }
252 }
253}
254
255pub fn parse_expr(
256 proto: &protobuf::LogicalExprNode,
257 registry: &dyn FunctionRegistry,
258 codec: &dyn LogicalExtensionCodec,
259) -> Result<Expr, Error> {
260 use protobuf::{logical_expr_node::ExprType, window_expr_node};
261
262 let expr_type = proto
263 .expr_type
264 .as_ref()
265 .ok_or_else(|| Error::required("expr_type"))?;
266
267 match expr_type {
268 ExprType::BinaryExpr(binary_expr) => {
269 let op = from_proto_binary_op(&binary_expr.op)?;
270 let operands = parse_exprs(&binary_expr.operands, registry, codec)?;
271
272 if operands.len() < 2 {
273 return Err(proto_error(
274 "A binary expression must always have at least 2 operands",
275 ));
276 }
277
278 Ok(operands
281 .into_iter()
282 .reduce(|left, right| {
283 Expr::BinaryExpr(BinaryExpr::new(Box::new(left), op, Box::new(right)))
284 })
285 .expect("Binary expression could not be reduced to a single expression."))
286 }
287 ExprType::Column(column) => Ok(Expr::Column(column.into())),
288 ExprType::Literal(literal) => {
289 let scalar_value: ScalarValue = literal.try_into()?;
290 Ok(Expr::Literal(scalar_value, None))
291 }
292 ExprType::WindowExpr(expr) => {
293 let window_function = expr
294 .window_function
295 .as_ref()
296 .ok_or_else(|| Error::required("window_function"))?;
297 let partition_by = parse_exprs(&expr.partition_by, registry, codec)?;
298 let mut order_by = parse_sorts(&expr.order_by, registry, codec)?;
299 let window_frame = expr
300 .window_frame
301 .as_ref()
302 .map::<Result<WindowFrame, _>, _>(|window_frame| {
303 let window_frame: WindowFrame = window_frame.clone().try_into()?;
304 window_frame
305 .regularize_order_bys(&mut order_by)
306 .map(|_| window_frame)
307 })
308 .transpose()?
309 .ok_or_else(|| {
310 exec_datafusion_err!("missing window frame during deserialization")
311 })?;
312
313 let null_treatment = match expr.null_treatment {
314 Some(null_treatment) => {
315 let null_treatment = protobuf::NullTreatment::try_from(null_treatment)
316 .map_err(|_| {
317 proto_error(format!(
318 "Received a WindowExprNode message with unknown NullTreatment {null_treatment}",
319 ))
320 })?;
321 Some(NullTreatment::from(null_treatment))
322 }
323 None => None,
324 };
325
326 let agg_fn = match window_function {
327 window_expr_node::WindowFunction::Udaf(udaf_name) => {
328 let udaf_function = match &expr.fun_definition {
329 Some(buf) => codec.try_decode_udaf(udaf_name, buf)?,
330 None => registry
331 .udaf(udaf_name)
332 .or_else(|_| codec.try_decode_udaf(udaf_name, &[]))?,
333 };
334 expr::WindowFunctionDefinition::AggregateUDF(udaf_function)
335 }
336 window_expr_node::WindowFunction::Udwf(udwf_name) => {
337 let udwf_function = match &expr.fun_definition {
338 Some(buf) => codec.try_decode_udwf(udwf_name, buf)?,
339 None => registry
340 .udwf(udwf_name)
341 .or_else(|_| codec.try_decode_udwf(udwf_name, &[]))?,
342 };
343 expr::WindowFunctionDefinition::WindowUDF(udwf_function)
344 }
345 };
346
347 let args = parse_exprs(&expr.exprs, registry, codec)?;
348 let mut builder = Expr::from(WindowFunction::new(agg_fn, args))
349 .partition_by(partition_by)
350 .order_by(order_by)
351 .window_frame(window_frame)
352 .null_treatment(null_treatment);
353
354 if expr.distinct {
355 builder = builder.distinct();
356 };
357
358 if let Some(filter) =
359 parse_optional_expr(expr.filter.as_deref(), registry, codec)?
360 {
361 builder = builder.filter(filter);
362 }
363
364 builder.build().map_err(Error::DataFusionError)
365 }
366 ExprType::Alias(alias) => Ok(Expr::Alias(Alias::new(
367 parse_required_expr(alias.expr.as_deref(), registry, "expr", codec)?,
368 alias
369 .relation
370 .first()
371 .map(|r| TableReference::try_from(r.clone()))
372 .transpose()?,
373 alias.alias.clone(),
374 ))),
375 ExprType::IsNullExpr(is_null) => Ok(Expr::IsNull(Box::new(parse_required_expr(
376 is_null.expr.as_deref(),
377 registry,
378 "expr",
379 codec,
380 )?))),
381 ExprType::IsNotNullExpr(is_not_null) => Ok(Expr::IsNotNull(Box::new(
382 parse_required_expr(is_not_null.expr.as_deref(), registry, "expr", codec)?,
383 ))),
384 ExprType::NotExpr(not) => Ok(Expr::Not(Box::new(parse_required_expr(
385 not.expr.as_deref(),
386 registry,
387 "expr",
388 codec,
389 )?))),
390 ExprType::IsTrue(msg) => Ok(Expr::IsTrue(Box::new(parse_required_expr(
391 msg.expr.as_deref(),
392 registry,
393 "expr",
394 codec,
395 )?))),
396 ExprType::IsFalse(msg) => Ok(Expr::IsFalse(Box::new(parse_required_expr(
397 msg.expr.as_deref(),
398 registry,
399 "expr",
400 codec,
401 )?))),
402 ExprType::IsUnknown(msg) => Ok(Expr::IsUnknown(Box::new(parse_required_expr(
403 msg.expr.as_deref(),
404 registry,
405 "expr",
406 codec,
407 )?))),
408 ExprType::IsNotTrue(msg) => Ok(Expr::IsNotTrue(Box::new(parse_required_expr(
409 msg.expr.as_deref(),
410 registry,
411 "expr",
412 codec,
413 )?))),
414 ExprType::IsNotFalse(msg) => Ok(Expr::IsNotFalse(Box::new(parse_required_expr(
415 msg.expr.as_deref(),
416 registry,
417 "expr",
418 codec,
419 )?))),
420 ExprType::IsNotUnknown(msg) => Ok(Expr::IsNotUnknown(Box::new(
421 parse_required_expr(msg.expr.as_deref(), registry, "expr", codec)?,
422 ))),
423 ExprType::Between(between) => Ok(Expr::Between(Between::new(
424 Box::new(parse_required_expr(
425 between.expr.as_deref(),
426 registry,
427 "expr",
428 codec,
429 )?),
430 between.negated,
431 Box::new(parse_required_expr(
432 between.low.as_deref(),
433 registry,
434 "expr",
435 codec,
436 )?),
437 Box::new(parse_required_expr(
438 between.high.as_deref(),
439 registry,
440 "expr",
441 codec,
442 )?),
443 ))),
444 ExprType::Like(like) => Ok(Expr::Like(Like::new(
445 like.negated,
446 Box::new(parse_required_expr(
447 like.expr.as_deref(),
448 registry,
449 "expr",
450 codec,
451 )?),
452 Box::new(parse_required_expr(
453 like.pattern.as_deref(),
454 registry,
455 "pattern",
456 codec,
457 )?),
458 parse_escape_char(&like.escape_char)?,
459 false,
460 ))),
461 ExprType::Ilike(like) => Ok(Expr::Like(Like::new(
462 like.negated,
463 Box::new(parse_required_expr(
464 like.expr.as_deref(),
465 registry,
466 "expr",
467 codec,
468 )?),
469 Box::new(parse_required_expr(
470 like.pattern.as_deref(),
471 registry,
472 "pattern",
473 codec,
474 )?),
475 parse_escape_char(&like.escape_char)?,
476 true,
477 ))),
478 ExprType::SimilarTo(like) => Ok(Expr::SimilarTo(Like::new(
479 like.negated,
480 Box::new(parse_required_expr(
481 like.expr.as_deref(),
482 registry,
483 "expr",
484 codec,
485 )?),
486 Box::new(parse_required_expr(
487 like.pattern.as_deref(),
488 registry,
489 "pattern",
490 codec,
491 )?),
492 parse_escape_char(&like.escape_char)?,
493 false,
494 ))),
495 ExprType::Case(case) => {
496 let when_then_expr = case
497 .when_then_expr
498 .iter()
499 .map(|e| {
500 let when_expr = parse_required_expr(
501 e.when_expr.as_ref(),
502 registry,
503 "when_expr",
504 codec,
505 )?;
506 let then_expr = parse_required_expr(
507 e.then_expr.as_ref(),
508 registry,
509 "then_expr",
510 codec,
511 )?;
512 Ok((Box::new(when_expr), Box::new(then_expr)))
513 })
514 .collect::<Result<Vec<(Box<Expr>, Box<Expr>)>, Error>>()?;
515 Ok(Expr::Case(Case::new(
516 parse_optional_expr(case.expr.as_deref(), registry, codec)?.map(Box::new),
517 when_then_expr,
518 parse_optional_expr(case.else_expr.as_deref(), registry, codec)?
519 .map(Box::new),
520 )))
521 }
522 ExprType::Cast(cast) => {
523 let expr = Box::new(parse_required_expr(
524 cast.expr.as_deref(),
525 registry,
526 "expr",
527 codec,
528 )?);
529 let data_type = cast.arrow_type.as_ref().required("arrow_type")?;
530 Ok(Expr::Cast(Cast::new(expr, data_type)))
531 }
532 ExprType::TryCast(cast) => {
533 let expr = Box::new(parse_required_expr(
534 cast.expr.as_deref(),
535 registry,
536 "expr",
537 codec,
538 )?);
539 let data_type = cast.arrow_type.as_ref().required("arrow_type")?;
540 Ok(Expr::TryCast(TryCast::new(expr, data_type)))
541 }
542 ExprType::Negative(negative) => Ok(Expr::Negative(Box::new(
543 parse_required_expr(negative.expr.as_deref(), registry, "expr", codec)?,
544 ))),
545 ExprType::Unnest(unnest) => {
546 let mut exprs = parse_exprs(&unnest.exprs, registry, codec)?;
547 if exprs.len() != 1 {
548 return Err(proto_error("Unnest must have exactly one expression"));
549 }
550 Ok(Expr::Unnest(Unnest::new(exprs.swap_remove(0))))
551 }
552 ExprType::InList(in_list) => Ok(Expr::InList(InList::new(
553 Box::new(parse_required_expr(
554 in_list.expr.as_deref(),
555 registry,
556 "expr",
557 codec,
558 )?),
559 parse_exprs(&in_list.list, registry, codec)?,
560 in_list.negated,
561 ))),
562 ExprType::Wildcard(protobuf::Wildcard { qualifier }) => {
563 let qualifier = qualifier.to_owned().map(|x| x.try_into()).transpose()?;
564 #[expect(deprecated)]
565 Ok(Expr::Wildcard {
566 qualifier,
567 options: Box::new(WildcardOptions::default()),
568 })
569 }
570 ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
571 fun_name,
572 args,
573 fun_definition,
574 }) => {
575 let scalar_fn = match fun_definition {
576 Some(buf) => codec.try_decode_udf(fun_name, buf)?,
577 None => registry
578 .udf(fun_name.as_str())
579 .or_else(|_| codec.try_decode_udf(fun_name, &[]))?,
580 };
581 Ok(Expr::ScalarFunction(expr::ScalarFunction::new_udf(
582 scalar_fn,
583 parse_exprs(args, registry, codec)?,
584 )))
585 }
586 ExprType::AggregateUdfExpr(pb) => {
587 let agg_fn = match &pb.fun_definition {
588 Some(buf) => codec.try_decode_udaf(&pb.fun_name, buf)?,
589 None => registry
590 .udaf(&pb.fun_name)
591 .or_else(|_| codec.try_decode_udaf(&pb.fun_name, &[]))?,
592 };
593 let null_treatment = match pb.null_treatment {
594 Some(null_treatment) => {
595 let null_treatment = protobuf::NullTreatment::try_from(null_treatment)
596 .map_err(|_| {
597 proto_error(format!(
598 "Received an AggregateUdfExprNode message with unknown NullTreatment {null_treatment}",
599 ))
600 })?;
601 Some(NullTreatment::from(null_treatment))
602 }
603 None => None,
604 };
605
606 Ok(Expr::AggregateFunction(expr::AggregateFunction::new_udf(
607 agg_fn,
608 parse_exprs(&pb.args, registry, codec)?,
609 pb.distinct,
610 parse_optional_expr(pb.filter.as_deref(), registry, codec)?.map(Box::new),
611 parse_sorts(&pb.order_by, registry, codec)?,
612 null_treatment,
613 )))
614 }
615
616 ExprType::GroupingSet(GroupingSetNode { expr }) => {
617 Ok(Expr::GroupingSet(GroupingSets(
618 expr.iter()
619 .map(|expr_list| parse_exprs(&expr_list.expr, registry, codec))
620 .collect::<Result<Vec<_>, Error>>()?,
621 )))
622 }
623 ExprType::Cube(CubeNode { expr }) => Ok(Expr::GroupingSet(GroupingSet::Cube(
624 parse_exprs(expr, registry, codec)?,
625 ))),
626 ExprType::Rollup(RollupNode { expr }) => Ok(Expr::GroupingSet(
627 GroupingSet::Rollup(parse_exprs(expr, registry, codec)?),
628 )),
629 ExprType::Placeholder(PlaceholderNode {
630 id,
631 data_type,
632 nullable,
633 metadata,
634 }) => match data_type {
635 None => Ok(Expr::Placeholder(Placeholder::new_with_field(
636 id.clone(),
637 None,
638 ))),
639 Some(data_type) => {
640 let field =
641 Field::new("", data_type.try_into()?, nullable.unwrap_or(true))
642 .with_metadata(metadata.clone());
643 Ok(Expr::Placeholder(Placeholder::new_with_field(
644 id.clone(),
645 Some(field.into()),
646 )))
647 }
648 },
649 }
650}
651
652pub fn parse_exprs<'a, I>(
654 protos: I,
655 registry: &dyn FunctionRegistry,
656 codec: &dyn LogicalExtensionCodec,
657) -> Result<Vec<Expr>, Error>
658where
659 I: IntoIterator<Item = &'a protobuf::LogicalExprNode>,
660{
661 let res = protos
662 .into_iter()
663 .map(|elem| {
664 parse_expr(elem, registry, codec).map_err(|e| plan_datafusion_err!("{}", e))
665 })
666 .collect::<Result<Vec<_>>>()?;
667 Ok(res)
668}
669
670pub fn parse_sorts<'a, I>(
671 protos: I,
672 registry: &dyn FunctionRegistry,
673 codec: &dyn LogicalExtensionCodec,
674) -> Result<Vec<Sort>, Error>
675where
676 I: IntoIterator<Item = &'a protobuf::SortExprNode>,
677{
678 protos
679 .into_iter()
680 .map(|sort| parse_sort(sort, registry, codec))
681 .collect::<Result<Vec<Sort>, Error>>()
682}
683
684pub fn parse_sort(
685 sort: &protobuf::SortExprNode,
686 registry: &dyn FunctionRegistry,
687 codec: &dyn LogicalExtensionCodec,
688) -> Result<Sort, Error> {
689 Ok(Sort::new(
690 parse_required_expr(sort.expr.as_ref(), registry, "expr", codec)?,
691 sort.asc,
692 sort.nulls_first,
693 ))
694}
695
696fn parse_escape_char(s: &str) -> Result<Option<char>> {
698 match s.len() {
699 0 => Ok(None),
700 1 => Ok(s.chars().next()),
701 _ => internal_err!("Invalid length for escape char"),
702 }
703}
704
705pub fn from_proto_binary_op(op: &str) -> Result<Operator, Error> {
706 match op {
707 "And" => Ok(Operator::And),
708 "Or" => Ok(Operator::Or),
709 "Eq" => Ok(Operator::Eq),
710 "NotEq" => Ok(Operator::NotEq),
711 "LtEq" => Ok(Operator::LtEq),
712 "Lt" => Ok(Operator::Lt),
713 "Gt" => Ok(Operator::Gt),
714 "GtEq" => Ok(Operator::GtEq),
715 "Plus" => Ok(Operator::Plus),
716 "Minus" => Ok(Operator::Minus),
717 "Multiply" => Ok(Operator::Multiply),
718 "Divide" => Ok(Operator::Divide),
719 "Modulo" => Ok(Operator::Modulo),
720 "IsDistinctFrom" => Ok(Operator::IsDistinctFrom),
721 "IsNotDistinctFrom" => Ok(Operator::IsNotDistinctFrom),
722 "BitwiseAnd" => Ok(Operator::BitwiseAnd),
723 "BitwiseOr" => Ok(Operator::BitwiseOr),
724 "BitwiseXor" => Ok(Operator::BitwiseXor),
725 "BitwiseShiftLeft" => Ok(Operator::BitwiseShiftLeft),
726 "BitwiseShiftRight" => Ok(Operator::BitwiseShiftRight),
727 "RegexIMatch" => Ok(Operator::RegexIMatch),
728 "RegexMatch" => Ok(Operator::RegexMatch),
729 "RegexNotIMatch" => Ok(Operator::RegexNotIMatch),
730 "RegexNotMatch" => Ok(Operator::RegexNotMatch),
731 "LikeMatch" => Ok(Operator::LikeMatch),
732 "ILikeMatch" => Ok(Operator::ILikeMatch),
733 "NotLikeMatch" => Ok(Operator::NotLikeMatch),
734 "NotILikeMatch" => Ok(Operator::NotILikeMatch),
735 "StringConcat" => Ok(Operator::StringConcat),
736 "AtArrow" => Ok(Operator::AtArrow),
737 "ArrowAt" => Ok(Operator::ArrowAt),
738 other => Err(proto_error(format!(
739 "Unsupported binary operator '{other:?}'"
740 ))),
741 }
742}
743
744fn parse_optional_expr(
745 p: Option<&protobuf::LogicalExprNode>,
746 registry: &dyn FunctionRegistry,
747 codec: &dyn LogicalExtensionCodec,
748) -> Result<Option<Expr>, Error> {
749 match p {
750 Some(expr) => parse_expr(expr, registry, codec).map(Some),
751 None => Ok(None),
752 }
753}
754
755fn parse_required_expr(
756 p: Option<&protobuf::LogicalExprNode>,
757 registry: &dyn FunctionRegistry,
758 field: impl Into<String>,
759 codec: &dyn LogicalExtensionCodec,
760) -> Result<Expr, Error> {
761 match p {
762 Some(expr) => parse_expr(expr, registry, codec),
763 None => Err(Error::required(field)),
764 }
765}
766
767fn proto_error<S: Into<String>>(message: S) -> Error {
768 Error::General(message.into())
769}