use datafold::fold_db_core::infrastructure::message_bus::query_events::MutationExecuted;
use datafold::fold_db_core::infrastructure::message_bus::Event;
use datafold::fold_db_core::FoldDB;
use serde_json::json;
use std::time::Duration;
use tempfile::TempDir;
/// Publishes a `MutationExecuted` event that lists every `BlogPost` field and
/// checks the transform events observed on the message bus afterwards.
///
/// The test:
/// 1. creates a `FoldDB` in a temporary directory,
/// 2. loads and approves the `BlogPost` schema and the `BlogPostWordIndex`
///    schema (whose `transform_fields` all reference `BlogPost` fields),
/// 3. subscribes to `TransformTriggered` and `TransformExecuted`,
/// 4. publishes the mutation event and polls for 500 ms, asserting that
///    exactly one `TransformTriggered` event arrived and that it names
///    `BlogPostWordIndex`,
/// 5. polls `TransformExecuted` for a further 1000 ms and asserts that every
///    executed transform id was also seen as triggered (no executed events at
///    all is accepted).
#[tokio::test(flavor = "multi_thread")]
async fn test_blogpost_mutation_triggers_transforms() {
    use datafold::schema::SchemaState;

    // `scratch_dir` stays bound until the end of the test so the path remains valid.
    let scratch_dir = TempDir::new().expect("Failed to create temp directory");
    let db_path = scratch_dir
        .path()
        .to_str()
        .expect("Failed to convert path to string");
    let fold_db = FoldDB::new(db_path)
        .await
        .expect("Failed to create FoldDB");

    // Source schema: plain fields, ranged by `publish_date`.
    let source_schema = serde_json::to_string(&json!({
        "name": "BlogPost",
        "key": {
            "range_field": "publish_date"
        },
        "fields": {
            "title": {},
            "content": {},
            "author": {},
            "publish_date": {},
            "tags": {}
        }
    }))
    .expect("Failed to serialize BlogPost schema");
    fold_db
        .schema_manager()
        .load_schema_from_json(&source_schema)
        .await
        .expect("Failed to load BlogPost schema");

    // Derived schema: every transform field is expressed in terms of `BlogPost`.
    let derived_schema = serde_json::to_string(&json!({
        "name": "BlogPostWordIndex",
        "key": {
            "hash_field": "word",
            "range_field": "publish_date"
        },
        "transform_fields": {
            "word": "BlogPost.content.split_by_word()",
            "publish_date": "BlogPost.publish_date",
            "content": "BlogPost.content",
            "author": "BlogPost.author",
            "title": "BlogPost.title",
            "tags": "BlogPost.tags"
        }
    }))
    .expect("Failed to serialize BlogPostWordIndex schema");
    fold_db
        .schema_manager()
        .load_schema_from_json(&derived_schema)
        .await
        .expect("Failed to load BlogPostWordIndex schema");

    // NOTE(review): presumably lets background schema/transform registration settle
    // before approval — confirm whether this pause is still required.
    tokio::time::sleep(Duration::from_millis(50)).await;

    fold_db
        .schema_manager()
        .set_schema_state("BlogPost", SchemaState::Approved)
        .await
        .expect("Failed to approve BlogPost schema");
    fold_db
        .schema_manager()
        .set_schema_state("BlogPostWordIndex", SchemaState::Approved)
        .await
        .expect("Failed to approve BlogPostWordIndex schema");

    // Subscribe before publishing so no event can be missed.
    let bus = fold_db.message_bus();
    let mut triggered_rx = bus.subscribe("TransformTriggered").await;
    let mut executed_rx = bus.subscribe("TransformExecuted").await;

    let mutated_fields = ["content", "tags", "publish_date", "author", "title"]
        .iter()
        .map(|field| field.to_string())
        .collect::<Vec<String>>();
    bus.publish_event(Event::MutationExecuted(MutationExecuted::new(
        "write_mutation",
        "BlogPost",
        44,
        mutated_fields,
    )))
    .await
    .expect("Failed to publish MutationExecuted event");

    // Poll for the whole window (rather than stopping at the first hit) so that
    // unexpected extra triggers are caught by the count assertion below.
    let mut triggered_ids = Vec::new();
    let trigger_deadline = std::time::Instant::now() + Duration::from_millis(500);
    while std::time::Instant::now() < trigger_deadline {
        if let Ok(Event::TransformTriggered(triggered)) = triggered_rx.try_recv() {
            triggered_ids.push(triggered.transform_id);
        }
        tokio::time::sleep(Duration::from_millis(5)).await;
    }

    let expected_transform = "BlogPostWordIndex";
    assert!(
        triggered_ids.iter().any(|id| id == expected_transform),
        "Transform '{}' should be triggered when BlogPost fields are mutated, but it wasn't. Triggered: {:?}",
        expected_transform,
        triggered_ids
    );
    assert_eq!(
        triggered_ids.len(),
        1,
        "Should trigger exactly 1 transform (BlogPostWordIndex), but got {}. Triggered: {:?}",
        triggered_ids.len(),
        triggered_ids
    );

    // Execution events are optional here; collect whatever shows up within a second.
    let mut executed_ids = Vec::new();
    let execution_deadline = std::time::Instant::now() + Duration::from_millis(1000);
    while std::time::Instant::now() < execution_deadline {
        if let Ok(Event::TransformExecuted(executed)) = executed_rx.try_recv() {
            executed_ids.push(executed.transform_id);
        }
        tokio::time::sleep(Duration::from_millis(5)).await;
    }

    // Anything that executed must have been announced as triggered beforehand.
    for executed_id in &executed_ids {
        assert!(
            triggered_ids.contains(executed_id),
            "Executed transform '{}' should have been triggered first",
            executed_id
        );
    }
}
/// Publishes a `MutationExecuted` event that lists only the `title` field of
/// `BlogPost` and checks which transforms are reported as triggered.
///
/// After loading and approving the `BlogPost` and `BlogPostWordIndex` schemas,
/// the test subscribes to `TransformTriggered`, publishes the event, polls the
/// subscription for 500 ms and asserts that exactly one trigger was seen and
/// that it names `BlogPostWordIndex`.
#[tokio::test(flavor = "multi_thread")]
async fn test_partial_mutation_triggers_subset_of_transforms() {
    use datafold::schema::SchemaState;

    // `scratch_dir` stays bound until the end of the test so the path remains valid.
    let scratch_dir = TempDir::new().expect("Failed to create temp directory");
    let db_path = scratch_dir
        .path()
        .to_str()
        .expect("Failed to convert path to string");
    let fold_db = FoldDB::new(db_path)
        .await
        .expect("Failed to create FoldDB");

    // Source schema: plain fields, ranged by `publish_date`.
    let source_schema = serde_json::to_string(&json!({
        "name": "BlogPost",
        "key": {
            "range_field": "publish_date"
        },
        "fields": {
            "title": {},
            "content": {},
            "author": {},
            "publish_date": {},
            "tags": {}
        }
    }))
    .expect("Failed to serialize BlogPost schema");
    fold_db
        .schema_manager()
        .load_schema_from_json(&source_schema)
        .await
        .expect("Failed to load BlogPost schema");

    // Derived schema: `title` is one of the `BlogPost` fields it references.
    let derived_schema = serde_json::to_string(&json!({
        "name": "BlogPostWordIndex",
        "key": {
            "hash_field": "word",
            "range_field": "publish_date"
        },
        "transform_fields": {
            "word": "BlogPost.content.split_by_word()",
            "publish_date": "BlogPost.publish_date",
            "content": "BlogPost.content",
            "author": "BlogPost.author",
            "title": "BlogPost.title",
            "tags": "BlogPost.tags"
        }
    }))
    .expect("Failed to serialize BlogPostWordIndex schema");
    fold_db
        .schema_manager()
        .load_schema_from_json(&derived_schema)
        .await
        .expect("Failed to load BlogPostWordIndex schema");

    // NOTE(review): presumably lets background schema/transform registration settle
    // before approval — confirm whether this pause is still required.
    tokio::time::sleep(Duration::from_millis(50)).await;

    fold_db
        .schema_manager()
        .set_schema_state("BlogPost", SchemaState::Approved)
        .await
        .expect("Failed to approve BlogPost schema");
    fold_db
        .schema_manager()
        .set_schema_state("BlogPostWordIndex", SchemaState::Approved)
        .await
        .expect("Failed to approve BlogPostWordIndex schema");

    // Subscribe before publishing so the trigger event cannot be missed.
    let bus = fold_db.message_bus();
    let mut triggered_rx = bus.subscribe("TransformTriggered").await;

    let title_only_mutation = MutationExecuted::new(
        "update_mutation",
        "BlogPost",
        10,
        vec![String::from("title")],
    );
    bus.publish_event(Event::MutationExecuted(title_only_mutation))
        .await
        .expect("Failed to publish MutationExecuted event");

    // Poll for the whole window so that unexpected extra triggers are counted too.
    let mut triggered_ids = Vec::new();
    let trigger_deadline = std::time::Instant::now() + Duration::from_millis(500);
    while std::time::Instant::now() < trigger_deadline {
        if let Ok(Event::TransformTriggered(triggered)) = triggered_rx.try_recv() {
            triggered_ids.push(triggered.transform_id);
        }
        tokio::time::sleep(Duration::from_millis(5)).await;
    }

    assert!(
        triggered_ids.iter().any(|id| id == "BlogPostWordIndex"),
        "BlogPostWordIndex should be triggered when title field is mutated"
    );
    assert_eq!(
        triggered_ids.len(),
        1,
        "Should trigger exactly 1 transform for title mutation, but got {}. Triggered: {:?}",
        triggered_ids.len(),
        triggered_ids
    );
}
/// Publishes a `MutationExecuted` event that lists only the `content` field of
/// `BlogPost` and checks that the word-index transform is reported as triggered.
///
/// `BlogPostWordIndex.word` is declared as `BlogPost.content.split_by_word()`,
/// so `content` is the field this test mutates. After loading and approving both
/// schemas the test subscribes to `TransformTriggered`, publishes the event,
/// polls for 500 ms and asserts that exactly one trigger was seen and that it
/// names `BlogPostWordIndex`.
#[tokio::test(flavor = "multi_thread")]
async fn test_content_mutation_triggers_word_transform() {
    use datafold::schema::SchemaState;

    // `scratch_dir` stays bound until the end of the test so the path remains valid.
    let scratch_dir = TempDir::new().expect("Failed to create temp directory");
    let db_path = scratch_dir
        .path()
        .to_str()
        .expect("Failed to convert path to string");
    let fold_db = FoldDB::new(db_path)
        .await
        .expect("Failed to create FoldDB");

    // Source schema: plain fields, ranged by `publish_date`.
    let source_schema_json = json!({
        "name": "BlogPost",
        "key": {
            "range_field": "publish_date"
        },
        "fields": {
            "title": {},
            "content": {},
            "author": {},
            "publish_date": {},
            "tags": {}
        }
    });
    let source_schema = serde_json::to_string(&source_schema_json).unwrap();
    fold_db
        .schema_manager()
        .load_schema_from_json(&source_schema)
        .await
        .expect("Failed to load BlogPost schema");

    // Derived schema: hashed by `word`, which is computed from `BlogPost.content`.
    let derived_schema_json = json!({
        "name": "BlogPostWordIndex",
        "key": {
            "hash_field": "word",
            "range_field": "publish_date"
        },
        "transform_fields": {
            "word": "BlogPost.content.split_by_word()",
            "publish_date": "BlogPost.publish_date",
            "content": "BlogPost.content",
            "author": "BlogPost.author",
            "title": "BlogPost.title",
            "tags": "BlogPost.tags"
        }
    });
    let derived_schema = serde_json::to_string(&derived_schema_json).unwrap();
    fold_db
        .schema_manager()
        .load_schema_from_json(&derived_schema)
        .await
        .expect("Failed to load BlogPostWordIndex schema");

    // NOTE(review): presumably lets background schema/transform registration settle
    // before approval — confirm whether this pause is still required.
    tokio::time::sleep(Duration::from_millis(50)).await;

    fold_db
        .schema_manager()
        .set_schema_state("BlogPost", SchemaState::Approved)
        .await
        .expect("Failed to approve BlogPost schema");
    fold_db
        .schema_manager()
        .set_schema_state("BlogPostWordIndex", SchemaState::Approved)
        .await
        .expect("Failed to approve BlogPostWordIndex schema");

    // Subscribe before publishing so the trigger event cannot be missed.
    let bus = fold_db.message_bus();
    let mut triggered_rx = bus.subscribe("TransformTriggered").await;

    let content_only_mutation = MutationExecuted::new(
        "write_mutation",
        "BlogPost",
        25,
        vec![String::from("content")],
    );
    bus.publish_event(Event::MutationExecuted(content_only_mutation))
        .await
        .expect("Failed to publish MutationExecuted event");

    // Poll for the whole window so that unexpected extra triggers are counted too.
    let mut triggered_ids = Vec::new();
    let trigger_deadline = std::time::Instant::now() + Duration::from_millis(500);
    while std::time::Instant::now() < trigger_deadline {
        if let Ok(Event::TransformTriggered(triggered)) = triggered_rx.try_recv() {
            triggered_ids.push(triggered.transform_id);
        }
        tokio::time::sleep(Duration::from_millis(5)).await;
    }

    assert!(
        triggered_ids.iter().any(|id| id == "BlogPostWordIndex"),
        "BlogPostWordIndex should be triggered when content field is mutated"
    );
    assert_eq!(
        triggered_ids.len(),
        1,
        "Should trigger exactly 1 transform (BlogPostWordIndex), but got {}. Triggered: {:?}",
        triggered_ids.len(),
        triggered_ids
    );
}