iceberg_rust/catalog/
commit.rs

1//! Commit operations for atomic metadata updates in Iceberg catalogs
2//!
3//! This module provides the core functionality for atomic commits to Iceberg tables and views:
4//! * Commit structures for tables and views
5//! * Update operations that can be applied
6//! * Requirements that must be satisfied
7//! * Functions to validate requirements and apply updates
8//!
9//! All changes are made atomically - either all updates succeed or none are applied.
10//! Requirements are checked first to ensure concurrent modifications don't corrupt state.
11
12use std::collections::HashMap;
13
14use iceberg_rust_spec::{
15    spec::{
16        partition::PartitionSpec,
17        schema::Schema,
18        snapshot::{Snapshot, SnapshotReference},
19        sort::SortOrder,
20        table_metadata::TableMetadata,
21        view_metadata::{GeneralViewMetadata, Version},
22    },
23    table_metadata::SnapshotLog,
24    view_metadata::Materialization,
25};
26use serde_derive::{Deserialize, Serialize};
27use uuid::Uuid;
28
29use crate::error::Error;
30
31use super::identifier::Identifier;
32
33/// A commit operation to update table metadata in an Iceberg catalog
34///
35/// This struct represents an atomic commit operation that can:
36/// * Update table metadata
37/// * Add or remove snapshots
38/// * Modify schema, partition specs, and sort orders
39///
40/// The commit includes both requirements that must be satisfied and
41/// a list of updates to apply atomically.
42#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
43pub struct CommitTable {
44    /// Table identifier
45    pub identifier: Identifier,
46    /// Assertions about the metadata that must be true to update the metadata
47    pub requirements: Vec<TableRequirement>,
48    /// Changes to the table metadata
49    pub updates: Vec<TableUpdate>,
50}
51
52/// A commit operation to update view metadata in an Iceberg catalog
53///
54/// This struct represents an atomic commit operation that can:
55/// * Update view metadata
56/// * Add or modify schemas
57/// * Update view versions
58/// * Modify location and properties
59///
60/// The commit includes both requirements that must be satisfied and
61/// a list of updates to apply atomically.
62///
63/// # Type Parameters
64/// * `T` - The materialization type for the view, typically `Option<()>` for regular views
65///   or custom types for materialized views
66#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
67pub struct CommitView<T: Materialization> {
68    /// Table identifier
69    pub identifier: Identifier,
70    /// Assertions about the metadata that must be true to update the metadata
71    pub requirements: Vec<ViewRequirement>,
72    /// Changes to the table metadata
73    pub updates: Vec<ViewUpdate<T>>,
74}
75
76/// Updates that can be applied to table metadata in a commit operation
77///
78/// This enum represents all possible modifications that can be made to table metadata:
79/// * UUID assignment (only during table creation)
80/// * Format version updates
81/// * Schema modifications
82/// * Partition spec and sort order changes
83/// * Snapshot management (add, remove, set references)
84/// * Location and property updates
85///
86/// Each variant includes the necessary data for that specific update type.
87/// Updates are applied atomically as part of a commit operation.
88#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
89#[serde(
90    tag = "action",
91    rename_all = "kebab-case",
92    rename_all_fields = "kebab-case"
93)]
94pub enum TableUpdate {
95    /// Assigning a UUID to a table/view should only be done when creating the table/view. It is not safe to re-assign the UUID if a table/view already has a UUID assigned
96    AssignUUID {
97        /// new uuid
98        uuid: String,
99    },
100    /// Update format version
101    UpgradeFormatVersion {
102        /// New format version
103        format_version: i32,
104    },
105    /// The highest assigned column ID for the table. This is used to ensure columns are always assigned an unused ID when evolving schemas. When omitted, it will be computed on the server side.
106    AddSchema {
107        /// Schema to add
108        schema: Schema,
109        /// New last column id
110        last_column_id: Option<i32>,
111    },
112    /// Schema ID to set as current, or -1 to set last added schema
113    SetCurrentSchema {
114        /// New schema_id
115        schema_id: i32,
116    },
117    /// Add new partition spec
118    AddPartitionSpec {
119        /// New partition spec
120        spec: PartitionSpec,
121    },
122    /// Partition spec ID to set as the default, or -1 to set last added spec
123    SetDefaultSpec {
124        /// Spec id to set
125        spec_id: i32,
126    },
127    /// Add a new sort order
128    AddSortOrder {
129        /// New sort order
130        sort_order: SortOrder,
131    },
132    /// Sort order ID to set as the default, or -1 to set last added sort order
133    SetDefaultSortOrder {
134        /// Sort order id to set
135        sort_order_id: i32,
136    },
137    /// Add a new snapshot
138    AddSnapshot {
139        /// New snapshot
140        snapshot: Snapshot,
141    },
142    /// Set the current snapshot reference
143    SetSnapshotRef {
144        /// Name of the snapshot refrence
145        ref_name: String,
146        /// Snapshot refernce to set
147        #[serde(flatten)]
148        snapshot_reference: SnapshotReference,
149    },
150    /// Remove snapshots with certain snapshot ids
151    RemoveSnapshots {
152        /// Ids of the snapshots to remove
153        snapshot_ids: Vec<i64>,
154    },
155    /// Remove snapshot reference
156    RemoveSnapshotRef {
157        /// Name of the snapshot ref to remove
158        ref_name: String,
159    },
160    /// Set a new location for the table
161    SetLocation {
162        /// New Location
163        location: String,
164    },
165    /// Set table properties
166    SetProperties {
167        /// Properties to set
168        updates: HashMap<String, String>,
169    },
170    /// Remove table properties
171    RemoveProperties {
172        /// Properties to remove
173        removals: Vec<String>,
174    },
175}
176
177/// Requirements that must be met before applying updates to table metadata
178///
179/// This enum defines preconditions that must be satisfied before a table update
180/// can be committed. Requirements are checked atomically to prevent concurrent
181/// modifications from corrupting table state.
182///
183/// # Requirements Types
184/// * Table existence checks
185/// * UUID validation
186/// * Reference state validation
187/// * Schema and partition spec version checks
188/// * Sort order validation
189/// * Column and partition ID validation
190///
191/// Each variant includes the specific values that must match the current table state.
192#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
193#[serde(
194    tag = "type",
195    rename_all = "kebab-case",
196    rename_all_fields = "kebab-case"
197)]
198pub enum TableRequirement {
199    /// The table must not already exist; used for create transactions
200    AssertCreate,
201    /// The table UUID must match the requirement's `uuid`
202    AssertTableUuid {
203        /// Uuid to assert
204        uuid: Uuid,
205    },
206    /// The table branch or tag identified by the requirement's `ref` must reference the requirement's `snapshot-id`;
207    /// if `snapshot-id` is `null` or missing, the ref must not already exist
208    AssertRefSnapshotId {
209        /// Name of ref
210        r#ref: String,
211        /// Snapshot id
212        snapshot_id: i64,
213    },
214    /// The table's last assigned column id must match the requirement's `last-assigned-field-id`
215    AssertLastAssignedFieldId {
216        /// Id of last assigned id
217        last_assigned_field_id: i32,
218    },
219    /// The table's current schema id must match the requirement's `current-schema-id`
220    AssertCurrentSchemaId {
221        /// Current schema id
222        current_schema_id: i32,
223    },
224    ///The table's last assigned partition id must match the requirement's `last-assigned-partition-id`
225    AssertLastAssignedPartitionId {
226        /// id of last assigned partition
227        last_assigned_partition_id: i32,
228    },
229    /// The table's default spec id must match the requirement's `default-spec-id`
230    AssertDefaultSpecId {
231        /// Default spec id
232        default_spec_id: i32,
233    },
234    /// The table's default sort order id must match the requirement's `default-sort-order-id`
235    AssertDefaultSortOrderId {
236        /// Default sort order id
237        default_sort_order_id: i32,
238    },
239}
240
241/// Updates that can be applied to view metadata in a commit operation
242///
243/// This enum represents all possible modifications that can be made to view metadata:
244/// * UUID assignment (only during view creation)
245/// * Format version updates
246/// * Schema modifications
247/// * Location and property updates
248/// * Version management (add versions, set current version)
249///
250/// # Type Parameters
251/// * `T` - The materialization type for the view, typically `Option<()>` for regular views
252///   or `FullIdentifier` for materialized views
253///
254/// Each variant includes the necessary data for that specific update type.
255/// Updates are applied atomically as part of a commit operation.
256#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
257#[serde(
258    tag = "action",
259    rename_all = "kebab-case",
260    rename_all_fields = "kebab-case"
261)]
262pub enum ViewUpdate<T: Materialization> {
263    /// Assigning a UUID to a table/view should only be done when creating the table/view. It is not safe to re-assign the UUID if a table/view already has a UUID assigned
264    AssignUUID {
265        /// new uuid
266        uuid: String,
267    },
268    /// Update format version
269    UpgradeFormatVersion {
270        /// New format version
271        format_version: i32,
272    },
273    /// The highest assigned column ID for the table. This is used to ensure columns are always assigned an unused ID when evolving schemas. When omitted, it will be computed on the server side.
274    AddSchema {
275        /// Schema to add
276        schema: Schema,
277        /// New last column id
278        last_column_id: Option<i32>,
279    },
280    /// Set a new location for the table
281    SetLocation {
282        /// New Location
283        location: String,
284    },
285    /// Set table properties
286    SetProperties {
287        /// Properties to set
288        updates: HashMap<String, String>,
289    },
290    /// Remove table properties
291    RemoveProperties {
292        /// Properties to remove
293        removals: Vec<String>,
294    },
295    /// Add a new version to the view
296    AddViewVersion {
297        /// Version to add
298        view_version: Version<T>,
299    },
300    /// The view version id to set as current, or -1 to set last added view version id
301    SetCurrentViewVersion {
302        /// The id to set
303        view_version_id: i64,
304    },
305}
306
307/// Requirements that must be met before applying updates to view metadata
308///
309/// This enum defines preconditions that must be satisfied before a view update
310/// can be committed. Requirements are checked atomically to prevent concurrent
311/// modifications from corrupting view state.
312///
313/// # Requirements Types
314/// * UUID validation - Ensures view UUID matches expected value
315///
316/// Each variant includes the specific values that must match the current view state.
317#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
318#[serde(
319    tag = "type",
320    rename_all = "kebab-case",
321    rename_all_fields = "kebab-case"
322)]
323pub enum ViewRequirement {
324    /// The view UUID must match the requirement's `uuid`
325    AssertViewUuid {
326        /// Uuid to assert
327        uuid: Uuid,
328    },
329}
330/// Validates that table metadata meets all specified requirements
331///
332/// This function checks if the current table metadata satisfies all the requirements
333/// specified for a commit operation. It ensures atomic updates by verifying preconditions
334/// like UUID matches, snapshot references, schema versions etc.
335///
336/// # Arguments
337/// * `requirements` - List of requirements that must be satisfied
338/// * `metadata` - Current table metadata to validate against
339///
340/// # Returns
341/// * `true` if all requirements are met
342/// * `false` if any requirement is not satisfied
343pub fn check_table_requirements(
344    requirements: &[TableRequirement],
345    metadata: &TableMetadata,
346) -> bool {
347    requirements.iter().all(|x| match x {
348        // Assert create has to be check in another place
349        TableRequirement::AssertCreate => true,
350        TableRequirement::AssertTableUuid { uuid } => metadata.table_uuid == *uuid,
351        TableRequirement::AssertRefSnapshotId { r#ref, snapshot_id } => metadata
352            .refs
353            .get(r#ref)
354            .map(|id| id.snapshot_id == *snapshot_id)
355            .unwrap_or(false),
356        TableRequirement::AssertLastAssignedFieldId {
357            last_assigned_field_id,
358        } => metadata.last_column_id == *last_assigned_field_id,
359        TableRequirement::AssertCurrentSchemaId { current_schema_id } => {
360            metadata.current_schema_id == *current_schema_id
361        }
362        TableRequirement::AssertLastAssignedPartitionId {
363            last_assigned_partition_id,
364        } => metadata.last_partition_id == *last_assigned_partition_id,
365        TableRequirement::AssertDefaultSpecId { default_spec_id } => {
366            metadata.default_spec_id == *default_spec_id
367        }
368        TableRequirement::AssertDefaultSortOrderId {
369            default_sort_order_id,
370        } => metadata.default_sort_order_id == *default_sort_order_id,
371    })
372}
373
374/// Validates that view metadata meets all specified requirements
375///
376/// This function checks if the current view metadata satisfies all the requirements
377/// specified for a commit operation. It ensures atomic updates by verifying preconditions
378/// like UUID matches.
379///
380/// # Type Parameters
381/// * `T` - The materialization type for the view, must implement Materialization, Eq and 'static
382///
383/// # Arguments
384/// * `requirements` - List of requirements that must be satisfied
385/// * `metadata` - Current view metadata to validate against
386///
387/// # Returns
388/// * `true` if all requirements are met
389/// * `false` if any requirement is not satisfied
390pub fn check_view_requirements<T: Materialization + Eq + 'static>(
391    requirements: &[ViewRequirement],
392    metadata: &GeneralViewMetadata<T>,
393) -> bool {
394    requirements.iter().all(|x| match x {
395        ViewRequirement::AssertViewUuid { uuid } => metadata.view_uuid == *uuid,
396    })
397}
398/// Applies a sequence of updates to table metadata
399///
400/// This function processes each update in order, modifying the table metadata accordingly.
401/// Updates can include:
402/// * Format version changes
403/// * UUID assignments
404/// * Schema modifications
405/// * Partition spec and sort order changes
406/// * Snapshot management
407/// * Location and property updates
408///
409/// # Arguments
410/// * `metadata` - Mutable reference to table metadata to modify
411/// * `updates` - Vector of updates to apply
412///
413/// # Returns
414/// * `Ok(())` if all updates were applied successfully
415/// * `Err(Error)` if any update failed to apply
416pub fn apply_table_updates(
417    metadata: &mut TableMetadata,
418    updates: Vec<TableUpdate>,
419) -> Result<(), Error> {
420    for update in updates {
421        match update {
422            TableUpdate::UpgradeFormatVersion { format_version: _ } => {
423                unimplemented!();
424            }
425            TableUpdate::AssignUUID { uuid } => {
426                metadata.table_uuid = Uuid::parse_str(&uuid)?;
427            }
428            TableUpdate::AddSchema {
429                schema,
430                last_column_id,
431            } => {
432                metadata.schemas.insert(*schema.schema_id(), schema);
433                if let Some(last_column_id) = last_column_id {
434                    metadata.last_column_id = last_column_id;
435                }
436            }
437            TableUpdate::SetCurrentSchema { schema_id } => {
438                metadata.current_schema_id = schema_id;
439            }
440            TableUpdate::AddPartitionSpec { spec } => {
441                metadata.partition_specs.insert(*spec.spec_id(), spec);
442            }
443            TableUpdate::SetDefaultSpec { spec_id } => {
444                metadata.default_spec_id = spec_id;
445            }
446            TableUpdate::AddSortOrder { sort_order } => {
447                metadata.sort_orders.insert(sort_order.order_id, sort_order);
448            }
449            TableUpdate::SetDefaultSortOrder { sort_order_id } => {
450                metadata.default_sort_order_id = sort_order_id;
451            }
452            TableUpdate::AddSnapshot { snapshot } => {
453                metadata.snapshot_log.push(SnapshotLog {
454                    snapshot_id: *snapshot.snapshot_id(),
455                    timestamp_ms: *snapshot.timestamp_ms(),
456                });
457                metadata.last_sequence_number = *snapshot.sequence_number();
458                metadata.snapshots.insert(*snapshot.snapshot_id(), snapshot);
459            }
460            TableUpdate::SetSnapshotRef {
461                ref_name,
462                snapshot_reference,
463            } => {
464                if ref_name == "main" {
465                    metadata.current_snapshot_id = Some(snapshot_reference.snapshot_id);
466                }
467                metadata.refs.insert(ref_name, snapshot_reference);
468            }
469            TableUpdate::RemoveSnapshots { snapshot_ids } => {
470                for id in snapshot_ids {
471                    metadata.snapshots.remove(&id);
472                }
473            }
474            TableUpdate::RemoveSnapshotRef { ref_name } => {
475                metadata.refs.remove(&ref_name);
476            }
477            TableUpdate::SetLocation { location } => {
478                metadata.location = location;
479            }
480            TableUpdate::SetProperties { updates } => {
481                metadata.properties.extend(updates);
482            }
483            TableUpdate::RemoveProperties { removals } => {
484                for rem in removals {
485                    metadata.properties.remove(&rem);
486                }
487            }
488        };
489    }
490    Ok(())
491}
492
493/// Applies a sequence of updates to view metadata
494///
495/// This function processes each update in order, modifying the view metadata accordingly.
496/// Updates can include:
497/// * Format version changes
498/// * UUID assignments
499/// * Schema modifications
500/// * Location and property updates
501/// * Version management
502///
503/// # Type Parameters
504/// * `T` - The materialization type for the view, must implement Materialization + 'static
505///
506/// # Arguments
507/// * `metadata` - Mutable reference to view metadata to modify
508/// * `updates` - Vector of updates to apply
509///
510/// # Returns
511/// * `Ok(())` if all updates were applied successfully
512/// * `Err(Error)` if any update failed to apply
513pub fn apply_view_updates<T: Materialization + 'static>(
514    metadata: &mut GeneralViewMetadata<T>,
515    updates: Vec<ViewUpdate<T>>,
516) -> Result<(), Error> {
517    for update in updates {
518        match update {
519            ViewUpdate::UpgradeFormatVersion { format_version: _ } => {
520                unimplemented!();
521            }
522            ViewUpdate::AssignUUID { uuid } => {
523                metadata.view_uuid = Uuid::parse_str(&uuid)?;
524            }
525            ViewUpdate::AddSchema {
526                schema,
527                last_column_id: _,
528            } => {
529                metadata.schemas.insert(*schema.schema_id(), schema);
530            }
531            ViewUpdate::SetLocation { location } => {
532                metadata.location = location;
533            }
534            ViewUpdate::SetProperties { updates } => {
535                metadata.properties.extend(updates);
536            }
537            ViewUpdate::RemoveProperties { removals } => {
538                for rem in removals {
539                    metadata.properties.remove(&rem);
540                }
541            }
542            ViewUpdate::AddViewVersion { view_version } => {
543                metadata
544                    .versions
545                    .insert(view_version.version_id, view_version);
546            }
547            ViewUpdate::SetCurrentViewVersion { view_version_id } => {
548                metadata.current_version_id = view_version_id;
549            }
550        };
551    }
552    Ok(())
553}