datafusion_proto/logical_plan/
to_proto.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Code to convert Arrow schemas and DataFusion logical plans to protocol buffer format, allowing
19//! DataFusion logical plans to be serialized and transmitted between
20//! processes.
21
22use std::collections::HashMap;
23
24use datafusion_common::{TableReference, UnnestOptions};
25use datafusion_expr::dml::InsertOp;
26use datafusion_expr::expr::{
27    self, AggregateFunctionParams, Alias, Between, BinaryExpr, Cast, GroupingSet, InList,
28    Like, Placeholder, ScalarFunction, Unnest,
29};
30use datafusion_expr::WriteOp;
31use datafusion_expr::{
32    logical_plan::PlanType, logical_plan::StringifiedPlan, Expr, JoinConstraint,
33    JoinType, SortExpr, TryCast, WindowFrame, WindowFrameBound, WindowFrameUnits,
34    WindowFunctionDefinition,
35};
36
37use crate::protobuf::RecursionUnnestOption;
38use crate::protobuf::{
39    self,
40    plan_type::PlanTypeEnum::{
41        AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
42        FinalPhysicalPlan, FinalPhysicalPlanWithSchema, FinalPhysicalPlanWithStats,
43        InitialLogicalPlan, InitialPhysicalPlan, InitialPhysicalPlanWithSchema,
44        InitialPhysicalPlanWithStats, OptimizedLogicalPlan, OptimizedPhysicalPlan,
45        PhysicalPlanError,
46    },
47    AnalyzedLogicalPlanType, CubeNode, EmptyMessage, GroupingSetNode, LogicalExprList,
48    OptimizedLogicalPlanType, OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
49    ToProtoError as Error,
50};
51
52use super::LogicalExtensionCodec;
53
54impl From<&UnnestOptions> for protobuf::UnnestOptions {
55    fn from(opts: &UnnestOptions) -> Self {
56        Self {
57            preserve_nulls: opts.preserve_nulls,
58            recursions: opts
59                .recursions
60                .iter()
61                .map(|r| RecursionUnnestOption {
62                    input_column: Some((&r.input_column).into()),
63                    output_column: Some((&r.output_column).into()),
64                    depth: r.depth as u32,
65                })
66                .collect(),
67        }
68    }
69}
70
71impl From<&StringifiedPlan> for protobuf::StringifiedPlan {
72    fn from(stringified_plan: &StringifiedPlan) -> Self {
73        Self {
74            plan_type: match stringified_plan.clone().plan_type {
75                PlanType::InitialLogicalPlan => Some(protobuf::PlanType {
76                    plan_type_enum: Some(InitialLogicalPlan(EmptyMessage {})),
77                }),
78                PlanType::AnalyzedLogicalPlan { analyzer_name } => {
79                    Some(protobuf::PlanType {
80                        plan_type_enum: Some(AnalyzedLogicalPlan(
81                            AnalyzedLogicalPlanType { analyzer_name },
82                        )),
83                    })
84                }
85                PlanType::FinalAnalyzedLogicalPlan => Some(protobuf::PlanType {
86                    plan_type_enum: Some(FinalAnalyzedLogicalPlan(EmptyMessage {})),
87                }),
88                PlanType::OptimizedLogicalPlan { optimizer_name } => {
89                    Some(protobuf::PlanType {
90                        plan_type_enum: Some(OptimizedLogicalPlan(
91                            OptimizedLogicalPlanType { optimizer_name },
92                        )),
93                    })
94                }
95                PlanType::FinalLogicalPlan => Some(protobuf::PlanType {
96                    plan_type_enum: Some(FinalLogicalPlan(EmptyMessage {})),
97                }),
98                PlanType::InitialPhysicalPlan => Some(protobuf::PlanType {
99                    plan_type_enum: Some(InitialPhysicalPlan(EmptyMessage {})),
100                }),
101                PlanType::OptimizedPhysicalPlan { optimizer_name } => {
102                    Some(protobuf::PlanType {
103                        plan_type_enum: Some(OptimizedPhysicalPlan(
104                            OptimizedPhysicalPlanType { optimizer_name },
105                        )),
106                    })
107                }
108                PlanType::FinalPhysicalPlan => Some(protobuf::PlanType {
109                    plan_type_enum: Some(FinalPhysicalPlan(EmptyMessage {})),
110                }),
111                PlanType::InitialPhysicalPlanWithStats => Some(protobuf::PlanType {
112                    plan_type_enum: Some(InitialPhysicalPlanWithStats(EmptyMessage {})),
113                }),
114                PlanType::InitialPhysicalPlanWithSchema => Some(protobuf::PlanType {
115                    plan_type_enum: Some(InitialPhysicalPlanWithSchema(EmptyMessage {})),
116                }),
117                PlanType::FinalPhysicalPlanWithStats => Some(protobuf::PlanType {
118                    plan_type_enum: Some(FinalPhysicalPlanWithStats(EmptyMessage {})),
119                }),
120                PlanType::FinalPhysicalPlanWithSchema => Some(protobuf::PlanType {
121                    plan_type_enum: Some(FinalPhysicalPlanWithSchema(EmptyMessage {})),
122                }),
123                PlanType::PhysicalPlanError => Some(protobuf::PlanType {
124                    plan_type_enum: Some(PhysicalPlanError(EmptyMessage {})),
125                }),
126            },
127            plan: stringified_plan.plan.to_string(),
128        }
129    }
130}
131
132impl From<WindowFrameUnits> for protobuf::WindowFrameUnits {
133    fn from(units: WindowFrameUnits) -> Self {
134        match units {
135            WindowFrameUnits::Rows => Self::Rows,
136            WindowFrameUnits::Range => Self::Range,
137            WindowFrameUnits::Groups => Self::Groups,
138        }
139    }
140}
141
142impl TryFrom<&WindowFrameBound> for protobuf::WindowFrameBound {
143    type Error = Error;
144
145    fn try_from(bound: &WindowFrameBound) -> Result<Self, Self::Error> {
146        Ok(match bound {
147            WindowFrameBound::CurrentRow => Self {
148                window_frame_bound_type: protobuf::WindowFrameBoundType::CurrentRow
149                    .into(),
150                bound_value: None,
151            },
152            WindowFrameBound::Preceding(v) => Self {
153                window_frame_bound_type: protobuf::WindowFrameBoundType::Preceding.into(),
154                bound_value: Some(v.try_into()?),
155            },
156            WindowFrameBound::Following(v) => Self {
157                window_frame_bound_type: protobuf::WindowFrameBoundType::Following.into(),
158                bound_value: Some(v.try_into()?),
159            },
160        })
161    }
162}
163
164impl TryFrom<&WindowFrame> for protobuf::WindowFrame {
165    type Error = Error;
166
167    fn try_from(window: &WindowFrame) -> Result<Self, Self::Error> {
168        Ok(Self {
169            window_frame_units: protobuf::WindowFrameUnits::from(window.units).into(),
170            start_bound: Some((&window.start_bound).try_into()?),
171            end_bound: Some(protobuf::window_frame::EndBound::Bound(
172                (&window.end_bound).try_into()?,
173            )),
174        })
175    }
176}
177
178pub fn serialize_exprs<'a, I>(
179    exprs: I,
180    codec: &dyn LogicalExtensionCodec,
181) -> Result<Vec<protobuf::LogicalExprNode>, Error>
182where
183    I: IntoIterator<Item = &'a Expr>,
184{
185    exprs
186        .into_iter()
187        .map(|expr| serialize_expr(expr, codec))
188        .collect::<Result<Vec<_>, Error>>()
189}
190
191pub fn serialize_expr(
192    expr: &Expr,
193    codec: &dyn LogicalExtensionCodec,
194) -> Result<protobuf::LogicalExprNode, Error> {
195    use protobuf::logical_expr_node::ExprType;
196
197    let expr_node = match expr {
198        Expr::Column(c) => protobuf::LogicalExprNode {
199            expr_type: Some(ExprType::Column(c.into())),
200        },
201        Expr::Alias(Alias {
202            expr,
203            relation,
204            name,
205            metadata,
206        }) => {
207            let alias = Box::new(protobuf::AliasNode {
208                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
209                relation: relation
210                    .to_owned()
211                    .map(|r| vec![r.into()])
212                    .unwrap_or(vec![]),
213                alias: name.to_owned(),
214                metadata: metadata.to_owned().unwrap_or(HashMap::new()),
215            });
216            protobuf::LogicalExprNode {
217                expr_type: Some(ExprType::Alias(alias)),
218            }
219        }
220        Expr::Literal(value) => {
221            let pb_value: protobuf::ScalarValue = value.try_into()?;
222            protobuf::LogicalExprNode {
223                expr_type: Some(ExprType::Literal(pb_value)),
224            }
225        }
226        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
227            // Try to linerize a nested binary expression tree of the same operator
228            // into a flat vector of expressions.
229            let mut exprs = vec![right.as_ref()];
230            let mut current_expr = left.as_ref();
231            while let Expr::BinaryExpr(BinaryExpr {
232                left,
233                op: current_op,
234                right,
235            }) = current_expr
236            {
237                if current_op == op {
238                    exprs.push(right.as_ref());
239                    current_expr = left.as_ref();
240                } else {
241                    break;
242                }
243            }
244            exprs.push(current_expr);
245
246            let binary_expr = protobuf::BinaryExprNode {
247                // We need to reverse exprs since operands are expected to be
248                // linearized from left innermost to right outermost (but while
249                // traversing the chain we do the exact opposite).
250                operands: serialize_exprs(exprs.into_iter().rev(), codec)?,
251                op: format!("{op:?}"),
252            };
253            protobuf::LogicalExprNode {
254                expr_type: Some(ExprType::BinaryExpr(binary_expr)),
255            }
256        }
257        Expr::Like(Like {
258            negated,
259            expr,
260            pattern,
261            escape_char,
262            case_insensitive,
263        }) => {
264            if *case_insensitive {
265                let pb = Box::new(protobuf::ILikeNode {
266                    negated: *negated,
267                    expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
268                    pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
269                    escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
270                });
271
272                protobuf::LogicalExprNode {
273                    expr_type: Some(ExprType::Ilike(pb)),
274                }
275            } else {
276                let pb = Box::new(protobuf::LikeNode {
277                    negated: *negated,
278                    expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
279                    pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
280                    escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
281                });
282
283                protobuf::LogicalExprNode {
284                    expr_type: Some(ExprType::Like(pb)),
285                }
286            }
287        }
288        Expr::SimilarTo(Like {
289            negated,
290            expr,
291            pattern,
292            escape_char,
293            case_insensitive: _,
294        }) => {
295            let pb = Box::new(protobuf::SimilarToNode {
296                negated: *negated,
297                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
298                pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
299                escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
300            });
301            protobuf::LogicalExprNode {
302                expr_type: Some(ExprType::SimilarTo(pb)),
303            }
304        }
305        Expr::WindowFunction(expr::WindowFunction {
306            ref fun,
307            params:
308                expr::WindowFunctionParams {
309                    ref args,
310                    ref partition_by,
311                    ref order_by,
312                    ref window_frame,
313                    // TODO: support null treatment in proto
314                    null_treatment: _,
315                },
316        }) => {
317            let (window_function, fun_definition) = match fun {
318                WindowFunctionDefinition::AggregateUDF(aggr_udf) => {
319                    let mut buf = Vec::new();
320                    let _ = codec.try_encode_udaf(aggr_udf, &mut buf);
321                    (
322                        protobuf::window_expr_node::WindowFunction::Udaf(
323                            aggr_udf.name().to_string(),
324                        ),
325                        (!buf.is_empty()).then_some(buf),
326                    )
327                }
328                WindowFunctionDefinition::WindowUDF(window_udf) => {
329                    let mut buf = Vec::new();
330                    let _ = codec.try_encode_udwf(window_udf, &mut buf);
331                    (
332                        protobuf::window_expr_node::WindowFunction::Udwf(
333                            window_udf.name().to_string(),
334                        ),
335                        (!buf.is_empty()).then_some(buf),
336                    )
337                }
338            };
339            let partition_by = serialize_exprs(partition_by, codec)?;
340            let order_by = serialize_sorts(order_by, codec)?;
341
342            let window_frame: Option<protobuf::WindowFrame> =
343                Some(window_frame.try_into()?);
344            let window_expr = protobuf::WindowExprNode {
345                exprs: serialize_exprs(args, codec)?,
346                window_function: Some(window_function),
347                partition_by,
348                order_by,
349                window_frame,
350                fun_definition,
351            };
352            protobuf::LogicalExprNode {
353                expr_type: Some(ExprType::WindowExpr(window_expr)),
354            }
355        }
356        Expr::AggregateFunction(expr::AggregateFunction {
357            ref func,
358            params:
359                AggregateFunctionParams {
360                    ref args,
361                    ref distinct,
362                    ref filter,
363                    ref order_by,
364                    null_treatment: _,
365                },
366        }) => {
367            let mut buf = Vec::new();
368            let _ = codec.try_encode_udaf(func, &mut buf);
369            protobuf::LogicalExprNode {
370                expr_type: Some(ExprType::AggregateUdfExpr(Box::new(
371                    protobuf::AggregateUdfExprNode {
372                        fun_name: func.name().to_string(),
373                        args: serialize_exprs(args, codec)?,
374                        distinct: *distinct,
375                        filter: match filter {
376                            Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
377                            None => None,
378                        },
379                        order_by: match order_by {
380                            Some(e) => serialize_sorts(e, codec)?,
381                            None => vec![],
382                        },
383                        fun_definition: (!buf.is_empty()).then_some(buf),
384                    },
385                ))),
386            }
387        }
388
389        Expr::ScalarVariable(_, _) => {
390            return Err(Error::General(
391                "Proto serialization error: Scalar Variable not supported".to_string(),
392            ))
393        }
394        Expr::ScalarFunction(ScalarFunction { func, args }) => {
395            let mut buf = Vec::new();
396            let _ = codec.try_encode_udf(func, &mut buf);
397            protobuf::LogicalExprNode {
398                expr_type: Some(ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
399                    fun_name: func.name().to_string(),
400                    fun_definition: (!buf.is_empty()).then_some(buf),
401                    args: serialize_exprs(args, codec)?,
402                })),
403            }
404        }
405        Expr::Not(expr) => {
406            let expr = Box::new(protobuf::Not {
407                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
408            });
409            protobuf::LogicalExprNode {
410                expr_type: Some(ExprType::NotExpr(expr)),
411            }
412        }
413        Expr::IsNull(expr) => {
414            let expr = Box::new(protobuf::IsNull {
415                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
416            });
417            protobuf::LogicalExprNode {
418                expr_type: Some(ExprType::IsNullExpr(expr)),
419            }
420        }
421        Expr::IsNotNull(expr) => {
422            let expr = Box::new(protobuf::IsNotNull {
423                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
424            });
425            protobuf::LogicalExprNode {
426                expr_type: Some(ExprType::IsNotNullExpr(expr)),
427            }
428        }
429        Expr::IsTrue(expr) => {
430            let expr = Box::new(protobuf::IsTrue {
431                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
432            });
433            protobuf::LogicalExprNode {
434                expr_type: Some(ExprType::IsTrue(expr)),
435            }
436        }
437        Expr::IsFalse(expr) => {
438            let expr = Box::new(protobuf::IsFalse {
439                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
440            });
441            protobuf::LogicalExprNode {
442                expr_type: Some(ExprType::IsFalse(expr)),
443            }
444        }
445        Expr::IsUnknown(expr) => {
446            let expr = Box::new(protobuf::IsUnknown {
447                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
448            });
449            protobuf::LogicalExprNode {
450                expr_type: Some(ExprType::IsUnknown(expr)),
451            }
452        }
453        Expr::IsNotTrue(expr) => {
454            let expr = Box::new(protobuf::IsNotTrue {
455                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
456            });
457            protobuf::LogicalExprNode {
458                expr_type: Some(ExprType::IsNotTrue(expr)),
459            }
460        }
461        Expr::IsNotFalse(expr) => {
462            let expr = Box::new(protobuf::IsNotFalse {
463                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
464            });
465            protobuf::LogicalExprNode {
466                expr_type: Some(ExprType::IsNotFalse(expr)),
467            }
468        }
469        Expr::IsNotUnknown(expr) => {
470            let expr = Box::new(protobuf::IsNotUnknown {
471                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
472            });
473            protobuf::LogicalExprNode {
474                expr_type: Some(ExprType::IsNotUnknown(expr)),
475            }
476        }
477        Expr::Between(Between {
478            expr,
479            negated,
480            low,
481            high,
482        }) => {
483            let expr = Box::new(protobuf::BetweenNode {
484                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
485                negated: *negated,
486                low: Some(Box::new(serialize_expr(low.as_ref(), codec)?)),
487                high: Some(Box::new(serialize_expr(high.as_ref(), codec)?)),
488            });
489            protobuf::LogicalExprNode {
490                expr_type: Some(ExprType::Between(expr)),
491            }
492        }
493        Expr::Case(case) => {
494            let when_then_expr = case
495                .when_then_expr
496                .iter()
497                .map(|(w, t)| {
498                    Ok(protobuf::WhenThen {
499                        when_expr: Some(serialize_expr(w.as_ref(), codec)?),
500                        then_expr: Some(serialize_expr(t.as_ref(), codec)?),
501                    })
502                })
503                .collect::<Result<Vec<protobuf::WhenThen>, Error>>()?;
504            let expr = Box::new(protobuf::CaseNode {
505                expr: match &case.expr {
506                    Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
507                    None => None,
508                },
509                when_then_expr,
510                else_expr: match &case.else_expr {
511                    Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
512                    None => None,
513                },
514            });
515            protobuf::LogicalExprNode {
516                expr_type: Some(ExprType::Case(expr)),
517            }
518        }
519        Expr::Cast(Cast { expr, data_type }) => {
520            let expr = Box::new(protobuf::CastNode {
521                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
522                arrow_type: Some(data_type.try_into()?),
523            });
524            protobuf::LogicalExprNode {
525                expr_type: Some(ExprType::Cast(expr)),
526            }
527        }
528        Expr::TryCast(TryCast { expr, data_type }) => {
529            let expr = Box::new(protobuf::TryCastNode {
530                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
531                arrow_type: Some(data_type.try_into()?),
532            });
533            protobuf::LogicalExprNode {
534                expr_type: Some(ExprType::TryCast(expr)),
535            }
536        }
537        Expr::Negative(expr) => {
538            let expr = Box::new(protobuf::NegativeNode {
539                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
540            });
541            protobuf::LogicalExprNode {
542                expr_type: Some(ExprType::Negative(expr)),
543            }
544        }
545        Expr::Unnest(Unnest { expr }) => {
546            let expr = protobuf::Unnest {
547                exprs: vec![serialize_expr(expr.as_ref(), codec)?],
548            };
549            protobuf::LogicalExprNode {
550                expr_type: Some(ExprType::Unnest(expr)),
551            }
552        }
553        Expr::InList(InList {
554            expr,
555            list,
556            negated,
557        }) => {
558            let expr = Box::new(protobuf::InListNode {
559                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
560                list: serialize_exprs(list, codec)?,
561                negated: *negated,
562            });
563            protobuf::LogicalExprNode {
564                expr_type: Some(ExprType::InList(expr)),
565            }
566        }
567        #[expect(deprecated)]
568        Expr::Wildcard { qualifier, .. } => protobuf::LogicalExprNode {
569            expr_type: Some(ExprType::Wildcard(protobuf::Wildcard {
570                qualifier: qualifier.to_owned().map(|x| x.into()),
571            })),
572        },
573        Expr::ScalarSubquery(_)
574        | Expr::InSubquery(_)
575        | Expr::Exists { .. }
576        | Expr::OuterReferenceColumn { .. } => {
577            // we would need to add logical plan operators to datafusion.proto to support this
578            // see discussion in https://github.com/apache/datafusion/issues/2565
579            return Err(Error::General("Proto serialization error: Expr::ScalarSubquery(_) | Expr::InSubquery(_) | Expr::Exists { .. } | Exp:OuterReferenceColumn not supported".to_string()));
580        }
581        Expr::GroupingSet(GroupingSet::Cube(exprs)) => protobuf::LogicalExprNode {
582            expr_type: Some(ExprType::Cube(CubeNode {
583                expr: serialize_exprs(exprs, codec)?,
584            })),
585        },
586        Expr::GroupingSet(GroupingSet::Rollup(exprs)) => protobuf::LogicalExprNode {
587            expr_type: Some(ExprType::Rollup(RollupNode {
588                expr: serialize_exprs(exprs, codec)?,
589            })),
590        },
591        Expr::GroupingSet(GroupingSet::GroupingSets(exprs)) => {
592            protobuf::LogicalExprNode {
593                expr_type: Some(ExprType::GroupingSet(GroupingSetNode {
594                    expr: exprs
595                        .iter()
596                        .map(|expr_list| {
597                            Ok(LogicalExprList {
598                                expr: serialize_exprs(expr_list, codec)?,
599                            })
600                        })
601                        .collect::<Result<Vec<_>, Error>>()?,
602                })),
603            }
604        }
605        Expr::Placeholder(Placeholder { id, data_type }) => {
606            let data_type = match data_type {
607                Some(data_type) => Some(data_type.try_into()?),
608                None => None,
609            };
610            protobuf::LogicalExprNode {
611                expr_type: Some(ExprType::Placeholder(PlaceholderNode {
612                    id: id.clone(),
613                    data_type,
614                })),
615            }
616        }
617    };
618
619    Ok(expr_node)
620}
621
622pub fn serialize_sorts<'a, I>(
623    sorts: I,
624    codec: &dyn LogicalExtensionCodec,
625) -> Result<Vec<protobuf::SortExprNode>, Error>
626where
627    I: IntoIterator<Item = &'a SortExpr>,
628{
629    sorts
630        .into_iter()
631        .map(|sort| {
632            let SortExpr {
633                expr,
634                asc,
635                nulls_first,
636            } = sort;
637            Ok(protobuf::SortExprNode {
638                expr: Some(serialize_expr(expr, codec)?),
639                asc: *asc,
640                nulls_first: *nulls_first,
641            })
642        })
643        .collect::<Result<Vec<_>, Error>>()
644}
645
646impl From<TableReference> for protobuf::TableReference {
647    fn from(t: TableReference) -> Self {
648        use protobuf::table_reference::TableReferenceEnum;
649        let table_reference_enum = match t {
650            TableReference::Bare { table } => {
651                TableReferenceEnum::Bare(protobuf::BareTableReference {
652                    table: table.to_string(),
653                })
654            }
655            TableReference::Partial { schema, table } => {
656                TableReferenceEnum::Partial(protobuf::PartialTableReference {
657                    schema: schema.to_string(),
658                    table: table.to_string(),
659                })
660            }
661            TableReference::Full {
662                catalog,
663                schema,
664                table,
665            } => TableReferenceEnum::Full(protobuf::FullTableReference {
666                catalog: catalog.to_string(),
667                schema: schema.to_string(),
668                table: table.to_string(),
669            }),
670        };
671
672        protobuf::TableReference {
673            table_reference_enum: Some(table_reference_enum),
674        }
675    }
676}
677
678impl From<JoinType> for protobuf::JoinType {
679    fn from(t: JoinType) -> Self {
680        match t {
681            JoinType::Inner => protobuf::JoinType::Inner,
682            JoinType::Left => protobuf::JoinType::Left,
683            JoinType::Right => protobuf::JoinType::Right,
684            JoinType::Full => protobuf::JoinType::Full,
685            JoinType::LeftSemi => protobuf::JoinType::Leftsemi,
686            JoinType::RightSemi => protobuf::JoinType::Rightsemi,
687            JoinType::LeftAnti => protobuf::JoinType::Leftanti,
688            JoinType::RightAnti => protobuf::JoinType::Rightanti,
689            JoinType::LeftMark => protobuf::JoinType::Leftmark,
690        }
691    }
692}
693
694impl From<JoinConstraint> for protobuf::JoinConstraint {
695    fn from(t: JoinConstraint) -> Self {
696        match t {
697            JoinConstraint::On => protobuf::JoinConstraint::On,
698            JoinConstraint::Using => protobuf::JoinConstraint::Using,
699        }
700    }
701}
702
703impl From<&WriteOp> for protobuf::dml_node::Type {
704    fn from(t: &WriteOp) -> Self {
705        match t {
706            WriteOp::Insert(InsertOp::Append) => protobuf::dml_node::Type::InsertAppend,
707            WriteOp::Insert(InsertOp::Overwrite) => {
708                protobuf::dml_node::Type::InsertOverwrite
709            }
710            WriteOp::Insert(InsertOp::Replace) => protobuf::dml_node::Type::InsertReplace,
711            WriteOp::Delete => protobuf::dml_node::Type::Delete,
712            WriteOp::Update => protobuf::dml_node::Type::Update,
713            WriteOp::Ctas => protobuf::dml_node::Type::Ctas,
714        }
715    }
716}