#[cfg(test)]
mod manager_tests {
use super::super::*;
use crate::channels::*;
use crate::config::{QueryConfig, QueryLanguage, SourceSubscriptionConfig};
use crate::sources::tests::{create_test_bootstrap_mock_source, create_test_mock_source};
use crate::sources::SourceManager;
use crate::test_helpers::wait_for_component_status;
use drasi_core::middleware::MiddlewareTypeRegistry;
use std::sync::Arc;
fn create_test_query_config(id: &str, sources: Vec<String>) -> QueryConfig {
create_test_query_config_with_auto_start(id, sources, true)
}
fn create_test_query_config_with_auto_start(
id: &str,
sources: Vec<String>,
auto_start: bool,
) -> QueryConfig {
QueryConfig {
id: id.to_string(),
query: "MATCH (n) RETURN n".to_string(),
query_language: QueryLanguage::Cypher,
middleware: vec![],
sources: sources
.into_iter()
.map(|source_id| SourceSubscriptionConfig {
nodes: vec![],
relations: vec![],
source_id,
pipeline: vec![],
})
.collect(),
auto_start,
joins: None,
enable_bootstrap: true,
bootstrap_buffer_size: 10000,
priority_queue_capacity: None,
dispatch_buffer_capacity: None,
dispatch_mode: None,
storage_backend: None,
recovery_policy: None,
}
}
fn create_test_gql_query_config(id: &str, sources: Vec<String>) -> QueryConfig {
QueryConfig {
id: id.to_string(),
query: "MATCH (n:Person) RETURN n.name".to_string(),
query_language: QueryLanguage::GQL,
middleware: vec![],
sources: sources
.into_iter()
.map(|source_id| SourceSubscriptionConfig {
nodes: vec![],
relations: vec![],
source_id,
pipeline: vec![],
})
.collect(),
auto_start: true,
joins: None,
enable_bootstrap: true,
bootstrap_buffer_size: 10000,
priority_queue_capacity: None,
dispatch_buffer_capacity: None,
dispatch_mode: None,
storage_backend: None,
recovery_policy: None,
}
}
async fn create_test_manager() -> (
Arc<QueryManager>,
Arc<SourceManager>,
Arc<tokio::sync::RwLock<crate::component_graph::ComponentGraph>>,
) {
let log_registry = crate::managers::get_or_init_global_registry();
let (graph, update_rx) = crate::component_graph::ComponentGraph::new("test-instance");
let update_tx = graph.update_sender();
let graph = Arc::new(tokio::sync::RwLock::new(graph));
{
let graph_clone = graph.clone();
tokio::spawn(async move {
let mut rx = update_rx;
while let Some(update) = rx.recv().await {
let mut g = graph_clone.write().await;
g.apply_update(update);
}
});
}
let source_manager = Arc::new(SourceManager::new(
"test-instance",
log_registry.clone(),
graph.clone(),
update_tx.clone(),
));
let index_factory = Arc::new(crate::indexes::IndexFactory::new(vec![], None));
let middleware_registry = Arc::new(MiddlewareTypeRegistry::new());
let query_manager = Arc::new(QueryManager::new(
"test-instance",
source_manager.clone(),
index_factory,
middleware_registry,
log_registry,
graph.clone(),
update_tx,
));
(query_manager, source_manager, graph)
}
async fn create_test_manager_with_graph() -> (
Arc<QueryManager>,
Arc<SourceManager>,
Arc<tokio::sync::RwLock<crate::component_graph::ComponentGraph>>,
) {
create_test_manager().await
}
async fn add_source(
source_manager: &SourceManager,
graph: &tokio::sync::RwLock<crate::component_graph::ComponentGraph>,
source: impl crate::sources::Source + 'static,
) -> anyhow::Result<()> {
let source_id = source.id().to_string();
let source_type = source.type_name().to_string();
let auto_start = source.auto_start();
{
let mut g = graph.write().await;
let mut metadata = std::collections::HashMap::new();
metadata.insert("kind".to_string(), source_type);
metadata.insert("autoStart".to_string(), auto_start.to_string());
g.register_source(&source_id, metadata)?;
}
source_manager.provision_source(source).await
}
async fn add_query(
manager: &QueryManager,
graph: &tokio::sync::RwLock<crate::component_graph::ComponentGraph>,
config: QueryConfig,
) -> anyhow::Result<()> {
{
let mut g = graph.write().await;
let source_ids: Vec<String> =
config.sources.iter().map(|s| s.source_id.clone()).collect();
for sid in &source_ids {
if !g.contains(sid) {
g.register_source(sid, std::collections::HashMap::new())?;
}
}
let mut metadata = std::collections::HashMap::new();
metadata.insert("query".to_string(), config.query.clone());
g.register_query(&config.id, metadata, &source_ids)?;
}
manager.provision_query(config).await
}
async fn delete_query(
manager: &QueryManager,
graph: &tokio::sync::RwLock<crate::component_graph::ComponentGraph>,
id: &str,
) -> anyhow::Result<()> {
manager.teardown_query(id.to_string()).await?;
let mut g = graph.write().await;
g.deregister(id)?;
Ok(())
}
#[tokio::test]
async fn test_add_query() {
let (manager, source_manager, graph) = create_test_manager().await;
let config = create_test_query_config("test-query", vec!["source1".to_string()]);
let result = add_query(&manager, &graph, config.clone()).await;
assert!(result.is_ok());
let queries = manager.list_queries().await;
assert_eq!(queries.len(), 1);
assert_eq!(queries[0].0, "test-query");
}
#[tokio::test]
async fn test_add_duplicate_query() {
let (manager, source_manager, graph) = create_test_manager().await;
let config = create_test_query_config("test-query", vec![]);
assert!(add_query(&manager, &graph, config.clone()).await.is_ok());
let result = add_query(&manager, &graph, config).await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("already exists"));
}
#[tokio::test]
async fn test_delete_query() {
let (manager, source_manager, graph) = create_test_manager().await;
let config = create_test_query_config("test-query", vec![]);
add_query(&manager, &graph, config).await.unwrap();
let result = delete_query(&manager, &graph, "test-query").await;
assert!(result.is_ok());
let queries = manager.list_queries().await;
assert_eq!(queries.len(), 0);
}
#[tokio::test]
async fn test_start_query() {
let (manager, source_manager, graph) = create_test_manager_with_graph().await;
let mut event_rx = graph.read().await.subscribe();
let source = create_test_mock_source("source1".to_string());
add_source(&source_manager, &graph, source).await.unwrap();
let config = create_test_query_config("test-query", vec!["source1".to_string()]);
add_query(&manager, &graph, config).await.unwrap();
let result = manager.start_query("test-query".to_string()).await;
assert!(result.is_ok());
tokio::time::timeout(std::time::Duration::from_secs(1), async {
while let Ok(event) = event_rx.recv().await {
if event.component_id == "test-query" {
if event
.message
.as_deref()
.is_some_and(|m| m.ends_with("added"))
{
continue;
}
assert!(
matches!(event.status, ComponentStatus::Starting)
|| matches!(event.status, ComponentStatus::Running)
);
break;
}
}
})
.await
.expect("Timeout waiting for status event");
}
#[tokio::test]
async fn test_stop_query() {
let (manager, source_manager, graph) = create_test_manager().await;
let mut event_rx = graph.read().await.subscribe();
let source = create_test_mock_source("source1".to_string());
add_source(&source_manager, &graph, source).await.unwrap();
let config = create_test_query_config("test-query", vec!["source1".to_string()]);
add_query(&manager, &graph, config).await.unwrap();
manager.start_query("test-query".to_string()).await.unwrap();
wait_for_component_status(
&mut event_rx,
"test-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let result = manager.stop_query("test-query".to_string()).await;
assert!(result.is_ok());
tokio::time::timeout(std::time::Duration::from_secs(1), async {
while let Ok(event) = event_rx.recv().await {
if event.component_id == "test-query"
&& matches!(event.status, ComponentStatus::Stopped)
{
break;
}
}
})
.await
.expect("Timeout waiting for stop event");
}
#[tokio::test]
async fn test_stop_query_cancels_subscription_tasks() {
let (manager, source_manager, graph) = create_test_manager().await;
let source = create_test_mock_source("source1".to_string());
add_source(&source_manager, &graph, source).await.unwrap();
let mut event_rx = graph.read().await.subscribe();
let config = create_test_query_config("test-query", vec!["source1".to_string()]);
add_query(&manager, &graph, config).await.unwrap();
manager.start_query("test-query".to_string()).await.unwrap();
wait_for_component_status(
&mut event_rx,
"test-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let query = manager.get_query_instance("test-query").await.unwrap();
let concrete = query.as_any().downcast_ref::<DrasiQuery>().unwrap();
assert!(
concrete.subscription_task_count().await > 0,
"Expected active subscription task"
);
manager.stop_query("test-query".to_string()).await.unwrap();
wait_for_component_status(
&mut event_rx,
"test-query",
ComponentStatus::Stopped,
std::time::Duration::from_secs(5),
)
.await;
assert!(
concrete.subscription_task_count().await == 0,
"Subscription tasks should be cleared on stop"
);
}
#[tokio::test]
async fn test_partial_subscription_failure_cleans_up_tasks() {
let (manager, source_manager, graph) = create_test_manager().await;
let mut event_rx = graph.read().await.subscribe();
let source1 = create_test_mock_source("source1".to_string());
add_source(&source_manager, &graph, source1).await.unwrap();
let config = create_test_query_config(
"test-query",
vec!["source1".to_string(), "source2".to_string()],
);
add_query(&manager, &graph, config).await.unwrap();
let result = manager.start_query("test-query".to_string()).await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("source2"));
wait_for_component_status(
&mut event_rx,
"test-query",
ComponentStatus::Error,
std::time::Duration::from_secs(5),
)
.await;
let status = manager
.get_query_status("test-query".to_string())
.await
.unwrap();
assert!(
matches!(status, ComponentStatus::Error),
"Expected Error status after failed start"
);
let query = manager.get_query_instance("test-query").await.unwrap();
let concrete = query.as_any().downcast_ref::<DrasiQuery>().unwrap();
assert_eq!(
concrete.subscription_task_count().await,
0,
"Subscription tasks should be cleaned up after partial failure"
);
}
#[tokio::test]
async fn test_add_gql_query() {
let (manager, source_manager, graph) = create_test_manager().await;
let config = create_test_gql_query_config("test-gql-query", vec!["source1".to_string()]);
let result = add_query(&manager, &graph, config.clone()).await;
assert!(result.is_ok());
let queries = manager.list_queries().await;
assert_eq!(queries.len(), 1);
assert_eq!(queries[0].0, "test-gql-query");
}
#[tokio::test]
async fn test_start_gql_query() {
let (manager, source_manager, graph) = create_test_manager_with_graph().await;
let mut event_rx = graph.read().await.subscribe();
let source = create_test_mock_source("source1".to_string());
add_source(&source_manager, &graph, source).await.unwrap();
let config = create_test_gql_query_config("test-gql-query", vec!["source1".to_string()]);
add_query(&manager, &graph, config).await.unwrap();
let result = manager.start_query("test-gql-query".to_string()).await;
assert!(result.is_ok());
tokio::time::timeout(std::time::Duration::from_secs(1), async {
while let Ok(event) = event_rx.recv().await {
if event.component_id == "test-gql-query" {
if event
.message
.as_deref()
.is_some_and(|m| m.ends_with("added"))
{
continue;
}
assert!(
matches!(event.status, ComponentStatus::Starting)
|| matches!(event.status, ComponentStatus::Running)
);
break;
}
}
})
.await
.expect("Timeout waiting for status event");
}
#[tokio::test]
async fn test_mixed_language_queries() {
let (manager, source_manager, graph) = create_test_manager().await;
let cypher_config = create_test_query_config("cypher-query", vec!["source1".to_string()]);
assert!(add_query(&manager, &graph, cypher_config).await.is_ok());
let gql_config = create_test_gql_query_config("gql-query", vec!["source1".to_string()]);
assert!(add_query(&manager, &graph, gql_config).await.is_ok());
let queries = manager.list_queries().await;
assert_eq!(queries.len(), 2);
let query_ids: Vec<String> = queries.iter().map(|(id, _)| id.clone()).collect();
assert!(query_ids.contains(&"cypher-query".to_string()));
assert!(query_ids.contains(&"gql-query".to_string()));
}
#[tokio::test]
async fn test_get_query_config() {
let (manager, source_manager, graph) = create_test_manager().await;
let config = create_test_query_config("test-query", vec!["source1".to_string()]);
add_query(&manager, &graph, config.clone()).await.unwrap();
let retrieved = manager.get_query_config("test-query").await;
assert!(retrieved.is_some());
let retrieved = retrieved.unwrap();
assert_eq!(retrieved.id, config.id);
assert_eq!(retrieved.query, config.query);
assert_eq!(retrieved.sources.len(), config.sources.len());
}
#[tokio::test]
async fn test_update_query() {
let (manager, source_manager, graph) = create_test_manager().await;
let mut config = create_test_query_config("test-query", vec![]);
add_query(&manager, &graph, config.clone()).await.unwrap();
config.query = "MATCH (n:Updated) RETURN n".to_string();
manager
.teardown_query("test-query".to_string())
.await
.unwrap();
{
let mut g = graph.write().await;
let _ = g.deregister("test-query");
}
{
let mut g = graph.write().await;
let mut metadata = std::collections::HashMap::new();
metadata.insert("query".to_string(), config.query.clone());
let source_ids: Vec<String> =
config.sources.iter().map(|s| s.source_id.clone()).collect();
g.register_query(&config.id, metadata, &source_ids).unwrap();
}
manager.provision_query(config.clone()).await.unwrap();
let retrieved = manager.get_query_config("test-query").await.unwrap();
assert_eq!(retrieved.query, config.query);
}
#[tokio::test]
async fn test_query_lifecycle() {
let (manager, source_manager, graph) = create_test_manager().await;
let source = create_test_mock_source("source1".to_string());
add_source(&source_manager, &graph, source).await.unwrap();
let query_config = create_test_query_config("test-query", vec!["source1".to_string()]);
add_query(&manager, &graph, query_config).await.unwrap();
let status = manager
.get_query_status("test-query".to_string())
.await
.unwrap();
assert!(matches!(status, ComponentStatus::Added));
let mut event_rx = graph.read().await.subscribe();
manager.start_query("test-query".to_string()).await.unwrap();
wait_for_component_status(
&mut event_rx,
"test-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let status = manager
.get_query_status("test-query".to_string())
.await
.unwrap();
assert!(matches!(status, ComponentStatus::Running));
manager.stop_query("test-query".to_string()).await.unwrap();
wait_for_component_status(
&mut event_rx,
"test-query",
ComponentStatus::Stopped,
std::time::Duration::from_secs(5),
)
.await;
let status = manager
.get_query_status("test-query".to_string())
.await
.unwrap();
assert!(matches!(status, ComponentStatus::Stopped));
}
#[tokio::test]
async fn test_query_auto_start_defaults_to_true() {
let config = create_test_query_config("default-query", vec![]);
assert!(config.auto_start, "Default auto_start should be true");
}
#[tokio::test]
async fn test_query_auto_start_false_not_started_on_add() {
let (manager, source_manager, graph) = create_test_manager().await;
let config = create_test_query_config_with_auto_start("no-auto-start-query", vec![], false);
add_query(&manager, &graph, config).await.unwrap();
let status = manager
.get_query_status("no-auto-start-query".to_string())
.await
.unwrap();
assert!(
matches!(status, ComponentStatus::Added),
"Query with auto_start=false should remain in Added state after add"
);
}
#[tokio::test]
async fn test_query_auto_start_false_can_be_manually_started() {
let (manager, source_manager, graph) = create_test_manager().await;
let source = create_test_mock_source("source1".to_string());
add_source(&source_manager, &graph, source).await.unwrap();
let config = create_test_query_config_with_auto_start(
"manual-query",
vec!["source1".to_string()],
false,
);
add_query(&manager, &graph, config).await.unwrap();
let status = manager
.get_query_status("manual-query".to_string())
.await
.unwrap();
assert!(
matches!(status, ComponentStatus::Added),
"Query with auto_start=false should be in Added state initially"
);
let mut event_rx = graph.read().await.subscribe();
manager
.start_query("manual-query".to_string())
.await
.unwrap();
wait_for_component_status(
&mut event_rx,
"manual-query",
ComponentStatus::Running,
std::time::Duration::from_secs(5),
)
.await;
let status = manager
.get_query_status("manual-query".to_string())
.await
.unwrap();
assert!(
matches!(status, ComponentStatus::Running),
"Query with auto_start=false should be manually startable"
);
}
#[tokio::test]
async fn test_query_config_preserves_auto_start() {
let (manager, source_manager, graph) = create_test_manager().await;
let config = create_test_query_config_with_auto_start("test-query", vec![], false);
add_query(&manager, &graph, config).await.unwrap();
let retrieved = manager.get_query_config("test-query").await.unwrap();
assert!(
!retrieved.auto_start,
"Retrieved config should preserve auto_start=false"
);
let config2 = create_test_query_config_with_auto_start("test-query-2", vec![], true);
add_query(&manager, &graph, config2).await.unwrap();
let retrieved2 = manager.get_query_config("test-query-2").await.unwrap();
assert!(
retrieved2.auto_start,
"Retrieved config should preserve auto_start=true"
);
}
#[tokio::test]
async fn test_delete_query_cleans_up_event_history() {
let (manager, source_manager, graph) = create_test_manager().await;
let config = create_test_query_config("cleanup-events-query", vec![]);
add_query(&manager, &graph, config).await.unwrap();
manager
.record_event(ComponentEvent {
component_id: "cleanup-events-query".to_string(),
component_type: ComponentType::Query,
status: ComponentStatus::Running,
timestamp: chrono::Utc::now(),
message: Some("Test event".to_string()),
})
.await;
let events = manager.get_query_events("cleanup-events-query").await;
assert!(!events.is_empty(), "Expected events after recording");
delete_query(&manager, &graph, "cleanup-events-query")
.await
.unwrap();
let events_after = manager.get_query_events("cleanup-events-query").await;
assert!(events_after.is_empty(), "Expected no events after deletion");
}
#[tokio::test]
async fn test_bootstrap_gate_delays_running_status() {
let (manager, source_manager, graph) = create_test_manager_with_graph().await;
let mut event_rx = graph.read().await.subscribe();
let (bootstrap_tx, bootstrap_rx) = tokio::sync::mpsc::channel::<BootstrapEvent>(100);
let source = create_test_bootstrap_mock_source("bs-source".to_string(), bootstrap_rx);
add_source(&source_manager, &graph, source).await.unwrap();
let config = create_test_query_config("bs-query", vec!["bs-source".to_string()]);
add_query(&manager, &graph, config).await.unwrap();
manager.start_query("bs-query".to_string()).await.unwrap();
tokio::time::timeout(std::time::Duration::from_secs(1), async {
while let Ok(event) = event_rx.recv().await {
if event.component_id == "bs-query"
&& matches!(event.status, ComponentStatus::Starting)
{
break;
}
}
})
.await
.expect("Timeout waiting for Starting event");
let status = manager
.get_query_status("bs-query".to_string())
.await
.unwrap();
assert!(
matches!(status, ComponentStatus::Starting),
"Expected Starting while bootstrap is in progress, got {status:?}"
);
drop(bootstrap_tx);
tokio::time::timeout(std::time::Duration::from_secs(2), async {
while let Ok(event) = event_rx.recv().await {
if event.component_id == "bs-query"
&& matches!(event.status, ComponentStatus::Running)
{
return;
}
}
})
.await
.expect("Timeout waiting for Running event after bootstrap completion");
let status = manager
.get_query_status("bs-query".to_string())
.await
.unwrap();
assert!(
matches!(status, ComponentStatus::Running),
"Expected Running after bootstrap completed, got {status:?}"
);
}
#[tokio::test]
async fn test_delete_query_cleans_up_log_history() {
let (manager, source_manager, graph) = create_test_manager().await;
let config = create_test_query_config("cleanup-logs-query", vec![]);
add_query(&manager, &graph, config).await.unwrap();
let result = manager.subscribe_logs("cleanup-logs-query").await;
assert!(result.is_some(), "Expected to subscribe to query logs");
delete_query(&manager, &graph, "cleanup-logs-query")
.await
.unwrap();
let result = manager.subscribe_logs("cleanup-logs-query").await;
assert!(result.is_none(), "Expected None for deleted query logs");
}
}
#[cfg(test)]
mod query_core_tests {
use crate::config::QueryConfig;
#[tokio::test]
async fn test_query_syntax_validation() {
let valid_queries = vec![
"MATCH (n) RETURN n",
"MATCH (n:Person) WHERE n.age > 21 RETURN n.name",
"MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b",
];
for query in valid_queries {
let config = QueryConfig {
id: "test".to_string(),
query: query.to_string(),
query_language: crate::config::QueryLanguage::Cypher,
middleware: vec![],
sources: vec![],
auto_start: false,
joins: None,
enable_bootstrap: true,
bootstrap_buffer_size: 10000,
priority_queue_capacity: None,
dispatch_buffer_capacity: None,
dispatch_mode: None,
storage_backend: None,
recovery_policy: None,
};
assert!(!config.query.is_empty());
}
}
#[tokio::test]
async fn test_empty_query_validation() {
let config = QueryConfig {
id: "test".to_string(),
query: "".to_string(),
query_language: crate::config::QueryLanguage::Cypher,
middleware: vec![],
sources: vec![],
auto_start: false,
joins: None,
enable_bootstrap: true,
bootstrap_buffer_size: 10000,
priority_queue_capacity: None,
dispatch_buffer_capacity: None,
dispatch_mode: None,
storage_backend: None,
recovery_policy: None,
};
assert!(config.query.is_empty());
}
}