// datafusion_proto/physical_plan/to_proto.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use arrow::array::RecordBatch;
21use arrow::datatypes::Schema;
22use arrow::ipc::writer::StreamWriter;
23use datafusion_common::{
24    DataFusionError, Result, internal_datafusion_err, internal_err, not_impl_err,
25};
26use datafusion_datasource::file_scan_config::FileScanConfig;
27use datafusion_datasource::file_sink_config::FileSink;
28use datafusion_datasource::file_sink_config::FileSinkConfig;
29use datafusion_datasource::{FileRange, PartitionedFile};
30use datafusion_datasource_csv::file_format::CsvSink;
31use datafusion_datasource_json::file_format::JsonSink;
32#[cfg(feature = "parquet")]
33use datafusion_datasource_parquet::file_format::ParquetSink;
34use datafusion_expr::WindowFrame;
35use datafusion_physical_expr::ScalarFunctionExpr;
36use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
37use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr;
38use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
39use datafusion_physical_plan::expressions::LikeExpr;
40use datafusion_physical_plan::expressions::{
41    BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
42    Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
43};
44use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr};
45use datafusion_physical_plan::udaf::AggregateFunctionExpr;
46use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
47use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
48
49use crate::protobuf::{
50    self, PhysicalSortExprNode, PhysicalSortExprNodeCollection,
51    physical_aggregate_expr_node, physical_window_expr_node,
52};
53
54use super::PhysicalExtensionCodec;
55
56#[expect(clippy::needless_pass_by_value)]
57pub fn serialize_physical_aggr_expr(
58    aggr_expr: Arc<AggregateFunctionExpr>,
59    codec: &dyn PhysicalExtensionCodec,
60) -> Result<protobuf::PhysicalExprNode> {
61    let expressions = serialize_physical_exprs(&aggr_expr.expressions(), codec)?;
62    let order_bys =
63        serialize_physical_sort_exprs(aggr_expr.order_bys().iter().cloned(), codec)?;
64
65    let name = aggr_expr.fun().name().to_string();
66    let mut buf = Vec::new();
67    codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
68    Ok(protobuf::PhysicalExprNode {
69        expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
70            protobuf::PhysicalAggregateExprNode {
71                aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)),
72                expr: expressions,
73                ordering_req: order_bys,
74                distinct: aggr_expr.is_distinct(),
75                ignore_nulls: aggr_expr.ignore_nulls(),
76                fun_definition: (!buf.is_empty()).then_some(buf),
77                human_display: aggr_expr.human_display().to_string(),
78            },
79        )),
80    })
81}
82
83fn serialize_physical_window_aggr_expr(
84    aggr_expr: &AggregateFunctionExpr,
85    _window_frame: &WindowFrame,
86    codec: &dyn PhysicalExtensionCodec,
87) -> Result<(physical_window_expr_node::WindowFunction, Option<Vec<u8>>)> {
88    // Distinct and ignore_nulls are now supported in window expressions
89
90    let mut buf = Vec::new();
91    codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
92    Ok((
93        physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(
94            aggr_expr.fun().name().to_string(),
95        ),
96        (!buf.is_empty()).then_some(buf),
97    ))
98}
99
100pub fn serialize_physical_window_expr(
101    window_expr: &Arc<dyn WindowExpr>,
102    codec: &dyn PhysicalExtensionCodec,
103) -> Result<protobuf::PhysicalWindowExprNode> {
104    let expr = window_expr.as_any();
105    let args = window_expr.expressions().to_vec();
106    let window_frame = window_expr.get_window_frame();
107
108    let (window_function, fun_definition, ignore_nulls, distinct) =
109        if let Some(plain_aggr_window_expr) =
110            expr.downcast_ref::<PlainAggregateWindowExpr>()
111        {
112            let aggr_expr = plain_aggr_window_expr.get_aggregate_expr();
113            let (window_function, fun_definition) =
114                serialize_physical_window_aggr_expr(aggr_expr, window_frame, codec)?;
115            (
116                window_function,
117                fun_definition,
118                aggr_expr.ignore_nulls(),
119                aggr_expr.is_distinct(),
120            )
121        } else if let Some(sliding_aggr_window_expr) =
122            expr.downcast_ref::<SlidingAggregateWindowExpr>()
123        {
124            let aggr_expr = sliding_aggr_window_expr.get_aggregate_expr();
125            let (window_function, fun_definition) =
126                serialize_physical_window_aggr_expr(aggr_expr, window_frame, codec)?;
127            (
128                window_function,
129                fun_definition,
130                aggr_expr.ignore_nulls(),
131                aggr_expr.is_distinct(),
132            )
133        } else if let Some(udf_window_expr) = expr.downcast_ref::<StandardWindowExpr>() {
134            if let Some(expr) = udf_window_expr
135                .get_standard_func_expr()
136                .as_any()
137                .downcast_ref::<WindowUDFExpr>()
138            {
139                let mut buf = Vec::new();
140                codec.try_encode_udwf(expr.fun(), &mut buf)?;
141                (
142                    physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(
143                        expr.fun().name().to_string(),
144                    ),
145                    (!buf.is_empty()).then_some(buf),
146                    false, // WindowUDFExpr doesn't have ignore_nulls/distinct
147                    false,
148                )
149            } else {
150                return not_impl_err!(
151                    "User-defined window function not supported: {window_expr:?}"
152                );
153            }
154        } else {
155            return not_impl_err!("WindowExpr not supported: {window_expr:?}");
156        };
157
158    let args = serialize_physical_exprs(&args, codec)?;
159    let partition_by = serialize_physical_exprs(window_expr.partition_by(), codec)?;
160    let order_by = serialize_physical_sort_exprs(window_expr.order_by().to_vec(), codec)?;
161    let window_frame: protobuf::WindowFrame = window_frame
162        .as_ref()
163        .try_into()
164        .map_err(|e| internal_datafusion_err!("{e}"))?;
165
166    Ok(protobuf::PhysicalWindowExprNode {
167        args,
168        partition_by,
169        order_by,
170        window_frame: Some(window_frame),
171        window_function: Some(window_function),
172        name: window_expr.name().to_string(),
173        fun_definition,
174        ignore_nulls,
175        distinct,
176    })
177}
178
179pub fn serialize_physical_sort_exprs<I>(
180    sort_exprs: I,
181    codec: &dyn PhysicalExtensionCodec,
182) -> Result<Vec<PhysicalSortExprNode>>
183where
184    I: IntoIterator<Item = PhysicalSortExpr>,
185{
186    sort_exprs
187        .into_iter()
188        .map(|sort_expr| serialize_physical_sort_expr(sort_expr, codec))
189        .collect()
190}
191
192pub fn serialize_physical_sort_expr(
193    sort_expr: PhysicalSortExpr,
194    codec: &dyn PhysicalExtensionCodec,
195) -> Result<PhysicalSortExprNode> {
196    let PhysicalSortExpr { expr, options } = sort_expr;
197    let expr = serialize_physical_expr(&expr, codec)?;
198    Ok(PhysicalSortExprNode {
199        expr: Some(Box::new(expr)),
200        asc: !options.descending,
201        nulls_first: options.nulls_first,
202    })
203}
204
205pub fn serialize_physical_exprs<'a, I>(
206    values: I,
207    codec: &dyn PhysicalExtensionCodec,
208) -> Result<Vec<protobuf::PhysicalExprNode>>
209where
210    I: IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
211{
212    values
213        .into_iter()
214        .map(|value| serialize_physical_expr(value, codec))
215        .collect()
216}
217
/// Serialize a `PhysicalExpr` to default protobuf representation.
///
/// Dispatches on the concrete expression type: each built-in expression
/// (`Column`, `BinaryExpr`, `CaseExpr`, casts, literals, ...) maps to a
/// dedicated protobuf variant; any unrecognized type is handed to the
/// extension codec as a last resort.
///
/// If required, a [`PhysicalExtensionCodec`] can be provided which can handle
/// serialization of udfs requiring specialized serialization (see [`PhysicalExtensionCodec::try_encode_udf`])
///
/// # Errors
/// Returns an error if a nested expression fails to serialize, or if the
/// expression type is unsupported and the extension codec cannot encode it.
pub fn serialize_physical_expr(
    value: &Arc<dyn PhysicalExpr>,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<protobuf::PhysicalExprNode> {
    // Snapshot the expr in case it has dynamic predicate state so
    // it can be serialized
    let value = snapshot_physical_expr(Arc::clone(value))?;
    let expr = value.as_any();

    // HashTableLookupExpr is used for dynamic filter pushdown in hash joins.
    // It contains an Arc<dyn JoinHashMapType> (the build-side hash table) which
    // cannot be serialized - the hash table is a runtime structure built during
    // execution on the build side.
    //
    // We replace it with lit(true) which is safe because:
    // 1. The filter is a performance optimization, not a correctness requirement
    // 2. lit(true) passes all rows, so no valid rows are incorrectly filtered out
    // 3. The join itself will still produce correct results, just without the
    //    benefit of early filtering on the probe side
    //
    // In distributed execution, the remote worker won't have access to the hash
    // table anyway, so the best we can do is skip this optimization.
    if expr.downcast_ref::<HashTableLookupExpr>().is_some() {
        let value = datafusion_proto_common::ScalarValue {
            value: Some(datafusion_proto_common::scalar_value::Value::BoolValue(
                true,
            )),
        };
        return Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(value)),
        });
    }

    // Dispatch chain: try each known concrete type in turn. Children are
    // serialized by recursing through `serialize_physical_expr` /
    // `serialize_physical_exprs`.
    if let Some(expr) = expr.downcast_ref::<Column>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
                protobuf::PhysicalColumn {
                    name: expr.name().to_string(),
                    index: expr.index() as u32,
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<UnKnownColumn>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn(
                protobuf::UnknownColumn {
                    name: expr.name().to_string(),
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
        // The operator is serialized via its Debug representation and parsed
        // back on the decode side.
        let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
            l: Some(Box::new(serialize_physical_expr(expr.left(), codec)?)),
            r: Some(Box::new(serialize_physical_expr(expr.right(), codec)?)),
            op: format!("{:?}", expr.op()),
        });

        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
                binary_expr,
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
        // CASE has three optional/repeated parts: the base expression, the
        // WHEN/THEN arms, and the ELSE expression.
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(
                protobuf::physical_expr_node::ExprType::Case(
                    Box::new(
                        protobuf::PhysicalCaseNode {
                            expr: expr
                                .expr()
                                .map(|exp| {
                                    serialize_physical_expr(exp, codec).map(Box::new)
                                })
                                .transpose()?,
                            when_then_expr: expr
                                .when_then_expr()
                                .iter()
                                .map(|(when_expr, then_expr)| {
                                    serialize_when_then_expr(when_expr, then_expr, codec)
                                })
                                .collect::<Result<
                                    Vec<protobuf::PhysicalWhenThen>,
                                    DataFusionError,
                                >>()?,
                            else_expr: expr
                                .else_expr()
                                .map(|a| serialize_physical_expr(a, codec).map(Box::new))
                                .transpose()?,
                        },
                    ),
                ),
            ),
        })
    } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
                protobuf::PhysicalNot {
                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
                Box::new(protobuf::PhysicalIsNull {
                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
                }),
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
                Box::new(protobuf::PhysicalIsNotNull {
                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
                }),
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new(
                protobuf::PhysicalInListNode {
                    expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)),
                    list: serialize_physical_exprs(expr.list(), codec)?,
                    negated: expr.negated(),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
                protobuf::PhysicalNegativeNode {
                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
                },
            ))),
        })
    } else if let Some(lit) = expr.downcast_ref::<Literal>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
                lit.value().try_into()?,
            )),
        })
    } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
                protobuf::PhysicalCastNode {
                    expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)),
                    arrow_type: Some(cast.cast_type().try_into()?),
                },
            ))),
        })
    } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new(
                protobuf::PhysicalTryCastNode {
                    expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)),
                    arrow_type: Some(cast.cast_type().try_into()?),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
        // Scalar UDFs may carry codec-specific state; `fun_definition` is set
        // only when the codec produced bytes.
        let mut buf = Vec::new();
        codec.try_encode_udf(expr.fun(), &mut buf)?;
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf(
                protobuf::PhysicalScalarUdfNode {
                    name: expr.name().to_string(),
                    args: serialize_physical_exprs(expr.args(), codec)?,
                    fun_definition: (!buf.is_empty()).then_some(buf),
                    return_type: Some(expr.return_type().try_into()?),
                    nullable: expr.nullable(),
                    return_field_name: expr
                        .return_field(&Schema::empty())?
                        .name()
                        .to_string(),
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new(
                protobuf::PhysicalLikeExprNode {
                    negated: expr.negated(),
                    case_insensitive: expr.case_insensitive(),
                    expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)),
                    pattern: Some(Box::new(serialize_physical_expr(
                        expr.pattern(),
                        codec,
                    )?)),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<HashExpr>() {
        let (s0, s1, s2, s3) = expr.seeds();
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::HashExpr(
                protobuf::PhysicalHashExprNode {
                    on_columns: serialize_physical_exprs(expr.on_columns(), codec)?,
                    seed0: s0,
                    seed1: s1,
                    seed2: s2,
                    seed3: s3,
                    description: expr.description().to_string(),
                },
            )),
        })
    } else {
        // No built-in variant matched: delegate to the extension codec, which
        // may understand user-defined expression types. Children are
        // serialized recursively here so the codec only has to encode this
        // node's own payload.
        let mut buf: Vec<u8> = vec![];
        match codec.try_encode_expr(&value, &mut buf) {
            Ok(_) => {
                let inputs: Vec<protobuf::PhysicalExprNode> = value
                    .children()
                    .into_iter()
                    .map(|e| serialize_physical_expr(e, codec))
                    .collect::<Result<_>>()?;
                Ok(protobuf::PhysicalExprNode {
                    expr_type: Some(protobuf::physical_expr_node::ExprType::Extension(
                        protobuf::PhysicalExtensionExprNode { expr: buf, inputs },
                    )),
                })
            }
            Err(e) => internal_err!(
                "Unsupported physical expr and extension codec failed with [{e}]. Expr: {value:?}"
            ),
        }
    }
}
448
449pub fn serialize_partitioning(
450    partitioning: &Partitioning,
451    codec: &dyn PhysicalExtensionCodec,
452) -> Result<protobuf::Partitioning> {
453    let serialized_partitioning = match partitioning {
454        Partitioning::RoundRobinBatch(partition_count) => protobuf::Partitioning {
455            partition_method: Some(protobuf::partitioning::PartitionMethod::RoundRobin(
456                *partition_count as u64,
457            )),
458        },
459        Partitioning::Hash(exprs, partition_count) => {
460            let serialized_exprs = serialize_physical_exprs(exprs, codec)?;
461            protobuf::Partitioning {
462                partition_method: Some(protobuf::partitioning::PartitionMethod::Hash(
463                    protobuf::PhysicalHashRepartition {
464                        hash_expr: serialized_exprs,
465                        partition_count: *partition_count as u64,
466                    },
467                )),
468            }
469        }
470        Partitioning::UnknownPartitioning(partition_count) => protobuf::Partitioning {
471            partition_method: Some(protobuf::partitioning::PartitionMethod::Unknown(
472                *partition_count as u64,
473            )),
474        },
475    };
476    Ok(serialized_partitioning)
477}
478
479fn serialize_when_then_expr(
480    when_expr: &Arc<dyn PhysicalExpr>,
481    then_expr: &Arc<dyn PhysicalExpr>,
482    codec: &dyn PhysicalExtensionCodec,
483) -> Result<protobuf::PhysicalWhenThen> {
484    Ok(protobuf::PhysicalWhenThen {
485        when_expr: Some(serialize_physical_expr(when_expr, codec)?),
486        then_expr: Some(serialize_physical_expr(then_expr, codec)?),
487    })
488}
489
490impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
491    type Error = DataFusionError;
492
493    fn try_from(pf: &PartitionedFile) -> Result<Self> {
494        let last_modified = pf.object_meta.last_modified;
495        let last_modified_ns = last_modified.timestamp_nanos_opt().ok_or_else(|| {
496            DataFusionError::Plan(format!(
497                "Invalid timestamp on PartitionedFile::ObjectMeta: {last_modified}"
498            ))
499        })? as u64;
500        Ok(protobuf::PartitionedFile {
501            path: pf.object_meta.location.as_ref().to_owned(),
502            size: pf.object_meta.size,
503            last_modified_ns,
504            partition_values: pf
505                .partition_values
506                .iter()
507                .map(|v| v.try_into())
508                .collect::<Result<Vec<_>, _>>()?,
509            range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
510            statistics: pf.statistics.as_ref().map(|s| s.as_ref().into()),
511        })
512    }
513}
514
515impl TryFrom<&FileRange> for protobuf::FileRange {
516    type Error = DataFusionError;
517
518    fn try_from(value: &FileRange) -> Result<Self> {
519        Ok(protobuf::FileRange {
520            start: value.start,
521            end: value.end,
522        })
523    }
524}
525
526impl TryFrom<&[PartitionedFile]> for protobuf::FileGroup {
527    type Error = DataFusionError;
528
529    fn try_from(gr: &[PartitionedFile]) -> Result<Self, Self::Error> {
530        Ok(protobuf::FileGroup {
531            files: gr
532                .iter()
533                .map(|f| f.try_into())
534                .collect::<Result<Vec<_>, _>>()?,
535        })
536    }
537}
538
539pub fn serialize_file_scan_config(
540    conf: &FileScanConfig,
541    codec: &dyn PhysicalExtensionCodec,
542) -> Result<protobuf::FileScanExecConf> {
543    let file_groups = conf
544        .file_groups
545        .iter()
546        .map(|p| p.files().try_into())
547        .collect::<Result<Vec<_>, _>>()?;
548
549    let mut output_orderings = vec![];
550    for order in &conf.output_ordering {
551        let ordering = serialize_physical_sort_exprs(order.to_vec(), codec)?;
552        output_orderings.push(ordering)
553    }
554
555    // Fields must be added to the schema so that they can persist in the protobuf,
556    // and then they are to be removed from the schema in `parse_protobuf_file_scan_config`
557    let mut fields = conf
558        .file_schema()
559        .fields()
560        .iter()
561        .cloned()
562        .collect::<Vec<_>>();
563    fields.extend(conf.table_partition_cols().iter().cloned());
564
565    let schema = Arc::new(
566        arrow::datatypes::Schema::new(fields.clone())
567            .with_metadata(conf.file_schema().metadata.clone()),
568    );
569
570    let projection_exprs = conf
571        .file_source
572        .projection()
573        .as_ref()
574        .map(|projection_exprs| {
575            let projections = projection_exprs.iter().cloned().collect::<Vec<_>>();
576            Ok::<_, DataFusionError>(protobuf::ProjectionExprs {
577                projections: projections
578                    .into_iter()
579                    .map(|expr| {
580                        Ok(protobuf::ProjectionExpr {
581                            alias: expr.alias.to_string(),
582                            expr: Some(serialize_physical_expr(&expr.expr, codec)?),
583                        })
584                    })
585                    .collect::<Result<Vec<_>>>()?,
586            })
587        })
588        .transpose()?;
589
590    Ok(protobuf::FileScanExecConf {
591        file_groups,
592        statistics: Some((&conf.statistics()).into()),
593        limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }),
594        projection: vec![],
595        schema: Some(schema.as_ref().try_into()?),
596        table_partition_cols: conf
597            .table_partition_cols()
598            .iter()
599            .map(|x| x.name().clone())
600            .collect::<Vec<_>>(),
601        object_store_url: conf.object_store_url.to_string(),
602        output_ordering: output_orderings
603            .into_iter()
604            .map(|e| PhysicalSortExprNodeCollection {
605                physical_sort_expr_nodes: e,
606            })
607            .collect::<Vec<_>>(),
608        constraints: Some(conf.constraints.clone().into()),
609        batch_size: conf.batch_size.map(|s| s as u64),
610        projection_exprs,
611    })
612}
613
614pub fn serialize_maybe_filter(
615    expr: Option<Arc<dyn PhysicalExpr>>,
616    codec: &dyn PhysicalExtensionCodec,
617) -> Result<protobuf::MaybeFilter> {
618    match expr {
619        None => Ok(protobuf::MaybeFilter { expr: None }),
620        Some(expr) => Ok(protobuf::MaybeFilter {
621            expr: Some(serialize_physical_expr(&expr, codec)?),
622        }),
623    }
624}
625
626pub fn serialize_record_batches(batches: &[RecordBatch]) -> Result<Vec<u8>> {
627    if batches.is_empty() {
628        return Ok(vec![]);
629    }
630    let schema = batches[0].schema();
631    let mut buf = Vec::new();
632    let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
633    for batch in batches {
634        writer.write(batch)?;
635    }
636    writer.finish()?;
637    Ok(buf)
638}
639
640impl TryFrom<&JsonSink> for protobuf::JsonSink {
641    type Error = DataFusionError;
642
643    fn try_from(value: &JsonSink) -> Result<Self, Self::Error> {
644        Ok(Self {
645            config: Some(value.config().try_into()?),
646            writer_options: Some(value.writer_options().try_into()?),
647        })
648    }
649}
650
651impl TryFrom<&CsvSink> for protobuf::CsvSink {
652    type Error = DataFusionError;
653
654    fn try_from(value: &CsvSink) -> Result<Self, Self::Error> {
655        Ok(Self {
656            config: Some(value.config().try_into()?),
657            writer_options: Some(value.writer_options().try_into()?),
658        })
659    }
660}
661
#[cfg(feature = "parquet")]
impl TryFrom<&ParquetSink> for protobuf::ParquetSink {
    type Error = DataFusionError;

    /// Converts a [`ParquetSink`] (sink config plus Parquet writer options)
    /// to protobuf.
    fn try_from(value: &ParquetSink) -> Result<Self, Self::Error> {
        let config = value.config().try_into()?;
        let parquet_options = value.parquet_options().try_into()?;
        Ok(Self {
            config: Some(config),
            parquet_options: Some(parquet_options),
        })
    }
}
673
674impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
675    type Error = DataFusionError;
676
677    fn try_from(conf: &FileSinkConfig) -> Result<Self, Self::Error> {
678        let file_groups = conf
679            .file_group
680            .iter()
681            .map(TryInto::try_into)
682            .collect::<Result<Vec<_>>>()?;
683        let table_paths = conf
684            .table_paths
685            .iter()
686            .map(ToString::to_string)
687            .collect::<Vec<_>>();
688        let table_partition_cols = conf
689            .table_partition_cols
690            .iter()
691            .map(|(name, data_type)| {
692                Ok(protobuf::PartitionColumn {
693                    name: name.to_owned(),
694                    arrow_type: Some(data_type.try_into()?),
695                })
696            })
697            .collect::<Result<Vec<_>>>()?;
698        Ok(Self {
699            object_store_url: conf.object_store_url.to_string(),
700            file_groups,
701            table_paths,
702            output_schema: Some(conf.output_schema.as_ref().try_into()?),
703            table_partition_cols,
704            keep_partition_by_columns: conf.keep_partition_by_columns,
705            insert_op: conf.insert_op as i32,
706            file_extension: conf.file_extension.to_string(),
707        })
708    }
709}