datafusion_proto/logical_plan/
to_proto.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Code to convert Arrow schemas and DataFusion logical plans to protocol buffer format, allowing
19//! DataFusion logical plans to be serialized and transmitted between
20//! processes.
21
22use std::collections::HashMap;
23
24use datafusion_common::{NullEquality, TableReference, UnnestOptions};
25use datafusion_expr::dml::InsertOp;
26use datafusion_expr::expr::{
27    self, AggregateFunctionParams, Alias, Between, BinaryExpr, Cast, GroupingSet, InList,
28    Like, Placeholder, ScalarFunction, Unnest,
29};
30use datafusion_expr::WriteOp;
31use datafusion_expr::{
32    logical_plan::PlanType, logical_plan::StringifiedPlan, Expr, JoinConstraint,
33    JoinType, SortExpr, TryCast, WindowFrame, WindowFrameBound, WindowFrameUnits,
34    WindowFunctionDefinition,
35};
36
37use crate::protobuf::RecursionUnnestOption;
38use crate::protobuf::{
39    self,
40    plan_type::PlanTypeEnum::{
41        AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
42        FinalPhysicalPlan, FinalPhysicalPlanWithSchema, FinalPhysicalPlanWithStats,
43        InitialLogicalPlan, InitialPhysicalPlan, InitialPhysicalPlanWithSchema,
44        InitialPhysicalPlanWithStats, OptimizedLogicalPlan, OptimizedPhysicalPlan,
45        PhysicalPlanError,
46    },
47    AnalyzedLogicalPlanType, CubeNode, EmptyMessage, GroupingSetNode, LogicalExprList,
48    OptimizedLogicalPlanType, OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
49    ToProtoError as Error,
50};
51
52use super::LogicalExtensionCodec;
53
54impl From<&UnnestOptions> for protobuf::UnnestOptions {
55    fn from(opts: &UnnestOptions) -> Self {
56        Self {
57            preserve_nulls: opts.preserve_nulls,
58            recursions: opts
59                .recursions
60                .iter()
61                .map(|r| RecursionUnnestOption {
62                    input_column: Some((&r.input_column).into()),
63                    output_column: Some((&r.output_column).into()),
64                    depth: r.depth as u32,
65                })
66                .collect(),
67        }
68    }
69}
70
71impl From<&StringifiedPlan> for protobuf::StringifiedPlan {
72    fn from(stringified_plan: &StringifiedPlan) -> Self {
73        Self {
74            plan_type: match stringified_plan.clone().plan_type {
75                PlanType::InitialLogicalPlan => Some(protobuf::PlanType {
76                    plan_type_enum: Some(InitialLogicalPlan(EmptyMessage {})),
77                }),
78                PlanType::AnalyzedLogicalPlan { analyzer_name } => {
79                    Some(protobuf::PlanType {
80                        plan_type_enum: Some(AnalyzedLogicalPlan(
81                            AnalyzedLogicalPlanType { analyzer_name },
82                        )),
83                    })
84                }
85                PlanType::FinalAnalyzedLogicalPlan => Some(protobuf::PlanType {
86                    plan_type_enum: Some(FinalAnalyzedLogicalPlan(EmptyMessage {})),
87                }),
88                PlanType::OptimizedLogicalPlan { optimizer_name } => {
89                    Some(protobuf::PlanType {
90                        plan_type_enum: Some(OptimizedLogicalPlan(
91                            OptimizedLogicalPlanType { optimizer_name },
92                        )),
93                    })
94                }
95                PlanType::FinalLogicalPlan => Some(protobuf::PlanType {
96                    plan_type_enum: Some(FinalLogicalPlan(EmptyMessage {})),
97                }),
98                PlanType::InitialPhysicalPlan => Some(protobuf::PlanType {
99                    plan_type_enum: Some(InitialPhysicalPlan(EmptyMessage {})),
100                }),
101                PlanType::OptimizedPhysicalPlan { optimizer_name } => {
102                    Some(protobuf::PlanType {
103                        plan_type_enum: Some(OptimizedPhysicalPlan(
104                            OptimizedPhysicalPlanType { optimizer_name },
105                        )),
106                    })
107                }
108                PlanType::FinalPhysicalPlan => Some(protobuf::PlanType {
109                    plan_type_enum: Some(FinalPhysicalPlan(EmptyMessage {})),
110                }),
111                PlanType::InitialPhysicalPlanWithStats => Some(protobuf::PlanType {
112                    plan_type_enum: Some(InitialPhysicalPlanWithStats(EmptyMessage {})),
113                }),
114                PlanType::InitialPhysicalPlanWithSchema => Some(protobuf::PlanType {
115                    plan_type_enum: Some(InitialPhysicalPlanWithSchema(EmptyMessage {})),
116                }),
117                PlanType::FinalPhysicalPlanWithStats => Some(protobuf::PlanType {
118                    plan_type_enum: Some(FinalPhysicalPlanWithStats(EmptyMessage {})),
119                }),
120                PlanType::FinalPhysicalPlanWithSchema => Some(protobuf::PlanType {
121                    plan_type_enum: Some(FinalPhysicalPlanWithSchema(EmptyMessage {})),
122                }),
123                PlanType::PhysicalPlanError => Some(protobuf::PlanType {
124                    plan_type_enum: Some(PhysicalPlanError(EmptyMessage {})),
125                }),
126            },
127            plan: stringified_plan.plan.to_string(),
128        }
129    }
130}
131
132impl From<WindowFrameUnits> for protobuf::WindowFrameUnits {
133    fn from(units: WindowFrameUnits) -> Self {
134        match units {
135            WindowFrameUnits::Rows => Self::Rows,
136            WindowFrameUnits::Range => Self::Range,
137            WindowFrameUnits::Groups => Self::Groups,
138        }
139    }
140}
141
142impl TryFrom<&WindowFrameBound> for protobuf::WindowFrameBound {
143    type Error = Error;
144
145    fn try_from(bound: &WindowFrameBound) -> Result<Self, Self::Error> {
146        Ok(match bound {
147            WindowFrameBound::CurrentRow => Self {
148                window_frame_bound_type: protobuf::WindowFrameBoundType::CurrentRow
149                    .into(),
150                bound_value: None,
151            },
152            WindowFrameBound::Preceding(v) => Self {
153                window_frame_bound_type: protobuf::WindowFrameBoundType::Preceding.into(),
154                bound_value: Some(v.try_into()?),
155            },
156            WindowFrameBound::Following(v) => Self {
157                window_frame_bound_type: protobuf::WindowFrameBoundType::Following.into(),
158                bound_value: Some(v.try_into()?),
159            },
160        })
161    }
162}
163
164impl TryFrom<&WindowFrame> for protobuf::WindowFrame {
165    type Error = Error;
166
167    fn try_from(window: &WindowFrame) -> Result<Self, Self::Error> {
168        Ok(Self {
169            window_frame_units: protobuf::WindowFrameUnits::from(window.units).into(),
170            start_bound: Some((&window.start_bound).try_into()?),
171            end_bound: Some(protobuf::window_frame::EndBound::Bound(
172                (&window.end_bound).try_into()?,
173            )),
174        })
175    }
176}
177
178pub fn serialize_exprs<'a, I>(
179    exprs: I,
180    codec: &dyn LogicalExtensionCodec,
181) -> Result<Vec<protobuf::LogicalExprNode>, Error>
182where
183    I: IntoIterator<Item = &'a Expr>,
184{
185    exprs
186        .into_iter()
187        .map(|expr| serialize_expr(expr, codec))
188        .collect::<Result<Vec<_>, Error>>()
189}
190
191pub fn serialize_expr(
192    expr: &Expr,
193    codec: &dyn LogicalExtensionCodec,
194) -> Result<protobuf::LogicalExprNode, Error> {
195    use protobuf::logical_expr_node::ExprType;
196
197    let expr_node = match expr {
198        Expr::Column(c) => protobuf::LogicalExprNode {
199            expr_type: Some(ExprType::Column(c.into())),
200        },
201        Expr::Alias(Alias {
202            expr,
203            relation,
204            name,
205            metadata,
206        }) => {
207            let alias = Box::new(protobuf::AliasNode {
208                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
209                relation: relation
210                    .to_owned()
211                    .map(|r| vec![r.into()])
212                    .unwrap_or(vec![]),
213                alias: name.to_owned(),
214                metadata: metadata
215                    .as_ref()
216                    .map(|m| m.to_hashmap())
217                    .unwrap_or(HashMap::new()),
218            });
219            protobuf::LogicalExprNode {
220                expr_type: Some(ExprType::Alias(alias)),
221            }
222        }
223        Expr::Literal(value, _) => {
224            let pb_value: protobuf::ScalarValue = value.try_into()?;
225            protobuf::LogicalExprNode {
226                expr_type: Some(ExprType::Literal(pb_value)),
227            }
228        }
229        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
230            // Try to linerize a nested binary expression tree of the same operator
231            // into a flat vector of expressions.
232            let mut exprs = vec![right.as_ref()];
233            let mut current_expr = left.as_ref();
234            while let Expr::BinaryExpr(BinaryExpr {
235                left,
236                op: current_op,
237                right,
238            }) = current_expr
239            {
240                if current_op == op {
241                    exprs.push(right.as_ref());
242                    current_expr = left.as_ref();
243                } else {
244                    break;
245                }
246            }
247            exprs.push(current_expr);
248
249            let binary_expr = protobuf::BinaryExprNode {
250                // We need to reverse exprs since operands are expected to be
251                // linearized from left innermost to right outermost (but while
252                // traversing the chain we do the exact opposite).
253                operands: serialize_exprs(exprs.into_iter().rev(), codec)?,
254                op: format!("{op:?}"),
255            };
256            protobuf::LogicalExprNode {
257                expr_type: Some(ExprType::BinaryExpr(binary_expr)),
258            }
259        }
260        Expr::Like(Like {
261            negated,
262            expr,
263            pattern,
264            escape_char,
265            case_insensitive,
266        }) => {
267            if *case_insensitive {
268                let pb = Box::new(protobuf::ILikeNode {
269                    negated: *negated,
270                    expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
271                    pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
272                    escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
273                });
274
275                protobuf::LogicalExprNode {
276                    expr_type: Some(ExprType::Ilike(pb)),
277                }
278            } else {
279                let pb = Box::new(protobuf::LikeNode {
280                    negated: *negated,
281                    expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
282                    pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
283                    escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
284                });
285
286                protobuf::LogicalExprNode {
287                    expr_type: Some(ExprType::Like(pb)),
288                }
289            }
290        }
291        Expr::SimilarTo(Like {
292            negated,
293            expr,
294            pattern,
295            escape_char,
296            case_insensitive: _,
297        }) => {
298            let pb = Box::new(protobuf::SimilarToNode {
299                negated: *negated,
300                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
301                pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
302                escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
303            });
304            protobuf::LogicalExprNode {
305                expr_type: Some(ExprType::SimilarTo(pb)),
306            }
307        }
308        Expr::WindowFunction(window_fun) => {
309            let expr::WindowFunction {
310                ref fun,
311                params:
312                    expr::WindowFunctionParams {
313                        ref args,
314                        ref partition_by,
315                        ref order_by,
316                        ref window_frame,
317                        // TODO: support null treatment in proto
318                        null_treatment: _,
319                    },
320            } = window_fun.as_ref();
321            let mut buf = Vec::new();
322            let window_function = match fun {
323                WindowFunctionDefinition::AggregateUDF(aggr_udf) => {
324                    let _ = codec.try_encode_udaf(aggr_udf, &mut buf);
325                    protobuf::window_expr_node::WindowFunction::Udaf(
326                        aggr_udf.name().to_string(),
327                    )
328                }
329                WindowFunctionDefinition::WindowUDF(window_udf) => {
330                    let _ = codec.try_encode_udwf(window_udf, &mut buf);
331                    protobuf::window_expr_node::WindowFunction::Udwf(
332                        window_udf.name().to_string(),
333                    )
334                }
335            };
336            let fun_definition = (!buf.is_empty()).then_some(buf);
337            let partition_by = serialize_exprs(partition_by, codec)?;
338            let order_by = serialize_sorts(order_by, codec)?;
339
340            let window_frame: Option<protobuf::WindowFrame> =
341                Some(window_frame.try_into()?);
342            let window_expr = protobuf::WindowExprNode {
343                exprs: serialize_exprs(args, codec)?,
344                window_function: Some(window_function),
345                partition_by,
346                order_by,
347                window_frame,
348                fun_definition,
349            };
350            protobuf::LogicalExprNode {
351                expr_type: Some(ExprType::WindowExpr(window_expr)),
352            }
353        }
354        Expr::AggregateFunction(expr::AggregateFunction {
355            ref func,
356            params:
357                AggregateFunctionParams {
358                    ref args,
359                    ref distinct,
360                    ref filter,
361                    ref order_by,
362                    null_treatment: _,
363                },
364        }) => {
365            let mut buf = Vec::new();
366            let _ = codec.try_encode_udaf(func, &mut buf);
367            protobuf::LogicalExprNode {
368                expr_type: Some(ExprType::AggregateUdfExpr(Box::new(
369                    protobuf::AggregateUdfExprNode {
370                        fun_name: func.name().to_string(),
371                        args: serialize_exprs(args, codec)?,
372                        distinct: *distinct,
373                        filter: match filter {
374                            Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
375                            None => None,
376                        },
377                        order_by: serialize_sorts(order_by, codec)?,
378                        fun_definition: (!buf.is_empty()).then_some(buf),
379                    },
380                ))),
381            }
382        }
383
384        Expr::ScalarVariable(_, _) => {
385            return Err(Error::General(
386                "Proto serialization error: Scalar Variable not supported".to_string(),
387            ))
388        }
389        Expr::ScalarFunction(ScalarFunction { func, args }) => {
390            let mut buf = Vec::new();
391            let _ = codec.try_encode_udf(func, &mut buf);
392            protobuf::LogicalExprNode {
393                expr_type: Some(ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
394                    fun_name: func.name().to_string(),
395                    fun_definition: (!buf.is_empty()).then_some(buf),
396                    args: serialize_exprs(args, codec)?,
397                })),
398            }
399        }
400        Expr::Not(expr) => {
401            let expr = Box::new(protobuf::Not {
402                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
403            });
404            protobuf::LogicalExprNode {
405                expr_type: Some(ExprType::NotExpr(expr)),
406            }
407        }
408        Expr::IsNull(expr) => {
409            let expr = Box::new(protobuf::IsNull {
410                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
411            });
412            protobuf::LogicalExprNode {
413                expr_type: Some(ExprType::IsNullExpr(expr)),
414            }
415        }
416        Expr::IsNotNull(expr) => {
417            let expr = Box::new(protobuf::IsNotNull {
418                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
419            });
420            protobuf::LogicalExprNode {
421                expr_type: Some(ExprType::IsNotNullExpr(expr)),
422            }
423        }
424        Expr::IsTrue(expr) => {
425            let expr = Box::new(protobuf::IsTrue {
426                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
427            });
428            protobuf::LogicalExprNode {
429                expr_type: Some(ExprType::IsTrue(expr)),
430            }
431        }
432        Expr::IsFalse(expr) => {
433            let expr = Box::new(protobuf::IsFalse {
434                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
435            });
436            protobuf::LogicalExprNode {
437                expr_type: Some(ExprType::IsFalse(expr)),
438            }
439        }
440        Expr::IsUnknown(expr) => {
441            let expr = Box::new(protobuf::IsUnknown {
442                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
443            });
444            protobuf::LogicalExprNode {
445                expr_type: Some(ExprType::IsUnknown(expr)),
446            }
447        }
448        Expr::IsNotTrue(expr) => {
449            let expr = Box::new(protobuf::IsNotTrue {
450                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
451            });
452            protobuf::LogicalExprNode {
453                expr_type: Some(ExprType::IsNotTrue(expr)),
454            }
455        }
456        Expr::IsNotFalse(expr) => {
457            let expr = Box::new(protobuf::IsNotFalse {
458                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
459            });
460            protobuf::LogicalExprNode {
461                expr_type: Some(ExprType::IsNotFalse(expr)),
462            }
463        }
464        Expr::IsNotUnknown(expr) => {
465            let expr = Box::new(protobuf::IsNotUnknown {
466                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
467            });
468            protobuf::LogicalExprNode {
469                expr_type: Some(ExprType::IsNotUnknown(expr)),
470            }
471        }
472        Expr::Between(Between {
473            expr,
474            negated,
475            low,
476            high,
477        }) => {
478            let expr = Box::new(protobuf::BetweenNode {
479                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
480                negated: *negated,
481                low: Some(Box::new(serialize_expr(low.as_ref(), codec)?)),
482                high: Some(Box::new(serialize_expr(high.as_ref(), codec)?)),
483            });
484            protobuf::LogicalExprNode {
485                expr_type: Some(ExprType::Between(expr)),
486            }
487        }
488        Expr::Case(case) => {
489            let when_then_expr = case
490                .when_then_expr
491                .iter()
492                .map(|(w, t)| {
493                    Ok(protobuf::WhenThen {
494                        when_expr: Some(serialize_expr(w.as_ref(), codec)?),
495                        then_expr: Some(serialize_expr(t.as_ref(), codec)?),
496                    })
497                })
498                .collect::<Result<Vec<protobuf::WhenThen>, Error>>()?;
499            let expr = Box::new(protobuf::CaseNode {
500                expr: match &case.expr {
501                    Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
502                    None => None,
503                },
504                when_then_expr,
505                else_expr: match &case.else_expr {
506                    Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
507                    None => None,
508                },
509            });
510            protobuf::LogicalExprNode {
511                expr_type: Some(ExprType::Case(expr)),
512            }
513        }
514        Expr::Cast(Cast { expr, data_type }) => {
515            let expr = Box::new(protobuf::CastNode {
516                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
517                arrow_type: Some(data_type.try_into()?),
518            });
519            protobuf::LogicalExprNode {
520                expr_type: Some(ExprType::Cast(expr)),
521            }
522        }
523        Expr::TryCast(TryCast { expr, data_type }) => {
524            let expr = Box::new(protobuf::TryCastNode {
525                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
526                arrow_type: Some(data_type.try_into()?),
527            });
528            protobuf::LogicalExprNode {
529                expr_type: Some(ExprType::TryCast(expr)),
530            }
531        }
532        Expr::Negative(expr) => {
533            let expr = Box::new(protobuf::NegativeNode {
534                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
535            });
536            protobuf::LogicalExprNode {
537                expr_type: Some(ExprType::Negative(expr)),
538            }
539        }
540        Expr::Unnest(Unnest { expr }) => {
541            let expr = protobuf::Unnest {
542                exprs: vec![serialize_expr(expr.as_ref(), codec)?],
543            };
544            protobuf::LogicalExprNode {
545                expr_type: Some(ExprType::Unnest(expr)),
546            }
547        }
548        Expr::InList(InList {
549            expr,
550            list,
551            negated,
552        }) => {
553            let expr = Box::new(protobuf::InListNode {
554                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
555                list: serialize_exprs(list, codec)?,
556                negated: *negated,
557            });
558            protobuf::LogicalExprNode {
559                expr_type: Some(ExprType::InList(expr)),
560            }
561        }
562        #[expect(deprecated)]
563        Expr::Wildcard { qualifier, .. } => protobuf::LogicalExprNode {
564            expr_type: Some(ExprType::Wildcard(protobuf::Wildcard {
565                qualifier: qualifier.to_owned().map(|x| x.into()),
566            })),
567        },
568        Expr::ScalarSubquery(_)
569        | Expr::InSubquery(_)
570        | Expr::Exists { .. }
571        | Expr::OuterReferenceColumn { .. } => {
572            // we would need to add logical plan operators to datafusion.proto to support this
573            // see discussion in https://github.com/apache/datafusion/issues/2565
574            return Err(Error::General("Proto serialization error: Expr::ScalarSubquery(_) | Expr::InSubquery(_) | Expr::Exists { .. } | Exp:OuterReferenceColumn not supported".to_string()));
575        }
576        Expr::GroupingSet(GroupingSet::Cube(exprs)) => protobuf::LogicalExprNode {
577            expr_type: Some(ExprType::Cube(CubeNode {
578                expr: serialize_exprs(exprs, codec)?,
579            })),
580        },
581        Expr::GroupingSet(GroupingSet::Rollup(exprs)) => protobuf::LogicalExprNode {
582            expr_type: Some(ExprType::Rollup(RollupNode {
583                expr: serialize_exprs(exprs, codec)?,
584            })),
585        },
586        Expr::GroupingSet(GroupingSet::GroupingSets(exprs)) => {
587            protobuf::LogicalExprNode {
588                expr_type: Some(ExprType::GroupingSet(GroupingSetNode {
589                    expr: exprs
590                        .iter()
591                        .map(|expr_list| {
592                            Ok(LogicalExprList {
593                                expr: serialize_exprs(expr_list, codec)?,
594                            })
595                        })
596                        .collect::<Result<Vec<_>, Error>>()?,
597                })),
598            }
599        }
600        Expr::Placeholder(Placeholder { id, data_type }) => {
601            let data_type = match data_type {
602                Some(data_type) => Some(data_type.try_into()?),
603                None => None,
604            };
605            protobuf::LogicalExprNode {
606                expr_type: Some(ExprType::Placeholder(PlaceholderNode {
607                    id: id.clone(),
608                    data_type,
609                })),
610            }
611        }
612    };
613
614    Ok(expr_node)
615}
616
617pub fn serialize_sorts<'a, I>(
618    sorts: I,
619    codec: &dyn LogicalExtensionCodec,
620) -> Result<Vec<protobuf::SortExprNode>, Error>
621where
622    I: IntoIterator<Item = &'a SortExpr>,
623{
624    sorts
625        .into_iter()
626        .map(|sort| {
627            let SortExpr {
628                expr,
629                asc,
630                nulls_first,
631            } = sort;
632            Ok(protobuf::SortExprNode {
633                expr: Some(serialize_expr(expr, codec)?),
634                asc: *asc,
635                nulls_first: *nulls_first,
636            })
637        })
638        .collect::<Result<Vec<_>, Error>>()
639}
640
641impl From<TableReference> for protobuf::TableReference {
642    fn from(t: TableReference) -> Self {
643        use protobuf::table_reference::TableReferenceEnum;
644        let table_reference_enum = match t {
645            TableReference::Bare { table } => {
646                TableReferenceEnum::Bare(protobuf::BareTableReference {
647                    table: table.to_string(),
648                })
649            }
650            TableReference::Partial { schema, table } => {
651                TableReferenceEnum::Partial(protobuf::PartialTableReference {
652                    schema: schema.to_string(),
653                    table: table.to_string(),
654                })
655            }
656            TableReference::Full {
657                catalog,
658                schema,
659                table,
660            } => TableReferenceEnum::Full(protobuf::FullTableReference {
661                catalog: catalog.to_string(),
662                schema: schema.to_string(),
663                table: table.to_string(),
664            }),
665        };
666
667        protobuf::TableReference {
668            table_reference_enum: Some(table_reference_enum),
669        }
670    }
671}
672
673impl From<JoinType> for protobuf::JoinType {
674    fn from(t: JoinType) -> Self {
675        match t {
676            JoinType::Inner => protobuf::JoinType::Inner,
677            JoinType::Left => protobuf::JoinType::Left,
678            JoinType::Right => protobuf::JoinType::Right,
679            JoinType::Full => protobuf::JoinType::Full,
680            JoinType::LeftSemi => protobuf::JoinType::Leftsemi,
681            JoinType::RightSemi => protobuf::JoinType::Rightsemi,
682            JoinType::LeftAnti => protobuf::JoinType::Leftanti,
683            JoinType::RightAnti => protobuf::JoinType::Rightanti,
684            JoinType::LeftMark => protobuf::JoinType::Leftmark,
685            JoinType::RightMark => protobuf::JoinType::Rightmark,
686        }
687    }
688}
689
690impl From<JoinConstraint> for protobuf::JoinConstraint {
691    fn from(t: JoinConstraint) -> Self {
692        match t {
693            JoinConstraint::On => protobuf::JoinConstraint::On,
694            JoinConstraint::Using => protobuf::JoinConstraint::Using,
695        }
696    }
697}
698
699impl From<NullEquality> for protobuf::NullEquality {
700    fn from(t: NullEquality) -> Self {
701        match t {
702            NullEquality::NullEqualsNothing => protobuf::NullEquality::NullEqualsNothing,
703            NullEquality::NullEqualsNull => protobuf::NullEquality::NullEqualsNull,
704        }
705    }
706}
707
708impl From<&WriteOp> for protobuf::dml_node::Type {
709    fn from(t: &WriteOp) -> Self {
710        match t {
711            WriteOp::Insert(InsertOp::Append) => protobuf::dml_node::Type::InsertAppend,
712            WriteOp::Insert(InsertOp::Overwrite) => {
713                protobuf::dml_node::Type::InsertOverwrite
714            }
715            WriteOp::Insert(InsertOp::Replace) => protobuf::dml_node::Type::InsertReplace,
716            WriteOp::Delete => protobuf::dml_node::Type::Delete,
717            WriteOp::Update => protobuf::dml_node::Type::Update,
718            WriteOp::Ctas => protobuf::dml_node::Type::Ctas,
719        }
720    }
721}