use std::{
collections::HashMap,
fmt::{self, Display},
iter::once,
ops::{Deref, DerefMut},
str,
time::{SystemTime, UNIX_EPOCH},
};
use derive_builder::Builder;
use derive_getters::Getters;
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use uuid::Uuid;
use crate::error::Error;
use super::schema::{Schema, DEFAULT_SCHEMA_ID};
pub use _serde::ViewMetadataV1;
use _serde::ViewMetadataEnum;
/// Prefix prepended to a reference name to form the property key under which
/// `GeneralViewMetadata::current_version` looks up that reference's version id.
pub static REF_PREFIX: &str = "ref-";
/// Version id the `Version` builder falls back to when none is set explicitly.
pub static DEFAULT_VERSION_ID: i64 = 0;
/// View metadata whose versions use `Option<()>` as their materialization type.
pub type ViewMetadata = GeneralViewMetadata<Option<()>>;
/// Builder counterpart of [`ViewMetadata`].
pub type ViewMetadataBuilder = GeneralViewMetadataBuilder<Option<()>>;
/// In-memory representation of view metadata, generic over the materialization
/// type `T` stored in each version.
///
/// (De)serialization is delegated to `ViewMetadataEnum<T>` via serde's
/// `try_from` / `into` container attributes, so the wire layout is defined by
/// the types in the `_serde` module rather than by this struct's fields.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)]
#[serde(try_from = "ViewMetadataEnum<T>", into = "ViewMetadataEnum<T>")]
pub struct GeneralViewMetadata<T: Materialization> {
/// UUID of the view; the builder generates a random v4 UUID when unset.
#[builder(default = "Uuid::new_v4()")]
pub view_uuid: Uuid,
/// Format version of the metadata; the builder uses the enum's default.
#[builder(default)]
pub format_version: FormatVersion,
/// Location string of the view; the builder setter accepts anything `Into<String>`.
#[builder(setter(into))]
pub location: String,
/// Id of the version used when no reference is requested (see `current_version`).
pub current_version_id: i64,
/// Known versions keyed by their `version_id`; `with_version` adds one entry.
#[builder(setter(each(name = "with_version")), default)]
pub versions: HashMap<i64, Version<T>>,
/// Log of version entries (timestamp plus version id).
#[builder(default)]
pub version_log: Vec<VersionLogStruct>,
/// Known schemas keyed by schema id; `with_schema` adds one entry.
#[builder(setter(each(name = "with_schema")), default)]
pub schemas: HashMap<i32, Schema>,
/// Free-form string properties. Keys of the form `ref-<name>` are read by
/// `current_version` to resolve a named reference to a version id.
#[builder(default)]
pub properties: HashMap<String, String>,
}
impl<T: Materialization> GeneralViewMetadata<T> {
#[inline]
pub fn current_schema(&self, branch: Option<&str>) -> Result<&Schema, Error> {
let id = self.current_version(branch)?.schema_id;
self.schemas
.get(&id)
.ok_or_else(|| Error::InvalidFormat("view metadata".to_string()))
}
#[inline]
pub fn schema(&self, version_id: i64) -> Result<&Schema, Error> {
let id = self
.versions
.get(&version_id)
.ok_or_else(|| Error::NotFound("Version".to_string(), version_id.to_string()))?
.schema_id;
self.schemas
.get(&id)
.ok_or_else(|| Error::InvalidFormat("view metadata".to_string()))
}
#[inline]
pub fn current_version(&self, snapshot_ref: Option<&str>) -> Result<&Version<T>, Error> {
let version_id: i64 = match snapshot_ref {
None => self.current_version_id,
Some(reference) => self
.properties
.get(&(REF_PREFIX.to_string() + reference))
.and_then(|x| x.parse().ok())
.unwrap_or(self.current_version_id),
};
self.versions
.get(&version_id)
.ok_or_else(|| Error::InvalidFormat("view metadata".to_string()))
}
#[inline]
pub fn add_schema(&mut self, schema: Schema) {
self.schemas.insert(*schema.schema_id(), schema);
}
}
impl fmt::Display for ViewMetadata {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}",
&serde_json::to_string(self).map_err(|_| fmt::Error::default())?,
)
}
}
/// Parses view metadata from its JSON representation.
impl str::FromStr for ViewMetadata {
    type Err = Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // `?` converts the serde_json error through `Error::from`.
        Ok(serde_json::from_str(s)?)
    }
}
/// Serde-facing representations of the view metadata and the conversions
/// between them and [`GeneralViewMetadata`].
mod _serde {
    use std::collections::HashMap;

    use serde::{Deserialize, Serialize};
    use uuid::Uuid;

    use crate::{
        error::Error,
        spec::{schema::SchemaV2, table_metadata::VersionNumber},
    };

    use super::{FormatVersion, GeneralViewMetadata, Materialization, Version, VersionLogStruct};

    /// Untagged wrapper over every supported on-disk metadata layout.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(untagged)]
    pub(super) enum ViewMetadataEnum<T: Materialization> {
        V1(ViewMetadataV1<T>),
    }

    /// Version 1 layout of the view metadata, using kebab-case field names
    /// and list-shaped `versions` / `schemas`.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    pub struct ViewMetadataV1<T: Materialization> {
        pub view_uuid: Uuid,
        pub format_version: VersionNumber<1>,
        pub location: String,
        pub current_version_id: i64,
        pub versions: Vec<Version<T>>,
        pub version_log: Vec<VersionLogStruct>,
        pub schemas: Vec<SchemaV2>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub properties: Option<HashMap<String, String>>,
    }

    impl<T: Materialization> TryFrom<ViewMetadataEnum<T>> for GeneralViewMetadata<T> {
        type Error = Error;

        /// Unwraps the layout variant and converts it into the in-memory form.
        fn try_from(value: ViewMetadataEnum<T>) -> Result<Self, Self::Error> {
            let ViewMetadataEnum::V1(metadata) = value;
            metadata.try_into()
        }
    }

    impl<T: Materialization> From<GeneralViewMetadata<T>> for ViewMetadataEnum<T> {
        /// Picks the layout variant that matches the metadata's format version.
        fn from(value: GeneralViewMetadata<T>) -> Self {
            match value.format_version {
                FormatVersion::V1 => Self::V1(ViewMetadataV1::from(value)),
            }
        }
    }

    impl<T: Materialization> TryFrom<ViewMetadataV1<T>> for GeneralViewMetadata<T> {
        type Error = Error;

        /// Indexes versions by version id and schemas by schema id; fails if
        /// any schema cannot be converted. A missing `properties` object
        /// becomes an empty map.
        fn try_from(value: ViewMetadataV1<T>) -> Result<Self, Self::Error> {
            let ViewMetadataV1 {
                view_uuid,
                location,
                current_version_id,
                versions,
                version_log,
                schemas,
                properties,
                ..
            } = value;

            let versions: HashMap<i64, Version<T>> = versions
                .into_iter()
                .map(|version| (version.version_id, version))
                .collect();
            let schemas = schemas
                .into_iter()
                .map(|schema| Ok((schema.schema_id, schema.try_into()?)))
                .collect::<Result<HashMap<_, _>, Error>>()?;

            Ok(GeneralViewMetadata {
                view_uuid,
                format_version: FormatVersion::V1,
                location,
                current_version_id,
                versions,
                version_log,
                properties: properties.unwrap_or_default(),
                schemas,
            })
        }
    }

    impl<T: Materialization> From<GeneralViewMetadata<T>> for ViewMetadataV1<T> {
        /// Flattens the keyed maps back into lists; an empty property map is
        /// represented as `None` so it is left out of the serialized output.
        fn from(value: GeneralViewMetadata<T>) -> Self {
            let GeneralViewMetadata {
                view_uuid,
                location,
                current_version_id,
                versions,
                version_log,
                schemas,
                properties,
                ..
            } = value;

            Self {
                view_uuid,
                format_version: VersionNumber::<1>,
                location,
                current_version_id,
                versions: versions.into_values().collect(),
                version_log,
                properties: Some(properties).filter(|props| !props.is_empty()),
                schemas: schemas.into_values().map(Into::into).collect(),
            }
        }
    }
}
/// Format version of the view metadata. `V1` is the only variant and the default.
///
/// NOTE(review): the discriminant is the ASCII byte `b'1'` (49), and the
/// `*_repr` derives work on the `u8` representation, so this enum on its own
/// presumably (de)serializes as 49 rather than 1 — confirm that is intended.
/// The metadata structs in `_serde` carry `VersionNumber<1>` for the
/// `format-version` field instead of this enum.
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone)]
#[repr(u8)]
#[derive(Default)]
pub enum FormatVersion {
#[default]
V1 = b'1',
}
/// A single version of a view, generic over the materialization type `T`
/// stored in `storage_table`.
///
/// Fields are (de)serialized with kebab-case names. Every field has a builder
/// default, so a `VersionBuilder` can be built without setting anything.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder, Getters)]
#[serde(rename_all = "kebab-case")]
pub struct Version<T: Materialization> {
    /// Id of this version; the builder defaults to `DEFAULT_VERSION_ID`.
    #[builder(default = "DEFAULT_VERSION_ID")]
    pub version_id: i64,
    /// Id of the schema this version uses; the builder defaults to
    /// `DEFAULT_SCHEMA_ID`.
    #[builder(default = "DEFAULT_SCHEMA_ID")]
    pub schema_id: i32,
    /// Timestamp in milliseconds since the Unix epoch.
    ///
    /// The builder defaults to the current system time. The default must be
    /// taken in milliseconds (`as_millis`) to match the field's unit; it
    /// panics if the system clock reports a time before the Unix epoch.
    #[builder(
        default = "SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as i64"
    )]
    pub timestamp_ms: i64,
    /// Summary of the operation that produced this version.
    #[builder(default)]
    pub summary: Summary,
    /// Representations of the view; `with_representation` appends one entry.
    #[builder(setter(each(name = "with_representation")), default)]
    pub representations: Vec<ViewRepresentation>,
    /// Optional default catalog; omitted from the serialized form when `None`.
    #[builder(setter(strip_option), default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub default_catalog: Option<String>,
    /// Optional default namespace; omitted from the serialized form when `None`.
    #[builder(setter(strip_option), default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub default_namespace: Option<Vec<String>>,
    /// Materialization value; omitted from the serialized form when
    /// `Materialization::is_none` returns `true`.
    #[builder(default)]
    #[serde(skip_serializing_if = "Materialization::is_none")]
    pub storage_table: T,
    /// Optional lineage; omitted from the serialized form when `None`.
    #[builder(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub lineage: Option<Lineage>,
}
/// Types that can be stored in `Version::storage_table`.
///
/// Implementors must be `Clone + Default`.
pub trait Materialization: Clone + Default {
/// Returns `true` when the value should be treated as absent; used as the
/// `skip_serializing_if` predicate for `Version::storage_table`.
fn is_none(&self) -> bool;
}
/// Materialization used by plain views (see the `ViewMetadata` alias).
impl Materialization for Option<()> {
/// Always returns `true`, for both `None` and `Some(())`, so the
/// `storage_table` field is never serialized for this type.
fn is_none(&self) -> bool {
true
}
}
/// Materialization backed by a fully qualified identifier.
impl Materialization for FullIdentifier {
/// Always returns `false`, so a `FullIdentifier` storage table is always
/// serialized.
fn is_none(&self) -> bool {
false
}
}
impl<T: Materialization> Version<T> {
    /// Returns a fresh [`VersionBuilder`] with no fields set.
    pub fn builder() -> VersionBuilder<T> {
        Default::default()
    }
}
/// Formats the version as its JSON serialization.
impl<T: Materialization + Serialize> fmt::Display for Version<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The serde error cannot be carried by `fmt::Result`, so it is
        // collapsed into `fmt::Error`.
        match serde_json::to_string(self) {
            Ok(json) => f.write_str(&json),
            Err(_) => Err(fmt::Error),
        }
    }
}
impl<T: Materialization + for<'de> Deserialize<'de>> str::FromStr for Version<T> {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
serde_json::from_str(s).map_err(Error::from)
}
}
/// One entry of the metadata's `version_log`, (de)serialized with kebab-case
/// field names.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct VersionLogStruct {
/// Timestamp of the entry in milliseconds.
/// NOTE(review): presumably the time the version became current — confirm.
pub timestamp_ms: i64,
/// Id of the version this entry refers to.
pub version_id: i64,
}
/// Operation recorded in a version's `Summary`. Defaults to `Create`.
///
/// (De)serialized through the hand-written `Serialize` / `Deserialize` impls
/// as the lowercase strings `"create"` and `"replace"`.
#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Default)]
pub enum Operation {
/// Serialized as `"create"`.
#[default]
Create,
/// Serialized as `"replace"`.
Replace,
}
impl Serialize for Operation {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use Operation::*;
match self {
Create => serializer.serialize_str("create"),
Replace => serializer.serialize_str("replace"),
}
}
}
impl<'de> Deserialize<'de> for Operation {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
if s == "create" {
Ok(Operation::Create)
} else if s == "replace" {
Ok(Operation::Replace)
} else {
Err(serde::de::Error::custom("Invalid view operation."))
}
}
}
/// Summary attached to a `Version`, (de)serialized with kebab-case field names.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)]
#[serde(rename_all = "kebab-case")]
pub struct Summary {
/// Operation recorded for the version.
pub operation: Operation,
/// Optional engine name; omitted from the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub engine_name: Option<String>,
/// Optional engine version; omitted from the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub engine_version: Option<String>,
}
/// A representation of a view's definition.
///
/// Serialized as an internally tagged object whose `type` field names the
/// variant.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case", tag = "type")]
pub enum ViewRepresentation {
/// SQL representation, tagged as `"sql"`.
#[serde(rename = "sql")]
Sql {
/// SQL text.
sql: String,
/// Name of the SQL dialect the text is written in.
dialect: String,
},
}
impl ViewRepresentation {
pub fn sql(sql: &str, dialect: Option<&str>) -> Self {
ViewRepresentation::Sql {
sql: sql.to_owned(),
dialect: dialect.unwrap_or("ansi").to_owned(),
}
}
}
/// Identifier made of a catalog, a namespace path, a name and an optional
/// reference. Field getters are generated by the `Getters` derive.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default, Getters)]
pub struct FullIdentifier {
/// Catalog name.
catalog: String,
/// Namespace components, outermost first (joined with `.` by `Display`).
namespace: Vec<String>,
/// Final name component of the identifier.
name: String,
/// Optional reference name; omitted from the serialized form when `None`
/// and not included in the `Display` output.
#[serde(skip_serializing_if = "Option::is_none")]
r#ref: Option<String>,
}
impl FullIdentifier {
pub fn new(catalog: &str, namespace: &[String], name: &str, r#ref: Option<&str>) -> Self {
Self {
catalog: catalog.to_string(),
namespace: namespace.iter().map(ToOwned::to_owned).collect(),
name: name.to_string(),
r#ref: r#ref.map(ToString::to_string),
}
}
pub fn parse(
input: &str,
default_namespace: Option<&[String]>,
default_catalog: Option<&str>,
) -> Result<Self, Error> {
let mut parts = input.split(".").collect::<Vec<_>>().into_iter().rev();
let table_name = parts.next().ok_or(Error::InvalidFormat(format!(
"Identifier {:?} is empty",
input
)))?;
let namespace_name = parts.next();
let mut parts = parts.rev();
let catalog_name = parts
.next()
.or(default_catalog)
.ok_or(Error::InvalidFormat(format!(
"Identifier {:?} is empty",
input
)))?;
let namespace = if let Some(namespace_name) = namespace_name {
parts
.chain(once(namespace_name))
.map(ToOwned::to_owned)
.collect()
} else {
default_namespace
.ok_or(Error::NotFound(
"Default".to_owned(),
"namespace".to_owned(),
))?
.iter()
.map(ToOwned::to_owned)
.collect()
};
Ok(FullIdentifier {
catalog: catalog_name.to_owned(),
namespace,
name: table_name.to_owned(),
r#ref: None,
})
}
}
/// Formats the identifier as `catalog.namespace.name`, with the namespace
/// components joined by `.`. The reference is not part of the output.
impl Display for FullIdentifier {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let namespace = self.namespace.join(".");
        write!(f, "{}.{}.{}", self.catalog, namespace, self.name)
    }
}
/// An identifier paired with a sequence id; the element type of the list form
/// that `Lineage` is (de)serialized through. Fields use kebab-case names on
/// the wire and getters are generated by the `Getters` derive.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default, Getters)]
#[serde(rename_all = "kebab-case")]
pub struct SourceTable {
/// Identifier of the source table.
identifier: FullIdentifier,
/// Sequence id associated with the identifier.
sequence_id: i64,
}
/// Map from source-table identifier to sequence id.
///
/// (De)serialized as a `Vec<SourceTable>` via serde's `from` / `into`
/// container attributes.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(from = "Vec<SourceTable>", into = "Vec<SourceTable>")]
pub struct Lineage(HashMap<FullIdentifier, i64>);
impl Lineage {
pub fn new() -> Self {
Lineage(HashMap::new())
}
pub fn from_iter<T: IntoIterator<Item = (FullIdentifier, i64)>>(iter: T) -> Self {
Lineage(HashMap::from_iter(iter))
}
}
impl From<Vec<SourceTable>> for Lineage {
fn from(value: Vec<SourceTable>) -> Self {
Lineage(HashMap::from_iter(
value.into_iter().map(|x| (x.identifier, x.sequence_id)),
))
}
}
impl From<Lineage> for Vec<SourceTable> {
fn from(value: Lineage) -> Self {
value
.0
.into_iter()
.map(|(identifier, sequence_id)| SourceTable {
identifier,
sequence_id,
})
.collect()
}
}
/// Gives shared access to the wrapped identifier-to-sequence-id map, so the
/// map's read-only methods can be called on a `Lineage` directly.
impl Deref for Lineage {
type Target = HashMap<FullIdentifier, i64>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// Gives mutable access to the wrapped identifier-to-sequence-id map, so the
/// map's mutating methods can be called on a `Lineage` directly.
impl DerefMut for Lineage {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
#[cfg(test)]
mod tests {
use crate::{error::Error, spec::view_metadata::ViewMetadata};
// Round-trip check: deserializes a v1 view metadata document, serializes the
// result, deserializes that output again and asserts that both in-memory
// values are equal.
#[test]
fn test_deserialize_view_data_v1() -> Result<(), Error> {
// NOTE(review): the summary below uses the key `engineVersion`, which does
// not match the kebab-case `engine-version` name of `Summary::engine_version`;
// presumably it is ignored on deserialization — confirm whether that is
// intended.
let data = r#"
{
"view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
"format-version" : 1,
"location" : "s3://bucket/warehouse/default.db/event_agg",
"current-version-id" : 1,
"properties" : {
"comment" : "Daily event counts"
},
"versions" : [ {
"version-id" : 1,
"timestamp-ms" : 1573518431292,
"schema-id" : 1,
"default-catalog" : "prod",
"default-namespace" : [ "default" ],
"summary" : {
"operation" : "create",
"engine-name" : "Spark",
"engineVersion" : "3.3.2"
},
"representations" : [ {
"type" : "sql",
"sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
"dialect" : "spark"
} ]
} ],
"schemas": [ {
"schema-id": 1,
"type" : "struct",
"fields" : [ {
"id" : 1,
"name" : "event_count",
"required" : false,
"type" : "int",
"doc" : "Count of events"
}, {
"id" : 2,
"name" : "event_date",
"required" : false,
"type" : "date"
} ]
} ],
"version-log" : [ {
"timestamp-ms" : 1573518431292,
"version-id" : 1
} ]
}
"#;
// First pass: JSON text -> metadata.
let metadata =
serde_json::from_str::<ViewMetadata>(data).expect("Failed to deserialize json");
// Second pass: metadata -> JSON text -> metadata.
let metadata_two: ViewMetadata = serde_json::from_str(
&serde_json::to_string(&metadata).expect("Failed to serialize metadata"),
)
.expect("Failed to serialize json");
assert_eq!(metadata, metadata_two);
Ok(())
}
}