1use std::collections::HashMap;
23
24use datafusion_common::{TableReference, UnnestOptions};
25use datafusion_expr::dml::InsertOp;
26use datafusion_expr::expr::{
27 self, AggregateFunctionParams, Alias, Between, BinaryExpr, Cast, GroupingSet, InList,
28 Like, Placeholder, ScalarFunction, Unnest,
29};
30use datafusion_expr::WriteOp;
31use datafusion_expr::{
32 logical_plan::PlanType, logical_plan::StringifiedPlan, Expr, JoinConstraint,
33 JoinType, SortExpr, TryCast, WindowFrame, WindowFrameBound, WindowFrameUnits,
34 WindowFunctionDefinition,
35};
36
37use crate::protobuf::RecursionUnnestOption;
38use crate::protobuf::{
39 self,
40 plan_type::PlanTypeEnum::{
41 AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
42 FinalPhysicalPlan, FinalPhysicalPlanWithSchema, FinalPhysicalPlanWithStats,
43 InitialLogicalPlan, InitialPhysicalPlan, InitialPhysicalPlanWithSchema,
44 InitialPhysicalPlanWithStats, OptimizedLogicalPlan, OptimizedPhysicalPlan,
45 PhysicalPlanError,
46 },
47 AnalyzedLogicalPlanType, CubeNode, EmptyMessage, GroupingSetNode, LogicalExprList,
48 OptimizedLogicalPlanType, OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
49 ToProtoError as Error,
50};
51
52use super::LogicalExtensionCodec;
53
54impl From<&UnnestOptions> for protobuf::UnnestOptions {
55 fn from(opts: &UnnestOptions) -> Self {
56 Self {
57 preserve_nulls: opts.preserve_nulls,
58 recursions: opts
59 .recursions
60 .iter()
61 .map(|r| RecursionUnnestOption {
62 input_column: Some((&r.input_column).into()),
63 output_column: Some((&r.output_column).into()),
64 depth: r.depth as u32,
65 })
66 .collect(),
67 }
68 }
69}
70
71impl From<&StringifiedPlan> for protobuf::StringifiedPlan {
72 fn from(stringified_plan: &StringifiedPlan) -> Self {
73 Self {
74 plan_type: match stringified_plan.clone().plan_type {
75 PlanType::InitialLogicalPlan => Some(protobuf::PlanType {
76 plan_type_enum: Some(InitialLogicalPlan(EmptyMessage {})),
77 }),
78 PlanType::AnalyzedLogicalPlan { analyzer_name } => {
79 Some(protobuf::PlanType {
80 plan_type_enum: Some(AnalyzedLogicalPlan(
81 AnalyzedLogicalPlanType { analyzer_name },
82 )),
83 })
84 }
85 PlanType::FinalAnalyzedLogicalPlan => Some(protobuf::PlanType {
86 plan_type_enum: Some(FinalAnalyzedLogicalPlan(EmptyMessage {})),
87 }),
88 PlanType::OptimizedLogicalPlan { optimizer_name } => {
89 Some(protobuf::PlanType {
90 plan_type_enum: Some(OptimizedLogicalPlan(
91 OptimizedLogicalPlanType { optimizer_name },
92 )),
93 })
94 }
95 PlanType::FinalLogicalPlan => Some(protobuf::PlanType {
96 plan_type_enum: Some(FinalLogicalPlan(EmptyMessage {})),
97 }),
98 PlanType::InitialPhysicalPlan => Some(protobuf::PlanType {
99 plan_type_enum: Some(InitialPhysicalPlan(EmptyMessage {})),
100 }),
101 PlanType::OptimizedPhysicalPlan { optimizer_name } => {
102 Some(protobuf::PlanType {
103 plan_type_enum: Some(OptimizedPhysicalPlan(
104 OptimizedPhysicalPlanType { optimizer_name },
105 )),
106 })
107 }
108 PlanType::FinalPhysicalPlan => Some(protobuf::PlanType {
109 plan_type_enum: Some(FinalPhysicalPlan(EmptyMessage {})),
110 }),
111 PlanType::InitialPhysicalPlanWithStats => Some(protobuf::PlanType {
112 plan_type_enum: Some(InitialPhysicalPlanWithStats(EmptyMessage {})),
113 }),
114 PlanType::InitialPhysicalPlanWithSchema => Some(protobuf::PlanType {
115 plan_type_enum: Some(InitialPhysicalPlanWithSchema(EmptyMessage {})),
116 }),
117 PlanType::FinalPhysicalPlanWithStats => Some(protobuf::PlanType {
118 plan_type_enum: Some(FinalPhysicalPlanWithStats(EmptyMessage {})),
119 }),
120 PlanType::FinalPhysicalPlanWithSchema => Some(protobuf::PlanType {
121 plan_type_enum: Some(FinalPhysicalPlanWithSchema(EmptyMessage {})),
122 }),
123 PlanType::PhysicalPlanError => Some(protobuf::PlanType {
124 plan_type_enum: Some(PhysicalPlanError(EmptyMessage {})),
125 }),
126 },
127 plan: stringified_plan.plan.to_string(),
128 }
129 }
130}
131
132impl From<WindowFrameUnits> for protobuf::WindowFrameUnits {
133 fn from(units: WindowFrameUnits) -> Self {
134 match units {
135 WindowFrameUnits::Rows => Self::Rows,
136 WindowFrameUnits::Range => Self::Range,
137 WindowFrameUnits::Groups => Self::Groups,
138 }
139 }
140}
141
142impl TryFrom<&WindowFrameBound> for protobuf::WindowFrameBound {
143 type Error = Error;
144
145 fn try_from(bound: &WindowFrameBound) -> Result<Self, Self::Error> {
146 Ok(match bound {
147 WindowFrameBound::CurrentRow => Self {
148 window_frame_bound_type: protobuf::WindowFrameBoundType::CurrentRow
149 .into(),
150 bound_value: None,
151 },
152 WindowFrameBound::Preceding(v) => Self {
153 window_frame_bound_type: protobuf::WindowFrameBoundType::Preceding.into(),
154 bound_value: Some(v.try_into()?),
155 },
156 WindowFrameBound::Following(v) => Self {
157 window_frame_bound_type: protobuf::WindowFrameBoundType::Following.into(),
158 bound_value: Some(v.try_into()?),
159 },
160 })
161 }
162}
163
164impl TryFrom<&WindowFrame> for protobuf::WindowFrame {
165 type Error = Error;
166
167 fn try_from(window: &WindowFrame) -> Result<Self, Self::Error> {
168 Ok(Self {
169 window_frame_units: protobuf::WindowFrameUnits::from(window.units).into(),
170 start_bound: Some((&window.start_bound).try_into()?),
171 end_bound: Some(protobuf::window_frame::EndBound::Bound(
172 (&window.end_bound).try_into()?,
173 )),
174 })
175 }
176}
177
178pub fn serialize_exprs<'a, I>(
179 exprs: I,
180 codec: &dyn LogicalExtensionCodec,
181) -> Result<Vec<protobuf::LogicalExprNode>, Error>
182where
183 I: IntoIterator<Item = &'a Expr>,
184{
185 exprs
186 .into_iter()
187 .map(|expr| serialize_expr(expr, codec))
188 .collect::<Result<Vec<_>, Error>>()
189}
190
191pub fn serialize_expr(
192 expr: &Expr,
193 codec: &dyn LogicalExtensionCodec,
194) -> Result<protobuf::LogicalExprNode, Error> {
195 use protobuf::logical_expr_node::ExprType;
196
197 let expr_node = match expr {
198 Expr::Column(c) => protobuf::LogicalExprNode {
199 expr_type: Some(ExprType::Column(c.into())),
200 },
201 Expr::Alias(Alias {
202 expr,
203 relation,
204 name,
205 metadata,
206 }) => {
207 let alias = Box::new(protobuf::AliasNode {
208 expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
209 relation: relation
210 .to_owned()
211 .map(|r| vec![r.into()])
212 .unwrap_or(vec![]),
213 alias: name.to_owned(),
214 metadata: metadata.to_owned().unwrap_or(HashMap::new()),
215 });
216 protobuf::LogicalExprNode {
217 expr_type: Some(ExprType::Alias(alias)),
218 }
219 }
220 Expr::Literal(value) => {
221 let pb_value: protobuf::ScalarValue = value.try_into()?;
222 protobuf::LogicalExprNode {
223 expr_type: Some(ExprType::Literal(pb_value)),
224 }
225 }
226 Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
227 let mut exprs = vec![right.as_ref()];
230 let mut current_expr = left.as_ref();
231 while let Expr::BinaryExpr(BinaryExpr {
232 left,
233 op: current_op,
234 right,
235 }) = current_expr
236 {
237 if current_op == op {
238 exprs.push(right.as_ref());
239 current_expr = left.as_ref();
240 } else {
241 break;
242 }
243 }
244 exprs.push(current_expr);
245
246 let binary_expr = protobuf::BinaryExprNode {
247 operands: serialize_exprs(exprs.into_iter().rev(), codec)?,
251 op: format!("{op:?}"),
252 };
253 protobuf::LogicalExprNode {
254 expr_type: Some(ExprType::BinaryExpr(binary_expr)),
255 }
256 }
257 Expr::Like(Like {
258 negated,
259 expr,
260 pattern,
261 escape_char,
262 case_insensitive,
263 }) => {
264 if *case_insensitive {
265 let pb = Box::new(protobuf::ILikeNode {
266 negated: *negated,
267 expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
268 pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
269 escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
270 });
271
272 protobuf::LogicalExprNode {
273 expr_type: Some(ExprType::Ilike(pb)),
274 }
275 } else {
276 let pb = Box::new(protobuf::LikeNode {
277 negated: *negated,
278 expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
279 pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
280 escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
281 });
282
283 protobuf::LogicalExprNode {
284 expr_type: Some(ExprType::Like(pb)),
285 }
286 }
287 }
288 Expr::SimilarTo(Like {
289 negated,
290 expr,
291 pattern,
292 escape_char,
293 case_insensitive: _,
294 }) => {
295 let pb = Box::new(protobuf::SimilarToNode {
296 negated: *negated,
297 expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
298 pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
299 escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
300 });
301 protobuf::LogicalExprNode {
302 expr_type: Some(ExprType::SimilarTo(pb)),
303 }
304 }
305 Expr::WindowFunction(expr::WindowFunction {
306 ref fun,
307 params:
308 expr::WindowFunctionParams {
309 ref args,
310 ref partition_by,
311 ref order_by,
312 ref window_frame,
313 null_treatment: _,
315 },
316 }) => {
317 let (window_function, fun_definition) = match fun {
318 WindowFunctionDefinition::AggregateUDF(aggr_udf) => {
319 let mut buf = Vec::new();
320 let _ = codec.try_encode_udaf(aggr_udf, &mut buf);
321 (
322 protobuf::window_expr_node::WindowFunction::Udaf(
323 aggr_udf.name().to_string(),
324 ),
325 (!buf.is_empty()).then_some(buf),
326 )
327 }
328 WindowFunctionDefinition::WindowUDF(window_udf) => {
329 let mut buf = Vec::new();
330 let _ = codec.try_encode_udwf(window_udf, &mut buf);
331 (
332 protobuf::window_expr_node::WindowFunction::Udwf(
333 window_udf.name().to_string(),
334 ),
335 (!buf.is_empty()).then_some(buf),
336 )
337 }
338 };
339 let partition_by = serialize_exprs(partition_by, codec)?;
340 let order_by = serialize_sorts(order_by, codec)?;
341
342 let window_frame: Option<protobuf::WindowFrame> =
343 Some(window_frame.try_into()?);
344 let window_expr = protobuf::WindowExprNode {
345 exprs: serialize_exprs(args, codec)?,
346 window_function: Some(window_function),
347 partition_by,
348 order_by,
349 window_frame,
350 fun_definition,
351 };
352 protobuf::LogicalExprNode {
353 expr_type: Some(ExprType::WindowExpr(window_expr)),
354 }
355 }
356 Expr::AggregateFunction(expr::AggregateFunction {
357 ref func,
358 params:
359 AggregateFunctionParams {
360 ref args,
361 ref distinct,
362 ref filter,
363 ref order_by,
364 null_treatment: _,
365 },
366 }) => {
367 let mut buf = Vec::new();
368 let _ = codec.try_encode_udaf(func, &mut buf);
369 protobuf::LogicalExprNode {
370 expr_type: Some(ExprType::AggregateUdfExpr(Box::new(
371 protobuf::AggregateUdfExprNode {
372 fun_name: func.name().to_string(),
373 args: serialize_exprs(args, codec)?,
374 distinct: *distinct,
375 filter: match filter {
376 Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
377 None => None,
378 },
379 order_by: match order_by {
380 Some(e) => serialize_sorts(e, codec)?,
381 None => vec![],
382 },
383 fun_definition: (!buf.is_empty()).then_some(buf),
384 },
385 ))),
386 }
387 }
388
389 Expr::ScalarVariable(_, _) => {
390 return Err(Error::General(
391 "Proto serialization error: Scalar Variable not supported".to_string(),
392 ))
393 }
394 Expr::ScalarFunction(ScalarFunction { func, args }) => {
395 let mut buf = Vec::new();
396 let _ = codec.try_encode_udf(func, &mut buf);
397 protobuf::LogicalExprNode {
398 expr_type: Some(ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
399 fun_name: func.name().to_string(),
400 fun_definition: (!buf.is_empty()).then_some(buf),
401 args: serialize_exprs(args, codec)?,
402 })),
403 }
404 }
405 Expr::Not(expr) => {
406 let expr = Box::new(protobuf::Not {
407 expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
408 });
409 protobuf::LogicalExprNode {
410 expr_type: Some(ExprType::NotExpr(expr)),
411 }
412 }
413 Expr::IsNull(expr) => {
414 let expr = Box::new(protobuf::IsNull {
415 expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
416 });
417 protobuf::LogicalExprNode {
418 expr_type: Some(ExprType::IsNullExpr(expr)),
419 }
420 }
421 Expr::IsNotNull(expr) => {
422 let expr = Box::new(protobuf::IsNotNull {
423 expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
424 });
425 protobuf::LogicalExprNode {
426 expr_type: Some(ExprType::IsNotNullExpr(expr)),
427 }
428 }
429 Expr::IsTrue(expr) => {
430 let expr = Box::new(protobuf::IsTrue {
431 expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
432 });
433 protobuf::LogicalExprNode {
434 expr_type: Some(ExprType::IsTrue(expr)),
435 }
436 }
437 Expr::IsFalse(expr) => {
438 let expr = Box::new(protobuf::IsFalse {
439 expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
440 });
441 protobuf::LogicalExprNode {
442 expr_type: Some(ExprType::IsFalse(expr)),
443 }
444 }
445 Expr::IsUnknown(expr) => {
446 let expr = Box::new(protobuf::IsUnknown {
447 expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
448 });
449 protobuf::LogicalExprNode {
450 expr_type: Some(ExprType::IsUnknown(expr)),
451 }
452 }
453 Expr::IsNotTrue(expr) => {
454 let expr = Box::new(protobuf::IsNotTrue {
455 expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
456 });
457 protobuf::LogicalExprNode {
458 expr_type: Some(ExprType::IsNotTrue(expr)),
459 }
460 }
461 Expr::IsNotFalse(expr) => {
462 let expr = Box::new(protobuf::IsNotFalse {
463 expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
464 });
465 protobuf::LogicalExprNode {
466 expr_type: Some(ExprType::IsNotFalse(expr)),
467 }
468 }
469 Expr::IsNotUnknown(expr) => {
470 let expr = Box::new(protobuf::IsNotUnknown {
471 expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
472 });
473 protobuf::LogicalExprNode {
474 expr_type: Some(ExprType::IsNotUnknown(expr)),
475 }
476 }
477 Expr::Between(Between {
478 expr,
479 negated,
480 low,
481 high,
482 }) => {
483 let expr = Box::new(protobuf::BetweenNode {
484 expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
485 negated: *negated,
486 low: Some(Box::new(serialize_expr(low.as_ref(), codec)?)),
487 high: Some(Box::new(serialize_expr(high.as_ref(), codec)?)),
488 });
489 protobuf::LogicalExprNode {
490 expr_type: Some(ExprType::Between(expr)),
491 }
492 }
493 Expr::Case(case) => {
494 let when_then_expr = case
495 .when_then_expr
496 .iter()
497 .map(|(w, t)| {
498 Ok(protobuf::WhenThen {
499 when_expr: Some(serialize_expr(w.as_ref(), codec)?),
500 then_expr: Some(serialize_expr(t.as_ref(), codec)?),
501 })
502 })
503 .collect::<Result<Vec<protobuf::WhenThen>, Error>>()?;
504 let expr = Box::new(protobuf::CaseNode {
505 expr: match &case.expr {
506 Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
507 None => None,
508 },
509 when_then_expr,
510 else_expr: match &case.else_expr {
511 Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
512 None => None,
513 },
514 });
515 protobuf::LogicalExprNode {
516 expr_type: Some(ExprType::Case(expr)),
517 }
518 }
519 Expr::Cast(Cast { expr, data_type }) => {
520 let expr = Box::new(protobuf::CastNode {
521 expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
522 arrow_type: Some(data_type.try_into()?),
523 });
524 protobuf::LogicalExprNode {
525 expr_type: Some(ExprType::Cast(expr)),
526 }
527 }
528 Expr::TryCast(TryCast { expr, data_type }) => {
529 let expr = Box::new(protobuf::TryCastNode {
530 expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
531 arrow_type: Some(data_type.try_into()?),
532 });
533 protobuf::LogicalExprNode {
534 expr_type: Some(ExprType::TryCast(expr)),
535 }
536 }
537 Expr::Negative(expr) => {
538 let expr = Box::new(protobuf::NegativeNode {
539 expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
540 });
541 protobuf::LogicalExprNode {
542 expr_type: Some(ExprType::Negative(expr)),
543 }
544 }
545 Expr::Unnest(Unnest { expr }) => {
546 let expr = protobuf::Unnest {
547 exprs: vec![serialize_expr(expr.as_ref(), codec)?],
548 };
549 protobuf::LogicalExprNode {
550 expr_type: Some(ExprType::Unnest(expr)),
551 }
552 }
553 Expr::InList(InList {
554 expr,
555 list,
556 negated,
557 }) => {
558 let expr = Box::new(protobuf::InListNode {
559 expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
560 list: serialize_exprs(list, codec)?,
561 negated: *negated,
562 });
563 protobuf::LogicalExprNode {
564 expr_type: Some(ExprType::InList(expr)),
565 }
566 }
567 #[expect(deprecated)]
568 Expr::Wildcard { qualifier, .. } => protobuf::LogicalExprNode {
569 expr_type: Some(ExprType::Wildcard(protobuf::Wildcard {
570 qualifier: qualifier.to_owned().map(|x| x.into()),
571 })),
572 },
573 Expr::ScalarSubquery(_)
574 | Expr::InSubquery(_)
575 | Expr::Exists { .. }
576 | Expr::OuterReferenceColumn { .. } => {
577 return Err(Error::General("Proto serialization error: Expr::ScalarSubquery(_) | Expr::InSubquery(_) | Expr::Exists { .. } | Exp:OuterReferenceColumn not supported".to_string()));
580 }
581 Expr::GroupingSet(GroupingSet::Cube(exprs)) => protobuf::LogicalExprNode {
582 expr_type: Some(ExprType::Cube(CubeNode {
583 expr: serialize_exprs(exprs, codec)?,
584 })),
585 },
586 Expr::GroupingSet(GroupingSet::Rollup(exprs)) => protobuf::LogicalExprNode {
587 expr_type: Some(ExprType::Rollup(RollupNode {
588 expr: serialize_exprs(exprs, codec)?,
589 })),
590 },
591 Expr::GroupingSet(GroupingSet::GroupingSets(exprs)) => {
592 protobuf::LogicalExprNode {
593 expr_type: Some(ExprType::GroupingSet(GroupingSetNode {
594 expr: exprs
595 .iter()
596 .map(|expr_list| {
597 Ok(LogicalExprList {
598 expr: serialize_exprs(expr_list, codec)?,
599 })
600 })
601 .collect::<Result<Vec<_>, Error>>()?,
602 })),
603 }
604 }
605 Expr::Placeholder(Placeholder { id, data_type }) => {
606 let data_type = match data_type {
607 Some(data_type) => Some(data_type.try_into()?),
608 None => None,
609 };
610 protobuf::LogicalExprNode {
611 expr_type: Some(ExprType::Placeholder(PlaceholderNode {
612 id: id.clone(),
613 data_type,
614 })),
615 }
616 }
617 };
618
619 Ok(expr_node)
620}
621
622pub fn serialize_sorts<'a, I>(
623 sorts: I,
624 codec: &dyn LogicalExtensionCodec,
625) -> Result<Vec<protobuf::SortExprNode>, Error>
626where
627 I: IntoIterator<Item = &'a SortExpr>,
628{
629 sorts
630 .into_iter()
631 .map(|sort| {
632 let SortExpr {
633 expr,
634 asc,
635 nulls_first,
636 } = sort;
637 Ok(protobuf::SortExprNode {
638 expr: Some(serialize_expr(expr, codec)?),
639 asc: *asc,
640 nulls_first: *nulls_first,
641 })
642 })
643 .collect::<Result<Vec<_>, Error>>()
644}
645
646impl From<TableReference> for protobuf::TableReference {
647 fn from(t: TableReference) -> Self {
648 use protobuf::table_reference::TableReferenceEnum;
649 let table_reference_enum = match t {
650 TableReference::Bare { table } => {
651 TableReferenceEnum::Bare(protobuf::BareTableReference {
652 table: table.to_string(),
653 })
654 }
655 TableReference::Partial { schema, table } => {
656 TableReferenceEnum::Partial(protobuf::PartialTableReference {
657 schema: schema.to_string(),
658 table: table.to_string(),
659 })
660 }
661 TableReference::Full {
662 catalog,
663 schema,
664 table,
665 } => TableReferenceEnum::Full(protobuf::FullTableReference {
666 catalog: catalog.to_string(),
667 schema: schema.to_string(),
668 table: table.to_string(),
669 }),
670 };
671
672 protobuf::TableReference {
673 table_reference_enum: Some(table_reference_enum),
674 }
675 }
676}
677
678impl From<JoinType> for protobuf::JoinType {
679 fn from(t: JoinType) -> Self {
680 match t {
681 JoinType::Inner => protobuf::JoinType::Inner,
682 JoinType::Left => protobuf::JoinType::Left,
683 JoinType::Right => protobuf::JoinType::Right,
684 JoinType::Full => protobuf::JoinType::Full,
685 JoinType::LeftSemi => protobuf::JoinType::Leftsemi,
686 JoinType::RightSemi => protobuf::JoinType::Rightsemi,
687 JoinType::LeftAnti => protobuf::JoinType::Leftanti,
688 JoinType::RightAnti => protobuf::JoinType::Rightanti,
689 JoinType::LeftMark => protobuf::JoinType::Leftmark,
690 }
691 }
692}
693
694impl From<JoinConstraint> for protobuf::JoinConstraint {
695 fn from(t: JoinConstraint) -> Self {
696 match t {
697 JoinConstraint::On => protobuf::JoinConstraint::On,
698 JoinConstraint::Using => protobuf::JoinConstraint::Using,
699 }
700 }
701}
702
703impl From<&WriteOp> for protobuf::dml_node::Type {
704 fn from(t: &WriteOp) -> Self {
705 match t {
706 WriteOp::Insert(InsertOp::Append) => protobuf::dml_node::Type::InsertAppend,
707 WriteOp::Insert(InsertOp::Overwrite) => {
708 protobuf::dml_node::Type::InsertOverwrite
709 }
710 WriteOp::Insert(InsertOp::Replace) => protobuf::dml_node::Type::InsertReplace,
711 WriteOp::Delete => protobuf::dml_node::Type::Delete,
712 WriteOp::Update => protobuf::dml_node::Type::Update,
713 WriteOp::Ctas => protobuf::dml_node::Type::Ctas,
714 }
715 }
716}