use serde::{de::Error as _, Deserialize, Serialize};
#[cfg(test)]
use serde_json::json;
use std::{
collections::{HashMap, HashSet},
fmt,
};
use crate::{common::*, drivers::dbcrossbar_schema::external_schema::ExternalSchema};
/// A schema: one table, plus the named data types that the table's columns
/// (and other named types) may refer to via `DataType::Named`.
///
/// Values built through `Schema::from_types_and_table`, `Schema::from_table`
/// or deserialization are validated before they are returned.
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub struct Schema {
/// Named data type definitions, keyed by `NamedDataType::name`.
pub(crate) named_data_types: HashMap<String, NamedDataType>,
/// The table described by this schema.
pub(crate) table: Table,
}
impl Schema {
    /// Validate every data type stored in this schema.
    ///
    /// The definitions in `named_data_types` are checked first, followed by
    /// the data type of each table column. The first failure is returned.
    fn validate(&self) -> Result<()> {
        let named_types = self.named_data_types.values().map(|named| &named.data_type);
        let column_types = self.table.columns.iter().map(|column| &column.data_type);
        named_types
            .chain(column_types)
            .try_for_each(|data_type| data_type.validate(self))
    }

    /// Build a schema from a list of named data types and a table.
    ///
    /// Each type is stored under its own `name`; if the same name occurs more
    /// than once, the last occurrence is the one that is kept.
    ///
    /// # Errors
    ///
    /// Returns an error if the resulting schema fails validation.
    pub(crate) fn from_types_and_table(
        types: Vec<NamedDataType>,
        table: Table,
    ) -> Result<Schema> {
        let mut named_data_types = HashMap::with_capacity(types.len());
        for ty in types {
            named_data_types.insert(ty.name.clone(), ty);
        }
        let schema = Schema {
            named_data_types,
            table,
        };
        schema.validate()?;
        Ok(schema)
    }

    /// Build a schema that contains only `table` and no named data types.
    ///
    /// # Errors
    ///
    /// Returns an error if the resulting schema fails validation.
    pub(crate) fn from_table(table: Table) -> Result<Schema> {
        Self::from_types_and_table(Vec::new(), table)
    }

    /// Look up the definition of the named data type `name`.
    ///
    /// # Panics
    ///
    /// Panics if `name` is not present in `named_data_types`.
    pub(crate) fn data_type_for_name(&self, name: &str) -> &DataType {
        match self.named_data_types.get(name) {
            Some(named_data_type) => &named_data_type.data_type,
            None => panic!(
                "data type {:?} is not defined, and this wasn't caught by `validate`",
                name,
            ),
        }
    }

    /// Build a placeholder schema for use in tests: no named data types and
    /// a table called `"placeholder"` with no columns.
    #[cfg(test)]
    pub(crate) fn dummy_test_schema() -> Schema {
        let table = Table {
            name: String::from("placeholder"),
            columns: Vec::new(),
        };
        Schema {
            named_data_types: HashMap::new(),
            table,
        }
    }
}
impl<'de> Deserialize<'de> for Schema {
    /// Deserialize an `ExternalSchema` and convert it into a `Schema`.
    ///
    /// A failure of the conversion is reported as a custom deserializer
    /// error whose message starts with `error validating schema: `.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        ExternalSchema::deserialize(deserializer)?
            .into_schema()
            .map_err(|err| D::Error::custom(format!("error validating schema: {}", err)))
    }
}
impl Serialize for Schema {
    /// Serialize this schema by converting a copy of it into an
    /// `ExternalSchema` and serializing that value.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // `from_schema` takes the schema by value, so hand it a clone.
        ExternalSchema::from_schema(self.clone()).serialize(serializer)
    }
}
/// A column that refers to a named type which is missing from
/// `named_data_types` must be rejected when the schema is deserialized.
#[test]
fn rejects_undefined_type_names() {
    // Use the same `"tables": [...]` layout that `accepts_defined_type_names`
    // parses successfully. With a different layout the input could be turned
    // down for its shape alone, and the undefined-name check would never run.
    let json = r#"
    {
        "named_data_types": [],
        "tables": [{
            "name": "example",
            "columns": [
                { "name": "i", "is_nullable": false, "data_type": { "named": "color" }}
            ]
        }]
    }
    "#;
    let err = serde_json::from_str::<Schema>(json)
        .expect_err("a schema using an undefined named type should be rejected");
    // `Schema::deserialize` prefixes conversion failures with this text, so
    // its presence shows the input got past plain JSON/shape parsing.
    assert!(
        err.to_string().contains("error validating schema"),
        "schema was rejected for an unexpected reason: {}",
        err,
    );
}
/// A column may refer to a named type that is defined in
/// `named_data_types`; the parsed schema must match the hand-built one.
#[test]
fn accepts_defined_type_names() {
    let json = r#"
{
"named_data_types": [{
"name": "color",
"data_type": { "one_of": ["red", "green", "blue"] }
}],
"tables": [{
"name": "example",
"columns": [
{ "name": "i", "is_nullable": false, "data_type": { "named": "color" }}
]
}]
}
"#;

    // The schema we expect the JSON above to describe.
    let color = NamedDataType {
        name: "color".to_owned(),
        data_type: DataType::OneOf(vec![
            "red".to_owned(),
            "green".to_owned(),
            "blue".to_owned(),
        ]),
    };
    let column = Column {
        name: "i".to_owned(),
        is_nullable: false,
        data_type: DataType::Named("color".to_owned()),
        comment: None,
    };
    let expected = Schema {
        named_data_types: std::iter::once((color.name.clone(), color)).collect(),
        table: Table {
            name: "example".to_owned(),
            columns: vec![column],
        },
    };

    let schema = serde_json::from_str::<Schema>(json).expect("could not parse schema");
    assert_eq!(schema, expected)
}
/// A named type whose definition refers back to itself must be rejected
/// when the schema is deserialized.
#[test]
fn rejects_recursive_named_types() {
    // Use the same `"tables": [...]` layout that `accepts_defined_type_names`
    // parses successfully. With a different layout the input could be turned
    // down for its shape alone, and the recursion check would never run.
    let json = r#"
    {
        "named_data_types": [{
            "name": "colors",
            "data_type": { "array": { "named": "colors" } }
        }],
        "tables": [{
            "name": "example",
            "columns": [
                { "name": "i", "is_nullable": false, "data_type": { "named": "colors" }}
            ]
        }]
    }
    "#;
    let err = serde_json::from_str::<Schema>(json)
        .expect_err("a schema with a self-referential named type should be rejected");
    // `Schema::deserialize` prefixes conversion failures with this text, so
    // its presence shows the input got past plain JSON/shape parsing.
    assert!(
        err.to_string().contains("error validating schema"),
        "schema was rejected for an unexpected reason: {}",
        err,
    );
}
/// Serializing a schema to JSON and parsing it back must yield an equal
/// schema.
#[test]
fn round_trip_serialization() {
    let color = NamedDataType {
        name: "color".to_owned(),
        data_type: DataType::OneOf(vec![
            "red".to_owned(),
            "green".to_owned(),
            "blue".to_owned(),
        ]),
    };
    let column = Column {
        name: "i".to_owned(),
        is_nullable: false,
        data_type: DataType::Named("color".to_owned()),
        comment: None,
    };
    let schema = Schema {
        named_data_types: std::iter::once((color.name.clone(), color)).collect(),
        table: Table {
            name: "example".to_owned(),
            columns: vec![column],
        },
    };

    let json = serde_json::to_string(&schema).expect("could not serialize schema");
    let parsed = serde_json::from_str::<Schema>(&json).expect("could not parse schema");
    assert_eq!(parsed, schema);
}
/// A data type definition that other data types can refer to by name using
/// `DataType::Named`.
///
/// Unknown fields are rejected during deserialization.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(deny_unknown_fields)]
pub struct NamedDataType {
/// The name under which this type is looked up.
pub(crate) name: String,
/// The data type that the name stands for.
pub(crate) data_type: DataType,
}
/// A table: a name and a list of columns.
///
/// Unknown fields are rejected during deserialization.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(deny_unknown_fields)]
pub struct Table {
/// The name of the table.
pub name: String,
/// The columns of the table.
pub columns: Vec<Column>,
}
/// A single column of a `Table`.
///
/// Unknown fields are rejected during deserialization.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(deny_unknown_fields)]
pub struct Column {
/// The name of the column.
pub name: String,
/// Whether the column is nullable.
pub is_nullable: bool,
/// The data type of the column.
pub data_type: DataType,
/// An optional comment. A missing field deserializes as `None`, and
/// `None` is left out of the serialized output.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub comment: Option<String>,
}
/// The data types that a column, struct field, array element or named type
/// definition can have.
///
/// Variant names are serialized in `snake_case`: a variant without a payload
/// becomes a plain string such as `"timestamp_with_time_zone"`, and a variant
/// with a payload becomes a single-key object such as `{ "array": "text" }`.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum DataType {
/// An array whose elements have the boxed data type.
Array(Box<DataType>),
Bool,
Date,
Decimal,
Float32,
Float64,
/// GeoJSON data, carrying an `Srid`.
GeoJson(Srid),
Int16,
Int32,
Int64,
Json,
/// A reference, by name, to an entry in `Schema::named_data_types`.
Named(String),
/// A list of strings; presumably the only values allowed for this type
/// (TODO confirm against the drivers that consume it).
OneOf(Vec<String>),
/// A structure made up of the listed fields.
Struct(Vec<StructField>),
Text,
TimestampWithoutTimeZone,
TimestampWithTimeZone,
Uuid,
}
impl DataType {
    /// Check this data type against `schema`.
    ///
    /// # Errors
    ///
    /// Returns an error if this type, or any type reachable from it, names a
    /// type that `schema` does not define, or if a named type refers to
    /// itself recursively.
    fn validate(&self, schema: &Schema) -> Result<()> {
        // Names of the named types that are currently being expanded.
        let mut in_progress = HashSet::new();
        self.validate_recursive(schema, &mut in_progress)
    }

    /// Recursive worker behind `validate`.
    ///
    /// `seen` holds the names of the named types on the path from the
    /// starting type down to `self`. A name is added before its definition
    /// is visited and removed again afterwards, so using the same named type
    /// in two sibling positions is accepted, while meeting a name that is
    /// already on the path is reported as recursion.
    fn validate_recursive(
        &self,
        schema: &Schema,
        seen: &mut HashSet<String>,
    ) -> Result<()> {
        match self {
            // Container types: check whatever they contain.
            DataType::Array(element) => element.validate_recursive(schema, seen),
            DataType::Struct(fields) => fields
                .iter()
                .try_for_each(|field| field.data_type.validate_recursive(schema, seen)),

            // Named types: the name must be defined and must not already be
            // in the middle of being expanded.
            DataType::Named(name) => {
                let definition = match schema.named_data_types.get(name) {
                    Some(definition) => definition,
                    None => {
                        return Err(format_err!(
                            "named data type {:?} is not defined anywhere",
                            name
                        ));
                    }
                };
                debug_assert_eq!(name, &definition.name);

                let first_visit = seen.insert(name.to_owned());
                if !first_visit {
                    return Err(format_err!("the named type {:?} refers to itself recursively, which is not supported", name));
                }
                definition.data_type.validate_recursive(schema, seen)?;
                seen.remove(name);
                Ok(())
            }

            // Everything else contains no further data types.
            DataType::Bool
            | DataType::Date
            | DataType::Decimal
            | DataType::Float32
            | DataType::Float64
            | DataType::GeoJson(_)
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::Json
            | DataType::OneOf(_)
            | DataType::Text
            | DataType::TimestampWithoutTimeZone
            | DataType::TimestampWithTimeZone
            | DataType::Uuid => Ok(()),
        }
    }

    /// Report whether values of this type are serialized as JSON when
    /// written to CSV.
    ///
    /// This is `true` for arrays, GeoJSON, JSON and structs, and `false` for
    /// the remaining concrete types. A named type gives the answer of the
    /// type it is defined as in `schema`.
    ///
    /// # Panics
    ///
    /// Panics (inside `Schema::data_type_for_name`) if a named type is not
    /// defined in `schema`.
    pub(crate) fn serializes_as_json_for_csv(&self, schema: &Schema) -> bool {
        match self {
            DataType::Named(name) => schema
                .data_type_for_name(name)
                .serializes_as_json_for_csv(schema),

            DataType::Array(_)
            | DataType::GeoJson(_)
            | DataType::Json
            | DataType::Struct(_) => true,

            DataType::Bool
            | DataType::Date
            | DataType::Decimal
            | DataType::Float32
            | DataType::Float64
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::OneOf(_)
            | DataType::Text
            | DataType::TimestampWithoutTimeZone
            | DataType::TimestampWithTimeZone
            | DataType::Uuid => false,
        }
    }
}
/// A single field of a `DataType::Struct`.
///
/// Unknown fields are rejected during deserialization.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(deny_unknown_fields)]
pub struct StructField {
/// The name of the field.
pub name: String,
/// Whether the field is nullable.
pub is_nullable: bool,
/// The data type of the field.
pub data_type: DataType,
}
/// Pin the JSON representation of every `DataType` variant.
#[test]
fn data_type_serialization_examples() {
    // One example per variant, in the order the variants are declared.
    let examples = &[
        (
            DataType::Array(Box::new(DataType::Text)),
            json!({"array":"text"}),
        ),
        (DataType::Bool, json!("bool")),
        (DataType::Date, json!("date")),
        (DataType::Decimal, json!("decimal")),
        (DataType::Float32, json!("float32")),
        (DataType::Float64, json!("float64")),
        // `Srid` is `#[serde(transparent)]`, so it appears as a bare number.
        (
            DataType::GeoJson(Srid::wgs84()),
            json!({ "geo_json": 4326 }),
        ),
        (DataType::Int16, json!("int16")),
        (DataType::Int32, json!("int32")),
        (DataType::Int64, json!("int64")),
        (DataType::Json, json!("json")),
        (
            DataType::Named("name".to_owned()),
            json!({ "named": "name" }),
        ),
        (
            DataType::OneOf(vec!["a".to_owned()]),
            json!({ "one_of": ["a"] }),
        ),
        (
            DataType::Struct(vec![StructField {
                name: "x".to_owned(),
                is_nullable: false,
                data_type: DataType::Float32,
            }]),
            json!({ "struct": [
                { "name": "x", "is_nullable": false, "data_type": "float32" },
            ] }),
        ),
        (DataType::Text, json!("text")),
        (
            DataType::TimestampWithoutTimeZone,
            json!("timestamp_without_time_zone"),
        ),
        (
            DataType::TimestampWithTimeZone,
            json!("timestamp_with_time_zone"),
        ),
        (DataType::Uuid, json!("uuid")),
    ];
    for (data_type, serialized) in examples {
        assert_eq!(
            &json!(data_type),
            serialized,
            "unexpected serialization of {:?}",
            data_type,
        );
    }
}
/// The schema stored in the `dbcrossbar_schema.json` fixture must parse.
/// Judging by the test name, this is the example shown in the manual.
#[test]
fn parse_schema_from_manual() {
    let fixture = include_str!(
        "../../dbcrossbar/fixtures/dbcrossbar_schema.json"
    );
    let parsed = serde_json::from_str::<Schema>(fixture);
    parsed.unwrap();
}
/// Every `DataType` variant must survive a trip through JSON unchanged.
#[test]
fn data_type_roundtrip() {
    // One value per variant, in the order the variants are declared.
    let data_types = vec![
        DataType::Array(Box::new(DataType::Text)),
        DataType::Bool,
        DataType::Date,
        DataType::Decimal,
        DataType::Float32,
        DataType::Float64,
        // A non-default SRID, so that losing the payload would be noticed.
        DataType::GeoJson(Srid::new(3857)),
        DataType::Int16,
        DataType::Int32,
        DataType::Int64,
        DataType::Json,
        DataType::Named("name".to_owned()),
        DataType::OneOf(vec!["a".to_owned()]),
        DataType::Struct(vec![StructField {
            name: "x".to_owned(),
            is_nullable: false,
            data_type: DataType::Float32,
        }]),
        DataType::Text,
        DataType::TimestampWithoutTimeZone,
        DataType::TimestampWithTimeZone,
        DataType::Uuid,
    ];
    for data_type in &data_types {
        let serialized = serde_json::to_string(data_type).unwrap();
        println!("{:?}: {}", data_type, serialized);
        let parsed: DataType = serde_json::from_str(&serialized).unwrap();
        assert_eq!(&parsed, data_type);
    }
}
/// An SRID, stored as a `u32` (presumably a spatial reference system
/// identifier, given its use in `DataType::GeoJson` — confirm).
///
/// Because of `#[serde(transparent)]`, it is serialized as the bare number.
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(transparent)]
pub struct Srid(u32);
impl Srid {
pub fn wgs84() -> Srid {
Srid(4326)
}
pub fn new(srid: u32) -> Srid {
Srid(srid)
}
pub fn to_u32(self) -> u32 {
self.0
}
}
impl Default for Srid {
fn default() -> Self {
Self::wgs84()
}
}
impl fmt::Display for Srid {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.0.fmt(f)
}
}