1use std::fmt::{Display, Formatter};
4use std::hash::Hasher;
5use std::sync::Arc;
6use std::{collections::HashMap, str::FromStr};
7
8use bitvec::vec::BitVec;
9use chrono::NaiveDate;
10use chrono::NaiveDateTime;
11use chrono::NaiveTime;
12use chrono::Utc;
13use chrono::{DateTime, Datelike};
14use derive_builder::Builder;
15use opendal::Operator;
16use ordered_float::OrderedFloat;
17use parquet::format::FileMetaData;
18use serde::ser::SerializeMap;
19use serde::ser::SerializeStruct;
20use serde::Serialize;
21use std::hash::Hash;
22use uuid::Uuid;
23
24use crate::types::in_memory::_decimal::REQUIRED_LENGTH;
25use crate::types::parse_manifest_list;
26use crate::ErrorKind;
27use crate::Result;
28use crate::{Error, Table};
29
/// Sequence number for manifest entries that have not been assigned one yet.
pub(crate) const UNASSIGNED_SEQ_NUM: i64 = -1;
/// Name of the default table branch.
pub(crate) const MAIN_BRANCH: &str = "main";
/// Sentinel snapshot id (-1) marking the absence of a snapshot.
const EMPTY_SNAPSHOT_ID: i64 = -1;

/// Maximum number of bytes used to store a decimal value.
pub(crate) const MAX_DECIMAL_BYTES: u32 = 24;
/// Maximum supported decimal precision (the largest precision that fits in
/// `MAX_DECIMAL_BYTES` bytes).
pub(crate) const MAX_DECIMAL_PRECISION: u32 = 38;
36
37mod _decimal {
38 use lazy_static::lazy_static;
39
40 use super::{MAX_DECIMAL_BYTES, MAX_DECIMAL_PRECISION};
41
42 lazy_static! {
43 pub(super) static ref MAX_PRECISION: [u32; MAX_DECIMAL_BYTES as usize] = {
45 let mut ret: [u32; 24] = [0; 24];
46 for (i, prec) in ret.iter_mut().enumerate() {
47 *prec = 2f64.powi((8 * (i + 1) - 1) as i32).log10().floor() as u32;
48 }
49
50 ret
51 };
52
53 pub(super) static ref REQUIRED_LENGTH: [u32; MAX_DECIMAL_PRECISION as usize] = {
55 let mut ret: [u32; MAX_DECIMAL_PRECISION as usize] = [0; MAX_DECIMAL_PRECISION as usize];
56
57 for (i, required_len) in ret.iter_mut().enumerate() {
58 for j in 0..MAX_PRECISION.len() {
59 if MAX_PRECISION[j] >= ((i+1) as u32) {
60 *required_len = (j+1) as u32;
61 break;
62 }
63 }
64 }
65
66 ret
67 };
68
69 }
70}
71
/// All data types supported by iceberg: either a primitive or a nested
/// (struct / list / map) type.
#[derive(Debug, PartialEq, Clone, Eq)]
pub enum Any {
    /// A primitive (non-nested) type.
    Primitive(Primitive),
    /// A struct type; `Arc` keeps clones cheap since `Struct` carries a
    /// field-id lookup table.
    Struct(Arc<Struct>),
    /// A list type.
    List(List),
    /// A map type.
    Map(Map),
}
84
/// A value of any supported type, mirroring the shape of [`Any`].
// NOTE(review): `Hash` is derived while `PartialEq` is hand-written (map
// equality is order-insensitive, hashing is order-sensitive), hence the
// allow below — confirm hashed maps are never compared across orderings.
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Eq, Hash)]
pub enum AnyValue {
    /// A primitive value.
    Primitive(PrimitiveValue),
    /// A struct value.
    Struct(StructValue),
    /// A list value; `None` entries are nulls.
    List(Vec<Option<AnyValue>>),
    /// A map value stored as parallel key/value vectors.
    Map {
        /// Map keys; expected to be the same length as `values`.
        keys: Vec<AnyValue>,
        /// Map values; `None` entries are nulls.
        values: Vec<Option<AnyValue>>,
    },
}
115
116impl PartialEq for AnyValue {
117 fn eq(&self, other: &Self) -> bool {
118 match (self, other) {
119 (Self::Primitive(l0), Self::Primitive(r0)) => l0 == r0,
120 (Self::Struct(l0), Self::Struct(r0)) => l0 == r0,
121 (Self::List(l0), Self::List(r0)) => l0 == r0,
122 (
123 Self::Map {
124 keys: l_keys,
125 values: l_values,
126 },
127 Self::Map {
128 keys: r_keys,
129 values: r_values,
130 },
131 ) => {
132 let mut map = HashMap::with_capacity(l_keys.len());
135 l_keys.iter().zip(l_values.iter()).for_each(|(key, value)| {
136 map.insert(key, value);
137 });
138 r_keys
139 .iter()
140 .zip(r_values.iter())
141 .all(|(key, value)| map.get(key).map_or(false, |v| *v == value))
142 }
143 _ => false,
144 }
145 }
146}
147
148impl Serialize for AnyValue {
149 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
150 where
151 S: serde::Serializer,
152 {
153 match self {
154 AnyValue::Primitive(value) => value.serialize(serializer),
155 AnyValue::Struct(value) => value.serialize(serializer),
156 AnyValue::List(value) => value.serialize(serializer),
157 AnyValue::Map { keys, values } => {
158 let mut map = serializer.serialize_map(Some(keys.len()))?;
159 for (key, value) in keys.iter().zip(values.iter()) {
160 map.serialize_entry(key, value)?;
161 }
162 map.end()
163 }
164 }
165 }
166}
167
/// All primitive (non-nested) types supported by iceberg.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Primitive {
    /// True or false.
    Boolean,
    /// 32-bit signed integer.
    Int,
    /// 64-bit signed integer.
    Long,
    /// 32-bit IEEE 754 floating point.
    Float,
    /// 64-bit IEEE 754 floating point.
    Double,
    /// Fixed-point decimal.
    Decimal {
        /// Total number of digits.
        precision: u8,
        /// Number of digits after the decimal point.
        scale: u8,
    },
    /// Calendar date, without time or timezone.
    Date,
    /// Time of day, without date or timezone.
    Time,
    /// Timestamp without timezone.
    Timestamp,
    /// Timestamp with timezone.
    Timestampz,
    /// Arbitrary-length UTF-8 string.
    String,
    /// Universally unique identifier.
    Uuid,
    /// Fixed-length byte array of the given length.
    Fixed(u64),
    /// Variable-length byte array.
    Binary,
}
224
225impl From<Primitive> for Any {
226 fn from(value: Primitive) -> Self {
227 Any::Primitive(value)
228 }
229}
230
231impl Primitive {
232 #[inline(always)]
234 pub fn decimal_required_bytes(precision: u32) -> Result<u32> {
235 if precision == 0 || precision > MAX_DECIMAL_PRECISION {
236 return Err(Error::new(
237 ErrorKind::IcebergDataInvalid,
238 format!(
239 "Decimal precision must be between 1 and {MAX_DECIMAL_PRECISION}: {precision}",
240 ),
241 ));
242 }
243 Ok(REQUIRED_LENGTH[precision as usize - 1])
244 }
245}
246
/// A primitive value, one variant per [`Primitive`] type.
///
/// Floats are wrapped in `OrderedFloat` so the enum can derive `Eq`/`Hash`.
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
pub enum PrimitiveValue {
    /// A boolean value.
    Boolean(bool),
    /// A 32-bit integer.
    Int(i32),
    /// A 64-bit integer.
    Long(i64),
    /// A 32-bit float with total ordering.
    Float(OrderedFloat<f32>),
    /// A 64-bit float with total ordering.
    Double(OrderedFloat<f64>),
    /// An unscaled decimal value.
    Decimal(i128),
    /// A calendar date.
    Date(NaiveDate),
    /// A time of day.
    Time(NaiveTime),
    /// A timestamp without timezone.
    Timestamp(NaiveDateTime),
    /// A timestamp in UTC.
    Timestampz(DateTime<Utc>),
    /// A UTF-8 string.
    String(String),
    /// A UUID.
    Uuid(Uuid),
    /// A fixed-length byte array.
    Fixed(Vec<u8>),
    /// A variable-length byte array.
    Binary(Vec<u8>),
}
297
298impl From<PrimitiveValue> for AnyValue {
299 fn from(value: PrimitiveValue) -> Self {
300 AnyValue::Primitive(value)
301 }
302}
303
304impl Serialize for PrimitiveValue {
305 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
306 where
307 S: serde::Serializer,
308 {
309 match self {
310 PrimitiveValue::Boolean(value) => serializer.serialize_bool(*value),
311 PrimitiveValue::Int(value) => serializer.serialize_i32(*value),
312 PrimitiveValue::Long(value) => serializer.serialize_i64(*value),
313 PrimitiveValue::Float(value) => serializer.serialize_f32(value.0),
314 PrimitiveValue::Double(value) => serializer.serialize_f64(value.0),
315 PrimitiveValue::Decimal(value) => serializer.serialize_bytes(&value.to_be_bytes()),
316 PrimitiveValue::Date(value) => serializer.serialize_i32(value.num_days_from_ce()),
317 PrimitiveValue::Time(value) => serializer.serialize_i64(
318 NaiveDateTime::new(NaiveDate::default(), *value)
319 .and_utc()
320 .timestamp_micros(),
321 ),
322 PrimitiveValue::Timestamp(value) => {
323 serializer.serialize_i64(value.and_utc().timestamp_micros())
324 }
325 PrimitiveValue::Timestampz(value) => serializer.serialize_i64(value.timestamp_micros()),
326 PrimitiveValue::String(value) => serializer.serialize_str(value),
327 PrimitiveValue::Uuid(value) => serializer.serialize_str(&value.to_string()),
328 PrimitiveValue::Fixed(value) => serializer.serialize_bytes(value),
329 PrimitiveValue::Binary(value) => serializer.serialize_bytes(value),
330 }
331 }
332}
333
/// A struct type: a collection of named, typed fields, plus a precomputed
/// field-id lookup table (flattened over struct-typed fields).
#[derive(Default, Debug, Clone, Eq)]
pub struct Struct {
    /// Fields in declaration order.
    fields: Vec<FieldRef>,
    /// Field id -> field, including ids nested inside struct-typed fields.
    id_lookup: HashMap<i32, FieldRef>,
}
348
349impl PartialEq for Struct {
350 fn eq(&self, other: &Self) -> bool {
351 for (id, field) in self.id_lookup.iter() {
352 if let Some(other_field) = other.id_lookup.get(id) {
353 if field != other_field {
354 return false;
355 }
356 } else {
357 return false;
358 }
359 }
360 true
361 }
362}
363
impl Struct {
    /// Builds a struct type from its fields, precomputing the id -> field
    /// lookup table (including ids of fields nested inside struct-typed
    /// fields).
    pub fn new(fields: Vec<FieldRef>) -> Self {
        let mut id_lookup = HashMap::with_capacity(fields.len());
        fields.iter().for_each(|field| {
            id_lookup.insert(field.id, field.clone());
            Self::fetch_any_field_id_map(&field.field_type, &mut id_lookup)
        });
        Struct { fields, id_lookup }
    }

    /// Collects nested field ids of `ty` into `map`.
    fn fetch_any_field_id_map(ty: &Any, map: &mut HashMap<i32, FieldRef>) {
        match ty {
            Any::Primitive(_) => {
                // Primitives carry no nested fields.
            }
            Any::Struct(inner) => Self::fetch_struct_field_id_map(inner, map),
            Any::List(_) => {
                // NOTE(review): element fields of list types are not added,
                // so `lookup_field` cannot resolve ids nested inside lists —
                // confirm this is intentional.
            }
            Any::Map(_) => {
                // NOTE(review): key/value fields of map types are not added
                // either — confirm this is intentional.
            }
        }
    }

    /// Copies a nested struct's (already flattened) lookup table into `map`.
    fn fetch_struct_field_id_map(ty: &Struct, map: &mut HashMap<i32, FieldRef>) {
        map.extend(ty.id_lookup.clone())
    }

    /// Number of top-level fields.
    pub fn len(&self) -> usize {
        self.fields.len()
    }

    /// Whether the struct has no fields.
    pub fn is_empty(&self) -> bool {
        self.fields.is_empty()
    }

    /// Top-level fields in declaration order.
    pub fn fields(&self) -> &[FieldRef] {
        &self.fields
    }

    /// Looks up a field's type by id (including nested struct fields).
    pub fn lookup_type(&self, field_id: i32) -> Option<Any> {
        self.id_lookup
            .get(&field_id)
            .map(|field| field.field_type.clone())
    }

    /// Looks up a field by id (including nested struct fields).
    pub fn lookup_field(&self, field_id: i32) -> Option<&FieldRef> {
        self.id_lookup.get(&field_id)
    }

    /// Looks up a top-level field by name (linear scan; nested fields are
    /// not searched).
    pub fn lookup_field_by_name(&self, field_name: &str) -> Option<&FieldRef> {
        self.fields.iter().find(|field| field.name == field_name)
    }
}
430
/// Reference-counted pointer to a [`Field`].
pub type FieldRef = Arc<Field>;

/// A named, typed field of a struct.
#[derive(Debug, PartialEq, Clone, Eq)]
pub struct Field {
    /// Unique field id within the schema.
    pub id: i32,
    /// Field name.
    pub name: String,
    /// Whether a non-null value is required.
    pub required: bool,
    /// The field's type.
    pub field_type: Any,
    /// Optional documentation string.
    pub comment: Option<String>,
    /// Initial default value, if any.
    pub initial_default: Option<AnyValue>,
    /// Write-time default value, if any.
    pub write_default: Option<AnyValue>,
}
454
455impl Field {
456 pub fn required(id: i32, name: impl Into<String>, r#type: Any) -> Self {
458 Self {
459 id,
460 name: name.into(),
461 required: true,
462 field_type: r#type,
463 comment: None,
464 initial_default: None,
465 write_default: None,
466 }
467 }
468
469 pub fn optional(id: i32, name: impl Into<String>, r#type: Any) -> Self {
471 Self {
472 id,
473 name: name.into(),
474 required: false,
475 field_type: r#type,
476 comment: None,
477 initial_default: None,
478 write_default: None,
479 }
480 }
481
482 fn with_comment(mut self, doc: impl Into<String>) -> Self {
483 self.comment = Some(doc.into());
484 self
485 }
486
487 fn with_required(mut self) -> Self {
488 self.required = true;
489 self
490 }
491}
492
/// A struct value: one slot per field of its struct type.
#[derive(Default, Debug, PartialEq, Clone, Eq)]
pub struct StructValue {
    /// One entry per field; slots flagged null in `null_bitmap` hold an
    /// unused placeholder value.
    field_values: Vec<AnyValue>,
    /// The struct type this value conforms to.
    type_info: Arc<Struct>,
    /// One bit per field: `true` means the field is null.
    null_bitmap: BitVec,
}
500
501impl From<StructValue> for AnyValue {
502 fn from(value: StructValue) -> Self {
503 AnyValue::Struct(value)
504 }
505}
506
impl StructValue {
    /// Iterates over `(field_id, value, field_name, required)` tuples in
    /// declaration order. `value` is `None` when the null bitmap marks the
    /// slot as null (the stored placeholder is never exposed).
    pub fn iter(&self) -> impl Iterator<Item = (i32, Option<&AnyValue>, &str, bool)> {
        self.null_bitmap
            .iter()
            .zip(self.field_values.iter())
            .zip(self.type_info.fields().iter())
            .map(|((null, value), field)| {
                (
                    field.id,
                    if *null { None } else { Some(value) },
                    field.name.as_str(),
                    field.required,
                )
            })
    }
}
525
impl Serialize for StructValue {
    /// Serializes as a serde struct with one field per slot; required fields
    /// are serialized directly, optional fields as `Option`.
    ///
    /// NOTE(review): `SerializeStruct::serialize_field` requires a
    /// `&'static str` key, satisfied here via `Box::leak` — every call leaks
    /// each field name. Consider `serialize_map` (no 'static requirement)
    /// if the output format tolerates it.
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut record = serializer.serialize_struct("", self.field_values.len())?;
        for (_, value, key, required) in self.iter() {
            if required {
                record.serialize_field(Box::leak(key.to_string().into_boxed_str()), &value.expect("Struct Builder should guaranteed that the value is always if the field is required."))?;
            } else {
                record.serialize_field(Box::leak(key.to_string().into_boxed_str()), &value)?;
            }
        }
        record.end()
    }
}
542
543impl Hash for StructValue {
544 fn hash<H: Hasher>(&self, state: &mut H) {
545 for (id, value, name, required) in self.iter() {
546 id.hash(state);
547 value.hash(state);
548 name.hash(state);
549 required.hash(state);
550 }
551 }
552}
553
/// Builder for [`StructValue`]: collect field values by id, then `build`.
pub struct StructValueBuilder {
    /// Values supplied so far; `None` marks an explicit null.
    fields: HashMap<i32, Option<AnyValue>>,
    /// Struct type the value is being built against.
    type_info: Arc<Struct>,
}
559
560impl StructValueBuilder {
561 pub fn new(type_info: Arc<Struct>) -> Self {
563 Self {
564 fields: HashMap::with_capacity(type_info.len()),
565 type_info,
566 }
567 }
568
569 pub fn add_field(&mut self, field_id: i32, field_value: Option<AnyValue>) -> Result<()> {
571 if self
573 .type_info
574 .lookup_field(field_id)
575 .ok_or_else(|| {
576 Error::new(
577 ErrorKind::IcebergDataInvalid,
578 format!("Field {} is not found", field_id),
579 )
580 })?
581 .required
582 && field_value.is_none()
583 {
584 return Err(Error::new(
585 ErrorKind::IcebergDataInvalid,
586 format!("Field {} is required", field_id),
587 ));
588 }
589 self.type_info.lookup_type(field_id).ok_or_else(|| {
591 Error::new(
592 ErrorKind::IcebergDataInvalid,
593 format!("Field {} is not found", field_id),
594 )
595 })?;
596 self.fields.insert(field_id, field_value);
600 Ok(())
601 }
602
603 pub fn build(mut self) -> Result<StructValue> {
605 let mut field_values = Vec::with_capacity(self.fields.len());
606 let mut null_bitmap = BitVec::with_capacity(self.fields.len());
607
608 for field in self.type_info.fields.iter() {
609 let field_value = self.fields.remove(&field.id).ok_or_else(|| {
610 Error::new(
611 ErrorKind::IcebergDataInvalid,
612 format!("Field {} is required", field.name),
613 )
614 })?;
615 if let Some(value) = field_value {
616 null_bitmap.push(false);
617 field_values.push(value);
618 } else {
619 null_bitmap.push(true);
620 field_values.push(AnyValue::Primitive(PrimitiveValue::Int(1)));
622 }
623 }
624
625 Ok(StructValue {
626 field_values,
627 type_info: self.type_info,
628 null_bitmap,
629 })
630 }
631}
632
/// A list type: a sequence of elements sharing one type.
#[derive(Debug, PartialEq, Clone, Eq)]
pub struct List {
    /// Field id of the element.
    pub element_id: i32,
    /// Whether elements must be non-null.
    pub element_required: bool,
    /// Type of the elements.
    pub element_type: Box<Any>,
}
647
/// A map type: typed keys mapped to typed values.
#[derive(Debug, PartialEq, Clone, Eq)]
pub struct Map {
    /// Field id of the key.
    pub key_id: i32,
    /// Type of the keys.
    pub key_type: Box<Any>,

    /// Field id of the value.
    pub value_id: i32,
    /// Whether values must be non-null.
    pub value_required: bool,
    /// Type of the values.
    pub value_type: Box<Any>,
}
667
/// An iceberg schema: a schema id plus a top-level struct of fields.
#[derive(Debug, PartialEq, Clone)]
pub struct Schema {
    /// Identifier of this schema.
    pub schema_id: i32,
    /// Ids of the fields that identify rows, if declared.
    pub identifier_field_ids: Option<Vec<i32>>,
    /// Top-level struct; kept private behind accessor methods.
    r#struct: Struct,
}
682
683impl Schema {
684 pub fn new(schema_id: i32, identifier_field_ids: Option<Vec<i32>>, r#struct: Struct) -> Self {
686 Schema {
687 schema_id,
688 identifier_field_ids,
689 r#struct,
690 }
691 }
692
693 pub fn fields(&self) -> &[FieldRef] {
695 self.r#struct.fields()
696 }
697
698 pub fn look_up_field_by_id(&self, field_id: i32) -> Option<&FieldRef> {
700 self.r#struct.lookup_field(field_id)
701 }
702}
703
/// Partition transforms: functions applied to a source column to derive a
/// partition value.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Transform {
    /// Use the source value unmodified.
    Identity,
    /// Bucket the value into the given number of buckets (hashing semantics
    /// per the Iceberg spec).
    Bucket(i32),
    /// Truncate the value to the given width.
    Truncate(i32),
    /// Extract the year from a date/timestamp value.
    Year,
    /// Extract the month from a date/timestamp value.
    Month,
    /// Extract the day from a date/timestamp value.
    Day,
    /// Extract the hour from a timestamp value.
    Hour,
    /// Always produce null.
    Void,
}
800
impl Transform {
    /// Returns the type produced by applying this transform to `input_type`.
    ///
    /// Identity and Truncate preserve the input type; all other transforms
    /// yield `Int`.
    ///
    /// # Errors
    ///
    /// Returns `IcebergDataInvalid` when the transform cannot be applied to
    /// the given input type.
    pub fn result_type(&self, input_type: &Any) -> Result<Any> {
        // Shared by Year/Month/Day: input must be a date or timestamp.
        // NOTE(review): the error text always says "year", even when invoked
        // for Month/Day — consider parameterizing the message.
        let check_time = |input_type: &Any| {
            if !matches!(
                input_type,
                Any::Primitive(Primitive::Date)
                    | Any::Primitive(Primitive::Timestamp)
                    | Any::Primitive(Primitive::Timestampz)
            ) {
                return Err(Error::new(
                    ErrorKind::IcebergDataInvalid,
                    format!("transform year type {input_type:?} is invalid"),
                ));
            }
            Ok(())
        };
        // Bucket accepts every primitive except Boolean/Float/Double.
        let check_bucket = |input_type: &Any| {
            if !matches!(
                input_type,
                Any::Primitive(Primitive::Int)
                    | Any::Primitive(Primitive::Long)
                    | Any::Primitive(Primitive::Decimal { .. })
                    | Any::Primitive(Primitive::Date)
                    | Any::Primitive(Primitive::Time)
                    | Any::Primitive(Primitive::Timestamp)
                    | Any::Primitive(Primitive::Timestampz)
                    | Any::Primitive(Primitive::String)
                    | Any::Primitive(Primitive::Uuid)
                    | Any::Primitive(Primitive::Fixed(_))
                    | Any::Primitive(Primitive::Binary)
            ) {
                return Err(Error::new(
                    ErrorKind::IcebergDataInvalid,
                    format!("transform bucket type {input_type:?} is invalid"),
                ));
            }
            Ok(())
        };
        // Truncate accepts numeric and string types.
        let check_truncate = |input_type: &Any| {
            if !matches!(
                input_type,
                Any::Primitive(Primitive::Int)
                    | Any::Primitive(Primitive::Long)
                    | Any::Primitive(Primitive::Decimal { .. })
                    | Any::Primitive(Primitive::String)
            ) {
                return Err(Error::new(
                    ErrorKind::IcebergDataInvalid,
                    format!("transform truncate type {input_type:?} is invalid"),
                ));
            }
            Ok(())
        };
        // Hour accepts timestamps only (dates carry no hour).
        let check_hour = |input_type: &Any| {
            if !matches!(
                input_type,
                Any::Primitive(Primitive::Timestamp) | Any::Primitive(Primitive::Timestampz)
            ) {
                return Err(Error::new(
                    ErrorKind::IcebergDataInvalid,
                    format!("transform hour type {input_type:?} is invalid"),
                ));
            }
            Ok(())
        };
        match self {
            Transform::Identity => Ok(input_type.clone()),
            Transform::Void => Ok(Primitive::Int.into()),
            Transform::Year => {
                check_time(input_type)?;
                Ok(Primitive::Int.into())
            }
            Transform::Month => {
                check_time(input_type)?;
                Ok(Primitive::Int.into())
            }
            Transform::Day => {
                check_time(input_type)?;
                Ok(Primitive::Int.into())
            }
            Transform::Hour => {
                check_hour(input_type)?;
                Ok(Primitive::Int.into())
            }
            Transform::Bucket(_) => {
                check_bucket(input_type)?;
                Ok(Primitive::Int.into())
            }
            Transform::Truncate(_) => {
                check_truncate(input_type)?;
                Ok(input_type.clone())
            }
        }
    }
}
896
897impl<'a> Display for &'a Transform {
898 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
899 match self {
900 Transform::Identity => write!(f, "identity"),
901 Transform::Year => write!(f, "year"),
902 Transform::Month => write!(f, "month"),
903 Transform::Day => write!(f, "day"),
904 Transform::Hour => write!(f, "hour"),
905 Transform::Void => write!(f, "void"),
906 Transform::Bucket(length) => write!(f, "bucket[{}]", length),
907 Transform::Truncate(width) => write!(f, "truncate[{}]", width),
908 }
909 }
910}
911
impl FromStr for Transform {
    type Err = Error;

    /// Parses a transform from its string form: `identity`, `year`, `month`,
    /// `day`, `hour`, `void`, `bucket[N]`, or `truncate[N]`.
    ///
    /// # Errors
    ///
    /// Returns `IcebergDataInvalid` for unknown names or unparsable widths.
    fn from_str(s: &str) -> Result<Self> {
        let t = match s {
            "identity" => Transform::Identity,
            "year" => Transform::Year,
            "month" => Transform::Month,
            "day" => Transform::Day,
            "hour" => Transform::Hour,
            "void" => Transform::Void,
            v if v.starts_with("bucket") => {
                // NOTE(review): `trim_*_matches` strips any number of
                // brackets, so e.g. `bucket[[4]]` also parses — confirm this
                // leniency is acceptable.
                let length = v
                    .strip_prefix("bucket")
                    .expect("transform must starts with `bucket`")
                    .trim_start_matches('[')
                    .trim_end_matches(']')
                    .parse()
                    .map_err(|err| {
                        Error::new(
                            ErrorKind::IcebergDataInvalid,
                            format!("transform bucket type {v:?} is invalid"),
                        )
                        .set_source(err)
                    })?;

                Transform::Bucket(length)
            }
            v if v.starts_with("truncate") => {
                let width = v
                    .strip_prefix("truncate")
                    .expect("transform must starts with `truncate`")
                    .trim_start_matches('[')
                    .trim_end_matches(']')
                    .parse()
                    .map_err(|err| {
                        Error::new(
                            ErrorKind::IcebergDataInvalid,
                            format!("transform truncate type {v:?} is invalid"),
                        )
                        .set_source(err)
                    })?;

                Transform::Truncate(width)
            }
            v => {
                return Err(Error::new(
                    ErrorKind::IcebergDataInvalid,
                    format!("transform {v:?} is invalid"),
                ))
            }
        };

        Ok(t)
    }
}
968
/// A partition spec: how a table's rows are split into partitions.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PartitionSpec {
    /// Identifier of this spec.
    pub spec_id: i32,
    /// Partition fields in order.
    pub fields: Vec<PartitionField>,
}
985
986impl PartitionSpec {
987 pub(crate) fn partition_type(&self, schema: &Schema) -> Result<Struct> {
988 let mut fields = Vec::with_capacity(self.fields.len());
989 for partition_field in &self.fields {
990 let source_field = schema
991 .look_up_field_by_id(partition_field.source_column_id)
992 .ok_or_else(|| {
993 Error::new(
994 ErrorKind::IcebergDataInvalid,
995 format!(
996 "Can't find field id {} in schema",
997 partition_field.source_column_id
998 ),
999 )
1000 })?;
1001 let result_type = partition_field
1002 .transform
1003 .result_type(&source_field.field_type)?;
1004 fields.push(
1005 Field::optional(
1006 partition_field.partition_field_id,
1007 partition_field.name.as_str(),
1008 result_type,
1009 )
1010 .into(),
1011 );
1012 }
1013
1014 Ok(Struct::new(fields))
1015 }
1016
1017 pub fn column_ids(&self) -> Vec<i32> {
1018 self.fields
1019 .iter()
1020 .map(|field| field.source_column_id)
1021 .collect()
1022 }
1023
1024 pub fn is_unpartitioned(&self) -> bool {
1026 self.fields.is_empty()
1027 }
1028}
1029
/// One field of a partition spec.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PartitionField {
    /// Id of the source column in the table schema.
    pub source_column_id: i32,
    /// Id of this partition field.
    pub partition_field_id: i32,
    /// Transform applied to the source column.
    pub transform: Transform,
    /// Name of the partition field.
    pub name: String,
}
1050
/// A sort order: an id plus the ordered list of sort fields.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SortOrder {
    /// Identifier of this sort order.
    pub order_id: i32,
    /// Sort fields, applied in order.
    pub fields: Vec<SortField>,
}
1067
/// One field of a sort order.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SortField {
    /// Id of the source column in the table schema.
    pub source_column_id: i32,
    /// Transform applied before sorting.
    pub transform: Transform,
    /// Ascending or descending.
    pub direction: SortDirection,
    /// Where nulls sort relative to non-null values.
    pub null_order: NullOrder,
}
1086
/// Sort direction of a sort field.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SortDirection {
    /// Ascending.
    ASC,
    /// Descending.
    DESC,
}
1095
1096impl Display for SortDirection {
1097 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1098 match self {
1099 SortDirection::ASC => write!(f, "asc"),
1100 SortDirection::DESC => write!(f, "desc"),
1101 }
1102 }
1103}
1104
1105impl FromStr for SortDirection {
1106 type Err = Error;
1107
1108 fn from_str(s: &str) -> Result<Self> {
1109 match s {
1110 "asc" => Ok(SortDirection::ASC),
1111 "desc" => Ok(SortDirection::DESC),
1112 v => Err(Error::new(
1113 ErrorKind::IcebergDataInvalid,
1114 format!("sort direction {:?} is invalid", v),
1115 )),
1116 }
1117 }
1118}
1119
/// Position of null values within a sort order.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum NullOrder {
    /// Nulls sort before non-null values.
    First,
    /// Nulls sort after non-null values.
    Last,
}
1129
1130impl Display for NullOrder {
1131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1132 match self {
1133 NullOrder::First => write!(f, "nulls-first"),
1134 NullOrder::Last => write!(f, "nulls-last"),
1135 }
1136 }
1137}
1138
1139impl FromStr for NullOrder {
1140 type Err = Error;
1141
1142 fn from_str(s: &str) -> Result<Self> {
1143 match s {
1144 "nulls-first" => Ok(NullOrder::First),
1145 "nulls-last" => Ok(NullOrder::Last),
1146 v => Err(Error::new(
1147 ErrorKind::IcebergDataInvalid,
1148 format!("null order {:?} is invalid", v),
1149 )),
1150 }
1151 }
1152}
1153
/// A snapshot's manifest list: one entry per manifest file.
#[derive(Debug, PartialEq, Clone)]
pub struct ManifestList {
    /// Entries of the manifest list.
    pub entries: Vec<ManifestListEntry>,
}
1172
impl ManifestList {
    /// Returns the format-version-2 schema of a manifest-list file, built
    /// from the field definitions in the [`manifest_list`] module.
    pub(crate) fn v2_schema() -> Schema {
        Schema::new(
            1,
            None,
            Struct::new(vec![
                manifest_list::MANIFEST_PATH.clone().into(),
                manifest_list::MANIFEST_LENGTH.clone().into(),
                manifest_list::PARTITION_SPEC_ID.clone().into(),
                manifest_list::CONTENT.clone().into(),
                manifest_list::SEQUENCE_NUMBER.clone().into(),
                manifest_list::MIN_SEQUENCE_NUMBER.clone().into(),
                manifest_list::ADDED_SNAPSHOT_ID.clone().into(),
                manifest_list::ADDED_FILES_COUNT.clone().into(),
                manifest_list::EXISTING_FILES_COUNT.clone().into(),
                manifest_list::DELETED_FILES_COUNT.clone().into(),
                manifest_list::ADDED_ROWS_COUNT.clone().into(),
                manifest_list::EXISTING_ROWS_COUNT.clone().into(),
                manifest_list::DELETED_ROWS_COUNT.clone().into(),
                manifest_list::PARTITIONS.clone().into(),
                manifest_list::KEY_METADATA.clone().into(),
            ]),
        )
    }
}
1198
/// One entry of a manifest list: metadata about a single manifest file.
#[derive(Debug, PartialEq, Clone)]
pub struct ManifestListEntry {
    /// Location of the manifest file.
    pub manifest_path: String,
    /// Length of the manifest file in bytes.
    pub manifest_length: i64,
    /// Id of the partition spec used to write the manifest.
    pub partition_spec_id: i32,
    /// Whether the manifest tracks data files or delete files.
    pub content: ManifestContentType,
    /// Sequence number of the manifest.
    pub sequence_number: i64,
    /// Minimum sequence number among the manifest's entries.
    pub min_sequence_number: i64,
    /// Id of the snapshot that added this manifest.
    pub added_snapshot_id: i64,
    /// Number of added data files in the manifest.
    pub added_data_files_count: i32,
    /// Number of existing data files in the manifest.
    pub existing_data_files_count: i32,
    /// Number of deleted data files in the manifest.
    pub deleted_data_files_count: i32,
    /// Row count of the added data files.
    pub added_rows_count: i64,
    /// Row count of the existing data files.
    pub existing_rows_count: i64,
    /// Row count of the deleted data files.
    pub deleted_rows_count: i64,
    /// Per-partition-field summaries (null/NaN flags and value bounds).
    pub partitions: Option<Vec<FieldSummary>>,
    /// Encryption key metadata, if any.
    pub key_metadata: Option<Vec<u8>>,
}
1276
/// Field definitions (with their fixed field ids) for the V2 manifest-list
/// schema.
mod manifest_list {
    use super::*;
    use once_cell::sync::Lazy;
    pub static MANIFEST_PATH: Lazy<Field> =
        Lazy::new(|| Field::required(500, "manifest_path", Any::Primitive(Primitive::String)));
    pub static MANIFEST_LENGTH: Lazy<Field> =
        Lazy::new(|| Field::required(501, "manifest_length", Any::Primitive(Primitive::Long)));
    pub static PARTITION_SPEC_ID: Lazy<Field> =
        Lazy::new(|| Field::required(502, "partition_spec_id", Any::Primitive(Primitive::Int)));
    pub static CONTENT: Lazy<Field> =
        Lazy::new(|| Field::required(517, "content", Any::Primitive(Primitive::Int)));
    pub static SEQUENCE_NUMBER: Lazy<Field> =
        Lazy::new(|| Field::required(515, "sequence_number", Any::Primitive(Primitive::Long)));
    pub static MIN_SEQUENCE_NUMBER: Lazy<Field> =
        Lazy::new(|| Field::required(516, "min_sequence_number", Any::Primitive(Primitive::Long)));
    pub static ADDED_SNAPSHOT_ID: Lazy<Field> =
        Lazy::new(|| Field::required(503, "added_snapshot_id", Any::Primitive(Primitive::Long)));
    pub static ADDED_FILES_COUNT: Lazy<Field> = Lazy::new(|| {
        Field::required(
            504,
            "added_data_files_count",
            Any::Primitive(Primitive::Int),
        )
    });
    pub static EXISTING_FILES_COUNT: Lazy<Field> = Lazy::new(|| {
        Field::required(
            505,
            "existing_data_files_count",
            Any::Primitive(Primitive::Int),
        )
    });
    pub static DELETED_FILES_COUNT: Lazy<Field> = Lazy::new(|| {
        Field::required(
            506,
            "deleted_data_files_count",
            Any::Primitive(Primitive::Int),
        )
    });
    pub static ADDED_ROWS_COUNT: Lazy<Field> =
        Lazy::new(|| Field::required(512, "added_rows_count", Any::Primitive(Primitive::Long)));
    pub static EXISTING_ROWS_COUNT: Lazy<Field> =
        Lazy::new(|| Field::required(513, "existing_rows_count", Any::Primitive(Primitive::Long)));
    pub static DELETED_ROWS_COUNT: Lazy<Field> =
        Lazy::new(|| Field::required(514, "deleted_rows_count", Any::Primitive(Primitive::Long)));
    // A list of per-partition-field summary structs (field ids 508-511, 518).
    pub static PARTITIONS: Lazy<Field> = Lazy::new(|| {
        Field::optional(
            507,
            "partitions",
            Any::List(List {
                element_id: 508,
                element_required: true,
                element_type: Box::new(Any::Struct(
                    Struct::new(vec![
                        Field::required(509, "contains_null", Any::Primitive(Primitive::Boolean))
                            .into(),
                        Field::optional(518, "contains_nan", Any::Primitive(Primitive::Boolean))
                            .into(),
                        Field::optional(510, "lower_bound", Any::Primitive(Primitive::Binary))
                            .into(),
                        Field::optional(511, "upper_bound", Any::Primitive(Primitive::Binary))
                            .into(),
                    ])
                    .into(),
                )),
            }),
        )
    });
    pub static KEY_METADATA: Lazy<Field> =
        Lazy::new(|| Field::optional(519, "key_metadata", Any::Primitive(Primitive::Binary)));
}
1347
/// Summary of one partition field across a manifest's entries.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct FieldSummary {
    /// Whether any entry has a null value for the field.
    pub contains_null: bool,
    /// Whether any entry has a NaN value for the field, if known.
    pub contains_nan: Option<bool>,
    /// Serialized lower bound of the field's values, if known.
    pub lower_bound: Option<Vec<u8>>,
    /// Serialized upper bound of the field's values, if known.
    pub upper_bound: Option<Vec<u8>>,
}
1373
/// One entry of a manifest file: a tracked data file plus its status.
#[derive(Debug, PartialEq, Clone)]
pub struct ManifestEntry {
    /// Whether the file was added, already existed, or was deleted.
    pub status: ManifestStatus,
    /// Snapshot the entry belongs to, if recorded.
    pub snapshot_id: Option<i64>,
    /// Data sequence number, if recorded.
    pub sequence_number: Option<i64>,
    /// File sequence number, if recorded.
    pub file_sequence_number: Option<i64>,
    /// The tracked data file.
    pub data_file: DataFile,
}
1403
1404impl ManifestEntry {
1405 pub fn is_alive(&self) -> bool {
1407 matches!(
1408 self.status,
1409 ManifestStatus::Added | ManifestStatus::Existing
1410 )
1411 }
1412}
1413
/// Field definitions (with their fixed field ids) for the V2 manifest-entry
/// schema.
mod manifest_file {
    use super::*;
    use once_cell::sync::Lazy;
    pub static STATUS: Lazy<Field> =
        Lazy::new(|| Field::required(0, "status", Any::Primitive(Primitive::Int)));
    pub static SNAPSHOT_ID: Lazy<Field> =
        Lazy::new(|| Field::optional(1, "snapshot_id", Any::Primitive(Primitive::Long)));
    pub static SEQUENCE_NUMBER: Lazy<Field> =
        Lazy::new(|| Field::optional(3, "sequence_number", Any::Primitive(Primitive::Long)));
    pub static FILE_SEQUENCE_NUMBER: Lazy<Field> =
        Lazy::new(|| Field::optional(4, "file_sequence_number", Any::Primitive(Primitive::Long)));

    // The `data_file` field is built dynamically (its struct type depends on
    // the partition type), so only its id and name are fixed here.
    pub const DATA_FILE_ID: i32 = 2;
    pub const DATA_FILE_NAME: &str = "data_file";
}
1429
/// Metadata describing a manifest file.
#[derive(Debug, PartialEq, Clone)]
pub struct ManifestMetadata {
    /// Table schema the manifest was written with.
    pub schema: Schema,
    /// Id of that schema.
    pub schema_id: i32,

    /// Partition spec the manifest's files were written with.
    pub partition_spec: PartitionSpec,

    /// Table format version, if recorded.
    pub format_version: Option<TableFormatVersion>,
    /// Whether the manifest tracks data files or delete files.
    pub content: ManifestContentType,
}
1447
/// A manifest file: its metadata plus the entries it tracks.
#[derive(Debug, PartialEq, Clone)]
pub struct ManifestFile {
    /// Metadata of the manifest.
    pub metadata: ManifestMetadata,
    /// Entries of the manifest.
    pub entries: Vec<ManifestEntry>,
}
1456
impl ManifestFile {
    /// Returns the format-version-2 schema of a manifest file.
    ///
    /// The nested `data_file` struct depends on the table's partition type,
    /// so it is assembled here rather than stored as a static; note that
    /// `content` is required in V2 (hence `with_required`).
    pub(crate) fn v2_schema(partition_type: Struct) -> Schema {
        Schema::new(
            0,
            None,
            Struct::new(vec![
                manifest_file::STATUS.clone().into(),
                manifest_file::SNAPSHOT_ID.clone().into(),
                manifest_file::SEQUENCE_NUMBER.clone().into(),
                manifest_file::FILE_SEQUENCE_NUMBER.clone().into(),
                Field::required(
                    manifest_file::DATA_FILE_ID,
                    manifest_file::DATA_FILE_NAME,
                    Any::Struct(
                        Struct::new(vec![
                            datafile::CONTENT.clone().with_required().into(),
                            datafile::FILE_PATH.clone().into(),
                            datafile::FILE_FORMAT.clone().into(),
                            DataFile::partition_field(partition_type).into(),
                            datafile::RECORD_COUNT.clone().into(),
                            datafile::FILE_SIZE.clone().into(),
                            datafile::COLUMN_SIZES.clone().into(),
                            datafile::VALUE_COUNTS.clone().into(),
                            datafile::NULL_VALUE_COUNTS.clone().into(),
                            datafile::NAN_VALUE_COUNTS.clone().into(),
                            datafile::LOWER_BOUNDS.clone().into(),
                            datafile::UPPER_BOUNDS.clone().into(),
                            datafile::KEY_METADATA.clone().into(),
                            datafile::SPLIT_OFFSETS.clone().into(),
                            datafile::EQUALITY_IDS.clone().into(),
                            datafile::SORT_ORDER_ID.clone().into(),
                        ])
                        .into(),
                    ),
                )
                .into(),
            ]),
        )
    }
}
1497
/// Content tracked by a manifest file.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ManifestContentType {
    /// The manifest tracks data files.
    Data = 0,
    /// The manifest tracks delete files.
    Deletes = 1,
}
1506
1507impl Display for ManifestContentType {
1508 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1509 match self {
1510 ManifestContentType::Data => write!(f, "data"),
1511 ManifestContentType::Deletes => write!(f, "deletes"),
1512 }
1513 }
1514}
1515
1516impl FromStr for ManifestContentType {
1517 type Err = Error;
1518
1519 fn from_str(s: &str) -> Result<Self> {
1520 match s {
1521 "data" => Ok(ManifestContentType::Data),
1522 "deletes" => Ok(ManifestContentType::Deletes),
1523 _ => Err(Error::new(
1524 ErrorKind::IcebergDataInvalid,
1525 format!("Invalid manifest content type: {s}"),
1526 )),
1527 }
1528 }
1529}
1530
1531impl TryFrom<u8> for ManifestContentType {
1532 type Error = Error;
1533
1534 fn try_from(v: u8) -> Result<ManifestContentType> {
1535 match v {
1536 0 => Ok(ManifestContentType::Data),
1537 1 => Ok(ManifestContentType::Deletes),
1538 _ => Err(Error::new(
1539 ErrorKind::IcebergDataInvalid,
1540 format!("manifest content type {} is invalid", v),
1541 )),
1542 }
1543 }
1544}
1545
/// Status of a manifest entry.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ManifestStatus {
    /// The file already existed in a previous snapshot.
    Existing = 0,
    /// The file was added by this snapshot.
    Added = 1,
    /// The file was deleted by this snapshot.
    Deleted = 2,
}
1558
1559impl TryFrom<u8> for ManifestStatus {
1560 type Error = Error;
1561
1562 fn try_from(v: u8) -> Result<ManifestStatus> {
1563 match v {
1564 0 => Ok(ManifestStatus::Existing),
1565 1 => Ok(ManifestStatus::Added),
1566 2 => Ok(ManifestStatus::Deleted),
1567 _ => Err(Error::new(
1568 ErrorKind::IcebergDataInvalid,
1569 format!("manifest status {} is invalid", v),
1570 )),
1571 }
1572 }
1573}
1574
/// Builder that derives a [`DataFile`] from a written parquet file's
/// metadata.
pub struct DataFileBuilder {
    /// Parquet footer metadata of the written file.
    meta_data: FileMetaData,
    /// File location, joined onto `table_location` to form the file path.
    file_location: String,
    /// Bytes written to the file.
    written_size: u64,
    /// Root location of the table.
    table_location: String,
    /// Content type; must be set before `build`.
    content: Option<DataContentType>,
    /// Partition value of the file, if partitioned.
    partition_value: Option<StructValue>,
    /// Equality field ids; required for equality-delete files.
    equality_ids: Option<Vec<i32>>,
}
1585
1586impl DataFileBuilder {
1587 pub fn new(
1589 meta_data: FileMetaData,
1590 table_location: String,
1591 file_location: String,
1592 written_size: u64,
1593 ) -> Self {
1594 Self {
1595 meta_data,
1596 file_location,
1597 written_size,
1598 table_location,
1599 content: None,
1600 partition_value: None,
1601 equality_ids: None,
1602 }
1603 }
1604
1605 pub fn with_content(self, content: DataContentType) -> Self {
1608 Self {
1609 content: Some(content),
1610 ..self
1611 }
1612 }
1613
1614 pub fn with_partition_value(self, value: Option<StructValue>) -> Self {
1616 Self {
1617 partition_value: value,
1618 ..self
1619 }
1620 }
1621
1622 pub fn with_equality_ids(self, ids: Vec<i32>) -> Self {
1624 Self {
1625 equality_ids: Some(ids),
1626 ..self
1627 }
1628 }
1629
1630 pub fn build(self) -> DataFile {
1632 log::info!("{:?}", self.meta_data);
1633 let (column_sizes, value_counts, null_value_counts, distinct_counts) = {
1634 let mut per_col_size: HashMap<i32, _> = HashMap::new();
1636 let mut per_col_val_num: HashMap<i32, _> = HashMap::new();
1637 let mut per_col_null_val_num: HashMap<i32, _> = HashMap::new();
1638 let mut per_col_distinct_val_num: HashMap<i32, _> = HashMap::new();
1639 self.meta_data.row_groups.iter().for_each(|group| {
1640 group
1641 .columns
1642 .iter()
1643 .enumerate()
1644 .for_each(|(column_id, column_chunk)| {
1645 if let Some(column_chunk_metadata) = &column_chunk.meta_data {
1646 *per_col_size.entry(column_id as i32).or_insert(0) +=
1647 column_chunk_metadata.total_compressed_size;
1648 *per_col_val_num.entry(column_id as i32).or_insert(0) +=
1649 column_chunk_metadata.num_values;
1650 *per_col_null_val_num
1651 .entry(column_id as i32)
1652 .or_insert(0_i64) += column_chunk_metadata
1653 .statistics
1654 .as_ref()
1655 .map(|s| s.null_count)
1656 .unwrap_or(None)
1657 .unwrap_or(0);
1658 *per_col_distinct_val_num
1659 .entry(column_id as i32)
1660 .or_insert(0_i64) += column_chunk_metadata
1661 .statistics
1662 .as_ref()
1663 .map(|s| s.distinct_count)
1664 .unwrap_or(None)
1665 .unwrap_or(0);
1666 }
1667 })
1668 });
1669 (
1670 per_col_size,
1671 per_col_val_num,
1672 per_col_null_val_num,
1673 per_col_distinct_val_num,
1674 )
1675 };
1676
1677 if self.content.unwrap() == DataContentType::EqualityDeletes {
1679 assert!(self.equality_ids.is_some());
1680 }
1681
1682 DataFile {
1683 content: self.content.unwrap(),
1684 file_path: format!("{}/{}", self.table_location, self.file_location),
1685 file_format: crate::types::DataFileFormat::Parquet,
1686 partition: self.partition_value.unwrap_or_default(),
1689 record_count: self.meta_data.num_rows,
1690 column_sizes: Some(column_sizes),
1691 value_counts: Some(value_counts),
1692 null_value_counts: Some(null_value_counts),
1693 distinct_counts: Some(distinct_counts),
1694 key_metadata: self.meta_data.footer_signing_key_metadata,
1695 file_size_in_bytes: self.written_size as i64,
1696 split_offsets: Some(
1704 self.meta_data
1705 .row_groups
1706 .iter()
1707 .filter_map(|group| group.file_offset)
1708 .collect(),
1709 ),
1710 nan_value_counts: None,
1711 lower_bounds: None,
1712 upper_bounds: None,
1713 equality_ids: self.equality_ids,
1714 sort_order_id: None,
1715 }
1716 }
1717}
1718
/// A data or delete file tracked in an Iceberg manifest; the manifest schema
/// field ids for these fields are defined in the sibling `datafile` module.
#[derive(Debug, PartialEq, Clone, Builder)]
#[builder(name = "DataFileBuilderV2", setter(prefix = "with"))]
pub struct DataFile {
    /// Contents of the file: 0=data, 1=position deletes, 2=equality deletes.
    pub content: DataContentType,
    /// Location URI with FS scheme.
    pub file_path: String,
    /// File format: avro, orc, or parquet.
    pub file_format: DataFileFormat,
    /// Partition data tuple, schema based on the partition spec.
    pub partition: StructValue,
    /// Number of records in the file.
    pub record_count: i64,
    /// Total file size in bytes.
    pub file_size_in_bytes: i64,
    /// Map of column id to total size on disk.
    #[builder(setter(strip_option), default)]
    pub column_sizes: Option<HashMap<i32, i64>>,
    /// Map of column id to total count, including null and NaN.
    #[builder(setter(strip_option), default)]
    pub value_counts: Option<HashMap<i32, i64>>,
    /// Map of column id to null value count.
    #[builder(setter(strip_option), default)]
    pub null_value_counts: Option<HashMap<i32, i64>>,
    /// Map of column id to number of NaN values in the column.
    #[builder(setter(strip_option), default)]
    pub nan_value_counts: Option<HashMap<i32, i64>>,
    /// Map of column id to number of distinct values in the column.
    #[builder(setter(strip_option), default)]
    pub distinct_counts: Option<HashMap<i32, i64>>,
    /// Map of column id to lower bound.
    #[builder(setter(strip_option), default)]
    pub lower_bounds: Option<HashMap<i32, Vec<u8>>>,
    /// Map of column id to upper bound.
    #[builder(setter(strip_option), default)]
    pub upper_bounds: Option<HashMap<i32, Vec<u8>>>,
    /// Encryption key metadata blob.
    pub key_metadata: Option<Vec<u8>>,
    /// Splittable offsets.
    #[builder(setter(strip_option), default)]
    pub split_offsets: Option<Vec<i64>>,
    /// Equality comparison field IDs.
    #[builder(setter(strip_option), default)]
    pub equality_ids: Option<Vec<i32>>,
    /// Sort order ID.
    #[builder(setter(strip_option), default)]
    pub sort_order_id: Option<i32>,
}
1849
/// Manifest schema [`Field`] definitions for [`DataFile`], one lazily built
/// static per field. The numeric ids are the fixed field/key/value ids of the
/// `data_file` struct — presumably taken from the Iceberg table spec (TODO
/// confirm against the spec when changing any of them).
mod datafile {
    use super::*;
    use once_cell::sync::Lazy;
    pub static CONTENT: Lazy<Field> = Lazy::new(|| {
        Field::optional(134, "content", Any::Primitive(Primitive::Int))
            .with_comment("Contents of the file: 0=data, 1=position deletes, 2=equality deletes")
    });
    pub static FILE_PATH: Lazy<Field> = Lazy::new(|| {
        Field::required(100, "file_path", Any::Primitive(Primitive::String))
            .with_comment("Location URI with FS scheme")
    });
    pub static FILE_FORMAT: Lazy<Field> = Lazy::new(|| {
        Field::required(101, "file_format", Any::Primitive(Primitive::String))
            .with_comment("File format name: avro, orc, or parquet")
    });
    pub static RECORD_COUNT: Lazy<Field> = Lazy::new(|| {
        Field::required(103, "record_count", Any::Primitive(Primitive::Long))
            .with_comment("Number of records in the file")
    });
    pub static FILE_SIZE: Lazy<Field> = Lazy::new(|| {
        Field::required(104, "file_size_in_bytes", Any::Primitive(Primitive::Long))
            .with_comment("Total file size in bytes")
    });
    pub static COLUMN_SIZES: Lazy<Field> = Lazy::new(|| {
        Field::optional(
            108,
            "column_sizes",
            Any::Map(Map {
                key_id: 117,
                key_type: Box::new(Any::Primitive(Primitive::Int)),
                value_id: 118,
                value_type: Box::new(Any::Primitive(Primitive::Long)),
                value_required: true,
            }),
        )
        .with_comment("Map of column id to total size on disk")
    });
    pub static VALUE_COUNTS: Lazy<Field> = Lazy::new(|| {
        Field::optional(
            109,
            "value_counts",
            Any::Map(Map {
                key_id: 119,
                key_type: Box::new(Any::Primitive(Primitive::Int)),
                value_id: 120,
                value_type: Box::new(Any::Primitive(Primitive::Long)),
                value_required: true,
            }),
        )
        .with_comment("Map of column id to total count, including null and NaN")
    });
    pub static NULL_VALUE_COUNTS: Lazy<Field> = Lazy::new(|| {
        Field::optional(
            110,
            "null_value_counts",
            Any::Map(Map {
                key_id: 121,
                key_type: Box::new(Any::Primitive(Primitive::Int)),
                value_id: 122,
                value_type: Box::new(Any::Primitive(Primitive::Long)),
                value_required: true,
            }),
        )
        .with_comment("Map of column id to null value count")
    });
    pub static NAN_VALUE_COUNTS: Lazy<Field> = Lazy::new(|| {
        Field::optional(
            137,
            "nan_value_counts",
            Any::Map(Map {
                key_id: 138,
                key_type: Box::new(Any::Primitive(Primitive::Int)),
                value_id: 139,
                value_type: Box::new(Any::Primitive(Primitive::Long)),
                value_required: true,
            }),
        )
        .with_comment("Map of column id to number of NaN values in the column")
    });
    pub static LOWER_BOUNDS: Lazy<Field> = Lazy::new(|| {
        Field::optional(
            125,
            "lower_bounds",
            Any::Map(Map {
                key_id: 126,
                key_type: Box::new(Any::Primitive(Primitive::Int)),
                value_id: 127,
                value_type: Box::new(Any::Primitive(Primitive::Binary)),
                value_required: true,
            }),
        )
        .with_comment("Map of column id to lower bound")
    });
    pub static UPPER_BOUNDS: Lazy<Field> = Lazy::new(|| {
        Field::optional(
            128,
            "upper_bounds",
            Any::Map(Map {
                key_id: 129,
                key_type: Box::new(Any::Primitive(Primitive::Int)),
                value_id: 130,
                value_type: Box::new(Any::Primitive(Primitive::Binary)),
                value_required: true,
            }),
        )
        .with_comment("Map of column id to upper bound")
    });
    pub static KEY_METADATA: Lazy<Field> = Lazy::new(|| {
        Field::optional(131, "key_metadata", Any::Primitive(Primitive::Binary))
            .with_comment("Encryption key metadata blob")
    });
    pub static SPLIT_OFFSETS: Lazy<Field> = Lazy::new(|| {
        Field::optional(
            132,
            "split_offsets",
            Any::List(List {
                element_id: 133,
                element_required: true,
                element_type: Box::new(Any::Primitive(Primitive::Long)),
            }),
        )
        .with_comment("Splittable offsets")
    });
    pub static EQUALITY_IDS: Lazy<Field> = Lazy::new(|| {
        Field::optional(
            135,
            "equality_ids",
            Any::List(List {
                element_id: 136,
                element_required: true,
                element_type: Box::new(Any::Primitive(Primitive::Int)),
            }),
        )
        .with_comment("Equality comparison field IDs")
    });
    pub static SORT_ORDER_ID: Lazy<Field> = Lazy::new(|| {
        Field::optional(140, "sort_order_id", Any::Primitive(Primitive::Int))
            .with_comment("Sort order ID")
    });
}
1990
1991impl DataFile {
1992 pub(crate) fn partition_field(partition_type: Struct) -> Field {
1993 Field::required(102, "partition", Any::Struct(partition_type.into()))
1994 .with_comment("Partition data tuple, schema based on the partition spec")
1995 }
1996
1997 #[cfg(test)]
1998 pub(crate) fn new(
1999 content: DataContentType,
2000 file_path: impl Into<String>,
2001 file_format: DataFileFormat,
2002 record_count: i64,
2003 file_size_in_bytes: i64,
2004 ) -> Self {
2005 Self {
2006 content,
2007 file_path: file_path.into(),
2008 file_format,
2009 partition: StructValue::default(),
2011 record_count,
2012 file_size_in_bytes,
2013 column_sizes: None,
2014 value_counts: None,
2015 null_value_counts: None,
2016 nan_value_counts: None,
2017 distinct_counts: None,
2018 lower_bounds: None,
2019 upper_bounds: None,
2020 key_metadata: None,
2021 split_offsets: None,
2022 equality_ids: None,
2023 sort_order_id: None,
2024 }
2025 }
2026}
2027
/// Kind of content stored in a data file. Discriminants match the on-disk
/// encoding decoded by `TryFrom<u8>`.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum DataContentType {
    /// Regular data file.
    Data = 0,
    /// Position delete file.
    PositionDeletes = 1,
    /// Equality delete file.
    EqualityDeletes = 2,
}
2039
2040impl TryFrom<u8> for DataContentType {
2041 type Error = Error;
2042
2043 fn try_from(v: u8) -> Result<DataContentType> {
2044 match v {
2045 0 => Ok(DataContentType::Data),
2046 1 => Ok(DataContentType::PositionDeletes),
2047 2 => Ok(DataContentType::EqualityDeletes),
2048 _ => Err(Error::new(
2049 ErrorKind::IcebergDataInvalid,
2050 format!("data content type {} is invalid", v),
2051 )),
2052 }
2053 }
2054}
2055
/// File format of a data file.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum DataFileFormat {
    /// Avro file format.
    Avro,
    /// ORC file format.
    Orc,
    /// Parquet file format.
    Parquet,
}
2066
2067impl FromStr for DataFileFormat {
2068 type Err = Error;
2069
2070 fn from_str(s: &str) -> Result<Self> {
2071 match s.to_lowercase().as_str() {
2072 "avro" => Ok(Self::Avro),
2073 "orc" => Ok(Self::Orc),
2074 "parquet" => Ok(Self::Parquet),
2075 _ => Err(Error::new(
2076 ErrorKind::IcebergFeatureUnsupported,
2077 format!("Unsupported data file format: {}", s),
2078 )),
2079 }
2080 }
2081}
2082
2083impl Display for DataFileFormat {
2084 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2085 match self {
2086 DataFileFormat::Avro => f.write_str("avro"),
2087 DataFileFormat::Orc => f.write_str("orc"),
2088 DataFileFormat::Parquet => f.write_str("parquet"),
2089 }
2090 }
2091}
2092
/// A point-in-time state of the table.
#[derive(Debug, PartialEq, Eq, Clone, Default)]
pub struct Snapshot {
    /// Unique id of this snapshot.
    pub snapshot_id: i64,
    /// Id of the parent snapshot, if any.
    pub parent_snapshot_id: Option<i64>,
    /// Sequence number associated with this snapshot.
    pub sequence_number: i64,
    /// Timestamp (ms) of the snapshot.
    pub timestamp_ms: i64,
    /// Location of this snapshot's manifest list file.
    pub manifest_list: String,
    /// Summary of changes, e.g. the counters produced by
    /// [`SnapshotSummaryBuilder`].
    pub summary: HashMap<String, String>,
    /// Id of the schema in effect when the snapshot was written, if tracked.
    pub schema_id: Option<i64>,
}
2144
2145impl Snapshot {
2146 pub(crate) async fn load_manifest_list(&self, op: &Operator) -> Result<ManifestList> {
2147 parse_manifest_list(
2148 &op.read(Table::relative_path(op, self.manifest_list.as_str())?.as_str())
2149 .await?
2150 .to_vec(),
2151 )
2152 }
2153
2154 pub(crate) fn log(&self) -> SnapshotLog {
2155 SnapshotLog {
2156 timestamp_ms: self.timestamp_ms,
2157 snapshot_id: self.snapshot_id,
2158 }
2159 }
2160}
2161
/// Accumulates per-commit counters from added [`DataFile`]s, later merged
/// with the previous snapshot's summary into a new summary map.
#[derive(Default)]
pub struct SnapshotSummaryBuilder {
    // Files added in this commit, by kind.
    added_data_files: i64,
    added_delete_files: i64,
    added_equality_delete_files: i64,
    added_position_delete_files: i64,

    // Records added in this commit, by kind.
    added_data_records: i64,
    added_position_deletes_records: i64,
    added_equality_deletes_records: i64,

    // Total bytes of all files added in this commit.
    added_files_size: i64,
}
2175
2176impl SnapshotSummaryBuilder {
2177 const OPERATION: &'static str = "operation";
2178 const ADDED_DATA_FILES: &'static str = "added-data-files";
2179 const TOTAL_DATA_FILES: &'static str = "total-data-files";
2180 const ADDED_DELETE_FILES: &'static str = "added-delete-files";
2181 const ADDED_EQUALITY_DELETE_FILES: &'static str = "added-equality-delete-files";
2182 const ADDED_POSITION_DELETE_FILES: &'static str = "added-position-delete-files";
2183 const TOTAL_DELETE_FILES: &'static str = "total-delete-files";
2184 const ADDED_RECORDS: &'static str = "added-records";
2185 const TOTAL_RECORDS: &'static str = "total-records";
2186 const ADDED_POSITION_DELETES: &'static str = "added-position-deletes";
2187 const TOTAL_POSITION_DELETES: &'static str = "total-position-deletes";
2188 const ADDED_EQUALITY_DELETES: &'static str = "added-equality-deletes";
2189 const TOTAL_EQUALITY_DELETES: &'static str = "total-equality-deletes";
2190 const ADDED_FILES_SIZE: &'static str = "added-files-size";
2191 const TOTAL_FILES_SIZE: &'static str = "total-files-size";
2192
2193 pub fn new() -> Self {
2194 Default::default()
2195 }
2196
2197 pub fn add(&mut self, datafile: &DataFile) {
2198 match datafile.content {
2199 DataContentType::Data => {
2200 self.added_data_files += 1;
2201 self.added_data_records += datafile.record_count;
2202 self.added_files_size += datafile.file_size_in_bytes;
2203 }
2204 DataContentType::PositionDeletes => {
2205 self.added_delete_files += 1;
2206 self.added_position_delete_files += 1;
2207 self.added_position_deletes_records += datafile.record_count;
2208 self.added_files_size += datafile.file_size_in_bytes;
2209 }
2210 DataContentType::EqualityDeletes => {
2211 self.added_delete_files += 1;
2212 self.added_equality_delete_files += 1;
2213 self.added_equality_deletes_records += datafile.record_count;
2214 self.added_files_size += datafile.file_size_in_bytes;
2215 }
2216 }
2217 }
2218
2219 fn operation(&self) -> String {
2220 if self.added_delete_files == 0 && self.added_data_files != 0 {
2221 "append".to_string()
2222 } else if self.added_delete_files != 0 && self.added_data_files != 0 {
2223 "overwrite".to_string()
2224 } else if self.added_delete_files != 0 && self.added_data_files == 0 {
2225 "delete".to_string()
2226 } else {
2227 "append".to_string()
2228 }
2229 }
2230
2231 pub fn merge(
2232 self,
2233 last_summary: &HashMap<String, String>,
2234 is_compact_op: bool,
2235 ) -> Result<HashMap<String, String>> {
2236 let operation = if is_compact_op {
2237 "replace".to_string()
2238 } else {
2239 self.operation()
2240 };
2241
2242 #[inline]
2243 fn get_i64(value: &HashMap<String, String>, key: &str) -> std::result::Result<i64, Error> {
2244 Ok(value
2245 .get(key)
2246 .map(|val| val.parse::<i64>())
2247 .transpose()?
2248 .unwrap_or_default())
2249 }
2250
2251 let added_data_files = self.added_data_files;
2252 let total_data_files = {
2253 let total_data_files = get_i64(last_summary, Self::TOTAL_DATA_FILES)?;
2254 total_data_files + self.added_data_files
2255 };
2256 let added_delete_files = self.added_delete_files;
2257 let added_equality_delete_files = self.added_equality_delete_files;
2258 let added_position_delete_files = self.added_position_delete_files;
2259 let total_delete_files = {
2260 let total_delete_files = get_i64(last_summary, Self::TOTAL_DELETE_FILES)?;
2261 total_delete_files + self.added_delete_files
2262 };
2263 let added_records = self.added_data_records;
2264 let total_records = {
2265 let total_records = get_i64(last_summary, Self::TOTAL_RECORDS)?;
2266 total_records + self.added_data_records
2267 };
2268 let added_position_deletes = self.added_position_deletes_records;
2269 let total_position_deletes = {
2270 let total_position_deletes = get_i64(last_summary, Self::TOTAL_POSITION_DELETES)?;
2271 total_position_deletes + self.added_position_deletes_records
2272 };
2273 let added_equality_deletes = self.added_equality_deletes_records;
2274 let total_equality_deletes = {
2275 let total_equality_deletes = get_i64(last_summary, Self::TOTAL_EQUALITY_DELETES)?;
2276 total_equality_deletes + self.added_equality_deletes_records
2277 };
2278 let added_files_size = self.added_files_size;
2279 let total_files_size = {
2280 let total_files_size = get_i64(last_summary, Self::TOTAL_FILES_SIZE)?;
2281 total_files_size + self.added_files_size
2282 };
2283
2284 let mut m = HashMap::with_capacity(16);
2285 m.insert(Self::OPERATION.to_string(), operation);
2286 m.insert(
2287 Self::ADDED_DATA_FILES.to_string(),
2288 added_data_files.to_string(),
2289 );
2290 m.insert(
2291 Self::TOTAL_DATA_FILES.to_string(),
2292 total_data_files.to_string(),
2293 );
2294 m.insert(
2295 Self::ADDED_DELETE_FILES.to_string(),
2296 added_delete_files.to_string(),
2297 );
2298 m.insert(
2299 Self::ADDED_EQUALITY_DELETE_FILES.to_string(),
2300 added_equality_delete_files.to_string(),
2301 );
2302 m.insert(
2303 Self::ADDED_POSITION_DELETE_FILES.to_string(),
2304 added_position_delete_files.to_string(),
2305 );
2306 m.insert(
2307 Self::TOTAL_DELETE_FILES.to_string(),
2308 total_delete_files.to_string(),
2309 );
2310 m.insert(Self::ADDED_RECORDS.to_string(), added_records.to_string());
2311 m.insert(Self::TOTAL_RECORDS.to_string(), total_records.to_string());
2312 m.insert(
2313 Self::ADDED_POSITION_DELETES.to_string(),
2314 added_position_deletes.to_string(),
2315 );
2316 m.insert(
2317 Self::TOTAL_POSITION_DELETES.to_string(),
2318 total_position_deletes.to_string(),
2319 );
2320 m.insert(
2321 Self::ADDED_EQUALITY_DELETES.to_string(),
2322 added_equality_deletes.to_string(),
2323 );
2324 m.insert(
2325 Self::TOTAL_EQUALITY_DELETES.to_string(),
2326 total_equality_deletes.to_string(),
2327 );
2328 m.insert(
2329 Self::ADDED_FILES_SIZE.to_string(),
2330 added_files_size.to_string(),
2331 );
2332 m.insert(
2333 Self::TOTAL_FILES_SIZE.to_string(),
2334 total_files_size.to_string(),
2335 );
2336
2337 Ok(m)
2338 }
2339}
2340
/// Snapshot-log entry: records when a snapshot became the current one.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct SnapshotLog {
    /// Timestamp (ms) at which the snapshot became current.
    pub timestamp_ms: i64,
    /// Id of the snapshot.
    pub snapshot_id: i64,
}
2352
/// A named reference (branch or tag) to a snapshot, with optional retention
/// settings.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SnapshotReference {
    /// Id of the referenced snapshot.
    pub snapshot_id: i64,
    /// Whether the reference is a tag or a branch.
    pub typ: SnapshotReferenceType,
    /// Minimum number of snapshots to keep for this reference, if set.
    pub min_snapshots_to_keep: Option<i32>,
    /// Maximum age (ms) of snapshots to keep, if set.
    pub max_snapshot_age_ms: Option<i64>,
    /// Maximum age (ms) of the reference itself, if set.
    pub max_ref_age_ms: Option<i64>,
}
2390
2391impl SnapshotReference {
2392 pub(crate) fn new(snapshot_id: i64, typ: SnapshotReferenceType) -> Self {
2393 Self {
2394 snapshot_id,
2395 typ,
2396 min_snapshots_to_keep: None,
2397 max_snapshot_age_ms: None,
2398 max_ref_age_ms: None,
2399 }
2400 }
2401}
2402
/// Kind of a named snapshot reference.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SnapshotReferenceType {
    /// A tag reference.
    Tag,
    /// A branch reference.
    Branch,
}
2411
2412impl Display for SnapshotReferenceType {
2413 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2414 match self {
2415 SnapshotReferenceType::Tag => f.write_str("tag"),
2416 SnapshotReferenceType::Branch => f.write_str("branch"),
2417 }
2418 }
2419}
2420
2421impl FromStr for SnapshotReferenceType {
2422 type Err = Error;
2423
2424 fn from_str(s: &str) -> Result<Self> {
2425 match s {
2426 "tag" => Ok(SnapshotReferenceType::Tag),
2427 "branch" => Ok(SnapshotReferenceType::Branch),
2428 _ => Err(Error::new(
2429 ErrorKind::IcebergDataInvalid,
2430 format!("Invalid snapshot reference type: {s}"),
2431 )),
2432 }
2433 }
2434}
2435
/// Metadata-log entry: a previous metadata file and its timestamp.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct MetadataLog {
    /// Timestamp (ms) of the metadata file.
    pub timestamp_ms: i64,
    /// Location of the metadata file.
    pub metadata_file: String,
}
2445
/// Iceberg table metadata.
#[derive(Debug, PartialEq, Clone)]
pub struct TableMetadata {
    /// Format version of this metadata.
    pub format_version: TableFormatVersion,
    /// UUID identifying the table.
    pub table_uuid: String,
    /// Base location of the table.
    pub location: String,
    /// Last assigned sequence number.
    pub last_sequence_number: i64,
    /// Timestamp (ms) of the last metadata update.
    pub last_updated_ms: i64,
    /// Last assigned column id.
    pub last_column_id: i32,
    /// All known schemas; looked up by id via [`TableMetadata::schema`].
    pub schemas: Vec<Schema>,
    /// Id of the current schema.
    pub current_schema_id: i32,
    /// All known partition specs; looked up by id via
    /// [`TableMetadata::partition_spec`].
    pub partition_specs: Vec<PartitionSpec>,
    /// Id of the default partition spec.
    pub default_spec_id: i32,
    /// Last assigned partition field id.
    pub last_partition_id: i32,
    /// Free-form table properties.
    pub properties: Option<HashMap<String, String>>,
    /// Id of the current snapshot; `None` or the `-1` sentinel means the
    /// table has no snapshot yet.
    pub current_snapshot_id: Option<i64>,
    /// Valid snapshots of the table.
    pub snapshots: Option<Vec<Snapshot>>,
    /// History of current-snapshot changes.
    pub snapshot_log: Option<Vec<SnapshotLog>>,
    /// History of previous metadata files.
    pub metadata_log: Option<Vec<MetadataLog>>,
    /// All known sort orders.
    pub sort_orders: Vec<SortOrder>,
    /// Id of the default sort order.
    pub default_sort_order_id: i32,
    /// Named snapshot references (branches and tags), keyed by name.
    pub refs: HashMap<String, SnapshotReference>,
}
2539
2540impl TableMetadata {
2541 pub fn current_partition_spec(&self) -> Result<&PartitionSpec> {
2543 self.partition_spec(self.default_spec_id).ok_or_else(|| {
2544 Error::new(
2545 ErrorKind::IcebergDataInvalid,
2546 format!("Partition spec id {} not found!", self.default_spec_id),
2547 )
2548 })
2549 }
2550
2551 pub fn partition_spec(&self, spec_id: i32) -> Option<&PartitionSpec> {
2553 self.partition_specs.iter().find(|p| p.spec_id == spec_id)
2554 }
2555
2556 pub fn current_schema(&self) -> Result<&Schema> {
2558 self.schema(self.current_schema_id).ok_or_else(|| {
2559 Error::new(
2560 ErrorKind::IcebergDataInvalid,
2561 format!("Schema id {} not found!", self.current_schema_id),
2562 )
2563 })
2564 }
2565
2566 pub fn schema(&self, schema_id: i32) -> Option<&Schema> {
2568 self.schemas.iter().find(|s| s.schema_id == schema_id)
2569 }
2570
2571 pub fn current_snapshot(&self) -> Result<Option<&Snapshot>> {
2573 if self.current_snapshot_id == Some(EMPTY_SNAPSHOT_ID) || self.current_snapshot_id.is_none()
2574 {
2575 return Ok(None);
2576 }
2577
2578 Ok(Some(
2579 self.snapshot(self.current_snapshot_id.unwrap())
2580 .ok_or_else(|| {
2581 Error::new(
2582 ErrorKind::IcebergDataInvalid,
2583 format!(
2584 "Snapshot id {} not found!",
2585 self.current_snapshot_id.unwrap()
2586 ),
2587 )
2588 })?,
2589 ))
2590 }
2591
2592 pub fn snapshot(&self, snapshot_id: i64) -> Option<&Snapshot> {
2593 if let Some(snapshots) = &self.snapshots {
2594 snapshots.iter().find(|s| s.snapshot_id == snapshot_id)
2595 } else {
2596 None
2597 }
2598 }
2599
2600 pub fn snapshot_ref(&self, branch: &str) -> Option<&SnapshotReference> {
2602 self.refs.get(branch)
2603 }
2604
2605 pub fn set_snapshot_ref(&mut self, branch: &str, snap_ref: SnapshotReference) -> Result<()> {
2607 let snapshot = self
2608 .snapshots
2609 .as_ref()
2610 .and_then(|s| s.iter().find(|s| s.snapshot_id == snap_ref.snapshot_id))
2611 .ok_or_else(|| {
2612 Error::new(
2613 ErrorKind::IcebergDataInvalid,
2614 format!("Snapshot id {} not found!", snap_ref.snapshot_id),
2615 )
2616 })?;
2617 self.refs
2618 .entry(branch.to_string())
2619 .and_modify(|s| {
2620 s.snapshot_id = snap_ref.snapshot_id;
2621 s.typ = snap_ref.typ;
2622
2623 if let Some(min_snapshots_to_keep) = snap_ref.min_snapshots_to_keep {
2624 s.min_snapshots_to_keep = Some(min_snapshots_to_keep);
2625 }
2626
2627 if let Some(max_snapshot_age_ms) = snap_ref.max_snapshot_age_ms {
2628 s.max_snapshot_age_ms = Some(max_snapshot_age_ms);
2629 }
2630
2631 if let Some(max_ref_age_ms) = snap_ref.max_ref_age_ms {
2632 s.max_ref_age_ms = Some(max_ref_age_ms);
2633 }
2634 })
2635 .or_insert_with(|| {
2636 SnapshotReference::new(snap_ref.snapshot_id, SnapshotReferenceType::Branch)
2637 });
2638
2639 if branch == MAIN_BRANCH {
2640 self.current_snapshot_id = Some(snap_ref.snapshot_id);
2641 self.last_updated_ms = snapshot.timestamp_ms;
2642 self.last_sequence_number = snapshot.sequence_number;
2643 if let Some(snap_logs) = self.snapshot_log.as_mut() {
2644 snap_logs.push(snapshot.log());
2645 } else {
2646 self.snapshot_log = Some(vec![snapshot.log()]);
2647 }
2648 }
2649 Ok(())
2650 }
2651
2652 pub(crate) fn add_snapshot(&mut self, snapshot: Snapshot) -> Result<()> {
2653 if let Some(snapshots) = &mut self.snapshots {
2654 snapshots.push(snapshot);
2655 } else {
2656 self.snapshots = Some(vec![snapshot]);
2657 }
2658
2659 Ok(())
2660 }
2661}
2662
/// Iceberg table format version.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum TableFormatVersion {
    /// Version 1.
    V1 = 1,
    /// Version 2.
    V2 = 2,
}
2671
2672impl TryFrom<u8> for TableFormatVersion {
2673 type Error = Error;
2674
2675 fn try_from(value: u8) -> Result<TableFormatVersion> {
2676 match value {
2677 1 => Ok(TableFormatVersion::V1),
2678 2 => Ok(TableFormatVersion::V2),
2679 _ => Err(Error::new(
2680 ErrorKind::IcebergDataInvalid,
2681 format!("Unknown table format: {value}"),
2682 )),
2683 }
2684 }
2685}
2686
2687impl Display for TableFormatVersion {
2688 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2689 match self {
2690 TableFormatVersion::V1 => f.write_str("1"),
2691 TableFormatVersion::V2 => f.write_str("2"),
2692 }
2693 }
2694}
2695
#[cfg(test)]
mod test {
    use apache_avro::{schema, types::Value};
    use std::collections::HashMap;

    use crate::types::SnapshotSummaryBuilder;
    use crate::types::{Any, Field, Primitive, PrimitiveValue, Struct, StructValueBuilder};

    use super::{AnyValue, DataFile, StructValue};

    #[test]
    fn test_struct_to_avro() {
        // Build a struct value { a: null, b: "hello" } and serialize it.
        let struct_type = Struct::new(vec![
            Field::optional(1, "a", Any::Primitive(Primitive::Int)).into(),
            Field::required(2, "b", Any::Primitive(Primitive::String)).into(),
        ]);
        let mut builder = StructValueBuilder::new(struct_type.into());
        builder.add_field(1, None).unwrap();
        builder
            .add_field(
                2,
                Some(AnyValue::Primitive(PrimitiveValue::String(
                    "hello".to_string(),
                ))),
            )
            .unwrap();
        let struct_value = AnyValue::Struct(builder.build().unwrap());

        let mut actual = apache_avro::to_value(struct_value).unwrap();
        // Record field order is not guaranteed; sort by name for comparison.
        if let Value::Record(ref mut record) = actual {
            record.sort_by(|a, b| a.0.cmp(&b.0));
        }

        // Expected value built directly from an avro schema.
        let raw_schema = r#"
            {
                "type": "record",
                "name": "test",
                "fields": [
                    {"name": "a", "type": ["int","null"]},
                    {"name": "b", "type": "string"}
                ]
            }
        "#;
        let schema = schema::Schema::parse_str(raw_schema).unwrap();
        let mut record = apache_avro::types::Record::new(&schema).unwrap();
        record.put("a", None::<String>);
        record.put("b", "hello");
        let expected: Value = record.into();

        assert_eq!(actual, expected);
    }

    #[test]
    fn test_struct_field_id_lookup() {
        let inner = Struct::new(vec![
            Field::optional(1, "a", Any::Primitive(Primitive::Int)).into(),
            Field::required(2, "b", Any::Primitive(Primitive::String)).into(),
        ]);
        let outer = Struct::new(vec![
            Field::required(3, "c", Any::Struct(inner.into())).into(),
            Field::required(4, "d", Any::Primitive(Primitive::Int)).into(),
        ]);
        // Lookups resolve nested fields by id, across struct levels.
        for (id, name) in [(1, "a"), (2, "b"), (3, "c"), (4, "d")] {
            assert_eq!(outer.lookup_field(id).unwrap().name, name);
        }
    }

    /// A 10-record, 100-byte parquet file with the given content type and no
    /// optional statistics.
    fn sample_file(content: super::DataContentType) -> DataFile {
        DataFile {
            content,
            file_path: String::new(),
            file_format: super::DataFileFormat::Parquet,
            partition: StructValue::default(),
            record_count: 10,
            file_size_in_bytes: 100,
            column_sizes: None,
            value_counts: None,
            null_value_counts: None,
            nan_value_counts: None,
            distinct_counts: None,
            lower_bounds: None,
            upper_bounds: None,
            key_metadata: None,
            split_offsets: None,
            equality_ids: None,
            sort_order_id: None,
        }
    }

    /// Asserts that `summary` maps every key in `expected` to the paired value.
    fn assert_summary(summary: &HashMap<String, String>, expected: &[(&str, &str)]) {
        for (key, value) in expected {
            assert_eq!(summary.get(*key).map(String::as_str), Some(*value));
        }
    }

    #[test]
    fn test_snapshot_summary() {
        let data_file = sample_file(super::DataContentType::Data);
        let pos_delete_file = sample_file(super::DataContentType::PositionDeletes);
        let eq_delete_file = sample_file(super::DataContentType::EqualityDeletes);

        // Commit 1: a single data file on an empty table -> append.
        let mut builder = SnapshotSummaryBuilder::new();
        builder.add(&data_file);
        let summary_1 = builder.merge(&HashMap::new(), false).unwrap();
        assert_summary(
            &summary_1,
            &[
                (SnapshotSummaryBuilder::OPERATION, "append"),
                (SnapshotSummaryBuilder::ADDED_DATA_FILES, "1"),
                (SnapshotSummaryBuilder::TOTAL_DATA_FILES, "1"),
                (SnapshotSummaryBuilder::ADDED_RECORDS, "10"),
                (SnapshotSummaryBuilder::TOTAL_RECORDS, "10"),
                (SnapshotSummaryBuilder::ADDED_FILES_SIZE, "100"),
                (SnapshotSummaryBuilder::TOTAL_FILES_SIZE, "100"),
            ],
        );

        // Commit 2: delete files only -> delete; totals accumulate.
        let mut builder = SnapshotSummaryBuilder::new();
        builder.add(&pos_delete_file);
        builder.add(&eq_delete_file);
        let summary_2 = builder.merge(&summary_1, false).unwrap();
        assert_summary(
            &summary_2,
            &[
                (SnapshotSummaryBuilder::OPERATION, "delete"),
                (SnapshotSummaryBuilder::ADDED_DATA_FILES, "0"),
                (SnapshotSummaryBuilder::TOTAL_DATA_FILES, "1"),
                (SnapshotSummaryBuilder::ADDED_DELETE_FILES, "2"),
                (SnapshotSummaryBuilder::ADDED_POSITION_DELETE_FILES, "1"),
                (SnapshotSummaryBuilder::ADDED_EQUALITY_DELETE_FILES, "1"),
                (SnapshotSummaryBuilder::TOTAL_DELETE_FILES, "2"),
                (SnapshotSummaryBuilder::ADDED_RECORDS, "0"),
                (SnapshotSummaryBuilder::TOTAL_RECORDS, "10"),
                (SnapshotSummaryBuilder::ADDED_POSITION_DELETES, "10"),
                (SnapshotSummaryBuilder::TOTAL_POSITION_DELETES, "10"),
                (SnapshotSummaryBuilder::ADDED_EQUALITY_DELETES, "10"),
                (SnapshotSummaryBuilder::TOTAL_EQUALITY_DELETES, "10"),
                (SnapshotSummaryBuilder::ADDED_FILES_SIZE, "200"),
                (SnapshotSummaryBuilder::TOTAL_FILES_SIZE, "300"),
            ],
        );

        // Commit 3: data and delete files together -> overwrite.
        let mut builder = SnapshotSummaryBuilder::new();
        builder.add(&data_file);
        builder.add(&pos_delete_file);
        builder.add(&eq_delete_file);
        let summary_3 = builder.merge(&summary_2, false).unwrap();
        assert_summary(
            &summary_3,
            &[
                (SnapshotSummaryBuilder::OPERATION, "overwrite"),
                (SnapshotSummaryBuilder::ADDED_DATA_FILES, "1"),
                (SnapshotSummaryBuilder::TOTAL_DATA_FILES, "2"),
                (SnapshotSummaryBuilder::ADDED_DELETE_FILES, "2"),
                (SnapshotSummaryBuilder::ADDED_POSITION_DELETE_FILES, "1"),
                (SnapshotSummaryBuilder::ADDED_EQUALITY_DELETE_FILES, "1"),
                (SnapshotSummaryBuilder::TOTAL_DELETE_FILES, "4"),
                (SnapshotSummaryBuilder::ADDED_RECORDS, "10"),
                (SnapshotSummaryBuilder::TOTAL_RECORDS, "20"),
                (SnapshotSummaryBuilder::ADDED_POSITION_DELETES, "10"),
                (SnapshotSummaryBuilder::TOTAL_POSITION_DELETES, "20"),
                (SnapshotSummaryBuilder::ADDED_EQUALITY_DELETES, "10"),
                (SnapshotSummaryBuilder::TOTAL_EQUALITY_DELETES, "20"),
                (SnapshotSummaryBuilder::ADDED_FILES_SIZE, "300"),
                (SnapshotSummaryBuilder::TOTAL_FILES_SIZE, "600"),
            ],
        );
    }
}