use std::sync::Arc;

use arrow::array::RecordBatch;
use arrow::datatypes::Schema;
use arrow::ipc::writer::StreamWriter;
use datafusion_common::{
    DataFusionError, Result, internal_datafusion_err, internal_err, not_impl_err,
};
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
use datafusion_datasource::{FileRange, PartitionedFile};
use datafusion_datasource_csv::file_format::CsvSink;
use datafusion_datasource_json::file_format::JsonSink;
#[cfg(feature = "parquet")]
use datafusion_datasource_parquet::file_format::ParquetSink;
use datafusion_expr::WindowFrame;
use datafusion_physical_expr::ScalarFunctionExpr;
use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use datafusion_physical_plan::expressions::{
    BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
    LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
};
use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr};
use datafusion_physical_plan::udaf::AggregateFunctionExpr;
use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};

use crate::protobuf::{
    self, PhysicalSortExprNode, PhysicalSortExprNodeCollection,
    physical_aggregate_expr_node, physical_window_expr_node,
};

use super::PhysicalExtensionCodec;

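/// Serializes a physical aggregate expression into its protobuf
/// representation.
///
/// UDAF-specific state is encoded through the provided
/// [`PhysicalExtensionCodec`]; the resulting bytes are attached as
/// `fun_definition` only when the codec produced output.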
#[expect(clippy::needless_pass_by_value)]
pub fn serialize_physical_aggr_expr(
    aggr_expr: Arc<AggregateFunctionExpr>,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<protobuf::PhysicalExprNode> {
    let expressions = serialize_physical_exprs(&aggr_expr.expressions(), codec)?;
    let order_bys =
        serialize_physical_sort_exprs(aggr_expr.order_bys().iter().cloned(), codec)?;

    let name = aggr_expr.fun().name().to_string();
    let mut buf = Vec::new();
    codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
    Ok(protobuf::PhysicalExprNode {
        expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
            protobuf::PhysicalAggregateExprNode {
                aggregate_function: Some(
                    physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name),
                ),
                expr: expressions,
                ordering_req: order_bys,
                distinct: aggr_expr.is_distinct(),
                ignore_nulls: aggr_expr.ignore_nulls(),
                fun_definition: (!buf.is_empty()).then_some(buf),
                human_display: aggr_expr.human_display().to_string(),
            },
        )),
    })
}

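/// Serializes the aggregate half of a window expression, returning the
/// protobuf window-function variant along with any codec-encoded UDAF state.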
fn serialize_physical_window_aggr_expr(
    aggr_expr: &AggregateFunctionExpr,
    _window_frame: &WindowFrame,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<(physical_window_expr_node::WindowFunction, Option<Vec<u8>>)> {
    let mut buf = Vec::new();
    codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
    Ok((
        physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(
            aggr_expr.fun().name().to_string(),
        ),
        (!buf.is_empty()).then_some(buf),
    ))
}

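/// Serializes a physical window expression into its protobuf representation.
///
/// Supports plain and sliding aggregate window expressions as well as
/// standard window expressions backed by a window UDF; any other window
/// expression yields a "not implemented" error.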
pub fn serialize_physical_window_expr(
    window_expr: &Arc<dyn WindowExpr>,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<protobuf::PhysicalWindowExprNode> {
    let expr = window_expr.as_any();
    let args = window_expr.expressions().to_vec();
    let window_frame = window_expr.get_window_frame();

    let (window_function, fun_definition, ignore_nulls, distinct) =
        if let Some(plain_aggr_window_expr) =
            expr.downcast_ref::<PlainAggregateWindowExpr>()
        {
            let aggr_expr = plain_aggr_window_expr.get_aggregate_expr();
            let (window_function, fun_definition) =
                serialize_physical_window_aggr_expr(aggr_expr, window_frame, codec)?;
            (
                window_function,
                fun_definition,
                aggr_expr.ignore_nulls(),
                aggr_expr.is_distinct(),
            )
        } else if let Some(sliding_aggr_window_expr) =
            expr.downcast_ref::<SlidingAggregateWindowExpr>()
        {
            let aggr_expr = sliding_aggr_window_expr.get_aggregate_expr();
            let (window_function, fun_definition) =
                serialize_physical_window_aggr_expr(aggr_expr, window_frame, codec)?;
            (
                window_function,
                fun_definition,
                aggr_expr.ignore_nulls(),
                aggr_expr.is_distinct(),
            )
        } else if let Some(udf_window_expr) = expr.downcast_ref::<StandardWindowExpr>() {
            if let Some(expr) = udf_window_expr
                .get_standard_func_expr()
                .as_any()
                .downcast_ref::<WindowUDFExpr>()
            {
                let mut buf = Vec::new();
                codec.try_encode_udwf(expr.fun(), &mut buf)?;
                (
                    physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(
                        expr.fun().name().to_string(),
                    ),
                    (!buf.is_empty()).then_some(buf),
                    false,
                    false,
                )
            } else {
                return not_impl_err!(
                    "User-defined window function not supported: {window_expr:?}"
                );
            }
        } else {
            return not_impl_err!("WindowExpr not supported: {window_expr:?}");
        };

    let args = serialize_physical_exprs(&args, codec)?;
    let partition_by = serialize_physical_exprs(window_expr.partition_by(), codec)?;
    let order_by = serialize_physical_sort_exprs(window_expr.order_by().to_vec(), codec)?;
    let window_frame: protobuf::WindowFrame = window_frame
        .as_ref()
        .try_into()
        .map_err(|e| internal_datafusion_err!("{e}"))?;

    Ok(protobuf::PhysicalWindowExprNode {
        args,
        partition_by,
        order_by,
        window_frame: Some(window_frame),
        window_function: Some(window_function),
        name: window_expr.name().to_string(),
        fun_definition,
        ignore_nulls,
        distinct,
    })
}

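/// Serializes a sequence of physical sort expressions, preserving order.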
pub fn serialize_physical_sort_exprs<I>(
    sort_exprs: I,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Vec<PhysicalSortExprNode>>
where
    I: IntoIterator<Item = PhysicalSortExpr>,
{
    sort_exprs
        .into_iter()
        .map(|sort_expr| serialize_physical_sort_expr(sort_expr, codec))
        .collect()
}

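/// Serializes a single physical sort expression. The protobuf form stores
/// `asc` (the inverse of `SortOptions::descending`) together with
/// `nulls_first`.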
pub fn serialize_physical_sort_expr(
    sort_expr: PhysicalSortExpr,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<PhysicalSortExprNode> {
    let PhysicalSortExpr { expr, options } = sort_expr;
    let expr = serialize_physical_expr(&expr, codec)?;
    Ok(PhysicalSortExprNode {
        expr: Some(Box::new(expr)),
        asc: !options.descending,
        nulls_first: options.nulls_first,
    })
}

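/// Serializes an iterator of physical expressions, failing on the first
/// expression that cannot be serialized.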
pub fn serialize_physical_exprs<'a, I>(
    values: I,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Vec<protobuf::PhysicalExprNode>>
where
    I: IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
{
    values
        .into_iter()
        .map(|value| serialize_physical_expr(value, codec))
        .collect()
}

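/// Serializes a [`PhysicalExpr`] into its default protobuf representation.
///
/// Expression types this module does not handle natively are passed to
/// [`PhysicalExtensionCodec::try_encode_expr`] as a fallback.
///
/// A minimal usage sketch (marked `ignore`; assumes the
/// `DefaultPhysicalExtensionCodec` provided by this crate's `physical_plan`
/// module):
///
/// ```ignore
/// use std::sync::Arc;
/// use datafusion_physical_plan::expressions::Column;
///
/// let expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
/// let node = serialize_physical_expr(&expr, &DefaultPhysicalExtensionCodec {})?;
/// ```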
pub fn serialize_physical_expr(
    value: &Arc<dyn PhysicalExpr>,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<protobuf::PhysicalExprNode> {
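    // Take a point-in-time snapshot of any dynamic expressions (e.g.
    // runtime-updated filters) so the serialized form is self-contained.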
    let value = snapshot_physical_expr(Arc::clone(value))?;
    let expr = value.as_any();

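    // A hash-table lookup refers to in-memory join state that cannot travel
    // across a serialization boundary, so it is conservatively replaced with
    // a literal `true` (a filter that keeps every row).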
    if expr.downcast_ref::<HashTableLookupExpr>().is_some() {
        let value = datafusion_proto_common::ScalarValue {
            value: Some(datafusion_proto_common::scalar_value::Value::BoolValue(
                true,
            )),
        };
        return Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(value)),
        });
    }

    if let Some(expr) = expr.downcast_ref::<Column>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
                protobuf::PhysicalColumn {
                    name: expr.name().to_string(),
                    index: expr.index() as u32,
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<UnKnownColumn>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn(
                protobuf::UnknownColumn {
                    name: expr.name().to_string(),
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
        let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
            l: Some(Box::new(serialize_physical_expr(expr.left(), codec)?)),
            r: Some(Box::new(serialize_physical_expr(expr.right(), codec)?)),
            op: format!("{:?}", expr.op()),
        });

        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
                binary_expr,
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::Case(Box::new(
                protobuf::PhysicalCaseNode {
                    expr: expr
                        .expr()
                        .map(|exp| serialize_physical_expr(exp, codec).map(Box::new))
                        .transpose()?,
                    when_then_expr: expr
                        .when_then_expr()
                        .iter()
                        .map(|(when_expr, then_expr)| {
                            serialize_when_then_expr(when_expr, then_expr, codec)
                        })
                        .collect::<Result<
                            Vec<protobuf::PhysicalWhenThen>,
                            DataFusionError,
                        >>()?,
                    else_expr: expr
                        .else_expr()
                        .map(|a| serialize_physical_expr(a, codec).map(Box::new))
                        .transpose()?,
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
                protobuf::PhysicalNot {
                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
                Box::new(protobuf::PhysicalIsNull {
                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
                }),
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
                Box::new(protobuf::PhysicalIsNotNull {
                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
                }),
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new(
                protobuf::PhysicalInListNode {
                    expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)),
                    list: serialize_physical_exprs(expr.list(), codec)?,
                    negated: expr.negated(),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
                protobuf::PhysicalNegativeNode {
                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
                },
            ))),
        })
    } else if let Some(lit) = expr.downcast_ref::<Literal>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
                lit.value().try_into()?,
            )),
        })
    } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
                protobuf::PhysicalCastNode {
                    expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)),
                    arrow_type: Some(cast.cast_type().try_into()?),
                },
            ))),
        })
    } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new(
                protobuf::PhysicalTryCastNode {
                    expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)),
                    arrow_type: Some(cast.cast_type().try_into()?),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
        let mut buf = Vec::new();
        codec.try_encode_udf(expr.fun(), &mut buf)?;
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf(
                protobuf::PhysicalScalarUdfNode {
                    name: expr.name().to_string(),
                    args: serialize_physical_exprs(expr.args(), codec)?,
                    fun_definition: (!buf.is_empty()).then_some(buf),
                    return_type: Some(expr.return_type().try_into()?),
                    nullable: expr.nullable(),
                    return_field_name: expr
                        .return_field(&Schema::empty())?
                        .name()
                        .to_string(),
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new(
                protobuf::PhysicalLikeExprNode {
                    negated: expr.negated(),
                    case_insensitive: expr.case_insensitive(),
                    expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)),
                    pattern: Some(Box::new(serialize_physical_expr(
                        expr.pattern(),
                        codec,
                    )?)),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<HashExpr>() {
        let (s0, s1, s2, s3) = expr.seeds();
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::HashExpr(
                protobuf::PhysicalHashExprNode {
                    on_columns: serialize_physical_exprs(expr.on_columns(), codec)?,
                    seed0: s0,
                    seed1: s1,
                    seed2: s2,
                    seed3: s3,
                    description: expr.description().to_string(),
                },
            )),
        })
    } else {
        let mut buf: Vec<u8> = vec![];
        match codec.try_encode_expr(&value, &mut buf) {
            Ok(_) => {
                let inputs: Vec<protobuf::PhysicalExprNode> = value
                    .children()
                    .into_iter()
                    .map(|e| serialize_physical_expr(e, codec))
                    .collect::<Result<_>>()?;
                Ok(protobuf::PhysicalExprNode {
                    expr_type: Some(protobuf::physical_expr_node::ExprType::Extension(
                        protobuf::PhysicalExtensionExprNode { expr: buf, inputs },
                    )),
                })
            }
            Err(e) => internal_err!(
                "Unsupported physical expr and extension codec failed with [{e}]. Expr: {value:?}"
            ),
        }
    }
}

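/// Serializes a [`Partitioning`] scheme (round-robin, hash, or unknown) into
/// its protobuf representation; for hash partitioning the key expressions are
/// serialized as well.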
pub fn serialize_partitioning(
    partitioning: &Partitioning,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<protobuf::Partitioning> {
    let serialized_partitioning = match partitioning {
        Partitioning::RoundRobinBatch(partition_count) => protobuf::Partitioning {
            partition_method: Some(protobuf::partitioning::PartitionMethod::RoundRobin(
                *partition_count as u64,
            )),
        },
        Partitioning::Hash(exprs, partition_count) => {
            let serialized_exprs = serialize_physical_exprs(exprs, codec)?;
            protobuf::Partitioning {
                partition_method: Some(protobuf::partitioning::PartitionMethod::Hash(
                    protobuf::PhysicalHashRepartition {
                        hash_expr: serialized_exprs,
                        partition_count: *partition_count as u64,
                    },
                )),
            }
        }
        Partitioning::UnknownPartitioning(partition_count) => protobuf::Partitioning {
            partition_method: Some(protobuf::partitioning::PartitionMethod::Unknown(
                *partition_count as u64,
            )),
        },
    };
    Ok(serialized_partitioning)
}

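/// Serializes one `WHEN ... THEN ...` arm of a `CASE` expression.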
fn serialize_when_then_expr(
    when_expr: &Arc<dyn PhysicalExpr>,
    then_expr: &Arc<dyn PhysicalExpr>,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<protobuf::PhysicalWhenThen> {
    Ok(protobuf::PhysicalWhenThen {
        when_expr: Some(serialize_physical_expr(when_expr, codec)?),
        then_expr: Some(serialize_physical_expr(then_expr, codec)?),
    })
}

impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
    type Error = DataFusionError;

    fn try_from(pf: &PartitionedFile) -> Result<Self> {
        let last_modified = pf.object_meta.last_modified;
        let last_modified_ns = last_modified.timestamp_nanos_opt().ok_or_else(|| {
            DataFusionError::Plan(format!(
                "Invalid timestamp on PartitionedFile::ObjectMeta: {last_modified}"
            ))
        })? as u64;
        Ok(protobuf::PartitionedFile {
            path: pf.object_meta.location.as_ref().to_owned(),
            size: pf.object_meta.size,
            last_modified_ns,
            partition_values: pf
                .partition_values
                .iter()
                .map(|v| v.try_into())
                .collect::<Result<Vec<_>, _>>()?,
            range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
            statistics: pf.statistics.as_ref().map(|s| s.as_ref().into()),
        })
    }
}

impl TryFrom<&FileRange> for protobuf::FileRange {
    type Error = DataFusionError;

    fn try_from(value: &FileRange) -> Result<Self> {
        Ok(protobuf::FileRange {
            start: value.start,
            end: value.end,
        })
    }
}

impl TryFrom<&[PartitionedFile]> for protobuf::FileGroup {
    type Error = DataFusionError;

    fn try_from(gr: &[PartitionedFile]) -> Result<Self, Self::Error> {
        Ok(protobuf::FileGroup {
            files: gr
                .iter()
                .map(|f| f.try_into())
                .collect::<Result<Vec<_>, _>>()?,
        })
    }
}

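/// Serializes a [`FileScanConfig`] into its protobuf representation.
///
/// The serialized schema is the file schema extended with the table partition
/// columns; output orderings, limits, statistics, and any file-source
/// projection expressions are serialized alongside it.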
pub fn serialize_file_scan_config(
    conf: &FileScanConfig,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<protobuf::FileScanExecConf> {
    let file_groups = conf
        .file_groups
        .iter()
        .map(|p| p.files().try_into())
        .collect::<Result<Vec<_>, _>>()?;

    let mut output_orderings = vec![];
    for order in &conf.output_ordering {
        let ordering = serialize_physical_sort_exprs(order.to_vec(), codec)?;
        output_orderings.push(ordering);
    }

    let mut fields = conf
        .file_schema()
        .fields()
        .iter()
        .cloned()
        .collect::<Vec<_>>();
    fields.extend(conf.table_partition_cols().iter().cloned());

    let schema = Arc::new(
        arrow::datatypes::Schema::new(fields.clone())
            .with_metadata(conf.file_schema().metadata.clone()),
    );

    let projection_exprs = conf
        .file_source
        .projection()
        .as_ref()
        .map(|projection_exprs| {
            let projections = projection_exprs.iter().cloned().collect::<Vec<_>>();
            Ok::<_, DataFusionError>(protobuf::ProjectionExprs {
                projections: projections
                    .into_iter()
                    .map(|expr| {
                        Ok(protobuf::ProjectionExpr {
                            alias: expr.alias.to_string(),
                            expr: Some(serialize_physical_expr(&expr.expr, codec)?),
                        })
                    })
                    .collect::<Result<Vec<_>>>()?,
            })
        })
        .transpose()?;

    Ok(protobuf::FileScanExecConf {
        file_groups,
        statistics: Some((&conf.statistics()).into()),
        limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }),
        projection: vec![],
        schema: Some(schema.as_ref().try_into()?),
        table_partition_cols: conf
            .table_partition_cols()
            .iter()
            .map(|x| x.name().clone())
            .collect::<Vec<_>>(),
        object_store_url: conf.object_store_url.to_string(),
        output_ordering: output_orderings
            .into_iter()
            .map(|e| PhysicalSortExprNodeCollection {
                physical_sort_expr_nodes: e,
            })
            .collect::<Vec<_>>(),
        constraints: Some(conf.constraints.clone().into()),
        batch_size: conf.batch_size.map(|s| s as u64),
        projection_exprs,
    })
}

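/// Serializes an optional filter expression; `None` becomes an empty
/// `MaybeFilter`.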
pub fn serialize_maybe_filter(
    expr: Option<Arc<dyn PhysicalExpr>>,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<protobuf::MaybeFilter> {
    match expr {
        None => Ok(protobuf::MaybeFilter { expr: None }),
        Some(expr) => Ok(protobuf::MaybeFilter {
            expr: Some(serialize_physical_expr(&expr, codec)?),
        }),
    }
}

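/// Encodes the given batches with the Arrow IPC stream format.
///
/// The stream schema is taken from the first batch, so all batches are
/// expected to share it; an empty slice encodes to an empty buffer.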
pub fn serialize_record_batches(batches: &[RecordBatch]) -> Result<Vec<u8>> {
    if batches.is_empty() {
        return Ok(vec![]);
    }
    let schema = batches[0].schema();
    let mut buf = Vec::new();
    let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
    for batch in batches {
        writer.write(batch)?;
    }
    writer.finish()?;
    Ok(buf)
}

impl TryFrom<&JsonSink> for protobuf::JsonSink {
    type Error = DataFusionError;

    fn try_from(value: &JsonSink) -> Result<Self, Self::Error> {
        Ok(Self {
            config: Some(value.config().try_into()?),
            writer_options: Some(value.writer_options().try_into()?),
        })
    }
}

impl TryFrom<&CsvSink> for protobuf::CsvSink {
    type Error = DataFusionError;

    fn try_from(value: &CsvSink) -> Result<Self, Self::Error> {
        Ok(Self {
            config: Some(value.config().try_into()?),
            writer_options: Some(value.writer_options().try_into()?),
        })
    }
}

#[cfg(feature = "parquet")]
impl TryFrom<&ParquetSink> for protobuf::ParquetSink {
    type Error = DataFusionError;

    fn try_from(value: &ParquetSink) -> Result<Self, Self::Error> {
        Ok(Self {
            config: Some(value.config().try_into()?),
            parquet_options: Some(value.parquet_options().try_into()?),
        })
    }
}

impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
    type Error = DataFusionError;

    fn try_from(conf: &FileSinkConfig) -> Result<Self, Self::Error> {
        let file_groups = conf
            .file_group
            .iter()
            .map(TryInto::try_into)
            .collect::<Result<Vec<_>>>()?;
        let table_paths = conf
            .table_paths
            .iter()
            .map(ToString::to_string)
            .collect::<Vec<_>>();
        let table_partition_cols = conf
            .table_partition_cols
            .iter()
            .map(|(name, data_type)| {
                Ok(protobuf::PartitionColumn {
                    name: name.to_owned(),
                    arrow_type: Some(data_type.try_into()?),
                })
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(Self {
            object_store_url: conf.object_store_url.to_string(),
            file_groups,
            table_paths,
            output_schema: Some(conf.output_schema.as_ref().try_into()?),
            table_partition_cols,
            keep_partition_by_columns: conf.keep_partition_by_columns,
            insert_op: conf.insert_op as i32,
            file_extension: conf.file_extension.to_string(),
        })
    }
}