use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet};
use crate::{FieldEntry, FieldType, IndexedFieldValues, Resource, SchemaError};
/// A versioned collection of named, indexed field definitions.
///
/// Invariant: `idx` holds exactly the indices of the entries in `fields`
/// (it is rebuilt from `fields` by `SchemaBuilder::build`,
/// `Schema::upgrade_with`, and deserialization).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Schema {
/// Indices of all fields, used for membership checks in `Schema::validate`.
idx: BTreeSet<usize>,
/// Field definitions keyed by field name (iterated in name order).
fields: BTreeMap<String, FieldEntry>,
/// Schema version; `Schema::upgrade_with` requires a strictly greater version.
version: u64,
}
impl Schema {
    /// Name of the mandatory identifier field. `SchemaBuilder::new` creates it
    /// at index 0 with type `FieldType::U64` and marks it unique. Deserialization
    /// rejects schemas where it is missing or differs from that shape.
    pub const ID_KEY: &str = "_id";

    /// Returns the schema version.
    pub fn version(&self) -> u64 {
        self.version
    }

    /// Sets the schema version in place and returns `self` for chaining.
    pub fn with_version(&mut self, version: u64) -> &mut Self {
        self.version = version;
        self
    }

    /// Returns `true` when `self.version` is strictly greater than
    /// `other.version`, i.e. when `self.upgrade_with(other)` passes its
    /// version check.
    pub fn needs_upgrade(&self, other: &Schema) -> bool {
        self.version > other.version
    }

    /// Reconciles this (newer) schema's field indices with `old`.
    ///
    /// * Fields present in both schemas (matched by name) keep `old`'s index.
    /// * Fields that are new in `self` get fresh indices, starting just above
    ///   the highest index in `old`. They are assigned in field-name order.
    /// * Fields present only in `old` are dropped. Because fresh indices start
    ///   above `old`'s maximum, their indices are not reused by this call.
    ///
    /// The operation is atomic: on error, `self` is left unchanged.
    ///
    /// NOTE(review): only indices still present in `old` are considered when
    /// choosing fresh ones. An index freed by a field removed *before* `old`
    /// was produced (e.g. in an earlier upgrade) is invisible here and may be
    /// handed out again across a chain of upgrades.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError::Schema`] if any of the following hold:
    /// * `self.version` is not strictly greater than `old.version`;
    /// * a field present in both schemas changed type;
    /// * a new field is required (new fields must be optional);
    /// * a fresh index would exceed `u16::MAX`.
    pub fn upgrade_with(&mut self, old: &Schema) -> Result<(), SchemaError> {
        if !self.needs_upgrade(old) {
            return Err(SchemaError::Schema(format!(
                "new schema version {} must be greater than old version {}",
                self.version, old.version
            )));
        }

        // Pass 1 (read-only): validate every field and decide its final index.
        // Nothing in `self` is modified until every check has passed.
        let mut next_idx = old.idx.last().copied().unwrap_or(0) + 1;
        let mut planned = Vec::with_capacity(self.fields.len());
        for (name, field) in &self.fields {
            let assigned = match old.fields.get(name) {
                Some(old_field) => {
                    if field.r#type() != old_field.r#type() {
                        return Err(SchemaError::Schema(format!(
                            "field {name:?} type changed from {:?} to {:?}, type changes are not allowed",
                            old_field.r#type(),
                            field.r#type()
                        )));
                    }
                    old_field.idx()
                }
                None => {
                    // Presumably required because values written under `old`
                    // carry nothing for a field that did not exist yet.
                    if field.required() {
                        return Err(SchemaError::Schema(format!(
                            "new field {name:?} must be optional when upgrading schema"
                        )));
                    }
                    if next_idx > u16::MAX as usize {
                        return Err(SchemaError::Schema(
                            "Schema has reached the maximum number of fields".to_string(),
                        ));
                    }
                    let fresh = next_idx;
                    next_idx += 1;
                    fresh
                }
            };
            planned.push(assigned);
        }

        // Pass 2 (commit): `planned` was filled by iterating this same BTreeMap
        // in the same sorted order. Zipping therefore pairs each field with the
        // index chosen for it above.
        for (field, idx) in self.fields.values_mut().zip(planned) {
            field.set_idx(idx);
        }
        self.idx = self.fields.values().map(|f| f.idx()).collect();
        Ok(())
    }

    /// Returns a new [`SchemaBuilder`] (equivalent to `SchemaBuilder::new()`).
    pub fn builder() -> SchemaBuilder {
        SchemaBuilder::new()
    }

    /// Number of fields, including the built-in `ID_KEY` field.
    pub fn len(&self) -> usize {
        self.fields.len()
    }

    /// Returns `true` if the schema has no fields.
    pub fn is_empty(&self) -> bool {
        self.fields.is_empty()
    }

    /// Looks up a field by name.
    pub fn get_field(&self, name: &str) -> Option<&FieldEntry> {
        self.fields.get(name)
    }

    /// Looks up a field by name.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError::Validation`] if no field named `name` exists.
    pub fn get_field_or_err(&self, name: &str) -> Result<&FieldEntry, SchemaError> {
        self.fields
            .get(name)
            .ok_or_else(|| SchemaError::Validation(format!("field {name:?} not found in schema")))
    }

    /// Iterates over all field entries, including `ID_KEY`, in field-name order.
    pub fn iter(&self) -> impl Iterator<Item = &FieldEntry> {
        self.fields.values()
    }

    /// Checks a set of index-keyed values against this schema.
    ///
    /// Unknown indices are checked first. Each field is then checked in
    /// field-name order.
    ///
    /// # Errors
    ///
    /// * Returns [`SchemaError::Validation`] if `values` contains an index that
    ///   is not in this schema.
    /// * Returns [`SchemaError::Validation`] if `values` omits a required field.
    /// * Propagates (via `?`) any error from `FieldEntry::validate` for a
    ///   present value.
    pub fn validate(&self, values: &IndexedFieldValues) -> Result<(), SchemaError> {
        for idx in values.keys() {
            if !self.idx.contains(idx) {
                return Err(SchemaError::Validation(format!(
                    "field index {idx:?} not found in schema"
                )));
            }
        }
        for field in self.fields.values() {
            if let Some(value) = values.get(&field.idx()) {
                field.validate(value)?;
            } else if field.required() {
                return Err(SchemaError::Validation(format!(
                    "field {:?} is required",
                    field.name()
                )));
            }
        }
        Ok(())
    }
}
/// Borrowed serialization view of a [`Schema`]: its field entries and version.
/// The `idx` set is not written out; deserialization rebuilds it from each
/// entry's own index.
#[derive(Debug, Clone, Serialize)]
struct SchemaRef<'a> {
fields: Vec<&'a FieldEntry>,
version: u64,
}
/// Owned wire form of a [`Schema`] read during deserialization. The
/// `Deserialize` impl for `Schema` validates its contents before building a schema.
#[derive(Debug, Clone, Deserialize)]
struct SchemaOwned {
fields: Vec<FieldEntry>,
// A missing `version` key deserializes as `u64::default()`, i.e. 0.
#[serde(default)]
version: u64,
}
impl Serialize for Schema {
    /// Writes the schema as `{ fields: [...], version }`.
    ///
    /// Fields are emitted in name order (the iteration order of the backing
    /// `BTreeMap`). The index set is omitted because every entry carries its
    /// own index.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let entries: Vec<_> = self.fields.values().collect();
        SchemaRef {
            fields: entries,
            version: self.version,
        }
        .serialize(serializer)
    }
}
impl<'de> Deserialize<'de> for Schema {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let val = SchemaOwned::deserialize(deserializer)?;
let mut idx = BTreeSet::<usize>::new();
let mut fields = BTreeMap::<String, FieldEntry>::new();
for f in val.fields.into_iter() {
crate::validate_field_name(f.name()).map_err(serde::de::Error::custom)?;
if f.idx() > u16::MAX as usize {
return Err(serde::de::Error::custom(format!(
"field index {:?} exceeds u16::MAX",
f.idx()
)));
}
if !idx.insert(f.idx()) {
return Err(serde::de::Error::custom(format!(
"duplicate field index {:?}",
f.idx()
)));
}
let name = f.name().to_string();
if fields.insert(name.clone(), f).is_some() {
return Err(serde::de::Error::custom(format!(
"duplicate field name {name:?}"
)));
}
}
let id = fields.get(Schema::ID_KEY).ok_or_else(|| {
serde::de::Error::custom(format!(
"schema is missing required field {:?}",
Schema::ID_KEY
))
})?;
if id.idx() != 0 {
return Err(serde::de::Error::custom(format!(
"field {:?} must have index 0, got {:?}",
Schema::ID_KEY,
id.idx()
)));
}
if id.r#type() != &FieldType::U64 {
return Err(serde::de::Error::custom(format!(
"field {:?} must have type U64, got {:?}",
Schema::ID_KEY,
id.r#type()
)));
}
if !id.unique() {
return Err(serde::de::Error::custom(format!(
"field {:?} must be unique",
Schema::ID_KEY
)));
}
Ok(Schema {
idx,
fields,
version: val.version,
})
}
}
/// Incrementally assembles a [`Schema`], assigning sequential field indices.
#[derive(Clone, Debug)]
pub struct SchemaBuilder {
/// Most recently assigned field index (0 is the built-in `Schema::ID_KEY`
/// field). The next `add_field` call uses `idx + 1`.
idx: usize,
/// Fields added so far, keyed by name.
fields: BTreeMap<String, FieldEntry>,
/// Version copied onto the schema produced by `build`.
version: u64,
}
impl Default for SchemaBuilder {
fn default() -> Self {
Self::new()
}
}
impl SchemaBuilder {
    /// Creates a builder at version 0 that already contains the mandatory
    /// `Schema::ID_KEY` field: type `FieldType::U64`, unique, at index 0.
    pub fn new() -> SchemaBuilder {
        let id_field = FieldEntry::new(Schema::ID_KEY.to_string(), FieldType::U64)
            .expect("the built-in `_id` field name and type must always be accepted")
            .with_unique()
            .with_idx(0)
            .with_description(format!(
                "{:?} is a u64 field, used as an internal unique identifier",
                Schema::ID_KEY
            ));
        SchemaBuilder {
            idx: 0,
            version: 0,
            fields: BTreeMap::from([(Schema::ID_KEY.to_string(), id_field)]),
        }
    }

    /// Sets the version that `build` will stamp onto the schema.
    pub fn with_version(&mut self, version: u64) -> &mut Self {
        self.version = version;
        self
    }

    /// Adds a field named `field` whose type is `Resource::field_type()`.
    ///
    /// When `required` is `false`, the type is wrapped in `FieldType::Option`.
    ///
    /// # Errors
    ///
    /// Propagates errors from `FieldEntry::new` (e.g. an invalid name) and from
    /// [`SchemaBuilder::add_field`].
    pub fn with_resource(&mut self, field: &str, required: bool) -> Result<&mut Self, SchemaError> {
        let ft = Resource::field_type();
        let ft = if required {
            ft
        } else {
            FieldType::Option(Box::new(ft))
        };
        let entry = FieldEntry::new(field.to_string(), ft)?.with_description(format!(
            "{field:?} is a field of type Resource, used to store resources"
        ));
        self.add_field(entry)
    }

    /// Adds `entry` under the next sequential index (previous index + 1).
    ///
    /// Any index already set on `entry` is overwritten.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError::Schema`] if a field with the same name already
    /// exists, or if the next index would exceed `u16::MAX`. In both cases the
    /// builder is left unchanged.
    pub fn add_field(&mut self, entry: FieldEntry) -> Result<&mut Self, SchemaError> {
        if self.fields.contains_key(entry.name()) {
            return Err(SchemaError::Schema(format!(
                "Field {:?} already exists in schema",
                entry.name()
            )));
        }
        let next_idx = self.idx + 1;
        if next_idx > u16::MAX as usize {
            return Err(SchemaError::Schema(
                "Schema has reached the maximum number of fields".to_string(),
            ));
        }
        self.idx = next_idx;
        self.fields
            .insert(entry.name().to_string(), entry.with_idx(next_idx));
        Ok(self)
    }

    /// Consumes the builder and produces the [`Schema`].
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError::Schema`] if there are more than `u16::MAX + 1`
    /// fields.
    pub fn build(self) -> Result<Schema, SchemaError> {
        // Defensive only: `add_field` caps indices at `u16::MAX`, so at most
        // `u16::MAX + 1` fields (indices 0..=u16::MAX) can be present.
        const MAX_FIELDS: usize = u16::MAX as usize + 1;
        if self.fields.len() > MAX_FIELDS {
            return Err(SchemaError::Schema(
                "Schema has reached the maximum number of fields".to_string(),
            ));
        }
        Ok(Schema {
            idx: self.fields.values().map(|f| f.idx()).collect(),
            fields: self.fields,
            version: self.version,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Fe, Ft, Fv};
    use serde_json::json;

    #[test]
    fn test_schema_builder() {
        let mut builder = SchemaBuilder::new();
        assert_eq!(builder.fields.len(), 1);
        // `_id` is pre-registered, so adding it again must fail.
        let id_field = Fe::new("_id".to_string(), Ft::U64).unwrap();
        assert!(builder.add_field(id_field).is_err());
        let name_field = Fe::new("name".to_string(), Ft::Text).unwrap();
        assert!(builder.add_field(name_field).is_ok());
        let age_field = Fe::new("age".to_string(), Ft::Option(Box::new(Ft::U64))).unwrap();
        assert!(builder.add_field(age_field).is_ok());
        let duplicate_field = Fe::new("name".to_string(), Ft::Text).unwrap();
        assert!(builder.add_field(duplicate_field).is_err());
        let schema = builder.build().unwrap();
        assert_eq!(schema.len(), 3);
        assert!(!schema.is_empty());
        assert!(schema.idx.contains(&0));
        assert!(schema.idx.contains(&1));
        assert!(schema.idx.contains(&2));
        let id_field = schema.get_field(Schema::ID_KEY).unwrap();
        assert_eq!(id_field.name(), Schema::ID_KEY);
        assert_eq!(id_field.idx(), 0);
        assert!(id_field.required());
        assert!(id_field.unique());
        let name_field = schema.get_field("name").unwrap();
        assert_eq!(name_field.name(), "name");
        assert_eq!(name_field.idx(), 1);
        assert!(name_field.required());
        let age_field = schema.get_field("age").unwrap();
        assert_eq!(age_field.name(), "age");
        assert_eq!(age_field.idx(), 2);
        assert!(!age_field.required());
        assert!(schema.get_field("unknown").is_none());
    }

    #[test]
    fn test_schema_validation() {
        let mut builder = SchemaBuilder::new();
        let name_field = Fe::new("name".to_string(), Ft::Text).unwrap();
        builder.add_field(name_field).unwrap();
        let age_field = Fe::new("age".to_string(), Ft::U64).unwrap();
        builder.add_field(age_field).unwrap();
        let schema = builder.build().unwrap();

        let mut valid_values = IndexedFieldValues::new();
        valid_values.insert(0, Fv::U64(99));
        valid_values.insert(1, Fv::Text("John".to_string()));
        valid_values.insert(2, Fv::U64(30));
        assert!(schema.validate(&valid_values).is_ok());
        // `_id` must be a U64.
        valid_values.insert(0, Fv::I64(99));
        assert!(schema.validate(&valid_values).is_err());

        // Every present value is well-typed, so the only possible failure is
        // the missing required `age` field (index 2).
        let mut missing_required = IndexedFieldValues::new();
        missing_required.insert(0, Fv::U64(99));
        missing_required.insert(1, Fv::Text("John".to_string()));
        let err = schema.validate(&missing_required).unwrap_err();
        assert!(
            format!("{err:?}").contains("is required"),
            "expected missing required field error, got: {err:?}"
        );

        let mut invalid_index = IndexedFieldValues::new();
        invalid_index.insert(0, Fv::U64(99));
        invalid_index.insert(1, Fv::Text("John".to_string()));
        invalid_index.insert(2, Fv::U64(30));
        invalid_index.insert(99, Fv::Text("Invalid".to_string()));
        let err = schema.validate(&invalid_index).unwrap_err();
        assert!(
            format!("{err:?}").contains("not found in schema"),
            "expected unknown index error, got: {err:?}"
        );

        let mut invalid_type = IndexedFieldValues::new();
        invalid_type.insert(0, Fv::U64(99));
        invalid_type.insert(1, Fv::Text("John".to_string()));
        invalid_type.insert(2, Fv::Text("30".to_string()));
        assert!(schema.validate(&invalid_type).is_err());
    }

    #[test]
    fn test_schema_builder_limits() {
        let empty_builder = SchemaBuilder::new();
        assert!(empty_builder.build().is_ok());
        let mut builder = SchemaBuilder::new();
        builder.idx = u16::MAX as usize - 1;
        let test_field = Fe::new("test".to_string(), Ft::Text).unwrap();
        assert!(builder.add_field(test_field).is_ok());
        let overflow_field = Fe::new("overflow".to_string(), Ft::Text).unwrap();
        assert!(builder.add_field(overflow_field).is_err());
        assert_eq!(builder.idx, u16::MAX as usize);
        assert!(!builder.fields.contains_key("overflow"));
    }

    #[test]
    fn test_schema_equality() {
        let mut builder1 = SchemaBuilder::new();
        let name_field1 = Fe::new("name".to_string(), Ft::Text).unwrap();
        builder1.add_field(name_field1).unwrap();
        let schema1 = builder1.build().unwrap();
        let mut builder2 = SchemaBuilder::new();
        let name_field2 = Fe::new("name".to_string(), Ft::Text).unwrap();
        builder2.add_field(name_field2).unwrap();
        let schema2 = builder2.build().unwrap();
        assert_eq!(schema1, schema2);
        let mut builder3 = SchemaBuilder::new();
        let age_field3 = Fe::new("name".to_string(), Ft::U64).unwrap();
        builder3.add_field(age_field3).unwrap();
        let schema3 = builder3.build().unwrap();
        assert_ne!(schema1, schema3);
    }

    #[test]
    fn test_schema_iter() {
        let mut builder = SchemaBuilder::new();
        let name_field = Fe::new("name".to_string(), Ft::Text).unwrap();
        builder.add_field(name_field).unwrap();
        let schema = builder.build().unwrap();
        let fields: Vec<&FieldEntry> = schema.iter().collect();
        assert_eq!(fields.len(), 2);
        let field_names: Vec<&str> = fields.iter().map(|f| f.name()).collect();
        assert!(field_names.contains(&"_id"));
        assert!(field_names.contains(&"name"));
    }

    #[test]
    fn test_schema_serde_roundtrip_json() {
        let mut builder = SchemaBuilder::new();
        builder
            .add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
            .unwrap();
        builder
            .add_field(Fe::new("age".to_string(), Ft::Option(Box::new(Ft::U64))).unwrap())
            .unwrap();
        let schema = builder.build().unwrap();
        let v = serde_json::to_value(&schema).unwrap();
        let schema2: Schema = serde_json::from_value(v).unwrap();
        assert_eq!(schema, schema2);
    }

    #[test]
    fn test_schema_deserialize_rejects_invalid_invariants() {
        let mut builder = SchemaBuilder::new();
        builder
            .add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
            .unwrap();
        let schema = builder.build().unwrap();

        let mut missing_id = serde_json::to_value(&schema).unwrap();
        let fields_missing = missing_id
            .get_mut("fields")
            .and_then(|x| x.as_array_mut())
            .unwrap();
        fields_missing.retain(|f| f.get("n") != Some(&json!("_id")));
        assert!(serde_json::from_value::<Schema>(missing_id).is_err());

        let mut invalid_name = serde_json::to_value(&schema).unwrap();
        let fields2 = invalid_name
            .get_mut("fields")
            .and_then(|x| x.as_array_mut())
            .unwrap();
        if let Some(name_field) = fields2
            .iter_mut()
            .find(|f| f.get("n") == Some(&json!("name")))
        {
            name_field["n"] = json!("Name");
        }
        assert!(serde_json::from_value::<Schema>(invalid_name).is_err());

        let mut dup_idx = serde_json::to_value(&schema).unwrap();
        let fields3 = dup_idx
            .get_mut("fields")
            .and_then(|x| x.as_array_mut())
            .unwrap();
        if let Some(name_field) = fields3
            .iter_mut()
            .find(|f| f.get("n") == Some(&json!("name")))
        {
            name_field["i"] = json!(0);
        }
        assert!(serde_json::from_value::<Schema>(dup_idx).is_err());

        let mut id_wrong_type = serde_json::to_value(&schema).unwrap();
        let fields4 = id_wrong_type
            .get_mut("fields")
            .and_then(|x| x.as_array_mut())
            .unwrap();
        if let Some(id_field) = fields4
            .iter_mut()
            .find(|f| f.get("n") == Some(&json!("_id")))
        {
            id_field["t"] = json!("Text");
        }
        assert!(serde_json::from_value::<Schema>(id_wrong_type).is_err());
    }

    #[test]
    fn test_schema_version() {
        let mut builder = SchemaBuilder::new();
        builder
            .add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
            .unwrap();
        let schema_v0 = builder.build().unwrap();
        assert_eq!(schema_v0.version(), 0);
        let mut schema_v1 = schema_v0.clone();
        schema_v1.with_version(1);
        assert_eq!(schema_v1.version(), 1);
        let mut builder = SchemaBuilder::new();
        builder.with_version(2);
        builder
            .add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
            .unwrap();
        let schema_v2 = builder.build().unwrap();
        assert_eq!(schema_v2.version(), 2);
        assert!(schema_v2.needs_upgrade(&schema_v0));
        assert!(!schema_v0.needs_upgrade(&schema_v2));
        assert!(!schema_v0.needs_upgrade(&schema_v0));
        assert_ne!(schema_v0, schema_v2);
    }

    #[test]
    fn test_schema_version_serde_roundtrip() {
        let mut builder = SchemaBuilder::new();
        builder.with_version(3);
        builder
            .add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
            .unwrap();
        let schema = builder.build().unwrap();
        assert_eq!(schema.version(), 3);
        let v = serde_json::to_value(&schema).unwrap();
        assert_eq!(v.get("version").unwrap().as_u64().unwrap(), 3);
        let schema2: Schema = serde_json::from_value(v).unwrap();
        assert_eq!(schema, schema2);
        assert_eq!(schema2.version(), 3);
    }

    #[test]
    fn test_schema_version_defaults_to_zero_on_deserialize() {
        let mut builder = SchemaBuilder::new();
        builder
            .add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
            .unwrap();
        let schema = builder.build().unwrap();
        let mut v = serde_json::to_value(&schema).unwrap();
        v.as_object_mut().unwrap().remove("version");
        let schema2: Schema = serde_json::from_value(v).unwrap();
        assert_eq!(schema2.version(), 0);
        assert_eq!(schema, schema2);
    }

    #[test]
    fn test_upgrade_with_inherits_old_idx() {
        let mut old_builder = SchemaBuilder::new();
        old_builder.with_version(1);
        old_builder
            .add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
            .unwrap();
        old_builder
            .add_field(Fe::new("age".to_string(), Ft::Option(Box::new(Ft::U64))).unwrap())
            .unwrap();
        let old = old_builder.build().unwrap();
        assert_eq!(old.get_field("name").unwrap().idx(), 1);
        assert_eq!(old.get_field("age").unwrap().idx(), 2);
        let mut new_builder = SchemaBuilder::new();
        new_builder.with_version(2);
        new_builder
            .add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
            .unwrap();
        new_builder
            .add_field(Fe::new("age".to_string(), Ft::Option(Box::new(Ft::U64))).unwrap())
            .unwrap();
        new_builder
            .add_field(Fe::new("email".to_string(), Ft::Option(Box::new(Ft::Text))).unwrap())
            .unwrap();
        let mut new_schema = new_builder.build().unwrap();
        assert_eq!(new_schema.get_field("email").unwrap().idx(), 3);
        new_schema.upgrade_with(&old).unwrap();
        assert_eq!(new_schema.get_field("_id").unwrap().idx(), 0);
        assert_eq!(new_schema.get_field("name").unwrap().idx(), 1);
        assert_eq!(new_schema.get_field("age").unwrap().idx(), 2);
        assert_eq!(new_schema.get_field("email").unwrap().idx(), 3);
    }

    #[test]
    fn test_upgrade_with_removed_field_idx_not_reused() {
        let mut old_builder = SchemaBuilder::new();
        old_builder.with_version(1);
        old_builder
            .add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
            .unwrap();
        old_builder
            .add_field(Fe::new("age".to_string(), Ft::Option(Box::new(Ft::U64))).unwrap())
            .unwrap();
        old_builder
            .add_field(Fe::new("bio".to_string(), Ft::Option(Box::new(Ft::Text))).unwrap())
            .unwrap();
        let old = old_builder.build().unwrap();
        assert_eq!(old.get_field("bio").unwrap().idx(), 3);
        let mut new_builder = SchemaBuilder::new();
        new_builder.with_version(2);
        new_builder
            .add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
            .unwrap();
        new_builder
            .add_field(Fe::new("age".to_string(), Ft::Option(Box::new(Ft::U64))).unwrap())
            .unwrap();
        new_builder
            .add_field(Fe::new("email".to_string(), Ft::Option(Box::new(Ft::Text))).unwrap())
            .unwrap();
        let mut new_schema = new_builder.build().unwrap();
        new_schema.upgrade_with(&old).unwrap();
        assert_eq!(new_schema.get_field("name").unwrap().idx(), 1);
        assert_eq!(new_schema.get_field("age").unwrap().idx(), 2);
        assert_eq!(new_schema.get_field("email").unwrap().idx(), 4);
    }

    #[test]
    fn test_upgrade_with_rejects_new_required_field() {
        let mut old_builder = SchemaBuilder::new();
        old_builder.with_version(1);
        old_builder
            .add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
            .unwrap();
        let old = old_builder.build().unwrap();
        let mut new_builder = SchemaBuilder::new();
        new_builder.with_version(2);
        new_builder
            .add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
            .unwrap();
        new_builder
            .add_field(Fe::new("email".to_string(), Ft::Text).unwrap())
            .unwrap();
        let mut new_schema = new_builder.build().unwrap();
        let err = new_schema.upgrade_with(&old).unwrap_err();
        assert!(
            format!("{err:?}").contains("must be optional"),
            "expected required field error, got: {err:?}"
        );
    }

    #[test]
    fn test_upgrade_with_rejects_index_overflow() {
        let mut old_builder = SchemaBuilder::new();
        old_builder.with_version(1);
        old_builder.idx = u16::MAX as usize - 1;
        old_builder
            .add_field(Fe::new("last".to_string(), Ft::Text).unwrap())
            .unwrap();
        let old = old_builder.build().unwrap();
        assert_eq!(old.get_field("last").unwrap().idx(), u16::MAX as usize);
        let mut new_builder = SchemaBuilder::new();
        new_builder.with_version(2);
        new_builder
            .add_field(Fe::new("last".to_string(), Ft::Text).unwrap())
            .unwrap();
        new_builder
            .add_field(Fe::new("next".to_string(), Ft::Option(Box::new(Ft::Text))).unwrap())
            .unwrap();
        let mut new_schema = new_builder.build().unwrap();
        assert!(new_schema.upgrade_with(&old).is_err());
    }

    #[test]
    fn test_upgrade_with_rejects_type_change() {
        let mut old_builder = SchemaBuilder::new();
        old_builder.with_version(1);
        old_builder
            .add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
            .unwrap();
        let old = old_builder.build().unwrap();
        let mut new_builder = SchemaBuilder::new();
        new_builder.with_version(2);
        new_builder
            .add_field(Fe::new("name".to_string(), Ft::U64).unwrap())
            .unwrap();
        let mut new_schema = new_builder.build().unwrap();
        let err = new_schema.upgrade_with(&old).unwrap_err();
        assert!(
            format!("{err:?}").contains("type changed"),
            "expected type change error, got: {err:?}"
        );
    }

    #[test]
    fn test_upgrade_with_is_atomic_on_error() {
        let mut old_builder = SchemaBuilder::new();
        old_builder.with_version(1);
        old_builder
            .add_field(Fe::new("age".to_string(), Ft::U64).unwrap())
            .unwrap();
        old_builder
            .add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
            .unwrap();
        let old = old_builder.build().unwrap();
        let mut new_builder = SchemaBuilder::new();
        new_builder.with_version(2);
        new_builder
            .add_field(Fe::new("name".to_string(), Ft::U64).unwrap())
            .unwrap();
        new_builder
            .add_field(Fe::new("age".to_string(), Ft::U64).unwrap())
            .unwrap();
        let mut new_schema = new_builder.build().unwrap();
        let snapshot = new_schema.clone();
        assert!(new_schema.upgrade_with(&old).is_err());
        assert_eq!(new_schema, snapshot);
        assert_eq!(new_schema.get_field("age").unwrap().idx(), 2);
    }

    #[test]
    fn test_upgrade_with_rejects_lower_version() {
        let mut old_builder = SchemaBuilder::new();
        old_builder.with_version(3);
        old_builder
            .add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
            .unwrap();
        let old = old_builder.build().unwrap();
        let mut new_builder = SchemaBuilder::new();
        new_builder.with_version(2);
        new_builder
            .add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
            .unwrap();
        let mut new_schema = new_builder.build().unwrap();
        assert!(new_schema.upgrade_with(&old).is_err());
    }
}