iceberg_rust_spec/spec/
view_metadata.rs

1//! View metadata implementation for Iceberg views
2//!
3//! This module contains the implementation of view metadata for Iceberg views, including:
4//! - View metadata structure and versioning (V1)
5//! - Schema management
6//! - View versions and history
7//! - View representations (SQL)
8//! - Metadata properties and logging
9//!
10//! The view metadata format is defined in the [Iceberg View Spec](https://iceberg.apache.org/spec/#view-metadata)
11
12use std::{
13    collections::HashMap,
14    fmt::{self},
15    str,
16    time::{SystemTime, UNIX_EPOCH},
17};
18
19use derive_builder::Builder;
20use derive_getters::Getters;
21use serde::{Deserialize, Serialize};
22use serde_repr::{Deserialize_repr, Serialize_repr};
23use uuid::Uuid;
24
25use crate::{error::Error, identifier::FullIdentifier};
26
27use super::{
28    schema::{Schema, DEFAULT_SCHEMA_ID},
29    tabular::TabularMetadataRef,
30};
31
32pub use _serde::ViewMetadataV1;
33
34use _serde::ViewMetadataEnum;
35
/// Prefix of the property keys that map a branch (reference) name to a view version id.
///
/// `GeneralViewMetadata::current_version` looks up the property `ref-<name>` to resolve a reference.
pub static REF_PREFIX: &str = "ref-";

/// Version id used by the `Version` builder when no explicit version id is set.
pub static DEFAULT_VERSION_ID: i64 = 0;

/// View metadata whose versions use `Option<()>` as their `storage_table` payload.
pub type ViewMetadata = GeneralViewMetadata<Option<()>>;
/// Builder for [`ViewMetadata`].
pub type ViewMetadataBuilder = GeneralViewMetadataBuilder<Option<()>>;
46
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)]
#[serde(try_from = "ViewMetadataEnum<T>", into = "ViewMetadataEnum<T>")]
/// In-memory view metadata, generic over the payload `T` stored in each version's `storage_table`.
///
/// (De)serialization is delegated to `ViewMetadataEnum<T>`: the `versions` and `schemas` maps
/// kept here are converted to and from the list form used by that type.
pub struct GeneralViewMetadata<T: Materialization> {
    #[builder(default = "Uuid::new_v4()")]
    /// A UUID that identifies the view, generated when the view is created. Implementations must throw an exception if a view's UUID does not match the expected UUID after refreshing metadata.
    /// The builder generates a fresh random (v4) UUID when none is provided.
    pub view_uuid: Uuid,
    #[builder(default)]
    /// An integer version number for the view format; must be 1
    pub format_version: FormatVersion,
    #[builder(setter(into))]
    /// The view's base location. This is used to determine where to store manifest files and view metadata files.
    pub location: String,
    /// Id of the current version of the view; it is the key used to look up the current entry in `versions`.
    /// Set to '1' when the view is first created.
    pub current_version_id: i64,
    #[builder(setter(each(name = "with_version")), default)]
    /// The last known versions of the view, keyed by version id. Controlled by the table property: "version.history.num-entries". See section Versions.
    pub versions: HashMap<i64, Version<T>>,
    #[builder(default)]
    /// A list of timestamp and version ID pairs that encodes changes to the current version for the view.
    /// Each time the current-version-id is changed, a new entry should be added with the last-updated-ms and the new current-version-id.
    pub version_log: Vec<VersionLogStruct>,
    #[builder(setter(each(name = "with_schema")), default)]
    /// The schemas of the view, keyed by schema id; the same as the 'schemas' field from Iceberg table spec.
    pub schemas: HashMap<i32, Schema>,
    #[builder(default)]
    /// A string to string map of view properties. This is used for metadata such as "comment" and for settings that affect view maintenance.
    /// This is not intended to be used for arbitrary metadata.
    /// Keys starting with `REF_PREFIX` are read by `current_version` to resolve references.
    pub properties: HashMap<String, String>,
}
77
78impl<T: Materialization> GeneralViewMetadata<T> {
79    /// Gets the current schema for a given branch, or the view's current schema if no branch is specified
80    ///
81    /// # Arguments
82    /// * `branch` - Optional branch name to get the schema for
83    ///
84    /// # Returns
85    /// * `Result<&Schema, Error>` - The current schema, or an error if the schema cannot be found
86    #[inline]
87    pub fn current_schema(&self, branch: Option<&str>) -> Result<&Schema, Error> {
88        let id = self.current_version(branch)?.schema_id;
89        self.schemas
90            .get(&id)
91            .ok_or_else(|| Error::InvalidFormat("view metadata".to_string()))
92    }
93
94    /// Gets the schema for a specific version ID
95    ///
96    /// # Arguments
97    /// * `version_id` - The ID of the version to get the schema for
98    ///
99    /// # Returns
100    /// * `Result<&Schema, Error>` - The schema for the version, or an error if the schema cannot be found
101    #[inline]
102    pub fn schema(&self, version_id: i64) -> Result<&Schema, Error> {
103        let id = self
104            .versions
105            .get(&version_id)
106            .ok_or_else(|| Error::NotFound(format!("View version {version_id}")))?
107            .schema_id;
108        self.schemas
109            .get(&id)
110            .ok_or_else(|| Error::InvalidFormat("view metadata".to_string()))
111    }
112
113    /// Gets the current version for a given reference, or the view's current version if no reference is specified
114    ///
115    /// # Arguments
116    /// * `snapshot_ref` - Optional snapshot reference name to get the version for
117    ///
118    /// # Returns
119    /// * `Result<&Version<T>, Error>` - The current version, or an error if the version cannot be found
120    #[inline]
121    pub fn current_version(&self, snapshot_ref: Option<&str>) -> Result<&Version<T>, Error> {
122        let version_id: i64 = match snapshot_ref {
123            None => self.current_version_id,
124            Some(reference) => self
125                .properties
126                .get(&(REF_PREFIX.to_string() + reference))
127                .and_then(|x| x.parse().ok())
128                .unwrap_or(self.current_version_id),
129        };
130        self.versions
131            .get(&version_id)
132            .ok_or_else(|| Error::InvalidFormat("view metadata".to_string()))
133    }
134
135    /// Adds a new schema to the view metadata
136    ///
137    /// # Arguments
138    /// * `schema` - The schema to add
139    #[inline]
140    pub fn add_schema(&mut self, schema: Schema) {
141        self.schemas.insert(*schema.schema_id(), schema);
142    }
143}
144
145impl ViewMetadata {
146    pub fn as_ref(&self) -> TabularMetadataRef<'_> {
147        TabularMetadataRef::View(self)
148    }
149}
150
151impl fmt::Display for ViewMetadata {
152    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
153        write!(
154            f,
155            "{}",
156            &serde_json::to_string(self).map_err(|_| fmt::Error)?,
157        )
158    }
159}
160
161impl str::FromStr for ViewMetadata {
162    type Err = Error;
163    fn from_str(s: &str) -> Result<Self, Self::Err> {
164        serde_json::from_str(s).map_err(Error::from)
165    }
166}
167
168mod _serde {
169    use std::collections::HashMap;
170
171    use serde::{Deserialize, Serialize};
172    use uuid::Uuid;
173
174    use crate::{
175        error::Error,
176        spec::{schema::SchemaV2, table_metadata::VersionNumber},
177    };
178
179    use super::{FormatVersion, GeneralViewMetadata, Materialization, Version, VersionLogStruct};
180
181    /// Metadata of an iceberg view
182    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
183    #[serde(untagged)]
184    pub(super) enum ViewMetadataEnum<T: Materialization> {
185        /// Version 1 of the table metadata
186        V1(ViewMetadataV1<T>),
187    }
188
189    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
190    #[serde(rename_all = "kebab-case")]
191    /// Fields for the version 1 of the view metadata.
192    pub struct ViewMetadataV1<T: Materialization> {
193        /// A UUID that identifies the view, generated when the view is created. Implementations must throw an exception if a view’s UUID does not match the expected UUID after refreshing metadata
194        pub view_uuid: Uuid,
195        /// An integer version number for the view format; must be 1
196        pub format_version: VersionNumber<1>,
197        /// The view’s base location. This is used to determine where to store manifest files and view metadata files.
198        pub location: String,
199        /// Current version of the view. Set to ‘1’ when the view is first created.
200        pub current_version_id: i64,
201        /// An array of structs describing the last known versions of the view. Controlled by the table property: “version.history.num-entries”. See section Versions.
202        pub versions: Vec<Version<T>>,
203        /// A list of timestamp and version ID pairs that encodes changes to the current version for the view.
204        /// Each time the current-version-id is changed, a new entry should be added with the last-updated-ms and the new current-version-id.
205        pub version_log: Vec<VersionLogStruct>,
206        /// A list of schemas, the same as the ‘schemas’ field from Iceberg table spec.
207        pub schemas: Vec<SchemaV2>,
208        /// A string to string map of view properties. This is used for metadata such as “comment” and for settings that affect view maintenance.
209        /// This is not intended to be used for arbitrary metadata.
210        #[serde(skip_serializing_if = "Option::is_none")]
211        pub properties: Option<HashMap<String, String>>,
212    }
213
214    impl<T: Materialization> TryFrom<ViewMetadataEnum<T>> for GeneralViewMetadata<T> {
215        type Error = Error;
216        fn try_from(value: ViewMetadataEnum<T>) -> Result<Self, Self::Error> {
217            match value {
218                ViewMetadataEnum::V1(metadata) => metadata.try_into(),
219            }
220        }
221    }
222
223    impl<T: Materialization> From<GeneralViewMetadata<T>> for ViewMetadataEnum<T> {
224        fn from(value: GeneralViewMetadata<T>) -> Self {
225            match value.format_version {
226                FormatVersion::V1 => ViewMetadataEnum::V1(value.into()),
227            }
228        }
229    }
230
231    impl<T: Materialization> TryFrom<ViewMetadataV1<T>> for GeneralViewMetadata<T> {
232        type Error = Error;
233        fn try_from(value: ViewMetadataV1<T>) -> Result<Self, Self::Error> {
234            Ok(GeneralViewMetadata {
235                view_uuid: value.view_uuid,
236                format_version: FormatVersion::V1,
237                location: value.location,
238                current_version_id: value.current_version_id,
239                versions: HashMap::from_iter(value.versions.into_iter().map(|x| (x.version_id, x))),
240                version_log: value.version_log,
241                properties: value.properties.unwrap_or_default(),
242                schemas: HashMap::from_iter(
243                    value
244                        .schemas
245                        .into_iter()
246                        .map(|x| Ok((x.schema_id, x.try_into()?)))
247                        .collect::<Result<Vec<_>, Error>>()?,
248                ),
249            })
250        }
251    }
252
253    impl<T: Materialization> From<GeneralViewMetadata<T>> for ViewMetadataV1<T> {
254        fn from(value: GeneralViewMetadata<T>) -> Self {
255            ViewMetadataV1 {
256                view_uuid: value.view_uuid,
257                format_version: VersionNumber::<1>,
258                location: value.location,
259                current_version_id: value.current_version_id,
260                versions: value.versions.into_values().collect(),
261                version_log: value.version_log,
262                properties: if value.properties.is_empty() {
263                    None
264                } else {
265                    Some(value.properties)
266                },
267                schemas: value.schemas.into_values().map(Into::into).collect(),
268            }
269        }
270    }
271}
272
273#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone)]
274#[repr(u8)]
275/// Iceberg format version
276#[derive(Default)]
277pub enum FormatVersion {
278    /// Iceberg spec version 1
279    #[default]
280    V1 = b'1',
281}
282
283impl TryFrom<u8> for FormatVersion {
284    type Error = Error;
285    fn try_from(value: u8) -> Result<Self, Self::Error> {
286        match value {
287            1 => Ok(FormatVersion::V1),
288            _ => Err(Error::Conversion(
289                "u8".to_string(),
290                "format version".to_string(),
291            )),
292        }
293    }
294}
295
296impl From<FormatVersion> for u8 {
297    fn from(value: FormatVersion) -> Self {
298        match value {
299            FormatVersion::V1 => b'1',
300        }
301    }
302}
303
304impl From<FormatVersion> for i32 {
305    fn from(value: FormatVersion) -> Self {
306        match value {
307            FormatVersion::V1 => 1,
308        }
309    }
310}
311
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder, Getters)]
#[serde(rename_all = "kebab-case")]
/// A single version of a view, generic over the payload `T` stored in `storage_table`.
///
/// Field names are (de)serialized in kebab-case.
pub struct Version<T: Materialization> {
    /// Monotonically increasing id indicating the version of the view. Starts with 1.
    /// The builder falls back to `DEFAULT_VERSION_ID` when no id is set.
    #[builder(default = "DEFAULT_VERSION_ID")]
    pub version_id: i64,
    /// ID of the schema for the view version.
    /// The builder falls back to `DEFAULT_SCHEMA_ID` when no id is set.
    #[builder(default = "DEFAULT_SCHEMA_ID")]
    pub schema_id: i32,
    #[builder(
        default = "SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as i64"
    )]
    /// Timestamp expressed in ms since epoch at which the version of the view was created.
    /// The builder uses the current system time when no timestamp is set.
    pub timestamp_ms: i64,
    #[builder(default)]
    /// A string map summarizes the version changes, including operation, described in Summary.
    pub summary: Summary,
    #[builder(setter(each(name = "with_representation")), default)]
    /// A list of "representations" as described in Representations.
    pub representations: Vec<ViewRepresentation>,
    #[builder(setter(strip_option), default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    /// A string specifying the catalog to use when the table or view references in the view definition do not contain an explicit catalog.
    /// Omitted from the serialized form when `None`.
    pub default_catalog: Option<String>,
    #[builder(setter(strip_option), default)]
    /// The namespace to use when the table or view references in the view definition do not contain an explicit namespace.
    /// Since the namespace may contain multiple parts, it is serialized as a list of strings.
    /// NOTE(review): `strip_option` is set although this field is not an `Option` — confirm it is intended.
    pub default_namespace: Vec<String>,
    /// Full identifier record of the storage table.
    /// Omitted from the serialized form when `Materialization::is_none` returns `true`.
    #[builder(default)]
    #[serde(skip_serializing_if = "Materialization::is_none")]
    pub storage_table: T,
}
346
/// Payload type stored in the `storage_table` field of a [`Version`].
///
/// `is_none` is the `skip_serializing_if` predicate of that field.
pub trait Materialization: Clone + Default {
    /// Returns `true` when the value should be left out of the serialized version.
    fn is_none(&self) -> bool;
}
350
/// `Option<()>` is the payload used by the `ViewMetadata` alias.
impl Materialization for Option<()> {
    /// Always `true`, for `Some(())` as well as `None`, so the field is never serialized.
    fn is_none(&self) -> bool {
        true
    }
}
356
/// A `FullIdentifier` payload is always present.
impl Materialization for FullIdentifier {
    /// Always `false`, so the identifier is always serialized.
    fn is_none(&self) -> bool {
        false
    }
}
362
363impl<T: Materialization> Version<T> {
364    pub fn builder() -> VersionBuilder<T> {
365        VersionBuilder::default()
366    }
367}
368
369impl<T: Materialization + Serialize> fmt::Display for Version<T> {
370    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
371        write!(
372            f,
373            "{}",
374            &serde_json::to_string(self).map_err(|_| fmt::Error)?,
375        )
376    }
377}
378
379impl<T: Materialization + for<'de> Deserialize<'de>> str::FromStr for Version<T> {
380    type Err = Error;
381    fn from_str(s: &str) -> Result<Self, Self::Err> {
382        serde_json::from_str(s).map_err(Error::from)
383    }
384}
385
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
/// One entry of the view's version log: which version became current, and when.
///
/// Field names are (de)serialized in kebab-case (`timestamp-ms`, `version-id`).
pub struct VersionLogStruct {
    /// The timestamp when the referenced version was made the current version
    pub timestamp_ms: i64,
    /// Version id of the view
    pub version_id: i64,
}
395
/// View operation that created a view version.
#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub enum Operation {
    /// The view was created; this is the default operation.
    #[default]
    Create,
    /// An existing view was replaced.
    Replace,
}
406
407/// Serialize for PrimitiveType wit special handling for
408/// Decimal and Fixed types.
409impl Serialize for Operation {
410    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
411    where
412        S: serde::Serializer,
413    {
414        use Operation::*;
415        match self {
416            Create => serializer.serialize_str("create"),
417            Replace => serializer.serialize_str("replace"),
418        }
419    }
420}
421
422/// Serialize for PrimitiveType wit special handling for
423/// Decimal and Fixed types.
424impl<'de> Deserialize<'de> for Operation {
425    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
426    where
427        D: serde::Deserializer<'de>,
428    {
429        let s = String::deserialize(deserializer)?;
430        if s == "create" {
431            Ok(Operation::Create)
432        } else if s == "replace" {
433            Ok(Operation::Replace)
434        } else {
435            Err(serde::de::Error::custom("Invalid view operation."))
436        }
437    }
438}
439
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)]
#[serde(rename_all = "kebab-case")]
/// Summary of a view version: the operation that produced it and, optionally, the engine that ran it.
///
/// Field names are (de)serialized in kebab-case (`operation`, `engine-name`, `engine-version`).
pub struct Summary {
    /// A string value indicating the view operation that caused this metadata to be created. Allowed values are "create" and "replace".
    pub operation: Operation,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// Name of the engine that created the view version; omitted from the serialized form when `None`
    pub engine_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// A string value indicating the version of the engine that performed the operation; omitted from the serialized form when `None`
    pub engine_version: Option<String>,
}
453
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case", tag = "type")]
/// A representation of a view definition.
///
/// The variant is identified by the `type` field of the serialized object.
pub enum ViewRepresentation {
    #[serde(rename = "sql")]
    /// This type of representation stores the original view definition in SQL and its SQL dialect.
    /// It is serialized with `"type": "sql"`.
    Sql {
        /// A string representing the original view definition in SQL
        sql: String,
        /// A string specifying the dialect of the 'sql' field. It can be used by the engines to detect the SQL dialect.
        dialect: String,
    },
}
467
468impl ViewRepresentation {
469    pub fn sql(sql: &str, dialect: Option<&str>) -> Self {
470        ViewRepresentation::Sql {
471            sql: sql.to_owned(),
472            dialect: dialect.unwrap_or("ansi").to_owned(),
473        }
474    }
475}
476
#[cfg(test)]
mod tests {

    use crate::{error::Error, spec::view_metadata::ViewMetadata};

    /// Deserializes a V1 view metadata document, checks the parsed values and verifies that a
    /// serialize/deserialize round trip yields the same metadata.
    #[test]
    fn test_deserialize_view_data_v1() -> Result<(), Error> {
        let data = r#"
        {
        "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
        "format-version" : 1,
        "location" : "s3://bucket/warehouse/default.db/event_agg",
        "current-version-id" : 1,
        "properties" : {
            "comment" : "Daily event counts"
        },
        "versions" : [ {
            "version-id" : 1,
            "timestamp-ms" : 1573518431292,
            "schema-id" : 1,
            "default-catalog" : "prod",
            "default-namespace" : [ "default" ],
            "summary" : {
            "operation" : "create",
            "engine-name" : "Spark",
            "engineVersion" : "3.3.2"
            },
            "representations" : [ {
            "type" : "sql",
            "sql" : "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
            "dialect" : "spark"
            } ]
        } ],
        "schemas": [ {
            "schema-id": 1,
            "type" : "struct",
            "fields" : [ {
            "id" : 1,
            "name" : "event_count",
            "required" : false,
            "type" : "int",
            "doc" : "Count of events"
            }, {
            "id" : 2,
            "name" : "event_date",
            "required" : false,
            "type" : "date"
            } ]
        } ],
        "version-log" : [ {
            "timestamp-ms" : 1573518431292,
            "version-id" : 1
        } ]
        }
        "#;
        let metadata =
            serde_json::from_str::<ViewMetadata>(data).expect("Failed to deserialize json");

        // The parsed values must match the document, not merely survive a round trip.
        assert_eq!(metadata.current_version_id, 1);
        assert_eq!(
            metadata.location,
            "s3://bucket/warehouse/default.db/event_agg"
        );
        assert_eq!(
            metadata.properties.get("comment").map(String::as_str),
            Some("Daily event counts")
        );
        assert_eq!(metadata.versions.len(), 1);
        assert_eq!(metadata.schemas.len(), 1);

        let version = metadata.current_version(None)?;
        assert_eq!(version.version_id, 1);
        assert_eq!(version.schema_id, 1);
        assert_eq!(version.timestamp_ms, 1573518431292);
        assert_eq!(version.default_catalog.as_deref(), Some("prod"));
        assert_eq!(version.default_namespace, vec!["default".to_string()]);
        assert_eq!(version.representations.len(), 1);

        // The current version's schema must be resolvable.
        metadata.current_schema(None)?;

        // Serializing and deserializing again must give back the same metadata.
        let metadata_two: ViewMetadata = serde_json::from_str(
            &serde_json::to_string(&metadata).expect("Failed to serialize metadata"),
        )
        .expect("Failed to deserialize serialized metadata");
        assert_eq!(metadata, metadata_two);

        Ok(())
    }
}
543}