iceberg_rust/catalog/commit.rs
1//! Commit operations for atomic metadata updates in Iceberg catalogs
2//!
3//! This module provides the core functionality for atomic commits to Iceberg tables and views:
4//! * Commit structures for tables and views
5//! * Update operations that can be applied
6//! * Requirements that must be satisfied
7//! * Functions to validate requirements and apply updates
8//!
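//! Commits are meant to be atomic: either all updates take effect or none do.
//! Requirements are checked first so that concurrent modifications are detected before
//! any update is applied. Note that [`apply_table_updates`] and [`apply_view_updates`]
//! modify the metadata in place and stop at the first failing update, so callers that
//! need all-or-nothing behavior should apply the updates to a copy of the metadata.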
11
12use std::collections::HashMap;
13
14use iceberg_rust_spec::{
15 spec::{
16 partition::PartitionSpec,
17 schema::Schema,
18 snapshot::{Snapshot, SnapshotReference},
19 sort::SortOrder,
20 table_metadata::TableMetadata,
21 view_metadata::{GeneralViewMetadata, Version},
22 },
23 table_metadata::SnapshotLog,
24 view_metadata::Materialization,
25};
26use serde_derive::{Deserialize, Serialize};
27use uuid::Uuid;
28
29use crate::error::Error;
30
31use super::identifier::Identifier;
32
/// A commit operation to update table metadata in an Iceberg catalog
///
/// This struct represents an atomic commit operation that can:
/// * Update table metadata
/// * Add or remove snapshots
/// * Modify schema, partition specs, and sort orders
///
/// The commit includes both requirements that must be satisfied and
/// a list of updates to apply atomically.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CommitTable {
    /// Identifier of the table the commit targets
    pub identifier: Identifier,
    /// Assertions about the current table metadata that must all hold for the commit to proceed
    pub requirements: Vec<TableRequirement>,
    /// Changes to apply to the table metadata, in the order listed
    pub updates: Vec<TableUpdate>,
}
51
/// A commit operation to update view metadata in an Iceberg catalog
///
/// This struct represents an atomic commit operation that can:
/// * Update view metadata
/// * Add or modify schemas
/// * Update view versions
/// * Modify location and properties
///
/// The commit includes both requirements that must be satisfied and
/// a list of updates to apply atomically.
///
/// # Type Parameters
/// * `T` - The materialization type for the view, typically `Option<()>` for regular views
///   or custom types for materialized views
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CommitView<T: Materialization> {
    /// Identifier of the view the commit targets
    pub identifier: Identifier,
    /// Assertions about the current view metadata that must all hold for the commit to proceed
    pub requirements: Vec<ViewRequirement>,
    /// Changes to apply to the view metadata, in the order listed
    pub updates: Vec<ViewUpdate<T>>,
}
75
76/// Updates that can be applied to table metadata in a commit operation
77///
78/// This enum represents all possible modifications that can be made to table metadata:
79/// * UUID assignment (only during table creation)
80/// * Format version updates
81/// * Schema modifications
82/// * Partition spec and sort order changes
83/// * Snapshot management (add, remove, set references)
84/// * Location and property updates
85///
86/// Each variant includes the necessary data for that specific update type.
87/// Updates are applied atomically as part of a commit operation.
88#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
89#[serde(
90 tag = "action",
91 rename_all = "kebab-case",
92 rename_all_fields = "kebab-case"
93)]
94pub enum TableUpdate {
95 /// Assigning a UUID to a table/view should only be done when creating the table/view. It is not safe to re-assign the UUID if a table/view already has a UUID assigned
96 AssignUUID {
97 /// new uuid
98 uuid: String,
99 },
100 /// Update format version
101 UpgradeFormatVersion {
102 /// New format version
103 format_version: i32,
104 },
105 /// The highest assigned column ID for the table. This is used to ensure columns are always assigned an unused ID when evolving schemas. When omitted, it will be computed on the server side.
106 AddSchema {
107 /// Schema to add
108 schema: Schema,
109 /// New last column id
110 last_column_id: Option<i32>,
111 },
112 /// Schema ID to set as current, or -1 to set last added schema
113 SetCurrentSchema {
114 /// New schema_id
115 schema_id: i32,
116 },
117 /// Add new partition spec
118 AddPartitionSpec {
119 /// New partition spec
120 spec: PartitionSpec,
121 },
122 /// Partition spec ID to set as the default, or -1 to set last added spec
123 SetDefaultSpec {
124 /// Spec id to set
125 spec_id: i32,
126 },
127 /// Add a new sort order
128 AddSortOrder {
129 /// New sort order
130 sort_order: SortOrder,
131 },
132 /// Sort order ID to set as the default, or -1 to set last added sort order
133 SetDefaultSortOrder {
134 /// Sort order id to set
135 sort_order_id: i32,
136 },
137 /// Add a new snapshot
138 AddSnapshot {
139 /// New snapshot
140 snapshot: Snapshot,
141 },
142 /// Set the current snapshot reference
143 SetSnapshotRef {
144 /// Name of the snapshot refrence
145 ref_name: String,
146 /// Snapshot refernce to set
147 #[serde(flatten)]
148 snapshot_reference: SnapshotReference,
149 },
150 /// Remove snapshots with certain snapshot ids
151 RemoveSnapshots {
152 /// Ids of the snapshots to remove
153 snapshot_ids: Vec<i64>,
154 },
155 /// Remove snapshot reference
156 RemoveSnapshotRef {
157 /// Name of the snapshot ref to remove
158 ref_name: String,
159 },
160 /// Set a new location for the table
161 SetLocation {
162 /// New Location
163 location: String,
164 },
165 /// Set table properties
166 SetProperties {
167 /// Properties to set
168 updates: HashMap<String, String>,
169 },
170 /// Remove table properties
171 RemoveProperties {
172 /// Properties to remove
173 removals: Vec<String>,
174 },
175}
176
/// Requirements that must be met before applying updates to table metadata
///
/// This enum defines preconditions that must be satisfied before a table update
/// can be committed. Requirements are checked before any update is applied so that
/// concurrent modifications are detected instead of corrupting table state.
///
/// # Requirements Types
/// * Table existence checks
/// * UUID validation
/// * Reference state validation
/// * Schema and partition spec version checks
/// * Sort order validation
/// * Column and partition ID validation
///
/// Each variant includes the specific values that must match the current table state.
///
/// Serialized as an internally tagged object: the `type` field holds the kebab-case
/// variant name (e.g. `assert-table-uuid`) and the remaining fields use kebab-case keys.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(
    tag = "type",
    rename_all = "kebab-case",
    rename_all_fields = "kebab-case"
)]
pub enum TableRequirement {
    /// The table must not already exist; used for create transactions
    AssertCreate,
    /// The table UUID must match the requirement's `uuid`
    AssertTableUuid {
        /// Uuid to assert
        uuid: Uuid,
    },
    /// The table branch or tag identified by the requirement's `ref` must reference the requirement's `snapshot-id`;
    /// if `snapshot-id` is `null` or missing, the ref must not already exist
    ///
    /// NOTE(review): `snapshot_id` is a plain `i64`, so the `null`/missing case described
    /// above can neither be represented nor deserialized by this type; presumably it should
    /// be `Option<i64>` — TODO confirm before relying on that case.
    AssertRefSnapshotId {
        /// Name of the branch or tag
        r#ref: String,
        /// Snapshot id the reference must point to
        snapshot_id: i64,
    },
    /// The table's last assigned column id must match the requirement's `last-assigned-field-id`
    AssertLastAssignedFieldId {
        /// Expected last assigned column id
        last_assigned_field_id: i32,
    },
    /// The table's current schema id must match the requirement's `current-schema-id`
    AssertCurrentSchemaId {
        /// Current schema id
        current_schema_id: i32,
    },
    /// The table's last assigned partition id must match the requirement's `last-assigned-partition-id`
    AssertLastAssignedPartitionId {
        /// Expected last assigned partition field id
        last_assigned_partition_id: i32,
    },
    /// The table's default spec id must match the requirement's `default-spec-id`
    AssertDefaultSpecId {
        /// Default spec id
        default_spec_id: i32,
    },
    /// The table's default sort order id must match the requirement's `default-sort-order-id`
    AssertDefaultSortOrderId {
        /// Default sort order id
        default_sort_order_id: i32,
    },
}
240
241/// Updates that can be applied to view metadata in a commit operation
242///
243/// This enum represents all possible modifications that can be made to view metadata:
244/// * UUID assignment (only during view creation)
245/// * Format version updates
246/// * Schema modifications
247/// * Location and property updates
248/// * Version management (add versions, set current version)
249///
250/// # Type Parameters
251/// * `T` - The materialization type for the view, typically `Option<()>` for regular views
252/// or `FullIdentifier` for materialized views
253///
254/// Each variant includes the necessary data for that specific update type.
255/// Updates are applied atomically as part of a commit operation.
256#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
257#[serde(
258 tag = "action",
259 rename_all = "kebab-case",
260 rename_all_fields = "kebab-case"
261)]
262pub enum ViewUpdate<T: Materialization> {
263 /// Assigning a UUID to a table/view should only be done when creating the table/view. It is not safe to re-assign the UUID if a table/view already has a UUID assigned
264 AssignUUID {
265 /// new uuid
266 uuid: String,
267 },
268 /// Update format version
269 UpgradeFormatVersion {
270 /// New format version
271 format_version: i32,
272 },
273 /// The highest assigned column ID for the table. This is used to ensure columns are always assigned an unused ID when evolving schemas. When omitted, it will be computed on the server side.
274 AddSchema {
275 /// Schema to add
276 schema: Schema,
277 /// New last column id
278 last_column_id: Option<i32>,
279 },
280 /// Set a new location for the table
281 SetLocation {
282 /// New Location
283 location: String,
284 },
285 /// Set table properties
286 SetProperties {
287 /// Properties to set
288 updates: HashMap<String, String>,
289 },
290 /// Remove table properties
291 RemoveProperties {
292 /// Properties to remove
293 removals: Vec<String>,
294 },
295 /// Add a new version to the view
296 AddViewVersion {
297 /// Version to add
298 view_version: Version<T>,
299 },
300 /// The view version id to set as current, or -1 to set last added view version id
301 SetCurrentViewVersion {
302 /// The id to set
303 view_version_id: i64,
304 },
305}
306
/// Requirements that must be met before applying updates to view metadata
///
/// This enum defines preconditions that must be satisfied before a view update
/// can be committed. Requirements are checked before any update is applied so that
/// concurrent modifications are detected instead of corrupting view state.
///
/// # Requirements Types
/// * UUID validation - Ensures view UUID matches expected value
///
/// Each variant includes the specific values that must match the current view state.
///
/// Serialized as an internally tagged object: the `type` field holds the kebab-case
/// variant name (e.g. `assert-view-uuid`) and the remaining fields use kebab-case keys.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(
    tag = "type",
    rename_all = "kebab-case",
    rename_all_fields = "kebab-case"
)]
pub enum ViewRequirement {
    /// The view UUID must match the requirement's `uuid`
    AssertViewUuid {
        /// Uuid to assert
        uuid: Uuid,
    },
}
330/// Validates that table metadata meets all specified requirements
331///
332/// This function checks if the current table metadata satisfies all the requirements
333/// specified for a commit operation. It ensures atomic updates by verifying preconditions
334/// like UUID matches, snapshot references, schema versions etc.
335///
336/// # Arguments
337/// * `requirements` - List of requirements that must be satisfied
338/// * `metadata` - Current table metadata to validate against
339///
340/// # Returns
341/// * `true` if all requirements are met
342/// * `false` if any requirement is not satisfied
343pub fn check_table_requirements(
344 requirements: &[TableRequirement],
345 metadata: &TableMetadata,
346) -> bool {
347 requirements.iter().all(|x| match x {
348 // Assert create has to be check in another place
349 TableRequirement::AssertCreate => true,
350 TableRequirement::AssertTableUuid { uuid } => metadata.table_uuid == *uuid,
351 TableRequirement::AssertRefSnapshotId { r#ref, snapshot_id } => metadata
352 .refs
353 .get(r#ref)
354 .map(|id| id.snapshot_id == *snapshot_id)
355 .unwrap_or(false),
356 TableRequirement::AssertLastAssignedFieldId {
357 last_assigned_field_id,
358 } => metadata.last_column_id == *last_assigned_field_id,
359 TableRequirement::AssertCurrentSchemaId { current_schema_id } => {
360 metadata.current_schema_id == *current_schema_id
361 }
362 TableRequirement::AssertLastAssignedPartitionId {
363 last_assigned_partition_id,
364 } => metadata.last_partition_id == *last_assigned_partition_id,
365 TableRequirement::AssertDefaultSpecId { default_spec_id } => {
366 metadata.default_spec_id == *default_spec_id
367 }
368 TableRequirement::AssertDefaultSortOrderId {
369 default_sort_order_id,
370 } => metadata.default_sort_order_id == *default_sort_order_id,
371 })
372}
373
374/// Validates that view metadata meets all specified requirements
375///
376/// This function checks if the current view metadata satisfies all the requirements
377/// specified for a commit operation. It ensures atomic updates by verifying preconditions
378/// like UUID matches.
379///
380/// # Type Parameters
381/// * `T` - The materialization type for the view, must implement Materialization, Eq and 'static
382///
383/// # Arguments
384/// * `requirements` - List of requirements that must be satisfied
385/// * `metadata` - Current view metadata to validate against
386///
387/// # Returns
388/// * `true` if all requirements are met
389/// * `false` if any requirement is not satisfied
390pub fn check_view_requirements<T: Materialization + Eq + 'static>(
391 requirements: &[ViewRequirement],
392 metadata: &GeneralViewMetadata<T>,
393) -> bool {
394 requirements.iter().all(|x| match x {
395 ViewRequirement::AssertViewUuid { uuid } => metadata.view_uuid == *uuid,
396 })
397}
398/// Applies a sequence of updates to table metadata
399///
400/// This function processes each update in order, modifying the table metadata accordingly.
401/// Updates can include:
402/// * Format version changes
403/// * UUID assignments
404/// * Schema modifications
405/// * Partition spec and sort order changes
406/// * Snapshot management
407/// * Location and property updates
408///
409/// # Arguments
410/// * `metadata` - Mutable reference to table metadata to modify
411/// * `updates` - Vector of updates to apply
412///
413/// # Returns
414/// * `Ok(())` if all updates were applied successfully
415/// * `Err(Error)` if any update failed to apply
416pub fn apply_table_updates(
417 metadata: &mut TableMetadata,
418 updates: Vec<TableUpdate>,
419) -> Result<(), Error> {
420 for update in updates {
421 match update {
422 TableUpdate::UpgradeFormatVersion { format_version: _ } => {
423 unimplemented!();
424 }
425 TableUpdate::AssignUUID { uuid } => {
426 metadata.table_uuid = Uuid::parse_str(&uuid)?;
427 }
428 TableUpdate::AddSchema {
429 schema,
430 last_column_id,
431 } => {
432 metadata.schemas.insert(*schema.schema_id(), schema);
433 if let Some(last_column_id) = last_column_id {
434 metadata.last_column_id = last_column_id;
435 }
436 }
437 TableUpdate::SetCurrentSchema { schema_id } => {
438 metadata.current_schema_id = schema_id;
439 }
440 TableUpdate::AddPartitionSpec { spec } => {
441 metadata.partition_specs.insert(*spec.spec_id(), spec);
442 }
443 TableUpdate::SetDefaultSpec { spec_id } => {
444 metadata.default_spec_id = spec_id;
445 }
446 TableUpdate::AddSortOrder { sort_order } => {
447 metadata.sort_orders.insert(sort_order.order_id, sort_order);
448 }
449 TableUpdate::SetDefaultSortOrder { sort_order_id } => {
450 metadata.default_sort_order_id = sort_order_id;
451 }
452 TableUpdate::AddSnapshot { snapshot } => {
453 metadata.snapshot_log.push(SnapshotLog {
454 snapshot_id: *snapshot.snapshot_id(),
455 timestamp_ms: *snapshot.timestamp_ms(),
456 });
457 metadata.last_sequence_number = *snapshot.sequence_number();
458 metadata.snapshots.insert(*snapshot.snapshot_id(), snapshot);
459 }
460 TableUpdate::SetSnapshotRef {
461 ref_name,
462 snapshot_reference,
463 } => {
464 if ref_name == "main" {
465 metadata.current_snapshot_id = Some(snapshot_reference.snapshot_id);
466 }
467 metadata.refs.insert(ref_name, snapshot_reference);
468 }
469 TableUpdate::RemoveSnapshots { snapshot_ids } => {
470 for id in snapshot_ids {
471 metadata.snapshots.remove(&id);
472 }
473 }
474 TableUpdate::RemoveSnapshotRef { ref_name } => {
475 metadata.refs.remove(&ref_name);
476 }
477 TableUpdate::SetLocation { location } => {
478 metadata.location = location;
479 }
480 TableUpdate::SetProperties { updates } => {
481 metadata.properties.extend(updates);
482 }
483 TableUpdate::RemoveProperties { removals } => {
484 for rem in removals {
485 metadata.properties.remove(&rem);
486 }
487 }
488 };
489 }
490 Ok(())
491}
492
493/// Applies a sequence of updates to view metadata
494///
495/// This function processes each update in order, modifying the view metadata accordingly.
496/// Updates can include:
497/// * Format version changes
498/// * UUID assignments
499/// * Schema modifications
500/// * Location and property updates
501/// * Version management
502///
503/// # Type Parameters
504/// * `T` - The materialization type for the view, must implement Materialization + 'static
505///
506/// # Arguments
507/// * `metadata` - Mutable reference to view metadata to modify
508/// * `updates` - Vector of updates to apply
509///
510/// # Returns
511/// * `Ok(())` if all updates were applied successfully
512/// * `Err(Error)` if any update failed to apply
513pub fn apply_view_updates<T: Materialization + 'static>(
514 metadata: &mut GeneralViewMetadata<T>,
515 updates: Vec<ViewUpdate<T>>,
516) -> Result<(), Error> {
517 for update in updates {
518 match update {
519 ViewUpdate::UpgradeFormatVersion { format_version: _ } => {
520 unimplemented!();
521 }
522 ViewUpdate::AssignUUID { uuid } => {
523 metadata.view_uuid = Uuid::parse_str(&uuid)?;
524 }
525 ViewUpdate::AddSchema {
526 schema,
527 last_column_id: _,
528 } => {
529 metadata.schemas.insert(*schema.schema_id(), schema);
530 }
531 ViewUpdate::SetLocation { location } => {
532 metadata.location = location;
533 }
534 ViewUpdate::SetProperties { updates } => {
535 metadata.properties.extend(updates);
536 }
537 ViewUpdate::RemoveProperties { removals } => {
538 for rem in removals {
539 metadata.properties.remove(&rem);
540 }
541 }
542 ViewUpdate::AddViewVersion { view_version } => {
543 metadata
544 .versions
545 .insert(view_version.version_id, view_version);
546 }
547 ViewUpdate::SetCurrentViewVersion { view_version_id } => {
548 metadata.current_version_id = view_version_id;
549 }
550 };
551 }
552 Ok(())
553}