1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
//! Transaction API for writing to Iceberg tables
use crate::spec::DataFile;
use crate::table::Table;
/// The kinds of change that can be queued in a transaction.
#[derive(Debug, Clone)]
pub enum TransactionOperation {
    /// Add the given data files to the table.
    Append(Vec<DataFile>),
    /// Atomically remove one set of files and add another.
    ///
    /// Intended for compaction, where N small files are replaced with M larger files.
    Rewrite {
        /// Existing files to remove (marked as deleted in the manifest).
        files_to_delete: Vec<DataFile>,
        /// Replacement files to add (marked as added in the manifest).
        files_to_add: Vec<DataFile>,
    },
}
/// A pending set of modifications to a single table.
///
/// Holds the target table together with the operations queued against it.
pub struct Transaction {
    /// Table the queued operations apply to.
    table: Table,
    /// Operations queued so far, in the order they were added.
    operations: Vec<TransactionOperation>,
}
impl Transaction {
    /// Create a new, empty transaction bound to `table`.
    pub(crate) fn new(table: Table) -> Self {
        Self {
            table,
            operations: Vec::new(),
        }
    }

    /// Get the table this transaction operates on.
    pub fn table(&self) -> &Table {
        &self.table
    }

    /// Queue an append of `data_files` and return the updated transaction.
    ///
    /// The transaction is consumed and handed back, so calls can be chained.
    /// The returned value must be kept: dropping it drops the queued
    /// operation along with it.
    #[must_use = "this returns the updated transaction; dropping it discards the queued operation"]
    pub fn append(mut self, data_files: Vec<DataFile>) -> Self {
        self.operations
            .push(TransactionOperation::Append(data_files));
        self
    }

    /// Queue a rewrite: atomically delete old files and add new ones.
    ///
    /// Used for compaction, where N small files are replaced with M larger
    /// files. Like [`Transaction::append`], this consumes the transaction and
    /// returns it with the operation added.
    #[must_use = "this returns the updated transaction; dropping it discards the queued operation"]
    pub fn rewrite(mut self, files_to_delete: Vec<DataFile>, files_to_add: Vec<DataFile>) -> Self {
        self.operations.push(TransactionOperation::Rewrite {
            files_to_delete,
            files_to_add,
        });
        self
    }

    /// Check whether at least one operation has been queued.
    pub fn has_operations(&self) -> bool {
        !self.operations.is_empty()
    }

    /// Get the queued operations, in the order they were added (for internal use).
    pub(crate) fn operations(&self) -> &[TransactionOperation] {
        &self.operations
    }

    /// Replace the table backing this transaction (for retries).
    ///
    /// The queued operations are carried over unchanged; only the table is
    /// swapped for the one supplied.
    #[must_use = "this returns the rebound transaction; dropping it discards the queued operations"]
    pub(crate) fn rebind_table(self, table: Table) -> Self {
        Self {
            table,
            operations: self.operations,
        }
    }

    /// Commit the transaction, writing snapshots to the catalog.
    ///
    /// Consumes the transaction and delegates all work to
    /// `crate::commit::commit_transaction`.
    ///
    /// `timestamp_ms` is forwarded unchanged.
    /// NOTE(review): presumably the commit time in epoch milliseconds; confirm
    /// against `commit_transaction`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by `crate::commit::commit_transaction`.
    pub async fn commit(
        self,
        catalog: &dyn crate::catalog::Catalog,
        timestamp_ms: i64,
    ) -> crate::error::Result<()> {
        crate::commit::commit_transaction(self, catalog, timestamp_ms).await
    }
}