1use std::sync::Arc;
19
20use arrow::array::RecordBatch;
21use arrow::datatypes::Schema;
22use arrow::ipc::writer::StreamWriter;
23use datafusion_common::{
24 DataFusionError, Result, internal_datafusion_err, internal_err, not_impl_err,
25};
26use datafusion_datasource::file_scan_config::FileScanConfig;
27use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
28use datafusion_datasource::{FileRange, PartitionedFile};
29use datafusion_datasource_csv::file_format::CsvSink;
30use datafusion_datasource_json::file_format::JsonSink;
31#[cfg(feature = "parquet")]
32use datafusion_datasource_parquet::file_format::ParquetSink;
33use datafusion_expr::WindowFrame;
34use datafusion_physical_expr::ScalarFunctionExpr;
35use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
36use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr;
37use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
38use datafusion_physical_plan::expressions::{
39 BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
40 LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
41};
42use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr};
43use datafusion_physical_plan::udaf::AggregateFunctionExpr;
44use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
45use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
46
47use super::{
48 DefaultPhysicalProtoConverter, PhysicalExtensionCodec,
49 PhysicalProtoConverterExtension,
50};
51use crate::protobuf::{
52 self, PhysicalSortExprNode, PhysicalSortExprNodeCollection,
53 physical_aggregate_expr_node, physical_window_expr_node,
54};
55
56#[expect(clippy::needless_pass_by_value)]
57pub fn serialize_physical_aggr_expr(
58 aggr_expr: Arc<AggregateFunctionExpr>,
59 codec: &dyn PhysicalExtensionCodec,
60 proto_converter: &dyn PhysicalProtoConverterExtension,
61) -> Result<protobuf::PhysicalExprNode> {
62 let expressions =
63 serialize_physical_exprs(&aggr_expr.expressions(), codec, proto_converter)?;
64 let order_bys = serialize_physical_sort_exprs(
65 aggr_expr.order_bys().iter().cloned(),
66 codec,
67 proto_converter,
68 )?;
69
70 let name = aggr_expr.fun().name().to_string();
71 let mut buf = Vec::new();
72 codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
73 Ok(protobuf::PhysicalExprNode {
74 expr_id: None,
75 expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
76 protobuf::PhysicalAggregateExprNode {
77 aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)),
78 expr: expressions,
79 ordering_req: order_bys,
80 distinct: aggr_expr.is_distinct(),
81 ignore_nulls: aggr_expr.ignore_nulls(),
82 fun_definition: (!buf.is_empty()).then_some(buf),
83 human_display: aggr_expr.human_display().to_string(),
84 },
85 )),
86 })
87}
88
89fn serialize_physical_window_aggr_expr(
90 aggr_expr: &AggregateFunctionExpr,
91 _window_frame: &WindowFrame,
92 codec: &dyn PhysicalExtensionCodec,
93) -> Result<(physical_window_expr_node::WindowFunction, Option<Vec<u8>>)> {
94 let mut buf = Vec::new();
97 codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
98 Ok((
99 physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(
100 aggr_expr.fun().name().to_string(),
101 ),
102 (!buf.is_empty()).then_some(buf),
103 ))
104}
105
106pub fn serialize_physical_window_expr(
107 window_expr: &Arc<dyn WindowExpr>,
108 codec: &dyn PhysicalExtensionCodec,
109 proto_converter: &dyn PhysicalProtoConverterExtension,
110) -> Result<protobuf::PhysicalWindowExprNode> {
111 let expr = window_expr.as_any();
112 let mut args = window_expr.expressions().to_vec();
113 let window_frame = window_expr.get_window_frame();
114
115 let (window_function, fun_definition, ignore_nulls, distinct) =
116 if let Some(plain_aggr_window_expr) =
117 expr.downcast_ref::<PlainAggregateWindowExpr>()
118 {
119 let aggr_expr = plain_aggr_window_expr.get_aggregate_expr();
120 let (window_function, fun_definition) =
121 serialize_physical_window_aggr_expr(aggr_expr, window_frame, codec)?;
122 (
123 window_function,
124 fun_definition,
125 aggr_expr.ignore_nulls(),
126 aggr_expr.is_distinct(),
127 )
128 } else if let Some(sliding_aggr_window_expr) =
129 expr.downcast_ref::<SlidingAggregateWindowExpr>()
130 {
131 let aggr_expr = sliding_aggr_window_expr.get_aggregate_expr();
132 let (window_function, fun_definition) =
133 serialize_physical_window_aggr_expr(aggr_expr, window_frame, codec)?;
134 (
135 window_function,
136 fun_definition,
137 aggr_expr.ignore_nulls(),
138 aggr_expr.is_distinct(),
139 )
140 } else if let Some(udf_window_expr) = expr.downcast_ref::<StandardWindowExpr>() {
141 if let Some(expr) = udf_window_expr
142 .get_standard_func_expr()
143 .as_any()
144 .downcast_ref::<WindowUDFExpr>()
145 {
146 let mut buf = Vec::new();
147 codec.try_encode_udwf(expr.fun(), &mut buf)?;
148 args = expr.args().to_vec();
149 (
150 physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(
151 expr.fun().name().to_string(),
152 ),
153 (!buf.is_empty()).then_some(buf),
154 false, false,
156 )
157 } else {
158 return not_impl_err!(
159 "User-defined window function not supported: {window_expr:?}"
160 );
161 }
162 } else {
163 return not_impl_err!("WindowExpr not supported: {window_expr:?}");
164 };
165
166 let args = serialize_physical_exprs(&args, codec, proto_converter)?;
167 let partition_by =
168 serialize_physical_exprs(window_expr.partition_by(), codec, proto_converter)?;
169 let order_by = serialize_physical_sort_exprs(
170 window_expr.order_by().to_vec(),
171 codec,
172 proto_converter,
173 )?;
174 let window_frame: protobuf::WindowFrame = window_frame
175 .as_ref()
176 .try_into()
177 .map_err(|e| internal_datafusion_err!("{e}"))?;
178
179 Ok(protobuf::PhysicalWindowExprNode {
180 args,
181 partition_by,
182 order_by,
183 window_frame: Some(window_frame),
184 window_function: Some(window_function),
185 name: window_expr.name().to_string(),
186 fun_definition,
187 ignore_nulls,
188 distinct,
189 })
190}
191
192pub fn serialize_physical_sort_exprs<I>(
193 sort_exprs: I,
194 codec: &dyn PhysicalExtensionCodec,
195 proto_converter: &dyn PhysicalProtoConverterExtension,
196) -> Result<Vec<PhysicalSortExprNode>>
197where
198 I: IntoIterator<Item = PhysicalSortExpr>,
199{
200 sort_exprs
201 .into_iter()
202 .map(|sort_expr| serialize_physical_sort_expr(sort_expr, codec, proto_converter))
203 .collect()
204}
205
206pub fn serialize_physical_sort_expr(
207 sort_expr: PhysicalSortExpr,
208 codec: &dyn PhysicalExtensionCodec,
209 proto_converter: &dyn PhysicalProtoConverterExtension,
210) -> Result<PhysicalSortExprNode> {
211 let PhysicalSortExpr { expr, options } = sort_expr;
212 let expr = proto_converter.physical_expr_to_proto(&expr, codec)?;
213 Ok(PhysicalSortExprNode {
214 expr: Some(Box::new(expr)),
215 asc: !options.descending,
216 nulls_first: options.nulls_first,
217 })
218}
219
220pub fn serialize_physical_exprs<'a, I>(
221 values: I,
222 codec: &dyn PhysicalExtensionCodec,
223 proto_converter: &dyn PhysicalProtoConverterExtension,
224) -> Result<Vec<protobuf::PhysicalExprNode>>
225where
226 I: IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
227{
228 values
229 .into_iter()
230 .map(|value| proto_converter.physical_expr_to_proto(value, codec))
231 .collect()
232}
233
234pub fn serialize_physical_expr(
239 value: &Arc<dyn PhysicalExpr>,
240 codec: &dyn PhysicalExtensionCodec,
241) -> Result<protobuf::PhysicalExprNode> {
242 serialize_physical_expr_with_converter(
243 value,
244 codec,
245 &DefaultPhysicalProtoConverter {},
246 )
247}
248
/// Serializes a physical expression tree into its protobuf representation.
///
/// Recursion into child expressions is routed through
/// `proto_converter.physical_expr_to_proto`, so converter extensions can
/// intercept every nested node. Expressions with no dedicated protobuf
/// variant fall back to `codec.try_encode_expr`; if that also fails, an
/// internal error is returned.
pub fn serialize_physical_expr_with_converter(
    value: &Arc<dyn PhysicalExpr>,
    codec: &dyn PhysicalExtensionCodec,
    proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<protobuf::PhysicalExprNode> {
    // Snapshot the expression first so any dynamic state is resolved into a
    // plain, serializable expression tree.
    let value = snapshot_physical_expr(Arc::clone(value))?;
    let expr = value.as_any();

    // A hash-table lookup expression carries runtime join state, so it is
    // replaced with a literal `true` placeholder here — NOTE(review):
    // presumably the receiving side re-derives the lookup; confirm.
    if expr.downcast_ref::<HashTableLookupExpr>().is_some() {
        let value = datafusion_proto_common::ScalarValue {
            value: Some(datafusion_proto_common::scalar_value::Value::BoolValue(
                true,
            )),
        };
        return Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(value)),
        });
    }

    // Dispatch on the concrete expression type; each branch maps to the
    // corresponding protobuf `ExprType` variant.
    if let Some(expr) = expr.downcast_ref::<Column>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
                protobuf::PhysicalColumn {
                    name: expr.name().to_string(),
                    index: expr.index() as u32,
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<UnKnownColumn>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn(
                protobuf::UnknownColumn {
                    name: expr.name().to_string(),
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
        // The operator is serialized via its `Debug` rendering; the
        // deserializer must parse the same spelling.
        let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
            l: Some(Box::new(
                proto_converter.physical_expr_to_proto(expr.left(), codec)?,
            )),
            r: Some(Box::new(
                proto_converter.physical_expr_to_proto(expr.right(), codec)?,
            )),
            op: format!("{:?}", expr.op()),
        });

        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
                binary_expr,
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
        // CASE has three optional parts: the scrutinee, the WHEN/THEN pairs,
        // and the ELSE expression; each is serialized independently.
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(
                protobuf::physical_expr_node::ExprType::Case(
                    Box::new(
                        protobuf::PhysicalCaseNode {
                            expr: expr
                                .expr()
                                .map(|exp| {
                                    proto_converter
                                        .physical_expr_to_proto(exp, codec)
                                        .map(Box::new)
                                })
                                .transpose()?,
                            when_then_expr: expr
                                .when_then_expr()
                                .iter()
                                .map(|(when_expr, then_expr)| {
                                    serialize_when_then_expr(
                                        when_expr,
                                        then_expr,
                                        codec,
                                        proto_converter,
                                    )
                                })
                                .collect::<Result<
                                    Vec<protobuf::PhysicalWhenThen>,
                                    DataFusionError,
                                >>()?,
                            else_expr: expr
                                .else_expr()
                                .map(|a| {
                                    proto_converter
                                        .physical_expr_to_proto(a, codec)
                                        .map(Box::new)
                                })
                                .transpose()?,
                        },
                    ),
                ),
            ),
        })
    } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
                protobuf::PhysicalNot {
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(expr.arg(), codec)?,
                    )),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
                Box::new(protobuf::PhysicalIsNull {
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(expr.arg(), codec)?,
                    )),
                }),
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
                Box::new(protobuf::PhysicalIsNotNull {
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(expr.arg(), codec)?,
                    )),
                }),
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new(
                protobuf::PhysicalInListNode {
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(expr.expr(), codec)?,
                    )),
                    list: serialize_physical_exprs(expr.list(), codec, proto_converter)?,
                    negated: expr.negated(),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
                protobuf::PhysicalNegativeNode {
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(expr.arg(), codec)?,
                    )),
                },
            ))),
        })
    } else if let Some(lit) = expr.downcast_ref::<Literal>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
                lit.value().try_into()?,
            )),
        })
    } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
                protobuf::PhysicalCastNode {
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(cast.expr(), codec)?,
                    )),
                    arrow_type: Some(cast.cast_type().try_into()?),
                },
            ))),
        })
    } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new(
                protobuf::PhysicalTryCastNode {
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(cast.expr(), codec)?,
                    )),
                    arrow_type: Some(cast.cast_type().try_into()?),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
        // Codec may attach an opaque payload for the UDF; empty means none.
        let mut buf = Vec::new();
        codec.try_encode_udf(expr.fun(), &mut buf)?;
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf(
                protobuf::PhysicalScalarUdfNode {
                    name: expr.name().to_string(),
                    args: serialize_physical_exprs(expr.args(), codec, proto_converter)?,
                    fun_definition: (!buf.is_empty()).then_some(buf),
                    return_type: Some(expr.return_type().try_into()?),
                    nullable: expr.nullable(),
                    // NOTE(review): derives the return field name against an
                    // empty schema — assumes the name does not depend on the
                    // input schema; confirm.
                    return_field_name: expr
                        .return_field(&Schema::empty())?
                        .name()
                        .to_string(),
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new(
                protobuf::PhysicalLikeExprNode {
                    negated: expr.negated(),
                    case_insensitive: expr.case_insensitive(),
                    expr: Some(Box::new(
                        proto_converter.physical_expr_to_proto(expr.expr(), codec)?,
                    )),
                    pattern: Some(Box::new(
                        proto_converter.physical_expr_to_proto(expr.pattern(), codec)?,
                    )),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<HashExpr>() {
        // All four hash seeds are serialized so the remote side reproduces
        // identical hash values.
        let (s0, s1, s2, s3) = expr.seeds();
        Ok(protobuf::PhysicalExprNode {
            expr_id: None,
            expr_type: Some(protobuf::physical_expr_node::ExprType::HashExpr(
                protobuf::PhysicalHashExprNode {
                    on_columns: serialize_physical_exprs(
                        expr.on_columns(),
                        codec,
                        proto_converter,
                    )?,
                    seed0: s0,
                    seed1: s1,
                    seed2: s2,
                    seed3: s3,
                    description: expr.description().to_string(),
                },
            )),
        })
    } else {
        // Unknown expression type: delegate to the extension codec and wrap
        // the encoded bytes plus serialized children in an Extension node.
        let mut buf: Vec<u8> = vec![];
        match codec.try_encode_expr(&value, &mut buf) {
            Ok(_) => {
                let inputs: Vec<protobuf::PhysicalExprNode> = value
                    .children()
                    .into_iter()
                    .map(|e| proto_converter.physical_expr_to_proto(e, codec))
                    .collect::<Result<_>>()?;
                Ok(protobuf::PhysicalExprNode {
                    expr_id: None,
                    expr_type: Some(protobuf::physical_expr_node::ExprType::Extension(
                        protobuf::PhysicalExtensionExprNode { expr: buf, inputs },
                    )),
                })
            }
            Err(e) => internal_err!(
                "Unsupported physical expr and extension codec failed with [{e}]. Expr: {value:?}"
            ),
        }
    }
}
533
534pub fn serialize_partitioning(
535 partitioning: &Partitioning,
536 codec: &dyn PhysicalExtensionCodec,
537 proto_converter: &dyn PhysicalProtoConverterExtension,
538) -> Result<protobuf::Partitioning> {
539 let serialized_partitioning = match partitioning {
540 Partitioning::RoundRobinBatch(partition_count) => protobuf::Partitioning {
541 partition_method: Some(protobuf::partitioning::PartitionMethod::RoundRobin(
542 *partition_count as u64,
543 )),
544 },
545 Partitioning::Hash(exprs, partition_count) => {
546 let serialized_exprs =
547 serialize_physical_exprs(exprs, codec, proto_converter)?;
548 protobuf::Partitioning {
549 partition_method: Some(protobuf::partitioning::PartitionMethod::Hash(
550 protobuf::PhysicalHashRepartition {
551 hash_expr: serialized_exprs,
552 partition_count: *partition_count as u64,
553 },
554 )),
555 }
556 }
557 Partitioning::UnknownPartitioning(partition_count) => protobuf::Partitioning {
558 partition_method: Some(protobuf::partitioning::PartitionMethod::Unknown(
559 *partition_count as u64,
560 )),
561 },
562 };
563 Ok(serialized_partitioning)
564}
565
566fn serialize_when_then_expr(
567 when_expr: &Arc<dyn PhysicalExpr>,
568 then_expr: &Arc<dyn PhysicalExpr>,
569 codec: &dyn PhysicalExtensionCodec,
570 proto_converter: &dyn PhysicalProtoConverterExtension,
571) -> Result<protobuf::PhysicalWhenThen> {
572 Ok(protobuf::PhysicalWhenThen {
573 when_expr: Some(proto_converter.physical_expr_to_proto(when_expr, codec)?),
574 then_expr: Some(proto_converter.physical_expr_to_proto(then_expr, codec)?),
575 })
576}
577
impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
    type Error = DataFusionError;

    /// Converts a `PartitionedFile` to its protobuf form.
    ///
    /// Fails with a plan error when the object-store timestamp cannot be
    /// represented in nanoseconds, or when any partition value / range /
    /// statistics conversion fails.
    fn try_from(pf: &PartitionedFile) -> Result<Self> {
        let last_modified = pf.object_meta.last_modified;
        // `as u64` bit-casts the i64 nanosecond count: a pre-epoch (negative)
        // timestamp wraps, but round-trips losslessly if the decoder casts
        // back to i64 — NOTE(review): confirm the deserializer does so.
        let last_modified_ns = last_modified.timestamp_nanos_opt().ok_or_else(|| {
            DataFusionError::Plan(format!(
                "Invalid timestamp on PartitionedFile::ObjectMeta: {last_modified}"
            ))
        })? as u64;
        Ok(protobuf::PartitionedFile {
            path: pf.object_meta.location.as_ref().to_owned(),
            size: pf.object_meta.size,
            last_modified_ns,
            partition_values: pf
                .partition_values
                .iter()
                .map(|v| v.try_into())
                .collect::<Result<Vec<_>, _>>()?,
            range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
            statistics: pf.statistics.as_ref().map(|s| s.as_ref().into()),
        })
    }
}
602
603impl TryFrom<&FileRange> for protobuf::FileRange {
604 type Error = DataFusionError;
605
606 fn try_from(value: &FileRange) -> Result<Self> {
607 Ok(protobuf::FileRange {
608 start: value.start,
609 end: value.end,
610 })
611 }
612}
613
614impl TryFrom<&[PartitionedFile]> for protobuf::FileGroup {
615 type Error = DataFusionError;
616
617 fn try_from(gr: &[PartitionedFile]) -> Result<Self, Self::Error> {
618 Ok(protobuf::FileGroup {
619 files: gr
620 .iter()
621 .map(|f| f.try_into())
622 .collect::<Result<Vec<_>, _>>()?,
623 })
624 }
625}
626
/// Serializes a `FileScanConfig` into `protobuf::FileScanExecConf`.
///
/// The serialized schema is the file schema extended with the table
/// partition columns; projection expressions from the file source are
/// serialized with their aliases.
pub fn serialize_file_scan_config(
    conf: &FileScanConfig,
    codec: &dyn PhysicalExtensionCodec,
    proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<protobuf::FileScanExecConf> {
    let file_groups = conf
        .file_groups
        .iter()
        .map(|p| p.files().try_into())
        .collect::<Result<Vec<_>, _>>()?;

    // Each entry in `output_ordering` is itself a list of sort expressions.
    let mut output_orderings = vec![];
    for order in &conf.output_ordering {
        let ordering =
            serialize_physical_sort_exprs(order.to_vec(), codec, proto_converter)?;
        output_orderings.push(ordering)
    }

    // Build the full schema: file fields followed by partition columns,
    // preserving the file schema's metadata.
    let mut fields = conf
        .file_schema()
        .fields()
        .iter()
        .cloned()
        .collect::<Vec<_>>();
    fields.extend(conf.table_partition_cols().iter().cloned());

    let schema = Arc::new(
        Schema::new(fields.clone()).with_metadata(conf.file_schema().metadata.clone()),
    );

    // Serialize the source's projection expressions (if any), keeping each
    // expression's alias alongside the serialized expression.
    let projection_exprs = conf
        .file_source
        .projection()
        .as_ref()
        .map(|projection_exprs| {
            let projections = projection_exprs.iter().cloned().collect::<Vec<_>>();
            Ok::<_, DataFusionError>(protobuf::ProjectionExprs {
                projections: projections
                    .into_iter()
                    .map(|expr| {
                        Ok(protobuf::ProjectionExpr {
                            alias: expr.alias.to_string(),
                            expr: Some(
                                proto_converter
                                    .physical_expr_to_proto(&expr.expr, codec)?,
                            ),
                        })
                    })
                    .collect::<Result<Vec<_>>>()?,
            })
        })
        .transpose()?;

    Ok(protobuf::FileScanExecConf {
        file_groups,
        statistics: Some((&conf.statistics()).into()),
        limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }),
        // Index-based projection is left empty; projection is carried via
        // `projection_exprs` — NOTE(review): presumably a legacy field,
        // confirm deserializer ignores it when `projection_exprs` is set.
        projection: vec![],
        schema: Some(schema.as_ref().try_into()?),
        table_partition_cols: conf
            .table_partition_cols()
            .iter()
            .map(|x| x.name().clone())
            .collect::<Vec<_>>(),
        object_store_url: conf.object_store_url.to_string(),
        output_ordering: output_orderings
            .into_iter()
            .map(|e| PhysicalSortExprNodeCollection {
                physical_sort_expr_nodes: e,
            })
            .collect::<Vec<_>>(),
        constraints: Some(conf.constraints.clone().into()),
        batch_size: conf.batch_size.map(|s| s as u64),
        projection_exprs,
    })
}
705
706pub fn serialize_maybe_filter(
707 expr: Option<Arc<dyn PhysicalExpr>>,
708 codec: &dyn PhysicalExtensionCodec,
709 proto_converter: &dyn PhysicalProtoConverterExtension,
710) -> Result<protobuf::MaybeFilter> {
711 match expr {
712 None => Ok(protobuf::MaybeFilter { expr: None }),
713 Some(expr) => Ok(protobuf::MaybeFilter {
714 expr: Some(proto_converter.physical_expr_to_proto(&expr, codec)?),
715 }),
716 }
717}
718
719pub fn serialize_record_batches(batches: &[RecordBatch]) -> Result<Vec<u8>> {
720 if batches.is_empty() {
721 return Ok(vec![]);
722 }
723 let schema = batches[0].schema();
724 let mut buf = Vec::new();
725 let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
726 for batch in batches {
727 writer.write(batch)?;
728 }
729 writer.finish()?;
730 Ok(buf)
731}
732
733impl TryFrom<&JsonSink> for protobuf::JsonSink {
734 type Error = DataFusionError;
735
736 fn try_from(value: &JsonSink) -> Result<Self, Self::Error> {
737 Ok(Self {
738 config: Some(value.config().try_into()?),
739 writer_options: Some(value.writer_options().try_into()?),
740 })
741 }
742}
743
744impl TryFrom<&CsvSink> for protobuf::CsvSink {
745 type Error = DataFusionError;
746
747 fn try_from(value: &CsvSink) -> Result<Self, Self::Error> {
748 Ok(Self {
749 config: Some(value.config().try_into()?),
750 writer_options: Some(value.writer_options().try_into()?),
751 })
752 }
753}
754
#[cfg(feature = "parquet")]
impl TryFrom<&ParquetSink> for protobuf::ParquetSink {
    type Error = DataFusionError;

    /// Converts a Parquet sink into its protobuf form (sink config plus
    /// Parquet-specific options). Only compiled with the `parquet` feature.
    fn try_from(value: &ParquetSink) -> Result<Self, Self::Error> {
        let config = value.config().try_into()?;
        let parquet_options = value.parquet_options().try_into()?;
        Ok(Self {
            config: Some(config),
            parquet_options: Some(parquet_options),
        })
    }
}
766
767impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
768 type Error = DataFusionError;
769
770 fn try_from(conf: &FileSinkConfig) -> Result<Self, Self::Error> {
771 let file_groups = conf
772 .file_group
773 .iter()
774 .map(TryInto::try_into)
775 .collect::<Result<Vec<_>>>()?;
776 let table_paths = conf
777 .table_paths
778 .iter()
779 .map(ToString::to_string)
780 .collect::<Vec<_>>();
781 let table_partition_cols = conf
782 .table_partition_cols
783 .iter()
784 .map(|(name, data_type)| {
785 Ok(protobuf::PartitionColumn {
786 name: name.to_owned(),
787 arrow_type: Some(data_type.try_into()?),
788 })
789 })
790 .collect::<Result<Vec<_>>>()?;
791 let file_output_mode = match conf.file_output_mode {
792 datafusion_datasource::file_sink_config::FileOutputMode::Automatic => {
793 protobuf::FileOutputMode::Automatic
794 }
795 datafusion_datasource::file_sink_config::FileOutputMode::SingleFile => {
796 protobuf::FileOutputMode::SingleFile
797 }
798 datafusion_datasource::file_sink_config::FileOutputMode::Directory => {
799 protobuf::FileOutputMode::Directory
800 }
801 };
802 Ok(Self {
803 object_store_url: conf.object_store_url.to_string(),
804 file_groups,
805 table_paths,
806 output_schema: Some(conf.output_schema.as_ref().try_into()?),
807 table_partition_cols,
808 keep_partition_by_columns: conf.keep_partition_by_columns,
809 insert_op: conf.insert_op as i32,
810 file_extension: conf.file_extension.to_string(),
811 file_output_mode: file_output_mode.into(),
812 })
813 }
814}