iceberg_rust/view/transaction/
mod.rs1pub mod operation;
6use iceberg_rust_spec::spec::{types::StructType, view_metadata::ViewRepresentation};
7use tracing::debug;
8
9use crate::{catalog::commit::CommitView, error::Error};
10
11use self::operation::Operation as ViewOperation;
12
13use super::View;
14
/// A pending set of changes to a [`View`].
///
/// Operations are queued by the builder-style methods and are only sent to the
/// catalog when `commit` is called.
pub struct Transaction<'view> {
    /// The view being modified; overwritten with the catalog's response when a commit succeeds.
    view: &'view mut View,
    /// Operations queued so far, in the order they were added.
    operations: Vec<ViewOperation>,
    /// Optional branch name; copied into every `UpdateRepresentations` operation.
    branch: Option<String>,
}
21
22impl<'view> Transaction<'view> {
23 pub fn new(view: &'view mut View, branch: Option<&str>) -> Self {
25 Transaction {
26 view,
27 operations: vec![],
28 branch: branch.map(ToString::to_string),
29 }
30 }
31 pub fn update_representations(
33 mut self,
34 representations: Vec<ViewRepresentation>,
35 schema: StructType,
36 ) -> Self {
37 self.operations.push(ViewOperation::UpdateRepresentations {
38 representations,
39 schema,
40 branch: self.branch.clone(),
41 });
42 self
43 }
44 pub fn update_properties(mut self, entries: Vec<(String, String)>) -> Self {
46 self.operations
47 .push(ViewOperation::UpdateProperties(entries));
48 self
49 }
50 pub async fn commit(self) -> Result<(), Error> {
52 let catalog = self.view.catalog();
53
54 let identifier = self.view.identifier().clone();
55 let (mut requirements, mut updates) = (Vec::new(), Vec::new());
57 for operation in self.operations {
58 let (requirement, update) = operation.execute(&self.view.metadata).await?;
59
60 if let Some(requirement) = requirement {
61 requirements.push(requirement);
62 }
63 updates.extend(update);
64 }
65
66 debug!(
67 "Committing {} updates to view {}: requirements={:?}, updates={:?}",
68 updates.len(),
69 identifier,
70 requirements,
71 updates
72 );
73
74 let new_view = catalog
75 .clone()
76 .update_view(CommitView {
77 identifier,
78 requirements,
79 updates,
80 })
81 .await?;
82 *self.view = new_view;
83 Ok(())
84 }
85}