use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::Arc;
use crate::{
application::services::projection::Projection, domain::entities::Event, error::Result,
};
pub const SCHEMA_REGISTERED: &str = "prime.schema.registered";
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaEntry {
pub type_name: String,
pub kind: SchemaKind,
pub schema: Value,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SchemaKind {
Node,
Edge,
}
#[derive(Debug, Clone)]
pub struct ValidationError {
pub type_name: String,
pub missing_fields: Vec<String>,
}
impl std::fmt::Display for ValidationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"validation failed for {}: missing required fields: {}",
self.type_name,
self.missing_fields.join(", ")
)
}
}
pub struct SchemaProjection {
name: String,
node_schemas: Arc<DashMap<String, SchemaEntry>>,
edge_schemas: Arc<DashMap<String, SchemaEntry>>,
}
impl SchemaProjection {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
node_schemas: Arc::new(DashMap::new()),
edge_schemas: Arc::new(DashMap::new()),
}
}
pub fn register_node_schema(&self, node_type: &str, schema: Value) {
self.node_schemas.insert(
node_type.to_string(),
SchemaEntry {
type_name: node_type.to_string(),
kind: SchemaKind::Node,
schema,
},
);
}
pub fn register_edge_schema(&self, relation: &str, schema: Value) {
self.edge_schemas.insert(
relation.to_string(),
SchemaEntry {
type_name: relation.to_string(),
kind: SchemaKind::Edge,
schema,
},
);
}
pub fn validate_node(
&self,
node_type: &str,
properties: &Value,
) -> std::result::Result<(), ValidationError> {
if let Some(entry) = self.node_schemas.get(node_type) {
validate_properties(node_type, &entry.schema, properties)
} else {
Ok(()) }
}
pub fn validate_edge(
&self,
relation: &str,
properties: Option<&Value>,
) -> std::result::Result<(), ValidationError> {
if let Some(entry) = self.edge_schemas.get(relation) {
let props = properties
.cloned()
.unwrap_or(Value::Object(Default::default()));
validate_properties(relation, &entry.schema, &props)
} else {
Ok(())
}
}
pub fn schemas(&self) -> Vec<SchemaEntry> {
let mut result: Vec<SchemaEntry> = self
.node_schemas
.iter()
.map(|e| e.value().clone())
.collect();
result.extend(self.edge_schemas.iter().map(|e| e.value().clone()));
result
}
}
fn validate_properties(
type_name: &str,
schema: &Value,
properties: &Value,
) -> std::result::Result<(), ValidationError> {
let mut missing = Vec::new();
if let Some(required) = schema.get("required").and_then(|v| v.as_array()) {
for field in required {
if let Some(field_name) = field.as_str()
&& properties.get(field_name).is_none()
{
missing.push(field_name.to_string());
}
}
}
if missing.is_empty() {
Ok(())
} else {
Err(ValidationError {
type_name: type_name.to_string(),
missing_fields: missing,
})
}
}
impl Projection for SchemaProjection {
fn name(&self) -> &str {
&self.name
}
fn process(&self, event: &Event) -> Result<()> {
if event.event_type_str() != SCHEMA_REGISTERED {
return Ok(());
}
let payload = &event.payload;
let type_name = match payload.get("type_name").and_then(|v| v.as_str()) {
Some(t) => t.to_string(),
None => return Ok(()),
};
let kind = match payload.get("kind").and_then(|v| v.as_str()) {
Some("node") => SchemaKind::Node,
Some("edge") => SchemaKind::Edge,
_ => return Ok(()),
};
let schema = match payload.get("schema") {
Some(s) => s.clone(),
None => return Ok(()),
};
match kind {
SchemaKind::Node => self.register_node_schema(&type_name, schema),
SchemaKind::Edge => self.register_edge_schema(&type_name, schema),
}
Ok(())
}
fn get_state(&self, _entity_id: &str) -> Option<Value> {
serde_json::to_value(self.schemas()).ok()
}
fn clear(&self) {
self.node_schemas.clear();
self.edge_schemas.clear();
}
fn snapshot(&self) -> Option<Value> {
serde_json::to_value(self.schemas()).ok()
}
fn restore(&self, snapshot: &Value) -> Result<()> {
let entries: Vec<SchemaEntry> = serde_json::from_value(snapshot.clone())
.map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
self.node_schemas.clear();
self.edge_schemas.clear();
for entry in entries {
match entry.kind {
SchemaKind::Node => {
self.node_schemas.insert(entry.type_name.clone(), entry);
}
SchemaKind::Edge => {
self.edge_schemas.insert(entry.type_name.clone(), entry);
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use uuid::Uuid;
#[test]
fn test_validate_node_with_required_fields_passes() {
let proj = SchemaProjection::new("schema");
proj.register_node_schema("person", serde_json::json!({"required": ["name"]}));
let result = proj.validate_node("person", &serde_json::json!({"name": "Alice"}));
assert!(result.is_ok());
}
#[test]
fn test_validate_node_missing_required_field_fails() {
let proj = SchemaProjection::new("schema");
proj.register_node_schema("person", serde_json::json!({"required": ["name", "email"]}));
let result = proj.validate_node("person", &serde_json::json!({"name": "Alice"}));
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.missing_fields, vec!["email"]);
}
#[test]
fn test_validate_node_no_schema_accepts_anything() {
let proj = SchemaProjection::new("schema");
let result = proj.validate_node("unknown_type", &serde_json::json!({}));
assert!(result.is_ok());
}
#[test]
fn test_validate_edge_with_schema() {
let proj = SchemaProjection::new("schema");
proj.register_edge_schema("works_on", serde_json::json!({"required": ["since"]}));
let result = proj.validate_edge("works_on", Some(&serde_json::json!({"since": "2026-01"})));
assert!(result.is_ok());
let result = proj.validate_edge("works_on", Some(&serde_json::json!({})));
assert!(result.is_err());
}
#[test]
fn test_validate_edge_no_properties_fails_if_required() {
let proj = SchemaProjection::new("schema");
proj.register_edge_schema("works_on", serde_json::json!({"required": ["since"]}));
let result = proj.validate_edge("works_on", None);
assert!(result.is_err());
}
#[test]
fn test_validate_edge_no_schema_accepts_anything() {
let proj = SchemaProjection::new("schema");
let result = proj.validate_edge("unknown_relation", None);
assert!(result.is_ok());
}
#[test]
fn test_schemas_list() {
let proj = SchemaProjection::new("schema");
proj.register_node_schema("person", serde_json::json!({"required": ["name"]}));
proj.register_edge_schema("works_on", serde_json::json!({"required": ["since"]}));
let schemas = proj.schemas();
assert_eq!(schemas.len(), 2);
}
#[test]
fn test_process_schema_registered_event() {
let proj = SchemaProjection::new("schema");
let event = Event::reconstruct_from_strings(
Uuid::new_v4(),
SCHEMA_REGISTERED.to_string(),
"schema:person".to_string(),
"default".to_string(),
serde_json::json!({
"type_name": "person",
"kind": "node",
"schema": {"required": ["name"]},
}),
Utc::now(),
None,
1,
);
proj.process(&event).unwrap();
let result = proj.validate_node("person", &serde_json::json!({}));
assert!(result.is_err());
let result = proj.validate_node("person", &serde_json::json!({"name": "Alice"}));
assert!(result.is_ok());
}
#[test]
fn test_snapshot_and_restore() {
let proj = SchemaProjection::new("schema");
proj.register_node_schema("person", serde_json::json!({"required": ["name"]}));
let snap = proj.snapshot().expect("snapshot should be Some");
proj.clear();
assert!(proj.validate_node("person", &serde_json::json!({})).is_ok());
proj.restore(&snap).unwrap();
assert!(
proj.validate_node("person", &serde_json::json!({}))
.is_err()
); }
#[test]
fn test_validation_error_display() {
let err = ValidationError {
type_name: "person".to_string(),
missing_fields: vec!["name".to_string(), "email".to_string()],
};
assert_eq!(
err.to_string(),
"validation failed for person: missing required fields: name, email"
);
}
}