1use std::sync::Arc;
19
20#[cfg(feature = "parquet")]
21use datafusion::datasource::file_format::parquet::ParquetSink;
22use datafusion::datasource::physical_plan::FileSink;
23use datafusion::physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
24use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
25use datafusion::physical_expr_common::physical_expr::snapshot_physical_expr;
26use datafusion::physical_plan::expressions::{
27 BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
28 Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
29};
30use datafusion::physical_plan::udaf::AggregateFunctionExpr;
31use datafusion::physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
32use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
33use datafusion::{
34 datasource::{
35 file_format::{csv::CsvSink, json::JsonSink},
36 listing::{FileRange, PartitionedFile},
37 physical_plan::{FileScanConfig, FileSinkConfig},
38 },
39 physical_plan::expressions::LikeExpr,
40};
41use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
42use datafusion_expr::WindowFrame;
43
44use crate::protobuf::{
45 self, physical_aggregate_expr_node, physical_window_expr_node, PhysicalSortExprNode,
46 PhysicalSortExprNodeCollection,
47};
48
49use super::PhysicalExtensionCodec;
50
51pub fn serialize_physical_aggr_expr(
52 aggr_expr: Arc<AggregateFunctionExpr>,
53 codec: &dyn PhysicalExtensionCodec,
54) -> Result<protobuf::PhysicalExprNode> {
55 let expressions = serialize_physical_exprs(&aggr_expr.expressions(), codec)?;
56 let ordering_req = match aggr_expr.order_bys() {
57 Some(order) => order.clone(),
58 None => LexOrdering::default(),
59 };
60 let ordering_req = serialize_physical_sort_exprs(ordering_req, codec)?;
61
62 let name = aggr_expr.fun().name().to_string();
63 let mut buf = Vec::new();
64 codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
65 Ok(protobuf::PhysicalExprNode {
66 expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
67 protobuf::PhysicalAggregateExprNode {
68 aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)),
69 expr: expressions,
70 ordering_req,
71 distinct: aggr_expr.is_distinct(),
72 ignore_nulls: aggr_expr.ignore_nulls(),
73 fun_definition: (!buf.is_empty()).then_some(buf),
74 },
75 )),
76 })
77}
78
79fn serialize_physical_window_aggr_expr(
80 aggr_expr: &AggregateFunctionExpr,
81 _window_frame: &WindowFrame,
82 codec: &dyn PhysicalExtensionCodec,
83) -> Result<(physical_window_expr_node::WindowFunction, Option<Vec<u8>>)> {
84 if aggr_expr.is_distinct() || aggr_expr.ignore_nulls() {
85 return not_impl_err!(
87 "Distinct aggregate functions not supported in window expressions"
88 );
89 }
90
91 let mut buf = Vec::new();
92 codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
93 Ok((
94 physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(
95 aggr_expr.fun().name().to_string(),
96 ),
97 (!buf.is_empty()).then_some(buf),
98 ))
99}
100
101pub fn serialize_physical_window_expr(
102 window_expr: &Arc<dyn WindowExpr>,
103 codec: &dyn PhysicalExtensionCodec,
104) -> Result<protobuf::PhysicalWindowExprNode> {
105 let expr = window_expr.as_any();
106 let args = window_expr.expressions().to_vec();
107 let window_frame = window_expr.get_window_frame();
108
109 let (window_function, fun_definition) = if let Some(plain_aggr_window_expr) =
110 expr.downcast_ref::<PlainAggregateWindowExpr>()
111 {
112 serialize_physical_window_aggr_expr(
113 plain_aggr_window_expr.get_aggregate_expr(),
114 window_frame,
115 codec,
116 )?
117 } else if let Some(sliding_aggr_window_expr) =
118 expr.downcast_ref::<SlidingAggregateWindowExpr>()
119 {
120 serialize_physical_window_aggr_expr(
121 sliding_aggr_window_expr.get_aggregate_expr(),
122 window_frame,
123 codec,
124 )?
125 } else if let Some(udf_window_expr) = expr.downcast_ref::<StandardWindowExpr>() {
126 if let Some(expr) = udf_window_expr
127 .get_standard_func_expr()
128 .as_any()
129 .downcast_ref::<WindowUDFExpr>()
130 {
131 let mut buf = Vec::new();
132 codec.try_encode_udwf(expr.fun(), &mut buf)?;
133 (
134 physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(
135 expr.fun().name().to_string(),
136 ),
137 (!buf.is_empty()).then_some(buf),
138 )
139 } else {
140 return not_impl_err!(
141 "User-defined window function not supported: {window_expr:?}"
142 );
143 }
144 } else {
145 return not_impl_err!("WindowExpr not supported: {window_expr:?}");
146 };
147
148 let args = serialize_physical_exprs(&args, codec)?;
149 let partition_by = serialize_physical_exprs(window_expr.partition_by(), codec)?;
150 let order_by = serialize_physical_sort_exprs(window_expr.order_by().to_vec(), codec)?;
151 let window_frame: protobuf::WindowFrame = window_frame
152 .as_ref()
153 .try_into()
154 .map_err(|e| DataFusionError::Internal(format!("{e}")))?;
155
156 Ok(protobuf::PhysicalWindowExprNode {
157 args,
158 partition_by,
159 order_by,
160 window_frame: Some(window_frame),
161 window_function: Some(window_function),
162 name: window_expr.name().to_string(),
163 fun_definition,
164 })
165}
166
167pub fn serialize_physical_sort_exprs<I>(
168 sort_exprs: I,
169 codec: &dyn PhysicalExtensionCodec,
170) -> Result<Vec<PhysicalSortExprNode>>
171where
172 I: IntoIterator<Item = PhysicalSortExpr>,
173{
174 sort_exprs
175 .into_iter()
176 .map(|sort_expr| serialize_physical_sort_expr(sort_expr, codec))
177 .collect()
178}
179
180pub fn serialize_physical_sort_expr(
181 sort_expr: PhysicalSortExpr,
182 codec: &dyn PhysicalExtensionCodec,
183) -> Result<PhysicalSortExprNode> {
184 let PhysicalSortExpr { expr, options } = sort_expr;
185 let expr = serialize_physical_expr(&expr, codec)?;
186 Ok(PhysicalSortExprNode {
187 expr: Some(Box::new(expr)),
188 asc: !options.descending,
189 nulls_first: options.nulls_first,
190 })
191}
192
193pub fn serialize_physical_exprs<'a, I>(
194 values: I,
195 codec: &dyn PhysicalExtensionCodec,
196) -> Result<Vec<protobuf::PhysicalExprNode>>
197where
198 I: IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
199{
200 values
201 .into_iter()
202 .map(|value| serialize_physical_expr(value, codec))
203 .collect()
204}
205
206pub fn serialize_physical_expr(
211 value: &Arc<dyn PhysicalExpr>,
212 codec: &dyn PhysicalExtensionCodec,
213) -> Result<protobuf::PhysicalExprNode> {
214 let value = snapshot_physical_expr(Arc::clone(value))?;
217 let expr = value.as_any();
218
219 if let Some(expr) = expr.downcast_ref::<Column>() {
220 Ok(protobuf::PhysicalExprNode {
221 expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
222 protobuf::PhysicalColumn {
223 name: expr.name().to_string(),
224 index: expr.index() as u32,
225 },
226 )),
227 })
228 } else if let Some(expr) = expr.downcast_ref::<UnKnownColumn>() {
229 Ok(protobuf::PhysicalExprNode {
230 expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn(
231 protobuf::UnknownColumn {
232 name: expr.name().to_string(),
233 },
234 )),
235 })
236 } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
237 let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
238 l: Some(Box::new(serialize_physical_expr(expr.left(), codec)?)),
239 r: Some(Box::new(serialize_physical_expr(expr.right(), codec)?)),
240 op: format!("{:?}", expr.op()),
241 });
242
243 Ok(protobuf::PhysicalExprNode {
244 expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
245 binary_expr,
246 )),
247 })
248 } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
249 Ok(protobuf::PhysicalExprNode {
250 expr_type: Some(
251 protobuf::physical_expr_node::ExprType::Case(
252 Box::new(
253 protobuf::PhysicalCaseNode {
254 expr: expr
255 .expr()
256 .map(|exp| {
257 serialize_physical_expr(exp, codec).map(Box::new)
258 })
259 .transpose()?,
260 when_then_expr: expr
261 .when_then_expr()
262 .iter()
263 .map(|(when_expr, then_expr)| {
264 serialize_when_then_expr(when_expr, then_expr, codec)
265 })
266 .collect::<Result<
267 Vec<protobuf::PhysicalWhenThen>,
268 DataFusionError,
269 >>()?,
270 else_expr: expr
271 .else_expr()
272 .map(|a| serialize_physical_expr(a, codec).map(Box::new))
273 .transpose()?,
274 },
275 ),
276 ),
277 ),
278 })
279 } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
280 Ok(protobuf::PhysicalExprNode {
281 expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
282 protobuf::PhysicalNot {
283 expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
284 },
285 ))),
286 })
287 } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
288 Ok(protobuf::PhysicalExprNode {
289 expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
290 Box::new(protobuf::PhysicalIsNull {
291 expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
292 }),
293 )),
294 })
295 } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
296 Ok(protobuf::PhysicalExprNode {
297 expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
298 Box::new(protobuf::PhysicalIsNotNull {
299 expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
300 }),
301 )),
302 })
303 } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
304 Ok(protobuf::PhysicalExprNode {
305 expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new(
306 protobuf::PhysicalInListNode {
307 expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)),
308 list: serialize_physical_exprs(expr.list(), codec)?,
309 negated: expr.negated(),
310 },
311 ))),
312 })
313 } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
314 Ok(protobuf::PhysicalExprNode {
315 expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
316 protobuf::PhysicalNegativeNode {
317 expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
318 },
319 ))),
320 })
321 } else if let Some(lit) = expr.downcast_ref::<Literal>() {
322 Ok(protobuf::PhysicalExprNode {
323 expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
324 lit.value().try_into()?,
325 )),
326 })
327 } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
328 Ok(protobuf::PhysicalExprNode {
329 expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
330 protobuf::PhysicalCastNode {
331 expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)),
332 arrow_type: Some(cast.cast_type().try_into()?),
333 },
334 ))),
335 })
336 } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
337 Ok(protobuf::PhysicalExprNode {
338 expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new(
339 protobuf::PhysicalTryCastNode {
340 expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)),
341 arrow_type: Some(cast.cast_type().try_into()?),
342 },
343 ))),
344 })
345 } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
346 let mut buf = Vec::new();
347 codec.try_encode_udf(expr.fun(), &mut buf)?;
348 Ok(protobuf::PhysicalExprNode {
349 expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf(
350 protobuf::PhysicalScalarUdfNode {
351 name: expr.name().to_string(),
352 args: serialize_physical_exprs(expr.args(), codec)?,
353 fun_definition: (!buf.is_empty()).then_some(buf),
354 return_type: Some(expr.return_type().try_into()?),
355 nullable: expr.nullable(),
356 },
357 )),
358 })
359 } else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
360 Ok(protobuf::PhysicalExprNode {
361 expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new(
362 protobuf::PhysicalLikeExprNode {
363 negated: expr.negated(),
364 case_insensitive: expr.case_insensitive(),
365 expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)),
366 pattern: Some(Box::new(serialize_physical_expr(
367 expr.pattern(),
368 codec,
369 )?)),
370 },
371 ))),
372 })
373 } else {
374 let mut buf: Vec<u8> = vec![];
375 match codec.try_encode_expr(&value, &mut buf) {
376 Ok(_) => {
377 let inputs: Vec<protobuf::PhysicalExprNode> = value
378 .children()
379 .into_iter()
380 .map(|e| serialize_physical_expr(e, codec))
381 .collect::<Result<_>>()?;
382 Ok(protobuf::PhysicalExprNode {
383 expr_type: Some(protobuf::physical_expr_node::ExprType::Extension(
384 protobuf::PhysicalExtensionExprNode { expr: buf, inputs },
385 )),
386 })
387 }
388 Err(e) => internal_err!(
389 "Unsupported physical expr and extension codec failed with [{e}]. Expr: {value:?}"
390 ),
391 }
392 }
393}
394
395pub fn serialize_partitioning(
396 partitioning: &Partitioning,
397 codec: &dyn PhysicalExtensionCodec,
398) -> Result<protobuf::Partitioning> {
399 let serialized_partitioning = match partitioning {
400 Partitioning::RoundRobinBatch(partition_count) => protobuf::Partitioning {
401 partition_method: Some(protobuf::partitioning::PartitionMethod::RoundRobin(
402 *partition_count as u64,
403 )),
404 },
405 Partitioning::Hash(exprs, partition_count) => {
406 let serialized_exprs = serialize_physical_exprs(exprs, codec)?;
407 protobuf::Partitioning {
408 partition_method: Some(protobuf::partitioning::PartitionMethod::Hash(
409 protobuf::PhysicalHashRepartition {
410 hash_expr: serialized_exprs,
411 partition_count: *partition_count as u64,
412 },
413 )),
414 }
415 }
416 Partitioning::UnknownPartitioning(partition_count) => protobuf::Partitioning {
417 partition_method: Some(protobuf::partitioning::PartitionMethod::Unknown(
418 *partition_count as u64,
419 )),
420 },
421 };
422 Ok(serialized_partitioning)
423}
424
425fn serialize_when_then_expr(
426 when_expr: &Arc<dyn PhysicalExpr>,
427 then_expr: &Arc<dyn PhysicalExpr>,
428 codec: &dyn PhysicalExtensionCodec,
429) -> Result<protobuf::PhysicalWhenThen> {
430 Ok(protobuf::PhysicalWhenThen {
431 when_expr: Some(serialize_physical_expr(when_expr, codec)?),
432 then_expr: Some(serialize_physical_expr(then_expr, codec)?),
433 })
434}
435
436impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
437 type Error = DataFusionError;
438
439 fn try_from(pf: &PartitionedFile) -> Result<Self> {
440 let last_modified = pf.object_meta.last_modified;
441 let last_modified_ns = last_modified.timestamp_nanos_opt().ok_or_else(|| {
442 DataFusionError::Plan(format!(
443 "Invalid timestamp on PartitionedFile::ObjectMeta: {last_modified}"
444 ))
445 })? as u64;
446 Ok(protobuf::PartitionedFile {
447 path: pf.object_meta.location.as_ref().to_owned(),
448 size: pf.object_meta.size,
449 last_modified_ns,
450 partition_values: pf
451 .partition_values
452 .iter()
453 .map(|v| v.try_into())
454 .collect::<Result<Vec<_>, _>>()?,
455 range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
456 statistics: pf.statistics.as_ref().map(|s| s.as_ref().into()),
457 })
458 }
459}
460
461impl TryFrom<&FileRange> for protobuf::FileRange {
462 type Error = DataFusionError;
463
464 fn try_from(value: &FileRange) -> Result<Self> {
465 Ok(protobuf::FileRange {
466 start: value.start,
467 end: value.end,
468 })
469 }
470}
471
472impl TryFrom<&[PartitionedFile]> for protobuf::FileGroup {
473 type Error = DataFusionError;
474
475 fn try_from(gr: &[PartitionedFile]) -> Result<Self, Self::Error> {
476 Ok(protobuf::FileGroup {
477 files: gr
478 .iter()
479 .map(|f| f.try_into())
480 .collect::<Result<Vec<_>, _>>()?,
481 })
482 }
483}
484
485pub fn serialize_file_scan_config(
486 conf: &FileScanConfig,
487 codec: &dyn PhysicalExtensionCodec,
488) -> Result<protobuf::FileScanExecConf> {
489 let file_groups = conf
490 .file_groups
491 .iter()
492 .map(|p| p.files().try_into())
493 .collect::<Result<Vec<_>, _>>()?;
494
495 let mut output_orderings = vec![];
496 for order in &conf.output_ordering {
497 let ordering = serialize_physical_sort_exprs(order.to_vec(), codec)?;
498 output_orderings.push(ordering)
499 }
500
501 let mut fields = conf
504 .file_schema
505 .fields()
506 .iter()
507 .cloned()
508 .collect::<Vec<_>>();
509 fields.extend(conf.table_partition_cols.iter().cloned().map(Arc::new));
510 let schema = Arc::new(arrow::datatypes::Schema::new(fields.clone()));
511
512 Ok(protobuf::FileScanExecConf {
513 file_groups,
514 statistics: Some((&conf.file_source.statistics().unwrap()).into()),
515 limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }),
516 projection: conf
517 .projection
518 .as_ref()
519 .unwrap_or(&(0..schema.fields().len()).collect::<Vec<_>>())
520 .iter()
521 .map(|n| *n as u32)
522 .collect(),
523 schema: Some(schema.as_ref().try_into()?),
524 table_partition_cols: conf
525 .table_partition_cols
526 .iter()
527 .map(|x| x.name().clone())
528 .collect::<Vec<_>>(),
529 object_store_url: conf.object_store_url.to_string(),
530 output_ordering: output_orderings
531 .into_iter()
532 .map(|e| PhysicalSortExprNodeCollection {
533 physical_sort_expr_nodes: e,
534 })
535 .collect::<Vec<_>>(),
536 constraints: Some(conf.constraints.clone().into()),
537 batch_size: conf.batch_size.map(|s| s as u64),
538 })
539}
540
541pub fn serialize_maybe_filter(
542 expr: Option<Arc<dyn PhysicalExpr>>,
543 codec: &dyn PhysicalExtensionCodec,
544) -> Result<protobuf::MaybeFilter> {
545 match expr {
546 None => Ok(protobuf::MaybeFilter { expr: None }),
547 Some(expr) => Ok(protobuf::MaybeFilter {
548 expr: Some(serialize_physical_expr(&expr, codec)?),
549 }),
550 }
551}
552
553impl TryFrom<&JsonSink> for protobuf::JsonSink {
554 type Error = DataFusionError;
555
556 fn try_from(value: &JsonSink) -> Result<Self, Self::Error> {
557 Ok(Self {
558 config: Some(value.config().try_into()?),
559 writer_options: Some(value.writer_options().try_into()?),
560 })
561 }
562}
563
564impl TryFrom<&CsvSink> for protobuf::CsvSink {
565 type Error = DataFusionError;
566
567 fn try_from(value: &CsvSink) -> Result<Self, Self::Error> {
568 Ok(Self {
569 config: Some(value.config().try_into()?),
570 writer_options: Some(value.writer_options().try_into()?),
571 })
572 }
573}
574
/// Converts a `ParquetSink` into its protobuf form.
///
/// Only compiled when the `parquet` feature is enabled.
#[cfg(feature = "parquet")]
impl TryFrom<&ParquetSink> for protobuf::ParquetSink {
    type Error = DataFusionError;

    /// Converts the sink configuration first, then the Parquet options.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by either conversion.
    fn try_from(value: &ParquetSink) -> Result<Self, Self::Error> {
        let config = Some(value.config().try_into()?);
        let parquet_options = Some(value.parquet_options().try_into()?);
        Ok(protobuf::ParquetSink {
            config,
            parquet_options,
        })
    }
}
586
587impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
588 type Error = DataFusionError;
589
590 fn try_from(conf: &FileSinkConfig) -> Result<Self, Self::Error> {
591 let file_groups = conf
592 .file_group
593 .iter()
594 .map(TryInto::try_into)
595 .collect::<Result<Vec<_>>>()?;
596 let table_paths = conf
597 .table_paths
598 .iter()
599 .map(ToString::to_string)
600 .collect::<Vec<_>>();
601 let table_partition_cols = conf
602 .table_partition_cols
603 .iter()
604 .map(|(name, data_type)| {
605 Ok(protobuf::PartitionColumn {
606 name: name.to_owned(),
607 arrow_type: Some(data_type.try_into()?),
608 })
609 })
610 .collect::<Result<Vec<_>>>()?;
611 Ok(Self {
612 object_store_url: conf.object_store_url.to_string(),
613 file_groups,
614 table_paths,
615 output_schema: Some(conf.output_schema.as_ref().try_into()?),
616 table_partition_cols,
617 keep_partition_by_columns: conf.keep_partition_by_columns,
618 insert_op: conf.insert_op as i32,
619 file_extension: conf.file_extension.to_string(),
620 })
621 }
622}