datafusion_proto/physical_plan/
to_proto.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::sync::Arc;
19
20#[cfg(feature = "parquet")]
21use datafusion::datasource::file_format::parquet::ParquetSink;
22use datafusion::datasource::physical_plan::FileSink;
23use datafusion::physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
24use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
25use datafusion::physical_expr_common::physical_expr::snapshot_physical_expr;
26use datafusion::physical_plan::expressions::{
27    BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
28    Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
29};
30use datafusion::physical_plan::udaf::AggregateFunctionExpr;
31use datafusion::physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
32use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
33use datafusion::{
34    datasource::{
35        file_format::{csv::CsvSink, json::JsonSink},
36        listing::{FileRange, PartitionedFile},
37        physical_plan::{FileScanConfig, FileSinkConfig},
38    },
39    physical_plan::expressions::LikeExpr,
40};
41use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
42use datafusion_expr::WindowFrame;
43
44use crate::protobuf::{
45    self, physical_aggregate_expr_node, physical_window_expr_node, PhysicalSortExprNode,
46    PhysicalSortExprNodeCollection,
47};
48
49use super::PhysicalExtensionCodec;
50
51pub fn serialize_physical_aggr_expr(
52    aggr_expr: Arc<AggregateFunctionExpr>,
53    codec: &dyn PhysicalExtensionCodec,
54) -> Result<protobuf::PhysicalExprNode> {
55    let expressions = serialize_physical_exprs(&aggr_expr.expressions(), codec)?;
56    let ordering_req = match aggr_expr.order_bys() {
57        Some(order) => order.clone(),
58        None => LexOrdering::default(),
59    };
60    let ordering_req = serialize_physical_sort_exprs(ordering_req, codec)?;
61
62    let name = aggr_expr.fun().name().to_string();
63    let mut buf = Vec::new();
64    codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
65    Ok(protobuf::PhysicalExprNode {
66        expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
67            protobuf::PhysicalAggregateExprNode {
68                aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)),
69                expr: expressions,
70                ordering_req,
71                distinct: aggr_expr.is_distinct(),
72                ignore_nulls: aggr_expr.ignore_nulls(),
73                fun_definition: (!buf.is_empty()).then_some(buf),
74            },
75        )),
76    })
77}
78
79fn serialize_physical_window_aggr_expr(
80    aggr_expr: &AggregateFunctionExpr,
81    _window_frame: &WindowFrame,
82    codec: &dyn PhysicalExtensionCodec,
83) -> Result<(physical_window_expr_node::WindowFunction, Option<Vec<u8>>)> {
84    if aggr_expr.is_distinct() || aggr_expr.ignore_nulls() {
85        // TODO
86        return not_impl_err!(
87            "Distinct aggregate functions not supported in window expressions"
88        );
89    }
90
91    let mut buf = Vec::new();
92    codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
93    Ok((
94        physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(
95            aggr_expr.fun().name().to_string(),
96        ),
97        (!buf.is_empty()).then_some(buf),
98    ))
99}
100
101pub fn serialize_physical_window_expr(
102    window_expr: &Arc<dyn WindowExpr>,
103    codec: &dyn PhysicalExtensionCodec,
104) -> Result<protobuf::PhysicalWindowExprNode> {
105    let expr = window_expr.as_any();
106    let args = window_expr.expressions().to_vec();
107    let window_frame = window_expr.get_window_frame();
108
109    let (window_function, fun_definition) = if let Some(plain_aggr_window_expr) =
110        expr.downcast_ref::<PlainAggregateWindowExpr>()
111    {
112        serialize_physical_window_aggr_expr(
113            plain_aggr_window_expr.get_aggregate_expr(),
114            window_frame,
115            codec,
116        )?
117    } else if let Some(sliding_aggr_window_expr) =
118        expr.downcast_ref::<SlidingAggregateWindowExpr>()
119    {
120        serialize_physical_window_aggr_expr(
121            sliding_aggr_window_expr.get_aggregate_expr(),
122            window_frame,
123            codec,
124        )?
125    } else if let Some(udf_window_expr) = expr.downcast_ref::<StandardWindowExpr>() {
126        if let Some(expr) = udf_window_expr
127            .get_standard_func_expr()
128            .as_any()
129            .downcast_ref::<WindowUDFExpr>()
130        {
131            let mut buf = Vec::new();
132            codec.try_encode_udwf(expr.fun(), &mut buf)?;
133            (
134                physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(
135                    expr.fun().name().to_string(),
136                ),
137                (!buf.is_empty()).then_some(buf),
138            )
139        } else {
140            return not_impl_err!(
141                "User-defined window function not supported: {window_expr:?}"
142            );
143        }
144    } else {
145        return not_impl_err!("WindowExpr not supported: {window_expr:?}");
146    };
147
148    let args = serialize_physical_exprs(&args, codec)?;
149    let partition_by = serialize_physical_exprs(window_expr.partition_by(), codec)?;
150    let order_by = serialize_physical_sort_exprs(window_expr.order_by().to_vec(), codec)?;
151    let window_frame: protobuf::WindowFrame = window_frame
152        .as_ref()
153        .try_into()
154        .map_err(|e| DataFusionError::Internal(format!("{e}")))?;
155
156    Ok(protobuf::PhysicalWindowExprNode {
157        args,
158        partition_by,
159        order_by,
160        window_frame: Some(window_frame),
161        window_function: Some(window_function),
162        name: window_expr.name().to_string(),
163        fun_definition,
164    })
165}
166
167pub fn serialize_physical_sort_exprs<I>(
168    sort_exprs: I,
169    codec: &dyn PhysicalExtensionCodec,
170) -> Result<Vec<PhysicalSortExprNode>>
171where
172    I: IntoIterator<Item = PhysicalSortExpr>,
173{
174    sort_exprs
175        .into_iter()
176        .map(|sort_expr| serialize_physical_sort_expr(sort_expr, codec))
177        .collect()
178}
179
180pub fn serialize_physical_sort_expr(
181    sort_expr: PhysicalSortExpr,
182    codec: &dyn PhysicalExtensionCodec,
183) -> Result<PhysicalSortExprNode> {
184    let PhysicalSortExpr { expr, options } = sort_expr;
185    let expr = serialize_physical_expr(&expr, codec)?;
186    Ok(PhysicalSortExprNode {
187        expr: Some(Box::new(expr)),
188        asc: !options.descending,
189        nulls_first: options.nulls_first,
190    })
191}
192
193pub fn serialize_physical_exprs<'a, I>(
194    values: I,
195    codec: &dyn PhysicalExtensionCodec,
196) -> Result<Vec<protobuf::PhysicalExprNode>>
197where
198    I: IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
199{
200    values
201        .into_iter()
202        .map(|value| serialize_physical_expr(value, codec))
203        .collect()
204}
205
206/// Serialize a `PhysicalExpr` to default protobuf representation.
207///
208/// If required, a [`PhysicalExtensionCodec`] can be provided which can handle
209/// serialization of udfs requiring specialized serialization (see [`PhysicalExtensionCodec::try_encode_udf`])
210pub fn serialize_physical_expr(
211    value: &Arc<dyn PhysicalExpr>,
212    codec: &dyn PhysicalExtensionCodec,
213) -> Result<protobuf::PhysicalExprNode> {
214    // Snapshot the expr in case it has dynamic predicate state so
215    // it can be serialized
216    let value = snapshot_physical_expr(Arc::clone(value))?;
217    let expr = value.as_any();
218
219    if let Some(expr) = expr.downcast_ref::<Column>() {
220        Ok(protobuf::PhysicalExprNode {
221            expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
222                protobuf::PhysicalColumn {
223                    name: expr.name().to_string(),
224                    index: expr.index() as u32,
225                },
226            )),
227        })
228    } else if let Some(expr) = expr.downcast_ref::<UnKnownColumn>() {
229        Ok(protobuf::PhysicalExprNode {
230            expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn(
231                protobuf::UnknownColumn {
232                    name: expr.name().to_string(),
233                },
234            )),
235        })
236    } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
237        let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
238            l: Some(Box::new(serialize_physical_expr(expr.left(), codec)?)),
239            r: Some(Box::new(serialize_physical_expr(expr.right(), codec)?)),
240            op: format!("{:?}", expr.op()),
241        });
242
243        Ok(protobuf::PhysicalExprNode {
244            expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
245                binary_expr,
246            )),
247        })
248    } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
249        Ok(protobuf::PhysicalExprNode {
250            expr_type: Some(
251                protobuf::physical_expr_node::ExprType::Case(
252                    Box::new(
253                        protobuf::PhysicalCaseNode {
254                            expr: expr
255                                .expr()
256                                .map(|exp| {
257                                    serialize_physical_expr(exp, codec).map(Box::new)
258                                })
259                                .transpose()?,
260                            when_then_expr: expr
261                                .when_then_expr()
262                                .iter()
263                                .map(|(when_expr, then_expr)| {
264                                    serialize_when_then_expr(when_expr, then_expr, codec)
265                                })
266                                .collect::<Result<
267                                    Vec<protobuf::PhysicalWhenThen>,
268                                    DataFusionError,
269                                >>()?,
270                            else_expr: expr
271                                .else_expr()
272                                .map(|a| serialize_physical_expr(a, codec).map(Box::new))
273                                .transpose()?,
274                        },
275                    ),
276                ),
277            ),
278        })
279    } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
280        Ok(protobuf::PhysicalExprNode {
281            expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
282                protobuf::PhysicalNot {
283                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
284                },
285            ))),
286        })
287    } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
288        Ok(protobuf::PhysicalExprNode {
289            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
290                Box::new(protobuf::PhysicalIsNull {
291                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
292                }),
293            )),
294        })
295    } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
296        Ok(protobuf::PhysicalExprNode {
297            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
298                Box::new(protobuf::PhysicalIsNotNull {
299                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
300                }),
301            )),
302        })
303    } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
304        Ok(protobuf::PhysicalExprNode {
305            expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new(
306                protobuf::PhysicalInListNode {
307                    expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)),
308                    list: serialize_physical_exprs(expr.list(), codec)?,
309                    negated: expr.negated(),
310                },
311            ))),
312        })
313    } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
314        Ok(protobuf::PhysicalExprNode {
315            expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
316                protobuf::PhysicalNegativeNode {
317                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
318                },
319            ))),
320        })
321    } else if let Some(lit) = expr.downcast_ref::<Literal>() {
322        Ok(protobuf::PhysicalExprNode {
323            expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
324                lit.value().try_into()?,
325            )),
326        })
327    } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
328        Ok(protobuf::PhysicalExprNode {
329            expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
330                protobuf::PhysicalCastNode {
331                    expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)),
332                    arrow_type: Some(cast.cast_type().try_into()?),
333                },
334            ))),
335        })
336    } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
337        Ok(protobuf::PhysicalExprNode {
338            expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new(
339                protobuf::PhysicalTryCastNode {
340                    expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)),
341                    arrow_type: Some(cast.cast_type().try_into()?),
342                },
343            ))),
344        })
345    } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
346        let mut buf = Vec::new();
347        codec.try_encode_udf(expr.fun(), &mut buf)?;
348        Ok(protobuf::PhysicalExprNode {
349            expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf(
350                protobuf::PhysicalScalarUdfNode {
351                    name: expr.name().to_string(),
352                    args: serialize_physical_exprs(expr.args(), codec)?,
353                    fun_definition: (!buf.is_empty()).then_some(buf),
354                    return_type: Some(expr.return_type().try_into()?),
355                    nullable: expr.nullable(),
356                },
357            )),
358        })
359    } else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
360        Ok(protobuf::PhysicalExprNode {
361            expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new(
362                protobuf::PhysicalLikeExprNode {
363                    negated: expr.negated(),
364                    case_insensitive: expr.case_insensitive(),
365                    expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)),
366                    pattern: Some(Box::new(serialize_physical_expr(
367                        expr.pattern(),
368                        codec,
369                    )?)),
370                },
371            ))),
372        })
373    } else {
374        let mut buf: Vec<u8> = vec![];
375        match codec.try_encode_expr(&value, &mut buf) {
376            Ok(_) => {
377                let inputs: Vec<protobuf::PhysicalExprNode> = value
378                    .children()
379                    .into_iter()
380                    .map(|e| serialize_physical_expr(e, codec))
381                    .collect::<Result<_>>()?;
382                Ok(protobuf::PhysicalExprNode {
383                    expr_type: Some(protobuf::physical_expr_node::ExprType::Extension(
384                        protobuf::PhysicalExtensionExprNode { expr: buf, inputs },
385                    )),
386                })
387            }
388            Err(e) => internal_err!(
389                "Unsupported physical expr and extension codec failed with [{e}]. Expr: {value:?}"
390            ),
391        }
392    }
393}
394
395pub fn serialize_partitioning(
396    partitioning: &Partitioning,
397    codec: &dyn PhysicalExtensionCodec,
398) -> Result<protobuf::Partitioning> {
399    let serialized_partitioning = match partitioning {
400        Partitioning::RoundRobinBatch(partition_count) => protobuf::Partitioning {
401            partition_method: Some(protobuf::partitioning::PartitionMethod::RoundRobin(
402                *partition_count as u64,
403            )),
404        },
405        Partitioning::Hash(exprs, partition_count) => {
406            let serialized_exprs = serialize_physical_exprs(exprs, codec)?;
407            protobuf::Partitioning {
408                partition_method: Some(protobuf::partitioning::PartitionMethod::Hash(
409                    protobuf::PhysicalHashRepartition {
410                        hash_expr: serialized_exprs,
411                        partition_count: *partition_count as u64,
412                    },
413                )),
414            }
415        }
416        Partitioning::UnknownPartitioning(partition_count) => protobuf::Partitioning {
417            partition_method: Some(protobuf::partitioning::PartitionMethod::Unknown(
418                *partition_count as u64,
419            )),
420        },
421    };
422    Ok(serialized_partitioning)
423}
424
425fn serialize_when_then_expr(
426    when_expr: &Arc<dyn PhysicalExpr>,
427    then_expr: &Arc<dyn PhysicalExpr>,
428    codec: &dyn PhysicalExtensionCodec,
429) -> Result<protobuf::PhysicalWhenThen> {
430    Ok(protobuf::PhysicalWhenThen {
431        when_expr: Some(serialize_physical_expr(when_expr, codec)?),
432        then_expr: Some(serialize_physical_expr(then_expr, codec)?),
433    })
434}
435
436impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
437    type Error = DataFusionError;
438
439    fn try_from(pf: &PartitionedFile) -> Result<Self> {
440        let last_modified = pf.object_meta.last_modified;
441        let last_modified_ns = last_modified.timestamp_nanos_opt().ok_or_else(|| {
442            DataFusionError::Plan(format!(
443                "Invalid timestamp on PartitionedFile::ObjectMeta: {last_modified}"
444            ))
445        })? as u64;
446        Ok(protobuf::PartitionedFile {
447            path: pf.object_meta.location.as_ref().to_owned(),
448            size: pf.object_meta.size,
449            last_modified_ns,
450            partition_values: pf
451                .partition_values
452                .iter()
453                .map(|v| v.try_into())
454                .collect::<Result<Vec<_>, _>>()?,
455            range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
456            statistics: pf.statistics.as_ref().map(|s| s.as_ref().into()),
457        })
458    }
459}
460
461impl TryFrom<&FileRange> for protobuf::FileRange {
462    type Error = DataFusionError;
463
464    fn try_from(value: &FileRange) -> Result<Self> {
465        Ok(protobuf::FileRange {
466            start: value.start,
467            end: value.end,
468        })
469    }
470}
471
472impl TryFrom<&[PartitionedFile]> for protobuf::FileGroup {
473    type Error = DataFusionError;
474
475    fn try_from(gr: &[PartitionedFile]) -> Result<Self, Self::Error> {
476        Ok(protobuf::FileGroup {
477            files: gr
478                .iter()
479                .map(|f| f.try_into())
480                .collect::<Result<Vec<_>, _>>()?,
481        })
482    }
483}
484
485pub fn serialize_file_scan_config(
486    conf: &FileScanConfig,
487    codec: &dyn PhysicalExtensionCodec,
488) -> Result<protobuf::FileScanExecConf> {
489    let file_groups = conf
490        .file_groups
491        .iter()
492        .map(|p| p.files().try_into())
493        .collect::<Result<Vec<_>, _>>()?;
494
495    let mut output_orderings = vec![];
496    for order in &conf.output_ordering {
497        let ordering = serialize_physical_sort_exprs(order.to_vec(), codec)?;
498        output_orderings.push(ordering)
499    }
500
501    // Fields must be added to the schema so that they can persist in the protobuf,
502    // and then they are to be removed from the schema in `parse_protobuf_file_scan_config`
503    let mut fields = conf
504        .file_schema
505        .fields()
506        .iter()
507        .cloned()
508        .collect::<Vec<_>>();
509    fields.extend(conf.table_partition_cols.iter().cloned().map(Arc::new));
510    let schema = Arc::new(arrow::datatypes::Schema::new(fields.clone()));
511
512    Ok(protobuf::FileScanExecConf {
513        file_groups,
514        statistics: Some((&conf.file_source.statistics().unwrap()).into()),
515        limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }),
516        projection: conf
517            .projection
518            .as_ref()
519            .unwrap_or(&(0..schema.fields().len()).collect::<Vec<_>>())
520            .iter()
521            .map(|n| *n as u32)
522            .collect(),
523        schema: Some(schema.as_ref().try_into()?),
524        table_partition_cols: conf
525            .table_partition_cols
526            .iter()
527            .map(|x| x.name().clone())
528            .collect::<Vec<_>>(),
529        object_store_url: conf.object_store_url.to_string(),
530        output_ordering: output_orderings
531            .into_iter()
532            .map(|e| PhysicalSortExprNodeCollection {
533                physical_sort_expr_nodes: e,
534            })
535            .collect::<Vec<_>>(),
536        constraints: Some(conf.constraints.clone().into()),
537        batch_size: conf.batch_size.map(|s| s as u64),
538    })
539}
540
541pub fn serialize_maybe_filter(
542    expr: Option<Arc<dyn PhysicalExpr>>,
543    codec: &dyn PhysicalExtensionCodec,
544) -> Result<protobuf::MaybeFilter> {
545    match expr {
546        None => Ok(protobuf::MaybeFilter { expr: None }),
547        Some(expr) => Ok(protobuf::MaybeFilter {
548            expr: Some(serialize_physical_expr(&expr, codec)?),
549        }),
550    }
551}
552
553impl TryFrom<&JsonSink> for protobuf::JsonSink {
554    type Error = DataFusionError;
555
556    fn try_from(value: &JsonSink) -> Result<Self, Self::Error> {
557        Ok(Self {
558            config: Some(value.config().try_into()?),
559            writer_options: Some(value.writer_options().try_into()?),
560        })
561    }
562}
563
564impl TryFrom<&CsvSink> for protobuf::CsvSink {
565    type Error = DataFusionError;
566
567    fn try_from(value: &CsvSink) -> Result<Self, Self::Error> {
568        Ok(Self {
569            config: Some(value.config().try_into()?),
570            writer_options: Some(value.writer_options().try_into()?),
571        })
572    }
573}
574
#[cfg(feature = "parquet")]
impl TryFrom<&ParquetSink> for protobuf::ParquetSink {
    type Error = DataFusionError;

    /// Converts a [`ParquetSink`] by converting its sink configuration and
    /// then its parquet options, returning the first conversion error
    /// encountered.
    fn try_from(value: &ParquetSink) -> Result<Self, Self::Error> {
        let config = protobuf::FileSinkConfig::try_from(value.config())?;
        Ok(Self {
            config: Some(config),
            parquet_options: Some(value.parquet_options().try_into()?),
        })
    }
}
586
587impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
588    type Error = DataFusionError;
589
590    fn try_from(conf: &FileSinkConfig) -> Result<Self, Self::Error> {
591        let file_groups = conf
592            .file_group
593            .iter()
594            .map(TryInto::try_into)
595            .collect::<Result<Vec<_>>>()?;
596        let table_paths = conf
597            .table_paths
598            .iter()
599            .map(ToString::to_string)
600            .collect::<Vec<_>>();
601        let table_partition_cols = conf
602            .table_partition_cols
603            .iter()
604            .map(|(name, data_type)| {
605                Ok(protobuf::PartitionColumn {
606                    name: name.to_owned(),
607                    arrow_type: Some(data_type.try_into()?),
608                })
609            })
610            .collect::<Result<Vec<_>>>()?;
611        Ok(Self {
612            object_store_url: conf.object_store_url.to_string(),
613            file_groups,
614            table_paths,
615            output_schema: Some(conf.output_schema.as_ref().try_into()?),
616            table_partition_cols,
617            keep_partition_by_columns: conf.keep_partition_by_columns,
618            insert_op: conf.insert_op as i32,
619            file_extension: conf.file_extension.to_string(),
620        })
621    }
622}