datafusion_proto/logical_plan/
to_proto.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Code to convert Arrow schemas and DataFusion logical plans to protocol buffer format, allowing
19//! DataFusion logical plans to be serialized and transmitted between
20//! processes.
21
22use std::collections::HashMap;
23
24use datafusion_common::{NullEquality, TableReference, UnnestOptions};
25use datafusion_expr::dml::InsertOp;
26use datafusion_expr::expr::{
27    self, AggregateFunctionParams, Alias, Between, BinaryExpr, Cast, GroupingSet, InList,
28    Like, NullTreatment, Placeholder, ScalarFunction, Unnest,
29};
30use datafusion_expr::WriteOp;
31use datafusion_expr::{
32    logical_plan::PlanType, logical_plan::StringifiedPlan, Expr, JoinConstraint,
33    JoinType, SortExpr, TryCast, WindowFrame, WindowFrameBound, WindowFrameUnits,
34    WindowFunctionDefinition,
35};
36
37use crate::protobuf::RecursionUnnestOption;
38use crate::protobuf::{
39    self,
40    plan_type::PlanTypeEnum::{
41        AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
42        FinalPhysicalPlan, FinalPhysicalPlanWithSchema, FinalPhysicalPlanWithStats,
43        InitialLogicalPlan, InitialPhysicalPlan, InitialPhysicalPlanWithSchema,
44        InitialPhysicalPlanWithStats, OptimizedLogicalPlan, OptimizedPhysicalPlan,
45        PhysicalPlanError,
46    },
47    AnalyzedLogicalPlanType, CubeNode, EmptyMessage, GroupingSetNode, LogicalExprList,
48    OptimizedLogicalPlanType, OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
49    ToProtoError as Error,
50};
51
52use super::LogicalExtensionCodec;
53
54impl From<&UnnestOptions> for protobuf::UnnestOptions {
55    fn from(opts: &UnnestOptions) -> Self {
56        Self {
57            preserve_nulls: opts.preserve_nulls,
58            recursions: opts
59                .recursions
60                .iter()
61                .map(|r| RecursionUnnestOption {
62                    input_column: Some((&r.input_column).into()),
63                    output_column: Some((&r.output_column).into()),
64                    depth: r.depth as u32,
65                })
66                .collect(),
67        }
68    }
69}
70
71impl From<&StringifiedPlan> for protobuf::StringifiedPlan {
72    fn from(stringified_plan: &StringifiedPlan) -> Self {
73        Self {
74            plan_type: match stringified_plan.clone().plan_type {
75                PlanType::InitialLogicalPlan => Some(protobuf::PlanType {
76                    plan_type_enum: Some(InitialLogicalPlan(EmptyMessage {})),
77                }),
78                PlanType::AnalyzedLogicalPlan { analyzer_name } => {
79                    Some(protobuf::PlanType {
80                        plan_type_enum: Some(AnalyzedLogicalPlan(
81                            AnalyzedLogicalPlanType { analyzer_name },
82                        )),
83                    })
84                }
85                PlanType::FinalAnalyzedLogicalPlan => Some(protobuf::PlanType {
86                    plan_type_enum: Some(FinalAnalyzedLogicalPlan(EmptyMessage {})),
87                }),
88                PlanType::OptimizedLogicalPlan { optimizer_name } => {
89                    Some(protobuf::PlanType {
90                        plan_type_enum: Some(OptimizedLogicalPlan(
91                            OptimizedLogicalPlanType { optimizer_name },
92                        )),
93                    })
94                }
95                PlanType::FinalLogicalPlan => Some(protobuf::PlanType {
96                    plan_type_enum: Some(FinalLogicalPlan(EmptyMessage {})),
97                }),
98                PlanType::InitialPhysicalPlan => Some(protobuf::PlanType {
99                    plan_type_enum: Some(InitialPhysicalPlan(EmptyMessage {})),
100                }),
101                PlanType::OptimizedPhysicalPlan { optimizer_name } => {
102                    Some(protobuf::PlanType {
103                        plan_type_enum: Some(OptimizedPhysicalPlan(
104                            OptimizedPhysicalPlanType { optimizer_name },
105                        )),
106                    })
107                }
108                PlanType::FinalPhysicalPlan => Some(protobuf::PlanType {
109                    plan_type_enum: Some(FinalPhysicalPlan(EmptyMessage {})),
110                }),
111                PlanType::InitialPhysicalPlanWithStats => Some(protobuf::PlanType {
112                    plan_type_enum: Some(InitialPhysicalPlanWithStats(EmptyMessage {})),
113                }),
114                PlanType::InitialPhysicalPlanWithSchema => Some(protobuf::PlanType {
115                    plan_type_enum: Some(InitialPhysicalPlanWithSchema(EmptyMessage {})),
116                }),
117                PlanType::FinalPhysicalPlanWithStats => Some(protobuf::PlanType {
118                    plan_type_enum: Some(FinalPhysicalPlanWithStats(EmptyMessage {})),
119                }),
120                PlanType::FinalPhysicalPlanWithSchema => Some(protobuf::PlanType {
121                    plan_type_enum: Some(FinalPhysicalPlanWithSchema(EmptyMessage {})),
122                }),
123                PlanType::PhysicalPlanError => Some(protobuf::PlanType {
124                    plan_type_enum: Some(PhysicalPlanError(EmptyMessage {})),
125                }),
126            },
127            plan: stringified_plan.plan.to_string(),
128        }
129    }
130}
131
132impl From<WindowFrameUnits> for protobuf::WindowFrameUnits {
133    fn from(units: WindowFrameUnits) -> Self {
134        match units {
135            WindowFrameUnits::Rows => Self::Rows,
136            WindowFrameUnits::Range => Self::Range,
137            WindowFrameUnits::Groups => Self::Groups,
138        }
139    }
140}
141
142impl TryFrom<&WindowFrameBound> for protobuf::WindowFrameBound {
143    type Error = Error;
144
145    fn try_from(bound: &WindowFrameBound) -> Result<Self, Self::Error> {
146        Ok(match bound {
147            WindowFrameBound::CurrentRow => Self {
148                window_frame_bound_type: protobuf::WindowFrameBoundType::CurrentRow
149                    .into(),
150                bound_value: None,
151            },
152            WindowFrameBound::Preceding(v) => Self {
153                window_frame_bound_type: protobuf::WindowFrameBoundType::Preceding.into(),
154                bound_value: Some(v.try_into()?),
155            },
156            WindowFrameBound::Following(v) => Self {
157                window_frame_bound_type: protobuf::WindowFrameBoundType::Following.into(),
158                bound_value: Some(v.try_into()?),
159            },
160        })
161    }
162}
163
164impl TryFrom<&WindowFrame> for protobuf::WindowFrame {
165    type Error = Error;
166
167    fn try_from(window: &WindowFrame) -> Result<Self, Self::Error> {
168        Ok(Self {
169            window_frame_units: protobuf::WindowFrameUnits::from(window.units).into(),
170            start_bound: Some((&window.start_bound).try_into()?),
171            end_bound: Some(protobuf::window_frame::EndBound::Bound(
172                (&window.end_bound).try_into()?,
173            )),
174        })
175    }
176}
177
178pub fn serialize_exprs<'a, I>(
179    exprs: I,
180    codec: &dyn LogicalExtensionCodec,
181) -> Result<Vec<protobuf::LogicalExprNode>, Error>
182where
183    I: IntoIterator<Item = &'a Expr>,
184{
185    exprs
186        .into_iter()
187        .map(|expr| serialize_expr(expr, codec))
188        .collect::<Result<Vec<_>, Error>>()
189}
190
191pub fn serialize_expr(
192    expr: &Expr,
193    codec: &dyn LogicalExtensionCodec,
194) -> Result<protobuf::LogicalExprNode, Error> {
195    use protobuf::logical_expr_node::ExprType;
196
197    let expr_node = match expr {
198        Expr::Column(c) => protobuf::LogicalExprNode {
199            expr_type: Some(ExprType::Column(c.into())),
200        },
201        Expr::Alias(Alias {
202            expr,
203            relation,
204            name,
205            metadata,
206        }) => {
207            let alias = Box::new(protobuf::AliasNode {
208                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
209                relation: relation
210                    .to_owned()
211                    .map(|r| vec![r.into()])
212                    .unwrap_or(vec![]),
213                alias: name.to_owned(),
214                metadata: metadata
215                    .as_ref()
216                    .map(|m| m.to_hashmap())
217                    .unwrap_or(HashMap::new()),
218            });
219            protobuf::LogicalExprNode {
220                expr_type: Some(ExprType::Alias(alias)),
221            }
222        }
223        Expr::Literal(value, _) => {
224            let pb_value: protobuf::ScalarValue = value.try_into()?;
225            protobuf::LogicalExprNode {
226                expr_type: Some(ExprType::Literal(pb_value)),
227            }
228        }
229        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
230            // Try to linerize a nested binary expression tree of the same operator
231            // into a flat vector of expressions.
232            let mut exprs = vec![right.as_ref()];
233            let mut current_expr = left.as_ref();
234            while let Expr::BinaryExpr(BinaryExpr {
235                left,
236                op: current_op,
237                right,
238            }) = current_expr
239            {
240                if current_op == op {
241                    exprs.push(right.as_ref());
242                    current_expr = left.as_ref();
243                } else {
244                    break;
245                }
246            }
247            exprs.push(current_expr);
248
249            let binary_expr = protobuf::BinaryExprNode {
250                // We need to reverse exprs since operands are expected to be
251                // linearized from left innermost to right outermost (but while
252                // traversing the chain we do the exact opposite).
253                operands: serialize_exprs(exprs.into_iter().rev(), codec)?,
254                op: format!("{op:?}"),
255            };
256            protobuf::LogicalExprNode {
257                expr_type: Some(ExprType::BinaryExpr(binary_expr)),
258            }
259        }
260        Expr::Like(Like {
261            negated,
262            expr,
263            pattern,
264            escape_char,
265            case_insensitive,
266        }) => {
267            if *case_insensitive {
268                let pb = Box::new(protobuf::ILikeNode {
269                    negated: *negated,
270                    expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
271                    pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
272                    escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
273                });
274
275                protobuf::LogicalExprNode {
276                    expr_type: Some(ExprType::Ilike(pb)),
277                }
278            } else {
279                let pb = Box::new(protobuf::LikeNode {
280                    negated: *negated,
281                    expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
282                    pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
283                    escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
284                });
285
286                protobuf::LogicalExprNode {
287                    expr_type: Some(ExprType::Like(pb)),
288                }
289            }
290        }
291        Expr::SimilarTo(Like {
292            negated,
293            expr,
294            pattern,
295            escape_char,
296            case_insensitive: _,
297        }) => {
298            let pb = Box::new(protobuf::SimilarToNode {
299                negated: *negated,
300                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
301                pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
302                escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
303            });
304            protobuf::LogicalExprNode {
305                expr_type: Some(ExprType::SimilarTo(pb)),
306            }
307        }
308        Expr::WindowFunction(window_fun) => {
309            let expr::WindowFunction {
310                ref fun,
311                params:
312                    expr::WindowFunctionParams {
313                        ref args,
314                        ref partition_by,
315                        ref order_by,
316                        ref window_frame,
317                        ref null_treatment,
318                        ref distinct,
319                        ref filter,
320                    },
321            } = window_fun.as_ref();
322            let mut buf = Vec::new();
323            let window_function = match fun {
324                WindowFunctionDefinition::AggregateUDF(aggr_udf) => {
325                    let _ = codec.try_encode_udaf(aggr_udf, &mut buf);
326                    protobuf::window_expr_node::WindowFunction::Udaf(
327                        aggr_udf.name().to_string(),
328                    )
329                }
330                WindowFunctionDefinition::WindowUDF(window_udf) => {
331                    let _ = codec.try_encode_udwf(window_udf, &mut buf);
332                    protobuf::window_expr_node::WindowFunction::Udwf(
333                        window_udf.name().to_string(),
334                    )
335                }
336            };
337            let fun_definition = (!buf.is_empty()).then_some(buf);
338            let partition_by = serialize_exprs(partition_by, codec)?;
339            let order_by = serialize_sorts(order_by, codec)?;
340
341            let window_frame: Option<protobuf::WindowFrame> =
342                Some(window_frame.try_into()?);
343
344            let window_expr = protobuf::WindowExprNode {
345                exprs: serialize_exprs(args, codec)?,
346                window_function: Some(window_function),
347                partition_by,
348                order_by,
349                window_frame,
350                distinct: *distinct,
351                filter: match filter {
352                    Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
353                    None => None,
354                },
355                null_treatment: null_treatment
356                    .map(|nt| protobuf::NullTreatment::from(nt).into()),
357                fun_definition,
358            };
359            protobuf::LogicalExprNode {
360                expr_type: Some(ExprType::WindowExpr(Box::new(window_expr))),
361            }
362        }
363        Expr::AggregateFunction(expr::AggregateFunction {
364            ref func,
365            params:
366                AggregateFunctionParams {
367                    ref args,
368                    ref distinct,
369                    ref filter,
370                    ref order_by,
371                    ref null_treatment,
372                },
373        }) => {
374            let mut buf = Vec::new();
375            let _ = codec.try_encode_udaf(func, &mut buf);
376            protobuf::LogicalExprNode {
377                expr_type: Some(ExprType::AggregateUdfExpr(Box::new(
378                    protobuf::AggregateUdfExprNode {
379                        fun_name: func.name().to_string(),
380                        args: serialize_exprs(args, codec)?,
381                        distinct: *distinct,
382                        filter: match filter {
383                            Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
384                            None => None,
385                        },
386                        order_by: serialize_sorts(order_by, codec)?,
387                        fun_definition: (!buf.is_empty()).then_some(buf),
388                        null_treatment: null_treatment
389                            .map(|nt| protobuf::NullTreatment::from(nt).into()),
390                    },
391                ))),
392            }
393        }
394
395        Expr::ScalarVariable(_, _) => {
396            return Err(Error::General(
397                "Proto serialization error: Scalar Variable not supported".to_string(),
398            ))
399        }
400        Expr::ScalarFunction(ScalarFunction { func, args }) => {
401            let mut buf = Vec::new();
402            let _ = codec.try_encode_udf(func, &mut buf);
403            protobuf::LogicalExprNode {
404                expr_type: Some(ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
405                    fun_name: func.name().to_string(),
406                    fun_definition: (!buf.is_empty()).then_some(buf),
407                    args: serialize_exprs(args, codec)?,
408                })),
409            }
410        }
411        Expr::Not(expr) => {
412            let expr = Box::new(protobuf::Not {
413                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
414            });
415            protobuf::LogicalExprNode {
416                expr_type: Some(ExprType::NotExpr(expr)),
417            }
418        }
419        Expr::IsNull(expr) => {
420            let expr = Box::new(protobuf::IsNull {
421                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
422            });
423            protobuf::LogicalExprNode {
424                expr_type: Some(ExprType::IsNullExpr(expr)),
425            }
426        }
427        Expr::IsNotNull(expr) => {
428            let expr = Box::new(protobuf::IsNotNull {
429                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
430            });
431            protobuf::LogicalExprNode {
432                expr_type: Some(ExprType::IsNotNullExpr(expr)),
433            }
434        }
435        Expr::IsTrue(expr) => {
436            let expr = Box::new(protobuf::IsTrue {
437                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
438            });
439            protobuf::LogicalExprNode {
440                expr_type: Some(ExprType::IsTrue(expr)),
441            }
442        }
443        Expr::IsFalse(expr) => {
444            let expr = Box::new(protobuf::IsFalse {
445                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
446            });
447            protobuf::LogicalExprNode {
448                expr_type: Some(ExprType::IsFalse(expr)),
449            }
450        }
451        Expr::IsUnknown(expr) => {
452            let expr = Box::new(protobuf::IsUnknown {
453                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
454            });
455            protobuf::LogicalExprNode {
456                expr_type: Some(ExprType::IsUnknown(expr)),
457            }
458        }
459        Expr::IsNotTrue(expr) => {
460            let expr = Box::new(protobuf::IsNotTrue {
461                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
462            });
463            protobuf::LogicalExprNode {
464                expr_type: Some(ExprType::IsNotTrue(expr)),
465            }
466        }
467        Expr::IsNotFalse(expr) => {
468            let expr = Box::new(protobuf::IsNotFalse {
469                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
470            });
471            protobuf::LogicalExprNode {
472                expr_type: Some(ExprType::IsNotFalse(expr)),
473            }
474        }
475        Expr::IsNotUnknown(expr) => {
476            let expr = Box::new(protobuf::IsNotUnknown {
477                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
478            });
479            protobuf::LogicalExprNode {
480                expr_type: Some(ExprType::IsNotUnknown(expr)),
481            }
482        }
483        Expr::Between(Between {
484            expr,
485            negated,
486            low,
487            high,
488        }) => {
489            let expr = Box::new(protobuf::BetweenNode {
490                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
491                negated: *negated,
492                low: Some(Box::new(serialize_expr(low.as_ref(), codec)?)),
493                high: Some(Box::new(serialize_expr(high.as_ref(), codec)?)),
494            });
495            protobuf::LogicalExprNode {
496                expr_type: Some(ExprType::Between(expr)),
497            }
498        }
499        Expr::Case(case) => {
500            let when_then_expr = case
501                .when_then_expr
502                .iter()
503                .map(|(w, t)| {
504                    Ok(protobuf::WhenThen {
505                        when_expr: Some(serialize_expr(w.as_ref(), codec)?),
506                        then_expr: Some(serialize_expr(t.as_ref(), codec)?),
507                    })
508                })
509                .collect::<Result<Vec<protobuf::WhenThen>, Error>>()?;
510            let expr = Box::new(protobuf::CaseNode {
511                expr: match &case.expr {
512                    Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
513                    None => None,
514                },
515                when_then_expr,
516                else_expr: match &case.else_expr {
517                    Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
518                    None => None,
519                },
520            });
521            protobuf::LogicalExprNode {
522                expr_type: Some(ExprType::Case(expr)),
523            }
524        }
525        Expr::Cast(Cast { expr, data_type }) => {
526            let expr = Box::new(protobuf::CastNode {
527                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
528                arrow_type: Some(data_type.try_into()?),
529            });
530            protobuf::LogicalExprNode {
531                expr_type: Some(ExprType::Cast(expr)),
532            }
533        }
534        Expr::TryCast(TryCast { expr, data_type }) => {
535            let expr = Box::new(protobuf::TryCastNode {
536                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
537                arrow_type: Some(data_type.try_into()?),
538            });
539            protobuf::LogicalExprNode {
540                expr_type: Some(ExprType::TryCast(expr)),
541            }
542        }
543        Expr::Negative(expr) => {
544            let expr = Box::new(protobuf::NegativeNode {
545                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
546            });
547            protobuf::LogicalExprNode {
548                expr_type: Some(ExprType::Negative(expr)),
549            }
550        }
551        Expr::Unnest(Unnest { expr }) => {
552            let expr = protobuf::Unnest {
553                exprs: vec![serialize_expr(expr.as_ref(), codec)?],
554            };
555            protobuf::LogicalExprNode {
556                expr_type: Some(ExprType::Unnest(expr)),
557            }
558        }
559        Expr::InList(InList {
560            expr,
561            list,
562            negated,
563        }) => {
564            let expr = Box::new(protobuf::InListNode {
565                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
566                list: serialize_exprs(list, codec)?,
567                negated: *negated,
568            });
569            protobuf::LogicalExprNode {
570                expr_type: Some(ExprType::InList(expr)),
571            }
572        }
573        #[expect(deprecated)]
574        Expr::Wildcard { qualifier, .. } => protobuf::LogicalExprNode {
575            expr_type: Some(ExprType::Wildcard(protobuf::Wildcard {
576                qualifier: qualifier.to_owned().map(|x| x.into()),
577            })),
578        },
579        Expr::ScalarSubquery(_)
580        | Expr::InSubquery(_)
581        | Expr::Exists { .. }
582        | Expr::OuterReferenceColumn { .. } => {
583            // we would need to add logical plan operators to datafusion.proto to support this
584            // see discussion in https://github.com/apache/datafusion/issues/2565
585            return Err(Error::General("Proto serialization error: Expr::ScalarSubquery(_) | Expr::InSubquery(_) | Expr::Exists { .. } | Exp:OuterReferenceColumn not supported".to_string()));
586        }
587        Expr::GroupingSet(GroupingSet::Cube(exprs)) => protobuf::LogicalExprNode {
588            expr_type: Some(ExprType::Cube(CubeNode {
589                expr: serialize_exprs(exprs, codec)?,
590            })),
591        },
592        Expr::GroupingSet(GroupingSet::Rollup(exprs)) => protobuf::LogicalExprNode {
593            expr_type: Some(ExprType::Rollup(RollupNode {
594                expr: serialize_exprs(exprs, codec)?,
595            })),
596        },
597        Expr::GroupingSet(GroupingSet::GroupingSets(exprs)) => {
598            protobuf::LogicalExprNode {
599                expr_type: Some(ExprType::GroupingSet(GroupingSetNode {
600                    expr: exprs
601                        .iter()
602                        .map(|expr_list| {
603                            Ok(LogicalExprList {
604                                expr: serialize_exprs(expr_list, codec)?,
605                            })
606                        })
607                        .collect::<Result<Vec<_>, Error>>()?,
608                })),
609            }
610        }
611        Expr::Placeholder(Placeholder { id, field }) => protobuf::LogicalExprNode {
612            expr_type: Some(ExprType::Placeholder(PlaceholderNode {
613                id: id.clone(),
614                data_type: match field {
615                    Some(field) => Some(field.data_type().try_into()?),
616                    None => None,
617                },
618                nullable: field.as_ref().map(|f| f.is_nullable()),
619                metadata: field
620                    .as_ref()
621                    .map(|f| f.metadata().clone())
622                    .unwrap_or(HashMap::new()),
623            })),
624        },
625    };
626
627    Ok(expr_node)
628}
629
630pub fn serialize_sorts<'a, I>(
631    sorts: I,
632    codec: &dyn LogicalExtensionCodec,
633) -> Result<Vec<protobuf::SortExprNode>, Error>
634where
635    I: IntoIterator<Item = &'a SortExpr>,
636{
637    sorts
638        .into_iter()
639        .map(|sort| {
640            let SortExpr {
641                expr,
642                asc,
643                nulls_first,
644            } = sort;
645            Ok(protobuf::SortExprNode {
646                expr: Some(serialize_expr(expr, codec)?),
647                asc: *asc,
648                nulls_first: *nulls_first,
649            })
650        })
651        .collect::<Result<Vec<_>, Error>>()
652}
653
654impl From<TableReference> for protobuf::TableReference {
655    fn from(t: TableReference) -> Self {
656        use protobuf::table_reference::TableReferenceEnum;
657        let table_reference_enum = match t {
658            TableReference::Bare { table } => {
659                TableReferenceEnum::Bare(protobuf::BareTableReference {
660                    table: table.to_string(),
661                })
662            }
663            TableReference::Partial { schema, table } => {
664                TableReferenceEnum::Partial(protobuf::PartialTableReference {
665                    schema: schema.to_string(),
666                    table: table.to_string(),
667                })
668            }
669            TableReference::Full {
670                catalog,
671                schema,
672                table,
673            } => TableReferenceEnum::Full(protobuf::FullTableReference {
674                catalog: catalog.to_string(),
675                schema: schema.to_string(),
676                table: table.to_string(),
677            }),
678        };
679
680        protobuf::TableReference {
681            table_reference_enum: Some(table_reference_enum),
682        }
683    }
684}
685
686impl From<JoinType> for protobuf::JoinType {
687    fn from(t: JoinType) -> Self {
688        match t {
689            JoinType::Inner => protobuf::JoinType::Inner,
690            JoinType::Left => protobuf::JoinType::Left,
691            JoinType::Right => protobuf::JoinType::Right,
692            JoinType::Full => protobuf::JoinType::Full,
693            JoinType::LeftSemi => protobuf::JoinType::Leftsemi,
694            JoinType::RightSemi => protobuf::JoinType::Rightsemi,
695            JoinType::LeftAnti => protobuf::JoinType::Leftanti,
696            JoinType::RightAnti => protobuf::JoinType::Rightanti,
697            JoinType::LeftMark => protobuf::JoinType::Leftmark,
698            JoinType::RightMark => protobuf::JoinType::Rightmark,
699        }
700    }
701}
702
703impl From<JoinConstraint> for protobuf::JoinConstraint {
704    fn from(t: JoinConstraint) -> Self {
705        match t {
706            JoinConstraint::On => protobuf::JoinConstraint::On,
707            JoinConstraint::Using => protobuf::JoinConstraint::Using,
708        }
709    }
710}
711
712impl From<NullEquality> for protobuf::NullEquality {
713    fn from(t: NullEquality) -> Self {
714        match t {
715            NullEquality::NullEqualsNothing => protobuf::NullEquality::NullEqualsNothing,
716            NullEquality::NullEqualsNull => protobuf::NullEquality::NullEqualsNull,
717        }
718    }
719}
720
721impl From<&WriteOp> for protobuf::dml_node::Type {
722    fn from(t: &WriteOp) -> Self {
723        match t {
724            WriteOp::Insert(InsertOp::Append) => protobuf::dml_node::Type::InsertAppend,
725            WriteOp::Insert(InsertOp::Overwrite) => {
726                protobuf::dml_node::Type::InsertOverwrite
727            }
728            WriteOp::Insert(InsertOp::Replace) => protobuf::dml_node::Type::InsertReplace,
729            WriteOp::Delete => protobuf::dml_node::Type::Delete,
730            WriteOp::Update => protobuf::dml_node::Type::Update,
731            WriteOp::Ctas => protobuf::dml_node::Type::Ctas,
732        }
733    }
734}
735
736impl From<NullTreatment> for protobuf::NullTreatment {
737    fn from(t: NullTreatment) -> Self {
738        match t {
739            NullTreatment::RespectNulls => protobuf::NullTreatment::RespectNulls,
740            NullTreatment::IgnoreNulls => protobuf::NullTreatment::IgnoreNulls,
741        }
742    }
743}