use itertools::Itertools;
use serde::{Deserialize, Serialize};
use crate::error::Error;
use super::view_metadata::{GeneralViewMetadata, GeneralViewMetadataBuilder};
/// Name of the view property holding the storage table of a materialized view,
/// e.g. `"storage_table": "iceberg.default.event_agg"` in a view's `properties` map.
pub static STORAGE_TABLE: &str = "storage_table";
/// Metadata of a materialized view: [`GeneralViewMetadata`] instantiated with `String`.
///
/// NOTE(review): the meaning of the `String` parameter is defined by `GeneralViewMetadata`
/// (presumably the storage-table identifier) — confirm there.
pub type MaterializedViewMetadata = GeneralViewMetadata<String>;
/// Builder for [`MaterializedViewMetadata`]: [`GeneralViewMetadataBuilder`] instantiated with `String`.
pub type MaterializedViewMetadataBuilder = GeneralViewMetadataBuilder<String>;
pub fn depends_on_tables_to_string(source_tables: &[SourceTable]) -> Result<String, Error> {
Ok(source_tables
.iter()
.map(|x| x.identifier.to_string() + "=" + &x.snapshot_id.to_string())
.join(","))
}
pub fn depends_on_tables_from_string(value: &str) -> Result<Vec<SourceTable>, Error> {
value
.split(',')
.map(|x| {
x.split('=')
.next_tuple()
.ok_or(Error::InvalidFormat("Lineage information".to_owned()))
.and_then(|(identifier, snapshot_id)| {
Ok(SourceTable {
identifier: identifier.to_owned(),
snapshot_id: snapshot_id.parse()?,
})
})
})
.collect::<Result<_, Error>>()
}
/// A source table a materialized view depends on, paired with a snapshot id.
///
/// NOTE(review): presumably the snapshot the view was last computed from — confirm with callers.
///
/// With `rename_all = "kebab-case"`, `snapshot_id` is (de)serialized as `snapshot-id`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct SourceTable {
    /// Table identifier. The lineage string encoding splits on `,` and `=`, so an identifier
    /// containing either character cannot be round-tripped through that encoding.
    pub identifier: String,
    /// Snapshot id of the source table.
    pub snapshot_id: i64,
}
#[cfg(test)]
mod tests {
    use crate::{
        error::Error,
        spec::materialized_view_metadata::{
            depends_on_tables_from_string, depends_on_tables_to_string, MaterializedViewMetadata,
            SourceTable,
        },
    };

    /// A v1 materialized view document must survive a deserialize → serialize → deserialize
    /// round trip unchanged.
    #[test]
    fn test_deserialize_materialized_view_metadata_v1() -> Result<(), Error> {
        let data = r#"
{
"view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
"format-version" : 1,
"location" : "s3://bucket/warehouse/default.db/event_agg",
"current-version-id" : 1,
"properties" : {
"comment" : "Daily event counts",
"storage_table": "iceberg.default.event_agg"
},
"versions" : [ {
"version-id" : 1,
"timestamp-ms" : 1573518431292,
"schema-id" : 1,
"default-catalog" : "prod",
"default-namespace" : [ "default" ],
"summary" : {
"operation" : "create",
"engine-name" : "Spark",
"engineVersion" : "3.3.2"
},
"representations" : [ {
"type" : "sql",
"sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
"dialect" : "spark"
} ]
} ],
"schemas": [ {
"schema-id": 1,
"type" : "struct",
"fields" : [ {
"id" : 1,
"name" : "event_count",
"required" : false,
"type" : "int",
"doc" : "Count of events"
}, {
"id" : 2,
"name" : "event_date",
"required" : false,
"type" : "date"
} ]
} ],
"version-log" : [ {
"timestamp-ms" : 1573518431292,
"version-id" : 1
} ]
}
"#;
        let metadata = serde_json::from_str::<MaterializedViewMetadata>(data)
            .expect("Failed to deserialize json");
        let serialized = serde_json::to_string(&metadata).expect("Failed to serialize metadata");
        // The second step is a deserialization; the message says so.
        let metadata_two: MaterializedViewMetadata = serde_json::from_str(&serialized)
            .expect("Failed to deserialize serialized metadata");
        assert_eq!(metadata, metadata_two);
        Ok(())
    }

    /// A well-formed lineage string decodes into source tables in order.
    #[test]
    fn test_depends_on_tables_try_from_str() {
        let input = "table1=1,table2=2";
        let result = depends_on_tables_from_string(input).unwrap();
        assert_eq!(
            result,
            vec![
                SourceTable {
                    identifier: "table1".to_string(),
                    snapshot_id: 1
                },
                SourceTable {
                    identifier: "table2".to_string(),
                    snapshot_id: 2
                }
            ]
        );
    }

    /// Source tables encode into the `identifier=snapshot_id` comma-separated form.
    #[test]
    fn test_try_from_depends_on_tables_to_string() {
        let depends_on_tables = vec![
            SourceTable {
                identifier: "table1".to_string(),
                snapshot_id: 1,
            },
            SourceTable {
                identifier: "table2".to_string(),
                snapshot_id: 2,
            },
        ];
        let result = depends_on_tables_to_string(&depends_on_tables)
            .expect("Failed to encode source tables");
        assert_eq!(result, "table1=1,table2=2");
    }
}