1use std::collections::HashMap;
16
17use apache_avro::Schema as AvroSchema;
18use derive_builder::Builder;
19use derive_getters::Getters;
20use serde::{de::DeserializeOwned, ser::SerializeSeq, Deserialize, Serialize};
21use serde_bytes::ByteBuf;
22use serde_repr::{Deserialize_repr, Serialize_repr};
23
24use crate::{error::Error, partition::BoundPartitionField};
25
26use super::{
27 partition::PartitionSpec,
28 schema::Schema,
29 table_metadata::FormatVersion,
30 types::{PrimitiveType, StructType, Type},
31 values::{Struct, Value},
32};
33
/// Version-independent, in-memory form of a single manifest entry.
///
/// Serialization goes through [`ManifestEntryEnum`] (see the `serde(into)`
/// attribute), which picks the V1 or V2 record layout from `format_version`.
/// The derived builder uses `with_*` setters and reports failures as [`Error`].
#[derive(Debug, Serialize, PartialEq, Clone, Getters, Builder)]
#[serde(into = "ManifestEntryEnum")]
#[builder(build_fn(error = "Error"), setter(prefix = "with"))]
pub struct ManifestEntry {
    /// Format version that decides which serialized layout (V1 or V2) is produced.
    format_version: FormatVersion,
    /// Status of the entry; written as the `status` int field (field-id 0).
    status: Status,
    /// Snapshot id of the entry (field-id 1). Optional; the builder defaults it to `None`.
    #[builder(setter(strip_option), default)]
    snapshot_id: Option<i64>,
    /// Sequence number of the entry (field-id 3, V2 layout only). Optional; the
    /// builder defaults it to `None`.
    #[builder(setter(strip_option), default)]
    sequence_number: Option<i64>,
    /// Description of the file this entry refers to (field-id 2).
    data_file: DataFile,
}
53
54impl ManifestEntry {
55 pub fn builder() -> ManifestEntryBuilder {
63 ManifestEntryBuilder::default()
64 }
65
66 pub fn status_mut(&mut self) -> &mut Status {
70 &mut self.status
71 }
72
73 pub fn sequence_number_mut(&mut self) -> &mut Option<i64> {
78 &mut self.sequence_number
79 }
80
81 pub fn snapshot_id_mut(&mut self) -> &mut Option<i64> {
86 &mut self.snapshot_id
87 }
88}
89
90impl ManifestEntry {
91 pub fn try_from_v2(
92 value: ManifestEntryV2,
93 schema: &Schema,
94 partition_spec: &PartitionSpec,
95 ) -> Result<Self, Error> {
96 Ok(ManifestEntry {
97 format_version: FormatVersion::V2,
98 status: value.status,
99 snapshot_id: value.snapshot_id,
100 sequence_number: value.sequence_number,
101 data_file: DataFile::try_from_v2(value.data_file, schema, partition_spec)?,
102 })
103 }
104
105 pub fn try_from_v1(
106 value: ManifestEntryV1,
107 schema: &Schema,
108 partition_spec: &PartitionSpec,
109 ) -> Result<Self, Error> {
110 Ok(ManifestEntry {
111 format_version: FormatVersion::V2,
112 status: value.status,
113 snapshot_id: Some(value.snapshot_id),
114 sequence_number: None,
115 data_file: DataFile::try_from_v1(value.data_file, schema, partition_spec)?,
116 })
117 }
118}
119
/// Serialized form of a manifest entry in either supported layout.
///
/// The enum is `untagged`: the variant itself is not written, only the fields
/// of the wrapped record.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(untagged)]
pub enum ManifestEntryEnum {
    /// Entry using the V2 record layout.
    V2(ManifestEntryV2),
    /// Entry using the V1 record layout.
    V1(ManifestEntryV1),
}
129
/// Manifest entry record in the V2 layout, as read from or written to Avro.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct ManifestEntryV2 {
    /// Status of the entry (`status`, field-id 0).
    pub status: Status,
    /// Optional snapshot id (`snapshot_id`, field-id 1, nullable long).
    pub snapshot_id: Option<i64>,
    /// Optional sequence number (`sequence_number`, field-id 3, nullable long).
    pub sequence_number: Option<i64>,
    /// Data file record in the V2 layout (`data_file`, field-id 2).
    pub data_file: DataFileV2,
}
143
/// Manifest entry record in the V1 layout, as read from or written to Avro.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct ManifestEntryV1 {
    /// Status of the entry (`status`, field-id 0).
    pub status: Status,
    /// Snapshot id (`snapshot_id`, field-id 1); mandatory in this layout.
    pub snapshot_id: i64,
    /// Data file record in the V1 layout (`data_file`, field-id 2).
    pub data_file: DataFileV1,
}
155
156impl From<ManifestEntry> for ManifestEntryEnum {
157 fn from(value: ManifestEntry) -> Self {
158 match value.format_version {
159 FormatVersion::V2 => ManifestEntryEnum::V2(value.into()),
160 FormatVersion::V1 => ManifestEntryEnum::V1(value.into()),
161 }
162 }
163}
164
165impl From<ManifestEntry> for ManifestEntryV2 {
166 fn from(value: ManifestEntry) -> Self {
167 ManifestEntryV2 {
168 status: value.status,
169 snapshot_id: value.snapshot_id,
170 sequence_number: value.sequence_number,
171 data_file: value.data_file.into(),
172 }
173 }
174}
175
176impl From<ManifestEntry> for ManifestEntryV1 {
177 fn from(v1: ManifestEntry) -> Self {
178 ManifestEntryV1 {
179 status: v1.status,
180 snapshot_id: v1.snapshot_id.unwrap_or(0),
181 data_file: v1.data_file.into(),
182 }
183 }
184}
185
186impl From<ManifestEntryV1> for ManifestEntryV2 {
187 fn from(v1: ManifestEntryV1) -> Self {
188 ManifestEntryV2 {
189 status: v1.status,
190 snapshot_id: Some(v1.snapshot_id),
191 sequence_number: Some(0),
192 data_file: v1.data_file.into(),
193 }
194 }
195}
196
197impl ManifestEntry {
198 pub fn schema(
200 partition_schema: &str,
201 format_version: &FormatVersion,
202 ) -> Result<AvroSchema, Error> {
203 let schema = match format_version {
204 FormatVersion::V1 => {
205 let datafile_schema = DataFileV1::schema(partition_schema);
206 r#"{
207 "type": "record",
208 "name": "manifest_entry",
209 "fields": [
210 {
211 "name": "status",
212 "type": "int",
213 "field-id": 0
214 },
215 {
216 "name": "snapshot_id",
217 "type": "long",
218 "field-id": 1
219 },
220 {
221 "name": "data_file",
222 "type": "#
223 .to_owned()
224 + &datafile_schema
225 + r#",
226 "field-id": 2
227 }
228 ]
229 }"#
230 }
231 FormatVersion::V2 => {
232 let datafile_schema = DataFileV2::schema(partition_schema);
233 r#"{
234 "type": "record",
235 "name": "manifest_entry",
236 "fields": [
237 {
238 "name": "status",
239 "type": "int",
240 "field-id": 0
241 },
242 {
243 "name": "snapshot_id",
244 "type": [
245 "null",
246 "long"
247 ],
248 "default": null,
249 "field-id": 1
250 },
251 {
252 "name": "sequence_number",
253 "type": [
254 "null",
255 "long"
256 ],
257 "default": null,
258 "field-id": 3
259 },
260 {
261 "name": "data_file",
262 "type": "#
263 .to_owned()
264 + &datafile_schema
265 + r#",
266 "field-id": 2
267 }
268 ]
269 }"#
270 }
271 };
272 AvroSchema::parse_str(&schema).map_err(Into::into)
273 }
274}
275
/// Status of a manifest entry.
///
/// Serialized and deserialized as its `u8` discriminant (via `serde_repr`).
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy)]
#[repr(u8)]
pub enum Status {
    /// Discriminant 0.
    Existing = 0,
    /// Discriminant 1.
    Added = 1,
    /// Discriminant 2.
    Deleted = 2,
}
287
/// Kind of content stored in a file.
///
/// Serialized and deserialized as its `u8` discriminant (via `serde_repr`);
/// conversions to and from textual labels are provided by the `TryFrom<Vec<u8>>`
/// and `From<Content> for Vec<u8>` impls.
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone)]
#[repr(u8)]
pub enum Content {
    /// Discriminant 0, label `DATA`.
    Data = 0,
    /// Discriminant 1, label `POSITION DELETES`.
    PositionDeletes = 1,
    /// Discriminant 2, label `EQUALITY DELETES`.
    EqualityDeletes = 2,
}
299
300impl TryFrom<Vec<u8>> for Content {
301 type Error = Error;
302 fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
303 match String::from_utf8(value)?.to_uppercase().as_str() {
304 "DATA" => Ok(Content::Data),
305 "POSITION DELETES" => Ok(Content::PositionDeletes),
306 "EQUALITY DELETES" => Ok(Content::EqualityDeletes),
307 _ => Err(Error::Conversion(
308 "string".to_string(),
309 "content".to_string(),
310 )),
311 }
312 }
313}
314
315impl From<Content> for Vec<u8> {
316 fn from(value: Content) -> Self {
317 match value {
318 Content::Data => "DATA".as_bytes().to_owned(),
319 Content::PositionDeletes => "POSITION DELETES".as_bytes().to_owned(),
320 Content::EqualityDeletes => "EQUALITY DELETES".as_bytes().to_owned(),
321 }
322 }
323}
324
/// File format of a data file.
///
/// Although the enum has numeric discriminants, its hand-written serde impls
/// read and write it as the strings `AVRO`, `ORC` and `PARQUET`.
#[derive(Debug, PartialEq, Eq, Clone)]
#[repr(u8)]
pub enum FileFormat {
    /// Serialized as `AVRO`.
    Avro = 0,
    /// Serialized as `ORC`.
    Orc = 1,
    /// Serialized as `PARQUET`.
    Parquet = 2,
}
336
337impl Serialize for FileFormat {
340 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
341 where
342 S: serde::Serializer,
343 {
344 use FileFormat::*;
345 match self {
346 Avro => serializer.serialize_str("AVRO"),
347 Orc => serializer.serialize_str("ORC"),
348 Parquet => serializer.serialize_str("PARQUET"),
349 }
350 }
351}
352
353impl<'de> Deserialize<'de> for FileFormat {
356 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
357 where
358 D: serde::Deserializer<'de>,
359 {
360 let s = String::deserialize(deserializer)?;
361 if s == "AVRO" {
362 Ok(FileFormat::Avro)
363 } else if s == "ORC" {
364 Ok(FileFormat::Orc)
365 } else if s == "PARQUET" {
366 Ok(FileFormat::Parquet)
367 } else {
368 Err(serde::de::Error::custom("Invalid data file format."))
369 }
370 }
371}
372
373pub fn partition_value_schema(spec: &[BoundPartitionField<'_>]) -> Result<String, Error> {
375 Ok(spec
376 .iter()
377 .map(|field| {
378 let data_type = avro_schema_datatype(field.field_type());
379 Ok::<_, Error>(
380 r#"
381 {
382 "name": ""#
383 .to_owned()
384 + field.name()
385 + r#"",
386 "type": ["null",""#
387 + &format!("{}", &data_type)
388 + r#""],
389 "field-id": "#
390 + &field.field_id().to_string()
391 + r#",
392 "default": null
393 },"#,
394 )
395 })
396 .try_fold(
397 r#"{"type": "record","name": "r102","fields": ["#.to_owned(),
398 |acc, x| {
399 let result = acc + &x?;
400 Ok::<_, Error>(result)
401 },
402 )?
403 .trim_end_matches(',')
404 .to_owned()
405 + r#"]}"#)
406}
407
408fn avro_schema_datatype(data_type: &Type) -> Type {
409 match data_type {
410 Type::Primitive(prim) => match prim {
411 PrimitiveType::Date => Type::Primitive(PrimitiveType::Int),
412 PrimitiveType::Time => Type::Primitive(PrimitiveType::Long),
413 PrimitiveType::Timestamp => Type::Primitive(PrimitiveType::Long),
414 PrimitiveType::Timestamptz => Type::Primitive(PrimitiveType::Long),
415 p => Type::Primitive(p.clone()),
416 },
417 t => t.clone(),
418 }
419}
420
/// One `key`/`value` record of an [`AvroMap`] as it appears in serialized form.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
struct KeyValue<T: Serialize + Clone> {
    /// Integer key of the entry.
    key: i32,
    /// Value stored under `key`.
    value: T,
}
426
/// Map keyed by `i32` that is serialized as a sequence of key/value records
/// (see its `Serialize`/`Deserialize` impls) instead of as a native map.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct AvroMap<T: Serialize + Clone>(pub HashMap<i32, T>);
430
431impl<T: Serialize + Clone> core::ops::Deref for AvroMap<T> {
432 type Target = HashMap<i32, T>;
433
434 fn deref(self: &'_ AvroMap<T>) -> &'_ Self::Target {
435 &self.0
436 }
437}
438
439impl<T: Serialize + Clone> core::ops::DerefMut for AvroMap<T> {
440 fn deref_mut(self: &'_ mut AvroMap<T>) -> &'_ mut Self::Target {
441 &mut self.0
442 }
443}
444
445impl<T: Serialize + Clone> Serialize for AvroMap<T> {
446 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
447 where
448 S: serde::Serializer,
449 {
450 let entries = self
451 .0
452 .iter()
453 .map(|(key, value)| KeyValue {
454 key: *key,
455 value: (*value).clone(),
456 })
457 .collect::<Vec<KeyValue<T>>>();
458 let mut seq = serializer.serialize_seq(Some(entries.len()))?;
459 for element in entries {
460 seq.serialize_element(&element)?;
461 }
462 seq.end()
463 }
464}
465
466impl<'de, T: Serialize + DeserializeOwned + Clone> Deserialize<'de> for AvroMap<T> {
467 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
468 where
469 D: serde::Deserializer<'de>,
470 {
471 let vec: Vec<KeyValue<T>> = Vec::deserialize(deserializer)?;
472 Ok(AvroMap(HashMap::from_iter(
473 vec.into_iter().map(|x| (x.key, x.value)),
474 )))
475 }
476}
477
478impl AvroMap<ByteBuf> {
479 fn into_value_map(self, schema: &StructType) -> Result<HashMap<i32, Value>, Error> {
487 Ok(HashMap::from_iter(
488 self.0
489 .into_iter()
490 .map(|(k, v)| {
491 Ok((
492 k,
493 Value::try_from_bytes(
494 &v,
495 &schema
496 .get(k as usize)
497 .ok_or(Error::ColumnNotInSchema(
498 k.to_string(),
499 format!("{schema:?}"),
500 ))?
501 .field_type,
502 )?,
503 ))
504 })
505 .collect::<Result<Vec<_>, Error>>()?,
506 ))
507 }
508}
509
510impl From<HashMap<i32, Value>> for AvroMap<ByteBuf> {
511 fn from(value: HashMap<i32, Value>) -> Self {
512 AvroMap(HashMap::from_iter(
513 value.into_iter().map(|(k, v)| (k, v.into())),
514 ))
515 }
516}
517
/// Version-independent, in-memory description of a data file.
///
/// Built from the serialized [`DataFileV1`]/[`DataFileV2`] records via
/// `try_from_v1`/`try_from_v2` and converted back with the `From` impls. The
/// derived builder uses `with_*` setters and reports failures as [`Error`].
#[derive(Debug, PartialEq, Clone, Getters, Builder)]
#[builder(build_fn(error = "Error"), setter(prefix = "with"))]
pub struct DataFile {
    /// Kind of content stored in the file.
    content: Content,
    /// Path of the file.
    file_path: String,
    /// Format of the file.
    file_format: FileFormat,
    /// Partition values of the file.
    partition: Struct,
    /// Record count as stored in the `record_count` field.
    record_count: i64,
    /// File size as stored in the `file_size_in_bytes` field.
    file_size_in_bytes: i64,
    /// Optional map of `i32` keys to sizes (`column_sizes` field).
    column_sizes: Option<AvroMap<i64>>,
    /// Optional map of `i32` keys to counts (`value_counts` field).
    value_counts: Option<AvroMap<i64>>,
    /// Optional map of `i32` keys to counts (`null_value_counts` field).
    null_value_counts: Option<AvroMap<i64>>,
    /// Optional map of `i32` keys to counts (`nan_value_counts` field).
    nan_value_counts: Option<AvroMap<i64>>,
    /// Optional lower bounds, decoded from their byte form into typed values.
    lower_bounds: Option<HashMap<i32, Value>>,
    /// Optional upper bounds, decoded from their byte form into typed values.
    upper_bounds: Option<HashMap<i32, Value>>,
    /// Optional opaque key metadata bytes; the builder defaults this to `None`.
    #[builder(default)]
    key_metadata: Option<ByteBuf>,
    /// Optional split offsets; the builder defaults this to `None`.
    #[builder(default)]
    split_offsets: Option<Vec<i64>>,
    /// Optional equality ids; only present in the V2 layout. The builder defaults
    /// this to `None`.
    #[builder(default)]
    equality_ids: Option<Vec<i32>>,
    /// Optional sort order id; the builder defaults this to `None`.
    #[builder(default)]
    sort_order_id: Option<i32>,
}
559
560impl DataFile {
561 pub fn builder() -> DataFileBuilder {
562 DataFileBuilder::default()
563 }
564}
565
566impl DataFile {
567 pub(crate) fn try_from_v2(
568 value: DataFileV2,
569 schema: &Schema,
570 partition_spec: &PartitionSpec,
571 ) -> Result<Self, Error> {
572 Ok(DataFile {
573 content: value.content,
574 file_path: value.file_path,
575 file_format: value.file_format,
576 partition: value
577 .partition
578 .cast(schema.fields(), partition_spec.fields())?,
579 record_count: value.record_count,
580 file_size_in_bytes: value.file_size_in_bytes,
581 column_sizes: value.column_sizes,
582 value_counts: value.value_counts,
583 null_value_counts: value.null_value_counts,
584 nan_value_counts: value.nan_value_counts,
585 lower_bounds: value
586 .lower_bounds
587 .map(|map| map.into_value_map(schema.fields()))
588 .transpose()?,
589 upper_bounds: value
590 .upper_bounds
591 .map(|map| map.into_value_map(schema.fields()))
592 .transpose()?,
593 key_metadata: value.key_metadata,
594 split_offsets: value.split_offsets,
595 equality_ids: value.equality_ids,
596 sort_order_id: value.sort_order_id,
597 })
598 }
599
600 pub(crate) fn try_from_v1(
601 value: DataFileV1,
602 schema: &Schema,
603 partition_spec: &PartitionSpec,
604 ) -> Result<Self, Error> {
605 Ok(DataFile {
606 content: Content::Data,
607 file_path: value.file_path,
608 file_format: value.file_format,
609 partition: value
610 .partition
611 .cast(schema.fields(), partition_spec.fields())?,
612 record_count: value.record_count,
613 file_size_in_bytes: value.file_size_in_bytes,
614 column_sizes: value.column_sizes,
615 value_counts: value.value_counts,
616 null_value_counts: value.null_value_counts,
617 nan_value_counts: value.nan_value_counts,
618 lower_bounds: value
619 .lower_bounds
620 .map(|map| map.into_value_map(schema.fields()))
621 .transpose()?,
622 upper_bounds: value
623 .upper_bounds
624 .map(|map| map.into_value_map(schema.fields()))
625 .transpose()?,
626 key_metadata: value.key_metadata,
627 split_offsets: value.split_offsets,
628 equality_ids: None,
629 sort_order_id: value.sort_order_id,
630 })
631 }
632}
633
/// Data file record in the V2 layout, as read from or written to Avro.
///
/// Field ids quoted below are the ones used in [`DataFileV2::schema`].
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct DataFileV2 {
    /// Content kind (field-id 134).
    pub content: Content,
    /// File path (field-id 100).
    pub file_path: String,
    /// File format (field-id 101).
    pub file_format: FileFormat,
    /// Partition values (field-id 102).
    pub partition: Struct,
    /// Record count (field-id 103).
    pub record_count: i64,
    /// File size in bytes (field-id 104).
    pub file_size_in_bytes: i64,
    /// Optional column sizes map (field-id 108).
    pub column_sizes: Option<AvroMap<i64>>,
    /// Optional value counts map (field-id 109).
    pub value_counts: Option<AvroMap<i64>>,
    /// Optional null value counts map (field-id 110).
    pub null_value_counts: Option<AvroMap<i64>>,
    /// Optional NaN value counts map (field-id 137).
    pub nan_value_counts: Option<AvroMap<i64>>,
    /// Optional byte-encoded lower bounds (field-id 125).
    pub lower_bounds: Option<AvroMap<ByteBuf>>,
    /// Optional byte-encoded upper bounds (field-id 128).
    pub upper_bounds: Option<AvroMap<ByteBuf>>,
    /// Optional key metadata bytes (field-id 131).
    pub key_metadata: Option<ByteBuf>,
    /// Optional split offsets (field-id 132).
    pub split_offsets: Option<Vec<i64>>,
    /// Optional equality ids (field-id 135).
    pub equality_ids: Option<Vec<i32>>,
    /// Optional sort order id (field-id 140).
    pub sort_order_id: Option<i32>,
}
670
/// Data file record in the V1 layout, as read from or written to Avro.
///
/// Field ids quoted below are the ones used in [`DataFileV1::schema`].
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct DataFileV1 {
    /// File path (field-id 100).
    pub file_path: String,
    /// File format (field-id 101).
    pub file_format: FileFormat,
    /// Partition values (field-id 102).
    pub partition: Struct,
    /// Record count (field-id 103).
    pub record_count: i64,
    /// File size in bytes (field-id 104).
    pub file_size_in_bytes: i64,
    /// Block size in bytes (field-id 105); V1 only.
    pub block_size_in_bytes: i64,
    /// Optional file ordinal (field-id 106); V1 only.
    pub file_ordinal: Option<i32>,
    /// Optional sort columns (field-id 107); V1 only.
    pub sort_columns: Option<Vec<i32>>,
    /// Optional column sizes map (field-id 108).
    pub column_sizes: Option<AvroMap<i64>>,
    /// Optional value counts map (field-id 109).
    pub value_counts: Option<AvroMap<i64>>,
    /// Optional null value counts map (field-id 110).
    pub null_value_counts: Option<AvroMap<i64>>,
    /// Optional NaN value counts map (field-id 137).
    pub nan_value_counts: Option<AvroMap<i64>>,
    /// Optional byte-encoded lower bounds (field-id 125).
    pub lower_bounds: Option<AvroMap<ByteBuf>>,
    /// Optional byte-encoded upper bounds (field-id 128).
    pub upper_bounds: Option<AvroMap<ByteBuf>>,
    /// Optional key metadata bytes (field-id 131).
    pub key_metadata: Option<ByteBuf>,
    /// Optional split offsets (field-id 132).
    pub split_offsets: Option<Vec<i64>>,
    /// Optional sort order id (field-id 140).
    pub sort_order_id: Option<i32>,
}
709
710impl From<DataFile> for DataFileV2 {
711 fn from(value: DataFile) -> Self {
712 DataFileV2 {
713 content: value.content,
714 file_path: value.file_path,
715 file_format: value.file_format,
716 partition: value.partition,
717 record_count: value.record_count,
718 file_size_in_bytes: value.file_size_in_bytes,
719 column_sizes: value.column_sizes,
720 value_counts: value.value_counts,
721 null_value_counts: value.null_value_counts,
722 nan_value_counts: value.nan_value_counts,
723 lower_bounds: value.lower_bounds.map(Into::into),
724 upper_bounds: value.upper_bounds.map(Into::into),
725 key_metadata: value.key_metadata,
726 split_offsets: value.split_offsets,
727 equality_ids: value.equality_ids,
728 sort_order_id: value.sort_order_id,
729 }
730 }
731}
732
733impl From<DataFile> for DataFileV1 {
734 fn from(value: DataFile) -> Self {
735 DataFileV1 {
736 file_path: value.file_path,
737 file_format: value.file_format,
738 partition: value.partition,
739 record_count: value.record_count,
740 file_size_in_bytes: value.file_size_in_bytes,
741 column_sizes: value.column_sizes,
742 value_counts: value.value_counts,
743 null_value_counts: value.null_value_counts,
744 nan_value_counts: value.nan_value_counts,
745 lower_bounds: value.lower_bounds.map(Into::into),
746 upper_bounds: value.upper_bounds.map(Into::into),
747 key_metadata: value.key_metadata,
748 split_offsets: value.split_offsets,
749 sort_order_id: value.sort_order_id,
750 block_size_in_bytes: 0,
751 file_ordinal: None,
752 sort_columns: None,
753 }
754 }
755}
756
757impl From<DataFileV1> for DataFileV2 {
758 fn from(v1: DataFileV1) -> Self {
759 DataFileV2 {
760 content: Content::Data,
761 file_path: v1.file_path,
762 file_format: v1.file_format,
763 partition: v1.partition,
764 record_count: v1.record_count,
765 file_size_in_bytes: v1.file_size_in_bytes,
766 column_sizes: v1.column_sizes,
767 value_counts: v1.value_counts,
768 null_value_counts: v1.null_value_counts,
769 nan_value_counts: v1.nan_value_counts,
770 lower_bounds: v1.lower_bounds,
771 upper_bounds: v1.upper_bounds,
772 key_metadata: v1.key_metadata,
773 split_offsets: v1.split_offsets,
774 equality_ids: None,
775 sort_order_id: v1.sort_order_id,
776 }
777 }
778}
779
780impl DataFileV1 {
781 pub fn schema(partition_schema: &str) -> String {
783 r#"{
784 "type": "record",
785 "name": "r2",
786 "fields": [
787 {
788 "name": "file_path",
789 "type": "string",
790 "field-id": 100
791 },
792 {
793 "name": "file_format",
794 "type": "string",
795 "field-id": 101
796 },
797 {
798 "name": "partition",
799 "type": "#
800 .to_owned()
801 + partition_schema
802 + r#",
803 "field-id": 102
804 },
805 {
806 "name": "record_count",
807 "type": "long",
808 "field-id": 103
809 },
810 {
811 "name": "file_size_in_bytes",
812 "type": "long",
813 "field-id": 104
814 },
815 {
816 "name": "block_size_in_bytes",
817 "type": "long",
818 "field-id": 105
819 },
820 {
821 "name": "file_ordinal",
822 "type": [
823 "null",
824 "int"
825 ],
826 "default": null,
827 "field-id": 106
828 },
829 {
830 "name": "sort_columns",
831 "type": [
832 "null",
833 {
834 "type": "array",
835 "items": "int",
836 "element-id": 112
837 }
838 ],
839 "default": null,
840 "field-id": 107
841 },
842 {
843 "name": "column_sizes",
844 "type": [
845 "null",
846 {
847 "type": "array",
848 "logicalType": "map",
849 "items": {
850 "type": "record",
851 "name": "k117_v118",
852 "fields": [
853 {
854 "name": "key",
855 "type": "int",
856 "field-id": 117
857 },
858 {
859 "name": "value",
860 "type": "long",
861 "field-id": 118
862 }
863 ]
864 }
865 }
866 ],
867 "default": null,
868 "field-id": 108
869 },
870 {
871 "name": "value_counts",
872 "type": [
873 "null",
874 {
875 "type": "array",
876 "logicalType": "map",
877 "items": {
878 "type": "record",
879 "name": "k119_v120",
880 "fields": [
881 {
882 "name": "key",
883 "type": "int",
884 "field-id": 119
885 },
886 {
887 "name": "value",
888 "type": "long",
889 "field-id": 120
890 }
891 ]
892 }
893 }
894 ],
895 "default": null,
896 "field-id": 109
897 },
898 {
899 "name": "null_value_counts",
900 "type": [
901 "null",
902 {
903 "type": "array",
904 "logicalType": "map",
905 "items": {
906 "type": "record",
907 "name": "k121_v122",
908 "fields": [
909 {
910 "name": "key",
911 "type": "int",
912 "field-id": 121
913 },
914 {
915 "name": "value",
916 "type": "long",
917 "field-id": 122
918 }
919 ]
920 }
921 }
922 ],
923 "default": null,
924 "field-id": 110
925 },
926 {
927 "name": "nan_value_counts",
928 "type": [
929 "null",
930 {
931 "type": "array",
932 "logicalType": "map",
933 "items": {
934 "type": "record",
935 "name": "k138_v139",
936 "fields": [
937 {
938 "name": "key",
939 "type": "int",
940 "field-id": 138
941 },
942 {
943 "name": "value",
944 "type": "long",
945 "field-id": 139
946 }
947 ]
948 }
949 }
950 ],
951 "default": null,
952 "field-id": 137
953 },
954 {
955 "name": "lower_bounds",
956 "type": [
957 "null",
958 {
959 "type": "array",
960 "logicalType": "map",
961 "items": {
962 "type": "record",
963 "name": "k126_v127",
964 "fields": [
965 {
966 "name": "key",
967 "type": "int",
968 "field-id": 126
969 },
970 {
971 "name": "value",
972 "type": "bytes",
973 "field-id": 127
974 }
975 ]
976 }
977 }
978 ],
979 "default": null,
980 "field-id": 125
981 },
982 {
983 "name": "upper_bounds",
984 "type": [
985 "null",
986 {
987 "type": "array",
988 "logicalType": "map",
989 "items": {
990 "type": "record",
991 "name": "k129_v130",
992 "fields": [
993 {
994 "name": "key",
995 "type": "int",
996 "field-id": 129
997 },
998 {
999 "name": "value",
1000 "type": "bytes",
1001 "field-id": 130
1002 }
1003 ]
1004 }
1005 }
1006 ],
1007 "default": null,
1008 "field-id": 128
1009 },
1010 {
1011 "name": "key_metadata",
1012 "type": [
1013 "null",
1014 "bytes"
1015 ],
1016 "default": null,
1017 "field-id": 131
1018 },
1019 {
1020 "name": "split_offsets",
1021 "type": [
1022 "null",
1023 {
1024 "type": "array",
1025 "items": "long",
1026 "element-id": 133
1027 }
1028 ],
1029 "default": null,
1030 "field-id": 132
1031 },
1032 {
1033 "name": "sort_order_id",
1034 "type": [
1035 "null",
1036 "int"
1037 ],
1038 "default": null,
1039 "field-id": 140
1040 }
1041 ]
1042 }"#
1043 }
1044}
1045
1046impl DataFileV2 {
1047 pub fn schema(partition_schema: &str) -> String {
1049 r#"{
1050 "type": "record",
1051 "name": "r2",
1052 "fields": [
1053 {
1054 "name": "content",
1055 "type": "int",
1056 "field-id": 134
1057 },
1058 {
1059 "name": "file_path",
1060 "type": "string",
1061 "field-id": 100
1062 },
1063 {
1064 "name": "file_format",
1065 "type": "string",
1066 "field-id": 101
1067 },
1068 {
1069 "name": "partition",
1070 "type": "#
1071 .to_owned()
1072 + partition_schema
1073 + r#",
1074 "field-id": 102
1075 },
1076 {
1077 "name": "record_count",
1078 "type": "long",
1079 "field-id": 103
1080 },
1081 {
1082 "name": "file_size_in_bytes",
1083 "type": "long",
1084 "field-id": 104
1085 },
1086 {
1087 "name": "column_sizes",
1088 "type": [
1089 "null",
1090 {
1091 "type": "array",
1092 "logicalType": "map",
1093 "items": {
1094 "type": "record",
1095 "name": "k117_v118",
1096 "fields": [
1097 {
1098 "name": "key",
1099 "type": "int",
1100 "field-id": 117
1101 },
1102 {
1103 "name": "value",
1104 "type": "long",
1105 "field-id": 118
1106 }
1107 ]
1108 }
1109 }
1110 ],
1111 "default": null,
1112 "field-id": 108
1113 },
1114 {
1115 "name": "value_counts",
1116 "type": [
1117 "null",
1118 {
1119 "type": "array",
1120 "logicalType": "map",
1121 "items": {
1122 "type": "record",
1123 "name": "k119_v120",
1124 "fields": [
1125 {
1126 "name": "key",
1127 "type": "int",
1128 "field-id": 119
1129 },
1130 {
1131 "name": "value",
1132 "type": "long",
1133 "field-id": 120
1134 }
1135 ]
1136 }
1137 }
1138 ],
1139 "default": null,
1140 "field-id": 109
1141 },
1142 {
1143 "name": "null_value_counts",
1144 "type": [
1145 "null",
1146 {
1147 "type": "array",
1148 "logicalType": "map",
1149 "items": {
1150 "type": "record",
1151 "name": "k121_v122",
1152 "fields": [
1153 {
1154 "name": "key",
1155 "type": "int",
1156 "field-id": 121
1157 },
1158 {
1159 "name": "value",
1160 "type": "long",
1161 "field-id": 122
1162 }
1163 ]
1164 }
1165 }
1166 ],
1167 "default": null,
1168 "field-id": 110
1169 },
1170 {
1171 "name": "nan_value_counts",
1172 "type": [
1173 "null",
1174 {
1175 "type": "array",
1176 "logicalType": "map",
1177 "items": {
1178 "type": "record",
1179 "name": "k138_v139",
1180 "fields": [
1181 {
1182 "name": "key",
1183 "type": "int",
1184 "field-id": 138
1185 },
1186 {
1187 "name": "value",
1188 "type": "long",
1189 "field-id": 139
1190 }
1191 ]
1192 }
1193 }
1194 ],
1195 "default": null,
1196 "field-id": 137
1197 },
1198 {
1199 "name": "lower_bounds",
1200 "type": [
1201 "null",
1202 {
1203 "type": "array",
1204 "logicalType": "map",
1205 "items": {
1206 "type": "record",
1207 "name": "k126_v127",
1208 "fields": [
1209 {
1210 "name": "key",
1211 "type": "int",
1212 "field-id": 126
1213 },
1214 {
1215 "name": "value",
1216 "type": "bytes",
1217 "field-id": 127
1218 }
1219 ]
1220 }
1221 }
1222 ],
1223 "default": null,
1224 "field-id": 125
1225 },
1226 {
1227 "name": "upper_bounds",
1228 "type": [
1229 "null",
1230 {
1231 "type": "array",
1232 "logicalType": "map",
1233 "items": {
1234 "type": "record",
1235 "name": "k129_v130",
1236 "fields": [
1237 {
1238 "name": "key",
1239 "type": "int",
1240 "field-id": 129
1241 },
1242 {
1243 "name": "value",
1244 "type": "bytes",
1245 "field-id": 130
1246 }
1247 ]
1248 }
1249 }
1250 ],
1251 "default": null,
1252 "field-id": 128
1253 },
1254 {
1255 "name": "key_metadata",
1256 "type": [
1257 "null",
1258 "bytes"
1259 ],
1260 "default": null,
1261 "field-id": 131
1262 },
1263 {
1264 "name": "split_offsets",
1265 "type": [
1266 "null",
1267 {
1268 "type": "array",
1269 "items": "long",
1270 "element-id": 133
1271 }
1272 ],
1273 "default": null,
1274 "field-id": 132
1275 },
1276 {
1277 "name": "equality_ids",
1278 "type": [
1279 "null",
1280 {
1281 "type": "array",
1282 "items": "int",
1283 "element-id": 136
1284 }
1285 ],
1286 "default": null,
1287 "field-id": 135
1288 },
1289 {
1290 "name": "sort_order_id",
1291 "type": [
1292 "null",
1293 "int"
1294 ],
1295 "default": null,
1296 "field-id": 140
1297 }
1298 ]
1299 }"#
1300 }
1301}
1302
1303#[cfg(test)]
1304mod tests {
1305 use crate::spec::{
1306 partition::{PartitionField, Transform},
1307 table_metadata::TableMetadataBuilder,
1308 types::{PrimitiveType, StructField, Type},
1309 values::Value,
1310 };
1311
1312 use super::*;
1313 use apache_avro::{self, types::Value as AvroValue};
1314
    /// Round-trips a V2 manifest entry through an Avro writer/reader and checks
    /// that every record read back converts to the original entry.
    #[test]
    fn manifest_entry() {
        // Table metadata with one schema (a required `date` column, id 0) and one
        // partition spec (a `day` transform over source column 0, field id 1000).
        let table_metadata = TableMetadataBuilder::default()
            .location("/")
            .current_schema_id(0)
            .schemas(HashMap::from_iter(vec![(
                0,
                Schema::builder()
                    .with_struct_field(StructField {
                        id: 0,
                        name: "date".to_string(),
                        required: true,
                        field_type: Type::Primitive(PrimitiveType::Date),
                        doc: None,
                    })
                    .build()
                    .unwrap(),
            )]))
            .default_spec_id(0)
            .partition_specs(HashMap::from_iter(vec![(
                0,
                PartitionSpec::builder()
                    .with_partition_field(PartitionField::new(0, 1000, "day", Transform::Day))
                    .build()
                    .unwrap(),
            )]))
            .build()
            .unwrap();

        // The entry that is written and expected back after the round trip.
        let manifest_entry = ManifestEntry {
            format_version: FormatVersion::V2,
            status: Status::Added,
            snapshot_id: Some(638933773299822130),
            sequence_number: Some(1),
            data_file: DataFile {
                content: Content::Data,
                file_path: "/".to_string(),
                file_format: FileFormat::Parquet,
                partition: Struct::from_iter(vec![("day".to_owned(), Some(Value::Int(1)))]),
                record_count: 4,
                file_size_in_bytes: 1200,
                column_sizes: None,
                value_counts: None,
                null_value_counts: None,
                nan_value_counts: None,
                lower_bounds: Some(HashMap::from_iter(vec![(0, Value::Date(0))])),
                upper_bounds: None,
                key_metadata: None,
                split_offsets: None,
                equality_ids: None,
                sort_order_id: None,
            },
        };

        // Avro schema for V2 entries, derived from the table's partition fields.
        let partition_schema =
            partition_value_schema(&table_metadata.current_partition_fields(None).unwrap())
                .unwrap();

        let schema = ManifestEntry::schema(&partition_schema, &FormatVersion::V2).unwrap();

        // User metadata attached to the Avro file header. The assertions below only
        // look at the decoded records, not at these values.
        let partition_spec = r#"[{
 "source-id": 4,
 "field-id": 1000,
 "name": "date",
 "transform": "day"
 }]"#;
        let partition_spec_id = "0";
        let table_schema = r#"{"schema": "0"}"#;
        let table_schema_id = "1";
        let format_version = FormatVersion::V1;
        let content = "DATA";

        let meta: std::collections::HashMap<String, apache_avro::types::Value> =
            std::collections::HashMap::from_iter(vec![
                ("schema".to_string(), AvroValue::Bytes(table_schema.into())),
                (
                    "schema-id".to_string(),
                    AvroValue::Bytes(table_schema_id.into()),
                ),
                (
                    "partition-spec".to_string(),
                    AvroValue::Bytes(partition_spec.into()),
                ),
                (
                    "partition-spec-id".to_string(),
                    AvroValue::Bytes(partition_spec_id.into()),
                ),
                (
                    "format-version".to_string(),
                    AvroValue::Bytes(vec![u8::from(format_version)]),
                ),
                ("content".to_string(), AvroValue::Bytes(content.into())),
            ]);
        // Write the entry into an in-memory Avro file.
        let mut writer = apache_avro::Writer::builder()
            .schema(&schema)
            .writer(vec![])
            .user_metadata(meta)
            .build();
        writer.append_ser(manifest_entry.clone()).unwrap();

        let encoded = writer.into_inner().unwrap();

        let reader = apache_avro::Reader::new(&encoded[..]).unwrap();

        // Every record read back must convert to the entry that was written.
        // NOTE(review): the loop asserts nothing if the reader yields no records.
        for value in reader {
            let entry = apache_avro::from_value::<ManifestEntryV2>(&value.unwrap()).unwrap();
            assert_eq!(
                manifest_entry,
                ManifestEntry::try_from_v2(
                    entry,
                    table_metadata.current_schema(None).unwrap(),
                    table_metadata.default_partition_spec().unwrap()
                )
                .unwrap()
            )
        }
    }
1434
    /// Writes a V2 manifest entry to an in-memory Avro file, reads the first
    /// record back and checks that it converts to the original entry.
    #[test]
    fn test_read_manifest_entry() {
        // Table metadata with one schema (a required `date` column, id 0) and one
        // partition spec (a `day` transform over source column 0, field id 1000).
        let table_metadata = TableMetadataBuilder::default()
            .location("/")
            .current_schema_id(0)
            .schemas(HashMap::from_iter(vec![(
                0,
                Schema::builder()
                    .with_struct_field(StructField {
                        id: 0,
                        name: "date".to_string(),
                        required: true,
                        field_type: Type::Primitive(PrimitiveType::Date),
                        doc: None,
                    })
                    .build()
                    .unwrap(),
            )]))
            .default_spec_id(0)
            .partition_specs(HashMap::from_iter(vec![(
                0,
                PartitionSpec::builder()
                    .with_partition_field(PartitionField::new(0, 1000, "day", Transform::Day))
                    .build()
                    .unwrap(),
            )]))
            .build()
            .unwrap();

        // The entry that is written and expected back after the round trip.
        let manifest_entry = ManifestEntry {
            format_version: FormatVersion::V2,
            status: Status::Added,
            snapshot_id: Some(638933773299822130),
            sequence_number: Some(1),
            data_file: DataFile {
                content: Content::Data,
                file_path: "/".to_string(),
                file_format: FileFormat::Parquet,
                partition: Struct::from_iter(vec![("day".to_owned(), Some(Value::Int(1)))]),
                record_count: 4,
                file_size_in_bytes: 1200,
                column_sizes: None,
                value_counts: None,
                null_value_counts: None,
                nan_value_counts: None,
                lower_bounds: Some(HashMap::from_iter(vec![(0, Value::Date(0))])),
                upper_bounds: None,
                key_metadata: None,
                split_offsets: None,
                equality_ids: None,
                sort_order_id: None,
            },
        };

        // Avro schema for V2 entries, derived from the table's partition fields.
        let partition_schema =
            partition_value_schema(&table_metadata.current_partition_fields(None).unwrap())
                .unwrap();

        let schema = ManifestEntry::schema(&partition_schema, &FormatVersion::V2).unwrap();

        // User metadata attached to the Avro file header. The assertion below only
        // looks at the decoded record, not at these values.
        let partition_spec = r#"[{
 "source-id": 4,
 "field-id": 1000,
 "name": "date",
 "transform": "day"
 }]"#;
        let partition_spec_id = "0";
        let table_schema = r#"{"schema": "0"}"#;
        let table_schema_id = "1";
        let format_version = "1";
        let content = "DATA";

        let meta: std::collections::HashMap<String, apache_avro::types::Value> =
            std::collections::HashMap::from_iter(vec![
                ("schema".to_string(), AvroValue::Bytes(table_schema.into())),
                (
                    "schema-id".to_string(),
                    AvroValue::Bytes(table_schema_id.into()),
                ),
                (
                    "partition-spec".to_string(),
                    AvroValue::Bytes(partition_spec.into()),
                ),
                (
                    "partition-spec-id".to_string(),
                    AvroValue::Bytes(partition_spec_id.into()),
                ),
                (
                    "format-version".to_string(),
                    AvroValue::Bytes(format_version.into()),
                ),
                ("content".to_string(), AvroValue::Bytes(content.into())),
            ]);
        // Write the entry into an in-memory Avro file.
        let mut writer = apache_avro::Writer::builder()
            .schema(&schema)
            .writer(vec![])
            .user_metadata(meta)
            .build();
        writer.append_ser(manifest_entry.clone()).unwrap();

        let encoded = writer.into_inner().unwrap();

        // Read exactly the first record; the unwraps fail the test if none exists.
        let reader = apache_avro::Reader::new(&encoded[..]).unwrap();
        let record = reader.into_iter().next().unwrap().unwrap();

        let metadata_entry = apache_avro::from_value::<ManifestEntryV2>(&record).unwrap();
        assert_eq!(
            manifest_entry,
            ManifestEntry::try_from_v2(
                metadata_entry,
                table_metadata.current_schema(None).unwrap(),
                table_metadata.default_partition_spec().unwrap()
            )
            .unwrap()
        );
    }
1553
1554 #[test]
1555 pub fn test_partition_values() {
1556 let partition_values = Struct::from_iter(vec![("day".to_owned(), Some(Value::Int(1)))]);
1557
1558 let part_field = PartitionField::new(4, 1000, "day", Transform::Day);
1559 let field = StructField {
1560 id: 4,
1561 name: "day".to_owned(),
1562 required: false,
1563 field_type: Type::Primitive(PrimitiveType::Int),
1564 doc: None,
1565 };
1566 let partition_fields = vec![BoundPartitionField::new(&part_field, &field)];
1567
1568 let raw_schema = partition_value_schema(&partition_fields).unwrap();
1569
1570 let schema = apache_avro::Schema::parse_str(&raw_schema).unwrap();
1571
1572 let mut writer = apache_avro::Writer::new(&schema, Vec::new());
1573
1574 writer.append_ser(partition_values.clone()).unwrap();
1575
1576 let encoded = writer.into_inner().unwrap();
1577
1578 let reader = apache_avro::Reader::new(&*encoded).unwrap();
1579
1580 for record in reader {
1581 let result = apache_avro::from_value::<Struct>(&record.unwrap()).unwrap();
1582 assert_eq!(partition_values, result);
1583 }
1584 }
1585}