Skip to main content

datafusion_proto/logical_plan/
to_proto.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Code to convert Arrow schemas and DataFusion logical plans to protocol buffer format, allowing
19//! DataFusion logical plans to be serialized and transmitted between
20//! processes.
21
22use std::collections::HashMap;
23
24use datafusion_common::{NullEquality, TableReference, UnnestOptions};
25use datafusion_expr::WriteOp;
26use datafusion_expr::dml::InsertOp;
27use datafusion_expr::expr::{
28    self, AggregateFunctionParams, Alias, Between, BinaryExpr, Cast, GroupingSet, InList,
29    Like, NullTreatment, Placeholder, ScalarFunction, Unnest,
30};
31use datafusion_expr::logical_plan::Subquery;
32use datafusion_expr::{
33    Expr, JoinConstraint, JoinType, SortExpr, TryCast, WindowFrame, WindowFrameBound,
34    WindowFrameUnits, WindowFunctionDefinition, logical_plan::PlanType,
35    logical_plan::StringifiedPlan,
36};
37
38use crate::protobuf::RecursionUnnestOption;
39use crate::protobuf::{
40    self, AnalyzedLogicalPlanType, CubeNode, EmptyMessage, GroupingSetNode,
41    LogicalExprList, OptimizedLogicalPlanType, OptimizedPhysicalPlanType,
42    PlaceholderNode, RollupNode, ToProtoError as Error,
43    plan_type::PlanTypeEnum::{
44        AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
45        FinalPhysicalPlan, FinalPhysicalPlanWithSchema, FinalPhysicalPlanWithStats,
46        InitialLogicalPlan, InitialPhysicalPlan, InitialPhysicalPlanWithSchema,
47        InitialPhysicalPlanWithStats, OptimizedLogicalPlan, OptimizedPhysicalPlan,
48        PhysicalPlanError,
49    },
50};
51
52use super::{AsLogicalPlan, LogicalExtensionCodec};
53use crate::protobuf::LogicalPlanNode;
54
55impl From<&UnnestOptions> for protobuf::UnnestOptions {
56    fn from(opts: &UnnestOptions) -> Self {
57        Self {
58            preserve_nulls: opts.preserve_nulls,
59            recursions: opts
60                .recursions
61                .iter()
62                .map(|r| RecursionUnnestOption {
63                    input_column: Some((&r.input_column).into()),
64                    output_column: Some((&r.output_column).into()),
65                    depth: r.depth as u32,
66                })
67                .collect(),
68        }
69    }
70}
71
72impl From<&StringifiedPlan> for protobuf::StringifiedPlan {
73    fn from(stringified_plan: &StringifiedPlan) -> Self {
74        Self {
75            plan_type: match stringified_plan.clone().plan_type {
76                PlanType::InitialLogicalPlan => Some(protobuf::PlanType {
77                    plan_type_enum: Some(InitialLogicalPlan(EmptyMessage {})),
78                }),
79                PlanType::AnalyzedLogicalPlan { analyzer_name } => {
80                    Some(protobuf::PlanType {
81                        plan_type_enum: Some(AnalyzedLogicalPlan(
82                            AnalyzedLogicalPlanType { analyzer_name },
83                        )),
84                    })
85                }
86                PlanType::FinalAnalyzedLogicalPlan => Some(protobuf::PlanType {
87                    plan_type_enum: Some(FinalAnalyzedLogicalPlan(EmptyMessage {})),
88                }),
89                PlanType::OptimizedLogicalPlan { optimizer_name } => {
90                    Some(protobuf::PlanType {
91                        plan_type_enum: Some(OptimizedLogicalPlan(
92                            OptimizedLogicalPlanType { optimizer_name },
93                        )),
94                    })
95                }
96                PlanType::FinalLogicalPlan => Some(protobuf::PlanType {
97                    plan_type_enum: Some(FinalLogicalPlan(EmptyMessage {})),
98                }),
99                PlanType::InitialPhysicalPlan => Some(protobuf::PlanType {
100                    plan_type_enum: Some(InitialPhysicalPlan(EmptyMessage {})),
101                }),
102                PlanType::OptimizedPhysicalPlan { optimizer_name } => {
103                    Some(protobuf::PlanType {
104                        plan_type_enum: Some(OptimizedPhysicalPlan(
105                            OptimizedPhysicalPlanType { optimizer_name },
106                        )),
107                    })
108                }
109                PlanType::FinalPhysicalPlan => Some(protobuf::PlanType {
110                    plan_type_enum: Some(FinalPhysicalPlan(EmptyMessage {})),
111                }),
112                PlanType::InitialPhysicalPlanWithStats => Some(protobuf::PlanType {
113                    plan_type_enum: Some(InitialPhysicalPlanWithStats(EmptyMessage {})),
114                }),
115                PlanType::InitialPhysicalPlanWithSchema => Some(protobuf::PlanType {
116                    plan_type_enum: Some(InitialPhysicalPlanWithSchema(EmptyMessage {})),
117                }),
118                PlanType::FinalPhysicalPlanWithStats => Some(protobuf::PlanType {
119                    plan_type_enum: Some(FinalPhysicalPlanWithStats(EmptyMessage {})),
120                }),
121                PlanType::FinalPhysicalPlanWithSchema => Some(protobuf::PlanType {
122                    plan_type_enum: Some(FinalPhysicalPlanWithSchema(EmptyMessage {})),
123                }),
124                PlanType::PhysicalPlanError => Some(protobuf::PlanType {
125                    plan_type_enum: Some(PhysicalPlanError(EmptyMessage {})),
126                }),
127            },
128            plan: stringified_plan.plan.to_string(),
129        }
130    }
131}
132
133impl From<WindowFrameUnits> for protobuf::WindowFrameUnits {
134    fn from(units: WindowFrameUnits) -> Self {
135        match units {
136            WindowFrameUnits::Rows => Self::Rows,
137            WindowFrameUnits::Range => Self::Range,
138            WindowFrameUnits::Groups => Self::Groups,
139        }
140    }
141}
142
143impl TryFrom<&WindowFrameBound> for protobuf::WindowFrameBound {
144    type Error = Error;
145
146    fn try_from(bound: &WindowFrameBound) -> Result<Self, Self::Error> {
147        Ok(match bound {
148            WindowFrameBound::CurrentRow => Self {
149                window_frame_bound_type: protobuf::WindowFrameBoundType::CurrentRow
150                    .into(),
151                bound_value: None,
152            },
153            WindowFrameBound::Preceding(v) => Self {
154                window_frame_bound_type: protobuf::WindowFrameBoundType::Preceding.into(),
155                bound_value: Some(v.try_into()?),
156            },
157            WindowFrameBound::Following(v) => Self {
158                window_frame_bound_type: protobuf::WindowFrameBoundType::Following.into(),
159                bound_value: Some(v.try_into()?),
160            },
161        })
162    }
163}
164
165impl TryFrom<&WindowFrame> for protobuf::WindowFrame {
166    type Error = Error;
167
168    fn try_from(window: &WindowFrame) -> Result<Self, Self::Error> {
169        Ok(Self {
170            window_frame_units: protobuf::WindowFrameUnits::from(window.units).into(),
171            start_bound: Some((&window.start_bound).try_into()?),
172            end_bound: Some(protobuf::window_frame::EndBound::Bound(
173                (&window.end_bound).try_into()?,
174            )),
175        })
176    }
177}
178
179pub fn serialize_exprs<'a, I>(
180    exprs: I,
181    codec: &dyn LogicalExtensionCodec,
182) -> Result<Vec<protobuf::LogicalExprNode>, Error>
183where
184    I: IntoIterator<Item = &'a Expr>,
185{
186    exprs
187        .into_iter()
188        .map(|expr| serialize_expr(expr, codec))
189        .collect::<Result<Vec<_>, Error>>()
190}
191
/// Serializes a single logical [`Expr`] into a [`protobuf::LogicalExprNode`].
///
/// Child expressions are serialized recursively. User-defined functions
/// (scalar, aggregate, window) are passed to `codec` for an optional binary
/// encoding and are always recorded by name.
///
/// # Errors
///
/// Returns [`Error::General`] for variants this function does not serialize:
/// `ScalarVariable`, `InSubquery`, `Exists`, `OuterReferenceColumn`,
/// `SetComparison`, `HigherOrderFunction`, `Lambda` and `LambdaVariable`.
/// Errors from nested expressions, scalar values, data types and subquery
/// plans are propagated.
pub fn serialize_expr(
    expr: &Expr,
    codec: &dyn LogicalExtensionCodec,
) -> Result<protobuf::LogicalExprNode, Error> {
    use protobuf::logical_expr_node::ExprType;

    let expr_node = match expr {
        Expr::Column(c) => protobuf::LogicalExprNode {
            expr_type: Some(ExprType::Column(c.into())),
        },
        Expr::Alias(Alias {
            expr,
            relation,
            name,
            metadata,
        }) => {
            let alias = Box::new(protobuf::AliasNode {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
                // The optional relation is encoded as a list holding zero or
                // one entries.
                relation: relation
                    .to_owned()
                    .map(|r| vec![r.into()])
                    .unwrap_or(vec![]),
                alias: name.to_owned(),
                // Missing metadata is encoded as an empty map.
                metadata: metadata
                    .as_ref()
                    .map(|m| m.to_hashmap())
                    .unwrap_or(HashMap::new()),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::Alias(alias)),
            }
        }
        // Only the scalar value is serialized; the second tuple field is
        // ignored here.
        Expr::Literal(value, _) => {
            let pb_value: protobuf::ScalarValue = value.try_into()?;
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::Literal(pb_value)),
            }
        }
        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
            // Try to linearize a nested binary expression tree of the same operator
            // into a flat vector of expressions.
            let mut exprs = vec![right.as_ref()];
            let mut current_expr = left.as_ref();
            // Walk down the left spine for as long as the operator matches,
            // collecting the right-hand operands along the way.
            while let Expr::BinaryExpr(BinaryExpr {
                left,
                op: current_op,
                right,
            }) = current_expr
            {
                if current_op == op {
                    exprs.push(right.as_ref());
                    current_expr = left.as_ref();
                } else {
                    break;
                }
            }
            // The expression where the walk stopped is the leftmost operand.
            exprs.push(current_expr);

            let binary_expr = protobuf::BinaryExprNode {
                // We need to reverse exprs since operands are expected to be
                // linearized from left innermost to right outermost (but while
                // traversing the chain we do the exact opposite).
                operands: serialize_exprs(exprs.into_iter().rev(), codec)?,
                // The operator is stored as its `Debug` representation.
                op: format!("{op:?}"),
            };
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::BinaryExpr(binary_expr)),
            }
        }
        Expr::Like(Like {
            negated,
            expr,
            pattern,
            escape_char,
            case_insensitive,
        }) => {
            // `case_insensitive` selects between the ILIKE and LIKE messages;
            // the two are otherwise filled in the same way. A missing escape
            // character is encoded as an empty string.
            if *case_insensitive {
                let pb = Box::new(protobuf::ILikeNode {
                    negated: *negated,
                    expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
                    pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
                    escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
                });

                protobuf::LogicalExprNode {
                    expr_type: Some(ExprType::Ilike(pb)),
                }
            } else {
                let pb = Box::new(protobuf::LikeNode {
                    negated: *negated,
                    expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
                    pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
                    escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
                });

                protobuf::LogicalExprNode {
                    expr_type: Some(ExprType::Like(pb)),
                }
            }
        }
        // `case_insensitive` is not serialized for SIMILAR TO.
        Expr::SimilarTo(Like {
            negated,
            expr,
            pattern,
            escape_char,
            case_insensitive: _,
        }) => {
            let pb = Box::new(protobuf::SimilarToNode {
                negated: *negated,
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
                pattern: Some(Box::new(serialize_expr(pattern.as_ref(), codec)?)),
                escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::SimilarTo(pb)),
            }
        }
        Expr::WindowFunction(window_fun) => {
            let expr::WindowFunction {
                fun,
                params:
                    expr::WindowFunctionParams {
                        args,
                        partition_by,
                        order_by,
                        window_frame,
                        null_treatment,
                        distinct,
                        filter,
                    },
            } = window_fun.as_ref();
            let mut buf = Vec::new();
            // The codec's encode result is deliberately discarded: the
            // function is always recorded by name, and whatever bytes the
            // codec wrote (if any) are attached as `fun_definition` below.
            let window_function = match fun {
                WindowFunctionDefinition::AggregateUDF(aggr_udf) => {
                    let _ = codec.try_encode_udaf(aggr_udf, &mut buf);
                    protobuf::window_expr_node::WindowFunction::Udaf(
                        aggr_udf.name().to_string(),
                    )
                }
                WindowFunctionDefinition::WindowUDF(window_udf) => {
                    let _ = codec.try_encode_udwf(window_udf, &mut buf);
                    protobuf::window_expr_node::WindowFunction::Udwf(
                        window_udf.name().to_string(),
                    )
                }
            };
            // An empty buffer is encoded as "no definition".
            let fun_definition = (!buf.is_empty()).then_some(buf);
            let partition_by = serialize_exprs(partition_by, codec)?;
            let order_by = serialize_sorts(order_by, codec)?;

            let window_frame: Option<protobuf::WindowFrame> =
                Some(window_frame.try_into()?);

            let window_expr = protobuf::WindowExprNode {
                exprs: serialize_exprs(args, codec)?,
                window_function: Some(window_function),
                partition_by,
                order_by,
                window_frame,
                distinct: *distinct,
                filter: match filter {
                    Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
                    None => None,
                },
                null_treatment: null_treatment
                    .map(|nt| protobuf::NullTreatment::from(nt).into()),
                fun_definition,
            };
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::WindowExpr(Box::new(window_expr))),
            }
        }
        Expr::AggregateFunction(expr::AggregateFunction {
            func,
            params:
                AggregateFunctionParams {
                    args,
                    distinct,
                    filter,
                    order_by,
                    null_treatment,
                },
        }) => {
            let mut buf = Vec::new();
            // Encode result discarded on purpose; an empty buffer becomes
            // `None` and the function is still recorded by name.
            let _ = codec.try_encode_udaf(func, &mut buf);
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::AggregateUdfExpr(Box::new(
                    protobuf::AggregateUdfExprNode {
                        fun_name: func.name().to_string(),
                        args: serialize_exprs(args, codec)?,
                        distinct: *distinct,
                        filter: match filter {
                            Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
                            None => None,
                        },
                        order_by: serialize_sorts(order_by, codec)?,
                        fun_definition: (!buf.is_empty()).then_some(buf),
                        null_treatment: null_treatment
                            .map(|nt| protobuf::NullTreatment::from(nt).into()),
                    },
                ))),
            }
        }

        Expr::ScalarVariable(_, _) => {
            return Err(Error::General(
                "Proto serialization error: Scalar Variable not supported".to_string(),
            ));
        }
        Expr::ScalarFunction(ScalarFunction { func, args }) => {
            let mut buf = Vec::new();
            // Encode result discarded on purpose; an empty buffer becomes
            // `None` and the function is still recorded by name.
            let _ = codec.try_encode_udf(func, &mut buf);
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
                    fun_name: func.name().to_string(),
                    fun_definition: (!buf.is_empty()).then_some(buf),
                    args: serialize_exprs(args, codec)?,
                })),
            }
        }
        // The unary predicates below all follow the same shape: serialize the
        // single child and wrap it in the matching protobuf message.
        Expr::Not(expr) => {
            let expr = Box::new(protobuf::Not {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::NotExpr(expr)),
            }
        }
        Expr::IsNull(expr) => {
            let expr = Box::new(protobuf::IsNull {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::IsNullExpr(expr)),
            }
        }
        Expr::IsNotNull(expr) => {
            let expr = Box::new(protobuf::IsNotNull {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::IsNotNullExpr(expr)),
            }
        }
        Expr::IsTrue(expr) => {
            let expr = Box::new(protobuf::IsTrue {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::IsTrue(expr)),
            }
        }
        Expr::IsFalse(expr) => {
            let expr = Box::new(protobuf::IsFalse {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::IsFalse(expr)),
            }
        }
        Expr::IsUnknown(expr) => {
            let expr = Box::new(protobuf::IsUnknown {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::IsUnknown(expr)),
            }
        }
        Expr::IsNotTrue(expr) => {
            let expr = Box::new(protobuf::IsNotTrue {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::IsNotTrue(expr)),
            }
        }
        Expr::IsNotFalse(expr) => {
            let expr = Box::new(protobuf::IsNotFalse {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::IsNotFalse(expr)),
            }
        }
        Expr::IsNotUnknown(expr) => {
            let expr = Box::new(protobuf::IsNotUnknown {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::IsNotUnknown(expr)),
            }
        }
        Expr::Between(Between {
            expr,
            negated,
            low,
            high,
        }) => {
            let expr = Box::new(protobuf::BetweenNode {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
                negated: *negated,
                low: Some(Box::new(serialize_expr(low.as_ref(), codec)?)),
                high: Some(Box::new(serialize_expr(high.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::Between(expr)),
            }
        }
        Expr::Case(case) => {
            // Serialize every (when, then) pair, stopping at the first error.
            let when_then_expr = case
                .when_then_expr
                .iter()
                .map(|(w, t)| {
                    Ok(protobuf::WhenThen {
                        when_expr: Some(serialize_expr(w.as_ref(), codec)?),
                        then_expr: Some(serialize_expr(t.as_ref(), codec)?),
                    })
                })
                .collect::<Result<Vec<protobuf::WhenThen>, Error>>()?;
            let expr = Box::new(protobuf::CaseNode {
                expr: match &case.expr {
                    Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
                    None => None,
                },
                when_then_expr,
                else_expr: match &case.else_expr {
                    Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)),
                    None => None,
                },
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::Case(expr)),
            }
        }
        // The target field of a cast is flattened into its data type,
        // metadata and nullability.
        Expr::Cast(Cast { expr, field }) => {
            let expr = Box::new(protobuf::CastNode {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
                arrow_type: Some(field.data_type().try_into()?),
                metadata: field.metadata().clone(),
                nullable: Some(field.is_nullable()),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::Cast(expr)),
            }
        }
        Expr::TryCast(TryCast { expr, field }) => {
            let expr = Box::new(protobuf::TryCastNode {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
                arrow_type: Some(field.data_type().try_into()?),
                metadata: field.metadata().clone(),
                nullable: Some(field.is_nullable()),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::TryCast(expr)),
            }
        }
        Expr::Negative(expr) => {
            let expr = Box::new(protobuf::NegativeNode {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::Negative(expr)),
            }
        }
        // The protobuf message holds a list of expressions; the single child
        // is stored as a one-element list.
        Expr::Unnest(Unnest { expr }) => {
            let expr = protobuf::Unnest {
                exprs: vec![serialize_expr(expr.as_ref(), codec)?],
            };
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::Unnest(expr)),
            }
        }
        Expr::InList(InList {
            expr,
            list,
            negated,
        }) => {
            let expr = Box::new(protobuf::InListNode {
                expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
                list: serialize_exprs(list, codec)?,
                negated: *negated,
            });
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::InList(expr)),
            }
        }
        // Only the qualifier is serialized; the remaining fields of the
        // variant are ignored by the `..` pattern.
        #[expect(deprecated)]
        Expr::Wildcard { qualifier, .. } => protobuf::LogicalExprNode {
            expr_type: Some(ExprType::Wildcard(protobuf::Wildcard {
                qualifier: qualifier.to_owned().map(|x| x.into()),
            })),
        },
        Expr::ScalarSubquery(subquery) => protobuf::LogicalExprNode {
            expr_type: Some(ExprType::ScalarSubqueryExpr(Box::new(
                protobuf::ScalarSubqueryExprNode {
                    subquery: Some(Box::new(serialize_subquery(subquery, codec)?)),
                },
            ))),
        },
        // These variants have no serialization here; the error message
        // includes the expression's `Display` output.
        Expr::InSubquery(_)
        | Expr::Exists(_)
        | Expr::OuterReferenceColumn(_, _)
        | Expr::SetComparison(_) => {
            return Err(Error::General(format!(
                "Proto serialization error: {expr} is not yet supported"
            )));
        }
        Expr::GroupingSet(GroupingSet::Cube(exprs)) => protobuf::LogicalExprNode {
            expr_type: Some(ExprType::Cube(CubeNode {
                expr: serialize_exprs(exprs, codec)?,
            })),
        },
        Expr::GroupingSet(GroupingSet::Rollup(exprs)) => protobuf::LogicalExprNode {
            expr_type: Some(ExprType::Rollup(RollupNode {
                expr: serialize_exprs(exprs, codec)?,
            })),
        },
        Expr::GroupingSet(GroupingSet::GroupingSets(exprs)) => {
            protobuf::LogicalExprNode {
                expr_type: Some(ExprType::GroupingSet(GroupingSetNode {
                    // Each inner list becomes its own `LogicalExprList`.
                    expr: exprs
                        .iter()
                        .map(|expr_list| {
                            Ok(LogicalExprList {
                                expr: serialize_exprs(expr_list, codec)?,
                            })
                        })
                        .collect::<Result<Vec<_>, Error>>()?,
                })),
            }
        }
        // When the placeholder has no field, the data type and nullability
        // are `None` and the metadata is an empty map.
        Expr::Placeholder(Placeholder { id, field }) => protobuf::LogicalExprNode {
            expr_type: Some(ExprType::Placeholder(PlaceholderNode {
                id: id.clone(),
                data_type: match field {
                    Some(field) => Some(field.data_type().try_into()?),
                    None => None,
                },
                nullable: field.as_ref().map(|f| f.is_nullable()),
                metadata: field
                    .as_ref()
                    .map(|f| f.metadata().clone())
                    .unwrap_or(HashMap::new()),
            })),
        },
        Expr::HigherOrderFunction(_) | Expr::Lambda(_) | Expr::LambdaVariable(_) => {
            return Err(Error::General(
                "Proto serialization error: Lambda not implemented".to_string(),
            ));
        }
    };

    Ok(expr_node)
}
646
647fn serialize_subquery(
648    subquery: &Subquery,
649    codec: &dyn LogicalExtensionCodec,
650) -> Result<protobuf::SubqueryNode, Error> {
651    let plan = LogicalPlanNode::try_from_logical_plan(&subquery.subquery, codec)
652        .map_err(|e| Error::General(e.to_string()))?;
653    let outer_ref_columns = serialize_exprs(&subquery.outer_ref_columns, codec)?;
654    Ok(protobuf::SubqueryNode {
655        subquery: Some(Box::new(plan)),
656        outer_ref_columns,
657    })
658}
659
660pub fn serialize_sorts<'a, I>(
661    sorts: I,
662    codec: &dyn LogicalExtensionCodec,
663) -> Result<Vec<protobuf::SortExprNode>, Error>
664where
665    I: IntoIterator<Item = &'a SortExpr>,
666{
667    sorts
668        .into_iter()
669        .map(|sort| {
670            let SortExpr {
671                expr,
672                asc,
673                nulls_first,
674            } = sort;
675            Ok(protobuf::SortExprNode {
676                expr: Some(serialize_expr(expr, codec)?),
677                asc: *asc,
678                nulls_first: *nulls_first,
679            })
680        })
681        .collect::<Result<Vec<_>, Error>>()
682}
683
684impl From<TableReference> for protobuf::TableReference {
685    fn from(t: TableReference) -> Self {
686        use protobuf::table_reference::TableReferenceEnum;
687        let table_reference_enum = match t {
688            TableReference::Bare { table } => {
689                TableReferenceEnum::Bare(protobuf::BareTableReference {
690                    table: table.to_string(),
691                })
692            }
693            TableReference::Partial { schema, table } => {
694                TableReferenceEnum::Partial(protobuf::PartialTableReference {
695                    schema: schema.to_string(),
696                    table: table.to_string(),
697                })
698            }
699            TableReference::Full {
700                catalog,
701                schema,
702                table,
703            } => TableReferenceEnum::Full(protobuf::FullTableReference {
704                catalog: catalog.to_string(),
705                schema: schema.to_string(),
706                table: table.to_string(),
707            }),
708        };
709
710        protobuf::TableReference {
711            table_reference_enum: Some(table_reference_enum),
712        }
713    }
714}
715
716impl From<JoinType> for protobuf::JoinType {
717    fn from(t: JoinType) -> Self {
718        match t {
719            JoinType::Inner => protobuf::JoinType::Inner,
720            JoinType::Left => protobuf::JoinType::Left,
721            JoinType::Right => protobuf::JoinType::Right,
722            JoinType::Full => protobuf::JoinType::Full,
723            JoinType::LeftSemi => protobuf::JoinType::Leftsemi,
724            JoinType::RightSemi => protobuf::JoinType::Rightsemi,
725            JoinType::LeftAnti => protobuf::JoinType::Leftanti,
726            JoinType::RightAnti => protobuf::JoinType::Rightanti,
727            JoinType::LeftMark => protobuf::JoinType::Leftmark,
728            JoinType::RightMark => protobuf::JoinType::Rightmark,
729        }
730    }
731}
732
733impl From<JoinConstraint> for protobuf::JoinConstraint {
734    fn from(t: JoinConstraint) -> Self {
735        match t {
736            JoinConstraint::On => protobuf::JoinConstraint::On,
737            JoinConstraint::Using => protobuf::JoinConstraint::Using,
738        }
739    }
740}
741
742impl From<NullEquality> for protobuf::NullEquality {
743    fn from(t: NullEquality) -> Self {
744        match t {
745            NullEquality::NullEqualsNothing => protobuf::NullEquality::NullEqualsNothing,
746            NullEquality::NullEqualsNull => protobuf::NullEquality::NullEqualsNull,
747        }
748    }
749}
750
751impl From<&WriteOp> for protobuf::dml_node::Type {
752    fn from(t: &WriteOp) -> Self {
753        match t {
754            WriteOp::Insert(InsertOp::Append) => protobuf::dml_node::Type::InsertAppend,
755            WriteOp::Insert(InsertOp::Overwrite) => {
756                protobuf::dml_node::Type::InsertOverwrite
757            }
758            WriteOp::Insert(InsertOp::Replace) => protobuf::dml_node::Type::InsertReplace,
759            WriteOp::Delete => protobuf::dml_node::Type::Delete,
760            WriteOp::Update => protobuf::dml_node::Type::Update,
761            WriteOp::Ctas => protobuf::dml_node::Type::Ctas,
762            WriteOp::Truncate => protobuf::dml_node::Type::Truncate,
763        }
764    }
765}
766
767impl From<NullTreatment> for protobuf::NullTreatment {
768    fn from(t: NullTreatment) -> Self {
769        match t {
770            NullTreatment::RespectNulls => protobuf::NullTreatment::RespectNulls,
771            NullTreatment::IgnoreNulls => protobuf::NullTreatment::IgnoreNulls,
772        }
773    }
774}