datafusion_proto/physical_plan/
from_proto.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Serde code to convert from protocol buffers to Rust data structures.
19
20use std::sync::Arc;
21
22use arrow::compute::SortOptions;
23use chrono::{TimeZone, Utc};
24use datafusion_expr::dml::InsertOp;
25use object_store::path::Path;
26use object_store::ObjectMeta;
27
28use datafusion::arrow::datatypes::Schema;
29use datafusion::datasource::file_format::csv::CsvSink;
30use datafusion::datasource::file_format::json::JsonSink;
31#[cfg(feature = "parquet")]
32use datafusion::datasource::file_format::parquet::ParquetSink;
33use datafusion::datasource::listing::{FileRange, ListingTableUrl, PartitionedFile};
34use datafusion::datasource::object_store::ObjectStoreUrl;
35use datafusion::datasource::physical_plan::{
36    FileGroup, FileScanConfig, FileScanConfigBuilder, FileSinkConfig, FileSource,
37};
38use datafusion::execution::FunctionRegistry;
39use datafusion::logical_expr::WindowFunctionDefinition;
40use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
41use datafusion::physical_plan::expressions::{
42    in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr,
43    Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
44};
45use datafusion::physical_plan::windows::{create_window_expr, schema_add_window_field};
46use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
47use datafusion_common::{not_impl_err, DataFusionError, Result};
48use datafusion_proto_common::common::proto_error;
49
50use crate::convert_required;
51use crate::logical_plan::{self};
52use crate::protobuf;
53use crate::protobuf::physical_expr_node::ExprType;
54
55use super::PhysicalExtensionCodec;
56
57impl From<&protobuf::PhysicalColumn> for Column {
58    fn from(c: &protobuf::PhysicalColumn) -> Column {
59        Column::new(&c.name, c.index as usize)
60    }
61}
62
63/// Parses a physical sort expression from a protobuf.
64///
65/// # Arguments
66///
67/// * `proto` - Input proto with physical sort expression node
68/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
69/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
70///   when performing type coercion.
71/// * `codec` - An extension codec used to decode custom UDFs.
72pub fn parse_physical_sort_expr(
73    proto: &protobuf::PhysicalSortExprNode,
74    registry: &dyn FunctionRegistry,
75    input_schema: &Schema,
76    codec: &dyn PhysicalExtensionCodec,
77) -> Result<PhysicalSortExpr> {
78    if let Some(expr) = &proto.expr {
79        let expr = parse_physical_expr(expr.as_ref(), registry, input_schema, codec)?;
80        let options = SortOptions {
81            descending: !proto.asc,
82            nulls_first: proto.nulls_first,
83        };
84        Ok(PhysicalSortExpr { expr, options })
85    } else {
86        Err(proto_error("Unexpected empty physical expression"))
87    }
88}
89
90/// Parses a physical sort expressions from a protobuf.
91///
92/// # Arguments
93///
94/// * `proto` - Input proto with vector of physical sort expression node
95/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
96/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
97///   when performing type coercion.
98/// * `codec` - An extension codec used to decode custom UDFs.
99pub fn parse_physical_sort_exprs(
100    proto: &[protobuf::PhysicalSortExprNode],
101    registry: &dyn FunctionRegistry,
102    input_schema: &Schema,
103    codec: &dyn PhysicalExtensionCodec,
104) -> Result<LexOrdering> {
105    proto
106        .iter()
107        .map(|sort_expr| {
108            parse_physical_sort_expr(sort_expr, registry, input_schema, codec)
109        })
110        .collect::<Result<LexOrdering>>()
111}
112
113/// Parses a physical window expr from a protobuf.
114///
115/// # Arguments
116///
117/// * `proto` - Input proto with physical window expression node.
118/// * `name` - Name of the window expression.
119/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
120/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
121///   when performing type coercion.
122/// * `codec` - An extension codec used to decode custom UDFs.
123pub fn parse_physical_window_expr(
124    proto: &protobuf::PhysicalWindowExprNode,
125    registry: &dyn FunctionRegistry,
126    input_schema: &Schema,
127    codec: &dyn PhysicalExtensionCodec,
128) -> Result<Arc<dyn WindowExpr>> {
129    let window_node_expr =
130        parse_physical_exprs(&proto.args, registry, input_schema, codec)?;
131    let partition_by =
132        parse_physical_exprs(&proto.partition_by, registry, input_schema, codec)?;
133
134    let order_by =
135        parse_physical_sort_exprs(&proto.order_by, registry, input_schema, codec)?;
136
137    let window_frame = proto
138        .window_frame
139        .as_ref()
140        .map(|wf| wf.clone().try_into())
141        .transpose()
142        .map_err(|e| DataFusionError::Internal(format!("{e}")))?
143        .ok_or_else(|| {
144            DataFusionError::Internal(
145                "Missing required field 'window_frame' in protobuf".to_string(),
146            )
147        })?;
148
149    let fun = if let Some(window_func) = proto.window_function.as_ref() {
150        match window_func {
151            protobuf::physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(udaf_name) => {
152                WindowFunctionDefinition::AggregateUDF(match &proto.fun_definition {
153                    Some(buf) => codec.try_decode_udaf(udaf_name, buf)?,
154                    None => registry.udaf(udaf_name)?
155                })
156            }
157            protobuf::physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(udwf_name) => {
158                WindowFunctionDefinition::WindowUDF(match &proto.fun_definition {
159                    Some(buf) => codec.try_decode_udwf(udwf_name, buf)?,
160                    None => registry.udwf(udwf_name)?
161                })
162            }
163        }
164    } else {
165        return Err(proto_error("Missing required field in protobuf"));
166    };
167
168    let name = proto.name.clone();
169    // TODO: Remove extended_schema if functions are all UDAF
170    let extended_schema =
171        schema_add_window_field(&window_node_expr, input_schema, &fun, &name)?;
172    create_window_expr(
173        &fun,
174        name,
175        &window_node_expr,
176        &partition_by,
177        order_by.as_ref(),
178        Arc::new(window_frame),
179        &extended_schema,
180        false,
181    )
182}
183
184pub fn parse_physical_exprs<'a, I>(
185    protos: I,
186    registry: &dyn FunctionRegistry,
187    input_schema: &Schema,
188    codec: &dyn PhysicalExtensionCodec,
189) -> Result<Vec<Arc<dyn PhysicalExpr>>>
190where
191    I: IntoIterator<Item = &'a protobuf::PhysicalExprNode>,
192{
193    protos
194        .into_iter()
195        .map(|p| parse_physical_expr(p, registry, input_schema, codec))
196        .collect::<Result<Vec<_>>>()
197}
198
199/// Parses a physical expression from a protobuf.
200///
201/// # Arguments
202///
203/// * `proto` - Input proto with physical expression node
204/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
205/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
206///   when performing type coercion.
207/// * `codec` - An extension codec used to decode custom UDFs.
208pub fn parse_physical_expr(
209    proto: &protobuf::PhysicalExprNode,
210    registry: &dyn FunctionRegistry,
211    input_schema: &Schema,
212    codec: &dyn PhysicalExtensionCodec,
213) -> Result<Arc<dyn PhysicalExpr>> {
214    let expr_type = proto
215        .expr_type
216        .as_ref()
217        .ok_or_else(|| proto_error("Unexpected empty physical expression"))?;
218
219    let pexpr: Arc<dyn PhysicalExpr> = match expr_type {
220        ExprType::Column(c) => {
221            let pcol: Column = c.into();
222            Arc::new(pcol)
223        }
224        ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
225        ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
226        ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new(
227            parse_required_physical_expr(
228                binary_expr.l.as_deref(),
229                registry,
230                "left",
231                input_schema,
232                codec,
233            )?,
234            logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?,
235            parse_required_physical_expr(
236                binary_expr.r.as_deref(),
237                registry,
238                "right",
239                input_schema,
240                codec,
241            )?,
242        )),
243        ExprType::AggregateExpr(_) => {
244            return not_impl_err!(
245                "Cannot convert aggregate expr node to physical expression"
246            );
247        }
248        ExprType::WindowExpr(_) => {
249            return not_impl_err!(
250                "Cannot convert window expr node to physical expression"
251            );
252        }
253        ExprType::Sort(_) => {
254            return not_impl_err!("Cannot convert sort expr node to physical expression");
255        }
256        ExprType::IsNullExpr(e) => {
257            Arc::new(IsNullExpr::new(parse_required_physical_expr(
258                e.expr.as_deref(),
259                registry,
260                "expr",
261                input_schema,
262                codec,
263            )?))
264        }
265        ExprType::IsNotNullExpr(e) => {
266            Arc::new(IsNotNullExpr::new(parse_required_physical_expr(
267                e.expr.as_deref(),
268                registry,
269                "expr",
270                input_schema,
271                codec,
272            )?))
273        }
274        ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr(
275            e.expr.as_deref(),
276            registry,
277            "expr",
278            input_schema,
279            codec,
280        )?)),
281        ExprType::Negative(e) => {
282            Arc::new(NegativeExpr::new(parse_required_physical_expr(
283                e.expr.as_deref(),
284                registry,
285                "expr",
286                input_schema,
287                codec,
288            )?))
289        }
290        ExprType::InList(e) => in_list(
291            parse_required_physical_expr(
292                e.expr.as_deref(),
293                registry,
294                "expr",
295                input_schema,
296                codec,
297            )?,
298            parse_physical_exprs(&e.list, registry, input_schema, codec)?,
299            &e.negated,
300            input_schema,
301        )?,
302        ExprType::Case(e) => Arc::new(CaseExpr::try_new(
303            e.expr
304                .as_ref()
305                .map(|e| parse_physical_expr(e.as_ref(), registry, input_schema, codec))
306                .transpose()?,
307            e.when_then_expr
308                .iter()
309                .map(|e| {
310                    Ok((
311                        parse_required_physical_expr(
312                            e.when_expr.as_ref(),
313                            registry,
314                            "when_expr",
315                            input_schema,
316                            codec,
317                        )?,
318                        parse_required_physical_expr(
319                            e.then_expr.as_ref(),
320                            registry,
321                            "then_expr",
322                            input_schema,
323                            codec,
324                        )?,
325                    ))
326                })
327                .collect::<Result<Vec<_>>>()?,
328            e.else_expr
329                .as_ref()
330                .map(|e| parse_physical_expr(e.as_ref(), registry, input_schema, codec))
331                .transpose()?,
332        )?),
333        ExprType::Cast(e) => Arc::new(CastExpr::new(
334            parse_required_physical_expr(
335                e.expr.as_deref(),
336                registry,
337                "expr",
338                input_schema,
339                codec,
340            )?,
341            convert_required!(e.arrow_type)?,
342            None,
343        )),
344        ExprType::TryCast(e) => Arc::new(TryCastExpr::new(
345            parse_required_physical_expr(
346                e.expr.as_deref(),
347                registry,
348                "expr",
349                input_schema,
350                codec,
351            )?,
352            convert_required!(e.arrow_type)?,
353        )),
354        ExprType::ScalarUdf(e) => {
355            let udf = match &e.fun_definition {
356                Some(buf) => codec.try_decode_udf(&e.name, buf)?,
357                None => registry.udf(e.name.as_str())?,
358            };
359            let scalar_fun_def = Arc::clone(&udf);
360
361            let args = parse_physical_exprs(&e.args, registry, input_schema, codec)?;
362
363            Arc::new(
364                ScalarFunctionExpr::new(
365                    e.name.as_str(),
366                    scalar_fun_def,
367                    args,
368                    convert_required!(e.return_type)?,
369                )
370                .with_nullable(e.nullable),
371            )
372        }
373        ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new(
374            like_expr.negated,
375            like_expr.case_insensitive,
376            parse_required_physical_expr(
377                like_expr.expr.as_deref(),
378                registry,
379                "expr",
380                input_schema,
381                codec,
382            )?,
383            parse_required_physical_expr(
384                like_expr.pattern.as_deref(),
385                registry,
386                "pattern",
387                input_schema,
388                codec,
389            )?,
390        )),
391        ExprType::Extension(extension) => {
392            let inputs: Vec<Arc<dyn PhysicalExpr>> = extension
393                .inputs
394                .iter()
395                .map(|e| parse_physical_expr(e, registry, input_schema, codec))
396                .collect::<Result<_>>()?;
397            (codec.try_decode_expr(extension.expr.as_slice(), &inputs)?) as _
398        }
399    };
400
401    Ok(pexpr)
402}
403
404fn parse_required_physical_expr(
405    expr: Option<&protobuf::PhysicalExprNode>,
406    registry: &dyn FunctionRegistry,
407    field: &str,
408    input_schema: &Schema,
409    codec: &dyn PhysicalExtensionCodec,
410) -> Result<Arc<dyn PhysicalExpr>> {
411    expr.map(|e| parse_physical_expr(e, registry, input_schema, codec))
412        .transpose()?
413        .ok_or_else(|| {
414            DataFusionError::Internal(format!("Missing required field {field:?}"))
415        })
416}
417
418pub fn parse_protobuf_hash_partitioning(
419    partitioning: Option<&protobuf::PhysicalHashRepartition>,
420    registry: &dyn FunctionRegistry,
421    input_schema: &Schema,
422    codec: &dyn PhysicalExtensionCodec,
423) -> Result<Option<Partitioning>> {
424    match partitioning {
425        Some(hash_part) => {
426            let expr = parse_physical_exprs(
427                &hash_part.hash_expr,
428                registry,
429                input_schema,
430                codec,
431            )?;
432
433            Ok(Some(Partitioning::Hash(
434                expr,
435                hash_part.partition_count.try_into().unwrap(),
436            )))
437        }
438        None => Ok(None),
439    }
440}
441
442pub fn parse_protobuf_partitioning(
443    partitioning: Option<&protobuf::Partitioning>,
444    registry: &dyn FunctionRegistry,
445    input_schema: &Schema,
446    codec: &dyn PhysicalExtensionCodec,
447) -> Result<Option<Partitioning>> {
448    match partitioning {
449        Some(protobuf::Partitioning { partition_method }) => match partition_method {
450            Some(protobuf::partitioning::PartitionMethod::RoundRobin(
451                partition_count,
452            )) => Ok(Some(Partitioning::RoundRobinBatch(
453                *partition_count as usize,
454            ))),
455            Some(protobuf::partitioning::PartitionMethod::Hash(hash_repartition)) => {
456                parse_protobuf_hash_partitioning(
457                    Some(hash_repartition),
458                    registry,
459                    input_schema,
460                    codec,
461                )
462            }
463            Some(protobuf::partitioning::PartitionMethod::Unknown(partition_count)) => {
464                Ok(Some(Partitioning::UnknownPartitioning(
465                    *partition_count as usize,
466                )))
467            }
468            None => Ok(None),
469        },
470        None => Ok(None),
471    }
472}
473
474pub fn parse_protobuf_file_scan_schema(
475    proto: &protobuf::FileScanExecConf,
476) -> Result<Arc<Schema>> {
477    Ok(Arc::new(convert_required!(proto.schema)?))
478}
479
480pub fn parse_protobuf_file_scan_config(
481    proto: &protobuf::FileScanExecConf,
482    registry: &dyn FunctionRegistry,
483    codec: &dyn PhysicalExtensionCodec,
484    file_source: Arc<dyn FileSource>,
485) -> Result<FileScanConfig> {
486    let schema: Arc<Schema> = parse_protobuf_file_scan_schema(proto)?;
487    let projection = proto
488        .projection
489        .iter()
490        .map(|i| *i as usize)
491        .collect::<Vec<_>>();
492
493    let constraints = convert_required!(proto.constraints)?;
494    let statistics = convert_required!(proto.statistics)?;
495
496    let file_groups = proto
497        .file_groups
498        .iter()
499        .map(|f| f.try_into())
500        .collect::<Result<Vec<_>, _>>()?;
501
502    let object_store_url = match proto.object_store_url.is_empty() {
503        false => ObjectStoreUrl::parse(&proto.object_store_url)?,
504        true => ObjectStoreUrl::local_filesystem(),
505    };
506
507    // Reacquire the partition column types from the schema before removing them below.
508    let table_partition_cols = proto
509        .table_partition_cols
510        .iter()
511        .map(|col| Ok(schema.field_with_name(col)?.clone()))
512        .collect::<Result<Vec<_>>>()?;
513
514    // Remove partition columns from the schema after recreating table_partition_cols
515    // because the partition columns are not in the file. They are present to allow
516    // the partition column types to be reconstructed after serde.
517    let file_schema = Arc::new(Schema::new(
518        schema
519            .fields()
520            .iter()
521            .filter(|field| !table_partition_cols.contains(field))
522            .cloned()
523            .collect::<Vec<_>>(),
524    ));
525
526    let mut output_ordering = vec![];
527    for node_collection in &proto.output_ordering {
528        let sort_expr = parse_physical_sort_exprs(
529            &node_collection.physical_sort_expr_nodes,
530            registry,
531            &schema,
532            codec,
533        )?;
534        output_ordering.push(sort_expr);
535    }
536
537    let config = FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
538        .with_file_groups(file_groups)
539        .with_constraints(constraints)
540        .with_statistics(statistics)
541        .with_projection(Some(projection))
542        .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize))
543        .with_table_partition_cols(table_partition_cols)
544        .with_output_ordering(output_ordering)
545        .with_batch_size(proto.batch_size.map(|s| s as usize))
546        .build();
547    Ok(config)
548}
549
550impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
551    type Error = DataFusionError;
552
553    fn try_from(val: &protobuf::PartitionedFile) -> Result<Self, Self::Error> {
554        Ok(PartitionedFile {
555            object_meta: ObjectMeta {
556                location: Path::from(val.path.as_str()),
557                last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64),
558                size: val.size,
559                e_tag: None,
560                version: None,
561            },
562            partition_values: val
563                .partition_values
564                .iter()
565                .map(|v| v.try_into())
566                .collect::<Result<Vec<_>, _>>()?,
567            range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
568            statistics: val
569                .statistics
570                .as_ref()
571                .map(|v| v.try_into().map(Arc::new))
572                .transpose()?,
573            extensions: None,
574            metadata_size_hint: None,
575        })
576    }
577}
578
579impl TryFrom<&protobuf::FileRange> for FileRange {
580    type Error = DataFusionError;
581
582    fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
583        Ok(FileRange {
584            start: value.start,
585            end: value.end,
586        })
587    }
588}
589
590impl TryFrom<&protobuf::FileGroup> for FileGroup {
591    type Error = DataFusionError;
592
593    fn try_from(val: &protobuf::FileGroup) -> Result<Self, Self::Error> {
594        let files = val
595            .files
596            .iter()
597            .map(|f| f.try_into())
598            .collect::<Result<Vec<_>, _>>()?;
599        Ok(FileGroup::new(files))
600    }
601}
602
603impl TryFrom<&protobuf::JsonSink> for JsonSink {
604    type Error = DataFusionError;
605
606    fn try_from(value: &protobuf::JsonSink) -> Result<Self, Self::Error> {
607        Ok(Self::new(
608            convert_required!(value.config)?,
609            convert_required!(value.writer_options)?,
610        ))
611    }
612}
613
/// Rebuilds a [`ParquetSink`] from the required `config` and
/// `parquet_options` fields of its protobuf representation.
#[cfg(feature = "parquet")]
impl TryFrom<&protobuf::ParquetSink> for ParquetSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::ParquetSink) -> Result<Self, Self::Error> {
        let config: FileSinkConfig = convert_required!(value.config)?;
        let parquet_options = convert_required!(value.parquet_options)?;
        Ok(Self::new(config, parquet_options))
    }
}
625
626impl TryFrom<&protobuf::CsvSink> for CsvSink {
627    type Error = DataFusionError;
628
629    fn try_from(value: &protobuf::CsvSink) -> Result<Self, Self::Error> {
630        Ok(Self::new(
631            convert_required!(value.config)?,
632            convert_required!(value.writer_options)?,
633        ))
634    }
635}
636
637impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
638    type Error = DataFusionError;
639
640    fn try_from(conf: &protobuf::FileSinkConfig) -> Result<Self, Self::Error> {
641        let file_group = FileGroup::new(
642            conf.file_groups
643                .iter()
644                .map(|f| f.try_into())
645                .collect::<Result<Vec<_>>>()?,
646        );
647        let table_paths = conf
648            .table_paths
649            .iter()
650            .map(ListingTableUrl::parse)
651            .collect::<Result<Vec<_>>>()?;
652        let table_partition_cols = conf
653            .table_partition_cols
654            .iter()
655            .map(|protobuf::PartitionColumn { name, arrow_type }| {
656                let data_type = convert_required!(arrow_type)?;
657                Ok((name.clone(), data_type))
658            })
659            .collect::<Result<Vec<_>>>()?;
660        let insert_op = match conf.insert_op() {
661            protobuf::InsertOp::Append => InsertOp::Append,
662            protobuf::InsertOp::Overwrite => InsertOp::Overwrite,
663            protobuf::InsertOp::Replace => InsertOp::Replace,
664        };
665        Ok(Self {
666            original_url: String::default(),
667            object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?,
668            file_group,
669            table_paths,
670            output_schema: Arc::new(convert_required!(conf.output_schema)?),
671            table_partition_cols,
672            insert_op,
673            keep_partition_by_columns: conf.keep_partition_by_columns,
674            file_extension: conf.file_extension.clone(),
675        })
676    }
677}