datafusion_proto/physical_plan/from_proto.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Serde code to convert from protocol buffers to Rust data structures.

use std::sync::Arc;

use arrow::array::RecordBatch;
use arrow::compute::SortOptions;
use arrow::datatypes::Field;
use arrow::ipc::reader::StreamReader;
use chrono::{TimeZone, Utc};
use datafusion_expr::dml::InsertOp;
use object_store::path::Path;
use object_store::ObjectMeta;

use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::file_format::csv::CsvSink;
use datafusion::datasource::file_format::json::JsonSink;
#[cfg(feature = "parquet")]
use datafusion::datasource::file_format::parquet::ParquetSink;
use datafusion::datasource::listing::{FileRange, ListingTableUrl, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
    FileGroup, FileScanConfig, FileScanConfigBuilder, FileSinkConfig, FileSource,
};
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::WindowFunctionDefinition;
use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
use datafusion::physical_plan::expressions::{
    in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr,
    Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
};
use datafusion::physical_plan::windows::{create_window_expr, schema_add_window_field};
use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
use datafusion::prelude::SessionContext;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_proto_common::common::proto_error;

use crate::convert_required;
use crate::logical_plan::{self};
use crate::protobuf;
use crate::protobuf::physical_expr_node::ExprType;

use super::PhysicalExtensionCodec;

impl From<&protobuf::PhysicalColumn> for Column {
    fn from(c: &protobuf::PhysicalColumn) -> Column {
        Column::new(&c.name, c.index as usize)
    }
}

/// Parses a physical sort expression from a protobuf.
///
/// # Arguments
///
/// * `proto` - Input proto with a physical sort expression node
/// * `ctx` - A session context that knows how to look up user-defined functions by name
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
///   when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_sort_expr(
    proto: &protobuf::PhysicalSortExprNode,
    ctx: &SessionContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<PhysicalSortExpr> {
    if let Some(expr) = &proto.expr {
        let expr = parse_physical_expr(expr.as_ref(), ctx, input_schema, codec)?;
        let options = SortOptions {
            descending: !proto.asc,
            nulls_first: proto.nulls_first,
        };
        Ok(PhysicalSortExpr { expr, options })
    } else {
        Err(proto_error("Unexpected empty physical expression"))
    }
}

/// Parses a list of physical sort expressions from a protobuf.
///
/// # Arguments
///
/// * `proto` - Input proto with a vector of physical sort expression nodes
/// * `ctx` - A session context that knows how to look up user-defined functions by name
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
///   when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_sort_exprs(
    proto: &[protobuf::PhysicalSortExprNode],
    ctx: &SessionContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Vec<PhysicalSortExpr>> {
    proto
        .iter()
        .map(|sort_expr| parse_physical_sort_expr(sort_expr, ctx, input_schema, codec))
        .collect()
}

/// Parses a physical window expression from a protobuf.
///
/// # Arguments
///
/// * `proto` - Input proto with a physical window expression node
/// * `ctx` - A session context that knows how to look up user-defined functions by name
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
///   when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_window_expr(
    proto: &protobuf::PhysicalWindowExprNode,
    ctx: &SessionContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn WindowExpr>> {
    let window_node_expr = parse_physical_exprs(&proto.args, ctx, input_schema, codec)?;
    let partition_by =
        parse_physical_exprs(&proto.partition_by, ctx, input_schema, codec)?;

    let order_by = parse_physical_sort_exprs(&proto.order_by, ctx, input_schema, codec)?;

    let window_frame = proto
        .window_frame
        .as_ref()
        .map(|wf| wf.clone().try_into())
        .transpose()
        .map_err(|e| DataFusionError::Internal(format!("{e}")))?
        .ok_or_else(|| {
            DataFusionError::Internal(
                "Missing required field 'window_frame' in protobuf".to_string(),
            )
        })?;

    let fun = if let Some(window_func) = proto.window_function.as_ref() {
        match window_func {
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(udaf_name) => {
                WindowFunctionDefinition::AggregateUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udaf(udaf_name, buf)?,
                    None => ctx.udaf(udaf_name).or_else(|_| codec.try_decode_udaf(udaf_name, &[]))?,
                })
            }
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(udwf_name) => {
                WindowFunctionDefinition::WindowUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udwf(udwf_name, buf)?,
                    None => ctx.udwf(udwf_name).or_else(|_| codec.try_decode_udwf(udwf_name, &[]))?
                })
            }
        }
    } else {
        return Err(proto_error(
            "Missing required field 'window_function' in protobuf",
        ));
    };

    let name = proto.name.clone();
    // TODO: Remove extended_schema if functions are all UDAF
    let extended_schema =
        schema_add_window_field(&window_node_expr, input_schema, &fun, &name)?;
    create_window_expr(
        &fun,
        name,
        &window_node_expr,
        &partition_by,
        &order_by,
        Arc::new(window_frame),
        &extended_schema,
        proto.ignore_nulls,
        proto.distinct,
        None,
    )
}

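/// Parses a list of physical expressions from a protobuf.
///
/// See [`parse_physical_expr`] for the meaning of the `ctx`, `input_schema`,
/// and `codec` arguments.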
pub fn parse_physical_exprs<'a, I>(
    protos: I,
    ctx: &SessionContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Vec<Arc<dyn PhysicalExpr>>>
where
    I: IntoIterator<Item = &'a protobuf::PhysicalExprNode>,
{
    protos
        .into_iter()
        .map(|p| parse_physical_expr(p, ctx, input_schema, codec))
        .collect::<Result<Vec<_>>>()
}

/// Parses a physical expression from a protobuf.
///
/// # Arguments
///
/// * `proto` - Input proto with a physical expression node
/// * `ctx` - A session context that knows how to look up user-defined functions by name
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
///   when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_expr(
    proto: &protobuf::PhysicalExprNode,
    ctx: &SessionContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
    let expr_type = proto
        .expr_type
        .as_ref()
        .ok_or_else(|| proto_error("Unexpected empty physical expression"))?;

    let pexpr: Arc<dyn PhysicalExpr> = match expr_type {
        ExprType::Column(c) => {
            let pcol: Column = c.into();
            Arc::new(pcol)
        }
        ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
        ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
        ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new(
            parse_required_physical_expr(
                binary_expr.l.as_deref(),
                ctx,
                "left",
                input_schema,
                codec,
            )?,
            logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?,
            parse_required_physical_expr(
                binary_expr.r.as_deref(),
                ctx,
                "right",
                input_schema,
                codec,
            )?,
        )),
        ExprType::AggregateExpr(_) => {
            return not_impl_err!(
                "Cannot convert aggregate expr node to physical expression"
            );
        }
        ExprType::WindowExpr(_) => {
            return not_impl_err!(
                "Cannot convert window expr node to physical expression"
            );
        }
        ExprType::Sort(_) => {
            return not_impl_err!("Cannot convert sort expr node to physical expression");
        }
        ExprType::IsNullExpr(e) => {
            Arc::new(IsNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::IsNotNullExpr(e) => {
            Arc::new(IsNotNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr(
            e.expr.as_deref(),
            ctx,
            "expr",
            input_schema,
            codec,
        )?)),
        ExprType::Negative(e) => {
            Arc::new(NegativeExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::InList(e) => in_list(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            parse_physical_exprs(&e.list, ctx, input_schema, codec)?,
            &e.negated,
            input_schema,
        )?,
        ExprType::Case(e) => Arc::new(CaseExpr::try_new(
            e.expr
                .as_ref()
                .map(|e| parse_physical_expr(e.as_ref(), ctx, input_schema, codec))
                .transpose()?,
            e.when_then_expr
                .iter()
                .map(|e| {
                    Ok((
                        parse_required_physical_expr(
                            e.when_expr.as_ref(),
                            ctx,
                            "when_expr",
                            input_schema,
                            codec,
                        )?,
                        parse_required_physical_expr(
                            e.then_expr.as_ref(),
                            ctx,
                            "then_expr",
                            input_schema,
                            codec,
                        )?,
                    ))
                })
                .collect::<Result<Vec<_>>>()?,
            e.else_expr
                .as_ref()
                .map(|e| parse_physical_expr(e.as_ref(), ctx, input_schema, codec))
                .transpose()?,
        )?),
        ExprType::Cast(e) => Arc::new(CastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            convert_required!(e.arrow_type)?,
            None,
        )),
        ExprType::TryCast(e) => Arc::new(TryCastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            convert_required!(e.arrow_type)?,
        )),
        ExprType::ScalarUdf(e) => {
            let udf = match &e.fun_definition {
                Some(buf) => codec.try_decode_udf(&e.name, buf)?,
                None => ctx
                    .udf(e.name.as_str())
                    .or_else(|_| codec.try_decode_udf(&e.name, &[]))?,
            };
            let scalar_fun_def = Arc::clone(&udf);

            let args = parse_physical_exprs(&e.args, ctx, input_schema, codec)?;
            let config_options =
                match ctx.state().execution_props().config_options.as_ref() {
                    Some(config_options) => Arc::clone(config_options),
                    None => Arc::new(ConfigOptions::default()),
                };

            Arc::new(
                ScalarFunctionExpr::new(
                    e.name.as_str(),
                    scalar_fun_def,
                    args,
                    Field::new(
                        &e.return_field_name,
                        convert_required!(e.return_type)?,
                        true,
                    )
                    .into(),
                    config_options,
                )
                .with_nullable(e.nullable),
            )
        }
        ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new(
            like_expr.negated,
            like_expr.case_insensitive,
            parse_required_physical_expr(
                like_expr.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            parse_required_physical_expr(
                like_expr.pattern.as_deref(),
                ctx,
                "pattern",
                input_schema,
                codec,
            )?,
        )),
        ExprType::Extension(extension) => {
            let inputs: Vec<Arc<dyn PhysicalExpr>> = extension
                .inputs
                .iter()
                .map(|e| parse_physical_expr(e, ctx, input_schema, codec))
                .collect::<Result<_>>()?;
            (codec.try_decode_expr(extension.expr.as_slice(), &inputs)?) as _
        }
    };

    Ok(pexpr)
}

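/// Parses a physical expression that the protobuf schema marks as required,
/// returning an internal error naming the missing `field` when it is absent.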
fn parse_required_physical_expr(
    expr: Option<&protobuf::PhysicalExprNode>,
    ctx: &SessionContext,
    field: &str,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
    expr.map(|e| parse_physical_expr(e, ctx, input_schema, codec))
        .transpose()?
        .ok_or_else(|| {
            DataFusionError::Internal(format!("Missing required field {field:?}"))
        })
}

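/// Parses an optional [`Partitioning::Hash`] from a protobuf hash repartition
/// node, decoding each hash expression against `input_schema`.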
pub fn parse_protobuf_hash_partitioning(
    partitioning: Option<&protobuf::PhysicalHashRepartition>,
    ctx: &SessionContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Option<Partitioning>> {
    match partitioning {
        Some(hash_part) => {
            let expr =
                parse_physical_exprs(&hash_part.hash_expr, ctx, input_schema, codec)?;

            Ok(Some(Partitioning::Hash(
                expr,
                hash_part.partition_count.try_into().unwrap(),
            )))
        }
        None => Ok(None),
    }
}

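/// Parses an optional [`Partitioning`] (round-robin, hash, or unknown) from a
/// protobuf partitioning node.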
pub fn parse_protobuf_partitioning(
    partitioning: Option<&protobuf::Partitioning>,
    ctx: &SessionContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Option<Partitioning>> {
    match partitioning {
        Some(protobuf::Partitioning { partition_method }) => match partition_method {
            Some(protobuf::partitioning::PartitionMethod::RoundRobin(
                partition_count,
            )) => Ok(Some(Partitioning::RoundRobinBatch(
                *partition_count as usize,
            ))),
            Some(protobuf::partitioning::PartitionMethod::Hash(hash_repartition)) => {
                parse_protobuf_hash_partitioning(
                    Some(hash_repartition),
                    ctx,
                    input_schema,
                    codec,
                )
            }
            Some(protobuf::partitioning::PartitionMethod::Unknown(partition_count)) => {
                Ok(Some(Partitioning::UnknownPartitioning(
                    *partition_count as usize,
                )))
            }
            None => Ok(None),
        },
        None => Ok(None),
    }
}

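/// Extracts the (required) schema from a protobuf file scan configuration.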
pub fn parse_protobuf_file_scan_schema(
    proto: &protobuf::FileScanExecConf,
) -> Result<Arc<Schema>> {
    Ok(Arc::new(convert_required!(proto.schema)?))
}

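/// Parses a [`FileScanConfig`] from a protobuf file scan configuration.
///
/// The serialized schema includes the table partition columns so that their
/// types survive serde; they are recovered here and then removed from the
/// file schema, since they do not appear in the files themselves.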
pub fn parse_protobuf_file_scan_config(
    proto: &protobuf::FileScanExecConf,
    ctx: &SessionContext,
    codec: &dyn PhysicalExtensionCodec,
    file_source: Arc<dyn FileSource>,
) -> Result<FileScanConfig> {
    let schema: Arc<Schema> = parse_protobuf_file_scan_schema(proto)?;
    let projection = proto
        .projection
        .iter()
        .map(|i| *i as usize)
        .collect::<Vec<_>>();

    let constraints = convert_required!(proto.constraints)?;
    let statistics = convert_required!(proto.statistics)?;

    let file_groups = proto
        .file_groups
        .iter()
        .map(|f| f.try_into())
        .collect::<Result<Vec<_>, _>>()?;

    let object_store_url = match proto.object_store_url.is_empty() {
        false => ObjectStoreUrl::parse(&proto.object_store_url)?,
        true => ObjectStoreUrl::local_filesystem(),
    };

    // Reacquire the partition column types from the schema before removing them below.
    let table_partition_cols = proto
        .table_partition_cols
        .iter()
        .map(|col| Ok(schema.field_with_name(col)?.clone()))
        .collect::<Result<Vec<_>>>()?;

    // Remove partition columns from the schema after recreating table_partition_cols
    // because the partition columns are not in the file. They are present to allow
    // the partition column types to be reconstructed after serde.
    let file_schema = Arc::new(Schema::new(
        schema
            .fields()
            .iter()
            .filter(|field| !table_partition_cols.contains(field))
            .cloned()
            .collect::<Vec<_>>(),
    ));

    let mut output_ordering = vec![];
    for node_collection in &proto.output_ordering {
        let sort_exprs = parse_physical_sort_exprs(
            &node_collection.physical_sort_expr_nodes,
            ctx,
            &schema,
            codec,
        )?;
        output_ordering.extend(LexOrdering::new(sort_exprs));
    }

    let config = FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
        .with_file_groups(file_groups)
        .with_constraints(constraints)
        .with_statistics(statistics)
        .with_projection(Some(projection))
        .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize))
        .with_table_partition_cols(table_partition_cols)
        .with_output_ordering(output_ordering)
        .with_batch_size(proto.batch_size.map(|s| s as usize))
        .build();
    Ok(config)
}

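/// Decodes zero or more [`RecordBatch`]es from a buffer in the Arrow IPC
/// stream format. An empty buffer decodes to an empty vector.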
pub fn parse_record_batches(buf: &[u8]) -> Result<Vec<RecordBatch>> {
    if buf.is_empty() {
        return Ok(vec![]);
    }
    let reader = StreamReader::try_new(buf, None)?;
    let mut batches = Vec::new();
    for batch in reader {
        batches.push(batch?);
    }
    Ok(batches)
}

impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
    type Error = DataFusionError;

    fn try_from(val: &protobuf::PartitionedFile) -> Result<Self, Self::Error> {
        Ok(PartitionedFile {
            object_meta: ObjectMeta {
                location: Path::from(val.path.as_str()),
                last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64),
                size: val.size,
                e_tag: None,
                version: None,
            },
            partition_values: val
                .partition_values
                .iter()
                .map(|v| v.try_into())
                .collect::<Result<Vec<_>, _>>()?,
            range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
            statistics: val
                .statistics
                .as_ref()
                .map(|v| v.try_into().map(Arc::new))
                .transpose()?,
            extensions: None,
            metadata_size_hint: None,
        })
    }
}

impl TryFrom<&protobuf::FileRange> for FileRange {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
        Ok(FileRange {
            start: value.start,
            end: value.end,
        })
    }
}

impl TryFrom<&protobuf::FileGroup> for FileGroup {
    type Error = DataFusionError;

    fn try_from(val: &protobuf::FileGroup) -> Result<Self, Self::Error> {
        let files = val
            .files
            .iter()
            .map(|f| f.try_into())
            .collect::<Result<Vec<_>, _>>()?;
        Ok(FileGroup::new(files))
    }
}

impl TryFrom<&protobuf::JsonSink> for JsonSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::JsonSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.writer_options)?,
        ))
    }
}

#[cfg(feature = "parquet")]
impl TryFrom<&protobuf::ParquetSink> for ParquetSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::ParquetSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.parquet_options)?,
        ))
    }
}

impl TryFrom<&protobuf::CsvSink> for CsvSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::CsvSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.writer_options)?,
        ))
    }
}

impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
    type Error = DataFusionError;

    fn try_from(conf: &protobuf::FileSinkConfig) -> Result<Self, Self::Error> {
        let file_group = FileGroup::new(
            conf.file_groups
                .iter()
                .map(|f| f.try_into())
                .collect::<Result<Vec<_>>>()?,
        );
        let table_paths = conf
            .table_paths
            .iter()
            .map(ListingTableUrl::parse)
            .collect::<Result<Vec<_>>>()?;
        let table_partition_cols = conf
            .table_partition_cols
            .iter()
            .map(|protobuf::PartitionColumn { name, arrow_type }| {
                let data_type = convert_required!(arrow_type)?;
                Ok((name.clone(), data_type))
            })
            .collect::<Result<Vec<_>>>()?;
        let insert_op = match conf.insert_op() {
            protobuf::InsertOp::Append => InsertOp::Append,
            protobuf::InsertOp::Overwrite => InsertOp::Overwrite,
            protobuf::InsertOp::Replace => InsertOp::Replace,
        };
        Ok(Self {
            original_url: String::default(),
            object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?,
            file_group,
            table_paths,
            output_schema: Arc::new(convert_required!(conf.output_schema)?),
            table_partition_cols,
            insert_op,
            keep_partition_by_columns: conf.keep_partition_by_columns,
            file_extension: conf.file_extension.clone(),
        })
    }
}
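
#[cfg(test)]
mod ipc_round_trip_example {
    //! A minimal round-trip sketch (an illustrative addition, not upstream
    //! test coverage): `parse_record_batches` expects the Arrow IPC *stream*
    //! format, so a batch written with `arrow::ipc::writer::StreamWriter`
    //! decodes back losslessly.

    use super::*;
    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::ipc::writer::StreamWriter;

    #[test]
    fn record_batches_round_trip() -> Result<()> {
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )?;

        // Encode one batch in the IPC stream format.
        let mut buf = Vec::new();
        {
            let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
            writer.write(&batch)?;
            writer.finish()?;
        }

        // Decode it back and check that the data survived.
        let batches = parse_record_batches(&buf)?;
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 3);
        Ok(())
    }
}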