1use std::collections::HashMap;
23
24use datafusion_common::{NullEquality, TableReference, UnnestOptions};
25use datafusion_expr::WriteOp;
26use datafusion_expr::dml::InsertOp;
27use datafusion_expr::expr::{
28 self, AggregateFunctionParams, Alias, Between, BinaryExpr, Cast, GroupingSet, InList,
29 Like, NullTreatment, Placeholder, ScalarFunction, Unnest,
30};
31use datafusion_expr::logical_plan::Subquery;
32use datafusion_expr::{
33 Expr, JoinConstraint, JoinType, SortExpr, TryCast, WindowFrame, WindowFrameBound,
34 WindowFrameUnits, WindowFunctionDefinition, logical_plan::PlanType,
35 logical_plan::StringifiedPlan,
36};
37
38use crate::protobuf::RecursionUnnestOption;
39use crate::protobuf::{
40 self, AnalyzedLogicalPlanType, CubeNode, EmptyMessage, GroupingSetNode,
41 LogicalExprList, OptimizedLogicalPlanType, OptimizedPhysicalPlanType,
42 PlaceholderNode, RollupNode, ToProtoError as Error,
43 plan_type::PlanTypeEnum::{
44 AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
45 FinalPhysicalPlan, FinalPhysicalPlanWithSchema, FinalPhysicalPlanWithStats,
46 InitialLogicalPlan, InitialPhysicalPlan, InitialPhysicalPlanWithSchema,
47 InitialPhysicalPlanWithStats, OptimizedLogicalPlan, OptimizedPhysicalPlan,
48 PhysicalPlanError,
49 },
50};
51
52use super::{AsLogicalPlan, LogicalExtensionCodec};
53use crate::protobuf::LogicalPlanNode;
54
55impl From<&UnnestOptions> for protobuf::UnnestOptions {
56 fn from(opts: &UnnestOptions) -> Self {
57 Self {
58 preserve_nulls: opts.preserve_nulls,
59 recursions: opts
60 .recursions
61 .iter()
62 .map(|r| RecursionUnnestOption {
63 input_column: Some((&r.input_column).into()),
64 output_column: Some((&r.output_column).into()),
65 depth: r.depth as u32,
66 })
67 .collect(),
68 }
69 }
70}
71
72impl From<&StringifiedPlan> for protobuf::StringifiedPlan {
73 fn from(stringified_plan: &StringifiedPlan) -> Self {
74 Self {
75 plan_type: match stringified_plan.clone().plan_type {
76 PlanType::InitialLogicalPlan => Some(protobuf::PlanType {
77 plan_type_enum: Some(InitialLogicalPlan(EmptyMessage {})),
78 }),
79 PlanType::AnalyzedLogicalPlan { analyzer_name } => {
80 Some(protobuf::PlanType {
81 plan_type_enum: Some(AnalyzedLogicalPlan(
82 AnalyzedLogicalPlanType { analyzer_name },
83 )),
84 })
85 }
86 PlanType::FinalAnalyzedLogicalPlan => Some(protobuf::PlanType {
87 plan_type_enum: Some(FinalAnalyzedLogicalPlan(EmptyMessage {})),
88 }),
89 PlanType::OptimizedLogicalPlan { optimizer_name } => {
90 Some(protobuf::PlanType {
91 plan_type_enum: Some(OptimizedLogicalPlan(
92 OptimizedLogicalPlanType { optimizer_name },
93 )),
94 })
95 }
96 PlanType::FinalLogicalPlan => Some(protobuf::PlanType {
97 plan_type_enum: Some(FinalLogicalPlan(EmptyMessage {})),
98 }),
99 PlanType::InitialPhysicalPlan => Some(protobuf::PlanType {
100 plan_type_enum: Some(InitialPhysicalPlan(EmptyMessage {})),
101 }),
102 PlanType::OptimizedPhysicalPlan { optimizer_name } => {
103 Some(protobuf::PlanType {
104 plan_type_enum: Some(OptimizedPhysicalPlan(
105 OptimizedPhysicalPlanType { optimizer_name },
106 )),
107 })
108 }
109 PlanType::FinalPhysicalPlan => Some(protobuf::PlanType {
110 plan_type_enum: Some(FinalPhysicalPlan(EmptyMessage {})),
111 }),
112 PlanType::InitialPhysicalPlanWithStats => Some(protobuf::PlanType {
113 plan_type_enum: Some(InitialPhysicalPlanWithStats(EmptyMessage {})),
114 }),
115 PlanType::InitialPhysicalPlanWithSchema => Some(protobuf::PlanType {
116 plan_type_enum: Some(InitialPhysicalPlanWithSchema(EmptyMessage {})),
117 }),
118 PlanType::FinalPhysicalPlanWithStats => Some(protobuf::PlanType {
119 plan_type_enum: Some(FinalPhysicalPlanWithStats(EmptyMessage {})),
120 }),
121 PlanType::FinalPhysicalPlanWithSchema => Some(protobuf::PlanType {
122 plan_type_enum: Some(FinalPhysicalPlanWithSchema(EmptyMessage {})),
123 }),
124 PlanType::PhysicalPlanError => Some(protobuf::PlanType {
125 plan_type_enum: Some(PhysicalPlanError(EmptyMessage {})),
126 }),
127 },
128 plan: stringified_plan.plan.to_string(),
129 }
130 }
131}
132
133impl From<WindowFrameUnits> for protobuf::WindowFrameUnits {
134 fn from(units: WindowFrameUnits) -> Self {
135 match units {
136 WindowFrameUnits::Rows => Self::Rows,
137 WindowFrameUnits::Range => Self::Range,
138 WindowFrameUnits::Groups => Self::Groups,
139 }
140 }
141}
142
143impl TryFrom<&WindowFrameBound> for protobuf::WindowFrameBound {
144 type Error = Error;
145
146 fn try_from(bound: &WindowFrameBound) -> Result<Self, Self::Error> {
147 Ok(match bound {
148 WindowFrameBound::CurrentRow => Self {
149 window_frame_bound_type: protobuf::WindowFrameBoundType::CurrentRow
150 .into(),
151 bound_value: None,
152 },
153 WindowFrameBound::Preceding(v) => Self {
154 window_frame_bound_type: protobuf::WindowFrameBoundType::Preceding.into(),
155 bound_value: Some(v.try_into()?),
156 },
157 WindowFrameBound::Following(v) => Self {
158 window_frame_bound_type: protobuf::WindowFrameBoundType::Following.into(),
159 bound_value: Some(v.try_into()?),
160 },
161 })
162 }
163}
164
165impl TryFrom<&WindowFrame> for protobuf::WindowFrame {
166 type Error = Error;
167
168 fn try_from(window: &WindowFrame) -> Result<Self, Self::Error> {
169 Ok(Self {
170 window_frame_units: protobuf::WindowFrameUnits::from(window.units).into(),
171 start_bound: Some((&window.start_bound).try_into()?),
172 end_bound: Some(protobuf::window_frame::EndBound::Bound(
173 (&window.end_bound).try_into()?,
174 )),
175 })
176 }
177}
178
179pub fn serialize_exprs<'a, I>(
180 exprs: I,
181 codec: &dyn LogicalExtensionCodec,
182) -> Result<Vec<protobuf::LogicalExprNode>, Error>
183where
184 I: IntoIterator<Item = &'a Expr>,
185{
186 exprs
187 .into_iter()
188 .map(|expr| serialize_expr(expr, codec))
189 .collect::<Result<Vec<_>, Error>>()
190}
191
/// Serializes a single logical [`Expr`] into a `protobuf::LogicalExprNode`.
///
/// Child expressions are serialized recursively through this function (or
/// through [`serialize_exprs`] / [`serialize_sorts`] for lists). The `codec`
/// is passed down to every nested call and is also asked to encode
/// user-defined scalar, aggregate and window functions.
///
/// # Errors
///
/// Returns `Error::General` for variants this function does not serialize:
/// `ScalarVariable`, `InSubquery`, `Exists`, `OuterReferenceColumn`,
/// `SetComparison`, `HigherOrderFunction`, `Lambda` and `LambdaVariable`.
/// Errors from nested serialization and from scalar / data type conversions
/// are propagated with `?`.
pub fn serialize_expr(
    expr: &Expr,
    codec: &dyn LogicalExtensionCodec,
) -> Result<protobuf::LogicalExprNode, Error> {
    use protobuf::logical_expr_node::ExprType;

    let expr_node = match expr {
        Expr::Column(c) => protobuf::LogicalExprNode {
            expr_type: Some(ExprType::Column(c.into())),
        },
        Expr::Alias(Alias {
            expr,
            relation,
            name,
            metadata,
        }) => {
            let alias = Box::new(protobuf::AliasNode {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
                // The optional relation is written as a list with zero or one
                // element.
                relation: relation
                    .to_owned()
                    .map(|r| vec![r.into()])
                    .unwrap_or(vec![]),
                alias: name.to_owned(),
                // Absent metadata is written as an empty map.
                metadata: metadata
                    .as_ref()
                    .map(|m| m.to_hashmap())
                    .unwrap_or(HashMap::new()),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::Alias(alias)),
            }
        }
        // Only the scalar value is written; the second field of the literal
        // is ignored here.
        Expr::Literal(value, _) => {
            let pb_value: protobuf::ScalarValue = value.try_into()?;
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::Literal(pb_value)),
            }
        }
        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
            // Flatten a left-nested chain of binary expressions that all use
            // the same operator (e.g. `((a AND b) AND c) AND d`) into a single
            // operand list. The chain is walked with a loop instead of one
            // recursive call per nesting level.
            // NOTE(review): presumably this keeps recursion depth bounded for
            // long chains — confirm against the deserializer.
            let mut exprs = vec![right.as_ref()];
            let mut current_expr = left.as_ref();
            while let Expr::BinaryExpr(BinaryExpr {
                left,
                op: current_op,
                right,
            }) = current_expr
            {
                if current_op == op {
                    exprs.push(right.as_ref());
                    current_expr = left.as_ref();
                } else {
                    break;
                }
            }
            // The leftmost operand: either a non-binary expression or a binary
            // expression with a different operator.
            exprs.push(current_expr);

            let binary_expr = protobuf::BinaryExprNode {
                // Operands were collected right-to-left, so reverse them to
                // restore left-to-right order.
                operands: serialize_exprs(exprs.into_iter().rev(), codec)?,
                // The operator is written using its `Debug` representation.
                op: format!("{op:?}"),
            };
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::BinaryExpr(binary_expr)),
            }
        }
        Expr::Like(Like {
            negated,
            expr,
            pattern,
            escape_char,
            case_insensitive,
        }) => {
            // `case_insensitive` selects between the ILIKE and LIKE proto
            // nodes; the two nodes are otherwise populated identically.
            if *case_insensitive {
                let pb = Box::new(protobuf::ILikeNode {
                    negated: *negated,
                    expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
                    pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
                    // A missing escape character is written as an empty string.
                    escape_char: escape_char
                        .map(|ch| ch.to_string())
                        .unwrap_or_default(),
                });

                protobuf::LogicalExprNode {
                    expr_type: Some(ExprType::Ilike(pb)),
                }
            } else {
                let pb = Box::new(protobuf::LikeNode {
                    negated: *negated,
                    expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
                    pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
                    // A missing escape character is written as an empty string.
                    escape_char: escape_char
                        .map(|ch| ch.to_string())
                        .unwrap_or_default(),
                });

                protobuf::LogicalExprNode {
                    expr_type: Some(ExprType::Like(pb)),
                }
            }
        }
        // `case_insensitive` is not written for SIMILAR TO.
        Expr::SimilarTo(Like {
            negated,
            expr,
            pattern,
            escape_char,
            case_insensitive: _,
        }) => {
            let pb = Box::new(protobuf::SimilarToNode {
                negated: *negated,
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
                pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
                // A missing escape character is written as an empty string.
                escape_char: escape_char
                    .map(|ch| ch.to_string())
                    .unwrap_or_default(),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::SimilarTo(pb)),
            }
        }
        Expr::WindowFunction(window_fun) => {
            let expr::WindowFunction {
                fun,
                params:
                    expr::WindowFunctionParams {
                        args,
                        partition_by,
                        order_by,
                        window_frame,
                        null_treatment,
                        distinct,
                        filter,
                    },
            } = window_fun.as_ref();
            // Scratch buffer the codec may fill with an encoded function
            // definition.
            let mut buf = Vec::new();
            let window_function = match fun {
                WindowFunctionDefinition::AggregateUDF(aggr_udf) => {
                    // The encode result is discarded (`let _`); only the bytes
                    // that ended up in `buf` are used below.
                    let _ = codec.try_encode_udaf(aggr_udf, &mut buf);
                    protobuf::window_expr_node::WindowFunction::Udaf(
                        aggr_udf.name().to_string(),
                    )
                }
                WindowFunctionDefinition::WindowUDF(window_udf) => {
                    // The encode result is discarded (`let _`); only the bytes
                    // that ended up in `buf` are used below.
                    let _ = codec.try_encode_udwf(window_udf, &mut buf);
                    protobuf::window_expr_node::WindowFunction::Udwf(
                        window_udf.name().to_string(),
                    )
                }
            };
            // An empty buffer is written as "no definition" (`None`).
            let fun_definition = (!buf.is_empty()).then_some(buf);
            let partition_by = serialize_exprs(partition_by, codec)?;
            let order_by = serialize_sorts(order_by, codec)?;

            let window_frame: Option<protobuf::WindowFrame> =
                Some(window_frame.try_into()?);

            let window_expr = protobuf::WindowExprNode {
                exprs: serialize_exprs(args, codec)?,
                window_function: Some(window_function),
                partition_by,
                order_by,
                window_frame,
                distinct: *distinct,
                filter: match filter {
                    Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
                    None => None,
                },
                null_treatment: null_treatment
                    .map(|nt| protobuf::NullTreatment::from(nt).into()),
                fun_definition,
            };
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::WindowExpr(Box::new(window_expr))),
            }
        }
        Expr::AggregateFunction(expr::AggregateFunction {
            func,
            params:
                AggregateFunctionParams {
                    args,
                    distinct,
                    filter,
                    order_by,
                    null_treatment,
                },
        }) => {
            let mut buf = Vec::new();
            // The encode result is discarded (`let _`); an empty `buf` is
            // written as `None` in `fun_definition`.
            let _ = codec.try_encode_udaf(func, &mut buf);
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::AggregateUdfExpr(Box::new(
                    protobuf::AggregateUdfExprNode {
                        fun_name: func.name().to_string(),
                        args: serialize_exprs(args, codec)?,
                        distinct: *distinct,
                        filter: match filter {
                            Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
                            None => None,
                        },
                        order_by: serialize_sorts(order_by, codec)?,
                        fun_definition: (!buf.is_empty()).then_some(buf),
                        null_treatment: null_treatment
                            .map(|nt| protobuf::NullTreatment::from(nt).into()),
                    },
                ))),
            }
        }

        Expr::ScalarVariable(_, _) => {
            return Err(Error::General(
                "Proto serialization error: Scalar Variable not supported".to_string(),
            ));
        }
        Expr::ScalarFunction(ScalarFunction { func, args }) => {
            let mut buf = Vec::new();
            // The encode result is discarded (`let _`); an empty `buf` is
            // written as `None` in `fun_definition`.
            let _ = codec.try_encode_udf(func, &mut buf);
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
                    fun_name: func.name().to_string(),
                    fun_definition: (!buf.is_empty()).then_some(buf),
                    args: serialize_exprs(args, codec)?,
                })),
            }
        }
        // The unary predicates below all follow the same shape: serialize the
        // single child and wrap it in the matching proto node.
        Expr::Not(expr) => {
            let expr = Box::new(protobuf::Not {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::NotExpr(expr)),
            }
        }
        Expr::IsNull(expr) => {
            let expr = Box::new(protobuf::IsNull {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::IsNullExpr(expr)),
            }
        }
        Expr::IsNotNull(expr) => {
            let expr = Box::new(protobuf::IsNotNull {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::IsNotNullExpr(expr)),
            }
        }
        Expr::IsTrue(expr) => {
            let expr = Box::new(protobuf::IsTrue {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::IsTrue(expr)),
            }
        }
        Expr::IsFalse(expr) => {
            let expr = Box::new(protobuf::IsFalse {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::IsFalse(expr)),
            }
        }
        Expr::IsUnknown(expr) => {
            let expr = Box::new(protobuf::IsUnknown {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::IsUnknown(expr)),
            }
        }
        Expr::IsNotTrue(expr) => {
            let expr = Box::new(protobuf::IsNotTrue {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::IsNotTrue(expr)),
            }
        }
        Expr::IsNotFalse(expr) => {
            let expr = Box::new(protobuf::IsNotFalse {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::IsNotFalse(expr)),
            }
        }
        Expr::IsNotUnknown(expr) => {
            let expr = Box::new(protobuf::IsNotUnknown {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::IsNotUnknown(expr)),
            }
        }
        Expr::Between(Between {
            expr,
            negated,
            low,
            high,
        }) => {
            let expr = Box::new(protobuf::BetweenNode {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
                negated: *negated,
                low: Some(Box::new(serialize_expr(low.as_ref(), codec)?)),
                high: Some(Box::new(serialize_expr(high.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::Between(expr)),
            }
        }
        Expr::Case(case) => {
            // Serialize every (when, then) pair, stopping at the first error.
            let when_then_expr = case
                .when_then_expr
                .iter()
                .map(|(w, t)| {
                    Ok(protobuf::WhenThen {
                        when_expr: Some(serialize_expr(w.as_ref(), codec)?),
                        then_expr: Some(serialize_expr(t.as_ref(), codec)?),
                    })
                })
                .collect::<Result<Vec<protobuf::WhenThen>, Error>>()?;
            let expr = Box::new(protobuf::CaseNode {
                // Both the base expression and the ELSE branch are optional.
                expr: match &case.expr {
                    Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
                    None => None,
                },
                when_then_expr,
                else_expr: match &case.else_expr {
                    Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
                    None => None,
                },
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::Case(expr)),
            }
        }
        Expr::Cast(Cast { expr, field }) => {
            // The target field is written as data type, metadata and
            // nullability.
            let expr = Box::new(protobuf::CastNode {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
                arrow_type: Some(field.data_type().try_into()?),
                metadata: field.metadata().clone(),
                nullable: Some(field.is_nullable()),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::Cast(expr)),
            }
        }
        Expr::TryCast(TryCast { expr, field }) => {
            // Same field encoding as `Cast`, written to a `TryCastNode`.
            let expr = Box::new(protobuf::TryCastNode {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
                arrow_type: Some(field.data_type().try_into()?),
                metadata: field.metadata().clone(),
                nullable: Some(field.is_nullable()),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::TryCast(expr)),
            }
        }
        Expr::Negative(expr) => {
            let expr = Box::new(protobuf::NegativeNode {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::Negative(expr)),
            }
        }
        Expr::Unnest(Unnest { expr }) => {
            // The proto message holds a list; the single child is written as a
            // one-element list.
            let expr = protobuf::Unnest {
                exprs: vec![serialize_expr(expr.as_ref(), codec)?],
            };
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::Unnest(expr)),
            }
        }
        Expr::InList(InList {
            expr,
            list,
            negated,
        }) => {
            let expr = Box::new(protobuf::InListNode {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
                list: serialize_exprs(list, codec)?,
                negated: *negated,
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::InList(expr)),
            }
        }
        // Only the qualifier is written; the remaining wildcard fields are
        // ignored (`..`).
        #[expect(deprecated)]
        Expr::Wildcard { qualifier, .. } => protobuf::LogicalExprNode {
            expr_type: Some(ExprType::Wildcard(protobuf::Wildcard {
                qualifier: qualifier.to_owned().map(|x| x.into()),
            })),
        },
        Expr::ScalarSubquery(subquery) => protobuf::LogicalExprNode {
            expr_type: Some(ExprType::ScalarSubqueryExpr(Box::new(
                protobuf::ScalarSubqueryExprNode {
                    subquery: Some(Box::new(serialize_subquery(subquery, codec)?)),
                },
            ))),
        },
        // These variants are rejected with an error that includes the
        // expression's `Display` output.
        Expr::InSubquery(_)
        | Expr::Exists(_)
        | Expr::OuterReferenceColumn(_, _)
        | Expr::SetComparison(_) => {
            return Err(Error::General(format!(
                "Proto serialization error: {expr} is not yet supported"
            )));
        }
        Expr::GroupingSet(GroupingSet::Cube(exprs)) => protobuf::LogicalExprNode {
            expr_type: Some(ExprType::Cube(CubeNode {
                expr: serialize_exprs(exprs, codec)?,
            })),
        },
        Expr::GroupingSet(GroupingSet::Rollup(exprs)) => protobuf::LogicalExprNode {
            expr_type: Some(ExprType::Rollup(RollupNode {
                expr: serialize_exprs(exprs, codec)?,
            })),
        },
        Expr::GroupingSet(GroupingSet::GroupingSets(exprs)) => {
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::GroupingSet(GroupingSetNode {
                    // Each inner list becomes its own `LogicalExprList`.
                    expr: exprs
                        .iter()
                        .map(|expr_list| {
                            Ok(LogicalExprList {
                                expr: serialize_exprs(expr_list, codec)?,
                            })
                        })
                        .collect::<Result<Vec<_>, Error>>()?,
                })),
            }
        }
        Expr::Placeholder(Placeholder { id, field }) => protobuf::LogicalExprNode {
            expr_type: Some(ExprType::Placeholder(PlaceholderNode {
                id: id.clone(),
                // Without a field, the type and nullability are `None` and the
                // metadata map is empty.
                data_type: match field {
                    Some(field) => Some(field.data_type().try_into()?),
                    None => None,
                },
                nullable: field.as_ref().map(|f| f.is_nullable()),
                metadata: field
                    .as_ref()
                    .map(|f| f.metadata().clone())
                    .unwrap_or(HashMap::new()),
            })),
        },
        Expr::HigherOrderFunction(_) | Expr::Lambda(_) | Expr::LambdaVariable(_) => {
            return Err(Error::General(
                "Proto serialization error: Lambda not implemented".to_string(),
            ));
        }
    };

    Ok(expr_node)
}
646
647fn serialize_subquery(
648 subquery: &Subquery,
649 codec: &dyn LogicalExtensionCodec,
650) -> Result<protobuf::SubqueryNode, Error> {
651 let plan = LogicalPlanNode::try_from_logical_plan(&subquery.subquery, codec)
652 .map_err(|e| Error::General(e.to_string()))?;
653 let outer_ref_columns = serialize_exprs(&subquery.outer_ref_columns, codec)?;
654 Ok(protobuf::SubqueryNode {
655 subquery: Some(Box::new(plan)),
656 outer_ref_columns,
657 })
658}
659
660pub fn serialize_sorts<'a, I>(
661 sorts: I,
662 codec: &dyn LogicalExtensionCodec,
663) -> Result<Vec<protobuf::SortExprNode>, Error>
664where
665 I: IntoIterator<Item = &'a SortExpr>,
666{
667 sorts
668 .into_iter()
669 .map(|sort| {
670 let SortExpr {
671 expr,
672 asc,
673 nulls_first,
674 } = sort;
675 Ok(protobuf::SortExprNode {
676 expr: Some(serialize_expr(expr, codec)?),
677 asc: *asc,
678 nulls_first: *nulls_first,
679 })
680 })
681 .collect::<Result<Vec<_>, Error>>()
682}
683
684impl From<TableReference> for protobuf::TableReference {
685 fn from(t: TableReference) -> Self {
686 use protobuf::table_reference::TableReferenceEnum;
687 let table_reference_enum = match t {
688 TableReference::Bare { table } => {
689 TableReferenceEnum::Bare(protobuf::BareTableReference {
690 table: table.to_string(),
691 })
692 }
693 TableReference::Partial { schema, table } => {
694 TableReferenceEnum::Partial(protobuf::PartialTableReference {
695 schema: schema.to_string(),
696 table: table.to_string(),
697 })
698 }
699 TableReference::Full {
700 catalog,
701 schema,
702 table,
703 } => TableReferenceEnum::Full(protobuf::FullTableReference {
704 catalog: catalog.to_string(),
705 schema: schema.to_string(),
706 table: table.to_string(),
707 }),
708 };
709
710 protobuf::TableReference {
711 table_reference_enum: Some(table_reference_enum),
712 }
713 }
714}
715
716impl From<JoinType> for protobuf::JoinType {
717 fn from(t: JoinType) -> Self {
718 match t {
719 JoinType::Inner => protobuf::JoinType::Inner,
720 JoinType::Left => protobuf::JoinType::Left,
721 JoinType::Right => protobuf::JoinType::Right,
722 JoinType::Full => protobuf::JoinType::Full,
723 JoinType::LeftSemi => protobuf::JoinType::Leftsemi,
724 JoinType::RightSemi => protobuf::JoinType::Rightsemi,
725 JoinType::LeftAnti => protobuf::JoinType::Leftanti,
726 JoinType::RightAnti => protobuf::JoinType::Rightanti,
727 JoinType::LeftMark => protobuf::JoinType::Leftmark,
728 JoinType::RightMark => protobuf::JoinType::Rightmark,
729 }
730 }
731}
732
733impl From<JoinConstraint> for protobuf::JoinConstraint {
734 fn from(t: JoinConstraint) -> Self {
735 match t {
736 JoinConstraint::On => protobuf::JoinConstraint::On,
737 JoinConstraint::Using => protobuf::JoinConstraint::Using,
738 }
739 }
740}
741
742impl From<NullEquality> for protobuf::NullEquality {
743 fn from(t: NullEquality) -> Self {
744 match t {
745 NullEquality::NullEqualsNothing => protobuf::NullEquality::NullEqualsNothing,
746 NullEquality::NullEqualsNull => protobuf::NullEquality::NullEqualsNull,
747 }
748 }
749}
750
751impl From<&WriteOp> for protobuf::dml_node::Type {
752 fn from(t: &WriteOp) -> Self {
753 match t {
754 WriteOp::Insert(InsertOp::Append) => protobuf::dml_node::Type::InsertAppend,
755 WriteOp::Insert(InsertOp::Overwrite) => {
756 protobuf::dml_node::Type::InsertOverwrite
757 }
758 WriteOp::Insert(InsertOp::Replace) => protobuf::dml_node::Type::InsertReplace,
759 WriteOp::Delete => protobuf::dml_node::Type::Delete,
760 WriteOp::Update => protobuf::dml_node::Type::Update,
761 WriteOp::Ctas => protobuf::dml_node::Type::Ctas,
762 WriteOp::Truncate => protobuf::dml_node::Type::Truncate,
763 }
764 }
765}
766
767impl From<NullTreatment> for protobuf::NullTreatment {
768 fn from(t: NullTreatment) -> Self {
769 match t {
770 NullTreatment::RespectNulls => protobuf::NullTreatment::RespectNulls,
771 NullTreatment::IgnoreNulls => protobuf::NullTreatment::IgnoreNulls,
772 }
773 }
774}