iceberg-rust 0.10.0

Unofficial rust implementation of the Iceberg table format
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
//! Transaction module for atomic table operations
//!
//! This module provides the transaction system for Iceberg tables, allowing multiple
//! operations to be grouped and executed atomically. The main types are:
//!
//! * [`TableTransaction`] - Builder for creating and executing atomic transactions
//! * [`Operation`] - Individual operations that can be part of a transaction
//!
//! Transactions ensure that either all operations succeed or none do, maintaining
//! table consistency. Common operations include:
//!
//! * Adding/updating schemas
//! * Appending data files
//! * Replacing data files
//! * Updating table properties
//! * Managing snapshots and branches

use std::collections::HashMap;
use tracing::{debug, instrument};

use iceberg_rust_spec::spec::{manifest::DataFile, schema::Schema, snapshot::SnapshotReference};

use crate::table::transaction::append::append_summary;
use crate::table::transaction::operation::SequenceGroup;
use crate::{catalog::commit::CommitTable, error::Error, table::Table};

use self::operation::Operation;

pub(crate) mod append;
pub(crate) mod operation;
pub(crate) mod overwrite;

// Slot indices into `TableTransaction::operations`: each operation type owns
// exactly one slot, so repeated builder calls can merge into it and commit
// executes the slots in a fixed, deterministic order.
//
// Declared as `const` (not `static`): these are plain compile-time `usize`
// values with no runtime identity, which is the idiomatic choice.
pub(crate) const ADD_SCHEMA_INDEX: usize = 0;
pub(crate) const SET_DEFAULT_SPEC_INDEX: usize = 1;
pub(crate) const APPEND_INDEX: usize = 2;
pub(crate) const APPEND_SEQUENCE_GROUPS_INDEX: usize = 3;
pub(crate) const REPLACE_INDEX: usize = 4;
pub(crate) const OVERWRITE_INDEX: usize = 5;
pub(crate) const UPDATE_PROPERTIES_INDEX: usize = 6;
pub(crate) const SET_SNAPSHOT_REF_INDEX: usize = 7;
pub(crate) const EXPIRE_SNAPSHOTS_INDEX: usize = 8;

/// Total number of operation slots; must stay in sync with the indices above.
pub(crate) const NUM_OPERATIONS: usize = 9;

/// A transaction that can perform multiple operations on a table atomically
///
/// TableTransaction allows grouping multiple table operations (like schema updates,
/// appends, overwrites) into a single atomic transaction. The transaction must be
/// committed for changes to take effect.
///
/// # Type Parameters
/// * `'table` - Lifetime of the reference to the table being modified
///
/// # Examples
/// ```
/// let mut table = // ... get table reference
/// table.new_transaction(None)
///     .add_schema(new_schema)
///     .append_data(data_files)
///     .commit()
///     .await?;
/// ```
pub struct TableTransaction<'table> {
    // Table being modified; overwritten with the committed state on success.
    table: &'table mut Table,
    // One optional slot per operation type, indexed by the `*_INDEX` values;
    // builder methods fill or merge into their dedicated slot.
    operations: Vec<Option<Operation>>,
    // Branch the transaction targets; `None` means the default/main branch.
    branch: Option<String>,
}

impl<'table> TableTransaction<'table> {
    /// Create a transaction for the given table.
    ///
    /// Allocates one empty slot per operation type (see `NUM_OPERATIONS`);
    /// each builder method fills or merges into its dedicated slot.
    ///
    /// # Arguments
    /// * `table` - The table the transaction will modify on commit
    /// * `branch` - Optional branch name the transaction targets
    pub(crate) fn new(table: &'table mut Table, branch: Option<&str>) -> Self {
        TableTransaction {
            table,
            operations: (0..NUM_OPERATIONS).map(|_| None).collect(),
            branch: branch.map(ToString::to_string),
        }
    }
    /// Adds a new schema to the table
    ///
    /// Queues a new schema version on the transaction; the schema ID is
    /// assigned when the transaction is committed. Calling this twice in one
    /// transaction keeps only the last schema.
    ///
    /// # Arguments
    /// * `schema` - The new schema to add to the table
    ///
    /// # Returns
    /// * `Self` - The transaction builder for method chaining
    pub fn add_schema(mut self, schema: Schema) -> Self {
        let op = Operation::AddSchema(schema);
        self.operations[ADD_SCHEMA_INDEX] = Some(op);
        self
    }
    /// Sets the default partition specification ID for the table
    ///
    /// The referenced partition specification must already exist in the table
    /// metadata; only the ID is recorded here.
    ///
    /// # Arguments
    /// * `spec_id` - The ID of the partition specification to set as default
    ///
    /// # Returns
    /// * `Self` - The transaction builder for method chaining
    pub fn set_default_spec(mut self, spec_id: i32) -> Self {
        let op = Operation::SetDefaultSpec(spec_id);
        self.operations[SET_DEFAULT_SPEC_INDEX] = Some(op);
        self
    }
    /// Appends new data files to the table
    ///
    /// This operation adds new data files to the table's current snapshot. Multiple
    /// append operations in the same transaction will be combined.
    ///
    /// # Arguments
    /// * `files` - Vector of data files to append to the table
    ///
    /// # Returns
    /// * `Self` - The transaction builder for method chaining
    ///
    /// # Panics
    /// Panics if `append_sequence_group` was already used on this transaction;
    /// the two append styles are mutually exclusive.
    ///
    /// # Examples
    /// ```
    /// let transaction = table.new_transaction(None)
    ///     .append_data(data_files)
    ///     .commit()
    ///     .await?;
    /// ```
    pub fn append_data(mut self, files: Vec<DataFile>) -> Self {
        if self.operations[APPEND_SEQUENCE_GROUPS_INDEX].is_some() {
            panic!("Cannot use append and append_sequence_group in the same transaction");
        }
        if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
            if let Operation::Append {
                data_files: old, ..
            } = operation
            {
                old.extend_from_slice(&files);
                // NOTE(review): the summary of this second batch is not merged
                // into `additional_summary`; only the first batch's summary is
                // kept — confirm this is intended.
            }
        } else {
            // Compute the summary only when the slot is first filled; the merge
            // path above never uses it, so computing it up front was wasted work.
            let summary = append_summary(&files);
            self.operations[APPEND_INDEX] = Some(Operation::Append {
                branch: self.branch.clone(),
                data_files: files,
                delete_files: Vec::new(),
                additional_summary: summary,
            });
        }
        self
    }
    /// Appends delete files to the table
    ///
    /// Queues files that mark records for deletion in the table's current
    /// snapshot. Repeated calls within one transaction accumulate into a single
    /// append operation, sharing the slot with `append_data`.
    ///
    /// # Arguments
    /// * `files` - Vector of delete files to append to the table
    ///
    /// # Returns
    /// * `Self` - The transaction builder for method chaining
    ///
    /// # Panics
    /// Panics if `append_sequence_group` was already used on this transaction.
    ///
    /// # Examples
    /// ```
    /// let transaction = table.new_transaction(None)
    ///     .append_delete(delete_files)
    ///     .commit()
    ///     .await?;
    /// ```
    pub fn append_delete(mut self, files: Vec<DataFile>) -> Self {
        if self.operations[APPEND_SEQUENCE_GROUPS_INDEX].is_some() {
            panic!("Cannot use append and append_sequence_group in the same transaction");
        }
        match &mut self.operations[APPEND_INDEX] {
            Some(Operation::Append {
                delete_files: existing,
                ..
            }) => existing.extend_from_slice(&files),
            slot @ None => {
                *slot = Some(Operation::Append {
                    branch: self.branch.clone(),
                    data_files: Vec::new(),
                    delete_files: files,
                    additional_summary: None,
                });
            }
            // Defensive: this slot only ever holds `Operation::Append`, so a
            // different variant is silently left untouched.
            Some(_) => {}
        }
        self
    }

    /// Appends a group of data and delete files to the table
    ///
    /// Each call contributes one `SequenceGroup`; groups accumulate across
    /// calls within the same transaction.
    ///
    /// # Arguments
    /// * `data_files` - Data files belonging to this sequence group
    /// * `delete_files` - Delete files belonging to this sequence group
    ///
    /// # Returns
    /// * `Self` - The transaction builder for method chaining
    ///
    /// # Panics
    /// Panics if `append_data`/`append_delete` was already used on this
    /// transaction; the two append styles are mutually exclusive.
    pub fn append_sequence_group(
        mut self,
        data_files: Vec<DataFile>,
        delete_files: Vec<DataFile>,
    ) -> Self {
        if self.operations[APPEND_INDEX].is_some() {
            panic!("Cannot use append and append_sequence_group in the same transaction");
        }
        let group = SequenceGroup {
            delete_files,
            data_files,
        };
        match &mut self.operations[APPEND_SEQUENCE_GROUPS_INDEX] {
            Some(Operation::AppendSequenceGroups {
                sequence_groups, ..
            }) => sequence_groups.push(group),
            slot @ None => {
                *slot = Some(Operation::AppendSequenceGroups {
                    branch: self.branch.clone(),
                    sequence_groups: vec![group],
                });
            }
            // Defensive: this slot only ever holds `AppendSequenceGroups`.
            Some(_) => {}
        }
        self
    }
    /// Overwrites specific data files in the table with new ones
    ///
    /// This operation replaces specified existing data files with new ones, rather than
    /// replacing all files (like `replace`) or adding new files (like `append`). It allows
    /// for selective replacement of data files based on the mapping provided.
    ///
    /// Multiple overwrite operations in the same transaction will be combined, with new
    /// data files appended and the files-to-overwrite mapping merged.
    ///
    /// # Arguments
    /// * `files` - Vector of new data files to add to the table
    /// * `files_to_overwrite` - HashMap mapping manifest file paths to lists of data file
    ///   paths that should be overwritten/replaced
    ///
    /// # Returns
    /// * `Self` - The transaction builder for method chaining
    ///
    /// # Examples
    /// ```
    /// use std::collections::HashMap;
    ///
    /// let mut files_to_overwrite = HashMap::new();
    /// files_to_overwrite.insert(
    ///     "manifest-001.avro".to_string(),
    ///     vec!["data-001.parquet".to_string(), "data-002.parquet".to_string()]
    /// );
    ///
    /// let transaction = table.new_transaction(None)
    ///     .overwrite(new_data_files, files_to_overwrite)
    ///     .commit()
    ///     .await?;
    /// ```
    pub fn overwrite(
        mut self,
        files: Vec<DataFile>,
        files_to_overwrite: HashMap<String, Vec<String>>,
    ) -> Self {
        if let Some(ref mut operation) = self.operations[OVERWRITE_INDEX] {
            if let Operation::Overwrite {
                data_files: old_data_files,
                files_to_overwrite: old_files_to_overwrite,
                ..
            } = operation
            {
                old_data_files.extend_from_slice(&files);
                old_files_to_overwrite.extend(files_to_overwrite);
                // NOTE(review): the summary of this second batch is not merged
                // into `additional_summary`; only the first batch's summary is
                // kept — confirm this is intended.
            }
        } else {
            // Compute the summary only when the slot is first filled; the merge
            // path above never uses it, so computing it up front was wasted work.
            let summary = append_summary(&files);
            self.operations[OVERWRITE_INDEX] = Some(Operation::Overwrite {
                branch: self.branch.clone(),
                data_files: files,
                files_to_overwrite,
                additional_summary: summary,
            });
        }
        self
    }
    /// Replaces all data files in the table with new ones
    ///
    /// This operation removes all existing data files and replaces them with the provided
    /// files. Multiple replace operations in the same transaction will be combined.
    ///
    /// # Arguments
    /// * `files` - Vector of data files that will replace the existing ones
    ///
    /// # Returns
    /// * `Self` - The transaction builder for method chaining
    ///
    /// # Examples
    /// ```
    /// let transaction = table.new_transaction(None)
    ///     .replace(new_files)
    ///     .commit()
    ///     .await?;
    /// ```
    pub fn replace(mut self, files: Vec<DataFile>) -> Self {
        if let Some(ref mut operation) = self.operations[REPLACE_INDEX] {
            // The pattern below only matches a queued replace whose
            // `additional_summary` is `None`.
            // NOTE(review): if `replace_with_lineage` ran first (summary is
            // `Some`), the files passed here are silently dropped — confirm
            // this asymmetry is intended.
            if let Operation::Replace {
                branch: _,
                files: old,
                additional_summary: None,
            } = operation
            {
                old.extend_from_slice(&files);
            }
        } else {
            // First replace in this transaction: fill the slot.
            self.operations[REPLACE_INDEX] = Some(Operation::Replace {
                branch: self.branch.clone(),
                files,
                additional_summary: None,
            });
        }
        self
    }
    /// Replaces all data files in the table and attaches additional snapshot
    /// summary entries (e.g. lineage metadata) to the resulting commit.
    ///
    /// Multiple replace operations in the same transaction are combined; the
    /// `additional_summary` passed here overwrites any previously queued one.
    ///
    /// # Arguments
    /// * `files` - Vector of data files that will replace the existing ones
    /// * `additional_summary` - Extra key/value pairs recorded in the snapshot summary
    ///
    /// # Returns
    /// * `Self` - The transaction builder for method chaining
    pub fn replace_with_lineage(
        mut self,
        files: Vec<DataFile>,
        additional_summary: std::collections::HashMap<String, String>,
    ) -> Self {
        if let Some(ref mut operation) = self.operations[REPLACE_INDEX] {
            if let Operation::Replace {
                branch: _,
                files: old,
                additional_summary: old_lineage,
            } = operation
            {
                old.extend_from_slice(&files);
                // Move instead of clone: `additional_summary` is not used again
                // on this branch.
                *old_lineage = Some(additional_summary);
            }
        } else {
            self.operations[REPLACE_INDEX] = Some(Operation::Replace {
                branch: self.branch.clone(),
                files,
                additional_summary: Some(additional_summary),
            });
        }
        self
    }
    /// Updates the table properties with new key-value pairs
    ///
    /// Adds or updates table properties. Entries from repeated calls within the
    /// same transaction accumulate into a single update operation.
    ///
    /// # Arguments
    /// * `entries` - Vector of (key, value) pairs to update in the table properties
    ///
    /// # Returns
    /// * `Self` - The transaction builder for method chaining
    ///
    /// # Examples
    /// ```
    /// let transaction = table.new_transaction(None)
    ///     .update_properties(vec![
    ///         ("write.format.default".to_string(), "parquet".to_string()),
    ///         ("write.metadata.compression-codec".to_string(), "gzip".to_string())
    ///     ])
    ///     .commit()
    ///     .await?;
    /// ```
    pub fn update_properties(mut self, entries: Vec<(String, String)>) -> Self {
        match &mut self.operations[UPDATE_PROPERTIES_INDEX] {
            Some(Operation::UpdateProperties(existing)) => {
                existing.extend_from_slice(&entries)
            }
            slot @ None => *slot = Some(Operation::UpdateProperties(entries)),
            // Defensive: this slot only ever holds `UpdateProperties`.
            Some(_) => {}
        }
        self
    }
    /// Sets a snapshot reference for the table
    ///
    /// Creates or updates a named reference to a specific snapshot, which backs
    /// features like branches and tags. Calling this twice in one transaction
    /// keeps only the last reference.
    ///
    /// # Arguments
    /// * `entry` - Tuple of (reference name, snapshot reference) defining the reference
    ///
    /// # Returns
    /// * `Self` - The transaction builder for method chaining
    ///
    /// # Examples
    /// ```
    /// let transaction = table.new_transaction(None)
    ///     .set_snapshot_ref((
    ///         "test-branch".to_string(),
    ///         SnapshotReference {
    ///             snapshot_id: 123,
    ///             retention: SnapshotRetention::default(),
    ///         }
    ///     ))
    ///     .commit()
    ///     .await?;
    /// ```
    pub fn set_snapshot_ref(mut self, entry: (String, SnapshotReference)) -> Self {
        let op = Operation::SetSnapshotRef(entry);
        self.operations[SET_SNAPSHOT_REF_INDEX] = Some(op);
        self
    }

    /// Expire snapshots based on the provided configuration
    ///
    /// Queues a snapshot-expiration operation following the given retention
    /// policies: expire by age, keep the most recent N, optionally clean up
    /// orphaned data files, and optionally run without deleting anything.
    ///
    /// # Arguments
    /// * `older_than` - Optional timestamp (ms since Unix epoch) to expire snapshots older than this time
    /// * `retain_last` - Optional number of most recent snapshots to keep, regardless of timestamp
    /// * `clean_orphan_files` - Whether to clean up data files that are no longer referenced
    /// * `retain_ref_snapshots` - Whether to preserve snapshots that are referenced by branches/tags
    /// * `dry_run` - Whether to perform a dry run without actually deleting anything
    ///
    /// # Returns
    /// * `Self` - The transaction builder for method chaining
    ///
    /// # Examples
    /// ```
    /// let result = table.new_transaction(None)
    ///     .expire_snapshots(
    ///         Some(chrono::Utc::now().timestamp_millis() - 7 * 24 * 60 * 60 * 1000),
    ///         Some(5),
    ///         true,
    ///         true,
    ///         false
    ///     )
    ///     .commit()
    ///     .await?;
    /// ```
    pub fn expire_snapshots(
        mut self,
        older_than: Option<i64>,
        retain_last: Option<usize>,
        clean_orphan_files: bool,
        retain_ref_snapshots: bool,
        dry_run: bool,
    ) -> Self {
        let op = Operation::ExpireSnapshots {
            older_than,
            retain_last,
            _clean_orphan_files: clean_orphan_files,
            retain_ref_snapshots,
            dry_run,
        };
        self.operations[EXPIRE_SNAPSHOTS_INDEX] = Some(op);
        self
    }

    /// Commits all operations in this transaction atomically
    ///
    /// This method executes all operations in the transaction and updates the table
    /// metadata. The changes are atomic - either all operations succeed or none do.
    /// After commit, the transaction is consumed and the table is updated with the
    /// new metadata.
    ///
    /// # Returns
    /// * `Result<(), Error>` - Ok(()) if the commit succeeds, Error if it fails
    ///
    /// # Errors
    /// Returns an error if:
    /// * Any operation fails to execute
    /// * The catalog update fails
    /// * Cleanup of old data files fails (for replace operations)
    ///
    /// # Examples
    /// ```
    /// let result = table.new_transaction(None)
    ///     .append_data(data_files)
    ///     .update_properties(properties)
    ///     .commit()
    ///     .await?;
    /// ```
    #[instrument(name = "iceberg_rust::table::transaction::commit", level = "debug", skip(self), fields(
        table_identifier = %self.table.identifier,
        branch = ?self.branch
    ))]
    pub async fn commit(self) -> Result<(), Error> {
        // Capture catalog handle and identifier before the operation list is consumed.
        let catalog = self.table.catalog();
        let identifier = self.table.identifier.clone();

        // Execute the table operations in slot order; each produces an optional
        // commit requirement plus zero or more metadata updates.
        let (mut requirements, mut updates) = (Vec::new(), Vec::new());
        for operation in self.operations.into_iter().flatten() {
            let (requirement, update) = operation
                .execute(self.table.metadata(), self.table.object_store())
                .await?;

            if let Some(requirement) = requirement {
                requirements.push(requirement);
            }
            updates.extend(update);
        }

        // No-op transaction: skip the catalog round-trip entirely.
        // NOTE(review): requirements gathered without any updates are discarded
        // here — confirm that is intended.
        if updates.is_empty() {
            return Ok(());
        }

        debug!(
            "Committing {} updates to table {}: requirements={:?}, updates={:?}",
            updates.len(),
            identifier,
            requirements,
            updates
        );

        // The catalog validates the requirements and applies the updates,
        // returning the refreshed table state.
        let new_table = catalog
            .clone()
            .update_table(CommitTable {
                identifier,
                requirements,
                updates,
            })
            .await?;

        // Swap the committed state into the caller's table reference.
        *self.table = new_table;
        Ok(())
    }
}