// datafusion_proto/physical_plan/from_proto.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Serde code to convert from protocol buffers to Rust data structures.

use std::sync::Arc;

use arrow::array::RecordBatch;
use arrow::compute::SortOptions;
use arrow::datatypes::Field;
use arrow::ipc::reader::StreamReader;
use chrono::{TimeZone, Utc};
use datafusion_expr::dml::InsertOp;
use object_store::ObjectMeta;
use object_store::path::Path;

use arrow::datatypes::Schema;
use datafusion_common::{DataFusionError, Result, internal_datafusion_err, not_impl_err};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_sink_config::FileSinkConfig;
use datafusion_datasource::{FileRange, ListingTableUrl, PartitionedFile, TableSchema};
use datafusion_datasource_csv::file_format::CsvSink;
use datafusion_datasource_json::file_format::JsonSink;
#[cfg(feature = "parquet")]
use datafusion_datasource_parquet::file_format::ParquetSink;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{FunctionRegistry, TaskContext};
use datafusion_expr::WindowFunctionDefinition;
use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs};
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
use datafusion_physical_plan::expressions::{
    BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr, Literal,
    NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, in_list,
};
use datafusion_physical_plan::joins::{HashExpr, SeededRandomState};
use datafusion_physical_plan::windows::{create_window_expr, schema_add_window_field};
use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
use datafusion_proto_common::common::proto_error;

use crate::convert_required;
use crate::logical_plan::{self};
use crate::protobuf;
use crate::protobuf::physical_expr_node::ExprType;

use super::PhysicalExtensionCodec;

impl From<&protobuf::PhysicalColumn> for Column {
    fn from(c: &protobuf::PhysicalColumn) -> Column {
        Column::new(&c.name, c.index as usize)
    }
}

/// Parses a physical sort expression from a protobuf.
///
/// # Arguments
///
/// * `proto` - Input proto with physical sort expression node
/// * `ctx` - The task context, used to resolve user-defined functions referenced by name
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
///   when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
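/// # Example
///
/// A minimal sketch (marked `ignore`, so it is not compiled as a doctest), assuming the
/// default extension codec from this crate; the protobuf message fields are written
/// here exactly as they are read in this module:
///
/// ```ignore
/// use arrow::datatypes::{DataType, Field, Schema};
/// use datafusion_execution::TaskContext;
/// use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
/// use datafusion_proto::protobuf;
/// use datafusion_proto::protobuf::physical_expr_node::ExprType;
///
/// // Sort key: column "a" ascending, nulls last.
/// let col = protobuf::PhysicalExprNode {
///     expr_type: Some(ExprType::Column(protobuf::PhysicalColumn {
///         name: "a".to_string(),
///         index: 0,
///     })),
/// };
/// let sort_node = protobuf::PhysicalSortExprNode {
///     expr: Some(Box::new(col)),
///     asc: true,
///     nulls_first: false,
/// };
///
/// let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
/// let ctx = TaskContext::default();
/// let codec = DefaultPhysicalExtensionCodec {};
/// let sort_expr = parse_physical_sort_expr(&sort_node, &ctx, &schema, &codec).unwrap();
/// assert!(!sort_expr.options.descending); // `asc: true` maps to `descending: false`
/// ```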
pub fn parse_physical_sort_expr(
    proto: &protobuf::PhysicalSortExprNode,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<PhysicalSortExpr> {
    if let Some(expr) = &proto.expr {
        let expr = parse_physical_expr(expr.as_ref(), ctx, input_schema, codec)?;
        let options = SortOptions {
            descending: !proto.asc,
            nulls_first: proto.nulls_first,
        };
        Ok(PhysicalSortExpr { expr, options })
    } else {
        Err(proto_error("Unexpected empty physical expression"))
    }
}

/// Parses a list of physical sort expressions from a protobuf.
///
/// # Arguments
///
/// * `proto` - Input slice of protos, each holding a physical sort expression node
/// * `ctx` - The task context, used to resolve user-defined functions referenced by name
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
///   when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_sort_exprs(
    proto: &[protobuf::PhysicalSortExprNode],
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Vec<PhysicalSortExpr>> {
    proto
        .iter()
        .map(|sort_expr| parse_physical_sort_expr(sort_expr, ctx, input_schema, codec))
        .collect()
}

/// Parses a physical window expression from a protobuf.
///
/// # Arguments
///
/// * `proto` - Input proto with physical window expression node, including the expression name.
/// * `ctx` - The task context, used to resolve user-defined functions referenced by name
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
///   when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_window_expr(
    proto: &protobuf::PhysicalWindowExprNode,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn WindowExpr>> {
    let window_node_expr = parse_physical_exprs(&proto.args, ctx, input_schema, codec)?;
    let partition_by =
        parse_physical_exprs(&proto.partition_by, ctx, input_schema, codec)?;

    let order_by = parse_physical_sort_exprs(&proto.order_by, ctx, input_schema, codec)?;

    let window_frame = proto
        .window_frame
        .as_ref()
        .map(|wf| wf.clone().try_into())
        .transpose()
        .map_err(|e| internal_datafusion_err!("{e}"))?
        .ok_or_else(|| {
            internal_datafusion_err!("Missing required field 'window_frame' in protobuf")
        })?;

    let fun = if let Some(window_func) = proto.window_function.as_ref() {
        match window_func {
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(udaf_name) => {
                WindowFunctionDefinition::AggregateUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udaf(udaf_name, buf)?,
                    None => ctx.udaf(udaf_name).or_else(|_| codec.try_decode_udaf(udaf_name, &[]))?,
                })
            }
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(udwf_name) => {
                WindowFunctionDefinition::WindowUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udwf(udwf_name, buf)?,
                    None => ctx.udwf(udwf_name).or_else(|_| codec.try_decode_udwf(udwf_name, &[]))?
                })
            }
        }
    } else {
        return Err(proto_error("Missing required field in protobuf"));
    };

    let name = proto.name.clone();
    // TODO: Remove extended_schema if functions are all UDAF
    let extended_schema =
        schema_add_window_field(&window_node_expr, input_schema, &fun, &name)?;
    create_window_expr(
        &fun,
        name,
        &window_node_expr,
        &partition_by,
        &order_by,
        Arc::new(window_frame),
        extended_schema,
        proto.ignore_nulls,
        proto.distinct,
        None,
    )
}

pub fn parse_physical_exprs<'a, I>(
    protos: I,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Vec<Arc<dyn PhysicalExpr>>>
where
    I: IntoIterator<Item = &'a protobuf::PhysicalExprNode>,
{
    protos
        .into_iter()
        .map(|p| parse_physical_expr(p, ctx, input_schema, codec))
        .collect::<Result<Vec<_>>>()
}

/// Parses a physical expression from a protobuf.
///
/// # Arguments
///
/// * `proto` - Input proto with physical expression node
/// * `ctx` - The task context, used to resolve user-defined functions referenced by name
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
///   when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
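/// # Example
///
/// A minimal sketch (marked `ignore`, so it is not compiled as a doctest), assuming the
/// default extension codec from this crate; it decodes the simplest node kind, a plain
/// column reference:
///
/// ```ignore
/// use arrow::datatypes::{DataType, Field, Schema};
/// use datafusion_execution::TaskContext;
/// use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
/// use datafusion_proto::protobuf;
/// use datafusion_proto::protobuf::physical_expr_node::ExprType;
///
/// let node = protobuf::PhysicalExprNode {
///     expr_type: Some(ExprType::Column(protobuf::PhysicalColumn {
///         name: "a".to_string(),
///         index: 0,
///     })),
/// };
///
/// let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
/// let ctx = TaskContext::default();
/// let codec = DefaultPhysicalExtensionCodec {};
/// let expr = parse_physical_expr(&node, &ctx, &schema, &codec).unwrap();
/// assert_eq!(expr.to_string(), "a@0"); // `Column` displays as name@index
/// ```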
pub fn parse_physical_expr(
    proto: &protobuf::PhysicalExprNode,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
    let expr_type = proto
        .expr_type
        .as_ref()
        .ok_or_else(|| proto_error("Unexpected empty physical expression"))?;

    let pexpr: Arc<dyn PhysicalExpr> = match expr_type {
        ExprType::Column(c) => {
            let pcol: Column = c.into();
            Arc::new(pcol)
        }
        ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
        ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
        ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new(
            parse_required_physical_expr(
                binary_expr.l.as_deref(),
                ctx,
                "left",
                input_schema,
                codec,
            )?,
            logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?,
            parse_required_physical_expr(
                binary_expr.r.as_deref(),
                ctx,
                "right",
                input_schema,
                codec,
            )?,
        )),
        ExprType::AggregateExpr(_) => {
            return not_impl_err!(
                "Cannot convert aggregate expr node to physical expression"
            );
        }
        ExprType::WindowExpr(_) => {
            return not_impl_err!(
                "Cannot convert window expr node to physical expression"
            );
        }
        ExprType::Sort(_) => {
            return not_impl_err!("Cannot convert sort expr node to physical expression");
        }
        ExprType::IsNullExpr(e) => {
            Arc::new(IsNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::IsNotNullExpr(e) => {
            Arc::new(IsNotNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr(
            e.expr.as_deref(),
            ctx,
            "expr",
            input_schema,
            codec,
        )?)),
        ExprType::Negative(e) => {
            Arc::new(NegativeExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::InList(e) => in_list(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            parse_physical_exprs(&e.list, ctx, input_schema, codec)?,
            &e.negated,
            input_schema,
        )?,
        ExprType::Case(e) => Arc::new(CaseExpr::try_new(
            e.expr
                .as_ref()
                .map(|e| parse_physical_expr(e.as_ref(), ctx, input_schema, codec))
                .transpose()?,
            e.when_then_expr
                .iter()
                .map(|e| {
                    Ok((
                        parse_required_physical_expr(
                            e.when_expr.as_ref(),
                            ctx,
                            "when_expr",
                            input_schema,
                            codec,
                        )?,
                        parse_required_physical_expr(
                            e.then_expr.as_ref(),
                            ctx,
                            "then_expr",
                            input_schema,
                            codec,
                        )?,
                    ))
                })
                .collect::<Result<Vec<_>>>()?,
            e.else_expr
                .as_ref()
                .map(|e| parse_physical_expr(e.as_ref(), ctx, input_schema, codec))
                .transpose()?,
        )?),
        ExprType::Cast(e) => Arc::new(CastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            convert_required!(e.arrow_type)?,
            None,
        )),
        ExprType::TryCast(e) => Arc::new(TryCastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            convert_required!(e.arrow_type)?,
        )),
        ExprType::ScalarUdf(e) => {
            let udf = match &e.fun_definition {
                Some(buf) => codec.try_decode_udf(&e.name, buf)?,
                None => ctx
                    .udf(e.name.as_str())
                    .or_else(|_| codec.try_decode_udf(&e.name, &[]))?,
            };
            let scalar_fun_def = Arc::clone(&udf);

            let args = parse_physical_exprs(&e.args, ctx, input_schema, codec)?;

            let config_options = Arc::clone(ctx.session_config().options());

            Arc::new(
                ScalarFunctionExpr::new(
                    e.name.as_str(),
                    scalar_fun_def,
                    args,
                    Field::new(
                        &e.return_field_name,
                        convert_required!(e.return_type)?,
                        true,
                    )
                    .into(),
                    config_options,
                )
                .with_nullable(e.nullable),
            )
        }
        ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new(
            like_expr.negated,
            like_expr.case_insensitive,
            parse_required_physical_expr(
                like_expr.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            parse_required_physical_expr(
                like_expr.pattern.as_deref(),
                ctx,
                "pattern",
                input_schema,
                codec,
            )?,
        )),
        ExprType::HashExpr(hash_expr) => {
            let on_columns =
                parse_physical_exprs(&hash_expr.on_columns, ctx, input_schema, codec)?;
            Arc::new(HashExpr::new(
                on_columns,
                SeededRandomState::with_seeds(
                    hash_expr.seed0,
                    hash_expr.seed1,
                    hash_expr.seed2,
                    hash_expr.seed3,
                ),
                hash_expr.description.clone(),
            ))
        }
        ExprType::Extension(extension) => {
            let inputs: Vec<Arc<dyn PhysicalExpr>> = extension
                .inputs
                .iter()
                .map(|e| parse_physical_expr(e, ctx, input_schema, codec))
                .collect::<Result<_>>()?;
            (codec.try_decode_expr(extension.expr.as_slice(), &inputs)?) as _
        }
    };

    Ok(pexpr)
}

fn parse_required_physical_expr(
    expr: Option<&protobuf::PhysicalExprNode>,
    ctx: &TaskContext,
    field: &str,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
    expr.map(|e| parse_physical_expr(e, ctx, input_schema, codec))
        .transpose()?
        .ok_or_else(|| internal_datafusion_err!("Missing required field {field:?}"))
}

pub fn parse_protobuf_hash_partitioning(
    partitioning: Option<&protobuf::PhysicalHashRepartition>,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Option<Partitioning>> {
    match partitioning {
        Some(hash_part) => {
            let expr =
                parse_physical_exprs(&hash_part.hash_expr, ctx, input_schema, codec)?;

            Ok(Some(Partitioning::Hash(
                expr,
                hash_part.partition_count.try_into().unwrap(),
            )))
        }
        None => Ok(None),
    }
}

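/// Parses an output [`Partitioning`] scheme from a protobuf, resolving any hash
/// expressions against `input_schema`.
///
/// A minimal sketch (marked `ignore`, so it is not compiled as a doctest), assuming the
/// default extension codec from this crate; round-robin partitioning carries no
/// expressions, so an empty schema suffices:
///
/// ```ignore
/// use arrow::datatypes::Schema;
/// use datafusion_execution::TaskContext;
/// use datafusion_physical_plan::Partitioning;
/// use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
/// use datafusion_proto::protobuf;
///
/// let proto = protobuf::Partitioning {
///     partition_method: Some(protobuf::partitioning::PartitionMethod::RoundRobin(8)),
/// };
/// let ctx = TaskContext::default();
/// let codec = DefaultPhysicalExtensionCodec {};
/// let partitioning =
///     parse_protobuf_partitioning(Some(&proto), &ctx, &Schema::empty(), &codec).unwrap();
/// assert!(matches!(partitioning, Some(Partitioning::RoundRobinBatch(8))));
/// ```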
pub fn parse_protobuf_partitioning(
    partitioning: Option<&protobuf::Partitioning>,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Option<Partitioning>> {
    match partitioning {
        Some(protobuf::Partitioning { partition_method }) => match partition_method {
            Some(protobuf::partitioning::PartitionMethod::RoundRobin(
                partition_count,
            )) => Ok(Some(Partitioning::RoundRobinBatch(
                *partition_count as usize,
            ))),
            Some(protobuf::partitioning::PartitionMethod::Hash(hash_repartition)) => {
                parse_protobuf_hash_partitioning(
                    Some(hash_repartition),
                    ctx,
                    input_schema,
                    codec,
                )
            }
            Some(protobuf::partitioning::PartitionMethod::Unknown(partition_count)) => {
                Ok(Some(Partitioning::UnknownPartitioning(
                    *partition_count as usize,
                )))
            }
            None => Ok(None),
        },
        None => Ok(None),
    }
}

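/// Parses the Arrow schema stored in a `FileScanExecConf` protobuf.
///
/// Note that the serialized schema still includes the table partition columns; see
/// [`parse_table_schema_from_proto`] for splitting them back out of the file schema.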
pub fn parse_protobuf_file_scan_schema(
    proto: &protobuf::FileScanExecConf,
) -> Result<Arc<Schema>> {
    Ok(Arc::new(convert_required!(proto.schema)?))
}

/// Parses a TableSchema from protobuf, extracting the file schema and partition columns
pub fn parse_table_schema_from_proto(
    proto: &protobuf::FileScanExecConf,
) -> Result<TableSchema> {
    let schema: Arc<Schema> = parse_protobuf_file_scan_schema(proto)?;

    // Reacquire the partition column types from the schema before removing them below.
    let table_partition_cols = proto
        .table_partition_cols
        .iter()
        .map(|col| Ok(Arc::new(schema.field_with_name(col)?.clone())))
        .collect::<Result<Vec<_>>>()?;

    // Remove partition columns from the schema after recreating table_partition_cols
    // because the partition columns are not in the file. They are present to allow
    // the partition column types to be reconstructed after serde.
    let file_schema = Arc::new(
        Schema::new(
            schema
                .fields()
                .iter()
                .filter(|field| !table_partition_cols.contains(field))
                .cloned()
                .collect::<Vec<_>>(),
        )
        .with_metadata(schema.metadata.clone()),
    );

    Ok(TableSchema::new(file_schema, table_partition_cols))
}

pub fn parse_protobuf_file_scan_config(
    proto: &protobuf::FileScanExecConf,
    ctx: &TaskContext,
    codec: &dyn PhysicalExtensionCodec,
    file_source: Arc<dyn FileSource>,
) -> Result<FileScanConfig> {
    let schema: Arc<Schema> = parse_protobuf_file_scan_schema(proto)?;

    let constraints = convert_required!(proto.constraints)?;
    let statistics = convert_required!(proto.statistics)?;

    let file_groups = proto
        .file_groups
        .iter()
        .map(|f| f.try_into())
        .collect::<Result<Vec<_>, _>>()?;

    let object_store_url = match proto.object_store_url.is_empty() {
        false => ObjectStoreUrl::parse(&proto.object_store_url)?,
        true => ObjectStoreUrl::local_filesystem(),
    };

    let mut output_ordering = vec![];
    for node_collection in &proto.output_ordering {
        let sort_exprs = parse_physical_sort_exprs(
            &node_collection.physical_sort_expr_nodes,
            ctx,
            &schema,
            codec,
        )?;
        output_ordering.extend(LexOrdering::new(sort_exprs));
    }

    // Parse projection expressions if present and apply to file source
    let file_source = if let Some(proto_projection_exprs) = &proto.projection_exprs {
        let projection_exprs: Vec<ProjectionExpr> = proto_projection_exprs
            .projections
            .iter()
            .map(|proto_expr| {
                let expr = parse_physical_expr(
                    proto_expr.expr.as_ref().ok_or_else(|| {
                        internal_datafusion_err!("ProjectionExpr missing expr field")
                    })?,
                    ctx,
                    &schema,
                    codec,
                )?;
                Ok(ProjectionExpr::new(expr, proto_expr.alias.clone()))
            })
            .collect::<Result<Vec<_>>>()?;

        let projection_exprs = ProjectionExprs::new(projection_exprs);

        // Apply projection to file source
        file_source
            .try_pushdown_projection(&projection_exprs)?
            .unwrap_or(file_source)
    } else {
        file_source
    };

    let config = FileScanConfigBuilder::new(object_store_url, file_source)
        .with_file_groups(file_groups)
        .with_constraints(constraints)
        .with_statistics(statistics)
        .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize))
        .with_output_ordering(output_ordering)
        .with_batch_size(proto.batch_size.map(|s| s as usize))
        .build();
    Ok(config)
}

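/// Parses zero or more [`RecordBatch`]es from a buffer containing an Arrow IPC stream.
/// An empty buffer yields an empty vector.
///
/// A minimal round-trip sketch (marked `ignore`, so it is not compiled as a doctest);
/// the batch contents are illustrative only:
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow::array::{Int64Array, RecordBatch};
/// use arrow::datatypes::{DataType, Field, Schema};
/// use arrow::ipc::writer::StreamWriter;
///
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
/// let batch = RecordBatch::try_new(
///     Arc::clone(&schema),
///     vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
/// )
/// .unwrap();
///
/// // Encode the batch as an IPC stream into an in-memory buffer...
/// let mut buf = Vec::new();
/// {
///     let mut writer = StreamWriter::try_new(&mut buf, &schema).unwrap();
///     writer.write(&batch).unwrap();
///     writer.finish().unwrap();
/// }
///
/// // ...and decode it back.
/// let batches = parse_record_batches(&buf).unwrap();
/// assert_eq!(batches.len(), 1);
/// assert_eq!(batches[0], batch);
/// ```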
pub fn parse_record_batches(buf: &[u8]) -> Result<Vec<RecordBatch>> {
    if buf.is_empty() {
        return Ok(vec![]);
    }
    let reader = StreamReader::try_new(buf, None)?;
    let mut batches = Vec::new();
    for batch in reader {
        batches.push(batch?);
    }
    Ok(batches)
}

impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
    type Error = DataFusionError;

    fn try_from(val: &protobuf::PartitionedFile) -> Result<Self, Self::Error> {
        Ok(PartitionedFile {
            object_meta: ObjectMeta {
                location: Path::parse(val.path.as_str()).map_err(|e| {
                    proto_error(format!("Invalid object_store path: {e}"))
                })?,
                last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64),
                size: val.size,
                e_tag: None,
                version: None,
            },
            partition_values: val
                .partition_values
                .iter()
                .map(|v| v.try_into())
                .collect::<Result<Vec<_>, _>>()?,
            range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
            statistics: val
                .statistics
                .as_ref()
                .map(|v| v.try_into().map(Arc::new))
                .transpose()?,
            extensions: None,
            metadata_size_hint: None,
        })
    }
}

impl TryFrom<&protobuf::FileRange> for FileRange {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
        Ok(FileRange {
            start: value.start,
            end: value.end,
        })
    }
}

impl TryFrom<&protobuf::FileGroup> for FileGroup {
    type Error = DataFusionError;

    fn try_from(val: &protobuf::FileGroup) -> Result<Self, Self::Error> {
        let files = val
            .files
            .iter()
            .map(|f| f.try_into())
            .collect::<Result<Vec<_>, _>>()?;
        Ok(FileGroup::new(files))
    }
}

impl TryFrom<&protobuf::JsonSink> for JsonSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::JsonSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.writer_options)?,
        ))
    }
}

#[cfg(feature = "parquet")]
impl TryFrom<&protobuf::ParquetSink> for ParquetSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::ParquetSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.parquet_options)?,
        ))
    }
}

impl TryFrom<&protobuf::CsvSink> for CsvSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::CsvSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.writer_options)?,
        ))
    }
}

impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
    type Error = DataFusionError;

    fn try_from(conf: &protobuf::FileSinkConfig) -> Result<Self, Self::Error> {
        let file_group = FileGroup::new(
            conf.file_groups
                .iter()
                .map(|f| f.try_into())
                .collect::<Result<Vec<_>>>()?,
        );
        let table_paths = conf
            .table_paths
            .iter()
            .map(ListingTableUrl::parse)
            .collect::<Result<Vec<_>>>()?;
        let table_partition_cols = conf
            .table_partition_cols
            .iter()
            .map(|protobuf::PartitionColumn { name, arrow_type }| {
                let data_type = convert_required!(arrow_type)?;
                Ok((name.clone(), data_type))
            })
            .collect::<Result<Vec<_>>>()?;
        let insert_op = match conf.insert_op() {
            protobuf::InsertOp::Append => InsertOp::Append,
            protobuf::InsertOp::Overwrite => InsertOp::Overwrite,
            protobuf::InsertOp::Replace => InsertOp::Replace,
        };
        Ok(Self {
            original_url: String::default(),
            object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?,
            file_group,
            table_paths,
            output_schema: Arc::new(convert_required!(conf.output_schema)?),
            table_partition_cols,
            insert_op,
            keep_partition_by_columns: conf.keep_partition_by_columns,
            file_extension: conf.file_extension.clone(),
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Utc};
    use datafusion_datasource::PartitionedFile;
    use object_store::ObjectMeta;
    use object_store::path::Path;

    #[test]
    fn partitioned_file_path_roundtrip_percent_encoded() {
        let path_str = "foo/foo%2Fbar/baz%252Fqux";
        let pf = PartitionedFile {
            object_meta: ObjectMeta {
                location: Path::parse(path_str).unwrap(),
                last_modified: Utc.timestamp_nanos(1_000),
                size: 42,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        };

        let proto = protobuf::PartitionedFile::try_from(&pf).unwrap();
        assert_eq!(proto.path, path_str);

        let pf2 = PartitionedFile::try_from(&proto).unwrap();
        assert_eq!(pf2.object_meta.location.as_ref(), path_str);
        assert_eq!(pf2.object_meta.location, pf.object_meta.location);
        assert_eq!(pf2.object_meta.size, pf.object_meta.size);
        assert_eq!(pf2.object_meta.last_modified, pf.object_meta.last_modified);
    }

    #[test]
    fn partitioned_file_from_proto_invalid_path() {
        let proto = protobuf::PartitionedFile {
            path: "foo//bar".to_string(),
            size: 1,
            last_modified_ns: 0,
            partition_values: vec![],
            range: None,
            statistics: None,
        };

        let err = PartitionedFile::try_from(&proto).unwrap_err();
        assert!(err.to_string().contains("Invalid object_store path"));
    }
}