Skip to main content

datafusion_proto/physical_plan/
to_proto.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use arrow::array::RecordBatch;
21use arrow::datatypes::Schema;
22use arrow::ipc::writer::StreamWriter;
23use datafusion_common::{
24    DataFusionError, Result, internal_datafusion_err, internal_err, not_impl_err,
25};
26use datafusion_datasource::file_scan_config::FileScanConfig;
27use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
28use datafusion_datasource::{FileRange, PartitionedFile};
29use datafusion_datasource_csv::file_format::CsvSink;
30use datafusion_datasource_json::file_format::JsonSink;
31#[cfg(feature = "parquet")]
32use datafusion_datasource_parquet::file_format::ParquetSink;
33use datafusion_expr::WindowFrame;
34use datafusion_physical_expr::ScalarFunctionExpr;
35use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr;
36use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
37use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
38use datafusion_physical_plan::expressions::{
39    BinaryExpr, CaseExpr, CastExpr, Column, DynamicFilterPhysicalExpr, InListExpr,
40    IsNotNullExpr, IsNullExpr, LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr,
41    UnKnownColumn,
42};
43use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr};
44use datafusion_physical_plan::udaf::AggregateFunctionExpr;
45use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
46use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
47
48use super::{
49    DefaultPhysicalProtoConverter, PhysicalExtensionCodec,
50    PhysicalProtoConverterExtension, encode_human_display_alias,
51};
52use crate::protobuf::{
53    self, PhysicalSortExprNode, PhysicalSortExprNodeCollection,
54    physical_aggregate_expr_node, physical_window_expr_node,
55};
56
57#[expect(clippy::needless_pass_by_value)]
58pub fn serialize_physical_aggr_expr(
59    aggr_expr: Arc<AggregateFunctionExpr>,
60    codec: &dyn PhysicalExtensionCodec,
61    proto_converter: &dyn PhysicalProtoConverterExtension,
62) -> Result<protobuf::PhysicalExprNode> {
63    let expressions =
64        serialize_physical_exprs(&aggr_expr.expressions(), codec, proto_converter)?;
65    let order_bys = serialize_physical_sort_exprs(
66        aggr_expr.order_bys().iter().cloned(),
67        codec,
68        proto_converter,
69    )?;
70
71    let name = aggr_expr.fun().name().to_string();
72    let mut buf = Vec::new();
73    codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
74    let human_display = match (aggr_expr.human_display(), aggr_expr.human_display_alias())
75    {
76        (Some(display), Some(alias)) => encode_human_display_alias(display, alias),
77        (Some(display), None) => display.to_string(),
78        (None, _) => String::new(),
79    };
80    Ok(protobuf::PhysicalExprNode {
81        expr_id: None,
82        expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
83            protobuf::PhysicalAggregateExprNode {
84                aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)),
85                expr: expressions,
86                ordering_req: order_bys,
87                distinct: aggr_expr.is_distinct(),
88                ignore_nulls: aggr_expr.ignore_nulls(),
89                fun_definition: (!buf.is_empty()).then_some(buf),
90                human_display,
91            },
92        )),
93    })
94}
95
96fn serialize_physical_window_aggr_expr(
97    aggr_expr: &AggregateFunctionExpr,
98    _window_frame: &WindowFrame,
99    codec: &dyn PhysicalExtensionCodec,
100) -> Result<(physical_window_expr_node::WindowFunction, Option<Vec<u8>>)> {
101    // Distinct and ignore_nulls are now supported in window expressions
102
103    let mut buf = Vec::new();
104    codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
105    Ok((
106        physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(
107            aggr_expr.fun().name().to_string(),
108        ),
109        (!buf.is_empty()).then_some(buf),
110    ))
111}
112
113pub fn serialize_physical_window_expr(
114    window_expr: &Arc<dyn WindowExpr>,
115    codec: &dyn PhysicalExtensionCodec,
116    proto_converter: &dyn PhysicalProtoConverterExtension,
117) -> Result<protobuf::PhysicalWindowExprNode> {
118    let expr = window_expr.as_any();
119    let mut args = window_expr.expressions().to_vec();
120    let window_frame = window_expr.get_window_frame();
121
122    let (window_function, fun_definition, ignore_nulls, distinct) =
123        if let Some(plain_aggr_window_expr) =
124            expr.downcast_ref::<PlainAggregateWindowExpr>()
125        {
126            let aggr_expr = plain_aggr_window_expr.get_aggregate_expr();
127            let (window_function, fun_definition) =
128                serialize_physical_window_aggr_expr(aggr_expr, window_frame, codec)?;
129            (
130                window_function,
131                fun_definition,
132                aggr_expr.ignore_nulls(),
133                aggr_expr.is_distinct(),
134            )
135        } else if let Some(sliding_aggr_window_expr) =
136            expr.downcast_ref::<SlidingAggregateWindowExpr>()
137        {
138            let aggr_expr = sliding_aggr_window_expr.get_aggregate_expr();
139            let (window_function, fun_definition) =
140                serialize_physical_window_aggr_expr(aggr_expr, window_frame, codec)?;
141            (
142                window_function,
143                fun_definition,
144                aggr_expr.ignore_nulls(),
145                aggr_expr.is_distinct(),
146            )
147        } else if let Some(udf_window_expr) = expr.downcast_ref::<StandardWindowExpr>() {
148            if let Some(expr) = udf_window_expr
149                .get_standard_func_expr()
150                .as_any()
151                .downcast_ref::<WindowUDFExpr>()
152            {
153                let mut buf = Vec::new();
154                codec.try_encode_udwf(expr.fun(), &mut buf)?;
155                args = expr.args().to_vec();
156                (
157                    physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(
158                        expr.fun().name().to_string(),
159                    ),
160                    (!buf.is_empty()).then_some(buf),
161                    false, // WindowUDFExpr doesn't have ignore_nulls/distinct
162                    false,
163                )
164            } else {
165                return not_impl_err!(
166                    "User-defined window function not supported: {window_expr:?}"
167                );
168            }
169        } else {
170            return not_impl_err!("WindowExpr not supported: {window_expr:?}");
171        };
172
173    let args = serialize_physical_exprs(&args, codec, proto_converter)?;
174    let partition_by =
175        serialize_physical_exprs(window_expr.partition_by(), codec, proto_converter)?;
176    let order_by = serialize_physical_sort_exprs(
177        window_expr.order_by().to_vec(),
178        codec,
179        proto_converter,
180    )?;
181    let window_frame: protobuf::WindowFrame = window_frame
182        .as_ref()
183        .try_into()
184        .map_err(|e| internal_datafusion_err!("{e}"))?;
185
186    Ok(protobuf::PhysicalWindowExprNode {
187        args,
188        partition_by,
189        order_by,
190        window_frame: Some(window_frame),
191        window_function: Some(window_function),
192        name: window_expr.name().to_string(),
193        fun_definition,
194        ignore_nulls,
195        distinct,
196    })
197}
198
199pub fn serialize_physical_sort_exprs<I>(
200    sort_exprs: I,
201    codec: &dyn PhysicalExtensionCodec,
202    proto_converter: &dyn PhysicalProtoConverterExtension,
203) -> Result<Vec<PhysicalSortExprNode>>
204where
205    I: IntoIterator<Item = PhysicalSortExpr>,
206{
207    sort_exprs
208        .into_iter()
209        .map(|sort_expr| serialize_physical_sort_expr(sort_expr, codec, proto_converter))
210        .collect()
211}
212
213pub fn serialize_physical_sort_expr(
214    sort_expr: PhysicalSortExpr,
215    codec: &dyn PhysicalExtensionCodec,
216    proto_converter: &dyn PhysicalProtoConverterExtension,
217) -> Result<PhysicalSortExprNode> {
218    let PhysicalSortExpr { expr, options } = sort_expr;
219    let expr = proto_converter.physical_expr_to_proto(&expr, codec)?;
220    Ok(PhysicalSortExprNode {
221        expr: Some(Box::new(expr)),
222        asc: !options.descending,
223        nulls_first: options.nulls_first,
224    })
225}
226
227pub fn serialize_physical_exprs<'a, I>(
228    values: I,
229    codec: &dyn PhysicalExtensionCodec,
230    proto_converter: &dyn PhysicalProtoConverterExtension,
231) -> Result<Vec<protobuf::PhysicalExprNode>>
232where
233    I: IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
234{
235    values
236        .into_iter()
237        .map(|value| proto_converter.physical_expr_to_proto(value, codec))
238        .collect()
239}
240
241/// Serialize a `PhysicalExpr` to default protobuf representation.
242///
243/// If required, a [`PhysicalExtensionCodec`] can be provided which can handle
244/// serialization of udfs requiring specialized serialization (see [`PhysicalExtensionCodec::try_encode_udf`])
245pub fn serialize_physical_expr(
246    value: &Arc<dyn PhysicalExpr>,
247    codec: &dyn PhysicalExtensionCodec,
248) -> Result<protobuf::PhysicalExprNode> {
249    serialize_physical_expr_with_converter(
250        value,
251        codec,
252        &DefaultPhysicalProtoConverter {},
253    )
254}
255
256/// Serialize a `PhysicalExpr` to default protobuf representation.
257///
258/// If required, a [`PhysicalExtensionCodec`] can be provided which can handle
259/// serialization of udfs requiring specialized serialization (see [`PhysicalExtensionCodec::try_encode_udf`]).
260/// A [`PhysicalProtoConverterExtension`] can be provided to handle the
261/// conversion process (see [`PhysicalProtoConverterExtension::physical_expr_to_proto`]).
262pub fn serialize_physical_expr_with_converter(
263    value: &Arc<dyn PhysicalExpr>,
264    codec: &dyn PhysicalExtensionCodec,
265    proto_converter: &dyn PhysicalProtoConverterExtension,
266) -> Result<protobuf::PhysicalExprNode> {
267    let expr = value.as_ref();
268    let expr_id = value.expression_id();
269    // HashTableLookupExpr is used for dynamic filter pushdown in hash joins.
270    // It contains an Arc<dyn JoinHashMapType> (the build-side hash table) which
271    // cannot be serialized - the hash table is a runtime structure built during
272    // execution on the build side.
273    //
274    // We replace it with lit(true) which is safe because:
275    // 1. The filter is a performance optimization, not a correctness requirement
276    // 2. lit(true) passes all rows, so no valid rows are incorrectly filtered out
277    // 3. The join itself will still produce correct results, just without the
278    //    benefit of early filtering on the probe side
279    //
280    // In distributed execution, the remote worker won't have access to the hash
281    // table anyway, so the best we can do is skip this optimization.
282    if expr.downcast_ref::<HashTableLookupExpr>().is_some() {
283        let value = datafusion_proto_common::ScalarValue {
284            value: Some(datafusion_proto_common::scalar_value::Value::BoolValue(
285                true,
286            )),
287        };
288        return Ok(protobuf::PhysicalExprNode {
289            expr_id,
290            expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(value)),
291        });
292    }
293
294    if let Some(expr) = expr.downcast_ref::<Column>() {
295        Ok(protobuf::PhysicalExprNode {
296            expr_id,
297            expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
298                protobuf::PhysicalColumn {
299                    name: expr.name().to_string(),
300                    index: expr.index() as u32,
301                },
302            )),
303        })
304    } else if let Some(expr) = expr.downcast_ref::<UnKnownColumn>() {
305        Ok(protobuf::PhysicalExprNode {
306            expr_id,
307            expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn(
308                protobuf::UnknownColumn {
309                    name: expr.name().to_string(),
310                },
311            )),
312        })
313    } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
314        // Linearize a nested binary expression tree of the same operator
315        // into a flat vector of operands to avoid deep recursion in proto.
316        let op = expr.op();
317        let mut operand_refs: Vec<&Arc<dyn PhysicalExpr>> = vec![expr.right()];
318        let mut current_expr: &BinaryExpr = expr;
319        loop {
320            match current_expr.left().downcast_ref::<BinaryExpr>() {
321                Some(bin) if bin.op() == op => {
322                    operand_refs.push(bin.right());
323                    current_expr = bin;
324                }
325                _ => {
326                    operand_refs.push(current_expr.left());
327                    break;
328                }
329            }
330        }
331
332        // Reverse so operands are ordered from left innermost to right outermost
333        operand_refs.reverse();
334
335        let operands = operand_refs
336            .iter()
337            .map(|e| proto_converter.physical_expr_to_proto(e, codec))
338            .collect::<Result<Vec<_>>>()?;
339
340        let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
341            l: None,
342            r: None,
343            op: format!("{:?}", op),
344            operands,
345        });
346
347        Ok(protobuf::PhysicalExprNode {
348            expr_id,
349            expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
350                binary_expr,
351            )),
352        })
353    } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
354        Ok(protobuf::PhysicalExprNode {
355            expr_id,
356            expr_type: Some(
357                protobuf::physical_expr_node::ExprType::Case(
358                    Box::new(
359                        protobuf::PhysicalCaseNode {
360                            expr: expr
361                                .expr()
362                                .map(|exp| {
363                                    proto_converter
364                                        .physical_expr_to_proto(exp, codec)
365                                        .map(Box::new)
366                                })
367                                .transpose()?,
368                            when_then_expr: expr
369                                .when_then_expr()
370                                .iter()
371                                .map(|(when_expr, then_expr)| {
372                                    serialize_when_then_expr(
373                                        when_expr,
374                                        then_expr,
375                                        codec,
376                                        proto_converter,
377                                    )
378                                })
379                                .collect::<Result<
380                                    Vec<protobuf::PhysicalWhenThen>,
381                                    DataFusionError,
382                                >>()?,
383                            else_expr: expr
384                                .else_expr()
385                                .map(|a| {
386                                    proto_converter
387                                        .physical_expr_to_proto(a, codec)
388                                        .map(Box::new)
389                                })
390                                .transpose()?,
391                        },
392                    ),
393                ),
394            ),
395        })
396    } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
397        Ok(protobuf::PhysicalExprNode {
398            expr_id,
399            expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
400                protobuf::PhysicalNot {
401                    expr: Some(Box::new(
402                        proto_converter.physical_expr_to_proto(expr.arg(), codec)?,
403                    )),
404                },
405            ))),
406        })
407    } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
408        Ok(protobuf::PhysicalExprNode {
409            expr_id,
410            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
411                Box::new(protobuf::PhysicalIsNull {
412                    expr: Some(Box::new(
413                        proto_converter.physical_expr_to_proto(expr.arg(), codec)?,
414                    )),
415                }),
416            )),
417        })
418    } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
419        Ok(protobuf::PhysicalExprNode {
420            expr_id,
421            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
422                Box::new(protobuf::PhysicalIsNotNull {
423                    expr: Some(Box::new(
424                        proto_converter.physical_expr_to_proto(expr.arg(), codec)?,
425                    )),
426                }),
427            )),
428        })
429    } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
430        Ok(protobuf::PhysicalExprNode {
431            expr_id,
432            expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new(
433                protobuf::PhysicalInListNode {
434                    expr: Some(Box::new(
435                        proto_converter.physical_expr_to_proto(expr.expr(), codec)?,
436                    )),
437                    list: serialize_physical_exprs(expr.list(), codec, proto_converter)?,
438                    negated: expr.negated(),
439                },
440            ))),
441        })
442    } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
443        Ok(protobuf::PhysicalExprNode {
444            expr_id,
445            expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
446                protobuf::PhysicalNegativeNode {
447                    expr: Some(Box::new(
448                        proto_converter.physical_expr_to_proto(expr.arg(), codec)?,
449                    )),
450                },
451            ))),
452        })
453    } else if let Some(lit) = expr.downcast_ref::<Literal>() {
454        Ok(protobuf::PhysicalExprNode {
455            expr_id,
456            expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
457                lit.value().try_into()?,
458            )),
459        })
460    } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
461        Ok(protobuf::PhysicalExprNode {
462            expr_id,
463            expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
464                protobuf::PhysicalCastNode {
465                    expr: Some(Box::new(
466                        proto_converter.physical_expr_to_proto(cast.expr(), codec)?,
467                    )),
468                    arrow_type: Some(cast.cast_type().try_into()?),
469                },
470            ))),
471        })
472    } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
473        Ok(protobuf::PhysicalExprNode {
474            expr_id,
475            expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new(
476                protobuf::PhysicalTryCastNode {
477                    expr: Some(Box::new(
478                        proto_converter.physical_expr_to_proto(cast.expr(), codec)?,
479                    )),
480                    arrow_type: Some(cast.cast_type().try_into()?),
481                },
482            ))),
483        })
484    } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
485        let mut buf = Vec::new();
486        codec.try_encode_udf(expr.fun(), &mut buf)?;
487        Ok(protobuf::PhysicalExprNode {
488            expr_id,
489            expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf(
490                protobuf::PhysicalScalarUdfNode {
491                    name: expr.name().to_string(),
492                    args: serialize_physical_exprs(expr.args(), codec, proto_converter)?,
493                    fun_definition: (!buf.is_empty()).then_some(buf),
494                    return_type: Some(expr.return_type().try_into()?),
495                    nullable: expr.nullable(),
496                    return_field_name: expr
497                        .return_field(&Schema::empty())?
498                        .name()
499                        .to_string(),
500                },
501            )),
502        })
503    } else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
504        Ok(protobuf::PhysicalExprNode {
505            expr_id,
506            expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new(
507                protobuf::PhysicalLikeExprNode {
508                    negated: expr.negated(),
509                    case_insensitive: expr.case_insensitive(),
510                    expr: Some(Box::new(
511                        proto_converter.physical_expr_to_proto(expr.expr(), codec)?,
512                    )),
513                    pattern: Some(Box::new(
514                        proto_converter.physical_expr_to_proto(expr.pattern(), codec)?,
515                    )),
516                },
517            ))),
518        })
519    } else if let Some(expr) = expr.downcast_ref::<HashExpr>() {
520        Ok(protobuf::PhysicalExprNode {
521            expr_id,
522            expr_type: Some(protobuf::physical_expr_node::ExprType::HashExpr(
523                protobuf::PhysicalHashExprNode {
524                    on_columns: serialize_physical_exprs(
525                        expr.on_columns(),
526                        codec,
527                        proto_converter,
528                    )?,
529                    seed0: expr.seed(),
530                    description: expr.description().to_string(),
531                },
532            )),
533        })
534    } else if let Some(expr) = expr.downcast_ref::<ScalarSubqueryExpr>() {
535        Ok(protobuf::PhysicalExprNode {
536            expr_id,
537            expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarSubquery(
538                protobuf::PhysicalScalarSubqueryExprNode {
539                    data_type: Some(expr.data_type().try_into()?),
540                    nullable: expr.nullable(),
541                    index: expr.index().as_usize() as u32,
542                },
543            )),
544        })
545    } else if let Some(df) = expr.downcast_ref::<DynamicFilterPhysicalExpr>() {
546        let children = df
547            .original_children()
548            .iter()
549            .map(|child| proto_converter.physical_expr_to_proto(child, codec))
550            .collect::<Result<Vec<_>>>()?;
551
552        let remapped_children = if let Some(remapped) = df.remapped_children() {
553            remapped
554                .iter()
555                .map(|child| proto_converter.physical_expr_to_proto(child, codec))
556                .collect::<Result<Vec<_>>>()?
557        } else {
558            vec![]
559        };
560
561        // Atomic snapshot of inner state.
562        let inner = df.inner();
563        let inner_expr =
564            Box::new(proto_converter.physical_expr_to_proto(&inner.expr, codec)?);
565
566        Ok(protobuf::PhysicalExprNode {
567            expr_id,
568            expr_type: Some(protobuf::physical_expr_node::ExprType::DynamicFilter(
569                Box::new(protobuf::PhysicalDynamicFilterNode {
570                    children,
571                    remapped_children,
572                    generation: inner.generation,
573                    inner_expr: Some(inner_expr),
574                    is_complete: inner.is_complete,
575                }),
576            )),
577        })
578    } else {
579        let mut buf: Vec<u8> = vec![];
580        match codec.try_encode_expr(value, &mut buf) {
581            Ok(_) => {
582                let inputs: Vec<protobuf::PhysicalExprNode> = value
583                    .children()
584                    .into_iter()
585                    .map(|e| proto_converter.physical_expr_to_proto(e, codec))
586                    .collect::<Result<_>>()?;
587                Ok(protobuf::PhysicalExprNode {
588                    expr_id,
589                    expr_type: Some(protobuf::physical_expr_node::ExprType::Extension(
590                        protobuf::PhysicalExtensionExprNode { expr: buf, inputs },
591                    )),
592                })
593            }
594            Err(e) => internal_err!(
595                "Unsupported physical expr and extension codec failed with [{e}]. Expr: {value:?}"
596            ),
597        }
598    }
599}
600
601pub fn serialize_partitioning(
602    partitioning: &Partitioning,
603    codec: &dyn PhysicalExtensionCodec,
604    proto_converter: &dyn PhysicalProtoConverterExtension,
605) -> Result<protobuf::Partitioning> {
606    let serialized_partitioning = match partitioning {
607        Partitioning::RoundRobinBatch(partition_count) => protobuf::Partitioning {
608            partition_method: Some(protobuf::partitioning::PartitionMethod::RoundRobin(
609                *partition_count as u64,
610            )),
611        },
612        Partitioning::Hash(exprs, partition_count) => {
613            let serialized_exprs =
614                serialize_physical_exprs(exprs, codec, proto_converter)?;
615            protobuf::Partitioning {
616                partition_method: Some(protobuf::partitioning::PartitionMethod::Hash(
617                    protobuf::PhysicalHashRepartition {
618                        hash_expr: serialized_exprs,
619                        partition_count: *partition_count as u64,
620                    },
621                )),
622            }
623        }
624        Partitioning::UnknownPartitioning(partition_count) => protobuf::Partitioning {
625            partition_method: Some(protobuf::partitioning::PartitionMethod::Unknown(
626                *partition_count as u64,
627            )),
628        },
629    };
630    Ok(serialized_partitioning)
631}
632
633fn serialize_when_then_expr(
634    when_expr: &Arc<dyn PhysicalExpr>,
635    then_expr: &Arc<dyn PhysicalExpr>,
636    codec: &dyn PhysicalExtensionCodec,
637    proto_converter: &dyn PhysicalProtoConverterExtension,
638) -> Result<protobuf::PhysicalWhenThen> {
639    Ok(protobuf::PhysicalWhenThen {
640        when_expr: Some(proto_converter.physical_expr_to_proto(when_expr, codec)?),
641        then_expr: Some(proto_converter.physical_expr_to_proto(then_expr, codec)?),
642    })
643}
644
645impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
646    type Error = DataFusionError;
647
648    fn try_from(pf: &PartitionedFile) -> Result<Self> {
649        let last_modified = pf.object_meta.last_modified;
650        let last_modified_ns = last_modified.timestamp_nanos_opt().ok_or_else(|| {
651            DataFusionError::Plan(format!(
652                "Invalid timestamp on PartitionedFile::ObjectMeta: {last_modified}"
653            ))
654        })? as u64;
655        Ok(protobuf::PartitionedFile {
656            path: pf.object_meta.location.as_ref().to_owned(),
657            size: pf.object_meta.size,
658            last_modified_ns,
659            partition_values: pf
660                .partition_values
661                .iter()
662                .map(|v| v.try_into())
663                .collect::<Result<Vec<_>, _>>()?,
664            range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
665            statistics: pf.statistics.as_ref().map(|s| s.as_ref().into()),
666        })
667    }
668}
669
670impl TryFrom<&FileRange> for protobuf::FileRange {
671    type Error = DataFusionError;
672
673    fn try_from(value: &FileRange) -> Result<Self> {
674        Ok(protobuf::FileRange {
675            start: value.start,
676            end: value.end,
677        })
678    }
679}
680
681impl TryFrom<&[PartitionedFile]> for protobuf::FileGroup {
682    type Error = DataFusionError;
683
684    fn try_from(gr: &[PartitionedFile]) -> Result<Self, Self::Error> {
685        Ok(protobuf::FileGroup {
686            files: gr
687                .iter()
688                .map(|f| f.try_into())
689                .collect::<Result<Vec<_>, _>>()?,
690        })
691    }
692}
693
694pub fn serialize_file_scan_config(
695    conf: &FileScanConfig,
696    codec: &dyn PhysicalExtensionCodec,
697    proto_converter: &dyn PhysicalProtoConverterExtension,
698) -> Result<protobuf::FileScanExecConf> {
699    let file_groups = conf
700        .file_groups
701        .iter()
702        .map(|p| p.files().try_into())
703        .collect::<Result<Vec<_>, _>>()?;
704
705    let mut output_orderings = vec![];
706    for order in &conf.output_ordering {
707        let ordering =
708            serialize_physical_sort_exprs(order.to_vec(), codec, proto_converter)?;
709        output_orderings.push(ordering)
710    }
711
712    // Fields must be added to the schema so that they can persist in the protobuf,
713    // and then they are to be removed from the schema in `parse_protobuf_file_scan_config`
714    let mut fields = conf
715        .file_schema()
716        .fields()
717        .iter()
718        .cloned()
719        .collect::<Vec<_>>();
720    fields.extend(conf.table_partition_cols().iter().cloned());
721
722    let schema = Arc::new(
723        Schema::new(fields.clone()).with_metadata(conf.file_schema().metadata.clone()),
724    );
725
726    let projection_exprs = conf
727        .file_source
728        .projection()
729        .as_ref()
730        .map(|projection_exprs| {
731            let projections = projection_exprs.iter().cloned().collect::<Vec<_>>();
732            Ok::<_, DataFusionError>(protobuf::ProjectionExprs {
733                projections: projections
734                    .into_iter()
735                    .map(|expr| {
736                        Ok(protobuf::ProjectionExpr {
737                            alias: expr.alias.to_string(),
738                            expr: Some(
739                                proto_converter
740                                    .physical_expr_to_proto(&expr.expr, codec)?,
741                            ),
742                        })
743                    })
744                    .collect::<Result<Vec<_>>>()?,
745            })
746        })
747        .transpose()?;
748
749    Ok(protobuf::FileScanExecConf {
750        file_groups,
751        statistics: Some((&conf.statistics()).into()),
752        limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }),
753        projection: vec![],
754        schema: Some(schema.as_ref().try_into()?),
755        table_partition_cols: conf
756            .table_partition_cols()
757            .iter()
758            .map(|x| x.name().clone())
759            .collect::<Vec<_>>(),
760        object_store_url: conf.object_store_url.to_string(),
761        output_ordering: output_orderings
762            .into_iter()
763            .map(|e| PhysicalSortExprNodeCollection {
764                physical_sort_expr_nodes: e,
765            })
766            .collect::<Vec<_>>(),
767        constraints: Some(conf.constraints.clone().into()),
768        batch_size: conf.batch_size.map(|s| s as u64),
769        projection_exprs,
770    })
771}
772
773pub fn serialize_maybe_filter(
774    expr: Option<Arc<dyn PhysicalExpr>>,
775    codec: &dyn PhysicalExtensionCodec,
776    proto_converter: &dyn PhysicalProtoConverterExtension,
777) -> Result<protobuf::MaybeFilter> {
778    match expr {
779        None => Ok(protobuf::MaybeFilter { expr: None }),
780        Some(expr) => Ok(protobuf::MaybeFilter {
781            expr: Some(proto_converter.physical_expr_to_proto(&expr, codec)?),
782        }),
783    }
784}
785
786pub fn serialize_record_batches(batches: &[RecordBatch]) -> Result<Vec<u8>> {
787    if batches.is_empty() {
788        return Ok(vec![]);
789    }
790    let schema = batches[0].schema();
791    let mut buf = Vec::new();
792    let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
793    for batch in batches {
794        writer.write(batch)?;
795    }
796    writer.finish()?;
797    Ok(buf)
798}
799
800impl TryFrom<&JsonSink> for protobuf::JsonSink {
801    type Error = DataFusionError;
802
803    fn try_from(value: &JsonSink) -> Result<Self, Self::Error> {
804        Ok(Self {
805            config: Some(value.config().try_into()?),
806            writer_options: Some(value.writer_options().try_into()?),
807        })
808    }
809}
810
811impl TryFrom<&CsvSink> for protobuf::CsvSink {
812    type Error = DataFusionError;
813
814    fn try_from(value: &CsvSink) -> Result<Self, Self::Error> {
815        Ok(Self {
816            config: Some(value.config().try_into()?),
817            writer_options: Some(value.writer_options().try_into()?),
818        })
819    }
820}
821
/// Builds the protobuf form of a `ParquetSink` from its config and parquet options.
///
/// Only compiled when the `parquet` feature is enabled.
#[cfg(feature = "parquet")]
impl TryFrom<&ParquetSink> for protobuf::ParquetSink {
    type Error = DataFusionError;

    /// Converts the sink config first, then the parquet options; the first failing
    /// conversion is returned as the error.
    fn try_from(value: &ParquetSink) -> Result<Self, Self::Error> {
        let config = value.config().try_into()?;
        let parquet_options = value.parquet_options().try_into()?;
        Ok(Self {
            config: Some(config),
            parquet_options: Some(parquet_options),
        })
    }
}
833
834impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
835    type Error = DataFusionError;
836
837    fn try_from(conf: &FileSinkConfig) -> Result<Self, Self::Error> {
838        let file_groups = conf
839            .file_group
840            .iter()
841            .map(TryInto::try_into)
842            .collect::<Result<Vec<_>>>()?;
843        let table_paths = conf
844            .table_paths
845            .iter()
846            .map(ToString::to_string)
847            .collect::<Vec<_>>();
848        let table_partition_cols = conf
849            .table_partition_cols
850            .iter()
851            .map(|(name, data_type)| {
852                Ok(protobuf::PartitionColumn {
853                    name: name.to_owned(),
854                    arrow_type: Some(data_type.try_into()?),
855                })
856            })
857            .collect::<Result<Vec<_>>>()?;
858        let file_output_mode = match conf.file_output_mode {
859            datafusion_datasource::file_sink_config::FileOutputMode::Automatic => {
860                protobuf::FileOutputMode::Automatic
861            }
862            datafusion_datasource::file_sink_config::FileOutputMode::SingleFile => {
863                protobuf::FileOutputMode::SingleFile
864            }
865            datafusion_datasource::file_sink_config::FileOutputMode::Directory => {
866                protobuf::FileOutputMode::Directory
867            }
868        };
869        Ok(Self {
870            object_store_url: conf.object_store_url.to_string(),
871            file_groups,
872            table_paths,
873            output_schema: Some(conf.output_schema.as_ref().try_into()?),
874            table_partition_cols,
875            keep_partition_by_columns: conf.keep_partition_by_columns,
876            insert_op: conf.insert_op as i32,
877            file_extension: conf.file_extension.to_string(),
878            file_output_mode: file_output_mode.into(),
879        })
880    }
881}