// datafusion_proto/physical_plan/to_proto.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::sync::Arc;
19
20use arrow::array::RecordBatch;
21use arrow::datatypes::Schema;
22use arrow::ipc::writer::StreamWriter;
23use datafusion_common::{
24    DataFusionError, Result, internal_datafusion_err, internal_err, not_impl_err,
25};
26use datafusion_datasource::file_scan_config::FileScanConfig;
27use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
28use datafusion_datasource::{FileRange, PartitionedFile};
29use datafusion_datasource_csv::file_format::CsvSink;
30use datafusion_datasource_json::file_format::JsonSink;
31#[cfg(feature = "parquet")]
32use datafusion_datasource_parquet::file_format::ParquetSink;
33use datafusion_expr::WindowFrame;
34use datafusion_physical_expr::ScalarFunctionExpr;
35use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
36use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr;
37use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
38use datafusion_physical_plan::expressions::{
39    BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
40    LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
41};
42use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr};
43use datafusion_physical_plan::udaf::AggregateFunctionExpr;
44use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
45use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
46
47use super::{
48    DefaultPhysicalProtoConverter, PhysicalExtensionCodec,
49    PhysicalProtoConverterExtension,
50};
51use crate::protobuf::{
52    self, PhysicalSortExprNode, PhysicalSortExprNodeCollection,
53    physical_aggregate_expr_node, physical_window_expr_node,
54};
55
56#[expect(clippy::needless_pass_by_value)]
57pub fn serialize_physical_aggr_expr(
58    aggr_expr: Arc<AggregateFunctionExpr>,
59    codec: &dyn PhysicalExtensionCodec,
60    proto_converter: &dyn PhysicalProtoConverterExtension,
61) -> Result<protobuf::PhysicalExprNode> {
62    let expressions =
63        serialize_physical_exprs(&aggr_expr.expressions(), codec, proto_converter)?;
64    let order_bys = serialize_physical_sort_exprs(
65        aggr_expr.order_bys().iter().cloned(),
66        codec,
67        proto_converter,
68    )?;
69
70    let name = aggr_expr.fun().name().to_string();
71    let mut buf = Vec::new();
72    codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
73    Ok(protobuf::PhysicalExprNode {
74        expr_id: None,
75        expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
76            protobuf::PhysicalAggregateExprNode {
77                aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)),
78                expr: expressions,
79                ordering_req: order_bys,
80                distinct: aggr_expr.is_distinct(),
81                ignore_nulls: aggr_expr.ignore_nulls(),
82                fun_definition: (!buf.is_empty()).then_some(buf),
83                human_display: aggr_expr.human_display().to_string(),
84            },
85        )),
86    })
87}
88
89fn serialize_physical_window_aggr_expr(
90    aggr_expr: &AggregateFunctionExpr,
91    _window_frame: &WindowFrame,
92    codec: &dyn PhysicalExtensionCodec,
93) -> Result<(physical_window_expr_node::WindowFunction, Option<Vec<u8>>)> {
94    // Distinct and ignore_nulls are now supported in window expressions
95
96    let mut buf = Vec::new();
97    codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
98    Ok((
99        physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(
100            aggr_expr.fun().name().to_string(),
101        ),
102        (!buf.is_empty()).then_some(buf),
103    ))
104}
105
106pub fn serialize_physical_window_expr(
107    window_expr: &Arc<dyn WindowExpr>,
108    codec: &dyn PhysicalExtensionCodec,
109    proto_converter: &dyn PhysicalProtoConverterExtension,
110) -> Result<protobuf::PhysicalWindowExprNode> {
111    let expr = window_expr.as_any();
112    let mut args = window_expr.expressions().to_vec();
113    let window_frame = window_expr.get_window_frame();
114
115    let (window_function, fun_definition, ignore_nulls, distinct) =
116        if let Some(plain_aggr_window_expr) =
117            expr.downcast_ref::<PlainAggregateWindowExpr>()
118        {
119            let aggr_expr = plain_aggr_window_expr.get_aggregate_expr();
120            let (window_function, fun_definition) =
121                serialize_physical_window_aggr_expr(aggr_expr, window_frame, codec)?;
122            (
123                window_function,
124                fun_definition,
125                aggr_expr.ignore_nulls(),
126                aggr_expr.is_distinct(),
127            )
128        } else if let Some(sliding_aggr_window_expr) =
129            expr.downcast_ref::<SlidingAggregateWindowExpr>()
130        {
131            let aggr_expr = sliding_aggr_window_expr.get_aggregate_expr();
132            let (window_function, fun_definition) =
133                serialize_physical_window_aggr_expr(aggr_expr, window_frame, codec)?;
134            (
135                window_function,
136                fun_definition,
137                aggr_expr.ignore_nulls(),
138                aggr_expr.is_distinct(),
139            )
140        } else if let Some(udf_window_expr) = expr.downcast_ref::<StandardWindowExpr>() {
141            if let Some(expr) = udf_window_expr
142                .get_standard_func_expr()
143                .as_any()
144                .downcast_ref::<WindowUDFExpr>()
145            {
146                let mut buf = Vec::new();
147                codec.try_encode_udwf(expr.fun(), &mut buf)?;
148                args = expr.args().to_vec();
149                (
150                    physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(
151                        expr.fun().name().to_string(),
152                    ),
153                    (!buf.is_empty()).then_some(buf),
154                    false, // WindowUDFExpr doesn't have ignore_nulls/distinct
155                    false,
156                )
157            } else {
158                return not_impl_err!(
159                    "User-defined window function not supported: {window_expr:?}"
160                );
161            }
162        } else {
163            return not_impl_err!("WindowExpr not supported: {window_expr:?}");
164        };
165
166    let args = serialize_physical_exprs(&args, codec, proto_converter)?;
167    let partition_by =
168        serialize_physical_exprs(window_expr.partition_by(), codec, proto_converter)?;
169    let order_by = serialize_physical_sort_exprs(
170        window_expr.order_by().to_vec(),
171        codec,
172        proto_converter,
173    )?;
174    let window_frame: protobuf::WindowFrame = window_frame
175        .as_ref()
176        .try_into()
177        .map_err(|e| internal_datafusion_err!("{e}"))?;
178
179    Ok(protobuf::PhysicalWindowExprNode {
180        args,
181        partition_by,
182        order_by,
183        window_frame: Some(window_frame),
184        window_function: Some(window_function),
185        name: window_expr.name().to_string(),
186        fun_definition,
187        ignore_nulls,
188        distinct,
189    })
190}
191
192pub fn serialize_physical_sort_exprs<I>(
193    sort_exprs: I,
194    codec: &dyn PhysicalExtensionCodec,
195    proto_converter: &dyn PhysicalProtoConverterExtension,
196) -> Result<Vec<PhysicalSortExprNode>>
197where
198    I: IntoIterator<Item = PhysicalSortExpr>,
199{
200    sort_exprs
201        .into_iter()
202        .map(|sort_expr| serialize_physical_sort_expr(sort_expr, codec, proto_converter))
203        .collect()
204}
205
206pub fn serialize_physical_sort_expr(
207    sort_expr: PhysicalSortExpr,
208    codec: &dyn PhysicalExtensionCodec,
209    proto_converter: &dyn PhysicalProtoConverterExtension,
210) -> Result<PhysicalSortExprNode> {
211    let PhysicalSortExpr { expr, options } = sort_expr;
212    let expr = proto_converter.physical_expr_to_proto(&expr, codec)?;
213    Ok(PhysicalSortExprNode {
214        expr: Some(Box::new(expr)),
215        asc: !options.descending,
216        nulls_first: options.nulls_first,
217    })
218}
219
220pub fn serialize_physical_exprs<'a, I>(
221    values: I,
222    codec: &dyn PhysicalExtensionCodec,
223    proto_converter: &dyn PhysicalProtoConverterExtension,
224) -> Result<Vec<protobuf::PhysicalExprNode>>
225where
226    I: IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
227{
228    values
229        .into_iter()
230        .map(|value| proto_converter.physical_expr_to_proto(value, codec))
231        .collect()
232}
233
234/// Serialize a `PhysicalExpr` to default protobuf representation.
235///
236/// If required, a [`PhysicalExtensionCodec`] can be provided which can handle
237/// serialization of udfs requiring specialized serialization (see [`PhysicalExtensionCodec::try_encode_udf`])
238pub fn serialize_physical_expr(
239    value: &Arc<dyn PhysicalExpr>,
240    codec: &dyn PhysicalExtensionCodec,
241) -> Result<protobuf::PhysicalExprNode> {
242    serialize_physical_expr_with_converter(
243        value,
244        codec,
245        &DefaultPhysicalProtoConverter {},
246    )
247}
248
/// Serialize a `PhysicalExpr` to default protobuf representation.
///
/// If required, a [`PhysicalExtensionCodec`] can be provided which can handle
/// serialization of udfs requiring specialized serialization (see [`PhysicalExtensionCodec::try_encode_udf`]).
/// A [`PhysicalProtoConverterExtension`] can be provided to handle the
/// conversion process (see [`PhysicalProtoConverterExtension::physical_expr_to_proto`]).
///
/// Dispatches on the concrete expression type via a chain of downcasts; every
/// child expression is serialized back through `proto_converter`, so a custom
/// converter sees all nested sub-expressions. Expressions whose type is not
/// recognized fall through to `codec.try_encode_expr` as an extension node.
pub fn serialize_physical_expr_with_converter(
    value: &Arc<dyn PhysicalExpr>,
    codec: &dyn PhysicalExtensionCodec,
    proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<protobuf::PhysicalExprNode> {
    // Snapshot the expr in case it has dynamic predicate state so
    // it can be serialized
    let value = snapshot_physical_expr(Arc::clone(value))?;
    let expr = value.as_any();

    // HashTableLookupExpr is used for dynamic filter pushdown in hash joins.
    // It contains an Arc<dyn JoinHashMapType> (the build-side hash table) which
    // cannot be serialized - the hash table is a runtime structure built during
    // execution on the build side.
    //
    // We replace it with lit(true) which is safe because:
    // 1. The filter is a performance optimization, not a correctness requirement
    // 2. lit(true) passes all rows, so no valid rows are incorrectly filtered out
    // 3. The join itself will still produce correct results, just without the
    //    benefit of early filtering on the probe side
    //
    // In distributed execution, the remote worker won't have access to the hash
    // table anyway, so the best we can do is skip this optimization.
    if expr.downcast_ref::<HashTableLookupExpr>().is_some() {
        let value = datafusion_proto_common::ScalarValue {
            value: Some(datafusion_proto_common::scalar_value::Value::BoolValue(
                true,
            )),
        };
        return Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(value)),
        });
    }

    if let Some(expr) = expr.downcast_ref::<Column>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
                protobuf::PhysicalColumn {
                    name: expr.name().to_string(),
                    index: expr.index() as u32,
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<UnKnownColumn>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn(
                protobuf::UnknownColumn {
                    name: expr.name().to_string(),
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
        // The operator is persisted by its Debug name; the deserializer is
        // expected to parse that same representation back.
        let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
            l: Some(Box::new(
                proto_converter.physical_expr_to_proto(expr.left(), codec)?,
            )),
            r: Some(Box::new(
                proto_converter.physical_expr_to_proto(expr.right(), codec)?,
            )),
            op: format!("{:?}", expr.op()),
        });

        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
                binary_expr,
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
        // CASE has three optional parts: the base expression, the WHEN/THEN
        // pairs, and the ELSE expression; each is serialized independently.
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(
                protobuf::physical_expr_node::ExprType::Case(
                    Box::new(
                        protobuf::PhysicalCaseNode {
                            expr: expr
                                .expr()
                                .map(|exp| {
                                    proto_converter
                                        .physical_expr_to_proto(exp, codec)
                                        .map(Box::new)
                                })
                                .transpose()?,
                            when_then_expr: expr
                                .when_then_expr()
                                .iter()
                                .map(|(when_expr, then_expr)| {
                                    serialize_when_then_expr(
                                        when_expr,
                                        then_expr,
                                        codec,
                                        proto_converter,
                                    )
                                })
                                .collect::<Result<
                                    Vec<protobuf::PhysicalWhenThen>,
                                    DataFusionError,
                                >>()?,
                            else_expr: expr
                                .else_expr()
                                .map(|a| {
                                    proto_converter
                                        .physical_expr_to_proto(a, codec)
                                        .map(Box::new)
                                })
                                .transpose()?,
                        },
                    ),
                ),
            ),
        })
    } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
                protobuf::PhysicalNot {
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(expr.arg(), codec)?,
                    )),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
                Box::new(protobuf::PhysicalIsNull {
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(expr.arg(), codec)?,
                    )),
                }),
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
                Box::new(protobuf::PhysicalIsNotNull {
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(expr.arg(), codec)?,
                    )),
                }),
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new(
                protobuf::PhysicalInListNode {
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(expr.expr(), codec)?,
                    )),
                    list: serialize_physical_exprs(expr.list(), codec, proto_converter)?,
                    negated: expr.negated(),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
                protobuf::PhysicalNegativeNode {
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(expr.arg(), codec)?,
                    )),
                },
            ))),
        })
    } else if let Some(lit) = expr.downcast_ref::<Literal>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
                lit.value().try_into()?,
            )),
        })
    } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
                protobuf::PhysicalCastNode {
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(cast.expr(), codec)?,
                    )),
                    arrow_type: Some(cast.cast_type().try_into()?),
                },
            ))),
        })
    } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new(
                protobuf::PhysicalTryCastNode {
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(cast.expr(), codec)?,
                    )),
                    arrow_type: Some(cast.cast_type().try_into()?),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
        // The UDF body is encoded by the codec; `fun_definition` is attached
        // only if the codec produced bytes.
        let mut buf = Vec::new();
        codec.try_encode_udf(expr.fun(), &mut buf)?;
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf(
                protobuf::PhysicalScalarUdfNode {
                    name: expr.name().to_string(),
                    args: serialize_physical_exprs(expr.args(), codec, proto_converter)?,
                    fun_definition: (!buf.is_empty()).then_some(buf),
                    return_type: Some(expr.return_type().try_into()?),
                    nullable: expr.nullable(),
                    return_field_name: expr
                        .return_field(&Schema::empty())?
                        .name()
                        .to_string(),
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new(
                protobuf::PhysicalLikeExprNode {
                    negated: expr.negated(),
                    case_insensitive: expr.case_insensitive(),
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(expr.expr(), codec)?,
                    )),
                    pattern: Some(Box::new(
                        proto_converter.physical_expr_to_proto(expr.pattern(), codec)?,
                    )),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<HashExpr>() {
        // All four hash seeds are persisted so the remote side reproduces
        // identical hash values.
        let (s0, s1, s2, s3) = expr.seeds();
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::HashExpr(
                protobuf::PhysicalHashExprNode {
                    on_columns: serialize_physical_exprs(
                        expr.on_columns(),
                        codec,
                        proto_converter,
                    )?,
                    seed0: s0,
                    seed1: s1,
                    seed2: s2,
                    seed3: s3,
                    description: expr.description().to_string(),
                },
            )),
        })
    } else {
        // Unknown expression type: delegate to the extension codec. Children
        // are serialized separately so nested standard expressions still round-trip.
        let mut buf: Vec<u8> = vec![];
        match codec.try_encode_expr(&value, &mut buf) {
            Ok(_) => {
                let inputs: Vec<protobuf::PhysicalExprNode> = value
                    .children()
                    .into_iter()
                    .map(|e| proto_converter.physical_expr_to_proto(e, codec))
                    .collect::<Result<_>>()?;
                Ok(protobuf::PhysicalExprNode {
                    expr_id: None,
                    expr_type: Some(protobuf::physical_expr_node::ExprType::Extension(
                        protobuf::PhysicalExtensionExprNode { expr: buf, inputs },
                    )),
                })
            }
            Err(e) => internal_err!(
                "Unsupported physical expr and extension codec failed with [{e}]. Expr: {value:?}"
            ),
        }
    }
}
533
534pub fn serialize_partitioning(
535    partitioning: &Partitioning,
536    codec: &dyn PhysicalExtensionCodec,
537    proto_converter: &dyn PhysicalProtoConverterExtension,
538) -> Result<protobuf::Partitioning> {
539    let serialized_partitioning = match partitioning {
540        Partitioning::RoundRobinBatch(partition_count) => protobuf::Partitioning {
541            partition_method: Some(protobuf::partitioning::PartitionMethod::RoundRobin(
542                *partition_count as u64,
543            )),
544        },
545        Partitioning::Hash(exprs, partition_count) => {
546            let serialized_exprs =
547                serialize_physical_exprs(exprs, codec, proto_converter)?;
548            protobuf::Partitioning {
549                partition_method: Some(protobuf::partitioning::PartitionMethod::Hash(
550                    protobuf::PhysicalHashRepartition {
551                        hash_expr: serialized_exprs,
552                        partition_count: *partition_count as u64,
553                    },
554                )),
555            }
556        }
557        Partitioning::UnknownPartitioning(partition_count) => protobuf::Partitioning {
558            partition_method: Some(protobuf::partitioning::PartitionMethod::Unknown(
559                *partition_count as u64,
560            )),
561        },
562    };
563    Ok(serialized_partitioning)
564}
565
566fn serialize_when_then_expr(
567    when_expr: &Arc<dyn PhysicalExpr>,
568    then_expr: &Arc<dyn PhysicalExpr>,
569    codec: &dyn PhysicalExtensionCodec,
570    proto_converter: &dyn PhysicalProtoConverterExtension,
571) -> Result<protobuf::PhysicalWhenThen> {
572    Ok(protobuf::PhysicalWhenThen {
573        when_expr: Some(proto_converter.physical_expr_to_proto(when_expr, codec)?),
574        then_expr: Some(proto_converter.physical_expr_to_proto(then_expr, codec)?),
575    })
576}
577
578impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
579    type Error = DataFusionError;
580
581    fn try_from(pf: &PartitionedFile) -> Result<Self> {
582        let last_modified = pf.object_meta.last_modified;
583        let last_modified_ns = last_modified.timestamp_nanos_opt().ok_or_else(|| {
584            DataFusionError::Plan(format!(
585                "Invalid timestamp on PartitionedFile::ObjectMeta: {last_modified}"
586            ))
587        })? as u64;
588        Ok(protobuf::PartitionedFile {
589            path: pf.object_meta.location.as_ref().to_owned(),
590            size: pf.object_meta.size,
591            last_modified_ns,
592            partition_values: pf
593                .partition_values
594                .iter()
595                .map(|v| v.try_into())
596                .collect::<Result<Vec<_>, _>>()?,
597            range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
598            statistics: pf.statistics.as_ref().map(|s| s.as_ref().into()),
599        })
600    }
601}
602
603impl TryFrom<&FileRange> for protobuf::FileRange {
604    type Error = DataFusionError;
605
606    fn try_from(value: &FileRange) -> Result<Self> {
607        Ok(protobuf::FileRange {
608            start: value.start,
609            end: value.end,
610        })
611    }
612}
613
614impl TryFrom<&[PartitionedFile]> for protobuf::FileGroup {
615    type Error = DataFusionError;
616
617    fn try_from(gr: &[PartitionedFile]) -> Result<Self, Self::Error> {
618        Ok(protobuf::FileGroup {
619            files: gr
620                .iter()
621                .map(|f| f.try_into())
622                .collect::<Result<Vec<_>, _>>()?,
623        })
624    }
625}
626
/// Serializes a [`FileScanConfig`] into its protobuf form.
///
/// File groups, output orderings and optional projection expressions are
/// serialized through `codec`/`proto_converter`; the schema written out is the
/// file schema with the table partition columns appended (see the inline note
/// below on why).
pub fn serialize_file_scan_config(
    conf: &FileScanConfig,
    codec: &dyn PhysicalExtensionCodec,
    proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<protobuf::FileScanExecConf> {
    // Each file group becomes one `protobuf::FileGroup`.
    let file_groups = conf
        .file_groups
        .iter()
        .map(|p| p.files().try_into())
        .collect::<Result<Vec<_>, _>>()?;

    // One serialized sort-expression list per known output ordering.
    let mut output_orderings = vec![];
    for order in &conf.output_ordering {
        let ordering =
            serialize_physical_sort_exprs(order.to_vec(), codec, proto_converter)?;
        output_orderings.push(ordering)
    }

    // Fields must be added to the schema so that they can persist in the protobuf,
    // and then they are to be removed from the schema in `parse_protobuf_file_scan_config`
    let mut fields = conf
        .file_schema()
        .fields()
        .iter()
        .cloned()
        .collect::<Vec<_>>();
    fields.extend(conf.table_partition_cols().iter().cloned());

    // Preserve the file schema's metadata on the extended schema.
    let schema = Arc::new(
        Schema::new(fields.clone()).with_metadata(conf.file_schema().metadata.clone()),
    );

    // Optional projection expressions on the file source: each projected
    // expression is serialized together with its output alias.
    let projection_exprs = conf
        .file_source
        .projection()
        .as_ref()
        .map(|projection_exprs| {
            let projections = projection_exprs.iter().cloned().collect::<Vec<_>>();
            Ok::<_, DataFusionError>(protobuf::ProjectionExprs {
                projections: projections
                    .into_iter()
                    .map(|expr| {
                        Ok(protobuf::ProjectionExpr {
                            alias: expr.alias.to_string(),
                            expr: Some(
                                proto_converter
                                    .physical_expr_to_proto(&expr.expr, codec)?,
                            ),
                        })
                    })
                    .collect::<Result<Vec<_>>>()?,
            })
        })
        .transpose()?;

    Ok(protobuf::FileScanExecConf {
        file_groups,
        statistics: Some((&conf.statistics()).into()),
        limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }),
        // NOTE(review): the index-based `projection` field is left empty here;
        // presumably `projection_exprs` supersedes it — confirm against the
        // deserializer.
        projection: vec![],
        schema: Some(schema.as_ref().try_into()?),
        table_partition_cols: conf
            .table_partition_cols()
            .iter()
            .map(|x| x.name().clone())
            .collect::<Vec<_>>(),
        object_store_url: conf.object_store_url.to_string(),
        output_ordering: output_orderings
            .into_iter()
            .map(|e| PhysicalSortExprNodeCollection {
                physical_sort_expr_nodes: e,
            })
            .collect::<Vec<_>>(),
        constraints: Some(conf.constraints.clone().into()),
        batch_size: conf.batch_size.map(|s| s as u64),
        projection_exprs,
    })
}
705
706pub fn serialize_maybe_filter(
707    expr: Option<Arc<dyn PhysicalExpr>>,
708    codec: &dyn PhysicalExtensionCodec,
709    proto_converter: &dyn PhysicalProtoConverterExtension,
710) -> Result<protobuf::MaybeFilter> {
711    match expr {
712        None => Ok(protobuf::MaybeFilter { expr: None }),
713        Some(expr) => Ok(protobuf::MaybeFilter {
714            expr: Some(proto_converter.physical_expr_to_proto(&expr, codec)?),
715        }),
716    }
717}
718
719pub fn serialize_record_batches(batches: &[RecordBatch]) -> Result<Vec<u8>> {
720    if batches.is_empty() {
721        return Ok(vec![]);
722    }
723    let schema = batches[0].schema();
724    let mut buf = Vec::new();
725    let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
726    for batch in batches {
727        writer.write(batch)?;
728    }
729    writer.finish()?;
730    Ok(buf)
731}
732
733impl TryFrom<&JsonSink> for protobuf::JsonSink {
734    type Error = DataFusionError;
735
736    fn try_from(value: &JsonSink) -> Result<Self, Self::Error> {
737        Ok(Self {
738            config: Some(value.config().try_into()?),
739            writer_options: Some(value.writer_options().try_into()?),
740        })
741    }
742}
743
744impl TryFrom<&CsvSink> for protobuf::CsvSink {
745    type Error = DataFusionError;
746
747    fn try_from(value: &CsvSink) -> Result<Self, Self::Error> {
748        Ok(Self {
749            config: Some(value.config().try_into()?),
750            writer_options: Some(value.writer_options().try_into()?),
751        })
752    }
753}
754
#[cfg(feature = "parquet")]
impl TryFrom<&ParquetSink> for protobuf::ParquetSink {
    type Error = DataFusionError;

    /// Converts the sink's file config and Parquet writer options.
    fn try_from(value: &ParquetSink) -> Result<Self, Self::Error> {
        let config = value.config().try_into()?;
        let parquet_options = value.parquet_options().try_into()?;
        Ok(Self {
            config: Some(config),
            parquet_options: Some(parquet_options),
        })
    }
}
766
767impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
768    type Error = DataFusionError;
769
770    fn try_from(conf: &FileSinkConfig) -> Result<Self, Self::Error> {
771        let file_groups = conf
772            .file_group
773            .iter()
774            .map(TryInto::try_into)
775            .collect::<Result<Vec<_>>>()?;
776        let table_paths = conf
777            .table_paths
778            .iter()
779            .map(ToString::to_string)
780            .collect::<Vec<_>>();
781        let table_partition_cols = conf
782            .table_partition_cols
783            .iter()
784            .map(|(name, data_type)| {
785                Ok(protobuf::PartitionColumn {
786                    name: name.to_owned(),
787                    arrow_type: Some(data_type.try_into()?),
788                })
789            })
790            .collect::<Result<Vec<_>>>()?;
791        let file_output_mode = match conf.file_output_mode {
792            datafusion_datasource::file_sink_config::FileOutputMode::Automatic => {
793                protobuf::FileOutputMode::Automatic
794            }
795            datafusion_datasource::file_sink_config::FileOutputMode::SingleFile => {
796                protobuf::FileOutputMode::SingleFile
797            }
798            datafusion_datasource::file_sink_config::FileOutputMode::Directory => {
799                protobuf::FileOutputMode::Directory
800            }
801        };
802        Ok(Self {
803            object_store_url: conf.object_store_url.to_string(),
804            file_groups,
805            table_paths,
806            output_schema: Some(conf.output_schema.as_ref().try_into()?),
807            table_partition_cols,
808            keep_partition_by_columns: conf.keep_partition_by_columns,
809            insert_op: conf.insert_op as i32,
810            file_extension: conf.file_extension.to_string(),
811            file_output_mode: file_output_mode.into(),
812        })
813    }
814}