Skip to main content

iceberg_rust_spec/spec/
materialized_view_metadata.rs

//! Materialized view metadata types and functionality
//!
//! This module contains the types and implementations for managing materialized view metadata in Apache Iceberg.
//! It includes structures for tracking view states, source tables, and refresh operations.
//!
//! The main types are:
//! - [`MaterializedViewMetadata`]: The top-level metadata for a materialized view
//! - [`RefreshState`]: Information about the last refresh operation
//! - [`SourceStates`]: Collection of source states
10
11use std::{collections::HashMap, ops::Deref};
12
13use serde::{Deserialize, Serialize};
14use uuid::Uuid;
15
16use crate::{
17    identifier::{FullIdentifier, Identifier},
18    namespace::Namespace,
19};
20
21use super::{
22    tabular::TabularMetadataRef,
23    view_metadata::{GeneralViewMetadata, GeneralViewMetadataBuilder},
24};
25
/// The literal key `refresh-state`.
///
/// NOTE(review): presumably the property name under which a serialized [`RefreshState`]
/// is stored; where it is read or written is not visible in this file — confirm against callers.
pub static REFRESH_STATE: &str = "refresh-state";
27
/// Metadata of a materialized view.
///
/// This is [`GeneralViewMetadata`] instantiated with [`Identifier`] as its type parameter.
/// NOTE(review): the parameter presumably describes the storage table referenced by each
/// view version (the test fixture in this file carries a `storage-table` entry with a
/// namespace and a name) — confirm against `GeneralViewMetadata`.
pub type MaterializedViewMetadata = GeneralViewMetadata<Identifier>;
/// Builder for [`MaterializedViewMetadata`].
pub type MaterializedViewMetadataBuilder = GeneralViewMetadataBuilder<Identifier>;
32
33impl MaterializedViewMetadata {
34    pub fn as_ref(&self) -> TabularMetadataRef<'_> {
35        TabularMetadataRef::MaterializedView(self)
36    }
37}
38
39#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
40#[serde(rename_all = "kebab-case")]
41/// Freshness information of the materialized view
42pub struct RefreshState {
43    /// The version-id of the materialized view when the refresh operation was performed.
44    pub refresh_version_id: i64,
45    /// A map from sequence-id (as defined in the view lineage) to the source tables’ snapshot-id of when the last refresh operation was performed.
46    pub source_states: SourceStates,
47    // A timestamp of when the refresh operation was started
48    pub refresh_start_timestamp_ms: i64,
49}
50
51/// Represents a collection of source view states in a materialized view refresh
52///
53/// # Fields
54/// * `0` - A HashMap mapping (table UUID, optional reference) pairs to version IDs
55#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
56#[serde(from = "Vec<SourceState>", into = "Vec<SourceState>")]
57pub struct SourceStates(pub HashMap<FullIdentifier, SourceState>);
58
59#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
60#[serde(rename_all = "kebab-case", tag = "type")]
61pub enum SourceState {
62    Table(SourceTable),
63    View(SourceView),
64}
65
66/// Represents a source table state in a materialized view refresh
67///
68/// # Fields
69/// * `uuid` - The UUID of the source table
70/// * `snapshot_id` - The snapshot ID of the source table at refresh time
71/// * `ref` - Optional reference name (e.g. branch or tag) used to access the source table
72#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
73#[serde(rename_all = "kebab-case")]
74pub struct SourceTable {
75    name: String,
76    namespace: Namespace,
77    #[serde(skip_serializing_if = "Option::is_none")]
78    catalog: Option<String>,
79    uuid: Uuid,
80    snapshot_id: i64,
81    #[serde(skip_serializing_if = "Option::is_none")]
82    r#ref: Option<String>,
83}
84
85/// Represents a source view state in a materialized view refresh
86///
87/// # Fields
88/// * `uuid` - The UUID of the source view
89/// * `version_id` - The version ID of the source view at refresh time
90#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
91#[serde(rename_all = "kebab-case")]
92pub struct SourceView {
93    name: String,
94    namespace: Namespace,
95    #[serde(skip_serializing_if = "Option::is_none")]
96    catalog: Option<String>,
97    uuid: Uuid,
98    version_id: i64,
99}
100
101impl SourceTable {
102    /// Create a new SourceTable
103    pub fn new(
104        catalog: Option<&str>,
105        namespace: &[String],
106        name: &str,
107        uuid: Uuid,
108        snapshot_id: i64,
109        r#ref: Option<String>,
110    ) -> Self {
111        Self {
112            catalog: catalog.map(ToString::to_string),
113            namespace: Namespace(namespace.to_owned()),
114            name: name.to_owned(),
115            uuid,
116            snapshot_id,
117            r#ref,
118        }
119    }
120}
121
122impl SourceState {
123    /// Returns the snapshot_id for Table states, None for View states
124    pub fn snapshot_id(&self) -> Option<i64> {
125        match self {
126            SourceState::Table(t) => Some(t.snapshot_id),
127            SourceState::View(_) => None,
128        }
129    }
130}
131
132impl From<Vec<SourceState>> for SourceStates {
133    fn from(value: Vec<SourceState>) -> Self {
134        SourceStates(
135            value
136                .into_iter()
137                .map(|x| match &x {
138                    SourceState::Table(table) => (
139                        FullIdentifier::new(
140                            table.catalog.as_deref(),
141                            &table.namespace,
142                            &table.name,
143                        ),
144                        x,
145                    ),
146                    SourceState::View(view) => (
147                        FullIdentifier::new(view.catalog.as_deref(), &view.namespace, &view.name),
148                        x,
149                    ),
150                })
151                .collect(),
152        )
153    }
154}
155
156impl From<SourceStates> for Vec<SourceState> {
157    fn from(value: SourceStates) -> Self {
158        value.0.into_values().collect()
159    }
160}
161
162impl Deref for SourceStates {
163    type Target = HashMap<FullIdentifier, SourceState>;
164    fn deref(&self) -> &Self::Target {
165        &self.0
166    }
167}
168
#[cfg(test)]
mod tests {

    use crate::{error::Error, spec::materialized_view_metadata::MaterializedViewMetadata};

    /// Deserializes a version 1 materialized view metadata document and checks that it
    /// survives a serialize/deserialize round trip unchanged.
    #[test]
    fn test_deserialize_materialized_view_metadata_v1() -> Result<(), Error> {
        let data = r#"
        {
        "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
        "format-version" : 1,
        "location" : "s3://bucket/warehouse/default.db/event_agg",
        "current-version-id" : 1,
        "properties" : {
            "comment" : "Daily event counts"
        },
        "versions" : [ {
            "version-id" : 1,
            "timestamp-ms" : 1573518431292,
            "schema-id" : 1,
            "default-catalog" : "prod",
            "default-namespace" : [ "default" ],
            "summary" : {
            "operation" : "create",
            "engine-name" : "Spark",
            "engineVersion" : "3.3.2"
            },
            "representations" : [ {
            "type" : "sql",
            "sql" : "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
            "dialect" : "spark"
            } ],
            "storage-table": {
                "namespace": ["default"],
                "name": "event_agg_storage"
            }
        } ],
        "schemas": [ {
            "schema-id": 1,
            "type" : "struct",
            "fields" : [ {
            "id" : 1,
            "name" : "event_count",
            "required" : false,
            "type" : "int",
            "doc" : "Count of events"
            }, {
            "id" : 2,
            "name" : "event_date",
            "required" : false,
            "type" : "date"
            } ]
        } ],
        "version-log" : [ {
            "timestamp-ms" : 1573518431292,
            "version-id" : 1
        } ]
        }
        "#;
        let metadata = serde_json::from_str::<MaterializedViewMetadata>(data)
            .expect("Failed to deserialize json");

        // Round trip: serialize the parsed metadata and parse the result again.
        let serialized = serde_json::to_string(&metadata).expect("Failed to serialize metadata");
        let metadata_two: MaterializedViewMetadata =
            serde_json::from_str(&serialized).expect("Failed to deserialize serialized metadata");
        assert_eq!(metadata, metadata_two);

        Ok(())
    }
}