iceberg_rust_spec/spec/
manifest.rs

1//! Manifest file handling and data file tracking for Iceberg tables.
2//!
3//! This module provides the core types and implementations for working with manifest files,
4//! which track the data files that comprise an Iceberg table. Key components include:
5//!
6//! - [`ManifestEntry`] - Entries tracking data file additions and deletions
7//! - [`DataFile`] - Metadata about data files including statistics and metrics
8//! - [`Content`] - Types of content stored in data files (data vs deletes)
9//! - [`Status`] - Tracking additions and deletions of files
10//! - [`FileFormat`] - Supported file formats (Avro, ORC, Parquet)
11//!
12//! Manifest files are a key part of Iceberg's architecture, providing metadata about
13//! data files and enabling efficient file pruning during queries.
14
15use std::collections::HashMap;
16
17use apache_avro::Schema as AvroSchema;
18use derive_builder::Builder;
19use derive_getters::Getters;
20use serde::{de::DeserializeOwned, ser::SerializeSeq, Deserialize, Serialize};
21use serde_bytes::ByteBuf;
22use serde_repr::{Deserialize_repr, Serialize_repr};
23
24use crate::{error::Error, partition::BoundPartitionField};
25
26use super::{
27    partition::PartitionSpec,
28    schema::Schema,
29    table_metadata::FormatVersion,
30    types::{PrimitiveType, StructType, Type},
31    values::{Struct, Value},
32};
33
34/// Entry in manifest with the iceberg spec version 2.
35#[derive(Debug, Serialize, PartialEq, Clone, Getters, Builder)]
36#[serde(into = "ManifestEntryEnum")]
37#[builder(build_fn(error = "Error"), setter(prefix = "with"))]
38pub struct ManifestEntry {
39    /// Table format version
40    format_version: FormatVersion,
41    /// Used to track additions and deletions
42    status: Status,
43    /// Snapshot id where the file was added, or deleted if status is 2.
44    /// Inherited when null.
45    #[builder(setter(strip_option), default)]
46    snapshot_id: Option<i64>,
47    /// Sequence number when the file was added. Inherited when null.
48    #[builder(setter(strip_option), default)]
49    sequence_number: Option<i64>,
50    /// File path, partition tuple, metrics, …
51    data_file: DataFile,
52}
53
54impl ManifestEntry {
55    /// Creates a new builder for constructing a ManifestEntry.
56    ///
57    /// The builder provides a fluent interface for setting all the fields of a ManifestEntry.
58    /// Use this when you need to create a new manifest entry with custom values.
59    ///
60    /// # Returns
61    /// * A new ManifestEntryBuilder instance with default values
62    pub fn builder() -> ManifestEntryBuilder {
63        ManifestEntryBuilder::default()
64    }
65
66    /// Returns a mutable reference to the status field of this manifest entry.
67    ///
68    /// This allows modifying the status to track additions and deletions of data files.
69    pub fn status_mut(&mut self) -> &mut Status {
70        &mut self.status
71    }
72
73    /// Returns a mutable reference to the sequence number field of this manifest entry.
74    ///
75    /// The sequence number tracks the order of changes to a table. Modifying this allows
76    /// updating the sequence number when new changes are made.
77    pub fn sequence_number_mut(&mut self) -> &mut Option<i64> {
78        &mut self.sequence_number
79    }
80
81    /// Returns a mutable reference to the snapshot ID field of this manifest entry.
82    ///
83    /// The snapshot ID identifies which snapshot added or deleted this data file.
84    /// Modifying this allows updating which snapshot this manifest entry belongs to.
85    pub fn snapshot_id_mut(&mut self) -> &mut Option<i64> {
86        &mut self.snapshot_id
87    }
88}
89
90impl ManifestEntry {
91    pub fn try_from_v2(
92        value: ManifestEntryV2,
93        schema: &Schema,
94        partition_spec: &PartitionSpec,
95    ) -> Result<Self, Error> {
96        Ok(ManifestEntry {
97            format_version: FormatVersion::V2,
98            status: value.status,
99            snapshot_id: value.snapshot_id,
100            sequence_number: value.sequence_number,
101            data_file: DataFile::try_from_v2(value.data_file, schema, partition_spec)?,
102        })
103    }
104
105    pub fn try_from_v1(
106        value: ManifestEntryV1,
107        schema: &Schema,
108        partition_spec: &PartitionSpec,
109    ) -> Result<Self, Error> {
110        Ok(ManifestEntry {
111            format_version: FormatVersion::V2,
112            status: value.status,
113            snapshot_id: Some(value.snapshot_id),
114            sequence_number: None,
115            data_file: DataFile::try_from_v1(value.data_file, schema, partition_spec)?,
116        })
117    }
118}
119
/// Entry in manifest, in either the V1 or the V2 on-disk layout.
///
/// With `#[serde(untagged)]` serde tries the variants in declaration order, so `V2` is
/// attempted before `V1` when deserializing. Reordering the variants changes which layout
/// wins for a record that could match both.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(untagged)]
pub enum ManifestEntryEnum {
    /// Manifest entry version 2
    V2(ManifestEntryV2),
    /// Manifest entry version 1
    V1(ManifestEntryV1),
}
129
/// Entry in manifest with the iceberg spec version 2.
///
/// Serde mirror of the V2 `manifest_entry` Avro record built by `ManifestEntry::schema`
/// (fields `status`, `snapshot_id`, `sequence_number`, `data_file`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct ManifestEntryV2 {
    /// Used to track additions and deletions
    pub status: Status,
    /// Snapshot id where the file was added, or deleted if status is 2.
    /// Inherited when null.
    pub snapshot_id: Option<i64>,
    /// Sequence number when the file was added. Inherited when null.
    pub sequence_number: Option<i64>,
    /// File path, partition tuple, metrics, …
    pub data_file: DataFileV2,
}
143
/// Entry in manifest with the iceberg spec version 1.
///
/// Serde mirror of the V1 `manifest_entry` Avro record built by `ManifestEntry::schema`.
/// Unlike V2 it has no sequence number, and the snapshot id is required.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct ManifestEntryV1 {
    /// Used to track additions and deletions
    pub status: Status,
    /// Snapshot id where the file was added, or deleted if status is 2.
    /// Always present in V1 (the Avro field is a non-nullable `long`).
    pub snapshot_id: i64,
    /// File path, partition tuple, metrics, …
    pub data_file: DataFileV1,
}
155
156impl From<ManifestEntry> for ManifestEntryEnum {
157    fn from(value: ManifestEntry) -> Self {
158        match value.format_version {
159            FormatVersion::V2 => ManifestEntryEnum::V2(value.into()),
160            FormatVersion::V1 => ManifestEntryEnum::V1(value.into()),
161        }
162    }
163}
164
165impl From<ManifestEntry> for ManifestEntryV2 {
166    fn from(value: ManifestEntry) -> Self {
167        ManifestEntryV2 {
168            status: value.status,
169            snapshot_id: value.snapshot_id,
170            sequence_number: value.sequence_number,
171            data_file: value.data_file.into(),
172        }
173    }
174}
175
176impl From<ManifestEntry> for ManifestEntryV1 {
177    fn from(v1: ManifestEntry) -> Self {
178        ManifestEntryV1 {
179            status: v1.status,
180            snapshot_id: v1.snapshot_id.unwrap_or(0),
181            data_file: v1.data_file.into(),
182        }
183    }
184}
185
186impl From<ManifestEntryV1> for ManifestEntryV2 {
187    fn from(v1: ManifestEntryV1) -> Self {
188        ManifestEntryV2 {
189            status: v1.status,
190            snapshot_id: Some(v1.snapshot_id),
191            sequence_number: Some(0),
192            data_file: v1.data_file.into(),
193        }
194    }
195}
196
197impl ManifestEntry {
198    /// Get schema of manifest entry.
199    pub fn schema(
200        partition_schema: &str,
201        format_version: &FormatVersion,
202    ) -> Result<AvroSchema, Error> {
203        let schema = match format_version {
204            FormatVersion::V1 => {
205                let datafile_schema = DataFileV1::schema(partition_schema);
206                r#"{
207            "type": "record",
208            "name": "manifest_entry",
209            "fields": [
210                {
211                    "name": "status",
212                    "type": "int",
213                    "field-id": 0
214                },
215                {
216                    "name": "snapshot_id",
217                    "type": "long",
218                    "field-id": 1
219                },
220                {
221                    "name": "data_file",
222                    "type": "#
223                    .to_owned()
224                    + &datafile_schema
225                    + r#",
226                    "field-id": 2
227                }
228            ]
229        }"#
230            }
231            FormatVersion::V2 => {
232                let datafile_schema = DataFileV2::schema(partition_schema);
233                r#"{
234            "type": "record",
235            "name": "manifest_entry",
236            "fields": [
237                {
238                    "name": "status",
239                    "type": "int",
240                    "field-id": 0
241                },
242                {
243                    "name": "snapshot_id",
244                    "type": [
245                        "null",
246                        "long"
247                    ],
248                    "default": null,
249                    "field-id": 1
250                },
251                {
252                    "name": "sequence_number",
253                    "type": [
254                        "null",
255                        "long"
256                    ],
257                    "default": null,
258                    "field-id": 3
259                },
260                {
261                    "name": "data_file",
262                    "type": "#
263                    .to_owned()
264                    + &datafile_schema
265                    + r#",
266                    "field-id": 2
267                }
268            ]
269        }"#
270            }
271        };
272        AvroSchema::parse_str(&schema).map_err(Into::into)
273    }
274}
275
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy)]
#[repr(u8)]
/// Used to track additions and deletions.
///
/// Serialized as its numeric discriminant (via `serde_repr`), matching the `int` `status`
/// field of the manifest entry Avro schema.
pub enum Status {
    /// Existing files (`0`)
    Existing = 0,
    /// Added files (`1`)
    Added = 1,
    /// Deleted files (`2`)
    Deleted = 2,
}
287
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone)]
#[repr(u8)]
/// Type of content stored by the data file.
///
/// Serde (de)serializes the numeric discriminant via `serde_repr`. The textual names
/// (`DATA`, `POSITION DELETES`, `EQUALITY DELETES`) belong to the separate
/// `TryFrom<Vec<u8>>` / `From<Content> for Vec<u8>` byte conversions.
pub enum Content {
    /// Data.
    Data = 0,
    /// Deletes at position.
    PositionDeletes = 1,
    /// Delete by equality.
    EqualityDeletes = 2,
}
299
300impl TryFrom<Vec<u8>> for Content {
301    type Error = Error;
302    fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
303        match String::from_utf8(value)?.to_uppercase().as_str() {
304            "DATA" => Ok(Content::Data),
305            "POSITION DELETES" => Ok(Content::PositionDeletes),
306            "EQUALITY DELETES" => Ok(Content::EqualityDeletes),
307            _ => Err(Error::Conversion(
308                "string".to_string(),
309                "content".to_string(),
310            )),
311        }
312    }
313}
314
315impl From<Content> for Vec<u8> {
316    fn from(value: Content) -> Self {
317        match value {
318            Content::Data => "DATA".as_bytes().to_owned(),
319            Content::PositionDeletes => "POSITION DELETES".as_bytes().to_owned(),
320            Content::EqualityDeletes => "EQUALITY DELETES".as_bytes().to_owned(),
321        }
322    }
323}
324
#[derive(Debug, PartialEq, Eq, Clone)]
#[repr(u8)]
/// Name of file format.
///
/// Serialized as an upper-case string (`"AVRO"`, `"ORC"`, `"PARQUET"`) by the hand-written
/// `Serialize` / `Deserialize` impls; the `#[repr(u8)]` discriminants are not used by serde.
pub enum FileFormat {
    /// Avro file
    Avro = 0,
    /// Orc file
    Orc = 1,
    /// Parquet file
    Parquet = 2,
}
336
337/// Serialize for PrimitiveType wit special handling for
338/// Decimal and Fixed types.
339impl Serialize for FileFormat {
340    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
341    where
342        S: serde::Serializer,
343    {
344        use FileFormat::*;
345        match self {
346            Avro => serializer.serialize_str("AVRO"),
347            Orc => serializer.serialize_str("ORC"),
348            Parquet => serializer.serialize_str("PARQUET"),
349        }
350    }
351}
352
353/// Serialize for PrimitiveType wit special handling for
354/// Decimal and Fixed types.
355impl<'de> Deserialize<'de> for FileFormat {
356    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
357    where
358        D: serde::Deserializer<'de>,
359    {
360        let s = String::deserialize(deserializer)?;
361        if s == "AVRO" {
362            Ok(FileFormat::Avro)
363        } else if s == "ORC" {
364            Ok(FileFormat::Orc)
365        } else if s == "PARQUET" {
366            Ok(FileFormat::Parquet)
367        } else {
368            Err(serde::de::Error::custom("Invalid data file format."))
369        }
370    }
371}
372
373/// Get schema for partition values depending on partition spec and table schema
374pub fn partition_value_schema(spec: &[BoundPartitionField<'_>]) -> Result<String, Error> {
375    Ok(spec
376        .iter()
377        .map(|field| {
378            let data_type = avro_schema_datatype(field.field_type());
379            Ok::<_, Error>(
380                r#"
381                {
382                    "name": ""#
383                    .to_owned()
384                    + field.name()
385                    + r#"", 
386                    "type":  ["null",""#
387                    + &format!("{}", &data_type)
388                    + r#""],
389                    "field-id": "#
390                    + &field.field_id().to_string()
391                    + r#",
392                    "default": null
393                },"#,
394            )
395        })
396        .try_fold(
397            r#"{"type": "record","name": "r102","fields": ["#.to_owned(),
398            |acc, x| {
399                let result = acc + &x?;
400                Ok::<_, Error>(result)
401            },
402        )?
403        .trim_end_matches(',')
404        .to_owned()
405        + r#"]}"#)
406}
407
408fn avro_schema_datatype(data_type: &Type) -> Type {
409    match data_type {
410        Type::Primitive(prim) => match prim {
411            PrimitiveType::Date => Type::Primitive(PrimitiveType::Int),
412            PrimitiveType::Time => Type::Primitive(PrimitiveType::Long),
413            PrimitiveType::Timestamp => Type::Primitive(PrimitiveType::Long),
414            PrimitiveType::Timestamptz => Type::Primitive(PrimitiveType::Long),
415            p => Type::Primitive(p.clone()),
416        },
417        t => t.clone(),
418    }
419}
420
/// One `{ key, value }` record of an array-encoded Avro map (see [`AvroMap`]).
///
/// The field names must stay `key` / `value` to match the `k…_v…` records in the manifest
/// Avro schema.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
struct KeyValue<T: Serialize + Clone> {
    /// Map key: a column id.
    key: i32,
    /// Map value stored for that column.
    value: T,
}
426
/// Utility struct to convert Avro maps to Rust hashmaps. Dereferences to a [`HashMap`].
///
/// It is serialized as an array of `{ key, value }` records rather than a native Avro map.
/// This matches how the manifest schema encodes maps keyed by `int` column ids.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct AvroMap<T: Serialize + Clone>(pub HashMap<i32, T>);
430
431impl<T: Serialize + Clone> core::ops::Deref for AvroMap<T> {
432    type Target = HashMap<i32, T>;
433
434    fn deref(self: &'_ AvroMap<T>) -> &'_ Self::Target {
435        &self.0
436    }
437}
438
439impl<T: Serialize + Clone> core::ops::DerefMut for AvroMap<T> {
440    fn deref_mut(self: &'_ mut AvroMap<T>) -> &'_ mut Self::Target {
441        &mut self.0
442    }
443}
444
445impl<T: Serialize + Clone> Serialize for AvroMap<T> {
446    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
447    where
448        S: serde::Serializer,
449    {
450        let entries = self
451            .0
452            .iter()
453            .map(|(key, value)| KeyValue {
454                key: *key,
455                value: (*value).clone(),
456            })
457            .collect::<Vec<KeyValue<T>>>();
458        let mut seq = serializer.serialize_seq(Some(entries.len()))?;
459        for element in entries {
460            seq.serialize_element(&element)?;
461        }
462        seq.end()
463    }
464}
465
466impl<'de, T: Serialize + DeserializeOwned + Clone> Deserialize<'de> for AvroMap<T> {
467    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
468    where
469        D: serde::Deserializer<'de>,
470    {
471        let vec: Vec<KeyValue<T>> = Vec::deserialize(deserializer)?;
472        Ok(AvroMap(HashMap::from_iter(
473            vec.into_iter().map(|x| (x.key, x.value)),
474        )))
475    }
476}
477
478impl AvroMap<ByteBuf> {
479    /// Converts a map of byte buffers into a map of typed Iceberg values using the provided schema.
480    ///
481    /// # Arguments
482    /// * `schema` - The struct type schema used to determine the correct type for each value
483    ///
484    /// # Returns
485    /// * `Result<HashMap<i32, Value>, Error>` - A map of field IDs to their typed values, or an error if conversion fails
486    fn into_value_map(self, schema: &StructType) -> Result<HashMap<i32, Value>, Error> {
487        Ok(HashMap::from_iter(
488            self.0
489                .into_iter()
490                .map(|(k, v)| {
491                    Ok((
492                        k,
493                        Value::try_from_bytes(
494                            &v,
495                            &schema
496                                .get(k as usize)
497                                .ok_or(Error::ColumnNotInSchema(
498                                    k.to_string(),
499                                    format!("{schema:?}"),
500                                ))?
501                                .field_type,
502                        )?,
503                    ))
504                })
505                .collect::<Result<Vec<_>, Error>>()?,
506        ))
507    }
508}
509
510impl From<HashMap<i32, Value>> for AvroMap<ByteBuf> {
511    fn from(value: HashMap<i32, Value>) -> Self {
512        AvroMap(HashMap::from_iter(
513            value.into_iter().map(|(k, v)| (k, v.into())),
514        ))
515    }
516}
517
#[derive(Debug, PartialEq, Clone, Getters, Builder)]
#[builder(build_fn(error = "Error"), setter(prefix = "with"))]
/// DataFile found in Manifest.
///
/// Version-independent representation. Lower/upper bounds are decoded into typed
/// [`Value`]s, while the on-disk layouts live in `DataFileV1` / `DataFileV2`.
///
/// NOTE(review): only the last four fields carry `#[builder(default)]`. The others,
/// including the `Option` statistics maps, presumably must be set explicitly before
/// `build()` succeeds. Confirm against derive_builder semantics.
pub struct DataFile {
    /// Type of content in data file.
    content: Content,
    /// Full URI for the file with a FS scheme.
    file_path: String,
    /// File format; serialized as `"AVRO"`, `"ORC"` or `"PARQUET"`.
    file_format: FileFormat,
    /// Partition data tuple, schema based on the partition spec output using partition field ids for the struct field ids
    partition: Struct,
    /// Number of records in this file
    record_count: i64,
    /// Total file size in bytes
    file_size_in_bytes: i64,
    /// Map from column id to total size on disk
    column_sizes: Option<AvroMap<i64>>,
    /// Map from column id to number of values in the column (including null and NaN values)
    value_counts: Option<AvroMap<i64>>,
    /// Map from column id to number of null values
    null_value_counts: Option<AvroMap<i64>>,
    /// Map from column id to number of NaN values
    nan_value_counts: Option<AvroMap<i64>>,
    /// Map from column id to lower bound in the column (decoded)
    lower_bounds: Option<HashMap<i32, Value>>,
    /// Map from column id to upper bound in the column (decoded)
    upper_bounds: Option<HashMap<i32, Value>>,
    /// Implementation specific key metadata for encryption
    #[builder(default)]
    key_metadata: Option<ByteBuf>,
    /// Split offsets for the data file.
    #[builder(default)]
    split_offsets: Option<Vec<i64>>,
    /// Field ids used to determine row equality in equality delete files.
    #[builder(default)]
    equality_ids: Option<Vec<i32>>,
    /// ID representing sort order for this file
    #[builder(default)]
    sort_order_id: Option<i32>,
}
559
560impl DataFile {
561    pub fn builder() -> DataFileBuilder {
562        DataFileBuilder::default()
563    }
564}
565
566impl DataFile {
567    pub(crate) fn try_from_v2(
568        value: DataFileV2,
569        schema: &Schema,
570        partition_spec: &PartitionSpec,
571    ) -> Result<Self, Error> {
572        Ok(DataFile {
573            content: value.content,
574            file_path: value.file_path,
575            file_format: value.file_format,
576            partition: value
577                .partition
578                .cast(schema.fields(), partition_spec.fields())?,
579            record_count: value.record_count,
580            file_size_in_bytes: value.file_size_in_bytes,
581            column_sizes: value.column_sizes,
582            value_counts: value.value_counts,
583            null_value_counts: value.null_value_counts,
584            nan_value_counts: value.nan_value_counts,
585            lower_bounds: value
586                .lower_bounds
587                .map(|map| map.into_value_map(schema.fields()))
588                .transpose()?,
589            upper_bounds: value
590                .upper_bounds
591                .map(|map| map.into_value_map(schema.fields()))
592                .transpose()?,
593            key_metadata: value.key_metadata,
594            split_offsets: value.split_offsets,
595            equality_ids: value.equality_ids,
596            sort_order_id: value.sort_order_id,
597        })
598    }
599
600    pub(crate) fn try_from_v1(
601        value: DataFileV1,
602        schema: &Schema,
603        partition_spec: &PartitionSpec,
604    ) -> Result<Self, Error> {
605        Ok(DataFile {
606            content: Content::Data,
607            file_path: value.file_path,
608            file_format: value.file_format,
609            partition: value
610                .partition
611                .cast(schema.fields(), partition_spec.fields())?,
612            record_count: value.record_count,
613            file_size_in_bytes: value.file_size_in_bytes,
614            column_sizes: value.column_sizes,
615            value_counts: value.value_counts,
616            null_value_counts: value.null_value_counts,
617            nan_value_counts: value.nan_value_counts,
618            lower_bounds: value
619                .lower_bounds
620                .map(|map| map.into_value_map(schema.fields()))
621                .transpose()?,
622            upper_bounds: value
623                .upper_bounds
624                .map(|map| map.into_value_map(schema.fields()))
625                .transpose()?,
626            key_metadata: value.key_metadata,
627            split_offsets: value.split_offsets,
628            equality_ids: None,
629            sort_order_id: value.sort_order_id,
630        })
631    }
632}
633
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
/// Data file record in the V2 manifest layout.
///
/// Serde mirror of the on-disk `data_file` struct. Bounds stay as raw bytes here and are
/// decoded into typed values when converting to `DataFile`.
pub struct DataFileV2 {
    /// Type of content in data file.
    pub content: Content,
    /// Full URI for the file with a FS scheme.
    pub file_path: String,
    /// File format; serialized as `"AVRO"`, `"ORC"` or `"PARQUET"`.
    pub file_format: FileFormat,
    /// Partition data tuple, schema based on the partition spec output using partition field ids for the struct field ids
    pub partition: Struct,
    /// Number of records in this file
    pub record_count: i64,
    /// Total file size in bytes
    pub file_size_in_bytes: i64,
    /// Map from column id to total size on disk
    pub column_sizes: Option<AvroMap<i64>>,
    /// Map from column id to number of values in the column (including null and NaN values)
    pub value_counts: Option<AvroMap<i64>>,
    /// Map from column id to number of null values
    pub null_value_counts: Option<AvroMap<i64>>,
    /// Map from column id to number of NaN values
    pub nan_value_counts: Option<AvroMap<i64>>,
    /// Map from column id to lower bound in the column (undecoded bytes)
    pub lower_bounds: Option<AvroMap<ByteBuf>>,
    /// Map from column id to upper bound in the column (undecoded bytes)
    pub upper_bounds: Option<AvroMap<ByteBuf>>,
    /// Implementation specific key metadata for encryption
    pub key_metadata: Option<ByteBuf>,
    /// Split offsets for the data file.
    pub split_offsets: Option<Vec<i64>>,
    /// Field ids used to determine row equality in equality delete files.
    pub equality_ids: Option<Vec<i32>>,
    /// ID representing sort order for this file
    pub sort_order_id: Option<i32>,
}
670
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
/// Data file record in the V1 manifest layout.
///
/// Serde mirror of the on-disk V1 `data_file` struct (see `DataFileV1::schema`). It has no
/// `content` or `equality_ids`, and adds the V1-only `block_size_in_bytes`,
/// `file_ordinal` and `sort_columns`.
pub struct DataFileV1 {
    /// Full URI for the file with a FS scheme.
    pub file_path: String,
    /// File format; serialized as `"AVRO"`, `"ORC"` or `"PARQUET"`.
    pub file_format: FileFormat,
    /// Partition data tuple, schema based on the partition spec output using partition field ids for the struct field ids
    pub partition: Struct,
    /// Number of records in this file
    pub record_count: i64,
    /// Total file size in bytes
    pub file_size_in_bytes: i64,
    /// Block size; required by the V1 schema (written as `0` when converting from `DataFile`)
    pub block_size_in_bytes: i64,
    /// File ordinal (not carried over into `DataFile`)
    pub file_ordinal: Option<i32>,
    /// Columns to sort (not carried over into `DataFile`)
    pub sort_columns: Option<Vec<i32>>,
    /// Map from column id to total size on disk
    pub column_sizes: Option<AvroMap<i64>>,
    /// Map from column id to number of values in the column (including null and NaN values)
    pub value_counts: Option<AvroMap<i64>>,
    /// Map from column id to number of null values
    pub null_value_counts: Option<AvroMap<i64>>,
    /// Map from column id to number of NaN values
    pub nan_value_counts: Option<AvroMap<i64>>,
    /// Map from column id to lower bound in the column (undecoded bytes)
    pub lower_bounds: Option<AvroMap<ByteBuf>>,
    /// Map from column id to upper bound in the column (undecoded bytes)
    pub upper_bounds: Option<AvroMap<ByteBuf>>,
    /// Implementation specific key metadata for encryption
    pub key_metadata: Option<ByteBuf>,
    /// Split offsets for the data file.
    pub split_offsets: Option<Vec<i64>>,
    /// ID representing sort order for this file
    pub sort_order_id: Option<i32>,
}
709
710impl From<DataFile> for DataFileV2 {
711    fn from(value: DataFile) -> Self {
712        DataFileV2 {
713            content: value.content,
714            file_path: value.file_path,
715            file_format: value.file_format,
716            partition: value.partition,
717            record_count: value.record_count,
718            file_size_in_bytes: value.file_size_in_bytes,
719            column_sizes: value.column_sizes,
720            value_counts: value.value_counts,
721            null_value_counts: value.null_value_counts,
722            nan_value_counts: value.nan_value_counts,
723            lower_bounds: value.lower_bounds.map(Into::into),
724            upper_bounds: value.upper_bounds.map(Into::into),
725            key_metadata: value.key_metadata,
726            split_offsets: value.split_offsets,
727            equality_ids: value.equality_ids,
728            sort_order_id: value.sort_order_id,
729        }
730    }
731}
732
733impl From<DataFile> for DataFileV1 {
734    fn from(value: DataFile) -> Self {
735        DataFileV1 {
736            file_path: value.file_path,
737            file_format: value.file_format,
738            partition: value.partition,
739            record_count: value.record_count,
740            file_size_in_bytes: value.file_size_in_bytes,
741            column_sizes: value.column_sizes,
742            value_counts: value.value_counts,
743            null_value_counts: value.null_value_counts,
744            nan_value_counts: value.nan_value_counts,
745            lower_bounds: value.lower_bounds.map(Into::into),
746            upper_bounds: value.upper_bounds.map(Into::into),
747            key_metadata: value.key_metadata,
748            split_offsets: value.split_offsets,
749            sort_order_id: value.sort_order_id,
750            block_size_in_bytes: 0,
751            file_ordinal: None,
752            sort_columns: None,
753        }
754    }
755}
756
757impl From<DataFileV1> for DataFileV2 {
758    fn from(v1: DataFileV1) -> Self {
759        DataFileV2 {
760            content: Content::Data,
761            file_path: v1.file_path,
762            file_format: v1.file_format,
763            partition: v1.partition,
764            record_count: v1.record_count,
765            file_size_in_bytes: v1.file_size_in_bytes,
766            column_sizes: v1.column_sizes,
767            value_counts: v1.value_counts,
768            null_value_counts: v1.null_value_counts,
769            nan_value_counts: v1.nan_value_counts,
770            lower_bounds: v1.lower_bounds,
771            upper_bounds: v1.upper_bounds,
772            key_metadata: v1.key_metadata,
773            split_offsets: v1.split_offsets,
774            equality_ids: None,
775            sort_order_id: v1.sort_order_id,
776        }
777    }
778}
779
780impl DataFileV1 {
781    /// Get schema
782    pub fn schema(partition_schema: &str) -> String {
783        r#"{
784            "type": "record",
785            "name": "r2",
786            "fields": [
787                {
788                    "name": "file_path",
789                    "type": "string",
790                    "field-id": 100
791                },
792                {
793                    "name": "file_format",
794                    "type": "string",
795                    "field-id": 101
796                },
797                {
798                    "name": "partition",
799                    "type": "#
800            .to_owned()
801            + partition_schema
802            + r#",
803                    "field-id": 102
804                },
805                {
806                    "name": "record_count",
807                    "type": "long",
808                    "field-id": 103
809                },
810                {
811                    "name": "file_size_in_bytes",
812                    "type": "long",
813                    "field-id": 104
814                },
815                {
816                    "name": "block_size_in_bytes",
817                    "type": "long",
818                    "field-id": 105
819                },
820                {
821                    "name": "file_ordinal",
822                    "type": [
823                        "null",
824                        "int"
825                    ],
826                    "default": null,
827                    "field-id": 106
828                },
829                {
830                    "name": "sort_columns",
831                    "type": [
832                        "null",
833                        {
834                            "type": "array",
835                            "items": "int",
836                            "element-id": 112
837                        }
838                    ],
839                    "default": null,
840                    "field-id": 107
841                },
842                {
843                    "name": "column_sizes",
844                    "type": [
845                        "null",
846                        {
847                            "type": "array",
848                            "logicalType": "map",
849                            "items": {
850                                "type": "record",
851                                "name": "k117_v118",
852                                "fields": [
853                                    {
854                                        "name": "key",
855                                        "type": "int",
856                                        "field-id": 117
857                                    },
858                                    {
859                                        "name": "value",
860                                        "type": "long",
861                                        "field-id": 118
862                                    }
863                                ]
864                            }
865                        }
866                    ],
867                    "default": null,
868                    "field-id": 108
869                },
870                {
871                    "name": "value_counts",
872                    "type": [
873                        "null",
874                        {
875                            "type": "array",
876                            "logicalType": "map",
877                            "items": {
878                                "type": "record",
879                                "name": "k119_v120",
880                                "fields": [
881                                    {
882                                        "name": "key",
883                                        "type": "int",
884                                        "field-id": 119
885                                    },
886                                    {
887                                        "name": "value",
888                                        "type": "long",
889                                        "field-id": 120
890                                    }
891                                ]
892                            }
893                        }
894                    ],
895                    "default": null,
896                    "field-id": 109
897                },
898                {
899                    "name": "null_value_counts",
900                    "type": [
901                        "null",
902                        {
903                            "type": "array",
904                            "logicalType": "map",
905                            "items": {
906                                "type": "record",
907                                "name": "k121_v122",
908                                "fields": [
909                                    {
910                                        "name": "key",
911                                        "type": "int",
912                                        "field-id": 121
913                                    },
914                                    {
915                                        "name": "value",
916                                        "type": "long",
917                                        "field-id": 122
918                                    }
919                                ]
920                            }
921                        }
922                    ],
923                    "default": null,
924                    "field-id": 110
925                },
926                {
927                    "name": "nan_value_counts",
928                    "type": [
929                        "null",
930                        {
931                            "type": "array",
932                            "logicalType": "map",
933                            "items": {
934                                "type": "record",
935                                "name": "k138_v139",
936                                "fields": [
937                                    {
938                                        "name": "key",
939                                        "type": "int",
940                                        "field-id": 138
941                                    },
942                                    {
943                                        "name": "value",
944                                        "type": "long",
945                                        "field-id": 139
946                                    }
947                                ]
948                            }
949                        }
950                    ],
951                    "default": null,
952                    "field-id": 137
953                },
954                {
955                    "name": "lower_bounds",
956                    "type": [
957                        "null",
958                        {
959                            "type": "array",
960                            "logicalType": "map",
961                            "items": {
962                                "type": "record",
963                                "name": "k126_v127",
964                                "fields": [
965                                    {
966                                        "name": "key",
967                                        "type": "int",
968                                        "field-id": 126
969                                    },
970                                    {
971                                        "name": "value",
972                                        "type": "bytes",
973                                        "field-id": 127
974                                    }
975                                ]
976                            }
977                        }
978                    ],
979                    "default": null,
980                    "field-id": 125
981                },
982                {
983                    "name": "upper_bounds",
984                    "type": [
985                        "null",
986                        {
987                            "type": "array",
988                            "logicalType": "map",
989                            "items": {
990                                "type": "record",
991                                "name": "k129_v130",
992                                "fields": [
993                                    {
994                                        "name": "key",
995                                        "type": "int",
996                                        "field-id": 129
997                                    },
998                                    {
999                                        "name": "value",
1000                                        "type": "bytes",
1001                                        "field-id": 130
1002                                    }
1003                                ]
1004                            }
1005                        }
1006                    ],
1007                    "default": null,
1008                    "field-id": 128
1009                },
1010                {
1011                    "name": "key_metadata",
1012                    "type": [
1013                        "null",
1014                        "bytes"
1015                    ],
1016                    "default": null,
1017                    "field-id": 131
1018                },
1019                {
1020                    "name": "split_offsets",
1021                    "type": [
1022                        "null",
1023                        {
1024                            "type": "array",
1025                            "items": "long",
1026                            "element-id": 133
1027                        }
1028                    ],
1029                    "default": null,
1030                    "field-id": 132
1031                },
1032                {
1033                    "name": "sort_order_id",
1034                    "type": [
1035                        "null",
1036                        "int"
1037                    ],
1038                    "default": null,
1039                    "field-id": 140
1040                }
1041            ]
1042        }"#
1043    }
1044}
1045
1046impl DataFileV2 {
1047    /// Get schema
1048    pub fn schema(partition_schema: &str) -> String {
1049        r#"{
1050            "type": "record",
1051            "name": "r2",
1052            "fields": [
1053                {
1054                    "name": "content",
1055                    "type": "int",
1056                    "field-id": 134
1057                },
1058                {
1059                    "name": "file_path",
1060                    "type": "string",
1061                    "field-id": 100
1062                },
1063                {
1064                    "name": "file_format",
1065                    "type": "string",
1066                    "field-id": 101
1067                },
1068                {
1069                    "name": "partition",
1070                    "type": "#
1071            .to_owned()
1072            + partition_schema
1073            + r#",
1074                    "field-id": 102
1075                },
1076                {
1077                    "name": "record_count",
1078                    "type": "long",
1079                    "field-id": 103
1080                },
1081                {
1082                    "name": "file_size_in_bytes",
1083                    "type": "long",
1084                    "field-id": 104
1085                },
1086                {
1087                    "name": "column_sizes",
1088                    "type": [
1089                        "null",
1090                        {
1091                            "type": "array",
1092                            "logicalType": "map",
1093                            "items": {
1094                                "type": "record",
1095                                "name": "k117_v118",
1096                                "fields": [
1097                                    {
1098                                        "name": "key",
1099                                        "type": "int",
1100                                        "field-id": 117
1101                                    },
1102                                    {
1103                                        "name": "value",
1104                                        "type": "long",
1105                                        "field-id": 118
1106                                    }
1107                                ]
1108                            }
1109                        }
1110                    ],
1111                    "default": null,
1112                    "field-id": 108
1113                },
1114                {
1115                    "name": "value_counts",
1116                    "type": [
1117                        "null",
1118                        {
1119                            "type": "array",
1120                            "logicalType": "map",
1121                            "items": {
1122                                "type": "record",
1123                                "name": "k119_v120",
1124                                "fields": [
1125                                    {
1126                                        "name": "key",
1127                                        "type": "int",
1128                                        "field-id": 119
1129                                    },
1130                                    {
1131                                        "name": "value",
1132                                        "type": "long",
1133                                        "field-id": 120
1134                                    }
1135                                ]
1136                            }
1137                        }
1138                    ],
1139                    "default": null,
1140                    "field-id": 109
1141                },
1142                {
1143                    "name": "null_value_counts",
1144                    "type": [
1145                        "null",
1146                        {
1147                            "type": "array",
1148                            "logicalType": "map",
1149                            "items": {
1150                                "type": "record",
1151                                "name": "k121_v122",
1152                                "fields": [
1153                                    {
1154                                        "name": "key",
1155                                        "type": "int",
1156                                        "field-id": 121
1157                                    },
1158                                    {
1159                                        "name": "value",
1160                                        "type": "long",
1161                                        "field-id": 122
1162                                    }
1163                                ]
1164                            }
1165                        }
1166                    ],
1167                    "default": null,
1168                    "field-id": 110
1169                },
1170                {
1171                    "name": "nan_value_counts",
1172                    "type": [
1173                        "null",
1174                        {
1175                            "type": "array",
1176                            "logicalType": "map",
1177                            "items": {
1178                                "type": "record",
1179                                "name": "k138_v139",
1180                                "fields": [
1181                                    {
1182                                        "name": "key",
1183                                        "type": "int",
1184                                        "field-id": 138
1185                                    },
1186                                    {
1187                                        "name": "value",
1188                                        "type": "long",
1189                                        "field-id": 139
1190                                    }
1191                                ]
1192                            }
1193                        }
1194                    ],
1195                    "default": null,
1196                    "field-id": 137
1197                },
1198                {
1199                    "name": "lower_bounds",
1200                    "type": [
1201                        "null",
1202                        {
1203                            "type": "array",
1204                            "logicalType": "map",
1205                            "items": {
1206                                "type": "record",
1207                                "name": "k126_v127",
1208                                "fields": [
1209                                    {
1210                                        "name": "key",
1211                                        "type": "int",
1212                                        "field-id": 126
1213                                    },
1214                                    {
1215                                        "name": "value",
1216                                        "type": "bytes",
1217                                        "field-id": 127
1218                                    }
1219                                ]
1220                            }
1221                        }
1222                    ],
1223                    "default": null,
1224                    "field-id": 125
1225                },
1226                {
1227                    "name": "upper_bounds",
1228                    "type": [
1229                        "null",
1230                        {
1231                            "type": "array",
1232                            "logicalType": "map",
1233                            "items": {
1234                                "type": "record",
1235                                "name": "k129_v130",
1236                                "fields": [
1237                                    {
1238                                        "name": "key",
1239                                        "type": "int",
1240                                        "field-id": 129
1241                                    },
1242                                    {
1243                                        "name": "value",
1244                                        "type": "bytes",
1245                                        "field-id": 130
1246                                    }
1247                                ]
1248                            }
1249                        }
1250                    ],
1251                    "default": null,
1252                    "field-id": 128
1253                },
1254                {
1255                    "name": "key_metadata",
1256                    "type": [
1257                        "null",
1258                        "bytes"
1259                    ],
1260                    "default": null,
1261                    "field-id": 131
1262                },
1263                {
1264                    "name": "split_offsets",
1265                    "type": [
1266                        "null",
1267                        {
1268                            "type": "array",
1269                            "items": "long",
1270                            "element-id": 133
1271                        }
1272                    ],
1273                    "default": null,
1274                    "field-id": 132
1275                },
1276                {
1277                    "name": "equality_ids",
1278                    "type": [
1279                        "null",
1280                        {
1281                            "type": "array",
1282                            "items": "int",
1283                            "element-id": 136
1284                        }
1285                    ],
1286                    "default": null,
1287                    "field-id": 135
1288                },
1289                {
1290                    "name": "sort_order_id",
1291                    "type": [
1292                        "null",
1293                        "int"
1294                    ],
1295                    "default": null,
1296                    "field-id": 140
1297                }
1298            ]
1299        }"#
1300    }
1301}
1302
#[cfg(test)]
mod tests {
    use crate::spec::{
        partition::{PartitionField, Transform},
        table_metadata::{TableMetadata, TableMetadataBuilder},
        types::{PrimitiveType, StructField, Type},
        values::Value,
    };

    use super::*;
    use apache_avro::{self, types::Value as AvroValue};

    /// Builds table metadata with one required `date` column (field id 0) and a default
    /// partition spec (id 0) deriving the `day` partition field (id 1000) from it.
    fn test_table_metadata() -> TableMetadata {
        TableMetadataBuilder::default()
            .location("/")
            .current_schema_id(0)
            .schemas(HashMap::from_iter(vec![(
                0,
                Schema::builder()
                    .with_struct_field(StructField {
                        id: 0,
                        name: "date".to_string(),
                        required: true,
                        field_type: Type::Primitive(PrimitiveType::Date),
                        doc: None,
                    })
                    .build()
                    .unwrap(),
            )]))
            .default_spec_id(0)
            .partition_specs(HashMap::from_iter(vec![(
                0,
                PartitionSpec::builder()
                    .with_partition_field(PartitionField::new(0, 1000, "day", Transform::Day))
                    .build()
                    .unwrap(),
            )]))
            .build()
            .unwrap()
    }

    /// Builds the V2 entry used by the round-trip tests: an added Parquet data file in
    /// partition `day = 1` with a lower bound recorded for column 0.
    fn test_manifest_entry() -> ManifestEntry {
        ManifestEntry {
            format_version: FormatVersion::V2,
            status: Status::Added,
            snapshot_id: Some(638933773299822130),
            sequence_number: Some(1),
            data_file: DataFile {
                content: Content::Data,
                file_path: "/".to_string(),
                file_format: FileFormat::Parquet,
                partition: Struct::from_iter(vec![("day".to_owned(), Some(Value::Int(1)))]),
                record_count: 4,
                file_size_in_bytes: 1200,
                column_sizes: None,
                value_counts: None,
                null_value_counts: None,
                nan_value_counts: None,
                lower_bounds: Some(HashMap::from_iter(vec![(0, Value::Date(0))])),
                upper_bounds: None,
                key_metadata: None,
                split_offsets: None,
                equality_ids: None,
                sort_order_id: None,
            },
        }
    }

    /// Avro header metadata for the test manifests. The values describe the table built by
    /// `test_table_metadata` and the V2 entry built by `test_manifest_entry`. Iceberg stores
    /// these keys as strings, so every value is UTF-8 text.
    fn test_manifest_metadata() -> HashMap<String, AvroValue> {
        let table_schema = r#"{"type":"struct","schema-id":0,"fields":[{"id":0,"name":"date","required":true,"type":"date"}]}"#;
        let partition_spec =
            r#"[{"source-id":0,"field-id":1000,"name":"day","transform":"day"}]"#;

        HashMap::from_iter(vec![
            ("schema".to_string(), AvroValue::Bytes(table_schema.into())),
            ("schema-id".to_string(), AvroValue::Bytes("0".into())),
            (
                "partition-spec".to_string(),
                AvroValue::Bytes(partition_spec.into()),
            ),
            ("partition-spec-id".to_string(), AvroValue::Bytes("0".into())),
            ("format-version".to_string(), AvroValue::Bytes("2".into())),
            ("content".to_string(), AvroValue::Bytes("data".into())),
        ])
    }

    /// Serializes `entry` into an in-memory Avro manifest file whose schema is the V2
    /// manifest-entry schema for the current partition fields of `table_metadata`.
    fn write_manifest(table_metadata: &TableMetadata, entry: &ManifestEntry) -> Vec<u8> {
        let partition_schema =
            partition_value_schema(&table_metadata.current_partition_fields(None).unwrap())
                .unwrap();
        let schema = ManifestEntry::schema(&partition_schema, &FormatVersion::V2).unwrap();

        let mut writer = apache_avro::Writer::builder()
            .schema(&schema)
            .writer(vec![])
            .user_metadata(test_manifest_metadata())
            .build();
        writer.append_ser(entry.clone()).unwrap();
        writer.into_inner().unwrap()
    }

    /// Decodes one Avro record as a `ManifestEntryV2` and converts it back into a
    /// [`ManifestEntry`], using the current schema and default partition spec of
    /// `table_metadata`.
    fn read_entry(table_metadata: &TableMetadata, record: &AvroValue) -> ManifestEntry {
        let entry = apache_avro::from_value::<ManifestEntryV2>(record).unwrap();
        ManifestEntry::try_from_v2(
            entry,
            table_metadata.current_schema(None).unwrap(),
            table_metadata.default_partition_spec().unwrap(),
        )
        .unwrap()
    }

    #[test]
    fn manifest_entry() {
        let table_metadata = test_table_metadata();
        let manifest_entry = test_manifest_entry();

        let encoded = write_manifest(&table_metadata, &manifest_entry);

        // Collect first, so a file that decodes to zero records fails instead of
        // passing vacuously.
        let records = apache_avro::Reader::new(&encoded[..])
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(records.len(), 1);
        for record in &records {
            assert_eq!(manifest_entry, read_entry(&table_metadata, record));
        }
    }

    #[test]
    fn test_read_manifest_entry() {
        let table_metadata = test_table_metadata();
        let manifest_entry = test_manifest_entry();

        let encoded = write_manifest(&table_metadata, &manifest_entry);

        let mut reader = apache_avro::Reader::new(&encoded[..]).unwrap();
        let record = reader
            .next()
            .expect("manifest should contain exactly one entry")
            .unwrap();

        assert_eq!(manifest_entry, read_entry(&table_metadata, &record));
    }

    #[test]
    fn test_partition_values() {
        let partition_values = Struct::from_iter(vec![("day".to_owned(), Some(Value::Int(1)))]);

        let part_field = PartitionField::new(4, 1000, "day", Transform::Day);
        let field = StructField {
            id: 4,
            name: "day".to_owned(),
            required: false,
            field_type: Type::Primitive(PrimitiveType::Int),
            doc: None,
        };
        let partition_fields = vec![BoundPartitionField::new(&part_field, &field)];

        let raw_schema = partition_value_schema(&partition_fields).unwrap();

        let schema = apache_avro::Schema::parse_str(&raw_schema).unwrap();

        let mut writer = apache_avro::Writer::new(&schema, Vec::new());

        writer.append_ser(partition_values.clone()).unwrap();

        let encoded = writer.into_inner().unwrap();

        // Collect first, so an empty file fails rather than skipping the assertion.
        let records = apache_avro::Reader::new(&*encoded)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(records.len(), 1);

        for record in &records {
            let result = apache_avro::from_value::<Struct>(record).unwrap();
            assert_eq!(partition_values, result);
        }
    }
}