1use std::cell::RefCell;
19use std::collections::HashMap;
20use std::fmt::Debug;
21use std::hash::{DefaultHasher, Hash, Hasher};
22use std::sync::Arc;
23
24use arrow::compute::SortOptions;
25use arrow::datatypes::{IntervalMonthDayNanoType, Schema, SchemaRef};
26use datafusion_catalog::memory::MemorySourceConfig;
27use datafusion_common::config::CsvOptions;
28use datafusion_common::{
29 DataFusionError, Result, internal_datafusion_err, internal_err, not_impl_err,
30};
31#[cfg(feature = "parquet")]
32use datafusion_datasource::file::FileSource;
33use datafusion_datasource::file_compression_type::FileCompressionType;
34use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
35use datafusion_datasource::sink::DataSinkExec;
36use datafusion_datasource::source::{DataSource, DataSourceExec};
37use datafusion_datasource_arrow::source::ArrowSource;
38#[cfg(feature = "avro")]
39use datafusion_datasource_avro::source::AvroSource;
40use datafusion_datasource_csv::file_format::CsvSink;
41use datafusion_datasource_csv::source::CsvSource;
42use datafusion_datasource_json::file_format::JsonSink;
43use datafusion_datasource_json::source::JsonSource;
44#[cfg(feature = "parquet")]
45use datafusion_datasource_parquet::CachedParquetFileReaderFactory;
46#[cfg(feature = "parquet")]
47use datafusion_datasource_parquet::file_format::ParquetSink;
48#[cfg(feature = "parquet")]
49use datafusion_datasource_parquet::source::ParquetSource;
50#[cfg(feature = "parquet")]
51use datafusion_execution::object_store::ObjectStoreUrl;
52use datafusion_execution::{FunctionRegistry, TaskContext};
53use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
54use datafusion_functions_table::generate_series::{
55 Empty, GenSeriesArgs, GenerateSeriesTable, GenericSeriesState, TimestampValue,
56};
57use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
58use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr;
59use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
60use datafusion_physical_plan::aggregates::{
61 AggregateExec, AggregateMode, LimitOptions, PhysicalGroupBy,
62};
63use datafusion_physical_plan::analyze::AnalyzeExec;
64use datafusion_physical_plan::async_func::AsyncFuncExec;
65use datafusion_physical_plan::buffer::BufferExec;
66#[expect(deprecated)]
67use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
68use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
69use datafusion_physical_plan::coop::CooperativeExec;
70use datafusion_physical_plan::empty::EmptyExec;
71use datafusion_physical_plan::explain::ExplainExec;
72use datafusion_physical_plan::expressions::PhysicalSortExpr;
73use datafusion_physical_plan::filter::{FilterExec, FilterExecBuilder};
74use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
75use datafusion_physical_plan::joins::{
76 CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec,
77 StreamJoinPartitionMode, SymmetricHashJoinExec,
78};
79use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
80use datafusion_physical_plan::memory::LazyMemoryExec;
81use datafusion_physical_plan::metrics::MetricType;
82use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
83use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
84use datafusion_physical_plan::repartition::RepartitionExec;
85use datafusion_physical_plan::sorts::sort::SortExec;
86use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
87use datafusion_physical_plan::union::{InterleaveExec, UnionExec};
88use datafusion_physical_plan::unnest::{ListUnnest, UnnestExec};
89use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
90use datafusion_physical_plan::{ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr};
91use prost::Message;
92use prost::bytes::BufMut;
93
94use self::from_proto::parse_protobuf_partitioning;
95use self::to_proto::serialize_partitioning;
96use crate::common::{byte_to_string, str_to_byte};
97use crate::physical_plan::from_proto::{
98 parse_physical_expr_with_converter, parse_physical_sort_expr,
99 parse_physical_sort_exprs, parse_physical_window_expr,
100 parse_protobuf_file_scan_config, parse_record_batches, parse_table_schema_from_proto,
101};
102use crate::physical_plan::to_proto::{
103 serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr,
104 serialize_physical_expr_with_converter, serialize_physical_sort_exprs,
105 serialize_physical_window_expr, serialize_record_batches,
106};
107use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
108use crate::protobuf::physical_expr_node::ExprType;
109use crate::protobuf::physical_plan_node::PhysicalPlanType;
110use crate::protobuf::{
111 self, ListUnnest as ProtoListUnnest, SortExprNode, SortMergeJoinExecNode,
112 proto_error, window_agg_exec_node,
113};
114use crate::{convert_required, into_required};
115
116pub mod from_proto;
117pub mod to_proto;
118
impl AsExecutionPlan for protobuf::PhysicalPlanNode {
    /// Decodes a prost-encoded byte buffer into a `PhysicalPlanNode`,
    /// mapping any protobuf decode failure into a DataFusion internal error.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized,
    {
        protobuf::PhysicalPlanNode::decode(buf).map_err(|e| {
            internal_datafusion_err!("failed to decode physical plan: {e:?}")
        })
    }

    /// Encodes this node into `buf` via prost, mapping encode failures
    /// into a DataFusion internal error.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized,
    {
        self.encode(buf).map_err(|e| {
            internal_datafusion_err!("failed to encode physical plan: {e:?}")
        })
    }

    /// Deserializes this node into an executable plan, delegating to the
    /// converter-aware variant with the default expression converter.
    fn try_into_physical_plan(
        &self,
        ctx: &TaskContext,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        self.try_into_physical_plan_with_converter(
            ctx,
            codec,
            &DefaultPhysicalProtoConverter {},
        )
    }

    /// Serializes a physical plan into a protobuf node, delegating to the
    /// converter-aware variant with the default expression converter.
    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        Self::try_from_physical_plan_with_converter(
            plan,
            codec,
            &DefaultPhysicalProtoConverter {},
        )
    }
}
165
impl protobuf::PhysicalPlanNode {
    /// Deserializes this protobuf node into an [`ExecutionPlan`] tree.
    ///
    /// `codec` decodes user-defined extension nodes; `proto_converter`
    /// decodes embedded physical expressions. Each `PhysicalPlanType`
    /// variant is dispatched to a dedicated `try_into_*` helper, which
    /// recursively rebuilds child plans.
    pub fn try_into_physical_plan_with_converter(
        &self,
        ctx: &TaskContext,

        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // A node without a concrete plan type is malformed input.
        let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
            proto_error(format!(
                "physical_plan::from_proto() Unsupported physical plan '{self:?}'"
            ))
        })?;
        match plan {
            PhysicalPlanType::Explain(explain) => {
                self.try_into_explain_physical_plan(explain, ctx, codec, proto_converter)
            }
            PhysicalPlanType::Projection(projection) => self
                .try_into_projection_physical_plan(
                    projection,
                    ctx,
                    codec,
                    proto_converter,
                ),
            PhysicalPlanType::Filter(filter) => {
                self.try_into_filter_physical_plan(filter, ctx, codec, proto_converter)
            }
            PhysicalPlanType::CsvScan(scan) => {
                self.try_into_csv_scan_physical_plan(scan, ctx, codec, proto_converter)
            }
            PhysicalPlanType::JsonScan(scan) => {
                self.try_into_json_scan_physical_plan(scan, ctx, codec, proto_converter)
            }
            PhysicalPlanType::ParquetScan(scan) => self
                .try_into_parquet_scan_physical_plan(scan, ctx, codec, proto_converter),
            PhysicalPlanType::AvroScan(scan) => {
                self.try_into_avro_scan_physical_plan(scan, ctx, codec, proto_converter)
            }
            PhysicalPlanType::MemoryScan(scan) => {
                self.try_into_memory_scan_physical_plan(scan, ctx, codec, proto_converter)
            }
            PhysicalPlanType::ArrowScan(scan) => {
                self.try_into_arrow_scan_physical_plan(scan, ctx, codec, proto_converter)
            }
            PhysicalPlanType::CoalesceBatches(coalesce_batches) => self
                .try_into_coalesce_batches_physical_plan(
                    coalesce_batches,
                    ctx,
                    codec,
                    proto_converter,
                ),
            PhysicalPlanType::Merge(merge) => {
                self.try_into_merge_physical_plan(merge, ctx, codec, proto_converter)
            }
            PhysicalPlanType::Repartition(repart) => self
                .try_into_repartition_physical_plan(repart, ctx, codec, proto_converter),
            PhysicalPlanType::GlobalLimit(limit) => self
                .try_into_global_limit_physical_plan(limit, ctx, codec, proto_converter),
            PhysicalPlanType::LocalLimit(limit) => self
                .try_into_local_limit_physical_plan(limit, ctx, codec, proto_converter),
            PhysicalPlanType::Window(window_agg) => self.try_into_window_physical_plan(
                window_agg,
                ctx,
                codec,
                proto_converter,
            ),
            PhysicalPlanType::Aggregate(hash_agg) => self
                .try_into_aggregate_physical_plan(hash_agg, ctx, codec, proto_converter),
            PhysicalPlanType::HashJoin(hashjoin) => self
                .try_into_hash_join_physical_plan(hashjoin, ctx, codec, proto_converter),
            PhysicalPlanType::SymmetricHashJoin(sym_join) => self
                .try_into_symmetric_hash_join_physical_plan(
                    sym_join,
                    ctx,
                    codec,
                    proto_converter,
                ),
            PhysicalPlanType::Union(union) => {
                self.try_into_union_physical_plan(union, ctx, codec, proto_converter)
            }
            PhysicalPlanType::Interleave(interleave) => self
                .try_into_interleave_physical_plan(
                    interleave,
                    ctx,
                    codec,
                    proto_converter,
                ),
            PhysicalPlanType::CrossJoin(crossjoin) => self
                .try_into_cross_join_physical_plan(
                    crossjoin,
                    ctx,
                    codec,
                    proto_converter,
                ),
            PhysicalPlanType::Empty(empty) => {
                self.try_into_empty_physical_plan(empty, ctx, codec, proto_converter)
            }
            // NOTE(review): unlike the neighboring arms, this helper takes no
            // `proto_converter` — presumably placeholder rows embed no
            // physical expressions; confirm against the helper's definition.
            PhysicalPlanType::PlaceholderRow(placeholder) => {
                self.try_into_placeholder_row_physical_plan(placeholder, ctx, codec)
            }
            PhysicalPlanType::Sort(sort) => {
                self.try_into_sort_physical_plan(sort, ctx, codec, proto_converter)
            }
            PhysicalPlanType::SortPreservingMerge(sort) => self
                .try_into_sort_preserving_merge_physical_plan(
                    sort,
                    ctx,
                    codec,
                    proto_converter,
                ),
            PhysicalPlanType::Extension(extension) => self
                .try_into_extension_physical_plan(extension, ctx, codec, proto_converter),
            PhysicalPlanType::NestedLoopJoin(join) => self
                .try_into_nested_loop_join_physical_plan(
                    join,
                    ctx,
                    codec,
                    proto_converter,
                ),
            PhysicalPlanType::Analyze(analyze) => {
                self.try_into_analyze_physical_plan(analyze, ctx, codec, proto_converter)
            }
            PhysicalPlanType::JsonSink(sink) => {
                self.try_into_json_sink_physical_plan(sink, ctx, codec, proto_converter)
            }
            PhysicalPlanType::CsvSink(sink) => {
                self.try_into_csv_sink_physical_plan(sink, ctx, codec, proto_converter)
            }
            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
            PhysicalPlanType::ParquetSink(sink) => self
                .try_into_parquet_sink_physical_plan(sink, ctx, codec, proto_converter),
            PhysicalPlanType::Unnest(unnest) => {
                self.try_into_unnest_physical_plan(unnest, ctx, codec, proto_converter)
            }
            PhysicalPlanType::Cooperative(cooperative) => self
                .try_into_cooperative_physical_plan(
                    cooperative,
                    ctx,
                    codec,
                    proto_converter,
                ),
            // Self-contained node: needs neither context, codec nor
            // expression converter.
            PhysicalPlanType::GenerateSeries(generate_series) => {
                self.try_into_generate_series_physical_plan(generate_series)
            }
            PhysicalPlanType::SortMergeJoin(sort_join) => {
                self.try_into_sort_join(sort_join, ctx, codec, proto_converter)
            }
            PhysicalPlanType::AsyncFunc(async_func) => self
                .try_into_async_func_physical_plan(
                    async_func,
                    ctx,
                    codec,
                    proto_converter,
                ),
            PhysicalPlanType::Buffer(buffer) => {
                self.try_into_buffer_physical_plan(buffer, ctx, codec, proto_converter)
            }
        }
    }

    /// Serializes an [`ExecutionPlan`] into a protobuf node.
    ///
    /// Probes the concrete operator type with `downcast_ref` and delegates
    /// to the matching `try_from_*` builder. Operators not known to this
    /// module fall through to `codec.try_encode`, which lets user-defined
    /// extension codecs embed them as `PhysicalExtensionNode`s.
    pub fn try_from_physical_plan_with_converter(
        plan: Arc<dyn ExecutionPlan>,
        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        // Keep an owned handle for the extension-codec fallback below, then
        // shadow `plan` with `&dyn Any` for the downcast chain.
        let plan_clone = Arc::clone(&plan);
        let plan = plan.as_any();

        if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
            return protobuf::PhysicalPlanNode::try_from_explain_exec(exec, codec);
        }

        if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
            return protobuf::PhysicalPlanNode::try_from_projection_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
            return protobuf::PhysicalPlanNode::try_from_analyze_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<FilterExec>() {
            return protobuf::PhysicalPlanNode::try_from_filter_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_global_limit_exec(
                limit,
                codec,
                proto_converter,
            );
        }

        if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_local_limit_exec(
                limit,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_hash_join_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_symmetric_hash_join_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortMergeJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_merge_join_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_cross_join_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
            return protobuf::PhysicalPlanNode::try_from_aggregate_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
            return protobuf::PhysicalPlanNode::try_from_empty_exec(empty, codec);
        }

        if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
            return protobuf::PhysicalPlanNode::try_from_placeholder_row_exec(
                empty, codec,
            );
        }

        // CoalesceBatchesExec is deprecated upstream but must still be
        // serialized for plans that contain it.
        #[expect(deprecated)]
        if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_batches_exec(
                coalesce_batches,
                codec,
                proto_converter,
            );
        }

        // Data sources may decline serialization (returning Ok(None)), in
        // which case we keep probing / fall through to the extension codec.
        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>()
            && let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec(
                data_source_exec,
                codec,
                proto_converter,
            )?
        {
            return Ok(node);
        }

        if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_partitions_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
            return protobuf::PhysicalPlanNode::try_from_repartition_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(union) = plan.downcast_ref::<UnionExec>() {
            return protobuf::PhysicalPlanNode::try_from_union_exec(
                union,
                codec,
                proto_converter,
            );
        }

        if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
            return protobuf::PhysicalPlanNode::try_from_interleave_exec(
                interleave,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_preserving_merge_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_nested_loop_join_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_window_agg_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_bounded_window_agg_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        // Like data sources, sinks may decline serialization with Ok(None).
        if let Some(exec) = plan.downcast_ref::<DataSinkExec>()
            && let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec(
                exec,
                codec,
                proto_converter,
            )?
        {
            return Ok(node);
        }

        if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
            return protobuf::PhysicalPlanNode::try_from_unnest_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<CooperativeExec>() {
            return protobuf::PhysicalPlanNode::try_from_cooperative_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<LazyMemoryExec>()
            && let Some(node) =
                protobuf::PhysicalPlanNode::try_from_lazy_memory_exec(exec)?
        {
            return Ok(node);
        }

        if let Some(exec) = plan.downcast_ref::<AsyncFuncExec>() {
            return protobuf::PhysicalPlanNode::try_from_async_func_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<BufferExec>() {
            return protobuf::PhysicalPlanNode::try_from_buffer_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        // Fallback: unknown operator — ask the extension codec to encode it.
        // On success, children are serialized recursively and the opaque
        // bytes are wrapped in a PhysicalExtensionNode; otherwise error out.
        let mut buf: Vec<u8> = vec![];
        match codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
            Ok(_) => {
                let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
                    .children()
                    .into_iter()
                    .cloned()
                    .map(|i| {
                        protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
                            i,
                            codec,
                            proto_converter,
                        )
                    })
                    .collect::<Result<_>>()?;

                Ok(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::Extension(
                        protobuf::PhysicalExtensionNode { node: buf, inputs },
                    )),
                })
            }
            Err(e) => internal_err!(
                "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
            ),
        }
    }
}
599
600impl protobuf::PhysicalPlanNode {
601 fn try_into_explain_physical_plan(
602 &self,
603 explain: &protobuf::ExplainExecNode,
604 _ctx: &TaskContext,
605
606 _codec: &dyn PhysicalExtensionCodec,
607 _proto_converter: &dyn PhysicalProtoConverterExtension,
608 ) -> Result<Arc<dyn ExecutionPlan>> {
609 Ok(Arc::new(ExplainExec::new(
610 Arc::new(explain.schema.as_ref().unwrap().try_into()?),
611 explain
612 .stringified_plans
613 .iter()
614 .map(|plan| plan.into())
615 .collect(),
616 explain.verbose,
617 )))
618 }
619
620 fn try_into_projection_physical_plan(
621 &self,
622 projection: &protobuf::ProjectionExecNode,
623 ctx: &TaskContext,
624
625 codec: &dyn PhysicalExtensionCodec,
626 proto_converter: &dyn PhysicalProtoConverterExtension,
627 ) -> Result<Arc<dyn ExecutionPlan>> {
628 let input: Arc<dyn ExecutionPlan> =
629 into_physical_plan(&projection.input, ctx, codec, proto_converter)?;
630 let exprs = projection
631 .expr
632 .iter()
633 .zip(projection.expr_name.iter())
634 .map(|(expr, name)| {
635 Ok((
636 proto_converter.proto_to_physical_expr(
637 expr,
638 ctx,
639 input.schema().as_ref(),
640 codec,
641 )?,
642 name.to_string(),
643 ))
644 })
645 .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>>>()?;
646 let proj_exprs: Vec<ProjectionExpr> = exprs
647 .into_iter()
648 .map(|(expr, alias)| ProjectionExpr { expr, alias })
649 .collect();
650 Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
651 }
652
653 fn try_into_filter_physical_plan(
654 &self,
655 filter: &protobuf::FilterExecNode,
656 ctx: &TaskContext,
657
658 codec: &dyn PhysicalExtensionCodec,
659 proto_converter: &dyn PhysicalProtoConverterExtension,
660 ) -> Result<Arc<dyn ExecutionPlan>> {
661 let input: Arc<dyn ExecutionPlan> =
662 into_physical_plan(&filter.input, ctx, codec, proto_converter)?;
663
664 let predicate = filter
665 .expr
666 .as_ref()
667 .map(|expr| {
668 proto_converter.proto_to_physical_expr(
669 expr,
670 ctx,
671 input.schema().as_ref(),
672 codec,
673 )
674 })
675 .transpose()?
676 .ok_or_else(|| {
677 internal_datafusion_err!(
678 "filter (FilterExecNode) in PhysicalPlanNode is missing."
679 )
680 })?;
681
682 let filter_selectivity = filter.default_filter_selectivity.try_into();
683 let projection = if !filter.projection.is_empty() {
684 Some(
685 filter
686 .projection
687 .iter()
688 .map(|i| *i as usize)
689 .collect::<Vec<_>>(),
690 )
691 } else {
692 None
693 };
694
695 let filter = FilterExecBuilder::new(predicate, input)
696 .apply_projection(projection)?
697 .with_batch_size(filter.batch_size as usize)
698 .with_fetch(filter.fetch.map(|f| f as usize))
699 .build()?;
700 match filter_selectivity {
701 Ok(filter_selectivity) => Ok(Arc::new(
702 filter.with_default_selectivity(filter_selectivity)?,
703 )),
704 Err(_) => Err(internal_datafusion_err!(
705 "filter_selectivity in PhysicalPlanNode is invalid "
706 )),
707 }
708 }
709
710 fn try_into_csv_scan_physical_plan(
711 &self,
712 scan: &protobuf::CsvScanExecNode,
713 ctx: &TaskContext,
714
715 codec: &dyn PhysicalExtensionCodec,
716 proto_converter: &dyn PhysicalProtoConverterExtension,
717 ) -> Result<Arc<dyn ExecutionPlan>> {
718 let escape =
719 if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(escape)) =
720 &scan.optional_escape
721 {
722 Some(str_to_byte(escape, "escape")?)
723 } else {
724 None
725 };
726
727 let comment = if let Some(
728 protobuf::csv_scan_exec_node::OptionalComment::Comment(comment),
729 ) = &scan.optional_comment
730 {
731 Some(str_to_byte(comment, "comment")?)
732 } else {
733 None
734 };
735
736 let table_schema =
738 parse_table_schema_from_proto(scan.base_conf.as_ref().unwrap())?;
739
740 let csv_options = CsvOptions {
741 has_header: Some(scan.has_header),
742 delimiter: str_to_byte(&scan.delimiter, "delimiter")?,
743 quote: str_to_byte(&scan.quote, "quote")?,
744 newlines_in_values: Some(scan.newlines_in_values),
745 ..Default::default()
746 };
747 let source = Arc::new(
748 CsvSource::new(table_schema)
749 .with_csv_options(csv_options)
750 .with_escape(escape)
751 .with_comment(comment),
752 );
753
754 let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config(
755 scan.base_conf.as_ref().unwrap(),
756 ctx,
757 codec,
758 proto_converter,
759 source,
760 )?)
761 .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
762 .build();
763 Ok(DataSourceExec::from_data_source(conf))
764 }
765
766 fn try_into_json_scan_physical_plan(
767 &self,
768 scan: &protobuf::JsonScanExecNode,
769 ctx: &TaskContext,
770
771 codec: &dyn PhysicalExtensionCodec,
772 proto_converter: &dyn PhysicalProtoConverterExtension,
773 ) -> Result<Arc<dyn ExecutionPlan>> {
774 let base_conf = scan.base_conf.as_ref().unwrap();
775 let table_schema = parse_table_schema_from_proto(base_conf)?;
776 let scan_conf = parse_protobuf_file_scan_config(
777 base_conf,
778 ctx,
779 codec,
780 proto_converter,
781 Arc::new(JsonSource::new(table_schema)),
782 )?;
783 Ok(DataSourceExec::from_data_source(scan_conf))
784 }
785
786 fn try_into_arrow_scan_physical_plan(
787 &self,
788 scan: &protobuf::ArrowScanExecNode,
789 ctx: &TaskContext,
790 codec: &dyn PhysicalExtensionCodec,
791 proto_converter: &dyn PhysicalProtoConverterExtension,
792 ) -> Result<Arc<dyn ExecutionPlan>> {
793 let base_conf = scan.base_conf.as_ref().ok_or_else(|| {
794 internal_datafusion_err!("base_conf in ArrowScanExecNode is missing.")
795 })?;
796 let table_schema = parse_table_schema_from_proto(base_conf)?;
797 let scan_conf = parse_protobuf_file_scan_config(
798 base_conf,
799 ctx,
800 codec,
801 proto_converter,
802 Arc::new(ArrowSource::new_file_source(table_schema)),
803 )?;
804 Ok(DataSourceExec::from_data_source(scan_conf))
805 }
806
    #[cfg_attr(not(feature = "parquet"), expect(unused_variables))]
    /// Rebuilds a Parquet scan (a [`DataSourceExec`] over a
    /// [`ParquetSource`]) from its protobuf representation, including the
    /// optional pushed-down predicate and table-level parquet options.
    ///
    /// Panics if the `parquet` feature is disabled.
    fn try_into_parquet_scan_physical_plan(
        &self,
        scan: &protobuf::ParquetScanExecNode,
        ctx: &TaskContext,
        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        #[cfg(feature = "parquet")]
        {
            // NOTE(review): `base_conf` is unwrapped here (twice) — a
            // malformed message panics rather than erroring; other scan
            // helpers in this file use `ok_or_else` instead.
            let schema = from_proto::parse_protobuf_file_scan_schema(
                scan.base_conf.as_ref().unwrap(),
            )?;

            let base_conf = scan.base_conf.as_ref().unwrap();
            // The serialized predicate was written against the *projected*
            // schema, so apply the projection before decoding it; with no
            // projection the full file schema is used as-is.
            let predicate_schema = if !base_conf.projection.is_empty() {
                let projected_fields: Vec<_> = base_conf
                    .projection
                    .iter()
                    .map(|&i| schema.field(i as usize).clone())
                    .collect();
                Arc::new(Schema::new(projected_fields))
            } else {
                schema
            };

            // The pushdown predicate is optional.
            let predicate = scan
                .predicate
                .as_ref()
                .map(|expr| {
                    proto_converter.proto_to_physical_expr(
                        expr,
                        ctx,
                        predicate_schema.as_ref(),
                        codec,
                    )
                })
                .transpose()?;
            // Missing options fall back to the defaults.
            let mut options = datafusion_common::config::TableParquetOptions::default();

            if let Some(table_options) = scan.parquet_options.as_ref() {
                options = table_options.try_into()?;
            }

            let table_schema = parse_table_schema_from_proto(base_conf)?;
            // An empty URL means the local filesystem object store.
            let object_store_url = match base_conf.object_store_url.is_empty() {
                false => ObjectStoreUrl::parse(&base_conf.object_store_url)?,
                true => ObjectStoreUrl::local_filesystem(),
            };
            let store = ctx.runtime_env().object_store(object_store_url)?;
            // Reuse the runtime's file-metadata cache so repeated scans do
            // not re-read parquet footers.
            let metadata_cache =
                ctx.runtime_env().cache_manager.get_file_metadata_cache();
            let reader_factory =
                Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));

            let mut source = ParquetSource::new(table_schema)
                .with_parquet_file_reader_factory(reader_factory)
                .with_table_parquet_options(options);

            if let Some(predicate) = predicate {
                source = source.with_predicate(predicate);
            }
            let base_config = parse_protobuf_file_scan_config(
                base_conf,
                ctx,
                codec,
                proto_converter,
                Arc::new(source),
            )?;
            Ok(DataSourceExec::from_data_source(base_config))
        }
        #[cfg(not(feature = "parquet"))]
        panic!(
            "Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled"
        )
    }
886
    #[cfg_attr(not(feature = "avro"), expect(unused_variables))]
    /// Rebuilds an Avro scan (a [`DataSourceExec`] over an [`AvroSource`])
    /// from its protobuf representation.
    ///
    /// Panics if the `avro` feature is disabled.
    fn try_into_avro_scan_physical_plan(
        &self,
        scan: &protobuf::AvroScanExecNode,
        ctx: &TaskContext,
        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        #[cfg(feature = "avro")]
        {
            // NOTE(review): `base_conf` is unwrapped (twice) — a malformed
            // message panics rather than erroring; other scan helpers here
            // use `ok_or_else` instead.
            let table_schema =
                parse_table_schema_from_proto(scan.base_conf.as_ref().unwrap())?;
            let conf = parse_protobuf_file_scan_config(
                scan.base_conf.as_ref().unwrap(),
                ctx,
                codec,
                proto_converter,
                Arc::new(AvroSource::new(table_schema)),
            )?;
            Ok(DataSourceExec::from_data_source(conf))
        }

        #[cfg(not(feature = "avro"))]
        panic!("Unable to process a Avro PhysicalPlan when `avro` feature is not enabled")
    }
912
913 fn try_into_memory_scan_physical_plan(
914 &self,
915 scan: &protobuf::MemoryScanExecNode,
916 ctx: &TaskContext,
917
918 codec: &dyn PhysicalExtensionCodec,
919 proto_converter: &dyn PhysicalProtoConverterExtension,
920 ) -> Result<Arc<dyn ExecutionPlan>> {
921 let partitions = scan
922 .partitions
923 .iter()
924 .map(|p| parse_record_batches(p))
925 .collect::<Result<Vec<_>>>()?;
926
927 let proto_schema = scan.schema.as_ref().ok_or_else(|| {
928 internal_datafusion_err!("schema in MemoryScanExecNode is missing.")
929 })?;
930 let schema: SchemaRef = SchemaRef::new(proto_schema.try_into()?);
931
932 let projection = if !scan.projection.is_empty() {
933 Some(
934 scan.projection
935 .iter()
936 .map(|i| *i as usize)
937 .collect::<Vec<_>>(),
938 )
939 } else {
940 None
941 };
942
943 let mut sort_information = vec![];
944 for ordering in &scan.sort_information {
945 let sort_exprs = parse_physical_sort_exprs(
946 &ordering.physical_sort_expr_nodes,
947 ctx,
948 &schema,
949 codec,
950 proto_converter,
951 )?;
952 sort_information.extend(LexOrdering::new(sort_exprs));
953 }
954
955 let source = MemorySourceConfig::try_new(&partitions, schema, projection)?
956 .with_limit(scan.fetch.map(|f| f as usize))
957 .with_show_sizes(scan.show_sizes);
958
959 let source = source.try_with_sort_information(sort_information)?;
960
961 Ok(DataSourceExec::from_data_source(source))
962 }
963
964 fn try_into_coalesce_batches_physical_plan(
965 &self,
966 coalesce_batches: &protobuf::CoalesceBatchesExecNode,
967 ctx: &TaskContext,
968
969 codec: &dyn PhysicalExtensionCodec,
970 proto_converter: &dyn PhysicalProtoConverterExtension,
971 ) -> Result<Arc<dyn ExecutionPlan>> {
972 let input: Arc<dyn ExecutionPlan> =
973 into_physical_plan(&coalesce_batches.input, ctx, codec, proto_converter)?;
974 Ok(Arc::new(
975 #[expect(deprecated)]
976 CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize)
977 .with_fetch(coalesce_batches.fetch.map(|f| f as usize)),
978 ))
979 }
980
981 fn try_into_merge_physical_plan(
982 &self,
983 merge: &protobuf::CoalescePartitionsExecNode,
984 ctx: &TaskContext,
985
986 codec: &dyn PhysicalExtensionCodec,
987 proto_converter: &dyn PhysicalProtoConverterExtension,
988 ) -> Result<Arc<dyn ExecutionPlan>> {
989 let input: Arc<dyn ExecutionPlan> =
990 into_physical_plan(&merge.input, ctx, codec, proto_converter)?;
991 Ok(Arc::new(
992 CoalescePartitionsExec::new(input)
993 .with_fetch(merge.fetch.map(|f| f as usize)),
994 ))
995 }
996
997 fn try_into_repartition_physical_plan(
998 &self,
999 repart: &protobuf::RepartitionExecNode,
1000 ctx: &TaskContext,
1001
1002 codec: &dyn PhysicalExtensionCodec,
1003 proto_converter: &dyn PhysicalProtoConverterExtension,
1004 ) -> Result<Arc<dyn ExecutionPlan>> {
1005 let input: Arc<dyn ExecutionPlan> =
1006 into_physical_plan(&repart.input, ctx, codec, proto_converter)?;
1007 let partitioning = parse_protobuf_partitioning(
1008 repart.partitioning.as_ref(),
1009 ctx,
1010 input.schema().as_ref(),
1011 codec,
1012 proto_converter,
1013 )?;
1014 Ok(Arc::new(RepartitionExec::try_new(
1015 input,
1016 partitioning.unwrap(),
1017 )?))
1018 }
1019
1020 fn try_into_global_limit_physical_plan(
1021 &self,
1022 limit: &protobuf::GlobalLimitExecNode,
1023 ctx: &TaskContext,
1024
1025 codec: &dyn PhysicalExtensionCodec,
1026 proto_converter: &dyn PhysicalProtoConverterExtension,
1027 ) -> Result<Arc<dyn ExecutionPlan>> {
1028 let input: Arc<dyn ExecutionPlan> =
1029 into_physical_plan(&limit.input, ctx, codec, proto_converter)?;
1030 let fetch = if limit.fetch >= 0 {
1031 Some(limit.fetch as usize)
1032 } else {
1033 None
1034 };
1035 Ok(Arc::new(GlobalLimitExec::new(
1036 input,
1037 limit.skip as usize,
1038 fetch,
1039 )))
1040 }
1041
1042 fn try_into_local_limit_physical_plan(
1043 &self,
1044 limit: &protobuf::LocalLimitExecNode,
1045 ctx: &TaskContext,
1046
1047 codec: &dyn PhysicalExtensionCodec,
1048 proto_converter: &dyn PhysicalProtoConverterExtension,
1049 ) -> Result<Arc<dyn ExecutionPlan>> {
1050 let input: Arc<dyn ExecutionPlan> =
1051 into_physical_plan(&limit.input, ctx, codec, proto_converter)?;
1052 Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
1053 }
1054
    /// Deserializes a `WindowAggExecNode` into a window aggregate plan.
    ///
    /// If the node carries an `input_order_mode` the input is declared (at
    /// least partially) sorted and a [`BoundedWindowAggExec`] is built;
    /// otherwise an unbounded [`WindowAggExec`] is used. Window expressions
    /// and partition keys are resolved against the child's schema.
    fn try_into_window_physical_plan(
        &self,
        window_agg: &protobuf::WindowAggExecNode,
        ctx: &TaskContext,

        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&window_agg.input, ctx, codec, proto_converter)?;
        let input_schema = input.schema();

        // Decode each serialized window expression against the input schema.
        let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
            .window_expr
            .iter()
            .map(|window_expr| {
                parse_physical_window_expr(
                    window_expr,
                    ctx,
                    input_schema.as_ref(),
                    codec,
                    proto_converter,
                )
            })
            .collect::<Result<Vec<_>, _>>()?;

        // PARTITION BY keys; only their presence/absence is used below.
        let partition_keys = window_agg
            .partition_keys
            .iter()
            .map(|expr| {
                proto_converter.proto_to_physical_expr(
                    expr,
                    ctx,
                    input.schema().as_ref(),
                    codec,
                )
            })
            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;

        if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() {
            // Map the wire enum onto the engine's InputOrderMode; the
            // PartiallySorted variant carries the sorted column indices.
            let input_order_mode = match input_order_mode {
                window_agg_exec_node::InputOrderMode::Linear(_) => InputOrderMode::Linear,
                window_agg_exec_node::InputOrderMode::PartiallySorted(
                    protobuf::PartiallySortedInputOrderMode { columns },
                ) => InputOrderMode::PartiallySorted(
                    columns.iter().map(|c| *c as usize).collect(),
                ),
                window_agg_exec_node::InputOrderMode::Sorted(_) => InputOrderMode::Sorted,
            };

            Ok(Arc::new(BoundedWindowAggExec::try_new(
                physical_window_expr,
                input,
                input_order_mode,
                !partition_keys.is_empty(),
            )?))
        } else {
            // No declared input ordering: fall back to the unbounded operator.
            Ok(Arc::new(WindowAggExec::try_new(
                physical_window_expr,
                input,
                !partition_keys.is_empty(),
            )?))
        }
    }
1119
    /// Deserializes an `AggregateExecNode` into an [`AggregateExec`] plan.
    ///
    /// Rebuilds the aggregate mode, group-by expressions (including grouping
    /// sets), per-aggregate filters, and the aggregate function expressions
    /// themselves (user-defined aggregates are resolved from the task
    /// context, falling back to the extension codec). An optional limit is
    /// re-attached at the end.
    fn try_into_aggregate_physical_plan(
        &self,
        hash_agg: &protobuf::AggregateExecNode,
        ctx: &TaskContext,

        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&hash_agg.input, ctx, codec, proto_converter)?;
        // Reject wire values that do not map to a known AggregateMode.
        let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(|_| {
            proto_error(format!(
                "Received a AggregateNode message with unknown AggregateMode {}",
                hash_agg.mode
            ))
        })?;
        let agg_mode: AggregateMode = match mode {
            protobuf::AggregateMode::Partial => AggregateMode::Partial,
            protobuf::AggregateMode::Final => AggregateMode::Final,
            protobuf::AggregateMode::FinalPartitioned => AggregateMode::FinalPartitioned,
            protobuf::AggregateMode::Single => AggregateMode::Single,
            protobuf::AggregateMode::SinglePartitioned => {
                AggregateMode::SinglePartitioned
            }
            protobuf::AggregateMode::PartialReduce => AggregateMode::PartialReduce,
        };

        let num_expr = hash_agg.group_expr.len();

        // Group-by expressions are paired positionally with their names.
        let group_expr = hash_agg
            .group_expr
            .iter()
            .zip(hash_agg.group_expr_name.iter())
            .map(|(expr, name)| {
                proto_converter
                    .proto_to_physical_expr(expr, ctx, input.schema().as_ref(), codec)
                    .map(|expr| (expr, name.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Null expressions reuse the group-expression names (same arity).
        let null_expr = hash_agg
            .null_expr
            .iter()
            .zip(hash_agg.group_expr_name.iter())
            .map(|(expr, name)| {
                proto_converter
                    .proto_to_physical_expr(expr, ctx, input.schema().as_ref(), codec)
                    .map(|expr| (expr, name.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;

        // `groups` is a row-major flattened bool matrix: each chunk of
        // `num_expr` booleans describes one grouping set.
        let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
            hash_agg
                .groups
                .chunks(num_expr)
                .map(|g| g.to_vec())
                .collect::<Vec<Vec<bool>>>()
        } else {
            vec![]
        };

        let has_grouping_set = hash_agg.has_grouping_set;

        // The aggregate's own recorded input schema (not the decoded child's)
        // is used to resolve filter and aggregate argument expressions.
        let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| {
            internal_datafusion_err!("input_schema in AggregateNode is missing.")
        })?;
        let physical_schema: SchemaRef = SchemaRef::new(input_schema.try_into()?);

        // Optional per-aggregate FILTER (WHERE ...) expressions.
        let physical_filter_expr = hash_agg
            .filter_expr
            .iter()
            .map(|expr| {
                expr.expr
                    .as_ref()
                    .map(|e| {
                        proto_converter.proto_to_physical_expr(
                            e,
                            ctx,
                            &physical_schema,
                            codec,
                        )
                    })
                    .transpose()
            })
            .collect::<Result<Vec<_>, _>>()?;

        let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> = hash_agg
            .aggr_expr
            .iter()
            .zip(hash_agg.aggr_expr_name.iter())
            .map(|(expr, name)| {
                let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
                    proto_error("Unexpected empty aggregate physical expression")
                })?;

                match expr_type {
                    ExprType::AggregateExpr(agg_node) => {
                        // Argument expressions for this aggregate call.
                        let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node
                            .expr
                            .iter()
                            .map(|e| {
                                proto_converter.proto_to_physical_expr(
                                    e,
                                    ctx,
                                    &physical_schema,
                                    codec,
                                )
                            })
                            .collect::<Result<Vec<_>>>()?;
                        // Optional ORDER BY inside the aggregate.
                        let order_bys = agg_node
                            .ordering_req
                            .iter()
                            .map(|e| {
                                parse_physical_sort_expr(
                                    e,
                                    ctx,
                                    &physical_schema,
                                    codec,
                                    proto_converter,
                                )
                            })
                            .collect::<Result<_>>()?;
                        agg_node
                            .aggregate_function
                            .as_ref()
                            .map(|func| match func {
                                AggregateFunction::UserDefinedAggrFunction(udaf_name) => {
                                    // Prefer an inline serialized definition;
                                    // otherwise look up by name in the task
                                    // context, falling back to the codec.
                                    let agg_udf = match &agg_node.fun_definition {
                                        Some(buf) => {
                                            codec.try_decode_udaf(udaf_name, buf)?
                                        }
                                        None => ctx.udaf(udaf_name).or_else(|_| {
                                            codec.try_decode_udaf(udaf_name, &[])
                                        })?,
                                    };

                                    AggregateExprBuilder::new(agg_udf, input_phy_expr)
                                        .schema(Arc::clone(&physical_schema))
                                        .alias(name)
                                        .human_display(agg_node.human_display.clone())
                                        .with_ignore_nulls(agg_node.ignore_nulls)
                                        .with_distinct(agg_node.distinct)
                                        .order_by(order_bys)
                                        .build()
                                        .map(Arc::new)
                                }
                            })
                            .transpose()?
                            .ok_or_else(|| {
                                proto_error(
                                    "Invalid AggregateExpr, missing aggregate_function",
                                )
                            })
                    }
                    _ => internal_err!("Invalid aggregate expression for AggregateExec"),
                }
            })
            .collect::<Result<Vec<_>, _>>()?;

        let agg = AggregateExec::try_new(
            agg_mode,
            PhysicalGroupBy::new(group_expr, null_expr, groups, has_grouping_set),
            physical_aggr_expr,
            physical_filter_expr,
            input,
            physical_schema,
        )?;

        // Re-attach an optional aggregate limit (with optional sort
        // direction) that was serialized alongside the node.
        let agg = if let Some(limit_proto) = &hash_agg.limit {
            let limit = limit_proto.limit as usize;
            let limit_options = match limit_proto.descending {
                Some(descending) => LimitOptions::new_with_order(limit, descending),
                None => LimitOptions::new(limit),
            };
            agg.with_limit_options(Some(limit_options))
        } else {
            agg
        };

        Ok(Arc::new(agg))
    }
1301
    /// Deserializes a `HashJoinExecNode` into a [`HashJoinExec`] plan.
    ///
    /// Equi-join `on` pairs are resolved against the left/right child
    /// schemas, while the optional join filter expression is resolved
    /// against the filter's own intermediate schema. Also restores join
    /// type, null-equality semantics, partition mode, and an optional
    /// output projection.
    fn try_into_hash_join_physical_plan(
        &self,
        hashjoin: &protobuf::HashJoinExecNode,
        ctx: &TaskContext,

        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left: Arc<dyn ExecutionPlan> =
            into_physical_plan(&hashjoin.left, ctx, codec, proto_converter)?;
        let right: Arc<dyn ExecutionPlan> =
            into_physical_plan(&hashjoin.right, ctx, codec, proto_converter)?;
        let left_schema = left.schema();
        let right_schema = right.schema();
        // Each `on` entry is a (left expr, right expr) equality pair, each
        // side decoded against its own child schema.
        let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin
            .on
            .iter()
            .map(|col| {
                let left = proto_converter.proto_to_physical_expr(
                    &col.left.clone().unwrap(),
                    ctx,
                    left_schema.as_ref(),
                    codec,
                )?;
                let right = proto_converter.proto_to_physical_expr(
                    &col.right.clone().unwrap(),
                    ctx,
                    right_schema.as_ref(),
                    codec,
                )?;
                Ok((left, right))
            })
            .collect::<Result<_>>()?;
        let join_type =
            protobuf::JoinType::try_from(hashjoin.join_type).map_err(|_| {
                proto_error(format!(
                    "Received a HashJoinNode message with unknown JoinType {}",
                    hashjoin.join_type
                ))
            })?;
        let null_equality = protobuf::NullEquality::try_from(hashjoin.null_equality)
            .map_err(|_| {
                proto_error(format!(
                    "Received a HashJoinNode message with unknown NullEquality {}",
                    hashjoin.null_equality
                ))
            })?;
        // The non-equi filter carries its own schema; column_indices map each
        // filter column back to a (side, index) in the join inputs.
        let filter = hashjoin
            .filter
            .as_ref()
            .map(|f| {
                let schema = f
                    .schema
                    .as_ref()
                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
                    .try_into()?;

                let expression = proto_converter.proto_to_physical_expr(
                    f.expression.as_ref().ok_or_else(|| {
                        proto_error("Unexpected empty filter expression")
                    })?,
                    ctx, &schema,
                    codec,
                )?;
                let column_indices = f.column_indices
                    .iter()
                    .map(|i| {
                        let side = protobuf::JoinSide::try_from(i.side)
                            .map_err(|_| proto_error(format!(
                                "Received a HashJoinNode message with JoinSide in Filter {}",
                                i.side))
                            )?;

                        Ok(ColumnIndex {
                            index: i.index as usize,
                            side: side.into(),
                        })
                    })
                    .collect::<Result<Vec<_>>>()?;

                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
            })
            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;

        let partition_mode = protobuf::PartitionMode::try_from(hashjoin.partition_mode)
            .map_err(|_| {
                proto_error(format!(
                    "Received a HashJoinNode message with unknown PartitionMode {}",
                    hashjoin.partition_mode
                ))
            })?;
        let partition_mode = match partition_mode {
            protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
            protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
            protobuf::PartitionMode::Auto => PartitionMode::Auto,
        };
        // An empty projection list on the wire means "no projection".
        let projection = if !hashjoin.projection.is_empty() {
            Some(
                hashjoin
                    .projection
                    .iter()
                    .map(|i| *i as usize)
                    .collect::<Vec<_>>(),
            )
        } else {
            None
        };
        Ok(Arc::new(HashJoinExec::try_new(
            left,
            right,
            on,
            filter,
            &join_type.into(),
            projection,
            partition_mode,
            null_equality.into(),
            hashjoin.null_aware,
        )?))
    }
1421
1422 fn try_into_symmetric_hash_join_physical_plan(
1423 &self,
1424 sym_join: &protobuf::SymmetricHashJoinExecNode,
1425 ctx: &TaskContext,
1426
1427 codec: &dyn PhysicalExtensionCodec,
1428 proto_converter: &dyn PhysicalProtoConverterExtension,
1429 ) -> Result<Arc<dyn ExecutionPlan>> {
1430 let left = into_physical_plan(&sym_join.left, ctx, codec, proto_converter)?;
1431 let right = into_physical_plan(&sym_join.right, ctx, codec, proto_converter)?;
1432 let left_schema = left.schema();
1433 let right_schema = right.schema();
1434 let on = sym_join
1435 .on
1436 .iter()
1437 .map(|col| {
1438 let left = proto_converter.proto_to_physical_expr(
1439 &col.left.clone().unwrap(),
1440 ctx,
1441 left_schema.as_ref(),
1442 codec,
1443 )?;
1444 let right = proto_converter.proto_to_physical_expr(
1445 &col.right.clone().unwrap(),
1446 ctx,
1447 right_schema.as_ref(),
1448 codec,
1449 )?;
1450 Ok((left, right))
1451 })
1452 .collect::<Result<_>>()?;
1453 let join_type =
1454 protobuf::JoinType::try_from(sym_join.join_type).map_err(|_| {
1455 proto_error(format!(
1456 "Received a SymmetricHashJoin message with unknown JoinType {}",
1457 sym_join.join_type
1458 ))
1459 })?;
1460 let null_equality = protobuf::NullEquality::try_from(sym_join.null_equality)
1461 .map_err(|_| {
1462 proto_error(format!(
1463 "Received a SymmetricHashJoin message with unknown NullEquality {}",
1464 sym_join.null_equality
1465 ))
1466 })?;
1467 let filter = sym_join
1468 .filter
1469 .as_ref()
1470 .map(|f| {
1471 let schema = f
1472 .schema
1473 .as_ref()
1474 .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1475 .try_into()?;
1476
1477 let expression = proto_converter.proto_to_physical_expr(
1478 f.expression.as_ref().ok_or_else(|| {
1479 proto_error("Unexpected empty filter expression")
1480 })?,
1481 ctx, &schema,
1482 codec,
1483 )?;
1484 let column_indices = f.column_indices
1485 .iter()
1486 .map(|i| {
1487 let side = protobuf::JoinSide::try_from(i.side)
1488 .map_err(|_| proto_error(format!(
1489 "Received a HashJoinNode message with JoinSide in Filter {}",
1490 i.side))
1491 )?;
1492
1493 Ok(ColumnIndex {
1494 index: i.index as usize,
1495 side: side.into(),
1496 })
1497 })
1498 .collect::<Result<_>>()?;
1499
1500 Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1501 })
1502 .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1503
1504 let left_sort_exprs = parse_physical_sort_exprs(
1505 &sym_join.left_sort_exprs,
1506 ctx,
1507 &left_schema,
1508 codec,
1509 proto_converter,
1510 )?;
1511 let left_sort_exprs = LexOrdering::new(left_sort_exprs);
1512
1513 let right_sort_exprs = parse_physical_sort_exprs(
1514 &sym_join.right_sort_exprs,
1515 ctx,
1516 &right_schema,
1517 codec,
1518 proto_converter,
1519 )?;
1520 let right_sort_exprs = LexOrdering::new(right_sort_exprs);
1521
1522 let partition_mode = protobuf::StreamPartitionMode::try_from(
1523 sym_join.partition_mode,
1524 )
1525 .map_err(|_| {
1526 proto_error(format!(
1527 "Received a SymmetricHashJoin message with unknown PartitionMode {}",
1528 sym_join.partition_mode
1529 ))
1530 })?;
1531 let partition_mode = match partition_mode {
1532 protobuf::StreamPartitionMode::SinglePartition => {
1533 StreamJoinPartitionMode::SinglePartition
1534 }
1535 protobuf::StreamPartitionMode::PartitionedExec => {
1536 StreamJoinPartitionMode::Partitioned
1537 }
1538 };
1539 SymmetricHashJoinExec::try_new(
1540 left,
1541 right,
1542 on,
1543 filter,
1544 &join_type.into(),
1545 null_equality.into(),
1546 left_sort_exprs,
1547 right_sort_exprs,
1548 partition_mode,
1549 )
1550 .map(|e| Arc::new(e) as _)
1551 }
1552
1553 fn try_into_union_physical_plan(
1554 &self,
1555 union: &protobuf::UnionExecNode,
1556 ctx: &TaskContext,
1557
1558 codec: &dyn PhysicalExtensionCodec,
1559 proto_converter: &dyn PhysicalProtoConverterExtension,
1560 ) -> Result<Arc<dyn ExecutionPlan>> {
1561 let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1562 for input in &union.inputs {
1563 inputs.push(proto_converter.proto_to_execution_plan(ctx, codec, input)?);
1564 }
1565 UnionExec::try_new(inputs)
1566 }
1567
1568 fn try_into_interleave_physical_plan(
1569 &self,
1570 interleave: &protobuf::InterleaveExecNode,
1571 ctx: &TaskContext,
1572
1573 codec: &dyn PhysicalExtensionCodec,
1574 proto_converter: &dyn PhysicalProtoConverterExtension,
1575 ) -> Result<Arc<dyn ExecutionPlan>> {
1576 let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1577 for input in &interleave.inputs {
1578 inputs.push(proto_converter.proto_to_execution_plan(ctx, codec, input)?);
1579 }
1580 Ok(Arc::new(InterleaveExec::try_new(inputs)?))
1581 }
1582
1583 fn try_into_cross_join_physical_plan(
1584 &self,
1585 crossjoin: &protobuf::CrossJoinExecNode,
1586 ctx: &TaskContext,
1587
1588 codec: &dyn PhysicalExtensionCodec,
1589 proto_converter: &dyn PhysicalProtoConverterExtension,
1590 ) -> Result<Arc<dyn ExecutionPlan>> {
1591 let left: Arc<dyn ExecutionPlan> =
1592 into_physical_plan(&crossjoin.left, ctx, codec, proto_converter)?;
1593 let right: Arc<dyn ExecutionPlan> =
1594 into_physical_plan(&crossjoin.right, ctx, codec, proto_converter)?;
1595 Ok(Arc::new(CrossJoinExec::new(left, right)))
1596 }
1597
1598 fn try_into_empty_physical_plan(
1599 &self,
1600 empty: &protobuf::EmptyExecNode,
1601 _ctx: &TaskContext,
1602
1603 _codec: &dyn PhysicalExtensionCodec,
1604 _proto_converter: &dyn PhysicalProtoConverterExtension,
1605 ) -> Result<Arc<dyn ExecutionPlan>> {
1606 let schema = Arc::new(convert_required!(empty.schema)?);
1607 Ok(Arc::new(EmptyExec::new(schema)))
1608 }
1609
1610 fn try_into_placeholder_row_physical_plan(
1611 &self,
1612 placeholder: &protobuf::PlaceholderRowExecNode,
1613 _ctx: &TaskContext,
1614
1615 _codec: &dyn PhysicalExtensionCodec,
1616 ) -> Result<Arc<dyn ExecutionPlan>> {
1617 let schema = Arc::new(convert_required!(placeholder.schema)?);
1618 Ok(Arc::new(PlaceholderRowExec::new(schema)))
1619 }
1620
    /// Deserializes a `SortExecNode` into a [`SortExec`] plan.
    ///
    /// Every serialized expression must be an `ExprType::Sort`; the inner
    /// expression is resolved against the input schema and `asc` is inverted
    /// into `SortOptions::descending`. A negative `fetch` means no limit.
    fn try_into_sort_physical_plan(
        &self,
        sort: &protobuf::SortExecNode,
        ctx: &TaskContext,

        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input = into_physical_plan(&sort.input, ctx, codec, proto_converter)?;
        let exprs = sort
            .expr
            .iter()
            .map(|expr| {
                let expr = expr.expr_type.as_ref().ok_or_else(|| {
                    proto_error(format!(
                        "physical_plan::from_proto() Unexpected expr {self:?}"
                    ))
                })?;
                if let ExprType::Sort(sort_expr) = expr {
                    let expr = sort_expr
                        .expr
                        .as_ref()
                        .ok_or_else(|| {
                            proto_error(format!(
                                "physical_plan::from_proto() Unexpected sort expr {self:?}"
                            ))
                        })?
                        .as_ref();
                    Ok(PhysicalSortExpr {
                        expr: proto_converter.proto_to_physical_expr(expr, ctx, input.schema().as_ref(), codec)?,
                        options: SortOptions {
                            descending: !sort_expr.asc,
                            nulls_first: sort_expr.nulls_first,
                        },
                    })
                } else {
                    // Any non-Sort expression in a SortExecNode is malformed.
                    internal_err!(
                        "physical_plan::from_proto() {self:?}"
                    )
                }
            })
            .collect::<Result<Vec<_>>>()?;
        // LexOrdering::new returns None for an empty expression list.
        let Some(ordering) = LexOrdering::new(exprs) else {
            return internal_err!("SortExec requires an ordering");
        };
        // Negative fetch is the protobuf sentinel for "unlimited".
        let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
        let new_sort = SortExec::new(ordering, input)
            .with_fetch(fetch)
            .with_preserve_partitioning(sort.preserve_partitioning);

        Ok(Arc::new(new_sort))
    }
1673
1674 fn try_into_sort_preserving_merge_physical_plan(
1675 &self,
1676 sort: &protobuf::SortPreservingMergeExecNode,
1677 ctx: &TaskContext,
1678
1679 codec: &dyn PhysicalExtensionCodec,
1680 proto_converter: &dyn PhysicalProtoConverterExtension,
1681 ) -> Result<Arc<dyn ExecutionPlan>> {
1682 let input = into_physical_plan(&sort.input, ctx, codec, proto_converter)?;
1683 let exprs = sort
1684 .expr
1685 .iter()
1686 .map(|expr| {
1687 let expr = expr.expr_type.as_ref().ok_or_else(|| {
1688 proto_error(format!(
1689 "physical_plan::from_proto() Unexpected expr {self:?}"
1690 ))
1691 })?;
1692 if let ExprType::Sort(sort_expr) = expr {
1693 let expr = sort_expr
1694 .expr
1695 .as_ref()
1696 .ok_or_else(|| {
1697 proto_error(format!(
1698 "physical_plan::from_proto() Unexpected sort expr {self:?}"
1699 ))
1700 })?
1701 .as_ref();
1702 Ok(PhysicalSortExpr {
1703 expr: proto_converter.proto_to_physical_expr(
1704 expr,
1705 ctx,
1706 input.schema().as_ref(),
1707 codec,
1708 )?,
1709 options: SortOptions {
1710 descending: !sort_expr.asc,
1711 nulls_first: sort_expr.nulls_first,
1712 },
1713 })
1714 } else {
1715 internal_err!("physical_plan::from_proto() {self:?}")
1716 }
1717 })
1718 .collect::<Result<Vec<_>>>()?;
1719 let Some(ordering) = LexOrdering::new(exprs) else {
1720 return internal_err!("SortExec requires an ordering");
1721 };
1722 let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1723 Ok(Arc::new(
1724 SortPreservingMergeExec::new(ordering, input).with_fetch(fetch),
1725 ))
1726 }
1727
1728 fn try_into_extension_physical_plan(
1729 &self,
1730 extension: &protobuf::PhysicalExtensionNode,
1731 ctx: &TaskContext,
1732
1733 codec: &dyn PhysicalExtensionCodec,
1734 proto_converter: &dyn PhysicalProtoConverterExtension,
1735 ) -> Result<Arc<dyn ExecutionPlan>> {
1736 let inputs: Vec<Arc<dyn ExecutionPlan>> = extension
1737 .inputs
1738 .iter()
1739 .map(|i| proto_converter.proto_to_execution_plan(ctx, codec, i))
1740 .collect::<Result<_>>()?;
1741
1742 let extension_node = codec.try_decode(extension.node.as_slice(), &inputs, ctx)?;
1743
1744 Ok(extension_node)
1745 }
1746
    /// Deserializes a `NestedLoopJoinExecNode` into a [`NestedLoopJoinExec`].
    ///
    /// Rebuilds both children, the join type, the optional join filter
    /// (resolved against the filter's own intermediate schema), and an
    /// optional output projection.
    fn try_into_nested_loop_join_physical_plan(
        &self,
        join: &protobuf::NestedLoopJoinExecNode,
        ctx: &TaskContext,

        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left: Arc<dyn ExecutionPlan> =
            into_physical_plan(&join.left, ctx, codec, proto_converter)?;
        let right: Arc<dyn ExecutionPlan> =
            into_physical_plan(&join.right, ctx, codec, proto_converter)?;
        let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| {
            proto_error(format!(
                "Received a NestedLoopJoinExecNode message with unknown JoinType {}",
                join.join_type
            ))
        })?;
        // The join filter carries its own schema; column_indices map each
        // filter column back to a (side, index) in the join inputs.
        let filter = join
            .filter
            .as_ref()
            .map(|f| {
                let schema = f
                    .schema
                    .as_ref()
                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
                    .try_into()?;

                let expression = proto_converter.proto_to_physical_expr(
                    f.expression.as_ref().ok_or_else(|| {
                        proto_error("Unexpected empty filter expression")
                    })?,
                    ctx, &schema,
                    codec,
                )?;
                let column_indices = f.column_indices
                    .iter()
                    .map(|i| {
                        let side = protobuf::JoinSide::try_from(i.side)
                            .map_err(|_| proto_error(format!(
                                "Received a NestedLoopJoinExecNode message with JoinSide in Filter {}",
                                i.side))
                            )?;

                        Ok(ColumnIndex {
                            index: i.index as usize,
                            side: side.into(),
                        })
                    })
                    .collect::<Result<Vec<_>>>()?;

                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
            })
            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;

        // An empty projection list on the wire means "no projection".
        let projection = if !join.projection.is_empty() {
            Some(
                join.projection
                    .iter()
                    .map(|i| *i as usize)
                    .collect::<Vec<_>>(),
            )
        } else {
            None
        };

        Ok(Arc::new(NestedLoopJoinExec::try_new(
            left,
            right,
            filter,
            &join_type.into(),
            projection,
        )?))
    }
1821
1822 fn try_into_analyze_physical_plan(
1823 &self,
1824 analyze: &protobuf::AnalyzeExecNode,
1825 ctx: &TaskContext,
1826
1827 codec: &dyn PhysicalExtensionCodec,
1828 proto_converter: &dyn PhysicalProtoConverterExtension,
1829 ) -> Result<Arc<dyn ExecutionPlan>> {
1830 let input: Arc<dyn ExecutionPlan> =
1831 into_physical_plan(&analyze.input, ctx, codec, proto_converter)?;
1832 Ok(Arc::new(AnalyzeExec::new(
1833 analyze.verbose,
1834 analyze.show_statistics,
1835 vec![MetricType::SUMMARY, MetricType::DEV],
1836 input,
1837 Arc::new(convert_required!(analyze.schema)?),
1838 )))
1839 }
1840
1841 fn try_into_json_sink_physical_plan(
1842 &self,
1843 sink: &protobuf::JsonSinkExecNode,
1844 ctx: &TaskContext,
1845
1846 codec: &dyn PhysicalExtensionCodec,
1847 proto_converter: &dyn PhysicalProtoConverterExtension,
1848 ) -> Result<Arc<dyn ExecutionPlan>> {
1849 let input = into_physical_plan(&sink.input, ctx, codec, proto_converter)?;
1850
1851 let data_sink: JsonSink = sink
1852 .sink
1853 .as_ref()
1854 .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1855 .try_into()?;
1856 let sink_schema = input.schema();
1857 let sort_order = sink
1858 .sort_order
1859 .as_ref()
1860 .map(|collection| {
1861 parse_physical_sort_exprs(
1862 &collection.physical_sort_expr_nodes,
1863 ctx,
1864 &sink_schema,
1865 codec,
1866 proto_converter,
1867 )
1868 .map(|sort_exprs| {
1869 LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1870 })
1871 })
1872 .transpose()?
1873 .flatten();
1874 Ok(Arc::new(DataSinkExec::new(
1875 input,
1876 Arc::new(data_sink),
1877 sort_order,
1878 )))
1879 }
1880
1881 fn try_into_csv_sink_physical_plan(
1882 &self,
1883 sink: &protobuf::CsvSinkExecNode,
1884 ctx: &TaskContext,
1885
1886 codec: &dyn PhysicalExtensionCodec,
1887 proto_converter: &dyn PhysicalProtoConverterExtension,
1888 ) -> Result<Arc<dyn ExecutionPlan>> {
1889 let input = into_physical_plan(&sink.input, ctx, codec, proto_converter)?;
1890
1891 let data_sink: CsvSink = sink
1892 .sink
1893 .as_ref()
1894 .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1895 .try_into()?;
1896 let sink_schema = input.schema();
1897 let sort_order = sink
1898 .sort_order
1899 .as_ref()
1900 .map(|collection| {
1901 parse_physical_sort_exprs(
1902 &collection.physical_sort_expr_nodes,
1903 ctx,
1904 &sink_schema,
1905 codec,
1906 proto_converter,
1907 )
1908 .map(|sort_exprs| {
1909 LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1910 })
1911 })
1912 .transpose()?
1913 .flatten();
1914 Ok(Arc::new(DataSinkExec::new(
1915 input,
1916 Arc::new(data_sink),
1917 sort_order,
1918 )))
1919 }
1920
    /// Deserializes a `ParquetSinkExecNode` into a [`DataSinkExec`] writing
    /// Parquet via a [`ParquetSink`], with an optional required sort order.
    ///
    /// Only available with the `parquet` feature; without it this panics,
    /// since receiving such a node means the plan cannot be honored at all.
    #[cfg_attr(not(feature = "parquet"), expect(unused_variables))]
    fn try_into_parquet_sink_physical_plan(
        &self,
        sink: &protobuf::ParquetSinkExecNode,
        ctx: &TaskContext,

        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        #[cfg(feature = "parquet")]
        {
            let input = into_physical_plan(&sink.input, ctx, codec, proto_converter)?;

            let data_sink: ParquetSink = sink
                .sink
                .as_ref()
                .ok_or_else(|| proto_error("Missing required field in protobuf"))?
                .try_into()?;
            let sink_schema = input.schema();
            // An absent sort_order (or an empty expression list, for which
            // LexRequirement::new yields None) means no ordering requirement.
            let sort_order = sink
                .sort_order
                .as_ref()
                .map(|collection| {
                    parse_physical_sort_exprs(
                        &collection.physical_sort_expr_nodes,
                        ctx,
                        &sink_schema,
                        codec,
                        proto_converter,
                    )
                    .map(|sort_exprs| {
                        LexRequirement::new(sort_exprs.into_iter().map(Into::into))
                    })
                })
                .transpose()?
                .flatten();
            Ok(Arc::new(DataSinkExec::new(
                input,
                Arc::new(data_sink),
                sort_order,
            )))
        }
        #[cfg(not(feature = "parquet"))]
        panic!("Trying to use ParquetSink without `parquet` feature enabled");
    }
1966
1967 fn try_into_unnest_physical_plan(
1968 &self,
1969 unnest: &protobuf::UnnestExecNode,
1970 ctx: &TaskContext,
1971
1972 codec: &dyn PhysicalExtensionCodec,
1973 proto_converter: &dyn PhysicalProtoConverterExtension,
1974 ) -> Result<Arc<dyn ExecutionPlan>> {
1975 let input = into_physical_plan(&unnest.input, ctx, codec, proto_converter)?;
1976
1977 Ok(Arc::new(UnnestExec::new(
1978 input,
1979 unnest
1980 .list_type_columns
1981 .iter()
1982 .map(|c| ListUnnest {
1983 index_in_input_schema: c.index_in_input_schema as _,
1984 depth: c.depth as _,
1985 })
1986 .collect(),
1987 unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
1988 Arc::new(convert_required!(unnest.schema)?),
1989 into_required!(unnest.options)?,
1990 )?))
1991 }
1992
1993 fn generate_series_name_to_str(name: protobuf::GenerateSeriesName) -> &'static str {
1994 match name {
1995 protobuf::GenerateSeriesName::GsGenerateSeries => "generate_series",
1996 protobuf::GenerateSeriesName::GsRange => "range",
1997 }
1998 }
    /// Deserializes a `SortMergeJoinExecNode` into a [`SortMergeJoinExec`].
    ///
    /// Rebuilds both children, the optional join filter (resolved against
    /// the filter's own intermediate schema), join type, null-equality
    /// semantics, per-key sort options, and the equi-join `on` pairs (each
    /// side resolved against its own child schema).
    fn try_into_sort_join(
        &self,
        sort_join: &SortMergeJoinExecNode,
        ctx: &TaskContext,

        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left = into_physical_plan(&sort_join.left, ctx, codec, proto_converter)?;
        let left_schema = left.schema();
        let right = into_physical_plan(&sort_join.right, ctx, codec, proto_converter)?;
        let right_schema = right.schema();

        // The non-equi filter carries its own schema; column_indices map each
        // filter column back to a (side, index) in the join inputs.
        let filter = sort_join
            .filter
            .as_ref()
            .map(|f| {
                let schema = f
                    .schema
                    .as_ref()
                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
                    .try_into()?;

                let expression = proto_converter.proto_to_physical_expr(
                    f.expression.as_ref().ok_or_else(|| {
                        proto_error("Unexpected empty filter expression")
                    })?,
                    ctx,
                    &schema,
                    codec,
                )?;
                let column_indices = f
                    .column_indices
                    .iter()
                    .map(|i| {
                        let side =
                            protobuf::JoinSide::try_from(i.side).map_err(|_| {
                                proto_error(format!(
                                    "Received a SortMergeJoinExecNode message with JoinSide in Filter {}",
                                    i.side
                                ))
                            })?;

                        Ok(ColumnIndex {
                            index: i.index as usize,
                            side: side.into(),
                        })
                    })
                    .collect::<Result<Vec<_>>>()?;

                Ok(JoinFilter::new(
                    expression,
                    column_indices,
                    Arc::new(schema),
                ))
            })
            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;

        let join_type =
            protobuf::JoinType::try_from(sort_join.join_type).map_err(|_| {
                proto_error(format!(
                    "Received a SortMergeJoinExecNode message with unknown JoinType {}",
                    sort_join.join_type
                ))
            })?;

        let null_equality = protobuf::NullEquality::try_from(sort_join.null_equality)
            .map_err(|_| {
                proto_error(format!(
                    "Received a SortMergeJoinExecNode message with unknown NullEquality {}",
                    sort_join.null_equality
                ))
            })?;

        // One SortOptions per join key; `asc` inverts into `descending`.
        let sort_options = sort_join
            .sort_options
            .iter()
            .map(|e| SortOptions {
                descending: !e.asc,
                nulls_first: e.nulls_first,
            })
            .collect();
        // Equi-join key pairs, each side decoded against its own schema.
        let on = sort_join
            .on
            .iter()
            .map(|col| {
                let left = proto_converter.proto_to_physical_expr(
                    &col.left.clone().unwrap(),
                    ctx,
                    left_schema.as_ref(),
                    codec,
                )?;
                let right = proto_converter.proto_to_physical_expr(
                    &col.right.clone().unwrap(),
                    ctx,
                    right_schema.as_ref(),
                    codec,
                )?;
                Ok((left, right))
            })
            .collect::<Result<_>>()?;

        Ok(Arc::new(SortMergeJoinExec::try_new(
            left,
            right,
            on,
            filter,
            join_type.into(),
            sort_options,
            null_equality.into(),
        )?))
    }
2111
    /// Deserializes a `GenerateSeriesNode` into a [`LazyMemoryExec`] backed
    /// by a [`GenerateSeriesTable`] generator.
    ///
    /// Each protobuf args variant maps onto the matching [`GenSeriesArgs`]
    /// variant; interval steps are reassembled from their
    /// months/days/nanos components.
    fn try_into_generate_series_physical_plan(
        &self,
        generate_series: &protobuf::GenerateSeriesNode,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let schema: SchemaRef = Arc::new(convert_required!(generate_series.schema)?);

        let args = match &generate_series.args {
            // Any argument was NULL: the function produces no rows.
            Some(protobuf::generate_series_node::Args::ContainsNull(args)) => {
                GenSeriesArgs::ContainsNull {
                    name: Self::generate_series_name_to_str(args.name()),
                }
            }
            Some(protobuf::generate_series_node::Args::Int64Args(args)) => {
                GenSeriesArgs::Int64Args {
                    start: args.start,
                    end: args.end,
                    step: args.step,
                    include_end: args.include_end,
                    name: Self::generate_series_name_to_str(args.name()),
                }
            }
            Some(protobuf::generate_series_node::Args::TimestampArgs(args)) => {
                let step_proto = args.step.as_ref().ok_or_else(|| {
                    internal_datafusion_err!("Missing step in TimestampArgs")
                })?;
                // Rebuild the IntervalMonthDayNano step from its parts.
                let step = IntervalMonthDayNanoType::make_value(
                    step_proto.months,
                    step_proto.days,
                    step_proto.nanos,
                );
                GenSeriesArgs::TimestampArgs {
                    start: args.start,
                    end: args.end,
                    step,
                    tz: args.tz.as_ref().map(|s| Arc::from(s.as_str())),
                    include_end: args.include_end,
                    name: Self::generate_series_name_to_str(args.name()),
                }
            }
            Some(protobuf::generate_series_node::Args::DateArgs(args)) => {
                let step_proto = args.step.as_ref().ok_or_else(|| {
                    internal_datafusion_err!("Missing step in DateArgs")
                })?;
                let step = IntervalMonthDayNanoType::make_value(
                    step_proto.months,
                    step_proto.days,
                    step_proto.nanos,
                );
                GenSeriesArgs::DateArgs {
                    start: args.start,
                    end: args.end,
                    step,
                    include_end: args.include_end,
                    name: Self::generate_series_name_to_str(args.name()),
                }
            }
            None => return internal_err!("Missing args in GenerateSeriesNode"),
        };

        let table = GenerateSeriesTable::new(Arc::clone(&schema), args);
        let generator = table.as_generator(generate_series.target_batch_size as usize)?;

        Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?))
    }
2176
2177 fn try_into_cooperative_physical_plan(
2178 &self,
2179 field_stream: &protobuf::CooperativeExecNode,
2180 ctx: &TaskContext,
2181
2182 codec: &dyn PhysicalExtensionCodec,
2183 proto_converter: &dyn PhysicalProtoConverterExtension,
2184 ) -> Result<Arc<dyn ExecutionPlan>> {
2185 let input = into_physical_plan(&field_stream.input, ctx, codec, proto_converter)?;
2186 Ok(Arc::new(CooperativeExec::new(input)))
2187 }
2188
2189 fn try_into_async_func_physical_plan(
2190 &self,
2191 async_func: &protobuf::AsyncFuncExecNode,
2192 ctx: &TaskContext,
2193 codec: &dyn PhysicalExtensionCodec,
2194 proto_converter: &dyn PhysicalProtoConverterExtension,
2195 ) -> Result<Arc<dyn ExecutionPlan>> {
2196 let input: Arc<dyn ExecutionPlan> =
2197 into_physical_plan(&async_func.input, ctx, codec, proto_converter)?;
2198
2199 if async_func.async_exprs.len() != async_func.async_expr_names.len() {
2200 return internal_err!(
2201 "AsyncFuncExecNode async_exprs length does not match async_expr_names"
2202 );
2203 }
2204
2205 let async_exprs = async_func
2206 .async_exprs
2207 .iter()
2208 .zip(async_func.async_expr_names.iter())
2209 .map(|(expr, name)| {
2210 let physical_expr = proto_converter.proto_to_physical_expr(
2211 expr,
2212 ctx,
2213 input.schema().as_ref(),
2214 codec,
2215 )?;
2216
2217 Ok(Arc::new(AsyncFuncExpr::try_new(
2218 name.clone(),
2219 physical_expr,
2220 input.schema().as_ref(),
2221 )?))
2222 })
2223 .collect::<Result<Vec<_>>>()?;
2224
2225 Ok(Arc::new(AsyncFuncExec::try_new(async_exprs, input)?))
2226 }
2227
2228 fn try_into_buffer_physical_plan(
2229 &self,
2230 buffer: &protobuf::BufferExecNode,
2231 ctx: &TaskContext,
2232 extension_codec: &dyn PhysicalExtensionCodec,
2233 proto_converter: &dyn PhysicalProtoConverterExtension,
2234 ) -> Result<Arc<dyn ExecutionPlan>> {
2235 let input: Arc<dyn ExecutionPlan> =
2236 into_physical_plan(&buffer.input, ctx, extension_codec, proto_converter)?;
2237
2238 Ok(Arc::new(BufferExec::new(input, buffer.capacity as usize)))
2239 }
2240
2241 fn try_from_explain_exec(
2242 exec: &ExplainExec,
2243 _codec: &dyn PhysicalExtensionCodec,
2244 ) -> Result<Self> {
2245 Ok(protobuf::PhysicalPlanNode {
2246 physical_plan_type: Some(PhysicalPlanType::Explain(
2247 protobuf::ExplainExecNode {
2248 schema: Some(exec.schema().as_ref().try_into()?),
2249 stringified_plans: exec
2250 .stringified_plans()
2251 .iter()
2252 .map(|plan| plan.into())
2253 .collect(),
2254 verbose: exec.verbose(),
2255 },
2256 )),
2257 })
2258 }
2259
2260 fn try_from_projection_exec(
2261 exec: &ProjectionExec,
2262 codec: &dyn PhysicalExtensionCodec,
2263 proto_converter: &dyn PhysicalProtoConverterExtension,
2264 ) -> Result<Self> {
2265 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2266 exec.input().to_owned(),
2267 codec,
2268 proto_converter,
2269 )?;
2270 let expr = exec
2271 .expr()
2272 .iter()
2273 .map(|proj_expr| {
2274 proto_converter.physical_expr_to_proto(&proj_expr.expr, codec)
2275 })
2276 .collect::<Result<Vec<_>>>()?;
2277 let expr_name = exec
2278 .expr()
2279 .iter()
2280 .map(|proj_expr| proj_expr.alias.clone())
2281 .collect();
2282 Ok(protobuf::PhysicalPlanNode {
2283 physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
2284 protobuf::ProjectionExecNode {
2285 input: Some(Box::new(input)),
2286 expr,
2287 expr_name,
2288 },
2289 ))),
2290 })
2291 }
2292
2293 fn try_from_analyze_exec(
2294 exec: &AnalyzeExec,
2295 codec: &dyn PhysicalExtensionCodec,
2296 proto_converter: &dyn PhysicalProtoConverterExtension,
2297 ) -> Result<Self> {
2298 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2299 exec.input().to_owned(),
2300 codec,
2301 proto_converter,
2302 )?;
2303 Ok(protobuf::PhysicalPlanNode {
2304 physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
2305 protobuf::AnalyzeExecNode {
2306 verbose: exec.verbose(),
2307 show_statistics: exec.show_statistics(),
2308 input: Some(Box::new(input)),
2309 schema: Some(exec.schema().as_ref().try_into()?),
2310 },
2311 ))),
2312 })
2313 }
2314
2315 fn try_from_filter_exec(
2316 exec: &FilterExec,
2317 codec: &dyn PhysicalExtensionCodec,
2318 proto_converter: &dyn PhysicalProtoConverterExtension,
2319 ) -> Result<Self> {
2320 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2321 exec.input().to_owned(),
2322 codec,
2323 proto_converter,
2324 )?;
2325 Ok(protobuf::PhysicalPlanNode {
2326 physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
2327 protobuf::FilterExecNode {
2328 input: Some(Box::new(input)),
2329 expr: Some(
2330 proto_converter
2331 .physical_expr_to_proto(exec.predicate(), codec)?,
2332 ),
2333 default_filter_selectivity: exec.default_selectivity() as u32,
2334 projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
2335 v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2336 }),
2337 batch_size: exec.batch_size() as u32,
2338 fetch: exec.fetch().map(|f| f as u32),
2339 },
2340 ))),
2341 })
2342 }
2343
2344 fn try_from_global_limit_exec(
2345 limit: &GlobalLimitExec,
2346 codec: &dyn PhysicalExtensionCodec,
2347 proto_converter: &dyn PhysicalProtoConverterExtension,
2348 ) -> Result<Self> {
2349 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2350 limit.input().to_owned(),
2351 codec,
2352 proto_converter,
2353 )?;
2354
2355 Ok(protobuf::PhysicalPlanNode {
2356 physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
2357 protobuf::GlobalLimitExecNode {
2358 input: Some(Box::new(input)),
2359 skip: limit.skip() as u32,
2360 fetch: match limit.fetch() {
2361 Some(n) => n as i64,
2362 _ => -1, },
2364 },
2365 ))),
2366 })
2367 }
2368
2369 fn try_from_local_limit_exec(
2370 limit: &LocalLimitExec,
2371 codec: &dyn PhysicalExtensionCodec,
2372 proto_converter: &dyn PhysicalProtoConverterExtension,
2373 ) -> Result<Self> {
2374 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2375 limit.input().to_owned(),
2376 codec,
2377 proto_converter,
2378 )?;
2379 Ok(protobuf::PhysicalPlanNode {
2380 physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
2381 protobuf::LocalLimitExecNode {
2382 input: Some(Box::new(input)),
2383 fetch: limit.fetch() as u32,
2384 },
2385 ))),
2386 })
2387 }
2388
2389 fn try_from_hash_join_exec(
2390 exec: &HashJoinExec,
2391 codec: &dyn PhysicalExtensionCodec,
2392 proto_converter: &dyn PhysicalProtoConverterExtension,
2393 ) -> Result<Self> {
2394 let left = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2395 exec.left().to_owned(),
2396 codec,
2397 proto_converter,
2398 )?;
2399 let right = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2400 exec.right().to_owned(),
2401 codec,
2402 proto_converter,
2403 )?;
2404 let on: Vec<protobuf::JoinOn> = exec
2405 .on()
2406 .iter()
2407 .map(|tuple| {
2408 let l = proto_converter.physical_expr_to_proto(&tuple.0, codec)?;
2409 let r = proto_converter.physical_expr_to_proto(&tuple.1, codec)?;
2410 Ok::<_, DataFusionError>(protobuf::JoinOn {
2411 left: Some(l),
2412 right: Some(r),
2413 })
2414 })
2415 .collect::<Result<_>>()?;
2416 let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2417 let null_equality: protobuf::NullEquality = exec.null_equality().into();
2418 let filter = exec
2419 .filter()
2420 .as_ref()
2421 .map(|f| {
2422 let expression =
2423 proto_converter.physical_expr_to_proto(f.expression(), codec)?;
2424 let column_indices = f
2425 .column_indices()
2426 .iter()
2427 .map(|i| {
2428 let side: protobuf::JoinSide = i.side.to_owned().into();
2429 protobuf::ColumnIndex {
2430 index: i.index as u32,
2431 side: side.into(),
2432 }
2433 })
2434 .collect();
2435 let schema = f.schema().as_ref().try_into()?;
2436 Ok(protobuf::JoinFilter {
2437 expression: Some(expression),
2438 column_indices,
2439 schema: Some(schema),
2440 })
2441 })
2442 .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2443
2444 let partition_mode = match exec.partition_mode() {
2445 PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
2446 PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
2447 PartitionMode::Auto => protobuf::PartitionMode::Auto,
2448 };
2449
2450 Ok(protobuf::PhysicalPlanNode {
2451 physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
2452 protobuf::HashJoinExecNode {
2453 left: Some(Box::new(left)),
2454 right: Some(Box::new(right)),
2455 on,
2456 join_type: join_type.into(),
2457 partition_mode: partition_mode.into(),
2458 null_equality: null_equality.into(),
2459 filter,
2460 projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
2461 v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2462 }),
2463 null_aware: exec.null_aware,
2464 },
2465 ))),
2466 })
2467 }
2468
2469 fn try_from_symmetric_hash_join_exec(
2470 exec: &SymmetricHashJoinExec,
2471 codec: &dyn PhysicalExtensionCodec,
2472 proto_converter: &dyn PhysicalProtoConverterExtension,
2473 ) -> Result<Self> {
2474 let left = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2475 exec.left().to_owned(),
2476 codec,
2477 proto_converter,
2478 )?;
2479 let right = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2480 exec.right().to_owned(),
2481 codec,
2482 proto_converter,
2483 )?;
2484 let on = exec
2485 .on()
2486 .iter()
2487 .map(|tuple| {
2488 let l = proto_converter.physical_expr_to_proto(&tuple.0, codec)?;
2489 let r = proto_converter.physical_expr_to_proto(&tuple.1, codec)?;
2490 Ok::<_, DataFusionError>(protobuf::JoinOn {
2491 left: Some(l),
2492 right: Some(r),
2493 })
2494 })
2495 .collect::<Result<_>>()?;
2496 let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2497 let null_equality: protobuf::NullEquality = exec.null_equality().into();
2498 let filter = exec
2499 .filter()
2500 .as_ref()
2501 .map(|f| {
2502 let expression =
2503 proto_converter.physical_expr_to_proto(f.expression(), codec)?;
2504 let column_indices = f
2505 .column_indices()
2506 .iter()
2507 .map(|i| {
2508 let side: protobuf::JoinSide = i.side.to_owned().into();
2509 protobuf::ColumnIndex {
2510 index: i.index as u32,
2511 side: side.into(),
2512 }
2513 })
2514 .collect();
2515 let schema = f.schema().as_ref().try_into()?;
2516 Ok(protobuf::JoinFilter {
2517 expression: Some(expression),
2518 column_indices,
2519 schema: Some(schema),
2520 })
2521 })
2522 .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2523
2524 let partition_mode = match exec.partition_mode() {
2525 StreamJoinPartitionMode::SinglePartition => {
2526 protobuf::StreamPartitionMode::SinglePartition
2527 }
2528 StreamJoinPartitionMode::Partitioned => {
2529 protobuf::StreamPartitionMode::PartitionedExec
2530 }
2531 };
2532
2533 let left_sort_exprs = exec
2534 .left_sort_exprs()
2535 .map(|exprs| {
2536 exprs
2537 .iter()
2538 .map(|expr| {
2539 Ok(protobuf::PhysicalSortExprNode {
2540 expr: Some(Box::new(
2541 proto_converter
2542 .physical_expr_to_proto(&expr.expr, codec)?,
2543 )),
2544 asc: !expr.options.descending,
2545 nulls_first: expr.options.nulls_first,
2546 })
2547 })
2548 .collect::<Result<Vec<_>>>()
2549 })
2550 .transpose()?
2551 .unwrap_or(vec![]);
2552
2553 let right_sort_exprs = exec
2554 .right_sort_exprs()
2555 .map(|exprs| {
2556 exprs
2557 .iter()
2558 .map(|expr| {
2559 Ok(protobuf::PhysicalSortExprNode {
2560 expr: Some(Box::new(
2561 proto_converter
2562 .physical_expr_to_proto(&expr.expr, codec)?,
2563 )),
2564 asc: !expr.options.descending,
2565 nulls_first: expr.options.nulls_first,
2566 })
2567 })
2568 .collect::<Result<Vec<_>>>()
2569 })
2570 .transpose()?
2571 .unwrap_or(vec![]);
2572
2573 Ok(protobuf::PhysicalPlanNode {
2574 physical_plan_type: Some(PhysicalPlanType::SymmetricHashJoin(Box::new(
2575 protobuf::SymmetricHashJoinExecNode {
2576 left: Some(Box::new(left)),
2577 right: Some(Box::new(right)),
2578 on,
2579 join_type: join_type.into(),
2580 partition_mode: partition_mode.into(),
2581 null_equality: null_equality.into(),
2582 left_sort_exprs,
2583 right_sort_exprs,
2584 filter,
2585 },
2586 ))),
2587 })
2588 }
2589
2590 fn try_from_sort_merge_join_exec(
2591 exec: &SortMergeJoinExec,
2592 codec: &dyn PhysicalExtensionCodec,
2593 proto_converter: &dyn PhysicalProtoConverterExtension,
2594 ) -> Result<Self> {
2595 let left = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2596 exec.left().to_owned(),
2597 codec,
2598 proto_converter,
2599 )?;
2600 let right = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2601 exec.right().to_owned(),
2602 codec,
2603 proto_converter,
2604 )?;
2605 let on = exec
2606 .on()
2607 .iter()
2608 .map(|tuple| {
2609 let l = proto_converter.physical_expr_to_proto(&tuple.0, codec)?;
2610 let r = proto_converter.physical_expr_to_proto(&tuple.1, codec)?;
2611 Ok::<_, DataFusionError>(protobuf::JoinOn {
2612 left: Some(l),
2613 right: Some(r),
2614 })
2615 })
2616 .collect::<Result<_>>()?;
2617 let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2618 let null_equality: protobuf::NullEquality = exec.null_equality().into();
2619 let filter = exec
2620 .filter()
2621 .as_ref()
2622 .map(|f| {
2623 let expression =
2624 proto_converter.physical_expr_to_proto(f.expression(), codec)?;
2625 let column_indices = f
2626 .column_indices()
2627 .iter()
2628 .map(|i| {
2629 let side: protobuf::JoinSide = i.side.to_owned().into();
2630 protobuf::ColumnIndex {
2631 index: i.index as u32,
2632 side: side.into(),
2633 }
2634 })
2635 .collect();
2636 let schema = f.schema().as_ref().try_into()?;
2637 Ok(protobuf::JoinFilter {
2638 expression: Some(expression),
2639 column_indices,
2640 schema: Some(schema),
2641 })
2642 })
2643 .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2644
2645 let sort_options = exec
2646 .sort_options()
2647 .iter()
2648 .map(
2649 |SortOptions {
2650 descending,
2651 nulls_first,
2652 }| {
2653 SortExprNode {
2654 expr: None,
2655 asc: !*descending,
2656 nulls_first: *nulls_first,
2657 }
2658 },
2659 )
2660 .collect();
2661
2662 Ok(protobuf::PhysicalPlanNode {
2663 physical_plan_type: Some(PhysicalPlanType::SortMergeJoin(Box::new(
2664 SortMergeJoinExecNode {
2665 left: Some(Box::new(left)),
2666 right: Some(Box::new(right)),
2667 on,
2668 join_type: join_type.into(),
2669 null_equality: null_equality.into(),
2670 filter,
2671 sort_options,
2672 },
2673 ))),
2674 })
2675 }
2676
2677 fn try_from_cross_join_exec(
2678 exec: &CrossJoinExec,
2679 codec: &dyn PhysicalExtensionCodec,
2680 proto_converter: &dyn PhysicalProtoConverterExtension,
2681 ) -> Result<Self> {
2682 let left = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2683 exec.left().to_owned(),
2684 codec,
2685 proto_converter,
2686 )?;
2687 let right = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2688 exec.right().to_owned(),
2689 codec,
2690 proto_converter,
2691 )?;
2692 Ok(protobuf::PhysicalPlanNode {
2693 physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
2694 protobuf::CrossJoinExecNode {
2695 left: Some(Box::new(left)),
2696 right: Some(Box::new(right)),
2697 },
2698 ))),
2699 })
2700 }
2701
    /// Serializes an [`AggregateExec`] into a protobuf plan node, capturing
    /// the grouping expressions, grouping-set layout, aggregate expressions
    /// (with optional filters), mode, limit and input schema.
    fn try_from_aggregate_exec(
        exec: &AggregateExec,
        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Self> {
        // Grouping-set membership flags, flattened row-major so the decoder
        // can rebuild the per-set boolean rows.
        let groups: Vec<bool> = exec
            .group_expr()
            .groups()
            .iter()
            .flatten()
            .copied()
            .collect();

        // Output names of the group-by expressions; `expr()` yields
        // (expression, name) pairs.
        let group_names = exec
            .group_expr()
            .expr()
            .iter()
            .map(|expr| expr.1.to_owned())
            .collect();

        // Per-aggregate optional filter expressions; `serialize_maybe_filter`
        // preserves `None` entries positionally.
        let filter = exec
            .filter_expr()
            .iter()
            .map(|expr| serialize_maybe_filter(expr.to_owned(), codec, proto_converter))
            .collect::<Result<Vec<_>>>()?;

        let agg = exec
            .aggr_expr()
            .iter()
            .map(|expr| {
                serialize_physical_aggr_expr(expr.to_owned(), codec, proto_converter)
            })
            .collect::<Result<Vec<_>>>()?;

        // Aggregate output names, parallel to `agg`.
        let agg_names = exec
            .aggr_expr()
            .iter()
            .map(|expr| expr.name().to_string())
            .collect::<Vec<_>>();

        // Map the execution-time aggregate mode onto its protobuf counterpart.
        let agg_mode = match exec.mode() {
            AggregateMode::Partial => protobuf::AggregateMode::Partial,
            AggregateMode::Final => protobuf::AggregateMode::Final,
            AggregateMode::FinalPartitioned => protobuf::AggregateMode::FinalPartitioned,
            AggregateMode::Single => protobuf::AggregateMode::Single,
            AggregateMode::SinglePartitioned => {
                protobuf::AggregateMode::SinglePartitioned
            }
            AggregateMode::PartialReduce => protobuf::AggregateMode::PartialReduce,
        };
        let input_schema = exec.input_schema();
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
            exec.input().to_owned(),
            codec,
            proto_converter,
        )?;

        // Null expressions of the grouping set; `null_expr()` yields
        // (expression, name) pairs like `expr()`.
        let null_expr = exec
            .group_expr()
            .null_expr()
            .iter()
            .map(|expr| proto_converter.physical_expr_to_proto(&expr.0, codec))
            .collect::<Result<Vec<_>>>()?;

        let group_expr = exec
            .group_expr()
            .expr()
            .iter()
            .map(|expr| proto_converter.physical_expr_to_proto(&expr.0, codec))
            .collect::<Result<Vec<_>>>()?;

        // Optional group-by limit (e.g. for TopK aggregation).
        let limit = exec.limit_options().map(|config| protobuf::AggLimit {
            limit: config.limit() as u64,
            descending: config.descending(),
        });

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
                protobuf::AggregateExecNode {
                    group_expr,
                    group_expr_name: group_names,
                    aggr_expr: agg,
                    filter_expr: filter,
                    aggr_expr_name: agg_names,
                    mode: agg_mode as i32,
                    input: Some(Box::new(input)),
                    input_schema: Some(input_schema.as_ref().try_into()?),
                    null_expr,
                    groups,
                    limit,
                    has_grouping_set: exec.group_expr().has_grouping_set(),
                },
            ))),
        })
    }
2797
2798 fn try_from_empty_exec(
2799 empty: &EmptyExec,
2800 _codec: &dyn PhysicalExtensionCodec,
2801 ) -> Result<Self> {
2802 let schema = empty.schema().as_ref().try_into()?;
2803 Ok(protobuf::PhysicalPlanNode {
2804 physical_plan_type: Some(PhysicalPlanType::Empty(protobuf::EmptyExecNode {
2805 schema: Some(schema),
2806 })),
2807 })
2808 }
2809
2810 fn try_from_placeholder_row_exec(
2811 empty: &PlaceholderRowExec,
2812 _codec: &dyn PhysicalExtensionCodec,
2813 ) -> Result<Self> {
2814 let schema = empty.schema().as_ref().try_into()?;
2815 Ok(protobuf::PhysicalPlanNode {
2816 physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
2817 protobuf::PlaceholderRowExecNode {
2818 schema: Some(schema),
2819 },
2820 )),
2821 })
2822 }
2823
2824 #[expect(deprecated)]
2825 fn try_from_coalesce_batches_exec(
2826 coalesce_batches: &CoalesceBatchesExec,
2827 codec: &dyn PhysicalExtensionCodec,
2828 proto_converter: &dyn PhysicalProtoConverterExtension,
2829 ) -> Result<Self> {
2830 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2831 coalesce_batches.input().to_owned(),
2832 codec,
2833 proto_converter,
2834 )?;
2835 Ok(protobuf::PhysicalPlanNode {
2836 physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
2837 protobuf::CoalesceBatchesExecNode {
2838 input: Some(Box::new(input)),
2839 target_batch_size: coalesce_batches.target_batch_size() as u32,
2840 fetch: coalesce_batches.fetch().map(|n| n as u32),
2841 },
2842 ))),
2843 })
2844 }
2845
    /// Serializes a [`DataSourceExec`] into a protobuf plan node.
    ///
    /// Probes the wrapped [`DataSource`] for each known built-in source kind
    /// (CSV, JSON, Arrow, Parquet, Avro, in-memory) in turn and emits the
    /// matching scan node. Returns `Ok(None)` when the source is none of the
    /// built-ins, so the caller can fall back to an extension codec.
    fn try_from_data_source_exec(
        data_source_exec: &DataSourceExec,
        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Option<Self>> {
        let data_source = data_source_exec.data_source();
        // CSV scan: a FileScanConfig whose file source is a CsvSource.
        if let Some(maybe_csv) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = maybe_csv.file_source();
            if let Some(csv_config) = source.as_any().downcast_ref::<CsvSource>() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::CsvScan(
                        protobuf::CsvScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                maybe_csv,
                                codec,
                                proto_converter,
                            )?),
                            has_header: csv_config.has_header(),
                            // Single-byte CSV options are serialized as
                            // strings; byte_to_string validates each one.
                            delimiter: byte_to_string(
                                csv_config.delimiter(),
                                "delimiter",
                            )?,
                            quote: byte_to_string(csv_config.quote(), "quote")?,
                            optional_escape: if let Some(escape) = csv_config.escape() {
                                Some(
                                    protobuf::csv_scan_exec_node::OptionalEscape::Escape(
                                        byte_to_string(escape, "escape")?,
                                    ),
                                )
                            } else {
                                None
                            },
                            optional_comment: if let Some(comment) = csv_config.comment()
                            {
                                Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
                                    byte_to_string(comment, "comment")?,
                                ))
                            } else {
                                None
                            },
                            newlines_in_values: csv_config.newlines_in_values(),
                            truncate_rows: csv_config.truncate_rows(),
                        },
                    )),
                }));
            }
        }

        // JSON scan: same FileScanConfig shape, JsonSource file source.
        if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = scan_conf.file_source();
            if let Some(_json_source) = source.as_any().downcast_ref::<JsonSource>() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::JsonScan(
                        protobuf::JsonScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                scan_conf,
                                codec,
                                proto_converter,
                            )?),
                        },
                    )),
                }));
            }
        }

        // Arrow IPC scan.
        if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = scan_conf.file_source();
            if let Some(_arrow_source) = source.as_any().downcast_ref::<ArrowSource>() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::ArrowScan(
                        protobuf::ArrowScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                scan_conf,
                                codec,
                                proto_converter,
                            )?),
                        },
                    )),
                }));
            }
        }

        // Parquet scan (feature-gated): also serializes the optional pushdown
        // predicate and the table-level parquet options.
        #[cfg(feature = "parquet")]
        if let Some((maybe_parquet, conf)) =
            data_source_exec.downcast_to_file_source::<ParquetSource>()
        {
            let predicate = conf
                .filter()
                .map(|pred| proto_converter.physical_expr_to_proto(&pred, codec))
                .transpose()?;
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::ParquetScan(
                    protobuf::ParquetScanExecNode {
                        base_conf: Some(serialize_file_scan_config(
                            maybe_parquet,
                            codec,
                            proto_converter,
                        )?),
                        predicate,
                        parquet_options: Some(conf.table_parquet_options().try_into()?),
                    },
                )),
            }));
        }

        // Avro scan (feature-gated).
        #[cfg(feature = "avro")]
        if let Some(maybe_avro) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = maybe_avro.file_source();
            if source.as_any().downcast_ref::<AvroSource>().is_some() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::AvroScan(
                        protobuf::AvroScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                maybe_avro,
                                codec,
                                proto_converter,
                            )?),
                        },
                    )),
                }));
            }
        }

        // In-memory scan: serialize the record batches, original schema,
        // projection, sort information and fetch.
        if let Some(source_conf) =
            data_source.as_any().downcast_ref::<MemorySourceConfig>()
        {
            let proto_partitions = source_conf
                .partitions()
                .iter()
                .map(|p| serialize_record_batches(p))
                .collect::<Result<Vec<_>>>()?;

            let proto_schema: protobuf::Schema =
                source_conf.original_schema().as_ref().try_into()?;

            // An absent projection is encoded as an empty index list.
            let proto_projection = source_conf
                .projection()
                .as_ref()
                .map_or_else(Vec::new, |v| {
                    v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
                });

            let proto_sort_information = source_conf
                .sort_information()
                .iter()
                .map(|ordering| {
                    let sort_exprs = serialize_physical_sort_exprs(
                        ordering.to_owned(),
                        codec,
                        proto_converter,
                    )?;
                    Ok::<_, DataFusionError>(protobuf::PhysicalSortExprNodeCollection {
                        physical_sort_expr_nodes: sort_exprs,
                    })
                })
                .collect::<Result<Vec<_>, _>>()?;

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::MemoryScan(
                    protobuf::MemoryScanExecNode {
                        partitions: proto_partitions,
                        schema: Some(proto_schema),
                        projection: proto_projection,
                        sort_information: proto_sort_information,
                        show_sizes: source_conf.show_sizes(),
                        fetch: source_conf.fetch().map(|f| f as u32),
                    },
                )),
            }));
        }

        // Not a recognized built-in source; let the caller try extensions.
        Ok(None)
    }
3019
3020 fn try_from_coalesce_partitions_exec(
3021 exec: &CoalescePartitionsExec,
3022 codec: &dyn PhysicalExtensionCodec,
3023 proto_converter: &dyn PhysicalProtoConverterExtension,
3024 ) -> Result<Self> {
3025 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3026 exec.input().to_owned(),
3027 codec,
3028 proto_converter,
3029 )?;
3030 Ok(protobuf::PhysicalPlanNode {
3031 physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
3032 protobuf::CoalescePartitionsExecNode {
3033 input: Some(Box::new(input)),
3034 fetch: exec.fetch().map(|f| f as u32),
3035 },
3036 ))),
3037 })
3038 }
3039
3040 fn try_from_repartition_exec(
3041 exec: &RepartitionExec,
3042 codec: &dyn PhysicalExtensionCodec,
3043 proto_converter: &dyn PhysicalProtoConverterExtension,
3044 ) -> Result<Self> {
3045 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3046 exec.input().to_owned(),
3047 codec,
3048 proto_converter,
3049 )?;
3050
3051 let pb_partitioning =
3052 serialize_partitioning(exec.partitioning(), codec, proto_converter)?;
3053
3054 Ok(protobuf::PhysicalPlanNode {
3055 physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
3056 protobuf::RepartitionExecNode {
3057 input: Some(Box::new(input)),
3058 partitioning: Some(pb_partitioning),
3059 },
3060 ))),
3061 })
3062 }
3063
3064 fn try_from_sort_exec(
3065 exec: &SortExec,
3066 codec: &dyn PhysicalExtensionCodec,
3067 proto_converter: &dyn PhysicalProtoConverterExtension,
3068 ) -> Result<Self> {
3069 let input = proto_converter.execution_plan_to_proto(exec.input(), codec)?;
3070 let expr = exec
3071 .expr()
3072 .iter()
3073 .map(|expr| {
3074 let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
3075 expr: Some(Box::new(
3076 proto_converter.physical_expr_to_proto(&expr.expr, codec)?,
3077 )),
3078 asc: !expr.options.descending,
3079 nulls_first: expr.options.nulls_first,
3080 });
3081 Ok(protobuf::PhysicalExprNode {
3082 expr_id: None,
3083 expr_type: Some(ExprType::Sort(sort_expr)),
3084 })
3085 })
3086 .collect::<Result<Vec<_>>>()?;
3087 Ok(protobuf::PhysicalPlanNode {
3088 physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
3089 protobuf::SortExecNode {
3090 input: Some(Box::new(input)),
3091 expr,
3092 fetch: match exec.fetch() {
3093 Some(n) => n as i64,
3094 _ => -1,
3095 },
3096 preserve_partitioning: exec.preserve_partitioning(),
3097 },
3098 ))),
3099 })
3100 }
3101
3102 fn try_from_union_exec(
3103 union: &UnionExec,
3104 codec: &dyn PhysicalExtensionCodec,
3105 proto_converter: &dyn PhysicalProtoConverterExtension,
3106 ) -> Result<Self> {
3107 let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
3108 for input in union.inputs() {
3109 inputs.push(
3110 protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3111 input.to_owned(),
3112 codec,
3113 proto_converter,
3114 )?,
3115 );
3116 }
3117 Ok(protobuf::PhysicalPlanNode {
3118 physical_plan_type: Some(PhysicalPlanType::Union(protobuf::UnionExecNode {
3119 inputs,
3120 })),
3121 })
3122 }
3123
3124 fn try_from_interleave_exec(
3125 interleave: &InterleaveExec,
3126 codec: &dyn PhysicalExtensionCodec,
3127 proto_converter: &dyn PhysicalProtoConverterExtension,
3128 ) -> Result<Self> {
3129 let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
3130 for input in interleave.inputs() {
3131 inputs.push(
3132 protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3133 input.to_owned(),
3134 codec,
3135 proto_converter,
3136 )?,
3137 );
3138 }
3139 Ok(protobuf::PhysicalPlanNode {
3140 physical_plan_type: Some(PhysicalPlanType::Interleave(
3141 protobuf::InterleaveExecNode { inputs },
3142 )),
3143 })
3144 }
3145
3146 fn try_from_sort_preserving_merge_exec(
3147 exec: &SortPreservingMergeExec,
3148 codec: &dyn PhysicalExtensionCodec,
3149 proto_converter: &dyn PhysicalProtoConverterExtension,
3150 ) -> Result<Self> {
3151 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3152 exec.input().to_owned(),
3153 codec,
3154 proto_converter,
3155 )?;
3156 let expr = exec
3157 .expr()
3158 .iter()
3159 .map(|expr| {
3160 let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
3161 expr: Some(Box::new(
3162 proto_converter.physical_expr_to_proto(&expr.expr, codec)?,
3163 )),
3164 asc: !expr.options.descending,
3165 nulls_first: expr.options.nulls_first,
3166 });
3167 Ok(protobuf::PhysicalExprNode {
3168 expr_id: None,
3169 expr_type: Some(ExprType::Sort(sort_expr)),
3170 })
3171 })
3172 .collect::<Result<Vec<_>>>()?;
3173 Ok(protobuf::PhysicalPlanNode {
3174 physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(Box::new(
3175 protobuf::SortPreservingMergeExecNode {
3176 input: Some(Box::new(input)),
3177 expr,
3178 fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
3179 },
3180 ))),
3181 })
3182 }
3183
3184 fn try_from_nested_loop_join_exec(
3185 exec: &NestedLoopJoinExec,
3186 codec: &dyn PhysicalExtensionCodec,
3187 proto_converter: &dyn PhysicalProtoConverterExtension,
3188 ) -> Result<Self> {
3189 let left = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3190 exec.left().to_owned(),
3191 codec,
3192 proto_converter,
3193 )?;
3194 let right = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3195 exec.right().to_owned(),
3196 codec,
3197 proto_converter,
3198 )?;
3199
3200 let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
3201 let filter = exec
3202 .filter()
3203 .as_ref()
3204 .map(|f| {
3205 let expression =
3206 proto_converter.physical_expr_to_proto(f.expression(), codec)?;
3207 let column_indices = f
3208 .column_indices()
3209 .iter()
3210 .map(|i| {
3211 let side: protobuf::JoinSide = i.side.to_owned().into();
3212 protobuf::ColumnIndex {
3213 index: i.index as u32,
3214 side: side.into(),
3215 }
3216 })
3217 .collect();
3218 let schema = f.schema().as_ref().try_into()?;
3219 Ok(protobuf::JoinFilter {
3220 expression: Some(expression),
3221 column_indices,
3222 schema: Some(schema),
3223 })
3224 })
3225 .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
3226
3227 Ok(protobuf::PhysicalPlanNode {
3228 physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
3229 protobuf::NestedLoopJoinExecNode {
3230 left: Some(Box::new(left)),
3231 right: Some(Box::new(right)),
3232 join_type: join_type.into(),
3233 filter,
3234 projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
3235 v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
3236 }),
3237 },
3238 ))),
3239 })
3240 }
3241
3242 fn try_from_window_agg_exec(
3243 exec: &WindowAggExec,
3244 codec: &dyn PhysicalExtensionCodec,
3245 proto_converter: &dyn PhysicalProtoConverterExtension,
3246 ) -> Result<Self> {
3247 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3248 exec.input().to_owned(),
3249 codec,
3250 proto_converter,
3251 )?;
3252
3253 let window_expr = exec
3254 .window_expr()
3255 .iter()
3256 .map(|e| serialize_physical_window_expr(e, codec, proto_converter))
3257 .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
3258
3259 let partition_keys = exec
3260 .partition_keys()
3261 .iter()
3262 .map(|e| proto_converter.physical_expr_to_proto(e, codec))
3263 .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
3264
3265 Ok(protobuf::PhysicalPlanNode {
3266 physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
3267 protobuf::WindowAggExecNode {
3268 input: Some(Box::new(input)),
3269 window_expr,
3270 partition_keys,
3271 input_order_mode: None,
3272 },
3273 ))),
3274 })
3275 }
3276
3277 fn try_from_bounded_window_agg_exec(
3278 exec: &BoundedWindowAggExec,
3279 codec: &dyn PhysicalExtensionCodec,
3280 proto_converter: &dyn PhysicalProtoConverterExtension,
3281 ) -> Result<Self> {
3282 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3283 exec.input().to_owned(),
3284 codec,
3285 proto_converter,
3286 )?;
3287
3288 let window_expr = exec
3289 .window_expr()
3290 .iter()
3291 .map(|e| serialize_physical_window_expr(e, codec, proto_converter))
3292 .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
3293
3294 let partition_keys = exec
3295 .partition_keys()
3296 .iter()
3297 .map(|e| proto_converter.physical_expr_to_proto(e, codec))
3298 .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
3299
3300 let input_order_mode = match &exec.input_order_mode {
3301 InputOrderMode::Linear => {
3302 window_agg_exec_node::InputOrderMode::Linear(protobuf::EmptyMessage {})
3303 }
3304 InputOrderMode::PartiallySorted(columns) => {
3305 window_agg_exec_node::InputOrderMode::PartiallySorted(
3306 protobuf::PartiallySortedInputOrderMode {
3307 columns: columns.iter().map(|c| *c as u64).collect(),
3308 },
3309 )
3310 }
3311 InputOrderMode::Sorted => {
3312 window_agg_exec_node::InputOrderMode::Sorted(protobuf::EmptyMessage {})
3313 }
3314 };
3315
3316 Ok(protobuf::PhysicalPlanNode {
3317 physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
3318 protobuf::WindowAggExecNode {
3319 input: Some(Box::new(input)),
3320 window_expr,
3321 partition_keys,
3322 input_order_mode: Some(input_order_mode),
3323 },
3324 ))),
3325 })
3326 }
3327
    /// Serializes a [`DataSinkExec`] into protobuf.
    ///
    /// Returns `Ok(None)` when the contained sink is not a JSON, CSV, or
    /// (with the `parquet` feature) Parquet sink, so the caller can fall
    /// back to other encodings such as an extension codec.
    fn try_from_data_sink_exec(
        exec: &DataSinkExec,
        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Option<Self>> {
        // Serialize the child plan feeding the sink.
        let input: protobuf::PhysicalPlanNode =
            protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
                exec.input().to_owned(),
                codec,
                proto_converter,
            )?;
        // Serialize the optional sort order required on the sink's input.
        let sort_order = match exec.sort_order() {
            Some(requirements) => {
                let expr = requirements
                    .iter()
                    .map(|requirement| {
                        // Convert the sort *requirement* into a concrete sort
                        // expression (fills in default options if absent).
                        let expr: PhysicalSortExpr = requirement.to_owned().into();
                        let sort_expr = protobuf::PhysicalSortExprNode {
                            expr: Some(Box::new(
                                proto_converter
                                    .physical_expr_to_proto(&expr.expr, codec)?,
                            )),
                            asc: !expr.options.descending,
                            nulls_first: expr.options.nulls_first,
                        };
                        Ok(sort_expr)
                    })
                    .collect::<Result<Vec<_>>>()?;
                Some(protobuf::PhysicalSortExprNodeCollection {
                    physical_sort_expr_nodes: expr,
                })
            }
            None => None,
        };

        // Try each known sink type in turn; the first successful downcast
        // determines the protobuf variant.
        if let Some(sink) = exec.sink().as_any().downcast_ref::<JsonSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new(
                    protobuf::JsonSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        if let Some(sink) = exec.sink().as_any().downcast_ref::<CsvSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::CsvSink(Box::new(
                    protobuf::CsvSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        // Parquet support is feature-gated.
        #[cfg(feature = "parquet")]
        if let Some(sink) = exec.sink().as_any().downcast_ref::<ParquetSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new(
                    protobuf::ParquetSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        // Unknown sink type: let the caller decide how to encode it.
        Ok(None)
    }
3406
3407 fn try_from_unnest_exec(
3408 exec: &UnnestExec,
3409 codec: &dyn PhysicalExtensionCodec,
3410 proto_converter: &dyn PhysicalProtoConverterExtension,
3411 ) -> Result<Self> {
3412 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3413 exec.input().to_owned(),
3414 codec,
3415 proto_converter,
3416 )?;
3417
3418 Ok(protobuf::PhysicalPlanNode {
3419 physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new(
3420 protobuf::UnnestExecNode {
3421 input: Some(Box::new(input)),
3422 schema: Some(exec.schema().try_into()?),
3423 list_type_columns: exec
3424 .list_column_indices()
3425 .iter()
3426 .map(|c| ProtoListUnnest {
3427 index_in_input_schema: c.index_in_input_schema as _,
3428 depth: c.depth as _,
3429 })
3430 .collect(),
3431 struct_type_columns: exec
3432 .struct_column_indices()
3433 .iter()
3434 .map(|c| *c as _)
3435 .collect(),
3436 options: Some(exec.options().into()),
3437 },
3438 ))),
3439 })
3440 }
3441
3442 fn try_from_cooperative_exec(
3443 exec: &CooperativeExec,
3444 codec: &dyn PhysicalExtensionCodec,
3445 proto_converter: &dyn PhysicalProtoConverterExtension,
3446 ) -> Result<Self> {
3447 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3448 exec.input().to_owned(),
3449 codec,
3450 proto_converter,
3451 )?;
3452
3453 Ok(protobuf::PhysicalPlanNode {
3454 physical_plan_type: Some(PhysicalPlanType::Cooperative(Box::new(
3455 protobuf::CooperativeExecNode {
3456 input: Some(Box::new(input)),
3457 },
3458 ))),
3459 })
3460 }
3461
3462 fn str_to_generate_series_name(name: &str) -> Result<protobuf::GenerateSeriesName> {
3463 match name {
3464 "generate_series" => Ok(protobuf::GenerateSeriesName::GsGenerateSeries),
3465 "range" => Ok(protobuf::GenerateSeriesName::GsRange),
3466 _ => internal_err!("unknown name: {name}"),
3467 }
3468 }
3469
    /// Serializes a [`LazyMemoryExec`] into a `GenerateSeries` protobuf node.
    ///
    /// Returns `Ok(None)` when the exec is not a recognized single-generator
    /// `generate_series`/`range` plan, so the caller can fall back to other
    /// encodings.
    fn try_from_lazy_memory_exec(exec: &LazyMemoryExec) -> Result<Option<Self>> {
        let generators = exec.generators();

        // Only plans with exactly one generator are supported here.
        let [generator] = generators.as_slice() else {
            return Ok(None);
        };

        let generator_guard = generator.read();

        // Case 1: an empty series — only the function name is needed to
        // reconstruct it.
        if let Some(empty_gen) = generator_guard.as_any().downcast_ref::<Empty>() {
            let schema = exec.schema();
            let node = protobuf::GenerateSeriesNode {
                schema: Some(schema.as_ref().try_into()?),
                // The empty generator exposes no batch size of its own; 8192
                // is used as a placeholder — presumably the common default
                // batch size. TODO(review): confirm the decode side ignores
                // it for empty series.
                target_batch_size: 8192,
                args: Some(protobuf::generate_series_node::Args::ContainsNull(
                    protobuf::GenerateSeriesArgsContainsNull {
                        name: Self::str_to_generate_series_name(empty_gen.name())? as i32,
                    },
                )),
            };

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
            }));
        }

        // Case 2: an i64 series — start/end/step are copied verbatim.
        if let Some(int_64) = generator_guard
            .as_any()
            .downcast_ref::<GenericSeriesState<i64>>()
        {
            let schema = exec.schema();
            let node = protobuf::GenerateSeriesNode {
                schema: Some(schema.as_ref().try_into()?),
                target_batch_size: int_64.batch_size() as u32,
                args: Some(protobuf::generate_series_node::Args::Int64Args(
                    protobuf::GenerateSeriesArgsInt64 {
                        start: *int_64.start(),
                        end: *int_64.end(),
                        step: *int_64.step(),
                        include_end: int_64.include_end(),
                        name: Self::str_to_generate_series_name(int_64.name())? as i32,
                    },
                )),
            };

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
            }));
        }

        // Case 3: a timestamp/date series. The step is an interval
        // (months/days/nanos); the presence of a timezone on the current
        // value selects the timestamp encoding, otherwise the date encoding.
        if let Some(timestamp_args) = generator_guard
            .as_any()
            .downcast_ref::<GenericSeriesState<TimestampValue>>()
        {
            let schema = exec.schema();

            let start = timestamp_args.start().value();
            let end = timestamp_args.end().value();

            let step_value = timestamp_args.step();

            let step = Some(datafusion_proto_common::IntervalMonthDayNanoValue {
                months: step_value.months,
                days: step_value.days,
                nanos: step_value.nanoseconds,
            });
            let include_end = timestamp_args.include_end();
            let name = Self::str_to_generate_series_name(timestamp_args.name())? as i32;

            let args = match timestamp_args.current().tz_str() {
                Some(tz) => protobuf::generate_series_node::Args::TimestampArgs(
                    protobuf::GenerateSeriesArgsTimestamp {
                        start,
                        end,
                        step,
                        include_end,
                        name,
                        tz: Some(tz.to_string()),
                    },
                ),
                None => protobuf::generate_series_node::Args::DateArgs(
                    protobuf::GenerateSeriesArgsDate {
                        start,
                        end,
                        step,
                        include_end,
                        name,
                    },
                ),
            };

            let node = protobuf::GenerateSeriesNode {
                schema: Some(schema.as_ref().try_into()?),
                target_batch_size: timestamp_args.batch_size() as u32,
                args: Some(args),
            };

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
            }));
        }

        // Unknown generator type: not serializable as GenerateSeries.
        Ok(None)
    }
3576
3577 fn try_from_async_func_exec(
3578 exec: &AsyncFuncExec,
3579 codec: &dyn PhysicalExtensionCodec,
3580 proto_converter: &dyn PhysicalProtoConverterExtension,
3581 ) -> Result<Self> {
3582 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3583 Arc::clone(exec.input()),
3584 codec,
3585 proto_converter,
3586 )?;
3587
3588 let mut async_exprs = vec![];
3589 let mut async_expr_names = vec![];
3590
3591 for async_expr in exec.async_exprs() {
3592 async_exprs
3593 .push(proto_converter.physical_expr_to_proto(&async_expr.func, codec)?);
3594 async_expr_names.push(async_expr.name.clone())
3595 }
3596
3597 Ok(protobuf::PhysicalPlanNode {
3598 physical_plan_type: Some(PhysicalPlanType::AsyncFunc(Box::new(
3599 protobuf::AsyncFuncExecNode {
3600 input: Some(Box::new(input)),
3601 async_exprs,
3602 async_expr_names,
3603 },
3604 ))),
3605 })
3606 }
3607
3608 fn try_from_buffer_exec(
3609 exec: &BufferExec,
3610 extension_codec: &dyn PhysicalExtensionCodec,
3611 proto_converter: &dyn PhysicalProtoConverterExtension,
3612 ) -> Result<Self> {
3613 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3614 Arc::clone(exec.input()),
3615 extension_codec,
3616 proto_converter,
3617 )?;
3618
3619 Ok(protobuf::PhysicalPlanNode {
3620 physical_plan_type: Some(PhysicalPlanType::Buffer(Box::new(
3621 protobuf::BufferExecNode {
3622 input: Some(Box::new(input)),
3623 capacity: exec.capacity() as u64,
3624 },
3625 ))),
3626 })
3627 }
3628}
3629
/// Conversion between protobuf-encoded bytes and executable physical plans.
pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
    /// Decodes `Self` from a protobuf-encoded buffer.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes `self` into `buf` as protobuf bytes.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized;

    /// Builds an [`ExecutionPlan`] from this node, resolving extension
    /// (non-built-in) operators via `codec`.
    fn try_into_physical_plan(
        &self,
        ctx: &TaskContext,

        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Creates a node of this type from an [`ExecutionPlan`], using `codec`
    /// for extension operators.
    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized;
}
3654
/// Codec for user-defined (extension) physical plan nodes, expressions, and
/// functions that the built-in protobuf encoding does not cover.
///
/// The function/expression hooks have default implementations, so codecs only
/// need to override what they actually support.
pub trait PhysicalExtensionCodec: Debug + Send + Sync {
    /// Decodes an extension plan node from `buf`; `inputs` are the node's
    /// already-decoded children.
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        ctx: &TaskContext,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Encodes an extension plan node into `buf`.
    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;

    /// Decodes a scalar UDF by `name` from `buf`. Default: error.
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
    }

    /// Encodes a scalar UDF into `buf`. The default writes nothing and
    /// succeeds (the UDF is then identified by name alone on decode).
    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes an extension physical expression; `inputs` are its
    /// already-decoded children. Default: error.
    fn try_decode_expr(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn PhysicalExpr>],
    ) -> Result<Arc<dyn PhysicalExpr>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Encodes an extension physical expression. Default: error.
    fn try_encode_expr(
        &self,
        _node: &Arc<dyn PhysicalExpr>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Decodes an aggregate UDF by `name` from `buf`. Default: error.
    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        not_impl_err!(
            "PhysicalExtensionCodec is not provided for aggregate function {name}"
        )
    }

    /// Encodes an aggregate UDF into `buf`. Default: no-op success.
    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes a window UDF by `name` from `buf`. Default: error.
    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for window function {name}")
    }

    /// Encodes a window UDF into `buf`. Default: no-op success.
    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }
}
3707
/// A no-op [`PhysicalExtensionCodec`]: every plan decode/encode attempt fails
/// with a "not implemented" error. Suitable when a plan contains no extension
/// nodes.
#[derive(Debug)]
pub struct DefaultPhysicalExtensionCodec {}

impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn ExecutionPlan>],
        _ctx: &TaskContext,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    fn try_encode(
        &self,
        _node: Arc<dyn ExecutionPlan>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }
}
3729
/// Extension point controlling how physical plans and expressions are
/// converted to and from protobuf.
///
/// Implementations can intercept the recursive conversion (e.g. the
/// deduplicating converters in this module tag and cache expression
/// subtrees), while [`DefaultPhysicalProtoConverter`] performs the plain
/// conversion.
pub trait PhysicalProtoConverterExtension {
    /// Converts a protobuf plan node into an executable plan.
    fn proto_to_execution_plan(
        &self,
        ctx: &TaskContext,
        codec: &dyn PhysicalExtensionCodec,
        proto: &protobuf::PhysicalPlanNode,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Converts an executable plan into a protobuf plan node.
    fn execution_plan_to_proto(
        &self,
        plan: &Arc<dyn ExecutionPlan>,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<protobuf::PhysicalPlanNode>;

    /// Converts a protobuf expression node into a physical expression,
    /// resolved against `input_schema`.
    fn proto_to_physical_expr(
        &self,
        proto: &protobuf::PhysicalExprNode,
        ctx: &TaskContext,
        input_schema: &Schema,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn PhysicalExpr>>;

    /// Converts a physical expression into a protobuf expression node.
    fn physical_expr_to_proto(
        &self,
        expr: &Arc<dyn PhysicalExpr>,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<protobuf::PhysicalExprNode>;
}
3761
/// Envelope used by [`ComposedPhysicalExtensionCodec`] to record which inner
/// codec produced a blob, so decoding can dispatch back to the same codec.
#[derive(Clone, PartialEq, prost::Message)]
struct DataEncoderTuple {
    /// Index into the codec list of the codec that encoded `blob`.
    #[prost(uint32, tag = 1)]
    pub encoder_position: u32,

    /// The payload produced by that codec.
    #[prost(bytes, tag = 2)]
    pub blob: Vec<u8>,
}
3774
/// The default, stateless converter: delegates directly to the standard
/// parse/serialize functions with no extra processing.
pub struct DefaultPhysicalProtoConverter;
impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter {
    fn proto_to_execution_plan(
        &self,
        ctx: &TaskContext,
        codec: &dyn PhysicalExtensionCodec,
        proto: &protobuf::PhysicalPlanNode,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        proto.try_into_physical_plan_with_converter(ctx, codec, self)
    }

    fn execution_plan_to_proto(
        &self,
        plan: &Arc<dyn ExecutionPlan>,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<protobuf::PhysicalPlanNode>
    where
        Self: Sized,
    {
        protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
            Arc::clone(plan),
            codec,
            self,
        )
    }

    fn proto_to_physical_expr(
        &self,
        proto: &protobuf::PhysicalExprNode,
        ctx: &TaskContext,
        input_schema: &Schema,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn PhysicalExpr>>
    where
        Self: Sized,
    {
        parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)
    }

    fn physical_expr_to_proto(
        &self,
        expr: &Arc<dyn PhysicalExpr>,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<protobuf::PhysicalExprNode> {
        serialize_physical_expr_with_converter(expr, codec, self)
    }
}
3823
/// Serializer that tags every serialized physical expression with an id so a
/// [`DeduplicatingDeserializer`] can share identical subtrees on decode.
struct DeduplicatingSerializer {
    /// Random per-serializer value mixed into each expression id so ids from
    /// different serializer instances do not collide.
    session_id: u64,
}

impl DeduplicatingSerializer {
    /// Creates a serializer with a fresh random session id.
    fn new() -> Self {
        Self {
            session_id: rand::random(),
        }
    }
}
3838
impl PhysicalProtoConverterExtension for DeduplicatingSerializer {
    /// Not supported: this type only serializes.
    fn proto_to_execution_plan(
        &self,
        _ctx: &TaskContext,
        _codec: &dyn PhysicalExtensionCodec,
        _proto: &protobuf::PhysicalPlanNode,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        internal_err!("DeduplicatingSerializer cannot deserialize execution plans")
    }

    /// Serializes a plan, tagging every contained expression with an id via
    /// [`Self::physical_expr_to_proto`].
    fn execution_plan_to_proto(
        &self,
        plan: &Arc<dyn ExecutionPlan>,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<protobuf::PhysicalPlanNode>
    where
        Self: Sized,
    {
        protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
            Arc::clone(plan),
            codec,
            self,
        )
    }

    /// Not supported: this type only serializes.
    fn proto_to_physical_expr(
        &self,
        _proto: &protobuf::PhysicalExprNode,
        _ctx: &TaskContext,
        _input_schema: &Schema,
        _codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn PhysicalExpr>>
    where
        Self: Sized,
    {
        internal_err!("DeduplicatingSerializer cannot deserialize physical expressions")
    }

    /// Serializes `expr` and stamps the resulting node with an `expr_id`.
    ///
    /// The id hashes (session id, the `Arc`'s pointer address, process id),
    /// so two serialized nodes share an id exactly when they refer to the
    /// same `Arc` allocation in this session. Structurally equal but distinct
    /// expressions get different ids — that only reduces deduplication, never
    /// correctness.
    fn physical_expr_to_proto(
        &self,
        expr: &Arc<dyn PhysicalExpr>,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<protobuf::PhysicalExprNode> {
        let mut proto = serialize_physical_expr_with_converter(expr, codec, self)?;

        // Identity-based id: session id + Arc pointer (thinned) + pid.
        let mut hasher = DefaultHasher::new();
        self.session_id.hash(&mut hasher);
        (Arc::as_ptr(expr) as *const () as u64).hash(&mut hasher);
        std::process::id().hash(&mut hasher);
        proto.expr_id = Some(hasher.finish());

        Ok(proto)
    }
}
3897
/// Deserializer counterpart of [`DeduplicatingSerializer`]: decodes each
/// tagged expression once and hands out shared `Arc` clones afterwards.
#[derive(Default)]
struct DeduplicatingDeserializer {
    /// Maps the serializer-assigned `expr_id` to the decoded expression.
    /// `RefCell` because the trait methods take `&self`.
    cache: RefCell<HashMap<u64, Arc<dyn PhysicalExpr>>>,
}
3905
impl PhysicalProtoConverterExtension for DeduplicatingDeserializer {
    /// Decodes a plan, sharing expression subtrees that carry the same
    /// `expr_id` (see [`Self::proto_to_physical_expr`]).
    fn proto_to_execution_plan(
        &self,
        ctx: &TaskContext,
        codec: &dyn PhysicalExtensionCodec,
        proto: &protobuf::PhysicalPlanNode,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        proto.try_into_physical_plan_with_converter(ctx, codec, self)
    }

    /// Not supported: this type only deserializes.
    fn execution_plan_to_proto(
        &self,
        _plan: &Arc<dyn ExecutionPlan>,
        _codec: &dyn PhysicalExtensionCodec,
    ) -> Result<protobuf::PhysicalPlanNode>
    where
        Self: Sized,
    {
        internal_err!("DeduplicatingDeserializer cannot serialize execution plans")
    }

    /// Decodes a physical expression, returning a cached `Arc` when the node
    /// carries an `expr_id` seen before; otherwise decodes normally and
    /// (when tagged) records the result for later reuse.
    fn proto_to_physical_expr(
        &self,
        proto: &protobuf::PhysicalExprNode,
        ctx: &TaskContext,
        input_schema: &Schema,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn PhysicalExpr>>
    where
        Self: Sized,
    {
        if let Some(expr_id) = proto.expr_id {
            if let Some(cached) = self.cache.borrow().get(&expr_id) {
                return Ok(Arc::clone(cached));
            }
            // The immutable borrow above is released before recursing, so
            // nested expressions can consult the cache without a RefCell
            // double-borrow panic.
            let expr = parse_physical_expr_with_converter(
                proto,
                ctx,
                input_schema,
                codec,
                self,
            )?;
            self.cache.borrow_mut().insert(expr_id, Arc::clone(&expr));
            Ok(expr)
        } else {
            // Untagged nodes are decoded without caching.
            parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)
        }
    }

    /// Not supported: this type only deserializes.
    fn physical_expr_to_proto(
        &self,
        _expr: &Arc<dyn PhysicalExpr>,
        _codec: &dyn PhysicalExtensionCodec,
    ) -> Result<protobuf::PhysicalExprNode> {
        internal_err!("DeduplicatingDeserializer cannot serialize physical expressions")
    }
}
3965
/// A [`PhysicalProtoConverterExtension`] that deduplicates shared physical
/// expression subtrees: serialization tags each expression with an
/// identity-based id, and deserialization decodes each id once and reuses
/// the resulting `Arc`.
#[derive(Debug, Default, Clone, Copy)]
pub struct DeduplicatingProtoConverter {}
3983
impl PhysicalProtoConverterExtension for DeduplicatingProtoConverter {
    /// Decodes with a fresh [`DeduplicatingDeserializer`]; deduplication
    /// state is scoped to this single call.
    fn proto_to_execution_plan(
        &self,
        ctx: &TaskContext,
        codec: &dyn PhysicalExtensionCodec,
        proto: &protobuf::PhysicalPlanNode,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let deserializer = DeduplicatingDeserializer::default();
        proto.try_into_physical_plan_with_converter(ctx, codec, &deserializer)
    }

    /// Encodes with a fresh [`DeduplicatingSerializer`] (and thus a fresh
    /// random session id) per call.
    fn execution_plan_to_proto(
        &self,
        plan: &Arc<dyn ExecutionPlan>,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<protobuf::PhysicalPlanNode>
    where
        Self: Sized,
    {
        let serializer = DeduplicatingSerializer::new();
        protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
            Arc::clone(plan),
            codec,
            &serializer,
        )
    }

    /// Decodes a single expression tree with call-scoped deduplication.
    fn proto_to_physical_expr(
        &self,
        proto: &protobuf::PhysicalExprNode,
        ctx: &TaskContext,
        input_schema: &Schema,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn PhysicalExpr>>
    where
        Self: Sized,
    {
        let deserializer = DeduplicatingDeserializer::default();
        deserializer.proto_to_physical_expr(proto, ctx, input_schema, codec)
    }

    /// Encodes a single expression tree, tagging nodes with identity ids.
    fn physical_expr_to_proto(
        &self,
        expr: &Arc<dyn PhysicalExpr>,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<protobuf::PhysicalExprNode> {
        let serializer = DeduplicatingSerializer::new();
        serializer.physical_expr_to_proto(expr, codec)
    }
}
4034
/// A [`PhysicalExtensionCodec`] that chains several codecs: encoding tries
/// each codec in order and records which one succeeded; decoding dispatches
/// back to that codec.
#[derive(Debug)]
pub struct ComposedPhysicalExtensionCodec {
    codecs: Vec<Arc<dyn PhysicalExtensionCodec>>,
}
4041
4042impl ComposedPhysicalExtensionCodec {
4043 pub fn new(codecs: Vec<Arc<dyn PhysicalExtensionCodec>>) -> Self {
4046 Self { codecs }
4047 }
4048
4049 fn decode_protobuf<R>(
4050 &self,
4051 buf: &[u8],
4052 decode: impl FnOnce(&dyn PhysicalExtensionCodec, &[u8]) -> Result<R>,
4053 ) -> Result<R> {
4054 let proto =
4055 DataEncoderTuple::decode(buf).map_err(|e| internal_datafusion_err!("{e}"))?;
4056
4057 let codec = self.codecs.get(proto.encoder_position as usize).ok_or(
4058 internal_datafusion_err!("Can't find required codec in codec list"),
4059 )?;
4060
4061 decode(codec.as_ref(), &proto.blob)
4062 }
4063
4064 fn encode_protobuf(
4065 &self,
4066 buf: &mut Vec<u8>,
4067 mut encode: impl FnMut(&dyn PhysicalExtensionCodec, &mut Vec<u8>) -> Result<()>,
4068 ) -> Result<()> {
4069 let mut data = vec![];
4070 let mut last_err = None;
4071 let mut encoder_position = None;
4072
4073 for (position, codec) in self.codecs.iter().enumerate() {
4075 match encode(codec.as_ref(), &mut data) {
4076 Ok(_) => {
4077 encoder_position = Some(position as u32);
4078 break;
4079 }
4080 Err(err) => last_err = Some(err),
4081 }
4082 }
4083
4084 let encoder_position = encoder_position.ok_or_else(|| {
4085 last_err.unwrap_or_else(|| {
4086 DataFusionError::NotImplemented(
4087 "Empty list of composed codecs".to_owned(),
4088 )
4089 })
4090 })?;
4091
4092 let proto = DataEncoderTuple {
4094 encoder_position,
4095 blob: data,
4096 };
4097 proto
4098 .encode(buf)
4099 .map_err(|e| internal_datafusion_err!("{e}"))
4100 }
4101}
4102
4103impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec {
4104 fn try_decode(
4105 &self,
4106 buf: &[u8],
4107 inputs: &[Arc<dyn ExecutionPlan>],
4108 ctx: &TaskContext,
4109 ) -> Result<Arc<dyn ExecutionPlan>> {
4110 self.decode_protobuf(buf, |codec, data| codec.try_decode(data, inputs, ctx))
4111 }
4112
4113 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
4114 self.encode_protobuf(buf, |codec, data| codec.try_encode(Arc::clone(&node), data))
4115 }
4116
4117 fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
4118 self.decode_protobuf(buf, |codec, data| codec.try_decode_udf(name, data))
4119 }
4120
4121 fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
4122 self.encode_protobuf(buf, |codec, data| codec.try_encode_udf(node, data))
4123 }
4124
4125 fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
4126 self.decode_protobuf(buf, |codec, data| codec.try_decode_udaf(name, data))
4127 }
4128
4129 fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
4130 self.encode_protobuf(buf, |codec, data| codec.try_encode_udaf(node, data))
4131 }
4132}
4133
4134fn into_physical_plan(
4135 node: &Option<Box<protobuf::PhysicalPlanNode>>,
4136 ctx: &TaskContext,
4137 codec: &dyn PhysicalExtensionCodec,
4138 proto_converter: &dyn PhysicalProtoConverterExtension,
4139) -> Result<Arc<dyn ExecutionPlan>> {
4140 if let Some(field) = node {
4141 proto_converter.proto_to_execution_plan(ctx, codec, field)
4142 } else {
4143 Err(proto_error("Missing required field in protobuf"))
4144 }
4145}