1use std::sync::Arc;
19
20use arrow::datatypes::{DataType, Field};
21use datafusion_common::datatype::DataTypeExt;
22use datafusion_common::{
23 NullEquality, RecursionUnnestOption, Result, ScalarValue, TableReference,
24 UnnestOptions, exec_datafusion_err, internal_err, plan_datafusion_err,
25};
26use datafusion_execution::TaskContext;
27use datafusion_execution::registry::FunctionRegistry;
28use datafusion_expr::dml::InsertOp;
29use datafusion_expr::expr::{Alias, NullTreatment, Placeholder, Sort};
30use datafusion_expr::expr::{Unnest, WildcardOptions};
31use datafusion_expr::logical_plan::Subquery;
32use datafusion_expr::{
33 Between, BinaryExpr, Case, Cast, Expr, GroupingSet,
34 GroupingSet::GroupingSets,
35 JoinConstraint, JoinType, Like, Operator, TryCast, WindowFrame, WindowFrameBound,
36 WindowFrameUnits,
37 expr::{self, InList, WindowFunction},
38 logical_plan::{PlanType, StringifiedPlan},
39};
40use datafusion_expr::{ExprFunctionExt, WriteOp};
41use datafusion_proto_common::{FromProtoError as Error, from_proto::FromOptionalField};
42
43use crate::protobuf::plan_type::PlanTypeEnum::{
44 FinalPhysicalPlanWithSchema, InitialPhysicalPlanWithSchema,
45};
46use crate::protobuf::{
47 self, AnalyzedLogicalPlanType, CubeNode, GroupingSetNode, OptimizedLogicalPlanType,
48 OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
49 plan_type::PlanTypeEnum::{
50 AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
51 FinalPhysicalPlan, FinalPhysicalPlanWithStats, InitialLogicalPlan,
52 InitialPhysicalPlan, InitialPhysicalPlanWithStats, OptimizedLogicalPlan,
53 OptimizedPhysicalPlan, PhysicalPlanError,
54 },
55};
56
57use super::{AsLogicalPlan, LogicalExtensionCodec};
58
59impl From<&protobuf::UnnestOptions> for UnnestOptions {
60 fn from(opts: &protobuf::UnnestOptions) -> Self {
61 Self {
62 preserve_nulls: opts.preserve_nulls,
63 recursions: opts
64 .recursions
65 .iter()
66 .map(|r| RecursionUnnestOption {
67 input_column: r.input_column.as_ref().unwrap().into(),
68 output_column: r.output_column.as_ref().unwrap().into(),
69 depth: r.depth as usize,
70 })
71 .collect::<Vec<_>>(),
72 }
73 }
74}
75
76impl From<protobuf::WindowFrameUnits> for WindowFrameUnits {
77 fn from(units: protobuf::WindowFrameUnits) -> Self {
78 match units {
79 protobuf::WindowFrameUnits::Rows => Self::Rows,
80 protobuf::WindowFrameUnits::Range => Self::Range,
81 protobuf::WindowFrameUnits::Groups => Self::Groups,
82 }
83 }
84}
85
86impl TryFrom<protobuf::TableReference> for TableReference {
87 type Error = Error;
88
89 fn try_from(value: protobuf::TableReference) -> Result<Self, Self::Error> {
90 use protobuf::table_reference::TableReferenceEnum;
91 let table_reference_enum = value
92 .table_reference_enum
93 .ok_or_else(|| Error::required("table_reference_enum"))?;
94
95 match table_reference_enum {
96 TableReferenceEnum::Bare(protobuf::BareTableReference { table }) => {
97 Ok(TableReference::bare(table))
98 }
99 TableReferenceEnum::Partial(protobuf::PartialTableReference {
100 schema,
101 table,
102 }) => Ok(TableReference::partial(schema, table)),
103 TableReferenceEnum::Full(protobuf::FullTableReference {
104 catalog,
105 schema,
106 table,
107 }) => Ok(TableReference::full(catalog, schema, table)),
108 }
109 }
110}
111
112impl From<&protobuf::StringifiedPlan> for StringifiedPlan {
113 fn from(stringified_plan: &protobuf::StringifiedPlan) -> Self {
114 Self {
115 plan_type: match stringified_plan
116 .plan_type
117 .as_ref()
118 .and_then(|pt| pt.plan_type_enum.as_ref())
119 .unwrap_or_else(|| {
120 panic!(
121 "Cannot create protobuf::StringifiedPlan from {stringified_plan:?}"
122 )
123 }) {
124 InitialLogicalPlan(_) => PlanType::InitialLogicalPlan,
125 AnalyzedLogicalPlan(AnalyzedLogicalPlanType { analyzer_name }) => {
126 PlanType::AnalyzedLogicalPlan {
127 analyzer_name:analyzer_name.clone()
128 }
129 }
130 FinalAnalyzedLogicalPlan(_) => PlanType::FinalAnalyzedLogicalPlan,
131 OptimizedLogicalPlan(OptimizedLogicalPlanType { optimizer_name }) => {
132 PlanType::OptimizedLogicalPlan {
133 optimizer_name: optimizer_name.clone(),
134 }
135 }
136 FinalLogicalPlan(_) => PlanType::FinalLogicalPlan,
137 InitialPhysicalPlan(_) => PlanType::InitialPhysicalPlan,
138 InitialPhysicalPlanWithStats(_) => PlanType::InitialPhysicalPlanWithStats,
139 InitialPhysicalPlanWithSchema(_) => PlanType::InitialPhysicalPlanWithSchema,
140 OptimizedPhysicalPlan(OptimizedPhysicalPlanType { optimizer_name }) => {
141 PlanType::OptimizedPhysicalPlan {
142 optimizer_name: optimizer_name.clone(),
143 }
144 }
145 FinalPhysicalPlan(_) => PlanType::FinalPhysicalPlan,
146 FinalPhysicalPlanWithStats(_) => PlanType::FinalPhysicalPlanWithStats,
147 FinalPhysicalPlanWithSchema(_) => PlanType::FinalPhysicalPlanWithSchema,
148 PhysicalPlanError(_) => PlanType::PhysicalPlanError,
149 },
150 plan: Arc::new(stringified_plan.plan.clone()),
151 }
152 }
153}
154
155impl TryFrom<protobuf::WindowFrame> for WindowFrame {
156 type Error = Error;
157
158 fn try_from(window: protobuf::WindowFrame) -> Result<Self, Self::Error> {
159 let units = protobuf::WindowFrameUnits::try_from(window.window_frame_units)
160 .map_err(|_| Error::unknown("WindowFrameUnits", window.window_frame_units))?
161 .into();
162 let start_bound = window.start_bound.required("start_bound")?;
163 let end_bound = window
164 .end_bound
165 .map(|end_bound| match end_bound {
166 protobuf::window_frame::EndBound::Bound(end_bound) => {
167 end_bound.try_into()
168 }
169 })
170 .transpose()?
171 .unwrap_or(WindowFrameBound::CurrentRow);
172 Ok(WindowFrame::new_bounds(units, start_bound, end_bound))
173 }
174}
175
176impl TryFrom<protobuf::WindowFrameBound> for WindowFrameBound {
177 type Error = Error;
178
179 fn try_from(bound: protobuf::WindowFrameBound) -> Result<Self, Self::Error> {
180 let bound_type =
181 protobuf::WindowFrameBoundType::try_from(bound.window_frame_bound_type)
182 .map_err(|_| {
183 Error::unknown("WindowFrameBoundType", bound.window_frame_bound_type)
184 })?;
185 match bound_type {
186 protobuf::WindowFrameBoundType::CurrentRow => Ok(Self::CurrentRow),
187 protobuf::WindowFrameBoundType::Preceding => match bound.bound_value {
188 Some(x) => Ok(Self::Preceding(ScalarValue::try_from(&x)?)),
189 None => Ok(Self::Preceding(ScalarValue::UInt64(None))),
190 },
191 protobuf::WindowFrameBoundType::Following => match bound.bound_value {
192 Some(x) => Ok(Self::Following(ScalarValue::try_from(&x)?)),
193 None => Ok(Self::Following(ScalarValue::UInt64(None))),
194 },
195 }
196 }
197}
198
199impl From<protobuf::JoinType> for JoinType {
200 fn from(t: protobuf::JoinType) -> Self {
201 match t {
202 protobuf::JoinType::Inner => JoinType::Inner,
203 protobuf::JoinType::Left => JoinType::Left,
204 protobuf::JoinType::Right => JoinType::Right,
205 protobuf::JoinType::Full => JoinType::Full,
206 protobuf::JoinType::Leftsemi => JoinType::LeftSemi,
207 protobuf::JoinType::Rightsemi => JoinType::RightSemi,
208 protobuf::JoinType::Leftanti => JoinType::LeftAnti,
209 protobuf::JoinType::Rightanti => JoinType::RightAnti,
210 protobuf::JoinType::Leftmark => JoinType::LeftMark,
211 protobuf::JoinType::Rightmark => JoinType::RightMark,
212 }
213 }
214}
215
216impl From<protobuf::JoinConstraint> for JoinConstraint {
217 fn from(t: protobuf::JoinConstraint) -> Self {
218 match t {
219 protobuf::JoinConstraint::On => JoinConstraint::On,
220 protobuf::JoinConstraint::Using => JoinConstraint::Using,
221 }
222 }
223}
224
225impl From<protobuf::NullEquality> for NullEquality {
226 fn from(t: protobuf::NullEquality) -> Self {
227 match t {
228 protobuf::NullEquality::NullEqualsNothing => NullEquality::NullEqualsNothing,
229 protobuf::NullEquality::NullEqualsNull => NullEquality::NullEqualsNull,
230 }
231 }
232}
233
234impl From<protobuf::dml_node::Type> for WriteOp {
235 fn from(t: protobuf::dml_node::Type) -> Self {
236 match t {
237 protobuf::dml_node::Type::Update => WriteOp::Update,
238 protobuf::dml_node::Type::Delete => WriteOp::Delete,
239 protobuf::dml_node::Type::InsertAppend => WriteOp::Insert(InsertOp::Append),
240 protobuf::dml_node::Type::InsertOverwrite => {
241 WriteOp::Insert(InsertOp::Overwrite)
242 }
243 protobuf::dml_node::Type::InsertReplace => WriteOp::Insert(InsertOp::Replace),
244 protobuf::dml_node::Type::Ctas => WriteOp::Ctas,
245 protobuf::dml_node::Type::Truncate => WriteOp::Truncate,
246 }
247 }
248}
249
250impl From<protobuf::NullTreatment> for NullTreatment {
251 fn from(t: protobuf::NullTreatment) -> Self {
252 match t {
253 protobuf::NullTreatment::RespectNulls => NullTreatment::RespectNulls,
254 protobuf::NullTreatment::IgnoreNulls => NullTreatment::IgnoreNulls,
255 }
256 }
257}
258
259pub fn parse_expr(
260 proto: &protobuf::LogicalExprNode,
261 ctx: &TaskContext,
262 codec: &dyn LogicalExtensionCodec,
263) -> Result<Expr, Error> {
264 use protobuf::{logical_expr_node::ExprType, window_expr_node};
265
266 let expr_type = proto
267 .expr_type
268 .as_ref()
269 .ok_or_else(|| Error::required("expr_type"))?;
270
271 match expr_type {
272 ExprType::BinaryExpr(binary_expr) => {
273 let op = from_proto_binary_op(&binary_expr.op)?;
274 let operands = parse_exprs(&binary_expr.operands, ctx, codec)?;
275
276 if operands.len() < 2 {
277 return Err(proto_error(
278 "A binary expression must always have at least 2 operands",
279 ));
280 }
281
282 Ok(operands
285 .into_iter()
286 .reduce(|left, right| {
287 Expr::BinaryExpr(BinaryExpr::new(Box::new(left), op, Box::new(right)))
288 })
289 .expect("Binary expression could not be reduced to a single expression."))
290 }
291 ExprType::Column(column) => Ok(Expr::Column(column.into())),
292 ExprType::Literal(literal) => {
293 let scalar_value: ScalarValue = literal.try_into()?;
294 Ok(Expr::Literal(scalar_value, None))
295 }
296 ExprType::WindowExpr(expr) => {
297 let window_function = expr
298 .window_function
299 .as_ref()
300 .ok_or_else(|| Error::required("window_function"))?;
301 let partition_by = parse_exprs(&expr.partition_by, ctx, codec)?;
302 let mut order_by = parse_sorts(&expr.order_by, ctx, codec)?;
303 let window_frame = expr
304 .window_frame
305 .as_ref()
306 .map::<Result<WindowFrame, _>, _>(|window_frame| {
307 let window_frame: WindowFrame = window_frame.clone().try_into()?;
308 window_frame
309 .regularize_order_bys(&mut order_by)
310 .map(|_| window_frame)
311 })
312 .transpose()?
313 .ok_or_else(|| {
314 exec_datafusion_err!("missing window frame during deserialization")
315 })?;
316
317 let null_treatment = match expr.null_treatment {
318 Some(null_treatment) => {
319 let null_treatment = protobuf::NullTreatment::try_from(null_treatment)
320 .map_err(|_| {
321 proto_error(format!(
322 "Received a WindowExprNode message with unknown NullTreatment {null_treatment}",
323 ))
324 })?;
325 Some(NullTreatment::from(null_treatment))
326 }
327 None => None,
328 };
329
330 let agg_fn = match window_function {
331 window_expr_node::WindowFunction::Udaf(udaf_name) => {
332 let udaf_function = match &expr.fun_definition {
333 Some(buf) => codec.try_decode_udaf(udaf_name, buf)?,
334 None => ctx
335 .udaf(udaf_name)
336 .or_else(|_| codec.try_decode_udaf(udaf_name, &[]))?,
337 };
338 expr::WindowFunctionDefinition::AggregateUDF(udaf_function)
339 }
340 window_expr_node::WindowFunction::Udwf(udwf_name) => {
341 let udwf_function = match &expr.fun_definition {
342 Some(buf) => codec.try_decode_udwf(udwf_name, buf)?,
343 None => ctx
344 .udwf(udwf_name)
345 .or_else(|_| codec.try_decode_udwf(udwf_name, &[]))?,
346 };
347 expr::WindowFunctionDefinition::WindowUDF(udwf_function)
348 }
349 };
350
351 let args = parse_exprs(&expr.exprs, ctx, codec)?;
352 let mut builder = Expr::from(WindowFunction::new(agg_fn, args))
353 .partition_by(partition_by)
354 .order_by(order_by)
355 .window_frame(window_frame)
356 .null_treatment(null_treatment);
357
358 if expr.distinct {
359 builder = builder.distinct();
360 };
361
362 if let Some(filter) = parse_optional_expr(expr.filter.as_deref(), ctx, codec)?
363 {
364 builder = builder.filter(filter);
365 }
366
367 builder.build().map_err(Error::DataFusionError)
368 }
369 ExprType::Alias(alias) => Ok(Expr::Alias(Alias::new(
370 parse_required_expr(alias.expr.as_deref(), ctx, "expr", codec)?,
371 alias
372 .relation
373 .first()
374 .map(|r| TableReference::try_from(r.clone()))
375 .transpose()?,
376 alias.alias.clone(),
377 ))),
378 ExprType::IsNullExpr(is_null) => Ok(Expr::IsNull(Box::new(parse_required_expr(
379 is_null.expr.as_deref(),
380 ctx,
381 "expr",
382 codec,
383 )?))),
384 ExprType::IsNotNullExpr(is_not_null) => Ok(Expr::IsNotNull(Box::new(
385 parse_required_expr(is_not_null.expr.as_deref(), ctx, "expr", codec)?,
386 ))),
387 ExprType::NotExpr(not) => Ok(Expr::Not(Box::new(parse_required_expr(
388 not.expr.as_deref(),
389 ctx,
390 "expr",
391 codec,
392 )?))),
393 ExprType::IsTrue(msg) => Ok(Expr::IsTrue(Box::new(parse_required_expr(
394 msg.expr.as_deref(),
395 ctx,
396 "expr",
397 codec,
398 )?))),
399 ExprType::IsFalse(msg) => Ok(Expr::IsFalse(Box::new(parse_required_expr(
400 msg.expr.as_deref(),
401 ctx,
402 "expr",
403 codec,
404 )?))),
405 ExprType::IsUnknown(msg) => Ok(Expr::IsUnknown(Box::new(parse_required_expr(
406 msg.expr.as_deref(),
407 ctx,
408 "expr",
409 codec,
410 )?))),
411 ExprType::IsNotTrue(msg) => Ok(Expr::IsNotTrue(Box::new(parse_required_expr(
412 msg.expr.as_deref(),
413 ctx,
414 "expr",
415 codec,
416 )?))),
417 ExprType::IsNotFalse(msg) => Ok(Expr::IsNotFalse(Box::new(parse_required_expr(
418 msg.expr.as_deref(),
419 ctx,
420 "expr",
421 codec,
422 )?))),
423 ExprType::IsNotUnknown(msg) => Ok(Expr::IsNotUnknown(Box::new(
424 parse_required_expr(msg.expr.as_deref(), ctx, "expr", codec)?,
425 ))),
426 ExprType::Between(between) => Ok(Expr::Between(Between::new(
427 Box::new(parse_required_expr(
428 between.expr.as_deref(),
429 ctx,
430 "expr",
431 codec,
432 )?),
433 between.negated,
434 Box::new(parse_required_expr(
435 between.low.as_deref(),
436 ctx,
437 "expr",
438 codec,
439 )?),
440 Box::new(parse_required_expr(
441 between.high.as_deref(),
442 ctx,
443 "expr",
444 codec,
445 )?),
446 ))),
447 ExprType::Like(like) => Ok(Expr::Like(Like::new(
448 like.negated,
449 Box::new(parse_required_expr(
450 like.expr.as_deref(),
451 ctx,
452 "expr",
453 codec,
454 )?),
455 Box::new(parse_required_expr(
456 like.pattern.as_deref(),
457 ctx,
458 "pattern",
459 codec,
460 )?),
461 parse_escape_char(&like.escape_char)?,
462 false,
463 ))),
464 ExprType::Ilike(like) => Ok(Expr::Like(Like::new(
465 like.negated,
466 Box::new(parse_required_expr(
467 like.expr.as_deref(),
468 ctx,
469 "expr",
470 codec,
471 )?),
472 Box::new(parse_required_expr(
473 like.pattern.as_deref(),
474 ctx,
475 "pattern",
476 codec,
477 )?),
478 parse_escape_char(&like.escape_char)?,
479 true,
480 ))),
481 ExprType::SimilarTo(like) => Ok(Expr::SimilarTo(Like::new(
482 like.negated,
483 Box::new(parse_required_expr(
484 like.expr.as_deref(),
485 ctx,
486 "expr",
487 codec,
488 )?),
489 Box::new(parse_required_expr(
490 like.pattern.as_deref(),
491 ctx,
492 "pattern",
493 codec,
494 )?),
495 parse_escape_char(&like.escape_char)?,
496 false,
497 ))),
498 ExprType::Case(case) => {
499 let when_then_expr = case
500 .when_then_expr
501 .iter()
502 .map(|e| {
503 let when_expr = parse_required_expr(
504 e.when_expr.as_ref(),
505 ctx,
506 "when_expr",
507 codec,
508 )?;
509 let then_expr = parse_required_expr(
510 e.then_expr.as_ref(),
511 ctx,
512 "then_expr",
513 codec,
514 )?;
515 Ok((Box::new(when_expr), Box::new(then_expr)))
516 })
517 .collect::<Result<Vec<(Box<Expr>, Box<Expr>)>, Error>>()?;
518 Ok(Expr::Case(Case::new(
519 parse_optional_expr(case.expr.as_deref(), ctx, codec)?.map(Box::new),
520 when_then_expr,
521 parse_optional_expr(case.else_expr.as_deref(), ctx, codec)?.map(Box::new),
522 )))
523 }
524 ExprType::Cast(cast) => {
525 let expr = Box::new(parse_required_expr(
526 cast.expr.as_deref(),
527 ctx,
528 "expr",
529 codec,
530 )?);
531 let data_type: DataType = cast.arrow_type.as_ref().required("arrow_type")?;
532 let field = data_type
533 .into_nullable_field()
534 .with_nullable(cast.nullable.unwrap_or(true));
535 Ok(Expr::Cast(Cast::new_from_field(expr, Arc::new(field))))
536 }
537 ExprType::TryCast(cast) => {
538 let expr = Box::new(parse_required_expr(
539 cast.expr.as_deref(),
540 ctx,
541 "expr",
542 codec,
543 )?);
544 let data_type: DataType = cast.arrow_type.as_ref().required("arrow_type")?;
545 let field = data_type
546 .into_nullable_field()
547 .with_nullable(cast.nullable.unwrap_or(true));
548 Ok(Expr::TryCast(TryCast::new_from_field(
549 expr,
550 Arc::new(field),
551 )))
552 }
553 ExprType::Negative(negative) => Ok(Expr::Negative(Box::new(
554 parse_required_expr(negative.expr.as_deref(), ctx, "expr", codec)?,
555 ))),
556 ExprType::Unnest(unnest) => {
557 let mut exprs = parse_exprs(&unnest.exprs, ctx, codec)?;
558 if exprs.len() != 1 {
559 return Err(proto_error("Unnest must have exactly one expression"));
560 }
561 Ok(Expr::Unnest(Unnest::new(exprs.swap_remove(0))))
562 }
563 ExprType::InList(in_list) => Ok(Expr::InList(InList::new(
564 Box::new(parse_required_expr(
565 in_list.expr.as_deref(),
566 ctx,
567 "expr",
568 codec,
569 )?),
570 parse_exprs(&in_list.list, ctx, codec)?,
571 in_list.negated,
572 ))),
573 ExprType::Wildcard(protobuf::Wildcard { qualifier }) => {
574 let qualifier = qualifier.to_owned().map(|x| x.try_into()).transpose()?;
575 #[expect(deprecated)]
576 Ok(Expr::Wildcard {
577 qualifier,
578 options: Box::new(WildcardOptions::default()),
579 })
580 }
581 ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
582 fun_name,
583 args,
584 fun_definition,
585 }) => {
586 let scalar_fn = match fun_definition {
587 Some(buf) => codec.try_decode_udf(fun_name, buf)?,
588 None => ctx
589 .udf(fun_name.as_str())
590 .or_else(|_| codec.try_decode_udf(fun_name, &[]))?,
591 };
592 Ok(Expr::ScalarFunction(expr::ScalarFunction::new_udf(
593 scalar_fn,
594 parse_exprs(args, ctx, codec)?,
595 )))
596 }
597 ExprType::AggregateUdfExpr(pb) => {
598 let agg_fn = match &pb.fun_definition {
599 Some(buf) => codec.try_decode_udaf(&pb.fun_name, buf)?,
600 None => ctx
601 .udaf(&pb.fun_name)
602 .or_else(|_| codec.try_decode_udaf(&pb.fun_name, &[]))?,
603 };
604 let null_treatment = match pb.null_treatment {
605 Some(null_treatment) => {
606 let null_treatment = protobuf::NullTreatment::try_from(null_treatment)
607 .map_err(|_| {
608 proto_error(format!(
609 "Received an AggregateUdfExprNode message with unknown NullTreatment {null_treatment}",
610 ))
611 })?;
612 Some(NullTreatment::from(null_treatment))
613 }
614 None => None,
615 };
616
617 Ok(Expr::AggregateFunction(expr::AggregateFunction::new_udf(
618 agg_fn,
619 parse_exprs(&pb.args, ctx, codec)?,
620 pb.distinct,
621 parse_optional_expr(pb.filter.as_deref(), ctx, codec)?.map(Box::new),
622 parse_sorts(&pb.order_by, ctx, codec)?,
623 null_treatment,
624 )))
625 }
626
627 ExprType::GroupingSet(GroupingSetNode { expr }) => {
628 Ok(Expr::GroupingSet(GroupingSets(
629 expr.iter()
630 .map(|expr_list| parse_exprs(&expr_list.expr, ctx, codec))
631 .collect::<Result<Vec<_>, Error>>()?,
632 )))
633 }
634 ExprType::Cube(CubeNode { expr }) => Ok(Expr::GroupingSet(GroupingSet::Cube(
635 parse_exprs(expr, ctx, codec)?,
636 ))),
637 ExprType::Rollup(RollupNode { expr }) => Ok(Expr::GroupingSet(
638 GroupingSet::Rollup(parse_exprs(expr, ctx, codec)?),
639 )),
640 ExprType::Placeholder(PlaceholderNode {
641 id,
642 data_type,
643 nullable,
644 metadata,
645 }) => match data_type {
646 None => Ok(Expr::Placeholder(Placeholder::new_with_field(
647 id.clone(),
648 None,
649 ))),
650 Some(data_type) => {
651 let field =
652 Field::new("", data_type.try_into()?, nullable.unwrap_or(true))
653 .with_metadata(metadata.clone());
654 Ok(Expr::Placeholder(Placeholder::new_with_field(
655 id.clone(),
656 Some(field.into()),
657 )))
658 }
659 },
660 ExprType::ScalarSubqueryExpr(sq) => {
661 let subquery = parse_subquery(
662 sq.subquery
663 .as_deref()
664 .ok_or_else(|| Error::required("ScalarSubqueryExprNode.subquery"))?,
665 ctx,
666 codec,
667 )?;
668 Ok(Expr::ScalarSubquery(subquery))
669 }
670 }
671}
672
673fn parse_subquery(
674 proto: &protobuf::SubqueryNode,
675 ctx: &TaskContext,
676 codec: &dyn LogicalExtensionCodec,
677) -> Result<Subquery, Error> {
678 let plan_node = proto
679 .subquery
680 .as_ref()
681 .ok_or_else(|| Error::required("SubqueryNode.subquery"))?;
682 let plan = plan_node.try_into_logical_plan(ctx, codec)?;
683 let outer_ref_columns = parse_exprs(&proto.outer_ref_columns, ctx, codec)?;
684 Ok(Subquery {
685 subquery: Arc::new(plan),
686 outer_ref_columns,
687 spans: Default::default(),
688 })
689}
690
691pub fn parse_exprs<'a, I>(
693 protos: I,
694 ctx: &TaskContext,
695 codec: &dyn LogicalExtensionCodec,
696) -> Result<Vec<Expr>, Error>
697where
698 I: IntoIterator<Item = &'a protobuf::LogicalExprNode>,
699{
700 let res = protos
701 .into_iter()
702 .map(|elem| {
703 parse_expr(elem, ctx, codec).map_err(|e| plan_datafusion_err!("{}", e))
704 })
705 .collect::<Result<Vec<_>>>()?;
706 Ok(res)
707}
708
709pub fn parse_sorts<'a, I>(
710 protos: I,
711 ctx: &TaskContext,
712 codec: &dyn LogicalExtensionCodec,
713) -> Result<Vec<Sort>, Error>
714where
715 I: IntoIterator<Item = &'a protobuf::SortExprNode>,
716{
717 protos
718 .into_iter()
719 .map(|sort| parse_sort(sort, ctx, codec))
720 .collect::<Result<Vec<Sort>, Error>>()
721}
722
723pub fn parse_sort(
724 sort: &protobuf::SortExprNode,
725 ctx: &TaskContext,
726 codec: &dyn LogicalExtensionCodec,
727) -> Result<Sort, Error> {
728 Ok(Sort::new(
729 parse_required_expr(sort.expr.as_ref(), ctx, "expr", codec)?,
730 sort.asc,
731 sort.nulls_first,
732 ))
733}
734
735fn parse_escape_char(s: &str) -> Result<Option<char>> {
737 match s.len() {
738 0 => Ok(None),
739 1 => Ok(s.chars().next()),
740 _ => internal_err!("Invalid length for escape char"),
741 }
742}
743
744pub fn from_proto_binary_op(op: &str) -> Result<Operator, Error> {
745 match op {
746 "And" => Ok(Operator::And),
747 "Or" => Ok(Operator::Or),
748 "Eq" => Ok(Operator::Eq),
749 "NotEq" => Ok(Operator::NotEq),
750 "LtEq" => Ok(Operator::LtEq),
751 "Lt" => Ok(Operator::Lt),
752 "Gt" => Ok(Operator::Gt),
753 "GtEq" => Ok(Operator::GtEq),
754 "Plus" => Ok(Operator::Plus),
755 "Minus" => Ok(Operator::Minus),
756 "Multiply" => Ok(Operator::Multiply),
757 "Divide" => Ok(Operator::Divide),
758 "Modulo" => Ok(Operator::Modulo),
759 "IsDistinctFrom" => Ok(Operator::IsDistinctFrom),
760 "IsNotDistinctFrom" => Ok(Operator::IsNotDistinctFrom),
761 "BitwiseAnd" => Ok(Operator::BitwiseAnd),
762 "BitwiseOr" => Ok(Operator::BitwiseOr),
763 "BitwiseXor" => Ok(Operator::BitwiseXor),
764 "BitwiseShiftLeft" => Ok(Operator::BitwiseShiftLeft),
765 "BitwiseShiftRight" => Ok(Operator::BitwiseShiftRight),
766 "RegexIMatch" => Ok(Operator::RegexIMatch),
767 "RegexMatch" => Ok(Operator::RegexMatch),
768 "RegexNotIMatch" => Ok(Operator::RegexNotIMatch),
769 "RegexNotMatch" => Ok(Operator::RegexNotMatch),
770 "LikeMatch" => Ok(Operator::LikeMatch),
771 "ILikeMatch" => Ok(Operator::ILikeMatch),
772 "NotLikeMatch" => Ok(Operator::NotLikeMatch),
773 "NotILikeMatch" => Ok(Operator::NotILikeMatch),
774 "StringConcat" => Ok(Operator::StringConcat),
775 "AtArrow" => Ok(Operator::AtArrow),
776 "ArrowAt" => Ok(Operator::ArrowAt),
777 other => Err(proto_error(format!(
778 "Unsupported binary operator '{other:?}'"
779 ))),
780 }
781}
782
783fn parse_optional_expr(
784 p: Option<&protobuf::LogicalExprNode>,
785 ctx: &TaskContext,
786 codec: &dyn LogicalExtensionCodec,
787) -> Result<Option<Expr>, Error> {
788 match p {
789 Some(expr) => parse_expr(expr, ctx, codec).map(Some),
790 None => Ok(None),
791 }
792}
793
794fn parse_required_expr(
795 p: Option<&protobuf::LogicalExprNode>,
796 ctx: &TaskContext,
797 field: impl Into<String>,
798 codec: &dyn LogicalExtensionCodec,
799) -> Result<Expr, Error> {
800 match p {
801 Some(expr) => parse_expr(expr, ctx, codec),
802 None => Err(Error::required(field)),
803 }
804}
805
806fn proto_error<S: Into<String>>(message: S) -> Error {
807 Error::General(message.into())
808}