use std::collections::HashMap;
use std::fs;
use std::sync::{Arc, Mutex};
use serde_json;
use crate::fold_db_core::infrastructure::message_bus::events::schema_events::SchemaApproved;
use crate::fold_db_core::infrastructure::message_bus::{AsyncMessageBus, Event};
use crate::schema::types::field::Field;
use crate::schema::types::{DeclarativeSchemaDefinition, Schema, SchemaError};
use crate::schema::{SchemaState, SchemaWithState};
/// Central registry for schemas and their lifecycle states.
///
/// Keeps an in-memory cache of schemas and states (behind mutexes) that is
/// kept in sync with persistent storage via `DbOperations`, and publishes
/// lifecycle events (e.g. `SchemaApproved`) on the message bus.
pub struct SchemaCore {
    // In-memory cache of loaded schemas, keyed by schema name.
    schemas: Arc<Mutex<HashMap<String, Schema>>>,
    // Lifecycle state per schema name, mirrored to the database.
    schema_states: Arc<Mutex<HashMap<String, SchemaState>>>,
    // Persistence layer; source of truth for schemas and states.
    db_ops: std::sync::Arc<crate::db_operations::DbOperations>,
    // Bus used to broadcast schema lifecycle events.
    message_bus: Arc<AsyncMessageBus>,
}
impl SchemaCore {
pub async fn new(
db_ops: std::sync::Arc<crate::db_operations::DbOperations>,
message_bus: Arc<AsyncMessageBus>,
) -> Result<Self, SchemaError> {
let schemas = db_ops.get_all_schemas().await?;
let schema_states = db_ops.get_all_schema_states().await?;
let schema_core = Self {
schemas: Arc::new(Mutex::new(schemas.clone())),
schema_states: Arc::new(Mutex::new(schema_states)),
db_ops,
message_bus,
};
for (_, schema) in schemas {
if let Some(transform_fields) = &schema.transform_fields {
schema_core
.register_declarative_transforms(&schema, transform_fields)
.await?;
}
}
Ok(schema_core)
}
/// Returns a snapshot (clone) of all cached schemas, keyed by name.
///
/// # Errors
/// Fails with `SchemaError::InvalidData` if the schemas mutex is poisoned.
pub fn get_schemas(&self) -> Result<HashMap<String, Schema>, SchemaError> {
    let guard = self
        .schemas
        .lock()
        .map_err(|_| SchemaError::InvalidData("Failed to acquire schemas lock".to_string()))?;
    Ok(guard.clone())
}
/// Returns a snapshot (clone) of all cached schema lifecycle states.
///
/// # Errors
/// Fails with `SchemaError::InvalidData` if the states mutex is poisoned.
pub fn get_schema_states(&self) -> Result<HashMap<String, SchemaState>, SchemaError> {
    let guard = self.schema_states.lock().map_err(|_| {
        SchemaError::InvalidData("Failed to acquire schema_states lock".to_string())
    })?;
    Ok(guard.clone())
}
/// Pairs every cached schema with its lifecycle state.
///
/// Schemas without a recorded state get `SchemaState::default()`.
pub fn get_schemas_with_states(&self) -> Result<Vec<SchemaWithState>, SchemaError> {
    let states = self.get_schema_states()?;
    let combined = self
        .get_schemas()?
        .into_iter()
        .map(|(name, schema)| {
            let state = states.get(&name).copied().unwrap_or_default();
            SchemaWithState::new(schema, state)
        })
        .collect();
    Ok(combined)
}
/// Sets the lifecycle state of `schema_name` without a backfill hash.
///
/// Thin convenience wrapper around [`set_schema_state_with_backfill`].
pub async fn set_schema_state(
    &self,
    schema_name: &str,
    schema_state: SchemaState,
) -> Result<(), SchemaError> {
    self.set_schema_state_with_backfill(schema_name, schema_state, None)
        .await
}
/// Approves `schema_name` without an associated backfill hash.
///
/// Thin convenience wrapper around [`approve_with_backfill`].
pub async fn approve(&self, schema_name: &str) -> Result<(), SchemaError> {
    self.approve_with_backfill(schema_name, None).await
}
pub async fn approve_with_backfill(
&self,
schema_name: &str,
backfill_hash: Option<String>,
) -> Result<(), SchemaError> {
let current_state = self
.get_schema_states()?
.get(schema_name)
.copied()
.unwrap_or_default();
if current_state != SchemaState::Approved {
self.set_schema_state_with_backfill(schema_name, SchemaState::Approved, backfill_hash)
.await?;
}
Ok(())
}
/// Persists and caches `schema_state` for `schema_name`, optionally carrying
/// a backfill hash into the approval event.
///
/// Ordering is deliberate:
/// 1. When approving, field mappers are applied first so the stored schema
///    already reflects mapped molecule UUIDs.
/// 2. The state is persisted to the database, then the in-memory cache is
///    updated.
/// 3. Only after the state is committed is `SchemaApproved` published.
pub async fn set_schema_state_with_backfill(
    &self,
    schema_name: &str,
    schema_state: SchemaState,
    backfill_hash: Option<String>,
) -> Result<(), SchemaError> {
    if schema_state == SchemaState::Approved {
        self.apply_field_mappers(schema_name).await?;
    }
    self.db_ops
        .store_schema_state(schema_name, &schema_state)
        .await?;
    // The lock guard is a temporary dropped at the end of this statement,
    // so it is not held across the `await` below.
    self.schema_states
        .lock()
        .map_err(|_| {
            SchemaError::InvalidData("Failed to acquire schema_states lock".to_string())
        })?
        .insert(schema_name.to_string(), schema_state);
    if schema_state == SchemaState::Approved {
        let event = SchemaApproved {
            schema_name: schema_name.to_string(),
            backfill_hash,
        };
        self.message_bus
            .publish_event(Event::SchemaApproved(event))
            .await
            .map_err(|e| {
                SchemaError::InvalidData(format!(
                    "Failed to publish SchemaApproved event: {}",
                    e
                ))
            })?;
    }
    Ok(())
}
/// Resolves each field mapper on `schema_name` by copying the molecule UUID
/// from the mapper's source field onto the mapper's target field, then
/// persists and re-caches the updated schema.
///
/// Invoked when a schema transitions to `Approved` (see
/// `set_schema_state_with_backfill`). No-op when the schema declares no
/// mappers.
async fn apply_field_mappers(&self, schema_name: &str) -> Result<(), SchemaError> {
    let mut schema = self.db_ops.get_schema(schema_name).await?.ok_or_else(|| {
        SchemaError::InvalidData(format!("Schema '{}' not found in database", schema_name))
    })?;
    // Nothing to do when the schema has no mappers at all.
    let Some(field_mappers) = schema.field_mappers().cloned() else {
        return Ok(());
    };
    if field_mappers.is_empty() {
        return Ok(());
    }
    // Cache fetched source schemas so each distinct source is read from the
    // database at most once.
    let mut source_cache: HashMap<String, Schema> = HashMap::new();
    let mut updated = false;
    for (target_field, mapper) in field_mappers {
        let source_schema_name = mapper.source_schema().to_string();
        // Manual cache lookup: the entry API is avoided here because the
        // miss path needs an `await` to fetch from the database.
        let source_schema = if let Some(schema) = source_cache.get(&source_schema_name) {
            schema
        } else {
            let fetched = self
                .db_ops
                .get_schema(&source_schema_name)
                .await?
                .ok_or_else(|| {
                    SchemaError::InvalidData(format!(
                        "Source schema '{}' for field mapper not found",
                        source_schema_name
                    ))
                })?;
            source_cache.insert(source_schema_name.clone(), fetched);
            // Just inserted above, so this lookup cannot fail.
            source_cache
                .get(&source_schema_name)
                .expect("source schema inserted")
        };
        let source_field = source_schema
            .runtime_fields
            .get(mapper.source_field())
            .ok_or_else(|| {
                SchemaError::InvalidData(format!(
                    "Source field '{}.{}' not found for mapper",
                    source_schema_name,
                    mapper.source_field()
                ))
            })?;
        // The source field must already be bound to a molecule; otherwise the
        // mapping cannot be applied.
        let molecule_uuid =
            source_field
                .common()
                .molecule_uuid()
                .cloned()
                .ok_or_else(|| {
                    SchemaError::InvalidData(format!(
                        "Source field '{}.{}' is missing a molecule UUID",
                        source_schema_name,
                        mapper.source_field()
                    ))
                })?;
        let target_runtime_field =
            schema
                .runtime_fields
                .get_mut(&target_field)
                .ok_or_else(|| {
                    SchemaError::InvalidData(format!(
                        "Target field '{}' not found while applying field mapper",
                        target_field
                    ))
                })?;
        // Point the target field at the source's molecule and record the
        // mapper that produced this binding.
        target_runtime_field
            .common_mut()
            .set_molecule_uuid(molecule_uuid.clone());
        target_runtime_field
            .common_mut()
            .set_field_mappers(HashMap::from([(target_field.clone(), mapper.clone())]));
        updated = true;
    }
    if updated {
        // Persist and re-cache the schema only when at least one mapper was
        // actually applied.
        schema.sync_molecule_uuids();
        self.db_ops.store_schema(schema_name, &schema).await?;
        self.schemas
            .lock()
            .map_err(|_| {
                SchemaError::InvalidData("Failed to acquire schemas lock".to_string())
            })?
            .insert(schema_name.to_string(), schema);
    }
    Ok(())
}
/// Marks `schema_name` as blocked.
///
/// Convenience wrapper over [`set_schema_state`] with `SchemaState::Blocked`.
pub async fn block_schema(&self, schema_name: &str) -> Result<(), SchemaError> {
    self.set_schema_state(schema_name, SchemaState::Blocked)
        .await
}
/// Returns a new handle to the shared message bus.
pub fn get_message_bus(&self) -> Arc<AsyncMessageBus> {
    self.message_bus.clone()
}
/// Looks up `schema_name` in the in-memory cache only.
///
/// Returns `Ok(None)` on a cache miss; see [`fetch_schema`] for the variant
/// that falls back to the database.
pub fn get_schema(&self, schema_name: &str) -> Result<Option<Schema>, SchemaError> {
    let guard = self
        .schemas
        .lock()
        .map_err(|_| SchemaError::InvalidData("Failed to acquire schemas lock".to_string()))?;
    Ok(guard.get(schema_name).cloned())
}
/// Returns a schema by name, consulting the in-memory cache first and
/// falling back to the database on a miss.
///
/// A database hit is loaded back into the cache so subsequent lookups are
/// served from memory.
pub async fn fetch_schema(&self, schema_name: &str) -> Result<Option<Schema>, SchemaError> {
    if let Some(schema) = self.get_schema(schema_name)? {
        return Ok(Some(schema));
    }
    // Propagate db errors directly via `?`, consistent with the other
    // `db_ops.get_schema` call sites, instead of re-wrapping them as
    // stringified `InvalidData`.
    if let Some(schema) = self.db_ops.get_schema(schema_name).await? {
        self.load_schema_internal(schema.clone()).await?;
        log::info!(
            "Refreshed schema '{}' from database (stale cache)",
            schema.name
        );
        return Ok(Some(schema));
    }
    Ok(None)
}
/// Inserts `schema` into the in-memory caches with state `Available`.
///
/// Cache-only: nothing is persisted to the database here. Both locks are
/// taken before either map is mutated so the two caches stay consistent.
pub fn add_schema_available(&self, schema: Schema) -> Result<(), SchemaError> {
    let mut schemas = self
        .schemas
        .lock()
        .map_err(|_| SchemaError::InvalidData("Failed to acquire schemas lock".to_string()))?;
    let mut schema_states = self.schema_states.lock().map_err(|_| {
        SchemaError::InvalidData("Failed to acquire schema_states lock".to_string())
    })?;
    let name = schema.name.clone();
    schemas.insert(name.clone(), schema);
    schema_states.insert(name, SchemaState::Available);
    Ok(())
}
/// Loads `schema` into the in-memory caches, persisting it when it is new.
///
/// An already-persisted schema keeps its stored state (defaulting to
/// `Available` when none is recorded); a new schema is written to the
/// database together with the `Available` state.
pub async fn load_schema_internal(&self, schema: Schema) -> Result<(), SchemaError> {
    let name = schema.name.clone();
    // Decide the state to cache, persisting schema + state when it is new.
    // This deduplicates the cache-insert logic the two branches used to copy.
    let state = if self.db_ops.get_schema(&name).await?.is_some() {
        self.db_ops
            .get_schema_state(&name)
            .await?
            .unwrap_or(SchemaState::Available)
    } else {
        self.db_ops.store_schema(&name, &schema).await?;
        self.db_ops
            .store_schema_state(&name, &SchemaState::Available)
            .await?;
        SchemaState::Available
    };
    // Each guard is a temporary dropped at the end of its statement, so no
    // lock is held across an `await`.
    self.schemas
        .lock()
        .map_err(|_| SchemaError::InvalidData("Failed to acquire schemas lock".to_string()))?
        .insert(name.clone(), schema);
    self.schema_states
        .lock()
        .map_err(|_| {
            SchemaError::InvalidData("Failed to acquire schema_states lock".to_string())
        })?
        .insert(name, state);
    Ok(())
}
/// Parses a declarative schema definition from a JSON string and loads it.
///
/// # Errors
/// Returns `SchemaError::InvalidData` when the JSON does not deserialize as
/// a `DeclarativeSchemaDefinition`, or propagates interpretation/load errors.
pub async fn load_schema_from_json(&self, json_str: &str) -> Result<(), SchemaError> {
    let parsed: DeclarativeSchemaDefinition = serde_json::from_str(json_str).map_err(|e| {
        SchemaError::InvalidData(format!("Failed to parse declarative schema: {}", e))
    })?;
    let schema = self.interpret_declarative_schema(parsed).await?;
    self.load_schema_internal(schema).await
}
/// Parses and loads a single schema from the file at `path`.
///
/// # Errors
/// Returns `SchemaError::InvalidData` when the file contains no schema, or
/// propagates parse/load errors.
pub async fn load_schema_from_file<P: AsRef<std::path::Path>>(
    &self,
    path: P,
) -> Result<(), SchemaError> {
    match self.parse_schema_file(path.as_ref()).await? {
        Some(schema) => self.load_schema_internal(schema).await,
        None => Err(SchemaError::InvalidData(
            "No schema found in file".to_string(),
        )),
    }
}
/// Loads every `.json` schema file in `directory`, returning the count of
/// successfully loaded schemas.
///
/// A missing directory yields `Ok(0)`. Individual file failures are logged
/// as warnings and skipped rather than aborting the whole scan.
pub async fn load_schemas_from_directory<P: AsRef<std::path::Path>>(
    &self,
    directory: P,
) -> Result<usize, SchemaError> {
    let dir = directory.as_ref();
    if !dir.exists() {
        return Ok(0);
    }
    let entries = fs::read_dir(dir).map_err(|e| {
        SchemaError::InvalidData(format!(
            "Failed to read directory {}: {}",
            dir.display(),
            e
        ))
    })?;
    let mut loaded: usize = 0;
    for entry in entries {
        let path = entry
            .map_err(|e| {
                SchemaError::InvalidData(format!(
                    "Failed to read entry in {}: {}",
                    dir.display(),
                    e
                ))
            })?
            .path();
        // Only attempt files with a literal "json" extension.
        let is_json = path.extension().map(|ext| ext == "json").unwrap_or(false);
        if !is_json {
            continue;
        }
        match self.load_schema_from_file(&path).await {
            Ok(()) => loaded += 1,
            Err(e) => {
                log::warn!("Failed to load schema from file {}: {}", path.display(), e);
            }
        }
    }
    Ok(loaded)
}
/// Builds a `SchemaCore` backed by a temporary (in-memory) sled database.
///
/// Intended for tests; the database is discarded when dropped.
pub async fn new_for_testing() -> Result<Self, SchemaError> {
    let db = sled::Config::new()
        .temporary(true)
        .open()
        .map_err(|e| SchemaError::InvalidData(e.to_string()))?;
    let db_ops = crate::db_operations::DbOperations::from_sled(db)
        .await
        .map_err(|e| SchemaError::InvalidData(e.to_string()))?;
    let message_bus = Arc::new(AsyncMessageBus::new());
    Self::new(std::sync::Arc::new(db_ops), message_bus).await
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // Declarative JSON for a range-keyed "BlogPost" schema.
    // NOTE: raw-string contents are runtime data and must not be reformatted.
    fn blogpost_schema_json() -> String {
        r#"{
"name": "BlogPost",
"key": { "range_field": "publish_date" },
"fields": {
"title": {},
"content": {},
"author": {},
"publish_date": {}
}
}"#
        .to_string()
    }

    // Declarative JSON for a hash+range-keyed "BlogPostWordIndex" schema.
    fn wordindex_schema_json() -> String {
        r#"{
"name": "BlogPostWordIndex",
"key": { "hash_field": "word", "range_field": "publish_date" },
"fields": {
"word": {},
"publish_date": {}
}
}"#
        .to_string()
    }

    // A freshly created core (temporary db) must contain no schemas.
    #[tokio::test]
    async fn new_for_testing_starts_with_empty_schemas() {
        let core = SchemaCore::new_for_testing().await.expect("init core");
        let schemas = core.get_schemas().expect("get_schemas");
        assert!(schemas.is_empty(), "expected no schemas at start");
    }

    // Loading declarative JSON caches the schema with state Available.
    #[tokio::test]
    async fn load_schema_from_json_adds_available_schema() {
        let core = SchemaCore::new_for_testing().await.expect("init core");
        core.load_schema_from_json(&blogpost_schema_json())
            .await
            .expect("load blogpost");
        let schemas = core.get_schemas().expect("get_schemas");
        assert!(
            schemas.contains_key("BlogPost"),
            "BlogPost should be loaded"
        );
        let states = core.get_schema_states().expect("get states");
        assert_eq!(states.get("BlogPost"), Some(&SchemaState::Available));
    }

    // get_schemas_with_states pairs each schema with its (Available) state.
    #[tokio::test]
    async fn get_schemas_with_states_returns_default_available() {
        let core = SchemaCore::new_for_testing().await.expect("init core");
        core.load_schema_from_json(&blogpost_schema_json())
            .await
            .expect("load blogpost");
        let schemas_with_states = core.get_schemas_with_states().expect("get with states");
        assert_eq!(schemas_with_states.len(), 1);
        let schema_entry = schemas_with_states
            .iter()
            .find(|entry| entry.name() == "BlogPost")
            .expect("BlogPost entry");
        assert_eq!(schema_entry.state, SchemaState::Available);
    }

    // Multiple schemas can be loaded independently, each state-tracked.
    #[tokio::test]
    async fn load_multiple_schemas_from_json() {
        let core = SchemaCore::new_for_testing().await.expect("init core");
        core.load_schema_from_json(&blogpost_schema_json())
            .await
            .expect("load blogpost");
        core.load_schema_from_json(&wordindex_schema_json())
            .await
            .expect("load wordindex");
        let schemas = core.get_schemas().expect("get_schemas");
        assert!(schemas.contains_key("BlogPost"));
        assert!(schemas.contains_key("BlogPostWordIndex"));
        let states = core.get_schema_states().expect("get states");
        assert_eq!(states.get("BlogPost"), Some(&SchemaState::Available));
        assert_eq!(
            states.get("BlogPostWordIndex"),
            Some(&SchemaState::Available)
        );
    }

    // The file-based loader accepts the same declarative JSON format.
    #[tokio::test]
    async fn load_schema_from_file_works_with_declarative_format() {
        let core = SchemaCore::new_for_testing().await.expect("init core");
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("BlogPost.json");
        std::fs::write(&path, blogpost_schema_json()).expect("write schema json");
        core.load_schema_from_file(&path)
            .await
            .expect("load from file");
        let schemas = core.get_schemas().expect("get_schemas");
        assert!(schemas.contains_key("BlogPost"));
    }

    // A hash_field + range_field key yields SchemaType::HashRange with both
    // key fields populated.
    #[tokio::test]
    async fn blogpost_wordindex_sets_hashrange_keyconfig() {
        use crate::schema::types::SchemaType;
        let core = SchemaCore::new_for_testing().await.expect("init core");
        core.load_schema_from_json(&wordindex_schema_json())
            .await
            .expect("load wordindex");
        let schemas = core.get_schemas().expect("get_schemas");
        let s = schemas.get("BlogPostWordIndex").expect("schema exists");
        assert_eq!(s.schema_type, SchemaType::HashRange);
        let key = s.key.as_ref().expect("key should be present");
        assert_eq!(key.hash_field.as_deref(), Some("word"));
        assert_eq!(key.range_field.as_deref(), Some("publish_date"));
    }

    // File loading also works for the hash+range schema variant.
    #[tokio::test]
    async fn load_wordindex_schema_from_file() {
        let core = SchemaCore::new_for_testing().await.expect("init core");
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("BlogPostWordIndex.json");
        std::fs::write(&path, wordindex_schema_json()).expect("write schema json");
        core.load_schema_from_file(&path)
            .await
            .expect("load from file");
        let schemas = core.get_schemas().expect("get_schemas");
        assert!(schemas.contains_key("BlogPostWordIndex"));
    }
}