1use std::sync::OnceLock;
16
17use apache_avro::{types::Value as AvroValue, Schema as AvroSchema};
18use serde::{Deserialize, Serialize};
19use serde_bytes::ByteBuf;
20use serde_repr::{Deserialize_repr, Serialize_repr};
21
22use crate::error::Error;
23
24use self::_serde::{FieldSummarySerde, ManifestListEntryV1, ManifestListEntryV2};
25
26use super::{
27 table_metadata::{FormatVersion, TableMetadata},
28 types::Type,
29 values::Value,
30};
31
/// One entry of a manifest list, in a version-independent form.
///
/// Serialization goes through [`ManifestListEntryEnum`] (see the
/// `#[serde(into = ...)]` attribute): `format_version` selects whether the
/// entry is written in its V1 or V2 representation. The type does not derive
/// `Deserialize`; entries are read as `ManifestListEntryV1` /
/// `ManifestListEntryV2` and converted with `try_from_v1` / `try_from_v2`.
#[derive(Debug, Serialize, PartialEq, Eq, Clone)]
#[serde(into = "ManifestListEntryEnum")]
pub struct ManifestListEntry {
    /// Format version used to pick the serialized representation.
    pub format_version: FormatVersion,
    /// Path of the manifest file (Avro field-id 500).
    pub manifest_path: String,
    /// Length of the manifest file (Avro field-id 501).
    pub manifest_length: i64,
    /// Id of the partition spec recorded for the manifest (Avro field-id 502).
    pub partition_spec_id: i32,
    /// Content kind of the manifest. Not part of the V1 representation;
    /// entries converted from V1 get `Content::Data`.
    pub content: Content,
    /// Sequence number of the manifest. Not part of the V1 representation;
    /// entries converted from V1 get `0`.
    pub sequence_number: i64,
    /// Minimum sequence number of the manifest. Not part of the V1
    /// representation; entries converted from V1 get `0`.
    pub min_sequence_number: i64,
    /// Id of the snapshot that added the manifest (Avro field-id 503).
    pub added_snapshot_id: i64,
    /// Added files count. Optional in V1, required in V2: converting an entry
    /// with `None` here into the V2 representation panics.
    pub added_files_count: Option<i32>,
    /// Existing files count. Optional in V1, required in V2 (see above).
    pub existing_files_count: Option<i32>,
    /// Deleted files count. Optional in V1, required in V2 (see above).
    pub deleted_files_count: Option<i32>,
    /// Added rows count. Optional in V1, required in V2 (see above).
    pub added_rows_count: Option<i64>,
    /// Existing rows count. Optional in V1, required in V2 (see above).
    pub existing_rows_count: Option<i64>,
    /// Deleted rows count. Optional in V1, required in V2 (see above).
    pub deleted_rows_count: Option<i64>,
    /// Optional list of per-partition-field summaries, with bounds decoded
    /// into typed values.
    pub partitions: Option<Vec<FieldSummary>>,
    /// Optional opaque key metadata bytes (Avro field-id 519).
    pub key_metadata: Option<ByteBuf>,
}
70
/// Serialized form of a manifest-list entry, covering both format versions.
///
/// The enum is `#[serde(untagged)]`: no variant tag is written, and on
/// deserialization serde tries the variants in declaration order, so `V2` is
/// attempted before `V1`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(untagged)]
pub enum ManifestListEntryEnum {
    /// Entry in the V2 representation.
    V2(ManifestListEntryV2),
    /// Entry in the V1 representation.
    V1(ManifestListEntryV1),
}
80
/// Summary of one partition field within a manifest, with typed bounds.
///
/// Serialized through `FieldSummarySerde`, which carries the bounds as raw
/// bytes instead of typed values.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(into = "FieldSummarySerde")]
pub struct FieldSummary {
    /// Contains-null flag (Avro field-id 509).
    pub contains_null: bool,
    /// Optional contains-NaN flag (Avro field-id 518).
    pub contains_nan: Option<bool>,
    /// Optional lower bound, decoded according to the partition field's type
    /// (Avro field-id 510).
    pub lower_bound: Option<Value>,
    /// Optional upper bound, decoded according to the partition field's type
    /// (Avro field-id 511).
    pub upper_bound: Option<Value>,
}
96
/// Content kind of a manifest.
///
/// Uses `serde_repr`, so the value is (de)serialized as its integer
/// discriminant rather than as a variant name.
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy)]
#[repr(u8)]
pub enum Content {
    /// Discriminant `0`.
    Data = 0,
    /// Discriminant `1`.
    Deletes = 1,
}
106
107mod _serde {
108 use crate::spec::table_metadata::FormatVersion;
109
110 use super::{Content, FieldSummary, ManifestListEntry, ManifestListEntryEnum};
111 use serde::{Deserialize, Serialize};
112 use serde_bytes::ByteBuf;
113
114 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
115 pub struct ManifestListEntryV2 {
118 pub manifest_path: String,
120 pub manifest_length: i64,
122 pub partition_spec_id: i32,
124 pub content: Content,
126 pub sequence_number: i64,
128 pub min_sequence_number: i64,
130 pub added_snapshot_id: i64,
132 pub added_files_count: i32,
134 pub existing_files_count: i32,
136 pub deleted_files_count: i32,
138 pub added_rows_count: i64,
140 pub existing_rows_count: i64,
142 pub deleted_rows_count: i64,
144 pub partitions: Option<Vec<FieldSummarySerde>>,
146 pub key_metadata: Option<ByteBuf>,
148 }
149
150 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
151 pub struct ManifestListEntryV1 {
154 pub manifest_path: String,
156 pub manifest_length: i64,
158 pub partition_spec_id: i32,
160 pub added_snapshot_id: i64,
162 pub added_files_count: Option<i32>,
164 pub existing_files_count: Option<i32>,
166 pub deleted_files_count: Option<i32>,
168 pub added_rows_count: Option<i64>,
170 pub existing_rows_count: Option<i64>,
172 pub deleted_rows_count: Option<i64>,
174 pub partitions: Option<Vec<FieldSummarySerde>>,
176 pub key_metadata: Option<ByteBuf>,
178 }
179
180 impl From<ManifestListEntry> for ManifestListEntryEnum {
181 fn from(value: ManifestListEntry) -> Self {
182 match &value.format_version {
183 FormatVersion::V2 => ManifestListEntryEnum::V2(value.into()),
184 FormatVersion::V1 => ManifestListEntryEnum::V1(value.into()),
185 }
186 }
187 }
188
189 impl From<ManifestListEntry> for ManifestListEntryV1 {
190 fn from(value: ManifestListEntry) -> Self {
191 ManifestListEntryV1 {
192 manifest_path: value.manifest_path,
193 manifest_length: value.manifest_length,
194 partition_spec_id: value.partition_spec_id,
195 added_snapshot_id: value.added_snapshot_id,
196 added_files_count: value.added_files_count,
197 existing_files_count: value.existing_files_count,
198 deleted_files_count: value.deleted_files_count,
199 added_rows_count: value.added_rows_count,
200 existing_rows_count: value.existing_rows_count,
201 deleted_rows_count: value.deleted_rows_count,
202 partitions: value
203 .partitions
204 .map(|v| v.into_iter().map(Into::into).collect()),
205 key_metadata: value.key_metadata,
206 }
207 }
208 }
209
210 impl From<ManifestListEntry> for ManifestListEntryV2 {
211 fn from(value: ManifestListEntry) -> Self {
212 ManifestListEntryV2 {
213 manifest_path: value.manifest_path,
214 manifest_length: value.manifest_length,
215 partition_spec_id: value.partition_spec_id,
216 content: value.content,
217 sequence_number: value.sequence_number,
218 min_sequence_number: value.min_sequence_number,
219 added_snapshot_id: value.added_snapshot_id,
220 added_files_count: value.added_files_count.unwrap(),
221 existing_files_count: value.existing_files_count.unwrap(),
222 deleted_files_count: value.deleted_files_count.unwrap(),
223 added_rows_count: value.added_rows_count.unwrap(),
224 existing_rows_count: value.existing_rows_count.unwrap(),
225 deleted_rows_count: value.deleted_rows_count.unwrap(),
226 partitions: value
227 .partitions
228 .map(|v| v.into_iter().map(Into::into).collect()),
229 key_metadata: value.key_metadata,
230 }
231 }
232 }
233
234 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
235 pub struct FieldSummarySerde {
237 pub contains_null: bool,
239 pub contains_nan: Option<bool>,
241 pub lower_bound: Option<ByteBuf>,
244 pub upper_bound: Option<ByteBuf>,
247 }
248
249 impl From<FieldSummary> for FieldSummarySerde {
250 fn from(value: FieldSummary) -> Self {
251 FieldSummarySerde {
252 contains_null: value.contains_null,
253 contains_nan: value.contains_nan,
254 lower_bound: value.lower_bound.map(Into::into),
255 upper_bound: value.upper_bound.map(Into::into),
256 }
257 }
258 }
259}
260
261impl ManifestListEntry {
262 pub fn try_from_enum(
263 entry: ManifestListEntryEnum,
264 table_metadata: &TableMetadata,
265 ) -> Result<ManifestListEntry, Error> {
266 match entry {
267 ManifestListEntryEnum::V2(entry) => {
268 ManifestListEntry::try_from_v2(entry, table_metadata)
269 }
270 ManifestListEntryEnum::V1(entry) => {
271 ManifestListEntry::try_from_v1(entry, table_metadata)
272 }
273 }
274 }
275
276 pub fn try_from_v2(
277 entry: _serde::ManifestListEntryV2,
278 table_metadata: &TableMetadata,
279 ) -> Result<ManifestListEntry, Error> {
280 let partition_types = table_metadata.default_partition_spec()?.data_types(
281 table_metadata
282 .current_schema(None)
283 .or(table_metadata
284 .refs
285 .values()
286 .next()
287 .ok_or(Error::NotFound("Current schema".to_string()))
288 .and_then(|x| table_metadata.schema(x.snapshot_id)))
289 .unwrap()
290 .fields(),
291 )?;
292 Ok(ManifestListEntry {
293 format_version: FormatVersion::V2,
294 manifest_path: entry.manifest_path,
295 manifest_length: entry.manifest_length,
296 partition_spec_id: entry.partition_spec_id,
297 content: entry.content,
298 sequence_number: entry.sequence_number,
299 min_sequence_number: entry.min_sequence_number,
300 added_snapshot_id: entry.added_snapshot_id,
301 added_files_count: Some(entry.added_files_count),
302 existing_files_count: Some(entry.existing_files_count),
303 deleted_files_count: Some(entry.deleted_files_count),
304 added_rows_count: Some(entry.added_rows_count),
305 existing_rows_count: Some(entry.existing_rows_count),
306 deleted_rows_count: Some(entry.deleted_rows_count),
307 partitions: entry
308 .partitions
309 .map(|v| {
310 v.into_iter()
311 .zip(partition_types.iter())
312 .map(|(x, d)| FieldSummary::try_from(x, d))
313 .collect::<Result<Vec<_>, Error>>()
314 })
315 .transpose()?,
316 key_metadata: entry.key_metadata,
317 })
318 }
319
320 pub fn try_from_v1(
321 entry: _serde::ManifestListEntryV1,
322 table_metadata: &TableMetadata,
323 ) -> Result<ManifestListEntry, Error> {
324 let partition_types = table_metadata.default_partition_spec()?.data_types(
325 table_metadata
326 .current_schema(None)
327 .or(table_metadata
328 .refs
329 .values()
330 .next()
331 .ok_or(Error::NotFound("Current schema".to_string()))
332 .and_then(|x| table_metadata.schema(x.snapshot_id)))
333 .unwrap()
334 .fields(),
335 )?;
336 Ok(ManifestListEntry {
337 format_version: FormatVersion::V1,
338 manifest_path: entry.manifest_path,
339 manifest_length: entry.manifest_length,
340 partition_spec_id: entry.partition_spec_id,
341 content: Content::Data,
342 sequence_number: 0,
343 min_sequence_number: 0,
344 added_snapshot_id: entry.added_snapshot_id,
345 added_files_count: entry.added_files_count,
346 existing_files_count: entry.existing_files_count,
347 deleted_files_count: entry.deleted_files_count,
348 added_rows_count: entry.added_rows_count,
349 existing_rows_count: entry.existing_rows_count,
350 deleted_rows_count: entry.deleted_rows_count,
351 partitions: entry
352 .partitions
353 .map(|v| {
354 v.into_iter()
355 .zip(partition_types.iter())
356 .map(|(x, d)| FieldSummary::try_from(x, d))
357 .collect::<Result<Vec<_>, Error>>()
358 })
359 .transpose()?,
360 key_metadata: entry.key_metadata,
361 })
362 }
363}
364
365impl FieldSummary {
366 fn try_from(value: _serde::FieldSummarySerde, data_type: &Type) -> Result<Self, Error> {
367 Ok(FieldSummary {
368 contains_null: value.contains_null,
369 contains_nan: value.contains_nan,
370 lower_bound: value
371 .lower_bound
372 .map(|x| Value::try_from_bytes(&x, data_type))
373 .transpose()?,
374 upper_bound: value
375 .upper_bound
376 .map(|x| Value::try_from_bytes(&x, data_type))
377 .transpose()?,
378 })
379 }
380}
381
/// Returns the Avro schema used for manifest-list entries in the V1 layout.
///
/// The schema is parsed from the embedded JSON on the first call and cached in
/// a `OnceLock`, so later calls return the same `&'static` value.
///
/// # Panics
///
/// Panics on first use if the embedded JSON is not a valid Avro schema.
pub fn manifest_list_schema_v1() -> &'static AvroSchema {
    static MANIFEST_LIST_SCHEMA_V1: OnceLock<AvroSchema> = OnceLock::new();
    MANIFEST_LIST_SCHEMA_V1.get_or_init(|| {
        AvroSchema::parse_str(
            r#"
            {
                "type": "record",
                "name": "manifest_file",
                "fields": [
                    {
                        "name": "manifest_path",
                        "type": "string",
                        "field-id": 500
                    },
                    {
                        "name": "manifest_length",
                        "type": "long",
                        "field-id": 501
                    },
                    {
                        "name": "partition_spec_id",
                        "type": "int",
                        "field-id": 502
                    },
                    {
                        "name": "added_snapshot_id",
                        "type": "long",
                        "field-id": 503
                    },
                    {
                        "name": "added_files_count",
                        "type": [
                            "null",
                            "int"
                        ],
                        "default": null,
                        "field-id": 504
                    },
                    {
                        "name": "existing_files_count",
                        "type": [
                            "null",
                            "int"
                        ],
                        "default": null,
                        "field-id": 505
                    },
                    {
                        "name": "deleted_files_count",
                        "type": [
                            "null",
                            "int"
                        ],
                        "default": null,
                        "field-id": 506
                    },
                    {
                        "name": "added_rows_count",
                        "type": [
                            "null",
                            "long"
                        ],
                        "default": null,
                        "field-id": 512
                    },
                    {
                        "name": "existing_rows_count",
                        "type": [
                            "null",
                            "long"
                        ],
                        "default": null,
                        "field-id": 513
                    },
                    {
                        "name": "deleted_rows_count",
                        "type": [
                            "null",
                            "long"
                        ],
                        "default": null,
                        "field-id": 514
                    },
                    {
                        "name": "partitions",
                        "type": [
                            "null",
                            {
                                "type": "array",
                                "items": {
                                    "type": "record",
                                    "name": "r508",
                                    "fields": [
                                        {
                                            "name": "contains_null",
                                            "type": "boolean",
                                            "field-id": 509
                                        },
                                        {
                                            "name": "contains_nan",
                                            "type": [
                                                "null",
                                                "boolean"
                                            ],
                                            "field-id": 518
                                        },
                                        {
                                            "name": "lower_bound",
                                            "type": [
                                                "null",
                                                "bytes"
                                            ],
                                            "field-id": 510
                                        },
                                        {
                                            "name": "upper_bound",
                                            "type": [
                                                "null",
                                                "bytes"
                                            ],
                                            "field-id": 511
                                        }
                                    ]
                                },
                                "element-id": 508
                            }
                        ],
                        "default": null,
                        "field-id": 507
                    },
                    {
                        "name": "key_metadata",
                        "type": [
                            "null",
                            "bytes"
                        ],
                        "field-id": 519
                    }
                ]
            }
            "#,
        )
        .unwrap()
    })
}
/// Returns the Avro schema used for manifest-list entries in the V2 layout.
///
/// Compared with the V1 schema in this file, it adds the `content`,
/// `sequence_number` and `min_sequence_number` fields and declares the file
/// and row counts as non-nullable. The schema is parsed from the embedded JSON
/// on the first call and cached in a `OnceLock`.
///
/// # Panics
///
/// Panics on first use if the embedded JSON is not a valid Avro schema.
pub fn manifest_list_schema_v2() -> &'static AvroSchema {
    static MANIFEST_LIST_SCHEMA_V2: OnceLock<AvroSchema> = OnceLock::new();
    MANIFEST_LIST_SCHEMA_V2.get_or_init(|| {
        AvroSchema::parse_str(
            r#"
            {
                "type": "record",
                "name": "manifest_file",
                "fields": [
                    {
                        "name": "manifest_path",
                        "type": "string",
                        "field-id": 500
                    },
                    {
                        "name": "manifest_length",
                        "type": "long",
                        "field-id": 501
                    },
                    {
                        "name": "partition_spec_id",
                        "type": "int",
                        "field-id": 502
                    },
                    {
                        "name": "content",
                        "type": "int",
                        "field-id": 517
                    },
                    {
                        "name": "sequence_number",
                        "type": "long",
                        "field-id": 515
                    },
                    {
                        "name": "min_sequence_number",
                        "type": "long",
                        "field-id": 516
                    },
                    {
                        "name": "added_snapshot_id",
                        "type": "long",
                        "field-id": 503
                    },
                    {
                        "name": "added_files_count",
                        "type": "int",
                        "field-id": 504
                    },
                    {
                        "name": "existing_files_count",
                        "type": "int",
                        "field-id": 505
                    },
                    {
                        "name": "deleted_files_count",
                        "type": "int",
                        "field-id": 506
                    },
                    {
                        "name": "added_rows_count",
                        "type": "long",
                        "field-id": 512
                    },
                    {
                        "name": "existing_rows_count",
                        "type": "long",
                        "field-id": 513
                    },
                    {
                        "name": "deleted_rows_count",
                        "type": "long",
                        "field-id": 514
                    },
                    {
                        "name": "partitions",
                        "type": [
                            "null",
                            {
                                "type": "array",
                                "items": {
                                    "type": "record",
                                    "name": "r508",
                                    "fields": [
                                        {
                                            "name": "contains_null",
                                            "type": "boolean",
                                            "field-id": 509
                                        },
                                        {
                                            "name": "contains_nan",
                                            "type": [
                                                "null",
                                                "boolean"
                                            ],
                                            "field-id": 518
                                        },
                                        {
                                            "name": "lower_bound",
                                            "type": [
                                                "null",
                                                "bytes"
                                            ],
                                            "field-id": 510
                                        },
                                        {
                                            "name": "upper_bound",
                                            "type": [
                                                "null",
                                                "bytes"
                                            ],
                                            "field-id": 511
                                        }
                                    ]
                                },
                                "element-id": 508
                            }
                        ],
                        "default": null,
                        "field-id": 507
                    },
                    {
                        "name": "key_metadata",
                        "type": [
                            "null",
                            "bytes"
                        ],
                        "field-id": 519
                    }
                ]
            }
            "#,
        )
        .unwrap()
    })
}
663
664pub fn avro_value_to_manifest_list_entry(
666 value: Result<AvroValue, apache_avro::Error>,
667 table_metadata: &TableMetadata,
668) -> Result<ManifestListEntry, Error> {
669 let entry = value?;
670 match table_metadata.format_version {
671 FormatVersion::V1 => ManifestListEntry::try_from_v1(
672 apache_avro::from_value::<_serde::ManifestListEntryV1>(&entry)?,
673 table_metadata,
674 ),
675 FormatVersion::V2 => ManifestListEntry::try_from_v2(
676 apache_avro::from_value::<_serde::ManifestListEntryV2>(&entry)?,
677 table_metadata,
678 ),
679 }
680}
681
#[cfg(test)]
mod tests {

    use std::collections::HashMap;

    use super::*;

    use crate::spec::{
        partition::{PartitionField, PartitionSpec, Transform},
        schema::Schema,
        table_metadata::TableMetadataBuilder,
        types::{PrimitiveType, StructField},
    };

    /// Entry written and expected back by the round-trip tests; only the
    /// format version and sequence number differ between them.
    fn sample_entry(format_version: FormatVersion, sequence_number: i64) -> ManifestListEntry {
        let summary = FieldSummary {
            contains_null: true,
            contains_nan: Some(false),
            lower_bound: Some(Value::Int(1234)),
            upper_bound: Some(Value::Int(76890)),
        };
        ManifestListEntry {
            format_version,
            manifest_path: "".to_string(),
            manifest_length: 1200,
            partition_spec_id: 0,
            content: Content::Data,
            sequence_number,
            min_sequence_number: 0,
            added_snapshot_id: 39487483032,
            added_files_count: Some(1),
            existing_files_count: Some(2),
            deleted_files_count: Some(0),
            added_rows_count: Some(1000),
            existing_rows_count: Some(8000),
            deleted_rows_count: Some(0),
            partitions: Some(vec![summary]),
            key_metadata: None,
        }
    }

    /// Writes `entry` with `schema` into an in-memory Avro container.
    fn encode(schema: &apache_avro::Schema, entry: ManifestListEntry) -> Vec<u8> {
        let mut writer = apache_avro::Writer::new(schema, Vec::new());
        writer.append_ser(entry).unwrap();
        writer.into_inner().unwrap()
    }

    /// A V2 entry survives an Avro write/read round trip unchanged.
    #[test]
    pub fn test_manifest_list_v2() {
        let table_metadata = TableMetadataBuilder::default()
            .location("/")
            .current_schema_id(1)
            .schemas(HashMap::from_iter(vec![(
                1,
                Schema::builder()
                    .with_schema_id(1)
                    .with_struct_field(StructField {
                        id: 0,
                        name: "date".to_string(),
                        required: true,
                        field_type: Type::Primitive(PrimitiveType::Date),
                        doc: None,
                    })
                    .build()
                    .unwrap(),
            )]))
            .default_spec_id(0)
            .partition_specs(HashMap::from_iter(vec![(
                0,
                PartitionSpec::builder()
                    .with_partition_field(PartitionField::new(0, 1000, "day", Transform::Day))
                    .build()
                    .unwrap(),
            )]))
            .build()
            .unwrap();

        let expected = sample_entry(FormatVersion::V2, 566);
        let encoded = encode(manifest_list_schema_v2(), expected.clone());

        let reader = apache_avro::Reader::new(&*encoded).unwrap();

        for record in reader {
            let decoded =
                apache_avro::from_value::<_serde::ManifestListEntryV2>(&record.unwrap()).unwrap();
            let actual = ManifestListEntry::try_from_v2(decoded, &table_metadata).unwrap();
            assert_eq!(expected, actual);
        }
    }

    /// A V1 entry survives an Avro write/read round trip unchanged.
    #[test]
    pub fn test_manifest_list_v1() {
        let table_metadata = TableMetadataBuilder::default()
            .format_version(FormatVersion::V1)
            .location("/")
            .current_schema_id(1)
            .schemas(HashMap::from_iter(vec![(
                1,
                Schema::builder()
                    .with_schema_id(1)
                    .with_struct_field(StructField {
                        id: 0,
                        name: "date".to_string(),
                        required: true,
                        field_type: Type::Primitive(PrimitiveType::Date),
                        doc: None,
                    })
                    .build()
                    .unwrap(),
            )]))
            .default_spec_id(0)
            .partition_specs(HashMap::from_iter(vec![(
                0,
                PartitionSpec::builder()
                    .with_partition_field(PartitionField::new(0, 1000, "day", Transform::Day))
                    .build()
                    .unwrap(),
            )]))
            .build()
            .unwrap();

        let expected = sample_entry(FormatVersion::V1, 0);
        let encoded = encode(manifest_list_schema_v1(), expected.clone());

        let reader = apache_avro::Reader::new(&*encoded).unwrap();

        for record in reader {
            let decoded =
                apache_avro::from_value::<_serde::ManifestListEntryV1>(&record.unwrap()).unwrap();
            let actual = ManifestListEntry::try_from_v1(decoded, &table_metadata).unwrap();
            assert_eq!(expected, actual);
        }
    }
}