iceberg_rust/table/transaction/mod.rs
1//! Transaction module for atomic table operations
2//!
3//! This module provides the transaction system for Iceberg tables, allowing multiple
4//! operations to be grouped and executed atomically. The main types are:
5//!
6//! * [`TableTransaction`] - Builder for creating and executing atomic transactions
7//! * [`Operation`] - Individual operations that can be part of a transaction
8//!
9//! Transactions ensure that either all operations succeed or none do, maintaining
10//! table consistency. Common operations include:
11//!
12//! * Adding/updating schemas
13//! * Appending data files
14//! * Replacing data files
15//! * Updating table properties
16//! * Managing snapshots and branches
17
18use std::collections::HashMap;
19use tracing::{debug, instrument};
20
21use iceberg_rust_spec::spec::{manifest::DataFile, schema::Schema, snapshot::SnapshotReference};
22
23use crate::table::transaction::append::append_summary;
24use crate::table::transaction::operation::SequenceGroup;
25use crate::{catalog::commit::CommitTable, error::Error, table::Table};
26
27use self::operation::Operation;
28
29pub(crate) mod append;
30pub(crate) mod operation;
31pub(crate) mod overwrite;
32
// Slot indices into `TableTransaction::operations`. Each operation kind owns exactly one
// slot, and `commit` executes the occupied slots in ascending index order, so these values
// also fix the execution order of a transaction's operations.
pub(crate) const ADD_SCHEMA_INDEX: usize = 0;
pub(crate) const SET_DEFAULT_SPEC_INDEX: usize = 1;
pub(crate) const APPEND_INDEX: usize = 2;
pub(crate) const APPEND_SEQUENCE_GROUPS_INDEX: usize = 3;
pub(crate) const REPLACE_INDEX: usize = 4;
pub(crate) const OVERWRITE_INDEX: usize = 5;
pub(crate) const UPDATE_PROPERTIES_INDEX: usize = 6;
pub(crate) const SET_SNAPSHOT_REF_INDEX: usize = 7;

/// Total number of operation slots. Derived from the highest index so that adding a new
/// slot only requires bumping the last index constant.
pub(crate) const NUM_OPERATIONS: usize = SET_SNAPSHOT_REF_INDEX + 1;
43
44/// A transaction that can perform multiple operations on a table atomically
45///
46/// TableTransaction allows grouping multiple table operations (like schema updates,
47/// appends, overwrites) into a single atomic transaction. The transaction must be
48/// committed for changes to take effect.
49///
50/// # Type Parameters
51/// * `'table` - Lifetime of the reference to the table being modified
52///
53/// # Examples
54/// ```
55/// let mut table = // ... get table reference
56/// table.new_transaction(None)
57/// .add_schema(new_schema)
58/// .append(data_files)
59/// .commit()
60/// .await?;
61/// ```
62pub struct TableTransaction<'table> {
63 table: &'table mut Table,
64 operations: Vec<Option<Operation>>,
65 branch: Option<String>,
66}
67
68impl<'table> TableTransaction<'table> {
69 /// Create a transaction for the given table.
70 pub(crate) fn new(table: &'table mut Table, branch: Option<&str>) -> Self {
71 TableTransaction {
72 table,
73 operations: (0..NUM_OPERATIONS).map(|_| None).collect(), // 6 operation types
74 branch: branch.map(ToString::to_string),
75 }
76 }
77 /// Adds a new schema to the table
78 ///
79 /// This operation adds a new schema version to the table. The schema ID will be
80 /// automatically assigned when the transaction is committed.
81 ///
82 /// # Arguments
83 /// * `schema` - The new schema to add to the table
84 ///
85 /// # Returns
86 /// * `Self` - The transaction builder for method chaining
87 pub fn add_schema(mut self, schema: Schema) -> Self {
88 self.operations[ADD_SCHEMA_INDEX] = Some(Operation::AddSchema(schema));
89 self
90 }
91 /// Sets the default partition specification ID for the table
92 ///
93 /// # Arguments
94 /// * `spec_id` - The ID of the partition specification to set as default
95 ///
96 /// # Returns
97 /// * `Self` - The transaction builder for method chaining
98 ///
99 /// The specified partition specification must already exist in the table metadata.
100 pub fn set_default_spec(mut self, spec_id: i32) -> Self {
101 self.operations[SET_DEFAULT_SPEC_INDEX] = Some(Operation::SetDefaultSpec(spec_id));
102 self
103 }
104 /// Appends new data files to the table
105 ///
106 /// This operation adds new data files to the table's current snapshot. Multiple
107 /// append operations in the same transaction will be combined.
108 ///
109 /// # Arguments
110 /// * `files` - Vector of data files to append to the table
111 ///
112 /// # Returns
113 /// * `Self` - The transaction builder for method chaining
114 ///
115 /// # Examples
116 /// ```
117 /// let transaction = table.new_transaction(None)
118 /// .append_data(data_files)
119 /// .commit()
120 /// .await?;
121 /// ```
122 pub fn append_data(mut self, files: Vec<DataFile>) -> Self {
123 if self.operations[APPEND_SEQUENCE_GROUPS_INDEX].is_some() {
124 panic!("Cannot use append and append_sequence_group in the same transaction");
125 }
126 let summary = append_summary(&files);
127
128 if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
129 if let Operation::Append {
130 data_files: old, ..
131 } = operation
132 {
133 old.extend_from_slice(&files);
134 }
135 } else {
136 self.operations[APPEND_INDEX] = Some(Operation::Append {
137 branch: self.branch.clone(),
138 data_files: files,
139 delete_files: Vec::new(),
140 additional_summary: summary,
141 });
142 }
143 self
144 }
145 /// Appends delete files to the table
146 ///
147 /// This operation adds files that mark records for deletion in the table's current snapshot.
148 /// Multiple delete operations in the same transaction will be combined. The delete files
149 /// specify which records should be removed when reading the table.
150 ///
151 /// # Arguments
152 /// * `files` - Vector of delete files to append to the table
153 ///
154 /// # Returns
155 /// * `Self` - The transaction builder for method chaining
156 ///
157 /// # Examples
158 /// ```
159 /// let transaction = table.new_transaction(None)
160 /// .append_delete(delete_files)
161 /// .commit()
162 /// .await?;
163 /// ```
164 pub fn append_delete(mut self, files: Vec<DataFile>) -> Self {
165 if self.operations[APPEND_SEQUENCE_GROUPS_INDEX].is_some() {
166 panic!("Cannot use append and append_sequence_group in the same transaction");
167 }
168 if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
169 if let Operation::Append {
170 delete_files: old, ..
171 } = operation
172 {
173 old.extend_from_slice(&files);
174 }
175 } else {
176 self.operations[APPEND_INDEX] = Some(Operation::Append {
177 branch: self.branch.clone(),
178 data_files: Vec::new(),
179 delete_files: files,
180 additional_summary: None,
181 });
182 }
183 self
184 }
185
186 /// Appends a group of data and delete files to the table
187 ///
188 pub fn append_sequence_group(
189 mut self,
190 data_files: Vec<DataFile>,
191 delete_files: Vec<DataFile>,
192 ) -> Self {
193 if self.operations[APPEND_INDEX].is_some() {
194 panic!("Cannot use append and append_sequence_group in the same transaction");
195 }
196 if let Some(ref mut operation) = self.operations[APPEND_SEQUENCE_GROUPS_INDEX] {
197 if let Operation::AppendSequenceGroups {
198 sequence_groups: old,
199 ..
200 } = operation
201 {
202 old.push(SequenceGroup {
203 delete_files,
204 data_files,
205 });
206 }
207 } else {
208 self.operations[APPEND_SEQUENCE_GROUPS_INDEX] = Some(Operation::AppendSequenceGroups {
209 branch: self.branch.clone(),
210 sequence_groups: vec![SequenceGroup {
211 delete_files,
212 data_files,
213 }],
214 });
215 }
216 self
217 }
218 /// Overwrites specific data files in the table with new ones
219 ///
220 /// This operation replaces specified existing data files with new ones, rather than
221 /// replacing all files (like `replace`) or adding new files (like `append`). It allows
222 /// for selective replacement of data files based on the mapping provided.
223 ///
224 /// Multiple overwrite operations in the same transaction will be combined, with new
225 /// data files appended and the files-to-overwrite mapping merged.
226 ///
227 /// # Arguments
228 /// * `files` - Vector of new data files to add to the table
229 /// * `files_to_overwrite` - HashMap mapping manifest file paths to lists of data file
230 /// paths that should be overwritten/replaced
231 ///
232 /// # Returns
233 /// * `Self` - The transaction builder for method chaining
234 ///
235 /// # Examples
236 /// ```
237 /// use std::collections::HashMap;
238 ///
239 /// let mut files_to_overwrite = HashMap::new();
240 /// files_to_overwrite.insert(
241 /// "manifest-001.avro".to_string(),
242 /// vec!["data-001.parquet".to_string(), "data-002.parquet".to_string()]
243 /// );
244 ///
245 /// let transaction = table.new_transaction(None)
246 /// .overwrite(new_data_files, files_to_overwrite)
247 /// .commit()
248 /// .await?;
249 /// ```
250 pub fn overwrite(
251 mut self,
252 files: Vec<DataFile>,
253 files_to_overwrite: HashMap<String, Vec<String>>,
254 ) -> Self {
255 let summary = append_summary(&files);
256
257 if let Some(ref mut operation) = self.operations[OVERWRITE_INDEX] {
258 if let Operation::Overwrite {
259 data_files: old_data_files,
260 files_to_overwrite: old_files_to_overwrite,
261 ..
262 } = operation
263 {
264 old_data_files.extend_from_slice(&files);
265 old_files_to_overwrite.extend(files_to_overwrite);
266 }
267 } else {
268 self.operations[OVERWRITE_INDEX] = Some(Operation::Overwrite {
269 branch: self.branch.clone(),
270 data_files: files,
271 files_to_overwrite,
272 additional_summary: summary,
273 });
274 }
275 self
276 }
277 /// Replaces all data files in the table with new ones
278 ///
279 /// This operation removes all existing data files and replaces them with the provided
280 /// files. Multiple replace operations in the same transaction will be combined.
281 ///
282 /// # Arguments
283 /// * `files` - Vector of data files that will replace the existing ones
284 ///
285 /// # Returns
286 /// * `Self` - The transaction builder for method chaining
287 ///
288 /// # Examples
289 /// ```
290 /// let transaction = table.new_transaction(None)
291 /// .replace(new_files)
292 /// .commit()
293 /// .await?;
294 /// ```
295 pub fn replace(mut self, files: Vec<DataFile>) -> Self {
296 if let Some(ref mut operation) = self.operations[REPLACE_INDEX] {
297 if let Operation::Replace {
298 branch: _,
299 files: old,
300 additional_summary: None,
301 } = operation
302 {
303 old.extend_from_slice(&files);
304 }
305 } else {
306 self.operations[REPLACE_INDEX] = Some(Operation::Replace {
307 branch: self.branch.clone(),
308 files,
309 additional_summary: None,
310 });
311 }
312 self
313 }
314 /// Quickly append files to the table
315 pub fn replace_with_lineage(
316 mut self,
317 files: Vec<DataFile>,
318 additional_summary: std::collections::HashMap<String, String>,
319 ) -> Self {
320 if let Some(ref mut operation) = self.operations[REPLACE_INDEX] {
321 if let Operation::Replace {
322 branch: _,
323 files: old,
324 additional_summary: old_lineage,
325 } = operation
326 {
327 old.extend_from_slice(&files);
328 *old_lineage = Some(additional_summary.clone());
329 }
330 } else {
331 self.operations[REPLACE_INDEX] = Some(Operation::Replace {
332 branch: self.branch.clone(),
333 files,
334 additional_summary: Some(additional_summary),
335 });
336 }
337 self
338 }
339 /// Updates the table properties with new key-value pairs
340 ///
341 /// This operation adds or updates table properties. Multiple update operations
342 /// in the same transaction will be combined.
343 ///
344 /// # Arguments
345 /// * `entries` - Vector of (key, value) pairs to update in the table properties
346 ///
347 /// # Returns
348 /// * `Self` - The transaction builder for method chaining
349 ///
350 /// # Examples
351 /// ```
352 /// let transaction = table.new_transaction(None)
353 /// .update_properties(vec![
354 /// ("write.format.default".to_string(), "parquet".to_string()),
355 /// ("write.metadata.compression-codec".to_string(), "gzip".to_string())
356 /// ])
357 /// .commit()
358 /// .await?;
359 /// ```
360 pub fn update_properties(mut self, entries: Vec<(String, String)>) -> Self {
361 if let Some(ref mut operation) = self.operations[UPDATE_PROPERTIES_INDEX] {
362 if let Operation::UpdateProperties(props) = operation {
363 props.extend_from_slice(&entries);
364 }
365 } else {
366 self.operations[UPDATE_PROPERTIES_INDEX] = Some(Operation::UpdateProperties(entries));
367 }
368 self
369 }
370 /// Sets a snapshot reference for the table
371 ///
372 /// This operation creates or updates a named reference to a specific snapshot,
373 /// allowing for features like branches and tags.
374 ///
375 /// # Arguments
376 /// * `entry` - Tuple of (reference name, snapshot reference) defining the reference
377 ///
378 /// # Returns
379 /// * `Self` - The transaction builder for method chaining
380 ///
381 /// # Examples
382 /// ```
383 /// let transaction = table.new_transaction(None)
384 /// .set_snapshot_ref((
385 /// "test-branch".to_string(),
386 /// SnapshotReference {
387 /// snapshot_id: 123,
388 /// retention: SnapshotRetention::default(),
389 /// }
390 /// ))
391 /// .commit()
392 /// .await?;
393 /// ```
394 pub fn set_snapshot_ref(mut self, entry: (String, SnapshotReference)) -> Self {
395 self.operations[SET_SNAPSHOT_REF_INDEX] = Some(Operation::SetSnapshotRef(entry));
396 self
397 }
398 /// Commits all operations in this transaction atomically
399 ///
400 /// This method executes all operations in the transaction and updates the table
401 /// metadata. The changes are atomic - either all operations succeed or none do.
402 /// After commit, the transaction is consumed and the table is updated with the
403 /// new metadata.
404 ///
405 /// # Returns
406 /// * `Result<(), Error>` - Ok(()) if the commit succeeds, Error if it fails
407 ///
408 /// # Errors
409 /// Returns an error if:
410 /// * Any operation fails to execute
411 /// * The catalog update fails
412 /// * Cleanup of old data files fails (for replace operations)
413 ///
414 /// # Examples
415 /// ```
416 /// let result = table.new_transaction(None)
417 /// .append(data_files)
418 /// .update_properties(properties)
419 /// .commit()
420 /// .await?;
421 /// ```
422 #[instrument(name = "iceberg_rust::table::transaction::commit", level = "debug", skip(self), fields(
423 table_identifier = %self.table.identifier,
424 branch = ?self.branch
425 ))]
426 pub async fn commit(self) -> Result<(), Error> {
427 let catalog = self.table.catalog();
428 let identifier = self.table.identifier.clone();
429
430 // Execute the table operations
431 let (mut requirements, mut updates) = (Vec::new(), Vec::new());
432 for operation in self.operations.into_iter().flatten() {
433 let (requirement, update) = operation
434 .execute(self.table.metadata(), self.table.object_store())
435 .await?;
436
437 if let Some(requirement) = requirement {
438 requirements.push(requirement);
439 }
440 updates.extend(update);
441 }
442
443 if updates.is_empty() {
444 return Ok(());
445 }
446
447 debug!(
448 "Committing {} updates to table {}: requirements={:?}, updates={:?}",
449 updates.len(),
450 identifier,
451 requirements,
452 updates
453 );
454
455 let new_table = catalog
456 .clone()
457 .update_table(CommitTable {
458 identifier,
459 requirements,
460 updates,
461 })
462 .await?;
463
464 *self.table = new_table;
465 Ok(())
466 }
467}