#[cfg(test)]
mod query_joins_tests {
use crate::channels::*;
use crate::config::{QueryConfig, QueryJoinConfig, QueryJoinKeyConfig};
use crate::queries::QueryManager;
use crate::sources::tests::{create_test_mock_source, TestMockSource};
use crate::sources::{convert_json_to_element_value, SourceManager};
use crate::test_helpers::wait_for_component_status;
use drasi_core::middleware::MiddlewareTypeRegistry;
use drasi_core::models::{
Element, ElementMetadata, ElementPropertyMap, ElementReference, SourceChange,
};
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::Duration;
fn create_query_join_config(id: &str, keys: Vec<(String, String)>) -> QueryJoinConfig {
QueryJoinConfig {
id: id.to_string(),
keys: keys
.into_iter()
.map(|(label, property)| QueryJoinKeyConfig { label, property })
.collect(),
}
}
fn create_query_config_with_joins(
id: &str,
query: &str,
sources: Vec<String>,
joins: Vec<QueryJoinConfig>,
) -> QueryConfig {
use crate::config::SourceSubscriptionConfig;
QueryConfig {
id: id.to_string(),
query: query.to_string(),
query_language: crate::config::QueryLanguage::Cypher,
middleware: vec![],
sources: sources
.into_iter()
.map(|source_id| SourceSubscriptionConfig {
nodes: vec![],
relations: vec![],
source_id,
pipeline: vec![],
})
.collect(),
auto_start: false,
joins: Some(joins),
enable_bootstrap: true,
bootstrap_buffer_size: 10000,
priority_queue_capacity: None,
dispatch_buffer_capacity: None,
dispatch_mode: None,
storage_backend: None,
recovery_policy: None,
}
}
fn create_node_with_properties(
source_name: &str,
id: &str,
labels: Vec<String>,
properties: HashMap<&str, serde_json::Value>,
) -> Element {
let reference = ElementReference::new(source_name, id);
let mut property_map = ElementPropertyMap::new();
for (key, value) in properties {
let element_value = convert_json_to_element_value(&value);
property_map.insert(key, element_value);
}
let metadata = ElementMetadata {
reference,
labels: Arc::from(
labels
.into_iter()
.map(|l| Arc::from(l.as_str()))
.collect::<Vec<_>>(),
),
effective_from: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos() as u64,
};
Element::Node {
metadata,
properties: property_map,
}
}
async fn create_test_environment() -> (
Arc<QueryManager>,
Arc<SourceManager>,
Arc<tokio::sync::RwLock<crate::component_graph::ComponentGraph>>,
) {
let log_registry = crate::managers::get_or_init_global_registry();
let (graph, update_rx) = crate::component_graph::ComponentGraph::new("test-instance");
let update_tx = graph.update_sender();
let graph = Arc::new(tokio::sync::RwLock::new(graph));
{
let graph_clone = graph.clone();
tokio::spawn(async move {
let mut rx = update_rx;
while let Some(update) = rx.recv().await {
let mut g = graph_clone.write().await;
g.apply_update(update);
}
});
}
let source_manager = Arc::new(SourceManager::new(
"test-instance",
log_registry.clone(),
graph.clone(),
update_tx.clone(),
));
let index_factory = Arc::new(crate::indexes::IndexFactory::new(vec![], None));
let middleware_registry = Arc::new(MiddlewareTypeRegistry::new());
let query_manager = Arc::new(QueryManager::new(
"test-instance",
source_manager.clone(),
index_factory,
middleware_registry,
log_registry,
graph.clone(),
update_tx,
));
(query_manager, source_manager, graph)
}
async fn add_source(
source_manager: &SourceManager,
graph: &tokio::sync::RwLock<crate::component_graph::ComponentGraph>,
source: impl crate::sources::Source + 'static,
) -> anyhow::Result<()> {
let source_id = source.id().to_string();
let source_type = source.type_name().to_string();
let auto_start = source.auto_start();
{
let mut g = graph.write().await;
let mut metadata = HashMap::new();
metadata.insert("kind".to_string(), source_type);
metadata.insert("autoStart".to_string(), auto_start.to_string());
g.register_source(&source_id, metadata)?;
}
source_manager.provision_source(source).await
}
async fn add_query(
manager: &QueryManager,
graph: &tokio::sync::RwLock<crate::component_graph::ComponentGraph>,
config: QueryConfig,
) -> anyhow::Result<()> {
{
let mut g = graph.write().await;
let source_ids: Vec<String> =
config.sources.iter().map(|s| s.source_id.clone()).collect();
for sid in &source_ids {
if !g.contains(sid) {
g.register_source(sid, HashMap::new())?;
}
}
let mut metadata = HashMap::new();
metadata.insert("query".to_string(), config.query.clone());
g.register_query(&config.id, metadata, &source_ids)?;
}
manager.provision_query(config).await
}
#[tokio::test]
async fn test_basic_join_between_two_sources() {
let (query_manager, source_manager, graph) = create_test_environment().await;
let vehicles_source = create_test_mock_source("vehicles".to_string());
let drivers_source = create_test_mock_source("drivers".to_string());
add_source(&source_manager, &graph, vehicles_source)
.await
.unwrap();
add_source(&source_manager, &graph, drivers_source)
.await
.unwrap();
let join_config = create_query_join_config(
"VEHICLE_TO_DRIVER",
vec![
("Vehicle".to_string(), "licensePlate".to_string()),
("Driver".to_string(), "vehicleLicensePlate".to_string()),
],
);
let query_config = create_query_config_with_joins(
"vehicle-driver-query",
"MATCH (d:Driver)-[:VEHICLE_TO_DRIVER]->(v:Vehicle) WHERE v.status = 'available' RETURN d.name as driver, v.licensePlate as plate",
vec!["vehicles".to_string(), "drivers".to_string()],
vec![join_config],
);
add_query(&query_manager, &graph, query_config)
.await
.unwrap();
let mut event_rx = graph.read().await.subscribe();
query_manager
.start_query("vehicle-driver-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"vehicle-driver-query",
ComponentStatus::Running,
Duration::from_secs(5),
)
.await;
let vehicles_mock = source_manager
.get_source_instance("vehicles")
.await
.expect("vehicles source should exist");
let drivers_mock = source_manager
.get_source_instance("drivers")
.await
.expect("drivers source should exist");
let vehicles_source = vehicles_mock
.as_any()
.downcast_ref::<TestMockSource>()
.expect("Should be TestMockSource");
let drivers_source = drivers_mock
.as_any()
.downcast_ref::<TestMockSource>()
.expect("Should be TestMockSource");
let vehicle1 = create_node_with_properties(
"vehicles",
"v1",
vec!["Vehicle".to_string()],
HashMap::from([
("licensePlate", json!("ABC-123")),
("status", json!("available")),
("model", json!("Toyota Camry")),
]),
);
vehicles_source
.inject_event(SourceChange::Insert { element: vehicle1 })
.await
.unwrap();
let driver1 = create_node_with_properties(
"drivers",
"d1",
vec!["Driver".to_string()],
HashMap::from([
("name", json!("John Doe")),
("vehicleLicensePlate", json!("ABC-123")),
("employeeId", json!("EMP001")),
]),
);
drivers_source
.inject_event(SourceChange::Insert { element: driver1 })
.await
.unwrap();
let status = tokio::time::timeout(Duration::from_secs(5), async {
loop {
let s = query_manager
.get_query_status("vehicle-driver-query".to_string())
.await;
if s.is_ok() {
return s;
}
tokio::task::yield_now().await;
}
})
.await
.expect("Timed out waiting for query status");
assert!(status.is_ok(), "Should be able to get query status");
}
#[tokio::test]
async fn test_dynamic_updates_with_joins() {
let (query_manager, source_manager, graph) = create_test_environment().await;
let orders_source = create_test_mock_source("orders".to_string());
let restaurants_source = create_test_mock_source("restaurants".to_string());
add_source(&source_manager, &graph, orders_source)
.await
.unwrap();
add_source(&source_manager, &graph, restaurants_source)
.await
.unwrap();
let join_config = create_query_join_config(
"ORDER_TO_RESTAURANT",
vec![
("Order".to_string(), "restaurantId".to_string()),
("Restaurant".to_string(), "id".to_string()),
],
);
let query_config = create_query_config_with_joins(
"order-restaurant-query",
"MATCH (o:Order)-[:ORDER_TO_RESTAURANT]->(r:Restaurant) RETURN o.orderId as orderId, r.name as restaurant",
vec!["orders".to_string(), "restaurants".to_string()],
vec![join_config],
);
add_query(&query_manager, &graph, query_config)
.await
.unwrap();
let mut event_rx = graph.read().await.subscribe();
query_manager
.start_query("order-restaurant-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"order-restaurant-query",
ComponentStatus::Running,
Duration::from_secs(5),
)
.await;
let restaurants_mock = source_manager
.get_source_instance("restaurants")
.await
.expect("restaurants source");
let orders_mock = source_manager
.get_source_instance("orders")
.await
.expect("orders source");
let restaurants_source = restaurants_mock
.as_any()
.downcast_ref::<TestMockSource>()
.expect("MockSource");
let orders_source = orders_mock
.as_any()
.downcast_ref::<TestMockSource>()
.expect("MockSource");
let restaurant1 = create_node_with_properties(
"restaurants",
"r1",
vec!["Restaurant".to_string()],
HashMap::from([("id", json!("REST001")), ("name", json!("Pizza Palace"))]),
);
restaurants_source
.inject_event(SourceChange::Insert {
element: restaurant1,
})
.await
.unwrap();
let order1 = create_node_with_properties(
"orders",
"o1",
vec!["Order".to_string()],
HashMap::from([
("orderId", json!("ORD001")),
("restaurantId", json!("REST001")),
("status", json!("pending")),
]),
);
orders_source
.inject_event(SourceChange::Insert {
element: order1.clone(),
})
.await
.unwrap();
tokio::time::timeout(Duration::from_secs(5), async {
loop {
if query_manager
.get_query_status("order-restaurant-query".to_string())
.await
.is_ok()
{
return;
}
tokio::task::yield_now().await;
}
})
.await
.expect("Timed out waiting for query to process initial data");
let updated_order = create_node_with_properties(
"orders",
"o1",
vec!["Order".to_string()],
HashMap::from([
("orderId", json!("ORD001-UPDATED")),
("restaurantId", json!("REST001")),
("status", json!("completed")),
]),
);
orders_source
.inject_event(SourceChange::Update {
element: updated_order,
})
.await
.unwrap();
tokio::time::timeout(Duration::from_secs(5), async {
loop {
if query_manager
.get_query_status("order-restaurant-query".to_string())
.await
.is_ok()
{
return;
}
tokio::task::yield_now().await;
}
})
.await
.expect("Timed out waiting for query to process update");
let metadata = match order1 {
Element::Node { metadata, .. } => metadata,
_ => panic!("Expected node element"),
};
orders_source
.inject_event(SourceChange::Delete { metadata })
.await
.unwrap();
let status = tokio::time::timeout(Duration::from_secs(5), async {
loop {
let s = query_manager
.get_query_status("order-restaurant-query".to_string())
.await;
if s.is_ok() {
return s;
}
tokio::task::yield_now().await;
}
})
.await
.expect("Timed out waiting for query status");
assert!(status.is_ok(), "Query should still be running");
}
#[tokio::test]
async fn test_multiple_joins_in_single_query() {
let (query_manager, source_manager, graph) = create_test_environment().await;
let orders_source = create_test_mock_source("orders".to_string());
let drivers_source = create_test_mock_source("drivers".to_string());
let restaurants_source = create_test_mock_source("restaurants".to_string());
add_source(&source_manager, &graph, orders_source)
.await
.unwrap();
add_source(&source_manager, &graph, drivers_source)
.await
.unwrap();
add_source(&source_manager, &graph, restaurants_source)
.await
.unwrap();
let restaurant_join = create_query_join_config(
"ORDER_TO_RESTAURANT",
vec![
("Order".to_string(), "restaurantId".to_string()),
("Restaurant".to_string(), "id".to_string()),
],
);
let driver_join = create_query_join_config(
"ORDER_TO_DRIVER",
vec![
("Order".to_string(), "driverId".to_string()),
("Driver".to_string(), "id".to_string()),
],
);
let query_config = create_query_config_with_joins(
"full-order-query",
"MATCH (o:Order)-[:ORDER_TO_RESTAURANT]->(r:Restaurant), (o)-[:ORDER_TO_DRIVER]->(d:Driver) RETURN o.orderId, r.name, d.name",
vec!["orders".to_string(), "drivers".to_string(), "restaurants".to_string()],
vec![restaurant_join, driver_join],
);
add_query(&query_manager, &graph, query_config)
.await
.unwrap();
let mut event_rx = graph.read().await.subscribe();
query_manager
.start_query("full-order-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"full-order-query",
ComponentStatus::Running,
Duration::from_secs(5),
)
.await;
let orders_mock = source_manager
.get_source_instance("orders")
.await
.expect("orders source");
let drivers_mock = source_manager
.get_source_instance("drivers")
.await
.expect("drivers source");
let restaurants_mock = source_manager
.get_source_instance("restaurants")
.await
.expect("restaurants source");
let orders_source = orders_mock
.as_any()
.downcast_ref::<TestMockSource>()
.expect("MockSource");
let drivers_source = drivers_mock
.as_any()
.downcast_ref::<TestMockSource>()
.expect("MockSource");
let restaurants_source = restaurants_mock
.as_any()
.downcast_ref::<TestMockSource>()
.expect("MockSource");
let restaurant = create_node_with_properties(
"restaurants",
"r1",
vec!["Restaurant".to_string()],
HashMap::from([("id", json!("REST001")), ("name", json!("Burger Barn"))]),
);
restaurants_source
.inject_event(SourceChange::Insert {
element: restaurant,
})
.await
.unwrap();
let driver = create_node_with_properties(
"drivers",
"d1",
vec!["Driver".to_string()],
HashMap::from([("id", json!("DRV001")), ("name", json!("Alice Smith"))]),
);
drivers_source
.inject_event(SourceChange::Insert { element: driver })
.await
.unwrap();
let order = create_node_with_properties(
"orders",
"o1",
vec!["Order".to_string()],
HashMap::from([
("orderId", json!("ORD001")),
("restaurantId", json!("REST001")),
("driverId", json!("DRV001")),
]),
);
orders_source
.inject_event(SourceChange::Insert { element: order })
.await
.unwrap();
let status = tokio::time::timeout(Duration::from_secs(5), async {
loop {
let s = query_manager
.get_query_status("full-order-query".to_string())
.await;
if s.is_ok() {
return s;
}
tokio::task::yield_now().await;
}
})
.await
.expect("Timed out waiting for query status");
assert!(
status.is_ok(),
"Query with multiple joins should be running"
);
}
#[tokio::test]
async fn test_join_with_non_matching_properties() {
let (query_manager, source_manager, graph) = create_test_environment().await;
let source1 = create_test_mock_source("source1".to_string());
let source2 = create_test_mock_source("source2".to_string());
add_source(&source_manager, &graph, source1).await.unwrap();
add_source(&source_manager, &graph, source2).await.unwrap();
let join_config = create_query_join_config(
"TEST_JOIN",
vec![
("NodeA".to_string(), "linkId".to_string()),
("NodeB".to_string(), "linkId".to_string()),
],
);
let query_config = create_query_config_with_joins(
"non-matching-query",
"MATCH (a:NodeA)-[:TEST_JOIN]->(b:NodeB) RETURN a, b",
vec!["source1".to_string(), "source2".to_string()],
vec![join_config],
);
add_query(&query_manager, &graph, query_config)
.await
.unwrap();
let mut event_rx = graph.read().await.subscribe();
query_manager
.start_query("non-matching-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"non-matching-query",
ComponentStatus::Running,
Duration::from_secs(5),
)
.await;
let source1_mock = source_manager
.get_source_instance("source1")
.await
.expect("source1");
let source2_mock = source_manager
.get_source_instance("source2")
.await
.expect("source2");
let source1_source = source1_mock
.as_any()
.downcast_ref::<TestMockSource>()
.expect("MockSource");
let source2_source = source2_mock
.as_any()
.downcast_ref::<TestMockSource>()
.expect("MockSource");
let node_a = create_node_with_properties(
"source1",
"a1",
vec!["NodeA".to_string()],
HashMap::from([("linkId", json!("LINK001"))]),
);
source1_source
.inject_event(SourceChange::Insert { element: node_a })
.await
.unwrap();
let node_b = create_node_with_properties(
"source2",
"b1",
vec!["NodeB".to_string()],
HashMap::from([("linkId", json!("LINK999"))]), );
source2_source
.inject_event(SourceChange::Insert { element: node_b })
.await
.unwrap();
let status = tokio::time::timeout(Duration::from_secs(5), async {
loop {
let s = query_manager
.get_query_status("non-matching-query".to_string())
.await;
if s.is_ok() {
return s;
}
tokio::task::yield_now().await;
}
})
.await
.expect("Timed out waiting for query status");
assert!(status.is_ok(), "Query should handle non-matching joins");
}
#[tokio::test]
async fn test_join_with_null_properties() {
let (query_manager, source_manager, graph) = create_test_environment().await;
let source1 = create_test_mock_source("source1".to_string());
let source2 = create_test_mock_source("source2".to_string());
add_source(&source_manager, &graph, source1).await.unwrap();
add_source(&source_manager, &graph, source2).await.unwrap();
let join_config = create_query_join_config(
"NULL_TEST_JOIN",
vec![
("NodeA".to_string(), "optionalId".to_string()),
("NodeB".to_string(), "optionalId".to_string()),
],
);
let query_config = create_query_config_with_joins(
"null-property-query",
"MATCH (a:NodeA)-[:NULL_TEST_JOIN]->(b:NodeB) RETURN a, b",
vec!["source1".to_string(), "source2".to_string()],
vec![join_config],
);
add_query(&query_manager, &graph, query_config)
.await
.unwrap();
let mut event_rx = graph.read().await.subscribe();
query_manager
.start_query("null-property-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"null-property-query",
ComponentStatus::Running,
Duration::from_secs(5),
)
.await;
let source1_mock = source_manager
.get_source_instance("source1")
.await
.expect("source1");
let source2_mock = source_manager
.get_source_instance("source2")
.await
.expect("source2");
let source1_source = source1_mock
.as_any()
.downcast_ref::<TestMockSource>()
.expect("MockSource");
let source2_source = source2_mock
.as_any()
.downcast_ref::<TestMockSource>()
.expect("MockSource");
let node_a = create_node_with_properties(
"source1",
"a1",
vec!["NodeA".to_string()],
HashMap::from([("otherprop", json!("value"))]), );
source1_source
.inject_event(SourceChange::Insert { element: node_a })
.await
.unwrap();
let node_b = create_node_with_properties(
"source2",
"b1",
vec!["NodeB".to_string()],
HashMap::from([("optionalId", json!(null))]), );
source2_source
.inject_event(SourceChange::Insert { element: node_b })
.await
.unwrap();
let status = tokio::time::timeout(Duration::from_secs(5), async {
loop {
let s = query_manager
.get_query_status("null-property-query".to_string())
.await;
if s.is_ok() {
return s;
}
tokio::task::yield_now().await;
}
})
.await
.expect("Timed out waiting for query status");
assert!(status.is_ok(), "Query should handle null properties");
}
#[tokio::test]
async fn test_join_with_duplicate_keys() {
let (query_manager, source_manager, graph) = create_test_environment().await;
let products_source = create_test_mock_source("products".to_string());
let categories_source = create_test_mock_source("categories".to_string());
add_source(&source_manager, &graph, products_source)
.await
.unwrap();
add_source(&source_manager, &graph, categories_source)
.await
.unwrap();
let join_config = create_query_join_config(
"PRODUCT_CATEGORY",
vec![
("Product".to_string(), "categoryId".to_string()),
("Category".to_string(), "id".to_string()),
],
);
let query_config = create_query_config_with_joins(
"product-category-query",
"MATCH (p:Product)-[:PRODUCT_CATEGORY]->(c:Category) RETURN p.name, c.name",
vec!["products".to_string(), "categories".to_string()],
vec![join_config],
);
add_query(&query_manager, &graph, query_config)
.await
.unwrap();
let mut event_rx = graph.read().await.subscribe();
query_manager
.start_query("product-category-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"product-category-query",
ComponentStatus::Running,
Duration::from_secs(5),
)
.await;
let products_mock = source_manager
.get_source_instance("products")
.await
.expect("products");
let categories_mock = source_manager
.get_source_instance("categories")
.await
.expect("categories");
let products_source = products_mock
.as_any()
.downcast_ref::<TestMockSource>()
.expect("MockSource");
let categories_source = categories_mock
.as_any()
.downcast_ref::<TestMockSource>()
.expect("MockSource");
let category = create_node_with_properties(
"categories",
"cat1",
vec!["Category".to_string()],
HashMap::from([("id", json!("CAT001")), ("name", json!("Electronics"))]),
);
categories_source
.inject_event(SourceChange::Insert { element: category })
.await
.unwrap();
for i in 1..=3 {
let product = create_node_with_properties(
"products",
&format!("prod{i}"),
vec!["Product".to_string()],
HashMap::from([
("name", json!(format!("Product {}", i))),
("categoryId", json!("CAT001")), ]),
);
products_source
.inject_event(SourceChange::Insert { element: product })
.await
.unwrap();
}
let status = tokio::time::timeout(Duration::from_secs(5), async {
loop {
let s = query_manager
.get_query_status("product-category-query".to_string())
.await;
if s.is_ok() {
return s;
}
tokio::task::yield_now().await;
}
})
.await
.expect("Timed out waiting for query status");
assert!(status.is_ok(), "Query should handle duplicate keys");
}
#[tokio::test]
async fn test_bootstrap_with_joins() {
let (query_manager, source_manager, graph) = create_test_environment().await;
let users_source = create_test_mock_source("users".to_string());
let posts_source = create_test_mock_source("posts".to_string());
add_source(&source_manager, &graph, users_source)
.await
.unwrap();
add_source(&source_manager, &graph, posts_source)
.await
.unwrap();
let join_config = create_query_join_config(
"AUTHORED_BY",
vec![
("Post".to_string(), "authorId".to_string()),
("User".to_string(), "userId".to_string()),
],
);
let query_config = create_query_config_with_joins(
"user-posts-query",
"MATCH (p:Post)-[:AUTHORED_BY]->(u:User) RETURN p.title, u.name",
vec!["users".to_string(), "posts".to_string()],
vec![join_config],
);
add_query(&query_manager, &graph, query_config)
.await
.unwrap();
let mut event_rx = graph.read().await.subscribe();
query_manager
.start_query("user-posts-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"user-posts-query",
ComponentStatus::Running,
Duration::from_secs(5),
)
.await;
let users_mock = source_manager
.get_source_instance("users")
.await
.expect("users");
let posts_mock = source_manager
.get_source_instance("posts")
.await
.expect("posts");
let users_source = users_mock
.as_any()
.downcast_ref::<TestMockSource>()
.expect("MockSource");
let posts_source = posts_mock
.as_any()
.downcast_ref::<TestMockSource>()
.expect("MockSource");
let user1 = create_node_with_properties(
"users",
"u1",
vec!["User".to_string()],
HashMap::from([("userId", json!("USER001")), ("name", json!("Bob"))]),
);
users_source
.inject_event(SourceChange::Insert { element: user1 })
.await
.unwrap();
let post1 = create_node_with_properties(
"posts",
"p1",
vec!["Post".to_string()],
HashMap::from([
("postId", json!("POST001")),
("authorId", json!("USER001")),
("title", json!("First Post")),
]),
);
posts_source
.inject_event(SourceChange::Insert { element: post1 })
.await
.unwrap();
tokio::time::timeout(Duration::from_secs(5), async {
loop {
if query_manager
.get_query_status("user-posts-query".to_string())
.await
.is_ok()
{
return;
}
tokio::task::yield_now().await;
}
})
.await
.expect("Timed out waiting for query to process initial data");
let post2 = create_node_with_properties(
"posts",
"p2",
vec!["Post".to_string()],
HashMap::from([
("postId", json!("POST002")),
("authorId", json!("USER001")),
("title", json!("Second Post")),
]),
);
posts_source
.inject_event(SourceChange::Insert { element: post2 })
.await
.unwrap();
let status = tokio::time::timeout(Duration::from_secs(5), async {
loop {
let s = query_manager
.get_query_status("user-posts-query".to_string())
.await;
if s.is_ok() {
return s;
}
tokio::task::yield_now().await;
}
})
.await
.expect("Timed out waiting for query status");
assert!(status.is_ok(), "Query should handle bootstrap with joins");
}
}