// datafusion_proto/physical_plan/from_proto.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Serde code to convert from protocol buffers to Rust data structures.
19
20use std::sync::Arc;
21
22use arrow::array::RecordBatch;
23use arrow::compute::SortOptions;
24use arrow::datatypes::{Field, Schema};
25use arrow::ipc::reader::StreamReader;
26use chrono::{TimeZone, Utc};
27use datafusion_common::{DataFusionError, Result, internal_datafusion_err, not_impl_err};
28use datafusion_datasource::file::FileSource;
29use datafusion_datasource::file_groups::FileGroup;
30use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
31use datafusion_datasource::file_sink_config::FileSinkConfig;
32use datafusion_datasource::{FileRange, ListingTableUrl, PartitionedFile, TableSchema};
33use datafusion_datasource_csv::file_format::CsvSink;
34use datafusion_datasource_json::file_format::JsonSink;
35#[cfg(feature = "parquet")]
36use datafusion_datasource_parquet::file_format::ParquetSink;
37use datafusion_execution::object_store::ObjectStoreUrl;
38use datafusion_execution::{FunctionRegistry, TaskContext};
39use datafusion_expr::WindowFunctionDefinition;
40use datafusion_expr::dml::InsertOp;
41use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs};
42use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
43use datafusion_physical_plan::expressions::{
44    BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr, Literal,
45    NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, in_list,
46};
47use datafusion_physical_plan::joins::{HashExpr, SeededRandomState};
48use datafusion_physical_plan::windows::{create_window_expr, schema_add_window_field};
49use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
50use datafusion_proto_common::common::proto_error;
51use object_store::ObjectMeta;
52use object_store::path::Path;
53
54use super::{
55    DefaultPhysicalProtoConverter, PhysicalExtensionCodec,
56    PhysicalProtoConverterExtension,
57};
58use crate::logical_plan::{self};
59use crate::protobuf::physical_expr_node::ExprType;
60use crate::{convert_required, protobuf};
61
62impl From<&protobuf::PhysicalColumn> for Column {
63    fn from(c: &protobuf::PhysicalColumn) -> Column {
64        Column::new(&c.name, c.index as usize)
65    }
66}
67
/// Parses a physical sort expression from a protobuf.
///
/// # Arguments
///
/// * `proto` - Input proto with physical sort expression node
/// * `ctx` - Task context used to resolve registered user-defined functions
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
///   when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
/// * `proto_converter` - Conversion functions for physical plans and expressions
pub fn parse_physical_sort_expr(
    proto: &protobuf::PhysicalSortExprNode,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
    proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<PhysicalSortExpr> {
    if let Some(expr) = &proto.expr {
        // Decode the wrapped expression through the converter so extensions
        // can override how expressions are materialized.
        let expr = proto_converter.proto_to_physical_expr(
            expr.as_ref(),
            ctx,
            input_schema,
            codec,
        )?;
        // The proto stores `asc`; `SortOptions` wants `descending`, hence the
        // negation.
        let options = SortOptions {
            descending: !proto.asc,
            nulls_first: proto.nulls_first,
        };
        Ok(PhysicalSortExpr { expr, options })
    } else {
        Err(proto_error("Unexpected empty physical expression"))
    }
}
100
101/// Parses a physical sort expressions from a protobuf.
102///
103/// # Arguments
104///
105/// * `proto` - Input proto with vector of physical sort expression node
106/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
107/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
108///   when performing type coercion.
109/// * `codec` - An extension codec used to decode custom UDFs.
110pub fn parse_physical_sort_exprs(
111    proto: &[protobuf::PhysicalSortExprNode],
112    ctx: &TaskContext,
113    input_schema: &Schema,
114    codec: &dyn PhysicalExtensionCodec,
115    proto_converter: &dyn PhysicalProtoConverterExtension,
116) -> Result<Vec<PhysicalSortExpr>> {
117    proto
118        .iter()
119        .map(|sort_expr| {
120            parse_physical_sort_expr(sort_expr, ctx, input_schema, codec, proto_converter)
121        })
122        .collect()
123}
124
/// Parses a physical window expr from a protobuf.
///
/// # Arguments
///
/// * `proto` - Input proto with physical window expression node.
/// * `ctx` - Task context used to resolve registered user-defined functions
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
///   when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
/// * `proto_converter` - Conversion functions for physical plans and expressions
pub fn parse_physical_window_expr(
    proto: &protobuf::PhysicalWindowExprNode,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
    proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Arc<dyn WindowExpr>> {
    // Decode the function arguments, PARTITION BY keys, and ORDER BY keys
    // against the input schema.
    let window_node_expr =
        parse_physical_exprs(&proto.args, ctx, input_schema, codec, proto_converter)?;
    let partition_by = parse_physical_exprs(
        &proto.partition_by,
        ctx,
        input_schema,
        codec,
        proto_converter,
    )?;

    let order_by = parse_physical_sort_exprs(
        &proto.order_by,
        ctx,
        input_schema,
        codec,
        proto_converter,
    )?;

    // The window frame is required: both a frame that fails to decode and a
    // missing frame are reported as internal errors.
    let window_frame = proto
        .window_frame
        .as_ref()
        .map(|wf| wf.clone().try_into())
        .transpose()
        .map_err(|e| internal_datafusion_err!("{e}"))?
        .ok_or_else(|| {
            internal_datafusion_err!("Missing required field 'window_frame' in protobuf")
        })?;

    // Resolve the window function. For both UDAFs and UDWFs: prefer an
    // inline-encoded function body (`fun_definition`) when present; otherwise
    // look the name up in the task context, falling back to the extension
    // codec with an empty payload.
    let fun = if let Some(window_func) = proto.window_function.as_ref() {
        match window_func {
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(udaf_name) => {
                WindowFunctionDefinition::AggregateUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udaf(udaf_name, buf)?,
                    None => ctx.udaf(udaf_name).or_else(|_| codec.try_decode_udaf(udaf_name, &[]))?,
                })
            }
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(udwf_name) => {
                WindowFunctionDefinition::WindowUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udwf(udwf_name, buf)?,
                    None => ctx.udwf(udwf_name).or_else(|_| codec.try_decode_udwf(udwf_name, &[]))?
                })
            }
        }
    } else {
        return Err(proto_error("Missing required field in protobuf"));
    };

    let name = proto.name.clone();
    // TODO: Remove extended_schema if functions are all UDAF
    let extended_schema =
        schema_add_window_field(&window_node_expr, input_schema, &fun, &name)?;
    create_window_expr(
        &fun,
        name,
        &window_node_expr,
        &partition_by,
        &order_by,
        Arc::new(window_frame),
        extended_schema,
        proto.ignore_nulls,
        proto.distinct,
        None,
    )
}
206
207pub fn parse_physical_exprs<'a, I>(
208    protos: I,
209    ctx: &TaskContext,
210    input_schema: &Schema,
211    codec: &dyn PhysicalExtensionCodec,
212    proto_converter: &dyn PhysicalProtoConverterExtension,
213) -> Result<Vec<Arc<dyn PhysicalExpr>>>
214where
215    I: IntoIterator<Item = &'a protobuf::PhysicalExprNode>,
216{
217    protos
218        .into_iter()
219        .map(|p| proto_converter.proto_to_physical_expr(p, ctx, input_schema, codec))
220        .collect::<Result<Vec<_>>>()
221}
222
/// Parses a physical expression from a protobuf.
///
/// Convenience wrapper around [`parse_physical_expr_with_converter`] that
/// uses the default proto converter.
///
/// # Arguments
///
/// * `proto` - Input proto with physical expression node
/// * `ctx` - Task context used to resolve registered user-defined functions
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
///   when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_expr(
    proto: &protobuf::PhysicalExprNode,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
    parse_physical_expr_with_converter(
        proto,
        ctx,
        input_schema,
        codec,
        &DefaultPhysicalProtoConverter {},
    )
}
246
/// Parses a physical expression from a protobuf.
///
/// # Arguments
///
/// * `proto` - Input proto with physical expression node
/// * `ctx` - Task context used to resolve registered user-defined functions
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
///   when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
/// * `proto_converter` - Conversion functions for physical plans and expressions
pub fn parse_physical_expr_with_converter(
    proto: &protobuf::PhysicalExprNode,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
    proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Arc<dyn PhysicalExpr>> {
    let expr_type = proto
        .expr_type
        .as_ref()
        .ok_or_else(|| proto_error("Unexpected empty physical expression"))?;

    let pexpr: Arc<dyn PhysicalExpr> = match expr_type {
        ExprType::Column(c) => {
            let pcol: Column = c.into();
            Arc::new(pcol)
        }
        ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
        ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
        ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new(
            parse_required_physical_expr(
                binary_expr.l.as_deref(),
                ctx,
                "left",
                input_schema,
                codec,
                proto_converter,
            )?,
            logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?,
            parse_required_physical_expr(
                binary_expr.r.as_deref(),
                ctx,
                "right",
                input_schema,
                codec,
                proto_converter,
            )?,
        )),
        // Aggregate, window, and sort nodes are handled at the plan level and
        // cannot appear as standalone physical expressions.
        ExprType::AggregateExpr(_) => {
            return not_impl_err!(
                "Cannot convert aggregate expr node to physical expression"
            );
        }
        ExprType::WindowExpr(_) => {
            return not_impl_err!(
                "Cannot convert window expr node to physical expression"
            );
        }
        ExprType::Sort(_) => {
            return not_impl_err!("Cannot convert sort expr node to physical expression");
        }
        ExprType::IsNullExpr(e) => {
            Arc::new(IsNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
                proto_converter,
            )?))
        }
        ExprType::IsNotNullExpr(e) => {
            Arc::new(IsNotNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
                proto_converter,
            )?))
        }
        ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr(
            e.expr.as_deref(),
            ctx,
            "expr",
            input_schema,
            codec,
            proto_converter,
        )?)),
        ExprType::Negative(e) => {
            Arc::new(NegativeExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
                proto_converter,
            )?))
        }
        ExprType::InList(e) => in_list(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
                proto_converter,
            )?,
            parse_physical_exprs(&e.list, ctx, input_schema, codec, proto_converter)?,
            &e.negated,
            input_schema,
        )?,
        // CASE: an optional base expression, a list of WHEN/THEN pairs (both
        // halves required), and an optional ELSE expression.
        ExprType::Case(e) => Arc::new(CaseExpr::try_new(
            e.expr
                .as_ref()
                .map(|e| {
                    proto_converter.proto_to_physical_expr(
                        e.as_ref(),
                        ctx,
                        input_schema,
                        codec,
                    )
                })
                .transpose()?,
            e.when_then_expr
                .iter()
                .map(|e| {
                    Ok((
                        parse_required_physical_expr(
                            e.when_expr.as_ref(),
                            ctx,
                            "when_expr",
                            input_schema,
                            codec,
                            proto_converter,
                        )?,
                        parse_required_physical_expr(
                            e.then_expr.as_ref(),
                            ctx,
                            "then_expr",
                            input_schema,
                            codec,
                            proto_converter,
                        )?,
                    ))
                })
                .collect::<Result<Vec<_>>>()?,
            e.else_expr
                .as_ref()
                .map(|e| {
                    proto_converter.proto_to_physical_expr(
                        e.as_ref(),
                        ctx,
                        input_schema,
                        codec,
                    )
                })
                .transpose()?,
        )?),
        ExprType::Cast(e) => Arc::new(CastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
                proto_converter,
            )?,
            convert_required!(e.arrow_type)?,
            None,
        )),
        ExprType::TryCast(e) => Arc::new(TryCastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
                proto_converter,
            )?,
            convert_required!(e.arrow_type)?,
        )),
        ExprType::ScalarUdf(e) => {
            // Prefer an inline-encoded UDF body when present; otherwise
            // resolve by name in the task context, falling back to the codec
            // with an empty payload.
            let udf = match &e.fun_definition {
                Some(buf) => codec.try_decode_udf(&e.name, buf)?,
                None => ctx
                    .udf(e.name.as_str())
                    .or_else(|_| codec.try_decode_udf(&e.name, &[]))?,
            };
            let scalar_fun_def = Arc::clone(&udf);

            let args =
                parse_physical_exprs(&e.args, ctx, input_schema, codec, proto_converter)?;

            let config_options = Arc::clone(ctx.session_config().options());

            Arc::new(
                ScalarFunctionExpr::new(
                    e.name.as_str(),
                    scalar_fun_def,
                    args,
                    Field::new(
                        &e.return_field_name,
                        convert_required!(e.return_type)?,
                        true,
                    )
                    .into(),
                    config_options,
                )
                // Nullability is serialized separately from the return field.
                .with_nullable(e.nullable),
            )
        }
        ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new(
            like_expr.negated,
            like_expr.case_insensitive,
            parse_required_physical_expr(
                like_expr.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
                proto_converter,
            )?,
            parse_required_physical_expr(
                like_expr.pattern.as_deref(),
                ctx,
                "pattern",
                input_schema,
                codec,
                proto_converter,
            )?,
        )),
        ExprType::HashExpr(hash_expr) => {
            let on_columns = parse_physical_exprs(
                &hash_expr.on_columns,
                ctx,
                input_schema,
                codec,
                proto_converter,
            )?;
            // Rebuild the hash state from the serialized seeds so hashing is
            // reproduced exactly after deserialization.
            Arc::new(HashExpr::new(
                on_columns,
                SeededRandomState::with_seeds(
                    hash_expr.seed0,
                    hash_expr.seed1,
                    hash_expr.seed2,
                    hash_expr.seed3,
                ),
                hash_expr.description.clone(),
            ))
        }
        ExprType::Extension(extension) => {
            // Decode the extension's child expressions first, then hand the
            // opaque payload plus children to the extension codec.
            let inputs: Vec<Arc<dyn PhysicalExpr>> = extension
                .inputs
                .iter()
                .map(|e| {
                    proto_converter.proto_to_physical_expr(e, ctx, input_schema, codec)
                })
                .collect::<Result<_>>()?;
            codec.try_decode_expr(extension.expr.as_slice(), &inputs)? as _
        }
    };

    Ok(pexpr)
}
512
513fn parse_required_physical_expr(
514    expr: Option<&protobuf::PhysicalExprNode>,
515    ctx: &TaskContext,
516    field: &str,
517    input_schema: &Schema,
518    codec: &dyn PhysicalExtensionCodec,
519    proto_converter: &dyn PhysicalProtoConverterExtension,
520) -> Result<Arc<dyn PhysicalExpr>> {
521    expr.map(|e| proto_converter.proto_to_physical_expr(e, ctx, input_schema, codec))
522        .transpose()?
523        .ok_or_else(|| internal_datafusion_err!("Missing required field {field:?}"))
524}
525
526pub fn parse_protobuf_hash_partitioning(
527    partitioning: Option<&protobuf::PhysicalHashRepartition>,
528    ctx: &TaskContext,
529    input_schema: &Schema,
530    codec: &dyn PhysicalExtensionCodec,
531    proto_converter: &dyn PhysicalProtoConverterExtension,
532) -> Result<Option<Partitioning>> {
533    match partitioning {
534        Some(hash_part) => {
535            let expr = parse_physical_exprs(
536                &hash_part.hash_expr,
537                ctx,
538                input_schema,
539                codec,
540                proto_converter,
541            )?;
542
543            Ok(Some(Partitioning::Hash(
544                expr,
545                hash_part.partition_count.try_into().unwrap(),
546            )))
547        }
548        None => Ok(None),
549    }
550}
551
552pub fn parse_protobuf_partitioning(
553    partitioning: Option<&protobuf::Partitioning>,
554    ctx: &TaskContext,
555    input_schema: &Schema,
556    codec: &dyn PhysicalExtensionCodec,
557    proto_converter: &dyn PhysicalProtoConverterExtension,
558) -> Result<Option<Partitioning>> {
559    match partitioning {
560        Some(protobuf::Partitioning { partition_method }) => match partition_method {
561            Some(protobuf::partitioning::PartitionMethod::RoundRobin(
562                partition_count,
563            )) => Ok(Some(Partitioning::RoundRobinBatch(
564                *partition_count as usize,
565            ))),
566            Some(protobuf::partitioning::PartitionMethod::Hash(hash_repartition)) => {
567                parse_protobuf_hash_partitioning(
568                    Some(hash_repartition),
569                    ctx,
570                    input_schema,
571                    codec,
572                    proto_converter,
573                )
574            }
575            Some(protobuf::partitioning::PartitionMethod::Unknown(partition_count)) => {
576                Ok(Some(Partitioning::UnknownPartitioning(
577                    *partition_count as usize,
578                )))
579            }
580            None => Ok(None),
581        },
582        None => Ok(None),
583    }
584}
585
/// Deserializes the full table schema (file columns plus any partition
/// columns) embedded in a `FileScanExecConf` protobuf.
pub fn parse_protobuf_file_scan_schema(
    proto: &protobuf::FileScanExecConf,
) -> Result<Arc<Schema>> {
    Ok(Arc::new(convert_required!(proto.schema)?))
}
591
/// Parses a TableSchema from protobuf, extracting the file schema and partition columns
pub fn parse_table_schema_from_proto(
    proto: &protobuf::FileScanExecConf,
) -> Result<TableSchema> {
    // The serialized schema includes partition columns; split them back out.
    let schema: Arc<Schema> = parse_protobuf_file_scan_schema(proto)?;

    // Reacquire the partition column types from the schema before removing them below.
    let table_partition_cols = proto
        .table_partition_cols
        .iter()
        .map(|col| Ok(Arc::new(schema.field_with_name(col)?.clone())))
        .collect::<Result<Vec<_>>>()?;

    // Remove partition columns from the schema after recreating table_partition_cols
    // because the partition columns are not in the file. They are present to allow
    // the partition column types to be reconstructed after serde.
    let file_schema = Arc::new(
        Schema::new(
            schema
                .fields()
                .iter()
                .filter(|field| !table_partition_cols.contains(field))
                .cloned()
                .collect::<Vec<_>>(),
        )
        // Preserve schema-level metadata on the reduced file schema.
        .with_metadata(schema.metadata.clone()),
    );

    Ok(TableSchema::new(file_schema, table_partition_cols))
}
622
/// Parses a `FileScanExecConf` protobuf into a [`FileScanConfig`] bound to
/// the given `file_source`.
///
/// # Arguments
///
/// * `proto` - Encoded file-scan configuration
/// * `ctx` - Task context used to resolve registered user-defined functions
/// * `codec` - An extension codec used to decode custom UDFs.
/// * `proto_converter` - Conversion functions for physical plans and expressions
/// * `file_source` - The file source the decoded configuration applies to
pub fn parse_protobuf_file_scan_config(
    proto: &protobuf::FileScanExecConf,
    ctx: &TaskContext,
    codec: &dyn PhysicalExtensionCodec,
    proto_converter: &dyn PhysicalProtoConverterExtension,
    file_source: Arc<dyn FileSource>,
) -> Result<FileScanConfig> {
    let schema: Arc<Schema> = parse_protobuf_file_scan_schema(proto)?;

    let constraints = convert_required!(proto.constraints)?;
    let statistics = convert_required!(proto.statistics)?;

    let file_groups = proto
        .file_groups
        .iter()
        .map(|f| f.try_into())
        .collect::<Result<Vec<_>, _>>()?;

    // An empty URL means the plan was built against the local filesystem.
    let object_store_url = match proto.object_store_url.is_empty() {
        false => ObjectStoreUrl::parse(&proto.object_store_url)?,
        true => ObjectStoreUrl::local_filesystem(),
    };

    // Each node collection describes one equivalent output ordering; decode
    // the sort expressions against the (full) table schema.
    let mut output_ordering = vec![];
    for node_collection in &proto.output_ordering {
        let sort_exprs = parse_physical_sort_exprs(
            &node_collection.physical_sort_expr_nodes,
            ctx,
            &schema,
            codec,
            proto_converter,
        )?;
        output_ordering.extend(LexOrdering::new(sort_exprs));
    }

    // Parse projection expressions if present and apply to file source
    let file_source = if let Some(proto_projection_exprs) = &proto.projection_exprs {
        let projection_exprs: Vec<ProjectionExpr> = proto_projection_exprs
            .projections
            .iter()
            .map(|proto_expr| {
                let expr = proto_converter.proto_to_physical_expr(
                    proto_expr.expr.as_ref().ok_or_else(|| {
                        internal_datafusion_err!("ProjectionExpr missing expr field")
                    })?,
                    ctx,
                    &schema,
                    codec,
                )?;
                Ok(ProjectionExpr::new(expr, proto_expr.alias.clone()))
            })
            .collect::<Result<Vec<_>>>()?;

        let projection_exprs = ProjectionExprs::new(projection_exprs);

        // Apply projection to file source; if the source cannot absorb the
        // projection (returns None), keep the original source unchanged.
        file_source
            .try_pushdown_projection(&projection_exprs)?
            .unwrap_or(file_source)
    } else {
        file_source
    };

    let config = FileScanConfigBuilder::new(object_store_url, file_source)
        .with_file_groups(file_groups)
        .with_constraints(constraints)
        .with_statistics(statistics)
        .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize))
        .with_output_ordering(output_ordering)
        .with_batch_size(proto.batch_size.map(|s| s as usize))
        .build();
    Ok(config)
}
696
697pub fn parse_record_batches(buf: &[u8]) -> Result<Vec<RecordBatch>> {
698    if buf.is_empty() {
699        return Ok(vec![]);
700    }
701    let reader = StreamReader::try_new(buf, None)?;
702    let mut batches = Vec::new();
703    for batch in reader {
704        batches.push(batch?);
705    }
706    Ok(batches)
707}
708
impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
    type Error = DataFusionError;

    /// Rebuilds a [`PartitionedFile`] from its protobuf encoding: object
    /// store metadata, partition values, and the optional byte range and
    /// file statistics.
    fn try_from(val: &protobuf::PartitionedFile) -> Result<Self, Self::Error> {
        let mut pf = PartitionedFile::new_from_meta(ObjectMeta {
            // `Path::parse` keeps the encoded string as-is (no extra
            // percent-decoding); invalid paths become proto errors.
            location: Path::parse(val.path.as_str())
                .map_err(|e| proto_error(format!("Invalid object_store path: {e}")))?,
            // NOTE(review): assumes `last_modified_ns` is nanoseconds since
            // the Unix epoch in UTC — confirm against the serializing side.
            last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64),
            size: val.size,
            // e_tag and version are not round-tripped through the proto.
            e_tag: None,
            version: None,
        })
        .with_partition_values(
            val.partition_values
                .iter()
                .map(|v| v.try_into())
                .collect::<Result<Vec<_>, _>>()?,
        );
        // Byte range and statistics are optional fields.
        if let Some(range) = val.range.as_ref() {
            let file_range: FileRange = range.try_into()?;
            pf = pf.with_range(file_range.start, file_range.end);
        }
        if let Some(proto_stats) = val.statistics.as_ref() {
            pf = pf.with_statistics(Arc::new(proto_stats.try_into()?));
        }
        Ok(pf)
    }
}
737
738impl TryFrom<&protobuf::FileRange> for FileRange {
739    type Error = DataFusionError;
740
741    fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
742        Ok(FileRange {
743            start: value.start,
744            end: value.end,
745        })
746    }
747}
748
749impl TryFrom<&protobuf::FileGroup> for FileGroup {
750    type Error = DataFusionError;
751
752    fn try_from(val: &protobuf::FileGroup) -> Result<Self, Self::Error> {
753        let files = val
754            .files
755            .iter()
756            .map(|f| f.try_into())
757            .collect::<Result<Vec<_>, _>>()?;
758        Ok(FileGroup::new(files))
759    }
760}
761
impl TryFrom<&protobuf::JsonSink> for JsonSink {
    type Error = DataFusionError;

    /// Rebuilds a [`JsonSink`] from its required sink config and JSON
    /// writer options.
    fn try_from(value: &protobuf::JsonSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.writer_options)?,
        ))
    }
}
772
#[cfg(feature = "parquet")]
impl TryFrom<&protobuf::ParquetSink> for ParquetSink {
    type Error = DataFusionError;

    /// Rebuilds a [`ParquetSink`] from its required sink config and Parquet
    /// writer options. Only compiled with the `parquet` feature.
    fn try_from(value: &protobuf::ParquetSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.parquet_options)?,
        ))
    }
}
784
impl TryFrom<&protobuf::CsvSink> for CsvSink {
    type Error = DataFusionError;

    /// Rebuilds a [`CsvSink`] from its required sink config and CSV writer
    /// options.
    fn try_from(value: &protobuf::CsvSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.writer_options)?,
        ))
    }
}
795
impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
    type Error = DataFusionError;

    /// Rebuilds a [`FileSinkConfig`] from its protobuf encoding: file group,
    /// table paths, partition columns, insert operation, and output mode.
    fn try_from(conf: &protobuf::FileSinkConfig) -> Result<Self, Self::Error> {
        // All serialized files are flattened into a single file group.
        let file_group = FileGroup::new(
            conf.file_groups
                .iter()
                .map(|f| f.try_into())
                .collect::<Result<Vec<_>>>()?,
        );
        let table_paths = conf
            .table_paths
            .iter()
            .map(ListingTableUrl::parse)
            .collect::<Result<Vec<_>>>()?;
        // Partition columns are serialized as (name, arrow type) pairs.
        let table_partition_cols = conf
            .table_partition_cols
            .iter()
            .map(|protobuf::PartitionColumn { name, arrow_type }| {
                let data_type = convert_required!(arrow_type)?;
                Ok((name.clone(), data_type))
            })
            .collect::<Result<Vec<_>>>()?;
        // Exhaustive mapping of proto enums to their runtime counterparts so
        // a newly added variant is a compile error here.
        let insert_op = match conf.insert_op() {
            protobuf::InsertOp::Append => InsertOp::Append,
            protobuf::InsertOp::Overwrite => InsertOp::Overwrite,
            protobuf::InsertOp::Replace => InsertOp::Replace,
        };
        let file_output_mode = match conf.file_output_mode() {
            protobuf::FileOutputMode::Automatic => {
                datafusion_datasource::file_sink_config::FileOutputMode::Automatic
            }
            protobuf::FileOutputMode::SingleFile => {
                datafusion_datasource::file_sink_config::FileOutputMode::SingleFile
            }
            protobuf::FileOutputMode::Directory => {
                datafusion_datasource::file_sink_config::FileOutputMode::Directory
            }
        };
        Ok(Self {
            // The original URL is not serialized; reconstruct with a default.
            original_url: String::default(),
            object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?,
            file_group,
            table_paths,
            output_schema: Arc::new(convert_required!(conf.output_schema)?),
            table_partition_cols,
            insert_op,
            keep_partition_by_columns: conf.keep_partition_by_columns,
            file_extension: conf.file_extension.clone(),
            file_output_mode,
        })
    }
}
849
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Utc};
    use datafusion_datasource::PartitionedFile;
    use object_store::ObjectMeta;
    use object_store::path::Path;

    use super::*;

    /// Percent-encoded path segments must survive a proto round trip without
    /// being decoded or re-encoded.
    #[test]
    fn partitioned_file_path_roundtrip_percent_encoded() {
        let path_str = "foo/foo%2Fbar/baz%252Fqux";
        let pf = PartitionedFile::new_from_meta(ObjectMeta {
            location: Path::parse(path_str).unwrap(),
            last_modified: Utc.timestamp_nanos(1_000),
            size: 42,
            e_tag: None,
            version: None,
        });

        // Serialize, then check the path string is stored verbatim.
        let proto = protobuf::PartitionedFile::try_from(&pf).unwrap();
        assert_eq!(proto.path, path_str);

        // Deserialize and confirm metadata survives the round trip.
        let pf2 = PartitionedFile::try_from(&proto).unwrap();
        assert_eq!(pf2.object_meta.location.as_ref(), path_str);
        assert_eq!(pf2.object_meta.location, pf.object_meta.location);
        assert_eq!(pf2.object_meta.size, pf.object_meta.size);
        assert_eq!(pf2.object_meta.last_modified, pf.object_meta.last_modified);
    }

    /// An invalid object-store path (empty segment) must produce a decode
    /// error, not a panic.
    #[test]
    fn partitioned_file_from_proto_invalid_path() {
        let proto = protobuf::PartitionedFile {
            path: "foo//bar".to_string(),
            size: 1,
            last_modified_ns: 0,
            partition_values: vec![],
            range: None,
            statistics: None,
        };

        let err = PartitionedFile::try_from(&proto).unwrap_err();
        assert!(err.to_string().contains("Invalid object_store path"));
    }
}