1use std::sync::Arc;
19
20use arrow::array::RecordBatch;
21use arrow::datatypes::Schema;
22use arrow::ipc::writer::StreamWriter;
23#[cfg(feature = "parquet")]
24use datafusion::datasource::file_format::parquet::ParquetSink;
25use datafusion::datasource::physical_plan::FileSink;
26use datafusion::physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
27use datafusion::physical_expr::ScalarFunctionExpr;
28use datafusion::physical_expr_common::physical_expr::snapshot_physical_expr;
29use datafusion::physical_expr_common::sort_expr::PhysicalSortExpr;
30use datafusion::physical_plan::expressions::{
31 BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
32 Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
33};
34use datafusion::physical_plan::udaf::AggregateFunctionExpr;
35use datafusion::physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
36use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
37use datafusion::{
38 datasource::{
39 file_format::{csv::CsvSink, json::JsonSink},
40 listing::{FileRange, PartitionedFile},
41 physical_plan::{FileScanConfig, FileSinkConfig},
42 },
43 physical_plan::expressions::LikeExpr,
44};
45use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
46use datafusion_expr::WindowFrame;
47
48use crate::protobuf::{
49 self, physical_aggregate_expr_node, physical_window_expr_node, PhysicalSortExprNode,
50 PhysicalSortExprNodeCollection,
51};
52
53use super::PhysicalExtensionCodec;
54
55pub fn serialize_physical_aggr_expr(
56 aggr_expr: Arc<AggregateFunctionExpr>,
57 codec: &dyn PhysicalExtensionCodec,
58) -> Result<protobuf::PhysicalExprNode> {
59 let expressions = serialize_physical_exprs(&aggr_expr.expressions(), codec)?;
60 let order_bys =
61 serialize_physical_sort_exprs(aggr_expr.order_bys().iter().cloned(), codec)?;
62
63 let name = aggr_expr.fun().name().to_string();
64 let mut buf = Vec::new();
65 codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
66 Ok(protobuf::PhysicalExprNode {
67 expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
68 protobuf::PhysicalAggregateExprNode {
69 aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)),
70 expr: expressions,
71 ordering_req: order_bys,
72 distinct: aggr_expr.is_distinct(),
73 ignore_nulls: aggr_expr.ignore_nulls(),
74 fun_definition: (!buf.is_empty()).then_some(buf),
75 human_display: aggr_expr.human_display().to_string(),
76 },
77 )),
78 })
79}
80
81fn serialize_physical_window_aggr_expr(
82 aggr_expr: &AggregateFunctionExpr,
83 _window_frame: &WindowFrame,
84 codec: &dyn PhysicalExtensionCodec,
85) -> Result<(physical_window_expr_node::WindowFunction, Option<Vec<u8>>)> {
86 let mut buf = Vec::new();
89 codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
90 Ok((
91 physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(
92 aggr_expr.fun().name().to_string(),
93 ),
94 (!buf.is_empty()).then_some(buf),
95 ))
96}
97
98pub fn serialize_physical_window_expr(
99 window_expr: &Arc<dyn WindowExpr>,
100 codec: &dyn PhysicalExtensionCodec,
101) -> Result<protobuf::PhysicalWindowExprNode> {
102 let expr = window_expr.as_any();
103 let args = window_expr.expressions().to_vec();
104 let window_frame = window_expr.get_window_frame();
105
106 let (window_function, fun_definition, ignore_nulls, distinct) =
107 if let Some(plain_aggr_window_expr) =
108 expr.downcast_ref::<PlainAggregateWindowExpr>()
109 {
110 let aggr_expr = plain_aggr_window_expr.get_aggregate_expr();
111 let (window_function, fun_definition) =
112 serialize_physical_window_aggr_expr(aggr_expr, window_frame, codec)?;
113 (
114 window_function,
115 fun_definition,
116 aggr_expr.ignore_nulls(),
117 aggr_expr.is_distinct(),
118 )
119 } else if let Some(sliding_aggr_window_expr) =
120 expr.downcast_ref::<SlidingAggregateWindowExpr>()
121 {
122 let aggr_expr = sliding_aggr_window_expr.get_aggregate_expr();
123 let (window_function, fun_definition) =
124 serialize_physical_window_aggr_expr(aggr_expr, window_frame, codec)?;
125 (
126 window_function,
127 fun_definition,
128 aggr_expr.ignore_nulls(),
129 aggr_expr.is_distinct(),
130 )
131 } else if let Some(udf_window_expr) = expr.downcast_ref::<StandardWindowExpr>() {
132 if let Some(expr) = udf_window_expr
133 .get_standard_func_expr()
134 .as_any()
135 .downcast_ref::<WindowUDFExpr>()
136 {
137 let mut buf = Vec::new();
138 codec.try_encode_udwf(expr.fun(), &mut buf)?;
139 (
140 physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(
141 expr.fun().name().to_string(),
142 ),
143 (!buf.is_empty()).then_some(buf),
144 false, false,
146 )
147 } else {
148 return not_impl_err!(
149 "User-defined window function not supported: {window_expr:?}"
150 );
151 }
152 } else {
153 return not_impl_err!("WindowExpr not supported: {window_expr:?}");
154 };
155
156 let args = serialize_physical_exprs(&args, codec)?;
157 let partition_by = serialize_physical_exprs(window_expr.partition_by(), codec)?;
158 let order_by = serialize_physical_sort_exprs(window_expr.order_by().to_vec(), codec)?;
159 let window_frame: protobuf::WindowFrame = window_frame
160 .as_ref()
161 .try_into()
162 .map_err(|e| DataFusionError::Internal(format!("{e}")))?;
163
164 Ok(protobuf::PhysicalWindowExprNode {
165 args,
166 partition_by,
167 order_by,
168 window_frame: Some(window_frame),
169 window_function: Some(window_function),
170 name: window_expr.name().to_string(),
171 fun_definition,
172 ignore_nulls,
173 distinct,
174 })
175}
176
177pub fn serialize_physical_sort_exprs<I>(
178 sort_exprs: I,
179 codec: &dyn PhysicalExtensionCodec,
180) -> Result<Vec<PhysicalSortExprNode>>
181where
182 I: IntoIterator<Item = PhysicalSortExpr>,
183{
184 sort_exprs
185 .into_iter()
186 .map(|sort_expr| serialize_physical_sort_expr(sort_expr, codec))
187 .collect()
188}
189
190pub fn serialize_physical_sort_expr(
191 sort_expr: PhysicalSortExpr,
192 codec: &dyn PhysicalExtensionCodec,
193) -> Result<PhysicalSortExprNode> {
194 let PhysicalSortExpr { expr, options } = sort_expr;
195 let expr = serialize_physical_expr(&expr, codec)?;
196 Ok(PhysicalSortExprNode {
197 expr: Some(Box::new(expr)),
198 asc: !options.descending,
199 nulls_first: options.nulls_first,
200 })
201}
202
203pub fn serialize_physical_exprs<'a, I>(
204 values: I,
205 codec: &dyn PhysicalExtensionCodec,
206) -> Result<Vec<protobuf::PhysicalExprNode>>
207where
208 I: IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
209{
210 values
211 .into_iter()
212 .map(|value| serialize_physical_expr(value, codec))
213 .collect()
214}
215
/// Serializes a physical expression into its protobuf representation.
///
/// The input is first passed through `snapshot_physical_expr`, and the result
/// is matched against the built-in expression types this function encodes
/// directly:
/// - `Column`, `UnKnownColumn`
/// - `BinaryExpr`, `CaseExpr`, `NotExpr`
/// - `IsNullExpr`, `IsNotNullExpr`, `InListExpr`, `NegativeExpr`
/// - `Literal`, `CastExpr`, `TryCastExpr`
/// - `ScalarFunctionExpr`, `LikeExpr`
///
/// Child expressions are serialized recursively with the same `codec`.
///
/// Any other expression type is passed to `codec.try_encode_expr`. On success it
/// is emitted as an `Extension` node whose `inputs` are the serialized children.
///
/// # Errors
///
/// Returns an error if snapshotting fails or if any nested serialization or type
/// conversion fails. If the expression is not a built-in type and the extension
/// codec cannot encode it, an internal error is returned.
pub fn serialize_physical_expr(
    value: &Arc<dyn PhysicalExpr>,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<protobuf::PhysicalExprNode> {
    // Serialize the snapshotted form of the expression rather than `value` itself.
    // NOTE(review): presumably this replaces dynamic/stateful expressions with a
    // fixed form; confirm against `snapshot_physical_expr` docs.
    let value = snapshot_physical_expr(Arc::clone(value))?;
    let expr = value.as_any();

    if let Some(expr) = expr.downcast_ref::<Column>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
                protobuf::PhysicalColumn {
                    name: expr.name().to_string(),
                    // NOTE(review): `as u32` would silently truncate an index above u32::MAX.
                    index: expr.index() as u32,
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<UnKnownColumn>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn(
                protobuf::UnknownColumn {
                    name: expr.name().to_string(),
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
        // The operator is encoded as its `Debug` text.
        let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
            l: Some(Box::new(serialize_physical_expr(expr.left(), codec)?)),
            r: Some(Box::new(serialize_physical_expr(expr.right(), codec)?)),
            op: format!("{:?}", expr.op()),
        });

        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
                binary_expr,
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
        // The optional base expression and the optional ELSE branch map to
        // `Option<Box<_>>`; every WHEN/THEN pair is serialized in order.
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(
                protobuf::physical_expr_node::ExprType::Case(
                    Box::new(
                        protobuf::PhysicalCaseNode {
                            expr: expr
                                .expr()
                                .map(|exp| {
                                    serialize_physical_expr(exp, codec).map(Box::new)
                                })
                                .transpose()?,
                            when_then_expr: expr
                                .when_then_expr()
                                .iter()
                                .map(|(when_expr, then_expr)| {
                                    serialize_when_then_expr(when_expr, then_expr, codec)
                                })
                                .collect::<Result<
                                    Vec<protobuf::PhysicalWhenThen>,
                                    DataFusionError,
                                >>()?,
                            else_expr: expr
                                .else_expr()
                                .map(|a| serialize_physical_expr(a, codec).map(Box::new))
                                .transpose()?,
                        },
                    ),
                ),
            ),
        })
    } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
                protobuf::PhysicalNot {
                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
                Box::new(protobuf::PhysicalIsNull {
                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
                }),
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
                Box::new(protobuf::PhysicalIsNotNull {
                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
                }),
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new(
                protobuf::PhysicalInListNode {
                    expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)),
                    list: serialize_physical_exprs(expr.list(), codec)?,
                    negated: expr.negated(),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
                protobuf::PhysicalNegativeNode {
                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
                },
            ))),
        })
    } else if let Some(lit) = expr.downcast_ref::<Literal>() {
        // The scalar value is converted through its `TryFrom` impl into the proto type.
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
                lit.value().try_into()?,
            )),
        })
    } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
                protobuf::PhysicalCastNode {
                    expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)),
                    arrow_type: Some(cast.cast_type().try_into()?),
                },
            ))),
        })
    } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new(
                protobuf::PhysicalTryCastNode {
                    expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)),
                    arrow_type: Some(cast.cast_type().try_into()?),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
        // The UDF is referenced by name; the codec may also embed its definition,
        // which is stored only when non-empty.
        let mut buf = Vec::new();
        codec.try_encode_udf(expr.fun(), &mut buf)?;
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf(
                protobuf::PhysicalScalarUdfNode {
                    name: expr.name().to_string(),
                    args: serialize_physical_exprs(expr.args(), codec)?,
                    fun_definition: (!buf.is_empty()).then_some(buf),
                    return_type: Some(expr.return_type().try_into()?),
                    nullable: expr.nullable(),
                    // NOTE(review): queried with an empty schema; presumably the return
                    // field does not depend on the input schema. TODO confirm.
                    return_field_name: expr
                        .return_field(&Schema::empty())?
                        .name()
                        .to_string(),
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new(
                protobuf::PhysicalLikeExprNode {
                    negated: expr.negated(),
                    case_insensitive: expr.case_insensitive(),
                    expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)),
                    pattern: Some(Box::new(serialize_physical_expr(
                        expr.pattern(),
                        codec,
                    )?)),
                },
            ))),
        })
    } else {
        // Not a built-in type: defer to the extension codec. Its children are
        // serialized separately and attached as `inputs`.
        let mut buf: Vec<u8> = vec![];
        match codec.try_encode_expr(&value, &mut buf) {
            Ok(_) => {
                let inputs: Vec<protobuf::PhysicalExprNode> = value
                    .children()
                    .into_iter()
                    .map(|e| serialize_physical_expr(e, codec))
                    .collect::<Result<_>>()?;
                Ok(protobuf::PhysicalExprNode {
                    expr_type: Some(protobuf::physical_expr_node::ExprType::Extension(
                        protobuf::PhysicalExtensionExprNode { expr: buf, inputs },
                    )),
                })
            }
            Err(e) => internal_err!(
                "Unsupported physical expr and extension codec failed with [{e}]. Expr: {value:?}"
            ),
        }
    }
}
408
409pub fn serialize_partitioning(
410 partitioning: &Partitioning,
411 codec: &dyn PhysicalExtensionCodec,
412) -> Result<protobuf::Partitioning> {
413 let serialized_partitioning = match partitioning {
414 Partitioning::RoundRobinBatch(partition_count) => protobuf::Partitioning {
415 partition_method: Some(protobuf::partitioning::PartitionMethod::RoundRobin(
416 *partition_count as u64,
417 )),
418 },
419 Partitioning::Hash(exprs, partition_count) => {
420 let serialized_exprs = serialize_physical_exprs(exprs, codec)?;
421 protobuf::Partitioning {
422 partition_method: Some(protobuf::partitioning::PartitionMethod::Hash(
423 protobuf::PhysicalHashRepartition {
424 hash_expr: serialized_exprs,
425 partition_count: *partition_count as u64,
426 },
427 )),
428 }
429 }
430 Partitioning::UnknownPartitioning(partition_count) => protobuf::Partitioning {
431 partition_method: Some(protobuf::partitioning::PartitionMethod::Unknown(
432 *partition_count as u64,
433 )),
434 },
435 };
436 Ok(serialized_partitioning)
437}
438
439fn serialize_when_then_expr(
440 when_expr: &Arc<dyn PhysicalExpr>,
441 then_expr: &Arc<dyn PhysicalExpr>,
442 codec: &dyn PhysicalExtensionCodec,
443) -> Result<protobuf::PhysicalWhenThen> {
444 Ok(protobuf::PhysicalWhenThen {
445 when_expr: Some(serialize_physical_expr(when_expr, codec)?),
446 then_expr: Some(serialize_physical_expr(then_expr, codec)?),
447 })
448}
449
450impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
451 type Error = DataFusionError;
452
453 fn try_from(pf: &PartitionedFile) -> Result<Self> {
454 let last_modified = pf.object_meta.last_modified;
455 let last_modified_ns = last_modified.timestamp_nanos_opt().ok_or_else(|| {
456 DataFusionError::Plan(format!(
457 "Invalid timestamp on PartitionedFile::ObjectMeta: {last_modified}"
458 ))
459 })? as u64;
460 Ok(protobuf::PartitionedFile {
461 path: pf.object_meta.location.as_ref().to_owned(),
462 size: pf.object_meta.size,
463 last_modified_ns,
464 partition_values: pf
465 .partition_values
466 .iter()
467 .map(|v| v.try_into())
468 .collect::<Result<Vec<_>, _>>()?,
469 range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
470 statistics: pf.statistics.as_ref().map(|s| s.as_ref().into()),
471 })
472 }
473}
474
475impl TryFrom<&FileRange> for protobuf::FileRange {
476 type Error = DataFusionError;
477
478 fn try_from(value: &FileRange) -> Result<Self> {
479 Ok(protobuf::FileRange {
480 start: value.start,
481 end: value.end,
482 })
483 }
484}
485
486impl TryFrom<&[PartitionedFile]> for protobuf::FileGroup {
487 type Error = DataFusionError;
488
489 fn try_from(gr: &[PartitionedFile]) -> Result<Self, Self::Error> {
490 Ok(protobuf::FileGroup {
491 files: gr
492 .iter()
493 .map(|f| f.try_into())
494 .collect::<Result<Vec<_>, _>>()?,
495 })
496 }
497}
498
499pub fn serialize_file_scan_config(
500 conf: &FileScanConfig,
501 codec: &dyn PhysicalExtensionCodec,
502) -> Result<protobuf::FileScanExecConf> {
503 let file_groups = conf
504 .file_groups
505 .iter()
506 .map(|p| p.files().try_into())
507 .collect::<Result<Vec<_>, _>>()?;
508
509 let mut output_orderings = vec![];
510 for order in &conf.output_ordering {
511 let ordering = serialize_physical_sort_exprs(order.to_vec(), codec)?;
512 output_orderings.push(ordering)
513 }
514
515 let mut fields = conf
518 .file_schema
519 .fields()
520 .iter()
521 .cloned()
522 .collect::<Vec<_>>();
523 fields.extend(conf.table_partition_cols.iter().cloned());
524 let schema = Arc::new(arrow::datatypes::Schema::new(fields.clone()));
525
526 Ok(protobuf::FileScanExecConf {
527 file_groups,
528 statistics: Some((&conf.file_source.statistics().unwrap()).into()),
529 limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }),
530 projection: conf
531 .projection
532 .as_ref()
533 .unwrap_or(&(0..schema.fields().len()).collect::<Vec<_>>())
534 .iter()
535 .map(|n| *n as u32)
536 .collect(),
537 schema: Some(schema.as_ref().try_into()?),
538 table_partition_cols: conf
539 .table_partition_cols
540 .iter()
541 .map(|x| x.name().clone())
542 .collect::<Vec<_>>(),
543 object_store_url: conf.object_store_url.to_string(),
544 output_ordering: output_orderings
545 .into_iter()
546 .map(|e| PhysicalSortExprNodeCollection {
547 physical_sort_expr_nodes: e,
548 })
549 .collect::<Vec<_>>(),
550 constraints: Some(conf.constraints.clone().into()),
551 batch_size: conf.batch_size.map(|s| s as u64),
552 })
553}
554
555pub fn serialize_maybe_filter(
556 expr: Option<Arc<dyn PhysicalExpr>>,
557 codec: &dyn PhysicalExtensionCodec,
558) -> Result<protobuf::MaybeFilter> {
559 match expr {
560 None => Ok(protobuf::MaybeFilter { expr: None }),
561 Some(expr) => Ok(protobuf::MaybeFilter {
562 expr: Some(serialize_physical_expr(&expr, codec)?),
563 }),
564 }
565}
566
567pub fn serialize_record_batches(batches: &[RecordBatch]) -> Result<Vec<u8>> {
568 if batches.is_empty() {
569 return Ok(vec![]);
570 }
571 let schema = batches[0].schema();
572 let mut buf = Vec::new();
573 let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
574 for batch in batches {
575 writer.write(batch)?;
576 }
577 writer.finish()?;
578 Ok(buf)
579}
580
581impl TryFrom<&JsonSink> for protobuf::JsonSink {
582 type Error = DataFusionError;
583
584 fn try_from(value: &JsonSink) -> Result<Self, Self::Error> {
585 Ok(Self {
586 config: Some(value.config().try_into()?),
587 writer_options: Some(value.writer_options().try_into()?),
588 })
589 }
590}
591
592impl TryFrom<&CsvSink> for protobuf::CsvSink {
593 type Error = DataFusionError;
594
595 fn try_from(value: &CsvSink) -> Result<Self, Self::Error> {
596 Ok(Self {
597 config: Some(value.config().try_into()?),
598 writer_options: Some(value.writer_options().try_into()?),
599 })
600 }
601}
602
#[cfg(feature = "parquet")]
impl TryFrom<&ParquetSink> for protobuf::ParquetSink {
    type Error = DataFusionError;

    /// Converts a [`ParquetSink`] into its protobuf form. The result carries
    /// the sink's file-sink configuration and its Parquet options.
    fn try_from(sink: &ParquetSink) -> Result<Self, Self::Error> {
        let proto = Self {
            config: Some(sink.config().try_into()?),
            parquet_options: Some(sink.parquet_options().try_into()?),
        };
        Ok(proto)
    }
}
614
615impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
616 type Error = DataFusionError;
617
618 fn try_from(conf: &FileSinkConfig) -> Result<Self, Self::Error> {
619 let file_groups = conf
620 .file_group
621 .iter()
622 .map(TryInto::try_into)
623 .collect::<Result<Vec<_>>>()?;
624 let table_paths = conf
625 .table_paths
626 .iter()
627 .map(ToString::to_string)
628 .collect::<Vec<_>>();
629 let table_partition_cols = conf
630 .table_partition_cols
631 .iter()
632 .map(|(name, data_type)| {
633 Ok(protobuf::PartitionColumn {
634 name: name.to_owned(),
635 arrow_type: Some(data_type.try_into()?),
636 })
637 })
638 .collect::<Result<Vec<_>>>()?;
639 Ok(Self {
640 object_store_url: conf.object_store_url.to_string(),
641 file_groups,
642 table_paths,
643 output_schema: Some(conf.output_schema.as_ref().try_into()?),
644 table_partition_cols,
645 keep_partition_by_columns: conf.keep_partition_by_columns,
646 insert_op: conf.insert_op as i32,
647 file_extension: conf.file_extension.to_string(),
648 })
649 }
650}