datafusion_proto/logical_plan/
to_proto.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Code to convert Arrow schemas and DataFusion logical plans to protocol buffer format, allowing
19//! DataFusion logical plans to be serialized and transmitted between
20//! processes.
21
22use std::collections::HashMap;
23
24use datafusion_common::{NullEquality, TableReference, UnnestOptions};
25use datafusion_expr::WriteOp;
26use datafusion_expr::dml::InsertOp;
27use datafusion_expr::expr::{
28    self, AggregateFunctionParams, Alias, Between, BinaryExpr, Cast, GroupingSet, InList,
29    Like, NullTreatment, Placeholder, ScalarFunction, Unnest,
30};
31use datafusion_expr::{
32    Expr, JoinConstraint, JoinType, SortExpr, TryCast, WindowFrame, WindowFrameBound,
33    WindowFrameUnits, WindowFunctionDefinition, logical_plan::PlanType,
34    logical_plan::StringifiedPlan,
35};
36
37use crate::protobuf::RecursionUnnestOption;
38use crate::protobuf::{
39    self, AnalyzedLogicalPlanType, CubeNode, EmptyMessage, GroupingSetNode,
40    LogicalExprList, OptimizedLogicalPlanType, OptimizedPhysicalPlanType,
41    PlaceholderNode, RollupNode, ToProtoError as Error,
42    plan_type::PlanTypeEnum::{
43        AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
44        FinalPhysicalPlan, FinalPhysicalPlanWithSchema, FinalPhysicalPlanWithStats,
45        InitialLogicalPlan, InitialPhysicalPlan, InitialPhysicalPlanWithSchema,
46        InitialPhysicalPlanWithStats, OptimizedLogicalPlan, OptimizedPhysicalPlan,
47        PhysicalPlanError,
48    },
49};
50
51use super::LogicalExtensionCodec;
52
53impl From<&UnnestOptions> for protobuf::UnnestOptions {
54    fn from(opts: &UnnestOptions) -> Self {
55        Self {
56            preserve_nulls: opts.preserve_nulls,
57            recursions: opts
58                .recursions
59                .iter()
60                .map(|r| RecursionUnnestOption {
61                    input_column: Some((&r.input_column).into()),
62                    output_column: Some((&r.output_column).into()),
63                    depth: r.depth as u32,
64                })
65                .collect(),
66        }
67    }
68}
69
70impl From<&StringifiedPlan> for protobuf::StringifiedPlan {
71    fn from(stringified_plan: &StringifiedPlan) -> Self {
72        Self {
73            plan_type: match stringified_plan.clone().plan_type {
74                PlanType::InitialLogicalPlan => Some(protobuf::PlanType {
75                    plan_type_enum: Some(InitialLogicalPlan(EmptyMessage {})),
76                }),
77                PlanType::AnalyzedLogicalPlan { analyzer_name } => {
78                    Some(protobuf::PlanType {
79                        plan_type_enum: Some(AnalyzedLogicalPlan(
80                            AnalyzedLogicalPlanType { analyzer_name },
81                        )),
82                    })
83                }
84                PlanType::FinalAnalyzedLogicalPlan => Some(protobuf::PlanType {
85                    plan_type_enum: Some(FinalAnalyzedLogicalPlan(EmptyMessage {})),
86                }),
87                PlanType::OptimizedLogicalPlan { optimizer_name } => {
88                    Some(protobuf::PlanType {
89                        plan_type_enum: Some(OptimizedLogicalPlan(
90                            OptimizedLogicalPlanType { optimizer_name },
91                        )),
92                    })
93                }
94                PlanType::FinalLogicalPlan => Some(protobuf::PlanType {
95                    plan_type_enum: Some(FinalLogicalPlan(EmptyMessage {})),
96                }),
97                PlanType::InitialPhysicalPlan => Some(protobuf::PlanType {
98                    plan_type_enum: Some(InitialPhysicalPlan(EmptyMessage {})),
99                }),
100                PlanType::OptimizedPhysicalPlan { optimizer_name } => {
101                    Some(protobuf::PlanType {
102                        plan_type_enum: Some(OptimizedPhysicalPlan(
103                            OptimizedPhysicalPlanType { optimizer_name },
104                        )),
105                    })
106                }
107                PlanType::FinalPhysicalPlan => Some(protobuf::PlanType {
108                    plan_type_enum: Some(FinalPhysicalPlan(EmptyMessage {})),
109                }),
110                PlanType::InitialPhysicalPlanWithStats => Some(protobuf::PlanType {
111                    plan_type_enum: Some(InitialPhysicalPlanWithStats(EmptyMessage {})),
112                }),
113                PlanType::InitialPhysicalPlanWithSchema => Some(protobuf::PlanType {
114                    plan_type_enum: Some(InitialPhysicalPlanWithSchema(EmptyMessage {})),
115                }),
116                PlanType::FinalPhysicalPlanWithStats => Some(protobuf::PlanType {
117                    plan_type_enum: Some(FinalPhysicalPlanWithStats(EmptyMessage {})),
118                }),
119                PlanType::FinalPhysicalPlanWithSchema => Some(protobuf::PlanType {
120                    plan_type_enum: Some(FinalPhysicalPlanWithSchema(EmptyMessage {})),
121                }),
122                PlanType::PhysicalPlanError => Some(protobuf::PlanType {
123                    plan_type_enum: Some(PhysicalPlanError(EmptyMessage {})),
124                }),
125            },
126            plan: stringified_plan.plan.to_string(),
127        }
128    }
129}
130
131impl From<WindowFrameUnits> for protobuf::WindowFrameUnits {
132    fn from(units: WindowFrameUnits) -> Self {
133        match units {
134            WindowFrameUnits::Rows => Self::Rows,
135            WindowFrameUnits::Range => Self::Range,
136            WindowFrameUnits::Groups => Self::Groups,
137        }
138    }
139}
140
141impl TryFrom<&WindowFrameBound> for protobuf::WindowFrameBound {
142    type Error = Error;
143
144    fn try_from(bound: &WindowFrameBound) -> Result<Self, Self::Error> {
145        Ok(match bound {
146            WindowFrameBound::CurrentRow => Self {
147                window_frame_bound_type: protobuf::WindowFrameBoundType::CurrentRow
148                    .into(),
149                bound_value: None,
150            },
151            WindowFrameBound::Preceding(v) => Self {
152                window_frame_bound_type: protobuf::WindowFrameBoundType::Preceding.into(),
153                bound_value: Some(v.try_into()?),
154            },
155            WindowFrameBound::Following(v) => Self {
156                window_frame_bound_type: protobuf::WindowFrameBoundType::Following.into(),
157                bound_value: Some(v.try_into()?),
158            },
159        })
160    }
161}
162
163impl TryFrom<&WindowFrame> for protobuf::WindowFrame {
164    type Error = Error;
165
166    fn try_from(window: &WindowFrame) -> Result<Self, Self::Error> {
167        Ok(Self {
168            window_frame_units: protobuf::WindowFrameUnits::from(window.units).into(),
169            start_bound: Some((&window.start_bound).try_into()?),
170            end_bound: Some(protobuf::window_frame::EndBound::Bound(
171                (&window.end_bound).try_into()?,
172            )),
173        })
174    }
175}
176
177pub fn serialize_exprs<'a, I>(
178    exprs: I,
179    codec: &dyn LogicalExtensionCodec,
180) -> Result<Vec<protobuf::LogicalExprNode>, Error>
181where
182    I: IntoIterator<Item = &'a Expr>,
183{
184    exprs
185        .into_iter()
186        .map(|expr| serialize_expr(expr, codec))
187        .collect::<Result<Vec<_>, Error>>()
188}
189
190pub fn serialize_expr(
191    expr: &Expr,
192    codec: &dyn LogicalExtensionCodec,
193) -> Result<protobuf::LogicalExprNode, Error> {
194    use protobuf::logical_expr_node::ExprType;
195
196    let expr_node = match expr {
197        Expr::Column(c) => protobuf::LogicalExprNode {
198            expr_type: Some(ExprType::Column(c.into())),
199        },
200        Expr::Alias(Alias {
201            expr,
202            relation,
203            name,
204            metadata,
205        }) => {
206            let alias = Box::new(protobuf::AliasNode {
207                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
208                relation: relation
209                    .to_owned()
210                    .map(|r| vec![r.into()])
211                    .unwrap_or(vec![]),
212                alias: name.to_owned(),
213                metadata: metadata
214                    .as_ref()
215                    .map(|m| m.to_hashmap())
216                    .unwrap_or(HashMap::new()),
217            });
218            protobuf::LogicalExprNode {
219                expr_type: Some(ExprType::Alias(alias)),
220            }
221        }
222        Expr::Literal(value, _) => {
223            let pb_value: protobuf::ScalarValue = value.try_into()?;
224            protobuf::LogicalExprNode {
225                expr_type: Some(ExprType::Literal(pb_value)),
226            }
227        }
228        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
229            // Try to linerize a nested binary expression tree of the same operator
230            // into a flat vector of expressions.
231            let mut exprs = vec![right.as_ref()];
232            let mut current_expr = left.as_ref();
233            while let Expr::BinaryExpr(BinaryExpr {
234                left,
235                op: current_op,
236                right,
237            }) = current_expr
238            {
239                if current_op == op {
240                    exprs.push(right.as_ref());
241                    current_expr = left.as_ref();
242                } else {
243                    break;
244                }
245            }
246            exprs.push(current_expr);
247
248            let binary_expr = protobuf::BinaryExprNode {
249                // We need to reverse exprs since operands are expected to be
250                // linearized from left innermost to right outermost (but while
251                // traversing the chain we do the exact opposite).
252                operands: serialize_exprs(exprs.into_iter().rev(), codec)?,
253                op: format!("{op:?}"),
254            };
255            protobuf::LogicalExprNode {
256                expr_type: Some(ExprType::BinaryExpr(binary_expr)),
257            }
258        }
259        Expr::Like(Like {
260            negated,
261            expr,
262            pattern,
263            escape_char,
264            case_insensitive,
265        }) => {
266            if *case_insensitive {
267                let pb = Box::new(protobuf::ILikeNode {
268                    negated: *negated,
269                    expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
270                    pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
271                    escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
272                });
273
274                protobuf::LogicalExprNode {
275                    expr_type: Some(ExprType::Ilike(pb)),
276                }
277            } else {
278                let pb = Box::new(protobuf::LikeNode {
279                    negated: *negated,
280                    expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
281                    pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
282                    escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
283                });
284
285                protobuf::LogicalExprNode {
286                    expr_type: Some(ExprType::Like(pb)),
287                }
288            }
289        }
290        Expr::SimilarTo(Like {
291            negated,
292            expr,
293            pattern,
294            escape_char,
295            case_insensitive: _,
296        }) => {
297            let pb = Box::new(protobuf::SimilarToNode {
298                negated: *negated,
299                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
300                pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
301                escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
302            });
303            protobuf::LogicalExprNode {
304                expr_type: Some(ExprType::SimilarTo(pb)),
305            }
306        }
307        Expr::WindowFunction(window_fun) => {
308            let expr::WindowFunction {
309                fun,
310                params:
311                    expr::WindowFunctionParams {
312                        args,
313                        partition_by,
314                        order_by,
315                        window_frame,
316                        null_treatment,
317                        distinct,
318                        filter,
319                    },
320            } = window_fun.as_ref();
321            let mut buf = Vec::new();
322            let window_function = match fun {
323                WindowFunctionDefinition::AggregateUDF(aggr_udf) => {
324                    let _ = codec.try_encode_udaf(aggr_udf, &mut buf);
325                    protobuf::window_expr_node::WindowFunction::Udaf(
326                        aggr_udf.name().to_string(),
327                    )
328                }
329                WindowFunctionDefinition::WindowUDF(window_udf) => {
330                    let _ = codec.try_encode_udwf(window_udf, &mut buf);
331                    protobuf::window_expr_node::WindowFunction::Udwf(
332                        window_udf.name().to_string(),
333                    )
334                }
335            };
336            let fun_definition = (!buf.is_empty()).then_some(buf);
337            let partition_by = serialize_exprs(partition_by, codec)?;
338            let order_by = serialize_sorts(order_by, codec)?;
339
340            let window_frame: Option<protobuf::WindowFrame> =
341                Some(window_frame.try_into()?);
342
343            let window_expr = protobuf::WindowExprNode {
344                exprs: serialize_exprs(args, codec)?,
345                window_function: Some(window_function),
346                partition_by,
347                order_by,
348                window_frame,
349                distinct: *distinct,
350                filter: match filter {
351                    Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
352                    None => None,
353                },
354                null_treatment: null_treatment
355                    .map(|nt| protobuf::NullTreatment::from(nt).into()),
356                fun_definition,
357            };
358            protobuf::LogicalExprNode {
359                expr_type: Some(ExprType::WindowExpr(Box::new(window_expr))),
360            }
361        }
362        Expr::AggregateFunction(expr::AggregateFunction {
363            func,
364            params:
365                AggregateFunctionParams {
366                    args,
367                    distinct,
368                    filter,
369                    order_by,
370                    null_treatment,
371                },
372        }) => {
373            let mut buf = Vec::new();
374            let _ = codec.try_encode_udaf(func, &mut buf);
375            protobuf::LogicalExprNode {
376                expr_type: Some(ExprType::AggregateUdfExpr(Box::new(
377                    protobuf::AggregateUdfExprNode {
378                        fun_name: func.name().to_string(),
379                        args: serialize_exprs(args, codec)?,
380                        distinct: *distinct,
381                        filter: match filter {
382                            Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
383                            None => None,
384                        },
385                        order_by: serialize_sorts(order_by, codec)?,
386                        fun_definition: (!buf.is_empty()).then_some(buf),
387                        null_treatment: null_treatment
388                            .map(|nt| protobuf::NullTreatment::from(nt).into()),
389                    },
390                ))),
391            }
392        }
393
394        Expr::ScalarVariable(_, _) => {
395            return Err(Error::General(
396                "Proto serialization error: Scalar Variable not supported".to_string(),
397            ));
398        }
399        Expr::ScalarFunction(ScalarFunction { func, args }) => {
400            let mut buf = Vec::new();
401            let _ = codec.try_encode_udf(func, &mut buf);
402            protobuf::LogicalExprNode {
403                expr_type: Some(ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
404                    fun_name: func.name().to_string(),
405                    fun_definition: (!buf.is_empty()).then_some(buf),
406                    args: serialize_exprs(args, codec)?,
407                })),
408            }
409        }
410        Expr::Not(expr) => {
411            let expr = Box::new(protobuf::Not {
412                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
413            });
414            protobuf::LogicalExprNode {
415                expr_type: Some(ExprType::NotExpr(expr)),
416            }
417        }
418        Expr::IsNull(expr) => {
419            let expr = Box::new(protobuf::IsNull {
420                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
421            });
422            protobuf::LogicalExprNode {
423                expr_type: Some(ExprType::IsNullExpr(expr)),
424            }
425        }
426        Expr::IsNotNull(expr) => {
427            let expr = Box::new(protobuf::IsNotNull {
428                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
429            });
430            protobuf::LogicalExprNode {
431                expr_type: Some(ExprType::IsNotNullExpr(expr)),
432            }
433        }
434        Expr::IsTrue(expr) => {
435            let expr = Box::new(protobuf::IsTrue {
436                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
437            });
438            protobuf::LogicalExprNode {
439                expr_type: Some(ExprType::IsTrue(expr)),
440            }
441        }
442        Expr::IsFalse(expr) => {
443            let expr = Box::new(protobuf::IsFalse {
444                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
445            });
446            protobuf::LogicalExprNode {
447                expr_type: Some(ExprType::IsFalse(expr)),
448            }
449        }
450        Expr::IsUnknown(expr) => {
451            let expr = Box::new(protobuf::IsUnknown {
452                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
453            });
454            protobuf::LogicalExprNode {
455                expr_type: Some(ExprType::IsUnknown(expr)),
456            }
457        }
458        Expr::IsNotTrue(expr) => {
459            let expr = Box::new(protobuf::IsNotTrue {
460                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
461            });
462            protobuf::LogicalExprNode {
463                expr_type: Some(ExprType::IsNotTrue(expr)),
464            }
465        }
466        Expr::IsNotFalse(expr) => {
467            let expr = Box::new(protobuf::IsNotFalse {
468                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
469            });
470            protobuf::LogicalExprNode {
471                expr_type: Some(ExprType::IsNotFalse(expr)),
472            }
473        }
474        Expr::IsNotUnknown(expr) => {
475            let expr = Box::new(protobuf::IsNotUnknown {
476                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
477            });
478            protobuf::LogicalExprNode {
479                expr_type: Some(ExprType::IsNotUnknown(expr)),
480            }
481        }
482        Expr::Between(Between {
483            expr,
484            negated,
485            low,
486            high,
487        }) => {
488            let expr = Box::new(protobuf::BetweenNode {
489                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
490                negated: *negated,
491                low: Some(Box::new(serialize_expr(low.as_ref(), codec)?)),
492                high: Some(Box::new(serialize_expr(high.as_ref(), codec)?)),
493            });
494            protobuf::LogicalExprNode {
495                expr_type: Some(ExprType::Between(expr)),
496            }
497        }
498        Expr::Case(case) => {
499            let when_then_expr = case
500                .when_then_expr
501                .iter()
502                .map(|(w, t)| {
503                    Ok(protobuf::WhenThen {
504                        when_expr: Some(serialize_expr(w.as_ref(), codec)?),
505                        then_expr: Some(serialize_expr(t.as_ref(), codec)?),
506                    })
507                })
508                .collect::<Result<Vec<protobuf::WhenThen>, Error>>()?;
509            let expr = Box::new(protobuf::CaseNode {
510                expr: match &case.expr {
511                    Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
512                    None => None,
513                },
514                when_then_expr,
515                else_expr: match &case.else_expr {
516                    Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
517                    None => None,
518                },
519            });
520            protobuf::LogicalExprNode {
521                expr_type: Some(ExprType::Case(expr)),
522            }
523        }
524        Expr::Cast(Cast { expr, data_type }) => {
525            let expr = Box::new(protobuf::CastNode {
526                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
527                arrow_type: Some(data_type.try_into()?),
528            });
529            protobuf::LogicalExprNode {
530                expr_type: Some(ExprType::Cast(expr)),
531            }
532        }
533        Expr::TryCast(TryCast { expr, data_type }) => {
534            let expr = Box::new(protobuf::TryCastNode {
535                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
536                arrow_type: Some(data_type.try_into()?),
537            });
538            protobuf::LogicalExprNode {
539                expr_type: Some(ExprType::TryCast(expr)),
540            }
541        }
542        Expr::Negative(expr) => {
543            let expr = Box::new(protobuf::NegativeNode {
544                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
545            });
546            protobuf::LogicalExprNode {
547                expr_type: Some(ExprType::Negative(expr)),
548            }
549        }
550        Expr::Unnest(Unnest { expr }) => {
551            let expr = protobuf::Unnest {
552                exprs: vec![serialize_expr(expr.as_ref(), codec)?],
553            };
554            protobuf::LogicalExprNode {
555                expr_type: Some(ExprType::Unnest(expr)),
556            }
557        }
558        Expr::InList(InList {
559            expr,
560            list,
561            negated,
562        }) => {
563            let expr = Box::new(protobuf::InListNode {
564                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
565                list: serialize_exprs(list, codec)?,
566                negated: *negated,
567            });
568            protobuf::LogicalExprNode {
569                expr_type: Some(ExprType::InList(expr)),
570            }
571        }
572        #[expect(deprecated)]
573        Expr::Wildcard { qualifier, .. } => protobuf::LogicalExprNode {
574            expr_type: Some(ExprType::Wildcard(protobuf::Wildcard {
575                qualifier: qualifier.to_owned().map(|x| x.into()),
576            })),
577        },
578        Expr::ScalarSubquery(_)
579        | Expr::InSubquery(_)
580        | Expr::Exists { .. }
581        | Expr::OuterReferenceColumn { .. } => {
582            // we would need to add logical plan operators to datafusion.proto to support this
583            // see discussion in https://github.com/apache/datafusion/issues/2565
584            return Err(Error::General("Proto serialization error: Expr::ScalarSubquery(_) | Expr::InSubquery(_) | Expr::Exists { .. } | Exp:OuterReferenceColumn not supported".to_string()));
585        }
586        Expr::GroupingSet(GroupingSet::Cube(exprs)) => protobuf::LogicalExprNode {
587            expr_type: Some(ExprType::Cube(CubeNode {
588                expr: serialize_exprs(exprs, codec)?,
589            })),
590        },
591        Expr::GroupingSet(GroupingSet::Rollup(exprs)) => protobuf::LogicalExprNode {
592            expr_type: Some(ExprType::Rollup(RollupNode {
593                expr: serialize_exprs(exprs, codec)?,
594            })),
595        },
596        Expr::GroupingSet(GroupingSet::GroupingSets(exprs)) => {
597            protobuf::LogicalExprNode {
598                expr_type: Some(ExprType::GroupingSet(GroupingSetNode {
599                    expr: exprs
600                        .iter()
601                        .map(|expr_list| {
602                            Ok(LogicalExprList {
603                                expr: serialize_exprs(expr_list, codec)?,
604                            })
605                        })
606                        .collect::<Result<Vec<_>, Error>>()?,
607                })),
608            }
609        }
610        Expr::Placeholder(Placeholder { id, field }) => protobuf::LogicalExprNode {
611            expr_type: Some(ExprType::Placeholder(PlaceholderNode {
612                id: id.clone(),
613                data_type: match field {
614                    Some(field) => Some(field.data_type().try_into()?),
615                    None => None,
616                },
617                nullable: field.as_ref().map(|f| f.is_nullable()),
618                metadata: field
619                    .as_ref()
620                    .map(|f| f.metadata().clone())
621                    .unwrap_or(HashMap::new()),
622            })),
623        },
624    };
625
626    Ok(expr_node)
627}
628
629pub fn serialize_sorts<'a, I>(
630    sorts: I,
631    codec: &dyn LogicalExtensionCodec,
632) -> Result<Vec<protobuf::SortExprNode>, Error>
633where
634    I: IntoIterator<Item = &'a SortExpr>,
635{
636    sorts
637        .into_iter()
638        .map(|sort| {
639            let SortExpr {
640                expr,
641                asc,
642                nulls_first,
643            } = sort;
644            Ok(protobuf::SortExprNode {
645                expr: Some(serialize_expr(expr, codec)?),
646                asc: *asc,
647                nulls_first: *nulls_first,
648            })
649        })
650        .collect::<Result<Vec<_>, Error>>()
651}
652
653impl From<TableReference> for protobuf::TableReference {
654    fn from(t: TableReference) -> Self {
655        use protobuf::table_reference::TableReferenceEnum;
656        let table_reference_enum = match t {
657            TableReference::Bare { table } => {
658                TableReferenceEnum::Bare(protobuf::BareTableReference {
659                    table: table.to_string(),
660                })
661            }
662            TableReference::Partial { schema, table } => {
663                TableReferenceEnum::Partial(protobuf::PartialTableReference {
664                    schema: schema.to_string(),
665                    table: table.to_string(),
666                })
667            }
668            TableReference::Full {
669                catalog,
670                schema,
671                table,
672            } => TableReferenceEnum::Full(protobuf::FullTableReference {
673                catalog: catalog.to_string(),
674                schema: schema.to_string(),
675                table: table.to_string(),
676            }),
677        };
678
679        protobuf::TableReference {
680            table_reference_enum: Some(table_reference_enum),
681        }
682    }
683}
684
685impl From<JoinType> for protobuf::JoinType {
686    fn from(t: JoinType) -> Self {
687        match t {
688            JoinType::Inner => protobuf::JoinType::Inner,
689            JoinType::Left => protobuf::JoinType::Left,
690            JoinType::Right => protobuf::JoinType::Right,
691            JoinType::Full => protobuf::JoinType::Full,
692            JoinType::LeftSemi => protobuf::JoinType::Leftsemi,
693            JoinType::RightSemi => protobuf::JoinType::Rightsemi,
694            JoinType::LeftAnti => protobuf::JoinType::Leftanti,
695            JoinType::RightAnti => protobuf::JoinType::Rightanti,
696            JoinType::LeftMark => protobuf::JoinType::Leftmark,
697            JoinType::RightMark => protobuf::JoinType::Rightmark,
698        }
699    }
700}
701
702impl From<JoinConstraint> for protobuf::JoinConstraint {
703    fn from(t: JoinConstraint) -> Self {
704        match t {
705            JoinConstraint::On => protobuf::JoinConstraint::On,
706            JoinConstraint::Using => protobuf::JoinConstraint::Using,
707        }
708    }
709}
710
711impl From<NullEquality> for protobuf::NullEquality {
712    fn from(t: NullEquality) -> Self {
713        match t {
714            NullEquality::NullEqualsNothing => protobuf::NullEquality::NullEqualsNothing,
715            NullEquality::NullEqualsNull => protobuf::NullEquality::NullEqualsNull,
716        }
717    }
718}
719
720impl From<&WriteOp> for protobuf::dml_node::Type {
721    fn from(t: &WriteOp) -> Self {
722        match t {
723            WriteOp::Insert(InsertOp::Append) => protobuf::dml_node::Type::InsertAppend,
724            WriteOp::Insert(InsertOp::Overwrite) => {
725                protobuf::dml_node::Type::InsertOverwrite
726            }
727            WriteOp::Insert(InsertOp::Replace) => protobuf::dml_node::Type::InsertReplace,
728            WriteOp::Delete => protobuf::dml_node::Type::Delete,
729            WriteOp::Update => protobuf::dml_node::Type::Update,
730            WriteOp::Ctas => protobuf::dml_node::Type::Ctas,
731        }
732    }
733}
734
735impl From<NullTreatment> for protobuf::NullTreatment {
736    fn from(t: NullTreatment) -> Self {
737        match t {
738            NullTreatment::RespectNulls => protobuf::NullTreatment::RespectNulls,
739            NullTreatment::IgnoreNulls => protobuf::NullTreatment::IgnoreNulls,
740        }
741    }
742}