Skip to main content

datafusion_proto/physical_plan/
from_proto.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Serde code to convert from protocol buffers to Rust data structures.
19
20use std::sync::Arc;
21
22use arrow::array::RecordBatch;
23use arrow::compute::SortOptions;
24use arrow::datatypes::{Field, Schema};
25use arrow::ipc::reader::StreamReader;
26use chrono::{TimeZone, Utc};
27use datafusion_common::{DataFusionError, Result, internal_datafusion_err, not_impl_err};
28use datafusion_datasource::file::FileSource;
29use datafusion_datasource::file_groups::FileGroup;
30use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
31use datafusion_datasource::file_sink_config::FileSinkConfig;
32use datafusion_datasource::{FileRange, ListingTableUrl, PartitionedFile, TableSchema};
33use datafusion_datasource_csv::file_format::CsvSink;
34use datafusion_datasource_json::file_format::JsonSink;
35#[cfg(feature = "parquet")]
36use datafusion_datasource_parquet::file_format::ParquetSink;
37use datafusion_execution::object_store::ObjectStoreUrl;
38use datafusion_execution::{FunctionRegistry, TaskContext};
39use datafusion_expr::WindowFunctionDefinition;
40use datafusion_expr::dml::InsertOp;
41use datafusion_expr::execution_props::SubqueryIndex;
42use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs};
43use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr;
44use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
45use datafusion_physical_plan::expressions::{
46    BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr, Literal,
47    NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, in_list,
48};
49use datafusion_physical_plan::joins::{HashExpr, SeededRandomState};
50use datafusion_physical_plan::windows::{create_window_expr, schema_add_window_field};
51use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
52use datafusion_proto_common::common::proto_error;
53use object_store::ObjectMeta;
54use object_store::path::Path;
55
56use super::{
57    DefaultPhysicalProtoConverter, PhysicalExtensionCodec, PhysicalPlanDecodeContext,
58    PhysicalProtoConverterExtension,
59};
60use crate::logical_plan::{self};
61use crate::protobuf::physical_expr_node::ExprType;
62use crate::{convert_required, protobuf};
63use datafusion_physical_expr::expressions::{
64    DynamicFilterInner, DynamicFilterPhysicalExpr,
65};
66
67impl From<&protobuf::PhysicalColumn> for Column {
68    fn from(c: &protobuf::PhysicalColumn) -> Column {
69        Column::new(&c.name, c.index as usize)
70    }
71}
72
73/// Parses a physical sort expression from a protobuf.
74///
75/// # Arguments
76///
77/// * `proto` - Input proto with physical sort expression node
78/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
79///   when performing type coercion.
80/// * `ctx` - Decode context carrying the task context, extension codec, and
81///   any scoped state needed during recursive deserialization.
82/// * `proto_converter` - Converter hooks used for recursive physical plan and
83///   expression deserialization.
84pub fn parse_physical_sort_expr(
85    proto: &protobuf::PhysicalSortExprNode,
86    ctx: &PhysicalPlanDecodeContext<'_>,
87    input_schema: &Schema,
88    proto_converter: &dyn PhysicalProtoConverterExtension,
89) -> Result<PhysicalSortExpr> {
90    if let Some(expr) = &proto.expr {
91        let expr =
92            proto_converter.proto_to_physical_expr(expr.as_ref(), input_schema, ctx)?;
93        let options = SortOptions {
94            descending: !proto.asc,
95            nulls_first: proto.nulls_first,
96        };
97        Ok(PhysicalSortExpr { expr, options })
98    } else {
99        Err(proto_error("Unexpected empty physical expression"))
100    }
101}
102
103/// Parses a physical sort expressions from a protobuf.
104///
105/// # Arguments
106///
107/// * `proto` - Input proto with vector of physical sort expression node
108/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
109///   when performing type coercion.
110/// * `ctx` - Decode context carrying the task context, extension codec, and
111///   any scoped state needed during recursive deserialization.
112/// * `proto_converter` - Converter hooks used for recursive physical plan and
113///   expression deserialization.
114pub fn parse_physical_sort_exprs(
115    proto: &[protobuf::PhysicalSortExprNode],
116    ctx: &PhysicalPlanDecodeContext<'_>,
117    input_schema: &Schema,
118    proto_converter: &dyn PhysicalProtoConverterExtension,
119) -> Result<Vec<PhysicalSortExpr>> {
120    proto
121        .iter()
122        .map(|sort_expr| {
123            parse_physical_sort_expr(sort_expr, ctx, input_schema, proto_converter)
124        })
125        .collect()
126}
127
128/// Parses a physical window expr from a protobuf.
129///
130/// # Arguments
131///
132/// * `proto` - Input proto with physical window expression node.
133/// * `name` - Name of the window expression.
134/// * `input_schema` - The Arrow schema for the input, used for determining
135///   expression data types when performing type coercion.
136/// * `ctx` - Decode context carrying the task context, extension codec, and
137///   any scoped state needed during recursive deserialization.
138/// * `proto_converter` - Converter hooks used for recursive physical plan and
139///   expression deserialization.
140pub fn parse_physical_window_expr(
141    proto: &protobuf::PhysicalWindowExprNode,
142    ctx: &PhysicalPlanDecodeContext<'_>,
143    input_schema: &Schema,
144    proto_converter: &dyn PhysicalProtoConverterExtension,
145) -> Result<Arc<dyn WindowExpr>> {
146    let window_node_expr =
147        parse_physical_exprs(&proto.args, ctx, input_schema, proto_converter)?;
148    let partition_by =
149        parse_physical_exprs(&proto.partition_by, ctx, input_schema, proto_converter)?;
150
151    let order_by =
152        parse_physical_sort_exprs(&proto.order_by, ctx, input_schema, proto_converter)?;
153
154    let window_frame = proto
155        .window_frame
156        .as_ref()
157        .map(|wf| wf.clone().try_into())
158        .transpose()
159        .map_err(|e| internal_datafusion_err!("{e}"))?
160        .ok_or_else(|| {
161            internal_datafusion_err!("Missing required field 'window_frame' in protobuf")
162        })?;
163
164    let fun = if let Some(window_func) = proto.window_function.as_ref() {
165        match window_func {
166            protobuf::physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(udaf_name) => {
167                WindowFunctionDefinition::AggregateUDF(match &proto.fun_definition {
168                    Some(buf) => ctx.codec().try_decode_udaf(udaf_name, buf)?,
169                    None => ctx
170                        .task_ctx()
171                        .udaf(udaf_name)
172                        .or_else(|_| ctx.codec().try_decode_udaf(udaf_name, &[]))?,
173                })
174            }
175            protobuf::physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(udwf_name) => {
176                WindowFunctionDefinition::WindowUDF(match &proto.fun_definition {
177                    Some(buf) => ctx.codec().try_decode_udwf(udwf_name, buf)?,
178                    None => ctx
179                        .task_ctx()
180                        .udwf(udwf_name)
181                        .or_else(|_| ctx.codec().try_decode_udwf(udwf_name, &[]))?
182                })
183            }
184        }
185    } else {
186        return Err(proto_error("Missing required field in protobuf"));
187    };
188
189    let name = proto.name.clone();
190    // TODO: Remove extended_schema if functions are all UDAF
191    let extended_schema =
192        schema_add_window_field(&window_node_expr, input_schema, &fun, &name)?;
193    create_window_expr(
194        &fun,
195        name,
196        &window_node_expr,
197        &partition_by,
198        &order_by,
199        Arc::new(window_frame),
200        extended_schema,
201        proto.ignore_nulls,
202        proto.distinct,
203        None,
204    )
205}
206
207pub fn parse_physical_exprs<'a, I>(
208    protos: I,
209    ctx: &PhysicalPlanDecodeContext<'_>,
210    input_schema: &Schema,
211    proto_converter: &dyn PhysicalProtoConverterExtension,
212) -> Result<Vec<Arc<dyn PhysicalExpr>>>
213where
214    I: IntoIterator<Item = &'a protobuf::PhysicalExprNode>,
215{
216    protos
217        .into_iter()
218        .map(|p| proto_converter.proto_to_physical_expr(p, input_schema, ctx))
219        .collect::<Result<Vec<_>>>()
220}
221
222/// Parses a physical expression from a protobuf.
223///
224/// # Arguments
225///
226/// * `proto` - Input proto with physical expression node
227/// * `ctx` - Task context used to resolve registered functions.
228/// * `input_schema` - The Arrow schema for the input, used for determining
229///   expression data types when performing type coercion.
230/// * `codec` - Physical extension codec used to construct the root decode
231///   context for deserialization.
232pub fn parse_physical_expr(
233    proto: &protobuf::PhysicalExprNode,
234    ctx: &TaskContext,
235    input_schema: &Schema,
236    codec: &dyn PhysicalExtensionCodec,
237) -> Result<Arc<dyn PhysicalExpr>> {
238    let decode_ctx = PhysicalPlanDecodeContext::new(ctx, codec);
239    parse_physical_expr_with_converter(
240        proto,
241        input_schema,
242        &decode_ctx,
243        &DefaultPhysicalProtoConverter {},
244    )
245}
246
247/// Parses a physical expression from a protobuf.
248///
249/// # Arguments
250///
251/// * `proto` - Input proto with physical expression node
252/// * `input_schema` - The Arrow schema for the input, used for determining
253///   expression data types when performing type coercion.
254/// * `ctx` - Decode context carrying the task context, extension codec, and
255///   any scoped state needed during recursive deserialization.
256/// * `proto_converter` - Converter hooks used for recursive physical plan and
257///   expression deserialization.
258pub fn parse_physical_expr_with_converter(
259    proto: &protobuf::PhysicalExprNode,
260    input_schema: &Schema,
261    ctx: &PhysicalPlanDecodeContext<'_>,
262    proto_converter: &dyn PhysicalProtoConverterExtension,
263) -> Result<Arc<dyn PhysicalExpr>> {
264    let expr_type = proto
265        .expr_type
266        .as_ref()
267        .ok_or_else(|| proto_error("Unexpected empty physical expression"))?;
268
269    let pexpr: Arc<dyn PhysicalExpr> = match expr_type {
270        ExprType::Column(c) => {
271            let pcol: Column = c.into();
272            Arc::new(pcol)
273        }
274        ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
275        ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
276        ExprType::BinaryExpr(binary_expr) => {
277            let op = logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?;
278            if !binary_expr.operands.is_empty() {
279                // New linearized format: reduce the flat operands list back into
280                // a nested binary expression tree.
281                let operands: Vec<Arc<dyn PhysicalExpr>> = binary_expr
282                    .operands
283                    .iter()
284                    .map(|e| proto_converter.proto_to_physical_expr(e, input_schema, ctx))
285                    .collect::<Result<Vec<_>>>()?;
286
287                if operands.len() < 2 {
288                    return Err(proto_error(
289                        "A binary expression must always have at least 2 operands",
290                    ));
291                }
292
293                operands
294                    .into_iter()
295                    .reduce(|left, right| Arc::new(BinaryExpr::new(left, op, right)))
296                    .expect(
297                        "Binary expression could not be reduced to a single expression.",
298                    )
299            } else {
300                // Legacy format with l/r fields
301                Arc::new(BinaryExpr::new(
302                    parse_required_physical_expr(
303                        binary_expr.l.as_deref(),
304                        ctx,
305                        "left",
306                        input_schema,
307                        proto_converter,
308                    )?,
309                    op,
310                    parse_required_physical_expr(
311                        binary_expr.r.as_deref(),
312                        ctx,
313                        "right",
314                        input_schema,
315                        proto_converter,
316                    )?,
317                ))
318            }
319        }
320        ExprType::AggregateExpr(_) => {
321            return not_impl_err!(
322                "Cannot convert aggregate expr node to physical expression"
323            );
324        }
325        ExprType::WindowExpr(_) => {
326            return not_impl_err!(
327                "Cannot convert window expr node to physical expression"
328            );
329        }
330        ExprType::Sort(_) => {
331            return not_impl_err!("Cannot convert sort expr node to physical expression");
332        }
333        ExprType::IsNullExpr(e) => {
334            Arc::new(IsNullExpr::new(parse_required_physical_expr(
335                e.expr.as_deref(),
336                ctx,
337                "expr",
338                input_schema,
339                proto_converter,
340            )?))
341        }
342        ExprType::IsNotNullExpr(e) => {
343            Arc::new(IsNotNullExpr::new(parse_required_physical_expr(
344                e.expr.as_deref(),
345                ctx,
346                "expr",
347                input_schema,
348                proto_converter,
349            )?))
350        }
351        ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr(
352            e.expr.as_deref(),
353            ctx,
354            "expr",
355            input_schema,
356            proto_converter,
357        )?)),
358        ExprType::Negative(e) => {
359            Arc::new(NegativeExpr::new(parse_required_physical_expr(
360                e.expr.as_deref(),
361                ctx,
362                "expr",
363                input_schema,
364                proto_converter,
365            )?))
366        }
367        ExprType::InList(e) => in_list(
368            parse_required_physical_expr(
369                e.expr.as_deref(),
370                ctx,
371                "expr",
372                input_schema,
373                proto_converter,
374            )?,
375            parse_physical_exprs(&e.list, ctx, input_schema, proto_converter)?,
376            &e.negated,
377            input_schema,
378        )?,
379        ExprType::Case(e) => Arc::new(CaseExpr::try_new(
380            e.expr
381                .as_ref()
382                .map(|e| {
383                    proto_converter.proto_to_physical_expr(e.as_ref(), input_schema, ctx)
384                })
385                .transpose()?,
386            e.when_then_expr
387                .iter()
388                .map(|e| {
389                    Ok((
390                        parse_required_physical_expr(
391                            e.when_expr.as_ref(),
392                            ctx,
393                            "when_expr",
394                            input_schema,
395                            proto_converter,
396                        )?,
397                        parse_required_physical_expr(
398                            e.then_expr.as_ref(),
399                            ctx,
400                            "then_expr",
401                            input_schema,
402                            proto_converter,
403                        )?,
404                    ))
405                })
406                .collect::<Result<Vec<_>>>()?,
407            e.else_expr
408                .as_ref()
409                .map(|e| {
410                    proto_converter.proto_to_physical_expr(e.as_ref(), input_schema, ctx)
411                })
412                .transpose()?,
413        )?),
414        ExprType::Cast(e) => Arc::new(CastExpr::new(
415            parse_required_physical_expr(
416                e.expr.as_deref(),
417                ctx,
418                "expr",
419                input_schema,
420                proto_converter,
421            )?,
422            convert_required!(e.arrow_type)?,
423            None,
424        )),
425        ExprType::TryCast(e) => Arc::new(TryCastExpr::new(
426            parse_required_physical_expr(
427                e.expr.as_deref(),
428                ctx,
429                "expr",
430                input_schema,
431                proto_converter,
432            )?,
433            convert_required!(e.arrow_type)?,
434        )),
435        ExprType::ScalarUdf(e) => {
436            let udf = match &e.fun_definition {
437                Some(buf) => ctx.codec().try_decode_udf(&e.name, buf)?,
438                None => ctx
439                    .task_ctx()
440                    .udf(e.name.as_str())
441                    .or_else(|_| ctx.codec().try_decode_udf(&e.name, &[]))?,
442            };
443            let scalar_fun_def = Arc::clone(&udf);
444
445            let args = parse_physical_exprs(&e.args, ctx, input_schema, proto_converter)?;
446
447            let config_options = Arc::clone(ctx.task_ctx().session_config().options());
448
449            Arc::new(
450                ScalarFunctionExpr::new(
451                    e.name.as_str(),
452                    scalar_fun_def,
453                    args,
454                    Field::new(
455                        &e.return_field_name,
456                        convert_required!(e.return_type)?,
457                        true,
458                    )
459                    .into(),
460                    config_options,
461                )
462                .with_nullable(e.nullable),
463            )
464        }
465        ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new(
466            like_expr.negated,
467            like_expr.case_insensitive,
468            parse_required_physical_expr(
469                like_expr.expr.as_deref(),
470                ctx,
471                "expr",
472                input_schema,
473                proto_converter,
474            )?,
475            parse_required_physical_expr(
476                like_expr.pattern.as_deref(),
477                ctx,
478                "pattern",
479                input_schema,
480                proto_converter,
481            )?,
482        )),
483        ExprType::HashExpr(hash_expr) => {
484            let on_columns = parse_physical_exprs(
485                &hash_expr.on_columns,
486                ctx,
487                input_schema,
488                proto_converter,
489            )?;
490            Arc::new(HashExpr::new(
491                on_columns,
492                SeededRandomState::with_seed(hash_expr.seed0),
493                hash_expr.description.clone(),
494            ))
495        }
496        ExprType::ScalarSubquery(sq) => {
497            let data_type: arrow::datatypes::DataType = sq
498                .data_type
499                .as_ref()
500                .ok_or_else(|| {
501                    proto_error("Missing data_type in PhysicalScalarSubqueryExprNode")
502                })?
503                .try_into()?;
504            let results = ctx.scalar_subquery_results().ok_or_else(|| {
505                proto_error(
506                    "ScalarSubqueryExpr can only be deserialized as part \
507                         of a surrounding ScalarSubqueryExec",
508                )
509            })?;
510            Arc::new(ScalarSubqueryExpr::new(
511                data_type,
512                sq.nullable,
513                SubqueryIndex::new(sq.index as usize),
514                results.clone(),
515            ))
516        }
517        ExprType::DynamicFilter(dynamic_filter) => {
518            let children = parse_physical_exprs(
519                &dynamic_filter.children,
520                ctx,
521                input_schema,
522                proto_converter,
523            )?;
524
525            let remapped_children = if !dynamic_filter.remapped_children.is_empty() {
526                Some(parse_physical_exprs(
527                    &dynamic_filter.remapped_children,
528                    ctx,
529                    input_schema,
530                    proto_converter,
531                )?)
532            } else {
533                None
534            };
535
536            let inner_expr = parse_required_physical_expr(
537                dynamic_filter.inner_expr.as_deref(),
538                ctx,
539                "inner_expr",
540                input_schema,
541                proto_converter,
542            )?;
543
544            let expression_id = proto.expr_id.ok_or_else(|| {
545                proto_error(
546                    "DynamicFilterPhysicalExpr requires PhysicalExprNode.expr_id \
547                     to be set by the serializer",
548                )
549            })?;
550
551            let base_filter: Arc<dyn PhysicalExpr> =
552                Arc::new(DynamicFilterPhysicalExpr::from_parts(
553                    children,
554                    remapped_children,
555                    DynamicFilterInner {
556                        expression_id,
557                        generation: dynamic_filter.generation,
558                        expr: inner_expr,
559                        is_complete: dynamic_filter.is_complete,
560                    },
561                ));
562            base_filter
563        }
564        ExprType::Extension(extension) => {
565            let inputs: Vec<Arc<dyn PhysicalExpr>> = extension
566                .inputs
567                .iter()
568                .map(|e| proto_converter.proto_to_physical_expr(e, input_schema, ctx))
569                .collect::<Result<_>>()?;
570            ctx.codec()
571                .try_decode_expr(extension.expr.as_slice(), &inputs)? as _
572        }
573    };
574
575    Ok(pexpr)
576}
577
578fn parse_required_physical_expr(
579    expr: Option<&protobuf::PhysicalExprNode>,
580    ctx: &PhysicalPlanDecodeContext<'_>,
581    field: &str,
582    input_schema: &Schema,
583    proto_converter: &dyn PhysicalProtoConverterExtension,
584) -> Result<Arc<dyn PhysicalExpr>> {
585    expr.map(|e| proto_converter.proto_to_physical_expr(e, input_schema, ctx))
586        .transpose()?
587        .ok_or_else(|| internal_datafusion_err!("Missing required field {field:?}"))
588}
589
590pub fn parse_protobuf_hash_partitioning(
591    partitioning: Option<&protobuf::PhysicalHashRepartition>,
592    ctx: &PhysicalPlanDecodeContext<'_>,
593    input_schema: &Schema,
594    proto_converter: &dyn PhysicalProtoConverterExtension,
595) -> Result<Option<Partitioning>> {
596    match partitioning {
597        Some(hash_part) => {
598            let expr = parse_physical_exprs(
599                &hash_part.hash_expr,
600                ctx,
601                input_schema,
602                proto_converter,
603            )?;
604
605            Ok(Some(Partitioning::Hash(
606                expr,
607                hash_part.partition_count.try_into().unwrap(),
608            )))
609        }
610        None => Ok(None),
611    }
612}
613
614pub fn parse_protobuf_partitioning(
615    partitioning: Option<&protobuf::Partitioning>,
616    ctx: &PhysicalPlanDecodeContext<'_>,
617    input_schema: &Schema,
618    proto_converter: &dyn PhysicalProtoConverterExtension,
619) -> Result<Option<Partitioning>> {
620    match partitioning {
621        Some(protobuf::Partitioning { partition_method }) => match partition_method {
622            Some(protobuf::partitioning::PartitionMethod::RoundRobin(
623                partition_count,
624            )) => Ok(Some(Partitioning::RoundRobinBatch(
625                *partition_count as usize,
626            ))),
627            Some(protobuf::partitioning::PartitionMethod::Hash(hash_repartition)) => {
628                parse_protobuf_hash_partitioning(
629                    Some(hash_repartition),
630                    ctx,
631                    input_schema,
632                    proto_converter,
633                )
634            }
635            Some(protobuf::partitioning::PartitionMethod::Unknown(partition_count)) => {
636                Ok(Some(Partitioning::UnknownPartitioning(
637                    *partition_count as usize,
638                )))
639            }
640            None => Ok(None),
641        },
642        None => Ok(None),
643    }
644}
645
646pub fn parse_protobuf_file_scan_schema(
647    proto: &protobuf::FileScanExecConf,
648) -> Result<Arc<Schema>> {
649    Ok(Arc::new(convert_required!(proto.schema)?))
650}
651
652/// Parses a TableSchema from protobuf, extracting the file schema and partition columns
653pub fn parse_table_schema_from_proto(
654    proto: &protobuf::FileScanExecConf,
655) -> Result<TableSchema> {
656    let schema: Arc<Schema> = parse_protobuf_file_scan_schema(proto)?;
657
658    // Reacquire the partition column types from the schema before removing them below.
659    let table_partition_cols = proto
660        .table_partition_cols
661        .iter()
662        .map(|col| Ok(Arc::new(schema.field_with_name(col)?.clone())))
663        .collect::<Result<Vec<_>>>()?;
664
665    // Remove partition columns from the schema after recreating table_partition_cols
666    // because the partition columns are not in the file. They are present to allow
667    // the partition column types to be reconstructed after serde.
668    let file_schema = Arc::new(
669        Schema::new(
670            schema
671                .fields()
672                .iter()
673                .filter(|field| !table_partition_cols.contains(field))
674                .cloned()
675                .collect::<Vec<_>>(),
676        )
677        .with_metadata(schema.metadata.clone()),
678    );
679
680    Ok(TableSchema::new(file_schema, table_partition_cols))
681}
682
683pub fn parse_protobuf_file_scan_config(
684    proto: &protobuf::FileScanExecConf,
685    ctx: &PhysicalPlanDecodeContext<'_>,
686    proto_converter: &dyn PhysicalProtoConverterExtension,
687    file_source: Arc<dyn FileSource>,
688) -> Result<FileScanConfig> {
689    let schema: Arc<Schema> = parse_protobuf_file_scan_schema(proto)?;
690
691    let constraints = convert_required!(proto.constraints)?;
692    let statistics = convert_required!(proto.statistics)?;
693
694    let file_groups = proto
695        .file_groups
696        .iter()
697        .map(|f| f.try_into())
698        .collect::<Result<Vec<_>, _>>()?;
699
700    let object_store_url = match proto.object_store_url.is_empty() {
701        false => ObjectStoreUrl::parse(&proto.object_store_url)?,
702        true => ObjectStoreUrl::local_filesystem(),
703    };
704
705    let mut output_ordering = vec![];
706    for node_collection in &proto.output_ordering {
707        let sort_exprs = parse_physical_sort_exprs(
708            &node_collection.physical_sort_expr_nodes,
709            ctx,
710            &schema,
711            proto_converter,
712        )?;
713        output_ordering.extend(LexOrdering::new(sort_exprs));
714    }
715
716    // Parse projection expressions if present and apply to file source
717    let file_source = if let Some(proto_projection_exprs) = &proto.projection_exprs {
718        let projection_exprs: Vec<ProjectionExpr> = proto_projection_exprs
719            .projections
720            .iter()
721            .map(|proto_expr| {
722                let expr = proto_converter.proto_to_physical_expr(
723                    proto_expr.expr.as_ref().ok_or_else(|| {
724                        internal_datafusion_err!("ProjectionExpr missing expr field")
725                    })?,
726                    &schema,
727                    ctx,
728                )?;
729                Ok(ProjectionExpr::new(expr, proto_expr.alias.clone()))
730            })
731            .collect::<Result<Vec<_>>>()?;
732
733        let projection_exprs = ProjectionExprs::new(projection_exprs);
734
735        // Apply projection to file source
736        file_source
737            .try_pushdown_projection(&projection_exprs)?
738            .unwrap_or(file_source)
739    } else {
740        file_source
741    };
742
743    let config = FileScanConfigBuilder::new(object_store_url, file_source)
744        .with_file_groups(file_groups)
745        .with_constraints(constraints)
746        .with_statistics(statistics)
747        .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize))
748        .with_output_ordering(output_ordering)
749        .with_batch_size(proto.batch_size.map(|s| s as usize))
750        .build();
751    Ok(config)
752}
753
754pub fn parse_record_batches(buf: &[u8]) -> Result<Vec<RecordBatch>> {
755    if buf.is_empty() {
756        return Ok(vec![]);
757    }
758    let reader = StreamReader::try_new(buf, None)?;
759    let mut batches = Vec::new();
760    for batch in reader {
761        batches.push(batch?);
762    }
763    Ok(batches)
764}
765
766impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
767    type Error = DataFusionError;
768
769    fn try_from(val: &protobuf::PartitionedFile) -> Result<Self, Self::Error> {
770        let mut pf = PartitionedFile::new_from_meta(ObjectMeta {
771            location: Path::parse(val.path.as_str())
772                .map_err(|e| proto_error(format!("Invalid object_store path: {e}")))?,
773            last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64),
774            size: val.size,
775            e_tag: None,
776            version: None,
777        })
778        .with_partition_values(
779            val.partition_values
780                .iter()
781                .map(|v| v.try_into())
782                .collect::<Result<Vec<_>, _>>()?,
783        );
784        if let Some(range) = val.range.as_ref() {
785            let file_range: FileRange = range.try_into()?;
786            pf = pf.with_range(file_range.start, file_range.end);
787        }
788        if let Some(proto_stats) = val.statistics.as_ref() {
789            pf = pf.with_statistics(Arc::new(proto_stats.try_into()?));
790        }
791        Ok(pf)
792    }
793}
794
795impl TryFrom<&protobuf::FileRange> for FileRange {
796    type Error = DataFusionError;
797
798    fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
799        Ok(FileRange {
800            start: value.start,
801            end: value.end,
802        })
803    }
804}
805
806impl TryFrom<&protobuf::FileGroup> for FileGroup {
807    type Error = DataFusionError;
808
809    fn try_from(val: &protobuf::FileGroup) -> Result<Self, Self::Error> {
810        let files = val
811            .files
812            .iter()
813            .map(|f| f.try_into())
814            .collect::<Result<Vec<_>, _>>()?;
815        Ok(FileGroup::new(files))
816    }
817}
818
819impl TryFrom<&protobuf::JsonSink> for JsonSink {
820    type Error = DataFusionError;
821
822    fn try_from(value: &protobuf::JsonSink) -> Result<Self, Self::Error> {
823        Ok(Self::new(
824            convert_required!(value.config)?,
825            convert_required!(value.writer_options)?,
826        ))
827    }
828}
829
#[cfg(feature = "parquet")]
impl TryFrom<&protobuf::ParquetSink> for ParquetSink {
    type Error = DataFusionError;

    /// Builds a [`ParquetSink`] from the message's `config` and
    /// `parquet_options` fields, both converted via `convert_required!`.
    ///
    /// # Errors
    ///
    /// Propagates any error from converting either field.
    fn try_from(value: &protobuf::ParquetSink) -> Result<Self, Self::Error> {
        // `config` is converted first, so its error takes precedence.
        let config = convert_required!(value.config)?;
        let parquet_options = convert_required!(value.parquet_options)?;
        Ok(Self::new(config, parquet_options))
    }
}
841
842impl TryFrom<&protobuf::CsvSink> for CsvSink {
843    type Error = DataFusionError;
844
845    fn try_from(value: &protobuf::CsvSink) -> Result<Self, Self::Error> {
846        Ok(Self::new(
847            convert_required!(value.config)?,
848            convert_required!(value.writer_options)?,
849        ))
850    }
851}
852
853impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
854    type Error = DataFusionError;
855
856    fn try_from(conf: &protobuf::FileSinkConfig) -> Result<Self, Self::Error> {
857        let file_group = FileGroup::new(
858            conf.file_groups
859                .iter()
860                .map(|f| f.try_into())
861                .collect::<Result<Vec<_>>>()?,
862        );
863        let table_paths = conf
864            .table_paths
865            .iter()
866            .map(ListingTableUrl::parse)
867            .collect::<Result<Vec<_>>>()?;
868        let table_partition_cols = conf
869            .table_partition_cols
870            .iter()
871            .map(|protobuf::PartitionColumn { name, arrow_type }| {
872                let data_type = convert_required!(arrow_type)?;
873                Ok((name.clone(), data_type))
874            })
875            .collect::<Result<Vec<_>>>()?;
876        let insert_op = match conf.insert_op() {
877            protobuf::InsertOp::Append => InsertOp::Append,
878            protobuf::InsertOp::Overwrite => InsertOp::Overwrite,
879            protobuf::InsertOp::Replace => InsertOp::Replace,
880        };
881        let file_output_mode = match conf.file_output_mode() {
882            protobuf::FileOutputMode::Automatic => {
883                datafusion_datasource::file_sink_config::FileOutputMode::Automatic
884            }
885            protobuf::FileOutputMode::SingleFile => {
886                datafusion_datasource::file_sink_config::FileOutputMode::SingleFile
887            }
888            protobuf::FileOutputMode::Directory => {
889                datafusion_datasource::file_sink_config::FileOutputMode::Directory
890            }
891        };
892        Ok(Self {
893            original_url: String::default(),
894            object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?,
895            file_group,
896            table_paths,
897            output_schema: Arc::new(convert_required!(conf.output_schema)?),
898            table_partition_cols,
899            insert_op,
900            keep_partition_by_columns: conf.keep_partition_by_columns,
901            file_extension: conf.file_extension.clone(),
902            file_output_mode,
903        })
904    }
905}
906
#[cfg(test)]
mod tests {
    use super::*;

    /// A path containing percent-encoded segments must pass through the
    /// protobuf representation and back unchanged.
    #[test]
    fn partitioned_file_path_roundtrip_percent_encoded() {
        let encoded_path = "foo/foo%2Fbar/baz%252Fqux";
        let meta = ObjectMeta {
            location: Path::parse(encoded_path).unwrap(),
            last_modified: Utc.timestamp_nanos(1_000),
            size: 42,
            e_tag: None,
            version: None,
        };
        let original = PartitionedFile::new_from_meta(meta);

        // Encoding keeps the path string exactly as given.
        let message = protobuf::PartitionedFile::try_from(&original).unwrap();
        assert_eq!(message.path, encoded_path);

        // Decoding restores the same location, size and timestamp.
        let decoded = PartitionedFile::try_from(&message).unwrap();
        let (got, want) = (&decoded.object_meta, &original.object_meta);
        assert_eq!(got.location.as_ref(), encoded_path);
        assert_eq!(got.location, want.location);
        assert_eq!(got.size, want.size);
        assert_eq!(got.last_modified, want.last_modified);
    }

    /// A path that fails to parse must surface as a conversion error with the
    /// expected message rather than a panic.
    #[test]
    fn partitioned_file_from_proto_invalid_path() {
        let malformed = protobuf::PartitionedFile {
            path: String::from("foo//bar"),
            size: 1,
            last_modified_ns: 0,
            partition_values: Vec::new(),
            range: None,
            statistics: None,
        };

        let message = PartitionedFile::try_from(&malformed)
            .unwrap_err()
            .to_string();
        assert!(message.contains("Invalid object_store path"));
    }
}