1use std::fmt::Debug;
19use std::sync::Arc;
20
21use self::from_proto::parse_protobuf_partitioning;
22use self::to_proto::{serialize_partitioning, serialize_physical_expr};
23use crate::common::{byte_to_string, str_to_byte};
24use crate::physical_plan::from_proto::{
25 parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
26 parse_physical_window_expr, parse_protobuf_file_scan_config, parse_record_batches,
27 parse_table_schema_from_proto,
28};
29use crate::physical_plan::to_proto::{
30 serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr,
31 serialize_physical_sort_exprs, serialize_physical_window_expr,
32 serialize_record_batches,
33};
34use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
35use crate::protobuf::physical_expr_node::ExprType;
36use crate::protobuf::physical_plan_node::PhysicalPlanType;
37use crate::protobuf::{
38 self, ListUnnest as ProtoListUnnest, SortExprNode, SortMergeJoinExecNode,
39 proto_error, window_agg_exec_node,
40};
41use crate::{convert_required, into_required};
42
43use arrow::compute::SortOptions;
44use arrow::datatypes::{IntervalMonthDayNanoType, SchemaRef};
45use datafusion_catalog::memory::MemorySourceConfig;
46use datafusion_common::config::CsvOptions;
47use datafusion_common::{
48 DataFusionError, Result, internal_datafusion_err, internal_err, not_impl_err,
49};
50#[cfg(feature = "parquet")]
51use datafusion_datasource::file::FileSource;
52use datafusion_datasource::file_compression_type::FileCompressionType;
53use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
54use datafusion_datasource::sink::DataSinkExec;
55use datafusion_datasource::source::{DataSource, DataSourceExec};
56#[cfg(feature = "avro")]
57use datafusion_datasource_avro::source::AvroSource;
58use datafusion_datasource_csv::file_format::CsvSink;
59use datafusion_datasource_csv::source::CsvSource;
60use datafusion_datasource_json::file_format::JsonSink;
61use datafusion_datasource_json::source::JsonSource;
62#[cfg(feature = "parquet")]
63use datafusion_datasource_parquet::file_format::ParquetSink;
64#[cfg(feature = "parquet")]
65use datafusion_datasource_parquet::source::ParquetSource;
66use datafusion_execution::{FunctionRegistry, TaskContext};
67use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
68use datafusion_functions_table::generate_series::{
69 Empty, GenSeriesArgs, GenerateSeriesTable, GenericSeriesState, TimestampValue,
70};
71use datafusion_physical_expr::aggregate::AggregateExprBuilder;
72use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
73use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
74use datafusion_physical_plan::aggregates::AggregateMode;
75use datafusion_physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
76use datafusion_physical_plan::analyze::AnalyzeExec;
77use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
78use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
79use datafusion_physical_plan::coop::CooperativeExec;
80use datafusion_physical_plan::empty::EmptyExec;
81use datafusion_physical_plan::explain::ExplainExec;
82use datafusion_physical_plan::expressions::PhysicalSortExpr;
83use datafusion_physical_plan::filter::FilterExec;
84use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
85use datafusion_physical_plan::joins::{
86 CrossJoinExec, NestedLoopJoinExec, SortMergeJoinExec, StreamJoinPartitionMode,
87 SymmetricHashJoinExec,
88};
89use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
90use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
91use datafusion_physical_plan::memory::LazyMemoryExec;
92use datafusion_physical_plan::metrics::MetricType;
93use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
94use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
95use datafusion_physical_plan::repartition::RepartitionExec;
96use datafusion_physical_plan::sorts::sort::SortExec;
97use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
98use datafusion_physical_plan::union::{InterleaveExec, UnionExec};
99use datafusion_physical_plan::unnest::{ListUnnest, UnnestExec};
100use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
101use datafusion_physical_plan::{ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr};
102
103use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr;
104use datafusion_physical_plan::async_func::AsyncFuncExec;
105use prost::Message;
106use prost::bytes::BufMut;
107
108pub mod from_proto;
109pub mod to_proto;
110
impl AsExecutionPlan for protobuf::PhysicalPlanNode {
    /// Decodes a `PhysicalPlanNode` from its protobuf wire representation.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized,
    {
        protobuf::PhysicalPlanNode::decode(buf).map_err(|e| {
            internal_datafusion_err!("failed to decode physical plan: {e:?}")
        })
    }

    /// Encodes this `PhysicalPlanNode` into protobuf wire format, appending
    /// the bytes to `buf`.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized,
    {
        self.encode(buf).map_err(|e| {
            internal_datafusion_err!("failed to encode physical plan: {e:?}")
        })
    }

    /// Converts this protobuf node into an executable [`ExecutionPlan`].
    ///
    /// Dispatches on the `physical_plan_type` oneof to a per-variant helper on
    /// `protobuf::PhysicalPlanNode`; `extension_codec` decodes any
    /// user-defined nodes or functions embedded in the plan. Returns a
    /// `proto_error` when the oneof is unset.
    fn try_into_physical_plan(
        &self,
        ctx: &TaskContext,

        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
            proto_error(format!(
                "physical_plan::from_proto() Unsupported physical plan '{self:?}'"
            ))
        })?;
        // One arm per supported plan variant; each delegates to a dedicated
        // `try_into_*_physical_plan` helper below.
        match plan {
            PhysicalPlanType::Explain(explain) => {
                self.try_into_explain_physical_plan(explain, ctx, extension_codec)
            }
            PhysicalPlanType::Projection(projection) => {
                self.try_into_projection_physical_plan(projection, ctx, extension_codec)
            }
            PhysicalPlanType::Filter(filter) => {
                self.try_into_filter_physical_plan(filter, ctx, extension_codec)
            }
            PhysicalPlanType::CsvScan(scan) => {
                self.try_into_csv_scan_physical_plan(scan, ctx, extension_codec)
            }
            PhysicalPlanType::JsonScan(scan) => {
                self.try_into_json_scan_physical_plan(scan, ctx, extension_codec)
            }
            PhysicalPlanType::ParquetScan(scan) => {
                self.try_into_parquet_scan_physical_plan(scan, ctx, extension_codec)
            }
            PhysicalPlanType::AvroScan(scan) => {
                self.try_into_avro_scan_physical_plan(scan, ctx, extension_codec)
            }
            PhysicalPlanType::MemoryScan(scan) => {
                self.try_into_memory_scan_physical_plan(scan, ctx, extension_codec)
            }
            PhysicalPlanType::CoalesceBatches(coalesce_batches) => self
                .try_into_coalesce_batches_physical_plan(
                    coalesce_batches,
                    ctx,
                    extension_codec,
                ),
            PhysicalPlanType::Merge(merge) => {
                self.try_into_merge_physical_plan(merge, ctx, extension_codec)
            }
            PhysicalPlanType::Repartition(repart) => {
                self.try_into_repartition_physical_plan(repart, ctx, extension_codec)
            }
            PhysicalPlanType::GlobalLimit(limit) => {
                self.try_into_global_limit_physical_plan(limit, ctx, extension_codec)
            }
            PhysicalPlanType::LocalLimit(limit) => {
                self.try_into_local_limit_physical_plan(limit, ctx, extension_codec)
            }
            PhysicalPlanType::Window(window_agg) => {
                self.try_into_window_physical_plan(window_agg, ctx, extension_codec)
            }
            PhysicalPlanType::Aggregate(hash_agg) => {
                self.try_into_aggregate_physical_plan(hash_agg, ctx, extension_codec)
            }
            PhysicalPlanType::HashJoin(hashjoin) => {
                self.try_into_hash_join_physical_plan(hashjoin, ctx, extension_codec)
            }
            PhysicalPlanType::SymmetricHashJoin(sym_join) => self
                .try_into_symmetric_hash_join_physical_plan(
                    sym_join,
                    ctx,
                    extension_codec,
                ),
            PhysicalPlanType::Union(union) => {
                self.try_into_union_physical_plan(union, ctx, extension_codec)
            }
            PhysicalPlanType::Interleave(interleave) => {
                self.try_into_interleave_physical_plan(interleave, ctx, extension_codec)
            }
            PhysicalPlanType::CrossJoin(crossjoin) => {
                self.try_into_cross_join_physical_plan(crossjoin, ctx, extension_codec)
            }
            PhysicalPlanType::Empty(empty) => {
                self.try_into_empty_physical_plan(empty, ctx, extension_codec)
            }
            PhysicalPlanType::PlaceholderRow(placeholder) => self
                .try_into_placeholder_row_physical_plan(
                    placeholder,
                    ctx,
                    extension_codec,
                ),
            PhysicalPlanType::Sort(sort) => {
                self.try_into_sort_physical_plan(sort, ctx, extension_codec)
            }
            PhysicalPlanType::SortPreservingMerge(sort) => self
                .try_into_sort_preserving_merge_physical_plan(sort, ctx, extension_codec),
            PhysicalPlanType::Extension(extension) => {
                self.try_into_extension_physical_plan(extension, ctx, extension_codec)
            }
            PhysicalPlanType::NestedLoopJoin(join) => {
                self.try_into_nested_loop_join_physical_plan(join, ctx, extension_codec)
            }
            PhysicalPlanType::Analyze(analyze) => {
                self.try_into_analyze_physical_plan(analyze, ctx, extension_codec)
            }
            PhysicalPlanType::JsonSink(sink) => {
                self.try_into_json_sink_physical_plan(sink, ctx, extension_codec)
            }
            PhysicalPlanType::CsvSink(sink) => {
                self.try_into_csv_sink_physical_plan(sink, ctx, extension_codec)
            }
            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
            PhysicalPlanType::ParquetSink(sink) => {
                self.try_into_parquet_sink_physical_plan(sink, ctx, extension_codec)
            }
            PhysicalPlanType::Unnest(unnest) => {
                self.try_into_unnest_physical_plan(unnest, ctx, extension_codec)
            }
            PhysicalPlanType::Cooperative(cooperative) => {
                self.try_into_cooperative_physical_plan(cooperative, ctx, extension_codec)
            }
            PhysicalPlanType::GenerateSeries(generate_series) => {
                self.try_into_generate_series_physical_plan(generate_series)
            }
            PhysicalPlanType::SortMergeJoin(sort_join) => {
                self.try_into_sort_join(sort_join, ctx, extension_codec)
            }
            PhysicalPlanType::AsyncFunc(async_func) => {
                self.try_into_async_func_physical_plan(async_func, ctx, extension_codec)
            }
        }
    }

    /// Serializes an [`ExecutionPlan`] into a protobuf `PhysicalPlanNode`.
    ///
    /// Tries each built-in executor type in turn via `downcast_ref`; the first
    /// match wins. If no built-in type matches, the plan is offered to
    /// `extension_codec` and, on success, wrapped in a `PhysicalExtensionNode`
    /// with its children serialized recursively. Fails only when the codec
    /// also rejects the plan.
    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        // Keep an owned handle for the extension-codec fallback; `plan` is
        // consumed by the `as_any()` borrow chain below.
        let plan_clone = Arc::clone(&plan);
        let plan = plan.as_any();

        if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
            return protobuf::PhysicalPlanNode::try_from_explain_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
            return protobuf::PhysicalPlanNode::try_from_projection_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
            return protobuf::PhysicalPlanNode::try_from_analyze_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<FilterExec>() {
            return protobuf::PhysicalPlanNode::try_from_filter_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_global_limit_exec(
                limit,
                extension_codec,
            );
        }

        if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_local_limit_exec(
                limit,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_hash_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_symmetric_hash_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortMergeJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_merge_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_cross_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
            return protobuf::PhysicalPlanNode::try_from_aggregate_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
            return protobuf::PhysicalPlanNode::try_from_empty_exec(
                empty,
                extension_codec,
            );
        }

        if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
            return protobuf::PhysicalPlanNode::try_from_placeholder_row_exec(
                empty,
                extension_codec,
            );
        }

        if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_batches_exec(
                coalesce_batches,
                extension_codec,
            );
        }

        // `try_from_data_source_exec` may return `Ok(None)` for sources it
        // does not recognize; in that case fall through to the codec fallback.
        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>()
            && let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec(
                data_source_exec,
                extension_codec,
            )?
        {
            return Ok(node);
        }

        if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_partitions_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
            return protobuf::PhysicalPlanNode::try_from_repartition_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_exec(exec, extension_codec);
        }

        if let Some(union) = plan.downcast_ref::<UnionExec>() {
            return protobuf::PhysicalPlanNode::try_from_union_exec(
                union,
                extension_codec,
            );
        }

        if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
            return protobuf::PhysicalPlanNode::try_from_interleave_exec(
                interleave,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_preserving_merge_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_nested_loop_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_window_agg_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_bounded_window_agg_exec(
                exec,
                extension_codec,
            );
        }

        // Like the data-source case above: `Ok(None)` means "not mine",
        // so keep trying other representations.
        if let Some(exec) = plan.downcast_ref::<DataSinkExec>()
            && let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec(
                exec,
                extension_codec,
            )?
        {
            return Ok(node);
        }

        if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
            return protobuf::PhysicalPlanNode::try_from_unnest_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<CooperativeExec>() {
            return protobuf::PhysicalPlanNode::try_from_cooperative_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<LazyMemoryExec>()
            && let Some(node) =
                protobuf::PhysicalPlanNode::try_from_lazy_memory_exec(exec)?
        {
            return Ok(node);
        }

        if let Some(exec) = plan.downcast_ref::<AsyncFuncExec>() {
            return protobuf::PhysicalPlanNode::try_from_async_func_exec(
                exec,
                extension_codec,
            );
        }

        // No built-in type matched: delegate to the extension codec and, on
        // success, serialize the children recursively into an extension node.
        let mut buf: Vec<u8> = vec![];
        match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
            Ok(_) => {
                let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
                    .children()
                    .into_iter()
                    .cloned()
                    .map(|i| {
                        protobuf::PhysicalPlanNode::try_from_physical_plan(
                            i,
                            extension_codec,
                        )
                    })
                    .collect::<Result<_>>()?;

                Ok(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::Extension(
                        protobuf::PhysicalExtensionNode { node: buf, inputs },
                    )),
                })
            }
            Err(e) => internal_err!(
                "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
            ),
        }
    }
}
501
502impl protobuf::PhysicalPlanNode {
503 fn try_into_explain_physical_plan(
504 &self,
505 explain: &protobuf::ExplainExecNode,
506 _ctx: &TaskContext,
507
508 _extension_codec: &dyn PhysicalExtensionCodec,
509 ) -> Result<Arc<dyn ExecutionPlan>> {
510 Ok(Arc::new(ExplainExec::new(
511 Arc::new(explain.schema.as_ref().unwrap().try_into()?),
512 explain
513 .stringified_plans
514 .iter()
515 .map(|plan| plan.into())
516 .collect(),
517 explain.verbose,
518 )))
519 }
520
521 fn try_into_projection_physical_plan(
522 &self,
523 projection: &protobuf::ProjectionExecNode,
524 ctx: &TaskContext,
525
526 extension_codec: &dyn PhysicalExtensionCodec,
527 ) -> Result<Arc<dyn ExecutionPlan>> {
528 let input: Arc<dyn ExecutionPlan> =
529 into_physical_plan(&projection.input, ctx, extension_codec)?;
530 let exprs = projection
531 .expr
532 .iter()
533 .zip(projection.expr_name.iter())
534 .map(|(expr, name)| {
535 Ok((
536 parse_physical_expr(
537 expr,
538 ctx,
539 input.schema().as_ref(),
540 extension_codec,
541 )?,
542 name.to_string(),
543 ))
544 })
545 .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>>>()?;
546 let proj_exprs: Vec<ProjectionExpr> = exprs
547 .into_iter()
548 .map(|(expr, alias)| ProjectionExpr { expr, alias })
549 .collect();
550 Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
551 }
552
553 fn try_into_filter_physical_plan(
554 &self,
555 filter: &protobuf::FilterExecNode,
556 ctx: &TaskContext,
557
558 extension_codec: &dyn PhysicalExtensionCodec,
559 ) -> Result<Arc<dyn ExecutionPlan>> {
560 let input: Arc<dyn ExecutionPlan> =
561 into_physical_plan(&filter.input, ctx, extension_codec)?;
562
563 let predicate = filter
564 .expr
565 .as_ref()
566 .map(|expr| {
567 parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
568 })
569 .transpose()?
570 .ok_or_else(|| {
571 internal_datafusion_err!(
572 "filter (FilterExecNode) in PhysicalPlanNode is missing."
573 )
574 })?;
575
576 let filter_selectivity = filter.default_filter_selectivity.try_into();
577 let projection = if !filter.projection.is_empty() {
578 Some(
579 filter
580 .projection
581 .iter()
582 .map(|i| *i as usize)
583 .collect::<Vec<_>>(),
584 )
585 } else {
586 None
587 };
588
589 let filter =
590 FilterExec::try_new(predicate, input)?.with_projection(projection)?;
591 match filter_selectivity {
592 Ok(filter_selectivity) => Ok(Arc::new(
593 filter.with_default_selectivity(filter_selectivity)?,
594 )),
595 Err(_) => Err(internal_datafusion_err!(
596 "filter_selectivity in PhysicalPlanNode is invalid "
597 )),
598 }
599 }
600
601 fn try_into_csv_scan_physical_plan(
602 &self,
603 scan: &protobuf::CsvScanExecNode,
604 ctx: &TaskContext,
605
606 extension_codec: &dyn PhysicalExtensionCodec,
607 ) -> Result<Arc<dyn ExecutionPlan>> {
608 let escape =
609 if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(escape)) =
610 &scan.optional_escape
611 {
612 Some(str_to_byte(escape, "escape")?)
613 } else {
614 None
615 };
616
617 let comment = if let Some(
618 protobuf::csv_scan_exec_node::OptionalComment::Comment(comment),
619 ) = &scan.optional_comment
620 {
621 Some(str_to_byte(comment, "comment")?)
622 } else {
623 None
624 };
625
626 let table_schema =
628 parse_table_schema_from_proto(scan.base_conf.as_ref().unwrap())?;
629
630 let csv_options = CsvOptions {
631 has_header: Some(scan.has_header),
632 delimiter: str_to_byte(&scan.delimiter, "delimiter")?,
633 quote: str_to_byte(&scan.quote, "quote")?,
634 newlines_in_values: Some(scan.newlines_in_values),
635 ..Default::default()
636 };
637 let source = Arc::new(
638 CsvSource::new(table_schema)
639 .with_csv_options(csv_options)
640 .with_escape(escape)
641 .with_comment(comment),
642 );
643
644 let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config(
645 scan.base_conf.as_ref().unwrap(),
646 ctx,
647 extension_codec,
648 source,
649 )?)
650 .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
651 .build();
652 Ok(DataSourceExec::from_data_source(conf))
653 }
654
655 fn try_into_json_scan_physical_plan(
656 &self,
657 scan: &protobuf::JsonScanExecNode,
658 ctx: &TaskContext,
659
660 extension_codec: &dyn PhysicalExtensionCodec,
661 ) -> Result<Arc<dyn ExecutionPlan>> {
662 let base_conf = scan.base_conf.as_ref().unwrap();
663 let table_schema = parse_table_schema_from_proto(base_conf)?;
664 let scan_conf = parse_protobuf_file_scan_config(
665 base_conf,
666 ctx,
667 extension_codec,
668 Arc::new(JsonSource::new(table_schema)),
669 )?;
670 Ok(DataSourceExec::from_data_source(scan_conf))
671 }
672
673 #[cfg_attr(not(feature = "parquet"), expect(unused_variables))]
674 fn try_into_parquet_scan_physical_plan(
675 &self,
676 scan: &protobuf::ParquetScanExecNode,
677 ctx: &TaskContext,
678 extension_codec: &dyn PhysicalExtensionCodec,
679 ) -> Result<Arc<dyn ExecutionPlan>> {
680 #[cfg(feature = "parquet")]
681 {
682 let schema = from_proto::parse_protobuf_file_scan_schema(
683 scan.base_conf.as_ref().unwrap(),
684 )?;
685
686 let base_conf = scan.base_conf.as_ref().unwrap();
688 let predicate_schema = if !base_conf.projection.is_empty() {
689 let projected_fields: Vec<_> = base_conf
691 .projection
692 .iter()
693 .map(|&i| schema.field(i as usize).clone())
694 .collect();
695 Arc::new(arrow::datatypes::Schema::new(projected_fields))
696 } else {
697 schema
698 };
699
700 let predicate = scan
701 .predicate
702 .as_ref()
703 .map(|expr| {
704 parse_physical_expr(
705 expr,
706 ctx,
707 predicate_schema.as_ref(),
708 extension_codec,
709 )
710 })
711 .transpose()?;
712 let mut options = datafusion_common::config::TableParquetOptions::default();
713
714 if let Some(table_options) = scan.parquet_options.as_ref() {
715 options = table_options.try_into()?;
716 }
717
718 let table_schema = parse_table_schema_from_proto(base_conf)?;
720
721 let mut source =
722 ParquetSource::new(table_schema).with_table_parquet_options(options);
723
724 if let Some(predicate) = predicate {
725 source = source.with_predicate(predicate);
726 }
727 let base_config = parse_protobuf_file_scan_config(
728 base_conf,
729 ctx,
730 extension_codec,
731 Arc::new(source),
732 )?;
733 Ok(DataSourceExec::from_data_source(base_config))
734 }
735 #[cfg(not(feature = "parquet"))]
736 panic!(
737 "Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled"
738 )
739 }
740
741 #[cfg_attr(not(feature = "avro"), expect(unused_variables))]
742 fn try_into_avro_scan_physical_plan(
743 &self,
744 scan: &protobuf::AvroScanExecNode,
745 ctx: &TaskContext,
746 extension_codec: &dyn PhysicalExtensionCodec,
747 ) -> Result<Arc<dyn ExecutionPlan>> {
748 #[cfg(feature = "avro")]
749 {
750 let table_schema =
751 parse_table_schema_from_proto(scan.base_conf.as_ref().unwrap())?;
752 let conf = parse_protobuf_file_scan_config(
753 scan.base_conf.as_ref().unwrap(),
754 ctx,
755 extension_codec,
756 Arc::new(AvroSource::new(table_schema)),
757 )?;
758 Ok(DataSourceExec::from_data_source(conf))
759 }
760
761 #[cfg(not(feature = "avro"))]
762 panic!("Unable to process a Avro PhysicalPlan when `avro` feature is not enabled")
763 }
764
765 fn try_into_memory_scan_physical_plan(
766 &self,
767 scan: &protobuf::MemoryScanExecNode,
768 ctx: &TaskContext,
769
770 extension_codec: &dyn PhysicalExtensionCodec,
771 ) -> Result<Arc<dyn ExecutionPlan>> {
772 let partitions = scan
773 .partitions
774 .iter()
775 .map(|p| parse_record_batches(p))
776 .collect::<Result<Vec<_>>>()?;
777
778 let proto_schema = scan.schema.as_ref().ok_or_else(|| {
779 internal_datafusion_err!("schema in MemoryScanExecNode is missing.")
780 })?;
781 let schema: SchemaRef = SchemaRef::new(proto_schema.try_into()?);
782
783 let projection = if !scan.projection.is_empty() {
784 Some(
785 scan.projection
786 .iter()
787 .map(|i| *i as usize)
788 .collect::<Vec<_>>(),
789 )
790 } else {
791 None
792 };
793
794 let mut sort_information = vec![];
795 for ordering in &scan.sort_information {
796 let sort_exprs = parse_physical_sort_exprs(
797 &ordering.physical_sort_expr_nodes,
798 ctx,
799 &schema,
800 extension_codec,
801 )?;
802 sort_information.extend(LexOrdering::new(sort_exprs));
803 }
804
805 let source = MemorySourceConfig::try_new(&partitions, schema, projection)?
806 .with_limit(scan.fetch.map(|f| f as usize))
807 .with_show_sizes(scan.show_sizes);
808
809 let source = source.try_with_sort_information(sort_information)?;
810
811 Ok(DataSourceExec::from_data_source(source))
812 }
813
814 fn try_into_coalesce_batches_physical_plan(
815 &self,
816 coalesce_batches: &protobuf::CoalesceBatchesExecNode,
817 ctx: &TaskContext,
818
819 extension_codec: &dyn PhysicalExtensionCodec,
820 ) -> Result<Arc<dyn ExecutionPlan>> {
821 let input: Arc<dyn ExecutionPlan> =
822 into_physical_plan(&coalesce_batches.input, ctx, extension_codec)?;
823 Ok(Arc::new(
824 CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize)
825 .with_fetch(coalesce_batches.fetch.map(|f| f as usize)),
826 ))
827 }
828
829 fn try_into_merge_physical_plan(
830 &self,
831 merge: &protobuf::CoalescePartitionsExecNode,
832 ctx: &TaskContext,
833
834 extension_codec: &dyn PhysicalExtensionCodec,
835 ) -> Result<Arc<dyn ExecutionPlan>> {
836 let input: Arc<dyn ExecutionPlan> =
837 into_physical_plan(&merge.input, ctx, extension_codec)?;
838 Ok(Arc::new(
839 CoalescePartitionsExec::new(input)
840 .with_fetch(merge.fetch.map(|f| f as usize)),
841 ))
842 }
843
844 fn try_into_repartition_physical_plan(
845 &self,
846 repart: &protobuf::RepartitionExecNode,
847 ctx: &TaskContext,
848
849 extension_codec: &dyn PhysicalExtensionCodec,
850 ) -> Result<Arc<dyn ExecutionPlan>> {
851 let input: Arc<dyn ExecutionPlan> =
852 into_physical_plan(&repart.input, ctx, extension_codec)?;
853 let partitioning = parse_protobuf_partitioning(
854 repart.partitioning.as_ref(),
855 ctx,
856 input.schema().as_ref(),
857 extension_codec,
858 )?;
859 Ok(Arc::new(RepartitionExec::try_new(
860 input,
861 partitioning.unwrap(),
862 )?))
863 }
864
865 fn try_into_global_limit_physical_plan(
866 &self,
867 limit: &protobuf::GlobalLimitExecNode,
868 ctx: &TaskContext,
869
870 extension_codec: &dyn PhysicalExtensionCodec,
871 ) -> Result<Arc<dyn ExecutionPlan>> {
872 let input: Arc<dyn ExecutionPlan> =
873 into_physical_plan(&limit.input, ctx, extension_codec)?;
874 let fetch = if limit.fetch >= 0 {
875 Some(limit.fetch as usize)
876 } else {
877 None
878 };
879 Ok(Arc::new(GlobalLimitExec::new(
880 input,
881 limit.skip as usize,
882 fetch,
883 )))
884 }
885
886 fn try_into_local_limit_physical_plan(
887 &self,
888 limit: &protobuf::LocalLimitExecNode,
889 ctx: &TaskContext,
890
891 extension_codec: &dyn PhysicalExtensionCodec,
892 ) -> Result<Arc<dyn ExecutionPlan>> {
893 let input: Arc<dyn ExecutionPlan> =
894 into_physical_plan(&limit.input, ctx, extension_codec)?;
895 Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
896 }
897
898 fn try_into_window_physical_plan(
899 &self,
900 window_agg: &protobuf::WindowAggExecNode,
901 ctx: &TaskContext,
902
903 extension_codec: &dyn PhysicalExtensionCodec,
904 ) -> Result<Arc<dyn ExecutionPlan>> {
905 let input: Arc<dyn ExecutionPlan> =
906 into_physical_plan(&window_agg.input, ctx, extension_codec)?;
907 let input_schema = input.schema();
908
909 let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
910 .window_expr
911 .iter()
912 .map(|window_expr| {
913 parse_physical_window_expr(
914 window_expr,
915 ctx,
916 input_schema.as_ref(),
917 extension_codec,
918 )
919 })
920 .collect::<Result<Vec<_>, _>>()?;
921
922 let partition_keys = window_agg
923 .partition_keys
924 .iter()
925 .map(|expr| {
926 parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
927 })
928 .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
929
930 if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() {
931 let input_order_mode = match input_order_mode {
932 window_agg_exec_node::InputOrderMode::Linear(_) => InputOrderMode::Linear,
933 window_agg_exec_node::InputOrderMode::PartiallySorted(
934 protobuf::PartiallySortedInputOrderMode { columns },
935 ) => InputOrderMode::PartiallySorted(
936 columns.iter().map(|c| *c as usize).collect(),
937 ),
938 window_agg_exec_node::InputOrderMode::Sorted(_) => InputOrderMode::Sorted,
939 };
940
941 Ok(Arc::new(BoundedWindowAggExec::try_new(
942 physical_window_expr,
943 input,
944 input_order_mode,
945 !partition_keys.is_empty(),
946 )?))
947 } else {
948 Ok(Arc::new(WindowAggExec::try_new(
949 physical_window_expr,
950 input,
951 !partition_keys.is_empty(),
952 )?))
953 }
954 }
955
    /// Rebuilds an [`AggregateExec`] from its protobuf representation:
    /// decodes the aggregate mode, GROUP BY expressions (including grouping
    /// sets), per-aggregate filters, the UDAF-based aggregate expressions,
    /// and an optional limit.
    fn try_into_aggregate_physical_plan(
        &self,
        hash_agg: &protobuf::AggregateExecNode,
        ctx: &TaskContext,

        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&hash_agg.input, ctx, extension_codec)?;
        // Validate the raw proto enum value before mapping it to the
        // execution-side `AggregateMode`.
        let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(|_| {
            proto_error(format!(
                "Received a AggregateNode message with unknown AggregateMode {}",
                hash_agg.mode
            ))
        })?;
        let agg_mode: AggregateMode = match mode {
            protobuf::AggregateMode::Partial => AggregateMode::Partial,
            protobuf::AggregateMode::Final => AggregateMode::Final,
            protobuf::AggregateMode::FinalPartitioned => AggregateMode::FinalPartitioned,
            protobuf::AggregateMode::Single => AggregateMode::Single,
            protobuf::AggregateMode::SinglePartitioned => {
                AggregateMode::SinglePartitioned
            }
        };

        // Number of GROUP BY expressions; also the chunk width of the
        // flattened `groups` bool matrix below.
        let num_expr = hash_agg.group_expr.len();

        // GROUP BY expressions, paired positionally with their output names.
        let group_expr = hash_agg
            .group_expr
            .iter()
            .zip(hash_agg.group_expr_name.iter())
            .map(|(expr, name)| {
                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
                    .map(|expr| (expr, name.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Null expressions for grouping sets, reusing the group names.
        let null_expr = hash_agg
            .null_expr
            .iter()
            .zip(hash_agg.group_expr_name.iter())
            .map(|(expr, name)| {
                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
                    .map(|expr| (expr, name.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;

        // The grouping-set membership matrix is serialized flat; rebuild it
        // row by row, `num_expr` booleans per group.
        let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
            hash_agg
                .groups
                .chunks(num_expr)
                .map(|g| g.to_vec())
                .collect::<Vec<Vec<bool>>>()
        } else {
            vec![]
        };

        let has_grouping_set = hash_agg.has_grouping_set;

        let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| {
            internal_datafusion_err!("input_schema in AggregateNode is missing.")
        })?;
        let physical_schema: SchemaRef = SchemaRef::new(input_schema.try_into()?);

        // Optional per-aggregate FILTER clauses (one `Option` slot per
        // aggregate expression).
        let physical_filter_expr = hash_agg
            .filter_expr
            .iter()
            .map(|expr| {
                expr.expr
                    .as_ref()
                    .map(|e| {
                        parse_physical_expr(e, ctx, &physical_schema, extension_codec)
                    })
                    .transpose()
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Decode each aggregate expression. Only `AggregateExpr` nodes backed
        // by a user-defined aggregate function are valid here; the UDAF is
        // resolved from the codec's `fun_definition` bytes when present,
        // otherwise from the task context (falling back to the codec with an
        // empty payload).
        let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> = hash_agg
            .aggr_expr
            .iter()
            .zip(hash_agg.aggr_expr_name.iter())
            .map(|(expr, name)| {
                let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
                    proto_error("Unexpected empty aggregate physical expression")
                })?;

                match expr_type {
                    ExprType::AggregateExpr(agg_node) => {
                        // Argument expressions of the aggregate call.
                        let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node
                            .expr
                            .iter()
                            .map(|e| {
                                parse_physical_expr(
                                    e,
                                    ctx,
                                    &physical_schema,
                                    extension_codec,
                                )
                            })
                            .collect::<Result<Vec<_>>>()?;
                        // Optional ORDER BY requirement of the aggregate.
                        let order_bys = agg_node
                            .ordering_req
                            .iter()
                            .map(|e| {
                                parse_physical_sort_expr(
                                    e,
                                    ctx,
                                    &physical_schema,
                                    extension_codec,
                                )
                            })
                            .collect::<Result<_>>()?;
                        agg_node
                            .aggregate_function
                            .as_ref()
                            .map(|func| match func {
                                AggregateFunction::UserDefinedAggrFunction(udaf_name) => {
                                    let agg_udf = match &agg_node.fun_definition {
                                        Some(buf) => extension_codec
                                            .try_decode_udaf(udaf_name, buf)?,
                                        None => ctx.udaf(udaf_name).or_else(|_| {
                                            extension_codec
                                                .try_decode_udaf(udaf_name, &[])
                                        })?,
                                    };

                                    AggregateExprBuilder::new(agg_udf, input_phy_expr)
                                        .schema(Arc::clone(&physical_schema))
                                        .alias(name)
                                        .human_display(agg_node.human_display.clone())
                                        .with_ignore_nulls(agg_node.ignore_nulls)
                                        .with_distinct(agg_node.distinct)
                                        .order_by(order_bys)
                                        .build()
                                        .map(Arc::new)
                                }
                            })
                            .transpose()?
                            .ok_or_else(|| {
                                proto_error(
                                    "Invalid AggregateExpr, missing aggregate_function",
                                )
                            })
                    }
                    _ => internal_err!("Invalid aggregate expression for AggregateExec"),
                }
            })
            .collect::<Result<Vec<_>, _>>()?;

        let limit = hash_agg
            .limit
            .as_ref()
            .map(|lit_value| lit_value.limit as usize);

        let agg = AggregateExec::try_new(
            agg_mode,
            PhysicalGroupBy::new(group_expr, null_expr, groups, has_grouping_set),
            physical_aggr_expr,
            physical_filter_expr,
            input,
            physical_schema,
        )?;

        let agg = agg.with_limit(limit);

        Ok(Arc::new(agg))
    }
1123
1124 fn try_into_hash_join_physical_plan(
1125 &self,
1126 hashjoin: &protobuf::HashJoinExecNode,
1127 ctx: &TaskContext,
1128
1129 extension_codec: &dyn PhysicalExtensionCodec,
1130 ) -> Result<Arc<dyn ExecutionPlan>> {
1131 let left: Arc<dyn ExecutionPlan> =
1132 into_physical_plan(&hashjoin.left, ctx, extension_codec)?;
1133 let right: Arc<dyn ExecutionPlan> =
1134 into_physical_plan(&hashjoin.right, ctx, extension_codec)?;
1135 let left_schema = left.schema();
1136 let right_schema = right.schema();
1137 let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin
1138 .on
1139 .iter()
1140 .map(|col| {
1141 let left = parse_physical_expr(
1142 &col.left.clone().unwrap(),
1143 ctx,
1144 left_schema.as_ref(),
1145 extension_codec,
1146 )?;
1147 let right = parse_physical_expr(
1148 &col.right.clone().unwrap(),
1149 ctx,
1150 right_schema.as_ref(),
1151 extension_codec,
1152 )?;
1153 Ok((left, right))
1154 })
1155 .collect::<Result<_>>()?;
1156 let join_type =
1157 protobuf::JoinType::try_from(hashjoin.join_type).map_err(|_| {
1158 proto_error(format!(
1159 "Received a HashJoinNode message with unknown JoinType {}",
1160 hashjoin.join_type
1161 ))
1162 })?;
1163 let null_equality = protobuf::NullEquality::try_from(hashjoin.null_equality)
1164 .map_err(|_| {
1165 proto_error(format!(
1166 "Received a HashJoinNode message with unknown NullEquality {}",
1167 hashjoin.null_equality
1168 ))
1169 })?;
1170 let filter = hashjoin
1171 .filter
1172 .as_ref()
1173 .map(|f| {
1174 let schema = f
1175 .schema
1176 .as_ref()
1177 .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1178 .try_into()?;
1179
1180 let expression = parse_physical_expr(
1181 f.expression.as_ref().ok_or_else(|| {
1182 proto_error("Unexpected empty filter expression")
1183 })?,
1184 ctx, &schema,
1185 extension_codec,
1186 )?;
1187 let column_indices = f.column_indices
1188 .iter()
1189 .map(|i| {
1190 let side = protobuf::JoinSide::try_from(i.side)
1191 .map_err(|_| proto_error(format!(
1192 "Received a HashJoinNode message with JoinSide in Filter {}",
1193 i.side))
1194 )?;
1195
1196 Ok(ColumnIndex {
1197 index: i.index as usize,
1198 side: side.into(),
1199 })
1200 })
1201 .collect::<Result<Vec<_>>>()?;
1202
1203 Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1204 })
1205 .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1206
1207 let partition_mode = protobuf::PartitionMode::try_from(hashjoin.partition_mode)
1208 .map_err(|_| {
1209 proto_error(format!(
1210 "Received a HashJoinNode message with unknown PartitionMode {}",
1211 hashjoin.partition_mode
1212 ))
1213 })?;
1214 let partition_mode = match partition_mode {
1215 protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
1216 protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
1217 protobuf::PartitionMode::Auto => PartitionMode::Auto,
1218 };
1219 let projection = if !hashjoin.projection.is_empty() {
1220 Some(
1221 hashjoin
1222 .projection
1223 .iter()
1224 .map(|i| *i as usize)
1225 .collect::<Vec<_>>(),
1226 )
1227 } else {
1228 None
1229 };
1230 Ok(Arc::new(HashJoinExec::try_new(
1231 left,
1232 right,
1233 on,
1234 filter,
1235 &join_type.into(),
1236 projection,
1237 partition_mode,
1238 null_equality.into(),
1239 )?))
1240 }
1241
1242 fn try_into_symmetric_hash_join_physical_plan(
1243 &self,
1244 sym_join: &protobuf::SymmetricHashJoinExecNode,
1245 ctx: &TaskContext,
1246
1247 extension_codec: &dyn PhysicalExtensionCodec,
1248 ) -> Result<Arc<dyn ExecutionPlan>> {
1249 let left = into_physical_plan(&sym_join.left, ctx, extension_codec)?;
1250 let right = into_physical_plan(&sym_join.right, ctx, extension_codec)?;
1251 let left_schema = left.schema();
1252 let right_schema = right.schema();
1253 let on = sym_join
1254 .on
1255 .iter()
1256 .map(|col| {
1257 let left = parse_physical_expr(
1258 &col.left.clone().unwrap(),
1259 ctx,
1260 left_schema.as_ref(),
1261 extension_codec,
1262 )?;
1263 let right = parse_physical_expr(
1264 &col.right.clone().unwrap(),
1265 ctx,
1266 right_schema.as_ref(),
1267 extension_codec,
1268 )?;
1269 Ok((left, right))
1270 })
1271 .collect::<Result<_>>()?;
1272 let join_type =
1273 protobuf::JoinType::try_from(sym_join.join_type).map_err(|_| {
1274 proto_error(format!(
1275 "Received a SymmetricHashJoin message with unknown JoinType {}",
1276 sym_join.join_type
1277 ))
1278 })?;
1279 let null_equality = protobuf::NullEquality::try_from(sym_join.null_equality)
1280 .map_err(|_| {
1281 proto_error(format!(
1282 "Received a SymmetricHashJoin message with unknown NullEquality {}",
1283 sym_join.null_equality
1284 ))
1285 })?;
1286 let filter = sym_join
1287 .filter
1288 .as_ref()
1289 .map(|f| {
1290 let schema = f
1291 .schema
1292 .as_ref()
1293 .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1294 .try_into()?;
1295
1296 let expression = parse_physical_expr(
1297 f.expression.as_ref().ok_or_else(|| {
1298 proto_error("Unexpected empty filter expression")
1299 })?,
1300 ctx, &schema,
1301 extension_codec,
1302 )?;
1303 let column_indices = f.column_indices
1304 .iter()
1305 .map(|i| {
1306 let side = protobuf::JoinSide::try_from(i.side)
1307 .map_err(|_| proto_error(format!(
1308 "Received a HashJoinNode message with JoinSide in Filter {}",
1309 i.side))
1310 )?;
1311
1312 Ok(ColumnIndex {
1313 index: i.index as usize,
1314 side: side.into(),
1315 })
1316 })
1317 .collect::<Result<_>>()?;
1318
1319 Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1320 })
1321 .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1322
1323 let left_sort_exprs = parse_physical_sort_exprs(
1324 &sym_join.left_sort_exprs,
1325 ctx,
1326 &left_schema,
1327 extension_codec,
1328 )?;
1329 let left_sort_exprs = LexOrdering::new(left_sort_exprs);
1330
1331 let right_sort_exprs = parse_physical_sort_exprs(
1332 &sym_join.right_sort_exprs,
1333 ctx,
1334 &right_schema,
1335 extension_codec,
1336 )?;
1337 let right_sort_exprs = LexOrdering::new(right_sort_exprs);
1338
1339 let partition_mode = protobuf::StreamPartitionMode::try_from(
1340 sym_join.partition_mode,
1341 )
1342 .map_err(|_| {
1343 proto_error(format!(
1344 "Received a SymmetricHashJoin message with unknown PartitionMode {}",
1345 sym_join.partition_mode
1346 ))
1347 })?;
1348 let partition_mode = match partition_mode {
1349 protobuf::StreamPartitionMode::SinglePartition => {
1350 StreamJoinPartitionMode::SinglePartition
1351 }
1352 protobuf::StreamPartitionMode::PartitionedExec => {
1353 StreamJoinPartitionMode::Partitioned
1354 }
1355 };
1356 SymmetricHashJoinExec::try_new(
1357 left,
1358 right,
1359 on,
1360 filter,
1361 &join_type.into(),
1362 null_equality.into(),
1363 left_sort_exprs,
1364 right_sort_exprs,
1365 partition_mode,
1366 )
1367 .map(|e| Arc::new(e) as _)
1368 }
1369
1370 fn try_into_union_physical_plan(
1371 &self,
1372 union: &protobuf::UnionExecNode,
1373 ctx: &TaskContext,
1374
1375 extension_codec: &dyn PhysicalExtensionCodec,
1376 ) -> Result<Arc<dyn ExecutionPlan>> {
1377 let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1378 for input in &union.inputs {
1379 inputs.push(input.try_into_physical_plan(ctx, extension_codec)?);
1380 }
1381 UnionExec::try_new(inputs)
1382 }
1383
1384 fn try_into_interleave_physical_plan(
1385 &self,
1386 interleave: &protobuf::InterleaveExecNode,
1387 ctx: &TaskContext,
1388
1389 extension_codec: &dyn PhysicalExtensionCodec,
1390 ) -> Result<Arc<dyn ExecutionPlan>> {
1391 let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1392 for input in &interleave.inputs {
1393 inputs.push(input.try_into_physical_plan(ctx, extension_codec)?);
1394 }
1395 Ok(Arc::new(InterleaveExec::try_new(inputs)?))
1396 }
1397
1398 fn try_into_cross_join_physical_plan(
1399 &self,
1400 crossjoin: &protobuf::CrossJoinExecNode,
1401 ctx: &TaskContext,
1402
1403 extension_codec: &dyn PhysicalExtensionCodec,
1404 ) -> Result<Arc<dyn ExecutionPlan>> {
1405 let left: Arc<dyn ExecutionPlan> =
1406 into_physical_plan(&crossjoin.left, ctx, extension_codec)?;
1407 let right: Arc<dyn ExecutionPlan> =
1408 into_physical_plan(&crossjoin.right, ctx, extension_codec)?;
1409 Ok(Arc::new(CrossJoinExec::new(left, right)))
1410 }
1411
1412 fn try_into_empty_physical_plan(
1413 &self,
1414 empty: &protobuf::EmptyExecNode,
1415 _ctx: &TaskContext,
1416
1417 _extension_codec: &dyn PhysicalExtensionCodec,
1418 ) -> Result<Arc<dyn ExecutionPlan>> {
1419 let schema = Arc::new(convert_required!(empty.schema)?);
1420 Ok(Arc::new(EmptyExec::new(schema)))
1421 }
1422
1423 fn try_into_placeholder_row_physical_plan(
1424 &self,
1425 placeholder: &protobuf::PlaceholderRowExecNode,
1426 _ctx: &TaskContext,
1427
1428 _extension_codec: &dyn PhysicalExtensionCodec,
1429 ) -> Result<Arc<dyn ExecutionPlan>> {
1430 let schema = Arc::new(convert_required!(placeholder.schema)?);
1431 Ok(Arc::new(PlaceholderRowExec::new(schema)))
1432 }
1433
    /// Deserializes a [`protobuf::SortExecNode`] into a [`SortExec`].
    ///
    /// Each serialized expression must be an `ExprType::Sort`; its inner
    /// expression is parsed against the input plan's schema and paired with
    /// the serialized asc/nulls-first flags. A negative `fetch` on the wire
    /// encodes "no limit".
    fn try_into_sort_physical_plan(
        &self,
        sort: &protobuf::SortExecNode,
        ctx: &TaskContext,

        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input = into_physical_plan(&sort.input, ctx, extension_codec)?;
        let exprs = sort
            .expr
            .iter()
            .map(|expr| {
                // The oneof expr_type must be present...
                let expr = expr.expr_type.as_ref().ok_or_else(|| {
                    proto_error(format!(
                        "physical_plan::from_proto() Unexpected expr {self:?}"
                    ))
                })?;
                // ...and must specifically be a Sort variant.
                if let ExprType::Sort(sort_expr) = expr {
                    let expr = sort_expr
                        .expr
                        .as_ref()
                        .ok_or_else(|| {
                            proto_error(format!(
                                "physical_plan::from_proto() Unexpected sort expr {self:?}"
                            ))
                        })?
                        .as_ref();
                    Ok(PhysicalSortExpr {
                        expr: parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)?,
                        options: SortOptions {
                            // Wire format stores ascending; SortOptions stores descending.
                            descending: !sort_expr.asc,
                            nulls_first: sort_expr.nulls_first,
                        },
                    })
                } else {
                    internal_err!(
                        "physical_plan::from_proto() {self:?}"
                    )
                }
            })
            .collect::<Result<Vec<_>>>()?;
        // LexOrdering::new returns None for an empty list, which is invalid here.
        let Some(ordering) = LexOrdering::new(exprs) else {
            return internal_err!("SortExec requires an ordering");
        };
        // fetch < 0 encodes "no limit".
        let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
        let new_sort = SortExec::new(ordering, input)
            .with_fetch(fetch)
            .with_preserve_partitioning(sort.preserve_partitioning);

        Ok(Arc::new(new_sort))
    }
1485
1486 fn try_into_sort_preserving_merge_physical_plan(
1487 &self,
1488 sort: &protobuf::SortPreservingMergeExecNode,
1489 ctx: &TaskContext,
1490
1491 extension_codec: &dyn PhysicalExtensionCodec,
1492 ) -> Result<Arc<dyn ExecutionPlan>> {
1493 let input = into_physical_plan(&sort.input, ctx, extension_codec)?;
1494 let exprs = sort
1495 .expr
1496 .iter()
1497 .map(|expr| {
1498 let expr = expr.expr_type.as_ref().ok_or_else(|| {
1499 proto_error(format!(
1500 "physical_plan::from_proto() Unexpected expr {self:?}"
1501 ))
1502 })?;
1503 if let ExprType::Sort(sort_expr) = expr {
1504 let expr = sort_expr
1505 .expr
1506 .as_ref()
1507 .ok_or_else(|| {
1508 proto_error(format!(
1509 "physical_plan::from_proto() Unexpected sort expr {self:?}"
1510 ))
1511 })?
1512 .as_ref();
1513 Ok(PhysicalSortExpr {
1514 expr: parse_physical_expr(
1515 expr,
1516 ctx,
1517 input.schema().as_ref(),
1518 extension_codec,
1519 )?,
1520 options: SortOptions {
1521 descending: !sort_expr.asc,
1522 nulls_first: sort_expr.nulls_first,
1523 },
1524 })
1525 } else {
1526 internal_err!("physical_plan::from_proto() {self:?}")
1527 }
1528 })
1529 .collect::<Result<Vec<_>>>()?;
1530 let Some(ordering) = LexOrdering::new(exprs) else {
1531 return internal_err!("SortExec requires an ordering");
1532 };
1533 let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1534 Ok(Arc::new(
1535 SortPreservingMergeExec::new(ordering, input).with_fetch(fetch),
1536 ))
1537 }
1538
1539 fn try_into_extension_physical_plan(
1540 &self,
1541 extension: &protobuf::PhysicalExtensionNode,
1542 ctx: &TaskContext,
1543
1544 extension_codec: &dyn PhysicalExtensionCodec,
1545 ) -> Result<Arc<dyn ExecutionPlan>> {
1546 let inputs: Vec<Arc<dyn ExecutionPlan>> = extension
1547 .inputs
1548 .iter()
1549 .map(|i| i.try_into_physical_plan(ctx, extension_codec))
1550 .collect::<Result<_>>()?;
1551
1552 let extension_node =
1553 extension_codec.try_decode(extension.node.as_slice(), &inputs, ctx)?;
1554
1555 Ok(extension_node)
1556 }
1557
    /// Deserializes a [`protobuf::NestedLoopJoinExecNode`] into a
    /// [`NestedLoopJoinExec`].
    ///
    /// Rebuilds both children, the join type, the optional join filter
    /// (parsed against the filter's own intermediate schema), and the
    /// optional output projection.
    fn try_into_nested_loop_join_physical_plan(
        &self,
        join: &protobuf::NestedLoopJoinExecNode,
        ctx: &TaskContext,

        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left: Arc<dyn ExecutionPlan> =
            into_physical_plan(&join.left, ctx, extension_codec)?;
        let right: Arc<dyn ExecutionPlan> =
            into_physical_plan(&join.right, ctx, extension_codec)?;
        let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| {
            proto_error(format!(
                "Received a NestedLoopJoinExecNode message with unknown JoinType {}",
                join.join_type
            ))
        })?;
        // The optional join filter carries its own schema; its expression and
        // column indices refer to that intermediate schema, not the inputs.
        let filter = join
            .filter
            .as_ref()
            .map(|f| {
                let schema = f
                    .schema
                    .as_ref()
                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
                    .try_into()?;

                let expression = parse_physical_expr(
                    f.expression.as_ref().ok_or_else(|| {
                        proto_error("Unexpected empty filter expression")
                    })?,
                    ctx, &schema,
                    extension_codec,
                )?;
                let column_indices = f.column_indices
                    .iter()
                    .map(|i| {
                        let side = protobuf::JoinSide::try_from(i.side)
                            .map_err(|_| proto_error(format!(
                                "Received a NestedLoopJoinExecNode message with JoinSide in Filter {}",
                                i.side))
                            )?;

                        Ok(ColumnIndex {
                            index: i.index as usize,
                            side: side.into(),
                        })
                    })
                    .collect::<Result<Vec<_>>>()?;

                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
            })
            // Turn Option<Result<_>> inside-out so `?` can propagate errors.
            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;

        // An empty projection list on the wire means "no projection".
        let projection = if !join.projection.is_empty() {
            Some(
                join.projection
                    .iter()
                    .map(|i| *i as usize)
                    .collect::<Vec<_>>(),
            )
        } else {
            None
        };

        Ok(Arc::new(NestedLoopJoinExec::try_new(
            left,
            right,
            filter,
            &join_type.into(),
            projection,
        )?))
    }
1631
1632 fn try_into_analyze_physical_plan(
1633 &self,
1634 analyze: &protobuf::AnalyzeExecNode,
1635 ctx: &TaskContext,
1636
1637 extension_codec: &dyn PhysicalExtensionCodec,
1638 ) -> Result<Arc<dyn ExecutionPlan>> {
1639 let input: Arc<dyn ExecutionPlan> =
1640 into_physical_plan(&analyze.input, ctx, extension_codec)?;
1641 Ok(Arc::new(AnalyzeExec::new(
1642 analyze.verbose,
1643 analyze.show_statistics,
1644 vec![MetricType::SUMMARY, MetricType::DEV],
1645 input,
1646 Arc::new(convert_required!(analyze.schema)?),
1647 )))
1648 }
1649
1650 fn try_into_json_sink_physical_plan(
1651 &self,
1652 sink: &protobuf::JsonSinkExecNode,
1653 ctx: &TaskContext,
1654
1655 extension_codec: &dyn PhysicalExtensionCodec,
1656 ) -> Result<Arc<dyn ExecutionPlan>> {
1657 let input = into_physical_plan(&sink.input, ctx, extension_codec)?;
1658
1659 let data_sink: JsonSink = sink
1660 .sink
1661 .as_ref()
1662 .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1663 .try_into()?;
1664 let sink_schema = input.schema();
1665 let sort_order = sink
1666 .sort_order
1667 .as_ref()
1668 .map(|collection| {
1669 parse_physical_sort_exprs(
1670 &collection.physical_sort_expr_nodes,
1671 ctx,
1672 &sink_schema,
1673 extension_codec,
1674 )
1675 .map(|sort_exprs| {
1676 LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1677 })
1678 })
1679 .transpose()?
1680 .flatten();
1681 Ok(Arc::new(DataSinkExec::new(
1682 input,
1683 Arc::new(data_sink),
1684 sort_order,
1685 )))
1686 }
1687
1688 fn try_into_csv_sink_physical_plan(
1689 &self,
1690 sink: &protobuf::CsvSinkExecNode,
1691 ctx: &TaskContext,
1692
1693 extension_codec: &dyn PhysicalExtensionCodec,
1694 ) -> Result<Arc<dyn ExecutionPlan>> {
1695 let input = into_physical_plan(&sink.input, ctx, extension_codec)?;
1696
1697 let data_sink: CsvSink = sink
1698 .sink
1699 .as_ref()
1700 .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1701 .try_into()?;
1702 let sink_schema = input.schema();
1703 let sort_order = sink
1704 .sort_order
1705 .as_ref()
1706 .map(|collection| {
1707 parse_physical_sort_exprs(
1708 &collection.physical_sort_expr_nodes,
1709 ctx,
1710 &sink_schema,
1711 extension_codec,
1712 )
1713 .map(|sort_exprs| {
1714 LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1715 })
1716 })
1717 .transpose()?
1718 .flatten();
1719 Ok(Arc::new(DataSinkExec::new(
1720 input,
1721 Arc::new(data_sink),
1722 sort_order,
1723 )))
1724 }
1725
    /// Deserializes a [`protobuf::ParquetSinkExecNode`] into a
    /// [`DataSinkExec`] writing Parquet, restoring the optional required
    /// input sort order.
    ///
    /// Only available with the `parquet` feature; without it this panics,
    /// since receiving such a node indicates a build/configuration mismatch.
    #[cfg_attr(not(feature = "parquet"), expect(unused_variables))]
    fn try_into_parquet_sink_physical_plan(
        &self,
        sink: &protobuf::ParquetSinkExecNode,
        ctx: &TaskContext,

        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        #[cfg(feature = "parquet")]
        {
            let input = into_physical_plan(&sink.input, ctx, extension_codec)?;

            // The sink description is a required field on the wire.
            let data_sink: ParquetSink = sink
                .sink
                .as_ref()
                .ok_or_else(|| proto_error("Missing required field in protobuf"))?
                .try_into()?;
            let sink_schema = input.schema();
            // When a sort order was serialized, parse its expressions against
            // the input schema and convert them into an (optional)
            // LexRequirement.
            let sort_order = sink
                .sort_order
                .as_ref()
                .map(|collection| {
                    parse_physical_sort_exprs(
                        &collection.physical_sort_expr_nodes,
                        ctx,
                        &sink_schema,
                        extension_codec,
                    )
                    .map(|sort_exprs| {
                        LexRequirement::new(sort_exprs.into_iter().map(Into::into))
                    })
                })
                .transpose()?
                .flatten();
            Ok(Arc::new(DataSinkExec::new(
                input,
                Arc::new(data_sink),
                sort_order,
            )))
        }
        #[cfg(not(feature = "parquet"))]
        panic!("Trying to use ParquetSink without `parquet` feature enabled");
    }
1769
    /// Deserializes a [`protobuf::UnnestExecNode`] into an [`UnnestExec`],
    /// restoring the list-typed columns (with recursion depth), the
    /// struct-typed column indices, the output schema, and the options.
    fn try_into_unnest_physical_plan(
        &self,
        unnest: &protobuf::UnnestExecNode,
        ctx: &TaskContext,

        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input = into_physical_plan(&unnest.input, ctx, extension_codec)?;

        Ok(Arc::new(UnnestExec::new(
            input,
            // Each list column records its input-schema index and how many
            // nesting levels to unnest.
            unnest
                .list_type_columns
                .iter()
                .map(|c| ListUnnest {
                    index_in_input_schema: c.index_in_input_schema as _,
                    depth: c.depth as _,
                })
                .collect(),
            unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
            Arc::new(convert_required!(unnest.schema)?),
            into_required!(unnest.options)?,
        )?))
    }
1794
1795 fn generate_series_name_to_str(name: protobuf::GenerateSeriesName) -> &'static str {
1796 match name {
1797 protobuf::GenerateSeriesName::GsGenerateSeries => "generate_series",
1798 protobuf::GenerateSeriesName::GsRange => "range",
1799 }
1800 }
1801 fn try_into_sort_join(
1802 &self,
1803 sort_join: &SortMergeJoinExecNode,
1804 ctx: &TaskContext,
1805
1806 extension_codec: &dyn PhysicalExtensionCodec,
1807 ) -> Result<Arc<dyn ExecutionPlan>> {
1808 let left = into_physical_plan(&sort_join.left, ctx, extension_codec)?;
1809 let left_schema = left.schema();
1810 let right = into_physical_plan(&sort_join.right, ctx, extension_codec)?;
1811 let right_schema = right.schema();
1812
1813 let filter = sort_join
1814 .filter
1815 .as_ref()
1816 .map(|f| {
1817 let schema = f
1818 .schema
1819 .as_ref()
1820 .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1821 .try_into()?;
1822
1823 let expression = parse_physical_expr(
1824 f.expression.as_ref().ok_or_else(|| {
1825 proto_error("Unexpected empty filter expression")
1826 })?,
1827 ctx,
1828 &schema,
1829 extension_codec,
1830 )?;
1831 let column_indices = f
1832 .column_indices
1833 .iter()
1834 .map(|i| {
1835 let side =
1836 protobuf::JoinSide::try_from(i.side).map_err(|_| {
1837 proto_error(format!(
1838 "Received a SortMergeJoinExecNode message with JoinSide in Filter {}",
1839 i.side
1840 ))
1841 })?;
1842
1843 Ok(ColumnIndex {
1844 index: i.index as usize,
1845 side: side.into(),
1846 })
1847 })
1848 .collect::<Result<Vec<_>>>()?;
1849
1850 Ok(JoinFilter::new(
1851 expression,
1852 column_indices,
1853 Arc::new(schema),
1854 ))
1855 })
1856 .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1857
1858 let join_type =
1859 protobuf::JoinType::try_from(sort_join.join_type).map_err(|_| {
1860 proto_error(format!(
1861 "Received a SortMergeJoinExecNode message with unknown JoinType {}",
1862 sort_join.join_type
1863 ))
1864 })?;
1865
1866 let null_equality = protobuf::NullEquality::try_from(sort_join.null_equality)
1867 .map_err(|_| {
1868 proto_error(format!(
1869 "Received a SortMergeJoinExecNode message with unknown NullEquality {}",
1870 sort_join.null_equality
1871 ))
1872 })?;
1873
1874 let sort_options = sort_join
1875 .sort_options
1876 .iter()
1877 .map(|e| SortOptions {
1878 descending: !e.asc,
1879 nulls_first: e.nulls_first,
1880 })
1881 .collect();
1882 let on = sort_join
1883 .on
1884 .iter()
1885 .map(|col| {
1886 let left = parse_physical_expr(
1887 &col.left.clone().unwrap(),
1888 ctx,
1889 left_schema.as_ref(),
1890 extension_codec,
1891 )?;
1892 let right = parse_physical_expr(
1893 &col.right.clone().unwrap(),
1894 ctx,
1895 right_schema.as_ref(),
1896 extension_codec,
1897 )?;
1898 Ok((left, right))
1899 })
1900 .collect::<Result<_>>()?;
1901
1902 Ok(Arc::new(SortMergeJoinExec::try_new(
1903 left,
1904 right,
1905 on,
1906 filter,
1907 join_type.into(),
1908 sort_options,
1909 null_equality.into(),
1910 )?))
1911 }
1912
    /// Deserializes a [`protobuf::GenerateSeriesNode`] into a
    /// [`LazyMemoryExec`] backed by a [`GenerateSeriesTable`] generator.
    ///
    /// The serialized args discriminate between the null-argument case and
    /// the int64 / timestamp / date variants; the timestamp and date
    /// variants reconstruct their step as an IntervalMonthDayNano value.
    fn try_into_generate_series_physical_plan(
        &self,
        generate_series: &protobuf::GenerateSeriesNode,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let schema: SchemaRef = Arc::new(convert_required!(generate_series.schema)?);

        let args = match &generate_series.args {
            // One of the SQL arguments was NULL: the series is empty/null.
            Some(protobuf::generate_series_node::Args::ContainsNull(args)) => {
                GenSeriesArgs::ContainsNull {
                    name: Self::generate_series_name_to_str(args.name()),
                }
            }
            Some(protobuf::generate_series_node::Args::Int64Args(args)) => {
                GenSeriesArgs::Int64Args {
                    start: args.start,
                    end: args.end,
                    step: args.step,
                    include_end: args.include_end,
                    name: Self::generate_series_name_to_str(args.name()),
                }
            }
            Some(protobuf::generate_series_node::Args::TimestampArgs(args)) => {
                // The step interval is serialized as separate month/day/nano
                // components and must be reassembled.
                let step_proto = args.step.as_ref().ok_or_else(|| {
                    internal_datafusion_err!("Missing step in TimestampArgs")
                })?;
                let step = IntervalMonthDayNanoType::make_value(
                    step_proto.months,
                    step_proto.days,
                    step_proto.nanos,
                );
                GenSeriesArgs::TimestampArgs {
                    start: args.start,
                    end: args.end,
                    step,
                    tz: args.tz.as_ref().map(|s| Arc::from(s.as_str())),
                    include_end: args.include_end,
                    name: Self::generate_series_name_to_str(args.name()),
                }
            }
            Some(protobuf::generate_series_node::Args::DateArgs(args)) => {
                let step_proto = args.step.as_ref().ok_or_else(|| {
                    internal_datafusion_err!("Missing step in DateArgs")
                })?;
                let step = IntervalMonthDayNanoType::make_value(
                    step_proto.months,
                    step_proto.days,
                    step_proto.nanos,
                );
                GenSeriesArgs::DateArgs {
                    start: args.start,
                    end: args.end,
                    step,
                    include_end: args.include_end,
                    name: Self::generate_series_name_to_str(args.name()),
                }
            }
            None => return internal_err!("Missing args in GenerateSeriesNode"),
        };

        let table = GenerateSeriesTable::new(Arc::clone(&schema), args);
        let generator = table.as_generator(generate_series.target_batch_size as usize)?;

        Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?))
    }
1977
1978 fn try_into_cooperative_physical_plan(
1979 &self,
1980 field_stream: &protobuf::CooperativeExecNode,
1981 ctx: &TaskContext,
1982
1983 extension_codec: &dyn PhysicalExtensionCodec,
1984 ) -> Result<Arc<dyn ExecutionPlan>> {
1985 let input = into_physical_plan(&field_stream.input, ctx, extension_codec)?;
1986 Ok(Arc::new(CooperativeExec::new(input)))
1987 }
1988
1989 fn try_into_async_func_physical_plan(
1990 &self,
1991 async_func: &protobuf::AsyncFuncExecNode,
1992 ctx: &TaskContext,
1993 extension_codec: &dyn PhysicalExtensionCodec,
1994 ) -> Result<Arc<dyn ExecutionPlan>> {
1995 let input: Arc<dyn ExecutionPlan> =
1996 into_physical_plan(&async_func.input, ctx, extension_codec)?;
1997
1998 if async_func.async_exprs.len() != async_func.async_expr_names.len() {
1999 return internal_err!(
2000 "AsyncFuncExecNode async_exprs length does not match async_expr_names"
2001 );
2002 }
2003
2004 let async_exprs = async_func
2005 .async_exprs
2006 .iter()
2007 .zip(async_func.async_expr_names.iter())
2008 .map(|(expr, name)| {
2009 let physical_expr = parse_physical_expr(
2010 expr,
2011 ctx,
2012 input.schema().as_ref(),
2013 extension_codec,
2014 )?;
2015
2016 Ok(Arc::new(AsyncFuncExpr::try_new(
2017 name.clone(),
2018 physical_expr,
2019 input.schema().as_ref(),
2020 )?))
2021 })
2022 .collect::<Result<Vec<_>>>()?;
2023
2024 Ok(Arc::new(AsyncFuncExec::try_new(async_exprs, input)?))
2025 }
2026
2027 fn try_from_explain_exec(
2028 exec: &ExplainExec,
2029 _extension_codec: &dyn PhysicalExtensionCodec,
2030 ) -> Result<Self> {
2031 Ok(protobuf::PhysicalPlanNode {
2032 physical_plan_type: Some(PhysicalPlanType::Explain(
2033 protobuf::ExplainExecNode {
2034 schema: Some(exec.schema().as_ref().try_into()?),
2035 stringified_plans: exec
2036 .stringified_plans()
2037 .iter()
2038 .map(|plan| plan.into())
2039 .collect(),
2040 verbose: exec.verbose(),
2041 },
2042 )),
2043 })
2044 }
2045
2046 fn try_from_projection_exec(
2047 exec: &ProjectionExec,
2048 extension_codec: &dyn PhysicalExtensionCodec,
2049 ) -> Result<Self> {
2050 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2051 exec.input().to_owned(),
2052 extension_codec,
2053 )?;
2054 let expr = exec
2055 .expr()
2056 .iter()
2057 .map(|proj_expr| serialize_physical_expr(&proj_expr.expr, extension_codec))
2058 .collect::<Result<Vec<_>>>()?;
2059 let expr_name = exec
2060 .expr()
2061 .iter()
2062 .map(|proj_expr| proj_expr.alias.clone())
2063 .collect();
2064 Ok(protobuf::PhysicalPlanNode {
2065 physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
2066 protobuf::ProjectionExecNode {
2067 input: Some(Box::new(input)),
2068 expr,
2069 expr_name,
2070 },
2071 ))),
2072 })
2073 }
2074
2075 fn try_from_analyze_exec(
2076 exec: &AnalyzeExec,
2077 extension_codec: &dyn PhysicalExtensionCodec,
2078 ) -> Result<Self> {
2079 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2080 exec.input().to_owned(),
2081 extension_codec,
2082 )?;
2083 Ok(protobuf::PhysicalPlanNode {
2084 physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
2085 protobuf::AnalyzeExecNode {
2086 verbose: exec.verbose(),
2087 show_statistics: exec.show_statistics(),
2088 input: Some(Box::new(input)),
2089 schema: Some(exec.schema().as_ref().try_into()?),
2090 },
2091 ))),
2092 })
2093 }
2094
2095 fn try_from_filter_exec(
2096 exec: &FilterExec,
2097 extension_codec: &dyn PhysicalExtensionCodec,
2098 ) -> Result<Self> {
2099 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2100 exec.input().to_owned(),
2101 extension_codec,
2102 )?;
2103 Ok(protobuf::PhysicalPlanNode {
2104 physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
2105 protobuf::FilterExecNode {
2106 input: Some(Box::new(input)),
2107 expr: Some(serialize_physical_expr(
2108 exec.predicate(),
2109 extension_codec,
2110 )?),
2111 default_filter_selectivity: exec.default_selectivity() as u32,
2112 projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
2113 v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2114 }),
2115 },
2116 ))),
2117 })
2118 }
2119
2120 fn try_from_global_limit_exec(
2121 limit: &GlobalLimitExec,
2122 extension_codec: &dyn PhysicalExtensionCodec,
2123 ) -> Result<Self> {
2124 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2125 limit.input().to_owned(),
2126 extension_codec,
2127 )?;
2128
2129 Ok(protobuf::PhysicalPlanNode {
2130 physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
2131 protobuf::GlobalLimitExecNode {
2132 input: Some(Box::new(input)),
2133 skip: limit.skip() as u32,
2134 fetch: match limit.fetch() {
2135 Some(n) => n as i64,
2136 _ => -1, },
2138 },
2139 ))),
2140 })
2141 }
2142
2143 fn try_from_local_limit_exec(
2144 limit: &LocalLimitExec,
2145 extension_codec: &dyn PhysicalExtensionCodec,
2146 ) -> Result<Self> {
2147 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2148 limit.input().to_owned(),
2149 extension_codec,
2150 )?;
2151 Ok(protobuf::PhysicalPlanNode {
2152 physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
2153 protobuf::LocalLimitExecNode {
2154 input: Some(Box::new(input)),
2155 fetch: limit.fetch() as u32,
2156 },
2157 ))),
2158 })
2159 }
2160
    /// Serializes a [`HashJoinExec`] into a protobuf plan node: both
    /// children, the equi-join `on` pairs, join type, null-equality,
    /// optional join filter, partition mode, and optional projection.
    fn try_from_hash_join_exec(
        exec: &HashJoinExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.left().to_owned(),
            extension_codec,
        )?;
        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.right().to_owned(),
            extension_codec,
        )?;
        // Each equi-join key pair becomes a JoinOn message with both sides
        // serialized through the extension codec.
        let on: Vec<protobuf::JoinOn> = exec
            .on()
            .iter()
            .map(|tuple| {
                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
                Ok::<_, DataFusionError>(protobuf::JoinOn {
                    left: Some(l),
                    right: Some(r),
                })
            })
            .collect::<Result<_>>()?;
        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
        let null_equality: protobuf::NullEquality = exec.null_equality().into();
        // The optional filter is serialized together with its own
        // intermediate schema and side-tagged column indices.
        let filter = exec
            .filter()
            .as_ref()
            .map(|f| {
                let expression =
                    serialize_physical_expr(f.expression(), extension_codec)?;
                let column_indices = f
                    .column_indices()
                    .iter()
                    .map(|i| {
                        let side: protobuf::JoinSide = i.side.to_owned().into();
                        protobuf::ColumnIndex {
                            index: i.index as u32,
                            side: side.into(),
                        }
                    })
                    .collect();
                let schema = f.schema().as_ref().try_into()?;
                Ok(protobuf::JoinFilter {
                    expression: Some(expression),
                    column_indices,
                    schema: Some(schema),
                })
            })
            // Turn Option<Result<_>> inside-out so `?` can propagate errors.
            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;

        let partition_mode = match exec.partition_mode() {
            PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
            PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
            PartitionMode::Auto => protobuf::PartitionMode::Auto,
        };

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
                protobuf::HashJoinExecNode {
                    left: Some(Box::new(left)),
                    right: Some(Box::new(right)),
                    on,
                    join_type: join_type.into(),
                    partition_mode: partition_mode.into(),
                    null_equality: null_equality.into(),
                    filter,
                    // `None` projection is encoded as an empty index list.
                    projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
                    }),
                },
            ))),
        })
    }
2236
2237 fn try_from_symmetric_hash_join_exec(
2238 exec: &SymmetricHashJoinExec,
2239 extension_codec: &dyn PhysicalExtensionCodec,
2240 ) -> Result<Self> {
2241 let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2242 exec.left().to_owned(),
2243 extension_codec,
2244 )?;
2245 let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2246 exec.right().to_owned(),
2247 extension_codec,
2248 )?;
2249 let on = exec
2250 .on()
2251 .iter()
2252 .map(|tuple| {
2253 let l = serialize_physical_expr(&tuple.0, extension_codec)?;
2254 let r = serialize_physical_expr(&tuple.1, extension_codec)?;
2255 Ok::<_, DataFusionError>(protobuf::JoinOn {
2256 left: Some(l),
2257 right: Some(r),
2258 })
2259 })
2260 .collect::<Result<_>>()?;
2261 let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2262 let null_equality: protobuf::NullEquality = exec.null_equality().into();
2263 let filter = exec
2264 .filter()
2265 .as_ref()
2266 .map(|f| {
2267 let expression =
2268 serialize_physical_expr(f.expression(), extension_codec)?;
2269 let column_indices = f
2270 .column_indices()
2271 .iter()
2272 .map(|i| {
2273 let side: protobuf::JoinSide = i.side.to_owned().into();
2274 protobuf::ColumnIndex {
2275 index: i.index as u32,
2276 side: side.into(),
2277 }
2278 })
2279 .collect();
2280 let schema = f.schema().as_ref().try_into()?;
2281 Ok(protobuf::JoinFilter {
2282 expression: Some(expression),
2283 column_indices,
2284 schema: Some(schema),
2285 })
2286 })
2287 .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2288
2289 let partition_mode = match exec.partition_mode() {
2290 StreamJoinPartitionMode::SinglePartition => {
2291 protobuf::StreamPartitionMode::SinglePartition
2292 }
2293 StreamJoinPartitionMode::Partitioned => {
2294 protobuf::StreamPartitionMode::PartitionedExec
2295 }
2296 };
2297
2298 let left_sort_exprs = exec
2299 .left_sort_exprs()
2300 .map(|exprs| {
2301 exprs
2302 .iter()
2303 .map(|expr| {
2304 Ok(protobuf::PhysicalSortExprNode {
2305 expr: Some(Box::new(serialize_physical_expr(
2306 &expr.expr,
2307 extension_codec,
2308 )?)),
2309 asc: !expr.options.descending,
2310 nulls_first: expr.options.nulls_first,
2311 })
2312 })
2313 .collect::<Result<Vec<_>>>()
2314 })
2315 .transpose()?
2316 .unwrap_or(vec![]);
2317
2318 let right_sort_exprs = exec
2319 .right_sort_exprs()
2320 .map(|exprs| {
2321 exprs
2322 .iter()
2323 .map(|expr| {
2324 Ok(protobuf::PhysicalSortExprNode {
2325 expr: Some(Box::new(serialize_physical_expr(
2326 &expr.expr,
2327 extension_codec,
2328 )?)),
2329 asc: !expr.options.descending,
2330 nulls_first: expr.options.nulls_first,
2331 })
2332 })
2333 .collect::<Result<Vec<_>>>()
2334 })
2335 .transpose()?
2336 .unwrap_or(vec![]);
2337
2338 Ok(protobuf::PhysicalPlanNode {
2339 physical_plan_type: Some(PhysicalPlanType::SymmetricHashJoin(Box::new(
2340 protobuf::SymmetricHashJoinExecNode {
2341 left: Some(Box::new(left)),
2342 right: Some(Box::new(right)),
2343 on,
2344 join_type: join_type.into(),
2345 partition_mode: partition_mode.into(),
2346 null_equality: null_equality.into(),
2347 left_sort_exprs,
2348 right_sort_exprs,
2349 filter,
2350 },
2351 ))),
2352 })
2353 }
2354
2355 fn try_from_sort_merge_join_exec(
2356 exec: &SortMergeJoinExec,
2357 extension_codec: &dyn PhysicalExtensionCodec,
2358 ) -> Result<Self> {
2359 let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2360 exec.left().to_owned(),
2361 extension_codec,
2362 )?;
2363 let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2364 exec.right().to_owned(),
2365 extension_codec,
2366 )?;
2367 let on = exec
2368 .on()
2369 .iter()
2370 .map(|tuple| {
2371 let l = serialize_physical_expr(&tuple.0, extension_codec)?;
2372 let r = serialize_physical_expr(&tuple.1, extension_codec)?;
2373 Ok::<_, DataFusionError>(protobuf::JoinOn {
2374 left: Some(l),
2375 right: Some(r),
2376 })
2377 })
2378 .collect::<Result<_>>()?;
2379 let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2380 let null_equality: protobuf::NullEquality = exec.null_equality().into();
2381 let filter = exec
2382 .filter()
2383 .as_ref()
2384 .map(|f| {
2385 let expression =
2386 serialize_physical_expr(f.expression(), extension_codec)?;
2387 let column_indices = f
2388 .column_indices()
2389 .iter()
2390 .map(|i| {
2391 let side: protobuf::JoinSide = i.side.to_owned().into();
2392 protobuf::ColumnIndex {
2393 index: i.index as u32,
2394 side: side.into(),
2395 }
2396 })
2397 .collect();
2398 let schema = f.schema().as_ref().try_into()?;
2399 Ok(protobuf::JoinFilter {
2400 expression: Some(expression),
2401 column_indices,
2402 schema: Some(schema),
2403 })
2404 })
2405 .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2406
2407 let sort_options = exec
2408 .sort_options()
2409 .iter()
2410 .map(
2411 |SortOptions {
2412 descending,
2413 nulls_first,
2414 }| {
2415 SortExprNode {
2416 expr: None,
2417 asc: !*descending,
2418 nulls_first: *nulls_first,
2419 }
2420 },
2421 )
2422 .collect();
2423
2424 Ok(protobuf::PhysicalPlanNode {
2425 physical_plan_type: Some(PhysicalPlanType::SortMergeJoin(Box::new(
2426 protobuf::SortMergeJoinExecNode {
2427 left: Some(Box::new(left)),
2428 right: Some(Box::new(right)),
2429 on,
2430 join_type: join_type.into(),
2431 null_equality: null_equality.into(),
2432 filter,
2433 sort_options,
2434 },
2435 ))),
2436 })
2437 }
2438
2439 fn try_from_cross_join_exec(
2440 exec: &CrossJoinExec,
2441 extension_codec: &dyn PhysicalExtensionCodec,
2442 ) -> Result<Self> {
2443 let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2444 exec.left().to_owned(),
2445 extension_codec,
2446 )?;
2447 let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2448 exec.right().to_owned(),
2449 extension_codec,
2450 )?;
2451 Ok(protobuf::PhysicalPlanNode {
2452 physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
2453 protobuf::CrossJoinExecNode {
2454 left: Some(Box::new(left)),
2455 right: Some(Box::new(right)),
2456 },
2457 ))),
2458 })
2459 }
2460
2461 fn try_from_aggregate_exec(
2462 exec: &AggregateExec,
2463 extension_codec: &dyn PhysicalExtensionCodec,
2464 ) -> Result<Self> {
2465 let groups: Vec<bool> = exec
2466 .group_expr()
2467 .groups()
2468 .iter()
2469 .flatten()
2470 .copied()
2471 .collect();
2472
2473 let group_names = exec
2474 .group_expr()
2475 .expr()
2476 .iter()
2477 .map(|expr| expr.1.to_owned())
2478 .collect();
2479
2480 let filter = exec
2481 .filter_expr()
2482 .iter()
2483 .map(|expr| serialize_maybe_filter(expr.to_owned(), extension_codec))
2484 .collect::<Result<Vec<_>>>()?;
2485
2486 let agg = exec
2487 .aggr_expr()
2488 .iter()
2489 .map(|expr| serialize_physical_aggr_expr(expr.to_owned(), extension_codec))
2490 .collect::<Result<Vec<_>>>()?;
2491
2492 let agg_names = exec
2493 .aggr_expr()
2494 .iter()
2495 .map(|expr| expr.name().to_string())
2496 .collect::<Vec<_>>();
2497
2498 let agg_mode = match exec.mode() {
2499 AggregateMode::Partial => protobuf::AggregateMode::Partial,
2500 AggregateMode::Final => protobuf::AggregateMode::Final,
2501 AggregateMode::FinalPartitioned => protobuf::AggregateMode::FinalPartitioned,
2502 AggregateMode::Single => protobuf::AggregateMode::Single,
2503 AggregateMode::SinglePartitioned => {
2504 protobuf::AggregateMode::SinglePartitioned
2505 }
2506 };
2507 let input_schema = exec.input_schema();
2508 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2509 exec.input().to_owned(),
2510 extension_codec,
2511 )?;
2512
2513 let null_expr = exec
2514 .group_expr()
2515 .null_expr()
2516 .iter()
2517 .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
2518 .collect::<Result<Vec<_>>>()?;
2519
2520 let group_expr = exec
2521 .group_expr()
2522 .expr()
2523 .iter()
2524 .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
2525 .collect::<Result<Vec<_>>>()?;
2526
2527 let limit = exec.limit().map(|value| protobuf::AggLimit {
2528 limit: value as u64,
2529 });
2530
2531 Ok(protobuf::PhysicalPlanNode {
2532 physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
2533 protobuf::AggregateExecNode {
2534 group_expr,
2535 group_expr_name: group_names,
2536 aggr_expr: agg,
2537 filter_expr: filter,
2538 aggr_expr_name: agg_names,
2539 mode: agg_mode as i32,
2540 input: Some(Box::new(input)),
2541 input_schema: Some(input_schema.as_ref().try_into()?),
2542 null_expr,
2543 groups,
2544 limit,
2545 has_grouping_set: exec.group_expr().has_grouping_set(),
2546 },
2547 ))),
2548 })
2549 }
2550
2551 fn try_from_empty_exec(
2552 empty: &EmptyExec,
2553 _extension_codec: &dyn PhysicalExtensionCodec,
2554 ) -> Result<Self> {
2555 let schema = empty.schema().as_ref().try_into()?;
2556 Ok(protobuf::PhysicalPlanNode {
2557 physical_plan_type: Some(PhysicalPlanType::Empty(protobuf::EmptyExecNode {
2558 schema: Some(schema),
2559 })),
2560 })
2561 }
2562
2563 fn try_from_placeholder_row_exec(
2564 empty: &PlaceholderRowExec,
2565 _extension_codec: &dyn PhysicalExtensionCodec,
2566 ) -> Result<Self> {
2567 let schema = empty.schema().as_ref().try_into()?;
2568 Ok(protobuf::PhysicalPlanNode {
2569 physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
2570 protobuf::PlaceholderRowExecNode {
2571 schema: Some(schema),
2572 },
2573 )),
2574 })
2575 }
2576
2577 fn try_from_coalesce_batches_exec(
2578 coalesce_batches: &CoalesceBatchesExec,
2579 extension_codec: &dyn PhysicalExtensionCodec,
2580 ) -> Result<Self> {
2581 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2582 coalesce_batches.input().to_owned(),
2583 extension_codec,
2584 )?;
2585 Ok(protobuf::PhysicalPlanNode {
2586 physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
2587 protobuf::CoalesceBatchesExecNode {
2588 input: Some(Box::new(input)),
2589 target_batch_size: coalesce_batches.target_batch_size() as u32,
2590 fetch: coalesce_batches.fetch().map(|n| n as u32),
2591 },
2592 ))),
2593 })
2594 }
2595
    /// Attempts to serialize a [`DataSourceExec`] into its protobuf
    /// representation.
    ///
    /// The inner [`DataSource`] is downcast to each supported concrete
    /// type in turn (CSV, JSON, Parquet, Avro, in-memory); the first
    /// successful match determines the encoding. Returns `Ok(None)` when
    /// the source is none of these, so the caller can fall back to other
    /// encodings (e.g. an extension codec).
    fn try_from_data_source_exec(
        data_source_exec: &DataSourceExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Option<Self>> {
        let data_source = data_source_exec.data_source();
        // CSV scan: a file scan whose file source is a `CsvSource`.
        if let Some(maybe_csv) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = maybe_csv.file_source();
            if let Some(csv_config) = source.as_any().downcast_ref::<CsvSource>() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::CsvScan(
                        protobuf::CsvScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                maybe_csv,
                                extension_codec,
                            )?),
                            has_header: csv_config.has_header(),
                            // Single-byte CSV options are stored as
                            // strings in the protobuf message.
                            delimiter: byte_to_string(
                                csv_config.delimiter(),
                                "delimiter",
                            )?,
                            quote: byte_to_string(csv_config.quote(), "quote")?,
                            optional_escape: if let Some(escape) = csv_config.escape() {
                                Some(
                                    protobuf::csv_scan_exec_node::OptionalEscape::Escape(
                                        byte_to_string(escape, "escape")?,
                                    ),
                                )
                            } else {
                                None
                            },
                            optional_comment: if let Some(comment) = csv_config.comment()
                            {
                                Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
                                    byte_to_string(comment, "comment")?,
                                ))
                            } else {
                                None
                            },
                            newlines_in_values: csv_config.newlines_in_values(),
                            truncate_rows: csv_config.truncate_rows(),
                        },
                    )),
                }));
            }
        }

        // JSON scan: only the base file-scan configuration is serialized.
        if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = scan_conf.file_source();
            if let Some(_json_source) = source.as_any().downcast_ref::<JsonSource>() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::JsonScan(
                        protobuf::JsonScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                scan_conf,
                                extension_codec,
                            )?),
                        },
                    )),
                }));
            }
        }

        // Parquet scan: also serializes the pushed-down filter predicate
        // (when present) and the table-level parquet options.
        #[cfg(feature = "parquet")]
        if let Some((maybe_parquet, conf)) =
            data_source_exec.downcast_to_file_source::<ParquetSource>()
        {
            let predicate = conf
                .filter()
                .map(|pred| serialize_physical_expr(&pred, extension_codec))
                .transpose()?;
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::ParquetScan(
                    protobuf::ParquetScanExecNode {
                        base_conf: Some(serialize_file_scan_config(
                            maybe_parquet,
                            extension_codec,
                        )?),
                        predicate,
                        parquet_options: Some(conf.table_parquet_options().try_into()?),
                    },
                )),
            }));
        }

        // Avro scan: only the base file-scan configuration is serialized.
        #[cfg(feature = "avro")]
        if let Some(maybe_avro) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = maybe_avro.file_source();
            if source.as_any().downcast_ref::<AvroSource>().is_some() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::AvroScan(
                        protobuf::AvroScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                maybe_avro,
                                extension_codec,
                            )?),
                        },
                    )),
                }));
            }
        }

        // In-memory scan: the record batches themselves are serialized
        // along with schema, projection, sort information and limits.
        if let Some(source_conf) =
            data_source.as_any().downcast_ref::<MemorySourceConfig>()
        {
            let proto_partitions = source_conf
                .partitions()
                .iter()
                .map(|p| serialize_record_batches(p))
                .collect::<Result<Vec<_>>>()?;

            let proto_schema: protobuf::Schema =
                source_conf.original_schema().as_ref().try_into()?;

            // An absent projection is encoded as an empty index list.
            let proto_projection = source_conf
                .projection()
                .as_ref()
                .map_or_else(Vec::new, |v| {
                    v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
                });

            // Each known ordering becomes a collection of sort-expr nodes.
            let proto_sort_information = source_conf
                .sort_information()
                .iter()
                .map(|ordering| {
                    let sort_exprs = serialize_physical_sort_exprs(
                        ordering.to_owned(),
                        extension_codec,
                    )?;
                    Ok::<_, DataFusionError>(protobuf::PhysicalSortExprNodeCollection {
                        physical_sort_expr_nodes: sort_exprs,
                    })
                })
                .collect::<Result<Vec<_>, _>>()?;

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::MemoryScan(
                    protobuf::MemoryScanExecNode {
                        partitions: proto_partitions,
                        schema: Some(proto_schema),
                        projection: proto_projection,
                        sort_information: proto_sort_information,
                        show_sizes: source_conf.show_sizes(),
                        fetch: source_conf.fetch().map(|f| f as u32),
                    },
                )),
            }));
        }

        // Unrecognized data source type: let the caller handle it.
        Ok(None)
    }
2746
2747 fn try_from_coalesce_partitions_exec(
2748 exec: &CoalescePartitionsExec,
2749 extension_codec: &dyn PhysicalExtensionCodec,
2750 ) -> Result<Self> {
2751 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2752 exec.input().to_owned(),
2753 extension_codec,
2754 )?;
2755 Ok(protobuf::PhysicalPlanNode {
2756 physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
2757 protobuf::CoalescePartitionsExecNode {
2758 input: Some(Box::new(input)),
2759 fetch: exec.fetch().map(|f| f as u32),
2760 },
2761 ))),
2762 })
2763 }
2764
2765 fn try_from_repartition_exec(
2766 exec: &RepartitionExec,
2767 extension_codec: &dyn PhysicalExtensionCodec,
2768 ) -> Result<Self> {
2769 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2770 exec.input().to_owned(),
2771 extension_codec,
2772 )?;
2773
2774 let pb_partitioning =
2775 serialize_partitioning(exec.partitioning(), extension_codec)?;
2776
2777 Ok(protobuf::PhysicalPlanNode {
2778 physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
2779 protobuf::RepartitionExecNode {
2780 input: Some(Box::new(input)),
2781 partitioning: Some(pb_partitioning),
2782 },
2783 ))),
2784 })
2785 }
2786
2787 fn try_from_sort_exec(
2788 exec: &SortExec,
2789 extension_codec: &dyn PhysicalExtensionCodec,
2790 ) -> Result<Self> {
2791 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2792 exec.input().to_owned(),
2793 extension_codec,
2794 )?;
2795 let expr = exec
2796 .expr()
2797 .iter()
2798 .map(|expr| {
2799 let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2800 expr: Some(Box::new(serialize_physical_expr(
2801 &expr.expr,
2802 extension_codec,
2803 )?)),
2804 asc: !expr.options.descending,
2805 nulls_first: expr.options.nulls_first,
2806 });
2807 Ok(protobuf::PhysicalExprNode {
2808 expr_type: Some(ExprType::Sort(sort_expr)),
2809 })
2810 })
2811 .collect::<Result<Vec<_>>>()?;
2812 Ok(protobuf::PhysicalPlanNode {
2813 physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
2814 protobuf::SortExecNode {
2815 input: Some(Box::new(input)),
2816 expr,
2817 fetch: match exec.fetch() {
2818 Some(n) => n as i64,
2819 _ => -1,
2820 },
2821 preserve_partitioning: exec.preserve_partitioning(),
2822 },
2823 ))),
2824 })
2825 }
2826
2827 fn try_from_union_exec(
2828 union: &UnionExec,
2829 extension_codec: &dyn PhysicalExtensionCodec,
2830 ) -> Result<Self> {
2831 let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2832 for input in union.inputs() {
2833 inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2834 input.to_owned(),
2835 extension_codec,
2836 )?);
2837 }
2838 Ok(protobuf::PhysicalPlanNode {
2839 physical_plan_type: Some(PhysicalPlanType::Union(protobuf::UnionExecNode {
2840 inputs,
2841 })),
2842 })
2843 }
2844
2845 fn try_from_interleave_exec(
2846 interleave: &InterleaveExec,
2847 extension_codec: &dyn PhysicalExtensionCodec,
2848 ) -> Result<Self> {
2849 let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2850 for input in interleave.inputs() {
2851 inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2852 input.to_owned(),
2853 extension_codec,
2854 )?);
2855 }
2856 Ok(protobuf::PhysicalPlanNode {
2857 physical_plan_type: Some(PhysicalPlanType::Interleave(
2858 protobuf::InterleaveExecNode { inputs },
2859 )),
2860 })
2861 }
2862
2863 fn try_from_sort_preserving_merge_exec(
2864 exec: &SortPreservingMergeExec,
2865 extension_codec: &dyn PhysicalExtensionCodec,
2866 ) -> Result<Self> {
2867 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2868 exec.input().to_owned(),
2869 extension_codec,
2870 )?;
2871 let expr = exec
2872 .expr()
2873 .iter()
2874 .map(|expr| {
2875 let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2876 expr: Some(Box::new(serialize_physical_expr(
2877 &expr.expr,
2878 extension_codec,
2879 )?)),
2880 asc: !expr.options.descending,
2881 nulls_first: expr.options.nulls_first,
2882 });
2883 Ok(protobuf::PhysicalExprNode {
2884 expr_type: Some(ExprType::Sort(sort_expr)),
2885 })
2886 })
2887 .collect::<Result<Vec<_>>>()?;
2888 Ok(protobuf::PhysicalPlanNode {
2889 physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(Box::new(
2890 protobuf::SortPreservingMergeExecNode {
2891 input: Some(Box::new(input)),
2892 expr,
2893 fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
2894 },
2895 ))),
2896 })
2897 }
2898
2899 fn try_from_nested_loop_join_exec(
2900 exec: &NestedLoopJoinExec,
2901 extension_codec: &dyn PhysicalExtensionCodec,
2902 ) -> Result<Self> {
2903 let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2904 exec.left().to_owned(),
2905 extension_codec,
2906 )?;
2907 let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2908 exec.right().to_owned(),
2909 extension_codec,
2910 )?;
2911
2912 let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2913 let filter = exec
2914 .filter()
2915 .as_ref()
2916 .map(|f| {
2917 let expression =
2918 serialize_physical_expr(f.expression(), extension_codec)?;
2919 let column_indices = f
2920 .column_indices()
2921 .iter()
2922 .map(|i| {
2923 let side: protobuf::JoinSide = i.side.to_owned().into();
2924 protobuf::ColumnIndex {
2925 index: i.index as u32,
2926 side: side.into(),
2927 }
2928 })
2929 .collect();
2930 let schema = f.schema().as_ref().try_into()?;
2931 Ok(protobuf::JoinFilter {
2932 expression: Some(expression),
2933 column_indices,
2934 schema: Some(schema),
2935 })
2936 })
2937 .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2938
2939 Ok(protobuf::PhysicalPlanNode {
2940 physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
2941 protobuf::NestedLoopJoinExecNode {
2942 left: Some(Box::new(left)),
2943 right: Some(Box::new(right)),
2944 join_type: join_type.into(),
2945 filter,
2946 projection: exec.projection().map_or_else(Vec::new, |v| {
2947 v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2948 }),
2949 },
2950 ))),
2951 })
2952 }
2953
2954 fn try_from_window_agg_exec(
2955 exec: &WindowAggExec,
2956 extension_codec: &dyn PhysicalExtensionCodec,
2957 ) -> Result<Self> {
2958 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2959 exec.input().to_owned(),
2960 extension_codec,
2961 )?;
2962
2963 let window_expr = exec
2964 .window_expr()
2965 .iter()
2966 .map(|e| serialize_physical_window_expr(e, extension_codec))
2967 .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2968
2969 let partition_keys = exec
2970 .partition_keys()
2971 .iter()
2972 .map(|e| serialize_physical_expr(e, extension_codec))
2973 .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2974
2975 Ok(protobuf::PhysicalPlanNode {
2976 physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
2977 protobuf::WindowAggExecNode {
2978 input: Some(Box::new(input)),
2979 window_expr,
2980 partition_keys,
2981 input_order_mode: None,
2982 },
2983 ))),
2984 })
2985 }
2986
2987 fn try_from_bounded_window_agg_exec(
2988 exec: &BoundedWindowAggExec,
2989 extension_codec: &dyn PhysicalExtensionCodec,
2990 ) -> Result<Self> {
2991 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2992 exec.input().to_owned(),
2993 extension_codec,
2994 )?;
2995
2996 let window_expr = exec
2997 .window_expr()
2998 .iter()
2999 .map(|e| serialize_physical_window_expr(e, extension_codec))
3000 .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
3001
3002 let partition_keys = exec
3003 .partition_keys()
3004 .iter()
3005 .map(|e| serialize_physical_expr(e, extension_codec))
3006 .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
3007
3008 let input_order_mode = match &exec.input_order_mode {
3009 InputOrderMode::Linear => {
3010 window_agg_exec_node::InputOrderMode::Linear(protobuf::EmptyMessage {})
3011 }
3012 InputOrderMode::PartiallySorted(columns) => {
3013 window_agg_exec_node::InputOrderMode::PartiallySorted(
3014 protobuf::PartiallySortedInputOrderMode {
3015 columns: columns.iter().map(|c| *c as u64).collect(),
3016 },
3017 )
3018 }
3019 InputOrderMode::Sorted => {
3020 window_agg_exec_node::InputOrderMode::Sorted(protobuf::EmptyMessage {})
3021 }
3022 };
3023
3024 Ok(protobuf::PhysicalPlanNode {
3025 physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
3026 protobuf::WindowAggExecNode {
3027 input: Some(Box::new(input)),
3028 window_expr,
3029 partition_keys,
3030 input_order_mode: Some(input_order_mode),
3031 },
3032 ))),
3033 })
3034 }
3035
    /// Attempts to serialize a [`DataSinkExec`] into its protobuf
    /// representation, supporting JSON, CSV and (behind the `parquet`
    /// feature) Parquet sinks.
    ///
    /// Returns `Ok(None)` for any other sink implementation so the
    /// caller can fall back to other encodings.
    fn try_from_data_sink_exec(
        exec: &DataSinkExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Option<Self>> {
        let input: protobuf::PhysicalPlanNode =
            protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.input().to_owned(),
                extension_codec,
            )?;
        // Serialize the required output ordering, if any, as a list of
        // sort expression nodes.
        let sort_order = match exec.sort_order() {
            Some(requirements) => {
                let expr = requirements
                    .iter()
                    .map(|requirement| {
                        // Convert the sort requirement into a concrete
                        // sort expression before serializing it.
                        let expr: PhysicalSortExpr = requirement.to_owned().into();
                        let sort_expr = protobuf::PhysicalSortExprNode {
                            expr: Some(Box::new(serialize_physical_expr(
                                &expr.expr,
                                extension_codec,
                            )?)),
                            asc: !expr.options.descending,
                            nulls_first: expr.options.nulls_first,
                        };
                        Ok(sort_expr)
                    })
                    .collect::<Result<Vec<_>>>()?;
                Some(protobuf::PhysicalSortExprNodeCollection {
                    physical_sort_expr_nodes: expr,
                })
            }
            None => None,
        };

        // Try each known sink type in turn; the first successful
        // downcast determines the encoding.
        if let Some(sink) = exec.sink().as_any().downcast_ref::<JsonSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new(
                    protobuf::JsonSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        if let Some(sink) = exec.sink().as_any().downcast_ref::<CsvSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::CsvSink(Box::new(
                    protobuf::CsvSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        #[cfg(feature = "parquet")]
        if let Some(sink) = exec.sink().as_any().downcast_ref::<ParquetSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new(
                    protobuf::ParquetSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        // Unrecognized sink type: let the caller handle it.
        Ok(None)
    }
3112
3113 fn try_from_unnest_exec(
3114 exec: &UnnestExec,
3115 extension_codec: &dyn PhysicalExtensionCodec,
3116 ) -> Result<Self> {
3117 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
3118 exec.input().to_owned(),
3119 extension_codec,
3120 )?;
3121
3122 Ok(protobuf::PhysicalPlanNode {
3123 physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new(
3124 protobuf::UnnestExecNode {
3125 input: Some(Box::new(input)),
3126 schema: Some(exec.schema().try_into()?),
3127 list_type_columns: exec
3128 .list_column_indices()
3129 .iter()
3130 .map(|c| ProtoListUnnest {
3131 index_in_input_schema: c.index_in_input_schema as _,
3132 depth: c.depth as _,
3133 })
3134 .collect(),
3135 struct_type_columns: exec
3136 .struct_column_indices()
3137 .iter()
3138 .map(|c| *c as _)
3139 .collect(),
3140 options: Some(exec.options().into()),
3141 },
3142 ))),
3143 })
3144 }
3145
3146 fn try_from_cooperative_exec(
3147 exec: &CooperativeExec,
3148 extension_codec: &dyn PhysicalExtensionCodec,
3149 ) -> Result<Self> {
3150 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
3151 exec.input().to_owned(),
3152 extension_codec,
3153 )?;
3154
3155 Ok(protobuf::PhysicalPlanNode {
3156 physical_plan_type: Some(PhysicalPlanType::Cooperative(Box::new(
3157 protobuf::CooperativeExecNode {
3158 input: Some(Box::new(input)),
3159 },
3160 ))),
3161 })
3162 }
3163
3164 fn str_to_generate_series_name(name: &str) -> Result<protobuf::GenerateSeriesName> {
3165 match name {
3166 "generate_series" => Ok(protobuf::GenerateSeriesName::GsGenerateSeries),
3167 "range" => Ok(protobuf::GenerateSeriesName::GsRange),
3168 _ => internal_err!("unknown name: {name}"),
3169 }
3170 }
3171
    /// Attempts to serialize a [`LazyMemoryExec`] into a protobuf
    /// `GenerateSeries` node.
    ///
    /// Only execs with exactly one generator of a recognized type
    /// (empty, `i64` series, or timestamp/date series) are supported;
    /// anything else yields `Ok(None)`.
    fn try_from_lazy_memory_exec(exec: &LazyMemoryExec) -> Result<Option<Self>> {
        let generators = exec.generators();

        // Only a single generator can be represented in the proto node.
        let [generator] = generators.as_slice() else {
            return Ok(None);
        };

        let generator_guard = generator.read();

        // Case 1: an empty generator — only the function name is stored.
        if let Some(empty_gen) = generator_guard.as_any().downcast_ref::<Empty>() {
            let schema = exec.schema();
            let node = protobuf::GenerateSeriesNode {
                schema: Some(schema.as_ref().try_into()?),
                // NOTE(review): batch size is hard-coded here — unlike the
                // other branches, no batch size is read from the generator;
                // confirm 8192 is the intended default.
                target_batch_size: 8192, args: Some(protobuf::generate_series_node::Args::ContainsNull(
                    protobuf::GenerateSeriesArgsContainsNull {
                        name: Self::str_to_generate_series_name(empty_gen.name())? as i32,
                    },
                )),
            };

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
            }));
        }

        // Case 2: an i64 series generator (start/end/step bounds).
        if let Some(int_64) = generator_guard
            .as_any()
            .downcast_ref::<GenericSeriesState<i64>>()
        {
            let schema = exec.schema();
            let node = protobuf::GenerateSeriesNode {
                schema: Some(schema.as_ref().try_into()?),
                target_batch_size: int_64.batch_size() as u32,
                args: Some(protobuf::generate_series_node::Args::Int64Args(
                    protobuf::GenerateSeriesArgsInt64 {
                        start: *int_64.start(),
                        end: *int_64.end(),
                        step: *int_64.step(),
                        include_end: int_64.include_end(),
                        name: Self::str_to_generate_series_name(int_64.name())? as i32,
                    },
                )),
            };

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
            }));
        }

        // Case 3: a timestamp series generator; the presence of a time
        // zone string selects timestamp args over date args.
        if let Some(timestamp_args) = generator_guard
            .as_any()
            .downcast_ref::<GenericSeriesState<TimestampValue>>()
        {
            let schema = exec.schema();

            let start = timestamp_args.start().value();
            let end = timestamp_args.end().value();

            let step_value = timestamp_args.step();

            // The step is an interval of months/days/nanoseconds.
            let step = Some(datafusion_proto_common::IntervalMonthDayNanoValue {
                months: step_value.months,
                days: step_value.days,
                nanos: step_value.nanoseconds,
            });
            let include_end = timestamp_args.include_end();
            let name = Self::str_to_generate_series_name(timestamp_args.name())? as i32;

            let args = match timestamp_args.current().tz_str() {
                Some(tz) => protobuf::generate_series_node::Args::TimestampArgs(
                    protobuf::GenerateSeriesArgsTimestamp {
                        start,
                        end,
                        step,
                        include_end,
                        name,
                        tz: Some(tz.to_string()),
                    },
                ),
                None => protobuf::generate_series_node::Args::DateArgs(
                    protobuf::GenerateSeriesArgsDate {
                        start,
                        end,
                        step,
                        include_end,
                        name,
                    },
                ),
            };

            let node = protobuf::GenerateSeriesNode {
                schema: Some(schema.as_ref().try_into()?),
                target_batch_size: timestamp_args.batch_size() as u32,
                args: Some(args),
            };

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
            }));
        }

        // Unsupported generator type.
        Ok(None)
    }
3278
3279 fn try_from_async_func_exec(
3280 exec: &AsyncFuncExec,
3281 extension_codec: &dyn PhysicalExtensionCodec,
3282 ) -> Result<Self> {
3283 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
3284 Arc::clone(exec.input()),
3285 extension_codec,
3286 )?;
3287
3288 let mut async_exprs = vec![];
3289 let mut async_expr_names = vec![];
3290
3291 for async_expr in exec.async_exprs() {
3292 async_exprs.push(serialize_physical_expr(&async_expr.func, extension_codec)?);
3293 async_expr_names.push(async_expr.name.clone())
3294 }
3295
3296 Ok(protobuf::PhysicalPlanNode {
3297 physical_plan_type: Some(PhysicalPlanType::AsyncFunc(Box::new(
3298 protobuf::AsyncFuncExecNode {
3299 input: Some(Box::new(input)),
3300 async_exprs,
3301 async_expr_names,
3302 },
3303 ))),
3304 })
3305 }
3306}
3307
/// Two-way conversion between a protobuf plan message and an
/// [`ExecutionPlan`], plus byte-level encode/decode of the message.
pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
    /// Decodes `Self` from a protobuf-encoded byte buffer.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes `self` as protobuf bytes into `buf`.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized;

    /// Converts this protobuf node into a runnable [`ExecutionPlan`],
    /// using `extension_codec` to decode any custom (extension) nodes.
    fn try_into_physical_plan(
        &self,
        ctx: &TaskContext,

        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Converts an [`ExecutionPlan`] into this protobuf representation,
    /// using `extension_codec` to encode any custom (extension) nodes.
    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized;
}
3332
/// Codec for serializing/deserializing user-defined physical-plan extensions
/// (custom execution nodes and user-defined functions) that the built-in
/// protobuf representation does not cover.
pub trait PhysicalExtensionCodec: Debug + Send + Sync {
    /// Decodes a custom [`ExecutionPlan`] from `buf`, re-attaching the already
    /// decoded child plans supplied in `inputs`.
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        ctx: &TaskContext,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Encodes a custom [`ExecutionPlan`] into `buf`.
    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;

    /// Decodes a scalar UDF identified by `name`. The default implementation
    /// returns a "not implemented" error.
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
    }

    /// Encodes a scalar UDF. The default writes nothing and succeeds, so the
    /// decoding side is expected to resolve the function by name.
    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes a custom [`PhysicalExpr`] from `buf`, re-attaching the already
    /// decoded child expressions in `_inputs`. Default: "not implemented".
    fn try_decode_expr(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn PhysicalExpr>],
    ) -> Result<Arc<dyn PhysicalExpr>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Encodes a custom [`PhysicalExpr`] into `_buf`. Default: "not
    /// implemented" (note this differs from the UDF/UDAF/UDWF encoders, whose
    /// defaults silently succeed).
    fn try_encode_expr(
        &self,
        _node: &Arc<dyn PhysicalExpr>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Decodes an aggregate UDF identified by `name`. Default: "not
    /// implemented".
    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        not_impl_err!(
            "PhysicalExtensionCodec is not provided for aggregate function {name}"
        )
    }

    /// Encodes an aggregate UDF. The default writes nothing and succeeds.
    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes a window UDF identified by `name`. Default: "not implemented".
    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for window function {name}")
    }

    /// Encodes a window UDF. The default writes nothing and succeeds.
    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }
}
3385
/// A `PhysicalExtensionCodec` that rejects any attempt to (de)serialize an
/// extension plan node — suitable when a plan contains no custom extensions.
/// Derives `Default` so callers can write `DefaultPhysicalExtensionCodec::default()`
/// instead of the struct-literal form.
#[derive(Debug, Default)]
pub struct DefaultPhysicalExtensionCodec {}
3388
impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
    /// Always fails: the default codec cannot decode extension plan nodes.
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn ExecutionPlan>],
        _ctx: &TaskContext,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Always fails: the default codec cannot encode extension plan nodes.
    fn try_encode(
        &self,
        _node: Arc<dyn ExecutionPlan>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }
}
3407
/// On-wire pairing of an encoded extension blob with the index of the codec
/// (within a `ComposedPhysicalExtensionCodec`) that produced it, so decoding
/// can be routed back to the same codec.
#[derive(Clone, PartialEq, prost::Message)]
struct DataEncoderTuple {
    /// Index into the composed codec's delegate list identifying which codec
    /// successfully encoded `blob`.
    #[prost(uint32, tag = 1)]
    pub encoder_position: u32,

    /// The bytes produced by that codec.
    #[prost(bytes, tag = 2)]
    pub blob: Vec<u8>,
}
3420
/// A `PhysicalExtensionCodec` that chains several delegate codecs: encoding
/// tries each codec in order until one succeeds, and decoding is routed back
/// to the codec that produced the bytes (via `DataEncoderTuple`).
#[derive(Debug)]
pub struct ComposedPhysicalExtensionCodec {
    // Ordered list of delegates; the position in this list is recorded in
    // every encoded payload, so encode and decode must use the same order.
    codecs: Vec<Arc<dyn PhysicalExtensionCodec>>,
}
3427
3428impl ComposedPhysicalExtensionCodec {
3429 pub fn new(codecs: Vec<Arc<dyn PhysicalExtensionCodec>>) -> Self {
3432 Self { codecs }
3433 }
3434
3435 fn decode_protobuf<R>(
3436 &self,
3437 buf: &[u8],
3438 decode: impl FnOnce(&dyn PhysicalExtensionCodec, &[u8]) -> Result<R>,
3439 ) -> Result<R> {
3440 let proto =
3441 DataEncoderTuple::decode(buf).map_err(|e| internal_datafusion_err!("{e}"))?;
3442
3443 let codec = self.codecs.get(proto.encoder_position as usize).ok_or(
3444 internal_datafusion_err!("Can't find required codec in codec list"),
3445 )?;
3446
3447 decode(codec.as_ref(), &proto.blob)
3448 }
3449
3450 fn encode_protobuf(
3451 &self,
3452 buf: &mut Vec<u8>,
3453 mut encode: impl FnMut(&dyn PhysicalExtensionCodec, &mut Vec<u8>) -> Result<()>,
3454 ) -> Result<()> {
3455 let mut data = vec![];
3456 let mut last_err = None;
3457 let mut encoder_position = None;
3458
3459 for (position, codec) in self.codecs.iter().enumerate() {
3461 match encode(codec.as_ref(), &mut data) {
3462 Ok(_) => {
3463 encoder_position = Some(position as u32);
3464 break;
3465 }
3466 Err(err) => last_err = Some(err),
3467 }
3468 }
3469
3470 let encoder_position = encoder_position.ok_or_else(|| {
3471 last_err.unwrap_or_else(|| {
3472 DataFusionError::NotImplemented(
3473 "Empty list of composed codecs".to_owned(),
3474 )
3475 })
3476 })?;
3477
3478 let proto = DataEncoderTuple {
3480 encoder_position,
3481 blob: data,
3482 };
3483 proto
3484 .encode(buf)
3485 .map_err(|e| internal_datafusion_err!("{e}"))
3486 }
3487}
3488
impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec {
    // NOTE(review): only plan nodes, scalar UDFs and aggregate UDAFs are
    // delegated here; the trait's UDWF and expression hooks fall back to
    // their defaults, so a delegate codec's `try_*_udwf` / `try_*_expr`
    // overrides are never reached through the composed codec — confirm this
    // gap is intentional.

    /// Routes decoding of a plan node to the codec recorded in the payload.
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        ctx: &TaskContext,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        self.decode_protobuf(buf, |codec, data| codec.try_decode(data, inputs, ctx))
    }

    /// Encodes a plan node with the first delegate codec that succeeds.
    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
        self.encode_protobuf(buf, |codec, data| codec.try_encode(Arc::clone(&node), data))
    }

    /// Routes scalar-UDF decoding to the codec recorded in the payload.
    fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        self.decode_protobuf(buf, |codec, data| codec.try_decode_udf(name, data))
    }

    /// Encodes a scalar UDF with the first delegate codec that succeeds.
    fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
        self.encode_protobuf(buf, |codec, data| codec.try_encode_udf(node, data))
    }

    /// Routes aggregate-UDF decoding to the codec recorded in the payload.
    fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        self.decode_protobuf(buf, |codec, data| codec.try_decode_udaf(name, data))
    }

    /// Encodes an aggregate UDF with the first delegate codec that succeeds.
    fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
        self.encode_protobuf(buf, |codec, data| codec.try_encode_udaf(node, data))
    }
}
3519
3520fn into_physical_plan(
3521 node: &Option<Box<protobuf::PhysicalPlanNode>>,
3522 ctx: &TaskContext,
3523 extension_codec: &dyn PhysicalExtensionCodec,
3524) -> Result<Arc<dyn ExecutionPlan>> {
3525 if let Some(field) = node {
3526 field.try_into_physical_plan(ctx, extension_codec)
3527 } else {
3528 Err(proto_error("Missing required field in protobuf"))
3529 }
3530}