1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
/*!
 * Defines the different [Operation]s on a [View].
*/

use iceberg_rust_spec::spec::{
    schema::SchemaBuilder,
    types::StructType,
    view_metadata::{GeneralViewMetadata, Summary, Version, ViewRepresentation, REF_PREFIX},
};
use std::{
    collections::HashMap,
    time::{SystemTime, UNIX_EPOCH},
};

use crate::{
    catalog::commit::{ViewRequirement, ViewUpdate},
    error::Error,
};

/// View operation
pub enum Operation {
    /// Update vresion
    UpdateRepresentation {
        /// Representation to add
        representation: ViewRepresentation,
        /// Schema of the representation
        schema: StructType,
        /// Branch where to add the representation
        branch: Option<String>,
    },
    /// Update view properties
    UpdateProperties(Vec<(String, String)>),
}

impl Operation {
    /// Execute operation
    pub async fn execute<T: Clone + Default>(
        self,
        metadata: &GeneralViewMetadata<T>,
    ) -> Result<(Option<ViewRequirement>, Vec<ViewUpdate>), Error> {
        match self {
            Operation::UpdateRepresentation {
                representation,
                schema,
                branch,
            } => {
                let version_id = metadata.versions.keys().max().unwrap_or(&0) + 1;
                let schema_id = metadata.schemas.keys().max().unwrap_or(&0) + 1;
                let last_column_id = schema.iter().map(|x| x.id).max().unwrap_or(0);
                let version = Version {
                    version_id,
                    schema_id,
                    summary: Summary {
                        operation: iceberg_rust_spec::spec::view_metadata::Operation::Replace,
                        engine_name: None,
                        engine_version: None,
                    },
                    representations: vec![representation],
                    default_catalog: None,
                    default_namespace: None,
                    timestamp_ms: SystemTime::now()
                        .duration_since(UNIX_EPOCH)
                        .unwrap()
                        .as_micros() as i64,
                };

                let branch_name = branch.unwrap_or("main".to_string());

                Ok((
                    Some(ViewRequirement::AssertViewUuid {
                        uuid: metadata.view_uuid,
                    }),
                    vec![
                        ViewUpdate::AddViewVersion {
                            view_version: version,
                        },
                        ViewUpdate::SetCurrentViewVersion {
                            view_version_id: version_id,
                        },
                        ViewUpdate::AddSchema {
                            schema: SchemaBuilder::default()
                                .with_schema_id(schema_id)
                                .with_fields(schema)
                                .build()
                                .map_err(iceberg_rust_spec::error::Error::from)?,
                            last_column_id: Some(last_column_id),
                        },
                        ViewUpdate::SetProperties {
                            updates: HashMap::from_iter(vec![(
                                REF_PREFIX.to_string() + &branch_name,
                                version_id.to_string(),
                            )]),
                        },
                    ],
                ))
            }
            Operation::UpdateProperties(entries) => Ok((
                None,
                vec![ViewUpdate::SetProperties {
                    updates: HashMap::from_iter(entries),
                }],
            )),
        }
    }
}