// datafusion_proto/physical_plan/from_proto.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Serde code to convert from protocol buffers to Rust data structures.
19
20use std::sync::Arc;
21
22use arrow::compute::SortOptions;
23use arrow::datatypes::Field;
24use chrono::{TimeZone, Utc};
25use datafusion_expr::dml::InsertOp;
26use object_store::path::Path;
27use object_store::ObjectMeta;
28
29use datafusion::arrow::datatypes::Schema;
30use datafusion::datasource::file_format::csv::CsvSink;
31use datafusion::datasource::file_format::json::JsonSink;
32#[cfg(feature = "parquet")]
33use datafusion::datasource::file_format::parquet::ParquetSink;
34use datafusion::datasource::listing::{FileRange, ListingTableUrl, PartitionedFile};
35use datafusion::datasource::object_store::ObjectStoreUrl;
36use datafusion::datasource::physical_plan::{
37    FileGroup, FileScanConfig, FileScanConfigBuilder, FileSinkConfig, FileSource,
38};
39use datafusion::execution::FunctionRegistry;
40use datafusion::logical_expr::WindowFunctionDefinition;
41use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
42use datafusion::physical_plan::expressions::{
43    in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr,
44    Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
45};
46use datafusion::physical_plan::windows::{create_window_expr, schema_add_window_field};
47use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
48use datafusion_common::{not_impl_err, DataFusionError, Result};
49use datafusion_proto_common::common::proto_error;
50
51use crate::convert_required;
52use crate::logical_plan::{self};
53use crate::protobuf;
54use crate::protobuf::physical_expr_node::ExprType;
55
56use super::PhysicalExtensionCodec;
57
58impl From<&protobuf::PhysicalColumn> for Column {
59    fn from(c: &protobuf::PhysicalColumn) -> Column {
60        Column::new(&c.name, c.index as usize)
61    }
62}
63
64/// Parses a physical sort expression from a protobuf.
65///
66/// # Arguments
67///
68/// * `proto` - Input proto with physical sort expression node
69/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
70/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
71///   when performing type coercion.
72/// * `codec` - An extension codec used to decode custom UDFs.
73pub fn parse_physical_sort_expr(
74    proto: &protobuf::PhysicalSortExprNode,
75    registry: &dyn FunctionRegistry,
76    input_schema: &Schema,
77    codec: &dyn PhysicalExtensionCodec,
78) -> Result<PhysicalSortExpr> {
79    if let Some(expr) = &proto.expr {
80        let expr = parse_physical_expr(expr.as_ref(), registry, input_schema, codec)?;
81        let options = SortOptions {
82            descending: !proto.asc,
83            nulls_first: proto.nulls_first,
84        };
85        Ok(PhysicalSortExpr { expr, options })
86    } else {
87        Err(proto_error("Unexpected empty physical expression"))
88    }
89}
90
91/// Parses a physical sort expressions from a protobuf.
92///
93/// # Arguments
94///
95/// * `proto` - Input proto with vector of physical sort expression node
96/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
97/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
98///   when performing type coercion.
99/// * `codec` - An extension codec used to decode custom UDFs.
100pub fn parse_physical_sort_exprs(
101    proto: &[protobuf::PhysicalSortExprNode],
102    registry: &dyn FunctionRegistry,
103    input_schema: &Schema,
104    codec: &dyn PhysicalExtensionCodec,
105) -> Result<LexOrdering> {
106    proto
107        .iter()
108        .map(|sort_expr| {
109            parse_physical_sort_expr(sort_expr, registry, input_schema, codec)
110        })
111        .collect::<Result<LexOrdering>>()
112}
113
/// Parses a physical window expr from a protobuf.
///
/// # Arguments
///
/// * `proto` - Input proto with physical window expression node.
/// * `name` - Name of the window expression.
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
///   when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_window_expr(
    proto: &protobuf::PhysicalWindowExprNode,
    registry: &dyn FunctionRegistry,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn WindowExpr>> {
    // Function arguments, PARTITION BY, and ORDER BY are each serialized as
    // nested physical expression nodes and deserialized up front.
    let window_node_expr =
        parse_physical_exprs(&proto.args, registry, input_schema, codec)?;
    let partition_by =
        parse_physical_exprs(&proto.partition_by, registry, input_schema, codec)?;

    let order_by =
        parse_physical_sort_exprs(&proto.order_by, registry, input_schema, codec)?;

    // The window frame is a required field: a conversion failure and a missing
    // frame are both reported as internal errors.
    let window_frame = proto
        .window_frame
        .as_ref()
        .map(|wf| wf.clone().try_into())
        .transpose()
        .map_err(|e| DataFusionError::Internal(format!("{e}")))?
        .ok_or_else(|| {
            DataFusionError::Internal(
                "Missing required field 'window_frame' in protobuf".to_string(),
            )
        })?;

    // Resolve the window function definition. When an explicit serialized
    // definition (`fun_definition`) is present, the codec decodes it directly;
    // otherwise the function is looked up by name in the registry, falling back
    // to the codec with an empty payload. This resolution order matters: a
    // serialized definition always wins over a registry entry of the same name.
    let fun = if let Some(window_func) = proto.window_function.as_ref() {
        match window_func {
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(udaf_name) => {
                WindowFunctionDefinition::AggregateUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udaf(udaf_name, buf)?,
                    None => registry.udaf(udaf_name).or_else(|_| codec.try_decode_udaf(udaf_name, &[]))?,
                })
            }
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(udwf_name) => {
                WindowFunctionDefinition::WindowUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udwf(udwf_name, buf)?,
                    None => registry.udwf(udwf_name).or_else(|_| codec.try_decode_udwf(udwf_name, &[]))?
                })
            }
        }
    } else {
        return Err(proto_error("Missing required field in protobuf"));
    };

    let name = proto.name.clone();
    // TODO: Remove extended_schema if functions are all UDAF
    let extended_schema =
        schema_add_window_field(&window_node_expr, input_schema, &fun, &name)?;
    create_window_expr(
        &fun,
        name,
        &window_node_expr,
        &partition_by,
        order_by.as_ref(),
        Arc::new(window_frame),
        &extended_schema,
        false,
    )
}
184
185pub fn parse_physical_exprs<'a, I>(
186    protos: I,
187    registry: &dyn FunctionRegistry,
188    input_schema: &Schema,
189    codec: &dyn PhysicalExtensionCodec,
190) -> Result<Vec<Arc<dyn PhysicalExpr>>>
191where
192    I: IntoIterator<Item = &'a protobuf::PhysicalExprNode>,
193{
194    protos
195        .into_iter()
196        .map(|p| parse_physical_expr(p, registry, input_schema, codec))
197        .collect::<Result<Vec<_>>>()
198}
199
/// Parses a physical expression from a protobuf.
///
/// # Arguments
///
/// * `proto` - Input proto with physical expression node
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
///   when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_expr(
    proto: &protobuf::PhysicalExprNode,
    registry: &dyn FunctionRegistry,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
    // The oneof `expr_type` is required; an empty node is a protocol error.
    let expr_type = proto
        .expr_type
        .as_ref()
        .ok_or_else(|| proto_error("Unexpected empty physical expression"))?;

    // Dispatch on the expression variant. Child expressions are deserialized
    // recursively via `parse_physical_expr` / `parse_required_physical_expr`.
    let pexpr: Arc<dyn PhysicalExpr> = match expr_type {
        ExprType::Column(c) => {
            let pcol: Column = c.into();
            Arc::new(pcol)
        }
        ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
        ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
        // Binary expressions require both operands; the operator name is
        // resolved through the logical-plan op table.
        ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new(
            parse_required_physical_expr(
                binary_expr.l.as_deref(),
                registry,
                "left",
                input_schema,
                codec,
            )?,
            logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?,
            parse_required_physical_expr(
                binary_expr.r.as_deref(),
                registry,
                "right",
                input_schema,
                codec,
            )?,
        )),
        // Aggregate, window, and sort nodes are not standalone physical
        // expressions; they are handled by their enclosing plan nodes.
        ExprType::AggregateExpr(_) => {
            return not_impl_err!(
                "Cannot convert aggregate expr node to physical expression"
            );
        }
        ExprType::WindowExpr(_) => {
            return not_impl_err!(
                "Cannot convert window expr node to physical expression"
            );
        }
        ExprType::Sort(_) => {
            return not_impl_err!("Cannot convert sort expr node to physical expression");
        }
        ExprType::IsNullExpr(e) => {
            Arc::new(IsNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                registry,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::IsNotNullExpr(e) => {
            Arc::new(IsNotNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                registry,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr(
            e.expr.as_deref(),
            registry,
            "expr",
            input_schema,
            codec,
        )?)),
        ExprType::Negative(e) => {
            Arc::new(NegativeExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                registry,
                "expr",
                input_schema,
                codec,
            )?))
        }
        // IN list: the helper `in_list` performs validation against the schema.
        ExprType::InList(e) => in_list(
            parse_required_physical_expr(
                e.expr.as_deref(),
                registry,
                "expr",
                input_schema,
                codec,
            )?,
            parse_physical_exprs(&e.list, registry, input_schema, codec)?,
            &e.negated,
            input_schema,
        )?,
        // CASE: optional base expression, WHEN/THEN pairs, optional ELSE.
        ExprType::Case(e) => Arc::new(CaseExpr::try_new(
            e.expr
                .as_ref()
                .map(|e| parse_physical_expr(e.as_ref(), registry, input_schema, codec))
                .transpose()?,
            e.when_then_expr
                .iter()
                .map(|e| {
                    Ok((
                        parse_required_physical_expr(
                            e.when_expr.as_ref(),
                            registry,
                            "when_expr",
                            input_schema,
                            codec,
                        )?,
                        parse_required_physical_expr(
                            e.then_expr.as_ref(),
                            registry,
                            "then_expr",
                            input_schema,
                            codec,
                        )?,
                    ))
                })
                .collect::<Result<Vec<_>>>()?,
            e.else_expr
                .as_ref()
                .map(|e| parse_physical_expr(e.as_ref(), registry, input_schema, codec))
                .transpose()?,
        )?),
        ExprType::Cast(e) => Arc::new(CastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                registry,
                "expr",
                input_schema,
                codec,
            )?,
            convert_required!(e.arrow_type)?,
            None,
        )),
        ExprType::TryCast(e) => Arc::new(TryCastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                registry,
                "expr",
                input_schema,
                codec,
            )?,
            convert_required!(e.arrow_type)?,
        )),
        // Scalar UDF: prefer a serialized definition; otherwise resolve by name
        // in the registry, falling back to the codec with an empty payload.
        ExprType::ScalarUdf(e) => {
            let udf = match &e.fun_definition {
                Some(buf) => codec.try_decode_udf(&e.name, buf)?,
                None => registry
                    .udf(e.name.as_str())
                    .or_else(|_| codec.try_decode_udf(&e.name, &[]))?,
            };
            let scalar_fun_def = Arc::clone(&udf);

            let args = parse_physical_exprs(&e.args, registry, input_schema, codec)?;

            // The return field is named "f" and marked nullable here; the
            // effective nullability is overridden by `e.nullable` below.
            Arc::new(
                ScalarFunctionExpr::new(
                    e.name.as_str(),
                    scalar_fun_def,
                    args,
                    Field::new("f", convert_required!(e.return_type)?, true).into(),
                )
                .with_nullable(e.nullable),
            )
        }
        ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new(
            like_expr.negated,
            like_expr.case_insensitive,
            parse_required_physical_expr(
                like_expr.expr.as_deref(),
                registry,
                "expr",
                input_schema,
                codec,
            )?,
            parse_required_physical_expr(
                like_expr.pattern.as_deref(),
                registry,
                "pattern",
                input_schema,
                codec,
            )?,
        )),
        // Extension point: child inputs are deserialized first, then the codec
        // reconstructs the custom expression from its opaque byte payload.
        ExprType::Extension(extension) => {
            let inputs: Vec<Arc<dyn PhysicalExpr>> = extension
                .inputs
                .iter()
                .map(|e| parse_physical_expr(e, registry, input_schema, codec))
                .collect::<Result<_>>()?;
            (codec.try_decode_expr(extension.expr.as_slice(), &inputs)?) as _
        }
    };

    Ok(pexpr)
}
406
407fn parse_required_physical_expr(
408    expr: Option<&protobuf::PhysicalExprNode>,
409    registry: &dyn FunctionRegistry,
410    field: &str,
411    input_schema: &Schema,
412    codec: &dyn PhysicalExtensionCodec,
413) -> Result<Arc<dyn PhysicalExpr>> {
414    expr.map(|e| parse_physical_expr(e, registry, input_schema, codec))
415        .transpose()?
416        .ok_or_else(|| {
417            DataFusionError::Internal(format!("Missing required field {field:?}"))
418        })
419}
420
421pub fn parse_protobuf_hash_partitioning(
422    partitioning: Option<&protobuf::PhysicalHashRepartition>,
423    registry: &dyn FunctionRegistry,
424    input_schema: &Schema,
425    codec: &dyn PhysicalExtensionCodec,
426) -> Result<Option<Partitioning>> {
427    match partitioning {
428        Some(hash_part) => {
429            let expr = parse_physical_exprs(
430                &hash_part.hash_expr,
431                registry,
432                input_schema,
433                codec,
434            )?;
435
436            Ok(Some(Partitioning::Hash(
437                expr,
438                hash_part.partition_count.try_into().unwrap(),
439            )))
440        }
441        None => Ok(None),
442    }
443}
444
445pub fn parse_protobuf_partitioning(
446    partitioning: Option<&protobuf::Partitioning>,
447    registry: &dyn FunctionRegistry,
448    input_schema: &Schema,
449    codec: &dyn PhysicalExtensionCodec,
450) -> Result<Option<Partitioning>> {
451    match partitioning {
452        Some(protobuf::Partitioning { partition_method }) => match partition_method {
453            Some(protobuf::partitioning::PartitionMethod::RoundRobin(
454                partition_count,
455            )) => Ok(Some(Partitioning::RoundRobinBatch(
456                *partition_count as usize,
457            ))),
458            Some(protobuf::partitioning::PartitionMethod::Hash(hash_repartition)) => {
459                parse_protobuf_hash_partitioning(
460                    Some(hash_repartition),
461                    registry,
462                    input_schema,
463                    codec,
464                )
465            }
466            Some(protobuf::partitioning::PartitionMethod::Unknown(partition_count)) => {
467                Ok(Some(Partitioning::UnknownPartitioning(
468                    *partition_count as usize,
469                )))
470            }
471            None => Ok(None),
472        },
473        None => Ok(None),
474    }
475}
476
477pub fn parse_protobuf_file_scan_schema(
478    proto: &protobuf::FileScanExecConf,
479) -> Result<Arc<Schema>> {
480    Ok(Arc::new(convert_required!(proto.schema)?))
481}
482
/// Deserializes a [`FileScanConfig`] from its protobuf representation.
///
/// # Arguments
///
/// * `proto` - Serialized file-scan configuration.
/// * `registry` - Used to resolve user-defined functions referenced by
///   output-ordering expressions.
/// * `codec` - Extension codec for decoding custom expressions.
/// * `file_source` - The concrete file source (e.g. Parquet/CSV/JSON reader)
///   this configuration will drive.
///
/// # Errors
///
/// Returns an error if any required field is missing, a partition column name
/// is not present in the schema, or any nested expression fails to deserialize.
pub fn parse_protobuf_file_scan_config(
    proto: &protobuf::FileScanExecConf,
    registry: &dyn FunctionRegistry,
    codec: &dyn PhysicalExtensionCodec,
    file_source: Arc<dyn FileSource>,
) -> Result<FileScanConfig> {
    // Full schema as serialized, including partition columns (see below).
    let schema: Arc<Schema> = parse_protobuf_file_scan_schema(proto)?;
    let projection = proto
        .projection
        .iter()
        .map(|i| *i as usize)
        .collect::<Vec<_>>();

    let constraints = convert_required!(proto.constraints)?;
    let statistics = convert_required!(proto.statistics)?;

    let file_groups = proto
        .file_groups
        .iter()
        .map(|f| f.try_into())
        .collect::<Result<Vec<_>, _>>()?;

    // An empty URL means the scan targets the local filesystem.
    let object_store_url = match proto.object_store_url.is_empty() {
        false => ObjectStoreUrl::parse(&proto.object_store_url)?,
        true => ObjectStoreUrl::local_filesystem(),
    };

    // Reacquire the partition column types from the schema before removing them below.
    let table_partition_cols = proto
        .table_partition_cols
        .iter()
        .map(|col| Ok(schema.field_with_name(col)?.clone()))
        .collect::<Result<Vec<_>>>()?;

    // Remove partition columns from the schema after recreating table_partition_cols
    // because the partition columns are not in the file. They are present to allow
    // the partition column types to be reconstructed after serde.
    let file_schema = Arc::new(Schema::new(
        schema
            .fields()
            .iter()
            .filter(|field| !table_partition_cols.contains(field))
            .cloned()
            .collect::<Vec<_>>(),
    ));

    // Output orderings are resolved against the FULL schema (with partition
    // columns), since sort expressions may reference them.
    let mut output_ordering = vec![];
    for node_collection in &proto.output_ordering {
        let sort_expr = parse_physical_sort_exprs(
            &node_collection.physical_sort_expr_nodes,
            registry,
            &schema,
            codec,
        )?;
        output_ordering.push(sort_expr);
    }

    let config = FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
        .with_file_groups(file_groups)
        .with_constraints(constraints)
        .with_statistics(statistics)
        .with_projection(Some(projection))
        .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize))
        .with_table_partition_cols(table_partition_cols)
        .with_output_ordering(output_ordering)
        .with_batch_size(proto.batch_size.map(|s| s as usize))
        .build();
    Ok(config)
}
552
553impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
554    type Error = DataFusionError;
555
556    fn try_from(val: &protobuf::PartitionedFile) -> Result<Self, Self::Error> {
557        Ok(PartitionedFile {
558            object_meta: ObjectMeta {
559                location: Path::from(val.path.as_str()),
560                last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64),
561                size: val.size,
562                e_tag: None,
563                version: None,
564            },
565            partition_values: val
566                .partition_values
567                .iter()
568                .map(|v| v.try_into())
569                .collect::<Result<Vec<_>, _>>()?,
570            range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
571            statistics: val
572                .statistics
573                .as_ref()
574                .map(|v| v.try_into().map(Arc::new))
575                .transpose()?,
576            extensions: None,
577            metadata_size_hint: None,
578        })
579    }
580}
581
582impl TryFrom<&protobuf::FileRange> for FileRange {
583    type Error = DataFusionError;
584
585    fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
586        Ok(FileRange {
587            start: value.start,
588            end: value.end,
589        })
590    }
591}
592
593impl TryFrom<&protobuf::FileGroup> for FileGroup {
594    type Error = DataFusionError;
595
596    fn try_from(val: &protobuf::FileGroup) -> Result<Self, Self::Error> {
597        let files = val
598            .files
599            .iter()
600            .map(|f| f.try_into())
601            .collect::<Result<Vec<_>, _>>()?;
602        Ok(FileGroup::new(files))
603    }
604}
605
606impl TryFrom<&protobuf::JsonSink> for JsonSink {
607    type Error = DataFusionError;
608
609    fn try_from(value: &protobuf::JsonSink) -> Result<Self, Self::Error> {
610        Ok(Self::new(
611            convert_required!(value.config)?,
612            convert_required!(value.writer_options)?,
613        ))
614    }
615}
616
/// Rebuilds a [`ParquetSink`] from its required config and Parquet options.
/// Only compiled when the `parquet` feature is enabled.
#[cfg(feature = "parquet")]
impl TryFrom<&protobuf::ParquetSink> for ParquetSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::ParquetSink) -> Result<Self, Self::Error> {
        let config = convert_required!(value.config)?;
        let parquet_options = convert_required!(value.parquet_options)?;
        Ok(Self::new(config, parquet_options))
    }
}
628
629impl TryFrom<&protobuf::CsvSink> for CsvSink {
630    type Error = DataFusionError;
631
632    fn try_from(value: &protobuf::CsvSink) -> Result<Self, Self::Error> {
633        Ok(Self::new(
634            convert_required!(value.config)?,
635            convert_required!(value.writer_options)?,
636        ))
637    }
638}
639
640impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
641    type Error = DataFusionError;
642
643    fn try_from(conf: &protobuf::FileSinkConfig) -> Result<Self, Self::Error> {
644        let file_group = FileGroup::new(
645            conf.file_groups
646                .iter()
647                .map(|f| f.try_into())
648                .collect::<Result<Vec<_>>>()?,
649        );
650        let table_paths = conf
651            .table_paths
652            .iter()
653            .map(ListingTableUrl::parse)
654            .collect::<Result<Vec<_>>>()?;
655        let table_partition_cols = conf
656            .table_partition_cols
657            .iter()
658            .map(|protobuf::PartitionColumn { name, arrow_type }| {
659                let data_type = convert_required!(arrow_type)?;
660                Ok((name.clone(), data_type))
661            })
662            .collect::<Result<Vec<_>>>()?;
663        let insert_op = match conf.insert_op() {
664            protobuf::InsertOp::Append => InsertOp::Append,
665            protobuf::InsertOp::Overwrite => InsertOp::Overwrite,
666            protobuf::InsertOp::Replace => InsertOp::Replace,
667        };
668        Ok(Self {
669            original_url: String::default(),
670            object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?,
671            file_group,
672            table_paths,
673            output_schema: Arc::new(convert_required!(conf.output_schema)?),
674            table_partition_cols,
675            insert_op,
676            keep_partition_by_columns: conf.keep_partition_by_columns,
677            file_extension: conf.file_extension.clone(),
678        })
679    }
680}