1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
/*!
 * A Struct for the view metadata   
*/

use std::{
    collections::HashMap,
    time::{SystemTime, UNIX_EPOCH},
};

use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use uuid::Uuid;

use crate::error::Error;

use super::schema::Schema;

use _serde::ViewMetadataEnum;

/// Prefix used to denote branch references in the view properties
pub static REF_PREFIX: &str = "ref-";

/// Fields for the version 1 of the view metadata.
pub type ViewMetadata = GeneralViewMetadata<ViewRepresentation>;
/// Builder for the view metadata
pub type ViewMetadataBuilder = GeneralViewMetadataBuilder<ViewRepresentation>;

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)]
#[serde(try_from = "ViewMetadataEnum<T>", into = "ViewMetadataEnum<T>")]
/// Fields for the version 1 of the view metadata.
pub struct GeneralViewMetadata<T: Representation> {
    #[builder(default = "Uuid::new_v4()")]
    /// A UUID that identifies the view, generated when the view is created. Implementations must throw an exception if a view’s UUID does not match the expected UUID after refreshing metadata
    pub view_uuid: Uuid,
    #[builder(default)]
    /// An integer version number for the view format; must be 1
    pub format_version: FormatVersion,
    #[builder(setter(into))]
    /// The view’s base location. This is used to determine where to store manifest files and view metadata files.
    pub location: String,
    /// Current version of the view. Set to ‘1’ when the view is first created.
    pub current_version_id: i64,
    #[builder(setter(each(name = "with_version")), default)]
    /// An array of structs describing the last known versions of the view. Controlled by the table property: “version.history.num-entries”. See section Versions.
    pub versions: HashMap<i64, Version<T>>,
    #[builder(default)]
    /// A list of timestamp and version ID pairs that encodes changes to the current version for the view.
    /// Each time the current-version-id is changed, a new entry should be added with the last-updated-ms and the new current-version-id.
    pub version_log: Vec<VersionLogStruct>,
    #[builder(setter(each(name = "with_schema")), default)]
    /// A list of schemas, the same as the ‘schemas’ field from Iceberg table spec.
    pub schemas: HashMap<i32, Schema>,
    #[builder(default)]
    /// A string to string map of view properties. This is used for metadata such as “comment” and for settings that affect view maintenance.
    /// This is not intended to be used for arbitrary metadata.
    pub properties: HashMap<String, String>,
}

impl<T: Representation> GeneralViewMetadata<T> {
    /// Get current schema
    #[inline]
    pub fn current_schema(&self, branch: Option<&str>) -> Result<&Schema, Error> {
        let id = self.current_version(branch)?.schema_id;
        self.schemas
            .get(&id)
            .ok_or_else(|| Error::InvalidFormat("view metadata".to_string()))
    }
    /// Get schema for version
    #[inline]
    pub fn schema(&self, version_id: i64) -> Result<&Schema, Error> {
        let id = self
            .versions
            .get(&version_id)
            .ok_or_else(|| Error::NotFound("Version".to_string(), version_id.to_string()))?
            .schema_id;
        self.schemas
            .get(&id)
            .ok_or_else(|| Error::InvalidFormat("view metadata".to_string()))
    }
    /// Get current version
    #[inline]
    pub fn current_version(&self, snapshot_ref: Option<&str>) -> Result<&Version<T>, Error> {
        let version_id: i64 = match snapshot_ref {
            None => self.current_version_id,
            Some(reference) => self
                .properties
                .get(&(REF_PREFIX.to_string() + &reference))
                .and_then(|x| x.parse().ok())
                .unwrap_or(self.current_version_id),
        };
        self.versions
            .get(&version_id)
            .ok_or_else(|| Error::InvalidFormat("view metadata".to_string()))
    }
    /// Add schema to view metadata
    #[inline]
    pub fn add_schema(&mut self, schema: Schema) {
        self.schemas.insert(schema.schema_id, schema);
    }
}

mod _serde {
    use std::collections::HashMap;

    use serde::{Deserialize, Serialize};
    use uuid::Uuid;

    use crate::{
        error::Error,
        spec::{schema::SchemaV2, table_metadata::VersionNumber},
    };

    use super::{FormatVersion, GeneralViewMetadata, Representation, Version, VersionLogStruct};

    /// Metadata of an iceberg view
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(untagged)]
    pub(super) enum ViewMetadataEnum<T> {
        /// Version 1 of the table metadata
        V1(ViewMetadataV1<T>),
    }

    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Fields for the version 1 of the view metadata.
    pub struct ViewMetadataV1<T> {
        /// A UUID that identifies the view, generated when the view is created. Implementations must throw an exception if a view’s UUID does not match the expected UUID after refreshing metadata
        pub view_uuid: Uuid,
        /// An integer version number for the view format; must be 1
        pub format_version: VersionNumber<1>,
        /// The view’s base location. This is used to determine where to store manifest files and view metadata files.
        pub location: String,
        /// Current version of the view. Set to ‘1’ when the view is first created.
        pub current_version_id: i64,
        /// An array of structs describing the last known versions of the view. Controlled by the table property: “version.history.num-entries”. See section Versions.
        pub versions: Vec<Version<T>>,
        /// A list of timestamp and version ID pairs that encodes changes to the current version for the view.
        /// Each time the current-version-id is changed, a new entry should be added with the last-updated-ms and the new current-version-id.
        pub version_log: Vec<VersionLogStruct>,
        /// A list of schemas, the same as the ‘schemas’ field from Iceberg table spec.
        pub schemas: Vec<SchemaV2>,
        /// A string to string map of view properties. This is used for metadata such as “comment” and for settings that affect view maintenance.
        /// This is not intended to be used for arbitrary metadata.
        pub properties: Option<HashMap<String, String>>,
    }

    impl<T: Representation> TryFrom<ViewMetadataEnum<T>> for GeneralViewMetadata<T> {
        type Error = Error;
        fn try_from(value: ViewMetadataEnum<T>) -> Result<Self, Self::Error> {
            match value {
                ViewMetadataEnum::V1(metadata) => metadata.try_into(),
            }
        }
    }

    impl<T: Representation> From<GeneralViewMetadata<T>> for ViewMetadataEnum<T> {
        fn from(value: GeneralViewMetadata<T>) -> Self {
            match value.format_version {
                FormatVersion::V1 => ViewMetadataEnum::V1(value.into()),
            }
        }
    }

    impl<T: Representation> TryFrom<ViewMetadataV1<T>> for GeneralViewMetadata<T> {
        type Error = Error;
        fn try_from(value: ViewMetadataV1<T>) -> Result<Self, Self::Error> {
            Ok(GeneralViewMetadata {
                view_uuid: value.view_uuid,
                format_version: FormatVersion::V1,
                location: value.location,
                current_version_id: value.current_version_id,
                versions: HashMap::from_iter(value.versions.into_iter().map(|x| (x.version_id, x))),
                version_log: value.version_log,
                properties: value.properties.unwrap_or_default(),
                schemas: HashMap::from_iter(
                    value
                        .schemas
                        .into_iter()
                        .map(|x| Ok((x.schema_id, x.try_into()?)))
                        .collect::<Result<Vec<_>, Error>>()?,
                ),
            })
        }
    }

    impl<T: Representation> From<GeneralViewMetadata<T>> for ViewMetadataV1<T> {
        fn from(value: GeneralViewMetadata<T>) -> Self {
            ViewMetadataV1 {
                view_uuid: value.view_uuid,
                format_version: VersionNumber::<1>,
                location: value.location,
                current_version_id: value.current_version_id,
                versions: value.versions.into_values().collect(),
                version_log: value.version_log,
                properties: if value.properties.is_empty() {
                    None
                } else {
                    Some(value.properties)
                },
                schemas: value.schemas.into_values().map(Into::into).collect(),
            }
        }
    }
}
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone)]
#[repr(u8)]
/// Iceberg format version
#[derive(Default)]
pub enum FormatVersion {
    /// Iceberg spec version 1
    #[default]
    V1 = b'1',
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)]
#[serde(rename_all = "kebab-case")]
/// Fields for the version 2 of the view metadata.
pub struct Version<T> {
    /// Monotonically increasing id indicating the version of the view. Starts with 1.
    pub version_id: i64,
    /// ID of the schema for the view version
    pub schema_id: i32,
    #[builder(
        default = "SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_micros() as i64"
    )]
    /// Timestamp expressed in ms since epoch at which the version of the view was created.
    pub timestamp_ms: i64,
    #[builder(default)]
    /// A string map summarizes the version changes, including operation, described in Summary.
    pub summary: Summary,
    #[builder(setter(each(name = "with_representation")), default)]
    /// A list of “representations” as described in Representations.
    pub representations: Vec<T>,
    #[builder(default)]
    /// A string specifying the catalog to use when the table or view references in the view definition do not contain an explicit catalog.
    pub default_catalog: Option<String>,
    #[builder(default)]
    /// The namespace to use when the table or view references in the view definition do not contain an explicit namespace.
    /// Since the namespace may contain multiple parts, it is serialized as a list of strings.
    pub default_namespace: Option<Vec<String>>,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
/// Fields for the version 2 of the view metadata.
pub struct VersionLogStruct {
    /// The timestamp when the referenced version was made the current version
    pub timestamp_ms: i64,
    /// Version id of the view
    pub version_id: i64,
}

#[derive(Debug, PartialEq, Eq, Clone)]
/// View operation that create the metadata file
#[derive(Default)]
pub enum Operation {
    /// Create view
    #[default]
    Create,
    /// Replace view
    Replace,
}

/// Serialize for PrimitiveType wit special handling for
/// Decimal and Fixed types.
impl Serialize for Operation {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use Operation::*;
        match self {
            Create => serializer.serialize_str("create"),
            Replace => serializer.serialize_str("replace"),
        }
    }
}

/// Serialize for PrimitiveType wit special handling for
/// Decimal and Fixed types.
impl<'de> Deserialize<'de> for Operation {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let s = String::deserialize(deserializer)?;
        if s == "create" {
            Ok(Operation::Create)
        } else if s == "replace" {
            Ok(Operation::Replace)
        } else {
            Err(serde::de::Error::custom("Invalid view operation."))
        }
    }
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)]
#[serde(rename_all = "kebab-case")]
/// Fields for the version 2 of the view metadata.
pub struct Summary {
    /// A string value indicating the view operation that caused this metadata to be created. Allowed values are “create” and “replace”.
    pub operation: Operation,
    /// Name of the engine that created the view version
    pub engine_name: Option<String>,
    /// A string value indicating the version of the engine that performed the operation
    pub engine_version: Option<String>,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case", tag = "type")]
/// Fields for the version 2 of the view metadata.
pub enum ViewRepresentation {
    #[serde(rename = "sql")]
    /// This type of representation stores the original view definition in SQL and its SQL dialect.
    Sql {
        /// A string representing the original view definition in SQL
        sql: String,
        /// A string specifying the dialect of the ‘sql’ field. It can be used by the engines to detect the SQL dialect.
        dialect: String,
    },
}

impl Representation for ViewRepresentation {}

/// Marks a representation for an iceberg view
pub trait Representation: Clone {}

#[cfg(test)]
mod tests {

    use crate::{error::Error, spec::view_metadata::ViewMetadata};

    #[test]
    fn test_deserialize_view_data_v1() -> Result<(), Error> {
        let data = r#"
        {
        "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
        "format-version" : 1,
        "location" : "s3://bucket/warehouse/default.db/event_agg",
        "current-version-id" : 1,
        "properties" : {
            "comment" : "Daily event counts"
        },
        "versions" : [ {
            "version-id" : 1,
            "timestamp-ms" : 1573518431292,
            "schema-id" : 1,
            "default-catalog" : "prod",
            "default-namespace" : [ "default" ],
            "summary" : {
            "operation" : "create",
            "engine-name" : "Spark",
            "engineVersion" : "3.3.2"
            },
            "representations" : [ {
            "type" : "sql",
            "sql" : "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
            "dialect" : "spark"
            } ]
        } ],
        "schemas": [ {
            "schema-id": 1,
            "type" : "struct",
            "fields" : [ {
            "id" : 1,
            "name" : "event_count",
            "required" : false,
            "type" : "int",
            "doc" : "Count of events"
            }, {
            "id" : 2,
            "name" : "event_date",
            "required" : false,
            "type" : "date"
            } ]
        } ],
        "version-log" : [ {
            "timestamp-ms" : 1573518431292,
            "version-id" : 1
        } ]
        }
        "#;
        let metadata =
            serde_json::from_str::<ViewMetadata>(data).expect("Failed to deserialize json");
        //test serialise deserialise works.
        let metadata_two: ViewMetadata = serde_json::from_str(
            &serde_json::to_string(&metadata).expect("Failed to serialize metadata"),
        )
        .expect("Failed to serialize json");
        dbg!(&metadata, &metadata_two);
        assert_eq!(metadata, metadata_two);

        Ok(())
    }
}