1use std::sync::Arc;
19
20#[cfg(feature = "parquet")]
21use datafusion::datasource::file_format::parquet::ParquetSink;
22use datafusion::datasource::physical_plan::FileSink;
23use datafusion::physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
24use datafusion::physical_expr::ScalarFunctionExpr;
25use datafusion::physical_expr_common::physical_expr::snapshot_physical_expr;
26use datafusion::physical_expr_common::sort_expr::PhysicalSortExpr;
27use datafusion::physical_plan::expressions::{
28 BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
29 Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
30};
31use datafusion::physical_plan::udaf::AggregateFunctionExpr;
32use datafusion::physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
33use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
34use datafusion::{
35 datasource::{
36 file_format::{csv::CsvSink, json::JsonSink},
37 listing::{FileRange, PartitionedFile},
38 physical_plan::{FileScanConfig, FileSinkConfig},
39 },
40 physical_plan::expressions::LikeExpr,
41};
42use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
43use datafusion_expr::WindowFrame;
44
45use crate::protobuf::{
46 self, physical_aggregate_expr_node, physical_window_expr_node, PhysicalSortExprNode,
47 PhysicalSortExprNodeCollection,
48};
49
50use super::PhysicalExtensionCodec;
51
52pub fn serialize_physical_aggr_expr(
53 aggr_expr: Arc<AggregateFunctionExpr>,
54 codec: &dyn PhysicalExtensionCodec,
55) -> Result<protobuf::PhysicalExprNode> {
56 let expressions = serialize_physical_exprs(&aggr_expr.expressions(), codec)?;
57 let order_bys =
58 serialize_physical_sort_exprs(aggr_expr.order_bys().iter().cloned(), codec)?;
59
60 let name = aggr_expr.fun().name().to_string();
61 let mut buf = Vec::new();
62 codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
63 Ok(protobuf::PhysicalExprNode {
64 expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
65 protobuf::PhysicalAggregateExprNode {
66 aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)),
67 expr: expressions,
68 ordering_req: order_bys,
69 distinct: aggr_expr.is_distinct(),
70 ignore_nulls: aggr_expr.ignore_nulls(),
71 fun_definition: (!buf.is_empty()).then_some(buf),
72 },
73 )),
74 })
75}
76
77fn serialize_physical_window_aggr_expr(
78 aggr_expr: &AggregateFunctionExpr,
79 _window_frame: &WindowFrame,
80 codec: &dyn PhysicalExtensionCodec,
81) -> Result<(physical_window_expr_node::WindowFunction, Option<Vec<u8>>)> {
82 if aggr_expr.is_distinct() || aggr_expr.ignore_nulls() {
83 return not_impl_err!(
85 "Distinct aggregate functions not supported in window expressions"
86 );
87 }
88
89 let mut buf = Vec::new();
90 codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
91 Ok((
92 physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(
93 aggr_expr.fun().name().to_string(),
94 ),
95 (!buf.is_empty()).then_some(buf),
96 ))
97}
98
/// Serializes a physical [`WindowExpr`] into its protobuf representation.
///
/// Three concrete implementations are supported: [`PlainAggregateWindowExpr`],
/// [`SlidingAggregateWindowExpr`], and a [`StandardWindowExpr`] wrapping a
/// [`WindowUDFExpr`]. Any other window expression is rejected with a
/// "not implemented" error.
pub fn serialize_physical_window_expr(
    window_expr: &Arc<dyn WindowExpr>,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<protobuf::PhysicalWindowExprNode> {
    let expr = window_expr.as_any();
    let args = window_expr.expressions().to_vec();
    let window_frame = window_expr.get_window_frame();

    // Identify the concrete window expression by downcasting, extracting the
    // protobuf function variant plus the codec-encoded payload (if any).
    let (window_function, fun_definition) = if let Some(plain_aggr_window_expr) =
        expr.downcast_ref::<PlainAggregateWindowExpr>()
    {
        serialize_physical_window_aggr_expr(
            plain_aggr_window_expr.get_aggregate_expr(),
            window_frame,
            codec,
        )?
    } else if let Some(sliding_aggr_window_expr) =
        expr.downcast_ref::<SlidingAggregateWindowExpr>()
    {
        serialize_physical_window_aggr_expr(
            sliding_aggr_window_expr.get_aggregate_expr(),
            window_frame,
            codec,
        )?
    } else if let Some(udf_window_expr) = expr.downcast_ref::<StandardWindowExpr>() {
        if let Some(expr) = udf_window_expr
            .get_standard_func_expr()
            .as_any()
            .downcast_ref::<WindowUDFExpr>()
        {
            // User-defined window function: an empty buffer after encoding
            // means the codec had nothing extra to persist.
            let mut buf = Vec::new();
            codec.try_encode_udwf(expr.fun(), &mut buf)?;
            (
                physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(
                    expr.fun().name().to_string(),
                ),
                (!buf.is_empty()).then_some(buf),
            )
        } else {
            return not_impl_err!(
                "User-defined window function not supported: {window_expr:?}"
            );
        }
    } else {
        return not_impl_err!("WindowExpr not supported: {window_expr:?}");
    };

    let args = serialize_physical_exprs(&args, codec)?;
    let partition_by = serialize_physical_exprs(window_expr.partition_by(), codec)?;
    let order_by = serialize_physical_sort_exprs(window_expr.order_by().to_vec(), codec)?;
    // Surface any frame conversion failure as an internal error.
    let window_frame: protobuf::WindowFrame = window_frame
        .as_ref()
        .try_into()
        .map_err(|e| DataFusionError::Internal(format!("{e}")))?;

    Ok(protobuf::PhysicalWindowExprNode {
        args,
        partition_by,
        order_by,
        window_frame: Some(window_frame),
        window_function: Some(window_function),
        name: window_expr.name().to_string(),
        fun_definition,
    })
}
164
165pub fn serialize_physical_sort_exprs<I>(
166 sort_exprs: I,
167 codec: &dyn PhysicalExtensionCodec,
168) -> Result<Vec<PhysicalSortExprNode>>
169where
170 I: IntoIterator<Item = PhysicalSortExpr>,
171{
172 sort_exprs
173 .into_iter()
174 .map(|sort_expr| serialize_physical_sort_expr(sort_expr, codec))
175 .collect()
176}
177
178pub fn serialize_physical_sort_expr(
179 sort_expr: PhysicalSortExpr,
180 codec: &dyn PhysicalExtensionCodec,
181) -> Result<PhysicalSortExprNode> {
182 let PhysicalSortExpr { expr, options } = sort_expr;
183 let expr = serialize_physical_expr(&expr, codec)?;
184 Ok(PhysicalSortExprNode {
185 expr: Some(Box::new(expr)),
186 asc: !options.descending,
187 nulls_first: options.nulls_first,
188 })
189}
190
191pub fn serialize_physical_exprs<'a, I>(
192 values: I,
193 codec: &dyn PhysicalExtensionCodec,
194) -> Result<Vec<protobuf::PhysicalExprNode>>
195where
196 I: IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
197{
198 values
199 .into_iter()
200 .map(|value| serialize_physical_expr(value, codec))
201 .collect()
202}
203
/// Serializes a physical expression tree into its protobuf representation.
///
/// Built-in expression types are matched by downcasting and mapped to their
/// dedicated protobuf variants, recursing into child expressions. Any
/// unrecognized expression is delegated to the extension `codec`; if that
/// also fails, an internal error is returned.
pub fn serialize_physical_expr(
    value: &Arc<dyn PhysicalExpr>,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<protobuf::PhysicalExprNode> {
    // NOTE(review): presumably this replaces expressions carrying dynamic
    // runtime state with a serializable point-in-time view — confirm against
    // the `snapshot_physical_expr` docs.
    let value = snapshot_physical_expr(Arc::clone(value))?;
    let expr = value.as_any();

    if let Some(expr) = expr.downcast_ref::<Column>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
                protobuf::PhysicalColumn {
                    name: expr.name().to_string(),
                    index: expr.index() as u32,
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<UnKnownColumn>() {
        // Unknown columns carry only a name, no index.
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn(
                protobuf::UnknownColumn {
                    name: expr.name().to_string(),
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
        // The operator is serialized via its Debug representation.
        let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
            l: Some(Box::new(serialize_physical_expr(expr.left(), codec)?)),
            r: Some(Box::new(serialize_physical_expr(expr.right(), codec)?)),
            op: format!("{:?}", expr.op()),
        });

        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
                binary_expr,
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
        // CASE: optional base expression, the WHEN/THEN arms, and an
        // optional ELSE — all serialized recursively.
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(
                protobuf::physical_expr_node::ExprType::Case(
                    Box::new(
                        protobuf::PhysicalCaseNode {
                            expr: expr
                                .expr()
                                .map(|exp| {
                                    serialize_physical_expr(exp, codec).map(Box::new)
                                })
                                .transpose()?,
                            when_then_expr: expr
                                .when_then_expr()
                                .iter()
                                .map(|(when_expr, then_expr)| {
                                    serialize_when_then_expr(when_expr, then_expr, codec)
                                })
                                .collect::<Result<
                                    Vec<protobuf::PhysicalWhenThen>,
                                    DataFusionError,
                                >>()?,
                            else_expr: expr
                                .else_expr()
                                .map(|a| serialize_physical_expr(a, codec).map(Box::new))
                                .transpose()?,
                        },
                    ),
                ),
            ),
        })
    } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
                protobuf::PhysicalNot {
                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
                Box::new(protobuf::PhysicalIsNull {
                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
                }),
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
                Box::new(protobuf::PhysicalIsNotNull {
                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
                }),
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new(
                protobuf::PhysicalInListNode {
                    expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)),
                    list: serialize_physical_exprs(expr.list(), codec)?,
                    negated: expr.negated(),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
                protobuf::PhysicalNegativeNode {
                    expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
                },
            ))),
        })
    } else if let Some(lit) = expr.downcast_ref::<Literal>() {
        // The scalar value has its own protobuf conversion via TryInto.
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
                lit.value().try_into()?,
            )),
        })
    } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
                protobuf::PhysicalCastNode {
                    expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)),
                    arrow_type: Some(cast.cast_type().try_into()?),
                },
            ))),
        })
    } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new(
                protobuf::PhysicalTryCastNode {
                    expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)),
                    arrow_type: Some(cast.cast_type().try_into()?),
                },
            ))),
        })
    } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
        // Scalar UDF: an empty buffer after encoding means the codec had
        // nothing extra to persist, so `fun_definition` stays `None`.
        let mut buf = Vec::new();
        codec.try_encode_udf(expr.fun(), &mut buf)?;
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf(
                protobuf::PhysicalScalarUdfNode {
                    name: expr.name().to_string(),
                    args: serialize_physical_exprs(expr.args(), codec)?,
                    fun_definition: (!buf.is_empty()).then_some(buf),
                    return_type: Some(expr.return_type().try_into()?),
                    nullable: expr.nullable(),
                },
            )),
        })
    } else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new(
                protobuf::PhysicalLikeExprNode {
                    negated: expr.negated(),
                    case_insensitive: expr.case_insensitive(),
                    expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)),
                    pattern: Some(Box::new(serialize_physical_expr(
                        expr.pattern(),
                        codec,
                    )?)),
                },
            ))),
        })
    } else {
        // Fallback: let the extension codec encode the node itself; children
        // are still serialized recursively through this function.
        let mut buf: Vec<u8> = vec![];
        match codec.try_encode_expr(&value, &mut buf) {
            Ok(_) => {
                let inputs: Vec<protobuf::PhysicalExprNode> = value
                    .children()
                    .into_iter()
                    .map(|e| serialize_physical_expr(e, codec))
                    .collect::<Result<_>>()?;
                Ok(protobuf::PhysicalExprNode {
                    expr_type: Some(protobuf::physical_expr_node::ExprType::Extension(
                        protobuf::PhysicalExtensionExprNode { expr: buf, inputs },
                    )),
                })
            }
            Err(e) => internal_err!(
                "Unsupported physical expr and extension codec failed with [{e}]. Expr: {value:?}"
            ),
        }
    }
}
392
393pub fn serialize_partitioning(
394 partitioning: &Partitioning,
395 codec: &dyn PhysicalExtensionCodec,
396) -> Result<protobuf::Partitioning> {
397 let serialized_partitioning = match partitioning {
398 Partitioning::RoundRobinBatch(partition_count) => protobuf::Partitioning {
399 partition_method: Some(protobuf::partitioning::PartitionMethod::RoundRobin(
400 *partition_count as u64,
401 )),
402 },
403 Partitioning::Hash(exprs, partition_count) => {
404 let serialized_exprs = serialize_physical_exprs(exprs, codec)?;
405 protobuf::Partitioning {
406 partition_method: Some(protobuf::partitioning::PartitionMethod::Hash(
407 protobuf::PhysicalHashRepartition {
408 hash_expr: serialized_exprs,
409 partition_count: *partition_count as u64,
410 },
411 )),
412 }
413 }
414 Partitioning::UnknownPartitioning(partition_count) => protobuf::Partitioning {
415 partition_method: Some(protobuf::partitioning::PartitionMethod::Unknown(
416 *partition_count as u64,
417 )),
418 },
419 };
420 Ok(serialized_partitioning)
421}
422
423fn serialize_when_then_expr(
424 when_expr: &Arc<dyn PhysicalExpr>,
425 then_expr: &Arc<dyn PhysicalExpr>,
426 codec: &dyn PhysicalExtensionCodec,
427) -> Result<protobuf::PhysicalWhenThen> {
428 Ok(protobuf::PhysicalWhenThen {
429 when_expr: Some(serialize_physical_expr(when_expr, codec)?),
430 then_expr: Some(serialize_physical_expr(then_expr, codec)?),
431 })
432}
433
434impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
435 type Error = DataFusionError;
436
437 fn try_from(pf: &PartitionedFile) -> Result<Self> {
438 let last_modified = pf.object_meta.last_modified;
439 let last_modified_ns = last_modified.timestamp_nanos_opt().ok_or_else(|| {
440 DataFusionError::Plan(format!(
441 "Invalid timestamp on PartitionedFile::ObjectMeta: {last_modified}"
442 ))
443 })? as u64;
444 Ok(protobuf::PartitionedFile {
445 path: pf.object_meta.location.as_ref().to_owned(),
446 size: pf.object_meta.size,
447 last_modified_ns,
448 partition_values: pf
449 .partition_values
450 .iter()
451 .map(|v| v.try_into())
452 .collect::<Result<Vec<_>, _>>()?,
453 range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
454 statistics: pf.statistics.as_ref().map(|s| s.as_ref().into()),
455 })
456 }
457}
458
459impl TryFrom<&FileRange> for protobuf::FileRange {
460 type Error = DataFusionError;
461
462 fn try_from(value: &FileRange) -> Result<Self> {
463 Ok(protobuf::FileRange {
464 start: value.start,
465 end: value.end,
466 })
467 }
468}
469
470impl TryFrom<&[PartitionedFile]> for protobuf::FileGroup {
471 type Error = DataFusionError;
472
473 fn try_from(gr: &[PartitionedFile]) -> Result<Self, Self::Error> {
474 Ok(protobuf::FileGroup {
475 files: gr
476 .iter()
477 .map(|f| f.try_into())
478 .collect::<Result<Vec<_>, _>>()?,
479 })
480 }
481}
482
483pub fn serialize_file_scan_config(
484 conf: &FileScanConfig,
485 codec: &dyn PhysicalExtensionCodec,
486) -> Result<protobuf::FileScanExecConf> {
487 let file_groups = conf
488 .file_groups
489 .iter()
490 .map(|p| p.files().try_into())
491 .collect::<Result<Vec<_>, _>>()?;
492
493 let mut output_orderings = vec![];
494 for order in &conf.output_ordering {
495 let ordering = serialize_physical_sort_exprs(order.to_vec(), codec)?;
496 output_orderings.push(ordering)
497 }
498
499 let mut fields = conf
502 .file_schema
503 .fields()
504 .iter()
505 .cloned()
506 .collect::<Vec<_>>();
507 fields.extend(conf.table_partition_cols.iter().cloned());
508 let schema = Arc::new(arrow::datatypes::Schema::new(fields.clone()));
509
510 Ok(protobuf::FileScanExecConf {
511 file_groups,
512 statistics: Some((&conf.file_source.statistics().unwrap()).into()),
513 limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }),
514 projection: conf
515 .projection
516 .as_ref()
517 .unwrap_or(&(0..schema.fields().len()).collect::<Vec<_>>())
518 .iter()
519 .map(|n| *n as u32)
520 .collect(),
521 schema: Some(schema.as_ref().try_into()?),
522 table_partition_cols: conf
523 .table_partition_cols
524 .iter()
525 .map(|x| x.name().clone())
526 .collect::<Vec<_>>(),
527 object_store_url: conf.object_store_url.to_string(),
528 output_ordering: output_orderings
529 .into_iter()
530 .map(|e| PhysicalSortExprNodeCollection {
531 physical_sort_expr_nodes: e,
532 })
533 .collect::<Vec<_>>(),
534 constraints: Some(conf.constraints.clone().into()),
535 batch_size: conf.batch_size.map(|s| s as u64),
536 })
537}
538
539pub fn serialize_maybe_filter(
540 expr: Option<Arc<dyn PhysicalExpr>>,
541 codec: &dyn PhysicalExtensionCodec,
542) -> Result<protobuf::MaybeFilter> {
543 match expr {
544 None => Ok(protobuf::MaybeFilter { expr: None }),
545 Some(expr) => Ok(protobuf::MaybeFilter {
546 expr: Some(serialize_physical_expr(&expr, codec)?),
547 }),
548 }
549}
550
551impl TryFrom<&JsonSink> for protobuf::JsonSink {
552 type Error = DataFusionError;
553
554 fn try_from(value: &JsonSink) -> Result<Self, Self::Error> {
555 Ok(Self {
556 config: Some(value.config().try_into()?),
557 writer_options: Some(value.writer_options().try_into()?),
558 })
559 }
560}
561
562impl TryFrom<&CsvSink> for protobuf::CsvSink {
563 type Error = DataFusionError;
564
565 fn try_from(value: &CsvSink) -> Result<Self, Self::Error> {
566 Ok(Self {
567 config: Some(value.config().try_into()?),
568 writer_options: Some(value.writer_options().try_into()?),
569 })
570 }
571}
572
#[cfg(feature = "parquet")]
impl TryFrom<&ParquetSink> for protobuf::ParquetSink {
    type Error = DataFusionError;

    /// Converts a [`ParquetSink`] into its protobuf representation
    /// (sink config plus Parquet writer options).
    fn try_from(value: &ParquetSink) -> Result<Self, Self::Error> {
        let config = value.config().try_into()?;
        let parquet_options = value.parquet_options().try_into()?;
        Ok(Self {
            config: Some(config),
            parquet_options: Some(parquet_options),
        })
    }
}
584
585impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
586 type Error = DataFusionError;
587
588 fn try_from(conf: &FileSinkConfig) -> Result<Self, Self::Error> {
589 let file_groups = conf
590 .file_group
591 .iter()
592 .map(TryInto::try_into)
593 .collect::<Result<Vec<_>>>()?;
594 let table_paths = conf
595 .table_paths
596 .iter()
597 .map(ToString::to_string)
598 .collect::<Vec<_>>();
599 let table_partition_cols = conf
600 .table_partition_cols
601 .iter()
602 .map(|(name, data_type)| {
603 Ok(protobuf::PartitionColumn {
604 name: name.to_owned(),
605 arrow_type: Some(data_type.try_into()?),
606 })
607 })
608 .collect::<Result<Vec<_>>>()?;
609 Ok(Self {
610 object_store_url: conf.object_store_url.to_string(),
611 file_groups,
612 table_paths,
613 output_schema: Some(conf.output_schema.as_ref().try_into()?),
614 table_partition_cols,
615 keep_partition_by_columns: conf.keep_partition_by_columns,
616 insert_op: conf.insert_op as i32,
617 file_extension: conf.file_extension.to_string(),
618 })
619 }
620}