datafusion_proto/physical_plan/
to_proto.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use arrow::array::RecordBatch;
21use arrow::datatypes::Schema;
22use arrow::ipc::writer::StreamWriter;
23#[cfg(feature = "parquet")]
24use datafusion::datasource::file_format::parquet::ParquetSink;
25use datafusion::datasource::physical_plan::FileSink;
26use datafusion::physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
27use datafusion::physical_expr::ScalarFunctionExpr;
28use datafusion::physical_expr_common::physical_expr::snapshot_physical_expr;
29use datafusion::physical_expr_common::sort_expr::PhysicalSortExpr;
30use datafusion::physical_plan::expressions::{
31    BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
32    Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
33};
34use datafusion::physical_plan::udaf::AggregateFunctionExpr;
35use datafusion::physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
36use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
37use datafusion::{
38    datasource::{
39        file_format::{csv::CsvSink, json::JsonSink},
40        listing::{FileRange, PartitionedFile},
41        physical_plan::{FileScanConfig, FileSinkConfig},
42    },
43    physical_plan::expressions::LikeExpr,
44};
45use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
46use datafusion_expr::WindowFrame;
47
48use crate::protobuf::{
49    self, physical_aggregate_expr_node, physical_window_expr_node, PhysicalSortExprNode,
50    PhysicalSortExprNodeCollection,
51};
52
53use super::PhysicalExtensionCodec;
54
55pub fn serialize_physical_aggr_expr(
56    aggr_expr: Arc<AggregateFunctionExpr>,
57    codec: &dyn PhysicalExtensionCodec,
58) -> Result<protobuf::PhysicalExprNode> {
59    let expressions = serialize_physical_exprs(&aggr_expr.expressions(), codec)?;
60    let order_bys =
61        serialize_physical_sort_exprs(aggr_expr.order_bys().iter().cloned(), codec)?;
62
63    let name = aggr_expr.fun().name().to_string();
64    let mut buf = Vec::new();
65    codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
66    Ok(protobuf::PhysicalExprNode {
67        expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
68            protobuf::PhysicalAggregateExprNode {
69                aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)),
70                expr: expressions,
71                ordering_req: order_bys,
72                distinct: aggr_expr.is_distinct(),
73                ignore_nulls: aggr_expr.ignore_nulls(),
74                fun_definition: (!buf.is_empty()).then_some(buf),
75                human_display: aggr_expr.human_display().to_string(),
76            },
77        )),
78    })
79}
80
81fn serialize_physical_window_aggr_expr(
82    aggr_expr: &AggregateFunctionExpr,
83    _window_frame: &WindowFrame,
84    codec: &dyn PhysicalExtensionCodec,
85) -> Result<(physical_window_expr_node::WindowFunction, Option<Vec<u8>>)> {
86    // Distinct and ignore_nulls are now supported in window expressions
87
88    let mut buf = Vec::new();
89    codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
90    Ok((
91        physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(
92            aggr_expr.fun().name().to_string(),
93        ),
94        (!buf.is_empty()).then_some(buf),
95    ))
96}
97
98pub fn serialize_physical_window_expr(
99    window_expr: &Arc<dyn WindowExpr>,
100    codec: &dyn PhysicalExtensionCodec,
101) -> Result<protobuf::PhysicalWindowExprNode> {
102    let expr = window_expr.as_any();
103    let args = window_expr.expressions().to_vec();
104    let window_frame = window_expr.get_window_frame();
105
106    let (window_function, fun_definition, ignore_nulls, distinct) =
107        if let Some(plain_aggr_window_expr) =
108            expr.downcast_ref::<PlainAggregateWindowExpr>()
109        {
110            let aggr_expr = plain_aggr_window_expr.get_aggregate_expr();
111            let (window_function, fun_definition) =
112                serialize_physical_window_aggr_expr(aggr_expr, window_frame, codec)?;
113            (
114                window_function,
115                fun_definition,
116                aggr_expr.ignore_nulls(),
117                aggr_expr.is_distinct(),
118            )
119        } else if let Some(sliding_aggr_window_expr) =
120            expr.downcast_ref::<SlidingAggregateWindowExpr>()
121        {
122            let aggr_expr = sliding_aggr_window_expr.get_aggregate_expr();
123            let (window_function, fun_definition) =
124                serialize_physical_window_aggr_expr(aggr_expr, window_frame, codec)?;
125            (
126                window_function,
127                fun_definition,
128                aggr_expr.ignore_nulls(),
129                aggr_expr.is_distinct(),
130            )
131        } else if let Some(udf_window_expr) = expr.downcast_ref::<StandardWindowExpr>() {
132            if let Some(expr) = udf_window_expr
133                .get_standard_func_expr()
134                .as_any()
135                .downcast_ref::<WindowUDFExpr>()
136            {
137                let mut buf = Vec::new();
138                codec.try_encode_udwf(expr.fun(), &mut buf)?;
139                (
140                    physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(
141                        expr.fun().name().to_string(),
142                    ),
143                    (!buf.is_empty()).then_some(buf),
144                    false, // WindowUDFExpr doesn't have ignore_nulls/distinct
145                    false,
146                )
147            } else {
148                return not_impl_err!(
149                    "User-defined window function not supported: {window_expr:?}"
150                );
151            }
152        } else {
153            return not_impl_err!("WindowExpr not supported: {window_expr:?}");
154        };
155
156    let args = serialize_physical_exprs(&args, codec)?;
157    let partition_by = serialize_physical_exprs(window_expr.partition_by(), codec)?;
158    let order_by = serialize_physical_sort_exprs(window_expr.order_by().to_vec(), codec)?;
159    let window_frame: protobuf::WindowFrame = window_frame
160        .as_ref()
161        .try_into()
162        .map_err(|e| DataFusionError::Internal(format!("{e}")))?;
163
164    Ok(protobuf::PhysicalWindowExprNode {
165        args,
166        partition_by,
167        order_by,
168        window_frame: Some(window_frame),
169        window_function: Some(window_function),
170        name: window_expr.name().to_string(),
171        fun_definition,
172        ignore_nulls,
173        distinct,
174    })
175}
176
177pub fn serialize_physical_sort_exprs<I>(
178    sort_exprs: I,
179    codec: &dyn PhysicalExtensionCodec,
180) -> Result<Vec<PhysicalSortExprNode>>
181where
182    I: IntoIterator<Item = PhysicalSortExpr>,
183{
184    sort_exprs
185        .into_iter()
186        .map(|sort_expr| serialize_physical_sort_expr(sort_expr, codec))
187        .collect()
188}
189
190pub fn serialize_physical_sort_expr(
191    sort_expr: PhysicalSortExpr,
192    codec: &dyn PhysicalExtensionCodec,
193) -> Result<PhysicalSortExprNode> {
194    let PhysicalSortExpr { expr, options } = sort_expr;
195    let expr = serialize_physical_expr(&expr, codec)?;
196    Ok(PhysicalSortExprNode {
197        expr: Some(Box::new(expr)),
198        asc: !options.descending,
199        nulls_first: options.nulls_first,
200    })
201}
202
203pub fn serialize_physical_exprs<'a, I>(
204    values: I,
205    codec: &dyn PhysicalExtensionCodec,
206) -> Result<Vec<protobuf::PhysicalExprNode>>
207where
208    I: IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
209{
210    values
211        .into_iter()
212        .map(|value| serialize_physical_expr(value, codec))
213        .collect()
214}
215
216/// Serialize a `PhysicalExpr` to default protobuf representation.
217///
218/// If required, a [`PhysicalExtensionCodec`] can be provided which can handle
219/// serialization of udfs requiring specialized serialization (see [`PhysicalExtensionCodec::try_encode_udf`])
220pub fn serialize_physical_expr(
221    value: &Arc<dyn PhysicalExpr>,
222    codec: &dyn PhysicalExtensionCodec,
223) -> Result<protobuf::PhysicalExprNode> {
224    // Snapshot the expr in case it has dynamic predicate state so
225    // it can be serialized
226    let value = snapshot_physical_expr(Arc::clone(value))?;
227    let expr = value.as_any();
228
229    if let Some(expr) = expr.downcast_ref::<Column>() {
230        Ok(protobuf::PhysicalExprNode {
231            expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
232                protobuf::PhysicalColumn {
233                    name: expr.name().to_string(),
234                    index: expr.index() as u32,
235                },
236            )),
237        })
238    } else if let Some(expr) = expr.downcast_ref::<UnKnownColumn>() {
239        Ok(protobuf::PhysicalExprNode {
240            expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn(
241                protobuf::UnknownColumn {
242                    name: expr.name().to_string(),
243                },
244            )),
245        })
246    } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
247        let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
248            l: Some(Box::new(serialize_physical_expr(expr.left(), codec)?)),
249            r: Some(Box::new(serialize_physical_expr(expr.right(), codec)?)),
250            op: format!("{:?}", expr.op()),
251        });
252
253        Ok(protobuf::PhysicalExprNode {
254            expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
255                binary_expr,
256            )),
257        })
258    } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
259        Ok(protobuf::PhysicalExprNode {
260            expr_type: Some(
261                protobuf::physical_expr_node::ExprType::Case(
262                    Box::new(
263                        protobuf::PhysicalCaseNode {
264                            expr: expr
265                                .expr()
266                                .map(|exp| {
267                                    serialize_physical_expr(exp, codec).map(Box::new)
268                                })
269                                .transpose()?,
270                            when_then_expr: expr
271                                .when_then_expr()
272                                .iter()
273                                .map(|(when_expr, then_expr)| {
274                                    serialize_when_then_expr(when_expr, then_expr, codec)
275                                })
276                                .collect::<Result<
277                                    Vec<protobuf::PhysicalWhenThen>,
278                                    DataFusionError,
279                                >>()?,
280                            else_expr: expr
281                                .else_expr()
282                                .map(|a| serialize_physical_expr(a, codec).map(Box::new))
283                                .transpose()?,
284                        },
285                    ),
286                ),
287            ),
288        })
289    } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
290        Ok(protobuf::PhysicalExprNode {
291            expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
292                protobuf::PhysicalNot {
293                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
294                },
295            ))),
296        })
297    } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
298        Ok(protobuf::PhysicalExprNode {
299            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
300                Box::new(protobuf::PhysicalIsNull {
301                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
302                }),
303            )),
304        })
305    } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
306        Ok(protobuf::PhysicalExprNode {
307            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
308                Box::new(protobuf::PhysicalIsNotNull {
309                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
310                }),
311            )),
312        })
313    } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
314        Ok(protobuf::PhysicalExprNode {
315            expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new(
316                protobuf::PhysicalInListNode {
317                    expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)),
318                    list: serialize_physical_exprs(expr.list(), codec)?,
319                    negated: expr.negated(),
320                },
321            ))),
322        })
323    } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
324        Ok(protobuf::PhysicalExprNode {
325            expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
326                protobuf::PhysicalNegativeNode {
327                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
328                },
329            ))),
330        })
331    } else if let Some(lit) = expr.downcast_ref::<Literal>() {
332        Ok(protobuf::PhysicalExprNode {
333            expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
334                lit.value().try_into()?,
335            )),
336        })
337    } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
338        Ok(protobuf::PhysicalExprNode {
339            expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
340                protobuf::PhysicalCastNode {
341                    expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)),
342                    arrow_type: Some(cast.cast_type().try_into()?),
343                },
344            ))),
345        })
346    } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
347        Ok(protobuf::PhysicalExprNode {
348            expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new(
349                protobuf::PhysicalTryCastNode {
350                    expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)),
351                    arrow_type: Some(cast.cast_type().try_into()?),
352                },
353            ))),
354        })
355    } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
356        let mut buf = Vec::new();
357        codec.try_encode_udf(expr.fun(), &mut buf)?;
358        Ok(protobuf::PhysicalExprNode {
359            expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf(
360                protobuf::PhysicalScalarUdfNode {
361                    name: expr.name().to_string(),
362                    args: serialize_physical_exprs(expr.args(), codec)?,
363                    fun_definition: (!buf.is_empty()).then_some(buf),
364                    return_type: Some(expr.return_type().try_into()?),
365                    nullable: expr.nullable(),
366                    return_field_name: expr
367                        .return_field(&Schema::empty())?
368                        .name()
369                        .to_string(),
370                },
371            )),
372        })
373    } else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
374        Ok(protobuf::PhysicalExprNode {
375            expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new(
376                protobuf::PhysicalLikeExprNode {
377                    negated: expr.negated(),
378                    case_insensitive: expr.case_insensitive(),
379                    expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)),
380                    pattern: Some(Box::new(serialize_physical_expr(
381                        expr.pattern(),
382                        codec,
383                    )?)),
384                },
385            ))),
386        })
387    } else {
388        let mut buf: Vec<u8> = vec![];
389        match codec.try_encode_expr(&value, &mut buf) {
390            Ok(_) => {
391                let inputs: Vec<protobuf::PhysicalExprNode> = value
392                    .children()
393                    .into_iter()
394                    .map(|e| serialize_physical_expr(e, codec))
395                    .collect::<Result<_>>()?;
396                Ok(protobuf::PhysicalExprNode {
397                    expr_type: Some(protobuf::physical_expr_node::ExprType::Extension(
398                        protobuf::PhysicalExtensionExprNode { expr: buf, inputs },
399                    )),
400                })
401            }
402            Err(e) => internal_err!(
403                "Unsupported physical expr and extension codec failed with [{e}]. Expr: {value:?}"
404            ),
405        }
406    }
407}
408
409pub fn serialize_partitioning(
410    partitioning: &Partitioning,
411    codec: &dyn PhysicalExtensionCodec,
412) -> Result<protobuf::Partitioning> {
413    let serialized_partitioning = match partitioning {
414        Partitioning::RoundRobinBatch(partition_count) => protobuf::Partitioning {
415            partition_method: Some(protobuf::partitioning::PartitionMethod::RoundRobin(
416                *partition_count as u64,
417            )),
418        },
419        Partitioning::Hash(exprs, partition_count) => {
420            let serialized_exprs = serialize_physical_exprs(exprs, codec)?;
421            protobuf::Partitioning {
422                partition_method: Some(protobuf::partitioning::PartitionMethod::Hash(
423                    protobuf::PhysicalHashRepartition {
424                        hash_expr: serialized_exprs,
425                        partition_count: *partition_count as u64,
426                    },
427                )),
428            }
429        }
430        Partitioning::UnknownPartitioning(partition_count) => protobuf::Partitioning {
431            partition_method: Some(protobuf::partitioning::PartitionMethod::Unknown(
432                *partition_count as u64,
433            )),
434        },
435    };
436    Ok(serialized_partitioning)
437}
438
439fn serialize_when_then_expr(
440    when_expr: &Arc<dyn PhysicalExpr>,
441    then_expr: &Arc<dyn PhysicalExpr>,
442    codec: &dyn PhysicalExtensionCodec,
443) -> Result<protobuf::PhysicalWhenThen> {
444    Ok(protobuf::PhysicalWhenThen {
445        when_expr: Some(serialize_physical_expr(when_expr, codec)?),
446        then_expr: Some(serialize_physical_expr(then_expr, codec)?),
447    })
448}
449
450impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
451    type Error = DataFusionError;
452
453    fn try_from(pf: &PartitionedFile) -> Result<Self> {
454        let last_modified = pf.object_meta.last_modified;
455        let last_modified_ns = last_modified.timestamp_nanos_opt().ok_or_else(|| {
456            DataFusionError::Plan(format!(
457                "Invalid timestamp on PartitionedFile::ObjectMeta: {last_modified}"
458            ))
459        })? as u64;
460        Ok(protobuf::PartitionedFile {
461            path: pf.object_meta.location.as_ref().to_owned(),
462            size: pf.object_meta.size,
463            last_modified_ns,
464            partition_values: pf
465                .partition_values
466                .iter()
467                .map(|v| v.try_into())
468                .collect::<Result<Vec<_>, _>>()?,
469            range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
470            statistics: pf.statistics.as_ref().map(|s| s.as_ref().into()),
471        })
472    }
473}
474
475impl TryFrom<&FileRange> for protobuf::FileRange {
476    type Error = DataFusionError;
477
478    fn try_from(value: &FileRange) -> Result<Self> {
479        Ok(protobuf::FileRange {
480            start: value.start,
481            end: value.end,
482        })
483    }
484}
485
486impl TryFrom<&[PartitionedFile]> for protobuf::FileGroup {
487    type Error = DataFusionError;
488
489    fn try_from(gr: &[PartitionedFile]) -> Result<Self, Self::Error> {
490        Ok(protobuf::FileGroup {
491            files: gr
492                .iter()
493                .map(|f| f.try_into())
494                .collect::<Result<Vec<_>, _>>()?,
495        })
496    }
497}
498
499pub fn serialize_file_scan_config(
500    conf: &FileScanConfig,
501    codec: &dyn PhysicalExtensionCodec,
502) -> Result<protobuf::FileScanExecConf> {
503    let file_groups = conf
504        .file_groups
505        .iter()
506        .map(|p| p.files().try_into())
507        .collect::<Result<Vec<_>, _>>()?;
508
509    let mut output_orderings = vec![];
510    for order in &conf.output_ordering {
511        let ordering = serialize_physical_sort_exprs(order.to_vec(), codec)?;
512        output_orderings.push(ordering)
513    }
514
515    // Fields must be added to the schema so that they can persist in the protobuf,
516    // and then they are to be removed from the schema in `parse_protobuf_file_scan_config`
517    let mut fields = conf
518        .file_schema
519        .fields()
520        .iter()
521        .cloned()
522        .collect::<Vec<_>>();
523    fields.extend(conf.table_partition_cols.iter().cloned());
524    let schema = Arc::new(arrow::datatypes::Schema::new(fields.clone()));
525
526    Ok(protobuf::FileScanExecConf {
527        file_groups,
528        statistics: Some((&conf.file_source.statistics().unwrap()).into()),
529        limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }),
530        projection: conf
531            .projection
532            .as_ref()
533            .unwrap_or(&(0..schema.fields().len()).collect::<Vec<_>>())
534            .iter()
535            .map(|n| *n as u32)
536            .collect(),
537        schema: Some(schema.as_ref().try_into()?),
538        table_partition_cols: conf
539            .table_partition_cols
540            .iter()
541            .map(|x| x.name().clone())
542            .collect::<Vec<_>>(),
543        object_store_url: conf.object_store_url.to_string(),
544        output_ordering: output_orderings
545            .into_iter()
546            .map(|e| PhysicalSortExprNodeCollection {
547                physical_sort_expr_nodes: e,
548            })
549            .collect::<Vec<_>>(),
550        constraints: Some(conf.constraints.clone().into()),
551        batch_size: conf.batch_size.map(|s| s as u64),
552    })
553}
554
555pub fn serialize_maybe_filter(
556    expr: Option<Arc<dyn PhysicalExpr>>,
557    codec: &dyn PhysicalExtensionCodec,
558) -> Result<protobuf::MaybeFilter> {
559    match expr {
560        None => Ok(protobuf::MaybeFilter { expr: None }),
561        Some(expr) => Ok(protobuf::MaybeFilter {
562            expr: Some(serialize_physical_expr(&expr, codec)?),
563        }),
564    }
565}
566
567pub fn serialize_record_batches(batches: &[RecordBatch]) -> Result<Vec<u8>> {
568    if batches.is_empty() {
569        return Ok(vec![]);
570    }
571    let schema = batches[0].schema();
572    let mut buf = Vec::new();
573    let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
574    for batch in batches {
575        writer.write(batch)?;
576    }
577    writer.finish()?;
578    Ok(buf)
579}
580
581impl TryFrom<&JsonSink> for protobuf::JsonSink {
582    type Error = DataFusionError;
583
584    fn try_from(value: &JsonSink) -> Result<Self, Self::Error> {
585        Ok(Self {
586            config: Some(value.config().try_into()?),
587            writer_options: Some(value.writer_options().try_into()?),
588        })
589    }
590}
591
592impl TryFrom<&CsvSink> for protobuf::CsvSink {
593    type Error = DataFusionError;
594
595    fn try_from(value: &CsvSink) -> Result<Self, Self::Error> {
596        Ok(Self {
597            config: Some(value.config().try_into()?),
598            writer_options: Some(value.writer_options().try_into()?),
599        })
600    }
601}
602
603#[cfg(feature = "parquet")]
604impl TryFrom<&ParquetSink> for protobuf::ParquetSink {
605    type Error = DataFusionError;
606
607    fn try_from(value: &ParquetSink) -> Result<Self, Self::Error> {
608        Ok(Self {
609            config: Some(value.config().try_into()?),
610            parquet_options: Some(value.parquet_options().try_into()?),
611        })
612    }
613}
614
615impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
616    type Error = DataFusionError;
617
618    fn try_from(conf: &FileSinkConfig) -> Result<Self, Self::Error> {
619        let file_groups = conf
620            .file_group
621            .iter()
622            .map(TryInto::try_into)
623            .collect::<Result<Vec<_>>>()?;
624        let table_paths = conf
625            .table_paths
626            .iter()
627            .map(ToString::to_string)
628            .collect::<Vec<_>>();
629        let table_partition_cols = conf
630            .table_partition_cols
631            .iter()
632            .map(|(name, data_type)| {
633                Ok(protobuf::PartitionColumn {
634                    name: name.to_owned(),
635                    arrow_type: Some(data_type.try_into()?),
636                })
637            })
638            .collect::<Result<Vec<_>>>()?;
639        Ok(Self {
640            object_store_url: conf.object_store_url.to_string(),
641            file_groups,
642            table_paths,
643            output_schema: Some(conf.output_schema.as_ref().try_into()?),
644            table_partition_cols,
645            keep_partition_by_columns: conf.keep_partition_by_columns,
646            insert_op: conf.insert_op as i32,
647            file_extension: conf.file_extension.to_string(),
648        })
649    }
650}