use std::sync::Arc;

use arrow::array::RecordBatch;
use arrow::compute::SortOptions;
use arrow::datatypes::Field;
use arrow::ipc::reader::StreamReader;
use chrono::{TimeZone, Utc};
use datafusion_expr::dml::InsertOp;
use object_store::ObjectMeta;
use object_store::path::Path;

use arrow::datatypes::Schema;
use datafusion_common::{DataFusionError, Result, internal_datafusion_err, not_impl_err};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_sink_config::FileSinkConfig;
use datafusion_datasource::{FileRange, ListingTableUrl, PartitionedFile, TableSchema};
use datafusion_datasource_csv::file_format::CsvSink;
use datafusion_datasource_json::file_format::JsonSink;
#[cfg(feature = "parquet")]
use datafusion_datasource_parquet::file_format::ParquetSink;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{FunctionRegistry, TaskContext};
use datafusion_expr::WindowFunctionDefinition;
use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs};
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
use datafusion_physical_plan::expressions::{
    BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr, Literal,
    NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, in_list,
};
use datafusion_physical_plan::joins::{HashExpr, SeededRandomState};
use datafusion_physical_plan::windows::{create_window_expr, schema_add_window_field};
use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
use datafusion_proto_common::common::proto_error;

use crate::convert_required;
use crate::logical_plan::{self};
use crate::protobuf;
use crate::protobuf::physical_expr_node::ExprType;

use super::PhysicalExtensionCodec;

impl From<&protobuf::PhysicalColumn> for Column {
    fn from(c: &protobuf::PhysicalColumn) -> Column {
        Column::new(&c.name, c.index as usize)
    }
}

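/// Parses a physical sort expression from its protobuf representation.
///
/// # Arguments
///
/// * `proto` - Input proto with the physical sort expression node
/// * `ctx` - Task context used to resolve registered functions by name
/// * `input_schema` - Arrow schema of the input, used to resolve column references
/// * `codec` - Extension codec used to decode custom UDFs and expressions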
pub fn parse_physical_sort_expr(
    proto: &protobuf::PhysicalSortExprNode,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<PhysicalSortExpr> {
    if let Some(expr) = &proto.expr {
        let expr = parse_physical_expr(expr.as_ref(), ctx, input_schema, codec)?;
        let options = SortOptions {
            descending: !proto.asc,
            nulls_first: proto.nulls_first,
        };
        Ok(PhysicalSortExpr { expr, options })
    } else {
        Err(proto_error("Unexpected empty physical expression"))
    }
}

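/// Parses a slice of protobuf sort expression nodes into a vector of
/// [`PhysicalSortExpr`]s, failing on the first node that cannot be parsed.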
pub fn parse_physical_sort_exprs(
    proto: &[protobuf::PhysicalSortExprNode],
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Vec<PhysicalSortExpr>> {
    proto
        .iter()
        .map(|sort_expr| parse_physical_sort_expr(sort_expr, ctx, input_schema, codec))
        .collect()
}

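/// Parses a physical window expression from its protobuf representation.
///
/// The window function is resolved from the embedded `fun_definition` bytes
/// when present; otherwise it is looked up by name in the function registry
/// of `ctx`, falling back to the extension codec if the lookup fails.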
pub fn parse_physical_window_expr(
    proto: &protobuf::PhysicalWindowExprNode,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn WindowExpr>> {
    let window_node_expr = parse_physical_exprs(&proto.args, ctx, input_schema, codec)?;
    let partition_by =
        parse_physical_exprs(&proto.partition_by, ctx, input_schema, codec)?;

    let order_by = parse_physical_sort_exprs(&proto.order_by, ctx, input_schema, codec)?;

    let window_frame = proto
        .window_frame
        .as_ref()
        .map(|wf| wf.clone().try_into())
        .transpose()
        .map_err(|e| internal_datafusion_err!("{e}"))?
        .ok_or_else(|| {
            internal_datafusion_err!("Missing required field 'window_frame' in protobuf")
        })?;

    let fun = if let Some(window_func) = proto.window_function.as_ref() {
        match window_func {
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(udaf_name) => {
                WindowFunctionDefinition::AggregateUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udaf(udaf_name, buf)?,
                    None => ctx.udaf(udaf_name).or_else(|_| codec.try_decode_udaf(udaf_name, &[]))?,
                })
            }
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(udwf_name) => {
                WindowFunctionDefinition::WindowUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udwf(udwf_name, buf)?,
                    None => ctx.udwf(udwf_name).or_else(|_| codec.try_decode_udwf(udwf_name, &[]))?,
                })
            }
        }
    } else {
        return Err(proto_error(
            "Missing required field 'window_function' in protobuf",
        ));
    };

    let name = proto.name.clone();
    let extended_schema =
        schema_add_window_field(&window_node_expr, input_schema, &fun, &name)?;
    create_window_expr(
        &fun,
        name,
        &window_node_expr,
        &partition_by,
        &order_by,
        Arc::new(window_frame),
        extended_schema,
        proto.ignore_nulls,
        proto.distinct,
        None,
    )
}

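/// Parses an iterator of protobuf physical expression nodes into a vector of
/// [`PhysicalExpr`]s, failing on the first node that cannot be parsed.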
pub fn parse_physical_exprs<'a, I>(
    protos: I,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Vec<Arc<dyn PhysicalExpr>>>
where
    I: IntoIterator<Item = &'a protobuf::PhysicalExprNode>,
{
    protos
        .into_iter()
        .map(|p| parse_physical_expr(p, ctx, input_schema, codec))
        .collect::<Result<Vec<_>>>()
}

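/// Parses a physical expression from its protobuf representation.
///
/// # Arguments
///
/// * `proto` - Input proto with the physical expression node
/// * `ctx` - Task context used to resolve scalar UDFs by name
/// * `input_schema` - Arrow schema of the input, used to resolve column
///   references and to type-check `IN` lists
/// * `codec` - Extension codec used to decode custom UDFs and extension expressions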
pub fn parse_physical_expr(
    proto: &protobuf::PhysicalExprNode,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
    let expr_type = proto
        .expr_type
        .as_ref()
        .ok_or_else(|| proto_error("Unexpected empty physical expression"))?;

    let pexpr: Arc<dyn PhysicalExpr> = match expr_type {
        ExprType::Column(c) => {
            let pcol: Column = c.into();
            Arc::new(pcol)
        }
        ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
        ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
        ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new(
            parse_required_physical_expr(
                binary_expr.l.as_deref(),
                ctx,
                "left",
                input_schema,
                codec,
            )?,
            logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?,
            parse_required_physical_expr(
                binary_expr.r.as_deref(),
                ctx,
                "right",
                input_schema,
                codec,
            )?,
        )),
        ExprType::AggregateExpr(_) => {
            return not_impl_err!(
                "Cannot convert aggregate expr node to physical expression"
            );
        }
        ExprType::WindowExpr(_) => {
            return not_impl_err!(
                "Cannot convert window expr node to physical expression"
            );
        }
        ExprType::Sort(_) => {
            return not_impl_err!("Cannot convert sort expr node to physical expression");
        }
        ExprType::IsNullExpr(e) => {
            Arc::new(IsNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::IsNotNullExpr(e) => {
            Arc::new(IsNotNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr(
            e.expr.as_deref(),
            ctx,
            "expr",
            input_schema,
            codec,
        )?)),
        ExprType::Negative(e) => {
            Arc::new(NegativeExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::InList(e) => in_list(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            parse_physical_exprs(&e.list, ctx, input_schema, codec)?,
            &e.negated,
            input_schema,
        )?,
        ExprType::Case(e) => Arc::new(CaseExpr::try_new(
            e.expr
                .as_ref()
                .map(|e| parse_physical_expr(e.as_ref(), ctx, input_schema, codec))
                .transpose()?,
            e.when_then_expr
                .iter()
                .map(|e| {
                    Ok((
                        parse_required_physical_expr(
                            e.when_expr.as_ref(),
                            ctx,
                            "when_expr",
                            input_schema,
                            codec,
                        )?,
                        parse_required_physical_expr(
                            e.then_expr.as_ref(),
                            ctx,
                            "then_expr",
                            input_schema,
                            codec,
                        )?,
                    ))
                })
                .collect::<Result<Vec<_>>>()?,
            e.else_expr
                .as_ref()
                .map(|e| parse_physical_expr(e.as_ref(), ctx, input_schema, codec))
                .transpose()?,
        )?),
        ExprType::Cast(e) => Arc::new(CastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            convert_required!(e.arrow_type)?,
            None,
        )),
        ExprType::TryCast(e) => Arc::new(TryCastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            convert_required!(e.arrow_type)?,
        )),
        ExprType::ScalarUdf(e) => {
            let udf = match &e.fun_definition {
                Some(buf) => codec.try_decode_udf(&e.name, buf)?,
                None => ctx
                    .udf(e.name.as_str())
                    .or_else(|_| codec.try_decode_udf(&e.name, &[]))?,
            };
            let scalar_fun_def = Arc::clone(&udf);

            let args = parse_physical_exprs(&e.args, ctx, input_schema, codec)?;

            let config_options = Arc::clone(ctx.session_config().options());

            Arc::new(
                ScalarFunctionExpr::new(
                    e.name.as_str(),
                    scalar_fun_def,
                    args,
                    Field::new(
                        &e.return_field_name,
                        convert_required!(e.return_type)?,
                        true,
                    )
                    .into(),
                    config_options,
                )
                .with_nullable(e.nullable),
            )
        }
        ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new(
            like_expr.negated,
            like_expr.case_insensitive,
            parse_required_physical_expr(
                like_expr.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            parse_required_physical_expr(
                like_expr.pattern.as_deref(),
                ctx,
                "pattern",
                input_schema,
                codec,
            )?,
        )),
        ExprType::HashExpr(hash_expr) => {
            let on_columns =
                parse_physical_exprs(&hash_expr.on_columns, ctx, input_schema, codec)?;
            Arc::new(HashExpr::new(
                on_columns,
                SeededRandomState::with_seeds(
                    hash_expr.seed0,
                    hash_expr.seed1,
                    hash_expr.seed2,
                    hash_expr.seed3,
                ),
                hash_expr.description.clone(),
            ))
        }
        ExprType::Extension(extension) => {
            let inputs: Vec<Arc<dyn PhysicalExpr>> = extension
                .inputs
                .iter()
                .map(|e| parse_physical_expr(e, ctx, input_schema, codec))
                .collect::<Result<_>>()?;
            (codec.try_decode_expr(extension.expr.as_slice(), &inputs)?) as _
        }
    };

    Ok(pexpr)
}

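/// Parses an optional expression field, turning a missing value into a
/// "missing required field" error that names `field`.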
fn parse_required_physical_expr(
    expr: Option<&protobuf::PhysicalExprNode>,
    ctx: &TaskContext,
    field: &str,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
    expr.map(|e| parse_physical_expr(e, ctx, input_schema, codec))
        .transpose()?
        .ok_or_else(|| internal_datafusion_err!("Missing required field {field:?}"))
}

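/// Parses an optional protobuf hash repartitioning into
/// [`Partitioning::Hash`], returning `Ok(None)` when absent.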
pub fn parse_protobuf_hash_partitioning(
    partitioning: Option<&protobuf::PhysicalHashRepartition>,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Option<Partitioning>> {
    match partitioning {
        Some(hash_part) => {
            let expr =
                parse_physical_exprs(&hash_part.hash_expr, ctx, input_schema, codec)?;

            Ok(Some(Partitioning::Hash(
                expr,
                // Return an error instead of panicking if the serialized
                // partition count does not fit in usize on this platform.
                hash_part.partition_count.try_into().map_err(|_| {
                    proto_error("partition_count does not fit in usize")
                })?,
            )))
        }
        None => Ok(None),
    }
}

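/// Parses an optional protobuf partitioning scheme (round-robin, hash, or
/// unknown) into a [`Partitioning`], returning `Ok(None)` when absent.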
pub fn parse_protobuf_partitioning(
    partitioning: Option<&protobuf::Partitioning>,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Option<Partitioning>> {
    match partitioning {
        Some(protobuf::Partitioning { partition_method }) => match partition_method {
            Some(protobuf::partitioning::PartitionMethod::RoundRobin(
                partition_count,
            )) => Ok(Some(Partitioning::RoundRobinBatch(
                *partition_count as usize,
            ))),
            Some(protobuf::partitioning::PartitionMethod::Hash(hash_repartition)) => {
                parse_protobuf_hash_partitioning(
                    Some(hash_repartition),
                    ctx,
                    input_schema,
                    codec,
                )
            }
            Some(protobuf::partitioning::PartitionMethod::Unknown(partition_count)) => {
                Ok(Some(Partitioning::UnknownPartitioning(
                    *partition_count as usize,
                )))
            }
            None => Ok(None),
        },
        None => Ok(None),
    }
}

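/// Deserializes the Arrow schema embedded in a protobuf file scan
/// configuration. Note that this schema still includes any table partition
/// columns; see [`parse_table_schema_from_proto`] for the split.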
pub fn parse_protobuf_file_scan_schema(
    proto: &protobuf::FileScanExecConf,
) -> Result<Arc<Schema>> {
    Ok(Arc::new(convert_required!(proto.schema)?))
}

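/// Builds a [`TableSchema`] from a protobuf file scan configuration by
/// splitting the serialized schema into file columns and table partition
/// columns, with partition columns identified by name.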
pub fn parse_table_schema_from_proto(
    proto: &protobuf::FileScanExecConf,
) -> Result<TableSchema> {
    let schema: Arc<Schema> = parse_protobuf_file_scan_schema(proto)?;

    // Partition columns are serialized by name; resolve each against the
    // full schema.
    let table_partition_cols = proto
        .table_partition_cols
        .iter()
        .map(|col| Ok(Arc::new(schema.field_with_name(col)?.clone())))
        .collect::<Result<Vec<_>>>()?;

    // The file schema is the full schema with the partition columns removed.
    let file_schema = Arc::new(
        Schema::new(
            schema
                .fields()
                .iter()
                .filter(|field| !table_partition_cols.contains(field))
                .cloned()
                .collect::<Vec<_>>(),
        )
        .with_metadata(schema.metadata.clone()),
    );

    Ok(TableSchema::new(file_schema, table_partition_cols))
}

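/// Parses a protobuf file scan configuration into a [`FileScanConfig`],
/// re-applying any serialized projection expressions to `file_source` via
/// projection pushdown.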
pub fn parse_protobuf_file_scan_config(
    proto: &protobuf::FileScanExecConf,
    ctx: &TaskContext,
    codec: &dyn PhysicalExtensionCodec,
    file_source: Arc<dyn FileSource>,
) -> Result<FileScanConfig> {
    let schema: Arc<Schema> = parse_protobuf_file_scan_schema(proto)?;

    let constraints = convert_required!(proto.constraints)?;
    let statistics = convert_required!(proto.statistics)?;

    let file_groups = proto
        .file_groups
        .iter()
        .map(|f| f.try_into())
        .collect::<Result<Vec<_>, _>>()?;

    let object_store_url = match proto.object_store_url.is_empty() {
        false => ObjectStoreUrl::parse(&proto.object_store_url)?,
        true => ObjectStoreUrl::local_filesystem(),
    };

    let mut output_ordering = vec![];
    for node_collection in &proto.output_ordering {
        let sort_exprs = parse_physical_sort_exprs(
            &node_collection.physical_sort_expr_nodes,
            ctx,
            &schema,
            codec,
        )?;
        output_ordering.extend(LexOrdering::new(sort_exprs));
    }

    let file_source = if let Some(proto_projection_exprs) = &proto.projection_exprs {
        let projection_exprs: Vec<ProjectionExpr> = proto_projection_exprs
            .projections
            .iter()
            .map(|proto_expr| {
                let expr = parse_physical_expr(
                    proto_expr.expr.as_ref().ok_or_else(|| {
                        internal_datafusion_err!("ProjectionExpr missing expr field")
                    })?,
                    ctx,
                    &schema,
                    codec,
                )?;
                Ok(ProjectionExpr::new(expr, proto_expr.alias.clone()))
            })
            .collect::<Result<Vec<_>>>()?;

        let projection_exprs = ProjectionExprs::new(projection_exprs);

        // Re-apply the serialized projection; fall back to the original
        // source if it cannot absorb the pushdown.
        file_source
            .try_pushdown_projection(&projection_exprs)?
            .unwrap_or(file_source)
    } else {
        file_source
    };

    let config = FileScanConfigBuilder::new(object_store_url, file_source)
        .with_file_groups(file_groups)
        .with_constraints(constraints)
        .with_statistics(statistics)
        .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize))
        .with_output_ordering(output_ordering)
        .with_batch_size(proto.batch_size.map(|s| s as usize))
        .build();
    Ok(config)
}

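/// Decodes zero or more [`RecordBatch`]es from Arrow IPC stream bytes.
/// An empty buffer yields an empty vector rather than an error.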
pub fn parse_record_batches(buf: &[u8]) -> Result<Vec<RecordBatch>> {
    if buf.is_empty() {
        return Ok(vec![]);
    }
    let reader = StreamReader::try_new(buf, None)?;
    let mut batches = Vec::new();
    for batch in reader {
        batches.push(batch?);
    }
    Ok(batches)
}

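// Note: `Path::parse` (rather than `Path::from`) is used below so that
// percent-encoded characters in a serialized path are kept verbatim instead
// of being re-encoded; the round-trip test at the bottom of this file
// exercises exactly that.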
impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
    type Error = DataFusionError;

    fn try_from(val: &protobuf::PartitionedFile) -> Result<Self, Self::Error> {
        Ok(PartitionedFile {
            object_meta: ObjectMeta {
                location: Path::parse(val.path.as_str()).map_err(|e| {
                    proto_error(format!("Invalid object_store path: {e}"))
                })?,
                last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64),
                size: val.size,
                e_tag: None,
                version: None,
            },
            partition_values: val
                .partition_values
                .iter()
                .map(|v| v.try_into())
                .collect::<Result<Vec<_>, _>>()?,
            range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
            statistics: val
                .statistics
                .as_ref()
                .map(|v| v.try_into().map(Arc::new))
                .transpose()?,
            extensions: None,
            metadata_size_hint: None,
        })
    }
}

impl TryFrom<&protobuf::FileRange> for FileRange {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
        Ok(FileRange {
            start: value.start,
            end: value.end,
        })
    }
}

impl TryFrom<&protobuf::FileGroup> for FileGroup {
    type Error = DataFusionError;

    fn try_from(val: &protobuf::FileGroup) -> Result<Self, Self::Error> {
        let files = val
            .files
            .iter()
            .map(|f| f.try_into())
            .collect::<Result<Vec<_>, _>>()?;
        Ok(FileGroup::new(files))
    }
}

impl TryFrom<&protobuf::JsonSink> for JsonSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::JsonSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.writer_options)?,
        ))
    }
}

#[cfg(feature = "parquet")]
impl TryFrom<&protobuf::ParquetSink> for ParquetSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::ParquetSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.parquet_options)?,
        ))
    }
}

impl TryFrom<&protobuf::CsvSink> for CsvSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::CsvSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.writer_options)?,
        ))
    }
}

impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
    type Error = DataFusionError;

    fn try_from(conf: &protobuf::FileSinkConfig) -> Result<Self, Self::Error> {
        let file_group = FileGroup::new(
            conf.file_groups
                .iter()
                .map(|f| f.try_into())
                .collect::<Result<Vec<_>>>()?,
        );
        let table_paths = conf
            .table_paths
            .iter()
            .map(ListingTableUrl::parse)
            .collect::<Result<Vec<_>>>()?;
        let table_partition_cols = conf
            .table_partition_cols
            .iter()
            .map(|protobuf::PartitionColumn { name, arrow_type }| {
                let data_type = convert_required!(arrow_type)?;
                Ok((name.clone(), data_type))
            })
            .collect::<Result<Vec<_>>>()?;
        let insert_op = match conf.insert_op() {
            protobuf::InsertOp::Append => InsertOp::Append,
            protobuf::InsertOp::Overwrite => InsertOp::Overwrite,
            protobuf::InsertOp::Replace => InsertOp::Replace,
        };
        Ok(Self {
            original_url: String::default(),
            object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?,
            file_group,
            table_paths,
            output_schema: Arc::new(convert_required!(conf.output_schema)?),
            table_partition_cols,
            insert_op,
            keep_partition_by_columns: conf.keep_partition_by_columns,
            file_extension: conf.file_extension.clone(),
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Utc};
    use datafusion_datasource::PartitionedFile;
    use object_store::ObjectMeta;
    use object_store::path::Path;

    #[test]
    fn partitioned_file_path_roundtrip_percent_encoded() {
        let path_str = "foo/foo%2Fbar/baz%252Fqux";
        let pf = PartitionedFile {
            object_meta: ObjectMeta {
                location: Path::parse(path_str).unwrap(),
                last_modified: Utc.timestamp_nanos(1_000),
                size: 42,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        };

        let proto = protobuf::PartitionedFile::try_from(&pf).unwrap();
        assert_eq!(proto.path, path_str);

        let pf2 = PartitionedFile::try_from(&proto).unwrap();
        assert_eq!(pf2.object_meta.location.as_ref(), path_str);
        assert_eq!(pf2.object_meta.location, pf.object_meta.location);
        assert_eq!(pf2.object_meta.size, pf.object_meta.size);
        assert_eq!(pf2.object_meta.last_modified, pf.object_meta.last_modified);
    }

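    // A minimal, self-contained round trip for `parse_record_batches`: encode
    // a batch with arrow's IPC `StreamWriter`, then decode it back. The schema
    // and values here are illustrative only.
    #[test]
    fn record_batches_ipc_roundtrip() {
        use arrow::array::{ArrayRef, Int32Array};
        use arrow::datatypes::DataType;
        use arrow::ipc::writer::StreamWriter;

        // An empty buffer decodes to zero batches rather than an error.
        assert!(parse_record_batches(&[]).unwrap().is_empty());

        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array]).unwrap();

        // Serialize the batch into an in-memory IPC stream.
        let mut buf = Vec::new();
        {
            let mut writer = StreamWriter::try_new(&mut buf, &schema).unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }

        let batches = parse_record_batches(&buf).unwrap();
        assert_eq!(batches, vec![batch]);
    }
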
    #[test]
    fn partitioned_file_from_proto_invalid_path() {
        let proto = protobuf::PartitionedFile {
            path: "foo//bar".to_string(),
            size: 1,
            last_modified_ns: 0,
            partition_values: vec![],
            range: None,
            statistics: None,
        };

        let err = PartitionedFile::try_from(&proto).unwrap_err();
        assert!(err.to_string().contains("Invalid object_store path"));
    }
}
796}