datafusion_proto/physical_plan/from_proto.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Serde code to convert from protocol buffers to Rust data structures.

use std::sync::Arc;

use arrow::array::RecordBatch;
use arrow::compute::SortOptions;
use arrow::datatypes::Field;
use arrow::ipc::reader::StreamReader;
use chrono::{TimeZone, Utc};
use datafusion_expr::dml::InsertOp;
use object_store::path::Path;
use object_store::ObjectMeta;

use arrow::datatypes::Schema;
use datafusion_common::{internal_datafusion_err, not_impl_err, DataFusionError, Result};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_sink_config::FileSinkConfig;
use datafusion_datasource::{FileRange, ListingTableUrl, PartitionedFile};
use datafusion_datasource_csv::file_format::CsvSink;
use datafusion_datasource_json::file_format::JsonSink;
#[cfg(feature = "parquet")]
use datafusion_datasource_parquet::file_format::ParquetSink;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{FunctionRegistry, TaskContext};
use datafusion_expr::WindowFunctionDefinition;
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
use datafusion_physical_plan::expressions::{
    in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr,
    Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
};
use datafusion_physical_plan::windows::{create_window_expr, schema_add_window_field};
use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
use datafusion_proto_common::common::proto_error;

use crate::convert_required;
use crate::logical_plan::{self};
use crate::protobuf;
use crate::protobuf::physical_expr_node::ExprType;

use super::PhysicalExtensionCodec;

impl From<&protobuf::PhysicalColumn> for Column {
    fn from(c: &protobuf::PhysicalColumn) -> Column {
        Column::new(&c.name, c.index as usize)
    }
}

/// Parses a physical sort expression from a protobuf.
///
/// # Arguments
///
/// * `proto` - Input proto containing a physical sort expression node
/// * `ctx` - The task context, used to look up user-defined functions by name
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
///   when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_sort_expr(
    proto: &protobuf::PhysicalSortExprNode,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<PhysicalSortExpr> {
    if let Some(expr) = &proto.expr {
        let expr = parse_physical_expr(expr.as_ref(), ctx, input_schema, codec)?;
        let options = SortOptions {
            descending: !proto.asc,
            nulls_first: proto.nulls_first,
        };
        Ok(PhysicalSortExpr { expr, options })
    } else {
        Err(proto_error("Unexpected empty physical expression"))
    }
}

/// Parses a list of physical sort expressions from a protobuf.
///
/// # Arguments
///
/// * `proto` - Input protos, each containing a physical sort expression node
/// * `ctx` - The task context, used to look up user-defined functions by name
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
///   when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_sort_exprs(
    proto: &[protobuf::PhysicalSortExprNode],
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Vec<PhysicalSortExpr>> {
    proto
        .iter()
        .map(|sort_expr| parse_physical_sort_expr(sort_expr, ctx, input_schema, codec))
        .collect()
}

/// Parses a physical window expression from a protobuf.
///
/// # Arguments
///
/// * `proto` - Input proto containing a physical window expression node.
/// * `ctx` - The task context, used to look up user-defined functions by name.
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
///   when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_window_expr(
    proto: &protobuf::PhysicalWindowExprNode,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn WindowExpr>> {
    let window_node_expr = parse_physical_exprs(&proto.args, ctx, input_schema, codec)?;
    let partition_by =
        parse_physical_exprs(&proto.partition_by, ctx, input_schema, codec)?;

    let order_by = parse_physical_sort_exprs(&proto.order_by, ctx, input_schema, codec)?;

    let window_frame = proto
        .window_frame
        .as_ref()
        .map(|wf| wf.clone().try_into())
        .transpose()
        .map_err(|e| internal_datafusion_err!("{e}"))?
        .ok_or_else(|| {
            internal_datafusion_err!("Missing required field 'window_frame' in protobuf")
        })?;

    let fun = if let Some(window_func) = proto.window_function.as_ref() {
        match window_func {
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(udaf_name) => {
                WindowFunctionDefinition::AggregateUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udaf(udaf_name, buf)?,
                    None => ctx.udaf(udaf_name).or_else(|_| codec.try_decode_udaf(udaf_name, &[]))?,
                })
            }
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(udwf_name) => {
                WindowFunctionDefinition::WindowUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udwf(udwf_name, buf)?,
                    None => ctx.udwf(udwf_name).or_else(|_| codec.try_decode_udwf(udwf_name, &[]))?
                })
            }
        }
    } else {
        return Err(proto_error("Missing required field 'window_function' in protobuf"));
    };

    let name = proto.name.clone();
    // TODO: Remove extended_schema if functions are all UDAF
    let extended_schema =
        schema_add_window_field(&window_node_expr, input_schema, &fun, &name)?;
    create_window_expr(
        &fun,
        name,
        &window_node_expr,
        &partition_by,
        &order_by,
        Arc::new(window_frame),
        extended_schema,
        proto.ignore_nulls,
        proto.distinct,
        None,
    )
}

pub fn parse_physical_exprs<'a, I>(
    protos: I,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Vec<Arc<dyn PhysicalExpr>>>
where
    I: IntoIterator<Item = &'a protobuf::PhysicalExprNode>,
{
    protos
        .into_iter()
        .map(|p| parse_physical_expr(p, ctx, input_schema, codec))
        .collect::<Result<Vec<_>>>()
}

/// Parses a physical expression from a protobuf.
///
/// # Arguments
///
/// * `proto` - Input proto containing a physical expression node
/// * `ctx` - The task context, used to look up user-defined functions by name
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
///   when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_expr(
    proto: &protobuf::PhysicalExprNode,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
    let expr_type = proto
        .expr_type
        .as_ref()
        .ok_or_else(|| proto_error("Unexpected empty physical expression"))?;

    let pexpr: Arc<dyn PhysicalExpr> = match expr_type {
        ExprType::Column(c) => {
            let pcol: Column = c.into();
            Arc::new(pcol)
        }
        ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
        ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
        ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new(
            parse_required_physical_expr(
                binary_expr.l.as_deref(),
                ctx,
                "left",
                input_schema,
                codec,
            )?,
            logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?,
            parse_required_physical_expr(
                binary_expr.r.as_deref(),
                ctx,
                "right",
                input_schema,
                codec,
            )?,
        )),
        ExprType::AggregateExpr(_) => {
            return not_impl_err!(
                "Cannot convert aggregate expr node to physical expression"
            );
        }
        ExprType::WindowExpr(_) => {
            return not_impl_err!(
                "Cannot convert window expr node to physical expression"
            );
        }
        ExprType::Sort(_) => {
            return not_impl_err!("Cannot convert sort expr node to physical expression");
        }
        ExprType::IsNullExpr(e) => {
            Arc::new(IsNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::IsNotNullExpr(e) => {
            Arc::new(IsNotNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr(
            e.expr.as_deref(),
            ctx,
            "expr",
            input_schema,
            codec,
        )?)),
        ExprType::Negative(e) => {
            Arc::new(NegativeExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::InList(e) => in_list(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            parse_physical_exprs(&e.list, ctx, input_schema, codec)?,
            &e.negated,
            input_schema,
        )?,
        ExprType::Case(e) => Arc::new(CaseExpr::try_new(
            e.expr
                .as_ref()
                .map(|e| parse_physical_expr(e.as_ref(), ctx, input_schema, codec))
                .transpose()?,
            e.when_then_expr
                .iter()
                .map(|e| {
                    Ok((
                        parse_required_physical_expr(
                            e.when_expr.as_ref(),
                            ctx,
                            "when_expr",
                            input_schema,
                            codec,
                        )?,
                        parse_required_physical_expr(
                            e.then_expr.as_ref(),
                            ctx,
                            "then_expr",
                            input_schema,
                            codec,
                        )?,
                    ))
                })
                .collect::<Result<Vec<_>>>()?,
            e.else_expr
                .as_ref()
                .map(|e| parse_physical_expr(e.as_ref(), ctx, input_schema, codec))
                .transpose()?,
        )?),
        ExprType::Cast(e) => Arc::new(CastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            convert_required!(e.arrow_type)?,
            None,
        )),
        ExprType::TryCast(e) => Arc::new(TryCastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            convert_required!(e.arrow_type)?,
        )),
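        // Resolve the scalar UDF the same way as window functions above:
        // prefer a serialized definition from the proto, then the task
        // context, and finally the extension codec by name.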
        ExprType::ScalarUdf(e) => {
            let udf = match &e.fun_definition {
                Some(buf) => codec.try_decode_udf(&e.name, buf)?,
                None => ctx
                    .udf(e.name.as_str())
                    .or_else(|_| codec.try_decode_udf(&e.name, &[]))?,
            };
            let scalar_fun_def = Arc::clone(&udf);

            let args = parse_physical_exprs(&e.args, ctx, input_schema, codec)?;

            let config_options = Arc::clone(ctx.session_config().options());

            Arc::new(
                ScalarFunctionExpr::new(
                    e.name.as_str(),
                    scalar_fun_def,
                    args,
                    Field::new(
                        &e.return_field_name,
                        convert_required!(e.return_type)?,
                        true,
                    )
                    .into(),
                    config_options,
                )
                .with_nullable(e.nullable),
            )
        }
        ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new(
            like_expr.negated,
            like_expr.case_insensitive,
            parse_required_physical_expr(
                like_expr.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            parse_required_physical_expr(
                like_expr.pattern.as_deref(),
                ctx,
                "pattern",
                input_schema,
                codec,
            )?,
        )),
        ExprType::Extension(extension) => {
            let inputs: Vec<Arc<dyn PhysicalExpr>> = extension
                .inputs
                .iter()
                .map(|e| parse_physical_expr(e, ctx, input_schema, codec))
                .collect::<Result<_>>()?;
            (codec.try_decode_expr(extension.expr.as_slice(), &inputs)?) as _
        }
    };

    Ok(pexpr)
}

fn parse_required_physical_expr(
    expr: Option<&protobuf::PhysicalExprNode>,
    ctx: &TaskContext,
    field: &str,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
    expr.map(|e| parse_physical_expr(e, ctx, input_schema, codec))
        .transpose()?
        .ok_or_else(|| internal_datafusion_err!("Missing required field {field:?}"))
}

pub fn parse_protobuf_hash_partitioning(
    partitioning: Option<&protobuf::PhysicalHashRepartition>,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Option<Partitioning>> {
    match partitioning {
        Some(hash_part) => {
            let expr =
                parse_physical_exprs(&hash_part.hash_expr, ctx, input_schema, codec)?;

            Ok(Some(Partitioning::Hash(
                expr,
                hash_part.partition_count.try_into().unwrap(),
            )))
        }
        None => Ok(None),
    }
}

pub fn parse_protobuf_partitioning(
    partitioning: Option<&protobuf::Partitioning>,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Option<Partitioning>> {
    match partitioning {
        Some(protobuf::Partitioning { partition_method }) => match partition_method {
            Some(protobuf::partitioning::PartitionMethod::RoundRobin(
                partition_count,
            )) => Ok(Some(Partitioning::RoundRobinBatch(
                *partition_count as usize,
            ))),
            Some(protobuf::partitioning::PartitionMethod::Hash(hash_repartition)) => {
                parse_protobuf_hash_partitioning(
                    Some(hash_repartition),
                    ctx,
                    input_schema,
                    codec,
                )
            }
            Some(protobuf::partitioning::PartitionMethod::Unknown(partition_count)) => {
                Ok(Some(Partitioning::UnknownPartitioning(
                    *partition_count as usize,
                )))
            }
            None => Ok(None),
        },
        None => Ok(None),
    }
}

pub fn parse_protobuf_file_scan_schema(
    proto: &protobuf::FileScanExecConf,
) -> Result<Arc<Schema>> {
    Ok(Arc::new(convert_required!(proto.schema)?))
}

pub fn parse_protobuf_file_scan_config(
    proto: &protobuf::FileScanExecConf,
    ctx: &TaskContext,
    codec: &dyn PhysicalExtensionCodec,
    file_source: Arc<dyn FileSource>,
) -> Result<FileScanConfig> {
    let schema: Arc<Schema> = parse_protobuf_file_scan_schema(proto)?;
    let projection = proto
        .projection
        .iter()
        .map(|i| *i as usize)
        .collect::<Vec<_>>();

    let constraints = convert_required!(proto.constraints)?;
    let statistics = convert_required!(proto.statistics)?;

    let file_groups = proto
        .file_groups
        .iter()
        .map(|f| f.try_into())
        .collect::<Result<Vec<_>, _>>()?;

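    // An empty object store URL in the proto denotes the local filesystem store.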
    let object_store_url = match proto.object_store_url.is_empty() {
        false => ObjectStoreUrl::parse(&proto.object_store_url)?,
        true => ObjectStoreUrl::local_filesystem(),
    };

    // Reacquire the partition column types from the schema before removing them below.
    let table_partition_cols = proto
        .table_partition_cols
        .iter()
        .map(|col| Ok(schema.field_with_name(col)?.clone()))
        .collect::<Result<Vec<_>>>()?;

    // Remove partition columns from the schema after recreating table_partition_cols
    // because the partition columns are not in the file. They are present to allow
    // the partition column types to be reconstructed after serde.
    let file_schema = Arc::new(
        Schema::new(
            schema
                .fields()
                .iter()
                .filter(|field| !table_partition_cols.contains(field))
                .cloned()
                .collect::<Vec<_>>(),
        )
        .with_metadata(schema.metadata.clone()),
    );

    let mut output_ordering = vec![];
    for node_collection in &proto.output_ordering {
        let sort_exprs = parse_physical_sort_exprs(
            &node_collection.physical_sort_expr_nodes,
            ctx,
            &schema,
            codec,
        )?;
        output_ordering.extend(LexOrdering::new(sort_exprs));
    }

    let config = FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
        .with_file_groups(file_groups)
        .with_constraints(constraints)
        .with_statistics(statistics)
        .with_projection_indices(Some(projection))
        .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize))
        .with_table_partition_cols(table_partition_cols)
        .with_output_ordering(output_ordering)
        .with_batch_size(proto.batch_size.map(|s| s as usize))
        .build();
    Ok(config)
}

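/// Decodes `RecordBatch`es from Arrow IPC stream-format bytes; an empty buffer
/// yields an empty `Vec`.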
pub fn parse_record_batches(buf: &[u8]) -> Result<Vec<RecordBatch>> {
    if buf.is_empty() {
        return Ok(vec![]);
    }
    let reader = StreamReader::try_new(buf, None)?;
    let mut batches = Vec::new();
    for batch in reader {
        batches.push(batch?);
    }
    Ok(batches)
}

impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
    type Error = DataFusionError;

    fn try_from(val: &protobuf::PartitionedFile) -> Result<Self, Self::Error> {
        Ok(PartitionedFile {
            object_meta: ObjectMeta {
                location: Path::parse(val.path.as_str()).map_err(|e| {
                    proto_error(format!("Invalid object_store path: {e}"))
                })?,
                last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64),
                size: val.size,
                e_tag: None,
                version: None,
            },
            partition_values: val
                .partition_values
                .iter()
                .map(|v| v.try_into())
                .collect::<Result<Vec<_>, _>>()?,
            range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
            statistics: val
                .statistics
                .as_ref()
                .map(|v| v.try_into().map(Arc::new))
                .transpose()?,
            extensions: None,
            metadata_size_hint: None,
        })
    }
}

impl TryFrom<&protobuf::FileRange> for FileRange {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
        Ok(FileRange {
            start: value.start,
            end: value.end,
        })
    }
}

impl TryFrom<&protobuf::FileGroup> for FileGroup {
    type Error = DataFusionError;

    fn try_from(val: &protobuf::FileGroup) -> Result<Self, Self::Error> {
        let files = val
            .files
            .iter()
            .map(|f| f.try_into())
            .collect::<Result<Vec<_>, _>>()?;
        Ok(FileGroup::new(files))
    }
}

impl TryFrom<&protobuf::JsonSink> for JsonSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::JsonSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.writer_options)?,
        ))
    }
}

#[cfg(feature = "parquet")]
impl TryFrom<&protobuf::ParquetSink> for ParquetSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::ParquetSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.parquet_options)?,
        ))
    }
}

impl TryFrom<&protobuf::CsvSink> for CsvSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::CsvSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.writer_options)?,
        ))
    }
}

impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
    type Error = DataFusionError;

    fn try_from(conf: &protobuf::FileSinkConfig) -> Result<Self, Self::Error> {
        let file_group = FileGroup::new(
            conf.file_groups
                .iter()
                .map(|f| f.try_into())
                .collect::<Result<Vec<_>>>()?,
        );
        let table_paths = conf
            .table_paths
            .iter()
            .map(ListingTableUrl::parse)
            .collect::<Result<Vec<_>>>()?;
        let table_partition_cols = conf
            .table_partition_cols
            .iter()
            .map(|protobuf::PartitionColumn { name, arrow_type }| {
                let data_type = convert_required!(arrow_type)?;
                Ok((name.clone(), data_type))
            })
            .collect::<Result<Vec<_>>>()?;
        let insert_op = match conf.insert_op() {
            protobuf::InsertOp::Append => InsertOp::Append,
            protobuf::InsertOp::Overwrite => InsertOp::Overwrite,
            protobuf::InsertOp::Replace => InsertOp::Replace,
        };
        Ok(Self {
            original_url: String::default(),
            object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?,
            file_group,
            table_paths,
            output_schema: Arc::new(convert_required!(conf.output_schema)?),
            table_partition_cols,
            insert_op,
            keep_partition_by_columns: conf.keep_partition_by_columns,
            file_extension: conf.file_extension.clone(),
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Utc};
    use datafusion_datasource::PartitionedFile;
    use object_store::path::Path;
    use object_store::ObjectMeta;

    #[test]
    fn partitioned_file_path_roundtrip_percent_encoded() {
        let path_str = "foo/foo%2Fbar/baz%252Fqux";
        let pf = PartitionedFile {
            object_meta: ObjectMeta {
                location: Path::parse(path_str).unwrap(),
                last_modified: Utc.timestamp_nanos(1_000),
                size: 42,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        };

        let proto = protobuf::PartitionedFile::try_from(&pf).unwrap();
        assert_eq!(proto.path, path_str);

        let pf2 = PartitionedFile::try_from(&proto).unwrap();
        assert_eq!(pf2.object_meta.location.as_ref(), path_str);
        assert_eq!(pf2.object_meta.location, pf.object_meta.location);
        assert_eq!(pf2.object_meta.size, pf.object_meta.size);
        assert_eq!(pf2.object_meta.last_modified, pf.object_meta.last_modified);
    }
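
    // A minimal sketch (added for illustration) showing that `FileRange`
    // decoding is a direct field-by-field copy of the proto message.
    #[test]
    fn file_range_from_proto() {
        let proto = protobuf::FileRange {
            start: 0,
            end: 1024,
        };
        let range = FileRange::try_from(&proto).unwrap();
        assert_eq!(range.start, 0);
        assert_eq!(range.end, 1024);
    }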

    #[test]
    fn partitioned_file_from_proto_invalid_path() {
        let proto = protobuf::PartitionedFile {
            path: "foo//bar".to_string(),
            size: 1,
            last_modified_ns: 0,
            partition_values: vec![],
            range: None,
            statistics: None,
        };

        let err = PartitionedFile::try_from(&proto).unwrap_err();
        assert!(err.to_string().contains("Invalid object_store path"));
    }
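
    // A minimal sketch (added for illustration) of decoding a round-robin
    // `Partitioning` proto. It assumes the default task context and the
    // crate's `DefaultPhysicalExtensionCodec` suffice here, since no
    // user-defined functions are referenced.
    #[test]
    fn round_robin_partitioning_from_proto() {
        use crate::physical_plan::DefaultPhysicalExtensionCodec;

        let proto = protobuf::Partitioning {
            partition_method: Some(protobuf::partitioning::PartitionMethod::RoundRobin(8)),
        };
        let ctx = TaskContext::default();
        let codec = DefaultPhysicalExtensionCodec {};
        let schema = Schema::empty();

        let partitioning = parse_protobuf_partitioning(Some(&proto), &ctx, &schema, &codec)
            .unwrap()
            .expect("expected Some(Partitioning)");
        assert!(matches!(partitioning, Partitioning::RoundRobinBatch(8)));
    }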
}