datafusion_proto/physical_plan/
to_proto.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20#[cfg(feature = "parquet")]
21use datafusion::datasource::file_format::parquet::ParquetSink;
22use datafusion::datasource::physical_plan::FileSink;
23use datafusion::physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
24use datafusion::physical_expr::ScalarFunctionExpr;
25use datafusion::physical_expr_common::physical_expr::snapshot_physical_expr;
26use datafusion::physical_expr_common::sort_expr::PhysicalSortExpr;
27use datafusion::physical_plan::expressions::{
28    BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
29    Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
30};
31use datafusion::physical_plan::udaf::AggregateFunctionExpr;
32use datafusion::physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
33use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
34use datafusion::{
35    datasource::{
36        file_format::{csv::CsvSink, json::JsonSink},
37        listing::{FileRange, PartitionedFile},
38        physical_plan::{FileScanConfig, FileSinkConfig},
39    },
40    physical_plan::expressions::LikeExpr,
41};
42use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
43use datafusion_expr::WindowFrame;
44
45use crate::protobuf::{
46    self, physical_aggregate_expr_node, physical_window_expr_node, PhysicalSortExprNode,
47    PhysicalSortExprNodeCollection,
48};
49
50use super::PhysicalExtensionCodec;
51
52pub fn serialize_physical_aggr_expr(
53    aggr_expr: Arc<AggregateFunctionExpr>,
54    codec: &dyn PhysicalExtensionCodec,
55) -> Result<protobuf::PhysicalExprNode> {
56    let expressions = serialize_physical_exprs(&aggr_expr.expressions(), codec)?;
57    let order_bys =
58        serialize_physical_sort_exprs(aggr_expr.order_bys().iter().cloned(), codec)?;
59
60    let name = aggr_expr.fun().name().to_string();
61    let mut buf = Vec::new();
62    codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
63    Ok(protobuf::PhysicalExprNode {
64        expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
65            protobuf::PhysicalAggregateExprNode {
66                aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)),
67                expr: expressions,
68                ordering_req: order_bys,
69                distinct: aggr_expr.is_distinct(),
70                ignore_nulls: aggr_expr.ignore_nulls(),
71                fun_definition: (!buf.is_empty()).then_some(buf),
72            },
73        )),
74    })
75}
76
77fn serialize_physical_window_aggr_expr(
78    aggr_expr: &AggregateFunctionExpr,
79    _window_frame: &WindowFrame,
80    codec: &dyn PhysicalExtensionCodec,
81) -> Result<(physical_window_expr_node::WindowFunction, Option<Vec<u8>>)> {
82    if aggr_expr.is_distinct() || aggr_expr.ignore_nulls() {
83        // TODO
84        return not_impl_err!(
85            "Distinct aggregate functions not supported in window expressions"
86        );
87    }
88
89    let mut buf = Vec::new();
90    codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
91    Ok((
92        physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(
93            aggr_expr.fun().name().to_string(),
94        ),
95        (!buf.is_empty()).then_some(buf),
96    ))
97}
98
99pub fn serialize_physical_window_expr(
100    window_expr: &Arc<dyn WindowExpr>,
101    codec: &dyn PhysicalExtensionCodec,
102) -> Result<protobuf::PhysicalWindowExprNode> {
103    let expr = window_expr.as_any();
104    let args = window_expr.expressions().to_vec();
105    let window_frame = window_expr.get_window_frame();
106
107    let (window_function, fun_definition) = if let Some(plain_aggr_window_expr) =
108        expr.downcast_ref::<PlainAggregateWindowExpr>()
109    {
110        serialize_physical_window_aggr_expr(
111            plain_aggr_window_expr.get_aggregate_expr(),
112            window_frame,
113            codec,
114        )?
115    } else if let Some(sliding_aggr_window_expr) =
116        expr.downcast_ref::<SlidingAggregateWindowExpr>()
117    {
118        serialize_physical_window_aggr_expr(
119            sliding_aggr_window_expr.get_aggregate_expr(),
120            window_frame,
121            codec,
122        )?
123    } else if let Some(udf_window_expr) = expr.downcast_ref::<StandardWindowExpr>() {
124        if let Some(expr) = udf_window_expr
125            .get_standard_func_expr()
126            .as_any()
127            .downcast_ref::<WindowUDFExpr>()
128        {
129            let mut buf = Vec::new();
130            codec.try_encode_udwf(expr.fun(), &mut buf)?;
131            (
132                physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(
133                    expr.fun().name().to_string(),
134                ),
135                (!buf.is_empty()).then_some(buf),
136            )
137        } else {
138            return not_impl_err!(
139                "User-defined window function not supported: {window_expr:?}"
140            );
141        }
142    } else {
143        return not_impl_err!("WindowExpr not supported: {window_expr:?}");
144    };
145
146    let args = serialize_physical_exprs(&args, codec)?;
147    let partition_by = serialize_physical_exprs(window_expr.partition_by(), codec)?;
148    let order_by = serialize_physical_sort_exprs(window_expr.order_by().to_vec(), codec)?;
149    let window_frame: protobuf::WindowFrame = window_frame
150        .as_ref()
151        .try_into()
152        .map_err(|e| DataFusionError::Internal(format!("{e}")))?;
153
154    Ok(protobuf::PhysicalWindowExprNode {
155        args,
156        partition_by,
157        order_by,
158        window_frame: Some(window_frame),
159        window_function: Some(window_function),
160        name: window_expr.name().to_string(),
161        fun_definition,
162    })
163}
164
165pub fn serialize_physical_sort_exprs<I>(
166    sort_exprs: I,
167    codec: &dyn PhysicalExtensionCodec,
168) -> Result<Vec<PhysicalSortExprNode>>
169where
170    I: IntoIterator<Item = PhysicalSortExpr>,
171{
172    sort_exprs
173        .into_iter()
174        .map(|sort_expr| serialize_physical_sort_expr(sort_expr, codec))
175        .collect()
176}
177
178pub fn serialize_physical_sort_expr(
179    sort_expr: PhysicalSortExpr,
180    codec: &dyn PhysicalExtensionCodec,
181) -> Result<PhysicalSortExprNode> {
182    let PhysicalSortExpr { expr, options } = sort_expr;
183    let expr = serialize_physical_expr(&expr, codec)?;
184    Ok(PhysicalSortExprNode {
185        expr: Some(Box::new(expr)),
186        asc: !options.descending,
187        nulls_first: options.nulls_first,
188    })
189}
190
191pub fn serialize_physical_exprs<'a, I>(
192    values: I,
193    codec: &dyn PhysicalExtensionCodec,
194) -> Result<Vec<protobuf::PhysicalExprNode>>
195where
196    I: IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
197{
198    values
199        .into_iter()
200        .map(|value| serialize_physical_expr(value, codec))
201        .collect()
202}
203
204/// Serialize a `PhysicalExpr` to default protobuf representation.
205///
206/// If required, a [`PhysicalExtensionCodec`] can be provided which can handle
207/// serialization of udfs requiring specialized serialization (see [`PhysicalExtensionCodec::try_encode_udf`])
208pub fn serialize_physical_expr(
209    value: &Arc<dyn PhysicalExpr>,
210    codec: &dyn PhysicalExtensionCodec,
211) -> Result<protobuf::PhysicalExprNode> {
212    // Snapshot the expr in case it has dynamic predicate state so
213    // it can be serialized
214    let value = snapshot_physical_expr(Arc::clone(value))?;
215    let expr = value.as_any();
216
217    if let Some(expr) = expr.downcast_ref::<Column>() {
218        Ok(protobuf::PhysicalExprNode {
219            expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
220                protobuf::PhysicalColumn {
221                    name: expr.name().to_string(),
222                    index: expr.index() as u32,
223                },
224            )),
225        })
226    } else if let Some(expr) = expr.downcast_ref::<UnKnownColumn>() {
227        Ok(protobuf::PhysicalExprNode {
228            expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn(
229                protobuf::UnknownColumn {
230                    name: expr.name().to_string(),
231                },
232            )),
233        })
234    } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
235        let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
236            l: Some(Box::new(serialize_physical_expr(expr.left(), codec)?)),
237            r: Some(Box::new(serialize_physical_expr(expr.right(), codec)?)),
238            op: format!("{:?}", expr.op()),
239        });
240
241        Ok(protobuf::PhysicalExprNode {
242            expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
243                binary_expr,
244            )),
245        })
246    } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
247        Ok(protobuf::PhysicalExprNode {
248            expr_type: Some(
249                protobuf::physical_expr_node::ExprType::Case(
250                    Box::new(
251                        protobuf::PhysicalCaseNode {
252                            expr: expr
253                                .expr()
254                                .map(|exp| {
255                                    serialize_physical_expr(exp, codec).map(Box::new)
256                                })
257                                .transpose()?,
258                            when_then_expr: expr
259                                .when_then_expr()
260                                .iter()
261                                .map(|(when_expr, then_expr)| {
262                                    serialize_when_then_expr(when_expr, then_expr, codec)
263                                })
264                                .collect::<Result<
265                                    Vec<protobuf::PhysicalWhenThen>,
266                                    DataFusionError,
267                                >>()?,
268                            else_expr: expr
269                                .else_expr()
270                                .map(|a| serialize_physical_expr(a, codec).map(Box::new))
271                                .transpose()?,
272                        },
273                    ),
274                ),
275            ),
276        })
277    } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
278        Ok(protobuf::PhysicalExprNode {
279            expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
280                protobuf::PhysicalNot {
281                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
282                },
283            ))),
284        })
285    } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
286        Ok(protobuf::PhysicalExprNode {
287            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
288                Box::new(protobuf::PhysicalIsNull {
289                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
290                }),
291            )),
292        })
293    } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
294        Ok(protobuf::PhysicalExprNode {
295            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
296                Box::new(protobuf::PhysicalIsNotNull {
297                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
298                }),
299            )),
300        })
301    } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
302        Ok(protobuf::PhysicalExprNode {
303            expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new(
304                protobuf::PhysicalInListNode {
305                    expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)),
306                    list: serialize_physical_exprs(expr.list(), codec)?,
307                    negated: expr.negated(),
308                },
309            ))),
310        })
311    } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
312        Ok(protobuf::PhysicalExprNode {
313            expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
314                protobuf::PhysicalNegativeNode {
315                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
316                },
317            ))),
318        })
319    } else if let Some(lit) = expr.downcast_ref::<Literal>() {
320        Ok(protobuf::PhysicalExprNode {
321            expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
322                lit.value().try_into()?,
323            )),
324        })
325    } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
326        Ok(protobuf::PhysicalExprNode {
327            expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
328                protobuf::PhysicalCastNode {
329                    expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)),
330                    arrow_type: Some(cast.cast_type().try_into()?),
331                },
332            ))),
333        })
334    } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
335        Ok(protobuf::PhysicalExprNode {
336            expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new(
337                protobuf::PhysicalTryCastNode {
338                    expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)),
339                    arrow_type: Some(cast.cast_type().try_into()?),
340                },
341            ))),
342        })
343    } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
344        let mut buf = Vec::new();
345        codec.try_encode_udf(expr.fun(), &mut buf)?;
346        Ok(protobuf::PhysicalExprNode {
347            expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf(
348                protobuf::PhysicalScalarUdfNode {
349                    name: expr.name().to_string(),
350                    args: serialize_physical_exprs(expr.args(), codec)?,
351                    fun_definition: (!buf.is_empty()).then_some(buf),
352                    return_type: Some(expr.return_type().try_into()?),
353                    nullable: expr.nullable(),
354                },
355            )),
356        })
357    } else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
358        Ok(protobuf::PhysicalExprNode {
359            expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new(
360                protobuf::PhysicalLikeExprNode {
361                    negated: expr.negated(),
362                    case_insensitive: expr.case_insensitive(),
363                    expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)),
364                    pattern: Some(Box::new(serialize_physical_expr(
365                        expr.pattern(),
366                        codec,
367                    )?)),
368                },
369            ))),
370        })
371    } else {
372        let mut buf: Vec<u8> = vec![];
373        match codec.try_encode_expr(&value, &mut buf) {
374            Ok(_) => {
375                let inputs: Vec<protobuf::PhysicalExprNode> = value
376                    .children()
377                    .into_iter()
378                    .map(|e| serialize_physical_expr(e, codec))
379                    .collect::<Result<_>>()?;
380                Ok(protobuf::PhysicalExprNode {
381                    expr_type: Some(protobuf::physical_expr_node::ExprType::Extension(
382                        protobuf::PhysicalExtensionExprNode { expr: buf, inputs },
383                    )),
384                })
385            }
386            Err(e) => internal_err!(
387                "Unsupported physical expr and extension codec failed with [{e}]. Expr: {value:?}"
388            ),
389        }
390    }
391}
392
393pub fn serialize_partitioning(
394    partitioning: &Partitioning,
395    codec: &dyn PhysicalExtensionCodec,
396) -> Result<protobuf::Partitioning> {
397    let serialized_partitioning = match partitioning {
398        Partitioning::RoundRobinBatch(partition_count) => protobuf::Partitioning {
399            partition_method: Some(protobuf::partitioning::PartitionMethod::RoundRobin(
400                *partition_count as u64,
401            )),
402        },
403        Partitioning::Hash(exprs, partition_count) => {
404            let serialized_exprs = serialize_physical_exprs(exprs, codec)?;
405            protobuf::Partitioning {
406                partition_method: Some(protobuf::partitioning::PartitionMethod::Hash(
407                    protobuf::PhysicalHashRepartition {
408                        hash_expr: serialized_exprs,
409                        partition_count: *partition_count as u64,
410                    },
411                )),
412            }
413        }
414        Partitioning::UnknownPartitioning(partition_count) => protobuf::Partitioning {
415            partition_method: Some(protobuf::partitioning::PartitionMethod::Unknown(
416                *partition_count as u64,
417            )),
418        },
419    };
420    Ok(serialized_partitioning)
421}
422
423fn serialize_when_then_expr(
424    when_expr: &Arc<dyn PhysicalExpr>,
425    then_expr: &Arc<dyn PhysicalExpr>,
426    codec: &dyn PhysicalExtensionCodec,
427) -> Result<protobuf::PhysicalWhenThen> {
428    Ok(protobuf::PhysicalWhenThen {
429        when_expr: Some(serialize_physical_expr(when_expr, codec)?),
430        then_expr: Some(serialize_physical_expr(then_expr, codec)?),
431    })
432}
433
434impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
435    type Error = DataFusionError;
436
437    fn try_from(pf: &PartitionedFile) -> Result<Self> {
438        let last_modified = pf.object_meta.last_modified;
439        let last_modified_ns = last_modified.timestamp_nanos_opt().ok_or_else(|| {
440            DataFusionError::Plan(format!(
441                "Invalid timestamp on PartitionedFile::ObjectMeta: {last_modified}"
442            ))
443        })? as u64;
444        Ok(protobuf::PartitionedFile {
445            path: pf.object_meta.location.as_ref().to_owned(),
446            size: pf.object_meta.size,
447            last_modified_ns,
448            partition_values: pf
449                .partition_values
450                .iter()
451                .map(|v| v.try_into())
452                .collect::<Result<Vec<_>, _>>()?,
453            range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
454            statistics: pf.statistics.as_ref().map(|s| s.as_ref().into()),
455        })
456    }
457}
458
459impl TryFrom<&FileRange> for protobuf::FileRange {
460    type Error = DataFusionError;
461
462    fn try_from(value: &FileRange) -> Result<Self> {
463        Ok(protobuf::FileRange {
464            start: value.start,
465            end: value.end,
466        })
467    }
468}
469
470impl TryFrom<&[PartitionedFile]> for protobuf::FileGroup {
471    type Error = DataFusionError;
472
473    fn try_from(gr: &[PartitionedFile]) -> Result<Self, Self::Error> {
474        Ok(protobuf::FileGroup {
475            files: gr
476                .iter()
477                .map(|f| f.try_into())
478                .collect::<Result<Vec<_>, _>>()?,
479        })
480    }
481}
482
483pub fn serialize_file_scan_config(
484    conf: &FileScanConfig,
485    codec: &dyn PhysicalExtensionCodec,
486) -> Result<protobuf::FileScanExecConf> {
487    let file_groups = conf
488        .file_groups
489        .iter()
490        .map(|p| p.files().try_into())
491        .collect::<Result<Vec<_>, _>>()?;
492
493    let mut output_orderings = vec![];
494    for order in &conf.output_ordering {
495        let ordering = serialize_physical_sort_exprs(order.to_vec(), codec)?;
496        output_orderings.push(ordering)
497    }
498
499    // Fields must be added to the schema so that they can persist in the protobuf,
500    // and then they are to be removed from the schema in `parse_protobuf_file_scan_config`
501    let mut fields = conf
502        .file_schema
503        .fields()
504        .iter()
505        .cloned()
506        .collect::<Vec<_>>();
507    fields.extend(conf.table_partition_cols.iter().cloned());
508    let schema = Arc::new(arrow::datatypes::Schema::new(fields.clone()));
509
510    Ok(protobuf::FileScanExecConf {
511        file_groups,
512        statistics: Some((&conf.file_source.statistics().unwrap()).into()),
513        limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }),
514        projection: conf
515            .projection
516            .as_ref()
517            .unwrap_or(&(0..schema.fields().len()).collect::<Vec<_>>())
518            .iter()
519            .map(|n| *n as u32)
520            .collect(),
521        schema: Some(schema.as_ref().try_into()?),
522        table_partition_cols: conf
523            .table_partition_cols
524            .iter()
525            .map(|x| x.name().clone())
526            .collect::<Vec<_>>(),
527        object_store_url: conf.object_store_url.to_string(),
528        output_ordering: output_orderings
529            .into_iter()
530            .map(|e| PhysicalSortExprNodeCollection {
531                physical_sort_expr_nodes: e,
532            })
533            .collect::<Vec<_>>(),
534        constraints: Some(conf.constraints.clone().into()),
535        batch_size: conf.batch_size.map(|s| s as u64),
536    })
537}
538
539pub fn serialize_maybe_filter(
540    expr: Option<Arc<dyn PhysicalExpr>>,
541    codec: &dyn PhysicalExtensionCodec,
542) -> Result<protobuf::MaybeFilter> {
543    match expr {
544        None => Ok(protobuf::MaybeFilter { expr: None }),
545        Some(expr) => Ok(protobuf::MaybeFilter {
546            expr: Some(serialize_physical_expr(&expr, codec)?),
547        }),
548    }
549}
550
551impl TryFrom<&JsonSink> for protobuf::JsonSink {
552    type Error = DataFusionError;
553
554    fn try_from(value: &JsonSink) -> Result<Self, Self::Error> {
555        Ok(Self {
556            config: Some(value.config().try_into()?),
557            writer_options: Some(value.writer_options().try_into()?),
558        })
559    }
560}
561
562impl TryFrom<&CsvSink> for protobuf::CsvSink {
563    type Error = DataFusionError;
564
565    fn try_from(value: &CsvSink) -> Result<Self, Self::Error> {
566        Ok(Self {
567            config: Some(value.config().try_into()?),
568            writer_options: Some(value.writer_options().try_into()?),
569        })
570    }
571}
572
573#[cfg(feature = "parquet")]
574impl TryFrom<&ParquetSink> for protobuf::ParquetSink {
575    type Error = DataFusionError;
576
577    fn try_from(value: &ParquetSink) -> Result<Self, Self::Error> {
578        Ok(Self {
579            config: Some(value.config().try_into()?),
580            parquet_options: Some(value.parquet_options().try_into()?),
581        })
582    }
583}
584
585impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
586    type Error = DataFusionError;
587
588    fn try_from(conf: &FileSinkConfig) -> Result<Self, Self::Error> {
589        let file_groups = conf
590            .file_group
591            .iter()
592            .map(TryInto::try_into)
593            .collect::<Result<Vec<_>>>()?;
594        let table_paths = conf
595            .table_paths
596            .iter()
597            .map(ToString::to_string)
598            .collect::<Vec<_>>();
599        let table_partition_cols = conf
600            .table_partition_cols
601            .iter()
602            .map(|(name, data_type)| {
603                Ok(protobuf::PartitionColumn {
604                    name: name.to_owned(),
605                    arrow_type: Some(data_type.try_into()?),
606                })
607            })
608            .collect::<Result<Vec<_>>>()?;
609        Ok(Self {
610            object_store_url: conf.object_store_url.to_string(),
611            file_groups,
612            table_paths,
613            output_schema: Some(conf.output_schema.as_ref().try_into()?),
614            table_partition_cols,
615            keep_partition_by_columns: conf.keep_partition_by_columns,
616            insert_op: conf.insert_op as i32,
617            file_extension: conf.file_extension.to_string(),
618        })
619    }
620}