use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::{Arc, RwLock};
use crate::fold_db_core::infrastructure::message_bus::AsyncMessageBus;
use crate::schema::types::{SchemaError, Transform};
/// Keeps the set of registered transforms and the lookup from field keys to
/// the transforms associated with them.
///
/// Each in-memory collection is guarded by its own `RwLock`.
pub struct TransformManager {
/// Database operations handle; used to load and persist transform state and
/// to look up schema state.
pub db_ops: Arc<crate::db_operations::DbOperations>,
/// Registered transforms keyed by transform id.
pub(super) registered_transforms: RwLock<HashMap<String, Transform>>,
/// Transform ids per field key. Lookups build the key as
/// `"<schema_name>.<field_name>"`.
pub(super) schema_field_to_transforms: RwLock<BTreeMap<String, HashSet<String>>>,
/// Shared message bus handle.
/// NOTE(review): not read by any method visible here — presumably used by
/// sibling modules; confirm.
pub(super) message_bus: Arc<AsyncMessageBus>,
}
impl TransformManager {
/// Builds a manager whose in-memory state is loaded from the database.
///
/// The registered transforms and the field-to-transform mappings are read
/// via `db_ops.load_transform_state()` and each wrapped in its own `RwLock`.
///
/// # Errors
///
/// Propagates any error returned by `load_transform_state`.
pub async fn new(
    db_ops: Arc<crate::db_operations::DbOperations>,
    message_bus: Arc<AsyncMessageBus>,
) -> Result<Self, SchemaError> {
    let (registered_transforms, schema_field_to_transforms) =
        db_ops.load_transform_state().await?;

    // Both handles are already owned by this function, so they are moved into
    // the struct instead of being cloned and then dropped.
    Ok(Self {
        db_ops,
        registered_transforms: RwLock::new(registered_transforms),
        schema_field_to_transforms: RwLock::new(schema_field_to_transforms),
        message_bus,
    })
}
/// Returns a copy of every registered transform, keyed by transform id.
///
/// The map is cloned while the read lock is held, so the caller receives an
/// independent snapshot.
///
/// # Errors
///
/// Returns `SchemaError::InvalidData` when the read lock cannot be acquired.
pub fn list_transforms(&self) -> Result<HashMap<String, Transform>, SchemaError> {
    match self.registered_transforms.read() {
        Ok(registry) => Ok((*registry).clone()),
        Err(lock_err) => Err(SchemaError::InvalidData(format!(
            "Failed to acquire read lock: {}",
            lock_err
        ))),
    }
}
/// Reports whether a transform with the given id is currently registered.
///
/// # Errors
///
/// Returns `SchemaError::InvalidData` when the read lock cannot be acquired.
pub fn transform_exists(&self, transform_id: &str) -> Result<bool, SchemaError> {
    self.registered_transforms
        .read()
        .map(|registry| registry.contains_key(transform_id))
        .map_err(|lock_err| {
            SchemaError::InvalidData(format!("Failed to acquire read lock: {}", lock_err))
        })
}
/// Looks up the state of the named schema by delegating to the database
/// operations handle.
///
/// The result of `db_ops.get_schema_state` is returned unchanged, including
/// any error.
pub async fn get_schema_state(
    &self,
    schema_name: &str,
) -> Result<Option<crate::schema::SchemaState>, SchemaError> {
    let store = &self.db_ops;
    store.get_schema_state(schema_name).await
}
/// Returns the ids of the transforms mapped to `schema_name.field_name`.
///
/// The lookup key is `"<schema_name>.<field_name>"`. A key with no entry
/// yields an empty set rather than an error.
///
/// # Errors
///
/// Returns `SchemaError::InvalidData` when the read lock cannot be acquired.
pub fn get_transforms_for_field(
    &self,
    schema_name: &str,
    field_name: &str,
) -> Result<HashSet<String>, SchemaError> {
    let lookup_key = format!("{}.{}", schema_name, field_name);
    let index = self.schema_field_to_transforms.read().map_err(|lock_err| {
        SchemaError::InvalidData(format!("Failed to acquire read lock: {}", lock_err))
    })?;

    let transform_ids = match index.get(&lookup_key) {
        Some(ids) => ids.clone(),
        None => HashSet::new(),
    };
    Ok(transform_ids)
}
/// Registers a transform in memory and then persists the whole transform
/// state through `db_ops.sync_transform_state`.
///
/// The in-memory update happens first; afterwards the complete registry and
/// the complete field-to-transform mapping are cloned and handed to the
/// database layer (the full state is synced, not just the new entry).
///
/// # Errors
///
/// Returns `SchemaError::InvalidData` when one of the locks cannot be
/// acquired, and propagates any error from `sync_transform_state`. If
/// persisting fails, the in-memory registration made here is not rolled back.
pub async fn handle_transform_registration(
    &self,
    registration: &crate::schema::types::TransformRegistration,
) -> Result<(), SchemaError> {
    let transform_id = &registration.transform_id;
    let transform = &registration.transform;
    let trigger_fields = &registration.trigger_fields;
    self.update_in_memory_registration(transform_id, transform, trigger_fields)?;

    // Each read guard is a temporary that is released at the end of its `let`
    // statement, so no lock guard is held across the `.await` below.
    let transforms = self
        .registered_transforms
        .read()
        .map_err(|e| SchemaError::InvalidData(format!("Failed to acquire read lock: {}", e)))?
        .clone();
    let mappings = self
        .schema_field_to_transforms
        .read()
        .map_err(|e| SchemaError::InvalidData(format!("Failed to acquire read lock: {}", e)))?
        .clone();

    self.db_ops
        .sync_transform_state(&transforms, &mappings)
        .await?;
    Ok(())
}
/// Records a transform and its trigger fields in the in-memory collections.
///
/// The transform is inserted under `transform_id`, replacing any previous
/// entry with that id. Then `transform_id` is added to the set stored under
/// each entry of `trigger_fields`; each string is used verbatim as the map
/// key, and a missing key gets a fresh empty set first.
///
/// The two write locks are taken one after the other, never together, so a
/// failure to take the second leaves the registry insert in place.
///
/// # Errors
///
/// Returns `SchemaError::InvalidData` when a write lock cannot be acquired.
fn update_in_memory_registration(
    &self,
    transform_id: &str,
    transform: &Transform,
    trigger_fields: &[String],
) -> Result<(), SchemaError> {
    // The registry guard is a temporary, released at the end of this statement.
    self.registered_transforms
        .write()
        .map_err(|lock_err| {
            SchemaError::InvalidData(format!("Failed to acquire write lock: {}", lock_err))
        })?
        .insert(transform_id.to_string(), transform.clone());

    let mut index = self.schema_field_to_transforms.write().map_err(|lock_err| {
        SchemaError::InvalidData(format!("Failed to acquire write lock: {}", lock_err))
    })?;
    trigger_fields.iter().for_each(|field_key| {
        index
            .entry(field_key.clone())
            .or_default()
            .insert(transform_id.to_string());
    });

    Ok(())
}
}