// icelake/types/in_memory.rs

1//! in_memory module provides the definition of iceberg in-memory data types.
2
3use std::fmt::{Display, Formatter};
4use std::hash::Hasher;
5use std::sync::Arc;
6use std::{collections::HashMap, str::FromStr};
7
8use bitvec::vec::BitVec;
9use chrono::NaiveDate;
10use chrono::NaiveDateTime;
11use chrono::NaiveTime;
12use chrono::Utc;
13use chrono::{DateTime, Datelike};
14use derive_builder::Builder;
15use opendal::Operator;
16use ordered_float::OrderedFloat;
17use parquet::format::FileMetaData;
18use serde::ser::SerializeMap;
19use serde::ser::SerializeStruct;
20use serde::Serialize;
21use std::hash::Hash;
22use uuid::Uuid;
23
24use crate::types::in_memory::_decimal::REQUIRED_LENGTH;
25use crate::types::parse_manifest_list;
26use crate::ErrorKind;
27use crate::Result;
28use crate::{Error, Table};
29
/// Sentinel sequence number for entries that have not been assigned one yet.
pub(crate) const UNASSIGNED_SEQ_NUM: i64 = -1;
/// Name of the main (default) branch.
pub(crate) const MAIN_BRANCH: &str = "main";
// NOTE(review): sentinel -1; presumably means "no current snapshot" —
// confirm at the use sites (not visible in this chunk).
const EMPTY_SNAPSHOT_ID: i64 = -1;

/// Maximum number of bytes used to store an iceberg decimal value.
pub(crate) const MAX_DECIMAL_BYTES: u32 = 24;
/// Maximum precision allowed for an iceberg decimal type.
pub(crate) const MAX_DECIMAL_PRECISION: u32 = 38;
36
37mod _decimal {
38    use lazy_static::lazy_static;
39
40    use super::{MAX_DECIMAL_BYTES, MAX_DECIMAL_PRECISION};
41
42    lazy_static! {
43        // Max precision of bytes, starts from 1
44        pub(super) static ref MAX_PRECISION: [u32; MAX_DECIMAL_BYTES as usize] = {
45            let mut ret: [u32; 24] = [0; 24];
46            for (i, prec) in ret.iter_mut().enumerate() {
47                *prec = 2f64.powi((8 * (i + 1) - 1) as i32).log10().floor() as u32;
48            }
49
50            ret
51        };
52
53        //  Required bytes of precision, starts from 1
54        pub(super) static ref REQUIRED_LENGTH: [u32; MAX_DECIMAL_PRECISION as usize] = {
55            let mut ret: [u32; MAX_DECIMAL_PRECISION as usize] = [0; MAX_DECIMAL_PRECISION as usize];
56
57            for (i, required_len) in ret.iter_mut().enumerate() {
58                for j in 0..MAX_PRECISION.len() {
59                    if MAX_PRECISION[j] >= ((i+1) as u32) {
60                        *required_len = (j+1) as u32;
61                        break;
62                    }
63                }
64            }
65
66            ret
67        };
68
69    }
70}
71
/// All data types are either primitives or nested types, which are maps, lists, or structs.
#[derive(Debug, PartialEq, Clone, Eq)]
pub enum Any {
    /// A Primitive type
    Primitive(Primitive),
    /// A Struct type, shared behind an `Arc` so schema and lookup tables can
    /// reference the same definition cheaply.
    Struct(Arc<Struct>),
    /// A List type.
    List(List),
    /// A Map type
    Map(Map),
}
84
/// All data values are either primitives or nested values, which are maps, lists, or structs.
///
/// # TODO:
/// Allow derived hash because Map will not be used in hash, so it's ok to derive hash.
/// But this may be misuse in the future.
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Eq, Hash)]
pub enum AnyValue {
    /// A Primitive type
    Primitive(PrimitiveValue),
    /// A Struct type is a tuple of typed values.
    ///
    /// struct value carries the value of a struct type, could be used as
    /// default value.
    ///
    /// struct value stores as a map from field id to field value.
    Struct(StructValue),
    /// A list type with a list of typed values; `None` entries are nulls.
    List(Vec<Option<AnyValue>>),
    /// A map is a collection of key-value pairs with a key type and a value type.
    ///
    /// map value carries the value of a map type, could be used as
    /// default value.
    Map {
        /// All keys in this map; paired positionally with `values`.
        keys: Vec<AnyValue>,
        /// All values in this map; `None` entries are nulls.
        values: Vec<Option<AnyValue>>,
    },
}
115
116impl PartialEq for AnyValue {
117    fn eq(&self, other: &Self) -> bool {
118        match (self, other) {
119            (Self::Primitive(l0), Self::Primitive(r0)) => l0 == r0,
120            (Self::Struct(l0), Self::Struct(r0)) => l0 == r0,
121            (Self::List(l0), Self::List(r0)) => l0 == r0,
122            (
123                Self::Map {
124                    keys: l_keys,
125                    values: l_values,
126                },
127                Self::Map {
128                    keys: r_keys,
129                    values: r_values,
130                },
131            ) => {
132                // # TODO
133                // A inefficient way to compare map.
134                let mut map = HashMap::with_capacity(l_keys.len());
135                l_keys.iter().zip(l_values.iter()).for_each(|(key, value)| {
136                    map.insert(key, value);
137                });
138                r_keys
139                    .iter()
140                    .zip(r_values.iter())
141                    .all(|(key, value)| map.get(key).map_or(false, |v| *v == value))
142            }
143            _ => false,
144        }
145    }
146}
147
148impl Serialize for AnyValue {
149    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
150    where
151        S: serde::Serializer,
152    {
153        match self {
154            AnyValue::Primitive(value) => value.serialize(serializer),
155            AnyValue::Struct(value) => value.serialize(serializer),
156            AnyValue::List(value) => value.serialize(serializer),
157            AnyValue::Map { keys, values } => {
158                let mut map = serializer.serialize_map(Some(keys.len()))?;
159                for (key, value) in keys.iter().zip(values.iter()) {
160                    map.serialize_entry(key, value)?;
161                }
162                map.end()
163            }
164        }
165    }
166}
167
/// Primitive Types within a schema.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Primitive {
    /// True or False
    Boolean,
    /// 32-bit signed integer, Can promote to long
    Int,
    /// 64-bit signed integer
    Long,
    /// 32-bit IEEE 754 floating point, Can promote to double
    Float,
    /// 64-bit IEEE 754 floating point.
    Double,
    /// Fixed point decimal
    ///
    /// - Precision can only be widened.
    /// - Scale is fixed and cannot be changed by schema evolution.
    Decimal {
        /// The number of digits in the number, precision must be 38 or less
        precision: u8,
        /// The number of digits to the right of the decimal point.
        scale: u8,
    },
    /// Calendar date without timezone or time
    Date,
    /// Time of day without date or timezone.
    ///
    /// Time values are stored with microsecond precision.
    Time,
    /// Timestamp without timezone
    ///
    /// Timestamp values are stored with microsecond precision.
    ///
    /// Timestamps without time zone represent a date and time of day regardless of zone:
    /// the time value is independent of zone adjustments (`2017-11-16 17:10:34` is always retrieved as `2017-11-16 17:10:34`).
    /// Timestamp values are stored as a long that encodes microseconds from the unix epoch.
    Timestamp,
    /// Timestamp with timezone
    ///
    /// Timestampz values are stored with microsecond precision.
    ///
    /// Timestamps with time zone represent a point in time:
    /// values are stored as UTC and do not retain a source time zone
    /// (`2017-11-16 17:10:34 PST` is stored/retrieved as `2017-11-17 01:10:34 UTC` and these values are considered identical).
    Timestampz,
    /// Arbitrary-length character sequences, Encoded with UTF-8
    ///
    /// Character strings must be stored as UTF-8 encoded byte arrays.
    String,
    /// Universally Unique Identifiers, Should use 16-byte fixed
    Uuid,
    /// Fixed-length byte array of the given length in bytes.
    Fixed(u64),
    /// Arbitrary-length byte array.
    Binary,
}
224
225impl From<Primitive> for Any {
226    fn from(value: Primitive) -> Self {
227        Any::Primitive(value)
228    }
229}
230
231impl Primitive {
232    /// Returns minimum bytes required for decimal with [`precision`].
233    #[inline(always)]
234    pub fn decimal_required_bytes(precision: u32) -> Result<u32> {
235        if precision == 0 || precision > MAX_DECIMAL_PRECISION {
236            return Err(Error::new(
237                ErrorKind::IcebergDataInvalid,
238                format!(
239                    "Decimal precision must be between 1 and {MAX_DECIMAL_PRECISION}: {precision}",
240                ),
241            ));
242        }
243        Ok(REQUIRED_LENGTH[precision as usize - 1])
244    }
245}
246
/// Primitive Values within a schema.
///
/// Used to represent the value of a primitive type, like as default value.
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
pub enum PrimitiveValue {
    /// True or False
    Boolean(bool),
    /// 32-bit signed integer, Can promote to long
    Int(i32),
    /// 64-bit signed integer
    Long(i64),
    /// 32-bit IEEE 754 floating point, Can promote to double
    ///
    /// Wrapped in `OrderedFloat` so the value can derive `Eq` and `Hash`.
    Float(OrderedFloat<f32>),
    /// 64-bit IEEE 754 floating point.
    ///
    /// Wrapped in `OrderedFloat` so the value can derive `Eq` and `Hash`.
    Double(OrderedFloat<f64>),
    /// Fixed point decimal, stored as the unscaled value.
    Decimal(i128),
    /// Calendar date without timezone or time
    Date(NaiveDate),
    /// Time of day without date or timezone.
    ///
    /// Time values are stored with microsecond precision.
    Time(NaiveTime),
    /// Timestamp without timezone
    ///
    /// Timestamp values are stored with microsecond precision.
    ///
    /// Timestamps without time zone represent a date and time of day regardless of zone:
    /// the time value is independent of zone adjustments (`2017-11-16 17:10:34` is always retrieved as `2017-11-16 17:10:34`).
    /// Timestamp values are stored as a long that encodes microseconds from the unix epoch.
    Timestamp(NaiveDateTime),
    /// Timestamp with timezone
    ///
    /// Timestampz values are stored with microsecond precision.
    ///
    /// Timestamps with time zone represent a point in time:
    /// values are stored as UTC and do not retain a source time zone
    /// (`2017-11-16 17:10:34 PST` is stored/retrieved as `2017-11-17 01:10:34 UTC` and these values are considered identical).
    Timestampz(DateTime<Utc>),
    /// Arbitrary-length character sequences, Encoded with UTF-8
    ///
    /// Character strings must be stored as UTF-8 encoded byte arrays.
    String(String),
    /// Universally Unique Identifiers, Should use 16-byte fixed
    Uuid(Uuid),
    /// Fixed-length byte array of length.
    Fixed(Vec<u8>),
    /// Arbitrary-length byte array.
    Binary(Vec<u8>),
}
297
298impl From<PrimitiveValue> for AnyValue {
299    fn from(value: PrimitiveValue) -> Self {
300        AnyValue::Primitive(value)
301    }
302}
303
impl Serialize for PrimitiveValue {
    /// Serialize the value: numbers and bools as-is, decimals as big-endian
    /// bytes, date/time values as integers, uuid as its string form, fixed
    /// and binary as byte arrays.
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            PrimitiveValue::Boolean(value) => serializer.serialize_bool(*value),
            PrimitiveValue::Int(value) => serializer.serialize_i32(*value),
            PrimitiveValue::Long(value) => serializer.serialize_i64(*value),
            PrimitiveValue::Float(value) => serializer.serialize_f32(value.0),
            PrimitiveValue::Double(value) => serializer.serialize_f64(value.0),
            // Unscaled decimal value as big-endian two's-complement bytes.
            PrimitiveValue::Decimal(value) => serializer.serialize_bytes(&value.to_be_bytes()),
            // NOTE(review): `num_days_from_ce` counts days from 0001-01-01
            // (Common Era), while the iceberg spec stores dates as days from
            // 1970-01-01 — confirm which representation consumers expect here.
            PrimitiveValue::Date(value) => serializer.serialize_i32(value.num_days_from_ce()),
            // Microseconds since midnight (`NaiveDate::default()` is 1970-01-01,
            // so the epoch-based micros equal the micros into the day).
            PrimitiveValue::Time(value) => serializer.serialize_i64(
                NaiveDateTime::new(NaiveDate::default(), *value)
                    .and_utc()
                    .timestamp_micros(),
            ),
            // Microseconds from the unix epoch.
            PrimitiveValue::Timestamp(value) => {
                serializer.serialize_i64(value.and_utc().timestamp_micros())
            }
            PrimitiveValue::Timestampz(value) => serializer.serialize_i64(value.timestamp_micros()),
            PrimitiveValue::String(value) => serializer.serialize_str(value),
            PrimitiveValue::Uuid(value) => serializer.serialize_str(&value.to_string()),
            PrimitiveValue::Fixed(value) => serializer.serialize_bytes(value),
            PrimitiveValue::Binary(value) => serializer.serialize_bytes(value),
        }
    }
}
333
/// A struct is a tuple of typed values.
///
/// - Each field in the tuple is named and has an integer id that is unique in the table schema.
/// - Each field can be either optional or required, meaning that values can (or cannot) be null.
/// - Fields may be any type.
/// - Fields may have an optional comment or doc string.
/// - Fields can have default values.
#[derive(Default, Debug, Clone, Eq)]
pub struct Struct {
    /// Fields contained in this struct.
    fields: Vec<FieldRef>,
    /// Map from field id to field, covering both top-level fields and fields
    /// nested inside struct-typed fields (see `fetch_any_field_id_map`).
    id_lookup: HashMap<i32, FieldRef>,
}
348
349impl PartialEq for Struct {
350    fn eq(&self, other: &Self) -> bool {
351        for (id, field) in self.id_lookup.iter() {
352            if let Some(other_field) = other.id_lookup.get(id) {
353                if field != other_field {
354                    return false;
355                }
356            } else {
357                return false;
358            }
359        }
360        true
361    }
362}
363
364impl Struct {
365    /// Create a new struct.
366    pub fn new(fields: Vec<FieldRef>) -> Self {
367        let mut id_lookup = HashMap::with_capacity(fields.len());
368        fields.iter().for_each(|field| {
369            id_lookup.insert(field.id, field.clone());
370            Self::fetch_any_field_id_map(&field.field_type, &mut id_lookup)
371        });
372        Struct { fields, id_lookup }
373    }
374
375    fn fetch_any_field_id_map(ty: &Any, map: &mut HashMap<i32, FieldRef>) {
376        match ty {
377            Any::Primitive(_) => {
378                // just skip
379            }
380            Any::Struct(inner) => Self::fetch_struct_field_id_map(inner, map),
381            Any::List(_) => {
382                // #TODO
383                // field id map is only used in partition now. For partition, it don't need list,
384                // so we ignore it now.
385            }
386            Any::Map(_) => {
387                // #TODO
388                // field id map is only used in partition now. For partition, it don't need map,
389                // so we ignore it now.
390            }
391        }
392    }
393
394    fn fetch_struct_field_id_map(ty: &Struct, map: &mut HashMap<i32, FieldRef>) {
395        map.extend(ty.id_lookup.clone())
396    }
397
398    /// Return the number of fields in the struct.
399    pub fn len(&self) -> usize {
400        self.fields.len()
401    }
402
403    /// Check if the struct is empty.
404    pub fn is_empty(&self) -> bool {
405        self.fields.is_empty()
406    }
407
408    /// Return the reference to the field of this struct.
409    pub fn fields(&self) -> &[FieldRef] {
410        &self.fields
411    }
412
413    /// Lookup the field type according to the field id.
414    pub fn lookup_type(&self, field_id: i32) -> Option<Any> {
415        self.id_lookup
416            .get(&field_id)
417            .map(|field| field.field_type.clone())
418    }
419
420    /// Lookup the field according to the field id.
421    pub fn lookup_field(&self, field_id: i32) -> Option<&FieldRef> {
422        self.id_lookup.get(&field_id)
423    }
424
425    /// Lookup field by field name.
426    pub fn lookup_field_by_name(&self, field_name: &str) -> Option<&FieldRef> {
427        self.fields.iter().find(|field| field.name == field_name)
428    }
429}
430
/// The reference to a Field.
pub type FieldRef = Arc<Field>;

/// A Field is the field of a struct.
#[derive(Debug, PartialEq, Clone, Eq)]
pub struct Field {
    /// An integer id that is unique in the table schema
    pub id: i32,
    /// Field Name
    pub name: String,
    /// Optional or required, meaning that values can (or can not be) null
    pub required: bool,
    /// Field can have any type
    pub field_type: Any,
    /// Fields can have any optional comment or doc string.
    pub comment: Option<String>,
    /// `initial-default` is used to populate the field’s value for all records that were written before the field was added to the schema
    pub initial_default: Option<AnyValue>,
    /// `write-default` is used to populate the field’s value for any
    /// records written after the field was added to the schema, if the
    /// writer does not supply the field’s value
    pub write_default: Option<AnyValue>,
}
454
455impl Field {
456    /// Create a required field.
457    pub fn required(id: i32, name: impl Into<String>, r#type: Any) -> Self {
458        Self {
459            id,
460            name: name.into(),
461            required: true,
462            field_type: r#type,
463            comment: None,
464            initial_default: None,
465            write_default: None,
466        }
467    }
468
469    /// Create an optional field.
470    pub fn optional(id: i32, name: impl Into<String>, r#type: Any) -> Self {
471        Self {
472            id,
473            name: name.into(),
474            required: false,
475            field_type: r#type,
476            comment: None,
477            initial_default: None,
478            write_default: None,
479        }
480    }
481
482    fn with_comment(mut self, doc: impl Into<String>) -> Self {
483        self.comment = Some(doc.into());
484        self
485    }
486
487    fn with_required(mut self) -> Self {
488        self.required = true;
489        self
490    }
491}
492
/// A Struct type is a tuple of typed values.
#[derive(Default, Debug, PartialEq, Clone, Eq)]
pub struct StructValue {
    /// Field values, in the same order as `type_info.fields()`. Positions
    /// whose null bit is set hold a placeholder value and must be ignored.
    field_values: Vec<AnyValue>,
    /// The struct type this value conforms to.
    type_info: Arc<Struct>,
    /// Bit `i` is set when field `i` is null.
    null_bitmap: BitVec,
}
500
501impl From<StructValue> for AnyValue {
502    fn from(value: StructValue) -> Self {
503        AnyValue::Struct(value)
504    }
505}
506
507impl StructValue {
508    /// Create a iterator to read the field in order of (field_id, field_value, field_name,
509    /// field_required).
510    pub fn iter(&self) -> impl Iterator<Item = (i32, Option<&AnyValue>, &str, bool)> {
511        self.null_bitmap
512            .iter()
513            .zip(self.field_values.iter())
514            .zip(self.type_info.fields().iter())
515            .map(|((null, value), field)| {
516                (
517                    field.id,
518                    if *null { None } else { Some(value) },
519                    field.name.as_str(),
520                    field.required,
521                )
522            })
523    }
524}
525
526impl Serialize for StructValue {
527    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
528    where
529        S: serde::Serializer,
530    {
531        let mut record = serializer.serialize_struct("", self.field_values.len())?;
532        for (_, value, key, required) in self.iter() {
533            if required {
534                record.serialize_field(Box::leak(key.to_string().into_boxed_str()), &value.expect("Struct Builder should guaranteed that the value is always if the field is required."))?;
535            } else {
536                record.serialize_field(Box::leak(key.to_string().into_boxed_str()), &value)?;
537            }
538        }
539        record.end()
540    }
541}
542
543impl Hash for StructValue {
544    fn hash<H: Hasher>(&self, state: &mut H) {
545        for (id, value, name, required) in self.iter() {
546            id.hash(state);
547            value.hash(state);
548            name.hash(state);
549            required.hash(state);
550        }
551    }
552}
553
/// A builder to build a struct value. The builder guarantees that the
/// resulting StructValue is valid for the Struct.
pub struct StructValueBuilder {
    /// Pending field values keyed by field id; `None` marks an explicit null.
    fields: HashMap<i32, Option<AnyValue>>,
    /// The struct type the built value must conform to.
    type_info: Arc<Struct>,
}
559
560impl StructValueBuilder {
561    /// Create a new builder.
562    pub fn new(type_info: Arc<Struct>) -> Self {
563        Self {
564            fields: HashMap::with_capacity(type_info.len()),
565            type_info,
566        }
567    }
568
569    /// Add a field to the struct value.
570    pub fn add_field(&mut self, field_id: i32, field_value: Option<AnyValue>) -> Result<()> {
571        // Check the value is valid if the field is required.
572        if self
573            .type_info
574            .lookup_field(field_id)
575            .ok_or_else(|| {
576                Error::new(
577                    ErrorKind::IcebergDataInvalid,
578                    format!("Field {} is not found", field_id),
579                )
580            })?
581            .required
582            && field_value.is_none()
583        {
584            return Err(Error::new(
585                ErrorKind::IcebergDataInvalid,
586                format!("Field {} is required", field_id),
587            ));
588        }
589        // Check the field id is valid.
590        self.type_info.lookup_type(field_id).ok_or_else(|| {
591            Error::new(
592                ErrorKind::IcebergDataInvalid,
593                format!("Field {} is not found", field_id),
594            )
595        })?;
596        // TODO: Check the field type is consistent.
597        // TODO: Check the duplication of field.
598
599        self.fields.insert(field_id, field_value);
600        Ok(())
601    }
602
603    /// Build the struct value.
604    pub fn build(mut self) -> Result<StructValue> {
605        let mut field_values = Vec::with_capacity(self.fields.len());
606        let mut null_bitmap = BitVec::with_capacity(self.fields.len());
607
608        for field in self.type_info.fields.iter() {
609            let field_value = self.fields.remove(&field.id).ok_or_else(|| {
610                Error::new(
611                    ErrorKind::IcebergDataInvalid,
612                    format!("Field {} is required", field.name),
613                )
614            })?;
615            if let Some(value) = field_value {
616                null_bitmap.push(false);
617                field_values.push(value);
618            } else {
619                null_bitmap.push(true);
620                // `1` is just as a placeholder. It will be ignored.
621                field_values.push(AnyValue::Primitive(PrimitiveValue::Int(1)));
622            }
623        }
624
625        Ok(StructValue {
626            field_values,
627            type_info: self.type_info,
628            null_bitmap,
629        })
630    }
631}
632
/// A list is a collection of values with some element type.
///
/// - The element field has an integer id that is unique in the table schema.
/// - Elements can be either optional or required.
/// - Element types may be any type.
#[derive(Debug, PartialEq, Clone, Eq)]
pub struct List {
    /// An integer id for the element field that is unique in the table schema.
    pub element_id: i32,
    /// Optional or required, meaning that element values can (or can not be) null
    pub element_required: bool,
    /// Element types may be any type (boxed to keep `Any` finitely sized).
    pub element_type: Box<Any>,
}
647
/// A map is a collection of key-value pairs with a key type and a value type.
///
/// - Both the key field and value field each have an integer id that is unique in the table schema.
/// - Map keys are required and map values can be either optional or required.
/// - Both map keys and map values may be any type, including nested types.
#[derive(Debug, PartialEq, Clone, Eq)]
pub struct Map {
    /// An integer id for the key field that is unique in the table schema.
    pub key_id: i32,
    /// Keys may be any type, including nested types (boxed to keep `Any` finitely sized).
    pub key_type: Box<Any>,

    /// An integer id for the value field that is unique in the table schema.
    pub value_id: i32,
    /// Map values can be either optional or required; keys are always required.
    pub value_required: bool,
    /// Values may be any type, including nested types (boxed to keep `Any` finitely sized).
    pub value_type: Box<Any>,
}
667
/// A table’s schema is a list of named columns.
///
/// All data types are either primitives or nested types, which are maps, lists, or structs.
/// A table schema is also a struct type.
#[derive(Debug, PartialEq, Clone)]
pub struct Schema {
    /// The unique id for this schema.
    pub schema_id: i32,
    /// A schema can optionally track the set of primitive fields that
    /// identify rows in a table, using the property identifier-field-ids
    pub identifier_field_ids: Option<Vec<i32>>,
    /// The struct type holding the fields of this schema; kept private so
    /// access goes through `fields()` and the lookup helpers.
    r#struct: Struct,
}
682
683impl Schema {
684    /// Create a schema
685    pub fn new(schema_id: i32, identifier_field_ids: Option<Vec<i32>>, r#struct: Struct) -> Self {
686        Schema {
687            schema_id,
688            identifier_field_ids,
689            r#struct,
690        }
691    }
692
693    /// Return the fields of the schema
694    pub fn fields(&self) -> &[FieldRef] {
695        self.r#struct.fields()
696    }
697
698    /// Look up field by field id
699    pub fn look_up_field_by_id(&self, field_id: i32) -> Option<&FieldRef> {
700        self.r#struct.lookup_field(field_id)
701    }
702}
703
/// Transform is used to transform predicates to partition predicates,
/// in addition to transforming data values.
///
/// Deriving partition predicates from column predicates on the table data
/// is used to separate the logical queries from physical storage: the
/// partitioning can change and the correct partition filters are always
/// derived from column predicates.
///
/// This simplifies queries because users don’t have to supply both logical
/// predicates and partition predicates.
///
/// All transforms must return `null` for a `null` input value.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Transform {
    /// Source value, unmodified
    ///
    /// - Source type could be `Any`.
    /// - Return type is the same with source type.
    Identity,
    /// Hash of value, mod `N`. The `i32` parameter is the number of buckets, N.
    ///
    /// Bucket partition transforms use a 32-bit hash of the source value.
    /// The 32-bit hash implementation is the 32-bit Murmur3 hash, x86
    /// variant, seeded with 0.
    ///
    /// Transforms are parameterized by a number of buckets, N. The hash mod
    /// N must produce a positive value by first discarding the sign bit of
    /// the hash value. In pseudo-code, the function is:
    ///
    /// ```text
    /// def bucket_N(x) = (murmur3_x86_32_hash(x) & Integer.MAX_VALUE) % N
    /// ```
    ///
    /// - Source type could be `int`, `long`, `decimal`, `date`, `time`,
    ///   `timestamp`, `timestamptz`, `string`, `uuid`, `fixed`, `binary`.
    /// - Return type is `int`.
    Bucket(i32),
    /// Value truncated to width `W`. The `i32` parameter is the width, W.
    ///
    /// For `int`:
    ///
    /// - `v - (v % W)` remainders must be positive
    /// - example: W=10: 1 → 0, -1 → -10
    /// - note: The remainder, v % W, must be positive.
    ///
    /// For `long`:
    ///
    /// - `v - (v % W)` remainders must be positive
    /// - example: W=10: 1 → 0, -1 → -10
    /// - note: The remainder, v % W, must be positive.
    ///
    /// For `decimal`:
    ///
    /// - `scaled_W = decimal(W, scale(v)) v - (v % scaled_W)`
    /// - example: W=50, s=2: 10.65 → 10.50
    ///
    /// For `string`:
    ///
    /// - Substring of length L: `v.substring(0, L)`
    /// - example: L=3: iceberg → ice
    /// - note: Strings are truncated to a valid UTF-8 string with no more
    ///   than L code points.
    ///
    /// - Source type could be `int`, `long`, `decimal`, `string`
    /// - Return type is the same with source type.
    Truncate(i32),
    /// Extract a date or timestamp year, as years from 1970
    ///
    /// - Source type could be `date`, `timestamp`, `timestamptz`
    /// - Return type is `int`
    Year,
    /// Extract a date or timestamp month, as months from 1970-01-01
    ///
    /// - Source type could be `date`, `timestamp`, `timestamptz`
    /// - Return type is `int`
    Month,
    /// Extract a date or timestamp day, as days from 1970-01-01
    ///
    /// - Source type could be `date`, `timestamp`, `timestamptz`
    /// - Return type is `int`
    Day,
    /// Extract a timestamp hour, as hours from 1970-01-01 00:00:00
    ///
    /// - Source type could be `timestamp`, `timestamptz`
    /// - Return type is `int`
    Hour,
    /// Always produces `null`
    ///
    /// The void transform may be used to replace the transform in an
    /// existing partition field so that the field is effectively dropped in
    /// v1 tables.
    ///
    /// - Source type could be `Any`.
    /// - Return type is Source type or `int`
    Void,
}
800
801impl Transform {
802    pub fn result_type(&self, input_type: &Any) -> Result<Any> {
803        let check_time = |input_type: &Any| {
804            if !matches!(
805                input_type,
806                Any::Primitive(Primitive::Date)
807                    | Any::Primitive(Primitive::Timestamp)
808                    | Any::Primitive(Primitive::Timestampz)
809            ) {
810                return Err(Error::new(
811                    ErrorKind::IcebergDataInvalid,
812                    format!("transform year type {input_type:?} is invalid"),
813                ));
814            }
815            Ok(())
816        };
817        let check_bucket = |input_type: &Any| {
818            if !matches!(
819                input_type,
820                Any::Primitive(Primitive::Int)
821                    | Any::Primitive(Primitive::Long)
822                    | Any::Primitive(Primitive::Decimal { .. })
823                    | Any::Primitive(Primitive::Date)
824                    | Any::Primitive(Primitive::Time)
825                    | Any::Primitive(Primitive::Timestamp)
826                    | Any::Primitive(Primitive::Timestampz)
827                    | Any::Primitive(Primitive::String)
828                    | Any::Primitive(Primitive::Uuid)
829                    | Any::Primitive(Primitive::Fixed(_))
830                    | Any::Primitive(Primitive::Binary)
831            ) {
832                return Err(Error::new(
833                    ErrorKind::IcebergDataInvalid,
834                    format!("transform bucket type {input_type:?} is invalid"),
835                ));
836            }
837            Ok(())
838        };
839        let check_truncate = |input_type: &Any| {
840            if !matches!(
841                input_type,
842                Any::Primitive(Primitive::Int)
843                    | Any::Primitive(Primitive::Long)
844                    | Any::Primitive(Primitive::Decimal { .. })
845                    | Any::Primitive(Primitive::String)
846            ) {
847                return Err(Error::new(
848                    ErrorKind::IcebergDataInvalid,
849                    format!("transform truncate type {input_type:?} is invalid"),
850                ));
851            }
852            Ok(())
853        };
854        let check_hour = |input_type: &Any| {
855            if !matches!(
856                input_type,
857                Any::Primitive(Primitive::Timestamp) | Any::Primitive(Primitive::Timestampz)
858            ) {
859                return Err(Error::new(
860                    ErrorKind::IcebergDataInvalid,
861                    format!("transform hour type {input_type:?} is invalid"),
862                ));
863            }
864            Ok(())
865        };
866        match self {
867            Transform::Identity => Ok(input_type.clone()),
868            Transform::Void => Ok(Primitive::Int.into()),
869            Transform::Year => {
870                check_time(input_type)?;
871                Ok(Primitive::Int.into())
872            }
873            Transform::Month => {
874                check_time(input_type)?;
875                Ok(Primitive::Int.into())
876            }
877            Transform::Day => {
878                check_time(input_type)?;
879                Ok(Primitive::Int.into())
880            }
881            Transform::Hour => {
882                check_hour(input_type)?;
883                Ok(Primitive::Int.into())
884            }
885            Transform::Bucket(_) => {
886                check_bucket(input_type)?;
887                Ok(Primitive::Int.into())
888            }
889            Transform::Truncate(_) => {
890                check_truncate(input_type)?;
891                Ok(input_type.clone())
892            }
893        }
894    }
895}
896
897impl<'a> Display for &'a Transform {
898    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
899        match self {
900            Transform::Identity => write!(f, "identity"),
901            Transform::Year => write!(f, "year"),
902            Transform::Month => write!(f, "month"),
903            Transform::Day => write!(f, "day"),
904            Transform::Hour => write!(f, "hour"),
905            Transform::Void => write!(f, "void"),
906            Transform::Bucket(length) => write!(f, "bucket[{}]", length),
907            Transform::Truncate(width) => write!(f, "truncate[{}]", width),
908        }
909    }
910}
911
912impl FromStr for Transform {
913    type Err = Error;
914
915    fn from_str(s: &str) -> Result<Self> {
916        let t = match s {
917            "identity" => Transform::Identity,
918            "year" => Transform::Year,
919            "month" => Transform::Month,
920            "day" => Transform::Day,
921            "hour" => Transform::Hour,
922            "void" => Transform::Void,
923            v if v.starts_with("bucket") => {
924                let length = v
925                    .strip_prefix("bucket")
926                    .expect("transform must starts with `bucket`")
927                    .trim_start_matches('[')
928                    .trim_end_matches(']')
929                    .parse()
930                    .map_err(|err| {
931                        Error::new(
932                            ErrorKind::IcebergDataInvalid,
933                            format!("transform bucket type {v:?} is invalid"),
934                        )
935                        .set_source(err)
936                    })?;
937
938                Transform::Bucket(length)
939            }
940            v if v.starts_with("truncate") => {
941                let width = v
942                    .strip_prefix("truncate")
943                    .expect("transform must starts with `truncate`")
944                    .trim_start_matches('[')
945                    .trim_end_matches(']')
946                    .parse()
947                    .map_err(|err| {
948                        Error::new(
949                            ErrorKind::IcebergDataInvalid,
950                            format!("transform truncate type {v:?} is invalid"),
951                        )
952                        .set_source(err)
953                    })?;
954
955                Transform::Truncate(width)
956            }
957            v => {
958                return Err(Error::new(
959                    ErrorKind::IcebergDataInvalid,
960                    format!("transform {v:?} is invalid"),
961                ))
962            }
963        };
964
965        Ok(t)
966    }
967}
968
/// Data files are stored in manifests with a tuple of partition values
/// that are used in scans to filter out files that cannot contain records
/// that match the scan’s filter predicate.
///
/// Partition values for a data file must be the same for all records stored
/// in the data file. (Manifests store data files from any partition, as long
/// as the partition spec is the same for the data files.)
///
/// Tables are configured with a partition spec that defines how to produce a tuple of partition values from a record.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PartitionSpec {
    /// The spec id.
    pub spec_id: i32,
    /// Partition fields, applied in order to produce the partition tuple.
    pub fields: Vec<PartitionField>,
}
985
986impl PartitionSpec {
987    pub(crate) fn partition_type(&self, schema: &Schema) -> Result<Struct> {
988        let mut fields = Vec::with_capacity(self.fields.len());
989        for partition_field in &self.fields {
990            let source_field = schema
991                .look_up_field_by_id(partition_field.source_column_id)
992                .ok_or_else(|| {
993                    Error::new(
994                        ErrorKind::IcebergDataInvalid,
995                        format!(
996                            "Can't find field id {} in schema",
997                            partition_field.source_column_id
998                        ),
999                    )
1000                })?;
1001            let result_type = partition_field
1002                .transform
1003                .result_type(&source_field.field_type)?;
1004            fields.push(
1005                Field::optional(
1006                    partition_field.partition_field_id,
1007                    partition_field.name.as_str(),
1008                    result_type,
1009                )
1010                .into(),
1011            );
1012        }
1013
1014        Ok(Struct::new(fields))
1015    }
1016
1017    pub fn column_ids(&self) -> Vec<i32> {
1018        self.fields
1019            .iter()
1020            .map(|field| field.source_column_id)
1021            .collect()
1022    }
1023
1024    /// Check if this partition spec is unpartitioned.
1025    pub fn is_unpartitioned(&self) -> bool {
1026        self.fields.is_empty()
1027    }
1028}
1029
/// Field of the specified partition spec.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PartitionField {
    /// A source column id from the table’s schema
    pub source_column_id: i32,
    /// A partition field id that is used to identify a partition field
    /// and is unique within a partition spec.
    ///
    /// In v2 table metadata, it is unique across all partition specs.
    pub partition_field_id: i32,
    /// A transform that is applied to the source column to produce
    /// a partition value
    ///
    /// The source column, selected by id, must be a primitive type
    /// and cannot be contained in a map or list, but may be nested in
    /// a struct.
    pub transform: Transform,
    /// A partition name; also used as the field name of the partition
    /// struct type derived in `PartitionSpec::partition_type`.
    pub name: String,
}
1050
/// Users can sort their data within partitions by columns to gain
/// performance. The information on how the data is sorted can be declared
/// per data or delete file, by a sort order.
///
/// - Order id `0` is reserved for the unsorted order.
/// - Sorting floating-point numbers should produce the following behavior:
///   `-NaN` < `-Infinity` < `-value` < `-0` < `0` < `value` < `Infinity`
///   < `NaN`
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SortOrder {
    /// The sort order id of this SortOrder (`0` is reserved for the
    /// unsorted order, see above)
    pub order_id: i32,
    /// The order of the sort fields within the list defines the order in
    /// which the sort is applied to the data
    pub fields: Vec<SortField>,
}
1067
/// Field of the specified sort order.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SortField {
    /// A source column id from the table’s schema
    pub source_column_id: i32,
    /// A transform that is applied to the source column to produce
    /// the value this field sorts by
    ///
    /// The source column, selected by id, must be a primitive type
    /// and cannot be contained in a map or list, but may be nested in
    /// a struct.
    pub transform: Transform,
    /// sort direction, that can only be either `asc` or `desc`
    pub direction: SortDirection,
    /// A null order that describes the order of null values when sorted.
    /// Can only be either nulls-first or nulls-last
    pub null_order: NullOrder,
}
1086
/// sort direction, that can only be either `asc` or `desc`
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SortDirection {
    /// Ascending order; serialized as `asc`
    ASC,
    /// Descending order; serialized as `desc`
    DESC,
}
1095
1096impl Display for SortDirection {
1097    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1098        match self {
1099            SortDirection::ASC => write!(f, "asc"),
1100            SortDirection::DESC => write!(f, "desc"),
1101        }
1102    }
1103}
1104
1105impl FromStr for SortDirection {
1106    type Err = Error;
1107
1108    fn from_str(s: &str) -> Result<Self> {
1109        match s {
1110            "asc" => Ok(SortDirection::ASC),
1111            "desc" => Ok(SortDirection::DESC),
1112            v => Err(Error::new(
1113                ErrorKind::IcebergDataInvalid,
1114                format!("sort direction {:?} is invalid", v),
1115            )),
1116        }
1117    }
1118}
1119
/// A null order that describes the order of null values when sorted.
/// Can only be either nulls-first or nulls-last
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum NullOrder {
    /// Nulls are sorted before non-null values; serialized as `nulls-first`
    First,
    /// Nulls are sorted after non-null values; serialized as `nulls-last`
    Last,
}
1129
1130impl Display for NullOrder {
1131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1132        match self {
1133            NullOrder::First => write!(f, "nulls-first"),
1134            NullOrder::Last => write!(f, "nulls-last"),
1135        }
1136    }
1137}
1138
1139impl FromStr for NullOrder {
1140    type Err = Error;
1141
1142    fn from_str(s: &str) -> Result<Self> {
1143        match s {
1144            "nulls-first" => Ok(NullOrder::First),
1145            "nulls-last" => Ok(NullOrder::Last),
1146            v => Err(Error::new(
1147                ErrorKind::IcebergDataInvalid,
1148                format!("null order {:?} is invalid", v),
1149            )),
1150        }
1151    }
1152}
1153
/// Snapshots are embedded in table metadata, but the list of manifests for a
/// snapshot are stored in a separate manifest list file.
///
/// A new manifest list is written for each attempt to commit a snapshot
/// because the list of manifests always changes to produce a new snapshot.
/// When a manifest list is written, the (optimistic) sequence number of the
/// snapshot is written for all new manifest files tracked by the list.
///
/// A manifest list includes summary metadata that can be used to avoid
/// scanning all of the manifests in a snapshot when planning a table scan.
/// This includes the number of added, existing, and deleted files, and a
/// summary of values for each field of the partition spec used to write the
/// manifest.
#[derive(Debug, PartialEq, Clone)]
pub struct ManifestList {
    /// Entries in a manifest list, one per manifest file tracked by the
    /// snapshot.
    pub entries: Vec<ManifestListEntry>,
}
1172
impl ManifestList {
    /// Schema of a v2 manifest list file, built from the field constants in
    /// the `manifest_list` module below.
    ///
    /// NOTE(review): the field order here presumably defines the serialized
    /// record layout — keep it in sync with the reader/writer when editing.
    pub(crate) fn v2_schema() -> Schema {
        Schema::new(
            1,
            None,
            Struct::new(vec![
                manifest_list::MANIFEST_PATH.clone().into(),
                manifest_list::MANIFEST_LENGTH.clone().into(),
                manifest_list::PARTITION_SPEC_ID.clone().into(),
                manifest_list::CONTENT.clone().into(),
                manifest_list::SEQUENCE_NUMBER.clone().into(),
                manifest_list::MIN_SEQUENCE_NUMBER.clone().into(),
                manifest_list::ADDED_SNAPSHOT_ID.clone().into(),
                manifest_list::ADDED_FILES_COUNT.clone().into(),
                manifest_list::EXISTING_FILES_COUNT.clone().into(),
                manifest_list::DELETED_FILES_COUNT.clone().into(),
                manifest_list::ADDED_ROWS_COUNT.clone().into(),
                manifest_list::EXISTING_ROWS_COUNT.clone().into(),
                manifest_list::DELETED_ROWS_COUNT.clone().into(),
                manifest_list::PARTITIONS.clone().into(),
                manifest_list::KEY_METADATA.clone().into(),
            ]),
        )
    }
}
1198
/// Entry in a manifest list.
///
/// The `field: N` markers are the Iceberg field ids, mirrored by the
/// constants in the `manifest_list` module.
#[derive(Debug, PartialEq, Clone)]
pub struct ManifestListEntry {
    /// field: 500
    ///
    /// Location of the manifest file
    pub manifest_path: String,
    /// field: 501
    ///
    /// Length of the manifest file in bytes
    pub manifest_length: i64,
    /// field: 502
    ///
    /// ID of a partition spec used to write the manifest; must be listed
    /// in table metadata partition-specs
    pub partition_spec_id: i32,
    /// field: 517
    ///
    /// The type of files tracked by the manifest, either data or delete
    /// files; 0 for all v1 manifests
    pub content: ManifestContentType,
    /// field: 515
    ///
    /// The sequence number when the manifest was added to the table; use 0
    /// when reading v1 manifest lists
    pub sequence_number: i64,
    /// field: 516
    ///
    /// The minimum data sequence number of all live data or delete files in
    /// the manifest; use 0 when reading v1 manifest lists
    pub min_sequence_number: i64,
    /// field: 503
    ///
    /// ID of the snapshot where the manifest file was added
    pub added_snapshot_id: i64,
    /// field: 504
    ///
    /// Number of entries in the manifest that have status ADDED, when null
    /// this is assumed to be non-zero
    pub added_data_files_count: i32,
    /// field: 505
    ///
    /// Number of entries in the manifest that have status EXISTING (0),
    /// when null this is assumed to be non-zero
    pub existing_data_files_count: i32,
    /// field: 506
    ///
    /// Number of entries in the manifest that have status DELETED (2),
    /// when null this is assumed to be non-zero
    pub deleted_data_files_count: i32,
    /// field: 512
    ///
    /// Number of rows in all files in the manifest that have status
    /// ADDED, when null this is assumed to be non-zero
    pub added_rows_count: i64,
    /// field: 513
    ///
    /// Number of rows in all files in the manifest that have status
    /// EXISTING, when null this is assumed to be non-zero
    pub existing_rows_count: i64,
    /// field: 514
    ///
    /// Number of rows in all files in the manifest that have status
    /// DELETED, when null this is assumed to be non-zero
    pub deleted_rows_count: i64,
    /// field: 507
    /// element_field: 508
    ///
    /// A list of field summaries for each partition field in the spec. Each
    /// field in the list corresponds to a field in the manifest file’s
    /// partition spec.
    pub partitions: Option<Vec<FieldSummary>>,
    /// field: 519
    ///
    /// Implementation-specific key metadata for encryption
    pub key_metadata: Option<Vec<u8>>,
}
1276
/// Field definitions (with their Iceberg field ids, 500–519) used to build
/// the manifest-list schema in `ManifestList::v2_schema`.
mod manifest_list {
    use super::*;
    use once_cell::sync::Lazy;
    pub static MANIFEST_PATH: Lazy<Field> =
        Lazy::new(|| Field::required(500, "manifest_path", Any::Primitive(Primitive::String)));
    pub static MANIFEST_LENGTH: Lazy<Field> =
        Lazy::new(|| Field::required(501, "manifest_length", Any::Primitive(Primitive::Long)));
    pub static PARTITION_SPEC_ID: Lazy<Field> =
        Lazy::new(|| Field::required(502, "partition_spec_id", Any::Primitive(Primitive::Int)));
    pub static CONTENT: Lazy<Field> =
        Lazy::new(|| Field::required(517, "content", Any::Primitive(Primitive::Int)));
    pub static SEQUENCE_NUMBER: Lazy<Field> =
        Lazy::new(|| Field::required(515, "sequence_number", Any::Primitive(Primitive::Long)));
    pub static MIN_SEQUENCE_NUMBER: Lazy<Field> =
        Lazy::new(|| Field::required(516, "min_sequence_number", Any::Primitive(Primitive::Long)));
    pub static ADDED_SNAPSHOT_ID: Lazy<Field> =
        Lazy::new(|| Field::required(503, "added_snapshot_id", Any::Primitive(Primitive::Long)));
    pub static ADDED_FILES_COUNT: Lazy<Field> = Lazy::new(|| {
        Field::required(
            504,
            "added_data_files_count",
            Any::Primitive(Primitive::Int),
        )
    });
    pub static EXISTING_FILES_COUNT: Lazy<Field> = Lazy::new(|| {
        Field::required(
            505,
            "existing_data_files_count",
            Any::Primitive(Primitive::Int),
        )
    });
    pub static DELETED_FILES_COUNT: Lazy<Field> = Lazy::new(|| {
        Field::required(
            506,
            "deleted_data_files_count",
            Any::Primitive(Primitive::Int),
        )
    });
    pub static ADDED_ROWS_COUNT: Lazy<Field> =
        Lazy::new(|| Field::required(512, "added_rows_count", Any::Primitive(Primitive::Long)));
    pub static EXISTING_ROWS_COUNT: Lazy<Field> =
        Lazy::new(|| Field::required(513, "existing_rows_count", Any::Primitive(Primitive::Long)));
    pub static DELETED_ROWS_COUNT: Lazy<Field> =
        Lazy::new(|| Field::required(514, "deleted_rows_count", Any::Primitive(Primitive::Long)));
    // `partitions` is a list (element id 508) of per-partition-field
    // summaries; the nested struct matches `FieldSummary` (ids 509–511, 518).
    pub static PARTITIONS: Lazy<Field> = Lazy::new(|| {
        Field::optional(
            507,
            "partitions",
            Any::List(List {
                element_id: 508,
                element_required: true,
                element_type: Box::new(Any::Struct(
                    Struct::new(vec![
                        Field::required(509, "contains_null", Any::Primitive(Primitive::Boolean))
                            .into(),
                        Field::optional(518, "contains_nan", Any::Primitive(Primitive::Boolean))
                            .into(),
                        Field::optional(510, "lower_bound", Any::Primitive(Primitive::Binary))
                            .into(),
                        Field::optional(511, "upper_bound", Any::Primitive(Primitive::Binary))
                            .into(),
                    ])
                    .into(),
                )),
            }),
        )
    });
    pub static KEY_METADATA: Lazy<Field> =
        Lazy::new(|| Field::optional(519, "key_metadata", Any::Primitive(Primitive::Binary)));
}
1347
/// Field summary for partition field in the spec.
///
/// Each field in the list corresponds to a field in the manifest file’s partition spec.
///
/// TODO: add lower_bound and upper_bound support
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct FieldSummary {
    /// field: 509
    ///
    /// Whether the manifest contains at least one partition with a null
    /// value for the field
    pub contains_null: bool,
    /// field: 518
    /// Whether the manifest contains at least one partition with a NaN
    /// value for the field
    pub contains_nan: Option<bool>,
    /// field: 510
    /// The minimum value for the field in the manifest’s
    /// partitions.
    pub lower_bound: Option<Vec<u8>>,
    /// field: 511
    /// The maximum value for the field in the manifest’s
    /// partitions.
    pub upper_bound: Option<Vec<u8>>,
}
1373
/// A manifest is an immutable Avro file that lists data files or delete
/// files, along with each file’s partition data tuple, metrics, and tracking
/// information.
///
/// This type models a single entry in such a manifest file.
#[derive(Debug, PartialEq, Clone)]
pub struct ManifestEntry {
    /// field: 0
    ///
    /// Used to track additions and deletions.
    pub status: ManifestStatus,
    /// field id: 1
    ///
    /// Snapshot id where the file was added, or deleted if status is 2.
    /// Inherited when null.
    pub snapshot_id: Option<i64>,
    /// field id: 3
    ///
    /// Data sequence number of the file.
    /// Inherited when null and status is 1 (added).
    pub sequence_number: Option<i64>,
    /// field id: 4
    ///
    /// File sequence number indicating when the file was added.
    /// Inherited when null and status is 1 (added).
    pub file_sequence_number: Option<i64>,
    /// field id: 2
    ///
    /// File path, partition tuple, metrics, …
    pub data_file: DataFile,
}
1403
1404impl ManifestEntry {
1405    /// Check if this manifest entry is deleted.
1406    pub fn is_alive(&self) -> bool {
1407        matches!(
1408            self.status,
1409            ManifestStatus::Added | ManifestStatus::Existing
1410        )
1411    }
1412}
1413
/// Field definitions used to build the v2 manifest-file schema in
/// `ManifestFile::v2_schema`; ids mirror the `field id` markers on
/// `ManifestEntry`.
mod manifest_file {
    use super::*;
    use once_cell::sync::Lazy;
    pub static STATUS: Lazy<Field> =
        Lazy::new(|| Field::required(0, "status", Any::Primitive(Primitive::Int)));
    pub static SNAPSHOT_ID: Lazy<Field> =
        Lazy::new(|| Field::optional(1, "snapshot_id", Any::Primitive(Primitive::Long)));
    pub static SEQUENCE_NUMBER: Lazy<Field> =
        Lazy::new(|| Field::optional(3, "sequence_number", Any::Primitive(Primitive::Long)));
    pub static FILE_SEQUENCE_NUMBER: Lazy<Field> =
        Lazy::new(|| Field::optional(4, "file_sequence_number", Any::Primitive(Primitive::Long)));

    // The `data_file` struct field (id 2) is built inline in
    // `ManifestFile::v2_schema` because its type depends on the partition
    // struct; only its id and name are fixed here.
    pub const DATA_FILE_ID: i32 = 2;
    pub const DATA_FILE_NAME: &str = "data_file";
}
1429
/// Metadata of a manifest file.
///
/// FIXME: partition_spec is not parsed.
#[derive(Debug, PartialEq, Clone)]
pub struct ManifestMetadata {
    /// The table schema at the time the manifest
    /// was written
    pub schema: Schema,
    /// ID of the schema used to write the manifest as a string
    pub schema_id: i32,

    /// The partition spec used to write the manifest
    pub partition_spec: PartitionSpec,

    /// Table format version number of the manifest as a string
    pub format_version: Option<TableFormatVersion>,
    /// Type of content files tracked by the manifest: “data” or “deletes”
    pub content: ManifestContentType,
}
1447
/// A manifest contains metadata and a list of entries.
#[derive(Debug, PartialEq, Clone)]
pub struct ManifestFile {
    /// Metadata of a manifest file.
    pub metadata: ManifestMetadata,
    /// Entries in manifest file, one per tracked data or delete file.
    pub entries: Vec<ManifestEntry>,
}
1456
impl ManifestFile {
    /// Schema of a v2 manifest file, with the nested `data_file` struct
    /// typed by the given partition struct type.
    ///
    /// NOTE(review): `datafile::CONTENT` is upgraded to required via
    /// `with_required()` for this schema; the field order below presumably
    /// defines the serialized record layout — keep in sync when editing.
    pub(crate) fn v2_schema(partition_type: Struct) -> Schema {
        Schema::new(
            0,
            None,
            Struct::new(vec![
                manifest_file::STATUS.clone().into(),
                manifest_file::SNAPSHOT_ID.clone().into(),
                manifest_file::SEQUENCE_NUMBER.clone().into(),
                manifest_file::FILE_SEQUENCE_NUMBER.clone().into(),
                Field::required(
                    manifest_file::DATA_FILE_ID,
                    manifest_file::DATA_FILE_NAME,
                    Any::Struct(
                        Struct::new(vec![
                            datafile::CONTENT.clone().with_required().into(),
                            datafile::FILE_PATH.clone().into(),
                            datafile::FILE_FORMAT.clone().into(),
                            DataFile::partition_field(partition_type).into(),
                            datafile::RECORD_COUNT.clone().into(),
                            datafile::FILE_SIZE.clone().into(),
                            datafile::COLUMN_SIZES.clone().into(),
                            datafile::VALUE_COUNTS.clone().into(),
                            datafile::NULL_VALUE_COUNTS.clone().into(),
                            datafile::NAN_VALUE_COUNTS.clone().into(),
                            datafile::LOWER_BOUNDS.clone().into(),
                            datafile::UPPER_BOUNDS.clone().into(),
                            datafile::KEY_METADATA.clone().into(),
                            datafile::SPLIT_OFFSETS.clone().into(),
                            datafile::EQUALITY_IDS.clone().into(),
                            datafile::SORT_ORDER_ID.clone().into(),
                        ])
                        .into(),
                    ),
                )
                .into(),
            ]),
        )
    }
}
1497
/// Type of content files tracked by the manifest
///
/// The discriminants (0/1) are the values used by `TryFrom<u8>`.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ManifestContentType {
    /// The manifest content is data.
    Data = 0,
    /// The manifest content is deletes.
    Deletes = 1,
}
1506
1507impl Display for ManifestContentType {
1508    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1509        match self {
1510            ManifestContentType::Data => write!(f, "data"),
1511            ManifestContentType::Deletes => write!(f, "deletes"),
1512        }
1513    }
1514}
1515
1516impl FromStr for ManifestContentType {
1517    type Err = Error;
1518
1519    fn from_str(s: &str) -> Result<Self> {
1520        match s {
1521            "data" => Ok(ManifestContentType::Data),
1522            "deletes" => Ok(ManifestContentType::Deletes),
1523            _ => Err(Error::new(
1524                ErrorKind::IcebergDataInvalid,
1525                format!("Invalid manifest content type: {s}"),
1526            )),
1527        }
1528    }
1529}
1530
1531impl TryFrom<u8> for ManifestContentType {
1532    type Error = Error;
1533
1534    fn try_from(v: u8) -> Result<ManifestContentType> {
1535        match v {
1536            0 => Ok(ManifestContentType::Data),
1537            1 => Ok(ManifestContentType::Deletes),
1538            _ => Err(Error::new(
1539                ErrorKind::IcebergDataInvalid,
1540                format!("manifest content type {} is invalid", v),
1541            )),
1542        }
1543    }
1544}
1545
/// Used to track additions and deletions in ManifestEntry.
///
/// The discriminants (0/1/2) are the values used by `TryFrom<u8>`.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ManifestStatus {
    /// Value: 0
    Existing = 0,
    /// Value: 1
    Added = 1,
    /// Value: 2
    ///
    /// Deletes are informational only and not used in scans.
    Deleted = 2,
}
1558
1559impl TryFrom<u8> for ManifestStatus {
1560    type Error = Error;
1561
1562    fn try_from(v: u8) -> Result<ManifestStatus> {
1563        match v {
1564            0 => Ok(ManifestStatus::Existing),
1565            1 => Ok(ManifestStatus::Added),
1566            2 => Ok(ManifestStatus::Deleted),
1567            _ => Err(Error::new(
1568                ErrorKind::IcebergDataInvalid,
1569                format!("manifest status {} is invalid", v),
1570            )),
1571        }
1572    }
1573}
1574
/// This type is used to build a `DataFile`.
///
/// Required inputs are passed to `new`; optional attributes are set through
/// the `with_*` methods before calling `build`.
pub struct DataFileBuilder {
    // Parquet footer metadata of the written file; source of row count,
    // per-column statistics and split offsets in `build`.
    meta_data: FileMetaData,
    // Path of the data file, joined onto `table_location` in `build`.
    file_location: String,
    // Total bytes written; becomes `file_size_in_bytes`.
    written_size: u64,
    // Root location of the table.
    table_location: String,
    // Content type; must be set via `with_content` before `build`.
    content: Option<DataContentType>,
    // Partition tuple of the file, if any; defaults in `build` when unset.
    partition_value: Option<StructValue>,
    // Equality field ids; required when content is `EqualityDeletes`.
    equality_ids: Option<Vec<i32>>,
}
1585
1586impl DataFileBuilder {
1587    /// Create a new `DataFileBuilder`.
1588    pub fn new(
1589        meta_data: FileMetaData,
1590        table_location: String,
1591        file_location: String,
1592        written_size: u64,
1593    ) -> Self {
1594        Self {
1595            meta_data,
1596            file_location,
1597            written_size,
1598            table_location,
1599            content: None,
1600            partition_value: None,
1601            equality_ids: None,
1602        }
1603    }
1604
1605    /// Set the content type of this data file.
1606    /// This function must be call before build.
1607    pub fn with_content(self, content: DataContentType) -> Self {
1608        Self {
1609            content: Some(content),
1610            ..self
1611        }
1612    }
1613
1614    /// Set the partition value of this data file.
1615    pub fn with_partition_value(self, value: Option<StructValue>) -> Self {
1616        Self {
1617            partition_value: value,
1618            ..self
1619        }
1620    }
1621
1622    /// Set the equality ids of this data file.
1623    pub fn with_equality_ids(self, ids: Vec<i32>) -> Self {
1624        Self {
1625            equality_ids: Some(ids),
1626            ..self
1627        }
1628    }
1629
1630    /// Build the `DataFile`.
1631    pub fn build(self) -> DataFile {
1632        log::info!("{:?}", self.meta_data);
1633        let (column_sizes, value_counts, null_value_counts, distinct_counts) = {
1634            // how to decide column id
1635            let mut per_col_size: HashMap<i32, _> = HashMap::new();
1636            let mut per_col_val_num: HashMap<i32, _> = HashMap::new();
1637            let mut per_col_null_val_num: HashMap<i32, _> = HashMap::new();
1638            let mut per_col_distinct_val_num: HashMap<i32, _> = HashMap::new();
1639            self.meta_data.row_groups.iter().for_each(|group| {
1640                group
1641                    .columns
1642                    .iter()
1643                    .enumerate()
1644                    .for_each(|(column_id, column_chunk)| {
1645                        if let Some(column_chunk_metadata) = &column_chunk.meta_data {
1646                            *per_col_size.entry(column_id as i32).or_insert(0) +=
1647                                column_chunk_metadata.total_compressed_size;
1648                            *per_col_val_num.entry(column_id as i32).or_insert(0) +=
1649                                column_chunk_metadata.num_values;
1650                            *per_col_null_val_num
1651                                .entry(column_id as i32)
1652                                .or_insert(0_i64) += column_chunk_metadata
1653                                .statistics
1654                                .as_ref()
1655                                .map(|s| s.null_count)
1656                                .unwrap_or(None)
1657                                .unwrap_or(0);
1658                            *per_col_distinct_val_num
1659                                .entry(column_id as i32)
1660                                .or_insert(0_i64) += column_chunk_metadata
1661                                .statistics
1662                                .as_ref()
1663                                .map(|s| s.distinct_count)
1664                                .unwrap_or(None)
1665                                .unwrap_or(0);
1666                        }
1667                    })
1668            });
1669            (
1670                per_col_size,
1671                per_col_val_num,
1672                per_col_null_val_num,
1673                per_col_distinct_val_num,
1674            )
1675        };
1676
1677        // equality_ids is required when content is EqualityDeletes.
1678        if self.content.unwrap() == DataContentType::EqualityDeletes {
1679            assert!(self.equality_ids.is_some());
1680        }
1681
1682        DataFile {
1683            content: self.content.unwrap(),
1684            file_path: format!("{}/{}", self.table_location, self.file_location),
1685            file_format: crate::types::DataFileFormat::Parquet,
1686            // # NOTE
1687            // DataFileWriter only response to write data. Partition should place by more high level writer.
1688            partition: self.partition_value.unwrap_or_default(),
1689            record_count: self.meta_data.num_rows,
1690            column_sizes: Some(column_sizes),
1691            value_counts: Some(value_counts),
1692            null_value_counts: Some(null_value_counts),
1693            distinct_counts: Some(distinct_counts),
1694            key_metadata: self.meta_data.footer_signing_key_metadata,
1695            file_size_in_bytes: self.written_size as i64,
1696            // # TODO
1697            //
1698            // Following fields unsupported now:
1699            // - `file_size_in_bytes` can't get from `FileMetaData` now.
1700            // - `file_offset` in `FileMetaData` always be None now.
1701            // - `nan_value_counts` can't get from `FileMetaData` now.
1702            // Currently arrow parquet writer doesn't fill row group offsets, we can use first column chunk offset for it.
1703            split_offsets: Some(
1704                self.meta_data
1705                    .row_groups
1706                    .iter()
1707                    .filter_map(|group| group.file_offset)
1708                    .collect(),
1709            ),
1710            nan_value_counts: None,
1711            lower_bounds: None,
1712            upper_bounds: None,
1713            equality_ids: self.equality_ids,
1714            sort_order_id: None,
1715        }
1716    }
1717}
1718
/// A data-file entry in a manifest: the file's location and format, its
/// partition tuple, and per-column metrics.
///
/// The `field id` numbers in the per-field docs are the Iceberg manifest
/// `data_file` schema field ids.
#[derive(Debug, PartialEq, Clone, Builder)]
#[builder(name = "DataFileBuilderV2", setter(prefix = "with"))]
pub struct DataFile {
    /// field id: 134
    ///
    /// Type of content stored by the data file: data, equality deletes,
    /// or position deletes (all v1 files are data files)
    pub content: DataContentType,
    /// field id: 100
    ///
    /// Full URI for the file with FS scheme
    pub file_path: String,
    /// field id: 101
    ///
    /// String file format name, avro, orc or parquet
    pub file_format: DataFileFormat,
    /// field id: 102
    ///
    /// Partition data tuple, schema based on the partition spec output using
    /// partition field ids for the struct field ids
    pub partition: StructValue,
    /// field id: 103
    ///
    /// Number of records in this file
    pub record_count: i64,
    /// field id: 104
    ///
    /// Total file size in bytes
    pub file_size_in_bytes: i64,
    /// field id: 108
    /// key field id: 117
    /// value field id: 118
    ///
    /// Map from column id to the total size on disk of all regions that
    /// store the column. Does not include bytes necessary to read other
    /// columns, like footers. Leave null for row-oriented formats (Avro)
    #[builder(setter(strip_option), default)]
    pub column_sizes: Option<HashMap<i32, i64>>,
    /// field id: 109
    /// key field id: 119
    /// value field id: 120
    ///
    /// Map from column id to number of values in the column (including null
    /// and NaN values)
    #[builder(setter(strip_option), default)]
    pub value_counts: Option<HashMap<i32, i64>>,
    /// field id: 110
    /// key field id: 121
    /// value field id: 122
    ///
    /// Map from column id to number of null values in the column
    #[builder(setter(strip_option), default)]
    pub null_value_counts: Option<HashMap<i32, i64>>,
    /// field id: 137
    /// key field id: 138
    /// value field id: 139
    ///
    /// Map from column id to number of NaN values in the column
    #[builder(setter(strip_option), default)]
    pub nan_value_counts: Option<HashMap<i32, i64>>,
    /// field id: 111
    /// key field id: 123
    /// value field id: 124
    ///
    /// Map from column id to number of distinct values in the column;
    /// distinct counts must be derived using values in the file by counting
    /// or using sketches, but not using methods like merging existing
    /// distinct counts
    #[builder(setter(strip_option), default)]
    pub distinct_counts: Option<HashMap<i32, i64>>,
    /// field id: 125
    /// key field id: 126
    /// value field id: 127
    ///
    /// Map from column id to lower bound in the column serialized as binary.
    /// Each value must be less than or equal to all non-null, non-NaN values
    /// in the column for the file.
    ///
    /// Reference:
    ///
    /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
    #[builder(setter(strip_option), default)]
    pub lower_bounds: Option<HashMap<i32, Vec<u8>>>,
    /// field id: 128
    /// key field id: 129
    /// value field id: 130
    ///
    /// Map from column id to upper bound in the column serialized as binary.
    /// Each value must be greater than or equal to all non-null, non-NaN
    /// values in the column for the file.
    ///
    /// Reference:
    ///
    /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
    #[builder(setter(strip_option), default)]
    pub upper_bounds: Option<HashMap<i32, Vec<u8>>>,
    /// field id: 131
    ///
    /// Implementation-specific key metadata for encryption
    // NOTE(review): unlike the other Option fields, this one has no
    // `#[builder(setter(strip_option), default)]`, so `DataFileBuilderV2`
    // requires callers to set it explicitly — confirm this is intentional.
    pub key_metadata: Option<Vec<u8>>,
    /// field id: 132
    /// element field id: 133
    ///
    /// Split offsets for the data file. For example, all row group offsets
    /// in a Parquet file. Must be sorted ascending
    #[builder(setter(strip_option), default)]
    pub split_offsets: Option<Vec<i64>>,
    /// field id: 135
    /// element field id: 136
    ///
    /// Field ids used to determine row equality in equality delete files.
    /// Required when content is EqualityDeletes and should be null
    /// otherwise. Fields with ids listed in this column must be present
    /// in the delete file
    #[builder(setter(strip_option), default)]
    pub equality_ids: Option<Vec<i32>>,
    /// field id: 140
    ///
    /// ID representing sort order for this file.
    ///
    /// If sort order ID is missing or unknown, then the order is assumed to
    /// be unsorted. Only data files and equality delete files should be
    /// written with a non-null order id. Position deletes are required to be
    /// sorted by file and position, not a table order, and should set sort
    /// order id to null. Readers must ignore sort order id for position
    /// delete files.
    #[builder(setter(strip_option), default)]
    pub sort_order_id: Option<i32>,
}
1849
mod datafile {
    //! Lazily-initialized manifest schema `Field` definitions for the
    //! `data_file` struct. Field / key / value / element ids mirror the
    //! `field id` numbers documented on the `DataFile` struct.
    //!
    //! NOTE(review): there is no static for `distinct_counts` (field id 111)
    //! even though `DataFile` carries that field — confirm it is
    //! intentionally left out of the written manifest schema.
    use super::*;
    use once_cell::sync::Lazy;
    pub static CONTENT: Lazy<Field> = Lazy::new(|| {
        Field::optional(134, "content", Any::Primitive(Primitive::Int))
            .with_comment("Contents of the file: 0=data, 1=position deletes, 2=equality deletes")
    });
    pub static FILE_PATH: Lazy<Field> = Lazy::new(|| {
        Field::required(100, "file_path", Any::Primitive(Primitive::String))
            .with_comment("Location URI with FS scheme")
    });
    pub static FILE_FORMAT: Lazy<Field> = Lazy::new(|| {
        Field::required(101, "file_format", Any::Primitive(Primitive::String))
            .with_comment("File format name: avro, orc, or parquet")
    });
    pub static RECORD_COUNT: Lazy<Field> = Lazy::new(|| {
        Field::required(103, "record_count", Any::Primitive(Primitive::Long))
            .with_comment("Number of records in the file")
    });
    pub static FILE_SIZE: Lazy<Field> = Lazy::new(|| {
        Field::required(104, "file_size_in_bytes", Any::Primitive(Primitive::Long))
            .with_comment("Total file size in bytes")
    });
    pub static COLUMN_SIZES: Lazy<Field> = Lazy::new(|| {
        Field::optional(
            108,
            "column_sizes",
            Any::Map(Map {
                key_id: 117,
                key_type: Box::new(Any::Primitive(Primitive::Int)),
                value_id: 118,
                value_type: Box::new(Any::Primitive(Primitive::Long)),
                value_required: true,
            }),
        )
        .with_comment("Map of column id to total size on disk")
    });
    pub static VALUE_COUNTS: Lazy<Field> = Lazy::new(|| {
        Field::optional(
            109,
            "value_counts",
            Any::Map(Map {
                key_id: 119,
                key_type: Box::new(Any::Primitive(Primitive::Int)),
                value_id: 120,
                value_type: Box::new(Any::Primitive(Primitive::Long)),
                value_required: true,
            }),
        )
        .with_comment("Map of column id to total count, including null and NaN")
    });
    pub static NULL_VALUE_COUNTS: Lazy<Field> = Lazy::new(|| {
        Field::optional(
            110,
            "null_value_counts",
            Any::Map(Map {
                key_id: 121,
                key_type: Box::new(Any::Primitive(Primitive::Int)),
                value_id: 122,
                value_type: Box::new(Any::Primitive(Primitive::Long)),
                value_required: true,
            }),
        )
        .with_comment("Map of column id to null value count")
    });
    pub static NAN_VALUE_COUNTS: Lazy<Field> = Lazy::new(|| {
        Field::optional(
            137,
            "nan_value_counts",
            Any::Map(Map {
                key_id: 138,
                key_type: Box::new(Any::Primitive(Primitive::Int)),
                value_id: 139,
                value_type: Box::new(Any::Primitive(Primitive::Long)),
                value_required: true,
            }),
        )
        .with_comment("Map of column id to number of NaN values in the column")
    });
    pub static LOWER_BOUNDS: Lazy<Field> = Lazy::new(|| {
        Field::optional(
            125,
            "lower_bounds",
            Any::Map(Map {
                key_id: 126,
                key_type: Box::new(Any::Primitive(Primitive::Int)),
                value_id: 127,
                value_type: Box::new(Any::Primitive(Primitive::Binary)),
                value_required: true,
            }),
        )
        .with_comment("Map of column id to lower bound")
    });
    pub static UPPER_BOUNDS: Lazy<Field> = Lazy::new(|| {
        Field::optional(
            128,
            "upper_bounds",
            Any::Map(Map {
                key_id: 129,
                key_type: Box::new(Any::Primitive(Primitive::Int)),
                value_id: 130,
                value_type: Box::new(Any::Primitive(Primitive::Binary)),
                value_required: true,
            }),
        )
        .with_comment("Map of column id to upper bound")
    });
    pub static KEY_METADATA: Lazy<Field> = Lazy::new(|| {
        Field::optional(131, "key_metadata", Any::Primitive(Primitive::Binary))
            .with_comment("Encryption key metadata blob")
    });
    pub static SPLIT_OFFSETS: Lazy<Field> = Lazy::new(|| {
        Field::optional(
            132,
            "split_offsets",
            Any::List(List {
                element_id: 133,
                element_required: true,
                element_type: Box::new(Any::Primitive(Primitive::Long)),
            }),
        )
        .with_comment("Splittable offsets")
    });
    pub static EQUALITY_IDS: Lazy<Field> = Lazy::new(|| {
        Field::optional(
            135,
            "equality_ids",
            Any::List(List {
                element_id: 136,
                element_required: true,
                element_type: Box::new(Any::Primitive(Primitive::Int)),
            }),
        )
        .with_comment("Equality comparison field IDs")
    });
    pub static SORT_ORDER_ID: Lazy<Field> = Lazy::new(|| {
        Field::optional(140, "sort_order_id", Any::Primitive(Primitive::Int))
            .with_comment("Sort order ID")
    });
}
1990
1991impl DataFile {
1992    pub(crate) fn partition_field(partition_type: Struct) -> Field {
1993        Field::required(102, "partition", Any::Struct(partition_type.into()))
1994            .with_comment("Partition data tuple, schema based on the partition spec")
1995    }
1996
1997    #[cfg(test)]
1998    pub(crate) fn new(
1999        content: DataContentType,
2000        file_path: impl Into<String>,
2001        file_format: DataFileFormat,
2002        record_count: i64,
2003        file_size_in_bytes: i64,
2004    ) -> Self {
2005        Self {
2006            content,
2007            file_path: file_path.into(),
2008            file_format,
2009            // // TODO: Should not use default partition here. Replace it after introduce deserialize of `StructValue`.
2010            partition: StructValue::default(),
2011            record_count,
2012            file_size_in_bytes,
2013            column_sizes: None,
2014            value_counts: None,
2015            null_value_counts: None,
2016            nan_value_counts: None,
2017            distinct_counts: None,
2018            lower_bounds: None,
2019            upper_bounds: None,
2020            key_metadata: None,
2021            split_offsets: None,
2022            equality_ids: None,
2023            sort_order_id: None,
2024        }
2025    }
2026}
2027
/// Type of content stored by the data file: data, equality deletes, or
/// position deletes (all v1 files are data files)
///
/// The explicit discriminants (0/1/2) are the integer codes stored in the
/// manifest `content` field (field id 134).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum DataContentType {
    /// value: 0
    Data = 0,
    /// value: 1
    PositionDeletes = 1,
    /// value: 2
    EqualityDeletes = 2,
}
2039
2040impl TryFrom<u8> for DataContentType {
2041    type Error = Error;
2042
2043    fn try_from(v: u8) -> Result<DataContentType> {
2044        match v {
2045            0 => Ok(DataContentType::Data),
2046            1 => Ok(DataContentType::PositionDeletes),
2047            2 => Ok(DataContentType::EqualityDeletes),
2048            _ => Err(Error::new(
2049                ErrorKind::IcebergDataInvalid,
2050                format!("data content type {} is invalid", v),
2051            )),
2052        }
2053    }
2054}
2055
/// Format of this data.
///
/// The lowercase format names are produced and accepted by the `Display`
/// and `FromStr` impls for this type.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum DataFileFormat {
    /// Avro file format: <https://avro.apache.org/>
    Avro,
    /// Orc file format: <https://orc.apache.org/>
    Orc,
    /// Parquet file format: <https://parquet.apache.org/>
    Parquet,
}
2066
2067impl FromStr for DataFileFormat {
2068    type Err = Error;
2069
2070    fn from_str(s: &str) -> Result<Self> {
2071        match s.to_lowercase().as_str() {
2072            "avro" => Ok(Self::Avro),
2073            "orc" => Ok(Self::Orc),
2074            "parquet" => Ok(Self::Parquet),
2075            _ => Err(Error::new(
2076                ErrorKind::IcebergFeatureUnsupported,
2077                format!("Unsupported data file format: {}", s),
2078            )),
2079        }
2080    }
2081}
2082
2083impl Display for DataFileFormat {
2084    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2085        match self {
2086            DataFileFormat::Avro => f.write_str("avro"),
2087            DataFileFormat::Orc => f.write_str("orc"),
2088            DataFileFormat::Parquet => f.write_str("parquet"),
2089        }
2090    }
2091}
2092
/// A snapshot captures all data of a table at a point in time.
#[derive(Debug, PartialEq, Eq, Clone, Default)]
pub struct Snapshot {
    /// A unique long ID
    pub snapshot_id: i64,
    /// The snapshot ID of the snapshot’s parent. Omitted for any snapshot
    /// with no parent
    pub parent_snapshot_id: Option<i64>,
    /// A monotonically increasing long that tracks the order of changes to a
    /// table
    pub sequence_number: i64,
    /// A timestamp when the snapshot was created, used for garbage
    /// collection and table inspection
    pub timestamp_ms: i64,
    /// The location of a manifest list for this snapshot that tracks
    /// manifest files with additional metadata
    pub manifest_list: String,
    /// A string map that summarizes the snapshot changes, including
    /// operation (see below)
    ///
    /// The snapshot summary’s operation field is used by some operations,
    /// like snapshot expiration, to skip processing certain snapshots.
    /// Possible operation values are:
    ///
    /// - append – Only data files were added and no files were removed.
    /// - replace – Data and delete files were added and removed without changing table data; i.e., compaction, changing the data file format, or relocating data files.
    /// - overwrite – Data and delete files were added and removed in a logical overwrite operation.
    /// - delete – Data files were removed and their contents logically deleted and/or delete files were added to delete rows.
    ///
    /// For example:
    ///
    /// ```json
    /// {
    ///   "operation" : "append",
    ///   "spark.app.id" : "local-1686911651377",
    ///   "added-data-files" : "3",
    ///   "added-records" : "3",
    ///   "added-files-size" : "1929",
    ///   "changed-partition-count" : "1",
    ///   "total-records" : "3",
    ///   "total-files-size" : "1929",
    ///   "total-data-files" : "3",
    ///   "total-delete-files" : "0",
    ///   "total-position-deletes" : "0",
    ///   "total-equality-deletes" : "0"
    /// }
    /// ```
    pub summary: HashMap<String, String>,
    /// ID of the table’s current schema when the snapshot was created
    pub schema_id: Option<i64>,
}
2144
2145impl Snapshot {
2146    pub(crate) async fn load_manifest_list(&self, op: &Operator) -> Result<ManifestList> {
2147        parse_manifest_list(
2148            &op.read(Table::relative_path(op, self.manifest_list.as_str())?.as_str())
2149                .await?
2150                .to_vec(),
2151        )
2152    }
2153
2154    pub(crate) fn log(&self) -> SnapshotLog {
2155        SnapshotLog {
2156            timestamp_ms: self.timestamp_ms,
2157            snapshot_id: self.snapshot_id,
2158        }
2159    }
2160}
2161
/// Accumulates per-file statistics for the files added in one commit and
/// renders them as a snapshot `summary` string map (see `merge`).
#[derive(Default)]
pub struct SnapshotSummaryBuilder {
    // Number of added files, broken down by content type.
    // `added_delete_files` counts both position- and equality-delete files.
    added_data_files: i64,
    added_delete_files: i64,
    added_equality_delete_files: i64,
    added_position_delete_files: i64,

    // Number of added records, broken down by content type.
    added_data_records: i64,
    added_position_deletes_records: i64,
    added_equality_deletes_records: i64,

    // Total size in bytes of all added files, regardless of content type.
    added_files_size: i64,
}
2175
2176impl SnapshotSummaryBuilder {
2177    const OPERATION: &'static str = "operation";
2178    const ADDED_DATA_FILES: &'static str = "added-data-files";
2179    const TOTAL_DATA_FILES: &'static str = "total-data-files";
2180    const ADDED_DELETE_FILES: &'static str = "added-delete-files";
2181    const ADDED_EQUALITY_DELETE_FILES: &'static str = "added-equality-delete-files";
2182    const ADDED_POSITION_DELETE_FILES: &'static str = "added-position-delete-files";
2183    const TOTAL_DELETE_FILES: &'static str = "total-delete-files";
2184    const ADDED_RECORDS: &'static str = "added-records";
2185    const TOTAL_RECORDS: &'static str = "total-records";
2186    const ADDED_POSITION_DELETES: &'static str = "added-position-deletes";
2187    const TOTAL_POSITION_DELETES: &'static str = "total-position-deletes";
2188    const ADDED_EQUALITY_DELETES: &'static str = "added-equality-deletes";
2189    const TOTAL_EQUALITY_DELETES: &'static str = "total-equality-deletes";
2190    const ADDED_FILES_SIZE: &'static str = "added-files-size";
2191    const TOTAL_FILES_SIZE: &'static str = "total-files-size";
2192
2193    pub fn new() -> Self {
2194        Default::default()
2195    }
2196
2197    pub fn add(&mut self, datafile: &DataFile) {
2198        match datafile.content {
2199            DataContentType::Data => {
2200                self.added_data_files += 1;
2201                self.added_data_records += datafile.record_count;
2202                self.added_files_size += datafile.file_size_in_bytes;
2203            }
2204            DataContentType::PositionDeletes => {
2205                self.added_delete_files += 1;
2206                self.added_position_delete_files += 1;
2207                self.added_position_deletes_records += datafile.record_count;
2208                self.added_files_size += datafile.file_size_in_bytes;
2209            }
2210            DataContentType::EqualityDeletes => {
2211                self.added_delete_files += 1;
2212                self.added_equality_delete_files += 1;
2213                self.added_equality_deletes_records += datafile.record_count;
2214                self.added_files_size += datafile.file_size_in_bytes;
2215            }
2216        }
2217    }
2218
2219    fn operation(&self) -> String {
2220        if self.added_delete_files == 0 && self.added_data_files != 0 {
2221            "append".to_string()
2222        } else if self.added_delete_files != 0 && self.added_data_files != 0 {
2223            "overwrite".to_string()
2224        } else if self.added_delete_files != 0 && self.added_data_files == 0 {
2225            "delete".to_string()
2226        } else {
2227            "append".to_string()
2228        }
2229    }
2230
2231    pub fn merge(
2232        self,
2233        last_summary: &HashMap<String, String>,
2234        is_compact_op: bool,
2235    ) -> Result<HashMap<String, String>> {
2236        let operation = if is_compact_op {
2237            "replace".to_string()
2238        } else {
2239            self.operation()
2240        };
2241
2242        #[inline]
2243        fn get_i64(value: &HashMap<String, String>, key: &str) -> std::result::Result<i64, Error> {
2244            Ok(value
2245                .get(key)
2246                .map(|val| val.parse::<i64>())
2247                .transpose()?
2248                .unwrap_or_default())
2249        }
2250
2251        let added_data_files = self.added_data_files;
2252        let total_data_files = {
2253            let total_data_files = get_i64(last_summary, Self::TOTAL_DATA_FILES)?;
2254            total_data_files + self.added_data_files
2255        };
2256        let added_delete_files = self.added_delete_files;
2257        let added_equality_delete_files = self.added_equality_delete_files;
2258        let added_position_delete_files = self.added_position_delete_files;
2259        let total_delete_files = {
2260            let total_delete_files = get_i64(last_summary, Self::TOTAL_DELETE_FILES)?;
2261            total_delete_files + self.added_delete_files
2262        };
2263        let added_records = self.added_data_records;
2264        let total_records = {
2265            let total_records = get_i64(last_summary, Self::TOTAL_RECORDS)?;
2266            total_records + self.added_data_records
2267        };
2268        let added_position_deletes = self.added_position_deletes_records;
2269        let total_position_deletes = {
2270            let total_position_deletes = get_i64(last_summary, Self::TOTAL_POSITION_DELETES)?;
2271            total_position_deletes + self.added_position_deletes_records
2272        };
2273        let added_equality_deletes = self.added_equality_deletes_records;
2274        let total_equality_deletes = {
2275            let total_equality_deletes = get_i64(last_summary, Self::TOTAL_EQUALITY_DELETES)?;
2276            total_equality_deletes + self.added_equality_deletes_records
2277        };
2278        let added_files_size = self.added_files_size;
2279        let total_files_size = {
2280            let total_files_size = get_i64(last_summary, Self::TOTAL_FILES_SIZE)?;
2281            total_files_size + self.added_files_size
2282        };
2283
2284        let mut m = HashMap::with_capacity(16);
2285        m.insert(Self::OPERATION.to_string(), operation);
2286        m.insert(
2287            Self::ADDED_DATA_FILES.to_string(),
2288            added_data_files.to_string(),
2289        );
2290        m.insert(
2291            Self::TOTAL_DATA_FILES.to_string(),
2292            total_data_files.to_string(),
2293        );
2294        m.insert(
2295            Self::ADDED_DELETE_FILES.to_string(),
2296            added_delete_files.to_string(),
2297        );
2298        m.insert(
2299            Self::ADDED_EQUALITY_DELETE_FILES.to_string(),
2300            added_equality_delete_files.to_string(),
2301        );
2302        m.insert(
2303            Self::ADDED_POSITION_DELETE_FILES.to_string(),
2304            added_position_delete_files.to_string(),
2305        );
2306        m.insert(
2307            Self::TOTAL_DELETE_FILES.to_string(),
2308            total_delete_files.to_string(),
2309        );
2310        m.insert(Self::ADDED_RECORDS.to_string(), added_records.to_string());
2311        m.insert(Self::TOTAL_RECORDS.to_string(), total_records.to_string());
2312        m.insert(
2313            Self::ADDED_POSITION_DELETES.to_string(),
2314            added_position_deletes.to_string(),
2315        );
2316        m.insert(
2317            Self::TOTAL_POSITION_DELETES.to_string(),
2318            total_position_deletes.to_string(),
2319        );
2320        m.insert(
2321            Self::ADDED_EQUALITY_DELETES.to_string(),
2322            added_equality_deletes.to_string(),
2323        );
2324        m.insert(
2325            Self::TOTAL_EQUALITY_DELETES.to_string(),
2326            total_equality_deletes.to_string(),
2327        );
2328        m.insert(
2329            Self::ADDED_FILES_SIZE.to_string(),
2330            added_files_size.to_string(),
2331        );
2332        m.insert(
2333            Self::TOTAL_FILES_SIZE.to_string(),
2334            total_files_size.to_string(),
2335        );
2336
2337        Ok(m)
2338    }
2339}
2340
/// A timestamp and snapshot ID pair that encodes a change to the current
/// snapshot for the table.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct SnapshotLog {
    /// The timestamp of this snapshot log.
    ///
    /// TODO: we should use `chrono::DateTime` instead of `i64`.
    pub timestamp_ms: i64,
    /// The snapshot ID of this snapshot log.
    pub snapshot_id: i64,
}
2352
/// Iceberg tables keep track of branches and tags using snapshot references.
///
/// Tags are labels for individual snapshots. Branches are mutable named
/// references that can be updated by committing a new snapshot as the
/// branch’s referenced snapshot using the Commit Conflict Resolution and
/// Retry procedures.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SnapshotReference {
    /// A reference’s snapshot ID: the tagged snapshot, or the latest
    /// snapshot of a branch.
    pub snapshot_id: i64,
    /// Type of the reference: tag or branch.
    pub typ: SnapshotReferenceType,
    /// For branch type only.
    ///
    /// A positive number for the minimum number of snapshots to keep in a
    /// branch while expiring snapshots.
    ///
    /// Defaults to table property `history.expire.min-snapshots-to-keep`.
    pub min_snapshots_to_keep: Option<i32>,
    /// For branch type only.
    ///
    /// A positive number for the max age of snapshots to keep when expiring,
    /// including the latest snapshot.
    ///
    /// Defaults to table property `history.expire.max-snapshot-age-ms`.
    pub max_snapshot_age_ms: Option<i64>,
    /// For snapshot references except the `main` branch.
    ///
    /// A positive number for the max age of the snapshot reference to keep
    /// while expiring snapshots.
    ///
    /// Defaults to table property `history.expire.max-ref-age-ms`
    ///
    /// The main branch never expires.
    pub max_ref_age_ms: Option<i64>,
}
2390
2391impl SnapshotReference {
2392    pub(crate) fn new(snapshot_id: i64, typ: SnapshotReferenceType) -> Self {
2393        Self {
2394            snapshot_id,
2395            typ,
2396            min_snapshots_to_keep: None,
2397            max_snapshot_age_ms: None,
2398            max_ref_age_ms: None,
2399        }
2400    }
2401}
2402
/// Type of the reference
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SnapshotReferenceType {
    /// Tag is used to reference to a specific tag.
    Tag,
    /// Branch is used to reference to a specific branch, like `main`.
    Branch,
}
2411
2412impl Display for SnapshotReferenceType {
2413    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2414        match self {
2415            SnapshotReferenceType::Tag => f.write_str("tag"),
2416            SnapshotReferenceType::Branch => f.write_str("branch"),
2417        }
2418    }
2419}
2420
2421impl FromStr for SnapshotReferenceType {
2422    type Err = Error;
2423
2424    fn from_str(s: &str) -> Result<Self> {
2425        match s {
2426            "tag" => Ok(SnapshotReferenceType::Tag),
2427            "branch" => Ok(SnapshotReferenceType::Branch),
2428            _ => Err(Error::new(
2429                ErrorKind::IcebergDataInvalid,
2430                format!("Invalid snapshot reference type: {s}"),
2431            )),
2432        }
2433    }
2434}
2435
/// A timestamp and metadata-file-location pair that encodes a change to
/// the previous metadata files for the table.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct MetadataLog {
    /// Related timestamp for this metadata log.
    pub timestamp_ms: i64,
    /// The metadata file's location.
    pub metadata_file: String,
}
2445
/// Table metadata is stored as JSON. Each table metadata change creates a
/// new table metadata file that is committed by an atomic operation. This
/// operation is used to ensure that a new version of table metadata replaces
/// the version on which it was based. This produces a linear history of
/// table versions and ensures that concurrent writes are not lost.
///
/// TODO: statistics is not supported.
#[derive(Debug, PartialEq, Clone)]
pub struct TableMetadata {
    /// Currently, this can be 1 or 2 based on the spec. Implementations
    /// must throw an exception if a table’s version is higher than the
    /// supported version.
    pub format_version: TableFormatVersion,
    /// A UUID that identifies the table, generated when the table is
    /// created. Implementations must throw an exception if a table’s UUID
    /// does not match the expected UUID after refreshing metadata.
    pub table_uuid: String,
    /// The table’s base location.
    ///
    /// This is used by writers to determine where to store data files, manifest files, and table metadata files.
    pub location: String,
    /// The table’s highest assigned sequence number, a monotonically
    /// increasing long that tracks the order of snapshots in a table.
    pub last_sequence_number: i64,
    /// Timestamp in milliseconds from the unix epoch when the table was last
    /// updated. Each table metadata file should update this field just
    /// before writing.
    pub last_updated_ms: i64,
    /// The highest assigned column ID for the table.
    ///
    /// This is used to ensure columns are always assigned an unused ID when
    /// evolving schemas.
    pub last_column_id: i32,
    /// A list of schemas, stored as objects with schema-id.
    pub schemas: Vec<Schema>,
    /// ID of the table’s current schema.
    pub current_schema_id: i32,
    /// A list of partition specs, stored as full partition spec objects.
    pub partition_specs: Vec<PartitionSpec>,
    /// ID of the “current” spec that writers should use by default.
    pub default_spec_id: i32,
    /// the highest assigned partition field ID across all partition specs
    /// for the table. This is used to ensure partition fields are always
    /// assigned an unused ID when evolving specs.
    pub last_partition_id: i32,
    /// A string to string map of table properties.
    ///
    /// This is used to control settings that affect reading and writing and
    /// is not intended to be used for arbitrary metadata. For example,
    /// `commit.retry.num-retries` is used to control the number of commit
    /// retries.
    pub properties: Option<HashMap<String, String>>,
    /// ID of the current table snapshot; must be the same as the current ID
    /// of the main branch in refs.
    ///
    /// A value of `-1` (`EMPTY_SNAPSHOT_ID`) is treated the same as `None`:
    /// the table has no current snapshot.
    pub current_snapshot_id: Option<i64>,
    /// A list of valid snapshots.
    ///
    /// Valid snapshots are snapshots for which all data files exist in the
    /// file system. A data file must not be deleted from the file system
    /// until the last snapshot in which it was listed is garbage collected.
    pub snapshots: Option<Vec<Snapshot>>,
    /// A list (optional) of timestamp and snapshot ID pairs that encodes
    /// changes to the current snapshot for the table.
    ///
    /// Each time the current snapshot is changed, a new entry should be
    /// added with the last-updated-ms and the new current-snapshot-id.
    /// Entries may be removed when the snapshots they reference are
    /// expired from the list of valid snapshots.
    pub snapshot_log: Option<Vec<SnapshotLog>>,
    /// A list (optional) of timestamp and metadata file location pairs that
    /// encodes changes to the previous metadata files for the table.
    ///
    /// Each time a new metadata file is created, a new entry of the previous
    /// metadata file location should be added to the list. Tables can be
    /// configured to remove oldest metadata log entries and keep a
    /// fixed-size log of the most recent entries after a commit.
    pub metadata_log: Option<Vec<MetadataLog>>,
    /// A list of sort orders, stored as full sort order objects.
    pub sort_orders: Vec<SortOrder>,
    /// Default sort order id of the table.
    ///
    /// Note that this could be used by writers, but is not used when reading
    /// because reads use the specs stored in manifest files.
    pub default_sort_order_id: i32,
    /// A map of snapshot references.
    ///
    /// The map keys are the unique snapshot reference names in the table,
    /// and the map values are snapshot reference objects.
    ///
    /// There is always a main branch reference pointing to the
    /// `current-snapshot-id` even if the refs map is null.
    pub refs: HashMap<String, SnapshotReference>,
}
2539
2540impl TableMetadata {
2541    /// Current partition spec.
2542    pub fn current_partition_spec(&self) -> Result<&PartitionSpec> {
2543        self.partition_spec(self.default_spec_id).ok_or_else(|| {
2544            Error::new(
2545                ErrorKind::IcebergDataInvalid,
2546                format!("Partition spec id {} not found!", self.default_spec_id),
2547            )
2548        })
2549    }
2550
2551    /// Partition spec by id.
2552    pub fn partition_spec(&self, spec_id: i32) -> Option<&PartitionSpec> {
2553        self.partition_specs.iter().find(|p| p.spec_id == spec_id)
2554    }
2555
2556    /// Current schema.
2557    pub fn current_schema(&self) -> Result<&Schema> {
2558        self.schema(self.current_schema_id).ok_or_else(|| {
2559            Error::new(
2560                ErrorKind::IcebergDataInvalid,
2561                format!("Schema id {} not found!", self.current_schema_id),
2562            )
2563        })
2564    }
2565
2566    /// Get schema by id
2567    pub fn schema(&self, schema_id: i32) -> Option<&Schema> {
2568        self.schemas.iter().find(|s| s.schema_id == schema_id)
2569    }
2570
2571    /// Current schema.
2572    pub fn current_snapshot(&self) -> Result<Option<&Snapshot>> {
2573        if self.current_snapshot_id == Some(EMPTY_SNAPSHOT_ID) || self.current_snapshot_id.is_none()
2574        {
2575            return Ok(None);
2576        }
2577
2578        Ok(Some(
2579            self.snapshot(self.current_snapshot_id.unwrap())
2580                .ok_or_else(|| {
2581                    Error::new(
2582                        ErrorKind::IcebergDataInvalid,
2583                        format!(
2584                            "Snapshot id {} not found!",
2585                            self.current_snapshot_id.unwrap()
2586                        ),
2587                    )
2588                })?,
2589        ))
2590    }
2591
2592    pub fn snapshot(&self, snapshot_id: i64) -> Option<&Snapshot> {
2593        if let Some(snapshots) = &self.snapshots {
2594            snapshots.iter().find(|s| s.snapshot_id == snapshot_id)
2595        } else {
2596            None
2597        }
2598    }
2599
2600    /// Returns snapshot reference of branch
2601    pub fn snapshot_ref(&self, branch: &str) -> Option<&SnapshotReference> {
2602        self.refs.get(branch)
2603    }
2604
2605    /// Set snapshot reference for branch
2606    pub fn set_snapshot_ref(&mut self, branch: &str, snap_ref: SnapshotReference) -> Result<()> {
2607        let snapshot = self
2608            .snapshots
2609            .as_ref()
2610            .and_then(|s| s.iter().find(|s| s.snapshot_id == snap_ref.snapshot_id))
2611            .ok_or_else(|| {
2612                Error::new(
2613                    ErrorKind::IcebergDataInvalid,
2614                    format!("Snapshot id {} not found!", snap_ref.snapshot_id),
2615                )
2616            })?;
2617        self.refs
2618            .entry(branch.to_string())
2619            .and_modify(|s| {
2620                s.snapshot_id = snap_ref.snapshot_id;
2621                s.typ = snap_ref.typ;
2622
2623                if let Some(min_snapshots_to_keep) = snap_ref.min_snapshots_to_keep {
2624                    s.min_snapshots_to_keep = Some(min_snapshots_to_keep);
2625                }
2626
2627                if let Some(max_snapshot_age_ms) = snap_ref.max_snapshot_age_ms {
2628                    s.max_snapshot_age_ms = Some(max_snapshot_age_ms);
2629                }
2630
2631                if let Some(max_ref_age_ms) = snap_ref.max_ref_age_ms {
2632                    s.max_ref_age_ms = Some(max_ref_age_ms);
2633                }
2634            })
2635            .or_insert_with(|| {
2636                SnapshotReference::new(snap_ref.snapshot_id, SnapshotReferenceType::Branch)
2637            });
2638
2639        if branch == MAIN_BRANCH {
2640            self.current_snapshot_id = Some(snap_ref.snapshot_id);
2641            self.last_updated_ms = snapshot.timestamp_ms;
2642            self.last_sequence_number = snapshot.sequence_number;
2643            if let Some(snap_logs) = self.snapshot_log.as_mut() {
2644                snap_logs.push(snapshot.log());
2645            } else {
2646                self.snapshot_log = Some(vec![snapshot.log()]);
2647            }
2648        }
2649        Ok(())
2650    }
2651
2652    pub(crate) fn add_snapshot(&mut self, snapshot: Snapshot) -> Result<()> {
2653        if let Some(snapshots) = &mut self.snapshots {
2654            snapshots.push(snapshot);
2655        } else {
2656            self.snapshots = Some(vec![snapshot]);
2657        }
2658
2659        Ok(())
2660    }
2661}
2662
/// Table format version number.
///
/// The discriminants (1 and 2) match the `format-version` values stored in
/// table metadata, as used by `TryFrom<u8>` and `Display` below.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum TableFormatVersion {
    /// The V1 Table Format Version.
    V1 = 1,
    /// The V2 Table Format Version.
    V2 = 2,
}
2671
2672impl TryFrom<u8> for TableFormatVersion {
2673    type Error = Error;
2674
2675    fn try_from(value: u8) -> Result<TableFormatVersion> {
2676        match value {
2677            1 => Ok(TableFormatVersion::V1),
2678            2 => Ok(TableFormatVersion::V2),
2679            _ => Err(Error::new(
2680                ErrorKind::IcebergDataInvalid,
2681                format!("Unknown table format: {value}"),
2682            )),
2683        }
2684    }
2685}
2686
2687impl Display for TableFormatVersion {
2688    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2689        match self {
2690            TableFormatVersion::V1 => f.write_str("1"),
2691            TableFormatVersion::V2 => f.write_str("2"),
2692        }
2693    }
2694}
2695
2696#[cfg(test)]
2697mod test {
2698    use apache_avro::{schema, types::Value};
2699    use std::collections::HashMap;
2700
2701    use crate::types::SnapshotSummaryBuilder;
2702    use crate::types::{Field, PrimitiveValue, Struct, StructValueBuilder};
2703
2704    use super::{AnyValue, DataFile, StructValue};
2705
    #[test]
    fn test_struct_to_avro() {
        // Serialize a struct value with an optional (null) int field "a"
        // and a required string field "b" into an avro value.
        let value = {
            let struct_value = {
                let struct_type = Struct::new(vec![
                    Field::optional(
                        1,
                        "a",
                        crate::types::Any::Primitive(crate::types::Primitive::Int),
                    )
                    .into(),
                    Field::required(
                        2,
                        "b",
                        crate::types::Any::Primitive(crate::types::Primitive::String),
                    )
                    .into(),
                ]);
                let mut builder = StructValueBuilder::new(struct_type.into());
                builder.add_field(1, None).unwrap();
                builder
                    .add_field(
                        2,
                        Some(AnyValue::Primitive(PrimitiveValue::String(
                            "hello".to_string(),
                        ))),
                    )
                    .unwrap();
                AnyValue::Struct(builder.build().unwrap())
            };

            let mut res = apache_avro::to_value(struct_value).unwrap();

            // Sort the record fields by name so the comparison below does
            // not depend on serialization order.
            if let Value::Record(ref mut record) = res {
                record.sort_by(|a, b| a.0.cmp(&b.0));
            }

            res
        };

        // Build the expected record directly from an equivalent avro schema.
        let expect_value = {
            let raw_schema = r#"
                {
                    "type": "record",
                    "name": "test",
                    "fields": [
                        {"name": "a", "type": ["int","null"]},
                        {"name": "b", "type": "string"}
                    ]
                }
            "#;

            let schema = schema::Schema::parse_str(raw_schema).unwrap();

            let mut record = apache_avro::types::Record::new(&schema).unwrap();
            record.put("a", None::<String>);
            record.put("b", "hello");

            record.into()
        };

        assert_eq!(value, expect_value);
    }
2770
    #[test]
    fn test_struct_field_id_lookup() {
        // `struct_type1` is nested inside `struct_type2` via field "c";
        // lookups on the outer struct should still resolve inner field ids.
        let struct_type1 = Struct::new(vec![
            Field::optional(
                1,
                "a",
                crate::types::Any::Primitive(crate::types::Primitive::Int),
            )
            .into(),
            Field::required(
                2,
                "b",
                crate::types::Any::Primitive(crate::types::Primitive::String),
            )
            .into(),
        ]);
        let struct_type2 = Struct::new(vec![
            Field::required(3, "c", crate::types::Any::Struct(struct_type1.into())).into(),
            Field::required(
                4,
                "d",
                crate::types::Any::Primitive(crate::types::Primitive::Int),
            )
            .into(),
        ]);
        // Ids 1 and 2 come from the nested struct, 3 and 4 from the outer.
        assert_eq!(struct_type2.lookup_field(1).unwrap().name, "a");
        assert_eq!(struct_type2.lookup_field(2).unwrap().name, "b");
        assert_eq!(struct_type2.lookup_field(3).unwrap().name, "c");
        assert_eq!(struct_type2.lookup_field(4).unwrap().name, "d");
    }
2801
2802    #[test]
2803    fn test_snapshot_summary() {
2804        let (data_file, pos_delete_file, eq_delete_file) = {
2805            let data_file = DataFile {
2806                content: super::DataContentType::Data,
2807                file_path: String::new(),
2808                file_format: super::DataFileFormat::Parquet,
2809                partition: StructValue::default(),
2810                record_count: 10,
2811                file_size_in_bytes: 100,
2812                column_sizes: None,
2813                value_counts: None,
2814                null_value_counts: None,
2815                nan_value_counts: None,
2816                distinct_counts: None,
2817                lower_bounds: None,
2818                upper_bounds: None,
2819                key_metadata: None,
2820                split_offsets: None,
2821                equality_ids: None,
2822                sort_order_id: None,
2823            };
2824            let pos_delete_file = {
2825                let mut data_file = data_file.clone();
2826                data_file.content = super::DataContentType::PositionDeletes;
2827                data_file
2828            };
2829            let eq_delete_file = {
2830                let mut data_file = data_file.clone();
2831                data_file.content = super::DataContentType::EqualityDeletes;
2832                data_file
2833            };
2834            (data_file, pos_delete_file, eq_delete_file)
2835        };
2836
2837        // add data file
2838        let mut builder = SnapshotSummaryBuilder::new();
2839        builder.add(&data_file);
2840        let summary_1 = builder.merge(&HashMap::new(), false).unwrap();
2841        assert_eq!(
2842            summary_1.get(SnapshotSummaryBuilder::OPERATION).unwrap(),
2843            "append"
2844        );
2845        assert_eq!(
2846            summary_1
2847                .get(SnapshotSummaryBuilder::ADDED_DATA_FILES)
2848                .unwrap(),
2849            "1"
2850        );
2851        assert_eq!(
2852            summary_1
2853                .get(SnapshotSummaryBuilder::TOTAL_DATA_FILES)
2854                .unwrap(),
2855            "1"
2856        );
2857        assert_eq!(
2858            summary_1
2859                .get(SnapshotSummaryBuilder::ADDED_RECORDS)
2860                .unwrap(),
2861            "10"
2862        );
2863        assert_eq!(
2864            summary_1
2865                .get(SnapshotSummaryBuilder::TOTAL_RECORDS)
2866                .unwrap(),
2867            "10"
2868        );
2869        assert_eq!(
2870            summary_1
2871                .get(SnapshotSummaryBuilder::ADDED_FILES_SIZE)
2872                .unwrap(),
2873            "100"
2874        );
2875        assert_eq!(
2876            summary_1
2877                .get(SnapshotSummaryBuilder::TOTAL_FILES_SIZE)
2878                .unwrap(),
2879            "100"
2880        );
2881
2882        // add position delete file
2883        // add eq delete file
2884        let mut builder = SnapshotSummaryBuilder::new();
2885        builder.add(&pos_delete_file);
2886        builder.add(&eq_delete_file);
2887        let summary_2 = builder.merge(&summary_1, false).unwrap();
2888        assert_eq!(
2889            summary_2.get(SnapshotSummaryBuilder::OPERATION).unwrap(),
2890            "delete"
2891        );
2892        assert_eq!(
2893            summary_2
2894                .get(SnapshotSummaryBuilder::ADDED_DATA_FILES)
2895                .unwrap(),
2896            "0"
2897        );
2898        assert_eq!(
2899            summary_2
2900                .get(SnapshotSummaryBuilder::TOTAL_DATA_FILES)
2901                .unwrap(),
2902            "1"
2903        );
2904        assert_eq!(
2905            summary_2
2906                .get(SnapshotSummaryBuilder::ADDED_DELETE_FILES)
2907                .unwrap(),
2908            "2"
2909        );
2910        assert_eq!(
2911            summary_2
2912                .get(SnapshotSummaryBuilder::ADDED_POSITION_DELETE_FILES)
2913                .unwrap(),
2914            "1"
2915        );
2916        assert_eq!(
2917            summary_2
2918                .get(SnapshotSummaryBuilder::ADDED_EQUALITY_DELETE_FILES)
2919                .unwrap(),
2920            "1"
2921        );
2922        assert_eq!(
2923            summary_2
2924                .get(SnapshotSummaryBuilder::TOTAL_DELETE_FILES)
2925                .unwrap(),
2926            "2"
2927        );
2928        assert_eq!(
2929            summary_2
2930                .get(SnapshotSummaryBuilder::ADDED_RECORDS)
2931                .unwrap(),
2932            "0"
2933        );
2934        assert_eq!(
2935            summary_2
2936                .get(SnapshotSummaryBuilder::TOTAL_RECORDS)
2937                .unwrap(),
2938            "10"
2939        );
2940        assert_eq!(
2941            summary_2
2942                .get(SnapshotSummaryBuilder::ADDED_POSITION_DELETES)
2943                .unwrap(),
2944            "10"
2945        );
2946        assert_eq!(
2947            summary_2
2948                .get(SnapshotSummaryBuilder::TOTAL_POSITION_DELETES)
2949                .unwrap(),
2950            "10"
2951        );
2952        assert_eq!(
2953            summary_2
2954                .get(SnapshotSummaryBuilder::ADDED_EQUALITY_DELETES)
2955                .unwrap(),
2956            "10"
2957        );
2958        assert_eq!(
2959            summary_2
2960                .get(SnapshotSummaryBuilder::TOTAL_EQUALITY_DELETES)
2961                .unwrap(),
2962            "10"
2963        );
2964        assert_eq!(
2965            summary_2
2966                .get(SnapshotSummaryBuilder::ADDED_FILES_SIZE)
2967                .unwrap(),
2968            "200"
2969        );
2970        assert_eq!(
2971            summary_2
2972                .get(SnapshotSummaryBuilder::TOTAL_FILES_SIZE)
2973                .unwrap(),
2974            "300"
2975        );
2976
2977        // add data file
2978        // add position delete file
2979        // add eq delete file
2980        let mut builder = SnapshotSummaryBuilder::new();
2981        builder.add(&data_file);
2982        builder.add(&pos_delete_file);
2983        builder.add(&eq_delete_file);
2984        let summary_3 = builder.merge(&summary_2, false).unwrap();
2985        assert_eq!(
2986            summary_3.get(SnapshotSummaryBuilder::OPERATION).unwrap(),
2987            "overwrite"
2988        );
2989        assert_eq!(
2990            summary_3
2991                .get(SnapshotSummaryBuilder::ADDED_DATA_FILES)
2992                .unwrap(),
2993            "1"
2994        );
2995        assert_eq!(
2996            summary_3
2997                .get(SnapshotSummaryBuilder::TOTAL_DATA_FILES)
2998                .unwrap(),
2999            "2"
3000        );
3001        assert_eq!(
3002            summary_3
3003                .get(SnapshotSummaryBuilder::ADDED_DELETE_FILES)
3004                .unwrap(),
3005            "2"
3006        );
3007        assert_eq!(
3008            summary_3
3009                .get(SnapshotSummaryBuilder::ADDED_POSITION_DELETE_FILES)
3010                .unwrap(),
3011            "1"
3012        );
3013        assert_eq!(
3014            summary_3
3015                .get(SnapshotSummaryBuilder::ADDED_EQUALITY_DELETE_FILES)
3016                .unwrap(),
3017            "1"
3018        );
3019        assert_eq!(
3020            summary_3
3021                .get(SnapshotSummaryBuilder::TOTAL_DELETE_FILES)
3022                .unwrap(),
3023            "4"
3024        );
3025        assert_eq!(
3026            summary_3
3027                .get(SnapshotSummaryBuilder::ADDED_RECORDS)
3028                .unwrap(),
3029            "10"
3030        );
3031        assert_eq!(
3032            summary_3
3033                .get(SnapshotSummaryBuilder::TOTAL_RECORDS)
3034                .unwrap(),
3035            "20"
3036        );
3037        assert_eq!(
3038            summary_3
3039                .get(SnapshotSummaryBuilder::ADDED_POSITION_DELETES)
3040                .unwrap(),
3041            "10"
3042        );
3043        assert_eq!(
3044            summary_3
3045                .get(SnapshotSummaryBuilder::TOTAL_POSITION_DELETES)
3046                .unwrap(),
3047            "20"
3048        );
3049        assert_eq!(
3050            summary_3
3051                .get(SnapshotSummaryBuilder::ADDED_EQUALITY_DELETES)
3052                .unwrap(),
3053            "10"
3054        );
3055        assert_eq!(
3056            summary_3
3057                .get(SnapshotSummaryBuilder::TOTAL_EQUALITY_DELETES)
3058                .unwrap(),
3059            "20"
3060        );
3061        assert_eq!(
3062            summary_3
3063                .get(SnapshotSummaryBuilder::ADDED_FILES_SIZE)
3064                .unwrap(),
3065            "300"
3066        );
3067        assert_eq!(
3068            summary_3
3069                .get(SnapshotSummaryBuilder::TOTAL_FILES_SIZE)
3070                .unwrap(),
3071            "600"
3072        );
3073    }
3074}