Skip to main content

iceberg_rust/table/transaction/
mod.rs

1//! Transaction module for atomic table operations
2//!
3//! This module provides the transaction system for Iceberg tables, allowing multiple
4//! operations to be grouped and executed atomically. The main types are:
5//!
6//! * [`TableTransaction`] - Builder for creating and executing atomic transactions
7//! * [`Operation`] - Individual operations that can be part of a transaction
8//!
9//! Transactions ensure that either all operations succeed or none do, maintaining
10//! table consistency. Common operations include:
11//!
12//! * Adding/updating schemas
13//! * Appending data files
14//! * Replacing data files
15//! * Updating table properties
16//! * Managing snapshots and branches
17
18use std::collections::HashMap;
19use tracing::{debug, instrument};
20
21use iceberg_rust_spec::spec::{manifest::DataFile, schema::Schema, snapshot::SnapshotReference};
22
23use crate::table::transaction::append::append_summary;
24use crate::table::transaction::operation::SequenceGroup;
25use crate::{catalog::commit::CommitTable, error::Error, table::Table};
26
27use self::operation::Operation;
28
29pub(crate) mod append;
30pub(crate) mod operation;
31pub(crate) mod overwrite;
32
// Slot indices into `TableTransaction::operations`. Every kind of operation owns exactly
// one slot, and `commit` executes the occupied slots in ascending index order, so these
// values also determine the order in which a transaction's operations are applied.
//
// These are `const` rather than `static`: they are plain compile-time values used for
// indexing and never need a fixed memory address.
pub(crate) const ADD_SCHEMA_INDEX: usize = 0;
pub(crate) const SET_DEFAULT_SPEC_INDEX: usize = 1;
pub(crate) const APPEND_INDEX: usize = 2;
pub(crate) const APPEND_SEQUENCE_GROUPS_INDEX: usize = 3;
pub(crate) const REPLACE_INDEX: usize = 4;
pub(crate) const OVERWRITE_INDEX: usize = 5;
pub(crate) const UPDATE_PROPERTIES_INDEX: usize = 6;
pub(crate) const SET_SNAPSHOT_REF_INDEX: usize = 7;
pub(crate) const EXPIRE_SNAPSHOTS_INDEX: usize = 8;

/// Number of operation slots in a transaction: one past the largest slot index above.
pub(crate) const NUM_OPERATIONS: usize = 9;

// Turn a forgotten `NUM_OPERATIONS` bump into a compile error instead of an
// out-of-bounds panic the first time a newly added slot is used.
const _: () = assert!(EXPIRE_SNAPSHOTS_INDEX + 1 == NUM_OPERATIONS);
44
45/// A transaction that can perform multiple operations on a table atomically
46///
47/// TableTransaction allows grouping multiple table operations (like schema updates,
48/// appends, overwrites) into a single atomic transaction. The transaction must be
49/// committed for changes to take effect.
50///
51/// # Type Parameters
52/// * `'table` - Lifetime of the reference to the table being modified
53///
54/// # Examples
55/// ```
56/// let mut table = // ... get table reference
57/// table.new_transaction(None)
58///     .add_schema(new_schema)
59///     .append(data_files)
60///     .commit()
61///     .await?;
62/// ```
63pub struct TableTransaction<'table> {
64    table: &'table mut Table,
65    operations: Vec<Option<Operation>>,
66    branch: Option<String>,
67}
68
69impl<'table> TableTransaction<'table> {
70    /// Create a transaction for the given table.
71    pub(crate) fn new(table: &'table mut Table, branch: Option<&str>) -> Self {
72        TableTransaction {
73            table,
74            operations: (0..NUM_OPERATIONS).map(|_| None).collect(), // 6 operation types
75            branch: branch.map(ToString::to_string),
76        }
77    }
78    /// Adds a new schema to the table
79    ///
80    /// This operation adds a new schema version to the table. The schema ID will be
81    /// automatically assigned when the transaction is committed.
82    ///
83    /// # Arguments
84    /// * `schema` - The new schema to add to the table
85    ///
86    /// # Returns
87    /// * `Self` - The transaction builder for method chaining
88    pub fn add_schema(mut self, schema: Schema) -> Self {
89        self.operations[ADD_SCHEMA_INDEX] = Some(Operation::AddSchema(schema));
90        self
91    }
92    /// Sets the default partition specification ID for the table
93    ///
94    /// # Arguments
95    /// * `spec_id` - The ID of the partition specification to set as default
96    ///
97    /// # Returns
98    /// * `Self` - The transaction builder for method chaining
99    ///
100    /// The specified partition specification must already exist in the table metadata.
101    pub fn set_default_spec(mut self, spec_id: i32) -> Self {
102        self.operations[SET_DEFAULT_SPEC_INDEX] = Some(Operation::SetDefaultSpec(spec_id));
103        self
104    }
105    /// Appends new data files to the table
106    ///
107    /// This operation adds new data files to the table's current snapshot. Multiple
108    /// append operations in the same transaction will be combined.
109    ///
110    /// # Arguments
111    /// * `files` - Vector of data files to append to the table
112    ///
113    /// # Returns
114    /// * `Self` - The transaction builder for method chaining
115    ///
116    /// # Examples
117    /// ```
118    /// let transaction = table.new_transaction(None)
119    ///     .append_data(data_files)
120    ///     .commit()
121    ///     .await?;
122    /// ```
123    pub fn append_data(mut self, files: Vec<DataFile>) -> Self {
124        if self.operations[APPEND_SEQUENCE_GROUPS_INDEX].is_some() {
125            panic!("Cannot use append and append_sequence_group in the same transaction");
126        }
127        let summary = append_summary(&files);
128
129        if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
130            if let Operation::Append {
131                data_files: old, ..
132            } = operation
133            {
134                old.extend_from_slice(&files);
135            }
136        } else {
137            self.operations[APPEND_INDEX] = Some(Operation::Append {
138                branch: self.branch.clone(),
139                data_files: files,
140                delete_files: Vec::new(),
141                additional_summary: summary,
142            });
143        }
144        self
145    }
146    /// Appends delete files to the table
147    ///
148    /// This operation adds files that mark records for deletion in the table's current snapshot.
149    /// Multiple delete operations in the same transaction will be combined. The delete files
150    /// specify which records should be removed when reading the table.
151    ///
152    /// # Arguments
153    /// * `files` - Vector of delete files to append to the table
154    ///
155    /// # Returns
156    /// * `Self` - The transaction builder for method chaining
157    ///
158    /// # Examples
159    /// ```
160    /// let transaction = table.new_transaction(None)
161    ///     .append_delete(delete_files)
162    ///     .commit()
163    ///     .await?;
164    /// ```
165    pub fn append_delete(mut self, files: Vec<DataFile>) -> Self {
166        if self.operations[APPEND_SEQUENCE_GROUPS_INDEX].is_some() {
167            panic!("Cannot use append and append_sequence_group in the same transaction");
168        }
169        if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
170            if let Operation::Append {
171                delete_files: old, ..
172            } = operation
173            {
174                old.extend_from_slice(&files);
175            }
176        } else {
177            self.operations[APPEND_INDEX] = Some(Operation::Append {
178                branch: self.branch.clone(),
179                data_files: Vec::new(),
180                delete_files: files,
181                additional_summary: None,
182            });
183        }
184        self
185    }
186
187    /// Appends a group of data and delete files to the table
188    ///
189    pub fn append_sequence_group(
190        mut self,
191        data_files: Vec<DataFile>,
192        delete_files: Vec<DataFile>,
193    ) -> Self {
194        if self.operations[APPEND_INDEX].is_some() {
195            panic!("Cannot use append and append_sequence_group in the same transaction");
196        }
197        if let Some(ref mut operation) = self.operations[APPEND_SEQUENCE_GROUPS_INDEX] {
198            if let Operation::AppendSequenceGroups {
199                sequence_groups: old,
200                ..
201            } = operation
202            {
203                old.push(SequenceGroup {
204                    delete_files,
205                    data_files,
206                });
207            }
208        } else {
209            self.operations[APPEND_SEQUENCE_GROUPS_INDEX] = Some(Operation::AppendSequenceGroups {
210                branch: self.branch.clone(),
211                sequence_groups: vec![SequenceGroup {
212                    delete_files,
213                    data_files,
214                }],
215            });
216        }
217        self
218    }
219    /// Overwrites specific data files in the table with new ones
220    ///
221    /// This operation replaces specified existing data files with new ones, rather than
222    /// replacing all files (like `replace`) or adding new files (like `append`). It allows
223    /// for selective replacement of data files based on the mapping provided.
224    ///
225    /// Multiple overwrite operations in the same transaction will be combined, with new
226    /// data files appended and the files-to-overwrite mapping merged.
227    ///
228    /// # Arguments
229    /// * `files` - Vector of new data files to add to the table
230    /// * `files_to_overwrite` - HashMap mapping manifest file paths to lists of data file
231    ///   paths that should be overwritten/replaced
232    ///
233    /// # Returns
234    /// * `Self` - The transaction builder for method chaining
235    ///
236    /// # Examples
237    /// ```
238    /// use std::collections::HashMap;
239    ///
240    /// let mut files_to_overwrite = HashMap::new();
241    /// files_to_overwrite.insert(
242    ///     "manifest-001.avro".to_string(),
243    ///     vec!["data-001.parquet".to_string(), "data-002.parquet".to_string()]
244    /// );
245    ///
246    /// let transaction = table.new_transaction(None)
247    ///     .overwrite(new_data_files, files_to_overwrite)
248    ///     .commit()
249    ///     .await?;
250    /// ```
251    pub fn overwrite(
252        mut self,
253        files: Vec<DataFile>,
254        files_to_overwrite: HashMap<String, Vec<String>>,
255    ) -> Self {
256        let summary = append_summary(&files);
257
258        if let Some(ref mut operation) = self.operations[OVERWRITE_INDEX] {
259            if let Operation::Overwrite {
260                data_files: old_data_files,
261                files_to_overwrite: old_files_to_overwrite,
262                ..
263            } = operation
264            {
265                old_data_files.extend_from_slice(&files);
266                old_files_to_overwrite.extend(files_to_overwrite);
267            }
268        } else {
269            self.operations[OVERWRITE_INDEX] = Some(Operation::Overwrite {
270                branch: self.branch.clone(),
271                data_files: files,
272                files_to_overwrite,
273                additional_summary: summary,
274            });
275        }
276        self
277    }
278    /// Replaces all data files in the table with new ones
279    ///
280    /// This operation removes all existing data files and replaces them with the provided
281    /// files. Multiple replace operations in the same transaction will be combined.
282    ///
283    /// # Arguments
284    /// * `files` - Vector of data files that will replace the existing ones
285    ///
286    /// # Returns
287    /// * `Self` - The transaction builder for method chaining
288    ///
289    /// # Examples
290    /// ```
291    /// let transaction = table.new_transaction(None)
292    ///     .replace(new_files)
293    ///     .commit()
294    ///     .await?;
295    /// ```
296    pub fn replace(mut self, files: Vec<DataFile>) -> Self {
297        if let Some(ref mut operation) = self.operations[REPLACE_INDEX] {
298            if let Operation::Replace {
299                branch: _,
300                files: old,
301                additional_summary: None,
302            } = operation
303            {
304                old.extend_from_slice(&files);
305            }
306        } else {
307            self.operations[REPLACE_INDEX] = Some(Operation::Replace {
308                branch: self.branch.clone(),
309                files,
310                additional_summary: None,
311            });
312        }
313        self
314    }
315    /// Quickly append files to the table
316    pub fn replace_with_lineage(
317        mut self,
318        files: Vec<DataFile>,
319        additional_summary: std::collections::HashMap<String, String>,
320    ) -> Self {
321        if let Some(ref mut operation) = self.operations[REPLACE_INDEX] {
322            if let Operation::Replace {
323                branch: _,
324                files: old,
325                additional_summary: old_lineage,
326            } = operation
327            {
328                old.extend_from_slice(&files);
329                *old_lineage = Some(additional_summary.clone());
330            }
331        } else {
332            self.operations[REPLACE_INDEX] = Some(Operation::Replace {
333                branch: self.branch.clone(),
334                files,
335                additional_summary: Some(additional_summary),
336            });
337        }
338        self
339    }
340    /// Updates the table properties with new key-value pairs
341    ///
342    /// This operation adds or updates table properties. Multiple update operations
343    /// in the same transaction will be combined.
344    ///
345    /// # Arguments
346    /// * `entries` - Vector of (key, value) pairs to update in the table properties
347    ///
348    /// # Returns
349    /// * `Self` - The transaction builder for method chaining
350    ///
351    /// # Examples
352    /// ```
353    /// let transaction = table.new_transaction(None)
354    ///     .update_properties(vec![
355    ///         ("write.format.default".to_string(), "parquet".to_string()),
356    ///         ("write.metadata.compression-codec".to_string(), "gzip".to_string())
357    ///     ])
358    ///     .commit()
359    ///     .await?;
360    /// ```
361    pub fn update_properties(mut self, entries: Vec<(String, String)>) -> Self {
362        if let Some(ref mut operation) = self.operations[UPDATE_PROPERTIES_INDEX] {
363            if let Operation::UpdateProperties(props) = operation {
364                props.extend_from_slice(&entries);
365            }
366        } else {
367            self.operations[UPDATE_PROPERTIES_INDEX] = Some(Operation::UpdateProperties(entries));
368        }
369        self
370    }
371    /// Sets a snapshot reference for the table
372    ///
373    /// This operation creates or updates a named reference to a specific snapshot,
374    /// allowing for features like branches and tags.
375    ///
376    /// # Arguments
377    /// * `entry` - Tuple of (reference name, snapshot reference) defining the reference
378    ///
379    /// # Returns
380    /// * `Self` - The transaction builder for method chaining
381    ///
382    /// # Examples
383    /// ```
384    /// let transaction = table.new_transaction(None)
385    ///     .set_snapshot_ref((
386    ///         "test-branch".to_string(),
387    ///         SnapshotReference {
388    ///             snapshot_id: 123,
389    ///             retention: SnapshotRetention::default(),
390    ///         }
391    ///     ))
392    ///     .commit()
393    ///     .await?;
394    /// ```
395    pub fn set_snapshot_ref(mut self, entry: (String, SnapshotReference)) -> Self {
396        self.operations[SET_SNAPSHOT_REF_INDEX] = Some(Operation::SetSnapshotRef(entry));
397        self
398    }
399
400    /// Expire snapshots based on the provided configuration
401    ///
402    /// This operation expires snapshots according to the retention policies specified.
403    /// It can expire snapshots older than a certain timestamp, retain only the most recent N snapshots,
404    /// and optionally clean up orphaned data files.
405    ///
406    /// # Arguments
407    /// * `older_than` - Optional timestamp (ms since Unix epoch) to expire snapshots older than this time
408    /// * `retain_last` - Optional number of most recent snapshots to keep, regardless of timestamp
409    /// * `clean_orphan_files` - Whether to clean up data files that are no longer referenced
410    /// * `retain_ref_snapshots` - Whether to preserve snapshots that are referenced by branches/tags
411    /// * `dry_run` - Whether to perform a dry run without actually deleting anything
412    ///
413    /// # Returns
414    /// * `Self` - The transaction builder for method chaining
415    ///
416    /// # Examples
417    /// ```
418    /// let result = table.new_transaction(None)
419    ///     .expire_snapshots(
420    ///         Some(chrono::Utc::now().timestamp_millis() - 7 * 24 * 60 * 60 * 1000),
421    ///         Some(5),
422    ///         true,
423    ///         true,
424    ///         false
425    ///     )
426    ///     .commit()
427    ///     .await?;
428    /// ```
429    pub fn expire_snapshots(
430        mut self,
431        older_than: Option<i64>,
432        retain_last: Option<usize>,
433        clean_orphan_files: bool,
434        retain_ref_snapshots: bool,
435        dry_run: bool,
436    ) -> Self {
437        self.operations[EXPIRE_SNAPSHOTS_INDEX] = Some(Operation::ExpireSnapshots {
438            older_than,
439            retain_last,
440            _clean_orphan_files: clean_orphan_files,
441            retain_ref_snapshots,
442            dry_run,
443        });
444        self
445    }
446
447    /// Commits all operations in this transaction atomically
448    ///
449    /// This method executes all operations in the transaction and updates the table
450    /// metadata. The changes are atomic - either all operations succeed or none do.
451    /// After commit, the transaction is consumed and the table is updated with the
452    /// new metadata.
453    ///
454    /// # Returns
455    /// * `Result<(), Error>` - Ok(()) if the commit succeeds, Error if it fails
456    ///
457    /// # Errors
458    /// Returns an error if:
459    /// * Any operation fails to execute
460    /// * The catalog update fails
461    /// * Cleanup of old data files fails (for replace operations)
462    ///
463    /// # Examples
464    /// ```
465    /// let result = table.new_transaction(None)
466    ///     .append(data_files)
467    ///     .update_properties(properties)
468    ///     .commit()
469    ///     .await?;
470    /// ```
471    #[instrument(name = "iceberg_rust::table::transaction::commit", level = "debug", skip(self), fields(
472        table_identifier = %self.table.identifier,
473        branch = ?self.branch
474    ))]
475    pub async fn commit(self) -> Result<(), Error> {
476        let catalog = self.table.catalog();
477        let identifier = self.table.identifier.clone();
478
479        // Execute the table operations
480        let (mut requirements, mut updates) = (Vec::new(), Vec::new());
481        for operation in self.operations.into_iter().flatten() {
482            let (requirement, update) = operation
483                .execute(self.table.metadata(), self.table.object_store())
484                .await?;
485
486            if let Some(requirement) = requirement {
487                requirements.push(requirement);
488            }
489            updates.extend(update);
490        }
491
492        if updates.is_empty() {
493            return Ok(());
494        }
495
496        debug!(
497            "Committing {} updates to table {}: requirements={:?}, updates={:?}",
498            updates.len(),
499            identifier,
500            requirements,
501            updates
502        );
503
504        let new_table = catalog
505            .clone()
506            .update_table(CommitTable {
507                identifier,
508                requirements,
509                updates,
510            })
511            .await?;
512
513        *self.table = new_table;
514        Ok(())
515    }
516}