use super::SchemaCore;
use crate::fold_db_core::infrastructure::message_bus::Event;
use crate::schema::types::{DeclarativeSchemaDefinition, Schema, SchemaError};
use std::collections::HashMap;
use std::path::Path;
impl SchemaCore {
pub async fn parse_schema_file(&self, path: &Path) -> Result<Option<Schema>, SchemaError> {
let contents = match std::fs::read_to_string(path) {
Ok(c) => c,
Err(e) => {
return Err(SchemaError::InvalidData(format!(
"Failed to read {}: {}",
path.display(),
e
)))
}
};
let declarative_schema = serde_json::from_str::<DeclarativeSchemaDefinition>(&contents)
.map_err(|e| {
SchemaError::InvalidData(format!("Failed to parse declarative schema: {}", e))
})?;
Ok(Some(
self.interpret_declarative_schema(declarative_schema)
.await?,
))
}
pub async fn interpret_declarative_schema(
&self,
mut declarative_schema: DeclarativeSchemaDefinition,
) -> Result<Schema, SchemaError> {
let fields_to_check: Vec<String> = declarative_schema.fields.clone().unwrap_or_default();
for field_name in fields_to_check {
if !declarative_schema
.field_topologies
.contains_key(&field_name)
{
declarative_schema.set_field_topology(
field_name,
crate::schema::types::JsonTopology::new(
crate::schema::types::TopologyNode::Any,
),
);
}
}
declarative_schema.populate_runtime_fields()?;
if let Some(transform_fields) = &declarative_schema.transform_fields {
self.register_declarative_transforms(&declarative_schema, transform_fields)
.await?;
}
Ok(declarative_schema)
}
pub(crate) async fn register_declarative_transforms(
&self,
declarative_schema: &DeclarativeSchemaDefinition,
transform_fields: &HashMap<String, String>,
) -> Result<(), SchemaError> {
use crate::fold_db_core::infrastructure::message_bus::events::schema_events::TransformRegistrationRequest;
use crate::schema::types::transform::{Transform, TransformRegistration};
use uuid::Uuid;
let transform_id = declarative_schema.name.clone();
let transform = Transform::from_schema_name(declarative_schema.name.clone());
let mut all_trigger_fields = Vec::new();
for field_expression in transform_fields.values() {
let fields =
DeclarativeSchemaDefinition::extract_inputs_from_expression(field_expression);
all_trigger_fields.extend(fields);
}
let unique_trigger_fields: std::collections::HashSet<_> =
all_trigger_fields.into_iter().collect();
let trigger_fields: Vec<String> = unique_trigger_fields.into_iter().collect();
let registration = TransformRegistration {
transform_id: transform_id.clone(),
transform,
trigger_fields,
};
let correlation_id = Uuid::new_v4().to_string();
let registration_request = TransformRegistrationRequest {
registration,
correlation_id,
};
self.get_message_bus()
.publish_event(Event::TransformRegistrationRequest(registration_request))
.await
.map_err(|e| {
SchemaError::InvalidData(format!(
"Failed to publish transform registration request: {}",
e
))
})?;
Ok(())
}
}