iceberg_rust/table/transaction/
mod.rs

1//! Transaction module for atomic table operations
2//!
3//! This module provides the transaction system for Iceberg tables, allowing multiple
4//! operations to be grouped and executed atomically. The main types are:
5//!
6//! * [`TableTransaction`] - Builder for creating and executing atomic transactions
7//! * [`Operation`] - Individual operations that can be part of a transaction
8//!
9//! Transactions ensure that either all operations succeed or none do, maintaining
10//! table consistency. Common operations include:
11//!
12//! * Adding/updating schemas
13//! * Appending data files
14//! * Replacing data files
15//! * Updating table properties
16//! * Managing snapshots and branches
17
18use std::collections::HashMap;
19use tracing::{debug, instrument};
20
21use iceberg_rust_spec::spec::{manifest::DataFile, schema::Schema, snapshot::SnapshotReference};
22
23use crate::table::transaction::append::append_summary;
24use crate::table::transaction::operation::SequenceGroup;
25use crate::{catalog::commit::CommitTable, error::Error, table::Table};
26
27use self::operation::Operation;
28
29pub(crate) mod append;
30pub(crate) mod operation;
31pub(crate) mod overwrite;
32
// Slot indices into `TableTransaction::operations`. `commit` walks the vector front to
// back, so a smaller index means the operation is executed earlier.

/// Slot holding the `AddSchema` operation.
pub(crate) const ADD_SCHEMA_INDEX: usize = 0;
/// Slot holding the `SetDefaultSpec` operation.
pub(crate) const SET_DEFAULT_SPEC_INDEX: usize = 1;
/// Slot holding the `Append` operation (data and delete files).
pub(crate) const APPEND_INDEX: usize = 2;
/// Slot holding the `AppendSequenceGroups` operation.
pub(crate) const APPEND_SEQUENCE_GROUPS_INDEX: usize = 3;
/// Slot holding the `Replace` operation.
pub(crate) const REPLACE_INDEX: usize = 4;
/// Slot holding the `Overwrite` operation.
pub(crate) const OVERWRITE_INDEX: usize = 5;
/// Slot holding the `UpdateProperties` operation.
pub(crate) const UPDATE_PROPERTIES_INDEX: usize = 6;
/// Slot holding the `SetSnapshotRef` operation.
pub(crate) const SET_SNAPSHOT_REF_INDEX: usize = 7;

/// Total number of operation slots; must stay one larger than the highest index above.
pub(crate) const NUM_OPERATIONS: usize = 8;
43
/// A transaction that can perform multiple operations on a table atomically
///
/// TableTransaction allows grouping multiple table operations (like schema updates,
/// appends, overwrites) into a single atomic transaction. The transaction must be
/// committed for changes to take effect.
///
/// Each kind of operation occupies one slot of the transaction. Calling the same builder
/// method again either merges into the existing operation (appends, overwrite, replace,
/// property updates) or replaces it (schema, default spec, snapshot reference).
///
/// # Type Parameters
/// * `'table` - Lifetime of the reference to the table being modified
///
/// # Examples
/// ```ignore
/// let mut table = // ... get table reference
/// table.new_transaction(None)
///     .add_schema(new_schema)
///     .append_data(data_files)
///     .commit()
///     .await?;
/// ```
pub struct TableTransaction<'table> {
    /// Table being modified; overwritten with the catalog's response when `commit` succeeds.
    table: &'table mut Table,
    /// One optional slot per operation kind, addressed by the `*_INDEX` values of this module.
    operations: Vec<Option<Operation>>,
    /// Branch name copied into the snapshot-producing operations, if one was requested.
    branch: Option<String>,
}
67
68impl<'table> TableTransaction<'table> {
69    /// Create a transaction for the given table.
70    pub(crate) fn new(table: &'table mut Table, branch: Option<&str>) -> Self {
71        TableTransaction {
72            table,
73            operations: (0..NUM_OPERATIONS).map(|_| None).collect(), // 6 operation types
74            branch: branch.map(ToString::to_string),
75        }
76    }
77    /// Adds a new schema to the table
78    ///
79    /// This operation adds a new schema version to the table. The schema ID will be
80    /// automatically assigned when the transaction is committed.
81    ///
82    /// # Arguments
83    /// * `schema` - The new schema to add to the table
84    ///
85    /// # Returns
86    /// * `Self` - The transaction builder for method chaining
87    pub fn add_schema(mut self, schema: Schema) -> Self {
88        self.operations[ADD_SCHEMA_INDEX] = Some(Operation::AddSchema(schema));
89        self
90    }
91    /// Sets the default partition specification ID for the table
92    ///
93    /// # Arguments
94    /// * `spec_id` - The ID of the partition specification to set as default
95    ///
96    /// # Returns
97    /// * `Self` - The transaction builder for method chaining
98    ///
99    /// The specified partition specification must already exist in the table metadata.
100    pub fn set_default_spec(mut self, spec_id: i32) -> Self {
101        self.operations[SET_DEFAULT_SPEC_INDEX] = Some(Operation::SetDefaultSpec(spec_id));
102        self
103    }
104    /// Appends new data files to the table
105    ///
106    /// This operation adds new data files to the table's current snapshot. Multiple
107    /// append operations in the same transaction will be combined.
108    ///
109    /// # Arguments
110    /// * `files` - Vector of data files to append to the table
111    ///
112    /// # Returns
113    /// * `Self` - The transaction builder for method chaining
114    ///
115    /// # Examples
116    /// ```
117    /// let transaction = table.new_transaction(None)
118    ///     .append_data(data_files)
119    ///     .commit()
120    ///     .await?;
121    /// ```
122    pub fn append_data(mut self, files: Vec<DataFile>) -> Self {
123        if self.operations[APPEND_SEQUENCE_GROUPS_INDEX].is_some() {
124            panic!("Cannot use append and append_sequence_group in the same transaction");
125        }
126        let summary = append_summary(&files);
127
128        if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
129            if let Operation::Append {
130                data_files: old, ..
131            } = operation
132            {
133                old.extend_from_slice(&files);
134            }
135        } else {
136            self.operations[APPEND_INDEX] = Some(Operation::Append {
137                branch: self.branch.clone(),
138                data_files: files,
139                delete_files: Vec::new(),
140                additional_summary: summary,
141            });
142        }
143        self
144    }
145    /// Appends delete files to the table
146    ///
147    /// This operation adds files that mark records for deletion in the table's current snapshot.
148    /// Multiple delete operations in the same transaction will be combined. The delete files
149    /// specify which records should be removed when reading the table.
150    ///
151    /// # Arguments
152    /// * `files` - Vector of delete files to append to the table
153    ///
154    /// # Returns
155    /// * `Self` - The transaction builder for method chaining
156    ///
157    /// # Examples
158    /// ```
159    /// let transaction = table.new_transaction(None)
160    ///     .append_delete(delete_files)
161    ///     .commit()
162    ///     .await?;
163    /// ```
164    pub fn append_delete(mut self, files: Vec<DataFile>) -> Self {
165        if self.operations[APPEND_SEQUENCE_GROUPS_INDEX].is_some() {
166            panic!("Cannot use append and append_sequence_group in the same transaction");
167        }
168        if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
169            if let Operation::Append {
170                delete_files: old, ..
171            } = operation
172            {
173                old.extend_from_slice(&files);
174            }
175        } else {
176            self.operations[APPEND_INDEX] = Some(Operation::Append {
177                branch: self.branch.clone(),
178                data_files: Vec::new(),
179                delete_files: files,
180                additional_summary: None,
181            });
182        }
183        self
184    }
185
186    /// Appends a group of data and delete files to the table
187    ///
188    pub fn append_sequence_group(
189        mut self,
190        data_files: Vec<DataFile>,
191        delete_files: Vec<DataFile>,
192    ) -> Self {
193        if self.operations[APPEND_INDEX].is_some() {
194            panic!("Cannot use append and append_sequence_group in the same transaction");
195        }
196        if let Some(ref mut operation) = self.operations[APPEND_SEQUENCE_GROUPS_INDEX] {
197            if let Operation::AppendSequenceGroups {
198                sequence_groups: old,
199                ..
200            } = operation
201            {
202                old.push(SequenceGroup {
203                    delete_files,
204                    data_files,
205                });
206            }
207        } else {
208            self.operations[APPEND_SEQUENCE_GROUPS_INDEX] = Some(Operation::AppendSequenceGroups {
209                branch: self.branch.clone(),
210                sequence_groups: vec![SequenceGroup {
211                    delete_files,
212                    data_files,
213                }],
214            });
215        }
216        self
217    }
218    /// Overwrites specific data files in the table with new ones
219    ///
220    /// This operation replaces specified existing data files with new ones, rather than
221    /// replacing all files (like `replace`) or adding new files (like `append`). It allows
222    /// for selective replacement of data files based on the mapping provided.
223    ///
224    /// Multiple overwrite operations in the same transaction will be combined, with new
225    /// data files appended and the files-to-overwrite mapping merged.
226    ///
227    /// # Arguments
228    /// * `files` - Vector of new data files to add to the table
229    /// * `files_to_overwrite` - HashMap mapping manifest file paths to lists of data file
230    ///   paths that should be overwritten/replaced
231    ///
232    /// # Returns
233    /// * `Self` - The transaction builder for method chaining
234    ///
235    /// # Examples
236    /// ```
237    /// use std::collections::HashMap;
238    ///
239    /// let mut files_to_overwrite = HashMap::new();
240    /// files_to_overwrite.insert(
241    ///     "manifest-001.avro".to_string(),
242    ///     vec!["data-001.parquet".to_string(), "data-002.parquet".to_string()]
243    /// );
244    ///
245    /// let transaction = table.new_transaction(None)
246    ///     .overwrite(new_data_files, files_to_overwrite)
247    ///     .commit()
248    ///     .await?;
249    /// ```
250    pub fn overwrite(
251        mut self,
252        files: Vec<DataFile>,
253        files_to_overwrite: HashMap<String, Vec<String>>,
254    ) -> Self {
255        let summary = append_summary(&files);
256
257        if let Some(ref mut operation) = self.operations[OVERWRITE_INDEX] {
258            if let Operation::Overwrite {
259                data_files: old_data_files,
260                files_to_overwrite: old_files_to_overwrite,
261                ..
262            } = operation
263            {
264                old_data_files.extend_from_slice(&files);
265                old_files_to_overwrite.extend(files_to_overwrite);
266            }
267        } else {
268            self.operations[OVERWRITE_INDEX] = Some(Operation::Overwrite {
269                branch: self.branch.clone(),
270                data_files: files,
271                files_to_overwrite,
272                additional_summary: summary,
273            });
274        }
275        self
276    }
277    /// Replaces all data files in the table with new ones
278    ///
279    /// This operation removes all existing data files and replaces them with the provided
280    /// files. Multiple replace operations in the same transaction will be combined.
281    ///
282    /// # Arguments
283    /// * `files` - Vector of data files that will replace the existing ones
284    ///
285    /// # Returns
286    /// * `Self` - The transaction builder for method chaining
287    ///
288    /// # Examples
289    /// ```
290    /// let transaction = table.new_transaction(None)
291    ///     .replace(new_files)
292    ///     .commit()
293    ///     .await?;
294    /// ```
295    pub fn replace(mut self, files: Vec<DataFile>) -> Self {
296        if let Some(ref mut operation) = self.operations[REPLACE_INDEX] {
297            if let Operation::Replace {
298                branch: _,
299                files: old,
300                additional_summary: None,
301            } = operation
302            {
303                old.extend_from_slice(&files);
304            }
305        } else {
306            self.operations[REPLACE_INDEX] = Some(Operation::Replace {
307                branch: self.branch.clone(),
308                files,
309                additional_summary: None,
310            });
311        }
312        self
313    }
314    /// Quickly append files to the table
315    pub fn replace_with_lineage(
316        mut self,
317        files: Vec<DataFile>,
318        additional_summary: std::collections::HashMap<String, String>,
319    ) -> Self {
320        if let Some(ref mut operation) = self.operations[REPLACE_INDEX] {
321            if let Operation::Replace {
322                branch: _,
323                files: old,
324                additional_summary: old_lineage,
325            } = operation
326            {
327                old.extend_from_slice(&files);
328                *old_lineage = Some(additional_summary.clone());
329            }
330        } else {
331            self.operations[REPLACE_INDEX] = Some(Operation::Replace {
332                branch: self.branch.clone(),
333                files,
334                additional_summary: Some(additional_summary),
335            });
336        }
337        self
338    }
339    /// Updates the table properties with new key-value pairs
340    ///
341    /// This operation adds or updates table properties. Multiple update operations
342    /// in the same transaction will be combined.
343    ///
344    /// # Arguments
345    /// * `entries` - Vector of (key, value) pairs to update in the table properties
346    ///
347    /// # Returns
348    /// * `Self` - The transaction builder for method chaining
349    ///
350    /// # Examples
351    /// ```
352    /// let transaction = table.new_transaction(None)
353    ///     .update_properties(vec![
354    ///         ("write.format.default".to_string(), "parquet".to_string()),
355    ///         ("write.metadata.compression-codec".to_string(), "gzip".to_string())
356    ///     ])
357    ///     .commit()
358    ///     .await?;
359    /// ```
360    pub fn update_properties(mut self, entries: Vec<(String, String)>) -> Self {
361        if let Some(ref mut operation) = self.operations[UPDATE_PROPERTIES_INDEX] {
362            if let Operation::UpdateProperties(props) = operation {
363                props.extend_from_slice(&entries);
364            }
365        } else {
366            self.operations[UPDATE_PROPERTIES_INDEX] = Some(Operation::UpdateProperties(entries));
367        }
368        self
369    }
370    /// Sets a snapshot reference for the table
371    ///
372    /// This operation creates or updates a named reference to a specific snapshot,
373    /// allowing for features like branches and tags.
374    ///
375    /// # Arguments
376    /// * `entry` - Tuple of (reference name, snapshot reference) defining the reference
377    ///
378    /// # Returns
379    /// * `Self` - The transaction builder for method chaining
380    ///
381    /// # Examples
382    /// ```
383    /// let transaction = table.new_transaction(None)
384    ///     .set_snapshot_ref((
385    ///         "test-branch".to_string(),
386    ///         SnapshotReference {
387    ///             snapshot_id: 123,
388    ///             retention: SnapshotRetention::default(),
389    ///         }
390    ///     ))
391    ///     .commit()
392    ///     .await?;
393    /// ```
394    pub fn set_snapshot_ref(mut self, entry: (String, SnapshotReference)) -> Self {
395        self.operations[SET_SNAPSHOT_REF_INDEX] = Some(Operation::SetSnapshotRef(entry));
396        self
397    }
398    /// Commits all operations in this transaction atomically
399    ///
400    /// This method executes all operations in the transaction and updates the table
401    /// metadata. The changes are atomic - either all operations succeed or none do.
402    /// After commit, the transaction is consumed and the table is updated with the
403    /// new metadata.
404    ///
405    /// # Returns
406    /// * `Result<(), Error>` - Ok(()) if the commit succeeds, Error if it fails
407    ///
408    /// # Errors
409    /// Returns an error if:
410    /// * Any operation fails to execute
411    /// * The catalog update fails
412    /// * Cleanup of old data files fails (for replace operations)
413    ///
414    /// # Examples
415    /// ```
416    /// let result = table.new_transaction(None)
417    ///     .append(data_files)
418    ///     .update_properties(properties)
419    ///     .commit()
420    ///     .await?;
421    /// ```
422    #[instrument(name = "iceberg_rust::table::transaction::commit", level = "debug", skip(self), fields(
423        table_identifier = %self.table.identifier,
424        branch = ?self.branch
425    ))]
426    pub async fn commit(self) -> Result<(), Error> {
427        let catalog = self.table.catalog();
428        let identifier = self.table.identifier.clone();
429
430        // Execute the table operations
431        let (mut requirements, mut updates) = (Vec::new(), Vec::new());
432        for operation in self.operations.into_iter().flatten() {
433            let (requirement, update) = operation
434                .execute(self.table.metadata(), self.table.object_store())
435                .await?;
436
437            if let Some(requirement) = requirement {
438                requirements.push(requirement);
439            }
440            updates.extend(update);
441        }
442
443        if updates.is_empty() {
444            return Ok(());
445        }
446
447        debug!(
448            "Committing {} updates to table {}: requirements={:?}, updates={:?}",
449            updates.len(),
450            identifier,
451            requirements,
452            updates
453        );
454
455        let new_table = catalog
456            .clone()
457            .update_table(CommitTable {
458                identifier,
459                requirements,
460                updates,
461            })
462            .await?;
463
464        *self.table = new_table;
465        Ok(())
466    }
467}