Skip to main content

iceberg_rust/materialized_view/transaction/
mod.rs

/*!
 * Defines the [Transaction] type for materialized views, which is used to perform multiple [Operation]s with ACID guarantees.
*/
4
5use std::collections::HashMap;
6
7use iceberg_rust_spec::{
8    materialized_view_metadata::{RefreshState, REFRESH_STATE},
9    spec::{manifest::DataFile, types::StructType, view_metadata::ViewRepresentation},
10};
11
12use crate::{
13    catalog::commit::{CommitTable, CommitView},
14    error::Error,
15    table::{
16        delete_all_table_files,
17        transaction::{operation::Operation as TableOperation, APPEND_INDEX, REPLACE_INDEX},
18    },
19    view::transaction::operation::Operation as ViewOperation,
20};
21
22use super::MaterializedView;
23
24/// Transactions let you perform a sequence of [Operation]s that can be committed to be performed with ACID guarantees.
25pub struct Transaction<'view> {
26    materialized_view: &'view mut MaterializedView,
27    view_operations: Vec<ViewOperation>,
28    storage_table_operations: Vec<Option<TableOperation>>,
29    branch: Option<String>,
30}
31
32impl<'view> Transaction<'view> {
33    /// Create a transaction for the given view.
34    pub fn new(view: &'view mut MaterializedView, branch: Option<&str>) -> Self {
35        Transaction {
36            materialized_view: view,
37            view_operations: vec![],
38            storage_table_operations: (0..6).map(|_| None).collect(), // 6 operation types
39            branch: branch.map(ToString::to_string),
40        }
41    }
42
43    /// Update the schmema of the view
44    pub fn update_representations(
45        mut self,
46        representations: Vec<ViewRepresentation>,
47        schema: StructType,
48    ) -> Self {
49        self.view_operations
50            .push(ViewOperation::UpdateRepresentations {
51                representations,
52                schema,
53                branch: self.branch.clone(),
54            });
55        self
56    }
57
58    /// Update view properties
59    pub fn update_properties(mut self, entries: Vec<(String, String)>) -> Self {
60        self.view_operations
61            .push(ViewOperation::UpdateProperties(entries));
62        self
63    }
64
65    /// Perform full refresh operation
66    pub fn full_refresh(
67        mut self,
68        files: Vec<DataFile>,
69        refresh_state: RefreshState,
70    ) -> Result<Self, Error> {
71        let refresh_state = serde_json::to_string(&refresh_state)?;
72        if let Some(ref mut operation) = self.storage_table_operations[REPLACE_INDEX] {
73            if let TableOperation::Replace {
74                branch: _,
75                files: old,
76                additional_summary: old_lineage,
77            } = operation
78            {
79                old.extend_from_slice(&files);
80                *old_lineage = Some(HashMap::from_iter(vec![(
81                    REFRESH_STATE.to_owned(),
82                    refresh_state.clone(),
83                )]));
84            }
85        } else {
86            self.storage_table_operations[REPLACE_INDEX] = Some(TableOperation::Replace {
87                branch: self.branch.clone(),
88                files,
89                additional_summary: Some(HashMap::from_iter(vec![(
90                    REFRESH_STATE.to_owned(),
91                    refresh_state,
92                )])),
93            });
94        }
95        Ok(self)
96    }
97
98    /// Append files to the storage table
99    pub fn append(
100        mut self,
101        files: Vec<DataFile>,
102        refresh_state: RefreshState,
103    ) -> Result<Self, Error> {
104        let refresh_state = serde_json::to_string(&refresh_state)?;
105        if let Some(ref mut operation) = self.storage_table_operations[APPEND_INDEX] {
106            if let TableOperation::Append {
107                branch: _,
108                data_files: old,
109                delete_files: _,
110                additional_summary: old_lineage,
111            } = operation
112            {
113                old.extend_from_slice(&files);
114                *old_lineage = Some(HashMap::from_iter(vec![(
115                    REFRESH_STATE.to_owned(),
116                    refresh_state.clone(),
117                )]));
118            }
119        } else {
120            self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append {
121                branch: self.branch.clone(),
122                data_files: files,
123                delete_files: Vec::new(),
124                additional_summary: Some(HashMap::from_iter(vec![(
125                    REFRESH_STATE.to_owned(),
126                    refresh_state,
127                )])),
128            });
129        }
130        Ok(self)
131    }
132
133    /// Append files to the storage table
134    pub fn delete(
135        mut self,
136        files: Vec<DataFile>,
137        refresh_state: RefreshState,
138    ) -> Result<Self, Error> {
139        let refresh_state = serde_json::to_string(&refresh_state)?;
140        if let Some(ref mut operation) = self.storage_table_operations[APPEND_INDEX] {
141            if let TableOperation::Append {
142                branch: _,
143                data_files: _,
144                delete_files: old,
145                additional_summary: old_lineage,
146            } = operation
147            {
148                old.extend_from_slice(&files);
149                *old_lineage = Some(HashMap::from_iter(vec![(
150                    REFRESH_STATE.to_owned(),
151                    refresh_state.clone(),
152                )]));
153            }
154        } else {
155            self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append {
156                branch: self.branch.clone(),
157                data_files: Vec::new(),
158                delete_files: files,
159                additional_summary: Some(HashMap::from_iter(vec![(
160                    REFRESH_STATE.to_owned(),
161                    refresh_state,
162                )])),
163            });
164        }
165        Ok(self)
166    }
167
168    /// Commit the transaction to perform the [Operation]s with ACID guarantees.
169    pub async fn commit(self) -> Result<(), Error> {
170        let catalog = self.materialized_view.catalog();
171
172        let identifier = self.materialized_view.identifier().clone();
173
174        let storage_table = self.materialized_view.storage_table().await?;
175
176        let delete_data = if !self.storage_table_operations.is_empty() {
177            let (mut table_requirements, mut table_updates) = (Vec::new(), Vec::new());
178
179            // Save old metadata to be able to remove old data after a rewrite operation
180            let delete_data = if self
181                .storage_table_operations
182                .iter()
183                .flatten()
184                .any(|x| matches!(x, TableOperation::Replace { .. }))
185            {
186                Some(storage_table.metadata().clone())
187            } else {
188                None
189            };
190
191            // Execute table operations
192            for operation in self.storage_table_operations.into_iter().flatten() {
193                let (requirement, update) = operation
194                    .execute(storage_table.metadata(), storage_table.object_store())
195                    .await?;
196
197                if let Some(requirement) = requirement {
198                    table_requirements.push(requirement);
199                }
200                table_updates.extend(update);
201            }
202
203            storage_table
204                .catalog()
205                .update_table(CommitTable {
206                    identifier: storage_table.identifier().clone(),
207                    requirements: table_requirements,
208                    updates: table_updates,
209                })
210                .await?;
211
212            delete_data
213        } else {
214            None
215        };
216        // Execute the view operations
217        let (mut view_requirements, mut view_updates) = (Vec::new(), Vec::new());
218        for operation in self.view_operations {
219            let (requirement, update) = operation.execute(&self.materialized_view.metadata).await?;
220
221            if let Some(requirement) = requirement {
222                view_requirements.push(requirement);
223            }
224            view_updates.extend(update);
225        }
226
227        let new_matview = catalog
228            .clone()
229            .update_materialized_view(CommitView {
230                identifier,
231                requirements: view_requirements,
232                updates: view_updates,
233            })
234            .await?;
235        // Delete data files in case of a rewrite operation
236        if let Some(old_metadata) = delete_data {
237            delete_all_table_files(&old_metadata, storage_table.object_store()).await?;
238        }
239        *self.materialized_view = new_matview;
240        Ok(())
241    }
242}