#[cfg(test)]
use crate::EventMetadata;
use crate::StreamEvent;
use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use uuid::Uuid;
/// Language a [`SchemaDefinition`]'s `schema_content` is written in.
///
/// `JsonSchema`, `RdfSparql` and `Avro` each have a dedicated validation path in
/// [`SchemaRegistry::validate_event`]; `Protobuf` and `Custom` are stored but not
/// validated there.
#[derive(Debug, Clone, PartialEq)]
#[derive(Serialize, Deserialize)]
pub enum SchemaFormat {
    /// JSON Schema document.
    JsonSchema,
    /// Apache Avro schema.
    Avro,
    /// Protocol Buffers definition.
    Protobuf,
    /// RDF / SPARQL based schema.
    RdfSparql,
    /// Any other format, identified only by name.
    Custom {
        /// Free-form name of the custom format.
        format_name: String,
    },
}
/// Compatibility policy recorded on each schema version.
///
/// When a new version is registered, the previous version's mode decides what is
/// checked. `None` skips every check. Any other mode rejects a change of
/// [`SchemaFormat`] and then runs the format-specific checker.
///
/// NOTE(review): the other variants presumably carry the usual schema-registry
/// meanings (backward / forward / full / breaking). The registration path does not
/// currently distinguish between them — confirm the intended semantics.
#[derive(Debug, Clone, PartialEq)]
#[derive(Serialize, Deserialize)]
pub enum CompatibilityMode {
    /// No compatibility checking at all.
    None,
    /// Default mode for new definitions and for the default registry config.
    Backward,
    Forward,
    Full,
    Breaking,
}
/// One stored version of a schema, together with its descriptive metadata.
#[derive(Debug, Clone)]
#[derive(Serialize, Deserialize)]
pub struct SchemaDefinition {
    /// Unique identifier of this particular version.
    pub id: Uuid,
    /// Name under which successive versions are grouped.
    pub subject: String,
    /// Version number within `subject` (the registry numbers them from 1).
    pub version: u32,
    /// Language the schema text is written in.
    pub format: SchemaFormat,
    /// Raw schema text.
    pub schema_content: String,
    /// Optional human-readable title.
    pub title: Option<String>,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// When this definition was created.
    pub created_at: DateTime<Utc>,
    /// When this definition was last modified.
    pub updated_at: DateTime<Utc>,
    /// Policy applied when the next version of `subject` is registered.
    pub compatibility: CompatibilityMode,
    /// Free-form labels; `add_tag` keeps them free of duplicates.
    pub tags: Vec<String>,
    /// Free-form key/value annotations.
    pub metadata: HashMap<String, String>,
}
impl SchemaDefinition {
    /// Creates version `version` of `subject` with a fresh random (v4) id.
    ///
    /// `created_at` and `updated_at` are set to the same instant. The compatibility
    /// mode starts as [`CompatibilityMode::Backward`]. Title, description, tags and
    /// metadata start empty.
    pub fn new(
        subject: String,
        version: u32,
        format: SchemaFormat,
        schema_content: String,
    ) -> Self {
        let now = Utc::now();
        Self {
            id: Uuid::new_v4(),
            subject,
            version,
            format,
            schema_content,
            title: None,
            description: None,
            created_at: now,
            updated_at: now,
            compatibility: CompatibilityMode::Backward,
            tags: Vec::new(),
            metadata: HashMap::new(),
        }
    }

    /// Replaces the schema text and refreshes `updated_at`.
    pub fn update_content(&mut self, content: String) {
        self.schema_content = content;
        self.updated_at = Utc::now();
    }

    /// Adds `tag` unless it is already present.
    ///
    /// Like the other mutators this refreshes `updated_at`, but only when the tag
    /// list actually changes. Re-adding an existing tag is a complete no-op.
    pub fn add_tag(&mut self, tag: String) {
        if self.tags.contains(&tag) {
            return;
        }
        self.tags.push(tag);
        self.updated_at = Utc::now();
    }

    /// Inserts or overwrites the metadata entry `key` and refreshes `updated_at`.
    pub fn set_metadata(&mut self, key: String, value: String) {
        self.metadata.insert(key, value);
        self.updated_at = Utc::now();
    }
}
/// Outcome of validating one event against one schema version.
#[derive(Debug, Clone)]
#[derive(Serialize, Deserialize)]
pub struct ValidationResult {
    /// `true` when the event satisfied the schema.
    pub is_valid: bool,
    /// Reasons the event was rejected (empty on success).
    pub errors: Vec<String>,
    /// Non-fatal observations that do not affect `is_valid`.
    pub warnings: Vec<String>,
    /// Id of the schema version that was applied.
    pub schema_id: Uuid,
    /// Version number of the schema that was applied.
    pub schema_version: u32,
    /// When the result was produced.
    pub validated_at: DateTime<Utc>,
}
impl ValidationResult {
pub fn success(schema_id: Uuid, schema_version: u32) -> Self {
Self {
is_valid: true,
errors: Vec::new(),
warnings: Vec::new(),
schema_id,
schema_version,
validated_at: Utc::now(),
}
}
pub fn failure(schema_id: Uuid, schema_version: u32, errors: Vec<String>) -> Self {
Self {
is_valid: false,
errors,
warnings: Vec::new(),
schema_id,
schema_version,
validated_at: Utc::now(),
}
}
pub fn add_warning(&mut self, warning: String) {
self.warnings.push(warning);
}
}
/// Settings that control a [`SchemaRegistry`].
#[derive(Debug, Clone)]
#[derive(Serialize, Deserialize)]
pub struct SchemaRegistryConfig {
    /// When `false`, `SchemaRegistry::validate_event` returns a success result
    /// without looking at the event.
    pub enable_validation: bool,
    /// NOTE(review): no reader of this flag in this file — confirm its intended effect.
    pub strict_mode: bool,
    /// When `true`, schemas are also kept in an id-keyed cache.
    pub enable_caching: bool,
    /// Intended lifetime of cache entries, in seconds.
    /// NOTE(review): no TTL-based expiry is implemented in this file.
    pub cache_ttl_seconds: u64,
    /// Optional external registry settings.
    /// NOTE(review): not consulted by `SchemaRegistry` in this file.
    pub external_registry: Option<ExternalRegistryConfig>,
    /// Mode given to new versions registered without an explicit compatibility mode.
    pub default_compatibility: CompatibilityMode,
    /// Intended cap on stored versions per subject.
    /// NOTE(review): not enforced in this file.
    pub max_versions_per_subject: u32,
}
impl Default for SchemaRegistryConfig {
fn default() -> Self {
Self {
enable_validation: true,
strict_mode: false,
enable_caching: true,
cache_ttl_seconds: 3600, external_registry: None,
default_compatibility: CompatibilityMode::Backward,
max_versions_per_subject: 10,
}
}
}
/// Connection settings for an external schema registry.
#[derive(Debug, Clone)]
#[derive(Serialize, Deserialize)]
pub struct ExternalRegistryConfig {
    /// Free-form identifier of the registry kind (accepted values: TODO confirm).
    pub registry_type: String,
    /// Address of the external registry, presumably its base URL.
    pub url: String,
    /// Optional credentials used to talk to the registry.
    pub auth: Option<RegistryAuth>,
    /// Whether schemas should be synchronised with the external registry.
    pub enable_sync: bool,
    /// Presumably the delay between synchronisations, in seconds.
    pub sync_interval_seconds: u64,
}
/// Credentials for an external schema registry.
///
/// `Debug` is implemented by hand rather than derived. `password` and `token` are
/// reported only as present or absent, so formatting a config (for example in a log
/// line) never leaks the secrets themselves.
#[derive(Clone, Serialize, Deserialize)]
pub struct RegistryAuth {
    /// Free-form authentication scheme identifier.
    pub auth_type: String,
    /// Optional user name.
    pub username: Option<String>,
    /// Optional password (secret — redacted in `Debug`).
    pub password: Option<String>,
    /// Optional token (secret — redacted in `Debug`).
    pub token: Option<String>,
}

impl std::fmt::Debug for RegistryAuth {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only reveal whether a secret is set, never its value.
        const REDACTED: &str = "<redacted>";
        f.debug_struct("RegistryAuth")
            .field("auth_type", &self.auth_type)
            .field("username", &self.username)
            .field("password", &self.password.as_ref().map(|_| REDACTED))
            .field("token", &self.token.as_ref().map(|_| REDACTED))
            .finish()
    }
}
/// In-memory registry of versioned schemas, with each piece of state behind its own
/// `tokio::sync::RwLock`.
pub struct SchemaRegistry {
    /// Settings supplied at construction.
    config: SchemaRegistryConfig,
    /// Authoritative store: subject -> version -> definition.
    schemas: Arc<RwLock<HashMap<String, HashMap<u32, SchemaDefinition>>>>,
    /// Definitions keyed by id; populated only when `config.enable_caching` is set.
    schema_cache: Arc<RwLock<HashMap<Uuid, SchemaDefinition>>>,
    /// subject -> highest version currently registered.
    latest_versions: Arc<RwLock<HashMap<String, u32>>>,
    /// Counters reported by `get_validation_stats`.
    validation_stats: Arc<RwLock<ValidationStats>>,
}
/// Counters collected by a [`SchemaRegistry`] and returned by `get_validation_stats`.
#[derive(Debug, Default, Clone)]
#[derive(Serialize, Deserialize)]
pub struct ValidationStats {
    /// Validation attempts made with validation enabled. This includes attempts that
    /// failed with an error before producing a verdict.
    pub total_validations: u64,
    /// Attempts whose result was valid.
    pub successful_validations: u64,
    /// Attempts whose result was invalid.
    pub failed_validations: u64,
    /// Total number of warnings across all results.
    pub warnings_count: u64,
    /// Aggregate validation latency in milliseconds, maintained by `validate_event`.
    pub validation_time_ms: f64,
    /// Id lookups answered from the schema cache (see `get_schema_by_id`).
    pub cache_hits: u64,
    /// Id lookups not answered from the schema cache (see `get_schema_by_id`).
    pub cache_misses: u64,
}
impl SchemaRegistry {
    /// Creates an empty registry driven by `config`.
    pub fn new(config: SchemaRegistryConfig) -> Self {
        Self {
            config,
            schemas: Arc::new(RwLock::new(HashMap::new())),
            schema_cache: Arc::new(RwLock::new(HashMap::new())),
            latest_versions: Arc::new(RwLock::new(HashMap::new())),
            validation_stats: Arc::new(RwLock::new(ValidationStats::default())),
        }
    }

    /// Registers `schema_content` as the next version of `subject` and returns the
    /// stored definition.
    ///
    /// - The first version of a subject is 1; later ones are `latest + 1`.
    /// - When a previous version exists, the new content is checked against it first.
    /// - The new version's mode is `compatibility`, or `config.default_compatibility`
    ///   when `None`.
    /// - With caching enabled, the definition is also inserted into the id cache.
    ///
    /// NOTE(review): `config.max_versions_per_subject` is not enforced here; older
    /// versions are never pruned.
    ///
    /// # Errors
    /// Fails if the compatibility check rejects the new schema (for example when the
    /// format differs from the previous version's) or if the version number would
    /// overflow `u32`.
    pub async fn register_schema(
        &self,
        subject: String,
        format: SchemaFormat,
        schema_content: String,
        compatibility: Option<CompatibilityMode>,
    ) -> Result<SchemaDefinition> {
        // Lock order shared by every method that holds several locks:
        // schemas -> latest_versions -> schema_cache.
        let mut schemas = self.schemas.write().await;
        let mut latest_versions = self.latest_versions.write().await;
        let subject_schemas = schemas.entry(subject.clone()).or_default();
        let previous_version = latest_versions.get(&subject).copied();
        let next_version = match previous_version {
            Some(latest) => latest
                .checked_add(1)
                .ok_or_else(|| anyhow!("Version overflow for subject: {}", subject))?,
            None => 1,
        };
        if let Some(latest) = previous_version {
            if let Some(existing_schema) = subject_schemas.get(&latest) {
                self.check_compatibility(existing_schema, &schema_content, format.clone())
                    .await?;
            }
        }
        let mut schema =
            SchemaDefinition::new(subject.clone(), next_version, format, schema_content);
        schema.compatibility =
            compatibility.unwrap_or_else(|| self.config.default_compatibility.clone());
        subject_schemas.insert(next_version, schema.clone());
        latest_versions.insert(subject.clone(), next_version);
        if self.config.enable_caching {
            self.schema_cache
                .write()
                .await
                .insert(schema.id, schema.clone());
        }
        info!(
            "Registered schema for subject '{}' version {} with ID {}",
            subject, next_version, schema.id
        );
        Ok(schema)
    }

    /// Returns version `version` of `subject`, or its latest version when `version`
    /// is `None`.
    ///
    /// Returns `Ok(None)` when the subject, or the requested version, does not exist.
    ///
    /// # Errors
    /// Fails only when the subject is stored but has no recorded latest version.
    pub async fn get_schema(
        &self,
        subject: &str,
        version: Option<u32>,
    ) -> Result<Option<SchemaDefinition>> {
        let schemas = self.schemas.read().await;
        let subject_schemas = match schemas.get(subject) {
            Some(subject_schemas) => subject_schemas,
            None => return Ok(None),
        };
        let version = match version {
            Some(v) => v,
            None => {
                let latest_versions = self.latest_versions.read().await;
                *latest_versions
                    .get(subject)
                    .ok_or_else(|| anyhow!("No schemas found for subject: {}", subject))?
            }
        };
        Ok(subject_schemas.get(&version).cloned())
    }

    /// Looks a schema up by id, consulting the id cache first when caching is enabled.
    ///
    /// With caching enabled, every lookup counts as exactly one cache hit or one
    /// cache miss, whether or not the id exists. With caching disabled, neither
    /// counter changes. A schema found in the main store is inserted into the cache.
    pub async fn get_schema_by_id(&self, schema_id: &Uuid) -> Result<Option<SchemaDefinition>> {
        if self.config.enable_caching {
            let cached = self.schema_cache.read().await.get(schema_id).cloned();
            let mut stats = self.validation_stats.write().await;
            if cached.is_some() {
                stats.cache_hits += 1;
                return Ok(cached);
            }
            stats.cache_misses += 1;
        }
        let schemas = self.schemas.read().await;
        let found = schemas
            .values()
            .flat_map(|versions| versions.values())
            .find(|schema| schema.id == *schema_id)
            .cloned();
        // Populate the cache while still holding the `schemas` read lock. A concurrent
        // `delete_schema` (which evicts under the `schemas` write lock) therefore cannot
        // run in between and leave a stale cache entry behind.
        if self.config.enable_caching {
            if let Some(schema) = &found {
                self.schema_cache
                    .write()
                    .await
                    .insert(*schema_id, schema.clone());
            }
        }
        Ok(found)
    }

    /// Returns every stored version of `subject`, ordered by ascending version.
    /// An unknown subject yields an empty list.
    pub async fn list_schemas(&self, subject: &str) -> Result<Vec<SchemaDefinition>> {
        let schemas = self.schemas.read().await;
        let mut versions: Vec<SchemaDefinition> = schemas
            .get(subject)
            .map(|subject_schemas| subject_schemas.values().cloned().collect())
            .unwrap_or_default();
        // Version numbers are unique per subject, so an unstable sort is deterministic.
        versions.sort_unstable_by_key(|schema| schema.version);
        Ok(versions)
    }

    /// Returns all known subjects in lexicographic order, so the output is
    /// deterministic rather than following `HashMap` iteration order.
    pub async fn list_subjects(&self) -> Result<Vec<String>> {
        let mut subjects: Vec<String> = self.schemas.read().await.keys().cloned().collect();
        subjects.sort_unstable();
        Ok(subjects)
    }

    /// Validates `event` against the latest schema of `subject`. When `subject` is
    /// `None`, the subject is derived from the event itself.
    ///
    /// With validation disabled, this returns a success result carrying a random id
    /// and version 1. Otherwise it updates the validation statistics.
    /// `validation_time_ms` is kept as the running mean latency over all attempts
    /// that produced a verdict.
    ///
    /// # Errors
    /// Fails when no subject can be determined, when the subject has no schema, or
    /// when the format-specific validator fails.
    pub async fn validate_event(
        &self,
        event: &StreamEvent,
        subject: Option<&str>,
    ) -> Result<ValidationResult> {
        if !self.config.enable_validation {
            return Ok(ValidationResult::success(Uuid::new_v4(), 1));
        }
        let start_time = std::time::Instant::now();
        self.validation_stats.write().await.total_validations += 1;
        let event_subject = subject
            .map(str::to_string)
            .or_else(|| self.extract_subject_from_event(event))
            .ok_or_else(|| anyhow!("Cannot determine subject for validation"))?;
        let schema = self
            .get_schema(&event_subject, None)
            .await?
            .ok_or_else(|| anyhow!("No schema found for subject: {}", event_subject))?;
        let validation_result = match schema.format {
            SchemaFormat::JsonSchema => self.validate_with_json_schema(event, &schema).await?,
            SchemaFormat::RdfSparql => self.validate_with_rdf_schema(event, &schema).await?,
            SchemaFormat::Avro => self.validate_with_avro_schema(event, &schema).await?,
            SchemaFormat::Protobuf | SchemaFormat::Custom { .. } => {
                warn!("Validation not implemented for format: {:?}", schema.format);
                ValidationResult::success(schema.id, schema.version)
            }
        };
        let elapsed = start_time.elapsed();
        {
            let mut stats = self.validation_stats.write().await;
            if validation_result.is_valid {
                stats.successful_validations += 1;
            } else {
                stats.failed_validations += 1;
            }
            stats.warnings_count += validation_result.warnings.len() as u64;
            // Incremental mean: avg_n = avg_{n-1} + (x_n - avg_{n-1}) / n, with n >= 1 here.
            // `as_secs_f64` keeps sub-millisecond precision, which `as_millis` would
            // truncate to 0.
            let completed = stats.successful_validations + stats.failed_validations;
            let sample_ms = elapsed.as_secs_f64() * 1000.0;
            stats.validation_time_ms += (sample_ms - stats.validation_time_ms) / completed as f64;
        }
        debug!(
            "Validated event against schema {} ({}ms): {}",
            schema.id,
            elapsed.as_millis(),
            if validation_result.is_valid {
                "VALID"
            } else {
                "INVALID"
            }
        );
        Ok(validation_result)
    }

    /// Chooses the subject under which `event`'s schema is looked up.
    ///
    /// For the five RDF/transaction variants handled below, an explicit `"subject"`
    /// metadata property takes precedence over a fixed per-variant name. Every other
    /// variant maps to `stream.event.` followed by the `Debug` rendering of its enum
    /// discriminant. This function always returns `Some`.
    fn extract_subject_from_event(&self, event: &StreamEvent) -> Option<String> {
        let (explicit, fallback) = match event {
            StreamEvent::TripleAdded { metadata, .. } => {
                (metadata.properties.get("subject"), "rdf.triple.added")
            }
            StreamEvent::TripleRemoved { metadata, .. } => {
                (metadata.properties.get("subject"), "rdf.triple.removed")
            }
            StreamEvent::SparqlUpdate { metadata, .. } => {
                (metadata.properties.get("subject"), "sparql.update")
            }
            StreamEvent::TransactionBegin { metadata, .. } => {
                (metadata.properties.get("subject"), "transaction.begin")
            }
            StreamEvent::TransactionCommit { metadata, .. } => {
                (metadata.properties.get("subject"), "transaction.commit")
            }
            _ => return Some(format!("stream.event.{:?}", std::mem::discriminant(event))),
        };
        Some(explicit.cloned().unwrap_or_else(|| fallback.to_string()))
    }

    /// Checks whether `new_schema_content` may follow `existing_schema`.
    ///
    /// - `CompatibilityMode::None` on the existing schema accepts anything.
    /// - Otherwise a format change is rejected, and then the format-specific checker
    ///   runs. Formats without a checker are accepted with a warning.
    async fn check_compatibility(
        &self,
        existing_schema: &SchemaDefinition,
        new_schema_content: &str,
        new_format: SchemaFormat,
    ) -> Result<()> {
        if existing_schema.compatibility == CompatibilityMode::None {
            return Ok(());
        }
        if existing_schema.format != new_format {
            return Err(anyhow!(
                "Schema format changed from {:?} to {:?}",
                existing_schema.format,
                new_format
            ));
        }
        match new_format {
            SchemaFormat::JsonSchema => {
                self.check_json_schema_compatibility(existing_schema, new_schema_content)
                    .await
            }
            SchemaFormat::RdfSparql => {
                self.check_rdf_schema_compatibility(existing_schema, new_schema_content)
                    .await
            }
            SchemaFormat::Avro | SchemaFormat::Protobuf | SchemaFormat::Custom { .. } => {
                warn!(
                    "Compatibility checking not implemented for format: {:?}",
                    new_format
                );
                Ok(())
            }
        }
    }

    /// Placeholder JSON Schema validator: currently accepts every event.
    async fn validate_with_json_schema(
        &self,
        _event: &StreamEvent,
        schema: &SchemaDefinition,
    ) -> Result<ValidationResult> {
        debug!("Validating with JSON schema: {}", schema.id);
        Ok(ValidationResult::success(schema.id, schema.version))
    }

    /// Minimal RDF check.
    ///
    /// For `TripleAdded`, the subject and predicate must start with `http://` or
    /// `https://`, and each offending term is reported as an error. All other event
    /// kinds pass unchecked.
    async fn validate_with_rdf_schema(
        &self,
        event: &StreamEvent,
        schema: &SchemaDefinition,
    ) -> Result<ValidationResult> {
        match event {
            StreamEvent::TripleAdded {
                subject,
                predicate,
                object: _,
                ..
            } => {
                let is_http_uri =
                    |term: &str| term.starts_with("http://") || term.starts_with("https://");
                let mut errors = Vec::new();
                if !is_http_uri(subject) {
                    errors.push(format!("Invalid subject URI: {subject}"));
                }
                if !is_http_uri(predicate) {
                    errors.push(format!("Invalid predicate URI: {predicate}"));
                }
                if errors.is_empty() {
                    Ok(ValidationResult::success(schema.id, schema.version))
                } else {
                    Ok(ValidationResult::failure(schema.id, schema.version, errors))
                }
            }
            _ => Ok(ValidationResult::success(schema.id, schema.version)),
        }
    }

    /// Placeholder Avro validator: currently accepts every event.
    async fn validate_with_avro_schema(
        &self,
        _event: &StreamEvent,
        schema: &SchemaDefinition,
    ) -> Result<ValidationResult> {
        debug!("Validating with Avro schema: {}", schema.id);
        Ok(ValidationResult::success(schema.id, schema.version))
    }

    /// Placeholder: accepts every JSON Schema change.
    async fn check_json_schema_compatibility(
        &self,
        _existing_schema: &SchemaDefinition,
        _new_schema_content: &str,
    ) -> Result<()> {
        Ok(())
    }

    /// Placeholder: accepts every RDF schema change.
    async fn check_rdf_schema_compatibility(
        &self,
        _existing_schema: &SchemaDefinition,
        _new_schema_content: &str,
    ) -> Result<()> {
        Ok(())
    }

    /// Returns a snapshot of the current validation/cache counters.
    pub async fn get_validation_stats(&self) -> ValidationStats {
        let stats = self.validation_stats.read().await;
        ValidationStats::clone(&stats)
    }

    /// Deletes one version of `subject`, or the whole subject when `version` is `None`.
    ///
    /// - Deleting the latest version promotes the highest remaining version.
    /// - Deleting the last remaining version removes the subject entirely.
    /// - Deleted definitions are also evicted from the id cache, so
    ///   `get_schema_by_id` can no longer return them.
    ///
    /// Returns `Ok(true)` when something was deleted, and `Ok(false)` when the subject
    /// or the requested version did not exist.
    pub async fn delete_schema(&self, subject: &str, version: Option<u32>) -> Result<bool> {
        let mut schemas = self.schemas.write().await;
        let mut latest_versions = self.latest_versions.write().await;
        let evicted_ids: Vec<Uuid> = match version {
            Some(version) => {
                let subject_schemas = match schemas.get_mut(subject) {
                    Some(subject_schemas) => subject_schemas,
                    None => return Ok(false),
                };
                let removed = subject_schemas.remove(&version);
                if latest_versions.get(subject) == Some(&version) {
                    match subject_schemas.keys().max().copied() {
                        Some(new_latest) => {
                            latest_versions.insert(subject.to_string(), new_latest);
                        }
                        None => {
                            latest_versions.remove(subject);
                            schemas.remove(subject);
                        }
                    }
                }
                match removed {
                    Some(schema) => vec![schema.id],
                    None => return Ok(false),
                }
            }
            None => match schemas.remove(subject) {
                Some(subject_schemas) => {
                    latest_versions.remove(subject);
                    subject_schemas
                        .into_values()
                        .map(|schema| schema.id)
                        .collect()
                }
                None => return Ok(false),
            },
        };
        // Evict while still holding the `schemas` write lock so concurrent id lookups
        // cannot re-insert a deleted definition. Lock order: schemas -> latest_versions
        // -> schema_cache, same as `register_schema`.
        let mut cache = self.schema_cache.write().await;
        for id in &evicted_ids {
            cache.remove(id);
        }
        Ok(true)
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for registration, retrieval, versioning and RDF event validation.
    use super::*;
    use chrono::Utc;
    use std::collections::HashMap;

    /// A first registration gets version 1 and keeps subject and format.
    #[tokio::test]
    async fn test_schema_registration() -> Result<()> {
        let config = SchemaRegistryConfig::default();
        let registry = SchemaRegistry::new(config);
        let schema_content = r#"
        {
            "type": "object",
            "properties": {
                "subject": {"type": "string"},
                "predicate": {"type": "string"},
                "object": {"type": "string"}
            },
            "required": ["subject", "predicate", "object"]
        }"#;
        let schema = registry
            .register_schema(
                "rdf.triple.added".to_string(),
                SchemaFormat::JsonSchema,
                schema_content.to_string(),
                None,
            )
            .await?;
        assert_eq!(schema.subject, "rdf.triple.added");
        assert_eq!(schema.version, 1);
        assert_eq!(schema.format, SchemaFormat::JsonSchema);
        Ok(())
    }

    /// A registered schema can be fetched both by (subject, version) and by id.
    #[tokio::test]
    async fn test_schema_retrieval() -> Result<()> {
        let config = SchemaRegistryConfig::default();
        let registry = SchemaRegistry::new(config);
        let schema_content = r#"{"type": "object"}"#;
        let registered_schema = registry
            .register_schema(
                "test.subject".to_string(),
                SchemaFormat::JsonSchema,
                schema_content.to_string(),
                None,
            )
            .await?;
        let retrieved = registry
            .get_schema("test.subject", Some(1))
            .await?
            .expect("Schema should exist");
        assert_eq!(retrieved.id, registered_schema.id);
        assert_eq!(retrieved.version, 1);
        let retrieved_by_id = registry
            .get_schema_by_id(&registered_schema.id)
            .await?
            .expect("Schema should exist");
        assert_eq!(retrieved_by_id.id, registered_schema.id);
        Ok(())
    }

    /// A triple with http(s) subject and predicate passes RDF validation.
    #[tokio::test]
    async fn test_event_validation() -> Result<()> {
        let config = SchemaRegistryConfig::default();
        let registry = SchemaRegistry::new(config);
        let schema_content = "RDF Triple Schema";
        registry
            .register_schema(
                "rdf.triple.added".to_string(),
                SchemaFormat::RdfSparql,
                schema_content.to_string(),
                None,
            )
            .await?;
        let event = StreamEvent::TripleAdded {
            subject: "https://example.org/subject".to_string(),
            predicate: "https://example.org/predicate".to_string(),
            object: "\"Test Object\"".to_string(),
            graph: None,
            metadata: EventMetadata {
                event_id: "test_event_1".to_string(),
                timestamp: Utc::now(),
                source: "test".to_string(),
                user: None,
                context: None,
                caused_by: None,
                version: "1.0".to_string(),
                properties: HashMap::new(),
                checksum: None,
            },
        };
        let validation_result = registry
            .validate_event(&event, Some("rdf.triple.added"))
            .await?;
        assert!(validation_result.is_valid);
        assert!(validation_result.errors.is_empty());
        Ok(())
    }

    /// Successive registrations under one subject get increasing versions, and the
    /// latest one is returned when no version is requested.
    #[tokio::test]
    async fn test_schema_versioning() -> Result<()> {
        let config = SchemaRegistryConfig::default();
        let registry = SchemaRegistry::new(config);
        let subject = "test.versioning".to_string();
        let _v1 = registry
            .register_schema(
                subject.clone(),
                SchemaFormat::JsonSchema,
                r#"{"type": "object", "properties": {"name": {"type": "string"}}}"#.to_string(),
                None,
            )
            .await?;
        let _v2 = registry
            .register_schema(
                subject.clone(),
                SchemaFormat::JsonSchema,
                r#"{"type": "object", "properties": {"name": {"type": "string"}, "age": {"type": "integer"}}}"#.to_string(),
                None,
            )
            .await?;
        let schemas = registry.list_schemas(&subject).await?;
        assert_eq!(schemas.len(), 2);
        assert_eq!(schemas[0].version, 1);
        assert_eq!(schemas[1].version, 2);
        let latest = registry.get_schema(&subject, None).await?.unwrap();
        assert_eq!(latest.version, 2);
        Ok(())
    }
}