iceberg_rust_spec/spec/
table_metadata.rs

1//! Table metadata implementation for Iceberg tables
2//!
3//! This module contains the implementation of table metadata for Iceberg tables, including:
4//! - Table metadata structure and versioning (V1 and V2)
5//! - Schema management
6//! - Partition specifications
7//! - Sort orders
8//! - Snapshot management and history
9//! - Metadata properties and logging
10//!
11//! The table metadata format is defined in the [Iceberg Table Spec](https://iceberg.apache.org/spec/#table-metadata)
12
13use std::{
14    collections::HashMap,
15    fmt, str,
16    time::{SystemTime, UNIX_EPOCH},
17};
18
19use crate::{
20    error::Error,
21    partition::BoundPartitionField,
22    spec::{
23        partition::PartitionSpec,
24        sort::{self, SortOrder},
25    },
26};
27
28use serde::{Deserialize, Serialize};
29use serde_repr::{Deserialize_repr, Serialize_repr};
30use uuid::Uuid;
31
32use derive_builder::Builder;
33
34use super::{
35    schema::Schema,
36    snapshot::{Snapshot, SnapshotReference},
37    tabular::TabularMetadataRef,
38};
39
/// Name of the branch reference that tracks the table's current snapshot.
pub static MAIN_BRANCH: &str = "main";
/// Sort order id used when V1 metadata omits `default-sort-order-id`.
static DEFAULT_SORT_ORDER_ID: i32 = 0;
/// Spec id given to the partition spec synthesized from the V1 `partition-spec` fields when
/// V1 metadata omits `partition-specs`.
static DEFAULT_SPEC_ID: i32 = 0;

// Properties
// Keys of Iceberg table properties; their values are stored in `TableMetadata::properties`
// and interpreted by code outside this file.

/// Key of the table property `write.parquet.compression-codec`.
pub const WRITE_PARQUET_COMPRESSION_CODEC: &str = "write.parquet.compression-codec";
/// Key of the table property `write.parquet.compression-level`.
pub const WRITE_PARQUET_COMPRESSION_LEVEL: &str = "write.parquet.compression-level";
/// Key of the table property `write.object-storage.enabled`.
pub const WRITE_OBJECT_STORAGE_ENABLED: &str = "write.object-storage.enabled";
/// Key of the table property `write.data.path`.
pub const WRITE_DATA_PATH: &str = "write.data.path";
50
51pub use _serde::{TableMetadataV1, TableMetadataV2};
52
53use _serde::TableMetadataEnum;
54
55#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)]
56#[serde(try_from = "TableMetadataEnum", into = "TableMetadataEnum")]
57/// Fields for the version 2 of the table metadata.
58pub struct TableMetadata {
59    #[builder(default)]
60    /// Integer Version for the format.
61    pub format_version: FormatVersion,
62    #[builder(default = "Uuid::new_v4()")]
63    /// A UUID that identifies the table
64    pub table_uuid: Uuid,
65    #[builder(setter(into))]
66    /// Location tables base location
67    pub location: String,
68    #[builder(default)]
69    /// The tables highest sequence number
70    pub last_sequence_number: i64,
71    #[builder(
72        default = "SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as i64"
73    )]
74    /// Timestamp in milliseconds from the unix epoch when the table was last updated.
75    pub last_updated_ms: i64,
76    #[builder(default)]
77    /// An integer; the highest assigned column ID for the table.
78    pub last_column_id: i32,
79    #[builder(setter(each(name = "with_schema")))]
80    /// A list of schemas, stored as objects with schema-id.
81    pub schemas: HashMap<i32, Schema>,
82    /// ID of the table’s current schema.
83    pub current_schema_id: i32,
84    #[builder(
85        setter(each(name = "with_partition_spec")),
86        default = "HashMap::from_iter(vec![(0,PartitionSpec::default())])"
87    )]
88    /// A list of partition specs, stored as full partition spec objects.
89    pub partition_specs: HashMap<i32, PartitionSpec>,
90    #[builder(default)]
91    /// ID of the “current” spec that writers should use by default.
92    pub default_spec_id: i32,
93    #[builder(default)]
94    /// An integer; the highest assigned partition field ID across all partition specs for the table.
95    pub last_partition_id: i32,
96    ///A string to string map of table properties. This is used to control settings that
97    /// affect reading and writing and is not intended to be used for arbitrary metadata.
98    /// For example, commit.retry.num-retries is used to control the number of commit retries.
99    #[builder(default)]
100    pub properties: HashMap<String, String>,
101    /// long ID of the current table snapshot; must be the same as the current
102    /// ID of the main branch in refs.
103    #[builder(default)]
104    pub current_snapshot_id: Option<i64>,
105    ///A list of valid snapshots. Valid snapshots are snapshots for which all
106    /// data files exist in the file system. A data file must not be deleted
107    /// from the file system until the last snapshot in which it was listed is
108    /// garbage collected.
109    #[builder(default)]
110    pub snapshots: HashMap<i64, Snapshot>,
111    /// A list (optional) of timestamp and snapshot ID pairs that encodes changes
112    /// to the current snapshot for the table. Each time the current-snapshot-id
113    /// is changed, a new entry should be added with the last-updated-ms
114    /// and the new current-snapshot-id. When snapshots are expired from
115    /// the list of valid snapshots, all entries before a snapshot that has
116    /// expired should be removed.
117    #[builder(default)]
118    pub snapshot_log: Vec<SnapshotLog>,
119
120    /// A list (optional) of timestamp and metadata file location pairs
121    /// that encodes changes to the previous metadata files for the table.
122    /// Each time a new metadata file is created, a new entry of the
123    /// previous metadata file location should be added to the list.
124    /// Tables can be configured to remove oldest metadata log entries and
125    /// keep a fixed-size log of the most recent entries after a commit.
126    #[builder(default)]
127    pub metadata_log: Vec<MetadataLog>,
128    #[builder(
129        setter(each(name = "with_sort_order")),
130        default = "HashMap::from_iter(vec![(0, SortOrder::default())])"
131    )]
132    /// A list of sort orders, stored as full sort order objects.
133    pub sort_orders: HashMap<i32, sort::SortOrder>,
134    #[builder(default)]
135    /// Default sort order id of the table. Note that this could be used by
136    /// writers, but is not used when reading because reads use the specs
137    /// stored in manifest files.
138    pub default_sort_order_id: i32,
139    ///A map of snapshot references. The map keys are the unique snapshot reference
140    /// names in the table, and the map values are snapshot reference objects.
141    /// There is always a main branch reference pointing to the current-snapshot-id
142    /// even if the refs map is null.
143    #[builder(default)]
144    pub refs: HashMap<String, SnapshotReference>,
145}
146
147impl TableMetadata {
148    /// Gets the current schema for a given branch, or the table's current schema if no branch is specified.
149    ///
150    /// # Arguments
151    /// * `branch` - Optional branch name to get the schema for
152    ///
153    /// # Returns
154    /// * `Result<&Schema, Error>` - The current schema, or an error if the schema cannot be found
155    #[inline]
156    pub fn current_schema(&self, branch: Option<&str>) -> Result<&Schema, Error> {
157        let schema_id = self
158            .current_snapshot(branch)?
159            .and_then(|x| *x.schema_id())
160            .unwrap_or(self.current_schema_id);
161        self.schemas
162            .get(&schema_id)
163            .ok_or_else(|| Error::InvalidFormat("schema".to_string()))
164    }
165
166    /// Gets the schema for a specific snapshot ID.
167    ///
168    /// # Arguments
169    /// * `snapshot_id` - The ID of the snapshot to get the schema for
170    ///
171    /// # Returns
172    /// * `Result<&Schema, Error>` - The schema for the snapshot, or an error if the schema cannot be found
173    #[inline]
174    pub fn schema(&self, snapshot_id: i64) -> Result<&Schema, Error> {
175        let schema_id = self
176            .snapshots
177            .get(&snapshot_id)
178            .and_then(|x| *x.schema_id())
179            .unwrap_or(self.current_schema_id);
180        self.schemas
181            .get(&schema_id)
182            .ok_or_else(|| Error::InvalidFormat("schema".to_string()))
183    }
184
185    /// Gets the default partition specification for the table
186    ///
187    /// # Returns
188    /// * `Result<&PartitionSpec, Error>` - The default partition spec, or an error if it cannot be found
189    #[inline]
190    pub fn default_partition_spec(&self) -> Result<&PartitionSpec, Error> {
191        self.partition_specs
192            .get(&self.default_spec_id)
193            .ok_or_else(|| Error::InvalidFormat("partition spec".to_string()))
194    }
195
196    /// Gets the current partition fields for a given branch, binding them to their source schema fields
197    ///
198    /// # Arguments
199    /// * `branch` - Optional branch name to get the partition fields for
200    ///
201    /// # Returns
202    /// * `Result<Vec<BoundPartitionField>, Error>` - Vector of partition fields bound to their source schema fields,
203    ///   or an error if the schema or partition spec cannot be found
204    pub fn current_partition_fields(
205        &self,
206        branch: Option<&str>,
207    ) -> Result<Vec<BoundPartitionField<'_>>, Error> {
208        let schema = self.current_schema(branch)?;
209        let partition_spec = self.default_partition_spec()?;
210        partition_fields(partition_spec, schema)
211    }
212
213    /// Gets the partition fields for a specific snapshot, binding them to their source schema fields
214    ///
215    /// # Arguments
216    /// * `snapshot_id` - The ID of the snapshot to get the partition fields for
217    ///
218    /// # Returns
219    /// * `Result<Vec<BoundPartitionField>, Error>` - Vector of partition fields bound to their source schema fields,
220    ///   or an error if the schema or partition spec cannot be found
221    pub fn partition_fields(
222        &self,
223        snapshot_id: i64,
224    ) -> Result<Vec<BoundPartitionField<'_>>, Error> {
225        let schema = self.schema(snapshot_id)?;
226        self.default_partition_spec()?
227            .fields()
228            .iter()
229            .map(|partition_field| {
230                let field =
231                    schema
232                        .get(*partition_field.source_id() as usize)
233                        .ok_or(Error::NotFound(format!(
234                            "Schema field with id {}",
235                            partition_field.source_id()
236                        )))?;
237                Ok(BoundPartitionField::new(partition_field, field))
238            })
239            .collect()
240    }
241
242    /// Gets the current snapshot for a given reference, or the table's current snapshot if no reference is specified
243    ///
244    /// # Arguments
245    /// * `snapshot_ref` - Optional snapshot reference name to get the snapshot for
246    ///
247    /// # Returns
248    /// * `Result<Option<&Snapshot>, Error>` - The current snapshot if it exists, None if there are no snapshots,
249    ///   or an error if the snapshots are in an invalid state
250    #[inline]
251    pub fn current_snapshot(&self, snapshot_ref: Option<&str>) -> Result<Option<&Snapshot>, Error> {
252        let snapshot_id = match snapshot_ref {
253            None => self
254                .refs
255                .get("main")
256                .map(|x| x.snapshot_id)
257                .or(self.current_snapshot_id),
258            Some(reference) => self.refs.get(reference).map(|x| x.snapshot_id),
259        };
260        match snapshot_id {
261            Some(snapshot_id) => Ok(self.snapshots.get(&snapshot_id)),
262            None => {
263                if self.snapshots.is_empty()
264                    || (snapshot_ref.is_some() && snapshot_ref != Some("main"))
265                {
266                    Ok(None)
267                } else {
268                    Err(Error::InvalidFormat("snapshots".to_string()))
269                }
270            }
271        }
272    }
273
274    /// Gets a mutable reference to the current snapshot for a given reference, or the table's current snapshot if no reference is specified
275    ///
276    /// # Arguments
277    /// * `snapshot_ref` - Optional snapshot reference name to get the snapshot for
278    ///
279    /// # Returns
280    /// * `Result<Option<&mut Snapshot>, Error>` - Mutable reference to the current snapshot if it exists, None if there are no snapshots,
281    ///   or an error if the snapshots are in an invalid state
282    #[inline]
283    pub fn current_snapshot_mut(
284        &mut self,
285        snapshot_ref: Option<String>,
286    ) -> Result<Option<&mut Snapshot>, Error> {
287        let snapshot_id = match &snapshot_ref {
288            None => self
289                .refs
290                .get("main")
291                .map(|x| x.snapshot_id)
292                .or(self.current_snapshot_id),
293            Some(reference) => self.refs.get(reference).map(|x| x.snapshot_id),
294        };
295        match snapshot_id {
296            Some(-1) => {
297                if self.snapshots.is_empty()
298                    || (snapshot_ref.is_some() && snapshot_ref.as_deref() != Some("main"))
299                {
300                    Ok(None)
301                } else {
302                    Err(Error::InvalidFormat("snapshots".to_string()))
303                }
304            }
305            Some(snapshot_id) => Ok(self.snapshots.get_mut(&snapshot_id)),
306            None => {
307                if self.snapshots.is_empty()
308                    || (snapshot_ref.is_some() && snapshot_ref.as_deref() != Some("main"))
309                {
310                    Ok(None)
311                } else {
312                    Err(Error::InvalidFormat("snapshots".to_string()))
313                }
314            }
315        }
316    }
317
318    /// Gets the sequence number for a specific snapshot
319    ///
320    /// # Arguments
321    /// * `snapshot_id` - The ID of the snapshot to get the sequence number for
322    ///
323    /// # Returns
324    /// * `Option<i64>` - The sequence number if the snapshot exists, None otherwise
325    pub fn sequence_number(&self, snapshot_id: i64) -> Option<i64> {
326        self.snapshots
327            .get(&snapshot_id)
328            .map(|x| *x.sequence_number())
329    }
330
331    pub fn as_ref(&self) -> TabularMetadataRef<'_> {
332        TabularMetadataRef::Table(self)
333    }
334}
335
336pub fn partition_fields<'a>(
337    partition_spec: &'a PartitionSpec,
338    schema: &'a Schema,
339) -> Result<Vec<BoundPartitionField<'a>>, Error> {
340    partition_spec
341        .fields()
342        .iter()
343        .map(|partition_field| {
344            let field =
345                schema
346                    .get(*partition_field.source_id() as usize)
347                    .ok_or(Error::NotFound(format!(
348                        "Schema field with id {}",
349                        partition_field.source_id()
350                    )))?;
351            Ok(BoundPartitionField::new(partition_field, field))
352        })
353        .collect()
354}
355
356/// Creates a new metadata file location for a table
357///
358/// # Arguments
359/// * `metadata` - The table metadata to create a location for
360///
361/// # Returns
362/// * `String` - The path where the new metadata file should be stored
363pub fn new_metadata_location<'a, T: Into<TabularMetadataRef<'a>>>(metadata: T) -> String {
364    let metadata: TabularMetadataRef = metadata.into();
365    let transaction_uuid = Uuid::new_v4();
366    let version = metadata.sequence_number();
367
368    format!(
369        "{}/metadata/{:05}-{}.metadata.json",
370        metadata.location(),
371        version,
372        transaction_uuid
373    )
374}
375
376impl fmt::Display for TableMetadata {
377    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
378        write!(
379            f,
380            "{}",
381            &serde_json::to_string(self).map_err(|_| fmt::Error)?,
382        )
383    }
384}
385
386impl str::FromStr for TableMetadata {
387    type Err = Error;
388    fn from_str(s: &str) -> Result<Self, Self::Err> {
389        serde_json::from_str(s).map_err(Error::from)
390    }
391}
392
393pub mod _serde {
394    use std::collections::HashMap;
395
396    use itertools::Itertools;
397    use serde::{Deserialize, Serialize};
398    use uuid::Uuid;
399
400    use crate::{
401        error::Error,
402        spec::{
403            partition::{PartitionField, PartitionSpec},
404            schema,
405            snapshot::{
406                SnapshotReference, SnapshotRetention,
407                _serde::{SnapshotV1, SnapshotV2},
408            },
409            sort,
410        },
411    };
412
413    use super::{
414        FormatVersion, MetadataLog, SnapshotLog, TableMetadata, VersionNumber,
415        DEFAULT_SORT_ORDER_ID, DEFAULT_SPEC_ID, MAIN_BRANCH,
416    };
417
    /// Serialized form of Iceberg table metadata in either format version.
    ///
    /// `TableMetadata` is (de)serialized through this enum. With `#[serde(untagged)]`, serde
    /// tries the variants in declaration order, so a document is first parsed as V2 and only
    /// then as V1. The variant order is therefore significant; do not reorder.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(untagged)]
    pub(super) enum TableMetadataEnum {
        /// Version 2 layout (attempted first when deserializing).
        V2(TableMetadataV2),
        /// Version 1 layout (fallback when the V2 layout does not match).
        V1(TableMetadataV1),
    }
427
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// On-disk layout of format version 2 table metadata.
    ///
    /// Field names are (de)serialized in kebab-case (e.g. `last-sequence-number`), and
    /// fields are written in declaration order. The `TryFrom<TableMetadataV2>` impl in this
    /// module converts it into `TableMetadata`.
    pub struct TableMetadataV2 {
        /// Format version; typed as `VersionNumber<2>`.
        pub format_version: VersionNumber<2>,
        /// UUID that identifies the table (required in V2).
        pub table_uuid: Uuid,
        /// The table's base location.
        pub location: String,
        /// The table's highest assigned sequence number.
        pub last_sequence_number: i64,
        /// Timestamp in milliseconds since the Unix epoch when the table was last updated.
        pub last_updated_ms: i64,
        /// The highest assigned column id for the table.
        pub last_column_id: i32,
        /// All schemas; each carries its own schema id.
        pub schemas: Vec<schema::SchemaV2>,
        /// Id of the table's current schema; conversion fails if it is not in `schemas`.
        pub current_schema_id: i32,
        /// All partition specs, stored as full partition spec objects.
        pub partition_specs: Vec<PartitionSpec>,
        /// Id of the partition spec that writers should use by default.
        pub default_spec_id: i32,
        /// The highest assigned partition field id across all partition specs of the table.
        pub last_partition_id: i32,
        /// String-to-string map of table properties that control reading and writing (for
        /// example `commit.retry.num-retries`); not intended for arbitrary metadata.
        /// Omitted when empty and treated as empty when absent.
        #[serde(skip_serializing_if = "HashMap::is_empty", default)]
        pub properties: HashMap<String, String>,
        /// Id of the current table snapshot; must match the `main` branch in `refs`.
        /// Omitted when `None`. A value of `-1` is read as "no current snapshot" during
        /// conversion.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_snapshot_id: Option<i64>,
        /// Valid snapshots, i.e. snapshots whose data files all still exist. A data file must
        /// not be deleted until the last snapshot listing it is garbage collected.
        /// Omitted when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV2>>,
        /// Timestamp and snapshot-id pairs recording changes to the current snapshot. Each
        /// change of `current-snapshot-id` should append an entry with `last-updated-ms`;
        /// when snapshots expire, all entries before an expired snapshot should be removed.
        /// Omitted when empty and treated as empty when absent.
        #[serde(skip_serializing_if = "Vec::is_empty", default)]
        pub snapshot_log: Vec<SnapshotLog>,

        /// Timestamp and location pairs for previous metadata files. Each new metadata file
        /// should append an entry for the previous one; tables may cap the log to the most
        /// recent entries after a commit. Omitted when empty and treated as empty when absent.
        #[serde(skip_serializing_if = "Vec::is_empty", default)]
        pub metadata_log: Vec<MetadataLog>,

        /// All sort orders, stored as full sort order objects (required in V2).
        pub sort_orders: Vec<sort::SortOrder>,
        /// Id of the default sort order. Writers may use it; readers do not, because reads
        /// use the specs stored in manifest files.
        pub default_sort_order_id: i32,
        /// Snapshot references keyed by unique reference name. Omitted when empty and treated
        /// as empty when absent. During conversion, a `main` branch pointing at the current
        /// snapshot is added if it is missing.
        #[serde(skip_serializing_if = "HashMap::is_empty", default)]
        pub refs: HashMap<String, SnapshotReference>,
    }
500
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// On-disk layout of format version 1 table metadata.
    ///
    /// Several fields that are required in V2 are optional here. The conversion into
    /// `TableMetadata` fills them from the legacy single-valued fields (`schema`,
    /// `partition-spec`) or from defaults. Field names are (de)serialized in kebab-case.
    pub struct TableMetadataV1 {
        /// Format version; typed as `VersionNumber<1>`.
        pub format_version: VersionNumber<1>,
        /// UUID that identifies the table. Omitted when `None`; when absent, conversion uses
        /// `Uuid::default()` (the nil UUID).
        #[serde(skip_serializing_if = "Option::is_none")]
        pub table_uuid: Option<Uuid>,
        /// The table's base location.
        pub location: String,
        /// Timestamp in milliseconds since the Unix epoch when the table was last updated.
        pub last_updated_ms: i64,
        /// The highest assigned column id for the table.
        pub last_column_id: i32,
        /// The table's current schema. It is used as the only schema when `schemas` is
        /// absent.
        /// NOTE(review): in that fallback a schema conversion error is discarded
        /// (`try_into().ok()?`), leaving the converted table with no schemas instead of
        /// failing; confirm this is intended.
        pub schema: schema::SchemaV1,
        /// All schemas. Omitted when `None`; a schema without an id is keyed by its
        /// position in the list during conversion.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub schemas: Option<Vec<schema::SchemaV1>>,
        /// Id of the table's current schema. Omitted when `None`; when absent, conversion
        /// uses the highest schema id present.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_schema_id: Option<i32>,
        /// Fields of the table's current partition spec. Writers use it to partition data;
        /// reads use the specs stored in manifest files. When `partition_specs` is absent,
        /// conversion builds a spec with id `DEFAULT_SPEC_ID` from these fields.
        pub partition_spec: Vec<PartitionField>,
        /// All partition specs, stored as full partition spec objects. Omitted when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub partition_specs: Option<Vec<PartitionSpec>>,
        /// Id of the partition spec that writers should use by default. Omitted when `None`;
        /// when absent, conversion uses the highest partition spec id present.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub default_spec_id: Option<i32>,
        /// The highest assigned partition field id across all partition specs of the table.
        /// Omitted when `None`.
        /// NOTE(review): when absent, the V1 conversion currently falls back to the highest
        /// partition *spec* id rather than the highest partition *field* id; confirm.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub last_partition_id: Option<i32>,
        /// String-to-string map of table properties that control reading and writing (for
        /// example `commit.retry.num-retries`); not intended for arbitrary metadata.
        /// Omitted when empty and treated as empty when absent.
        #[serde(skip_serializing_if = "HashMap::is_empty", default)]
        pub properties: HashMap<String, String>,
        /// Id of the current table snapshot. Omitted when `None`. A value of `-1` is read as
        /// "no current snapshot" during conversion.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_snapshot_id: Option<i64>,
        /// Valid snapshots, i.e. snapshots whose data files all still exist. A data file must
        /// not be deleted until the last snapshot listing it is garbage collected.
        /// Omitted when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV1>>,
        /// Timestamp and snapshot-id pairs recording changes to the current snapshot. Each
        /// change of `current-snapshot-id` should append an entry with `last-updated-ms`;
        /// when snapshots expire, all entries before an expired snapshot should be removed.
        /// Omitted when empty and treated as empty when absent.
        #[serde(skip_serializing_if = "Vec::is_empty", default)]
        pub snapshot_log: Vec<SnapshotLog>,

        /// Timestamp and location pairs for previous metadata files. Each new metadata file
        /// should append an entry for the previous one; tables may cap the log to the most
        /// recent entries after a commit. Omitted when empty and treated as empty when absent.
        #[serde(skip_serializing_if = "Vec::is_empty", default)]
        pub metadata_log: Vec<MetadataLog>,

        /// All sort orders; when absent, conversion produces an empty map.
        /// NOTE(review): there is no `skip_serializing_if`, so `None` is written as `null`.
        pub sort_orders: Option<Vec<sort::SortOrder>>,
        /// Id of the default sort order; when absent, conversion uses
        /// `DEFAULT_SORT_ORDER_ID`. Writers may use it; reads use the specs stored in
        /// manifest files. Like `sort_orders`, `None` is written as `null`.
        pub default_sort_order_id: Option<i32>,
    }
576
577    impl TryFrom<TableMetadataEnum> for TableMetadata {
578        type Error = Error;
579        fn try_from(value: TableMetadataEnum) -> Result<Self, Error> {
580            match value {
581                TableMetadataEnum::V2(value) => value.try_into(),
582                TableMetadataEnum::V1(value) => value.try_into(),
583            }
584        }
585    }
586
587    impl From<TableMetadata> for TableMetadataEnum {
588        fn from(value: TableMetadata) -> Self {
589            match value.format_version {
590                FormatVersion::V2 => TableMetadataEnum::V2(value.into()),
591                FormatVersion::V1 => TableMetadataEnum::V1(value.into()),
592            }
593        }
594    }
595
596    impl TryFrom<TableMetadataV2> for TableMetadata {
597        type Error = Error;
598        fn try_from(value: TableMetadataV2) -> Result<Self, Error> {
599            let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
600                None
601            } else {
602                value.current_snapshot_id
603            };
604            let schemas = HashMap::from_iter(
605                value
606                    .schemas
607                    .into_iter()
608                    .map(|schema| Ok((schema.schema_id, schema.try_into()?)))
609                    .collect::<Result<Vec<_>, Error>>()?,
610            );
611            let mut refs = value.refs;
612            if let Some(snapshot_id) = current_snapshot_id {
613                refs.entry(MAIN_BRANCH.to_string())
614                    .or_insert(SnapshotReference {
615                        snapshot_id,
616                        retention: SnapshotRetention::default(),
617                    });
618            }
619            Ok(TableMetadata {
620                format_version: FormatVersion::V2,
621                table_uuid: value.table_uuid,
622                location: value.location,
623                last_sequence_number: value.last_sequence_number,
624                last_updated_ms: value.last_updated_ms,
625                last_column_id: value.last_column_id,
626                current_schema_id: if schemas.keys().contains(&value.current_schema_id) {
627                    Ok(value.current_schema_id)
628                } else {
629                    Err(Error::InvalidFormat("schema".to_string()))
630                }?,
631                schemas,
632                partition_specs: HashMap::from_iter(
633                    value.partition_specs.into_iter().map(|x| (*x.spec_id(), x)),
634                ),
635                default_spec_id: value.default_spec_id,
636                last_partition_id: value.last_partition_id,
637                properties: value.properties,
638                current_snapshot_id,
639                snapshots: value
640                    .snapshots
641                    .map(|snapshots| {
642                        HashMap::from_iter(snapshots.into_iter().map(|x| (x.snapshot_id, x.into())))
643                    })
644                    .unwrap_or_default(),
645                snapshot_log: value.snapshot_log,
646                metadata_log: value.metadata_log,
647                sort_orders: HashMap::from_iter(
648                    value.sort_orders.into_iter().map(|x| (x.order_id, x)),
649                ),
650                default_sort_order_id: value.default_sort_order_id,
651                refs,
652            })
653        }
654    }
655
656    impl TryFrom<TableMetadataV1> for TableMetadata {
657        type Error = Error;
658        fn try_from(value: TableMetadataV1) -> Result<Self, Error> {
659            let schemas = value
660                .schemas
661                .map(|schemas| {
662                    Ok::<_, Error>(HashMap::from_iter(
663                        schemas
664                            .into_iter()
665                            .enumerate()
666                            .map(|(i, schema)| {
667                                Ok((schema.schema_id.unwrap_or(i as i32), schema.try_into()?))
668                            })
669                            .collect::<Result<Vec<_>, Error>>()?
670                            .into_iter(),
671                    ))
672                })
673                .or_else(|| {
674                    Some(Ok(HashMap::from_iter(vec![(
675                        value.schema.schema_id.unwrap_or(0),
676                        value.schema.try_into().ok()?,
677                    )])))
678                })
679                .transpose()?
680                .unwrap_or_default();
681            let partition_specs = HashMap::from_iter(
682                value
683                    .partition_specs
684                    .unwrap_or_else(|| {
685                        vec![PartitionSpec::builder()
686                            .with_spec_id(DEFAULT_SPEC_ID)
687                            .with_fields(value.partition_spec)
688                            .build()
689                            .unwrap()]
690                    })
691                    .into_iter()
692                    .map(|x| (*x.spec_id(), x)),
693            );
694            Ok(TableMetadata {
695                format_version: FormatVersion::V1,
696                table_uuid: value.table_uuid.unwrap_or_default(),
697                location: value.location,
698                last_sequence_number: 0,
699                last_updated_ms: value.last_updated_ms,
700                last_column_id: value.last_column_id,
701                current_schema_id: value
702                    .current_schema_id
703                    .unwrap_or_else(|| schemas.keys().copied().max().unwrap_or_default()),
704                default_spec_id: value
705                    .default_spec_id
706                    .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()),
707                last_partition_id: value
708                    .last_partition_id
709                    .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()),
710                partition_specs,
711                schemas,
712
713                properties: value.properties,
714                current_snapshot_id: if let &Some(id) = &value.current_snapshot_id {
715                    if id == -1 {
716                        None
717                    } else {
718                        Some(id)
719                    }
720                } else {
721                    value.current_snapshot_id
722                },
723                snapshots: value
724                    .snapshots
725                    .map(|snapshots| {
726                        Ok::<_, Error>(HashMap::from_iter(
727                            snapshots
728                                .into_iter()
729                                .map(|x| Ok((x.snapshot_id, x.into())))
730                                .collect::<Result<Vec<_>, Error>>()?,
731                        ))
732                    })
733                    .transpose()?
734                    .unwrap_or_default(),
735                snapshot_log: value.snapshot_log,
736                metadata_log: value.metadata_log,
737                sort_orders: match value.sort_orders {
738                    Some(sort_orders) => {
739                        HashMap::from_iter(sort_orders.into_iter().map(|x| (x.order_id, x)))
740                    }
741                    None => HashMap::new(),
742                },
743                default_sort_order_id: value.default_sort_order_id.unwrap_or(DEFAULT_SORT_ORDER_ID),
744                refs: HashMap::from_iter(vec![(
745                    MAIN_BRANCH.to_string(),
746                    SnapshotReference {
747                        snapshot_id: value.current_snapshot_id.unwrap_or_default(),
748                        retention: SnapshotRetention::Branch {
749                            min_snapshots_to_keep: None,
750                            max_snapshot_age_ms: None,
751                            max_ref_age_ms: None,
752                        },
753                    },
754                )]),
755            })
756        }
757    }
758
759    impl From<TableMetadata> for TableMetadataV2 {
760        fn from(v: TableMetadata) -> Self {
761            TableMetadataV2 {
762                format_version: VersionNumber::<2>,
763                table_uuid: v.table_uuid,
764                location: v.location,
765                last_sequence_number: v.last_sequence_number,
766                last_updated_ms: v.last_updated_ms,
767                last_column_id: v.last_column_id,
768                schemas: v.schemas.into_values().map(|x| x.into()).collect(),
769                current_schema_id: v.current_schema_id,
770                partition_specs: v.partition_specs.into_values().collect(),
771                default_spec_id: v.default_spec_id,
772                last_partition_id: v.last_partition_id,
773                properties: v.properties,
774                current_snapshot_id: v.current_snapshot_id.or(Some(-1)),
775                snapshots: Some(v.snapshots.into_values().map(|x| x.into()).collect()),
776                snapshot_log: v.snapshot_log,
777                metadata_log: v.metadata_log,
778                sort_orders: v.sort_orders.into_values().collect(),
779                default_sort_order_id: v.default_sort_order_id,
780                refs: v.refs,
781            }
782        }
783    }
784
785    impl From<TableMetadata> for TableMetadataV1 {
786        fn from(v: TableMetadata) -> Self {
787            TableMetadataV1 {
788                format_version: VersionNumber::<1>,
789                table_uuid: Some(v.table_uuid),
790                location: v.location,
791                last_updated_ms: v.last_updated_ms,
792                last_column_id: v.last_column_id,
793                schema: v.schemas.get(&v.current_schema_id).unwrap().clone().into(),
794                schemas: Some(v.schemas.into_values().map(|x| x.into()).collect()),
795                current_schema_id: Some(v.current_schema_id),
796                partition_spec: v
797                    .partition_specs
798                    .get(&v.default_spec_id)
799                    .map(|x| x.fields().clone())
800                    .unwrap_or_default(),
801                partition_specs: Some(v.partition_specs.into_values().collect()),
802                default_spec_id: Some(v.default_spec_id),
803                last_partition_id: Some(v.last_partition_id),
804                properties: v.properties,
805                current_snapshot_id: v.current_snapshot_id.or(Some(-1)),
806                snapshots: Some(v.snapshots.into_values().map(|x| x.into()).collect()),
807                snapshot_log: v.snapshot_log,
808                metadata_log: v.metadata_log,
809                sort_orders: Some(v.sort_orders.into_values().collect()),
810                default_sort_order_id: Some(v.default_sort_order_id),
811            }
812        }
813    }
814}
815
/// Helper to serialize and deserialize the format version.
///
/// A zero-sized marker carrying the expected version `V` as a const generic. It
/// serializes as the plain integer `V`. Deserialization fails unless the input
/// integer equals `V`, which lets serde reject metadata of the wrong version.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct VersionNumber<const V: u8>;
819
820impl<const V: u8> Serialize for VersionNumber<V> {
821    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
822    where
823        S: serde::Serializer,
824    {
825        serializer.serialize_u8(V)
826    }
827}
828
829impl<'de, const V: u8> Deserialize<'de> for VersionNumber<V> {
830    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
831    where
832        D: serde::Deserializer<'de>,
833    {
834        let value = u8::deserialize(deserializer)?;
835        if value == V {
836            Ok(VersionNumber::<V>)
837        } else {
838            Err(serde::de::Error::custom("Invalid Version"))
839        }
840    }
841}
842
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
/// Encodes changes to the previous metadata files for the table
///
/// Serialized with kebab-case keys (`metadata-file`, `timestamp-ms`).
pub struct MetadataLog {
    /// The file for the log.
    pub metadata_file: String,
    /// Time new metadata was created (presumably epoch milliseconds, per the `_ms` suffix).
    pub timestamp_ms: i64,
}
852
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
/// A log of when each snapshot was made.
///
/// Serialized with kebab-case keys (`snapshot-id`, `timestamp-ms`).
pub struct SnapshotLog {
    /// Id of the snapshot.
    pub snapshot_id: i64,
    /// Last updated timestamp (presumably epoch milliseconds, per the `_ms` suffix).
    pub timestamp_ms: i64,
}
862
863#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy)]
864#[repr(u8)]
865/// Iceberg format version
866#[derive(Default)]
867pub enum FormatVersion {
868    /// Iceberg spec version 1
869    V1 = b'1',
870    /// Iceberg spec version 2
871    #[default]
872    V2 = b'2',
873}
874
875impl TryFrom<u8> for FormatVersion {
876    type Error = Error;
877    fn try_from(value: u8) -> Result<Self, Self::Error> {
878        match value {
879            1 => Ok(FormatVersion::V1),
880            2 => Ok(FormatVersion::V2),
881            _ => Err(Error::Conversion(
882                "u8".to_string(),
883                "format version".to_string(),
884            )),
885        }
886    }
887}
888
889impl From<FormatVersion> for u8 {
890    fn from(value: FormatVersion) -> Self {
891        match value {
892            FormatVersion::V1 => b'1',
893            FormatVersion::V2 => b'2',
894        }
895    }
896}
897
898impl From<FormatVersion> for i32 {
899    fn from(value: FormatVersion) -> Self {
900        match value {
901            FormatVersion::V1 => 1,
902            FormatVersion::V2 => 2,
903        }
904    }
905}
906
#[cfg(test)]
mod tests {

    use std::{collections::HashMap, fs};

    use uuid::Uuid;

    use crate::{
        error::Error,
        spec::{
            partition::{PartitionField, PartitionSpec, Transform},
            schema::SchemaBuilder,
            snapshot::{Operation, SnapshotBuilder, SnapshotReference, SnapshotRetention, Summary},
            sort::{NullOrder, SortDirection, SortField, SortOrderBuilder},
            table_metadata::TableMetadata,
            types::{PrimitiveType, StructField, Type},
        },
    };

    use super::{FormatVersion, SnapshotLog};

    /// Asserts that `json` deserializes to `expected_type`, and that serializing
    /// `expected_type` and parsing the result back yields the same value.
    fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
        let desered_type: TableMetadata = serde_json::from_str(json).unwrap();
        assert_eq!(desered_type, expected_type);

        let sered_json = serde_json::to_string(&expected_type).unwrap();
        let parsed_json_value = serde_json::from_str::<TableMetadata>(&sered_json).unwrap();

        assert_eq!(parsed_json_value, desered_type);
    }

    #[test]
    fn test_deserialize_table_data_v2() -> Result<(), Error> {
        let data = r#"
            {
                "format-version" : 2,
                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
                "location": "s3://b/wh/data.db/table",
                "last-sequence-number" : 1,
                "last-updated-ms": 1515100955770,
                "last-column-id": 1,
                "schemas": [
                    {
                        "schema-id" : 1,
                        "type" : "struct",
                        "fields" :[
                            {
                                "id": 1,
                                "name": "struct_name",
                                "required": true,
                                "type": "fixed[1]"
                            }
                        ]
                    }
                ],
                "current-schema-id" : 1,
                "partition-specs": [
                    {
                        "spec-id": 1,
                        "fields": [
                            {
                                "source-id": 4,
                                "field-id": 1000,
                                "name": "ts_day",
                                "transform": "day"
                            }
                        ]
                    }
                ],
                "default-spec-id": 1,
                "last-partition-id": 1,
                "properties": {
                    "commit.retry.num-retries": "1"
                },
                "metadata-log": [
                    {
                        "metadata-file": "s3://bucket/.../v1.json",
                        "timestamp-ms": 1515100
                    }
                ],
                "sort-orders": [],
                "default-sort-order-id": 0
            }
        "#;
        let metadata =
            serde_json::from_str::<TableMetadata>(data).expect("Failed to deserialize json");
        // Round trip: serialize, deserialize again, and expect an identical value.
        let metadata_two: TableMetadata = serde_json::from_str(
            &serde_json::to_string(&metadata).expect("Failed to serialize metadata"),
        )
        .expect("Failed to deserialize round-tripped json");
        assert_eq!(metadata, metadata_two);

        Ok(())
    }

    #[test]
    fn test_deserialize_table_data_v1() -> Result<(), Error> {
        let data = r#"
        {
            "format-version" : 1,
            "table-uuid" : "df838b92-0b32-465d-a44e-d39936e538b7",
            "location" : "/home/iceberg/warehouse/nyc/taxis",
            "last-updated-ms" : 1662532818843,
            "last-column-id" : 5,
            "schema" : {
              "type" : "struct",
              "schema-id" : 0,
              "fields" : [ {
                "id" : 1,
                "name" : "vendor_id",
                "required" : false,
                "type" : "long"
              }, {
                "id" : 2,
                "name" : "trip_id",
                "required" : false,
                "type" : "long"
              }, {
                "id" : 3,
                "name" : "trip_distance",
                "required" : false,
                "type" : "float"
              }, {
                "id" : 4,
                "name" : "fare_amount",
                "required" : false,
                "type" : "double"
              }, {
                "id" : 5,
                "name" : "store_and_fwd_flag",
                "required" : false,
                "type" : "string"
              } ]
            },
            "current-schema-id" : 0,
            "schemas" : [ {
              "type" : "struct",
              "schema-id" : 0,
              "fields" : [ {
                "id" : 1,
                "name" : "vendor_id",
                "required" : false,
                "type" : "long"
              }, {
                "id" : 2,
                "name" : "trip_id",
                "required" : false,
                "type" : "long"
              }, {
                "id" : 3,
                "name" : "trip_distance",
                "required" : false,
                "type" : "float"
              }, {
                "id" : 4,
                "name" : "fare_amount",
                "required" : false,
                "type" : "double"
              }, {
                "id" : 5,
                "name" : "store_and_fwd_flag",
                "required" : false,
                "type" : "string"
              } ]
            } ],
            "partition-spec" : [ {
              "name" : "vendor_id",
              "transform" : "identity",
              "source-id" : 1,
              "field-id" : 1000
            } ],
            "default-spec-id" : 0,
            "partition-specs" : [ {
              "spec-id" : 0,
              "fields" : [ {
                "name" : "vendor_id",
                "transform" : "identity",
                "source-id" : 1,
                "field-id" : 1000
              } ]
            } ],
            "last-partition-id" : 1000,
            "default-sort-order-id" : 0,
            "sort-orders" : [ {
              "order-id" : 0,
              "fields" : [ ]
            } ],
            "properties" : {
              "owner" : "root"
            },
            "current-snapshot-id" : 638933773299822130,
            "refs" : {
              "main" : {
                "snapshot-id" : 638933773299822130,
                "type" : "branch"
              }
            },
            "snapshots" : [ {
              "snapshot-id" : 638933773299822130,
              "timestamp-ms" : 1662532818843,
              "summary" : {
                "operation" : "append",
                "spark.app.id" : "local-1662532784305",
                "added-data-files" : "4",
                "added-records" : "4",
                "added-files-size" : "6001",
                "changed-partition-count" : "2",
                "total-records" : "4",
                "total-files-size" : "6001",
                "total-data-files" : "4",
                "total-delete-files" : "0",
                "total-position-deletes" : "0",
                "total-equality-deletes" : "0"
              },
              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
              "schema-id" : 0
            } ],
            "snapshot-log" : [ {
              "timestamp-ms" : 1662532818843,
              "snapshot-id" : 638933773299822130
            } ],
            "metadata-log" : [ {
              "timestamp-ms" : 1662532805245,
              "metadata-file" : "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json"
            } ]
          }
        "#;
        let metadata =
            serde_json::from_str::<TableMetadata>(data).expect("Failed to deserialize json");
        // Round trip: serialize, deserialize again, and expect an identical value.
        let metadata_two: TableMetadata = serde_json::from_str(
            &serde_json::to_string(&metadata).expect("Failed to serialize metadata"),
        )
        .expect("Failed to deserialize round-tripped json");
        assert_eq!(metadata, metadata_two);

        Ok(())
    }

    #[test]
    fn test_table_metadata_v2_file_valid() {
        let metadata =
            fs::read_to_string("testdata/table_metadata/TableMetadataV2Valid.json").unwrap();

        let schema1 = SchemaBuilder::default()
            .with_schema_id(0)
            .with_struct_field(StructField {
                id: 1,
                name: "x".to_owned(),
                required: true,
                field_type: Type::Primitive(PrimitiveType::Long),
                doc: None,
            })
            .build()
            .unwrap();

        let schema2 = SchemaBuilder::default()
            .with_schema_id(1)
            .with_struct_field(StructField {
                id: 1,
                name: "x".to_owned(),
                required: true,
                field_type: Type::Primitive(PrimitiveType::Long),
                doc: None,
            })
            .with_struct_field(StructField {
                id: 2,
                name: "y".to_owned(),
                required: true,
                field_type: Type::Primitive(PrimitiveType::Long),
                doc: Some("comment".to_owned()),
            })
            .with_struct_field(StructField {
                id: 3,
                name: "z".to_owned(),
                required: true,
                field_type: Type::Primitive(PrimitiveType::Long),
                doc: None,
            })
            .with_identifier_field_ids(vec![1, 2])
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder()
            .with_partition_field(PartitionField::new(1, 1000, "x", Transform::Identity))
            .build()
            .unwrap();

        let sort_order = SortOrderBuilder::default()
            .with_order_id(3)
            .with_sort_field(SortField {
                source_id: 2,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            })
            .with_sort_field(SortField {
                source_id: 3,
                transform: Transform::Bucket(4),
                direction: SortDirection::Descending,
                null_order: NullOrder::Last,
            })
            .build()
            .unwrap();

        let snapshot1 = SnapshotBuilder::default()
            .with_snapshot_id(3051729675574597004)
            .with_timestamp_ms(1515100955770)
            .with_sequence_number(0)
            .with_manifest_list("s3://a/b/1.avro".to_string())
            .with_summary(Summary {
                operation: Operation::Append,
                other: HashMap::new(),
            })
            .build()
            .expect("Failed to create snapshot");

        let snapshot2 = SnapshotBuilder::default()
            .with_snapshot_id(3055729675574597004)
            .with_parent_snapshot_id(3051729675574597004)
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(1)
            .with_schema_id(1)
            .with_manifest_list("s3://a/b/2.avro".to_string())
            .with_summary(Summary {
                operation: Operation::Append,
                other: HashMap::new(),
            })
            .build()
            .expect("Failed to create snapshot");

        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, schema1), (1, schema2)]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec)]),
            default_spec_id: 0,
            last_partition_id: 1000,
            default_sort_order_id: 3,
            sort_orders: HashMap::from_iter(vec![(3, sort_order)]),
            snapshots: HashMap::from_iter(vec![
                (3051729675574597004, snapshot1),
                (3055729675574597004, snapshot2),
            ]),
            current_snapshot_id: Some(3055729675574597004),
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: vec![
                SnapshotLog {
                    snapshot_id: 3051729675574597004,
                    timestamp_ms: 1515100955770,
                },
                SnapshotLog {
                    snapshot_id: 3055729675574597004,
                    timestamp_ms: 1555100955770,
                },
            ],
            metadata_log: Vec::new(),
            refs: HashMap::from_iter(vec![(
                "main".to_string(),
                SnapshotReference {
                    snapshot_id: 3055729675574597004,
                    retention: SnapshotRetention::Branch {
                        min_snapshots_to_keep: None,
                        max_snapshot_age_ms: None,
                        max_ref_age_ms: None,
                    },
                },
            )]),
        };

        check_table_metadata_serde(&metadata, expected);
    }

    #[test]
    fn test_table_metadata_v2_file_valid_minimal() {
        let metadata =
            fs::read_to_string("testdata/table_metadata/TableMetadataV2ValidMinimal.json").unwrap();

        let schema = SchemaBuilder::default()
            .with_schema_id(0)
            .with_struct_field(StructField {
                id: 1,
                name: "x".to_owned(),
                required: true,
                field_type: Type::Primitive(PrimitiveType::Long),
                doc: None,
            })
            .with_struct_field(StructField {
                id: 2,
                name: "y".to_owned(),
                required: true,
                field_type: Type::Primitive(PrimitiveType::Long),
                doc: Some("comment".to_owned()),
            })
            .with_struct_field(StructField {
                id: 3,
                name: "z".to_owned(),
                required: true,
                field_type: Type::Primitive(PrimitiveType::Long),
                doc: None,
            })
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder()
            .with_partition_field(PartitionField::new(1, 1000, "x", Transform::Identity))
            .build()
            .unwrap();

        let sort_order = SortOrderBuilder::default()
            .with_order_id(3)
            .with_sort_field(SortField {
                source_id: 2,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            })
            .with_sort_field(SortField {
                source_id: 3,
                transform: Transform::Bucket(4),
                direction: SortDirection::Descending,
                null_order: NullOrder::Last,
            })
            .build()
            .unwrap();

        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, schema)]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec)]),
            default_spec_id: 0,
            last_partition_id: 1000,
            default_sort_order_id: 3,
            sort_orders: HashMap::from_iter(vec![(3, sort_order)]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: vec![],
            metadata_log: Vec::new(),
            refs: HashMap::new(),
        };

        check_table_metadata_serde(&metadata, expected);
    }

    #[test]
    fn test_table_metadata_v1_file_valid() {
        let metadata =
            fs::read_to_string("testdata/table_metadata/TableMetadataV1Valid.json").unwrap();

        let schema = SchemaBuilder::default()
            .with_schema_id(0)
            .with_struct_field(StructField {
                id: 1,
                name: "x".to_owned(),
                required: true,
                field_type: Type::Primitive(PrimitiveType::Long),
                doc: None,
            })
            .with_struct_field(StructField {
                id: 2,
                name: "y".to_owned(),
                required: true,
                field_type: Type::Primitive(PrimitiveType::Long),
                doc: Some("comment".to_owned()),
            })
            .with_struct_field(StructField {
                id: 3,
                name: "z".to_owned(),
                required: true,
                field_type: Type::Primitive(PrimitiveType::Long),
                doc: None,
            })
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder()
            .with_partition_field(PartitionField::new(1, 1000, "x", Transform::Identity))
            .build()
            .unwrap();

        let expected = TableMetadata {
            format_version: FormatVersion::V1,
            table_uuid: Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573874,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, schema)]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec)]),
            default_spec_id: 0,
            last_partition_id: 0,
            default_sort_order_id: 0,
            sort_orders: HashMap::new(),
            snapshots: HashMap::new(),
            current_snapshot_id: None,
            last_sequence_number: 0,
            properties: HashMap::new(),
            snapshot_log: vec![],
            metadata_log: Vec::new(),
            // V1 has no refs; the V1 conversion synthesizes a "main" branch.
            refs: HashMap::from_iter(vec![(
                "main".to_string(),
                SnapshotReference {
                    snapshot_id: -1,
                    retention: SnapshotRetention::Branch {
                        min_snapshots_to_keep: None,
                        max_snapshot_age_ms: None,
                        max_ref_age_ms: None,
                    },
                },
            )]),
        };

        check_table_metadata_serde(&metadata, expected);
    }

    #[test]
    fn test_table_metadata_v2_missing_sort_order() {
        let metadata =
            fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSortOrder.json")
                .unwrap();

        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);

        assert_eq!(
            desered.unwrap_err().to_string(),
            "data did not match any variant of untagged enum TableMetadataEnum"
        )
    }
}