datafusion_proto/logical_plan/
to_proto.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Code to convert Arrow schemas and DataFusion logical plans to protocol buffer format, allowing
19//! DataFusion logical plans to be serialized and transmitted between
20//! processes.
21
22use std::collections::HashMap;
23
24use datafusion_common::{NullEquality, TableReference, UnnestOptions};
25use datafusion_expr::dml::InsertOp;
26use datafusion_expr::expr::{
27    self, AggregateFunctionParams, Alias, Between, BinaryExpr, Cast, GroupingSet, InList,
28    Like, Placeholder, ScalarFunction, Unnest,
29};
30use datafusion_expr::WriteOp;
31use datafusion_expr::{
32    logical_plan::PlanType, logical_plan::StringifiedPlan, Expr, JoinConstraint,
33    JoinType, SortExpr, TryCast, WindowFrame, WindowFrameBound, WindowFrameUnits,
34    WindowFunctionDefinition,
35};
36
37use crate::protobuf::RecursionUnnestOption;
38use crate::protobuf::{
39    self,
40    plan_type::PlanTypeEnum::{
41        AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
42        FinalPhysicalPlan, FinalPhysicalPlanWithSchema, FinalPhysicalPlanWithStats,
43        InitialLogicalPlan, InitialPhysicalPlan, InitialPhysicalPlanWithSchema,
44        InitialPhysicalPlanWithStats, OptimizedLogicalPlan, OptimizedPhysicalPlan,
45        PhysicalPlanError,
46    },
47    AnalyzedLogicalPlanType, CubeNode, EmptyMessage, GroupingSetNode, LogicalExprList,
48    OptimizedLogicalPlanType, OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
49    ToProtoError as Error,
50};
51
52use super::LogicalExtensionCodec;
53
54impl From<&UnnestOptions> for protobuf::UnnestOptions {
55    fn from(opts: &UnnestOptions) -> Self {
56        Self {
57            preserve_nulls: opts.preserve_nulls,
58            recursions: opts
59                .recursions
60                .iter()
61                .map(|r| RecursionUnnestOption {
62                    input_column: Some((&r.input_column).into()),
63                    output_column: Some((&r.output_column).into()),
64                    depth: r.depth as u32,
65                })
66                .collect(),
67        }
68    }
69}
70
71impl From<&StringifiedPlan> for protobuf::StringifiedPlan {
72    fn from(stringified_plan: &StringifiedPlan) -> Self {
73        Self {
74            plan_type: match stringified_plan.clone().plan_type {
75                PlanType::InitialLogicalPlan => Some(protobuf::PlanType {
76                    plan_type_enum: Some(InitialLogicalPlan(EmptyMessage {})),
77                }),
78                PlanType::AnalyzedLogicalPlan { analyzer_name } => {
79                    Some(protobuf::PlanType {
80                        plan_type_enum: Some(AnalyzedLogicalPlan(
81                            AnalyzedLogicalPlanType { analyzer_name },
82                        )),
83                    })
84                }
85                PlanType::FinalAnalyzedLogicalPlan => Some(protobuf::PlanType {
86                    plan_type_enum: Some(FinalAnalyzedLogicalPlan(EmptyMessage {})),
87                }),
88                PlanType::OptimizedLogicalPlan { optimizer_name } => {
89                    Some(protobuf::PlanType {
90                        plan_type_enum: Some(OptimizedLogicalPlan(
91                            OptimizedLogicalPlanType { optimizer_name },
92                        )),
93                    })
94                }
95                PlanType::FinalLogicalPlan => Some(protobuf::PlanType {
96                    plan_type_enum: Some(FinalLogicalPlan(EmptyMessage {})),
97                }),
98                PlanType::InitialPhysicalPlan => Some(protobuf::PlanType {
99                    plan_type_enum: Some(InitialPhysicalPlan(EmptyMessage {})),
100                }),
101                PlanType::OptimizedPhysicalPlan { optimizer_name } => {
102                    Some(protobuf::PlanType {
103                        plan_type_enum: Some(OptimizedPhysicalPlan(
104                            OptimizedPhysicalPlanType { optimizer_name },
105                        )),
106                    })
107                }
108                PlanType::FinalPhysicalPlan => Some(protobuf::PlanType {
109                    plan_type_enum: Some(FinalPhysicalPlan(EmptyMessage {})),
110                }),
111                PlanType::InitialPhysicalPlanWithStats => Some(protobuf::PlanType {
112                    plan_type_enum: Some(InitialPhysicalPlanWithStats(EmptyMessage {})),
113                }),
114                PlanType::InitialPhysicalPlanWithSchema => Some(protobuf::PlanType {
115                    plan_type_enum: Some(InitialPhysicalPlanWithSchema(EmptyMessage {})),
116                }),
117                PlanType::FinalPhysicalPlanWithStats => Some(protobuf::PlanType {
118                    plan_type_enum: Some(FinalPhysicalPlanWithStats(EmptyMessage {})),
119                }),
120                PlanType::FinalPhysicalPlanWithSchema => Some(protobuf::PlanType {
121                    plan_type_enum: Some(FinalPhysicalPlanWithSchema(EmptyMessage {})),
122                }),
123                PlanType::PhysicalPlanError => Some(protobuf::PlanType {
124                    plan_type_enum: Some(PhysicalPlanError(EmptyMessage {})),
125                }),
126            },
127            plan: stringified_plan.plan.to_string(),
128        }
129    }
130}
131
132impl From<WindowFrameUnits> for protobuf::WindowFrameUnits {
133    fn from(units: WindowFrameUnits) -> Self {
134        match units {
135            WindowFrameUnits::Rows => Self::Rows,
136            WindowFrameUnits::Range => Self::Range,
137            WindowFrameUnits::Groups => Self::Groups,
138        }
139    }
140}
141
142impl TryFrom<&WindowFrameBound> for protobuf::WindowFrameBound {
143    type Error = Error;
144
145    fn try_from(bound: &WindowFrameBound) -> Result<Self, Self::Error> {
146        Ok(match bound {
147            WindowFrameBound::CurrentRow => Self {
148                window_frame_bound_type: protobuf::WindowFrameBoundType::CurrentRow
149                    .into(),
150                bound_value: None,
151            },
152            WindowFrameBound::Preceding(v) => Self {
153                window_frame_bound_type: protobuf::WindowFrameBoundType::Preceding.into(),
154                bound_value: Some(v.try_into()?),
155            },
156            WindowFrameBound::Following(v) => Self {
157                window_frame_bound_type: protobuf::WindowFrameBoundType::Following.into(),
158                bound_value: Some(v.try_into()?),
159            },
160        })
161    }
162}
163
164impl TryFrom<&WindowFrame> for protobuf::WindowFrame {
165    type Error = Error;
166
167    fn try_from(window: &WindowFrame) -> Result<Self, Self::Error> {
168        Ok(Self {
169            window_frame_units: protobuf::WindowFrameUnits::from(window.units).into(),
170            start_bound: Some((&window.start_bound).try_into()?),
171            end_bound: Some(protobuf::window_frame::EndBound::Bound(
172                (&window.end_bound).try_into()?,
173            )),
174        })
175    }
176}
177
178pub fn serialize_exprs<'a, I>(
179    exprs: I,
180    codec: &dyn LogicalExtensionCodec,
181) -> Result<Vec<protobuf::LogicalExprNode>, Error>
182where
183    I: IntoIterator<Item = &'a Expr>,
184{
185    exprs
186        .into_iter()
187        .map(|expr| serialize_expr(expr, codec))
188        .collect::<Result<Vec<_>, Error>>()
189}
190
191pub fn serialize_expr(
192    expr: &Expr,
193    codec: &dyn LogicalExtensionCodec,
194) -> Result<protobuf::LogicalExprNode, Error> {
195    use protobuf::logical_expr_node::ExprType;
196
197    let expr_node = match expr {
198        Expr::Column(c) => protobuf::LogicalExprNode {
199            expr_type: Some(ExprType::Column(c.into())),
200        },
201        Expr::Alias(Alias {
202            expr,
203            relation,
204            name,
205            metadata,
206        }) => {
207            let alias = Box::new(protobuf::AliasNode {
208                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
209                relation: relation
210                    .to_owned()
211                    .map(|r| vec![r.into()])
212                    .unwrap_or(vec![]),
213                alias: name.to_owned(),
214                metadata: metadata
215                    .as_ref()
216                    .map(|m| m.to_hashmap())
217                    .unwrap_or(HashMap::new()),
218            });
219            protobuf::LogicalExprNode {
220                expr_type: Some(ExprType::Alias(alias)),
221            }
222        }
223        Expr::Literal(value, _) => {
224            let pb_value: protobuf::ScalarValue = value.try_into()?;
225            protobuf::LogicalExprNode {
226                expr_type: Some(ExprType::Literal(pb_value)),
227            }
228        }
229        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
230            // Try to linerize a nested binary expression tree of the same operator
231            // into a flat vector of expressions.
232            let mut exprs = vec![right.as_ref()];
233            let mut current_expr = left.as_ref();
234            while let Expr::BinaryExpr(BinaryExpr {
235                left,
236                op: current_op,
237                right,
238            }) = current_expr
239            {
240                if current_op == op {
241                    exprs.push(right.as_ref());
242                    current_expr = left.as_ref();
243                } else {
244                    break;
245                }
246            }
247            exprs.push(current_expr);
248
249            let binary_expr = protobuf::BinaryExprNode {
250                // We need to reverse exprs since operands are expected to be
251                // linearized from left innermost to right outermost (but while
252                // traversing the chain we do the exact opposite).
253                operands: serialize_exprs(exprs.into_iter().rev(), codec)?,
254                op: format!("{op:?}"),
255            };
256            protobuf::LogicalExprNode {
257                expr_type: Some(ExprType::BinaryExpr(binary_expr)),
258            }
259        }
260        Expr::Like(Like {
261            negated,
262            expr,
263            pattern,
264            escape_char,
265            case_insensitive,
266        }) => {
267            if *case_insensitive {
268                let pb = Box::new(protobuf::ILikeNode {
269                    negated: *negated,
270                    expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
271                    pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
272                    escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
273                });
274
275                protobuf::LogicalExprNode {
276                    expr_type: Some(ExprType::Ilike(pb)),
277                }
278            } else {
279                let pb = Box::new(protobuf::LikeNode {
280                    negated: *negated,
281                    expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
282                    pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
283                    escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
284                });
285
286                protobuf::LogicalExprNode {
287                    expr_type: Some(ExprType::Like(pb)),
288                }
289            }
290        }
291        Expr::SimilarTo(Like {
292            negated,
293            expr,
294            pattern,
295            escape_char,
296            case_insensitive: _,
297        }) => {
298            let pb = Box::new(protobuf::SimilarToNode {
299                negated: *negated,
300                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
301                pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
302                escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
303            });
304            protobuf::LogicalExprNode {
305                expr_type: Some(ExprType::SimilarTo(pb)),
306            }
307        }
308        Expr::WindowFunction(window_fun) => {
309            let expr::WindowFunction {
310                ref fun,
311                params:
312                    expr::WindowFunctionParams {
313                        ref args,
314                        ref partition_by,
315                        ref order_by,
316                        ref window_frame,
317                        // TODO: support null treatment, distinct, and filter in proto.
318                        // See https://github.com/apache/datafusion/issues/17417
319                        null_treatment: _,
320                        distinct: _,
321                        filter: _,
322                    },
323            } = window_fun.as_ref();
324            let mut buf = Vec::new();
325            let window_function = match fun {
326                WindowFunctionDefinition::AggregateUDF(aggr_udf) => {
327                    let _ = codec.try_encode_udaf(aggr_udf, &mut buf);
328                    protobuf::window_expr_node::WindowFunction::Udaf(
329                        aggr_udf.name().to_string(),
330                    )
331                }
332                WindowFunctionDefinition::WindowUDF(window_udf) => {
333                    let _ = codec.try_encode_udwf(window_udf, &mut buf);
334                    protobuf::window_expr_node::WindowFunction::Udwf(
335                        window_udf.name().to_string(),
336                    )
337                }
338            };
339            let fun_definition = (!buf.is_empty()).then_some(buf);
340            let partition_by = serialize_exprs(partition_by, codec)?;
341            let order_by = serialize_sorts(order_by, codec)?;
342
343            let window_frame: Option<protobuf::WindowFrame> =
344                Some(window_frame.try_into()?);
345            let window_expr = protobuf::WindowExprNode {
346                exprs: serialize_exprs(args, codec)?,
347                window_function: Some(window_function),
348                partition_by,
349                order_by,
350                window_frame,
351                fun_definition,
352            };
353            protobuf::LogicalExprNode {
354                expr_type: Some(ExprType::WindowExpr(window_expr)),
355            }
356        }
357        Expr::AggregateFunction(expr::AggregateFunction {
358            ref func,
359            params:
360                AggregateFunctionParams {
361                    ref args,
362                    ref distinct,
363                    ref filter,
364                    ref order_by,
365                    null_treatment: _,
366                },
367        }) => {
368            let mut buf = Vec::new();
369            let _ = codec.try_encode_udaf(func, &mut buf);
370            protobuf::LogicalExprNode {
371                expr_type: Some(ExprType::AggregateUdfExpr(Box::new(
372                    protobuf::AggregateUdfExprNode {
373                        fun_name: func.name().to_string(),
374                        args: serialize_exprs(args, codec)?,
375                        distinct: *distinct,
376                        filter: match filter {
377                            Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
378                            None => None,
379                        },
380                        order_by: serialize_sorts(order_by, codec)?,
381                        fun_definition: (!buf.is_empty()).then_some(buf),
382                    },
383                ))),
384            }
385        }
386
387        Expr::ScalarVariable(_, _) => {
388            return Err(Error::General(
389                "Proto serialization error: Scalar Variable not supported".to_string(),
390            ))
391        }
392        Expr::ScalarFunction(ScalarFunction { func, args }) => {
393            let mut buf = Vec::new();
394            let _ = codec.try_encode_udf(func, &mut buf);
395            protobuf::LogicalExprNode {
396                expr_type: Some(ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
397                    fun_name: func.name().to_string(),
398                    fun_definition: (!buf.is_empty()).then_some(buf),
399                    args: serialize_exprs(args, codec)?,
400                })),
401            }
402        }
403        Expr::Not(expr) => {
404            let expr = Box::new(protobuf::Not {
405                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
406            });
407            protobuf::LogicalExprNode {
408                expr_type: Some(ExprType::NotExpr(expr)),
409            }
410        }
411        Expr::IsNull(expr) => {
412            let expr = Box::new(protobuf::IsNull {
413                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
414            });
415            protobuf::LogicalExprNode {
416                expr_type: Some(ExprType::IsNullExpr(expr)),
417            }
418        }
419        Expr::IsNotNull(expr) => {
420            let expr = Box::new(protobuf::IsNotNull {
421                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
422            });
423            protobuf::LogicalExprNode {
424                expr_type: Some(ExprType::IsNotNullExpr(expr)),
425            }
426        }
427        Expr::IsTrue(expr) => {
428            let expr = Box::new(protobuf::IsTrue {
429                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
430            });
431            protobuf::LogicalExprNode {
432                expr_type: Some(ExprType::IsTrue(expr)),
433            }
434        }
435        Expr::IsFalse(expr) => {
436            let expr = Box::new(protobuf::IsFalse {
437                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
438            });
439            protobuf::LogicalExprNode {
440                expr_type: Some(ExprType::IsFalse(expr)),
441            }
442        }
443        Expr::IsUnknown(expr) => {
444            let expr = Box::new(protobuf::IsUnknown {
445                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
446            });
447            protobuf::LogicalExprNode {
448                expr_type: Some(ExprType::IsUnknown(expr)),
449            }
450        }
451        Expr::IsNotTrue(expr) => {
452            let expr = Box::new(protobuf::IsNotTrue {
453                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
454            });
455            protobuf::LogicalExprNode {
456                expr_type: Some(ExprType::IsNotTrue(expr)),
457            }
458        }
459        Expr::IsNotFalse(expr) => {
460            let expr = Box::new(protobuf::IsNotFalse {
461                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
462            });
463            protobuf::LogicalExprNode {
464                expr_type: Some(ExprType::IsNotFalse(expr)),
465            }
466        }
467        Expr::IsNotUnknown(expr) => {
468            let expr = Box::new(protobuf::IsNotUnknown {
469                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
470            });
471            protobuf::LogicalExprNode {
472                expr_type: Some(ExprType::IsNotUnknown(expr)),
473            }
474        }
475        Expr::Between(Between {
476            expr,
477            negated,
478            low,
479            high,
480        }) => {
481            let expr = Box::new(protobuf::BetweenNode {
482                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
483                negated: *negated,
484                low: Some(Box::new(serialize_expr(low.as_ref(), codec)?)),
485                high: Some(Box::new(serialize_expr(high.as_ref(), codec)?)),
486            });
487            protobuf::LogicalExprNode {
488                expr_type: Some(ExprType::Between(expr)),
489            }
490        }
491        Expr::Case(case) => {
492            let when_then_expr = case
493                .when_then_expr
494                .iter()
495                .map(|(w, t)| {
496                    Ok(protobuf::WhenThen {
497                        when_expr: Some(serialize_expr(w.as_ref(), codec)?),
498                        then_expr: Some(serialize_expr(t.as_ref(), codec)?),
499                    })
500                })
501                .collect::<Result<Vec<protobuf::WhenThen>, Error>>()?;
502            let expr = Box::new(protobuf::CaseNode {
503                expr: match &case.expr {
504                    Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
505                    None => None,
506                },
507                when_then_expr,
508                else_expr: match &case.else_expr {
509                    Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
510                    None => None,
511                },
512            });
513            protobuf::LogicalExprNode {
514                expr_type: Some(ExprType::Case(expr)),
515            }
516        }
517        Expr::Cast(Cast { expr, data_type }) => {
518            let expr = Box::new(protobuf::CastNode {
519                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
520                arrow_type: Some(data_type.try_into()?),
521            });
522            protobuf::LogicalExprNode {
523                expr_type: Some(ExprType::Cast(expr)),
524            }
525        }
526        Expr::TryCast(TryCast { expr, data_type }) => {
527            let expr = Box::new(protobuf::TryCastNode {
528                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
529                arrow_type: Some(data_type.try_into()?),
530            });
531            protobuf::LogicalExprNode {
532                expr_type: Some(ExprType::TryCast(expr)),
533            }
534        }
535        Expr::Negative(expr) => {
536            let expr = Box::new(protobuf::NegativeNode {
537                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
538            });
539            protobuf::LogicalExprNode {
540                expr_type: Some(ExprType::Negative(expr)),
541            }
542        }
543        Expr::Unnest(Unnest { expr }) => {
544            let expr = protobuf::Unnest {
545                exprs: vec![serialize_expr(expr.as_ref(), codec)?],
546            };
547            protobuf::LogicalExprNode {
548                expr_type: Some(ExprType::Unnest(expr)),
549            }
550        }
551        Expr::InList(InList {
552            expr,
553            list,
554            negated,
555        }) => {
556            let expr = Box::new(protobuf::InListNode {
557                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
558                list: serialize_exprs(list, codec)?,
559                negated: *negated,
560            });
561            protobuf::LogicalExprNode {
562                expr_type: Some(ExprType::InList(expr)),
563            }
564        }
565        #[expect(deprecated)]
566        Expr::Wildcard { qualifier, .. } => protobuf::LogicalExprNode {
567            expr_type: Some(ExprType::Wildcard(protobuf::Wildcard {
568                qualifier: qualifier.to_owned().map(|x| x.into()),
569            })),
570        },
571        Expr::ScalarSubquery(_)
572        | Expr::InSubquery(_)
573        | Expr::Exists { .. }
574        | Expr::OuterReferenceColumn { .. } => {
575            // we would need to add logical plan operators to datafusion.proto to support this
576            // see discussion in https://github.com/apache/datafusion/issues/2565
577            return Err(Error::General("Proto serialization error: Expr::ScalarSubquery(_) | Expr::InSubquery(_) | Expr::Exists { .. } | Exp:OuterReferenceColumn not supported".to_string()));
578        }
579        Expr::GroupingSet(GroupingSet::Cube(exprs)) => protobuf::LogicalExprNode {
580            expr_type: Some(ExprType::Cube(CubeNode {
581                expr: serialize_exprs(exprs, codec)?,
582            })),
583        },
584        Expr::GroupingSet(GroupingSet::Rollup(exprs)) => protobuf::LogicalExprNode {
585            expr_type: Some(ExprType::Rollup(RollupNode {
586                expr: serialize_exprs(exprs, codec)?,
587            })),
588        },
589        Expr::GroupingSet(GroupingSet::GroupingSets(exprs)) => {
590            protobuf::LogicalExprNode {
591                expr_type: Some(ExprType::GroupingSet(GroupingSetNode {
592                    expr: exprs
593                        .iter()
594                        .map(|expr_list| {
595                            Ok(LogicalExprList {
596                                expr: serialize_exprs(expr_list, codec)?,
597                            })
598                        })
599                        .collect::<Result<Vec<_>, Error>>()?,
600                })),
601            }
602        }
603        Expr::Placeholder(Placeholder { id, data_type }) => {
604            let data_type = match data_type {
605                Some(data_type) => Some(data_type.try_into()?),
606                None => None,
607            };
608            protobuf::LogicalExprNode {
609                expr_type: Some(ExprType::Placeholder(PlaceholderNode {
610                    id: id.clone(),
611                    data_type,
612                })),
613            }
614        }
615    };
616
617    Ok(expr_node)
618}
619
620pub fn serialize_sorts<'a, I>(
621    sorts: I,
622    codec: &dyn LogicalExtensionCodec,
623) -> Result<Vec<protobuf::SortExprNode>, Error>
624where
625    I: IntoIterator<Item = &'a SortExpr>,
626{
627    sorts
628        .into_iter()
629        .map(|sort| {
630            let SortExpr {
631                expr,
632                asc,
633                nulls_first,
634            } = sort;
635            Ok(protobuf::SortExprNode {
636                expr: Some(serialize_expr(expr, codec)?),
637                asc: *asc,
638                nulls_first: *nulls_first,
639            })
640        })
641        .collect::<Result<Vec<_>, Error>>()
642}
643
644impl From<TableReference> for protobuf::TableReference {
645    fn from(t: TableReference) -> Self {
646        use protobuf::table_reference::TableReferenceEnum;
647        let table_reference_enum = match t {
648            TableReference::Bare { table } => {
649                TableReferenceEnum::Bare(protobuf::BareTableReference {
650                    table: table.to_string(),
651                })
652            }
653            TableReference::Partial { schema, table } => {
654                TableReferenceEnum::Partial(protobuf::PartialTableReference {
655                    schema: schema.to_string(),
656                    table: table.to_string(),
657                })
658            }
659            TableReference::Full {
660                catalog,
661                schema,
662                table,
663            } => TableReferenceEnum::Full(protobuf::FullTableReference {
664                catalog: catalog.to_string(),
665                schema: schema.to_string(),
666                table: table.to_string(),
667            }),
668        };
669
670        protobuf::TableReference {
671            table_reference_enum: Some(table_reference_enum),
672        }
673    }
674}
675
676impl From<JoinType> for protobuf::JoinType {
677    fn from(t: JoinType) -> Self {
678        match t {
679            JoinType::Inner => protobuf::JoinType::Inner,
680            JoinType::Left => protobuf::JoinType::Left,
681            JoinType::Right => protobuf::JoinType::Right,
682            JoinType::Full => protobuf::JoinType::Full,
683            JoinType::LeftSemi => protobuf::JoinType::Leftsemi,
684            JoinType::RightSemi => protobuf::JoinType::Rightsemi,
685            JoinType::LeftAnti => protobuf::JoinType::Leftanti,
686            JoinType::RightAnti => protobuf::JoinType::Rightanti,
687            JoinType::LeftMark => protobuf::JoinType::Leftmark,
688            JoinType::RightMark => protobuf::JoinType::Rightmark,
689        }
690    }
691}
692
693impl From<JoinConstraint> for protobuf::JoinConstraint {
694    fn from(t: JoinConstraint) -> Self {
695        match t {
696            JoinConstraint::On => protobuf::JoinConstraint::On,
697            JoinConstraint::Using => protobuf::JoinConstraint::Using,
698        }
699    }
700}
701
702impl From<NullEquality> for protobuf::NullEquality {
703    fn from(t: NullEquality) -> Self {
704        match t {
705            NullEquality::NullEqualsNothing => protobuf::NullEquality::NullEqualsNothing,
706            NullEquality::NullEqualsNull => protobuf::NullEquality::NullEqualsNull,
707        }
708    }
709}
710
711impl From<&WriteOp> for protobuf::dml_node::Type {
712    fn from(t: &WriteOp) -> Self {
713        match t {
714            WriteOp::Insert(InsertOp::Append) => protobuf::dml_node::Type::InsertAppend,
715            WriteOp::Insert(InsertOp::Overwrite) => {
716                protobuf::dml_node::Type::InsertOverwrite
717            }
718            WriteOp::Insert(InsertOp::Replace) => protobuf::dml_node::Type::InsertReplace,
719            WriteOp::Delete => protobuf::dml_node::Type::Delete,
720            WriteOp::Update => protobuf::dml_node::Type::Update,
721            WriteOp::Ctas => protobuf::dml_node::Type::Ctas,
722        }
723    }
724}