// iceberg_rust/table/transaction/mod.rs
1//! Transaction module for atomic table operations
2//!
3//! This module provides the transaction system for Iceberg tables, allowing multiple
4//! operations to be grouped and executed atomically. The main types are:
5//!
6//! * [`TableTransaction`] - Builder for creating and executing atomic transactions
7//! * [`Operation`] - Individual operations that can be part of a transaction
8//!
9//! Transactions ensure that either all operations succeed or none do, maintaining
10//! table consistency. Common operations include:
11//!
12//! * Adding/updating schemas
13//! * Appending data files
14//! * Replacing data files
15//! * Updating table properties
16//! * Managing snapshots and branches
17
18use std::collections::HashMap;
19use tracing::{debug, instrument};
20
21use iceberg_rust_spec::spec::{manifest::DataFile, schema::Schema, snapshot::SnapshotReference};
22
23use crate::table::transaction::append::append_summary;
24use crate::table::transaction::operation::SequenceGroup;
25use crate::{catalog::commit::CommitTable, error::Error, table::Table};
26
27use self::operation::Operation;
28
29pub(crate) mod append;
30pub(crate) mod operation;
31pub(crate) mod overwrite;
32
// Slot indices into `TableTransaction::operations`. The transaction keeps one
// pre-allocated slot per operation type so that repeated builder calls of the
// same kind can be merged into a single operation before commit.
//
// `const` (not `static`) is the idiomatic choice for small compile-time
// integer values; usage at the call sites is unchanged.
pub(crate) const ADD_SCHEMA_INDEX: usize = 0;
pub(crate) const SET_DEFAULT_SPEC_INDEX: usize = 1;
pub(crate) const APPEND_INDEX: usize = 2;
pub(crate) const APPEND_SEQUENCE_GROUPS_INDEX: usize = 3;
pub(crate) const REPLACE_INDEX: usize = 4;
pub(crate) const OVERWRITE_INDEX: usize = 5;
pub(crate) const UPDATE_PROPERTIES_INDEX: usize = 6;
pub(crate) const SET_SNAPSHOT_REF_INDEX: usize = 7;
pub(crate) const EXPIRE_SNAPSHOTS_INDEX: usize = 8;

// Total number of operation slots; must stay equal to the number of index
// constants above (the highest index + 1).
pub(crate) const NUM_OPERATIONS: usize = 9;
44
/// A transaction that can perform multiple operations on a table atomically
///
/// TableTransaction allows grouping multiple table operations (like schema updates,
/// appends, overwrites) into a single atomic transaction. The transaction must be
/// committed for changes to take effect.
///
/// # Type Parameters
/// * `'table` - Lifetime of the reference to the table being modified
///
/// # Examples
/// ```ignore
/// let mut table = // ... get table reference
/// table.new_transaction(None)
///     .add_schema(new_schema)
///     .append(data_files)
///     .commit()
///     .await?;
/// ```
pub struct TableTransaction<'table> {
    // Exclusive borrow of the table; on successful commit the table is
    // overwritten with the metadata returned by the catalog.
    table: &'table mut Table,
    // One slot per operation type, indexed by the `*_INDEX` constants above;
    // `None` means that operation kind is not part of this transaction.
    operations: Vec<Option<Operation>>,
    // Optional branch name that snapshot-producing operations target.
    branch: Option<String>,
}
68
69impl<'table> TableTransaction<'table> {
70 /// Create a transaction for the given table.
71 pub(crate) fn new(table: &'table mut Table, branch: Option<&str>) -> Self {
72 TableTransaction {
73 table,
74 operations: (0..NUM_OPERATIONS).map(|_| None).collect(), // 6 operation types
75 branch: branch.map(ToString::to_string),
76 }
77 }
78 /// Adds a new schema to the table
79 ///
80 /// This operation adds a new schema version to the table. The schema ID will be
81 /// automatically assigned when the transaction is committed.
82 ///
83 /// # Arguments
84 /// * `schema` - The new schema to add to the table
85 ///
86 /// # Returns
87 /// * `Self` - The transaction builder for method chaining
88 pub fn add_schema(mut self, schema: Schema) -> Self {
89 self.operations[ADD_SCHEMA_INDEX] = Some(Operation::AddSchema(schema));
90 self
91 }
92 /// Sets the default partition specification ID for the table
93 ///
94 /// # Arguments
95 /// * `spec_id` - The ID of the partition specification to set as default
96 ///
97 /// # Returns
98 /// * `Self` - The transaction builder for method chaining
99 ///
100 /// The specified partition specification must already exist in the table metadata.
101 pub fn set_default_spec(mut self, spec_id: i32) -> Self {
102 self.operations[SET_DEFAULT_SPEC_INDEX] = Some(Operation::SetDefaultSpec(spec_id));
103 self
104 }
105 /// Appends new data files to the table
106 ///
107 /// This operation adds new data files to the table's current snapshot. Multiple
108 /// append operations in the same transaction will be combined.
109 ///
110 /// # Arguments
111 /// * `files` - Vector of data files to append to the table
112 ///
113 /// # Returns
114 /// * `Self` - The transaction builder for method chaining
115 ///
116 /// # Examples
117 /// ```
118 /// let transaction = table.new_transaction(None)
119 /// .append_data(data_files)
120 /// .commit()
121 /// .await?;
122 /// ```
123 pub fn append_data(mut self, files: Vec<DataFile>) -> Self {
124 if self.operations[APPEND_SEQUENCE_GROUPS_INDEX].is_some() {
125 panic!("Cannot use append and append_sequence_group in the same transaction");
126 }
127 let summary = append_summary(&files);
128
129 if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
130 if let Operation::Append {
131 data_files: old, ..
132 } = operation
133 {
134 old.extend_from_slice(&files);
135 }
136 } else {
137 self.operations[APPEND_INDEX] = Some(Operation::Append {
138 branch: self.branch.clone(),
139 data_files: files,
140 delete_files: Vec::new(),
141 additional_summary: summary,
142 });
143 }
144 self
145 }
146 /// Appends delete files to the table
147 ///
148 /// This operation adds files that mark records for deletion in the table's current snapshot.
149 /// Multiple delete operations in the same transaction will be combined. The delete files
150 /// specify which records should be removed when reading the table.
151 ///
152 /// # Arguments
153 /// * `files` - Vector of delete files to append to the table
154 ///
155 /// # Returns
156 /// * `Self` - The transaction builder for method chaining
157 ///
158 /// # Examples
159 /// ```
160 /// let transaction = table.new_transaction(None)
161 /// .append_delete(delete_files)
162 /// .commit()
163 /// .await?;
164 /// ```
165 pub fn append_delete(mut self, files: Vec<DataFile>) -> Self {
166 if self.operations[APPEND_SEQUENCE_GROUPS_INDEX].is_some() {
167 panic!("Cannot use append and append_sequence_group in the same transaction");
168 }
169 if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
170 if let Operation::Append {
171 delete_files: old, ..
172 } = operation
173 {
174 old.extend_from_slice(&files);
175 }
176 } else {
177 self.operations[APPEND_INDEX] = Some(Operation::Append {
178 branch: self.branch.clone(),
179 data_files: Vec::new(),
180 delete_files: files,
181 additional_summary: None,
182 });
183 }
184 self
185 }
186
187 /// Appends a group of data and delete files to the table
188 ///
189 pub fn append_sequence_group(
190 mut self,
191 data_files: Vec<DataFile>,
192 delete_files: Vec<DataFile>,
193 ) -> Self {
194 if self.operations[APPEND_INDEX].is_some() {
195 panic!("Cannot use append and append_sequence_group in the same transaction");
196 }
197 if let Some(ref mut operation) = self.operations[APPEND_SEQUENCE_GROUPS_INDEX] {
198 if let Operation::AppendSequenceGroups {
199 sequence_groups: old,
200 ..
201 } = operation
202 {
203 old.push(SequenceGroup {
204 delete_files,
205 data_files,
206 });
207 }
208 } else {
209 self.operations[APPEND_SEQUENCE_GROUPS_INDEX] = Some(Operation::AppendSequenceGroups {
210 branch: self.branch.clone(),
211 sequence_groups: vec![SequenceGroup {
212 delete_files,
213 data_files,
214 }],
215 });
216 }
217 self
218 }
219 /// Overwrites specific data files in the table with new ones
220 ///
221 /// This operation replaces specified existing data files with new ones, rather than
222 /// replacing all files (like `replace`) or adding new files (like `append`). It allows
223 /// for selective replacement of data files based on the mapping provided.
224 ///
225 /// Multiple overwrite operations in the same transaction will be combined, with new
226 /// data files appended and the files-to-overwrite mapping merged.
227 ///
228 /// # Arguments
229 /// * `files` - Vector of new data files to add to the table
230 /// * `files_to_overwrite` - HashMap mapping manifest file paths to lists of data file
231 /// paths that should be overwritten/replaced
232 ///
233 /// # Returns
234 /// * `Self` - The transaction builder for method chaining
235 ///
236 /// # Examples
237 /// ```
238 /// use std::collections::HashMap;
239 ///
240 /// let mut files_to_overwrite = HashMap::new();
241 /// files_to_overwrite.insert(
242 /// "manifest-001.avro".to_string(),
243 /// vec!["data-001.parquet".to_string(), "data-002.parquet".to_string()]
244 /// );
245 ///
246 /// let transaction = table.new_transaction(None)
247 /// .overwrite(new_data_files, files_to_overwrite)
248 /// .commit()
249 /// .await?;
250 /// ```
251 pub fn overwrite(
252 mut self,
253 files: Vec<DataFile>,
254 files_to_overwrite: HashMap<String, Vec<String>>,
255 ) -> Self {
256 let summary = append_summary(&files);
257
258 if let Some(ref mut operation) = self.operations[OVERWRITE_INDEX] {
259 if let Operation::Overwrite {
260 data_files: old_data_files,
261 files_to_overwrite: old_files_to_overwrite,
262 ..
263 } = operation
264 {
265 old_data_files.extend_from_slice(&files);
266 old_files_to_overwrite.extend(files_to_overwrite);
267 }
268 } else {
269 self.operations[OVERWRITE_INDEX] = Some(Operation::Overwrite {
270 branch: self.branch.clone(),
271 data_files: files,
272 files_to_overwrite,
273 additional_summary: summary,
274 });
275 }
276 self
277 }
278 /// Replaces all data files in the table with new ones
279 ///
280 /// This operation removes all existing data files and replaces them with the provided
281 /// files. Multiple replace operations in the same transaction will be combined.
282 ///
283 /// # Arguments
284 /// * `files` - Vector of data files that will replace the existing ones
285 ///
286 /// # Returns
287 /// * `Self` - The transaction builder for method chaining
288 ///
289 /// # Examples
290 /// ```
291 /// let transaction = table.new_transaction(None)
292 /// .replace(new_files)
293 /// .commit()
294 /// .await?;
295 /// ```
296 pub fn replace(mut self, files: Vec<DataFile>) -> Self {
297 if let Some(ref mut operation) = self.operations[REPLACE_INDEX] {
298 if let Operation::Replace {
299 branch: _,
300 files: old,
301 additional_summary: None,
302 } = operation
303 {
304 old.extend_from_slice(&files);
305 }
306 } else {
307 self.operations[REPLACE_INDEX] = Some(Operation::Replace {
308 branch: self.branch.clone(),
309 files,
310 additional_summary: None,
311 });
312 }
313 self
314 }
315 /// Quickly append files to the table
316 pub fn replace_with_lineage(
317 mut self,
318 files: Vec<DataFile>,
319 additional_summary: std::collections::HashMap<String, String>,
320 ) -> Self {
321 if let Some(ref mut operation) = self.operations[REPLACE_INDEX] {
322 if let Operation::Replace {
323 branch: _,
324 files: old,
325 additional_summary: old_lineage,
326 } = operation
327 {
328 old.extend_from_slice(&files);
329 *old_lineage = Some(additional_summary.clone());
330 }
331 } else {
332 self.operations[REPLACE_INDEX] = Some(Operation::Replace {
333 branch: self.branch.clone(),
334 files,
335 additional_summary: Some(additional_summary),
336 });
337 }
338 self
339 }
340 /// Updates the table properties with new key-value pairs
341 ///
342 /// This operation adds or updates table properties. Multiple update operations
343 /// in the same transaction will be combined.
344 ///
345 /// # Arguments
346 /// * `entries` - Vector of (key, value) pairs to update in the table properties
347 ///
348 /// # Returns
349 /// * `Self` - The transaction builder for method chaining
350 ///
351 /// # Examples
352 /// ```
353 /// let transaction = table.new_transaction(None)
354 /// .update_properties(vec![
355 /// ("write.format.default".to_string(), "parquet".to_string()),
356 /// ("write.metadata.compression-codec".to_string(), "gzip".to_string())
357 /// ])
358 /// .commit()
359 /// .await?;
360 /// ```
361 pub fn update_properties(mut self, entries: Vec<(String, String)>) -> Self {
362 if let Some(ref mut operation) = self.operations[UPDATE_PROPERTIES_INDEX] {
363 if let Operation::UpdateProperties(props) = operation {
364 props.extend_from_slice(&entries);
365 }
366 } else {
367 self.operations[UPDATE_PROPERTIES_INDEX] = Some(Operation::UpdateProperties(entries));
368 }
369 self
370 }
371 /// Sets a snapshot reference for the table
372 ///
373 /// This operation creates or updates a named reference to a specific snapshot,
374 /// allowing for features like branches and tags.
375 ///
376 /// # Arguments
377 /// * `entry` - Tuple of (reference name, snapshot reference) defining the reference
378 ///
379 /// # Returns
380 /// * `Self` - The transaction builder for method chaining
381 ///
382 /// # Examples
383 /// ```
384 /// let transaction = table.new_transaction(None)
385 /// .set_snapshot_ref((
386 /// "test-branch".to_string(),
387 /// SnapshotReference {
388 /// snapshot_id: 123,
389 /// retention: SnapshotRetention::default(),
390 /// }
391 /// ))
392 /// .commit()
393 /// .await?;
394 /// ```
395 pub fn set_snapshot_ref(mut self, entry: (String, SnapshotReference)) -> Self {
396 self.operations[SET_SNAPSHOT_REF_INDEX] = Some(Operation::SetSnapshotRef(entry));
397 self
398 }
399
400 /// Expire snapshots based on the provided configuration
401 ///
402 /// This operation expires snapshots according to the retention policies specified.
403 /// It can expire snapshots older than a certain timestamp, retain only the most recent N snapshots,
404 /// and optionally clean up orphaned data files.
405 ///
406 /// # Arguments
407 /// * `older_than` - Optional timestamp (ms since Unix epoch) to expire snapshots older than this time
408 /// * `retain_last` - Optional number of most recent snapshots to keep, regardless of timestamp
409 /// * `clean_orphan_files` - Whether to clean up data files that are no longer referenced
410 /// * `retain_ref_snapshots` - Whether to preserve snapshots that are referenced by branches/tags
411 /// * `dry_run` - Whether to perform a dry run without actually deleting anything
412 ///
413 /// # Returns
414 /// * `Self` - The transaction builder for method chaining
415 ///
416 /// # Examples
417 /// ```
418 /// let result = table.new_transaction(None)
419 /// .expire_snapshots(
420 /// Some(chrono::Utc::now().timestamp_millis() - 7 * 24 * 60 * 60 * 1000),
421 /// Some(5),
422 /// true,
423 /// true,
424 /// false
425 /// )
426 /// .commit()
427 /// .await?;
428 /// ```
429 pub fn expire_snapshots(
430 mut self,
431 older_than: Option<i64>,
432 retain_last: Option<usize>,
433 clean_orphan_files: bool,
434 retain_ref_snapshots: bool,
435 dry_run: bool,
436 ) -> Self {
437 self.operations[EXPIRE_SNAPSHOTS_INDEX] = Some(Operation::ExpireSnapshots {
438 older_than,
439 retain_last,
440 _clean_orphan_files: clean_orphan_files,
441 retain_ref_snapshots,
442 dry_run,
443 });
444 self
445 }
446
447 /// Commits all operations in this transaction atomically
448 ///
449 /// This method executes all operations in the transaction and updates the table
450 /// metadata. The changes are atomic - either all operations succeed or none do.
451 /// After commit, the transaction is consumed and the table is updated with the
452 /// new metadata.
453 ///
454 /// # Returns
455 /// * `Result<(), Error>` - Ok(()) if the commit succeeds, Error if it fails
456 ///
457 /// # Errors
458 /// Returns an error if:
459 /// * Any operation fails to execute
460 /// * The catalog update fails
461 /// * Cleanup of old data files fails (for replace operations)
462 ///
463 /// # Examples
464 /// ```
465 /// let result = table.new_transaction(None)
466 /// .append(data_files)
467 /// .update_properties(properties)
468 /// .commit()
469 /// .await?;
470 /// ```
471 #[instrument(name = "iceberg_rust::table::transaction::commit", level = "debug", skip(self), fields(
472 table_identifier = %self.table.identifier,
473 branch = ?self.branch
474 ))]
475 pub async fn commit(self) -> Result<(), Error> {
476 let catalog = self.table.catalog();
477 let identifier = self.table.identifier.clone();
478
479 // Execute the table operations
480 let (mut requirements, mut updates) = (Vec::new(), Vec::new());
481 for operation in self.operations.into_iter().flatten() {
482 let (requirement, update) = operation
483 .execute(self.table.metadata(), self.table.object_store())
484 .await?;
485
486 if let Some(requirement) = requirement {
487 requirements.push(requirement);
488 }
489 updates.extend(update);
490 }
491
492 if updates.is_empty() {
493 return Ok(());
494 }
495
496 debug!(
497 "Committing {} updates to table {}: requirements={:?}, updates={:?}",
498 updates.len(),
499 identifier,
500 requirements,
501 updates
502 );
503
504 let new_table = catalog
505 .clone()
506 .update_table(CommitTable {
507 identifier,
508 requirements,
509 updates,
510 })
511 .await?;
512
513 *self.table = new_table;
514 Ok(())
515 }
516}