use std::fmt::Debug;
use std::sync::Arc;

use self::from_proto::parse_protobuf_partitioning;
use self::to_proto::{serialize_partitioning, serialize_physical_expr};
use crate::common::{byte_to_string, str_to_byte};
use crate::physical_plan::from_proto::{
    parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
    parse_physical_window_expr, parse_protobuf_file_scan_config, parse_record_batches,
};
use crate::physical_plan::to_proto::{
    serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr,
    serialize_physical_sort_exprs, serialize_physical_window_expr,
    serialize_record_batches,
};
use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
use crate::protobuf::physical_expr_node::ExprType;
use crate::protobuf::physical_plan_node::PhysicalPlanType;
use crate::protobuf::{
    self, proto_error, window_agg_exec_node, ListUnnest as ProtoListUnnest, SortExprNode,
    SortMergeJoinExecNode,
};
use crate::{convert_required, into_required};

use arrow::compute::SortOptions;
use arrow::datatypes::{IntervalMonthDayNanoType, SchemaRef};
use datafusion_catalog::memory::MemorySourceConfig;
use datafusion_common::{
    internal_datafusion_err, internal_err, not_impl_err, DataFusionError, Result,
};
#[cfg(feature = "parquet")]
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::sink::DataSinkExec;
use datafusion_datasource::source::{DataSource, DataSourceExec};
#[cfg(feature = "avro")]
use datafusion_datasource_avro::source::AvroSource;
use datafusion_datasource_csv::file_format::CsvSink;
use datafusion_datasource_csv::source::CsvSource;
use datafusion_datasource_json::file_format::JsonSink;
use datafusion_datasource_json::source::JsonSource;
#[cfg(feature = "parquet")]
use datafusion_datasource_parquet::file_format::ParquetSink;
#[cfg(feature = "parquet")]
use datafusion_datasource_parquet::source::ParquetSource;
use datafusion_execution::{FunctionRegistry, TaskContext};
use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
use datafusion_functions_table::generate_series::{
    Empty, GenSeriesArgs, GenerateSeriesTable, GenericSeriesState, TimestampValue,
};
use datafusion_physical_expr::aggregate::AggregateExprBuilder;
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
use datafusion_physical_plan::aggregates::AggregateMode;
use datafusion_physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
use datafusion_physical_plan::analyze::AnalyzeExec;
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::coop::CooperativeExec;
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::explain::ExplainExec;
use datafusion_physical_plan::expressions::PhysicalSortExpr;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
use datafusion_physical_plan::joins::{
    CrossJoinExec, NestedLoopJoinExec, SortMergeJoinExec, StreamJoinPartitionMode,
    SymmetricHashJoinExec,
};
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::memory::LazyMemoryExec;
use datafusion_physical_plan::metrics::MetricType;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::union::{InterleaveExec, UnionExec};
use datafusion_physical_plan::unnest::{ListUnnest, UnnestExec};
use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use datafusion_physical_plan::{ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr};

use prost::bytes::BufMut;
use prost::Message;

pub mod from_proto;
pub mod to_proto;

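// Round-trip usage sketch (illustrative only, not part of this module): serialize
// a plan to protobuf bytes and decode it back. Assumes a `plan: Arc<dyn
// ExecutionPlan>`, a `ctx: &TaskContext`, and this crate's
// `DefaultPhysicalExtensionCodec`:
//
//     let codec = DefaultPhysicalExtensionCodec {};
//     let node = protobuf::PhysicalPlanNode::try_from_physical_plan(plan, &codec)?;
//     let mut buf: Vec<u8> = vec![];
//     node.try_encode(&mut buf)?;
//     let decoded = protobuf::PhysicalPlanNode::try_decode(&buf)?;
//     let roundtrip = decoded.try_into_physical_plan(ctx, &codec)?;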
impl AsExecutionPlan for protobuf::PhysicalPlanNode {
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized,
    {
        protobuf::PhysicalPlanNode::decode(buf).map_err(|e| {
            internal_datafusion_err!("failed to decode physical plan: {e:?}")
        })
    }

    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized,
    {
        self.encode(buf).map_err(|e| {
            internal_datafusion_err!("failed to encode physical plan: {e:?}")
        })
    }

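    /// Decodes `self` into an [`ExecutionPlan`] by dispatching on the
    /// `physical_plan_type` oneof; each variant delegates to a dedicated
    /// `try_into_*_physical_plan` helper defined below.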
    fn try_into_physical_plan(
        &self,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
            proto_error(format!(
                "physical_plan::from_proto() Unsupported physical plan '{self:?}'"
            ))
        })?;
        match plan {
            PhysicalPlanType::Explain(explain) => {
                self.try_into_explain_physical_plan(explain, ctx, extension_codec)
            }
            PhysicalPlanType::Projection(projection) => {
                self.try_into_projection_physical_plan(projection, ctx, extension_codec)
            }
            PhysicalPlanType::Filter(filter) => {
                self.try_into_filter_physical_plan(filter, ctx, extension_codec)
            }
            PhysicalPlanType::CsvScan(scan) => {
                self.try_into_csv_scan_physical_plan(scan, ctx, extension_codec)
            }
            PhysicalPlanType::JsonScan(scan) => {
                self.try_into_json_scan_physical_plan(scan, ctx, extension_codec)
            }
            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
            PhysicalPlanType::ParquetScan(scan) => {
                self.try_into_parquet_scan_physical_plan(scan, ctx, extension_codec)
            }
            #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
            PhysicalPlanType::AvroScan(scan) => {
                self.try_into_avro_scan_physical_plan(scan, ctx, extension_codec)
            }
            PhysicalPlanType::MemoryScan(scan) => {
                self.try_into_memory_scan_physical_plan(scan, ctx, extension_codec)
            }
            PhysicalPlanType::CoalesceBatches(coalesce_batches) => self
                .try_into_coalesce_batches_physical_plan(
                    coalesce_batches,
                    ctx,
                    extension_codec,
                ),
            PhysicalPlanType::Merge(merge) => {
                self.try_into_merge_physical_plan(merge, ctx, extension_codec)
            }
            PhysicalPlanType::Repartition(repart) => {
                self.try_into_repartition_physical_plan(repart, ctx, extension_codec)
            }
            PhysicalPlanType::GlobalLimit(limit) => {
                self.try_into_global_limit_physical_plan(limit, ctx, extension_codec)
            }
            PhysicalPlanType::LocalLimit(limit) => {
                self.try_into_local_limit_physical_plan(limit, ctx, extension_codec)
            }
            PhysicalPlanType::Window(window_agg) => {
                self.try_into_window_physical_plan(window_agg, ctx, extension_codec)
            }
            PhysicalPlanType::Aggregate(hash_agg) => {
                self.try_into_aggregate_physical_plan(hash_agg, ctx, extension_codec)
            }
            PhysicalPlanType::HashJoin(hashjoin) => {
                self.try_into_hash_join_physical_plan(hashjoin, ctx, extension_codec)
            }
            PhysicalPlanType::SymmetricHashJoin(sym_join) => self
                .try_into_symmetric_hash_join_physical_plan(
                    sym_join,
                    ctx,
                    extension_codec,
                ),
            PhysicalPlanType::Union(union) => {
                self.try_into_union_physical_plan(union, ctx, extension_codec)
            }
            PhysicalPlanType::Interleave(interleave) => {
                self.try_into_interleave_physical_plan(interleave, ctx, extension_codec)
            }
            PhysicalPlanType::CrossJoin(crossjoin) => {
                self.try_into_cross_join_physical_plan(crossjoin, ctx, extension_codec)
            }
            PhysicalPlanType::Empty(empty) => {
                self.try_into_empty_physical_plan(empty, ctx, extension_codec)
            }
            PhysicalPlanType::PlaceholderRow(placeholder) => self
                .try_into_placeholder_row_physical_plan(
                    placeholder,
                    ctx,
                    extension_codec,
                ),
            PhysicalPlanType::Sort(sort) => {
                self.try_into_sort_physical_plan(sort, ctx, extension_codec)
            }
            PhysicalPlanType::SortPreservingMerge(sort) => self
                .try_into_sort_preserving_merge_physical_plan(sort, ctx, extension_codec),
            PhysicalPlanType::Extension(extension) => {
                self.try_into_extension_physical_plan(extension, ctx, extension_codec)
            }
            PhysicalPlanType::NestedLoopJoin(join) => {
                self.try_into_nested_loop_join_physical_plan(join, ctx, extension_codec)
            }
            PhysicalPlanType::Analyze(analyze) => {
                self.try_into_analyze_physical_plan(analyze, ctx, extension_codec)
            }
            PhysicalPlanType::JsonSink(sink) => {
                self.try_into_json_sink_physical_plan(sink, ctx, extension_codec)
            }
            PhysicalPlanType::CsvSink(sink) => {
                self.try_into_csv_sink_physical_plan(sink, ctx, extension_codec)
            }
            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
            PhysicalPlanType::ParquetSink(sink) => {
                self.try_into_parquet_sink_physical_plan(sink, ctx, extension_codec)
            }
            PhysicalPlanType::Unnest(unnest) => {
                self.try_into_unnest_physical_plan(unnest, ctx, extension_codec)
            }
            PhysicalPlanType::Cooperative(cooperative) => {
                self.try_into_cooperative_physical_plan(cooperative, ctx, extension_codec)
            }
            PhysicalPlanType::GenerateSeries(generate_series) => {
                self.try_into_generate_series_physical_plan(generate_series)
            }
            PhysicalPlanType::SortMergeJoin(sort_join) => {
                self.try_into_sort_join(sort_join, ctx, extension_codec)
            }
        }
    }

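    /// Serializes a plan by downcasting it against every known operator in
    /// turn. Plans that match no built-in operator are offered to the
    /// `extension_codec`: its encoded bytes, together with recursively
    /// serialized children, are wrapped in a `PhysicalExtensionNode`.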
    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        let plan_clone = Arc::clone(&plan);
        let plan = plan.as_any();

        if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
            return protobuf::PhysicalPlanNode::try_from_explain_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
            return protobuf::PhysicalPlanNode::try_from_projection_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
            return protobuf::PhysicalPlanNode::try_from_analyze_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<FilterExec>() {
            return protobuf::PhysicalPlanNode::try_from_filter_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_global_limit_exec(
                limit,
                extension_codec,
            );
        }

        if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_local_limit_exec(
                limit,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_hash_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_symmetric_hash_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortMergeJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_merge_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_cross_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
            return protobuf::PhysicalPlanNode::try_from_aggregate_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
            return protobuf::PhysicalPlanNode::try_from_empty_exec(
                empty,
                extension_codec,
            );
        }

        if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
            return protobuf::PhysicalPlanNode::try_from_placeholder_row_exec(
                empty,
                extension_codec,
            );
        }

        if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_batches_exec(
                coalesce_batches,
                extension_codec,
            );
        }

        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec(
                data_source_exec,
                extension_codec,
            )? {
                return Ok(node);
            }
        }

        if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_partitions_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
            return protobuf::PhysicalPlanNode::try_from_repartition_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_exec(exec, extension_codec);
        }

        if let Some(union) = plan.downcast_ref::<UnionExec>() {
            return protobuf::PhysicalPlanNode::try_from_union_exec(
                union,
                extension_codec,
            );
        }

        if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
            return protobuf::PhysicalPlanNode::try_from_interleave_exec(
                interleave,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_preserving_merge_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_nested_loop_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_window_agg_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_bounded_window_agg_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<DataSinkExec>() {
            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec(
                exec,
                extension_codec,
            )? {
                return Ok(node);
            }
        }

        if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
            return protobuf::PhysicalPlanNode::try_from_unnest_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<CooperativeExec>() {
            return protobuf::PhysicalPlanNode::try_from_cooperative_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<LazyMemoryExec>() {
            if let Some(node) =
                protobuf::PhysicalPlanNode::try_from_lazy_memory_exec(exec)?
            {
                return Ok(node);
            }
        }

        let mut buf: Vec<u8> = vec![];
        match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
            Ok(_) => {
                let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
                    .children()
                    .into_iter()
                    .cloned()
                    .map(|i| {
                        protobuf::PhysicalPlanNode::try_from_physical_plan(
                            i,
                            extension_codec,
                        )
                    })
                    .collect::<Result<_>>()?;

                Ok(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::Extension(
                        protobuf::PhysicalExtensionNode { node: buf, inputs },
                    )),
                })
            }
            Err(e) => internal_err!(
                "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
            ),
        }
    }
}

impl protobuf::PhysicalPlanNode {
    fn try_into_explain_physical_plan(
        &self,
        explain: &protobuf::ExplainExecNode,
        _ctx: &TaskContext,
        _extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(ExplainExec::new(
            Arc::new(explain.schema.as_ref().unwrap().try_into()?),
            explain
                .stringified_plans
                .iter()
                .map(|plan| plan.into())
                .collect(),
            explain.verbose,
        )))
    }

    fn try_into_projection_physical_plan(
        &self,
        projection: &protobuf::ProjectionExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&projection.input, ctx, extension_codec)?;
        let exprs = projection
            .expr
            .iter()
            .zip(projection.expr_name.iter())
            .map(|(expr, name)| {
                Ok((
                    parse_physical_expr(
                        expr,
                        ctx,
                        input.schema().as_ref(),
                        extension_codec,
                    )?,
                    name.to_string(),
                ))
            })
            .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>>>()?;
        let proj_exprs: Vec<ProjectionExpr> = exprs
            .into_iter()
            .map(|(expr, alias)| ProjectionExpr { expr, alias })
            .collect();
        Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
    }

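    /// Rebuilds a [`FilterExec`], restoring the optional output projection.
    /// The default filter selectivity travels as a `u32` in protobuf and is
    /// narrowed back here (to the smaller integer type `with_default_selectivity`
    /// expects), which is why the conversion can fail.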
    fn try_into_filter_physical_plan(
        &self,
        filter: &protobuf::FilterExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&filter.input, ctx, extension_codec)?;

        let predicate = filter
            .expr
            .as_ref()
            .map(|expr| {
                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
            })
            .transpose()?
            .ok_or_else(|| {
                internal_datafusion_err!(
                    "filter (FilterExecNode) in PhysicalPlanNode is missing."
                )
            })?;

        let filter_selectivity = filter.default_filter_selectivity.try_into();
        let projection = if !filter.projection.is_empty() {
            Some(
                filter
                    .projection
                    .iter()
                    .map(|i| *i as usize)
                    .collect::<Vec<_>>(),
            )
        } else {
            None
        };

        let filter =
            FilterExec::try_new(predicate, input)?.with_projection(projection)?;
        match filter_selectivity {
            Ok(filter_selectivity) => Ok(Arc::new(
                filter.with_default_selectivity(filter_selectivity)?,
            )),
            Err(_) => Err(internal_datafusion_err!(
                "filter_selectivity in PhysicalPlanNode is invalid"
            )),
        }
    }

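    /// Rebuilds a CSV `DataSourceExec`. The delimiter, escape, and comment
    /// markers travel as single-character strings in protobuf and are
    /// converted back to raw bytes with `str_to_byte`.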
    fn try_into_csv_scan_physical_plan(
        &self,
        scan: &protobuf::CsvScanExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let escape =
            if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(escape)) =
                &scan.optional_escape
            {
                Some(str_to_byte(escape, "escape")?)
            } else {
                None
            };

        let comment = if let Some(
            protobuf::csv_scan_exec_node::OptionalComment::Comment(comment),
        ) = &scan.optional_comment
        {
            Some(str_to_byte(comment, "comment")?)
        } else {
            None
        };

        let source = Arc::new(
            CsvSource::new(
                scan.has_header,
                str_to_byte(&scan.delimiter, "delimiter")?,
                str_to_byte(&scan.quote, "quote")?,
            )
            .with_escape(escape)
            .with_comment(comment),
        );

        let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config(
            scan.base_conf.as_ref().unwrap(),
            ctx,
            extension_codec,
            source,
        )?)
        .with_newlines_in_values(scan.newlines_in_values)
        .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
        .build();
        Ok(DataSourceExec::from_data_source(conf))
    }

    fn try_into_json_scan_physical_plan(
        &self,
        scan: &protobuf::JsonScanExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let scan_conf = parse_protobuf_file_scan_config(
            scan.base_conf.as_ref().unwrap(),
            ctx,
            extension_codec,
            Arc::new(JsonSource::new()),
        )?;
        Ok(DataSourceExec::from_data_source(scan_conf))
    }

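    /// Rebuilds a Parquet `DataSourceExec`. When the scan carries a
    /// projection, the pruning predicate is parsed against the projected
    /// schema rather than the full file schema, since the serialized
    /// predicate refers to the projected columns.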
    #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
    fn try_into_parquet_scan_physical_plan(
        &self,
        scan: &protobuf::ParquetScanExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        #[cfg(feature = "parquet")]
        {
            let schema = from_proto::parse_protobuf_file_scan_schema(
                scan.base_conf.as_ref().unwrap(),
            )?;

            let base_conf = scan.base_conf.as_ref().unwrap();
            let predicate_schema = if !base_conf.projection.is_empty() {
                let projected_fields: Vec<_> = base_conf
                    .projection
                    .iter()
                    .map(|&i| schema.field(i as usize).clone())
                    .collect();
                Arc::new(arrow::datatypes::Schema::new(projected_fields))
            } else {
                schema
            };

            let predicate = scan
                .predicate
                .as_ref()
                .map(|expr| {
                    parse_physical_expr(
                        expr,
                        ctx,
                        predicate_schema.as_ref(),
                        extension_codec,
                    )
                })
                .transpose()?;
            let mut options = datafusion_common::config::TableParquetOptions::default();

            if let Some(table_options) = scan.parquet_options.as_ref() {
                options = table_options.try_into()?;
            }
            let mut source = ParquetSource::new(options);

            if let Some(predicate) = predicate {
                source = source.with_predicate(predicate);
            }
            let base_config = parse_protobuf_file_scan_config(
                base_conf,
                ctx,
                extension_codec,
                Arc::new(source),
            )?;
            Ok(DataSourceExec::from_data_source(base_config))
        }
        #[cfg(not(feature = "parquet"))]
        panic!("Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled")
    }

    #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
    fn try_into_avro_scan_physical_plan(
        &self,
        scan: &protobuf::AvroScanExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        #[cfg(feature = "avro")]
        {
            let conf = parse_protobuf_file_scan_config(
                scan.base_conf.as_ref().unwrap(),
                ctx,
                extension_codec,
                Arc::new(AvroSource::new()),
            )?;
            Ok(DataSourceExec::from_data_source(conf))
        }
        #[cfg(not(feature = "avro"))]
        panic!("Unable to process an Avro PhysicalPlan when `avro` feature is not enabled")
    }

    fn try_into_memory_scan_physical_plan(
        &self,
        scan: &protobuf::MemoryScanExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let partitions = scan
            .partitions
            .iter()
            .map(|p| parse_record_batches(p))
            .collect::<Result<Vec<_>>>()?;

        let proto_schema = scan.schema.as_ref().ok_or_else(|| {
            internal_datafusion_err!("schema in MemoryScanExecNode is missing.")
        })?;
        let schema: SchemaRef = SchemaRef::new(proto_schema.try_into()?);

        let projection = if !scan.projection.is_empty() {
            Some(
                scan.projection
                    .iter()
                    .map(|i| *i as usize)
                    .collect::<Vec<_>>(),
            )
        } else {
            None
        };

        let mut sort_information = vec![];
        for ordering in &scan.sort_information {
            let sort_exprs = parse_physical_sort_exprs(
                &ordering.physical_sort_expr_nodes,
                ctx,
                &schema,
                extension_codec,
            )?;
            sort_information.extend(LexOrdering::new(sort_exprs));
        }

        let source = MemorySourceConfig::try_new(&partitions, schema, projection)?
            .with_limit(scan.fetch.map(|f| f as usize))
            .with_show_sizes(scan.show_sizes);

        let source = source.try_with_sort_information(sort_information)?;

        Ok(DataSourceExec::from_data_source(source))
    }

    fn try_into_coalesce_batches_physical_plan(
        &self,
        coalesce_batches: &protobuf::CoalesceBatchesExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&coalesce_batches.input, ctx, extension_codec)?;
        Ok(Arc::new(
            CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize)
                .with_fetch(coalesce_batches.fetch.map(|f| f as usize)),
        ))
    }

    fn try_into_merge_physical_plan(
        &self,
        merge: &protobuf::CoalescePartitionsExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&merge.input, ctx, extension_codec)?;
        Ok(Arc::new(
            CoalescePartitionsExec::new(input)
                .with_fetch(merge.fetch.map(|f| f as usize)),
        ))
    }

    fn try_into_repartition_physical_plan(
        &self,
        repart: &protobuf::RepartitionExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&repart.input, ctx, extension_codec)?;
        let partitioning = parse_protobuf_partitioning(
            repart.partitioning.as_ref(),
            ctx,
            input.schema().as_ref(),
            extension_codec,
        )?;
        Ok(Arc::new(RepartitionExec::try_new(
            input,
            partitioning.unwrap(),
        )?))
    }

    fn try_into_global_limit_physical_plan(
        &self,
        limit: &protobuf::GlobalLimitExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&limit.input, ctx, extension_codec)?;
        let fetch = if limit.fetch >= 0 {
            Some(limit.fetch as usize)
        } else {
            None
        };
        Ok(Arc::new(GlobalLimitExec::new(
            input,
            limit.skip as usize,
            fetch,
        )))
    }

    fn try_into_local_limit_physical_plan(
        &self,
        limit: &protobuf::LocalLimitExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&limit.input, ctx, extension_codec)?;
        Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
    }

    fn try_into_window_physical_plan(
        &self,
        window_agg: &protobuf::WindowAggExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&window_agg.input, ctx, extension_codec)?;
        let input_schema = input.schema();

        let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
            .window_expr
            .iter()
            .map(|window_expr| {
                parse_physical_window_expr(
                    window_expr,
                    ctx,
                    input_schema.as_ref(),
                    extension_codec,
                )
            })
            .collect::<Result<Vec<_>, _>>()?;

        let partition_keys = window_agg
            .partition_keys
            .iter()
            .map(|expr| {
                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
            })
            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;

        if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() {
            let input_order_mode = match input_order_mode {
                window_agg_exec_node::InputOrderMode::Linear(_) => InputOrderMode::Linear,
                window_agg_exec_node::InputOrderMode::PartiallySorted(
                    protobuf::PartiallySortedInputOrderMode { columns },
                ) => InputOrderMode::PartiallySorted(
                    columns.iter().map(|c| *c as usize).collect(),
                ),
                window_agg_exec_node::InputOrderMode::Sorted(_) => InputOrderMode::Sorted,
            };

            Ok(Arc::new(BoundedWindowAggExec::try_new(
                physical_window_expr,
                input,
                input_order_mode,
                !partition_keys.is_empty(),
            )?))
        } else {
            Ok(Arc::new(WindowAggExec::try_new(
                physical_window_expr,
                input,
                !partition_keys.is_empty(),
            )?))
        }
    }

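    /// Rebuilds an [`AggregateExec`]. `groups` is a row-major boolean matrix
    /// flattened over the group expressions (`chunks(num_expr)` restores the
    /// rows). UDAFs are decoded from the inline `fun_definition` bytes when
    /// present, otherwise looked up in the task context with the extension
    /// codec as a fallback.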
    fn try_into_aggregate_physical_plan(
        &self,
        hash_agg: &protobuf::AggregateExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&hash_agg.input, ctx, extension_codec)?;
        let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(|_| {
            proto_error(format!(
                "Received an AggregateNode message with unknown AggregateMode {}",
                hash_agg.mode
            ))
        })?;
        let agg_mode: AggregateMode = match mode {
            protobuf::AggregateMode::Partial => AggregateMode::Partial,
            protobuf::AggregateMode::Final => AggregateMode::Final,
            protobuf::AggregateMode::FinalPartitioned => AggregateMode::FinalPartitioned,
            protobuf::AggregateMode::Single => AggregateMode::Single,
            protobuf::AggregateMode::SinglePartitioned => {
                AggregateMode::SinglePartitioned
            }
        };

        let num_expr = hash_agg.group_expr.len();

        let group_expr = hash_agg
            .group_expr
            .iter()
            .zip(hash_agg.group_expr_name.iter())
            .map(|(expr, name)| {
                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
                    .map(|expr| (expr, name.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;

        let null_expr = hash_agg
            .null_expr
            .iter()
            .zip(hash_agg.group_expr_name.iter())
            .map(|(expr, name)| {
                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
                    .map(|expr| (expr, name.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;

        let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
            hash_agg
                .groups
                .chunks(num_expr)
                .map(|g| g.to_vec())
                .collect::<Vec<Vec<bool>>>()
        } else {
            vec![]
        };

        let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| {
            internal_datafusion_err!("input_schema in AggregateNode is missing.")
        })?;
        let physical_schema: SchemaRef = SchemaRef::new(input_schema.try_into()?);

        let physical_filter_expr = hash_agg
            .filter_expr
            .iter()
            .map(|expr| {
                expr.expr
                    .as_ref()
                    .map(|e| {
                        parse_physical_expr(e, ctx, &physical_schema, extension_codec)
                    })
                    .transpose()
            })
            .collect::<Result<Vec<_>, _>>()?;

        let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> = hash_agg
            .aggr_expr
            .iter()
            .zip(hash_agg.aggr_expr_name.iter())
            .map(|(expr, name)| {
                let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
                    proto_error("Unexpected empty aggregate physical expression")
                })?;

                match expr_type {
                    ExprType::AggregateExpr(agg_node) => {
                        let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node
                            .expr
                            .iter()
                            .map(|e| {
                                parse_physical_expr(
                                    e,
                                    ctx,
                                    &physical_schema,
                                    extension_codec,
                                )
                            })
                            .collect::<Result<Vec<_>>>()?;
                        let order_bys = agg_node
                            .ordering_req
                            .iter()
                            .map(|e| {
                                parse_physical_sort_expr(
                                    e,
                                    ctx,
                                    &physical_schema,
                                    extension_codec,
                                )
                            })
                            .collect::<Result<_>>()?;
                        agg_node
                            .aggregate_function
                            .as_ref()
                            .map(|func| match func {
                                AggregateFunction::UserDefinedAggrFunction(udaf_name) => {
                                    let agg_udf = match &agg_node.fun_definition {
                                        Some(buf) => extension_codec
                                            .try_decode_udaf(udaf_name, buf)?,
                                        None => ctx.udaf(udaf_name).or_else(|_| {
                                            extension_codec
                                                .try_decode_udaf(udaf_name, &[])
                                        })?,
                                    };

                                    AggregateExprBuilder::new(agg_udf, input_phy_expr)
                                        .schema(Arc::clone(&physical_schema))
                                        .alias(name)
                                        .human_display(agg_node.human_display.clone())
                                        .with_ignore_nulls(agg_node.ignore_nulls)
                                        .with_distinct(agg_node.distinct)
                                        .order_by(order_bys)
                                        .build()
                                        .map(Arc::new)
                                }
                            })
                            .transpose()?
                            .ok_or_else(|| {
                                proto_error(
                                    "Invalid AggregateExpr, missing aggregate_function",
                                )
                            })
                    }
                    _ => internal_err!("Invalid aggregate expression for AggregateExec"),
                }
            })
            .collect::<Result<Vec<_>, _>>()?;

        let limit = hash_agg
            .limit
            .as_ref()
            .map(|lit_value| lit_value.limit as usize);

        let agg = AggregateExec::try_new(
            agg_mode,
            PhysicalGroupBy::new(group_expr, null_expr, groups),
            physical_aggr_expr,
            physical_filter_expr,
            input,
            physical_schema,
        )?;

        let agg = agg.with_limit(limit);

        Ok(Arc::new(agg))
    }

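    /// Rebuilds a [`HashJoinExec`]. Equi-join keys are parsed against the
    /// left and right input schemas respectively, while the residual
    /// `JoinFilter` expression is parsed against the intermediate filter
    /// schema carried in the protobuf message.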
    fn try_into_hash_join_physical_plan(
        &self,
        hashjoin: &protobuf::HashJoinExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left: Arc<dyn ExecutionPlan> =
            into_physical_plan(&hashjoin.left, ctx, extension_codec)?;
        let right: Arc<dyn ExecutionPlan> =
            into_physical_plan(&hashjoin.right, ctx, extension_codec)?;
        let left_schema = left.schema();
        let right_schema = right.schema();
        let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin
            .on
            .iter()
            .map(|col| {
                let left = parse_physical_expr(
                    &col.left.clone().unwrap(),
                    ctx,
                    left_schema.as_ref(),
                    extension_codec,
                )?;
                let right = parse_physical_expr(
                    &col.right.clone().unwrap(),
                    ctx,
                    right_schema.as_ref(),
                    extension_codec,
                )?;
                Ok((left, right))
            })
            .collect::<Result<_>>()?;
        let join_type =
            protobuf::JoinType::try_from(hashjoin.join_type).map_err(|_| {
                proto_error(format!(
                    "Received a HashJoinNode message with unknown JoinType {}",
                    hashjoin.join_type
                ))
            })?;
        let null_equality = protobuf::NullEquality::try_from(hashjoin.null_equality)
            .map_err(|_| {
                proto_error(format!(
                    "Received a HashJoinNode message with unknown NullEquality {}",
                    hashjoin.null_equality
                ))
            })?;
        let filter = hashjoin
            .filter
            .as_ref()
            .map(|f| {
                let schema = f
                    .schema
                    .as_ref()
                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
                    .try_into()?;

                let expression = parse_physical_expr(
                    f.expression.as_ref().ok_or_else(|| {
                        proto_error("Unexpected empty filter expression")
                    })?,
                    ctx,
                    &schema,
                    extension_codec,
                )?;
                let column_indices = f
                    .column_indices
                    .iter()
                    .map(|i| {
                        let side = protobuf::JoinSide::try_from(i.side).map_err(|_| {
                            proto_error(format!(
                                "Received a HashJoinNode message with unknown JoinSide in Filter {}",
                                i.side
                            ))
                        })?;

                        Ok(ColumnIndex {
                            index: i.index as usize,
                            side: side.into(),
                        })
                    })
                    .collect::<Result<Vec<_>>>()?;

                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
            })
            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;

        let partition_mode = protobuf::PartitionMode::try_from(hashjoin.partition_mode)
            .map_err(|_| {
                proto_error(format!(
                    "Received a HashJoinNode message with unknown PartitionMode {}",
                    hashjoin.partition_mode
                ))
            })?;
        let partition_mode = match partition_mode {
            protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
            protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
            protobuf::PartitionMode::Auto => PartitionMode::Auto,
        };
        let projection = if !hashjoin.projection.is_empty() {
            Some(
                hashjoin
                    .projection
                    .iter()
                    .map(|i| *i as usize)
                    .collect::<Vec<_>>(),
            )
        } else {
            None
        };
        Ok(Arc::new(HashJoinExec::try_new(
            left,
            right,
            on,
            filter,
            &join_type.into(),
            projection,
            partition_mode,
            null_equality.into(),
        )?))
    }

    fn try_into_symmetric_hash_join_physical_plan(
        &self,
        sym_join: &protobuf::SymmetricHashJoinExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left = into_physical_plan(&sym_join.left, ctx, extension_codec)?;
        let right = into_physical_plan(&sym_join.right, ctx, extension_codec)?;
        let left_schema = left.schema();
        let right_schema = right.schema();
        let on = sym_join
            .on
            .iter()
            .map(|col| {
                let left = parse_physical_expr(
                    &col.left.clone().unwrap(),
                    ctx,
                    left_schema.as_ref(),
                    extension_codec,
                )?;
                let right = parse_physical_expr(
                    &col.right.clone().unwrap(),
                    ctx,
                    right_schema.as_ref(),
                    extension_codec,
                )?;
                Ok((left, right))
            })
            .collect::<Result<_>>()?;
        let join_type =
            protobuf::JoinType::try_from(sym_join.join_type).map_err(|_| {
                proto_error(format!(
                    "Received a SymmetricHashJoin message with unknown JoinType {}",
                    sym_join.join_type
                ))
            })?;
        let null_equality = protobuf::NullEquality::try_from(sym_join.null_equality)
            .map_err(|_| {
                proto_error(format!(
                    "Received a SymmetricHashJoin message with unknown NullEquality {}",
                    sym_join.null_equality
                ))
            })?;
        let filter = sym_join
            .filter
            .as_ref()
            .map(|f| {
                let schema = f
                    .schema
                    .as_ref()
                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
                    .try_into()?;

                let expression = parse_physical_expr(
                    f.expression.as_ref().ok_or_else(|| {
                        proto_error("Unexpected empty filter expression")
                    })?,
                    ctx,
                    &schema,
                    extension_codec,
                )?;
                let column_indices = f
                    .column_indices
                    .iter()
                    .map(|i| {
                        let side = protobuf::JoinSide::try_from(i.side).map_err(|_| {
                            proto_error(format!(
                                "Received a SymmetricHashJoin message with unknown JoinSide in Filter {}",
                                i.side
                            ))
                        })?;

                        Ok(ColumnIndex {
                            index: i.index as usize,
                            side: side.into(),
                        })
                    })
                    .collect::<Result<_>>()?;

                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
            })
            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;

        let left_sort_exprs = parse_physical_sort_exprs(
            &sym_join.left_sort_exprs,
            ctx,
            &left_schema,
            extension_codec,
        )?;
        let left_sort_exprs = LexOrdering::new(left_sort_exprs);

        let right_sort_exprs = parse_physical_sort_exprs(
            &sym_join.right_sort_exprs,
            ctx,
            &right_schema,
            extension_codec,
        )?;
        let right_sort_exprs = LexOrdering::new(right_sort_exprs);

        let partition_mode = protobuf::StreamPartitionMode::try_from(
            sym_join.partition_mode,
        )
        .map_err(|_| {
            proto_error(format!(
                "Received a SymmetricHashJoin message with unknown PartitionMode {}",
                sym_join.partition_mode
            ))
        })?;
        let partition_mode = match partition_mode {
            protobuf::StreamPartitionMode::SinglePartition => {
                StreamJoinPartitionMode::SinglePartition
            }
            protobuf::StreamPartitionMode::PartitionedExec => {
                StreamJoinPartitionMode::Partitioned
            }
        };
        SymmetricHashJoinExec::try_new(
            left,
            right,
            on,
            filter,
            &join_type.into(),
            null_equality.into(),
            left_sort_exprs,
            right_sort_exprs,
            partition_mode,
        )
        .map(|e| Arc::new(e) as _)
    }

    fn try_into_union_physical_plan(
        &self,
        union: &protobuf::UnionExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
        for input in &union.inputs {
            inputs.push(input.try_into_physical_plan(ctx, extension_codec)?);
        }
        UnionExec::try_new(inputs)
    }

    fn try_into_interleave_physical_plan(
        &self,
        interleave: &protobuf::InterleaveExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
        for input in &interleave.inputs {
            inputs.push(input.try_into_physical_plan(ctx, extension_codec)?);
        }
        Ok(Arc::new(InterleaveExec::try_new(inputs)?))
    }

    fn try_into_cross_join_physical_plan(
        &self,
        crossjoin: &protobuf::CrossJoinExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left: Arc<dyn ExecutionPlan> =
            into_physical_plan(&crossjoin.left, ctx, extension_codec)?;
        let right: Arc<dyn ExecutionPlan> =
            into_physical_plan(&crossjoin.right, ctx, extension_codec)?;
        Ok(Arc::new(CrossJoinExec::new(left, right)))
    }

    fn try_into_empty_physical_plan(
        &self,
        empty: &protobuf::EmptyExecNode,
        _ctx: &TaskContext,
        _extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let schema = Arc::new(convert_required!(empty.schema)?);
        Ok(Arc::new(EmptyExec::new(schema)))
    }

    fn try_into_placeholder_row_physical_plan(
        &self,
        placeholder: &protobuf::PlaceholderRowExecNode,
        _ctx: &TaskContext,
        _extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let schema = Arc::new(convert_required!(placeholder.schema)?);
        Ok(Arc::new(PlaceholderRowExec::new(schema)))
    }

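    /// Rebuilds a [`SortExec`]. Every serialized expression must be an
    /// `ExprType::Sort`; its `asc`/`nulls_first` flags map back to Arrow's
    /// [`SortOptions`] (`descending = !asc`), and a negative `fetch` encodes
    /// "no limit".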
    fn try_into_sort_physical_plan(
        &self,
        sort: &protobuf::SortExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input = into_physical_plan(&sort.input, ctx, extension_codec)?;
        let exprs = sort
            .expr
            .iter()
            .map(|expr| {
                let expr = expr.expr_type.as_ref().ok_or_else(|| {
                    proto_error(format!(
                        "physical_plan::from_proto() Unexpected expr {self:?}"
                    ))
                })?;
                if let ExprType::Sort(sort_expr) = expr {
                    let expr = sort_expr
                        .expr
                        .as_ref()
                        .ok_or_else(|| {
                            proto_error(format!(
                                "physical_plan::from_proto() Unexpected sort expr {self:?}"
                            ))
                        })?
                        .as_ref();
                    Ok(PhysicalSortExpr {
                        expr: parse_physical_expr(
                            expr,
                            ctx,
                            input.schema().as_ref(),
                            extension_codec,
                        )?,
                        options: SortOptions {
                            descending: !sort_expr.asc,
                            nulls_first: sort_expr.nulls_first,
                        },
                    })
                } else {
                    internal_err!(
                        "physical_plan::from_proto() Expected sort expr, found {self:?}"
                    )
                }
            })
            .collect::<Result<Vec<_>>>()?;
        let Some(ordering) = LexOrdering::new(exprs) else {
            return internal_err!("SortExec requires an ordering");
        };
        let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
        let new_sort = SortExec::new(ordering, input)
            .with_fetch(fetch)
            .with_preserve_partitioning(sort.preserve_partitioning);

        Ok(Arc::new(new_sort))
    }

    fn try_into_sort_preserving_merge_physical_plan(
        &self,
        sort: &protobuf::SortPreservingMergeExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input = into_physical_plan(&sort.input, ctx, extension_codec)?;
        let exprs = sort
            .expr
            .iter()
            .map(|expr| {
                let expr = expr.expr_type.as_ref().ok_or_else(|| {
                    proto_error(format!(
                        "physical_plan::from_proto() Unexpected expr {self:?}"
                    ))
                })?;
                if let ExprType::Sort(sort_expr) = expr {
                    let expr = sort_expr
                        .expr
                        .as_ref()
                        .ok_or_else(|| {
                            proto_error(format!(
                                "physical_plan::from_proto() Unexpected sort expr {self:?}"
                            ))
                        })?
                        .as_ref();
                    Ok(PhysicalSortExpr {
                        expr: parse_physical_expr(
                            expr,
                            ctx,
                            input.schema().as_ref(),
                            extension_codec,
                        )?,
                        options: SortOptions {
                            descending: !sort_expr.asc,
                            nulls_first: sort_expr.nulls_first,
                        },
                    })
                } else {
                    internal_err!(
                        "physical_plan::from_proto() Expected sort expr, found {self:?}"
                    )
                }
            })
            .collect::<Result<Vec<_>>>()?;
        let Some(ordering) = LexOrdering::new(exprs) else {
            return internal_err!("SortPreservingMergeExec requires an ordering");
        };
        let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
        Ok(Arc::new(
            SortPreservingMergeExec::new(ordering, input).with_fetch(fetch),
        ))
    }

    fn try_into_extension_physical_plan(
        &self,
        extension: &protobuf::PhysicalExtensionNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let inputs: Vec<Arc<dyn ExecutionPlan>> = extension
            .inputs
            .iter()
            .map(|i| i.try_into_physical_plan(ctx, extension_codec))
            .collect::<Result<_>>()?;

        let extension_node =
            extension_codec.try_decode(extension.node.as_slice(), &inputs, ctx)?;

        Ok(extension_node)
    }

    fn try_into_nested_loop_join_physical_plan(
        &self,
        join: &protobuf::NestedLoopJoinExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left: Arc<dyn ExecutionPlan> =
            into_physical_plan(&join.left, ctx, extension_codec)?;
        let right: Arc<dyn ExecutionPlan> =
            into_physical_plan(&join.right, ctx, extension_codec)?;
        let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| {
            proto_error(format!(
                "Received a NestedLoopJoinExecNode message with unknown JoinType {}",
                join.join_type
            ))
        })?;
        let filter = join
            .filter
            .as_ref()
            .map(|f| {
                let schema = f
                    .schema
                    .as_ref()
                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
                    .try_into()?;

                let expression = parse_physical_expr(
                    f.expression.as_ref().ok_or_else(|| {
                        proto_error("Unexpected empty filter expression")
                    })?,
                    ctx,
                    &schema,
                    extension_codec,
                )?;
                let column_indices = f
                    .column_indices
                    .iter()
                    .map(|i| {
                        let side = protobuf::JoinSide::try_from(i.side).map_err(|_| {
                            proto_error(format!(
                                "Received a NestedLoopJoinExecNode message with unknown JoinSide in Filter {}",
                                i.side
                            ))
                        })?;

                        Ok(ColumnIndex {
                            index: i.index as usize,
                            side: side.into(),
                        })
                    })
                    .collect::<Result<Vec<_>>>()?;

                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
            })
            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;

        let projection = if !join.projection.is_empty() {
            Some(
                join.projection
                    .iter()
                    .map(|i| *i as usize)
                    .collect::<Vec<_>>(),
            )
        } else {
            None
        };

        Ok(Arc::new(NestedLoopJoinExec::try_new(
            left,
            right,
            filter,
            &join_type.into(),
            projection,
        )?))
    }

    fn try_into_analyze_physical_plan(
        &self,
        analyze: &protobuf::AnalyzeExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&analyze.input, ctx, extension_codec)?;
        Ok(Arc::new(AnalyzeExec::new(
            analyze.verbose,
            analyze.show_statistics,
            vec![MetricType::SUMMARY, MetricType::DEV],
            input,
            Arc::new(convert_required!(analyze.schema)?),
        )))
    }

    fn try_into_json_sink_physical_plan(
        &self,
        sink: &protobuf::JsonSinkExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input = into_physical_plan(&sink.input, ctx, extension_codec)?;

        let data_sink: JsonSink = sink
            .sink
            .as_ref()
            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
            .try_into()?;
        let sink_schema = input.schema();
        let sort_order = sink
            .sort_order
            .as_ref()
            .map(|collection| {
                parse_physical_sort_exprs(
                    &collection.physical_sort_expr_nodes,
                    ctx,
                    &sink_schema,
                    extension_codec,
                )
                .map(|sort_exprs| {
                    LexRequirement::new(sort_exprs.into_iter().map(Into::into))
                })
            })
            .transpose()?
            .flatten();
        Ok(Arc::new(DataSinkExec::new(
            input,
            Arc::new(data_sink),
            sort_order,
        )))
    }

    fn try_into_csv_sink_physical_plan(
        &self,
        sink: &protobuf::CsvSinkExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input = into_physical_plan(&sink.input, ctx, extension_codec)?;

        let data_sink: CsvSink = sink
            .sink
            .as_ref()
            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
            .try_into()?;
        let sink_schema = input.schema();
        let sort_order = sink
            .sort_order
            .as_ref()
            .map(|collection| {
                parse_physical_sort_exprs(
                    &collection.physical_sort_expr_nodes,
                    ctx,
                    &sink_schema,
                    extension_codec,
                )
                .map(|sort_exprs| {
                    LexRequirement::new(sort_exprs.into_iter().map(Into::into))
                })
            })
            .transpose()?
            .flatten();
        Ok(Arc::new(DataSinkExec::new(
            input,
            Arc::new(data_sink),
            sort_order,
        )))
    }

    #[cfg_attr(not(feature = "parquet"), expect(unused_variables))]
    fn try_into_parquet_sink_physical_plan(
        &self,
        sink: &protobuf::ParquetSinkExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        #[cfg(feature = "parquet")]
        {
            let input = into_physical_plan(&sink.input, ctx, extension_codec)?;

            let data_sink: ParquetSink = sink
                .sink
                .as_ref()
                .ok_or_else(|| proto_error("Missing required field in protobuf"))?
                .try_into()?;
            let sink_schema = input.schema();
            let sort_order = sink
                .sort_order
                .as_ref()
                .map(|collection| {
                    parse_physical_sort_exprs(
                        &collection.physical_sort_expr_nodes,
                        ctx,
                        &sink_schema,
                        extension_codec,
                    )
                    .map(|sort_exprs| {
                        LexRequirement::new(sort_exprs.into_iter().map(Into::into))
                    })
                })
                .transpose()?
                .flatten();
            Ok(Arc::new(DataSinkExec::new(
                input,
                Arc::new(data_sink),
                sort_order,
            )))
        }
        #[cfg(not(feature = "parquet"))]
        panic!("Trying to use ParquetSink without `parquet` feature enabled");
    }

    fn try_into_unnest_physical_plan(
        &self,
        unnest: &protobuf::UnnestExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input = into_physical_plan(&unnest.input, ctx, extension_codec)?;

        Ok(Arc::new(UnnestExec::new(
            input,
            unnest
                .list_type_columns
                .iter()
                .map(|c| ListUnnest {
                    index_in_input_schema: c.index_in_input_schema as _,
                    depth: c.depth as _,
                })
                .collect(),
            unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
            Arc::new(convert_required!(unnest.schema)?),
            into_required!(unnest.options)?,
        )?))
    }

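    /// Maps the protobuf `GenerateSeriesName` enum back to the SQL function
    /// name (`generate_series` vs. `range`) the series was created under.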
    fn generate_series_name_to_str(name: protobuf::GenerateSeriesName) -> &'static str {
        match name {
            protobuf::GenerateSeriesName::GsGenerateSeries => "generate_series",
            protobuf::GenerateSeriesName::GsRange => "range",
        }
    }

    fn try_into_sort_join(
        &self,
        sort_join: &SortMergeJoinExecNode,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left = into_physical_plan(&sort_join.left, ctx, extension_codec)?;
        let left_schema = left.schema();
        let right = into_physical_plan(&sort_join.right, ctx, extension_codec)?;
        let right_schema = right.schema();

        let filter = sort_join
            .filter
            .as_ref()
            .map(|f| {
                let schema = f
                    .schema
                    .as_ref()
                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
                    .try_into()?;

                let expression = parse_physical_expr(
                    f.expression.as_ref().ok_or_else(|| {
                        proto_error("Unexpected empty filter expression")
                    })?,
                    ctx,
                    &schema,
                    extension_codec,
                )?;
                let column_indices = f
                    .column_indices
                    .iter()
                    .map(|i| {
                        let side =
                            protobuf::JoinSide::try_from(i.side).map_err(|_| {
                                proto_error(format!(
                                    "Received a SortMergeJoinExecNode message with unknown JoinSide in Filter {}",
                                    i.side
                                ))
                            })?;

                        Ok(ColumnIndex {
                            index: i.index as usize,
                            side: side.into(),
                        })
                    })
                    .collect::<Result<Vec<_>>>()?;

                Ok(JoinFilter::new(
                    expression,
                    column_indices,
                    Arc::new(schema),
                ))
            })
            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;

        let join_type =
            protobuf::JoinType::try_from(sort_join.join_type).map_err(|_| {
                proto_error(format!(
                    "Received a SortMergeJoinExecNode message with unknown JoinType {}",
                    sort_join.join_type
                ))
            })?;

        let null_equality = protobuf::NullEquality::try_from(sort_join.null_equality)
            .map_err(|_| {
                proto_error(format!(
                    "Received a SortMergeJoinExecNode message with unknown NullEquality {}",
                    sort_join.null_equality
                ))
            })?;

        let sort_options = sort_join
            .sort_options
            .iter()
            .map(|e| SortOptions {
                descending: !e.asc,
                nulls_first: e.nulls_first,
            })
            .collect();
        let on = sort_join
            .on
            .iter()
            .map(|col| {
                let left = parse_physical_expr(
                    &col.left.clone().unwrap(),
                    ctx,
                    left_schema.as_ref(),
                    extension_codec,
                )?;
                let right = parse_physical_expr(
                    &col.right.clone().unwrap(),
                    ctx,
                    right_schema.as_ref(),
                    extension_codec,
                )?;
                Ok((left, right))
            })
            .collect::<Result<_>>()?;

        Ok(Arc::new(SortMergeJoinExec::try_new(
            left,
            right,
            on,
            filter,
            join_type.into(),
            sort_options,
            null_equality.into(),
        )?))
    }

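    /// Rebuilds a `generate_series`/`range` scan as a [`LazyMemoryExec`].
    /// Interval steps for the timestamp and date variants are re-packed into
    /// an `IntervalMonthDayNano` from their protobuf parts, e.g. (assuming a
    /// one-month step):
    ///
    /// ```ignore
    /// let step = IntervalMonthDayNanoType::make_value(1, 0, 0);
    /// ```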
1883 fn try_into_generate_series_physical_plan(
1884 &self,
1885 generate_series: &protobuf::GenerateSeriesNode,
1886 ) -> Result<Arc<dyn ExecutionPlan>> {
1887 let schema: SchemaRef = Arc::new(convert_required!(generate_series.schema)?);
1888
1889 let args = match &generate_series.args {
1890 Some(protobuf::generate_series_node::Args::ContainsNull(args)) => {
1891 GenSeriesArgs::ContainsNull {
1892 name: Self::generate_series_name_to_str(args.name()),
1893 }
1894 }
1895 Some(protobuf::generate_series_node::Args::Int64Args(args)) => {
1896 GenSeriesArgs::Int64Args {
1897 start: args.start,
1898 end: args.end,
1899 step: args.step,
1900 include_end: args.include_end,
1901 name: Self::generate_series_name_to_str(args.name()),
1902 }
1903 }
1904 Some(protobuf::generate_series_node::Args::TimestampArgs(args)) => {
1905 let step_proto = args.step.as_ref().ok_or_else(|| {
1906 internal_datafusion_err!("Missing step in TimestampArgs")
1907 })?;
1908 let step = IntervalMonthDayNanoType::make_value(
1909 step_proto.months,
1910 step_proto.days,
1911 step_proto.nanos,
1912 );
1913 GenSeriesArgs::TimestampArgs {
1914 start: args.start,
1915 end: args.end,
1916 step,
1917 tz: args.tz.as_ref().map(|s| Arc::from(s.as_str())),
1918 include_end: args.include_end,
1919 name: Self::generate_series_name_to_str(args.name()),
1920 }
1921 }
1922 Some(protobuf::generate_series_node::Args::DateArgs(args)) => {
1923 let step_proto = args.step.as_ref().ok_or_else(|| {
1924 internal_datafusion_err!("Missing step in DateArgs")
1925 })?;
1926 let step = IntervalMonthDayNanoType::make_value(
1927 step_proto.months,
1928 step_proto.days,
1929 step_proto.nanos,
1930 );
1931 GenSeriesArgs::DateArgs {
1932 start: args.start,
1933 end: args.end,
1934 step,
1935 include_end: args.include_end,
1936 name: Self::generate_series_name_to_str(args.name()),
1937 }
1938 }
1939 None => return internal_err!("Missing args in GenerateSeriesNode"),
1940 };
1941
1942 let table = GenerateSeriesTable::new(Arc::clone(&schema), args);
1943 let generator = table.as_generator(generate_series.target_batch_size as usize)?;
1944
1945 Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?))
1946 }
1947
1948 fn try_into_cooperative_physical_plan(
1949 &self,
1950 field_stream: &protobuf::CooperativeExecNode,
1951 ctx: &TaskContext,
1952
1953 extension_codec: &dyn PhysicalExtensionCodec,
1954 ) -> Result<Arc<dyn ExecutionPlan>> {
1955 let input = into_physical_plan(&field_stream.input, ctx, extension_codec)?;
1956 Ok(Arc::new(CooperativeExec::new(input)))
1957 }

    fn try_from_explain_exec(
        exec: &ExplainExec,
        _extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Explain(
                protobuf::ExplainExecNode {
                    schema: Some(exec.schema().as_ref().try_into()?),
                    stringified_plans: exec
                        .stringified_plans()
                        .iter()
                        .map(|plan| plan.into())
                        .collect(),
                    verbose: exec.verbose(),
                },
            )),
        })
    }

    fn try_from_projection_exec(
        exec: &ProjectionExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;
        let expr = exec
            .expr()
            .iter()
            .map(|proj_expr| serialize_physical_expr(&proj_expr.expr, extension_codec))
            .collect::<Result<Vec<_>>>()?;
        let expr_name = exec
            .expr()
            .iter()
            .map(|proj_expr| proj_expr.alias.clone())
            .collect();
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
                protobuf::ProjectionExecNode {
                    input: Some(Box::new(input)),
                    expr,
                    expr_name,
                },
            ))),
        })
    }

    fn try_from_analyze_exec(
        exec: &AnalyzeExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
                protobuf::AnalyzeExecNode {
                    verbose: exec.verbose(),
                    show_statistics: exec.show_statistics(),
                    input: Some(Box::new(input)),
                    schema: Some(exec.schema().as_ref().try_into()?),
                },
            ))),
        })
    }

    fn try_from_filter_exec(
        exec: &FilterExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
                protobuf::FilterExecNode {
                    input: Some(Box::new(input)),
                    expr: Some(serialize_physical_expr(
                        exec.predicate(),
                        extension_codec,
                    )?),
                    default_filter_selectivity: exec.default_selectivity() as u32,
                    projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
                    }),
                },
            ))),
        })
    }

    fn try_from_global_limit_exec(
        limit: &GlobalLimitExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            limit.input().to_owned(),
            extension_codec,
        )?;

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
                protobuf::GlobalLimitExecNode {
                    input: Some(Box::new(input)),
                    skip: limit.skip() as u32,
                    fetch: match limit.fetch() {
                        Some(n) => n as i64,
                        _ => -1,
                    },
                },
            ))),
        })
    }

    fn try_from_local_limit_exec(
        limit: &LocalLimitExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            limit.input().to_owned(),
            extension_codec,
        )?;
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
                protobuf::LocalLimitExecNode {
                    input: Some(Box::new(input)),
                    fetch: limit.fetch() as u32,
                },
            ))),
        })
    }

    fn try_from_hash_join_exec(
        exec: &HashJoinExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.left().to_owned(),
            extension_codec,
        )?;
        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.right().to_owned(),
            extension_codec,
        )?;
        let on: Vec<protobuf::JoinOn> = exec
            .on()
            .iter()
            .map(|tuple| {
                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
                Ok::<_, DataFusionError>(protobuf::JoinOn {
                    left: Some(l),
                    right: Some(r),
                })
            })
            .collect::<Result<_>>()?;
        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
        let null_equality: protobuf::NullEquality = exec.null_equality().into();
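        // The join filter is optional. The `map` closure below yields
        // `Option<Result<protobuf::JoinFilter>>`; `map_or(Ok(None), |v| v.map(Some))`
        // transposes that into `Result<Option<_>>` so `?` can propagate errors.
        // The same pattern is reused by the other join serializers below.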
        let filter = exec
            .filter()
            .as_ref()
            .map(|f| {
                let expression =
                    serialize_physical_expr(f.expression(), extension_codec)?;
                let column_indices = f
                    .column_indices()
                    .iter()
                    .map(|i| {
                        let side: protobuf::JoinSide = i.side.to_owned().into();
                        protobuf::ColumnIndex {
                            index: i.index as u32,
                            side: side.into(),
                        }
                    })
                    .collect();
                let schema = f.schema().as_ref().try_into()?;
                Ok(protobuf::JoinFilter {
                    expression: Some(expression),
                    column_indices,
                    schema: Some(schema),
                })
            })
            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;

        let partition_mode = match exec.partition_mode() {
            PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
            PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
            PartitionMode::Auto => protobuf::PartitionMode::Auto,
        };

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
                protobuf::HashJoinExecNode {
                    left: Some(Box::new(left)),
                    right: Some(Box::new(right)),
                    on,
                    join_type: join_type.into(),
                    partition_mode: partition_mode.into(),
                    null_equality: null_equality.into(),
                    filter,
                    projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
                    }),
                },
            ))),
        })
    }

    fn try_from_symmetric_hash_join_exec(
        exec: &SymmetricHashJoinExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.left().to_owned(),
            extension_codec,
        )?;
        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.right().to_owned(),
            extension_codec,
        )?;
        let on = exec
            .on()
            .iter()
            .map(|tuple| {
                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
                Ok::<_, DataFusionError>(protobuf::JoinOn {
                    left: Some(l),
                    right: Some(r),
                })
            })
            .collect::<Result<_>>()?;
        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
        let null_equality: protobuf::NullEquality = exec.null_equality().into();
        let filter = exec
            .filter()
            .as_ref()
            .map(|f| {
                let expression =
                    serialize_physical_expr(f.expression(), extension_codec)?;
                let column_indices = f
                    .column_indices()
                    .iter()
                    .map(|i| {
                        let side: protobuf::JoinSide = i.side.to_owned().into();
                        protobuf::ColumnIndex {
                            index: i.index as u32,
                            side: side.into(),
                        }
                    })
                    .collect();
                let schema = f.schema().as_ref().try_into()?;
                Ok(protobuf::JoinFilter {
                    expression: Some(expression),
                    column_indices,
                    schema: Some(schema),
                })
            })
            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;

        let partition_mode = match exec.partition_mode() {
            StreamJoinPartitionMode::SinglePartition => {
                protobuf::StreamPartitionMode::SinglePartition
            }
            StreamJoinPartitionMode::Partitioned => {
                protobuf::StreamPartitionMode::PartitionedExec
            }
        };

        let left_sort_exprs = exec
            .left_sort_exprs()
            .map(|exprs| {
                exprs
                    .iter()
                    .map(|expr| {
                        Ok(protobuf::PhysicalSortExprNode {
                            expr: Some(Box::new(serialize_physical_expr(
                                &expr.expr,
                                extension_codec,
                            )?)),
                            asc: !expr.options.descending,
                            nulls_first: expr.options.nulls_first,
                        })
                    })
                    .collect::<Result<Vec<_>>>()
            })
            .transpose()?
            .unwrap_or(vec![]);

        let right_sort_exprs = exec
            .right_sort_exprs()
            .map(|exprs| {
                exprs
                    .iter()
                    .map(|expr| {
                        Ok(protobuf::PhysicalSortExprNode {
                            expr: Some(Box::new(serialize_physical_expr(
                                &expr.expr,
                                extension_codec,
                            )?)),
                            asc: !expr.options.descending,
                            nulls_first: expr.options.nulls_first,
                        })
                    })
                    .collect::<Result<Vec<_>>>()
            })
            .transpose()?
            .unwrap_or(vec![]);

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::SymmetricHashJoin(Box::new(
                protobuf::SymmetricHashJoinExecNode {
                    left: Some(Box::new(left)),
                    right: Some(Box::new(right)),
                    on,
                    join_type: join_type.into(),
                    partition_mode: partition_mode.into(),
                    null_equality: null_equality.into(),
                    left_sort_exprs,
                    right_sort_exprs,
                    filter,
                },
            ))),
        })
    }

    fn try_from_sort_merge_join_exec(
        exec: &SortMergeJoinExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.left().to_owned(),
            extension_codec,
        )?;
        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.right().to_owned(),
            extension_codec,
        )?;
        let on = exec
            .on()
            .iter()
            .map(|tuple| {
                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
                Ok::<_, DataFusionError>(protobuf::JoinOn {
                    left: Some(l),
                    right: Some(r),
                })
            })
            .collect::<Result<_>>()?;
        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
        let null_equality: protobuf::NullEquality = exec.null_equality().into();
        let filter = exec
            .filter()
            .as_ref()
            .map(|f| {
                let expression =
                    serialize_physical_expr(f.expression(), extension_codec)?;
                let column_indices = f
                    .column_indices()
                    .iter()
                    .map(|i| {
                        let side: protobuf::JoinSide = i.side.to_owned().into();
                        protobuf::ColumnIndex {
                            index: i.index as u32,
                            side: side.into(),
                        }
                    })
                    .collect();
                let schema = f.schema().as_ref().try_into()?;
                Ok(protobuf::JoinFilter {
                    expression: Some(expression),
                    column_indices,
                    schema: Some(schema),
                })
            })
            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;

        let sort_options = exec
            .sort_options()
            .iter()
            .map(
                |SortOptions {
                     descending,
                     nulls_first,
                 }| {
                    SortExprNode {
                        expr: None,
                        asc: !*descending,
                        nulls_first: *nulls_first,
                    }
                },
            )
            .collect();

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::SortMergeJoin(Box::new(
                protobuf::SortMergeJoinExecNode {
                    left: Some(Box::new(left)),
                    right: Some(Box::new(right)),
                    on,
                    join_type: join_type.into(),
                    null_equality: null_equality.into(),
                    filter,
                    sort_options,
                },
            ))),
        })
    }

    fn try_from_cross_join_exec(
        exec: &CrossJoinExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.left().to_owned(),
            extension_codec,
        )?;
        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.right().to_owned(),
            extension_codec,
        )?;
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
                protobuf::CrossJoinExecNode {
                    left: Some(Box::new(left)),
                    right: Some(Box::new(right)),
                },
            ))),
        })
    }

    fn try_from_aggregate_exec(
        exec: &AggregateExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let groups: Vec<bool> = exec
            .group_expr()
            .groups()
            .iter()
            .flatten()
            .copied()
            .collect();

        let group_names = exec
            .group_expr()
            .expr()
            .iter()
            .map(|expr| expr.1.to_owned())
            .collect();

        let filter = exec
            .filter_expr()
            .iter()
            .map(|expr| serialize_maybe_filter(expr.to_owned(), extension_codec))
            .collect::<Result<Vec<_>>>()?;

        let agg = exec
            .aggr_expr()
            .iter()
            .map(|expr| serialize_physical_aggr_expr(expr.to_owned(), extension_codec))
            .collect::<Result<Vec<_>>>()?;

        let agg_names = exec
            .aggr_expr()
            .iter()
            .map(|expr| expr.name().to_string())
            .collect::<Vec<_>>();

        let agg_mode = match exec.mode() {
            AggregateMode::Partial => protobuf::AggregateMode::Partial,
            AggregateMode::Final => protobuf::AggregateMode::Final,
            AggregateMode::FinalPartitioned => protobuf::AggregateMode::FinalPartitioned,
            AggregateMode::Single => protobuf::AggregateMode::Single,
            AggregateMode::SinglePartitioned => {
                protobuf::AggregateMode::SinglePartitioned
            }
        };
        let input_schema = exec.input_schema();
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;

        let null_expr = exec
            .group_expr()
            .null_expr()
            .iter()
            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
            .collect::<Result<Vec<_>>>()?;

        let group_expr = exec
            .group_expr()
            .expr()
            .iter()
            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
            .collect::<Result<Vec<_>>>()?;

        let limit = exec.limit().map(|value| protobuf::AggLimit {
            limit: value as u64,
        });

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
                protobuf::AggregateExecNode {
                    group_expr,
                    group_expr_name: group_names,
                    aggr_expr: agg,
                    filter_expr: filter,
                    aggr_expr_name: agg_names,
                    mode: agg_mode as i32,
                    input: Some(Box::new(input)),
                    input_schema: Some(input_schema.as_ref().try_into()?),
                    null_expr,
                    groups,
                    limit,
                },
            ))),
        })
    }

    fn try_from_empty_exec(
        empty: &EmptyExec,
        _extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let schema = empty.schema().as_ref().try_into()?;
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Empty(protobuf::EmptyExecNode {
                schema: Some(schema),
            })),
        })
    }

    fn try_from_placeholder_row_exec(
        empty: &PlaceholderRowExec,
        _extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let schema = empty.schema().as_ref().try_into()?;
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
                protobuf::PlaceholderRowExecNode {
                    schema: Some(schema),
                },
            )),
        })
    }

    fn try_from_coalesce_batches_exec(
        coalesce_batches: &CoalesceBatchesExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            coalesce_batches.input().to_owned(),
            extension_codec,
        )?;
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
                protobuf::CoalesceBatchesExecNode {
                    input: Some(Box::new(input)),
                    target_batch_size: coalesce_batches.target_batch_size() as u32,
                    fetch: coalesce_batches.fetch().map(|n| n as u32),
                },
            ))),
        })
    }

    fn try_from_data_source_exec(
        data_source_exec: &DataSourceExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Option<Self>> {
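        // `DataSourceExec` wraps an opaque `DataSource`, so serialization works
        // by downcasting to each known concrete source (CSV, JSON, Parquet,
        // Avro, in-memory) in turn. `Ok(None)` means "not a built-in source",
        // letting the caller try an extension codec instead.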
        let data_source = data_source_exec.data_source();
        if let Some(maybe_csv) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = maybe_csv.file_source();
            if let Some(csv_config) = source.as_any().downcast_ref::<CsvSource>() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::CsvScan(
                        protobuf::CsvScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                maybe_csv,
                                extension_codec,
                            )?),
                            has_header: csv_config.has_header(),
                            delimiter: byte_to_string(
                                csv_config.delimiter(),
                                "delimiter",
                            )?,
                            quote: byte_to_string(csv_config.quote(), "quote")?,
                            optional_escape: if let Some(escape) = csv_config.escape() {
                                Some(
                                    protobuf::csv_scan_exec_node::OptionalEscape::Escape(
                                        byte_to_string(escape, "escape")?,
                                    ),
                                )
                            } else {
                                None
                            },
                            optional_comment: if let Some(comment) = csv_config.comment()
                            {
                                Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
                                    byte_to_string(comment, "comment")?,
                                ))
                            } else {
                                None
                            },
                            newlines_in_values: maybe_csv.newlines_in_values(),
                            truncate_rows: csv_config.truncate_rows(),
                        },
                    )),
                }));
            }
        }

        if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = scan_conf.file_source();
            if let Some(_json_source) = source.as_any().downcast_ref::<JsonSource>() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::JsonScan(
                        protobuf::JsonScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                scan_conf,
                                extension_codec,
                            )?),
                        },
                    )),
                }));
            }
        }

        #[cfg(feature = "parquet")]
        if let Some((maybe_parquet, conf)) =
            data_source_exec.downcast_to_file_source::<ParquetSource>()
        {
            let predicate = conf
                .filter()
                .map(|pred| serialize_physical_expr(&pred, extension_codec))
                .transpose()?;
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::ParquetScan(
                    protobuf::ParquetScanExecNode {
                        base_conf: Some(serialize_file_scan_config(
                            maybe_parquet,
                            extension_codec,
                        )?),
                        predicate,
                        parquet_options: Some(conf.table_parquet_options().try_into()?),
                    },
                )),
            }));
        }

        #[cfg(feature = "avro")]
        if let Some(maybe_avro) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = maybe_avro.file_source();
            if source.as_any().downcast_ref::<AvroSource>().is_some() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::AvroScan(
                        protobuf::AvroScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                maybe_avro,
                                extension_codec,
                            )?),
                        },
                    )),
                }));
            }
        }

        if let Some(source_conf) =
            data_source.as_any().downcast_ref::<MemorySourceConfig>()
        {
            let proto_partitions = source_conf
                .partitions()
                .iter()
                .map(|p| serialize_record_batches(p))
                .collect::<Result<Vec<_>>>()?;

            let proto_schema: protobuf::Schema =
                source_conf.original_schema().as_ref().try_into()?;

            let proto_projection = source_conf
                .projection()
                .as_ref()
                .map_or_else(Vec::new, |v| {
                    v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
                });

            let proto_sort_information = source_conf
                .sort_information()
                .iter()
                .map(|ordering| {
                    let sort_exprs = serialize_physical_sort_exprs(
                        ordering.to_owned(),
                        extension_codec,
                    )?;
                    Ok::<_, DataFusionError>(protobuf::PhysicalSortExprNodeCollection {
                        physical_sort_expr_nodes: sort_exprs,
                    })
                })
                .collect::<Result<Vec<_>, _>>()?;

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::MemoryScan(
                    protobuf::MemoryScanExecNode {
                        partitions: proto_partitions,
                        schema: Some(proto_schema),
                        projection: proto_projection,
                        sort_information: proto_sort_information,
                        show_sizes: source_conf.show_sizes(),
                        fetch: source_conf.fetch().map(|f| f as u32),
                    },
                )),
            }));
        }

        Ok(None)
    }

    fn try_from_coalesce_partitions_exec(
        exec: &CoalescePartitionsExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
                protobuf::CoalescePartitionsExecNode {
                    input: Some(Box::new(input)),
                    fetch: exec.fetch().map(|f| f as u32),
                },
            ))),
        })
    }

    fn try_from_repartition_exec(
        exec: &RepartitionExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;

        let pb_partitioning =
            serialize_partitioning(exec.partitioning(), extension_codec)?;

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
                protobuf::RepartitionExecNode {
                    input: Some(Box::new(input)),
                    partitioning: Some(pb_partitioning),
                },
            ))),
        })
    }

    fn try_from_sort_exec(
        exec: &SortExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;
        let expr = exec
            .expr()
            .iter()
            .map(|expr| {
                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
                    expr: Some(Box::new(serialize_physical_expr(
                        &expr.expr,
                        extension_codec,
                    )?)),
                    asc: !expr.options.descending,
                    nulls_first: expr.options.nulls_first,
                });
                Ok(protobuf::PhysicalExprNode {
                    expr_type: Some(ExprType::Sort(sort_expr)),
                })
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
                protobuf::SortExecNode {
                    input: Some(Box::new(input)),
                    expr,
                    fetch: match exec.fetch() {
                        Some(n) => n as i64,
                        _ => -1,
                    },
                    preserve_partitioning: exec.preserve_partitioning(),
                },
            ))),
        })
    }

    fn try_from_union_exec(
        union: &UnionExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
        for input in union.inputs() {
            inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
                input.to_owned(),
                extension_codec,
            )?);
        }
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Union(protobuf::UnionExecNode {
                inputs,
            })),
        })
    }

    fn try_from_interleave_exec(
        interleave: &InterleaveExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
        for input in interleave.inputs() {
            inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
                input.to_owned(),
                extension_codec,
            )?);
        }
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Interleave(
                protobuf::InterleaveExecNode { inputs },
            )),
        })
    }

    fn try_from_sort_preserving_merge_exec(
        exec: &SortPreservingMergeExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;
        let expr = exec
            .expr()
            .iter()
            .map(|expr| {
                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
                    expr: Some(Box::new(serialize_physical_expr(
                        &expr.expr,
                        extension_codec,
                    )?)),
                    asc: !expr.options.descending,
                    nulls_first: expr.options.nulls_first,
                });
                Ok(protobuf::PhysicalExprNode {
                    expr_type: Some(ExprType::Sort(sort_expr)),
                })
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(Box::new(
                protobuf::SortPreservingMergeExecNode {
                    input: Some(Box::new(input)),
                    expr,
                    fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
                },
            ))),
        })
    }

    fn try_from_nested_loop_join_exec(
        exec: &NestedLoopJoinExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.left().to_owned(),
            extension_codec,
        )?;
        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.right().to_owned(),
            extension_codec,
        )?;

        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
        let filter = exec
            .filter()
            .as_ref()
            .map(|f| {
                let expression =
                    serialize_physical_expr(f.expression(), extension_codec)?;
                let column_indices = f
                    .column_indices()
                    .iter()
                    .map(|i| {
                        let side: protobuf::JoinSide = i.side.to_owned().into();
                        protobuf::ColumnIndex {
                            index: i.index as u32,
                            side: side.into(),
                        }
                    })
                    .collect();
                let schema = f.schema().as_ref().try_into()?;
                Ok(protobuf::JoinFilter {
                    expression: Some(expression),
                    column_indices,
                    schema: Some(schema),
                })
            })
            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
                protobuf::NestedLoopJoinExecNode {
                    left: Some(Box::new(left)),
                    right: Some(Box::new(right)),
                    join_type: join_type.into(),
                    filter,
                    projection: exec.projection().map_or_else(Vec::new, |v| {
                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
                    }),
                },
            ))),
        })
    }

    fn try_from_window_agg_exec(
        exec: &WindowAggExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;

        let window_expr = exec
            .window_expr()
            .iter()
            .map(|e| serialize_physical_window_expr(e, extension_codec))
            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;

        let partition_keys = exec
            .partition_keys()
            .iter()
            .map(|e| serialize_physical_expr(e, extension_codec))
            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
                protobuf::WindowAggExecNode {
                    input: Some(Box::new(input)),
                    window_expr,
                    partition_keys,
                    input_order_mode: None,
                },
            ))),
        })
    }

    fn try_from_bounded_window_agg_exec(
        exec: &BoundedWindowAggExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;

        let window_expr = exec
            .window_expr()
            .iter()
            .map(|e| serialize_physical_window_expr(e, extension_codec))
            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;

        let partition_keys = exec
            .partition_keys()
            .iter()
            .map(|e| serialize_physical_expr(e, extension_codec))
            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;

        let input_order_mode = match &exec.input_order_mode {
            InputOrderMode::Linear => {
                window_agg_exec_node::InputOrderMode::Linear(protobuf::EmptyMessage {})
            }
            InputOrderMode::PartiallySorted(columns) => {
                window_agg_exec_node::InputOrderMode::PartiallySorted(
                    protobuf::PartiallySortedInputOrderMode {
                        columns: columns.iter().map(|c| *c as u64).collect(),
                    },
                )
            }
            InputOrderMode::Sorted => {
                window_agg_exec_node::InputOrderMode::Sorted(protobuf::EmptyMessage {})
            }
        };

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
                protobuf::WindowAggExecNode {
                    input: Some(Box::new(input)),
                    window_expr,
                    partition_keys,
                    input_order_mode: Some(input_order_mode),
                },
            ))),
        })
    }

    fn try_from_data_sink_exec(
        exec: &DataSinkExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Option<Self>> {
        let input: protobuf::PhysicalPlanNode =
            protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.input().to_owned(),
                extension_codec,
            )?;
        let sort_order = match exec.sort_order() {
            Some(requirements) => {
                let expr = requirements
                    .iter()
                    .map(|requirement| {
                        let expr: PhysicalSortExpr = requirement.to_owned().into();
                        let sort_expr = protobuf::PhysicalSortExprNode {
                            expr: Some(Box::new(serialize_physical_expr(
                                &expr.expr,
                                extension_codec,
                            )?)),
                            asc: !expr.options.descending,
                            nulls_first: expr.options.nulls_first,
                        };
                        Ok(sort_expr)
                    })
                    .collect::<Result<Vec<_>>>()?;
                Some(protobuf::PhysicalSortExprNodeCollection {
                    physical_sort_expr_nodes: expr,
                })
            }
            None => None,
        };

        if let Some(sink) = exec.sink().as_any().downcast_ref::<JsonSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new(
                    protobuf::JsonSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        if let Some(sink) = exec.sink().as_any().downcast_ref::<CsvSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::CsvSink(Box::new(
                    protobuf::CsvSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        #[cfg(feature = "parquet")]
        if let Some(sink) = exec.sink().as_any().downcast_ref::<ParquetSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new(
                    protobuf::ParquetSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        Ok(None)
    }

    fn try_from_unnest_exec(
        exec: &UnnestExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new(
                protobuf::UnnestExecNode {
                    input: Some(Box::new(input)),
                    schema: Some(exec.schema().try_into()?),
                    list_type_columns: exec
                        .list_column_indices()
                        .iter()
                        .map(|c| ProtoListUnnest {
                            index_in_input_schema: c.index_in_input_schema as _,
                            depth: c.depth as _,
                        })
                        .collect(),
                    struct_type_columns: exec
                        .struct_column_indices()
                        .iter()
                        .map(|c| *c as _)
                        .collect(),
                    options: Some(exec.options().into()),
                },
            ))),
        })
    }

    fn try_from_cooperative_exec(
        exec: &CooperativeExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Cooperative(Box::new(
                protobuf::CooperativeExecNode {
                    input: Some(Box::new(input)),
                },
            ))),
        })
    }

    fn str_to_generate_series_name(name: &str) -> Result<protobuf::GenerateSeriesName> {
        match name {
            "generate_series" => Ok(protobuf::GenerateSeriesName::GsGenerateSeries),
            "range" => Ok(protobuf::GenerateSeriesName::GsRange),
            _ => internal_err!("unknown name: {name}"),
        }
    }

    fn try_from_lazy_memory_exec(exec: &LazyMemoryExec) -> Result<Option<Self>> {
        let generators = exec.generators();

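        // Only a `LazyMemoryExec` backed by exactly one generator maps onto a
        // `GenerateSeriesNode`; any other shape falls through to `Ok(None)`.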
        let [generator] = generators.as_slice() else {
            return Ok(None);
        };

        let generator_guard = generator.read();

        if let Some(empty_gen) = generator_guard.as_any().downcast_ref::<Empty>() {
            let schema = exec.schema();
            let node = protobuf::GenerateSeriesNode {
                schema: Some(schema.as_ref().try_into()?),
                // `Empty` does not expose a batch size, so a fixed default is
                // recorded here.
                target_batch_size: 8192,
                args: Some(protobuf::generate_series_node::Args::ContainsNull(
                    protobuf::GenerateSeriesArgsContainsNull {
                        name: Self::str_to_generate_series_name(empty_gen.name())? as i32,
                    },
                )),
            };

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
            }));
        }

        if let Some(int_64) = generator_guard
            .as_any()
            .downcast_ref::<GenericSeriesState<i64>>()
        {
            let schema = exec.schema();
            let node = protobuf::GenerateSeriesNode {
                schema: Some(schema.as_ref().try_into()?),
                target_batch_size: int_64.batch_size() as u32,
                args: Some(protobuf::generate_series_node::Args::Int64Args(
                    protobuf::GenerateSeriesArgsInt64 {
                        start: *int_64.start(),
                        end: *int_64.end(),
                        step: *int_64.step(),
                        include_end: int_64.include_end(),
                        name: Self::str_to_generate_series_name(int_64.name())? as i32,
                    },
                )),
            };

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
            }));
        }

        if let Some(timestamp_args) = generator_guard
            .as_any()
            .downcast_ref::<GenericSeriesState<TimestampValue>>()
        {
            let schema = exec.schema();

            let start = timestamp_args.start().value();
            let end = timestamp_args.end().value();

            let step_value = timestamp_args.step();

            let step = Some(datafusion_proto_common::IntervalMonthDayNanoValue {
                months: step_value.months,
                days: step_value.days,
                nanos: step_value.nanoseconds,
            });
            let include_end = timestamp_args.include_end();
            let name = Self::str_to_generate_series_name(timestamp_args.name())? as i32;

            let args = match timestamp_args.current().tz_str() {
                Some(tz) => protobuf::generate_series_node::Args::TimestampArgs(
                    protobuf::GenerateSeriesArgsTimestamp {
                        start,
                        end,
                        step,
                        include_end,
                        name,
                        tz: Some(tz.to_string()),
                    },
                ),
                None => protobuf::generate_series_node::Args::DateArgs(
                    protobuf::GenerateSeriesArgsDate {
                        start,
                        end,
                        step,
                        include_end,
                        name,
                    },
                ),
            };

            let node = protobuf::GenerateSeriesNode {
                schema: Some(schema.as_ref().try_into()?),
                target_batch_size: timestamp_args.batch_size() as u32,
                args: Some(args),
            };

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
            }));
        }

        Ok(None)
    }
}

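/// Converts a physical plan to and from its protobuf representation.
///
/// A round trip through the wire format looks roughly like the sketch below;
/// `plan`, `ctx`, and `codec` are assumed to already exist in the caller's
/// scope:
///
/// ```ignore
/// // serialize: ExecutionPlan -> protobuf node -> bytes
/// let node = protobuf::PhysicalPlanNode::try_from_physical_plan(plan, &codec)?;
/// let mut buf: Vec<u8> = vec![];
/// node.try_encode(&mut buf)?;
///
/// // deserialize: bytes -> protobuf node -> ExecutionPlan
/// let node = protobuf::PhysicalPlanNode::try_decode(&buf)?;
/// let plan = node.try_into_physical_plan(&ctx, &codec)?;
/// ```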
pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized;

    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized;

    fn try_into_physical_plan(
        &self,
        ctx: &TaskContext,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized;
}

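/// Extension point for serializing plan nodes, expressions, and user-defined
/// functions that the built-in protobuf schema cannot represent. Implementors
/// override the `try_decode*`/`try_encode*` pairs they support; the provided
/// defaults either return a "not implemented" error or encode nothing.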
pub trait PhysicalExtensionCodec: Debug + Send + Sync {
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        ctx: &TaskContext,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;

    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
    }

    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    fn try_decode_expr(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn PhysicalExpr>],
    ) -> Result<Arc<dyn PhysicalExpr>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    fn try_encode_expr(
        &self,
        _node: &Arc<dyn PhysicalExpr>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        not_impl_err!(
            "PhysicalExtensionCodec is not provided for aggregate function {name}"
        )
    }

    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for window function {name}")
    }

    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }
}

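/// A [`PhysicalExtensionCodec`] that supports no extension nodes at all;
/// suitable when a plan is known to contain only built-in operators.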
#[derive(Debug)]
pub struct DefaultPhysicalExtensionCodec {}

impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn ExecutionPlan>],
        _ctx: &TaskContext,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    fn try_encode(
        &self,
        _node: Arc<dyn ExecutionPlan>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }
}

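/// Wire envelope used by [`ComposedPhysicalExtensionCodec`]: pairs an encoded
/// payload with the index of the inner codec that produced it, so decoding can
/// be routed back to the same codec.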
#[derive(Clone, PartialEq, prost::Message)]
struct DataEncoderTuple {
    /// Position of the codec (in the composed codec's list) that encoded `blob`
    #[prost(uint32, tag = 1)]
    pub encoder_position: u32,

    /// The bytes produced by that codec
    #[prost(bytes, tag = 2)]
    pub blob: Vec<u8>,
}

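/// A [`PhysicalExtensionCodec`] that delegates to a list of inner codecs,
/// trying each in order until one succeeds.
///
/// A minimal usage sketch, assuming `CodecA` and `CodecB` are user-defined
/// [`PhysicalExtensionCodec`] implementations:
///
/// ```ignore
/// let codec = ComposedPhysicalExtensionCodec::new(vec![
///     Arc::new(CodecA::default()),
///     Arc::new(CodecB::default()),
/// ]);
/// let node = protobuf::PhysicalPlanNode::try_from_physical_plan(plan, &codec)?;
/// ```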
#[derive(Debug)]
pub struct ComposedPhysicalExtensionCodec {
    codecs: Vec<Arc<dyn PhysicalExtensionCodec>>,
}

impl ComposedPhysicalExtensionCodec {
    pub fn new(codecs: Vec<Arc<dyn PhysicalExtensionCodec>>) -> Self {
        Self { codecs }
    }

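    // Decode the `DataEncoderTuple` envelope, look up the codec recorded at
    // encode time by its position, and hand the inner blob to that codec.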
    fn decode_protobuf<R>(
        &self,
        buf: &[u8],
        decode: impl FnOnce(&dyn PhysicalExtensionCodec, &[u8]) -> Result<R>,
    ) -> Result<R> {
        let proto =
            DataEncoderTuple::decode(buf).map_err(|e| internal_datafusion_err!("{e}"))?;

        let codec = self.codecs.get(proto.encoder_position as usize).ok_or(
            internal_datafusion_err!("Can't find required codec in codec list"),
        )?;

        decode(codec.as_ref(), &proto.blob)
    }

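    // Try each codec in order; the first one that encodes successfully has its
    // position recorded next to the payload. If every codec fails, the last
    // error (or a "no codecs configured" error) is returned.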
    fn encode_protobuf(
        &self,
        buf: &mut Vec<u8>,
        mut encode: impl FnMut(&dyn PhysicalExtensionCodec, &mut Vec<u8>) -> Result<()>,
    ) -> Result<()> {
        let mut data = vec![];
        let mut last_err = None;
        let mut encoder_position = None;

        for (position, codec) in self.codecs.iter().enumerate() {
            match encode(codec.as_ref(), &mut data) {
                Ok(_) => {
                    encoder_position = Some(position as u32);
                    break;
                }
                Err(err) => last_err = Some(err),
            }
        }

        let encoder_position = encoder_position.ok_or_else(|| {
            last_err.unwrap_or_else(|| {
                DataFusionError::NotImplemented(
                    "Empty list of composed codecs".to_owned(),
                )
            })
        })?;

        let proto = DataEncoderTuple {
            encoder_position,
            blob: data,
        };
        proto
            .encode(buf)
            .map_err(|e| internal_datafusion_err!("{e}"))
    }
}

impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec {
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        ctx: &TaskContext,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        self.decode_protobuf(buf, |codec, data| codec.try_decode(data, inputs, ctx))
    }

    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
        self.encode_protobuf(buf, |codec, data| codec.try_encode(Arc::clone(&node), data))
    }

    fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        self.decode_protobuf(buf, |codec, data| codec.try_decode_udf(name, data))
    }

    fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
        self.encode_protobuf(buf, |codec, data| codec.try_encode_udf(node, data))
    }

    fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        self.decode_protobuf(buf, |codec, data| codec.try_decode_udaf(name, data))
    }

    fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
        self.encode_protobuf(buf, |codec, data| codec.try_encode_udaf(node, data))
    }
}

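/// Decodes an optional boxed child node, erroring if the field is missing from
/// the protobuf message.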
fn into_physical_plan(
    node: &Option<Box<protobuf::PhysicalPlanNode>>,
    ctx: &TaskContext,
    extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn ExecutionPlan>> {
    if let Some(field) = node {
        field.try_into_physical_plan(ctx, extension_codec)
    } else {
        Err(proto_error("Missing required field in protobuf"))
    }
}