use datafold::fold_db_core::infrastructure::message_bus::query_events::MutationExecuted;
use datafold::fold_db_core::infrastructure::message_bus::Event;
use datafold::fold_db_core::FoldDB;
use datafold::schema::SchemaState;
use serde_json::json;
use std::time::Duration;
use tempfile::TempDir;
#[tokio::test]
async fn test_transform_requires_approval_to_execute() {
let temp_dir = TempDir::new().expect("Failed to create temp directory");
let test_db_path = temp_dir
.path()
.to_str()
.expect("Failed to convert path to string");
let fold_db = FoldDB::new(test_db_path)
.await
.expect("Failed to create FoldDB");
let blogpost_schema_json = json!({
"name": "BlogPost",
"key": {
"range_field": "publish_date"
},
"fields": {
"title": {},
"content": {},
"author": {},
"publish_date": {},
"tags": {}
}
});
let blogpost_schema_str =
serde_json::to_string(&blogpost_schema_json).expect("Failed to serialize BlogPost schema");
fold_db
.schema_manager()
.load_schema_from_json(&blogpost_schema_str)
.await
.expect("Failed to load BlogPost schema");
fold_db
.schema_manager()
.set_schema_state("BlogPost", SchemaState::Approved)
.await
.expect("Failed to approve BlogPost schema");
let wordindex_schema_json = json!({
"name": "BlogPostWordIndex",
"key": {
"hash_field": "word",
"range_field": "publish_date"
},
"transform_fields": {
"word": "BlogPost.content.split_by_word()",
"publish_date": "BlogPost.publish_date",
"content": "BlogPost.content",
"author": "BlogPost.author",
"title": "BlogPost.title",
"tags": "BlogPost.tags"
}
});
let wordindex_schema_str = serde_json::to_string(&wordindex_schema_json)
.expect("Failed to serialize BlogPostWordIndex schema");
fold_db
.schema_manager()
.load_schema_from_json(&wordindex_schema_str)
.await
.expect("Failed to load BlogPostWordIndex schema");
tokio::time::sleep(Duration::from_millis(100)).await;
let schema_states = fold_db
.schema_manager()
.get_schema_states()
.expect("Failed to get schema states");
assert_eq!(
schema_states
.get("BlogPostWordIndex")
.copied()
.unwrap_or_default(),
SchemaState::Available,
"BlogPostWordIndex should be in Available state"
);
println!("✅ BlogPostWordIndex is in Available state (not approved)");
let transform_manager = fold_db.transform_manager();
let registered_transforms = transform_manager
.list_transforms()
.expect("Failed to list transforms");
assert!(
registered_transforms.contains_key("BlogPostWordIndex"),
"Transform should be registered even if schema is not approved"
);
println!("✅ Transform 'BlogPostWordIndex' is registered");
let message_bus = fold_db.message_bus();
let mut transform_triggered_consumer = message_bus.subscribe("TransformTriggered").await;
let mut transform_executed_consumer = message_bus.subscribe("TransformExecuted").await;
let mutation_event = MutationExecuted::new(
"write_mutation",
"BlogPost",
44,
vec![
"content".to_string(),
"tags".to_string(),
"publish_date".to_string(),
"author".to_string(),
"title".to_string(),
],
);
message_bus
.publish_event(Event::MutationExecuted(mutation_event))
.await
.expect("Failed to publish MutationExecuted event");
tokio::time::sleep(Duration::from_millis(200)).await;
let mut triggered_transform_ids = Vec::new();
while let Ok(event) = transform_triggered_consumer.try_recv() {
if let Event::TransformTriggered(e) = event {
triggered_transform_ids.push(e.transform_id);
}
}
let mut executed_results = Vec::new();
while let Ok(event) = transform_executed_consumer.try_recv() {
if let Event::TransformExecuted(e) = event {
executed_results.push((e.transform_id, e.result));
}
}
println!("📋 Triggered transforms: {:?}", triggered_transform_ids);
println!("📋 Executed transforms: {:?}", executed_results);
assert!(
!triggered_transform_ids.contains(&"BlogPostWordIndex".to_string()),
"Transform should NOT be triggered for unapproved schema (filtered before event emission)"
);
let blogpost_word_index_execution = executed_results
.iter()
.find(|(id, _)| id == "BlogPostWordIndex");
assert!(
blogpost_word_index_execution.is_none(),
"Transform should NOT be executed for unapproved schema"
);
println!("✅ Transform correctly filtered - no TransformTriggered or TransformExecuted events emitted");
}
#[tokio::test]
async fn test_transform_executes_when_approved() {
let temp_dir = TempDir::new().expect("Failed to create temp directory");
let test_db_path = temp_dir
.path()
.to_str()
.expect("Failed to convert path to string");
let fold_db = FoldDB::new(test_db_path)
.await
.expect("Failed to create FoldDB");
let blogpost_schema_json = json!({
"name": "BlogPost",
"key": {
"range_field": "publish_date"
},
"fields": {
"title": {},
"content": {},
"author": {},
"publish_date": {},
"tags": {}
}
});
let blogpost_schema_str =
serde_json::to_string(&blogpost_schema_json).expect("Failed to serialize BlogPost schema");
fold_db
.schema_manager()
.load_schema_from_json(&blogpost_schema_str)
.await
.expect("Failed to load BlogPost schema");
fold_db
.schema_manager()
.set_schema_state("BlogPost", SchemaState::Approved)
.await
.expect("Failed to approve BlogPost schema");
let wordindex_schema_json = json!({
"name": "BlogPostWordIndex",
"key": {
"hash_field": "word",
"range_field": "publish_date"
},
"transform_fields": {
"word": "BlogPost.content.split_by_word()",
"publish_date": "BlogPost.publish_date",
"content": "BlogPost.content",
"author": "BlogPost.author",
"title": "BlogPost.title",
"tags": "BlogPost.tags"
}
});
let wordindex_schema_str = serde_json::to_string(&wordindex_schema_json)
.expect("Failed to serialize BlogPostWordIndex schema");
fold_db
.schema_manager()
.load_schema_from_json(&wordindex_schema_str)
.await
.expect("Failed to load BlogPostWordIndex schema");
fold_db
.schema_manager()
.set_schema_state("BlogPostWordIndex", SchemaState::Approved)
.await
.expect("Failed to approve BlogPostWordIndex schema");
tokio::time::sleep(Duration::from_millis(100)).await;
let schema_states = fold_db
.schema_manager()
.get_schema_states()
.expect("Failed to get schema states");
assert_eq!(
schema_states
.get("BlogPostWordIndex")
.copied()
.unwrap_or_default(),
SchemaState::Approved,
"BlogPostWordIndex should be in Approved state"
);
println!("✅ BlogPostWordIndex is in Approved state");
let message_bus = fold_db.message_bus();
let mut transform_executed_consumer = message_bus.subscribe("TransformExecuted").await;
let mutation_event = MutationExecuted::new(
"write_mutation",
"BlogPost",
44,
vec![
"content".to_string(),
"tags".to_string(),
"publish_date".to_string(),
"author".to_string(),
"title".to_string(),
],
);
message_bus
.publish_event(Event::MutationExecuted(mutation_event))
.await
.expect("Failed to publish MutationExecuted event");
tokio::time::sleep(Duration::from_millis(500)).await;
let mut executed_results = Vec::new();
while let Ok(event) = transform_executed_consumer.try_recv() {
if let Event::TransformExecuted(e) = event {
executed_results.push((e.transform_id, e.result));
}
}
println!("📋 Executed transforms: {:?}", executed_results);
let blogpost_word_index_execution = executed_results
.iter()
.find(|(id, _)| id == "BlogPostWordIndex");
if let Some((_, result)) = blogpost_word_index_execution {
assert!(
!result.contains("not approved"),
"Transform execution should succeed for approved schema, got: {}",
result
);
println!("✅ Transform execution succeeded: {}", result);
} else {
panic!("Transform should have been executed for approved schema");
}
}