iceberg_rust/catalog/commit.rs
1//! Commit operations for atomic metadata updates in Iceberg catalogs
2//!
3//! This module provides the core functionality for atomic commits to Iceberg tables and views:
4//! * Commit structures for tables and views
5//! * Update operations that can be applied
6//! * Requirements that must be satisfied
7//! * Functions to validate requirements and apply updates
8//!
9//! All changes are made atomically - either all updates succeed or none are applied.
10//! Requirements are checked first to ensure concurrent modifications don't corrupt state.
11
12use iceberg_rust_spec::{
13 spec::{
14 partition::PartitionSpec,
15 schema::Schema,
16 snapshot::{Snapshot, SnapshotReference},
17 sort::SortOrder,
18 table_metadata::TableMetadata,
19 view_metadata::{GeneralViewMetadata, Version},
20 },
21 table_metadata::SnapshotLog,
22 view_metadata::Materialization,
23};
24use serde_derive::{Deserialize, Serialize};
25use std::collections::HashMap;
26use std::time::{SystemTime, UNIX_EPOCH};
27use uuid::Uuid;
28
29use crate::error::Error;
30
31use super::identifier::Identifier;
32
/// A commit operation to update table metadata in an Iceberg catalog
///
/// Bundles everything needed to describe one commit against a single table:
/// * the identifier of the table being changed
/// * the [`TableRequirement`]s that must hold for the current metadata
/// * the [`TableUpdate`]s to apply once the requirements hold
///
/// This type only carries the data. In this module the requirements are
/// evaluated by `check_table_requirements` and the updates are applied by
/// `apply_table_updates`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CommitTable {
    /// Table identifier
    pub identifier: Identifier,
    /// Assertions about the metadata that must be true to update the metadata
    pub requirements: Vec<TableRequirement>,
    /// Changes to the table metadata
    pub updates: Vec<TableUpdate>,
}
51
/// A commit operation to update view metadata in an Iceberg catalog
///
/// Bundles everything needed to describe one commit against a single view:
/// * the identifier of the view being changed
/// * the [`ViewRequirement`]s that must hold for the current metadata
/// * the [`ViewUpdate`]s to apply once the requirements hold
///
/// This type only carries the data. In this module the requirements are
/// evaluated by `check_view_requirements` and the updates are applied by
/// `apply_view_updates`.
///
/// # Type Parameters
/// * `T` - The materialization type of the view versions carried in `updates`
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CommitView<T: Materialization> {
    /// View identifier
    pub identifier: Identifier,
    /// Assertions about the metadata that must be true to update the metadata
    pub requirements: Vec<ViewRequirement>,
    /// Changes to the view metadata
    pub updates: Vec<ViewUpdate<T>>,
}
75
/// Updates that can be applied to table metadata in a commit operation
///
/// Each variant describes one kind of change and carries the data needed for it:
/// * UUID assignment (only during table creation)
/// * Format version updates
/// * Schema modifications
/// * Partition spec and sort order changes
/// * Snapshot management (add, remove, set references)
/// * Location and property updates
///
/// # Serialization
/// The enum is internally tagged: the variant name is stored in an `action`
/// field, and both variant names and field names are written in kebab-case
/// (for example `AssignUuid` is tagged as `"assign-uuid"`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(
    tag = "action",
    rename_all = "kebab-case",
    rename_all_fields = "kebab-case"
)]
pub enum TableUpdate {
    /// Assigning a UUID to a table/view should only be done when creating the table/view. It is not safe to re-assign the UUID if a table/view already has a UUID assigned
    AssignUuid {
        /// New uuid in textual form; it is parsed into a `Uuid` when the update is applied
        uuid: String,
    },
    /// Update format version
    UpgradeFormatVersion {
        /// New format version
        format_version: i32,
    },
    /// Add a new schema
    AddSchema {
        /// Schema to add
        schema: Schema,
        /// The highest assigned column ID for the table. This is used to ensure columns are always
        /// assigned an unused ID when evolving schemas. When `None`, `apply_table_updates` leaves
        /// the table's last column id unchanged.
        last_column_id: Option<i32>,
    },
    /// Schema ID to set as current, or -1 to set last added schema
    SetCurrentSchema {
        /// New schema_id
        schema_id: i32,
    },
    /// Add new partition spec
    AddSpec {
        /// New partition spec
        spec: PartitionSpec,
    },
    /// Partition spec ID to set as the default, or -1 to set last added spec
    SetDefaultSpec {
        /// Spec id to set
        spec_id: i32,
    },
    /// Add a new sort order
    AddSortOrder {
        /// New sort order
        sort_order: SortOrder,
    },
    /// Sort order ID to set as the default, or -1 to set last added sort order
    SetDefaultSortOrder {
        /// Sort order id to set
        sort_order_id: i32,
    },
    /// Add a new snapshot
    AddSnapshot {
        /// New snapshot
        snapshot: Snapshot,
    },
    /// Create or replace a named snapshot reference
    SetSnapshotRef {
        /// Name of the snapshot reference
        ref_name: String,
        /// Snapshot reference to set; its fields are flattened into the update object
        /// rather than nested under their own key
        #[serde(flatten)]
        snapshot_reference: SnapshotReference,
    },
    /// Remove snapshots with certain snapshot ids
    RemoveSnapshots {
        /// Ids of the snapshots to remove
        snapshot_ids: Vec<i64>,
    },
    /// Remove snapshot reference
    RemoveSnapshotRef {
        /// Name of the snapshot ref to remove
        ref_name: String,
    },
    /// Set a new location for the table
    SetLocation {
        /// New Location
        location: String,
    },
    /// Set table properties
    SetProperties {
        /// Properties to set; existing keys are overwritten
        updates: HashMap<String, String>,
    },
    /// Remove table properties
    RemoveProperties {
        /// Keys of the properties to remove
        removals: Vec<String>,
    },
}
176
/// Requirements that must be met before applying updates to table metadata
///
/// Each variant is a precondition on the current table metadata and carries the
/// value the metadata is expected to have. `check_table_requirements` evaluates
/// a list of them against a `TableMetadata`.
///
/// # Requirements Types
/// * Table existence checks
/// * UUID validation
/// * Reference state validation
/// * Schema and partition spec version checks
/// * Sort order validation
/// * Column and partition ID validation
///
/// # Serialization
/// The enum is internally tagged: the variant name is stored in a `type` field,
/// and both variant names and field names are written in kebab-case.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(
    tag = "type",
    rename_all = "kebab-case",
    rename_all_fields = "kebab-case"
)]
pub enum TableRequirement {
    /// The table must not already exist; used for create transactions.
    ///
    /// `check_table_requirements` always accepts this variant; the existence
    /// check has to be performed elsewhere.
    AssertCreate,
    /// The table UUID must match the requirement's `uuid`
    AssertTableUuid {
        /// Uuid to assert
        uuid: Uuid,
    },
    /// The table branch or tag identified by the requirement's `ref` must reference the requirement's `snapshot-id`.
    ///
    /// `snapshot_id` is a plain `i64` here, so a "ref must not exist" assertion
    /// (a missing or `null` snapshot id) cannot be expressed with this type;
    /// `check_table_requirements` treats a missing ref as a failed requirement.
    AssertRefSnapshotId {
        /// Name of ref
        r#ref: String,
        /// Snapshot id the ref is expected to point to
        snapshot_id: i64,
    },
    /// The table's last assigned column id must match the requirement's `last-assigned-field-id`
    AssertLastAssignedFieldId {
        /// Id of last assigned id
        last_assigned_field_id: i32,
    },
    /// The table's current schema id must match the requirement's `current-schema-id`
    AssertCurrentSchemaId {
        /// Current schema id
        current_schema_id: i32,
    },
    /// The table's last assigned partition id must match the requirement's `last-assigned-partition-id`
    AssertLastAssignedPartitionId {
        /// id of last assigned partition
        last_assigned_partition_id: i32,
    },
    /// The table's default spec id must match the requirement's `default-spec-id`
    AssertDefaultSpecId {
        /// Default spec id
        default_spec_id: i32,
    },
    /// The table's default sort order id must match the requirement's `default-sort-order-id`
    AssertDefaultSortOrderId {
        /// Default sort order id
        default_sort_order_id: i32,
    },
}
240
/// Updates that can be applied to view metadata in a commit operation
///
/// Each variant describes one kind of change and carries the data needed for it:
/// * UUID assignment (only during view creation)
/// * Format version updates
/// * Schema modifications
/// * Location and property updates
/// * Version management (add versions, set current version)
///
/// # Type Parameters
/// * `T` - The materialization type of the [`Version`] carried by `AddViewVersion`
///
/// # Serialization
/// The enum is internally tagged: the variant name is stored in an `action`
/// field, and both variant names and field names are written in kebab-case.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(
    tag = "action",
    rename_all = "kebab-case",
    rename_all_fields = "kebab-case"
)]
pub enum ViewUpdate<T: Materialization> {
    /// Assigning a UUID to a table/view should only be done when creating the table/view. It is not safe to re-assign the UUID if a table/view already has a UUID assigned
    AssignUuid {
        /// New uuid in textual form; it is parsed into a `Uuid` when the update is applied
        uuid: String,
    },
    /// Update format version
    UpgradeFormatVersion {
        /// New format version
        format_version: i32,
    },
    /// Add a new schema
    AddSchema {
        /// Schema to add
        schema: Schema,
        /// The highest assigned column ID. `apply_view_updates` ignores this value.
        last_column_id: Option<i32>,
    },
    /// Set a new location for the view
    SetLocation {
        /// New Location
        location: String,
    },
    /// Set view properties
    SetProperties {
        /// Properties to set; existing keys are overwritten
        updates: HashMap<String, String>,
    },
    /// Remove view properties
    RemoveProperties {
        /// Keys of the properties to remove
        removals: Vec<String>,
    },
    /// Add a new version to the view
    AddViewVersion {
        /// Version to add
        view_version: Version<T>,
    },
    /// The view version id to set as current, or -1 to set last added view version id
    SetCurrentViewVersion {
        /// The id to set
        view_version_id: i64,
    },
}
306
/// Requirements that must be met before applying updates to view metadata
///
/// Each variant is a precondition on the current view metadata and carries the
/// value the metadata is expected to have. `check_view_requirements` evaluates
/// a list of them against a `GeneralViewMetadata`.
///
/// # Requirements Types
/// * UUID validation - Ensures view UUID matches expected value
///
/// # Serialization
/// The enum is internally tagged: the variant name is stored in a `type` field,
/// and both variant names and field names are written in kebab-case.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(
    tag = "type",
    rename_all = "kebab-case",
    rename_all_fields = "kebab-case"
)]
pub enum ViewRequirement {
    /// The view UUID must match the requirement's `uuid`
    AssertViewUuid {
        /// Uuid to assert
        uuid: Uuid,
    },
}
330/// Validates that table metadata meets all specified requirements
331///
332/// This function checks if the current table metadata satisfies all the requirements
333/// specified for a commit operation. It ensures atomic updates by verifying preconditions
334/// like UUID matches, snapshot references, schema versions etc.
335///
336/// # Arguments
337/// * `requirements` - List of requirements that must be satisfied
338/// * `metadata` - Current table metadata to validate against
339///
340/// # Returns
341/// * `true` if all requirements are met
342/// * `false` if any requirement is not satisfied
343pub fn check_table_requirements(
344 requirements: &[TableRequirement],
345 metadata: &TableMetadata,
346) -> bool {
347 requirements.iter().all(|x| match x {
348 // Assert create has to be check in another place
349 TableRequirement::AssertCreate => true,
350 TableRequirement::AssertTableUuid { uuid } => metadata.table_uuid == *uuid,
351 TableRequirement::AssertRefSnapshotId { r#ref, snapshot_id } => metadata
352 .refs
353 .get(r#ref)
354 .map(|id| id.snapshot_id == *snapshot_id)
355 .unwrap_or(false),
356 TableRequirement::AssertLastAssignedFieldId {
357 last_assigned_field_id,
358 } => metadata.last_column_id == *last_assigned_field_id,
359 TableRequirement::AssertCurrentSchemaId { current_schema_id } => {
360 metadata.current_schema_id == *current_schema_id
361 }
362 TableRequirement::AssertLastAssignedPartitionId {
363 last_assigned_partition_id,
364 } => metadata.last_partition_id == *last_assigned_partition_id,
365 TableRequirement::AssertDefaultSpecId { default_spec_id } => {
366 metadata.default_spec_id == *default_spec_id
367 }
368 TableRequirement::AssertDefaultSortOrderId {
369 default_sort_order_id,
370 } => metadata.default_sort_order_id == *default_sort_order_id,
371 })
372}
373
374/// Validates that view metadata meets all specified requirements
375///
376/// This function checks if the current view metadata satisfies all the requirements
377/// specified for a commit operation. It ensures atomic updates by verifying preconditions
378/// like UUID matches.
379///
380/// # Type Parameters
381/// * `T` - The materialization type for the view, must implement Materialization, Eq and 'static
382///
383/// # Arguments
384/// * `requirements` - List of requirements that must be satisfied
385/// * `metadata` - Current view metadata to validate against
386///
387/// # Returns
388/// * `true` if all requirements are met
389/// * `false` if any requirement is not satisfied
390pub fn check_view_requirements<T: Materialization + Eq + 'static>(
391 requirements: &[ViewRequirement],
392 metadata: &GeneralViewMetadata<T>,
393) -> bool {
394 requirements.iter().all(|x| match x {
395 ViewRequirement::AssertViewUuid { uuid } => metadata.view_uuid == *uuid,
396 })
397}
398/// Applies a sequence of updates to table metadata
399///
400/// This function processes each update in order, modifying the table metadata accordingly.
401/// Updates can include:
402/// * Format version changes
403/// * UUID assignments
404/// * Schema modifications
405/// * Partition spec and sort order changes
406/// * Snapshot management
407/// * Location and property updates
408///
409/// # Arguments
410/// * `metadata` - Mutable reference to table metadata to modify
411/// * `updates` - Vector of updates to apply
412///
413/// # Returns
414/// * `Ok(())` if all updates were applied successfully
415/// * `Err(Error)` if any update failed to apply
416pub fn apply_table_updates(
417 metadata: &mut TableMetadata,
418 updates: Vec<TableUpdate>,
419) -> Result<(), Error> {
420 let mut added_schema_id = None;
421 let mut added_spec_id = None;
422 let mut added_sort_order_id = None;
423 for update in updates {
424 match update {
425 TableUpdate::UpgradeFormatVersion { format_version } => {
426 if i32::from(metadata.format_version) != format_version {
427 unimplemented!("Table format upgrade");
428 }
429 }
430 TableUpdate::AssignUuid { uuid } => {
431 metadata.table_uuid = Uuid::parse_str(&uuid)?;
432 }
433 TableUpdate::AddSchema {
434 schema,
435 last_column_id,
436 } => {
437 let schema_id = *schema.schema_id();
438 metadata.schemas.insert(schema_id, schema);
439 added_schema_id = Some(schema_id);
440 if let Some(last_column_id) = last_column_id {
441 metadata.last_column_id = last_column_id;
442 }
443 }
444 TableUpdate::SetCurrentSchema { schema_id } => {
445 if schema_id == -1 {
446 if let Some(added_schema_id) = added_schema_id {
447 metadata.current_schema_id = added_schema_id;
448 } else {
449 return Err(Error::InvalidFormat(
450 "Cannot set current schema to -1 without adding a schema first"
451 .to_string(),
452 ));
453 }
454 } else {
455 metadata.current_schema_id = schema_id;
456 }
457 }
458 TableUpdate::AddSpec { spec } => {
459 let spec_id = *spec.spec_id();
460 metadata.partition_specs.insert(spec_id, spec);
461 added_spec_id = Some(spec_id);
462 }
463 TableUpdate::SetDefaultSpec { spec_id } => {
464 if spec_id == -1 {
465 if let Some(added_spec_id) = added_spec_id {
466 metadata.default_spec_id = added_spec_id;
467 } else {
468 return Err(Error::InvalidFormat(
469 "Cannot set default spec to -1 without adding a spec first".to_string(),
470 ));
471 }
472 } else {
473 metadata.default_spec_id = spec_id;
474 }
475 }
476 TableUpdate::AddSortOrder { sort_order } => {
477 let sort_order_id = sort_order.order_id;
478 metadata.sort_orders.insert(sort_order_id, sort_order);
479 added_sort_order_id = Some(sort_order_id);
480 }
481 TableUpdate::SetDefaultSortOrder { sort_order_id } => {
482 if sort_order_id == -1 {
483 if let Some(added_sort_order_id) = added_sort_order_id {
484 metadata.default_sort_order_id = added_sort_order_id;
485 } else {
486 return Err(Error::InvalidFormat(
487 "Cannot set default sort order to -1 without adding a sort order first"
488 .to_string(),
489 ));
490 }
491 } else {
492 metadata.default_sort_order_id = sort_order_id;
493 }
494 }
495 TableUpdate::AddSnapshot { snapshot } => {
496 metadata.snapshot_log.push(SnapshotLog {
497 snapshot_id: *snapshot.snapshot_id(),
498 timestamp_ms: *snapshot.timestamp_ms(),
499 });
500 metadata.last_sequence_number = *snapshot.sequence_number();
501 metadata.snapshots.insert(*snapshot.snapshot_id(), snapshot);
502 }
503 TableUpdate::SetSnapshotRef {
504 ref_name,
505 snapshot_reference,
506 } => {
507 if ref_name == "main" {
508 metadata.current_snapshot_id = Some(snapshot_reference.snapshot_id);
509 }
510 metadata.refs.insert(ref_name, snapshot_reference);
511 }
512 TableUpdate::RemoveSnapshots { snapshot_ids } => {
513 for id in snapshot_ids {
514 metadata.snapshots.remove(&id);
515 }
516 }
517 TableUpdate::RemoveSnapshotRef { ref_name } => {
518 metadata.refs.remove(&ref_name);
519 }
520 TableUpdate::SetLocation { location } => {
521 metadata.location = location;
522 }
523 TableUpdate::SetProperties { updates } => {
524 metadata.properties.extend(updates);
525 }
526 TableUpdate::RemoveProperties { removals } => {
527 for rem in removals {
528 metadata.properties.remove(&rem);
529 }
530 }
531 };
532 }
533
534 // Lastly make sure `last-updated-ms` field is up-to-date
535 metadata.last_updated_ms = SystemTime::now()
536 .duration_since(UNIX_EPOCH)
537 .unwrap()
538 .as_millis() as i64;
539 Ok(())
540}
541
542/// Applies a sequence of updates to view metadata
543///
544/// This function processes each update in order, modifying the view metadata accordingly.
545/// Updates can include:
546/// * Format version changes
547/// * UUID assignments
548/// * Schema modifications
549/// * Location and property updates
550/// * Version management
551///
552/// # Type Parameters
553/// * `T` - The materialization type for the view, must implement Materialization + 'static
554///
555/// # Arguments
556/// * `metadata` - Mutable reference to view metadata to modify
557/// * `updates` - Vector of updates to apply
558///
559/// # Returns
560/// * `Ok(())` if all updates were applied successfully
561/// * `Err(Error)` if any update failed to apply
562pub fn apply_view_updates<T: Materialization + 'static>(
563 metadata: &mut GeneralViewMetadata<T>,
564 updates: Vec<ViewUpdate<T>>,
565) -> Result<(), Error> {
566 for update in updates {
567 match update {
568 ViewUpdate::UpgradeFormatVersion { format_version } => {
569 if i32::from(metadata.format_version.clone()) != format_version {
570 unimplemented!("Upgrade of format version");
571 }
572 }
573 ViewUpdate::AssignUuid { uuid } => {
574 metadata.view_uuid = Uuid::parse_str(&uuid)?;
575 }
576 ViewUpdate::AddSchema {
577 schema,
578 last_column_id: _,
579 } => {
580 metadata.schemas.insert(*schema.schema_id(), schema);
581 }
582 ViewUpdate::SetLocation { location } => {
583 metadata.location = location;
584 }
585 ViewUpdate::SetProperties { updates } => {
586 metadata.properties.extend(updates);
587 }
588 ViewUpdate::RemoveProperties { removals } => {
589 for rem in removals {
590 metadata.properties.remove(&rem);
591 }
592 }
593 ViewUpdate::AddViewVersion { view_version } => {
594 metadata
595 .versions
596 .insert(view_version.version_id, view_version);
597 }
598 ViewUpdate::SetCurrentViewVersion { view_version_id } => {
599 metadata.current_version_id = view_version_id;
600 }
601 };
602 }
603 Ok(())
604}