iceberg_rust/catalog/
commit.rs

1//! Commit operations for atomic metadata updates in Iceberg catalogs
2//!
3//! This module provides the core functionality for atomic commits to Iceberg tables and views:
4//! * Commit structures for tables and views
5//! * Update operations that can be applied
6//! * Requirements that must be satisfied
7//! * Functions to validate requirements and apply updates
8//!
9//! All changes are made atomically - either all updates succeed or none are applied.
10//! Requirements are checked first to ensure concurrent modifications don't corrupt state.
11
12use iceberg_rust_spec::{
13    spec::{
14        partition::PartitionSpec,
15        schema::Schema,
16        snapshot::{Snapshot, SnapshotReference},
17        sort::SortOrder,
18        table_metadata::TableMetadata,
19        view_metadata::{GeneralViewMetadata, Version},
20    },
21    table_metadata::SnapshotLog,
22    view_metadata::Materialization,
23};
24use serde_derive::{Deserialize, Serialize};
25use std::collections::HashMap;
26use std::time::{SystemTime, UNIX_EPOCH};
27use uuid::Uuid;
28
29use crate::error::Error;
30
31use super::identifier::Identifier;
32
/// A commit request against a single table in an Iceberg catalog.
///
/// A commit bundles two things:
/// * `requirements` - assertions about the current table metadata
/// * `updates` - the changes to make to the table metadata
///
/// The struct itself is plain data; it derives `Serialize`/`Deserialize` so it can be
/// exchanged with a catalog.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CommitTable {
    /// Identifier of the table the commit targets
    pub identifier: Identifier,
    /// Assertions about the metadata that must be true to update the metadata
    pub requirements: Vec<TableRequirement>,
    /// Changes to the table metadata, in the order they are listed
    pub updates: Vec<TableUpdate>,
}
51
/// A commit request against a single view in an Iceberg catalog.
///
/// A commit bundles two things:
/// * `requirements` - assertions about the current view metadata
/// * `updates` - the changes to make to the view metadata
///
/// The struct itself is plain data; it derives `Serialize`/`Deserialize` so it can be
/// exchanged with a catalog.
///
/// # Type Parameters
/// * `T` - The materialization type for the view, typically `Option<()>` for regular views
///   or custom types for materialized views
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CommitView<T: Materialization> {
    /// Identifier of the view the commit targets
    pub identifier: Identifier,
    /// Assertions about the metadata that must be true to update the metadata
    pub requirements: Vec<ViewRequirement>,
    /// Changes to the view metadata, in the order they are listed
    pub updates: Vec<ViewUpdate<T>>,
}
75
/// Updates that can be applied to table metadata in a commit operation
///
/// This enum represents all possible modifications that can be made to table metadata:
/// * UUID assignment (only during table creation)
/// * Format version updates
/// * Schema modifications
/// * Partition spec and sort order changes
/// * Snapshot management (add, remove, set references)
/// * Location and property updates
///
/// Each variant includes the necessary data for that specific update type.
///
/// # Serialization
/// The enum is internally tagged: the variant name is written in kebab-case into an
/// `action` field, and all variant fields are renamed to kebab-case as well
/// (e.g. `AddSchema { last_column_id }` uses `"action": "add-schema"` and `last-column-id`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(
    tag = "action",
    rename_all = "kebab-case",
    rename_all_fields = "kebab-case"
)]
pub enum TableUpdate {
    /// Assigning a UUID to a table/view should only be done when creating the table/view. It is not safe to re-assign the UUID if a table/view already has a UUID assigned
    AssignUuid {
        /// New uuid, in textual form
        uuid: String,
    },
    /// Update the format version
    UpgradeFormatVersion {
        /// New format version
        format_version: i32,
    },
    /// Add a new schema to the table
    AddSchema {
        /// Schema to add
        schema: Schema,
        /// The highest assigned column ID for the table. This is used to ensure columns
        /// are always assigned an unused ID when evolving schemas. May be omitted.
        last_column_id: Option<i32>,
    },
    /// Set the current schema of the table
    SetCurrentSchema {
        /// Schema ID to set as current, or -1 to set the last added schema
        schema_id: i32,
    },
    /// Add a new partition spec
    AddSpec {
        /// New partition spec
        spec: PartitionSpec,
    },
    /// Set the default partition spec of the table
    SetDefaultSpec {
        /// Partition spec ID to set as the default, or -1 to set the last added spec
        spec_id: i32,
    },
    /// Add a new sort order
    AddSortOrder {
        /// New sort order
        sort_order: SortOrder,
    },
    /// Set the default sort order of the table
    SetDefaultSortOrder {
        /// Sort order ID to set as the default, or -1 to set the last added sort order
        sort_order_id: i32,
    },
    /// Add a new snapshot
    AddSnapshot {
        /// New snapshot
        snapshot: Snapshot,
    },
    /// Create or replace a named snapshot reference
    SetSnapshotRef {
        /// Name of the snapshot reference
        ref_name: String,
        /// Snapshot reference to set; because of `flatten`, its fields are (de)serialized
        /// inline, next to `ref-name`, rather than in a nested object
        #[serde(flatten)]
        snapshot_reference: SnapshotReference,
    },
    /// Remove snapshots with certain snapshot ids
    RemoveSnapshots {
        /// Ids of the snapshots to remove
        snapshot_ids: Vec<i64>,
    },
    /// Remove a snapshot reference
    RemoveSnapshotRef {
        /// Name of the snapshot ref to remove
        ref_name: String,
    },
    /// Set a new location for the table
    SetLocation {
        /// New location
        location: String,
    },
    /// Set table properties
    SetProperties {
        /// Properties to set, keyed by property name
        updates: HashMap<String, String>,
    },
    /// Remove table properties
    RemoveProperties {
        /// Names of the properties to remove
        removals: Vec<String>,
    },
}
176
/// Requirements that must be met before applying updates to table metadata
///
/// This enum defines preconditions that must be satisfied before a table update
/// can be committed, so that a commit based on stale metadata is rejected.
///
/// # Requirements Types
/// * Table existence checks
/// * UUID validation
/// * Reference state validation
/// * Schema and partition spec version checks
/// * Sort order validation
/// * Column and partition ID validation
///
/// Each variant includes the specific values that must match the current table state.
///
/// # Serialization
/// The enum is internally tagged: the variant name is written in kebab-case into a
/// `type` field, and all variant fields are renamed to kebab-case as well.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(
    tag = "type",
    rename_all = "kebab-case",
    rename_all_fields = "kebab-case"
)]
pub enum TableRequirement {
    /// The table must not already exist; used for create transactions
    AssertCreate,
    /// The table UUID must match the requirement's `uuid`
    AssertTableUuid {
        /// Uuid to assert
        uuid: Uuid,
    },
    /// The table branch or tag identified by the requirement's `ref` must reference the requirement's `snapshot-id`.
    ///
    /// NOTE(review): the REST convention also allows a `null`/missing `snapshot-id`, meaning
    /// the ref must not exist yet. `snapshot_id` is a non-optional `i64` here, so that case
    /// cannot be expressed with this type - confirm whether this is intended.
    AssertRefSnapshotId {
        /// Name of the ref (raw identifier because `ref` is a Rust keyword; serialized as `ref`)
        r#ref: String,
        /// Snapshot id the ref is expected to point to
        snapshot_id: i64,
    },
    /// The table's last assigned column id must match the requirement's `last-assigned-field-id`
    AssertLastAssignedFieldId {
        /// Expected last assigned column id
        last_assigned_field_id: i32,
    },
    /// The table's current schema id must match the requirement's `current-schema-id`
    AssertCurrentSchemaId {
        /// Expected current schema id
        current_schema_id: i32,
    },
    /// The table's last assigned partition id must match the requirement's `last-assigned-partition-id`
    AssertLastAssignedPartitionId {
        /// Expected last assigned partition id
        last_assigned_partition_id: i32,
    },
    /// The table's default spec id must match the requirement's `default-spec-id`
    AssertDefaultSpecId {
        /// Expected default spec id
        default_spec_id: i32,
    },
    /// The table's default sort order id must match the requirement's `default-sort-order-id`
    AssertDefaultSortOrderId {
        /// Expected default sort order id
        default_sort_order_id: i32,
    },
}
240
/// Updates that can be applied to view metadata in a commit operation
///
/// This enum represents all possible modifications that can be made to view metadata:
/// * UUID assignment (only during view creation)
/// * Format version updates
/// * Schema modifications
/// * Location and property updates
/// * Version management (add versions, set current version)
///
/// # Type Parameters
/// * `T` - The materialization type for the view, typically `Option<()>` for regular views
///   or `FullIdentifier` for materialized views
///
/// Each variant includes the necessary data for that specific update type.
///
/// # Serialization
/// The enum is internally tagged: the variant name is written in kebab-case into an
/// `action` field, and all variant fields are renamed to kebab-case as well.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(
    tag = "action",
    rename_all = "kebab-case",
    rename_all_fields = "kebab-case"
)]
pub enum ViewUpdate<T: Materialization> {
    /// Assigning a UUID to a table/view should only be done when creating the table/view. It is not safe to re-assign the UUID if a table/view already has a UUID assigned
    AssignUuid {
        /// New uuid, in textual form
        uuid: String,
    },
    /// Update the format version
    UpgradeFormatVersion {
        /// New format version
        format_version: i32,
    },
    /// Add a new schema to the view
    AddSchema {
        /// Schema to add
        schema: Schema,
        /// The highest assigned column ID. May be omitted.
        last_column_id: Option<i32>,
    },
    /// Set a new location for the view
    SetLocation {
        /// New location
        location: String,
    },
    /// Set view properties
    SetProperties {
        /// Properties to set, keyed by property name
        updates: HashMap<String, String>,
    },
    /// Remove view properties
    RemoveProperties {
        /// Names of the properties to remove
        removals: Vec<String>,
    },
    /// Add a new version to the view
    AddViewVersion {
        /// Version to add
        view_version: Version<T>,
    },
    /// Set the current version of the view
    SetCurrentViewVersion {
        /// The view version id to set as current, or -1 to refer to the last added view version
        view_version_id: i64,
    },
}
306
/// Requirements that must be met before applying updates to view metadata
///
/// This enum defines preconditions that must be satisfied before a view update
/// can be committed, so that a commit based on stale metadata is rejected.
///
/// # Requirements Types
/// * UUID validation - Ensures view UUID matches expected value
///
/// Each variant includes the specific values that must match the current view state.
///
/// # Serialization
/// The enum is internally tagged: the variant name is written in kebab-case into a
/// `type` field, and all variant fields are renamed to kebab-case as well.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(
    tag = "type",
    rename_all = "kebab-case",
    rename_all_fields = "kebab-case"
)]
pub enum ViewRequirement {
    /// The view UUID must match the requirement's `uuid`
    AssertViewUuid {
        /// Uuid to assert
        uuid: Uuid,
    },
}
330/// Validates that table metadata meets all specified requirements
331///
332/// This function checks if the current table metadata satisfies all the requirements
333/// specified for a commit operation. It ensures atomic updates by verifying preconditions
334/// like UUID matches, snapshot references, schema versions etc.
335///
336/// # Arguments
337/// * `requirements` - List of requirements that must be satisfied
338/// * `metadata` - Current table metadata to validate against
339///
340/// # Returns
341/// * `true` if all requirements are met
342/// * `false` if any requirement is not satisfied
343pub fn check_table_requirements(
344    requirements: &[TableRequirement],
345    metadata: &TableMetadata,
346) -> bool {
347    requirements.iter().all(|x| match x {
348        // Assert create has to be check in another place
349        TableRequirement::AssertCreate => true,
350        TableRequirement::AssertTableUuid { uuid } => metadata.table_uuid == *uuid,
351        TableRequirement::AssertRefSnapshotId { r#ref, snapshot_id } => metadata
352            .refs
353            .get(r#ref)
354            .map(|id| id.snapshot_id == *snapshot_id)
355            .unwrap_or(false),
356        TableRequirement::AssertLastAssignedFieldId {
357            last_assigned_field_id,
358        } => metadata.last_column_id == *last_assigned_field_id,
359        TableRequirement::AssertCurrentSchemaId { current_schema_id } => {
360            metadata.current_schema_id == *current_schema_id
361        }
362        TableRequirement::AssertLastAssignedPartitionId {
363            last_assigned_partition_id,
364        } => metadata.last_partition_id == *last_assigned_partition_id,
365        TableRequirement::AssertDefaultSpecId { default_spec_id } => {
366            metadata.default_spec_id == *default_spec_id
367        }
368        TableRequirement::AssertDefaultSortOrderId {
369            default_sort_order_id,
370        } => metadata.default_sort_order_id == *default_sort_order_id,
371    })
372}
373
374/// Validates that view metadata meets all specified requirements
375///
376/// This function checks if the current view metadata satisfies all the requirements
377/// specified for a commit operation. It ensures atomic updates by verifying preconditions
378/// like UUID matches.
379///
380/// # Type Parameters
381/// * `T` - The materialization type for the view, must implement Materialization, Eq and 'static
382///
383/// # Arguments
384/// * `requirements` - List of requirements that must be satisfied
385/// * `metadata` - Current view metadata to validate against
386///
387/// # Returns
388/// * `true` if all requirements are met
389/// * `false` if any requirement is not satisfied
390pub fn check_view_requirements<T: Materialization + Eq + 'static>(
391    requirements: &[ViewRequirement],
392    metadata: &GeneralViewMetadata<T>,
393) -> bool {
394    requirements.iter().all(|x| match x {
395        ViewRequirement::AssertViewUuid { uuid } => metadata.view_uuid == *uuid,
396    })
397}
398/// Applies a sequence of updates to table metadata
399///
400/// This function processes each update in order, modifying the table metadata accordingly.
401/// Updates can include:
402/// * Format version changes
403/// * UUID assignments
404/// * Schema modifications
405/// * Partition spec and sort order changes
406/// * Snapshot management
407/// * Location and property updates
408///
409/// # Arguments
410/// * `metadata` - Mutable reference to table metadata to modify
411/// * `updates` - Vector of updates to apply
412///
413/// # Returns
414/// * `Ok(())` if all updates were applied successfully
415/// * `Err(Error)` if any update failed to apply
416pub fn apply_table_updates(
417    metadata: &mut TableMetadata,
418    updates: Vec<TableUpdate>,
419) -> Result<(), Error> {
420    let mut added_schema_id = None;
421    let mut added_spec_id = None;
422    let mut added_sort_order_id = None;
423    for update in updates {
424        match update {
425            TableUpdate::UpgradeFormatVersion { format_version } => {
426                if i32::from(metadata.format_version) != format_version {
427                    unimplemented!("Table format upgrade");
428                }
429            }
430            TableUpdate::AssignUuid { uuid } => {
431                metadata.table_uuid = Uuid::parse_str(&uuid)?;
432            }
433            TableUpdate::AddSchema {
434                schema,
435                last_column_id,
436            } => {
437                let schema_id = *schema.schema_id();
438                metadata.schemas.insert(schema_id, schema);
439                added_schema_id = Some(schema_id);
440                if let Some(last_column_id) = last_column_id {
441                    metadata.last_column_id = last_column_id;
442                }
443            }
444            TableUpdate::SetCurrentSchema { schema_id } => {
445                if schema_id == -1 {
446                    if let Some(added_schema_id) = added_schema_id {
447                        metadata.current_schema_id = added_schema_id;
448                    } else {
449                        return Err(Error::InvalidFormat(
450                            "Cannot set current schema to -1 without adding a schema first"
451                                .to_string(),
452                        ));
453                    }
454                } else {
455                    metadata.current_schema_id = schema_id;
456                }
457            }
458            TableUpdate::AddSpec { spec } => {
459                let spec_id = *spec.spec_id();
460                metadata.partition_specs.insert(spec_id, spec);
461                added_spec_id = Some(spec_id);
462            }
463            TableUpdate::SetDefaultSpec { spec_id } => {
464                if spec_id == -1 {
465                    if let Some(added_spec_id) = added_spec_id {
466                        metadata.default_spec_id = added_spec_id;
467                    } else {
468                        return Err(Error::InvalidFormat(
469                            "Cannot set default spec to -1 without adding a spec first".to_string(),
470                        ));
471                    }
472                } else {
473                    metadata.default_spec_id = spec_id;
474                }
475            }
476            TableUpdate::AddSortOrder { sort_order } => {
477                let sort_order_id = sort_order.order_id;
478                metadata.sort_orders.insert(sort_order_id, sort_order);
479                added_sort_order_id = Some(sort_order_id);
480            }
481            TableUpdate::SetDefaultSortOrder { sort_order_id } => {
482                if sort_order_id == -1 {
483                    if let Some(added_sort_order_id) = added_sort_order_id {
484                        metadata.default_sort_order_id = added_sort_order_id;
485                    } else {
486                        return Err(Error::InvalidFormat(
487                            "Cannot set default sort order to -1 without adding a sort order first"
488                                .to_string(),
489                        ));
490                    }
491                } else {
492                    metadata.default_sort_order_id = sort_order_id;
493                }
494            }
495            TableUpdate::AddSnapshot { snapshot } => {
496                metadata.snapshot_log.push(SnapshotLog {
497                    snapshot_id: *snapshot.snapshot_id(),
498                    timestamp_ms: *snapshot.timestamp_ms(),
499                });
500                metadata.last_sequence_number = *snapshot.sequence_number();
501                metadata.snapshots.insert(*snapshot.snapshot_id(), snapshot);
502            }
503            TableUpdate::SetSnapshotRef {
504                ref_name,
505                snapshot_reference,
506            } => {
507                if ref_name == "main" {
508                    metadata.current_snapshot_id = Some(snapshot_reference.snapshot_id);
509                }
510                metadata.refs.insert(ref_name, snapshot_reference);
511            }
512            TableUpdate::RemoveSnapshots { snapshot_ids } => {
513                for id in snapshot_ids {
514                    metadata.snapshots.remove(&id);
515                }
516            }
517            TableUpdate::RemoveSnapshotRef { ref_name } => {
518                metadata.refs.remove(&ref_name);
519            }
520            TableUpdate::SetLocation { location } => {
521                metadata.location = location;
522            }
523            TableUpdate::SetProperties { updates } => {
524                metadata.properties.extend(updates);
525            }
526            TableUpdate::RemoveProperties { removals } => {
527                for rem in removals {
528                    metadata.properties.remove(&rem);
529                }
530            }
531        };
532    }
533
534    // Lastly make sure `last-updated-ms` field is up-to-date
535    metadata.last_updated_ms = SystemTime::now()
536        .duration_since(UNIX_EPOCH)
537        .unwrap()
538        .as_millis() as i64;
539    Ok(())
540}
541
542/// Applies a sequence of updates to view metadata
543///
544/// This function processes each update in order, modifying the view metadata accordingly.
545/// Updates can include:
546/// * Format version changes
547/// * UUID assignments
548/// * Schema modifications
549/// * Location and property updates
550/// * Version management
551///
552/// # Type Parameters
553/// * `T` - The materialization type for the view, must implement Materialization + 'static
554///
555/// # Arguments
556/// * `metadata` - Mutable reference to view metadata to modify
557/// * `updates` - Vector of updates to apply
558///
559/// # Returns
560/// * `Ok(())` if all updates were applied successfully
561/// * `Err(Error)` if any update failed to apply
562pub fn apply_view_updates<T: Materialization + 'static>(
563    metadata: &mut GeneralViewMetadata<T>,
564    updates: Vec<ViewUpdate<T>>,
565) -> Result<(), Error> {
566    for update in updates {
567        match update {
568            ViewUpdate::UpgradeFormatVersion { format_version } => {
569                if i32::from(metadata.format_version.clone()) != format_version {
570                    unimplemented!("Upgrade of format version");
571                }
572            }
573            ViewUpdate::AssignUuid { uuid } => {
574                metadata.view_uuid = Uuid::parse_str(&uuid)?;
575            }
576            ViewUpdate::AddSchema {
577                schema,
578                last_column_id: _,
579            } => {
580                metadata.schemas.insert(*schema.schema_id(), schema);
581            }
582            ViewUpdate::SetLocation { location } => {
583                metadata.location = location;
584            }
585            ViewUpdate::SetProperties { updates } => {
586                metadata.properties.extend(updates);
587            }
588            ViewUpdate::RemoveProperties { removals } => {
589                for rem in removals {
590                    metadata.properties.remove(&rem);
591                }
592            }
593            ViewUpdate::AddViewVersion { view_version } => {
594                metadata
595                    .versions
596                    .insert(view_version.version_id, view_version);
597            }
598            ViewUpdate::SetCurrentViewVersion { view_version_id } => {
599                metadata.current_version_id = view_version_id;
600            }
601        };
602    }
603    Ok(())
604}