iceberg_rust/view/transaction/
mod.rs

/*!
 * Defines the [Transaction] type for views to perform multiple view [operation::Operation]s with ACID guarantees.
 */
4
5pub mod operation;
6use iceberg_rust_spec::spec::{types::StructType, view_metadata::ViewRepresentation};
7use tracing::debug;
8
9use crate::{catalog::commit::CommitView, error::Error};
10
11use self::operation::Operation as ViewOperation;
12
13use super::View;
14
15/// Transactions let you perform a sequence of [Operation]s that can be committed to be performed with ACID guarantees.
16pub struct Transaction<'view> {
17    view: &'view mut View,
18    operations: Vec<ViewOperation>,
19    branch: Option<String>,
20}
21
22impl<'view> Transaction<'view> {
23    /// Create a transaction for the given view.
24    pub fn new(view: &'view mut View, branch: Option<&str>) -> Self {
25        Transaction {
26            view,
27            operations: vec![],
28            branch: branch.map(ToString::to_string),
29        }
30    }
31    /// Update the schmema of the view
32    pub fn update_representations(
33        mut self,
34        representations: Vec<ViewRepresentation>,
35        schema: StructType,
36    ) -> Self {
37        self.operations.push(ViewOperation::UpdateRepresentations {
38            representations,
39            schema,
40            branch: self.branch.clone(),
41        });
42        self
43    }
44    /// Update view properties
45    pub fn update_properties(mut self, entries: Vec<(String, String)>) -> Self {
46        self.operations
47            .push(ViewOperation::UpdateProperties(entries));
48        self
49    }
50    /// Commit the transaction to perform the [Operation]s with ACID guarantees.
51    pub async fn commit(self) -> Result<(), Error> {
52        let catalog = self.view.catalog();
53
54        let identifier = self.view.identifier().clone();
55        // Execute the table operations
56        let (mut requirements, mut updates) = (Vec::new(), Vec::new());
57        for operation in self.operations {
58            let (requirement, update) = operation.execute(&self.view.metadata).await?;
59
60            if let Some(requirement) = requirement {
61                requirements.push(requirement);
62            }
63            updates.extend(update);
64        }
65
66        debug!(
67            "Committing {} updates to view {}: requirements={:?}, updates={:?}",
68            updates.len(),
69            identifier,
70            requirements,
71            updates
72        );
73
74        let new_view = catalog
75            .clone()
76            .update_view(CommitView {
77                identifier,
78                requirements,
79                updates,
80            })
81            .await?;
82        *self.view = new_view;
83        Ok(())
84    }
85}