1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/*!
 * Defines the [Transaction] type for views to perform multiple view [Operation]s with ACID guarantees.
*/

pub mod operation;
use iceberg_rust_spec::spec::{types::StructType, view_metadata::ViewRepresentation};

use crate::{catalog::commit::CommitView, error::Error};

use self::operation::Operation as ViewOperation;

use super::View;

/// Transactions let you perform a sequence of [Operation]s that can be committed to be performed with ACID guarantees.
pub struct Transaction<'view> {
    view: &'view mut View,
    operations: Vec<ViewOperation>,
    branch: Option<String>,
}

impl<'view> Transaction<'view> {
    /// Create a transaction for the given view.
    pub fn new(view: &'view mut View, branch: Option<&str>) -> Self {
        Transaction {
            view,
            operations: vec![],
            branch: branch.map(ToString::to_string),
        }
    }
    /// Update the schmema of the view
    pub fn update_representation(
        mut self,
        representation: ViewRepresentation,
        schema: StructType,
    ) -> Self {
        self.operations.push(ViewOperation::UpdateRepresentation {
            representation,
            schema,
            branch: self.branch.clone(),
        });
        self
    }
    /// Update view properties
    pub fn update_properties(mut self, entries: Vec<(String, String)>) -> Self {
        self.operations
            .push(ViewOperation::UpdateProperties(entries));
        self
    }
    /// Commit the transaction to perform the [Operation]s with ACID guarantees.
    pub async fn commit(self) -> Result<(), Error> {
        let catalog = self.view.catalog();

        let identifier = self.view.identifier().clone();
        // Execute the table operations
        let (mut requirements, mut updates) = (Vec::new(), Vec::new());
        for operation in self.operations {
            let (requirement, update) = operation.execute(&self.view.metadata).await?;

            if let Some(requirement) = requirement {
                requirements.push(requirement);
            }
            updates.extend(update);
        }
        let new_view = catalog
            .clone()
            .update_view(CommitView {
                identifier,
                requirements,
                updates,
            })
            .await?;
        *self.view = new_view;
        Ok(())
    }
}