datafusion_proto/logical_plan/
to_proto.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Code to convert Arrow schemas and DataFusion logical plans to protocol buffer format, allowing
19//! DataFusion logical plans to be serialized and transmitted between
20//! processes.
21
22use std::collections::HashMap;
23
24use datafusion_common::{TableReference, UnnestOptions};
25use datafusion_expr::dml::InsertOp;
26use datafusion_expr::expr::{
27    self, AggregateFunctionParams, Alias, Between, BinaryExpr, Cast, GroupingSet, InList,
28    Like, Placeholder, ScalarFunction, Unnest,
29};
30use datafusion_expr::WriteOp;
31use datafusion_expr::{
32    logical_plan::PlanType, logical_plan::StringifiedPlan, Expr, JoinConstraint,
33    JoinType, SortExpr, TryCast, WindowFrame, WindowFrameBound, WindowFrameUnits,
34    WindowFunctionDefinition,
35};
36
37use crate::protobuf::RecursionUnnestOption;
38use crate::protobuf::{
39    self,
40    plan_type::PlanTypeEnum::{
41        AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
42        FinalPhysicalPlan, FinalPhysicalPlanWithSchema, FinalPhysicalPlanWithStats,
43        InitialLogicalPlan, InitialPhysicalPlan, InitialPhysicalPlanWithSchema,
44        InitialPhysicalPlanWithStats, OptimizedLogicalPlan, OptimizedPhysicalPlan,
45        PhysicalPlanError,
46    },
47    AnalyzedLogicalPlanType, CubeNode, EmptyMessage, GroupingSetNode, LogicalExprList,
48    OptimizedLogicalPlanType, OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
49    ToProtoError as Error,
50};
51
52use super::LogicalExtensionCodec;
53
54impl From<&UnnestOptions> for protobuf::UnnestOptions {
55    fn from(opts: &UnnestOptions) -> Self {
56        Self {
57            preserve_nulls: opts.preserve_nulls,
58            recursions: opts
59                .recursions
60                .iter()
61                .map(|r| RecursionUnnestOption {
62                    input_column: Some((&r.input_column).into()),
63                    output_column: Some((&r.output_column).into()),
64                    depth: r.depth as u32,
65                })
66                .collect(),
67        }
68    }
69}
70
71impl From<&StringifiedPlan> for protobuf::StringifiedPlan {
72    fn from(stringified_plan: &StringifiedPlan) -> Self {
73        Self {
74            plan_type: match stringified_plan.clone().plan_type {
75                PlanType::InitialLogicalPlan => Some(protobuf::PlanType {
76                    plan_type_enum: Some(InitialLogicalPlan(EmptyMessage {})),
77                }),
78                PlanType::AnalyzedLogicalPlan { analyzer_name } => {
79                    Some(protobuf::PlanType {
80                        plan_type_enum: Some(AnalyzedLogicalPlan(
81                            AnalyzedLogicalPlanType { analyzer_name },
82                        )),
83                    })
84                }
85                PlanType::FinalAnalyzedLogicalPlan => Some(protobuf::PlanType {
86                    plan_type_enum: Some(FinalAnalyzedLogicalPlan(EmptyMessage {})),
87                }),
88                PlanType::OptimizedLogicalPlan { optimizer_name } => {
89                    Some(protobuf::PlanType {
90                        plan_type_enum: Some(OptimizedLogicalPlan(
91                            OptimizedLogicalPlanType { optimizer_name },
92                        )),
93                    })
94                }
95                PlanType::FinalLogicalPlan => Some(protobuf::PlanType {
96                    plan_type_enum: Some(FinalLogicalPlan(EmptyMessage {})),
97                }),
98                PlanType::InitialPhysicalPlan => Some(protobuf::PlanType {
99                    plan_type_enum: Some(InitialPhysicalPlan(EmptyMessage {})),
100                }),
101                PlanType::OptimizedPhysicalPlan { optimizer_name } => {
102                    Some(protobuf::PlanType {
103                        plan_type_enum: Some(OptimizedPhysicalPlan(
104                            OptimizedPhysicalPlanType { optimizer_name },
105                        )),
106                    })
107                }
108                PlanType::FinalPhysicalPlan => Some(protobuf::PlanType {
109                    plan_type_enum: Some(FinalPhysicalPlan(EmptyMessage {})),
110                }),
111                PlanType::InitialPhysicalPlanWithStats => Some(protobuf::PlanType {
112                    plan_type_enum: Some(InitialPhysicalPlanWithStats(EmptyMessage {})),
113                }),
114                PlanType::InitialPhysicalPlanWithSchema => Some(protobuf::PlanType {
115                    plan_type_enum: Some(InitialPhysicalPlanWithSchema(EmptyMessage {})),
116                }),
117                PlanType::FinalPhysicalPlanWithStats => Some(protobuf::PlanType {
118                    plan_type_enum: Some(FinalPhysicalPlanWithStats(EmptyMessage {})),
119                }),
120                PlanType::FinalPhysicalPlanWithSchema => Some(protobuf::PlanType {
121                    plan_type_enum: Some(FinalPhysicalPlanWithSchema(EmptyMessage {})),
122                }),
123                PlanType::PhysicalPlanError => Some(protobuf::PlanType {
124                    plan_type_enum: Some(PhysicalPlanError(EmptyMessage {})),
125                }),
126            },
127            plan: stringified_plan.plan.to_string(),
128        }
129    }
130}
131
132impl From<WindowFrameUnits> for protobuf::WindowFrameUnits {
133    fn from(units: WindowFrameUnits) -> Self {
134        match units {
135            WindowFrameUnits::Rows => Self::Rows,
136            WindowFrameUnits::Range => Self::Range,
137            WindowFrameUnits::Groups => Self::Groups,
138        }
139    }
140}
141
142impl TryFrom<&WindowFrameBound> for protobuf::WindowFrameBound {
143    type Error = Error;
144
145    fn try_from(bound: &WindowFrameBound) -> Result<Self, Self::Error> {
146        Ok(match bound {
147            WindowFrameBound::CurrentRow => Self {
148                window_frame_bound_type: protobuf::WindowFrameBoundType::CurrentRow
149                    .into(),
150                bound_value: None,
151            },
152            WindowFrameBound::Preceding(v) => Self {
153                window_frame_bound_type: protobuf::WindowFrameBoundType::Preceding.into(),
154                bound_value: Some(v.try_into()?),
155            },
156            WindowFrameBound::Following(v) => Self {
157                window_frame_bound_type: protobuf::WindowFrameBoundType::Following.into(),
158                bound_value: Some(v.try_into()?),
159            },
160        })
161    }
162}
163
164impl TryFrom<&WindowFrame> for protobuf::WindowFrame {
165    type Error = Error;
166
167    fn try_from(window: &WindowFrame) -> Result<Self, Self::Error> {
168        Ok(Self {
169            window_frame_units: protobuf::WindowFrameUnits::from(window.units).into(),
170            start_bound: Some((&window.start_bound).try_into()?),
171            end_bound: Some(protobuf::window_frame::EndBound::Bound(
172                (&window.end_bound).try_into()?,
173            )),
174        })
175    }
176}
177
178pub fn serialize_exprs<'a, I>(
179    exprs: I,
180    codec: &dyn LogicalExtensionCodec,
181) -> Result<Vec<protobuf::LogicalExprNode>, Error>
182where
183    I: IntoIterator<Item = &'a Expr>,
184{
185    exprs
186        .into_iter()
187        .map(|expr| serialize_expr(expr, codec))
188        .collect::<Result<Vec<_>, Error>>()
189}
190
191pub fn serialize_expr(
192    expr: &Expr,
193    codec: &dyn LogicalExtensionCodec,
194) -> Result<protobuf::LogicalExprNode, Error> {
195    use protobuf::logical_expr_node::ExprType;
196
197    let expr_node = match expr {
198        Expr::Column(c) => protobuf::LogicalExprNode {
199            expr_type: Some(ExprType::Column(c.into())),
200        },
201        Expr::Alias(Alias {
202            expr,
203            relation,
204            name,
205            metadata,
206        }) => {
207            let alias = Box::new(protobuf::AliasNode {
208                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
209                relation: relation
210                    .to_owned()
211                    .map(|r| vec![r.into()])
212                    .unwrap_or(vec![]),
213                alias: name.to_owned(),
214                metadata: metadata.to_owned().unwrap_or(HashMap::new()),
215            });
216            protobuf::LogicalExprNode {
217                expr_type: Some(ExprType::Alias(alias)),
218            }
219        }
220        Expr::Literal(value, _) => {
221            let pb_value: protobuf::ScalarValue = value.try_into()?;
222            protobuf::LogicalExprNode {
223                expr_type: Some(ExprType::Literal(pb_value)),
224            }
225        }
226        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
227            // Try to linerize a nested binary expression tree of the same operator
228            // into a flat vector of expressions.
229            let mut exprs = vec![right.as_ref()];
230            let mut current_expr = left.as_ref();
231            while let Expr::BinaryExpr(BinaryExpr {
232                left,
233                op: current_op,
234                right,
235            }) = current_expr
236            {
237                if current_op == op {
238                    exprs.push(right.as_ref());
239                    current_expr = left.as_ref();
240                } else {
241                    break;
242                }
243            }
244            exprs.push(current_expr);
245
246            let binary_expr = protobuf::BinaryExprNode {
247                // We need to reverse exprs since operands are expected to be
248                // linearized from left innermost to right outermost (but while
249                // traversing the chain we do the exact opposite).
250                operands: serialize_exprs(exprs.into_iter().rev(), codec)?,
251                op: format!("{op:?}"),
252            };
253            protobuf::LogicalExprNode {
254                expr_type: Some(ExprType::BinaryExpr(binary_expr)),
255            }
256        }
257        Expr::Like(Like {
258            negated,
259            expr,
260            pattern,
261            escape_char,
262            case_insensitive,
263        }) => {
264            if *case_insensitive {
265                let pb = Box::new(protobuf::ILikeNode {
266                    negated: *negated,
267                    expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
268                    pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
269                    escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
270                });
271
272                protobuf::LogicalExprNode {
273                    expr_type: Some(ExprType::Ilike(pb)),
274                }
275            } else {
276                let pb = Box::new(protobuf::LikeNode {
277                    negated: *negated,
278                    expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
279                    pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
280                    escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
281                });
282
283                protobuf::LogicalExprNode {
284                    expr_type: Some(ExprType::Like(pb)),
285                }
286            }
287        }
288        Expr::SimilarTo(Like {
289            negated,
290            expr,
291            pattern,
292            escape_char,
293            case_insensitive: _,
294        }) => {
295            let pb = Box::new(protobuf::SimilarToNode {
296                negated: *negated,
297                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
298                pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
299                escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
300            });
301            protobuf::LogicalExprNode {
302                expr_type: Some(ExprType::SimilarTo(pb)),
303            }
304        }
305        Expr::WindowFunction(window_fun) => {
306            let expr::WindowFunction {
307                ref fun,
308                params:
309                    expr::WindowFunctionParams {
310                        ref args,
311                        ref partition_by,
312                        ref order_by,
313                        ref window_frame,
314                        // TODO: support null treatment in proto
315                        null_treatment: _,
316                    },
317            } = window_fun.as_ref();
318            let (window_function, fun_definition) = match fun {
319                WindowFunctionDefinition::AggregateUDF(aggr_udf) => {
320                    let mut buf = Vec::new();
321                    let _ = codec.try_encode_udaf(aggr_udf, &mut buf);
322                    (
323                        protobuf::window_expr_node::WindowFunction::Udaf(
324                            aggr_udf.name().to_string(),
325                        ),
326                        (!buf.is_empty()).then_some(buf),
327                    )
328                }
329                WindowFunctionDefinition::WindowUDF(window_udf) => {
330                    let mut buf = Vec::new();
331                    let _ = codec.try_encode_udwf(window_udf, &mut buf);
332                    (
333                        protobuf::window_expr_node::WindowFunction::Udwf(
334                            window_udf.name().to_string(),
335                        ),
336                        (!buf.is_empty()).then_some(buf),
337                    )
338                }
339            };
340            let partition_by = serialize_exprs(partition_by, codec)?;
341            let order_by = serialize_sorts(order_by, codec)?;
342
343            let window_frame: Option<protobuf::WindowFrame> =
344                Some(window_frame.try_into()?);
345            let window_expr = protobuf::WindowExprNode {
346                exprs: serialize_exprs(args, codec)?,
347                window_function: Some(window_function),
348                partition_by,
349                order_by,
350                window_frame,
351                fun_definition,
352            };
353            protobuf::LogicalExprNode {
354                expr_type: Some(ExprType::WindowExpr(window_expr)),
355            }
356        }
357        Expr::AggregateFunction(expr::AggregateFunction {
358            ref func,
359            params:
360                AggregateFunctionParams {
361                    ref args,
362                    ref distinct,
363                    ref filter,
364                    ref order_by,
365                    null_treatment: _,
366                },
367        }) => {
368            let mut buf = Vec::new();
369            let _ = codec.try_encode_udaf(func, &mut buf);
370            protobuf::LogicalExprNode {
371                expr_type: Some(ExprType::AggregateUdfExpr(Box::new(
372                    protobuf::AggregateUdfExprNode {
373                        fun_name: func.name().to_string(),
374                        args: serialize_exprs(args, codec)?,
375                        distinct: *distinct,
376                        filter: match filter {
377                            Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
378                            None => None,
379                        },
380                        order_by: match order_by {
381                            Some(e) => serialize_sorts(e, codec)?,
382                            None => vec![],
383                        },
384                        fun_definition: (!buf.is_empty()).then_some(buf),
385                    },
386                ))),
387            }
388        }
389
390        Expr::ScalarVariable(_, _) => {
391            return Err(Error::General(
392                "Proto serialization error: Scalar Variable not supported".to_string(),
393            ))
394        }
395        Expr::ScalarFunction(ScalarFunction { func, args }) => {
396            let mut buf = Vec::new();
397            let _ = codec.try_encode_udf(func, &mut buf);
398            protobuf::LogicalExprNode {
399                expr_type: Some(ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
400                    fun_name: func.name().to_string(),
401                    fun_definition: (!buf.is_empty()).then_some(buf),
402                    args: serialize_exprs(args, codec)?,
403                })),
404            }
405        }
406        Expr::Not(expr) => {
407            let expr = Box::new(protobuf::Not {
408                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
409            });
410            protobuf::LogicalExprNode {
411                expr_type: Some(ExprType::NotExpr(expr)),
412            }
413        }
414        Expr::IsNull(expr) => {
415            let expr = Box::new(protobuf::IsNull {
416                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
417            });
418            protobuf::LogicalExprNode {
419                expr_type: Some(ExprType::IsNullExpr(expr)),
420            }
421        }
422        Expr::IsNotNull(expr) => {
423            let expr = Box::new(protobuf::IsNotNull {
424                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
425            });
426            protobuf::LogicalExprNode {
427                expr_type: Some(ExprType::IsNotNullExpr(expr)),
428            }
429        }
430        Expr::IsTrue(expr) => {
431            let expr = Box::new(protobuf::IsTrue {
432                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
433            });
434            protobuf::LogicalExprNode {
435                expr_type: Some(ExprType::IsTrue(expr)),
436            }
437        }
438        Expr::IsFalse(expr) => {
439            let expr = Box::new(protobuf::IsFalse {
440                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
441            });
442            protobuf::LogicalExprNode {
443                expr_type: Some(ExprType::IsFalse(expr)),
444            }
445        }
446        Expr::IsUnknown(expr) => {
447            let expr = Box::new(protobuf::IsUnknown {
448                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
449            });
450            protobuf::LogicalExprNode {
451                expr_type: Some(ExprType::IsUnknown(expr)),
452            }
453        }
454        Expr::IsNotTrue(expr) => {
455            let expr = Box::new(protobuf::IsNotTrue {
456                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
457            });
458            protobuf::LogicalExprNode {
459                expr_type: Some(ExprType::IsNotTrue(expr)),
460            }
461        }
462        Expr::IsNotFalse(expr) => {
463            let expr = Box::new(protobuf::IsNotFalse {
464                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
465            });
466            protobuf::LogicalExprNode {
467                expr_type: Some(ExprType::IsNotFalse(expr)),
468            }
469        }
470        Expr::IsNotUnknown(expr) => {
471            let expr = Box::new(protobuf::IsNotUnknown {
472                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
473            });
474            protobuf::LogicalExprNode {
475                expr_type: Some(ExprType::IsNotUnknown(expr)),
476            }
477        }
478        Expr::Between(Between {
479            expr,
480            negated,
481            low,
482            high,
483        }) => {
484            let expr = Box::new(protobuf::BetweenNode {
485                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
486                negated: *negated,
487                low: Some(Box::new(serialize_expr(low.as_ref(), codec)?)),
488                high: Some(Box::new(serialize_expr(high.as_ref(), codec)?)),
489            });
490            protobuf::LogicalExprNode {
491                expr_type: Some(ExprType::Between(expr)),
492            }
493        }
494        Expr::Case(case) => {
495            let when_then_expr = case
496                .when_then_expr
497                .iter()
498                .map(|(w, t)| {
499                    Ok(protobuf::WhenThen {
500                        when_expr: Some(serialize_expr(w.as_ref(), codec)?),
501                        then_expr: Some(serialize_expr(t.as_ref(), codec)?),
502                    })
503                })
504                .collect::<Result<Vec<protobuf::WhenThen>, Error>>()?;
505            let expr = Box::new(protobuf::CaseNode {
506                expr: match &case.expr {
507                    Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
508                    None => None,
509                },
510                when_then_expr,
511                else_expr: match &case.else_expr {
512                    Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
513                    None => None,
514                },
515            });
516            protobuf::LogicalExprNode {
517                expr_type: Some(ExprType::Case(expr)),
518            }
519        }
520        Expr::Cast(Cast { expr, data_type }) => {
521            let expr = Box::new(protobuf::CastNode {
522                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
523                arrow_type: Some(data_type.try_into()?),
524            });
525            protobuf::LogicalExprNode {
526                expr_type: Some(ExprType::Cast(expr)),
527            }
528        }
529        Expr::TryCast(TryCast { expr, data_type }) => {
530            let expr = Box::new(protobuf::TryCastNode {
531                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
532                arrow_type: Some(data_type.try_into()?),
533            });
534            protobuf::LogicalExprNode {
535                expr_type: Some(ExprType::TryCast(expr)),
536            }
537        }
538        Expr::Negative(expr) => {
539            let expr = Box::new(protobuf::NegativeNode {
540                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
541            });
542            protobuf::LogicalExprNode {
543                expr_type: Some(ExprType::Negative(expr)),
544            }
545        }
546        Expr::Unnest(Unnest { expr }) => {
547            let expr = protobuf::Unnest {
548                exprs: vec![serialize_expr(expr.as_ref(), codec)?],
549            };
550            protobuf::LogicalExprNode {
551                expr_type: Some(ExprType::Unnest(expr)),
552            }
553        }
554        Expr::InList(InList {
555            expr,
556            list,
557            negated,
558        }) => {
559            let expr = Box::new(protobuf::InListNode {
560                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
561                list: serialize_exprs(list, codec)?,
562                negated: *negated,
563            });
564            protobuf::LogicalExprNode {
565                expr_type: Some(ExprType::InList(expr)),
566            }
567        }
568        #[expect(deprecated)]
569        Expr::Wildcard { qualifier, .. } => protobuf::LogicalExprNode {
570            expr_type: Some(ExprType::Wildcard(protobuf::Wildcard {
571                qualifier: qualifier.to_owned().map(|x| x.into()),
572            })),
573        },
574        Expr::ScalarSubquery(_)
575        | Expr::InSubquery(_)
576        | Expr::Exists { .. }
577        | Expr::OuterReferenceColumn { .. } => {
578            // we would need to add logical plan operators to datafusion.proto to support this
579            // see discussion in https://github.com/apache/datafusion/issues/2565
580            return Err(Error::General("Proto serialization error: Expr::ScalarSubquery(_) | Expr::InSubquery(_) | Expr::Exists { .. } | Exp:OuterReferenceColumn not supported".to_string()));
581        }
582        Expr::GroupingSet(GroupingSet::Cube(exprs)) => protobuf::LogicalExprNode {
583            expr_type: Some(ExprType::Cube(CubeNode {
584                expr: serialize_exprs(exprs, codec)?,
585            })),
586        },
587        Expr::GroupingSet(GroupingSet::Rollup(exprs)) => protobuf::LogicalExprNode {
588            expr_type: Some(ExprType::Rollup(RollupNode {
589                expr: serialize_exprs(exprs, codec)?,
590            })),
591        },
592        Expr::GroupingSet(GroupingSet::GroupingSets(exprs)) => {
593            protobuf::LogicalExprNode {
594                expr_type: Some(ExprType::GroupingSet(GroupingSetNode {
595                    expr: exprs
596                        .iter()
597                        .map(|expr_list| {
598                            Ok(LogicalExprList {
599                                expr: serialize_exprs(expr_list, codec)?,
600                            })
601                        })
602                        .collect::<Result<Vec<_>, Error>>()?,
603                })),
604            }
605        }
606        Expr::Placeholder(Placeholder { id, data_type }) => {
607            let data_type = match data_type {
608                Some(data_type) => Some(data_type.try_into()?),
609                None => None,
610            };
611            protobuf::LogicalExprNode {
612                expr_type: Some(ExprType::Placeholder(PlaceholderNode {
613                    id: id.clone(),
614                    data_type,
615                })),
616            }
617        }
618    };
619
620    Ok(expr_node)
621}
622
623pub fn serialize_sorts<'a, I>(
624    sorts: I,
625    codec: &dyn LogicalExtensionCodec,
626) -> Result<Vec<protobuf::SortExprNode>, Error>
627where
628    I: IntoIterator<Item = &'a SortExpr>,
629{
630    sorts
631        .into_iter()
632        .map(|sort| {
633            let SortExpr {
634                expr,
635                asc,
636                nulls_first,
637            } = sort;
638            Ok(protobuf::SortExprNode {
639                expr: Some(serialize_expr(expr, codec)?),
640                asc: *asc,
641                nulls_first: *nulls_first,
642            })
643        })
644        .collect::<Result<Vec<_>, Error>>()
645}
646
647impl From<TableReference> for protobuf::TableReference {
648    fn from(t: TableReference) -> Self {
649        use protobuf::table_reference::TableReferenceEnum;
650        let table_reference_enum = match t {
651            TableReference::Bare { table } => {
652                TableReferenceEnum::Bare(protobuf::BareTableReference {
653                    table: table.to_string(),
654                })
655            }
656            TableReference::Partial { schema, table } => {
657                TableReferenceEnum::Partial(protobuf::PartialTableReference {
658                    schema: schema.to_string(),
659                    table: table.to_string(),
660                })
661            }
662            TableReference::Full {
663                catalog,
664                schema,
665                table,
666            } => TableReferenceEnum::Full(protobuf::FullTableReference {
667                catalog: catalog.to_string(),
668                schema: schema.to_string(),
669                table: table.to_string(),
670            }),
671        };
672
673        protobuf::TableReference {
674            table_reference_enum: Some(table_reference_enum),
675        }
676    }
677}
678
679impl From<JoinType> for protobuf::JoinType {
680    fn from(t: JoinType) -> Self {
681        match t {
682            JoinType::Inner => protobuf::JoinType::Inner,
683            JoinType::Left => protobuf::JoinType::Left,
684            JoinType::Right => protobuf::JoinType::Right,
685            JoinType::Full => protobuf::JoinType::Full,
686            JoinType::LeftSemi => protobuf::JoinType::Leftsemi,
687            JoinType::RightSemi => protobuf::JoinType::Rightsemi,
688            JoinType::LeftAnti => protobuf::JoinType::Leftanti,
689            JoinType::RightAnti => protobuf::JoinType::Rightanti,
690            JoinType::LeftMark => protobuf::JoinType::Leftmark,
691        }
692    }
693}
694
695impl From<JoinConstraint> for protobuf::JoinConstraint {
696    fn from(t: JoinConstraint) -> Self {
697        match t {
698            JoinConstraint::On => protobuf::JoinConstraint::On,
699            JoinConstraint::Using => protobuf::JoinConstraint::Using,
700        }
701    }
702}
703
704impl From<&WriteOp> for protobuf::dml_node::Type {
705    fn from(t: &WriteOp) -> Self {
706        match t {
707            WriteOp::Insert(InsertOp::Append) => protobuf::dml_node::Type::InsertAppend,
708            WriteOp::Insert(InsertOp::Overwrite) => {
709                protobuf::dml_node::Type::InsertOverwrite
710            }
711            WriteOp::Insert(InsertOp::Replace) => protobuf::dml_node::Type::InsertReplace,
712            WriteOp::Delete => protobuf::dml_node::Type::Delete,
713            WriteOp::Update => protobuf::dml_node::Type::Update,
714            WriteOp::Ctas => protobuf::dml_node::Type::Ctas,
715        }
716    }
717}