use std::fmt::Debug;
use std::sync::Arc;

use self::from_proto::parse_protobuf_partitioning;
use self::to_proto::{serialize_partitioning, serialize_physical_expr};
use crate::common::{byte_to_string, str_to_byte};
use crate::physical_plan::from_proto::{
    parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
    parse_physical_window_expr, parse_protobuf_file_scan_config,
    parse_protobuf_file_scan_schema, parse_record_batches,
};
use crate::physical_plan::to_proto::{
    serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr,
    serialize_physical_sort_exprs, serialize_physical_window_expr,
    serialize_record_batches,
};
use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
use crate::protobuf::physical_expr_node::ExprType;
use crate::protobuf::physical_plan_node::PhysicalPlanType;
use crate::protobuf::{
    self, proto_error, window_agg_exec_node, ListUnnest as ProtoListUnnest, SortExprNode,
    SortMergeJoinExecNode,
};
use crate::{convert_required, into_required};

use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::datatypes::{IntervalMonthDayNanoType, Schema, SchemaRef};
use datafusion::catalog::memory::MemorySourceConfig;
use datafusion::datasource::file_format::csv::CsvSink;
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::datasource::file_format::json::JsonSink;
#[cfg(feature = "parquet")]
use datafusion::datasource::file_format::parquet::ParquetSink;
#[cfg(feature = "avro")]
use datafusion::datasource::physical_plan::AvroSource;
#[cfg(feature = "parquet")]
use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::datasource::physical_plan::{
    CsvSource, FileScanConfig, FileScanConfigBuilder, JsonSource,
};
use datafusion::datasource::sink::DataSinkExec;
use datafusion::datasource::source::{DataSource, DataSourceExec};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::FunctionRegistry;
use datafusion::functions_table::generate_series::{
    Empty, GenSeriesArgs, GenerateSeriesTable, GenericSeriesState, TimestampValue,
};
use datafusion::physical_expr::aggregate::AggregateExprBuilder;
use datafusion::physical_expr::aggregate::AggregateFunctionExpr;
use datafusion::physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
use datafusion::physical_plan::aggregates::AggregateMode;
use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
use datafusion::physical_plan::analyze::AnalyzeExec;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::coop::CooperativeExec;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::explain::ExplainExec;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
use datafusion::physical_plan::joins::{
    CrossJoinExec, NestedLoopJoinExec, SortMergeJoinExec, StreamJoinPartitionMode,
    SymmetricHashJoinExec,
};
use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion::physical_plan::memory::LazyMemoryExec;
use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr};
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::union::{InterleaveExec, UnionExec};
use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use datafusion::physical_plan::{
    ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr,
};
use datafusion_common::config::TableParquetOptions;
use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};

use datafusion::prelude::SessionContext;
use prost::bytes::BufMut;
use prost::Message;

pub mod from_proto;
pub mod to_proto;

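// `PhysicalPlanNode` is the protobuf mirror of an `ExecutionPlan` tree.
// `try_decode`/`try_encode` handle the prost wire format, while
// `try_into_physical_plan`/`try_from_physical_plan` convert between the
// message and a live plan. A minimal round-trip sketch (assuming a
// `plan: Arc<dyn ExecutionPlan>`, a `ctx: SessionContext` with its `runtime`,
// and a `codec: &dyn PhysicalExtensionCodec`):
//
//     let node = protobuf::PhysicalPlanNode::try_from_physical_plan(plan, codec)?;
//     let mut buf = vec![];
//     node.try_encode(&mut buf)?;
//     let decoded = protobuf::PhysicalPlanNode::try_decode(&buf)?;
//     let plan = decoded.try_into_physical_plan(&ctx, &runtime, codec)?;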
impl AsExecutionPlan for protobuf::PhysicalPlanNode {
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized,
    {
        protobuf::PhysicalPlanNode::decode(buf).map_err(|e| {
            DataFusionError::Internal(format!("failed to decode physical plan: {e:?}"))
        })
    }

    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized,
    {
        self.encode(buf).map_err(|e| {
            DataFusionError::Internal(format!("failed to encode physical plan: {e:?}"))
        })
    }

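    // Dispatch on the decoded `physical_plan_type` oneof; each variant is
    // handled by a dedicated `try_into_*_physical_plan` helper below.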
    fn try_into_physical_plan(
        &self,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
            proto_error(format!(
                "physical_plan::from_proto() Unsupported physical plan '{self:?}'"
            ))
        })?;
        match plan {
            PhysicalPlanType::Explain(explain) => self.try_into_explain_physical_plan(
                explain,
                ctx,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::Projection(projection) => self
                .try_into_projection_physical_plan(
                    projection,
                    ctx,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Filter(filter) => {
                self.try_into_filter_physical_plan(filter, ctx, runtime, extension_codec)
            }
            PhysicalPlanType::CsvScan(scan) => {
                self.try_into_csv_scan_physical_plan(scan, ctx, runtime, extension_codec)
            }
            PhysicalPlanType::JsonScan(scan) => {
                self.try_into_json_scan_physical_plan(scan, ctx, runtime, extension_codec)
            }
            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
            PhysicalPlanType::ParquetScan(scan) => self
                .try_into_parquet_scan_physical_plan(scan, ctx, runtime, extension_codec),
            #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
            PhysicalPlanType::AvroScan(scan) => {
                self.try_into_avro_scan_physical_plan(scan, ctx, runtime, extension_codec)
            }
            PhysicalPlanType::MemoryScan(scan) => self
                .try_into_memory_scan_physical_plan(scan, ctx, runtime, extension_codec),
            PhysicalPlanType::CoalesceBatches(coalesce_batches) => self
                .try_into_coalesce_batches_physical_plan(
                    coalesce_batches,
                    ctx,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Merge(merge) => {
                self.try_into_merge_physical_plan(merge, ctx, runtime, extension_codec)
            }
            PhysicalPlanType::Repartition(repart) => self
                .try_into_repartition_physical_plan(
                    repart,
                    ctx,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::GlobalLimit(limit) => self
                .try_into_global_limit_physical_plan(
                    limit,
                    ctx,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::LocalLimit(limit) => self
                .try_into_local_limit_physical_plan(limit, ctx, runtime, extension_codec),
            PhysicalPlanType::Window(window_agg) => self.try_into_window_physical_plan(
                window_agg,
                ctx,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::Aggregate(hash_agg) => self
                .try_into_aggregate_physical_plan(
                    hash_agg,
                    ctx,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::HashJoin(hashjoin) => self
                .try_into_hash_join_physical_plan(
                    hashjoin,
                    ctx,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::SymmetricHashJoin(sym_join) => self
                .try_into_symmetric_hash_join_physical_plan(
                    sym_join,
                    ctx,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Union(union) => {
                self.try_into_union_physical_plan(union, ctx, runtime, extension_codec)
            }
            PhysicalPlanType::Interleave(interleave) => self
                .try_into_interleave_physical_plan(
                    interleave,
                    ctx,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::CrossJoin(crossjoin) => self
                .try_into_cross_join_physical_plan(
                    crossjoin,
                    ctx,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Empty(empty) => {
                self.try_into_empty_physical_plan(empty, ctx, runtime, extension_codec)
            }
            PhysicalPlanType::PlaceholderRow(placeholder) => self
                .try_into_placeholder_row_physical_plan(
                    placeholder,
                    ctx,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Sort(sort) => {
                self.try_into_sort_physical_plan(sort, ctx, runtime, extension_codec)
            }
            PhysicalPlanType::SortPreservingMerge(sort) => self
                .try_into_sort_preserving_merge_physical_plan(
                    sort,
                    ctx,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Extension(extension) => self
                .try_into_extension_physical_plan(
                    extension,
                    ctx,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::NestedLoopJoin(join) => self
                .try_into_nested_loop_join_physical_plan(
                    join,
                    ctx,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Analyze(analyze) => self.try_into_analyze_physical_plan(
                analyze,
                ctx,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::JsonSink(sink) => {
                self.try_into_json_sink_physical_plan(sink, ctx, runtime, extension_codec)
            }
            PhysicalPlanType::CsvSink(sink) => {
                self.try_into_csv_sink_physical_plan(sink, ctx, runtime, extension_codec)
            }
            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
            PhysicalPlanType::ParquetSink(sink) => self
                .try_into_parquet_sink_physical_plan(sink, ctx, runtime, extension_codec),
            PhysicalPlanType::Unnest(unnest) => {
                self.try_into_unnest_physical_plan(unnest, ctx, runtime, extension_codec)
            }
            PhysicalPlanType::Cooperative(cooperative) => self
                .try_into_cooperative_physical_plan(
                    cooperative,
                    ctx,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::GenerateSeries(generate_series) => {
                self.try_into_generate_series_physical_plan(generate_series)
            }
            PhysicalPlanType::SortMergeJoin(sort_join) => {
                self.try_into_sort_join(sort_join, ctx, runtime, extension_codec)
            }
        }
    }

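    // The reverse direction: probe the concrete plan type via `as_any`
    // downcasts and serialize the first built-in node that matches; anything
    // unrecognized falls through to the extension codec at the end.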
    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        let plan_clone = Arc::clone(&plan);
        let plan = plan.as_any();

        if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
            return protobuf::PhysicalPlanNode::try_from_explain_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
            return protobuf::PhysicalPlanNode::try_from_projection_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
            return protobuf::PhysicalPlanNode::try_from_analyze_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<FilterExec>() {
            return protobuf::PhysicalPlanNode::try_from_filter_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_global_limit_exec(
                limit,
                extension_codec,
            );
        }

        if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_local_limit_exec(
                limit,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_hash_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_symmetric_hash_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortMergeJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_merge_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_cross_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
            return protobuf::PhysicalPlanNode::try_from_aggregate_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
            return protobuf::PhysicalPlanNode::try_from_empty_exec(
                empty,
                extension_codec,
            );
        }

        if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
            return protobuf::PhysicalPlanNode::try_from_placeholder_row_exec(
                empty,
                extension_codec,
            );
        }

        if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_batches_exec(
                coalesce_batches,
                extension_codec,
            );
        }

        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec(
                data_source_exec,
                extension_codec,
            )? {
                return Ok(node);
            }
        }

        if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_partitions_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
            return protobuf::PhysicalPlanNode::try_from_repartition_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_exec(exec, extension_codec);
        }

        if let Some(union) = plan.downcast_ref::<UnionExec>() {
            return protobuf::PhysicalPlanNode::try_from_union_exec(
                union,
                extension_codec,
            );
        }

        if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
            return protobuf::PhysicalPlanNode::try_from_interleave_exec(
                interleave,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_preserving_merge_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_nested_loop_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_window_agg_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_bounded_window_agg_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<DataSinkExec>() {
            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec(
                exec,
                extension_codec,
            )? {
                return Ok(node);
            }
        }

        if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
            return protobuf::PhysicalPlanNode::try_from_unnest_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<CooperativeExec>() {
            return protobuf::PhysicalPlanNode::try_from_cooperative_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<LazyMemoryExec>() {
            if let Some(node) =
                protobuf::PhysicalPlanNode::try_from_lazy_memory_exec(exec)?
            {
                return Ok(node);
            }
        }

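        // No built-in node matched: ask the extension codec to encode the
        // plan, serializing its children recursively as the node's inputs.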
        let mut buf: Vec<u8> = vec![];
        match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
            Ok(_) => {
                let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
                    .children()
                    .into_iter()
                    .cloned()
                    .map(|i| {
                        protobuf::PhysicalPlanNode::try_from_physical_plan(
                            i,
                            extension_codec,
                        )
                    })
                    .collect::<Result<_>>()?;

                Ok(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::Extension(
                        protobuf::PhysicalExtensionNode { node: buf, inputs },
                    )),
                })
            }
            Err(e) => internal_err!(
                "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
            ),
        }
    }
}

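// Per-variant deserialization (`try_into_*`) and serialization (`try_from_*`)
// helpers backing the `AsExecutionPlan` impl above.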
impl protobuf::PhysicalPlanNode {
    fn try_into_explain_physical_plan(
        &self,
        explain: &protobuf::ExplainExecNode,
        _ctx: &SessionContext,
        _runtime: &RuntimeEnv,
        _extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(ExplainExec::new(
            Arc::new(explain.schema.as_ref().unwrap().try_into()?),
            explain
                .stringified_plans
                .iter()
                .map(|plan| plan.into())
                .collect(),
            explain.verbose,
        )))
    }

    fn try_into_projection_physical_plan(
        &self,
        projection: &protobuf::ProjectionExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&projection.input, ctx, runtime, extension_codec)?;
        let exprs = projection
            .expr
            .iter()
            .zip(projection.expr_name.iter())
            .map(|(expr, name)| {
                Ok((
                    parse_physical_expr(
                        expr,
                        ctx,
                        input.schema().as_ref(),
                        extension_codec,
                    )?,
                    name.to_string(),
                ))
            })
            .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>>>()?;
        let proj_exprs: Vec<ProjectionExpr> = exprs
            .into_iter()
            .map(|(expr, alias)| ProjectionExpr { expr, alias })
            .collect();
        Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
    }

    fn try_into_filter_physical_plan(
        &self,
        filter: &protobuf::FilterExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&filter.input, ctx, runtime, extension_codec)?;

        let predicate = filter
            .expr
            .as_ref()
            .map(|expr| {
                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
            })
            .transpose()?
            .ok_or_else(|| {
                DataFusionError::Internal(
615 "filter (FilterExecNode) in PhysicalPlanNode is missing.".to_owned(),
                )
            })?;

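        // `default_filter_selectivity` is validated via `try_into`; an
        // out-of-range value is reported after the exec is built below.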
        let filter_selectivity = filter.default_filter_selectivity.try_into();
        let projection = if !filter.projection.is_empty() {
            Some(
                filter
                    .projection
                    .iter()
                    .map(|i| *i as usize)
                    .collect::<Vec<_>>(),
            )
        } else {
            None
        };

        let filter =
            FilterExec::try_new(predicate, input)?.with_projection(projection)?;
        match filter_selectivity {
            Ok(filter_selectivity) => Ok(Arc::new(
                filter.with_default_selectivity(filter_selectivity)?,
            )),
            Err(_) => Err(DataFusionError::Internal(
639 "filter_selectivity in PhysicalPlanNode is invalid ".to_owned(),
            )),
        }
    }

    fn try_into_csv_scan_physical_plan(
        &self,
        scan: &protobuf::CsvScanExecNode,
        ctx: &SessionContext,
        _runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let escape =
            if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(escape)) =
                &scan.optional_escape
            {
                Some(str_to_byte(escape, "escape")?)
            } else {
                None
            };

        let comment = if let Some(
            protobuf::csv_scan_exec_node::OptionalComment::Comment(comment),
        ) = &scan.optional_comment
        {
            Some(str_to_byte(comment, "comment")?)
        } else {
            None
        };

        let source = Arc::new(
            CsvSource::new(
                scan.has_header,
                str_to_byte(&scan.delimiter, "delimiter")?,
                0,
            )
            .with_escape(escape)
            .with_comment(comment),
        );

        let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config(
            scan.base_conf.as_ref().unwrap(),
            ctx,
            extension_codec,
            source,
        )?)
        .with_newlines_in_values(scan.newlines_in_values)
        .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
        .build();
        Ok(DataSourceExec::from_data_source(conf))
    }

    fn try_into_json_scan_physical_plan(
        &self,
        scan: &protobuf::JsonScanExecNode,
        ctx: &SessionContext,
        _runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let scan_conf = parse_protobuf_file_scan_config(
            scan.base_conf.as_ref().unwrap(),
            ctx,
            extension_codec,
            Arc::new(JsonSource::new()),
        )?;
        Ok(DataSourceExec::from_data_source(scan_conf))
    }

    #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
    fn try_into_parquet_scan_physical_plan(
        &self,
        scan: &protobuf::ParquetScanExecNode,
        ctx: &SessionContext,
        _runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        #[cfg(feature = "parquet")]
        {
            let schema =
                parse_protobuf_file_scan_schema(scan.base_conf.as_ref().unwrap())?;

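            // The pruning predicate was serialized against the scan's output
            // (projected) schema, so rebuild that schema before parsing it.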
            let base_conf = scan.base_conf.as_ref().unwrap();
            let predicate_schema = if !base_conf.projection.is_empty() {
                let projected_fields: Vec<_> = base_conf
                    .projection
                    .iter()
                    .map(|&i| schema.field(i as usize).clone())
                    .collect();
                Arc::new(Schema::new(projected_fields))
            } else {
                schema
            };

            let predicate = scan
                .predicate
                .as_ref()
                .map(|expr| {
                    parse_physical_expr(
                        expr,
                        ctx,
                        predicate_schema.as_ref(),
                        extension_codec,
                    )
                })
                .transpose()?;
            let mut options = TableParquetOptions::default();

            if let Some(table_options) = scan.parquet_options.as_ref() {
                options = table_options.try_into()?;
            }
            let mut source = ParquetSource::new(options);

            if let Some(predicate) = predicate {
                source = source.with_predicate(predicate);
            }
            let base_config = parse_protobuf_file_scan_config(
                base_conf,
                ctx,
                extension_codec,
                Arc::new(source),
            )?;
            Ok(DataSourceExec::from_data_source(base_config))
        }
        #[cfg(not(feature = "parquet"))]
765 panic!("Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled")
    }

    #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
    fn try_into_avro_scan_physical_plan(
        &self,
        scan: &protobuf::AvroScanExecNode,
        ctx: &SessionContext,
        _runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        #[cfg(feature = "avro")]
        {
            let conf = parse_protobuf_file_scan_config(
                scan.base_conf.as_ref().unwrap(),
                ctx,
                extension_codec,
                Arc::new(AvroSource::new()),
            )?;
            Ok(DataSourceExec::from_data_source(conf))
        }
        #[cfg(not(feature = "avro"))]
787 panic!("Unable to process a Avro PhysicalPlan when `avro` feature is not enabled")
    }

    fn try_into_memory_scan_physical_plan(
        &self,
        scan: &protobuf::MemoryScanExecNode,
        ctx: &SessionContext,
        _runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let partitions = scan
            .partitions
            .iter()
            .map(|p| parse_record_batches(p))
            .collect::<Result<Vec<_>>>()?;

        let proto_schema = scan.schema.as_ref().ok_or_else(|| {
            DataFusionError::Internal(
                "schema in MemoryScanExecNode is missing.".to_owned(),
            )
        })?;
        let schema: SchemaRef = SchemaRef::new(proto_schema.try_into()?);

        let projection = if !scan.projection.is_empty() {
            Some(
                scan.projection
                    .iter()
                    .map(|i| *i as usize)
                    .collect::<Vec<_>>(),
            )
        } else {
            None
        };

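        // Restore any lexicographic orderings recorded for the in-memory
        // source; each protobuf entry decodes to one `LexOrdering`.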
        let mut sort_information = vec![];
        for ordering in &scan.sort_information {
            let sort_exprs = parse_physical_sort_exprs(
                &ordering.physical_sort_expr_nodes,
                ctx,
                &schema,
                extension_codec,
            )?;
            sort_information.extend(LexOrdering::new(sort_exprs));
        }

        let source = MemorySourceConfig::try_new(&partitions, schema, projection)?
            .with_limit(scan.fetch.map(|f| f as usize))
            .with_show_sizes(scan.show_sizes);

        let source = source.try_with_sort_information(sort_information)?;

        Ok(DataSourceExec::from_data_source(source))
    }

    fn try_into_coalesce_batches_physical_plan(
        &self,
        coalesce_batches: &protobuf::CoalesceBatchesExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&coalesce_batches.input, ctx, runtime, extension_codec)?;
        Ok(Arc::new(
            CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize)
                .with_fetch(coalesce_batches.fetch.map(|f| f as usize)),
        ))
    }

    fn try_into_merge_physical_plan(
        &self,
        merge: &protobuf::CoalescePartitionsExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&merge.input, ctx, runtime, extension_codec)?;
        Ok(Arc::new(
            CoalescePartitionsExec::new(input)
                .with_fetch(merge.fetch.map(|f| f as usize)),
        ))
    }

    fn try_into_repartition_physical_plan(
        &self,
        repart: &protobuf::RepartitionExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&repart.input, ctx, runtime, extension_codec)?;
        let partitioning = parse_protobuf_partitioning(
            repart.partitioning.as_ref(),
            ctx,
            input.schema().as_ref(),
            extension_codec,
        )?;
        Ok(Arc::new(RepartitionExec::try_new(
            input,
            partitioning.unwrap(),
        )?))
    }

    fn try_into_global_limit_physical_plan(
        &self,
        limit: &protobuf::GlobalLimitExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&limit.input, ctx, runtime, extension_codec)?;
        let fetch = if limit.fetch >= 0 {
            Some(limit.fetch as usize)
        } else {
            None
        };
        Ok(Arc::new(GlobalLimitExec::new(
            input,
            limit.skip as usize,
            fetch,
        )))
    }

    fn try_into_local_limit_physical_plan(
        &self,
        limit: &protobuf::LocalLimitExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&limit.input, ctx, runtime, extension_codec)?;
        Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
    }

    fn try_into_window_physical_plan(
        &self,
        window_agg: &protobuf::WindowAggExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&window_agg.input, ctx, runtime, extension_codec)?;
        let input_schema = input.schema();

        let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
            .window_expr
            .iter()
            .map(|window_expr| {
                parse_physical_window_expr(
                    window_expr,
                    ctx,
                    input_schema.as_ref(),
                    extension_codec,
                )
            })
            .collect::<Result<Vec<_>, _>>()?;

        let partition_keys = window_agg
            .partition_keys
            .iter()
            .map(|expr| {
                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
            })
            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;

        if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() {
            let input_order_mode = match input_order_mode {
                window_agg_exec_node::InputOrderMode::Linear(_) => InputOrderMode::Linear,
                window_agg_exec_node::InputOrderMode::PartiallySorted(
                    protobuf::PartiallySortedInputOrderMode { columns },
                ) => InputOrderMode::PartiallySorted(
                    columns.iter().map(|c| *c as usize).collect(),
                ),
                window_agg_exec_node::InputOrderMode::Sorted(_) => InputOrderMode::Sorted,
            };

            Ok(Arc::new(BoundedWindowAggExec::try_new(
                physical_window_expr,
                input,
                input_order_mode,
                !partition_keys.is_empty(),
            )?))
        } else {
            Ok(Arc::new(WindowAggExec::try_new(
                physical_window_expr,
                input,
                !partition_keys.is_empty(),
            )?))
        }
    }

    fn try_into_aggregate_physical_plan(
        &self,
        hash_agg: &protobuf::AggregateExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&hash_agg.input, ctx, runtime, extension_codec)?;
        let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(|_| {
            proto_error(format!(
994 "Received a AggregateNode message with unknown AggregateMode {}",
                hash_agg.mode
            ))
        })?;
        let agg_mode: AggregateMode = match mode {
            protobuf::AggregateMode::Partial => AggregateMode::Partial,
            protobuf::AggregateMode::Final => AggregateMode::Final,
            protobuf::AggregateMode::FinalPartitioned => AggregateMode::FinalPartitioned,
            protobuf::AggregateMode::Single => AggregateMode::Single,
            protobuf::AggregateMode::SinglePartitioned => {
                AggregateMode::SinglePartitioned
            }
        };

        let num_expr = hash_agg.group_expr.len();

        let group_expr = hash_agg
            .group_expr
            .iter()
            .zip(hash_agg.group_expr_name.iter())
            .map(|(expr, name)| {
                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
                    .map(|expr| (expr, name.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;

        let null_expr = hash_agg
            .null_expr
            .iter()
            .zip(hash_agg.group_expr_name.iter())
            .map(|(expr, name)| {
                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
                    .map(|expr| (expr, name.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;

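        // `groups` is a flattened row-major bool matrix (grouping sets x
        // group exprs); rebuild the rows by chunking on the expression count.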
        let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
            hash_agg
                .groups
                .chunks(num_expr)
                .map(|g| g.to_vec())
                .collect::<Vec<Vec<bool>>>()
        } else {
            vec![]
        };

        let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| {
            DataFusionError::Internal(
                "input_schema in AggregateNode is missing.".to_owned(),
            )
        })?;
        let physical_schema: SchemaRef = SchemaRef::new(input_schema.try_into()?);

        let physical_filter_expr = hash_agg
            .filter_expr
            .iter()
            .map(|expr| {
                expr.expr
                    .as_ref()
                    .map(|e| {
                        parse_physical_expr(e, ctx, &physical_schema, extension_codec)
                    })
                    .transpose()
            })
            .collect::<Result<Vec<_>, _>>()?;

        let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> = hash_agg
            .aggr_expr
            .iter()
            .zip(hash_agg.aggr_expr_name.iter())
            .map(|(expr, name)| {
                let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
                    proto_error("Unexpected empty aggregate physical expression")
                })?;

                match expr_type {
                    ExprType::AggregateExpr(agg_node) => {
                        let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node
                            .expr
                            .iter()
                            .map(|e| {
                                parse_physical_expr(
                                    e,
                                    ctx,
                                    &physical_schema,
                                    extension_codec,
                                )
                            })
                            .collect::<Result<Vec<_>>>()?;
                        let order_bys = agg_node
                            .ordering_req
                            .iter()
                            .map(|e| {
                                parse_physical_sort_expr(
                                    e,
                                    ctx,
                                    &physical_schema,
                                    extension_codec,
                                )
                            })
                            .collect::<Result<_>>()?;
                        agg_node
                            .aggregate_function
                            .as_ref()
                            .map(|func| match func {
                                AggregateFunction::UserDefinedAggrFunction(udaf_name) => {
                                    let agg_udf = match &agg_node.fun_definition {
                                        Some(buf) => extension_codec
                                            .try_decode_udaf(udaf_name, buf)?,
                                        None => ctx.udaf(udaf_name).or_else(|_| {
                                            extension_codec
                                                .try_decode_udaf(udaf_name, &[])
                                        })?,
                                    };

                                    AggregateExprBuilder::new(agg_udf, input_phy_expr)
                                        .schema(Arc::clone(&physical_schema))
                                        .alias(name)
                                        .human_display(agg_node.human_display.clone())
                                        .with_ignore_nulls(agg_node.ignore_nulls)
                                        .with_distinct(agg_node.distinct)
                                        .order_by(order_bys)
                                        .build()
                                        .map(Arc::new)
                                }
                            })
                            .transpose()?
                            .ok_or_else(|| {
                                proto_error(
                                    "Invalid AggregateExpr, missing aggregate_function",
                                )
                            })
                    }
                    _ => internal_err!("Invalid aggregate expression for AggregateExec"),
                }
            })
            .collect::<Result<Vec<_>, _>>()?;

        let limit = hash_agg
            .limit
            .as_ref()
            .map(|lit_value| lit_value.limit as usize);

        let agg = AggregateExec::try_new(
            agg_mode,
            PhysicalGroupBy::new(group_expr, null_expr, groups),
            physical_aggr_expr,
            physical_filter_expr,
            input,
            physical_schema,
        )?;

        let agg = agg.with_limit(limit);

        Ok(Arc::new(agg))
    }

    fn try_into_hash_join_physical_plan(
        &self,
        hashjoin: &protobuf::HashJoinExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left: Arc<dyn ExecutionPlan> =
            into_physical_plan(&hashjoin.left, ctx, runtime, extension_codec)?;
        let right: Arc<dyn ExecutionPlan> =
            into_physical_plan(&hashjoin.right, ctx, runtime, extension_codec)?;
        let left_schema = left.schema();
        let right_schema = right.schema();
        let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin
            .on
            .iter()
            .map(|col| {
                let left = parse_physical_expr(
                    &col.left.clone().unwrap(),
                    ctx,
                    left_schema.as_ref(),
                    extension_codec,
                )?;
                let right = parse_physical_expr(
                    &col.right.clone().unwrap(),
                    ctx,
                    right_schema.as_ref(),
                    extension_codec,
                )?;
                Ok((left, right))
            })
            .collect::<Result<_>>()?;
        let join_type =
            protobuf::JoinType::try_from(hashjoin.join_type).map_err(|_| {
                proto_error(format!(
                    "Received a HashJoinNode message with unknown JoinType {}",
                    hashjoin.join_type
                ))
            })?;
        let null_equality = protobuf::NullEquality::try_from(hashjoin.null_equality)
            .map_err(|_| {
                proto_error(format!(
                    "Received a HashJoinNode message with unknown NullEquality {}",
                    hashjoin.null_equality
                ))
            })?;
        let filter = hashjoin
            .filter
            .as_ref()
            .map(|f| {
                let schema = f
                    .schema
                    .as_ref()
                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
                    .try_into()?;

                let expression = parse_physical_expr(
                    f.expression.as_ref().ok_or_else(|| {
                        proto_error("Unexpected empty filter expression")
                    })?,
                    ctx, &schema,
                    extension_codec,
                )?;
                let column_indices = f.column_indices
                    .iter()
                    .map(|i| {
                        let side = protobuf::JoinSide::try_from(i.side)
                            .map_err(|_| proto_error(format!(
1219 "Received a HashJoinNode message with JoinSide in Filter {}",
                                i.side))
                            )?;

                        Ok(ColumnIndex {
                            index: i.index as usize,
                            side: side.into(),
                        })
                    })
                    .collect::<Result<Vec<_>>>()?;

                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
            })
            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;

        let partition_mode = protobuf::PartitionMode::try_from(hashjoin.partition_mode)
            .map_err(|_| {
                proto_error(format!(
                    "Received a HashJoinNode message with unknown PartitionMode {}",
                    hashjoin.partition_mode
                ))
            })?;
        let partition_mode = match partition_mode {
            protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
            protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
            protobuf::PartitionMode::Auto => PartitionMode::Auto,
        };
        let projection = if !hashjoin.projection.is_empty() {
            Some(
                hashjoin
                    .projection
                    .iter()
                    .map(|i| *i as usize)
                    .collect::<Vec<_>>(),
            )
        } else {
            None
        };
        Ok(Arc::new(HashJoinExec::try_new(
            left,
            right,
            on,
            filter,
            &join_type.into(),
            projection,
            partition_mode,
            null_equality.into(),
        )?))
    }

    fn try_into_symmetric_hash_join_physical_plan(
        &self,
        sym_join: &protobuf::SymmetricHashJoinExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left = into_physical_plan(&sym_join.left, ctx, runtime, extension_codec)?;
        let right = into_physical_plan(&sym_join.right, ctx, runtime, extension_codec)?;
        let left_schema = left.schema();
        let right_schema = right.schema();
        let on = sym_join
            .on
            .iter()
            .map(|col| {
                let left = parse_physical_expr(
                    &col.left.clone().unwrap(),
                    ctx,
                    left_schema.as_ref(),
                    extension_codec,
                )?;
                let right = parse_physical_expr(
                    &col.right.clone().unwrap(),
                    ctx,
                    right_schema.as_ref(),
                    extension_codec,
                )?;
                Ok((left, right))
            })
            .collect::<Result<_>>()?;
        let join_type =
            protobuf::JoinType::try_from(sym_join.join_type).map_err(|_| {
                proto_error(format!(
                    "Received a SymmetricHashJoin message with unknown JoinType {}",
                    sym_join.join_type
                ))
            })?;
        let null_equality = protobuf::NullEquality::try_from(sym_join.null_equality)
            .map_err(|_| {
                proto_error(format!(
                    "Received a SymmetricHashJoin message with unknown NullEquality {}",
                    sym_join.null_equality
                ))
            })?;
        let filter = sym_join
            .filter
            .as_ref()
            .map(|f| {
                let schema = f
                    .schema
                    .as_ref()
                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
                    .try_into()?;

                let expression = parse_physical_expr(
                    f.expression.as_ref().ok_or_else(|| {
                        proto_error("Unexpected empty filter expression")
                    })?,
                    ctx, &schema,
                    extension_codec,
                )?;
                let column_indices = f.column_indices
                    .iter()
                    .map(|i| {
                        let side = protobuf::JoinSide::try_from(i.side)
                            .map_err(|_| proto_error(format!(
1335 "Received a HashJoinNode message with JoinSide in Filter {}",
                                i.side))
                            )?;

                        Ok(ColumnIndex {
                            index: i.index as usize,
                            side: side.into(),
                        })
                    })
                    .collect::<Result<_>>()?;

                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
            })
            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;

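        // Symmetric hash joins can use per-side sort orderings to prune
        // buffered state; restore them here if they were serialized.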
        let left_sort_exprs = parse_physical_sort_exprs(
            &sym_join.left_sort_exprs,
            ctx,
            &left_schema,
            extension_codec,
        )?;
        let left_sort_exprs = LexOrdering::new(left_sort_exprs);

        let right_sort_exprs = parse_physical_sort_exprs(
            &sym_join.right_sort_exprs,
            ctx,
            &right_schema,
            extension_codec,
        )?;
        let right_sort_exprs = LexOrdering::new(right_sort_exprs);

        let partition_mode = protobuf::StreamPartitionMode::try_from(
            sym_join.partition_mode,
        )
        .map_err(|_| {
            proto_error(format!(
                "Received a SymmetricHashJoin message with unknown PartitionMode {}",
                sym_join.partition_mode
            ))
        })?;
        let partition_mode = match partition_mode {
            protobuf::StreamPartitionMode::SinglePartition => {
                StreamJoinPartitionMode::SinglePartition
            }
            protobuf::StreamPartitionMode::PartitionedExec => {
                StreamJoinPartitionMode::Partitioned
            }
        };
        SymmetricHashJoinExec::try_new(
            left,
            right,
            on,
            filter,
            &join_type.into(),
            null_equality.into(),
            left_sort_exprs,
            right_sort_exprs,
            partition_mode,
        )
        .map(|e| Arc::new(e) as _)
    }

    fn try_into_union_physical_plan(
        &self,
        union: &protobuf::UnionExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
        for input in &union.inputs {
            inputs.push(input.try_into_physical_plan(ctx, runtime, extension_codec)?);
        }
        Ok(Arc::new(UnionExec::new(inputs)))
    }

    fn try_into_interleave_physical_plan(
        &self,
        interleave: &protobuf::InterleaveExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
        for input in &interleave.inputs {
            inputs.push(input.try_into_physical_plan(ctx, runtime, extension_codec)?);
        }
        Ok(Arc::new(InterleaveExec::try_new(inputs)?))
    }

    fn try_into_cross_join_physical_plan(
        &self,
        crossjoin: &protobuf::CrossJoinExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left: Arc<dyn ExecutionPlan> =
            into_physical_plan(&crossjoin.left, ctx, runtime, extension_codec)?;
        let right: Arc<dyn ExecutionPlan> =
            into_physical_plan(&crossjoin.right, ctx, runtime, extension_codec)?;
        Ok(Arc::new(CrossJoinExec::new(left, right)))
    }

    fn try_into_empty_physical_plan(
        &self,
        empty: &protobuf::EmptyExecNode,
        _ctx: &SessionContext,
        _runtime: &RuntimeEnv,
        _extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let schema = Arc::new(convert_required!(empty.schema)?);
        Ok(Arc::new(EmptyExec::new(schema)))
    }

    fn try_into_placeholder_row_physical_plan(
        &self,
        placeholder: &protobuf::PlaceholderRowExecNode,
        _ctx: &SessionContext,
        _runtime: &RuntimeEnv,
        _extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let schema = Arc::new(convert_required!(placeholder.schema)?);
        Ok(Arc::new(PlaceholderRowExec::new(schema)))
    }

    fn try_into_sort_physical_plan(
        &self,
        sort: &protobuf::SortExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input = into_physical_plan(&sort.input, ctx, runtime, extension_codec)?;
        let exprs = sort
            .expr
            .iter()
            .map(|expr| {
                let expr = expr.expr_type.as_ref().ok_or_else(|| {
                    proto_error(format!(
                        "physical_plan::from_proto() Unexpected expr {self:?}"
                    ))
                })?;
                if let ExprType::Sort(sort_expr) = expr {
                    let expr = sort_expr
                        .expr
                        .as_ref()
                        .ok_or_else(|| {
                            proto_error(format!(
                                "physical_plan::from_proto() Unexpected sort expr {self:?}"
                            ))
                        })?
                        .as_ref();
                    Ok(PhysicalSortExpr {
                        expr: parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)?,
                        options: SortOptions {
                            descending: !sort_expr.asc,
                            nulls_first: sort_expr.nulls_first,
                        },
                    })
                } else {
                    internal_err!(
                        "physical_plan::from_proto() Expected sort expression, found {self:?}"
                    )
                }
            })
            .collect::<Result<Vec<_>>>()?;
        let Some(ordering) = LexOrdering::new(exprs) else {
            return internal_err!("SortExec requires an ordering");
        };
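        // A negative `fetch` encodes "no limit".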
        let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
        let new_sort = SortExec::new(ordering, input)
            .with_fetch(fetch)
            .with_preserve_partitioning(sort.preserve_partitioning);

        Ok(Arc::new(new_sort))
    }

    fn try_into_sort_preserving_merge_physical_plan(
        &self,
        sort: &protobuf::SortPreservingMergeExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input = into_physical_plan(&sort.input, ctx, runtime, extension_codec)?;
        let exprs = sort
            .expr
            .iter()
            .map(|expr| {
                let expr = expr.expr_type.as_ref().ok_or_else(|| {
                    proto_error(format!(
                        "physical_plan::from_proto() Unexpected expr {self:?}"
                    ))
                })?;
                if let ExprType::Sort(sort_expr) = expr {
                    let expr = sort_expr
                        .expr
                        .as_ref()
                        .ok_or_else(|| {
                            proto_error(format!(
                                "physical_plan::from_proto() Unexpected sort expr {self:?}"
                            ))
                        })?
                        .as_ref();
                    Ok(PhysicalSortExpr {
                        expr: parse_physical_expr(
                            expr,
                            ctx,
                            input.schema().as_ref(),
                            extension_codec,
                        )?,
                        options: SortOptions {
                            descending: !sort_expr.asc,
                            nulls_first: sort_expr.nulls_first,
                        },
                    })
                } else {
1553 internal_err!("physical_plan::from_proto() {self:?}")
                }
            })
            .collect::<Result<Vec<_>>>()?;
        let Some(ordering) = LexOrdering::new(exprs) else {
            return internal_err!("SortPreservingMergeExec requires an ordering");
        };
        let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
        Ok(Arc::new(
            SortPreservingMergeExec::new(ordering, input).with_fetch(fetch),
        ))
    }

    fn try_into_extension_physical_plan(
        &self,
        extension: &protobuf::PhysicalExtensionNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let inputs: Vec<Arc<dyn ExecutionPlan>> = extension
            .inputs
            .iter()
            .map(|i| i.try_into_physical_plan(ctx, runtime, extension_codec))
            .collect::<Result<_>>()?;

        let extension_node =
            extension_codec.try_decode(extension.node.as_slice(), &inputs, ctx)?;

        Ok(extension_node)
    }

    fn try_into_nested_loop_join_physical_plan(
        &self,
        join: &protobuf::NestedLoopJoinExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left: Arc<dyn ExecutionPlan> =
            into_physical_plan(&join.left, ctx, runtime, extension_codec)?;
        let right: Arc<dyn ExecutionPlan> =
            into_physical_plan(&join.right, ctx, runtime, extension_codec)?;
        let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| {
            proto_error(format!(
                "Received a NestedLoopJoinExecNode message with unknown JoinType {}",
                join.join_type
            ))
        })?;
        let filter = join
            .filter
            .as_ref()
            .map(|f| {
                let schema = f
                    .schema
                    .as_ref()
                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
                    .try_into()?;

                let expression = parse_physical_expr(
                    f.expression.as_ref().ok_or_else(|| {
                        proto_error("Unexpected empty filter expression")
                    })?,
                    ctx, &schema,
                    extension_codec,
                )?;
                let column_indices = f.column_indices
                    .iter()
                    .map(|i| {
                        let side = protobuf::JoinSide::try_from(i.side)
                            .map_err(|_| proto_error(format!(
1624 "Received a NestedLoopJoinExecNode message with JoinSide in Filter {}",
                                i.side))
                            )?;

                        Ok(ColumnIndex {
                            index: i.index as usize,
                            side: side.into(),
                        })
                    })
                    .collect::<Result<Vec<_>>>()?;

                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
            })
            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;

        let projection = if !join.projection.is_empty() {
            Some(
                join.projection
                    .iter()
                    .map(|i| *i as usize)
                    .collect::<Vec<_>>(),
            )
        } else {
            None
        };

        Ok(Arc::new(NestedLoopJoinExec::try_new(
            left,
            right,
            filter,
            &join_type.into(),
            projection,
        )?))
    }

    fn try_into_analyze_physical_plan(
        &self,
        analyze: &protobuf::AnalyzeExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&analyze.input, ctx, runtime, extension_codec)?;
        Ok(Arc::new(AnalyzeExec::new(
            analyze.verbose,
            analyze.show_statistics,
            input,
            Arc::new(convert_required!(analyze.schema)?),
        )))
    }

    fn try_into_json_sink_physical_plan(
        &self,
        sink: &protobuf::JsonSinkExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input = into_physical_plan(&sink.input, ctx, runtime, extension_codec)?;

        let data_sink: JsonSink = sink
            .sink
            .as_ref()
            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
            .try_into()?;
        let sink_schema = input.schema();
        let sort_order = sink
            .sort_order
            .as_ref()
            .map(|collection| {
                parse_physical_sort_exprs(
                    &collection.physical_sort_expr_nodes,
                    ctx,
                    &sink_schema,
                    extension_codec,
                )
                .map(|sort_exprs| {
                    LexRequirement::new(sort_exprs.into_iter().map(Into::into))
                })
            })
            .transpose()?
            .flatten();
        Ok(Arc::new(DataSinkExec::new(
            input,
            Arc::new(data_sink),
            sort_order,
        )))
    }

    fn try_into_csv_sink_physical_plan(
        &self,
        sink: &protobuf::CsvSinkExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input = into_physical_plan(&sink.input, ctx, runtime, extension_codec)?;

        let data_sink: CsvSink = sink
            .sink
            .as_ref()
            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
            .try_into()?;
        let sink_schema = input.schema();
        let sort_order = sink
            .sort_order
            .as_ref()
            .map(|collection| {
                parse_physical_sort_exprs(
                    &collection.physical_sort_expr_nodes,
                    ctx,
                    &sink_schema,
                    extension_codec,
                )
                .map(|sort_exprs| {
                    LexRequirement::new(sort_exprs.into_iter().map(Into::into))
                })
            })
            .transpose()?
            .flatten();
        Ok(Arc::new(DataSinkExec::new(
            input,
            Arc::new(data_sink),
            sort_order,
        )))
    }

    fn try_into_parquet_sink_physical_plan(
        &self,
        sink: &protobuf::ParquetSinkExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        #[cfg(feature = "parquet")]
        {
            let input = into_physical_plan(&sink.input, ctx, runtime, extension_codec)?;

            let data_sink: ParquetSink = sink
                .sink
                .as_ref()
                .ok_or_else(|| proto_error("Missing required field in protobuf"))?
                .try_into()?;
            let sink_schema = input.schema();
            let sort_order = sink
                .sort_order
                .as_ref()
                .map(|collection| {
                    parse_physical_sort_exprs(
                        &collection.physical_sort_expr_nodes,
                        ctx,
                        &sink_schema,
                        extension_codec,
                    )
                    .map(|sort_exprs| {
                        LexRequirement::new(sort_exprs.into_iter().map(Into::into))
                    })
                })
                .transpose()?
                .flatten();
            Ok(Arc::new(DataSinkExec::new(
                input,
                Arc::new(data_sink),
                sort_order,
            )))
        }
        #[cfg(not(feature = "parquet"))]
1792 panic!("Trying to use ParquetSink without `parquet` feature enabled");
    }

    fn try_into_unnest_physical_plan(
        &self,
        unnest: &protobuf::UnnestExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input = into_physical_plan(&unnest.input, ctx, runtime, extension_codec)?;

        Ok(Arc::new(UnnestExec::new(
            input,
            unnest
                .list_type_columns
                .iter()
                .map(|c| ListUnnest {
                    index_in_input_schema: c.index_in_input_schema as _,
                    depth: c.depth as _,
                })
                .collect(),
            unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
            Arc::new(convert_required!(unnest.schema)?),
            into_required!(unnest.options)?,
        )))
    }

    fn generate_series_name_to_str(name: protobuf::GenerateSeriesName) -> &'static str {
        match name {
            protobuf::GenerateSeriesName::GsGenerateSeries => "generate_series",
            protobuf::GenerateSeriesName::GsRange => "range",
        }
    }

    fn try_into_sort_join(
        &self,
        sort_join: &SortMergeJoinExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left = into_physical_plan(&sort_join.left, ctx, runtime, extension_codec)?;
        let left_schema = left.schema();
        let right = into_physical_plan(&sort_join.right, ctx, runtime, extension_codec)?;
        let right_schema = right.schema();

        let filter = sort_join
            .filter
            .as_ref()
            .map(|f| {
                let schema = f
                    .schema
                    .as_ref()
                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
                    .try_into()?;

                let expression = parse_physical_expr(
                    f.expression.as_ref().ok_or_else(|| {
                        proto_error("Unexpected empty filter expression")
                    })?,
                    ctx,
                    &schema,
                    extension_codec,
                )?;
                let column_indices = f
                    .column_indices
                    .iter()
                    .map(|i| {
                        let side =
                            protobuf::JoinSide::try_from(i.side).map_err(|_| {
                                proto_error(format!(
1863 "Received a SortMergeJoinExecNode message with JoinSide in Filter {}",
1864 i.side
1865 ))
1866 })?;
1867
1868 Ok(ColumnIndex {
1869 index: i.index as usize,
1870 side: side.into(),
1871 })
1872 })
1873 .collect::<Result<Vec<_>>>()?;
1874
1875 Ok(JoinFilter::new(
1876 expression,
1877 column_indices,
1878 Arc::new(schema),
1879 ))
1880 })
1881 .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1882
1883 let join_type =
1884 protobuf::JoinType::try_from(sort_join.join_type).map_err(|_| {
1885 proto_error(format!(
1886 "Received a SortMergeJoinExecNode message with unknown JoinType {}",
1887 sort_join.join_type
1888 ))
1889 })?;
1890
1891 let null_equality = protobuf::NullEquality::try_from(sort_join.null_equality)
1892 .map_err(|_| {
1893 proto_error(format!(
1894 "Received a SortMergeJoinExecNode message with unknown NullEquality {}",
1895 sort_join.null_equality
1896 ))
1897 })?;
1898
1899 let sort_options = sort_join
1900 .sort_options
1901 .iter()
1902 .map(|e| SortOptions {
1903 descending: !e.asc,
1904 nulls_first: e.nulls_first,
1905 })
1906 .collect();
1907 let on = sort_join
1908 .on
1909 .iter()
1910 .map(|col| {
1911 let left = parse_physical_expr(
1912 &col.left.clone().unwrap(),
1913 ctx,
1914 left_schema.as_ref(),
1915 extension_codec,
1916 )?;
1917 let right = parse_physical_expr(
1918 &col.right.clone().unwrap(),
1919 ctx,
1920 right_schema.as_ref(),
1921 extension_codec,
1922 )?;
1923 Ok((left, right))
1924 })
1925 .collect::<Result<_>>()?;
1926
1927 Ok(Arc::new(SortMergeJoinExec::try_new(
1928 left,
1929 right,
1930 on,
1931 filter,
1932 join_type.into(),
1933 sort_options,
1934 null_equality.into(),
1935 )?))
1936 }

    fn try_into_generate_series_physical_plan(
        &self,
        generate_series: &protobuf::GenerateSeriesNode,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let schema: SchemaRef = Arc::new(convert_required!(generate_series.schema)?);

        let args = match &generate_series.args {
            Some(protobuf::generate_series_node::Args::ContainsNull(args)) => {
                GenSeriesArgs::ContainsNull {
                    name: Self::generate_series_name_to_str(args.name()),
                }
            }
            Some(protobuf::generate_series_node::Args::Int64Args(args)) => {
                GenSeriesArgs::Int64Args {
                    start: args.start,
                    end: args.end,
                    step: args.step,
                    include_end: args.include_end,
                    name: Self::generate_series_name_to_str(args.name()),
                }
            }
            Some(protobuf::generate_series_node::Args::TimestampArgs(args)) => {
                let step_proto = args.step.as_ref().ok_or_else(|| {
                    DataFusionError::Internal("Missing step in TimestampArgs".to_string())
                })?;
                let step = IntervalMonthDayNanoType::make_value(
                    step_proto.months,
                    step_proto.days,
                    step_proto.nanos,
                );
                GenSeriesArgs::TimestampArgs {
                    start: args.start,
                    end: args.end,
                    step,
                    tz: args.tz.as_ref().map(|s| Arc::from(s.as_str())),
                    include_end: args.include_end,
                    name: Self::generate_series_name_to_str(args.name()),
                }
            }
            Some(protobuf::generate_series_node::Args::DateArgs(args)) => {
                let step_proto = args.step.as_ref().ok_or_else(|| {
                    DataFusionError::Internal("Missing step in DateArgs".to_string())
                })?;
                let step = IntervalMonthDayNanoType::make_value(
                    step_proto.months,
                    step_proto.days,
                    step_proto.nanos,
                );
                GenSeriesArgs::DateArgs {
                    start: args.start,
                    end: args.end,
                    step,
                    include_end: args.include_end,
                    name: Self::generate_series_name_to_str(args.name()),
                }
            }
            None => return internal_err!("Missing args in GenerateSeriesNode"),
        };

        let table = GenerateSeriesTable::new(Arc::clone(&schema), args);
        let generator = table.as_generator(generate_series.target_batch_size as usize)?;

        Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?))
    }

    fn try_into_cooperative_physical_plan(
        &self,
        field_stream: &protobuf::CooperativeExecNode,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input =
            into_physical_plan(&field_stream.input, ctx, runtime, extension_codec)?;
        Ok(Arc::new(CooperativeExec::new(input)))
    }

    fn try_from_explain_exec(
        exec: &ExplainExec,
        _extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Explain(
                protobuf::ExplainExecNode {
                    schema: Some(exec.schema().as_ref().try_into()?),
                    stringified_plans: exec
                        .stringified_plans()
                        .iter()
                        .map(|plan| plan.into())
                        .collect(),
                    verbose: exec.verbose(),
                },
            )),
        })
    }

    fn try_from_projection_exec(
        exec: &ProjectionExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;
        let expr = exec
            .expr()
            .iter()
            .map(|proj_expr| serialize_physical_expr(&proj_expr.expr, extension_codec))
            .collect::<Result<Vec<_>>>()?;
        let expr_name = exec
            .expr()
            .iter()
            .map(|proj_expr| proj_expr.alias.clone())
            .collect();
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
                protobuf::ProjectionExecNode {
                    input: Some(Box::new(input)),
                    expr,
                    expr_name,
                },
            ))),
        })
    }

    fn try_from_analyze_exec(
        exec: &AnalyzeExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
                protobuf::AnalyzeExecNode {
                    verbose: exec.verbose(),
                    show_statistics: exec.show_statistics(),
                    input: Some(Box::new(input)),
                    schema: Some(exec.schema().as_ref().try_into()?),
                },
            ))),
        })
    }

    fn try_from_filter_exec(
        exec: &FilterExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
                protobuf::FilterExecNode {
                    input: Some(Box::new(input)),
                    expr: Some(serialize_physical_expr(
                        exec.predicate(),
                        extension_codec,
                    )?),
                    default_filter_selectivity: exec.default_selectivity() as u32,
                    projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
                    }),
                },
            ))),
        })
    }

    fn try_from_global_limit_exec(
        limit: &GlobalLimitExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            limit.input().to_owned(),
            extension_codec,
        )?;

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
                protobuf::GlobalLimitExecNode {
                    input: Some(Box::new(input)),
                    skip: limit.skip() as u32,
                    fetch: match limit.fetch() {
                        Some(n) => n as i64,
                        _ => -1,
                    },
                },
            ))),
        })
    }

    fn try_from_local_limit_exec(
        limit: &LocalLimitExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            limit.input().to_owned(),
            extension_codec,
        )?;
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
                protobuf::LocalLimitExecNode {
                    input: Some(Box::new(input)),
                    fetch: limit.fetch() as u32,
                },
            ))),
        })
    }

    fn try_from_hash_join_exec(
        exec: &HashJoinExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.left().to_owned(),
            extension_codec,
        )?;
        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.right().to_owned(),
            extension_codec,
        )?;
        let on: Vec<protobuf::JoinOn> = exec
            .on()
            .iter()
            .map(|tuple| {
                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
                Ok::<_, DataFusionError>(protobuf::JoinOn {
                    left: Some(l),
                    right: Some(r),
                })
            })
            .collect::<Result<_>>()?;
        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
        let null_equality: protobuf::NullEquality = exec.null_equality().into();
        let filter = exec
            .filter()
            .as_ref()
            .map(|f| {
                let expression =
                    serialize_physical_expr(f.expression(), extension_codec)?;
                let column_indices = f
                    .column_indices()
                    .iter()
                    .map(|i| {
                        let side: protobuf::JoinSide = i.side.to_owned().into();
                        protobuf::ColumnIndex {
                            index: i.index as u32,
                            side: side.into(),
                        }
                    })
                    .collect();
                let schema = f.schema().as_ref().try_into()?;
                Ok(protobuf::JoinFilter {
                    expression: Some(expression),
                    column_indices,
                    schema: Some(schema),
                })
            })
            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;

        let partition_mode = match exec.partition_mode() {
            PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
            PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
            PartitionMode::Auto => protobuf::PartitionMode::Auto,
        };

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
                protobuf::HashJoinExecNode {
                    left: Some(Box::new(left)),
                    right: Some(Box::new(right)),
                    on,
                    join_type: join_type.into(),
                    partition_mode: partition_mode.into(),
                    null_equality: null_equality.into(),
                    filter,
                    projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
                    }),
                },
            ))),
        })
    }

    fn try_from_symmetric_hash_join_exec(
        exec: &SymmetricHashJoinExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.left().to_owned(),
            extension_codec,
        )?;
        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.right().to_owned(),
            extension_codec,
        )?;
        let on = exec
            .on()
            .iter()
            .map(|tuple| {
                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
                Ok::<_, DataFusionError>(protobuf::JoinOn {
                    left: Some(l),
                    right: Some(r),
                })
            })
            .collect::<Result<_>>()?;
        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
        let null_equality: protobuf::NullEquality = exec.null_equality().into();
        let filter = exec
            .filter()
            .as_ref()
            .map(|f| {
                let expression =
                    serialize_physical_expr(f.expression(), extension_codec)?;
                let column_indices = f
                    .column_indices()
                    .iter()
                    .map(|i| {
                        let side: protobuf::JoinSide = i.side.to_owned().into();
                        protobuf::ColumnIndex {
                            index: i.index as u32,
                            side: side.into(),
                        }
                    })
                    .collect();
                let schema = f.schema().as_ref().try_into()?;
                Ok(protobuf::JoinFilter {
                    expression: Some(expression),
                    column_indices,
                    schema: Some(schema),
                })
            })
            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;

        let partition_mode = match exec.partition_mode() {
            StreamJoinPartitionMode::SinglePartition => {
                protobuf::StreamPartitionMode::SinglePartition
            }
            StreamJoinPartitionMode::Partitioned => {
                protobuf::StreamPartitionMode::PartitionedExec
            }
        };

        let left_sort_exprs = exec
            .left_sort_exprs()
            .map(|exprs| {
                exprs
                    .iter()
                    .map(|expr| {
                        Ok(protobuf::PhysicalSortExprNode {
                            expr: Some(Box::new(serialize_physical_expr(
                                &expr.expr,
                                extension_codec,
                            )?)),
                            asc: !expr.options.descending,
                            nulls_first: expr.options.nulls_first,
                        })
                    })
                    .collect::<Result<Vec<_>>>()
            })
            .transpose()?
            .unwrap_or(vec![]);

        let right_sort_exprs = exec
            .right_sort_exprs()
            .map(|exprs| {
                exprs
                    .iter()
                    .map(|expr| {
                        Ok(protobuf::PhysicalSortExprNode {
                            expr: Some(Box::new(serialize_physical_expr(
                                &expr.expr,
                                extension_codec,
                            )?)),
                            asc: !expr.options.descending,
                            nulls_first: expr.options.nulls_first,
                        })
                    })
                    .collect::<Result<Vec<_>>>()
            })
            .transpose()?
            .unwrap_or(vec![]);

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::SymmetricHashJoin(Box::new(
                protobuf::SymmetricHashJoinExecNode {
                    left: Some(Box::new(left)),
                    right: Some(Box::new(right)),
                    on,
                    join_type: join_type.into(),
                    partition_mode: partition_mode.into(),
                    null_equality: null_equality.into(),
                    left_sort_exprs,
                    right_sort_exprs,
                    filter,
                },
            ))),
        })
    }

    fn try_from_sort_merge_join_exec(
        exec: &SortMergeJoinExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.left().to_owned(),
            extension_codec,
        )?;
        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.right().to_owned(),
            extension_codec,
        )?;
        let on = exec
            .on()
            .iter()
            .map(|tuple| {
                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
                Ok::<_, DataFusionError>(protobuf::JoinOn {
                    left: Some(l),
                    right: Some(r),
                })
            })
            .collect::<Result<_>>()?;
        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
        let null_equality: protobuf::NullEquality = exec.null_equality().into();
        let filter = exec
            .filter()
            .as_ref()
            .map(|f| {
                let expression =
                    serialize_physical_expr(f.expression(), extension_codec)?;
                let column_indices = f
                    .column_indices()
                    .iter()
                    .map(|i| {
                        let side: protobuf::JoinSide = i.side.to_owned().into();
                        protobuf::ColumnIndex {
                            index: i.index as u32,
                            side: side.into(),
                        }
                    })
                    .collect();
                let schema = f.schema().as_ref().try_into()?;
                Ok(protobuf::JoinFilter {
                    expression: Some(expression),
                    column_indices,
                    schema: Some(schema),
                })
            })
            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;

        let sort_options = exec
            .sort_options()
            .iter()
            .map(
                |SortOptions {
                     descending,
                     nulls_first,
                 }| {
                    SortExprNode {
                        expr: None,
                        asc: !*descending,
                        nulls_first: *nulls_first,
                    }
                },
            )
            .collect();

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::SortMergeJoin(Box::new(
                protobuf::SortMergeJoinExecNode {
                    left: Some(Box::new(left)),
                    right: Some(Box::new(right)),
                    on,
                    join_type: join_type.into(),
                    null_equality: null_equality.into(),
                    filter,
                    sort_options,
                },
            ))),
        })
    }

    fn try_from_cross_join_exec(
        exec: &CrossJoinExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.left().to_owned(),
            extension_codec,
        )?;
        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.right().to_owned(),
            extension_codec,
        )?;
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
                protobuf::CrossJoinExecNode {
                    left: Some(Box::new(left)),
                    right: Some(Box::new(right)),
                },
            ))),
        })
    }

    fn try_from_aggregate_exec(
        exec: &AggregateExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let groups: Vec<bool> = exec
            .group_expr()
            .groups()
            .iter()
            .flatten()
            .copied()
            .collect();

        let group_names = exec
            .group_expr()
            .expr()
            .iter()
            .map(|expr| expr.1.to_owned())
            .collect();

        let filter = exec
            .filter_expr()
            .iter()
            .map(|expr| serialize_maybe_filter(expr.to_owned(), extension_codec))
            .collect::<Result<Vec<_>>>()?;

        let agg = exec
            .aggr_expr()
            .iter()
            .map(|expr| serialize_physical_aggr_expr(expr.to_owned(), extension_codec))
            .collect::<Result<Vec<_>>>()?;

        let agg_names = exec
            .aggr_expr()
            .iter()
            .map(|expr| expr.name().to_string())
            .collect::<Vec<_>>();

        let agg_mode = match exec.mode() {
            AggregateMode::Partial => protobuf::AggregateMode::Partial,
            AggregateMode::Final => protobuf::AggregateMode::Final,
            AggregateMode::FinalPartitioned => protobuf::AggregateMode::FinalPartitioned,
            AggregateMode::Single => protobuf::AggregateMode::Single,
            AggregateMode::SinglePartitioned => {
                protobuf::AggregateMode::SinglePartitioned
            }
        };
        let input_schema = exec.input_schema();
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;

        let null_expr = exec
            .group_expr()
            .null_expr()
            .iter()
            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
            .collect::<Result<Vec<_>>>()?;

        let group_expr = exec
            .group_expr()
            .expr()
            .iter()
            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
            .collect::<Result<Vec<_>>>()?;

        let limit = exec.limit().map(|value| protobuf::AggLimit {
            limit: value as u64,
        });

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
                protobuf::AggregateExecNode {
                    group_expr,
                    group_expr_name: group_names,
                    aggr_expr: agg,
                    filter_expr: filter,
                    aggr_expr_name: agg_names,
                    mode: agg_mode as i32,
                    input: Some(Box::new(input)),
                    input_schema: Some(input_schema.as_ref().try_into()?),
                    null_expr,
                    groups,
                    limit,
                },
            ))),
        })
    }

    fn try_from_empty_exec(
        empty: &EmptyExec,
        _extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let schema = empty.schema().as_ref().try_into()?;
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Empty(protobuf::EmptyExecNode {
                schema: Some(schema),
            })),
        })
    }

    fn try_from_placeholder_row_exec(
        empty: &PlaceholderRowExec,
        _extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let schema = empty.schema().as_ref().try_into()?;
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
                protobuf::PlaceholderRowExecNode {
                    schema: Some(schema),
                },
            )),
        })
    }

    fn try_from_coalesce_batches_exec(
        coalesce_batches: &CoalesceBatchesExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            coalesce_batches.input().to_owned(),
            extension_codec,
        )?;
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
                protobuf::CoalesceBatchesExecNode {
                    input: Some(Box::new(input)),
                    target_batch_size: coalesce_batches.target_batch_size() as u32,
                    fetch: coalesce_batches.fetch().map(|n| n as u32),
                },
            ))),
        })
    }
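
    /// Tries to serialize the wrapped [`DataSource`] as one of the built-in
    /// scan nodes (CSV, JSON, Parquet, Avro, or in-memory). Returns `Ok(None)`
    /// when the source is not a recognized built-in, so the caller can fall
    /// back to an extension codec.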
    fn try_from_data_source_exec(
        data_source_exec: &DataSourceExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Option<Self>> {
        let data_source = data_source_exec.data_source();
        if let Some(maybe_csv) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = maybe_csv.file_source();
            if let Some(csv_config) = source.as_any().downcast_ref::<CsvSource>() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::CsvScan(
                        protobuf::CsvScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                maybe_csv,
                                extension_codec,
                            )?),
                            has_header: csv_config.has_header(),
                            delimiter: byte_to_string(
                                csv_config.delimiter(),
                                "delimiter",
                            )?,
                            quote: byte_to_string(csv_config.quote(), "quote")?,
                            optional_escape: if let Some(escape) = csv_config.escape() {
                                Some(
                                    protobuf::csv_scan_exec_node::OptionalEscape::Escape(
                                        byte_to_string(escape, "escape")?,
                                    ),
                                )
                            } else {
                                None
                            },
                            optional_comment: if let Some(comment) = csv_config.comment()
                            {
                                Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
                                    byte_to_string(comment, "comment")?,
                                ))
                            } else {
                                None
                            },
                            newlines_in_values: maybe_csv.newlines_in_values(),
                        },
                    )),
                }));
            }
        }

        if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = scan_conf.file_source();
            if let Some(_json_source) = source.as_any().downcast_ref::<JsonSource>() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::JsonScan(
                        protobuf::JsonScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                scan_conf,
                                extension_codec,
                            )?),
                        },
                    )),
                }));
            }
        }

        #[cfg(feature = "parquet")]
        if let Some((maybe_parquet, conf)) =
            data_source_exec.downcast_to_file_source::<ParquetSource>()
        {
            let predicate = conf
                .predicate()
                .map(|pred| serialize_physical_expr(pred, extension_codec))
                .transpose()?;
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::ParquetScan(
                    protobuf::ParquetScanExecNode {
                        base_conf: Some(serialize_file_scan_config(
                            maybe_parquet,
                            extension_codec,
                        )?),
                        predicate,
                        parquet_options: Some(conf.table_parquet_options().try_into()?),
                    },
                )),
            }));
        }

        #[cfg(feature = "avro")]
        if let Some(maybe_avro) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = maybe_avro.file_source();
            if source.as_any().downcast_ref::<AvroSource>().is_some() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::AvroScan(
                        protobuf::AvroScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                maybe_avro,
                                extension_codec,
                            )?),
                        },
                    )),
                }));
            }
        }

        if let Some(source_conf) =
            data_source.as_any().downcast_ref::<MemorySourceConfig>()
        {
            let proto_partitions = source_conf
                .partitions()
                .iter()
                .map(|p| serialize_record_batches(p))
                .collect::<Result<Vec<_>>>()?;

            let proto_schema: protobuf::Schema =
                source_conf.original_schema().as_ref().try_into()?;

            let proto_projection = source_conf
                .projection()
                .as_ref()
                .map_or_else(Vec::new, |v| {
                    v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
                });

            let proto_sort_information = source_conf
                .sort_information()
                .iter()
                .map(|ordering| {
                    let sort_exprs = serialize_physical_sort_exprs(
                        ordering.to_owned(),
                        extension_codec,
                    )?;
                    Ok::<_, DataFusionError>(protobuf::PhysicalSortExprNodeCollection {
                        physical_sort_expr_nodes: sort_exprs,
                    })
                })
                .collect::<Result<Vec<_>, _>>()?;

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::MemoryScan(
                    protobuf::MemoryScanExecNode {
                        partitions: proto_partitions,
                        schema: Some(proto_schema),
                        projection: proto_projection,
                        sort_information: proto_sort_information,
                        show_sizes: source_conf.show_sizes(),
                        fetch: source_conf.fetch().map(|f| f as u32),
                    },
                )),
            }));
        }

        Ok(None)
    }

    fn try_from_coalesce_partitions_exec(
        exec: &CoalescePartitionsExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
                protobuf::CoalescePartitionsExecNode {
                    input: Some(Box::new(input)),
                    fetch: exec.fetch().map(|f| f as u32),
                },
            ))),
        })
    }

    fn try_from_repartition_exec(
        exec: &RepartitionExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;

        let pb_partitioning =
            serialize_partitioning(exec.partitioning(), extension_codec)?;

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
                protobuf::RepartitionExecNode {
                    input: Some(Box::new(input)),
                    partitioning: Some(pb_partitioning),
                },
            ))),
        })
    }

    fn try_from_sort_exec(
        exec: &SortExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;
        let expr = exec
            .expr()
            .iter()
            .map(|expr| {
                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
                    expr: Some(Box::new(serialize_physical_expr(
                        &expr.expr,
                        extension_codec,
                    )?)),
                    asc: !expr.options.descending,
                    nulls_first: expr.options.nulls_first,
                });
                Ok(protobuf::PhysicalExprNode {
                    expr_type: Some(ExprType::Sort(sort_expr)),
                })
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
                protobuf::SortExecNode {
                    input: Some(Box::new(input)),
                    expr,
                    fetch: match exec.fetch() {
                        Some(n) => n as i64,
                        _ => -1,
                    },
                    preserve_partitioning: exec.preserve_partitioning(),
                },
            ))),
        })
    }

    fn try_from_union_exec(
        union: &UnionExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
        for input in union.inputs() {
            inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
                input.to_owned(),
                extension_codec,
            )?);
        }
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Union(protobuf::UnionExecNode {
                inputs,
            })),
        })
    }

    fn try_from_interleave_exec(
        interleave: &InterleaveExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
        for input in interleave.inputs() {
            inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
                input.to_owned(),
                extension_codec,
            )?);
        }
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Interleave(
                protobuf::InterleaveExecNode { inputs },
            )),
        })
    }

    fn try_from_sort_preserving_merge_exec(
        exec: &SortPreservingMergeExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;
        let expr = exec
            .expr()
            .iter()
            .map(|expr| {
                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
                    expr: Some(Box::new(serialize_physical_expr(
                        &expr.expr,
                        extension_codec,
                    )?)),
                    asc: !expr.options.descending,
                    nulls_first: expr.options.nulls_first,
                });
                Ok(protobuf::PhysicalExprNode {
                    expr_type: Some(ExprType::Sort(sort_expr)),
                })
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(Box::new(
                protobuf::SortPreservingMergeExecNode {
                    input: Some(Box::new(input)),
                    expr,
                    fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
                },
            ))),
        })
    }

    fn try_from_nested_loop_join_exec(
        exec: &NestedLoopJoinExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.left().to_owned(),
            extension_codec,
        )?;
        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.right().to_owned(),
            extension_codec,
        )?;

        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
        let filter = exec
            .filter()
            .as_ref()
            .map(|f| {
                let expression =
                    serialize_physical_expr(f.expression(), extension_codec)?;
                let column_indices = f
                    .column_indices()
                    .iter()
                    .map(|i| {
                        let side: protobuf::JoinSide = i.side.to_owned().into();
                        protobuf::ColumnIndex {
                            index: i.index as u32,
                            side: side.into(),
                        }
                    })
                    .collect();
                let schema = f.schema().as_ref().try_into()?;
                Ok(protobuf::JoinFilter {
                    expression: Some(expression),
                    column_indices,
                    schema: Some(schema),
                })
            })
            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
                protobuf::NestedLoopJoinExecNode {
                    left: Some(Box::new(left)),
                    right: Some(Box::new(right)),
                    join_type: join_type.into(),
                    filter,
                    projection: exec.projection().map_or_else(Vec::new, |v| {
                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
                    }),
                },
            ))),
        })
    }

    fn try_from_window_agg_exec(
        exec: &WindowAggExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;

        let window_expr = exec
            .window_expr()
            .iter()
            .map(|e| serialize_physical_window_expr(e, extension_codec))
            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;

        let partition_keys = exec
            .partition_keys()
            .iter()
            .map(|e| serialize_physical_expr(e, extension_codec))
            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
                protobuf::WindowAggExecNode {
                    input: Some(Box::new(input)),
                    window_expr,
                    partition_keys,
                    input_order_mode: None,
                },
            ))),
        })
    }

    fn try_from_bounded_window_agg_exec(
        exec: &BoundedWindowAggExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;

        let window_expr = exec
            .window_expr()
            .iter()
            .map(|e| serialize_physical_window_expr(e, extension_codec))
            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;

        let partition_keys = exec
            .partition_keys()
            .iter()
            .map(|e| serialize_physical_expr(e, extension_codec))
            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;

        let input_order_mode = match &exec.input_order_mode {
            InputOrderMode::Linear => {
                window_agg_exec_node::InputOrderMode::Linear(protobuf::EmptyMessage {})
            }
            InputOrderMode::PartiallySorted(columns) => {
                window_agg_exec_node::InputOrderMode::PartiallySorted(
                    protobuf::PartiallySortedInputOrderMode {
                        columns: columns.iter().map(|c| *c as u64).collect(),
                    },
                )
            }
            InputOrderMode::Sorted => {
                window_agg_exec_node::InputOrderMode::Sorted(protobuf::EmptyMessage {})
            }
        };

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
                protobuf::WindowAggExecNode {
                    input: Some(Box::new(input)),
                    window_expr,
                    partition_keys,
                    input_order_mode: Some(input_order_mode),
                },
            ))),
        })
    }
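
    /// Serializes a [`DataSinkExec`] when its sink is one of the built-in
    /// file sinks (JSON, CSV, or Parquet); returns `Ok(None)` for unknown
    /// sink types.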
    fn try_from_data_sink_exec(
        exec: &DataSinkExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Option<Self>> {
        let input: protobuf::PhysicalPlanNode =
            protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.input().to_owned(),
                extension_codec,
            )?;
        let sort_order = match exec.sort_order() {
            Some(requirements) => {
                let expr = requirements
                    .iter()
                    .map(|requirement| {
                        let expr: PhysicalSortExpr = requirement.to_owned().into();
                        let sort_expr = protobuf::PhysicalSortExprNode {
                            expr: Some(Box::new(serialize_physical_expr(
                                &expr.expr,
                                extension_codec,
                            )?)),
                            asc: !expr.options.descending,
                            nulls_first: expr.options.nulls_first,
                        };
                        Ok(sort_expr)
                    })
                    .collect::<Result<Vec<_>>>()?;
                Some(protobuf::PhysicalSortExprNodeCollection {
                    physical_sort_expr_nodes: expr,
                })
            }
            None => None,
        };

        if let Some(sink) = exec.sink().as_any().downcast_ref::<JsonSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new(
                    protobuf::JsonSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        if let Some(sink) = exec.sink().as_any().downcast_ref::<CsvSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::CsvSink(Box::new(
                    protobuf::CsvSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        #[cfg(feature = "parquet")]
        if let Some(sink) = exec.sink().as_any().downcast_ref::<ParquetSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new(
                    protobuf::ParquetSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        Ok(None)
    }

    fn try_from_unnest_exec(
        exec: &UnnestExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new(
                protobuf::UnnestExecNode {
                    input: Some(Box::new(input)),
                    schema: Some(exec.schema().try_into()?),
                    list_type_columns: exec
                        .list_column_indices()
                        .iter()
                        .map(|c| ProtoListUnnest {
                            index_in_input_schema: c.index_in_input_schema as _,
                            depth: c.depth as _,
                        })
                        .collect(),
                    struct_type_columns: exec
                        .struct_column_indices()
                        .iter()
                        .map(|c| *c as _)
                        .collect(),
                    options: Some(exec.options().into()),
                },
            ))),
        })
    }

    fn try_from_cooperative_exec(
        exec: &CooperativeExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Cooperative(Box::new(
                protobuf::CooperativeExecNode {
                    input: Some(Box::new(input)),
                },
            ))),
        })
    }

    fn str_to_generate_series_name(name: &str) -> Result<protobuf::GenerateSeriesName> {
        match name {
            "generate_series" => Ok(protobuf::GenerateSeriesName::GsGenerateSeries),
            "range" => Ok(protobuf::GenerateSeriesName::GsRange),
            _ => internal_err!("unknown name: {name}"),
        }
    }
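
    /// Serializes a [`LazyMemoryExec`] that wraps a single generate_series
    /// style generator; returns `Ok(None)` for any other generator so the
    /// caller can fall back to an extension codec.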
    fn try_from_lazy_memory_exec(exec: &LazyMemoryExec) -> Result<Option<Self>> {
        let generators = exec.generators();

        let [generator] = generators.as_slice() else {
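            // Only a single-generator exec can be represented as a
            // GenerateSeriesNode; anything else is not serializable here.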
            return Ok(None);
        };

        let generator_guard = generator.read();

        if let Some(empty_gen) = generator_guard.as_any().downcast_ref::<Empty>() {
            let schema = exec.schema();
            let node = protobuf::GenerateSeriesNode {
                schema: Some(schema.as_ref().try_into()?),
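                // Note: the Empty generator does not record a batch size, so a
                // fixed default is used here.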
                target_batch_size: 8192,
                args: Some(protobuf::generate_series_node::Args::ContainsNull(
                    protobuf::GenerateSeriesArgsContainsNull {
                        name: Self::str_to_generate_series_name(empty_gen.name())? as i32,
                    },
                )),
            };

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
            }));
        }

        if let Some(int_64) = generator_guard
            .as_any()
            .downcast_ref::<GenericSeriesState<i64>>()
        {
            let schema = exec.schema();
            let node = protobuf::GenerateSeriesNode {
                schema: Some(schema.as_ref().try_into()?),
                target_batch_size: int_64.batch_size() as u32,
                args: Some(protobuf::generate_series_node::Args::Int64Args(
                    protobuf::GenerateSeriesArgsInt64 {
                        start: *int_64.start(),
                        end: *int_64.end(),
                        step: *int_64.step(),
                        include_end: int_64.include_end(),
                        name: Self::str_to_generate_series_name(int_64.name())? as i32,
                    },
                )),
            };

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
            }));
        }

        if let Some(timestamp_args) = generator_guard
            .as_any()
            .downcast_ref::<GenericSeriesState<TimestampValue>>()
        {
            let schema = exec.schema();

            let start = timestamp_args.start().value();
            let end = timestamp_args.end().value();

            let step_value = timestamp_args.step();

            let step = Some(datafusion_proto_common::IntervalMonthDayNanoValue {
                months: step_value.months,
                days: step_value.days,
                nanos: step_value.nanoseconds,
            });
            let include_end = timestamp_args.include_end();
            let name = Self::str_to_generate_series_name(timestamp_args.name())? as i32;

            let args = match timestamp_args.current().tz_str() {
                Some(tz) => protobuf::generate_series_node::Args::TimestampArgs(
                    protobuf::GenerateSeriesArgsTimestamp {
                        start,
                        end,
                        step,
                        include_end,
                        name,
                        tz: Some(tz.to_string()),
                    },
                ),
                None => protobuf::generate_series_node::Args::DateArgs(
                    protobuf::GenerateSeriesArgsDate {
                        start,
                        end,
                        step,
                        include_end,
                        name,
                    },
                ),
            };

            let node = protobuf::GenerateSeriesNode {
                schema: Some(schema.as_ref().try_into()?),
                target_batch_size: timestamp_args.batch_size() as u32,
                args: Some(args),
            };

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
            }));
        }

        Ok(None)
    }
}
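
/// Conversion between the protobuf representation of a physical plan and
/// [`ExecutionPlan`] trees.
///
/// A minimal round-trip sketch (assumes `plan: Arc<dyn ExecutionPlan>` and a
/// `SessionContext` named `ctx`; not compiled as a doctest):
///
/// ```ignore
/// let codec = DefaultPhysicalExtensionCodec {};
/// let proto = protobuf::PhysicalPlanNode::try_from_physical_plan(plan, &codec)?;
/// let mut buf: Vec<u8> = vec![];
/// proto.try_encode(&mut buf)?;
/// let decoded = protobuf::PhysicalPlanNode::try_decode(&buf)?;
/// let roundtrip =
///     decoded.try_into_physical_plan(&ctx, ctx.runtime_env().as_ref(), &codec)?;
/// ```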
pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized;

    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized;

    fn try_into_physical_plan(
        &self,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized;
}
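
/// Extension point for encoding and decoding user-defined physical plan
/// nodes, UDFs, and physical expressions that the built-in protobuf
/// representation does not cover.
///
/// A skeleton implementation (hypothetical `MyCodec`; a sketch only, both
/// methods still need real (de)serialization logic):
///
/// ```ignore
/// #[derive(Debug)]
/// struct MyCodec;
///
/// impl PhysicalExtensionCodec for MyCodec {
///     fn try_decode(
///         &self,
///         buf: &[u8],
///         inputs: &[Arc<dyn ExecutionPlan>],
///         _registry: &dyn FunctionRegistry,
///     ) -> Result<Arc<dyn ExecutionPlan>> {
///         // Deserialize `buf` into the custom node, re-attaching `inputs`.
///         not_impl_err!("decode custom node")
///     }
///
///     fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
///         // Downcast `node` to the custom type and append its bytes to `buf`.
///         not_impl_err!("encode custom node")
///     }
/// }
/// ```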
pub trait PhysicalExtensionCodec: Debug + Send + Sync {
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;

    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
    }

    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    fn try_decode_expr(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn PhysicalExpr>],
    ) -> Result<Arc<dyn PhysicalExpr>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    fn try_encode_expr(
        &self,
        _node: &Arc<dyn PhysicalExpr>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        not_impl_err!(
            "PhysicalExtensionCodec is not provided for aggregate function {name}"
        )
    }

    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for window function {name}")
    }

    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }
}
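
/// A no-op [`PhysicalExtensionCodec`]: decoding or encoding any extension
/// node returns a "not implemented" error. Suitable for plans that contain
/// only built-in nodes.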
#[derive(Debug)]
pub struct DefaultPhysicalExtensionCodec {}

impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn ExecutionPlan>],
        _registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    fn try_encode(
        &self,
        _node: Arc<dyn ExecutionPlan>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }
}
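
/// Pairs an encoded payload with the index of the codec (within a
/// [`ComposedPhysicalExtensionCodec`]) that produced it, so that decoding can
/// be routed back to the same codec.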
#[derive(Clone, PartialEq, prost::Message)]
struct DataEncoderTuple {
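    /// Position in the composed codec list of the codec that encoded `blob`.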
    #[prost(uint32, tag = 1)]
    pub encoder_position: u32,
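
    /// The bytes produced by that codec.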
    #[prost(bytes, tag = 2)]
    pub blob: Vec<u8>,
}
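
/// A [`PhysicalExtensionCodec`] that composes several inner codecs: encoding
/// tries each codec in order and keeps the first success, while decoding
/// dispatches to the codec that originally produced the bytes.
///
/// A usage sketch (hypothetical `CodecA` and `CodecB` types, and an existing
/// `plan`):
///
/// ```ignore
/// let codec = ComposedPhysicalExtensionCodec::new(vec![
///     Arc::new(CodecA::default()),
///     Arc::new(CodecB::default()),
/// ]);
/// let node = protobuf::PhysicalPlanNode::try_from_physical_plan(plan, &codec)?;
/// ```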
#[derive(Debug)]
pub struct ComposedPhysicalExtensionCodec {
    codecs: Vec<Arc<dyn PhysicalExtensionCodec>>,
}

impl ComposedPhysicalExtensionCodec {
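    /// Creates a composed codec from `codecs`; earlier entries are tried
    /// first when encoding.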
    pub fn new(codecs: Vec<Arc<dyn PhysicalExtensionCodec>>) -> Self {
        Self { codecs }
    }

    fn decode_protobuf<R>(
        &self,
        buf: &[u8],
        decode: impl FnOnce(&dyn PhysicalExtensionCodec, &[u8]) -> Result<R>,
    ) -> Result<R> {
        let proto = DataEncoderTuple::decode(buf)
            .map_err(|e| DataFusionError::Internal(e.to_string()))?;

        let codec = self.codecs.get(proto.encoder_position as usize).ok_or(
            DataFusionError::Internal(
                "Can't find required codec in codec list".to_owned(),
            ),
        )?;

        decode(codec.as_ref(), &proto.blob)
    }

    fn encode_protobuf(
        &self,
        buf: &mut Vec<u8>,
        mut encode: impl FnMut(&dyn PhysicalExtensionCodec, &mut Vec<u8>) -> Result<()>,
    ) -> Result<()> {
        let mut data = vec![];
        let mut last_err = None;
        let mut encoder_position = None;
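
        // Try each codec in turn, remembering the position of the first one
        // that succeeds and the last error otherwise.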
        for (position, codec) in self.codecs.iter().enumerate() {
            match encode(codec.as_ref(), &mut data) {
                Ok(_) => {
                    encoder_position = Some(position as u32);
                    break;
                }
                Err(err) => last_err = Some(err),
            }
        }

        let encoder_position = encoder_position.ok_or_else(|| {
            last_err.unwrap_or_else(|| {
                DataFusionError::NotImplemented(
                    "Empty list of composed codecs".to_owned(),
                )
            })
        })?;
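
        // Wrap the payload together with the codec index so decoding can be
        // routed back to the same codec.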
        let proto = DataEncoderTuple {
            encoder_position,
            blob: data,
        };
        proto
            .encode(buf)
            .map_err(|e| DataFusionError::Internal(e.to_string()))
    }
}

impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec {
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        self.decode_protobuf(buf, |codec, data| codec.try_decode(data, inputs, registry))
    }

    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
        self.encode_protobuf(buf, |codec, data| codec.try_encode(Arc::clone(&node), data))
    }

    fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        self.decode_protobuf(buf, |codec, data| codec.try_decode_udf(name, data))
    }

    fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
        self.encode_protobuf(buf, |codec, data| codec.try_encode_udf(node, data))
    }

    fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        self.decode_protobuf(buf, |codec, data| codec.try_decode_udaf(name, data))
    }

    fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
        self.encode_protobuf(buf, |codec, data| codec.try_encode_udaf(node, data))
    }
}
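
/// Decodes an optional boxed child plan node, erroring if the field is
/// missing from the protobuf message.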
fn into_physical_plan(
    node: &Option<Box<protobuf::PhysicalPlanNode>>,
    ctx: &SessionContext,
    runtime: &RuntimeEnv,
    extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn ExecutionPlan>> {
    if let Some(field) = node {
        field.try_into_physical_plan(ctx, runtime, extension_codec)
    } else {
        Err(proto_error("Missing required field in protobuf"))
    }
}