iceberg_rust/materialized_view/transaction/
mod.rs1use std::collections::HashMap;
6
7use iceberg_rust_spec::{
8 materialized_view_metadata::{RefreshState, REFRESH_STATE},
9 spec::{manifest::DataFile, types::StructType, view_metadata::ViewRepresentation},
10};
11
12use crate::{
13 catalog::commit::{CommitTable, CommitView},
14 error::Error,
15 table::{
16 delete_all_table_files,
17 transaction::{operation::Operation as TableOperation, APPEND_INDEX, REPLACE_INDEX},
18 },
19 view::transaction::operation::Operation as ViewOperation,
20};
21
22use super::MaterializedView;
23
24pub struct Transaction<'view> {
26 materialized_view: &'view mut MaterializedView,
27 view_operations: Vec<ViewOperation>,
28 storage_table_operations: Vec<Option<TableOperation>>,
29 branch: Option<String>,
30}
31
32impl<'view> Transaction<'view> {
33 pub fn new(view: &'view mut MaterializedView, branch: Option<&str>) -> Self {
35 Transaction {
36 materialized_view: view,
37 view_operations: vec![],
38 storage_table_operations: (0..6).map(|_| None).collect(), branch: branch.map(ToString::to_string),
40 }
41 }
42
43 pub fn update_representations(
45 mut self,
46 representations: Vec<ViewRepresentation>,
47 schema: StructType,
48 ) -> Self {
49 self.view_operations
50 .push(ViewOperation::UpdateRepresentations {
51 representations,
52 schema,
53 branch: self.branch.clone(),
54 });
55 self
56 }
57
58 pub fn update_properties(mut self, entries: Vec<(String, String)>) -> Self {
60 self.view_operations
61 .push(ViewOperation::UpdateProperties(entries));
62 self
63 }
64
65 pub fn full_refresh(
67 mut self,
68 files: Vec<DataFile>,
69 refresh_state: RefreshState,
70 ) -> Result<Self, Error> {
71 let refresh_state = serde_json::to_string(&refresh_state)?;
72 if let Some(ref mut operation) = self.storage_table_operations[REPLACE_INDEX] {
73 if let TableOperation::Replace {
74 branch: _,
75 files: old,
76 additional_summary: old_lineage,
77 } = operation
78 {
79 old.extend_from_slice(&files);
80 *old_lineage = Some(HashMap::from_iter(vec![(
81 REFRESH_STATE.to_owned(),
82 refresh_state.clone(),
83 )]));
84 }
85 } else {
86 self.storage_table_operations[REPLACE_INDEX] = Some(TableOperation::Replace {
87 branch: self.branch.clone(),
88 files,
89 additional_summary: Some(HashMap::from_iter(vec![(
90 REFRESH_STATE.to_owned(),
91 refresh_state,
92 )])),
93 });
94 }
95 Ok(self)
96 }
97
98 pub fn append(
100 mut self,
101 files: Vec<DataFile>,
102 refresh_state: RefreshState,
103 ) -> Result<Self, Error> {
104 let refresh_state = serde_json::to_string(&refresh_state)?;
105 if let Some(ref mut operation) = self.storage_table_operations[APPEND_INDEX] {
106 if let TableOperation::Append {
107 branch: _,
108 data_files: old,
109 delete_files: _,
110 additional_summary: old_lineage,
111 } = operation
112 {
113 old.extend_from_slice(&files);
114 *old_lineage = Some(HashMap::from_iter(vec![(
115 REFRESH_STATE.to_owned(),
116 refresh_state.clone(),
117 )]));
118 }
119 } else {
120 self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append {
121 branch: self.branch.clone(),
122 data_files: files,
123 delete_files: Vec::new(),
124 additional_summary: Some(HashMap::from_iter(vec![(
125 REFRESH_STATE.to_owned(),
126 refresh_state,
127 )])),
128 });
129 }
130 Ok(self)
131 }
132
133 pub fn delete(
135 mut self,
136 files: Vec<DataFile>,
137 refresh_state: RefreshState,
138 ) -> Result<Self, Error> {
139 let refresh_state = serde_json::to_string(&refresh_state)?;
140 if let Some(ref mut operation) = self.storage_table_operations[APPEND_INDEX] {
141 if let TableOperation::Append {
142 branch: _,
143 data_files: _,
144 delete_files: old,
145 additional_summary: old_lineage,
146 } = operation
147 {
148 old.extend_from_slice(&files);
149 *old_lineage = Some(HashMap::from_iter(vec![(
150 REFRESH_STATE.to_owned(),
151 refresh_state.clone(),
152 )]));
153 }
154 } else {
155 self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append {
156 branch: self.branch.clone(),
157 data_files: Vec::new(),
158 delete_files: files,
159 additional_summary: Some(HashMap::from_iter(vec![(
160 REFRESH_STATE.to_owned(),
161 refresh_state,
162 )])),
163 });
164 }
165 Ok(self)
166 }
167
168 pub async fn commit(self) -> Result<(), Error> {
170 let catalog = self.materialized_view.catalog();
171
172 let identifier = self.materialized_view.identifier().clone();
173
174 let storage_table = self.materialized_view.storage_table().await?;
175
176 let delete_data = if !self.storage_table_operations.is_empty() {
177 let (mut table_requirements, mut table_updates) = (Vec::new(), Vec::new());
178
179 let delete_data = if self
181 .storage_table_operations
182 .iter()
183 .flatten()
184 .any(|x| matches!(x, TableOperation::Replace { .. }))
185 {
186 Some(storage_table.metadata().clone())
187 } else {
188 None
189 };
190
191 for operation in self.storage_table_operations.into_iter().flatten() {
193 let (requirement, update) = operation
194 .execute(storage_table.metadata(), storage_table.object_store())
195 .await?;
196
197 if let Some(requirement) = requirement {
198 table_requirements.push(requirement);
199 }
200 table_updates.extend(update);
201 }
202
203 storage_table
204 .catalog()
205 .update_table(CommitTable {
206 identifier: storage_table.identifier().clone(),
207 requirements: table_requirements,
208 updates: table_updates,
209 })
210 .await?;
211
212 delete_data
213 } else {
214 None
215 };
216 let (mut view_requirements, mut view_updates) = (Vec::new(), Vec::new());
218 for operation in self.view_operations {
219 let (requirement, update) = operation.execute(&self.materialized_view.metadata).await?;
220
221 if let Some(requirement) = requirement {
222 view_requirements.push(requirement);
223 }
224 view_updates.extend(update);
225 }
226
227 let new_matview = catalog
228 .clone()
229 .update_materialized_view(CommitView {
230 identifier,
231 requirements: view_requirements,
232 updates: view_updates,
233 })
234 .await?;
235 if let Some(old_metadata) = delete_data {
237 delete_all_table_files(&old_metadata, storage_table.object_store()).await?;
238 }
239 *self.materialized_view = new_matview;
240 Ok(())
241 }
242}