1use std::{
13 collections::HashMap,
14 fmt::{self},
15 str,
16 time::{SystemTime, UNIX_EPOCH},
17};
18
19use derive_builder::Builder;
20use derive_getters::Getters;
21use serde::{Deserialize, Serialize};
22use serde_repr::{Deserialize_repr, Serialize_repr};
23use uuid::Uuid;
24
25use crate::{error::Error, identifier::FullIdentifier};
26
27use super::{
28 schema::{Schema, DEFAULT_SCHEMA_ID},
29 tabular::TabularMetadataRef,
30};
31
32pub use _serde::ViewMetadataV1;
33
34use _serde::ViewMetadataEnum;
35
/// Prefix of the view properties that name a version: the property `ref-<name>` holds the
/// version id that `GeneralViewMetadata::current_version(Some("<name>"))` resolves to.
pub static REF_PREFIX: &str = "ref-";

/// Version id assigned by the `Version` builder when none is set explicitly.
pub static DEFAULT_VERSION_ID: i64 = 0;

/// Metadata of a plain (non-materialized) view: `Option<()>` is the materialization marker
/// whose `Materialization::is_none` is always `true`, so no storage table is serialized.
pub type ViewMetadata = GeneralViewMetadata<Option<()>>;
/// Builder for [`ViewMetadata`].
pub type ViewMetadataBuilder = GeneralViewMetadataBuilder<Option<()>>;
46
/// Metadata of an Iceberg view, generic over the materialization `T` that each [`Version`]
/// carries as its storage table.
///
/// (De)serialization goes through the versioned `ViewMetadataEnum` representation, so the
/// JSON layout (lists of versions and schemas, kebab-case keys) differs from this in-memory
/// layout (maps keyed by id).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)]
#[serde(try_from = "ViewMetadataEnum<T>", into = "ViewMetadataEnum<T>")]
pub struct GeneralViewMetadata<T: Materialization> {
    /// Unique identifier of the view; the builder generates a random v4 UUID by default.
    #[builder(default = "Uuid::new_v4()")]
    pub view_uuid: Uuid,
    /// Format version; selects the on-disk layout used when serializing.
    #[builder(default)]
    pub format_version: FormatVersion,
    /// Location string of the view.
    #[builder(setter(into))]
    pub location: String,
    /// Id of the version returned by `current_version(None)`. It is expected to be a key of
    /// `versions`; otherwise `current_version(None)` returns an error.
    pub current_version_id: i64,
    /// All versions of the view, keyed by their `version_id`.
    #[builder(setter(each(name = "with_version")), default)]
    pub versions: HashMap<i64, Version<T>>,
    /// Log of `(timestamp-ms, version-id)` entries.
    #[builder(default)]
    pub version_log: Vec<VersionLogStruct>,
    /// Schemas referenced by the versions, keyed by schema id (see `add_schema`).
    #[builder(setter(each(name = "with_schema")), default)]
    pub schemas: HashMap<i32, Schema>,
    /// Free-form string properties. Entries named `ref-<name>` are also read by
    /// `current_version` as named references to version ids.
    #[builder(default)]
    pub properties: HashMap<String, String>,
}
77
78impl<T: Materialization> GeneralViewMetadata<T> {
79 #[inline]
87 pub fn current_schema(&self, branch: Option<&str>) -> Result<&Schema, Error> {
88 let id = self.current_version(branch)?.schema_id;
89 self.schemas
90 .get(&id)
91 .ok_or_else(|| Error::InvalidFormat("view metadata".to_string()))
92 }
93
94 #[inline]
102 pub fn schema(&self, version_id: i64) -> Result<&Schema, Error> {
103 let id = self
104 .versions
105 .get(&version_id)
106 .ok_or_else(|| Error::NotFound(format!("View version {version_id}")))?
107 .schema_id;
108 self.schemas
109 .get(&id)
110 .ok_or_else(|| Error::InvalidFormat("view metadata".to_string()))
111 }
112
113 #[inline]
121 pub fn current_version(&self, snapshot_ref: Option<&str>) -> Result<&Version<T>, Error> {
122 let version_id: i64 = match snapshot_ref {
123 None => self.current_version_id,
124 Some(reference) => self
125 .properties
126 .get(&(REF_PREFIX.to_string() + reference))
127 .and_then(|x| x.parse().ok())
128 .unwrap_or(self.current_version_id),
129 };
130 self.versions
131 .get(&version_id)
132 .ok_or_else(|| Error::InvalidFormat("view metadata".to_string()))
133 }
134
135 #[inline]
140 pub fn add_schema(&mut self, schema: Schema) {
141 self.schemas.insert(*schema.schema_id(), schema);
142 }
143}
144
145impl ViewMetadata {
146 pub fn as_ref(&self) -> TabularMetadataRef<'_> {
147 TabularMetadataRef::View(self)
148 }
149}
150
151impl fmt::Display for ViewMetadata {
152 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
153 write!(
154 f,
155 "{}",
156 &serde_json::to_string(self).map_err(|_| fmt::Error)?,
157 )
158 }
159}
160
161impl str::FromStr for ViewMetadata {
162 type Err = Error;
163 fn from_str(s: &str) -> Result<Self, Self::Err> {
164 serde_json::from_str(s).map_err(Error::from)
165 }
166}
167
168mod _serde {
169 use std::collections::HashMap;
170
171 use serde::{Deserialize, Serialize};
172 use uuid::Uuid;
173
174 use crate::{
175 error::Error,
176 spec::{schema::SchemaV2, table_metadata::VersionNumber},
177 };
178
179 use super::{FormatVersion, GeneralViewMetadata, Materialization, Version, VersionLogStruct};
180
181 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
183 #[serde(untagged)]
184 pub(super) enum ViewMetadataEnum<T: Materialization> {
185 V1(ViewMetadataV1<T>),
187 }
188
189 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
190 #[serde(rename_all = "kebab-case")]
191 pub struct ViewMetadataV1<T: Materialization> {
193 pub view_uuid: Uuid,
195 pub format_version: VersionNumber<1>,
197 pub location: String,
199 pub current_version_id: i64,
201 pub versions: Vec<Version<T>>,
203 pub version_log: Vec<VersionLogStruct>,
206 pub schemas: Vec<SchemaV2>,
208 #[serde(skip_serializing_if = "Option::is_none")]
211 pub properties: Option<HashMap<String, String>>,
212 }
213
214 impl<T: Materialization> TryFrom<ViewMetadataEnum<T>> for GeneralViewMetadata<T> {
215 type Error = Error;
216 fn try_from(value: ViewMetadataEnum<T>) -> Result<Self, Self::Error> {
217 match value {
218 ViewMetadataEnum::V1(metadata) => metadata.try_into(),
219 }
220 }
221 }
222
223 impl<T: Materialization> From<GeneralViewMetadata<T>> for ViewMetadataEnum<T> {
224 fn from(value: GeneralViewMetadata<T>) -> Self {
225 match value.format_version {
226 FormatVersion::V1 => ViewMetadataEnum::V1(value.into()),
227 }
228 }
229 }
230
231 impl<T: Materialization> TryFrom<ViewMetadataV1<T>> for GeneralViewMetadata<T> {
232 type Error = Error;
233 fn try_from(value: ViewMetadataV1<T>) -> Result<Self, Self::Error> {
234 Ok(GeneralViewMetadata {
235 view_uuid: value.view_uuid,
236 format_version: FormatVersion::V1,
237 location: value.location,
238 current_version_id: value.current_version_id,
239 versions: HashMap::from_iter(value.versions.into_iter().map(|x| (x.version_id, x))),
240 version_log: value.version_log,
241 properties: value.properties.unwrap_or_default(),
242 schemas: HashMap::from_iter(
243 value
244 .schemas
245 .into_iter()
246 .map(|x| Ok((x.schema_id, x.try_into()?)))
247 .collect::<Result<Vec<_>, Error>>()?,
248 ),
249 })
250 }
251 }
252
253 impl<T: Materialization> From<GeneralViewMetadata<T>> for ViewMetadataV1<T> {
254 fn from(value: GeneralViewMetadata<T>) -> Self {
255 ViewMetadataV1 {
256 view_uuid: value.view_uuid,
257 format_version: VersionNumber::<1>,
258 location: value.location,
259 current_version_id: value.current_version_id,
260 versions: value.versions.into_values().collect(),
261 version_log: value.version_log,
262 properties: if value.properties.is_empty() {
263 None
264 } else {
265 Some(value.properties)
266 },
267 schemas: value.schemas.into_values().map(Into::into).collect(),
268 }
269 }
270 }
271}
272
/// Format version of view metadata.
///
/// The discriminant is the ASCII byte of the version digit (`b'1'`, i.e. 49), and
/// `Serialize_repr`/`Deserialize_repr` (de)serialize the enum through that `u8` value.
/// NOTE(review): the view metadata JSON itself writes the format version through
/// `VersionNumber<1>` in the `_serde` module, not through this enum's serde impls.
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone)]
#[repr(u8)]
#[derive(Default)]
pub enum FormatVersion {
    /// Format version 1 (the default).
    #[default]
    V1 = b'1',
}
282
283impl TryFrom<u8> for FormatVersion {
284 type Error = Error;
285 fn try_from(value: u8) -> Result<Self, Self::Error> {
286 match value {
287 1 => Ok(FormatVersion::V1),
288 _ => Err(Error::Conversion(
289 "u8".to_string(),
290 "format version".to_string(),
291 )),
292 }
293 }
294}
295
296impl From<FormatVersion> for u8 {
297 fn from(value: FormatVersion) -> Self {
298 match value {
299 FormatVersion::V1 => b'1',
300 }
301 }
302}
303
304impl From<FormatVersion> for i32 {
305 fn from(value: FormatVersion) -> Self {
306 match value {
307 FormatVersion::V1 => 1,
308 }
309 }
310}
311
312#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder, Getters)]
313#[serde(rename_all = "kebab-case")]
314pub struct Version<T: Materialization> {
316 #[builder(default = "DEFAULT_VERSION_ID")]
318 pub version_id: i64,
319 #[builder(default = "DEFAULT_SCHEMA_ID")]
321 pub schema_id: i32,
322 #[builder(
323 default = "SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as i64"
324 )]
325 pub timestamp_ms: i64,
327 #[builder(default)]
328 pub summary: Summary,
330 #[builder(setter(each(name = "with_representation")), default)]
331 pub representations: Vec<ViewRepresentation>,
333 #[builder(setter(strip_option), default)]
334 #[serde(skip_serializing_if = "Option::is_none")]
335 pub default_catalog: Option<String>,
337 #[builder(setter(strip_option), default)]
338 pub default_namespace: Vec<String>,
341 #[builder(default)]
343 #[serde(skip_serializing_if = "Materialization::is_none")]
344 pub storage_table: T,
345}
346
/// What a [`Version`] stores as its `storage_table`.
///
/// `is_none` is used as the `skip_serializing_if` predicate of `Version::storage_table`, so
/// returning `true` omits the field from serialized output.
pub trait Materialization: Clone + Default {
    /// Returns `true` when there is no storage table to serialize.
    fn is_none(&self) -> bool;
}
350
/// Plain (non-materialized) views: there is never a storage table, so both `None` and
/// `Some(())` report `true`.
///
/// Note: method-call syntax `x.is_none()` on an `Option<()>` resolves to the inherent
/// `Option::is_none`, not this impl; use `Materialization::is_none(&x)` to get this behavior.
impl Materialization for Option<()> {
    fn is_none(&self) -> bool {
        true
    }
}
356
/// Materialized views: the storage table is a `FullIdentifier`, which is always serialized.
impl Materialization for FullIdentifier {
    fn is_none(&self) -> bool {
        false
    }
}
362
363impl<T: Materialization> Version<T> {
364 pub fn builder() -> VersionBuilder<T> {
365 VersionBuilder::default()
366 }
367}
368
369impl<T: Materialization + Serialize> fmt::Display for Version<T> {
370 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
371 write!(
372 f,
373 "{}",
374 &serde_json::to_string(self).map_err(|_| fmt::Error)?,
375 )
376 }
377}
378
379impl<T: Materialization + for<'de> Deserialize<'de>> str::FromStr for Version<T> {
380 type Err = Error;
381 fn from_str(s: &str) -> Result<Self, Self::Err> {
382 serde_json::from_str(s).map_err(Error::from)
383 }
384}
385
/// One entry of the view's version log, serialized with kebab-case keys
/// (`timestamp-ms`, `version-id`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct VersionLogStruct {
    /// Timestamp of the entry in milliseconds (presumably since the Unix epoch, like
    /// `Version::timestamp_ms` — TODO confirm).
    pub timestamp_ms: i64,
    /// Id of the version this entry refers to.
    pub version_id: i64,
}
395
/// Operation recorded in a view version's summary.
///
/// (De)serialized as the lowercase strings `"create"` and `"replace"` by the hand-written
/// `Serialize`/`Deserialize` impls.
#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Default)]
pub enum Operation {
    /// `"create"`; the default.
    #[default]
    Create,
    /// `"replace"`.
    Replace,
}
406
407impl Serialize for Operation {
410 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
411 where
412 S: serde::Serializer,
413 {
414 use Operation::*;
415 match self {
416 Create => serializer.serialize_str("create"),
417 Replace => serializer.serialize_str("replace"),
418 }
419 }
420}
421
422impl<'de> Deserialize<'de> for Operation {
425 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
426 where
427 D: serde::Deserializer<'de>,
428 {
429 let s = String::deserialize(deserializer)?;
430 if s == "create" {
431 Ok(Operation::Create)
432 } else if s == "replace" {
433 Ok(Operation::Replace)
434 } else {
435 Err(serde::de::Error::custom("Invalid view operation."))
436 }
437 }
438}
439
/// Summary of the operation that produced a view version.
///
/// NOTE(review): keys are kebab-case (`engine-name`, `engine-version`). The `"engineVersion"`
/// key used in this file's test fixture is therefore not mapped to `engine_version`, because
/// serde ignores unknown keys by default. Confirm which spelling writers actually produce.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)]
#[serde(rename_all = "kebab-case")]
pub struct Summary {
    /// Operation that produced the version.
    pub operation: Operation,
    /// Name of the engine that produced the version; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub engine_name: Option<String>,
    /// Version of that engine; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub engine_version: Option<String>,
}
453
/// A representation of the view's query.
///
/// Internally tagged: serialized as an object whose `"type"` field names the variant
/// (`"sql"`), alongside the variant's own fields.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case", tag = "type")]
pub enum ViewRepresentation {
    /// SQL query text together with its dialect.
    #[serde(rename = "sql")]
    Sql {
        /// The SQL text.
        sql: String,
        /// Dialect name, e.g. `"spark"`; `ViewRepresentation::sql` defaults it to `"ansi"`.
        dialect: String,
    },
}
467
468impl ViewRepresentation {
469 pub fn sql(sql: &str, dialect: Option<&str>) -> Self {
470 ViewRepresentation::Sql {
471 sql: sql.to_owned(),
472 dialect: dialect.unwrap_or("ansi").to_owned(),
473 }
474 }
475}
476
#[cfg(test)]
mod tests {

    use crate::{error::Error, spec::view_metadata::ViewMetadata};

    /// Deserializes a V1 view metadata document and checks a few parsed fields. It also checks
    /// that the current version and schema resolve, and that a serialize/deserialize round
    /// trip yields equal metadata.
    #[test]
    fn test_deserialize_view_data_v1() -> Result<(), Error> {
        let data = r#"
        {
            "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
            "format-version" : 1,
            "location" : "s3://bucket/warehouse/default.db/event_agg",
            "current-version-id" : 1,
            "properties" : {
                "comment" : "Daily event counts"
            },
            "versions" : [ {
                "version-id" : 1,
                "timestamp-ms" : 1573518431292,
                "schema-id" : 1,
                "default-catalog" : "prod",
                "default-namespace" : [ "default" ],
                "summary" : {
                    "operation" : "create",
                    "engine-name" : "Spark",
                    "engineVersion" : "3.3.2"
                },
                "representations" : [ {
                    "type" : "sql",
                    "sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
                    "dialect" : "spark"
                } ]
            } ],
            "schemas": [ {
                "schema-id": 1,
                "type" : "struct",
                "fields" : [ {
                    "id" : 1,
                    "name" : "event_count",
                    "required" : false,
                    "type" : "int",
                    "doc" : "Count of events"
                }, {
                    "id" : 2,
                    "name" : "event_date",
                    "required" : false,
                    "type" : "date"
                } ]
            } ],
            "version-log" : [ {
                "timestamp-ms" : 1573518431292,
                "version-id" : 1
            } ]
        }
        "#;
        let metadata =
            serde_json::from_str::<ViewMetadata>(data).expect("Failed to deserialize json");

        // Spot-check that the document was mapped onto the in-memory layout.
        assert_eq!(metadata.current_version_id, 1);
        assert_eq!(
            metadata.properties.get("comment").map(String::as_str),
            Some("Daily event counts")
        );
        assert_eq!(metadata.current_version(None)?.schema_id, 1);
        assert_eq!(*metadata.current_schema(None)?.schema_id(), 1);

        let metadata_two: ViewMetadata = serde_json::from_str(
            &serde_json::to_string(&metadata).expect("Failed to serialize metadata"),
        )
        .expect("Failed to deserialize serialized metadata");
        assert_eq!(metadata, metadata_two);

        Ok(())
    }
}