use std::{collections::BTreeMap, fmt, fs, path::PathBuf, str::FromStr};
use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64_ENGINE};
use chrono::{DateTime, NaiveDate, Utc};
use parking_lot::RwLock;
use regex::Regex;
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use serde_json::{Map as JsonMap, Value};
use validator::{validate_credit_card, validate_email, validate_url};
/// Logical data type that a schema column can be declared as.
///
/// Serde converts this type through its string form: `TryFrom<String>` on
/// input and `Into<String>` on output, as requested by the attribute below.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
pub enum ColumnType {
/// Whole number; payload values are coerced to `i64`.
Integer,
/// Floating-point number; payload values are coerced to `f64`.
Float,
/// Fixed-point number constrained by a `precision` and a `scale`.
Decimal { precision: u8, scale: u8 },
/// Boolean flag.
Boolean,
/// String value.
Text,
/// RFC3339 timestamp, converted to UTC when coerced.
Timestamp,
/// Calendar date formatted as `YYYY-MM-DD`.
Date,
/// Arbitrary JSON value, kept as-is.
Json,
/// Binary data supplied as a base64-encoded string.
Binary,
/// JSON object whose members are validated against nested column definitions.
Object,
}
/// Result alias used by the fallible [`ColumnType`] parsing helpers.
type ColumnTypeResult<T> = std::result::Result<T, ColumnTypeParseError>;

impl ColumnType {
    /// Renders the canonical lowercase name of the type.
    ///
    /// Decimals are rendered as `decimal(<precision>,<scale>)`; the output can
    /// be parsed back through [`FromStr`].
    fn as_str(&self) -> String {
        match self {
            ColumnType::Integer => "integer".to_string(),
            ColumnType::Float => "float".to_string(),
            ColumnType::Decimal { precision, scale } => {
                format!("decimal({},{})", precision, scale)
            }
            ColumnType::Boolean => "boolean".to_string(),
            ColumnType::Text => "text".to_string(),
            ColumnType::Timestamp => "timestamp".to_string(),
            ColumnType::Date => "date".to_string(),
            ColumnType::Json => "json".to_string(),
            ColumnType::Binary => "binary".to_string(),
            ColumnType::Object => "object".to_string(),
        }
    }

    /// Parses a `name(precision,scale)` literal into [`ColumnType::Decimal`].
    ///
    /// The text before the first `(` is not inspected here; the caller decides
    /// which names are acceptable.
    ///
    /// # Errors
    ///
    /// Returns [`ColumnTypeParseError`] carrying the original input when the
    /// parentheses are missing, when anything follows the closing parenthesis,
    /// when there are not exactly two comma-separated parts, or when either
    /// part is not a valid `u8`.
    fn parse_decimal(value: &str) -> ColumnTypeResult<Self> {
        let invalid = || ColumnTypeParseError(value.to_string());

        let literal = value.trim_end();
        let open = literal.find('(').ok_or_else(invalid)?;
        // The closing parenthesis must be the final character; previously
        // input such as `decimal(10,2)xyz` was silently accepted.
        let inner = literal[open + 1..].strip_suffix(')').ok_or_else(invalid)?;

        let mut parts = inner.split(',').map(str::trim);
        let precision = parts
            .next()
            .ok_or_else(invalid)?
            .parse::<u8>()
            .map_err(|_| invalid())?;
        let scale = parts
            .next()
            .ok_or_else(invalid)?
            .parse::<u8>()
            .map_err(|_| invalid())?;
        if parts.next().is_some() {
            return Err(invalid());
        }
        Ok(ColumnType::Decimal { precision, scale })
    }
}
/// Displays the type using its canonical textual name.
impl fmt::Display for ColumnType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let rendered = self.as_str();
        f.write_str(&rendered)
    }
}
impl From<ColumnType> for String {
fn from(value: ColumnType) -> Self {
value.as_str()
}
}
impl TryFrom<String> for ColumnType {
type Error = ColumnTypeParseError;
fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
ColumnType::from_str(&value)
}
}
/// Parses a column type name, ignoring surrounding whitespace and ASCII case.
///
/// Besides the canonical names, the aliases `int`, `double`, `bool`, `string`
/// and `bytes` are accepted, as are `decimal(p,s)` / `numeric(p,s)` literals.
impl FromStr for ColumnType {
    type Err = ColumnTypeParseError;

    fn from_str(value: &str) -> std::result::Result<Self, Self::Err> {
        let candidate = value.trim();
        let normalized = candidate.to_ascii_lowercase();

        // Parameterized decimals are delegated to the dedicated parser.
        let is_decimal = ["decimal(", "numeric("]
            .iter()
            .any(|prefix| normalized.starts_with(*prefix));
        if is_decimal {
            return ColumnType::parse_decimal(candidate);
        }

        let parsed = match normalized.as_str() {
            "integer" | "int" => ColumnType::Integer,
            "float" | "double" => ColumnType::Float,
            "boolean" | "bool" => ColumnType::Boolean,
            "text" | "string" => ColumnType::Text,
            "timestamp" => ColumnType::Timestamp,
            "date" => ColumnType::Date,
            "json" => ColumnType::Json,
            "binary" | "bytes" => ColumnType::Binary,
            "object" => ColumnType::Object,
            // Covers blank input as well as unknown names; the error keeps the
            // caller's original text.
            _ => return Err(ColumnTypeParseError(value.to_string())),
        };
        Ok(parsed)
    }
}
/// Error produced when a string cannot be interpreted as a column type.
///
/// Wraps the text that failed to parse.
#[derive(Debug, Clone)]
pub struct ColumnTypeParseError(String);

impl fmt::Display for ColumnTypeParseError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(raw) = self;
        f.write_fmt(format_args!("invalid column type '{}'", raw))
    }
}

impl std::error::Error for ColumnTypeParseError {}
/// Optional validation rules attached to a column definition.
///
/// Fields missing from the input take their default value (`#[serde(default)]`),
/// and fields holding their default value are skipped when serializing.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(default)]
pub struct FieldRules {
/// When true, the field must be present in the payload.
#[serde(skip_serializing_if = "is_false")]
pub required: bool,
/// Substrings that the text value must contain.
#[serde(skip_serializing_if = "vec_is_empty")]
pub contains: Vec<String>,
/// Substrings that the text value must not contain.
#[serde(skip_serializing_if = "vec_is_empty")]
pub does_not_contain: Vec<String>,
/// Path of another field whose value this field must equal.
#[serde(skip_serializing_if = "Option::is_none")]
pub must_match: Option<String>,
/// Regular expressions that the text value must match.
#[serde(skip_serializing_if = "vec_is_empty")]
pub regex: Vec<String>,
/// Length bounds for text or binary values.
#[serde(skip_serializing_if = "Option::is_none")]
pub length: Option<LengthRule>,
/// Value bounds for numeric, timestamp and date values.
#[serde(skip_serializing_if = "Option::is_none")]
pub range: Option<RangeRule>,
/// Well-known text format the value must satisfy.
#[serde(skip_serializing_if = "Option::is_none")]
pub format: Option<FieldFormat>,
/// Nested column definitions, keyed by member name.
#[serde(skip_serializing_if = "map_is_empty")]
pub properties: BTreeMap<String, ColumnSettings>,
}
impl FieldRules {
    /// Returns true when no rule is configured, i.e. every field holds its
    /// default value.
    fn is_default(&self) -> bool {
        // Destructure exhaustively so that adding a field forces this check to
        // be revisited.
        let Self {
            required,
            contains,
            does_not_contain,
            must_match,
            regex,
            length,
            range,
            format,
            properties,
        } = self;
        !*required
            && contains.is_empty()
            && does_not_contain.is_empty()
            && must_match.is_none()
            && regex.is_empty()
            && length.is_none()
            && range.is_none()
            && format.is_none()
            && properties.is_empty()
    }

    /// Checks an already-coerced `value` against every configured rule.
    ///
    /// `path` is only used to build error messages; `column_type` is used to
    /// reject nested `properties` on non-object columns. The `required` and
    /// `must_match` rules are not evaluated here.
    ///
    /// # Errors
    ///
    /// Returns [`EventError::SchemaViolation`] describing the first rule that
    /// the value breaks, or a rule that cannot be applied to the value's kind.
    fn validate_rules(
        &self,
        path: &str,
        column_type: &ColumnType,
        value: &ComparableValue,
    ) -> Result<()> {
        let needs_text = !self.contains.is_empty()
            || !self.does_not_contain.is_empty()
            || !self.regex.is_empty()
            || self.format.is_some();
        if needs_text {
            let text = value.as_text().ok_or_else(|| {
                EventError::SchemaViolation(format!("field {} must be a string", path))
            })?;
            self.check_text_rules(path, text)?;
        }

        // Length is validated independently of the text rules. It used to sit
        // in an `else if` branch, so it was skipped whenever any text rule was
        // configured on the same field.
        if let Some(length) = &self.length {
            match value {
                ComparableValue::Text(text) => length.check(path, text.chars().count())?,
                ComparableValue::Binary(bytes) => length.check(path, bytes.len())?,
                _ => {
                    return Err(EventError::SchemaViolation(format!(
                        "field {} does not support length validation",
                        path
                    )));
                }
            }
        }

        if let Some(range) = &self.range {
            match value {
                ComparableValue::Integer(current) => check_integer_range(path, *current, range)?,
                ComparableValue::Float(current) => check_float_range(path, *current, range)?,
                ComparableValue::Decimal(current) => check_decimal_range(path, current, range)?,
                ComparableValue::Timestamp(current) => {
                    check_timestamp_range(path, current, range)?
                }
                ComparableValue::Date(current) => check_date_range(path, current, range)?,
                _ => {
                    return Err(EventError::SchemaViolation(format!(
                        "field {} does not support range validation",
                        path
                    )));
                }
            }
        }

        if !self.properties.is_empty()
            && !matches!(column_type, ColumnType::Object | ColumnType::Json)
        {
            return Err(EventError::SchemaViolation(format!(
                "field {} cannot define nested properties on non-object type",
                path
            )));
        }
        Ok(())
    }

    /// Applies the substring, regex and format rules to a text value.
    fn check_text_rules(&self, path: &str, text: &str) -> Result<()> {
        for needle in &self.contains {
            if !text.contains(needle.as_str()) {
                return Err(EventError::SchemaViolation(format!(
                    "field {} must contain '{}'",
                    path, needle
                )));
            }
        }
        for needle in &self.does_not_contain {
            if text.contains(needle.as_str()) {
                return Err(EventError::SchemaViolation(format!(
                    "field {} must not contain '{}'",
                    path, needle
                )));
            }
        }
        for pattern in &self.regex {
            let regex = Regex::new(pattern).map_err(|err| {
                EventError::SchemaViolation(format!(
                    "field {} has invalid regex '{}': {}",
                    path, pattern, err
                ))
            })?;
            if !regex.is_match(text) {
                return Err(EventError::SchemaViolation(format!(
                    "field {} must match pattern {}",
                    path, pattern
                )));
            }
        }
        if let Some(format) = &self.format {
            let (valid, expectation) = match format {
                FieldFormat::Email => (validate_email(text), "a valid email address"),
                FieldFormat::Url => (validate_url(text), "a valid URL"),
                FieldFormat::CreditCard => {
                    (validate_credit_card(text), "a valid credit card number")
                }
            };
            if !valid {
                return Err(EventError::SchemaViolation(format!(
                    "field {} must be {}",
                    path, expectation
                )));
            }
        }
        Ok(())
    }
}
/// Inclusive bounds on the length of a value; either side may be left open.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(default)]
pub struct LengthRule {
    pub min: Option<usize>,
    pub max: Option<usize>,
}

impl LengthRule {
    /// Verifies that `len` lies within the configured bounds.
    ///
    /// The lower bound is examined first, so it is the one reported when both
    /// are violated. `path` names the field in the error message.
    fn check(&self, path: &str, len: usize) -> Result<()> {
        let violated_min = self.min.filter(|&lower| len < lower);
        if let Some(lower) = violated_min {
            let message = format!("field {} must have length >= {}", path, lower);
            return Err(EventError::SchemaViolation(message));
        }

        let violated_max = self.max.filter(|&upper| len > upper);
        if let Some(upper) = violated_max {
            let message = format!("field {} must have length <= {}", path, upper);
            return Err(EventError::SchemaViolation(message));
        }

        Ok(())
    }
}
/// Bounds on a value, stored as text.
///
/// Each bound is parsed according to the kind of the value being checked
/// (integer, float, decimal, RFC3339 timestamp or `YYYY-MM-DD` date) at
/// validation time.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(default)]
pub struct RangeRule {
/// Lower bound; values below it are rejected.
pub min: Option<String>,
/// Upper bound; values above it are rejected.
pub max: Option<String>,
}
/// Well-known text formats a field can be required to satisfy.
///
/// Variant names are serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum FieldFormat {
/// Checked with `validate_email`.
Email,
/// Checked with `validate_url`.
Url,
/// Checked with `validate_credit_card`.
CreditCard,
}
/// Full definition of a column: its data type plus optional validation rules.
///
/// Serialization is implemented by hand further down in this file rather than derived.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnSettings {
/// Data type payload values are coerced to.
pub column_type: ColumnType,
/// Additional rules applied after coercion.
pub rules: FieldRules,
}
impl ColumnSettings {
    /// Creates settings for `column_type` with no validation rules.
    fn from_type(column_type: ColumnType) -> Self {
        let rules = FieldRules::default();
        Self { column_type, rules }
    }

    /// Validates the optional payload value found at `path`.
    ///
    /// An absent value yields `Ok(None)` unless the field is required. A
    /// present value is coerced to the column type, checked against the rules,
    /// and returned in comparable form.
    fn validate_with_path(
        &self,
        path: &str,
        value_opt: Option<&Value>,
        root_value: &Value,
        root_definitions: &BTreeMap<String, ColumnSettings>,
        normalized: &mut BTreeMap<String, ComparableValue>,
    ) -> Result<Option<ComparableValue>> {
        let Some(value) = value_opt else {
            return if self.rules.required {
                Err(EventError::SchemaViolation(format!(
                    "field {} is required",
                    path
                )))
            } else {
                Ok(None)
            };
        };

        let comparable =
            self.coerce_value(path, value, root_value, root_definitions, normalized)?;
        self.rules
            .validate_rules(path, &self.column_type, &comparable)?;
        Ok(Some(comparable))
    }

    /// Converts a raw JSON value into the comparable form dictated by the
    /// column type.
    ///
    /// Object columns additionally validate their members against the nested
    /// `properties`, recording the results in `normalized`.
    fn coerce_value(
        &self,
        path: &str,
        value: &Value,
        root_value: &Value,
        root_definitions: &BTreeMap<String, ColumnSettings>,
        normalized: &mut BTreeMap<String, ComparableValue>,
    ) -> Result<ComparableValue> {
        let coerced = match &self.column_type {
            ColumnType::Integer => ComparableValue::Integer(coerce_integer(value, path)?),
            ColumnType::Float => ComparableValue::Float(coerce_float(value, path)?),
            ColumnType::Decimal { precision, scale } => {
                let decimal = coerce_decimal(value, path)?;
                check_decimal_constraints(&decimal, *precision, *scale, path)?;
                ComparableValue::Decimal(decimal)
            }
            ColumnType::Boolean => ComparableValue::Boolean(coerce_boolean(value, path)?),
            ColumnType::Text => ComparableValue::Text(coerce_text(value, path)?),
            ColumnType::Timestamp => ComparableValue::Timestamp(coerce_timestamp(value, path)?),
            ColumnType::Date => ComparableValue::Date(coerce_date(value, path)?),
            ColumnType::Json => ComparableValue::Json(value.clone()),
            ColumnType::Binary => ComparableValue::Binary(coerce_binary(value, path)?),
            ColumnType::Object => {
                let Some(members) = value.as_object() else {
                    return Err(EventError::SchemaViolation(format!(
                        "field {} must be a JSON object",
                        path
                    )));
                };
                validate_columns(
                    &self.rules.properties,
                    members,
                    root_value,
                    root_definitions,
                    path,
                    normalized,
                )?;
                ComparableValue::Json(value.clone())
            }
        };
        Ok(coerced)
    }

    /// Returns true when both settings declare the same column type, including
    /// the precision and scale of decimals. Rules are not compared.
    fn same_kind(&self, other: &ColumnSettings) -> bool {
        // The derived equality on `ColumnType` compares the variant and, for
        // decimals, both parameters.
        self.column_type == other.column_type
    }
}
/// Wire representation of `ColumnSettings`.
///
/// Untagged: the value is either a bare column type or a map carrying `type`
/// together with the flattened rule fields.
#[derive(Serialize, Deserialize)]
#[serde(untagged)]
enum ColumnSettingsHelper {
/// Just the column type, with no rules.
Simple(ColumnType),
/// Column type plus rules, with the rule fields inlined next to `type`.
Detailed {
#[serde(rename = "type")]
column_type: ColumnType,
#[serde(flatten)]
rules: FieldRules,
},
}
/// Serializes as a bare type when no rules are set, otherwise as the detailed
/// map form.
impl Serialize for ColumnSettings {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let column_type = self.column_type.clone();
        let helper = if self.rules.is_default() {
            ColumnSettingsHelper::Simple(column_type)
        } else {
            ColumnSettingsHelper::Detailed {
                column_type,
                rules: self.rules.clone(),
            }
        };
        helper.serialize(serializer)
    }
}
/// Accepts either the bare type form or the detailed map form.
impl<'de> Deserialize<'de> for ColumnSettings {
    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        ColumnSettingsHelper::deserialize(deserializer).map(|helper| match helper {
            ColumnSettingsHelper::Simple(column_type) => ColumnSettings::from_type(column_type),
            ColumnSettingsHelper::Detailed { column_type, rules } => {
                ColumnSettings { column_type, rules }
            }
        })
    }
}
/// Payload value after coercion to its column type.
///
/// Values of this type are compared with `==` by the `must_match` rule.
#[derive(Debug, Clone, PartialEq)]
enum ComparableValue {
Integer(i64),
Float(f64),
Decimal(Decimal),
Boolean(bool),
Text(String),
Timestamp(DateTime<Utc>),
Date(NaiveDate),
/// Raw JSON; produced for both `Json` and `Object` columns.
Json(Value),
/// Decoded bytes of a base64 string.
Binary(Vec<u8>),
}
/// Serde `skip_serializing_if` predicate: true when the flag is unset.
fn is_false(value: &bool) -> bool {
    matches!(*value, false)
}
/// Serde `skip_serializing_if` predicate: true when the list has no entries.
fn vec_is_empty(value: &Vec<String>) -> bool {
    matches!(value.as_slice(), [])
}
/// Serde `skip_serializing_if` predicate: true when no nested columns are defined.
fn map_is_empty(value: &BTreeMap<String, ColumnSettings>) -> bool {
    value.keys().next().is_none()
}
impl ComparableValue {
    /// Borrows the inner string when this is a text value, `None` otherwise.
    fn as_text(&self) -> Option<&str> {
        if let ComparableValue::Text(inner) = self {
            Some(inner)
        } else {
            None
        }
    }
}
use super::error::{EventError, Result};
use crate::store::payload_to_map;
/// Maximum number of characters allowed in an event's default note; enforced by `SchemaManager::update`.
pub const MAX_EVENT_NOTE_LENGTH: usize = 128;
/// Schema of a single event type within an aggregate.
///
/// `Default` is derived: it yields no fields and no note, exactly what the
/// previous hand-written implementation produced.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct EventSchema {
    /// Field names for the event. When non-empty, event validation requires
    /// each of them in the payload and rejects any other key.
    pub fields: Vec<String>,
    /// Optional default note for the event.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub notes: Option<String>,
}

impl EventSchema {
    /// Sorts the field names and removes duplicates.
    fn ensure_sorted(&mut self) {
        // Equal strings are indistinguishable, so an unstable sort yields the
        // same result without the extra allocation.
        self.fields.sort_unstable();
        self.fields.dedup();
    }
}
/// Schema describing one aggregate: its events, column definitions and flags.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AggregateSchema {
/// Aggregate name; also used as the key in the manager's map.
pub aggregate: String,
/// When set and non-zero, `should_snapshot` reports true for versions that are multiples of it.
pub snapshot_threshold: Option<u64>,
/// When true, event validation rejects every event for this aggregate.
pub locked: bool,
/// Payload keys that event validation rejects.
pub field_locks: Vec<String>,
/// Visibility flag; stored here but not interpreted in this file.
#[serde(default)]
pub hidden: bool,
/// Names of hidden fields; stored here but not interpreted in this file.
#[serde(default)]
pub hidden_fields: Vec<String>,
/// Column definitions keyed by top-level field name.
#[serde(default)]
pub column_types: BTreeMap<String, ColumnSettings>,
/// Event schemas keyed by event type.
pub events: BTreeMap<String, EventSchema>,
/// Set when the schema is created.
pub created_at: DateTime<Utc>,
/// Refreshed by `update` and `remove_event`.
pub updated_at: DateTime<Utc>,
}
impl AggregateSchema {
    /// Sorts and de-duplicates the lock list, the hidden-field list and the
    /// field list of every event.
    fn ensure_sorted(&mut self) {
        for names in [&mut self.field_locks, &mut self.hidden_fields] {
            names.sort();
            names.dedup();
        }
        self.events
            .values_mut()
            .for_each(|event_schema| event_schema.ensure_sorted());
    }
}
/// Holds all aggregate schemas in memory and writes them to a JSON file.
#[derive(Debug)]
pub struct SchemaManager {
/// File the schemas are loaded from and persisted to.
path: PathBuf,
/// Schemas keyed by aggregate name, guarded by a read/write lock.
items: RwLock<BTreeMap<String, AggregateSchema>>,
}
/// Arguments for `SchemaManager::create`.
#[derive(Debug)]
pub struct CreateSchemaInput {
/// Name of the aggregate; must not be blank.
pub aggregate: String,
/// Event types to register; at least one, none of them blank.
pub events: Vec<String>,
/// Initial snapshot threshold, if any.
pub snapshot_threshold: Option<u64>,
}
/// Set of changes applied by `SchemaManager::update`.
///
/// `None` fields and empty maps leave the corresponding part of the schema untouched.
#[derive(Debug, Default)]
pub struct SchemaUpdate {
/// `Some(x)` replaces the threshold with `x` (which may itself be `None` to clear it).
pub snapshot_threshold: Option<Option<u64>>,
/// New value for the aggregate-wide lock.
pub locked: Option<bool>,
/// `(field, true)` locks the field, `(field, false)` unlocks it.
pub field_lock: Option<(String, bool)>,
/// Fields to add, keyed by event; an unknown event is created.
pub event_add_fields: BTreeMap<String, Vec<String>>,
/// Fields to remove, keyed by event; unknown events are ignored.
pub event_remove_fields: BTreeMap<String, Vec<String>>,
/// Note to set (or clear with `None`), keyed by event; the event must exist.
pub event_notes: BTreeMap<String, Option<String>>,
/// New value for the aggregate-wide hidden flag.
pub hidden: Option<bool>,
/// `(field, true)` hides the field, `(field, false)` un-hides it.
pub hidden_field: Option<(String, bool)>,
/// `(field, Some(type))` sets the column type with fresh default rules; `(field, None)` removes the column.
pub column_type: Option<(String, Option<ColumnType>)>,
/// `(field, Some(rules))` replaces the column's rules; `(field, None)` resets them. The column must already exist.
pub column_rules: Option<(String, Option<FieldRules>)>,
}
impl SchemaManager {
/// Opens the schema file at `path`, creating it (and its parent directories)
/// with an empty JSON object when it does not exist yet.
///
/// A file that contains only whitespace is treated as an empty schema set.
pub fn load(path: PathBuf) -> Result<Self> {
    let missing = !path.exists();
    if missing {
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent)?;
        }
        fs::write(&path, "{}")?;
    }

    let raw = fs::read_to_string(&path)?;
    let schemas = match raw.trim() {
        "" => BTreeMap::new(),
        _ => serde_json::from_str::<BTreeMap<String, AggregateSchema>>(&raw)?,
    };

    let items = RwLock::new(schemas);
    Ok(Self { path, items })
}
pub fn create(&self, input: CreateSchemaInput) -> Result<AggregateSchema> {
if input.aggregate.trim().is_empty() {
return Err(EventError::InvalidSchema(
"aggregate name must be provided".into(),
));
}
if input.events.is_empty() {
return Err(EventError::InvalidSchema(
"at least one event must be provided".into(),
));
}
let mut items = self.items.write();
let aggregate_key = input.aggregate.clone();
if items.contains_key(&aggregate_key) {
return Err(EventError::SchemaExists);
}
let now = Utc::now();
let mut events = BTreeMap::new();
for event in input.events {
if event.trim().is_empty() {
return Err(EventError::InvalidSchema(
"event names cannot be empty".into(),
));
}
events.insert(event, EventSchema::default());
}
let mut schema = AggregateSchema {
aggregate: aggregate_key.clone(),
snapshot_threshold: input.snapshot_threshold,
locked: false,
field_locks: Vec::new(),
hidden: false,
hidden_fields: Vec::new(),
column_types: BTreeMap::new(),
events,
created_at: now,
updated_at: now,
};
schema.ensure_sorted();
items.insert(aggregate_key.clone(), schema.clone());
self.persist(&items)?;
Ok(schema)
}
/// Applies `update` to the schema of `aggregate`, persists the schema set and
/// returns the updated schema.
///
/// The changes are applied to a private copy of the schema and only stored
/// once every one of them has been validated. A rejected update therefore
/// leaves the in-memory schema exactly as it was; before, the live schema was
/// edited in place and an error partway through left it half-updated.
///
/// # Errors
///
/// * `SchemaNotFound` when the aggregate is unknown.
/// * `InvalidSchema` for blank field names, rules targeting an undefined
///   column, notes for an undefined event, or a note longer than
///   `MAX_EVENT_NOTE_LENGTH` characters.
/// * Any error raised while persisting.
pub fn update(&self, aggregate: &str, update: SchemaUpdate) -> Result<AggregateSchema> {
    /// Rejects blank field names.
    fn require_field_name(field: &str) -> Result<()> {
        if field.trim().is_empty() {
            return Err(EventError::InvalidSchema(
                "field name cannot be empty".into(),
            ));
        }
        Ok(())
    }

    let mut items = self.items.write();
    let mut schema = items
        .get(aggregate)
        .cloned()
        .ok_or(EventError::SchemaNotFound)?;

    if let Some(snapshot) = update.snapshot_threshold {
        schema.snapshot_threshold = snapshot;
    }
    if let Some(locked) = update.locked {
        schema.locked = locked;
    }
    if let Some((field, lock)) = update.field_lock {
        require_field_name(&field)?;
        if !lock {
            schema.field_locks.retain(|item| item != &field);
        } else if !schema.field_locks.contains(&field) {
            schema.field_locks.push(field);
        }
    }
    if let Some(hidden) = update.hidden {
        schema.hidden = hidden;
    }
    if let Some((field, hide)) = update.hidden_field {
        require_field_name(&field)?;
        if !hide {
            schema.hidden_fields.retain(|item| item != &field);
        } else if !schema.hidden_fields.contains(&field) {
            schema.hidden_fields.push(field);
        }
    }
    if let Some((field, data_type)) = update.column_type {
        require_field_name(&field)?;
        match data_type {
            Some(value) => {
                schema
                    .column_types
                    .insert(field, ColumnSettings::from_type(value));
            }
            None => {
                schema.column_types.remove(&field);
            }
        }
    }
    if let Some((field, rules)) = update.column_rules {
        require_field_name(&field)?;
        let settings = schema.column_types.get_mut(&field).ok_or_else(|| {
            EventError::InvalidSchema(format!(
                "cannot update rules for undefined field {}",
                field
            ))
        })?;
        let replacement = match rules {
            Some(mut new_rules) => {
                // New rules without nested properties inherit the existing ones.
                if new_rules.properties.is_empty() {
                    new_rules.properties = std::mem::take(&mut settings.rules.properties);
                }
                new_rules
            }
            None => FieldRules::default(),
        };
        settings.rules = replacement;
    }
    for (event, fields) in update.event_add_fields {
        if fields.iter().any(|field| field.trim().is_empty()) {
            return Err(EventError::InvalidSchema(
                "field names cannot be empty".into(),
            ));
        }
        schema
            .events
            .entry(event)
            .or_insert_with(EventSchema::default)
            .fields
            .extend(fields);
    }
    for (event, fields) in update.event_remove_fields {
        if let Some(schema_event) = schema.events.get_mut(&event) {
            schema_event
                .fields
                .retain(|existing| !fields.contains(existing));
        }
    }
    for (event, note) in update.event_notes {
        let schema_event = schema.events.get_mut(&event).ok_or_else(|| {
            EventError::InvalidSchema(format!(
                "event {} is not defined for aggregate {}",
                event, aggregate
            ))
        })?;
        if let Some(value) = &note {
            if value.chars().count() > MAX_EVENT_NOTE_LENGTH {
                return Err(EventError::InvalidSchema(format!(
                    "default note for event {} cannot exceed {} characters",
                    event, MAX_EVENT_NOTE_LENGTH
                )));
            }
        }
        schema_event.notes = note;
    }

    schema.ensure_sorted();
    schema.updated_at = Utc::now();

    // Every change validated: store the copy, then persist.
    items.insert(aggregate.to_string(), schema.clone());
    self.persist(&items)?;
    Ok(schema)
}
/// Returns a copy of every registered schema, ordered by aggregate name.
pub fn list(&self) -> Vec<AggregateSchema> {
    let guard = self.items.read();
    guard.values().cloned().collect()
}
/// Returns a copy of the schema for `aggregate`, or `SchemaNotFound` when it
/// is not registered.
pub fn get(&self, aggregate: &str) -> Result<AggregateSchema> {
    let guard = self.items.read();
    match guard.get(aggregate) {
        Some(schema) => Ok(schema.clone()),
        None => Err(EventError::SchemaNotFound),
    }
}
/// Reports whether `version` is a non-zero multiple of the aggregate's
/// snapshot threshold.
///
/// Returns false for version zero, for unknown aggregates, and when the
/// threshold is unset or zero.
pub fn should_snapshot(&self, aggregate: &str, version: u64) -> bool {
    if version == 0 {
        return false;
    }
    let guard = self.items.read();
    let threshold = guard
        .get(aggregate)
        .and_then(|schema| schema.snapshot_threshold);
    match threshold {
        // The guard keeps the modulo from dividing by zero.
        Some(step) if step > 0 => version % step == 0,
        _ => false,
    }
}
/// Returns a copy of the whole schema map.
pub fn snapshot(&self) -> BTreeMap<String, AggregateSchema> {
    let guard = self.items.read();
    BTreeMap::clone(&guard)
}
/// Replaces every schema with `items` after normalizing each one.
///
/// The new set is written to disk while the write lock is held and only then
/// installed in memory. If persisting fails the in-memory state is left
/// untouched, and no other writer can interleave between the write and the
/// swap. The map is moved into place rather than cloned.
///
/// # Errors
///
/// Returns any error raised while persisting.
pub fn replace_all(&self, mut items: BTreeMap<String, AggregateSchema>) -> Result<()> {
    for schema in items.values_mut() {
        schema.ensure_sorted();
    }
    let mut guard = self.items.write();
    self.persist(&items)?;
    *guard = items;
    Ok(())
}
/// Validates an event payload against the schema of `aggregate`.
///
/// Aggregates without a schema accept anything. Otherwise the checks run in
/// this order: aggregate lock, locked fields, event definition, the event's
/// field list (all listed fields present, no others), and finally the column
/// definitions including `must_match` rules.
pub fn validate_event(&self, aggregate: &str, event_type: &str, payload: &Value) -> Result<()> {
    let items = self.items.read();
    let schema = match items.get(aggregate) {
        Some(schema) => schema,
        None => return Ok(()),
    };

    if schema.locked {
        return Err(EventError::SchemaViolation(format!(
            "aggregate {} is locked for updates",
            aggregate
        )));
    }

    let payload_map = payload_to_map(payload);
    let locked_key = payload_map
        .keys()
        .find(|key| schema.field_locks.contains(*key));
    if let Some(key) = locked_key {
        return Err(EventError::SchemaViolation(format!(
            "field {} is locked for aggregate {}",
            key, aggregate
        )));
    }

    let Some(event_schema) = schema.events.get(event_type) else {
        return Err(EventError::SchemaViolation(format!(
            "event {} is not defined for aggregate {}",
            event_type, aggregate
        )));
    };

    // An empty field list places no restriction on the payload keys.
    let expected = &event_schema.fields;
    if !expected.is_empty() {
        let missing = expected
            .iter()
            .find(|required| !payload_map.contains_key(*required));
        if let Some(required) = missing {
            return Err(EventError::SchemaViolation(format!(
                "missing required field {} for event {}",
                required, event_type
            )));
        }
        let unexpected = payload_map.keys().find(|key| !expected.contains(*key));
        if let Some(key) = unexpected {
            return Err(EventError::SchemaViolation(format!(
                "field {} is not permitted for event {}",
                key, event_type
            )));
        }
    }

    let columns = &schema.column_types;
    if columns.is_empty() {
        return Ok(());
    }
    let Some(object) = payload.as_object() else {
        return Err(EventError::SchemaViolation(format!(
            "aggregate {} expects object payload for validation",
            aggregate
        )));
    };
    let mut normalized = BTreeMap::new();
    validate_columns(columns, object, payload, columns, "", &mut normalized)?;
    validate_must_match(columns, columns, &normalized, "")?;
    Ok(())
}
/// Removes `event` from the schema of `aggregate` and persists the change.
///
/// Fails with `SchemaNotFound` for an unknown aggregate and with
/// `SchemaViolation` when the event is not defined or is the last one left.
pub fn remove_event(&self, aggregate: &str, event: &str) -> Result<AggregateSchema> {
    let mut items = self.items.write();

    let schema = items.get_mut(aggregate).ok_or(EventError::SchemaNotFound)?;
    let defined = schema.events.contains_key(event);
    if !defined {
        return Err(EventError::SchemaViolation(format!(
            "event {} is not defined for aggregate {}",
            event, aggregate
        )));
    }
    let is_last = schema.events.len() == 1;
    if is_last {
        return Err(EventError::SchemaViolation(format!(
            "aggregate {} must define at least one event",
            aggregate
        )));
    }
    schema.events.remove(event);
    schema.updated_at = Utc::now();
    let updated = schema.clone();

    self.persist(&items)?;
    Ok(updated)
}
/// Writes `items` to the manager's file as pretty-printed JSON, replacing the
/// previous contents.
fn persist(&self, items: &BTreeMap<String, AggregateSchema>) -> Result<()> {
    let serialized = serde_json::to_string_pretty(items)?;
    fs::write(&self.path, serialized)?;
    Ok(())
}
}
/// Validates every defined column against the matching member of `payload`.
///
/// Each value that passes is stored in `normalized` under its dotted path
/// (`prefix` joined with the field name). Members of `payload` that have no
/// definition are ignored.
fn validate_columns(
    definitions: &BTreeMap<String, ColumnSettings>,
    payload: &JsonMap<String, Value>,
    root_value: &Value,
    root_definitions: &BTreeMap<String, ColumnSettings>,
    prefix: &str,
    normalized: &mut BTreeMap<String, ComparableValue>,
) -> Result<()> {
    for (field, definition) in definitions {
        let path = join_path(prefix, field);
        let outcome = definition.validate_with_path(
            &path,
            payload.get(field),
            root_value,
            root_definitions,
            normalized,
        )?;
        if let Some(value) = outcome {
            normalized.insert(path, value);
        }
    }
    Ok(())
}
/// Enforces the `must_match` rules of `definitions`, recursing into the nested
/// properties of object columns.
///
/// `values` holds the coerced payload values keyed by dotted path. The target
/// of a rule is looked up by its path in both `values` and `root_definitions`.
fn validate_must_match(
    definitions: &BTreeMap<String, ColumnSettings>,
    root_definitions: &BTreeMap<String, ColumnSettings>,
    values: &BTreeMap<String, ComparableValue>,
    prefix: &str,
) -> Result<()> {
    for (field, definition) in definitions {
        let path = join_path(prefix, field);
        let rules = &definition.rules;

        if let Some(target_path) = &rules.must_match {
            match values.get(&path) {
                Some(source_value) => {
                    let Some(target_value) = values.get(target_path) else {
                        return Err(EventError::SchemaViolation(format!(
                            "field {} must match {}, but {} is missing",
                            path, target_path, target_path
                        )));
                    };
                    let Some(target_definition) =
                        find_column_settings(root_definitions, target_path)
                    else {
                        return Err(EventError::SchemaViolation(format!(
                            "field {} must match {}, but {} is not defined in schema",
                            path, target_path, target_path
                        )));
                    };
                    if !definition.same_kind(target_definition) {
                        return Err(EventError::SchemaViolation(format!(
                            "field {} must match {}, but their types differ",
                            path, target_path
                        )));
                    }
                    if source_value != target_value {
                        return Err(EventError::SchemaViolation(format!(
                            "field {} must match {}",
                            path, target_path
                        )));
                    }
                }
                None if rules.required => {
                    return Err(EventError::SchemaViolation(format!(
                        "field {} is required but missing",
                        path
                    )));
                }
                None => {}
            }
        }

        let is_object = matches!(definition.column_type, ColumnType::Object);
        if is_object && !rules.properties.is_empty() {
            validate_must_match(&rules.properties, root_definitions, values, &path)?;
        }
    }
    Ok(())
}
/// Resolves a dotted `path` (for example `address.city`) to its column
/// definition.
///
/// Every segment except the last must name an `Object` column, whose nested
/// `properties` are searched for the next segment. Returns `None` when a
/// segment is undefined or an intermediate segment is not an object.
fn find_column_settings<'a>(
    definitions: &'a BTreeMap<String, ColumnSettings>,
    path: &str,
) -> Option<&'a ColumnSettings> {
    let mut map = definitions;
    let mut segments = path.split('.').peekable();
    while let Some(segment) = segments.next() {
        let current = map.get(segment)?;
        if segments.peek().is_none() {
            return Some(current);
        }
        match current.column_type {
            // Descend into the nested definitions. This line previously
            // contained a mis-encoded `&current` and did not compile.
            ColumnType::Object => map = &current.rules.properties,
            _ => return None,
        }
    }
    None
}
/// Builds a dotted path: `key` alone when `prefix` is empty, otherwise
/// `prefix.key`.
fn join_path(prefix: &str, key: &str) -> String {
    match prefix {
        "" => key.to_owned(),
        parent => [parent, key].join("."),
    }
}
/// Coerces a JSON number or numeric string into an `i64`.
///
/// Strings are trimmed before parsing. Unsigned numbers beyond the signed
/// range are reported with a dedicated message.
fn coerce_integer(value: &Value, path: &str) -> Result<i64> {
    let not_integer =
        || EventError::SchemaViolation(format!("field {} must be an integer", path));

    match value {
        Value::Number(number) => match (number.as_i64(), number.as_u64()) {
            (Some(int), _) => Ok(int),
            (None, Some(unsigned)) => i64::try_from(unsigned).map_err(|_| {
                EventError::SchemaViolation(format!(
                    "field {} exceeds signed integer range",
                    path
                ))
            }),
            (None, None) => Err(not_integer()),
        },
        Value::String(raw) => raw.trim().parse::<i64>().map_err(|_| not_integer()),
        _ => Err(not_integer()),
    }
}
/// Coerces a JSON number or numeric string (trimmed) into an `f64`.
fn coerce_float(value: &Value, path: &str) -> Result<f64> {
    let parsed = match value {
        Value::Number(number) => number.as_f64(),
        Value::String(raw) => raw.trim().parse::<f64>().ok(),
        _ => None,
    };
    parsed.ok_or_else(|| EventError::SchemaViolation(format!("field {} must be a float", path)))
}
/// Coerces a JSON number or numeric string (trimmed) into a `Decimal`.
///
/// JSON numbers are parsed from their textual form. When that form uses
/// exponent notation (for example `1e-7`), which the plain decimal parser does
/// not accept, the scientific-notation parser is tried as a fallback, so such
/// numbers are no longer rejected. Strings are still parsed strictly.
///
/// # Errors
///
/// Returns `SchemaViolation` when the value is neither a number nor a string,
/// or cannot be parsed as a decimal.
fn coerce_decimal(value: &Value, path: &str) -> Result<Decimal> {
    let invalid =
        || EventError::SchemaViolation(format!("field {} must be a decimal value", path));

    let parsed = match value {
        Value::Number(number) => {
            let raw = number.to_string();
            Decimal::from_str(&raw).or_else(|_| Decimal::from_scientific(&raw))
        }
        Value::String(raw) => Decimal::from_str(raw.trim()),
        _ => return Err(invalid()),
    };
    parsed.map_err(|_| invalid())
}
/// Coerces a JSON value into a boolean.
///
/// Accepts JSON booleans, the integers `0` / `1`, and the strings `true`,
/// `false`, `1`, `0` (trimmed, ASCII case-insensitive).
fn coerce_boolean(value: &Value, path: &str) -> Result<bool> {
    let parsed = match value {
        Value::Bool(flag) => Some(*flag),
        Value::Number(number) => match number.as_i64() {
            Some(0) => Some(false),
            Some(1) => Some(true),
            _ => None,
        },
        Value::String(raw) => match raw.trim().to_ascii_lowercase().as_str() {
            "true" | "1" => Some(true),
            "false" | "0" => Some(false),
            _ => None,
        },
        _ => None,
    };
    parsed.ok_or_else(|| EventError::SchemaViolation(format!("field {} must be a boolean", path)))
}
/// Returns a copy of the string when `value` is a JSON string; every other
/// kind is rejected.
fn coerce_text(value: &Value, path: &str) -> Result<String> {
    if let Value::String(raw) = value {
        return Ok(raw.clone());
    }
    Err(EventError::SchemaViolation(format!(
        "field {} must be a string",
        path
    )))
}
/// Parses a JSON string as an RFC3339 timestamp and converts it to UTC.
fn coerce_timestamp(value: &Value, path: &str) -> Result<DateTime<Utc>> {
    let invalid = || {
        EventError::SchemaViolation(format!("field {} must be an RFC3339 timestamp", path))
    };

    let Value::String(raw) = value else {
        return Err(invalid());
    };
    match DateTime::parse_from_rfc3339(raw) {
        Ok(parsed) => Ok(parsed.with_timezone(&Utc)),
        Err(_) => Err(invalid()),
    }
}
/// Parses a JSON string as a `YYYY-MM-DD` calendar date.
fn coerce_date(value: &Value, path: &str) -> Result<NaiveDate> {
    let invalid = || {
        EventError::SchemaViolation(format!(
            "field {} must be a date formatted as YYYY-MM-DD",
            path
        ))
    };

    let Value::String(raw) = value else {
        return Err(invalid());
    };
    match NaiveDate::parse_from_str(raw, "%Y-%m-%d") {
        Ok(date) => Ok(date),
        Err(_) => Err(invalid()),
    }
}
/// Decodes a JSON string holding base64 data into raw bytes.
fn coerce_binary(value: &Value, path: &str) -> Result<Vec<u8>> {
    let invalid =
        || EventError::SchemaViolation(format!("field {} must be base64 encoded", path));

    let Value::String(raw) = value else {
        return Err(invalid());
    };
    match BASE64_ENGINE.decode(raw.as_bytes()) {
        Ok(bytes) => Ok(bytes),
        Err(_) => Err(invalid()),
    }
}
/// Verifies that `decimal` fits a `decimal(precision, scale)` column.
///
/// The number of fractional digits must not exceed `scale`, and the number of
/// significant digits must not exceed `precision`. Significant digits are
/// taken from the mantissa, so the leading zero of values below one no longer
/// counts against the precision: `0.5` occupies one digit, not two, and
/// therefore fits `decimal(1,1)`. A value still needs at least as many digits
/// as it has fractional places (`0.05` occupies two).
///
/// # Errors
///
/// Returns `SchemaViolation` naming the constraint that was exceeded.
fn check_decimal_constraints(
    decimal: &Decimal,
    precision: u8,
    scale: u8,
    path: &str,
) -> Result<()> {
    // Widen the limit instead of narrowing the value, so the comparison can
    // never be affected by a truncating cast.
    let fractional_digits = decimal.scale();
    if fractional_digits > u32::from(scale) {
        return Err(EventError::SchemaViolation(format!(
            "field {} must have at most {} decimal places",
            path, scale
        )));
    }

    let mantissa_digits = decimal.mantissa().unsigned_abs().to_string().len();
    // Lossless widening: `usize` is at least 32 bits on supported targets.
    let digits = mantissa_digits.max(fractional_digits as usize);
    if digits > usize::from(precision) {
        return Err(EventError::SchemaViolation(format!(
            "field {} must have precision <= {}",
            path, precision
        )));
    }
    Ok(())
}
/// Checks an integer against the bounds of `range`, parsing each bound
/// (trimmed) as an `i64`.
///
/// The minimum is parsed and enforced before the maximum is looked at.
fn check_integer_range(path: &str, value: i64, range: &RangeRule) -> Result<()> {
    let parse_bound = |raw: &str, label: &str| {
        raw.trim().parse::<i64>().map_err(|_| {
            EventError::SchemaViolation(format!(
                "field {} range {} '{}' is not a valid integer",
                path, label, raw
            ))
        })
    };

    if let Some(raw) = range.min.as_deref() {
        let lower = parse_bound(raw, "minimum")?;
        if value < lower {
            return Err(EventError::SchemaViolation(format!(
                "field {} must be >= {}",
                path, lower
            )));
        }
    }
    if let Some(raw) = range.max.as_deref() {
        let upper = parse_bound(raw, "maximum")?;
        if value > upper {
            return Err(EventError::SchemaViolation(format!(
                "field {} must be <= {}",
                path, upper
            )));
        }
    }
    Ok(())
}
/// Checks a float against the bounds of `range`, parsing each bound (trimmed)
/// as an `f64`.
///
/// The minimum is parsed and enforced before the maximum is looked at.
fn check_float_range(path: &str, value: f64, range: &RangeRule) -> Result<()> {
    let parse_bound = |raw: &str, label: &str| {
        raw.trim().parse::<f64>().map_err(|_| {
            EventError::SchemaViolation(format!(
                "field {} range {} '{}' is not a valid float",
                path, label, raw
            ))
        })
    };

    if let Some(raw) = range.min.as_deref() {
        let lower = parse_bound(raw, "minimum")?;
        if value < lower {
            return Err(EventError::SchemaViolation(format!(
                "field {} must be >= {}",
                path, lower
            )));
        }
    }
    if let Some(raw) = range.max.as_deref() {
        let upper = parse_bound(raw, "maximum")?;
        if value > upper {
            return Err(EventError::SchemaViolation(format!(
                "field {} must be <= {}",
                path, upper
            )));
        }
    }
    Ok(())
}
/// Checks a decimal against the bounds of `range`, parsing each bound
/// (trimmed) as a `Decimal`.
///
/// The minimum is parsed and enforced before the maximum is looked at.
fn check_decimal_range(path: &str, value: &Decimal, range: &RangeRule) -> Result<()> {
    let parse_bound = |raw: &str, label: &str| {
        Decimal::from_str(raw.trim()).map_err(|_| {
            EventError::SchemaViolation(format!(
                "field {} range {} '{}' is not a valid decimal",
                path, label, raw
            ))
        })
    };

    if let Some(raw) = range.min.as_deref() {
        let lower = parse_bound(raw, "minimum")?;
        if *value < lower {
            return Err(EventError::SchemaViolation(format!(
                "field {} must be >= {}",
                path, lower
            )));
        }
    }
    if let Some(raw) = range.max.as_deref() {
        let upper = parse_bound(raw, "maximum")?;
        if *value > upper {
            return Err(EventError::SchemaViolation(format!(
                "field {} must be <= {}",
                path, upper
            )));
        }
    }
    Ok(())
}
/// Checks a timestamp against the bounds of `range`; each bound is parsed as
/// an RFC3339 literal.
///
/// Error messages quote the bound exactly as written in the rule.
fn check_timestamp_range(path: &str, value: &DateTime<Utc>, range: &RangeRule) -> Result<()> {
    if let Some(raw) = range.min.as_ref() {
        let earliest = parse_timestamp_literal(raw, path)?;
        if *value < earliest {
            let message = format!("field {} must be >= {}", path, raw);
            return Err(EventError::SchemaViolation(message));
        }
    }
    if let Some(raw) = range.max.as_ref() {
        let latest = parse_timestamp_literal(raw, path)?;
        if *value > latest {
            let message = format!("field {} must be <= {}", path, raw);
            return Err(EventError::SchemaViolation(message));
        }
    }
    Ok(())
}
/// Checks a date against the bounds of `range`; each bound is parsed as a
/// `YYYY-MM-DD` literal.
///
/// Error messages quote the bound exactly as written in the rule.
fn check_date_range(path: &str, value: &NaiveDate, range: &RangeRule) -> Result<()> {
    if let Some(raw) = range.min.as_ref() {
        let earliest = parse_date_literal(raw, path)?;
        if *value < earliest {
            let message = format!("field {} must be on or after {}", path, raw);
            return Err(EventError::SchemaViolation(message));
        }
    }
    if let Some(raw) = range.max.as_ref() {
        let latest = parse_date_literal(raw, path)?;
        if *value > latest {
            let message = format!("field {} must be on or before {}", path, raw);
            return Err(EventError::SchemaViolation(message));
        }
    }
    Ok(())
}
/// Parses a range boundary (trimmed) as an RFC3339 timestamp in UTC.
///
/// `path` names the field in the error message.
fn parse_timestamp_literal(raw: &str, path: &str) -> Result<DateTime<Utc>> {
    match DateTime::parse_from_rfc3339(raw.trim()) {
        Ok(parsed) => Ok(parsed.with_timezone(&Utc)),
        Err(_) => Err(EventError::SchemaViolation(format!(
            "field {} range boundary '{}' is not a valid RFC3339 timestamp",
            path, raw
        ))),
    }
}
/// Parses a range boundary as a calendar date using the `%Y-%m-%d` format.
///
/// Surrounding whitespace in `raw` is ignored for parsing.
///
/// # Errors
///
/// Returns [`EventError::SchemaViolation`] naming the field (`path`) and the
/// untrimmed literal when `raw` does not match the expected format.
fn parse_date_literal(raw: &str, path: &str) -> Result<NaiveDate> {
    match NaiveDate::parse_from_str(raw.trim(), "%Y-%m-%d") {
        Ok(date) => Ok(date),
        Err(_) => Err(EventError::SchemaViolation(format!(
            "field {} range boundary '{}' is not a valid date (YYYY-MM-DD)",
            path, raw
        ))),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::{collections::BTreeMap, str::FromStr};

    /// Creates a temporary directory and returns it together with the path of
    /// a `schemas.json` file inside it.
    ///
    /// Keep the returned guard bound for the whole test: `tempfile` removes the
    /// directory when the guard is dropped.
    fn temp_schema_path() -> (tempfile::TempDir, std::path::PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("schemas.json");
        (dir, path)
    }

    /// Reports whether `err` is the `SchemaViolation` variant.
    fn is_schema_violation(err: &EventError) -> bool {
        matches!(err, EventError::SchemaViolation(_))
    }

    /// Creates a schema, then applies a lock, a field lock and added event
    /// fields in a single update and checks each is reflected in the result.
    #[test]
    fn create_and_update_schema() {
        let (_dir, path) = temp_schema_path();
        let manager = SchemaManager::load(path).unwrap();

        let input = CreateSchemaInput {
            aggregate: "patient".into(),
            events: vec!["patient-created".into(), "patient-updated".into()],
            snapshot_threshold: Some(10),
        };
        let created = manager.create(input).unwrap();
        assert_eq!(created.aggregate, "patient");
        assert_eq!(created.events.len(), 2);

        let mut added_fields = BTreeMap::new();
        added_fields.insert(
            "patient-created".into(),
            vec!["name".into(), "birthdate".into()],
        );
        let changes = SchemaUpdate {
            locked: Some(true),
            field_lock: Some(("birthdate".into(), true)),
            event_add_fields: added_fields,
            ..SchemaUpdate::default()
        };
        let updated = manager.update("patient", changes).unwrap();

        assert!(updated.locked);
        assert!(updated.field_locks.contains(&"birthdate".to_string()));
        let created_event = updated.events.get("patient-created").unwrap();
        assert!(created_event.fields.contains(&"name".to_string()));
    }

    /// Removing a known event succeeds; removing an unknown event, or the one
    /// event still registered afterwards, is rejected as a schema violation.
    #[test]
    fn remove_event_from_aggregate() {
        let (_dir, path) = temp_schema_path();
        let manager = SchemaManager::load(path).unwrap();

        let input = CreateSchemaInput {
            aggregate: "person".into(),
            events: vec!["person_created".into(), "person_updated".into()],
            snapshot_threshold: None,
        };
        manager.create(input).unwrap();

        let remaining = manager.remove_event("person", "person_updated").unwrap();
        assert!(remaining.events.contains_key("person_created"));
        assert!(!remaining.events.contains_key("person_updated"));

        let unknown_event = manager.remove_event("person", "missing").unwrap_err();
        assert!(is_schema_violation(&unknown_event));

        let only_event_left = manager
            .remove_event("person", "person_created")
            .unwrap_err();
        assert!(is_schema_violation(&only_event_left));
    }

    /// A payload carrying the field added to the event validates; an empty
    /// payload is rejected.
    #[test]
    fn validates_required_fields() {
        let (_dir, path) = temp_schema_path();
        let manager = SchemaManager::load(path).unwrap();

        let input = CreateSchemaInput {
            aggregate: "person".into(),
            events: vec!["person_created".into()],
            snapshot_threshold: None,
        };
        manager.create(input).unwrap();

        let mut add_fields = SchemaUpdate::default();
        add_fields
            .event_add_fields
            .insert("person_created".into(), vec!["first_name".into()]);
        manager.update("person", add_fields).unwrap();

        let complete = json!({ "first_name": "Alice" });
        manager
            .validate_event("person", "person_created", &complete)
            .unwrap();

        let empty = json!({});
        let err = manager
            .validate_event("person", "person_created", &empty)
            .unwrap_err();
        assert!(is_schema_violation(&err));
    }

    /// With a threshold of 3, only version 3 of the known aggregate asks for a
    /// snapshot among the probed versions; an unknown aggregate never does.
    #[test]
    fn snapshot_threshold_triggers_on_expected_versions() {
        let (_dir, path) = temp_schema_path();
        let manager = SchemaManager::load(path).unwrap();

        let input = CreateSchemaInput {
            aggregate: "order".into(),
            events: vec!["order-created".into()],
            snapshot_threshold: Some(3),
        };
        manager.create(input).unwrap();

        let cases = [
            ("order", 1, false),
            ("order", 3, true),
            ("order", 4, false),
            ("missing", 3, false),
        ];
        for (aggregate, version, expected) in cases {
            assert_eq!(manager.should_snapshot(aggregate, version), expected);
        }
    }

    /// `decimal(p, s)` parses into the `Decimal` variant, tolerating the space
    /// after the comma.
    #[test]
    fn column_type_from_str_parses_decimal() {
        let parsed = ColumnType::from_str("decimal(12, 4)").unwrap();
        let expected = ColumnType::Decimal {
            precision: 12,
            scale: 4,
        };
        assert_eq!(parsed, expected);
    }

    /// Column settings built from a bare type serialize to the compact string
    /// form and deserialize back to an equal map.
    #[test]
    fn column_type_round_trip_serialization() {
        let amount = ColumnSettings::from_type(ColumnType::Decimal {
            precision: 8,
            scale: 2,
        });
        let flag = ColumnSettings::from_type(ColumnType::Boolean);
        let columns: BTreeMap<String, ColumnSettings> = BTreeMap::from([
            ("amount".to_string(), amount),
            ("flag".to_string(), flag),
        ]);

        let encoded = serde_json::to_string(&columns).unwrap();
        assert_eq!(encoded, r#"{"amount":"decimal(8,2)","flag":"boolean"}"#);

        let decoded: BTreeMap<String, ColumnSettings> = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, columns);
    }

    /// Setting a column type records it on the schema; setting it to `None`
    /// removes the entry again.
    #[test]
    fn update_schema_column_type() {
        let (_dir, path) = temp_schema_path();
        let manager = SchemaManager::load(path).unwrap();

        let input = CreateSchemaInput {
            aggregate: "order".into(),
            events: vec!["order_created".into()],
            snapshot_threshold: None,
        };
        manager.create(input).unwrap();

        let expected = ColumnType::Decimal {
            precision: 12,
            scale: 2,
        };
        let assign = SchemaUpdate {
            column_type: Some(("total".to_string(), Some(expected.clone()))),
            ..SchemaUpdate::default()
        };
        let typed = manager.update("order", assign).unwrap();
        let recorded = typed
            .column_types
            .get("total")
            .expect("column type should be recorded");
        assert_eq!(recorded.column_type, expected);

        let clear = SchemaUpdate {
            column_type: Some(("total".to_string(), None)),
            ..SchemaUpdate::default()
        };
        let cleared = manager.update("order", clear).unwrap();
        assert!(!cleared.column_types.contains_key("total"));
    }

    /// A text column with `contains` / `does_not_contain` rules accepts a
    /// value with the wanted substring and rejects one with the banned one.
    #[test]
    fn enforces_contains_and_excludes_rules() {
        let (_dir, path) = temp_schema_path();
        let manager = SchemaManager::load(path).unwrap();

        let input = CreateSchemaInput {
            aggregate: "article".into(),
            events: vec!["article_created".into()],
            snapshot_threshold: None,
        };
        manager.create(input).unwrap();

        let typed = SchemaUpdate {
            column_type: Some(("description".into(), Some(ColumnType::Text))),
            ..SchemaUpdate::default()
        };
        manager.update("article", typed).unwrap();

        let mut rules = FieldRules::default();
        rules.contains.push("foo".into());
        rules.does_not_contain.push("bar".into());
        rules.required = true;
        let ruled = SchemaUpdate {
            column_rules: Some(("description".into(), Some(rules))),
            ..SchemaUpdate::default()
        };
        manager.update("article", ruled).unwrap();

        let accepted = json!({ "description": "foo baz" });
        manager
            .validate_event("article", "article_created", &accepted)
            .unwrap();

        let rejected = json!({ "description": "baz bar" });
        let err = manager
            .validate_event("article", "article_created", &rejected)
            .unwrap_err();
        assert!(is_schema_violation(&err));
    }

    /// An integer column with a 1..=10 range accepts 5 and rejects 0.
    #[test]
    fn enforces_range_rules() {
        let (_dir, path) = temp_schema_path();
        let manager = SchemaManager::load(path).unwrap();

        let input = CreateSchemaInput {
            aggregate: "metrics".into(),
            events: vec!["metrics_recorded".into()],
            snapshot_threshold: None,
        };
        manager.create(input).unwrap();

        let typed = SchemaUpdate {
            column_type: Some(("count".into(), Some(ColumnType::Integer))),
            ..SchemaUpdate::default()
        };
        manager.update("metrics", typed).unwrap();

        let mut rules = FieldRules::default();
        rules.range = Some(RangeRule {
            min: Some("1".into()),
            max: Some("10".into()),
        });
        let ruled = SchemaUpdate {
            column_rules: Some(("count".into(), Some(rules))),
            ..SchemaUpdate::default()
        };
        manager.update("metrics", ruled).unwrap();

        let in_range = json!({ "count": 5 });
        manager
            .validate_event("metrics", "metrics_recorded", &in_range)
            .unwrap();

        let below_minimum = json!({ "count": 0 });
        let err = manager
            .validate_event("metrics", "metrics_recorded", &below_minimum)
            .unwrap_err();
        assert!(is_schema_violation(&err));
    }

    /// A required text column with the email format accepts a well-formed
    /// address and rejects a string that is not one.
    #[test]
    fn enforces_format_rules() {
        let (_dir, path) = temp_schema_path();
        let manager = SchemaManager::load(path).unwrap();

        let input = CreateSchemaInput {
            aggregate: "contact".into(),
            events: vec!["contact_added".into()],
            snapshot_threshold: None,
        };
        manager.create(input).unwrap();

        let typed = SchemaUpdate {
            column_type: Some(("email".into(), Some(ColumnType::Text))),
            ..SchemaUpdate::default()
        };
        manager.update("contact", typed).unwrap();

        let mut rules = FieldRules::default();
        rules.required = true;
        rules.format = Some(FieldFormat::Email);
        let ruled = SchemaUpdate {
            column_rules: Some(("email".into(), Some(rules))),
            ..SchemaUpdate::default()
        };
        manager.update("contact", ruled).unwrap();

        let well_formed = json!({ "email": "user@example.com" });
        manager
            .validate_event("contact", "contact_added", &well_formed)
            .unwrap();

        let malformed = json!({ "email": "not-an-email" });
        let err = manager
            .validate_event("contact", "contact_added", &malformed)
            .unwrap_err();
        assert!(is_schema_violation(&err));
    }

    /// `password_confirmation` must equal `password`: equal values validate,
    /// differing values are rejected.
    #[test]
    fn enforces_must_match_rule() {
        let (_dir, path) = temp_schema_path();
        let manager = SchemaManager::load(path).unwrap();

        let input = CreateSchemaInput {
            aggregate: "user".into(),
            events: vec!["user_created".into()],
            snapshot_threshold: None,
        };
        manager.create(input).unwrap();

        // Register both columns as text, in the same order as before.
        for column in ["password", "password_confirmation"] {
            let typed = SchemaUpdate {
                column_type: Some((column.into(), Some(ColumnType::Text))),
                ..SchemaUpdate::default()
            };
            manager.update("user", typed).unwrap();
        }

        let mut rules = FieldRules::default();
        rules.required = true;
        rules.must_match = Some("password".into());
        let ruled = SchemaUpdate {
            column_rules: Some(("password_confirmation".into(), Some(rules))),
            ..SchemaUpdate::default()
        };
        manager.update("user", ruled).unwrap();

        let matching = json!({
            "password": "secret",
            "password_confirmation": "secret"
        });
        manager
            .validate_event("user", "user_created", &matching)
            .unwrap();

        let differing = json!({
            "password": "secret",
            "password_confirmation": "mismatch"
        });
        let err = manager
            .validate_event("user", "user_created", &differing)
            .unwrap_err();
        assert!(is_schema_violation(&err));
    }

    /// An object column with a required nested `city` property accepts an
    /// object carrying it and rejects an empty object.
    #[test]
    fn validates_nested_object_properties() {
        let (_dir, path) = temp_schema_path();
        let manager = SchemaManager::load(path).unwrap();

        let input = CreateSchemaInput {
            aggregate: "customer".into(),
            events: vec!["customer_created".into()],
            snapshot_threshold: None,
        };
        manager.create(input).unwrap();

        let typed = SchemaUpdate {
            column_type: Some(("address".into(), Some(ColumnType::Object))),
            ..SchemaUpdate::default()
        };
        manager.update("customer", typed).unwrap();

        let mut city_rules = FieldRules::default();
        city_rules.required = true;
        let city = ColumnSettings {
            column_type: ColumnType::Text,
            rules: city_rules,
        };
        let mut address_rules = FieldRules::default();
        address_rules.properties.insert("city".into(), city);
        let ruled = SchemaUpdate {
            column_rules: Some(("address".into(), Some(address_rules))),
            ..SchemaUpdate::default()
        };
        manager.update("customer", ruled).unwrap();

        let with_city = json!({ "address": { "city": "Paris" } });
        manager
            .validate_event("customer", "customer_created", &with_city)
            .unwrap();

        let without_city = json!({ "address": {} });
        let err = manager
            .validate_event("customer", "customer_created", &without_city)
            .unwrap_err();
        assert!(is_schema_violation(&err));
    }
}