use crate::spec::core_options::CoreOptions;
use crate::spec::types::{ArrayType, DataType, MapType, MultisetType, RowType};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::collections::{HashMap, HashSet};
/// Persisted, versioned form of a table's schema.
///
/// Serialized with serde using camelCase keys (e.g. `highestFieldId`).
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct TableSchema {
    // Schema format version; see [`TableSchema::CURRENT_VERSION`].
    version: i32,
    // Identifier of this schema instance.
    id: i64,
    // All top-level fields of the table.
    fields: Vec<DataField>,
    // Largest field id in use; -1 when there are no fields.
    highest_field_id: i32,
    // Names of the partition columns.
    partition_keys: Vec<String>,
    // Names of the primary-key columns; empty when no primary key is defined.
    primary_keys: Vec<String>,
    // Table options carried by this schema.
    options: HashMap<String, String>,
    // Optional table comment.
    comment: Option<String>,
    // Creation time in milliseconds since the Unix epoch.
    time_millis: i64,
}
impl TableSchema {
pub const CURRENT_VERSION: i32 = 3;
pub fn new(id: i64, schema: &Schema) -> Self {
let fields = schema.fields().to_vec();
let highest_field_id = Self::current_highest_field_id(&fields);
Self {
version: Self::CURRENT_VERSION,
id,
fields,
highest_field_id,
partition_keys: schema.partition_keys().to_vec(),
primary_keys: schema.primary_keys().to_vec(),
options: schema.options().clone(),
comment: schema.comment().map(|s| s.to_string()),
time_millis: chrono::Utc::now().timestamp_millis(),
}
}
pub fn current_highest_field_id(fields: &[DataField]) -> i32 {
fields.iter().map(|f| f.id()).max().unwrap_or(-1)
}
pub fn version(&self) -> i32 {
self.version
}
pub fn id(&self) -> i64 {
self.id
}
pub fn fields(&self) -> &[DataField] {
&self.fields
}
pub fn highest_field_id(&self) -> i32 {
self.highest_field_id
}
pub fn partition_keys(&self) -> &[String] {
&self.partition_keys
}
pub fn partition_fields(&self) -> Vec<DataField> {
self.partition_keys
.iter()
.filter_map(|key| self.fields.iter().find(|f| f.name() == key).cloned())
.collect()
}
pub fn primary_keys(&self) -> &[String] {
&self.primary_keys
}
pub fn options(&self) -> &HashMap<String, String> {
&self.options
}
pub fn copy_with_options(&self, extra: HashMap<String, String>) -> Self {
let mut new_schema = self.clone();
new_schema.options.extend(extra);
new_schema
}
pub fn comment(&self) -> Option<&str> {
self.comment.as_deref()
}
pub fn time_millis(&self) -> i64 {
self.time_millis
}
pub fn bucket_keys(&self) -> Vec<String> {
let core_options = CoreOptions::new(&self.options);
if let Some(keys) = core_options.bucket_key() {
return keys;
}
if !self.primary_keys.is_empty() {
return self.primary_keys.clone();
}
let partition_set: HashSet<&str> = self.partition_keys.iter().map(String::as_str).collect();
self.fields
.iter()
.filter(|f| !partition_set.contains(f.name()))
.map(|f| f.name().to_string())
.collect()
}
}
/// Name of the reserved row-id column. NOTE(review): not referenced elsewhere
/// in this file — presumably consumed by other modules; confirm usage there.
pub const ROW_ID_FIELD_NAME: &str = "_ROW_ID";
/// Field id reserved for the row-id column, pinned near `i32::MAX` —
/// presumably so it cannot collide with user field ids assigned from 0 upward.
pub const ROW_ID_FIELD_ID: i32 = i32::MAX - 5;
/// A single named, typed field of a schema, identified by a numeric id.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Hash, Eq, Deserialize, Serialize)]
pub struct DataField {
    // Numeric id of the field within its schema.
    id: i32,
    // Field name.
    name: String,
    // Serialized as "type" because `type` is a Rust keyword.
    #[serde(rename = "type")]
    typ: DataType,
    // Optional human-readable description; omitted from JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    description: Option<String>,
}
impl DataField {
pub fn new(id: i32, name: String, typ: DataType) -> Self {
Self {
id,
name,
typ,
description: None,
}
}
pub fn id(&self) -> i32 {
self.id
}
pub fn name(&self) -> &str {
&self.name
}
pub fn data_type(&self) -> &DataType {
&self.typ
}
pub fn description(&self) -> Option<&str> {
self.description.as_deref()
}
pub fn with_id(mut self, new_id: i32) -> Self {
self.id = new_id;
self
}
pub fn with_name(mut self, new_name: String) -> Self {
self.name = new_name;
self
}
pub fn with_description(mut self, new_description: Option<String>) -> Self {
self.description = new_description;
self
}
}
/// Escapes a SQL identifier by doubling every embedded double-quote character.
pub fn escape_identifier(identifier: &str) -> String {
    let mut escaped = String::with_capacity(identifier.len());
    for ch in identifier.chars() {
        escaped.push(ch);
        if ch == '"' {
            escaped.push('"');
        }
    }
    escaped
}
/// Escapes a SQL string literal body by doubling every single quote.
pub fn escape_single_quotes(text: &str) -> String {
    let mut escaped = String::with_capacity(text.len());
    for ch in text.chars() {
        escaped.push(ch);
        if ch == '\'' {
            escaped.push('\'');
        }
    }
    escaped
}
/// Table option that declares the primary key as a comma-separated column list
/// (alternative to declaring it in the DDL); consumed by `Schema::new`.
pub const PRIMARY_KEY_OPTION: &str = "primary-key";
/// Table option that declares partition columns as a comma-separated list
/// (alternative to declaring them in the DDL); consumed by `Schema::new`.
pub const PARTITION_OPTION: &str = "partition";
/// In-memory, validated table schema as produced by [`SchemaBuilder`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Schema {
    // All top-level fields; primary-key columns are normalized to NOT NULL.
    fields: Vec<DataField>,
    // Names of the partition columns.
    partition_keys: Vec<String>,
    // Names of the primary-key columns.
    primary_keys: Vec<String>,
    // Table options (with the primary-key/partition options already consumed).
    options: HashMap<String, String>,
    // Optional table comment.
    comment: Option<String>,
}
impl Schema {
    /// Constructs a validated schema.
    ///
    /// Primary and partition keys may be declared either directly or via the
    /// `primary-key` / `partition` table options (which are consumed here).
    /// Validation rejects duplicate columns and keys that do not name a
    /// column; primary-key columns are rewritten to NOT NULL.
    fn new(
        fields: Vec<DataField>,
        partition_keys: Vec<String>,
        primary_keys: Vec<String>,
        mut options: HashMap<String, String>,
        comment: Option<String>,
    ) -> crate::Result<Self> {
        let primary_keys = Self::normalize_primary_keys(&primary_keys, &mut options)?;
        let partition_keys = Self::normalize_partition_keys(&partition_keys, &mut options)?;
        let fields = Self::normalize_fields(&fields, &partition_keys, &primary_keys)?;
        Ok(Self {
            fields,
            partition_keys,
            primary_keys,
            options,
            comment,
        })
    }

    /// Splits a comma-separated option value into trimmed, non-empty names.
    fn parse_key_list(value: &str) -> Vec<String> {
        value
            .split(',')
            .map(|s| s.trim().to_string())
            .filter(|s| !s.is_empty())
            .collect()
    }

    /// Shared resolution for a key list that may come from the DDL or from
    /// the table option `option_name`; declaring it in both places is an
    /// error. The option is removed so it does not leak into stored options.
    /// `kind` is the human-readable name used in the error message.
    fn normalize_keys(
        ddl_keys: &[String],
        options: &mut HashMap<String, String>,
        option_name: &str,
        kind: &str,
    ) -> crate::Result<Vec<String>> {
        match options.remove(option_name) {
            None => Ok(ddl_keys.to_vec()),
            Some(value) => {
                if !ddl_keys.is_empty() {
                    return Err(crate::Error::ConfigInvalid {
                        message: format!(
                            "Cannot define {kind} on DDL and table options at the same time."
                        ),
                    });
                }
                Ok(Self::parse_key_list(&value))
            }
        }
    }

    /// Resolves the primary key from the DDL list or the `primary-key` option.
    fn normalize_primary_keys(
        primary_keys: &[String],
        options: &mut HashMap<String, String>,
    ) -> crate::Result<Vec<String>> {
        Self::normalize_keys(primary_keys, options, PRIMARY_KEY_OPTION, "primary key")
    }

    /// Resolves partition keys from the DDL list or the `partition` option.
    fn normalize_partition_keys(
        partition_keys: &[String],
        options: &mut HashMap<String, String>,
    ) -> crate::Result<Vec<String>> {
        Self::normalize_keys(partition_keys, options, PARTITION_OPTION, "partition")
    }

    /// Validates field/key consistency, then rewrites any nullable
    /// primary-key column to NOT NULL (key columns must never be nullable).
    fn normalize_fields(
        fields: &[DataField],
        partition_keys: &[String],
        primary_keys: &[String],
    ) -> crate::Result<Vec<DataField>> {
        let field_names: Vec<String> = fields.iter().map(|f| f.name().to_string()).collect();
        Self::validate_no_duplicate_fields(&field_names)?;
        Self::validate_partition_keys(&field_names, partition_keys)?;
        Self::validate_primary_keys(&field_names, primary_keys)?;
        if primary_keys.is_empty() {
            return Ok(fields.to_vec());
        }
        let pk_set: HashSet<&str> = primary_keys.iter().map(String::as_str).collect();
        let mut new_fields = Vec::with_capacity(fields.len());
        for f in fields {
            if pk_set.contains(f.name()) && f.data_type().is_nullable() {
                // Preserve id and description; only the nullability changes.
                new_fields.push(
                    DataField::new(
                        f.id(),
                        f.name().to_string(),
                        f.data_type().copy_with_nullable(false)?,
                    )
                    .with_description(f.description().map(|s| s.to_string())),
                );
            } else {
                new_fields.push(f.clone());
            }
        }
        Ok(new_fields)
    }

    /// Rejects duplicate column names.
    fn validate_no_duplicate_fields(field_names: &[String]) -> crate::Result<()> {
        let duplicates = Self::duplicate_fields(field_names);
        if duplicates.is_empty() {
            Ok(())
        } else {
            Err(crate::Error::ConfigInvalid {
                message: format!(
                    "Table column {field_names:?} must not contain duplicate fields. Found: {duplicates:?}"
                ),
            })
        }
    }

    /// Rejects duplicate partition keys and keys that name no column.
    fn validate_partition_keys(
        field_names: &[String],
        partition_keys: &[String],
    ) -> crate::Result<()> {
        let all_names: HashSet<&str> = field_names.iter().map(String::as_str).collect();
        let duplicates = Self::duplicate_fields(partition_keys);
        if !duplicates.is_empty() {
            return Err(crate::Error::ConfigInvalid {
                message: format!(
                    "Partition key constraint {partition_keys:?} must not contain duplicate columns. Found: {duplicates:?}"
                ),
            });
        }
        if !partition_keys
            .iter()
            .all(|k| all_names.contains(k.as_str()))
        {
            return Err(crate::Error::ConfigInvalid {
                message: format!(
                    "Table column {field_names:?} should include all partition fields {partition_keys:?}"
                ),
            });
        }
        Ok(())
    }

    /// Rejects duplicate primary keys and keys that name no column.
    /// An empty primary key is valid (no-primary-key table).
    fn validate_primary_keys(field_names: &[String], primary_keys: &[String]) -> crate::Result<()> {
        if primary_keys.is_empty() {
            return Ok(());
        }
        let all_names: HashSet<&str> = field_names.iter().map(String::as_str).collect();
        let duplicates = Self::duplicate_fields(primary_keys);
        if !duplicates.is_empty() {
            return Err(crate::Error::ConfigInvalid {
                message: format!(
                    "Primary key constraint {primary_keys:?} must not contain duplicate columns. Found: {duplicates:?}"
                ),
            });
        }
        if !primary_keys.iter().all(|k| all_names.contains(k.as_str())) {
            return Err(crate::Error::ConfigInvalid {
                message: format!(
                    "Table column {field_names:?} should include all primary key constraint {primary_keys:?}"
                ),
            });
        }
        Ok(())
    }

    /// Returns the set of names that occur more than once in `names`.
    pub fn duplicate_fields(names: &[String]) -> HashSet<String> {
        let mut seen = HashMap::new();
        for n in names {
            *seen.entry(n.clone()).or_insert(0) += 1;
        }
        seen.into_iter()
            .filter(|(_, count)| *count > 1)
            .map(|(name, _)| name)
            .collect()
    }

    /// The schema's fields as a NOT NULL row type.
    pub fn row_type(&self) -> RowType {
        RowType::with_nullable(false, self.fields.clone())
    }

    /// All top-level fields.
    pub fn fields(&self) -> &[DataField] {
        &self.fields
    }

    /// Names of the partition columns.
    pub fn partition_keys(&self) -> &[String] {
        &self.partition_keys
    }

    /// Names of the primary-key columns.
    pub fn primary_keys(&self) -> &[String] {
        &self.primary_keys
    }

    /// Table options.
    pub fn options(&self) -> &HashMap<String, String> {
        &self.options
    }

    /// Optional table comment.
    pub fn comment(&self) -> Option<&str> {
        self.comment.as_deref()
    }

    /// Returns a copy of this schema with its fields replaced by those of
    /// `row_type`, re-running full validation and normalization.
    pub fn copy(&self, row_type: RowType) -> crate::Result<Self> {
        Self::new(
            row_type.fields().to_vec(),
            self.partition_keys.clone(),
            self.primary_keys.clone(),
            self.options.clone(),
            self.comment.clone(),
        )
    }

    /// Starts building a [`Schema`] fluently.
    pub fn builder() -> SchemaBuilder {
        SchemaBuilder::new()
    }
}
/// Fluent builder for [`Schema`]; assigns field ids sequentially as columns
/// (and their nested fields) are added.
pub struct SchemaBuilder {
    // Columns added so far, each with an assigned id.
    columns: Vec<DataField>,
    // Partition column names (replaced wholesale by `partition_keys`).
    partition_keys: Vec<String>,
    // Primary-key column names (replaced wholesale by `primary_key`).
    primary_keys: Vec<String>,
    // Accumulated table options.
    options: HashMap<String, String>,
    // Optional table comment.
    comment: Option<String>,
    // Next field id to hand out; starts at 0.
    next_field_id: i32,
}
impl SchemaBuilder {
pub fn new() -> Self {
Self {
columns: Vec::new(),
partition_keys: Vec::new(),
primary_keys: Vec::new(),
options: HashMap::new(),
comment: None,
next_field_id: 0,
}
}
pub fn column(self, column_name: impl Into<String>, data_type: DataType) -> Self {
self.column_with_description(column_name, data_type, None)
}
pub fn column_with_description(
mut self,
column_name: impl Into<String>,
data_type: DataType,
description: Option<String>,
) -> Self {
let name = column_name.into();
let id = self.next_field_id;
self.next_field_id += 1;
let data_type = Self::assign_nested_field_ids(data_type, &mut self.next_field_id);
self.columns
.push(DataField::new(id, name, data_type).with_description(description));
self
}
pub fn partition_keys(mut self, names: impl IntoIterator<Item = impl Into<String>>) -> Self {
self.partition_keys = names.into_iter().map(Into::into).collect();
self
}
pub fn primary_key(mut self, names: impl IntoIterator<Item = impl Into<String>>) -> Self {
self.primary_keys = names.into_iter().map(Into::into).collect();
self
}
pub fn options(mut self, opts: impl IntoIterator<Item = (String, String)>) -> Self {
self.options.extend(opts);
self
}
pub fn option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.options.insert(key.into(), value.into());
self
}
pub fn comment(mut self, comment: Option<String>) -> Self {
self.comment = comment;
self
}
pub fn build(self) -> crate::Result<Schema> {
Schema::new(
self.columns,
self.partition_keys,
self.primary_keys,
self.options,
self.comment,
)
}
fn assign_nested_field_ids(data_type: DataType, next_id: &mut i32) -> DataType {
let nullable = data_type.is_nullable();
match data_type {
DataType::Row(row) => {
let fields = row
.fields()
.iter()
.map(|f| {
let id = *next_id;
*next_id += 1;
let typ = Self::assign_nested_field_ids(f.data_type().clone(), next_id);
DataField::new(id, f.name().to_string(), typ)
})
.collect();
DataType::Row(RowType::with_nullable(nullable, fields))
}
DataType::Array(arr) => {
let element = Self::assign_nested_field_ids(arr.element_type().clone(), next_id);
DataType::Array(ArrayType::with_nullable(nullable, element))
}
DataType::Map(map) => {
let key = Self::assign_nested_field_ids(map.key_type().clone(), next_id);
let value = Self::assign_nested_field_ids(map.value_type().clone(), next_id);
DataType::Map(MapType::with_nullable(nullable, key, value))
}
DataType::Multiset(ms) => {
let element = Self::assign_nested_field_ids(ms.element_type().clone(), next_id);
DataType::Multiset(MultisetType::with_nullable(nullable, element))
}
other => other,
}
}
}
impl Default for SchemaBuilder {
    /// Equivalent to [`SchemaBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use crate::spec::IntType;
    use super::*;

    // DataField construction preserves id, name, type, and description.
    #[test]
    fn test_create_data_field() {
        let id = 1;
        let name = "field1".to_string();
        let typ = DataType::Int(IntType::new());
        let description = "test description".to_string();
        let data_field = DataField::new(id, name.clone(), typ.clone())
            .with_description(Some(description.clone()));
        assert_eq!(data_field.id(), id);
        assert_eq!(data_field.name(), name);
        assert_eq!(data_field.data_type(), &typ);
        assert_eq!(data_field.description(), Some(description).as_deref());
    }

    // with_id replaces only the id; everything else is untouched.
    #[test]
    fn test_new_id() {
        let d_type = DataType::Int(IntType::new());
        let new_data_field = DataField::new(1, "field1".to_string(), d_type.clone()).with_id(2);
        assert_eq!(new_data_field.id(), 2);
        assert_eq!(new_data_field.name(), "field1");
        assert_eq!(new_data_field.data_type(), &d_type);
        assert_eq!(new_data_field.description(), None);
    }

    // with_name replaces only the name.
    #[test]
    fn test_new_name() {
        let d_type = DataType::Int(IntType::new());
        let new_data_field =
            DataField::new(1, "field1".to_string(), d_type.clone()).with_name("field2".to_string());
        assert_eq!(new_data_field.id(), 1);
        assert_eq!(new_data_field.name(), "field2");
        assert_eq!(new_data_field.data_type(), &d_type);
        assert_eq!(new_data_field.description(), None);
    }

    // with_description replaces only the description.
    #[test]
    fn test_new_description() {
        let d_type = DataType::Int(IntType::new());
        let new_data_field = DataField::new(1, "field1".to_string(), d_type.clone())
            .with_description(Some("new description".to_string()));
        assert_eq!(new_data_field.id(), 1);
        assert_eq!(new_data_field.name(), "field1");
        assert_eq!(new_data_field.data_type(), &d_type);
        assert_eq!(new_data_field.description(), Some("new description"));
    }

    // Double quotes inside an identifier are doubled.
    #[test]
    fn test_escape_identifier() {
        let escaped_identifier = escape_identifier("\"identifier\"");
        assert_eq!(escaped_identifier, "\"\"identifier\"\"");
    }

    // Single quotes inside literal text are doubled.
    #[test]
    fn test_escape_single_quotes() {
        let escaped_text = escape_single_quotes("text with 'single' quotes");
        assert_eq!(escaped_text, "text with ''single'' quotes");
    }

    // Happy path: builder produces a schema and NOT-NULLs the primary key column.
    #[test]
    fn test_schema_builder_build() {
        let schema = Schema::builder()
            .column("id", DataType::Int(IntType::with_nullable(true)))
            .column("name", DataType::Int(IntType::new()))
            .primary_key(["id"])
            .option("k", "v")
            .comment(Some("table comment".into()))
            .build()
            .unwrap();
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.primary_keys(), &["id"]);
        assert_eq!(schema.options().get("k"), Some(&"v".to_string()));
        assert_eq!(schema.comment(), Some("table comment"));
        let id_field = schema.fields().iter().find(|f| f.name() == "id").unwrap();
        assert!(
            !id_field.data_type().is_nullable(),
            "primary key column should be normalized to NOT NULL"
        );
    }

    // Every validation failure mode of Schema::new, plus one valid combination.
    #[test]
    fn test_schema_validation() {
        let res = Schema::builder()
            .column("a", DataType::Int(IntType::new()))
            .column("b", DataType::Int(IntType::new()))
            .column("a", DataType::Int(IntType::new()))
            .build();
        assert!(res.is_err(), "duplicate field names should be rejected");
        let res = Schema::builder()
            .column("a", DataType::Int(IntType::new()))
            .column("b", DataType::Int(IntType::new()))
            .partition_keys(["a", "a"])
            .build();
        assert!(res.is_err(), "duplicate partition keys should be rejected");
        let res = Schema::builder()
            .column("a", DataType::Int(IntType::new()))
            .column("b", DataType::Int(IntType::new()))
            .partition_keys(["c"])
            .build();
        assert!(
            res.is_err(),
            "partition key not in columns should be rejected"
        );
        let res = Schema::builder()
            .column("a", DataType::Int(IntType::with_nullable(false)))
            .column("b", DataType::Int(IntType::new()))
            .primary_key(["a", "a"])
            .build();
        assert!(res.is_err(), "duplicate primary keys should be rejected");
        let res = Schema::builder()
            .column("a", DataType::Int(IntType::with_nullable(false)))
            .column("b", DataType::Int(IntType::new()))
            .primary_key(["c"])
            .build();
        assert!(
            res.is_err(),
            "primary key not in columns should be rejected"
        );
        let res = Schema::builder()
            .column("a", DataType::Int(IntType::with_nullable(false)))
            .column("b", DataType::Int(IntType::new()))
            .primary_key(["a"])
            .option(PRIMARY_KEY_OPTION, "a")
            .build();
        assert!(
            res.is_err(),
            "primary key defined in both DDL and options should be rejected"
        );
        let res = Schema::builder()
            .column("a", DataType::Int(IntType::new()))
            .column("b", DataType::Int(IntType::new()))
            .partition_keys(["a"])
            .option(PARTITION_OPTION, "a")
            .build();
        assert!(
            res.is_err(),
            "partition defined in both DDL and options should be rejected"
        );
        let schema = Schema::builder()
            .column("a", DataType::Int(IntType::with_nullable(false)))
            .column("b", DataType::Int(IntType::new()))
            .column("c", DataType::Int(IntType::new()))
            .partition_keys(["a"])
            .primary_key(["a", "b"])
            .build()
            .unwrap();
        assert_eq!(schema.partition_keys(), &["a"]);
        assert_eq!(schema.primary_keys(), &["a", "b"]);
    }

    // Nested row fields receive ids after all preceding top-level columns:
    // "id" -> 0, "payload" -> 1, "payload.nested" -> 2.
    #[test]
    fn test_schema_builder_column_row_type() {
        let row_type = RowType::new(vec![DataField::new(
            0,
            "nested".into(),
            DataType::Int(IntType::new()),
        )]);
        let schema = Schema::builder()
            .column("id", DataType::Int(IntType::new()))
            .column("payload", DataType::Row(row_type))
            .build()
            .unwrap();
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.fields()[0].id(), 0);
        assert_eq!(schema.fields()[1].id(), 1);
        if let DataType::Row(row) = schema.fields()[1].data_type() {
            assert_eq!(row.fields().len(), 1);
            assert_eq!(row.fields()[0].id(), 2);
            assert_eq!(row.fields()[0].name(), "nested");
        } else {
            panic!("expected Row type");
        }
    }
}