1use std::sync::Arc;
19
20use arrow::array::RecordBatch;
21use arrow::datatypes::Schema;
22use arrow::ipc::writer::StreamWriter;
23use datafusion_common::{
24 DataFusionError, Result, internal_datafusion_err, internal_err, not_impl_err,
25};
26use datafusion_datasource::file_scan_config::FileScanConfig;
27use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
28use datafusion_datasource::{FileRange, PartitionedFile};
29use datafusion_datasource_csv::file_format::CsvSink;
30use datafusion_datasource_json::file_format::JsonSink;
31#[cfg(feature = "parquet")]
32use datafusion_datasource_parquet::file_format::ParquetSink;
33use datafusion_expr::WindowFrame;
34use datafusion_physical_expr::ScalarFunctionExpr;
35use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr;
36use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
37use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
38use datafusion_physical_plan::expressions::{
39 BinaryExpr, CaseExpr, CastExpr, Column, DynamicFilterPhysicalExpr, InListExpr,
40 IsNotNullExpr, IsNullExpr, LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr,
41 UnKnownColumn,
42};
43use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr};
44use datafusion_physical_plan::udaf::AggregateFunctionExpr;
45use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
46use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
47
48use super::{
49 DefaultPhysicalProtoConverter, PhysicalExtensionCodec,
50 PhysicalProtoConverterExtension, encode_human_display_alias,
51};
52use crate::protobuf::{
53 self, PhysicalSortExprNode, PhysicalSortExprNodeCollection,
54 physical_aggregate_expr_node, physical_window_expr_node,
55};
56
57#[expect(clippy::needless_pass_by_value)]
58pub fn serialize_physical_aggr_expr(
59 aggr_expr: Arc<AggregateFunctionExpr>,
60 codec: &dyn PhysicalExtensionCodec,
61 proto_converter: &dyn PhysicalProtoConverterExtension,
62) -> Result<protobuf::PhysicalExprNode> {
63 let expressions =
64 serialize_physical_exprs(&aggr_expr.expressions(), codec, proto_converter)?;
65 let order_bys = serialize_physical_sort_exprs(
66 aggr_expr.order_bys().iter().cloned(),
67 codec,
68 proto_converter,
69 )?;
70
71 let name = aggr_expr.fun().name().to_string();
72 let mut buf = Vec::new();
73 codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
74 let human_display = match (aggr_expr.human_display(), aggr_expr.human_display_alias())
75 {
76 (Some(display), Some(alias)) => encode_human_display_alias(display, alias),
77 (Some(display), None) => display.to_string(),
78 (None, _) => String::new(),
79 };
80 Ok(protobuf::PhysicalExprNode {
81 expr_id: None,
82 expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
83 protobuf::PhysicalAggregateExprNode {
84 aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)),
85 expr: expressions,
86 ordering_req: order_bys,
87 distinct: aggr_expr.is_distinct(),
88 ignore_nulls: aggr_expr.ignore_nulls(),
89 fun_definition: (!buf.is_empty()).then_some(buf),
90 human_display,
91 },
92 )),
93 })
94}
95
96fn serialize_physical_window_aggr_expr(
97 aggr_expr: &AggregateFunctionExpr,
98 _window_frame: &WindowFrame,
99 codec: &dyn PhysicalExtensionCodec,
100) -> Result<(physical_window_expr_node::WindowFunction, Option<Vec<u8>>)> {
101 let mut buf = Vec::new();
104 codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
105 Ok((
106 physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(
107 aggr_expr.fun().name().to_string(),
108 ),
109 (!buf.is_empty()).then_some(buf),
110 ))
111}
112
113pub fn serialize_physical_window_expr(
114 window_expr: &Arc<dyn WindowExpr>,
115 codec: &dyn PhysicalExtensionCodec,
116 proto_converter: &dyn PhysicalProtoConverterExtension,
117) -> Result<protobuf::PhysicalWindowExprNode> {
118 let expr = window_expr.as_any();
119 let mut args = window_expr.expressions().to_vec();
120 let window_frame = window_expr.get_window_frame();
121
122 let (window_function, fun_definition, ignore_nulls, distinct) =
123 if let Some(plain_aggr_window_expr) =
124 expr.downcast_ref::<PlainAggregateWindowExpr>()
125 {
126 let aggr_expr = plain_aggr_window_expr.get_aggregate_expr();
127 let (window_function, fun_definition) =
128 serialize_physical_window_aggr_expr(aggr_expr, window_frame, codec)?;
129 (
130 window_function,
131 fun_definition,
132 aggr_expr.ignore_nulls(),
133 aggr_expr.is_distinct(),
134 )
135 } else if let Some(sliding_aggr_window_expr) =
136 expr.downcast_ref::<SlidingAggregateWindowExpr>()
137 {
138 let aggr_expr = sliding_aggr_window_expr.get_aggregate_expr();
139 let (window_function, fun_definition) =
140 serialize_physical_window_aggr_expr(aggr_expr, window_frame, codec)?;
141 (
142 window_function,
143 fun_definition,
144 aggr_expr.ignore_nulls(),
145 aggr_expr.is_distinct(),
146 )
147 } else if let Some(udf_window_expr) = expr.downcast_ref::<StandardWindowExpr>() {
148 if let Some(expr) = udf_window_expr
149 .get_standard_func_expr()
150 .as_any()
151 .downcast_ref::<WindowUDFExpr>()
152 {
153 let mut buf = Vec::new();
154 codec.try_encode_udwf(expr.fun(), &mut buf)?;
155 args = expr.args().to_vec();
156 (
157 physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(
158 expr.fun().name().to_string(),
159 ),
160 (!buf.is_empty()).then_some(buf),
161 false, false,
163 )
164 } else {
165 return not_impl_err!(
166 "User-defined window function not supported: {window_expr:?}"
167 );
168 }
169 } else {
170 return not_impl_err!("WindowExpr not supported: {window_expr:?}");
171 };
172
173 let args = serialize_physical_exprs(&args, codec, proto_converter)?;
174 let partition_by =
175 serialize_physical_exprs(window_expr.partition_by(), codec, proto_converter)?;
176 let order_by = serialize_physical_sort_exprs(
177 window_expr.order_by().to_vec(),
178 codec,
179 proto_converter,
180 )?;
181 let window_frame: protobuf::WindowFrame = window_frame
182 .as_ref()
183 .try_into()
184 .map_err(|e| internal_datafusion_err!("{e}"))?;
185
186 Ok(protobuf::PhysicalWindowExprNode {
187 args,
188 partition_by,
189 order_by,
190 window_frame: Some(window_frame),
191 window_function: Some(window_function),
192 name: window_expr.name().to_string(),
193 fun_definition,
194 ignore_nulls,
195 distinct,
196 })
197}
198
199pub fn serialize_physical_sort_exprs<I>(
200 sort_exprs: I,
201 codec: &dyn PhysicalExtensionCodec,
202 proto_converter: &dyn PhysicalProtoConverterExtension,
203) -> Result<Vec<PhysicalSortExprNode>>
204where
205 I: IntoIterator<Item = PhysicalSortExpr>,
206{
207 sort_exprs
208 .into_iter()
209 .map(|sort_expr| serialize_physical_sort_expr(sort_expr, codec, proto_converter))
210 .collect()
211}
212
213pub fn serialize_physical_sort_expr(
214 sort_expr: PhysicalSortExpr,
215 codec: &dyn PhysicalExtensionCodec,
216 proto_converter: &dyn PhysicalProtoConverterExtension,
217) -> Result<PhysicalSortExprNode> {
218 let PhysicalSortExpr { expr, options } = sort_expr;
219 let expr = proto_converter.physical_expr_to_proto(&expr, codec)?;
220 Ok(PhysicalSortExprNode {
221 expr: Some(Box::new(expr)),
222 asc: !options.descending,
223 nulls_first: options.nulls_first,
224 })
225}
226
227pub fn serialize_physical_exprs<'a, I>(
228 values: I,
229 codec: &dyn PhysicalExtensionCodec,
230 proto_converter: &dyn PhysicalProtoConverterExtension,
231) -> Result<Vec<protobuf::PhysicalExprNode>>
232where
233 I: IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
234{
235 values
236 .into_iter()
237 .map(|value| proto_converter.physical_expr_to_proto(value, codec))
238 .collect()
239}
240
241pub fn serialize_physical_expr(
246 value: &Arc<dyn PhysicalExpr>,
247 codec: &dyn PhysicalExtensionCodec,
248) -> Result<protobuf::PhysicalExprNode> {
249 serialize_physical_expr_with_converter(
250 value,
251 codec,
252 &DefaultPhysicalProtoConverter {},
253 )
254}
255
/// Serializes `value` into a [`protobuf::PhysicalExprNode`], using
/// `proto_converter` to serialize any nested child expressions.
///
/// Known built-in expression types are recognized by downcasting, checked in
/// the order below. Any other expression is handed to
/// `codec.try_encode_expr`; on success it is emitted as an `Extension` node
/// whose `inputs` are the serialized `children()` of `value`.
///
/// Every returned node carries the `expr_id` reported by
/// `value.expression_id()`.
///
/// # Errors
///
/// Propagates failures from serializing child expressions, literal values,
/// data types, or UDF definitions. Returns an internal error when `value` is
/// not a recognized built-in type and the extension codec cannot encode it.
pub fn serialize_physical_expr_with_converter(
    value: &Arc<dyn PhysicalExpr>,
    codec: &dyn PhysicalExtensionCodec,
    proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<protobuf::PhysicalExprNode> {
    let expr = value.as_ref();
    let expr_id = value.expression_id();
    if expr.downcast_ref::<HashTableLookupExpr>().is_some() {
        // A `HashTableLookupExpr` is never serialized as itself: it is
        // replaced by the boolean literal `true`.
        // NOTE(review): presumably because the hash table it probes is
        // in-memory execution state with no protobuf encoding, and `true` is a
        // conservative (non-filtering) stand-in when used as a predicate —
        // confirm.
        let value = datafusion_proto_common::ScalarValue {
            value: Some(datafusion_proto_common::scalar_value::Value::BoolValue(
                true,
            )),
        };
        return Ok(protobuf::PhysicalExprNode {
            expr_id,
            expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(value)),
        });
    }

    if let Some(expr) = expr.downcast_ref::<Column>() {
        // NOTE(review): `as u32` silently truncates an index above u32::MAX.
        Ok(protobuf::PhysicalExprNode {
            expr_id,
            expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
                protobuf::PhysicalColumn {
                    name: expr.name().to_string(),
                    index: expr.index() as u32,
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<UnKnownColumn>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id,
            expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn(
                protobuf::UnknownColumn {
                    name: expr.name().to_string(),
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
        let op = expr.op();
        // Flatten a left-deep chain of the same operator, e.g.
        // `((a OP b) OP c) OP d`, into one node with operands `[a, b, c, d]`.
        // Walk down the left spine collecting right operands, stop at the
        // first left child that is not a `BinaryExpr` with the same operator,
        // then reverse to restore left-to-right order. Right-nested
        // subexpressions such as `a OP (b OP c)` are not flattened; they are
        // serialized as a single operand.
        let mut operand_refs: Vec<&Arc<dyn PhysicalExpr>> = vec![expr.right()];
        let mut current_expr: &BinaryExpr = expr;
        loop {
            match current_expr.left().downcast_ref::<BinaryExpr>() {
                Some(bin) if bin.op() == op => {
                    operand_refs.push(bin.right());
                    current_expr = bin;
                }
                _ => {
                    operand_refs.push(current_expr.left());
                    break;
                }
            }
        }

        // Operands were collected right-to-left.
        operand_refs.reverse();

        let operands = operand_refs
            .iter()
            .map(|e| proto_converter.physical_expr_to_proto(e, codec))
            .collect::<Result<Vec<_>>>()?;

        // The binary `l` / `r` fields are left unset; every operand is carried
        // in `operands`, and the operator is encoded via its `Debug` form.
        let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
            l: None,
            r: None,
            op: format!("{:?}", op),
            operands,
        });

        Ok(protobuf::PhysicalExprNode {
            expr_id,
            expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
                binary_expr,
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
        // CASE [expr] WHEN ... THEN ... [ELSE ...] END: the optional base
        // expression and ELSE branch map to optional fields.
        Ok(protobuf::PhysicalExprNode {
            expr_id,
            expr_type: Some(
                protobuf::physical_expr_node::ExprType::Case(
                    Box::new(
                        protobuf::PhysicalCaseNode {
                            expr: expr
                                .expr()
                                .map(|exp| {
                                    proto_converter
                                        .physical_expr_to_proto(exp, codec)
                                        .map(Box::new)
                                })
                                .transpose()?,
                            when_then_expr: expr
                                .when_then_expr()
                                .iter()
                                .map(|(when_expr, then_expr)| {
                                    serialize_when_then_expr(
                                        when_expr,
                                        then_expr,
                                        codec,
                                        proto_converter,
                                    )
                                })
                                .collect::<Result<
                                    Vec<protobuf::PhysicalWhenThen>,
                                    DataFusionError,
                                >>()?,
                            else_expr: expr
                                .else_expr()
                                .map(|a| {
                                    proto_converter
                                        .physical_expr_to_proto(a, codec)
                                        .map(Box::new)
                                })
                                .transpose()?,
                        },
                    ),
                ),
            ),
        })
    } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id,
            expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
                protobuf::PhysicalNot {
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(expr.arg(), codec)?,
                    )),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id,
            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
                Box::new(protobuf::PhysicalIsNull {
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(expr.arg(), codec)?,
                    )),
                }),
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id,
            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
                Box::new(protobuf::PhysicalIsNotNull {
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(expr.arg(), codec)?,
                    )),
                }),
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id,
            expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new(
                protobuf::PhysicalInListNode {
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(expr.expr(), codec)?,
                    )),
                    list: serialize_physical_exprs(expr.list(), codec, proto_converter)?,
                    negated: expr.negated(),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id,
            expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
                protobuf::PhysicalNegativeNode {
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(expr.arg(), codec)?,
                    )),
                },
            ))),
        })
    } else if let Some(lit) = expr.downcast_ref::<Literal>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id,
            expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
                lit.value().try_into()?,
            )),
        })
    } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id,
            expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
                protobuf::PhysicalCastNode {
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(cast.expr(), codec)?,
                    )),
                    arrow_type: Some(cast.cast_type().try_into()?),
                },
            ))),
        })
    } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id,
            expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new(
                protobuf::PhysicalTryCastNode {
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(cast.expr(), codec)?,
                    )),
                    arrow_type: Some(cast.cast_type().try_into()?),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
        // The UDF is referenced by name; any bytes the codec writes for it are
        // attached as `fun_definition`.
        let mut buf = Vec::new();
        codec.try_encode_udf(expr.fun(), &mut buf)?;
        Ok(protobuf::PhysicalExprNode {
            expr_id,
            expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf(
                protobuf::PhysicalScalarUdfNode {
                    name: expr.name().to_string(),
                    args: serialize_physical_exprs(expr.args(), codec, proto_converter)?,
                    fun_definition: (!buf.is_empty()).then_some(buf),
                    return_type: Some(expr.return_type().try_into()?),
                    nullable: expr.nullable(),
                    // The return field is resolved against an empty schema
                    // only to read its name.
                    // NOTE(review): assumes the field name does not depend on
                    // the input schema — confirm.
                    return_field_name: expr
                        .return_field(&Schema::empty())?
                        .name()
                        .to_string(),
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id,
            expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new(
                protobuf::PhysicalLikeExprNode {
                    negated: expr.negated(),
                    case_insensitive: expr.case_insensitive(),
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(expr.expr(), codec)?,
                    )),
                    pattern: Some(Box::new(
                        proto_converter.physical_expr_to_proto(expr.pattern(), codec)?,
                    )),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<HashExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id,
            expr_type: Some(protobuf::physical_expr_node::ExprType::HashExpr(
                protobuf::PhysicalHashExprNode {
                    on_columns: serialize_physical_exprs(
                        expr.on_columns(),
                        codec,
                        proto_converter,
                    )?,
                    seed0: expr.seed(),
                    description: expr.description().to_string(),
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<ScalarSubqueryExpr>() {
        // Only the result type, nullability and subquery index are encoded.
        // NOTE(review): `as u32` silently truncates an index above u32::MAX.
        Ok(protobuf::PhysicalExprNode {
            expr_id,
            expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarSubquery(
                protobuf::PhysicalScalarSubqueryExprNode {
                    data_type: Some(expr.data_type().try_into()?),
                    nullable: expr.nullable(),
                    index: expr.index().as_usize() as u32,
                },
            )),
        })
    } else if let Some(df) = expr.downcast_ref::<DynamicFilterPhysicalExpr>() {
        // Encode the dynamic filter's original children, its remapped
        // children (empty when there are none), and its current inner
        // expression together with the `generation` / `is_complete` state.
        let children = df
            .original_children()
            .iter()
            .map(|child| proto_converter.physical_expr_to_proto(child, codec))
            .collect::<Result<Vec<_>>>()?;

        let remapped_children = if let Some(remapped) = df.remapped_children() {
            remapped
                .iter()
                .map(|child| proto_converter.physical_expr_to_proto(child, codec))
                .collect::<Result<Vec<_>>>()?
        } else {
            vec![]
        };

        let inner = df.inner();
        let inner_expr =
            Box::new(proto_converter.physical_expr_to_proto(&inner.expr, codec)?);

        Ok(protobuf::PhysicalExprNode {
            expr_id,
            expr_type: Some(protobuf::physical_expr_node::ExprType::DynamicFilter(
                Box::new(protobuf::PhysicalDynamicFilterNode {
                    children,
                    remapped_children,
                    generation: inner.generation,
                    inner_expr: Some(inner_expr),
                    is_complete: inner.is_complete,
                }),
            )),
        })
    } else {
        // Not a built-in expression: let the extension codec encode it and
        // serialize its children separately as `inputs`.
        let mut buf: Vec<u8> = vec![];
        match codec.try_encode_expr(value, &mut buf) {
            Ok(_) => {
                let inputs: Vec<protobuf::PhysicalExprNode> = value
                    .children()
                    .into_iter()
                    .map(|e| proto_converter.physical_expr_to_proto(e, codec))
                    .collect::<Result<_>>()?;
                Ok(protobuf::PhysicalExprNode {
                    expr_id,
                    expr_type: Some(protobuf::physical_expr_node::ExprType::Extension(
                        protobuf::PhysicalExtensionExprNode { expr: buf, inputs },
                    )),
                })
            }
            Err(e) => internal_err!(
                "Unsupported physical expr and extension codec failed with [{e}]. Expr: {value:?}"
            ),
        }
    }
}
600
601pub fn serialize_partitioning(
602 partitioning: &Partitioning,
603 codec: &dyn PhysicalExtensionCodec,
604 proto_converter: &dyn PhysicalProtoConverterExtension,
605) -> Result<protobuf::Partitioning> {
606 let serialized_partitioning = match partitioning {
607 Partitioning::RoundRobinBatch(partition_count) => protobuf::Partitioning {
608 partition_method: Some(protobuf::partitioning::PartitionMethod::RoundRobin(
609 *partition_count as u64,
610 )),
611 },
612 Partitioning::Hash(exprs, partition_count) => {
613 let serialized_exprs =
614 serialize_physical_exprs(exprs, codec, proto_converter)?;
615 protobuf::Partitioning {
616 partition_method: Some(protobuf::partitioning::PartitionMethod::Hash(
617 protobuf::PhysicalHashRepartition {
618 hash_expr: serialized_exprs,
619 partition_count: *partition_count as u64,
620 },
621 )),
622 }
623 }
624 Partitioning::UnknownPartitioning(partition_count) => protobuf::Partitioning {
625 partition_method: Some(protobuf::partitioning::PartitionMethod::Unknown(
626 *partition_count as u64,
627 )),
628 },
629 };
630 Ok(serialized_partitioning)
631}
632
633fn serialize_when_then_expr(
634 when_expr: &Arc<dyn PhysicalExpr>,
635 then_expr: &Arc<dyn PhysicalExpr>,
636 codec: &dyn PhysicalExtensionCodec,
637 proto_converter: &dyn PhysicalProtoConverterExtension,
638) -> Result<protobuf::PhysicalWhenThen> {
639 Ok(protobuf::PhysicalWhenThen {
640 when_expr: Some(proto_converter.physical_expr_to_proto(when_expr, codec)?),
641 then_expr: Some(proto_converter.physical_expr_to_proto(then_expr, codec)?),
642 })
643}
644
645impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
646 type Error = DataFusionError;
647
648 fn try_from(pf: &PartitionedFile) -> Result<Self> {
649 let last_modified = pf.object_meta.last_modified;
650 let last_modified_ns = last_modified.timestamp_nanos_opt().ok_or_else(|| {
651 DataFusionError::Plan(format!(
652 "Invalid timestamp on PartitionedFile::ObjectMeta: {last_modified}"
653 ))
654 })? as u64;
655 Ok(protobuf::PartitionedFile {
656 path: pf.object_meta.location.as_ref().to_owned(),
657 size: pf.object_meta.size,
658 last_modified_ns,
659 partition_values: pf
660 .partition_values
661 .iter()
662 .map(|v| v.try_into())
663 .collect::<Result<Vec<_>, _>>()?,
664 range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
665 statistics: pf.statistics.as_ref().map(|s| s.as_ref().into()),
666 })
667 }
668}
669
670impl TryFrom<&FileRange> for protobuf::FileRange {
671 type Error = DataFusionError;
672
673 fn try_from(value: &FileRange) -> Result<Self> {
674 Ok(protobuf::FileRange {
675 start: value.start,
676 end: value.end,
677 })
678 }
679}
680
681impl TryFrom<&[PartitionedFile]> for protobuf::FileGroup {
682 type Error = DataFusionError;
683
684 fn try_from(gr: &[PartitionedFile]) -> Result<Self, Self::Error> {
685 Ok(protobuf::FileGroup {
686 files: gr
687 .iter()
688 .map(|f| f.try_into())
689 .collect::<Result<Vec<_>, _>>()?,
690 })
691 }
692}
693
694pub fn serialize_file_scan_config(
695 conf: &FileScanConfig,
696 codec: &dyn PhysicalExtensionCodec,
697 proto_converter: &dyn PhysicalProtoConverterExtension,
698) -> Result<protobuf::FileScanExecConf> {
699 let file_groups = conf
700 .file_groups
701 .iter()
702 .map(|p| p.files().try_into())
703 .collect::<Result<Vec<_>, _>>()?;
704
705 let mut output_orderings = vec![];
706 for order in &conf.output_ordering {
707 let ordering =
708 serialize_physical_sort_exprs(order.to_vec(), codec, proto_converter)?;
709 output_orderings.push(ordering)
710 }
711
712 let mut fields = conf
715 .file_schema()
716 .fields()
717 .iter()
718 .cloned()
719 .collect::<Vec<_>>();
720 fields.extend(conf.table_partition_cols().iter().cloned());
721
722 let schema = Arc::new(
723 Schema::new(fields.clone()).with_metadata(conf.file_schema().metadata.clone()),
724 );
725
726 let projection_exprs = conf
727 .file_source
728 .projection()
729 .as_ref()
730 .map(|projection_exprs| {
731 let projections = projection_exprs.iter().cloned().collect::<Vec<_>>();
732 Ok::<_, DataFusionError>(protobuf::ProjectionExprs {
733 projections: projections
734 .into_iter()
735 .map(|expr| {
736 Ok(protobuf::ProjectionExpr {
737 alias: expr.alias.to_string(),
738 expr: Some(
739 proto_converter
740 .physical_expr_to_proto(&expr.expr, codec)?,
741 ),
742 })
743 })
744 .collect::<Result<Vec<_>>>()?,
745 })
746 })
747 .transpose()?;
748
749 Ok(protobuf::FileScanExecConf {
750 file_groups,
751 statistics: Some((&conf.statistics()).into()),
752 limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }),
753 projection: vec![],
754 schema: Some(schema.as_ref().try_into()?),
755 table_partition_cols: conf
756 .table_partition_cols()
757 .iter()
758 .map(|x| x.name().clone())
759 .collect::<Vec<_>>(),
760 object_store_url: conf.object_store_url.to_string(),
761 output_ordering: output_orderings
762 .into_iter()
763 .map(|e| PhysicalSortExprNodeCollection {
764 physical_sort_expr_nodes: e,
765 })
766 .collect::<Vec<_>>(),
767 constraints: Some(conf.constraints.clone().into()),
768 batch_size: conf.batch_size.map(|s| s as u64),
769 projection_exprs,
770 })
771}
772
773pub fn serialize_maybe_filter(
774 expr: Option<Arc<dyn PhysicalExpr>>,
775 codec: &dyn PhysicalExtensionCodec,
776 proto_converter: &dyn PhysicalProtoConverterExtension,
777) -> Result<protobuf::MaybeFilter> {
778 match expr {
779 None => Ok(protobuf::MaybeFilter { expr: None }),
780 Some(expr) => Ok(protobuf::MaybeFilter {
781 expr: Some(proto_converter.physical_expr_to_proto(&expr, codec)?),
782 }),
783 }
784}
785
786pub fn serialize_record_batches(batches: &[RecordBatch]) -> Result<Vec<u8>> {
787 if batches.is_empty() {
788 return Ok(vec![]);
789 }
790 let schema = batches[0].schema();
791 let mut buf = Vec::new();
792 let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
793 for batch in batches {
794 writer.write(batch)?;
795 }
796 writer.finish()?;
797 Ok(buf)
798}
799
800impl TryFrom<&JsonSink> for protobuf::JsonSink {
801 type Error = DataFusionError;
802
803 fn try_from(value: &JsonSink) -> Result<Self, Self::Error> {
804 Ok(Self {
805 config: Some(value.config().try_into()?),
806 writer_options: Some(value.writer_options().try_into()?),
807 })
808 }
809}
810
811impl TryFrom<&CsvSink> for protobuf::CsvSink {
812 type Error = DataFusionError;
813
814 fn try_from(value: &CsvSink) -> Result<Self, Self::Error> {
815 Ok(Self {
816 config: Some(value.config().try_into()?),
817 writer_options: Some(value.writer_options().try_into()?),
818 })
819 }
820}
821
#[cfg(feature = "parquet")]
impl TryFrom<&ParquetSink> for protobuf::ParquetSink {
    type Error = DataFusionError;

    /// Converts a Parquet sink by serializing its file sink config, then its
    /// Parquet options.
    fn try_from(value: &ParquetSink) -> Result<Self, Self::Error> {
        let config: protobuf::FileSinkConfig = value.config().try_into()?;
        Ok(Self {
            config: Some(config),
            parquet_options: Some(value.parquet_options().try_into()?),
        })
    }
}
833
834impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
835 type Error = DataFusionError;
836
837 fn try_from(conf: &FileSinkConfig) -> Result<Self, Self::Error> {
838 let file_groups = conf
839 .file_group
840 .iter()
841 .map(TryInto::try_into)
842 .collect::<Result<Vec<_>>>()?;
843 let table_paths = conf
844 .table_paths
845 .iter()
846 .map(ToString::to_string)
847 .collect::<Vec<_>>();
848 let table_partition_cols = conf
849 .table_partition_cols
850 .iter()
851 .map(|(name, data_type)| {
852 Ok(protobuf::PartitionColumn {
853 name: name.to_owned(),
854 arrow_type: Some(data_type.try_into()?),
855 })
856 })
857 .collect::<Result<Vec<_>>>()?;
858 let file_output_mode = match conf.file_output_mode {
859 datafusion_datasource::file_sink_config::FileOutputMode::Automatic => {
860 protobuf::FileOutputMode::Automatic
861 }
862 datafusion_datasource::file_sink_config::FileOutputMode::SingleFile => {
863 protobuf::FileOutputMode::SingleFile
864 }
865 datafusion_datasource::file_sink_config::FileOutputMode::Directory => {
866 protobuf::FileOutputMode::Directory
867 }
868 };
869 Ok(Self {
870 object_store_url: conf.object_store_url.to_string(),
871 file_groups,
872 table_paths,
873 output_schema: Some(conf.output_schema.as_ref().try_into()?),
874 table_partition_cols,
875 keep_partition_by_columns: conf.keep_partition_by_columns,
876 insert_op: conf.insert_op as i32,
877 file_extension: conf.file_extension.to_string(),
878 file_output_mode: file_output_mode.into(),
879 })
880 }
881}