use anyhow::Result;
use log::info;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::channels::ComponentStatus;
use crate::component_graph::{ComponentGraph, ComponentKind};
use crate::config::RuntimeConfig;
use crate::queries::QueryManager;
use crate::reactions::ReactionManager;
use crate::sources::SourceManager;
pub(crate) struct LifecycleManager {
config: Arc<RuntimeConfig>,
source_manager: Arc<SourceManager>,
query_manager: Arc<QueryManager>,
reaction_manager: Arc<ReactionManager>,
graph: Arc<RwLock<ComponentGraph>>,
}
impl LifecycleManager {
pub fn new(
config: Arc<RuntimeConfig>,
source_manager: Arc<SourceManager>,
query_manager: Arc<QueryManager>,
reaction_manager: Arc<ReactionManager>,
graph: Arc<RwLock<ComponentGraph>>,
) -> Self {
Self {
config,
source_manager,
query_manager,
reaction_manager,
graph,
}
}
pub async fn load_configuration(&self) -> Result<()> {
info!("Loading configuration");
for query_config in &self.config.queries {
let config = query_config.clone();
{
let mut graph = self.graph.write().await;
let mut metadata = HashMap::new();
metadata.insert("query".to_string(), config.query.clone());
let source_ids: Vec<String> =
config.sources.iter().map(|s| s.source_id.clone()).collect();
graph.register_query(&config.id, metadata, &source_ids)?;
}
let query_id = config.id.clone();
if let Err(e) = self.query_manager.provision_query(config).await {
let mut graph = self.graph.write().await;
let _ = graph.deregister(&query_id);
return Err(e);
}
}
info!("Configuration loaded successfully");
Ok(())
}
pub async fn start_components(&self) -> Result<()> {
info!("Starting all auto-start components in sequence: Sources → Queries → Reactions");
info!("Starting auto-start sources");
self.source_manager.start_all().await?;
info!("All auto-start sources started successfully");
info!("Starting auto-start queries");
self.query_manager.start_all().await?;
info!("All auto-start queries started successfully");
info!("Starting auto-start reactions");
self.reaction_manager.start_all().await?;
info!("All auto-start reactions started successfully");
info!("All auto-start components started in sequence: Sources → Queries → Reactions");
info!("[STARTUP-COMPLETE] DrasiLib.start() is now returning - all components and subscriptions should be active");
Ok(())
}
pub async fn stop_all_components(&self) -> Result<()> {
use log::error;
let shutdown_order: Vec<(String, ComponentKind, ComponentStatus)> = {
let graph = self.graph.read().await;
match graph.topological_order() {
Ok(order) => order
.into_iter()
.rev()
.map(|n| (n.id.clone(), n.kind.clone(), n.status))
.collect(),
Err(e) => {
error!("Failed to compute topological order: {e}, falling back to kind-based ordering");
let mut all = Vec::new();
for (id, status) in graph.list_by_kind(&ComponentKind::Reaction) {
all.push((id, ComponentKind::Reaction, status));
}
for (id, status) in graph.list_by_kind(&ComponentKind::Query) {
all.push((id, ComponentKind::Query, status));
}
for (id, status) in graph.list_by_kind(&ComponentKind::Source) {
all.push((id, ComponentKind::Source, status));
}
all
}
}
};
let mut failures = Vec::new();
for (id, kind, status) in shutdown_order {
if !matches!(status, ComponentStatus::Running | ComponentStatus::Starting) {
continue;
}
let result = match kind {
ComponentKind::Reaction => {
info!("Stopping reaction '{id}'");
self.reaction_manager.stop_reaction(id.clone()).await
}
ComponentKind::Query => {
info!("Stopping query '{id}'");
self.query_manager.stop_query(id.clone()).await
}
ComponentKind::Source => {
info!("Stopping source '{id}'");
self.source_manager.stop_source(id.clone()).await
}
_ => continue,
};
if let Err(e) = result {
error!("Error stopping {kind} {id}: {e}");
failures.push((id, e.to_string()));
}
}
if failures.is_empty() {
info!("All components stopped");
Ok(())
} else {
let error_msg = failures
.iter()
.map(|(id, err)| format!("{id}: {err}"))
.collect::<Vec<_>>()
.join(", ");
Err(anyhow::anyhow!(
"Failed to stop some components: {error_msg}"
))
}
}
}
#[cfg(test)]
mod tests {
use crate::channels::ComponentStatus;
use crate::lib_core::DrasiLib;
use crate::sources::tests::TestMockSource;
use crate::sources::COMPONENT_GRAPH_SOURCE_ID;
use crate::test_helpers::wait_for_component_status;
use std::time::Duration;
async fn build_with_sources(sources: Vec<TestMockSource>) -> DrasiLib {
let mut builder = DrasiLib::builder().with_id("lifecycle-test");
for s in sources {
builder = builder.with_source(s);
}
builder.build().await.unwrap()
}
#[tokio::test]
async fn start_components_starts_auto_start_sources() {
let source = TestMockSource::with_auto_start("auto-src".to_string(), true).unwrap();
let core = build_with_sources(vec![source]).await;
let mut event_rx = core.component_graph.read().await.subscribe();
core.start().await.unwrap();
wait_for_component_status(
&mut event_rx,
"auto-src",
ComponentStatus::Running,
Duration::from_secs(5),
)
.await;
let status = core.get_source_status("auto-src").await.unwrap();
assert_eq!(status, ComponentStatus::Running);
}
#[tokio::test]
async fn start_components_skips_non_auto_start() {
let source = TestMockSource::with_auto_start("manual-src".to_string(), false).unwrap();
let core = build_with_sources(vec![source]).await;
core.start().await.unwrap();
let status = core.get_source_status("manual-src").await.unwrap();
assert_eq!(status, ComponentStatus::Added);
}
#[tokio::test]
async fn stop_all_components_stops_running() {
let source = TestMockSource::with_auto_start("stop-src".to_string(), true).unwrap();
let core = build_with_sources(vec![source]).await;
let mut event_rx = core.component_graph.read().await.subscribe();
core.start().await.unwrap();
wait_for_component_status(
&mut event_rx,
"stop-src",
ComponentStatus::Running,
Duration::from_secs(5),
)
.await;
core.stop().await.unwrap();
wait_for_component_status(
&mut event_rx,
"stop-src",
ComponentStatus::Stopped,
Duration::from_secs(5),
)
.await;
let status = core.get_source_status("stop-src").await.unwrap();
assert_eq!(status, ComponentStatus::Stopped);
}
#[tokio::test]
async fn stop_all_components_handles_already_stopped() {
let source = TestMockSource::with_auto_start("idle-src".to_string(), false).unwrap();
let core = build_with_sources(vec![source]).await;
core.start().await.unwrap();
let _ = core.stop().await;
let status = core.get_source_status("idle-src").await.unwrap();
assert_eq!(status, ComponentStatus::Added);
}
#[tokio::test]
async fn load_configuration_creates_queries_from_config() {
use crate::builder::Query;
let source = TestMockSource::with_auto_start("cfg-src".to_string(), true).unwrap();
let core = DrasiLib::builder()
.with_id("cfg-test")
.with_source(source)
.with_query(
Query::cypher("cfg-query")
.query("MATCH (n:Person) RETURN n")
.from_source("cfg-src")
.build(),
)
.build()
.await
.unwrap();
let queries = core.list_queries().await.unwrap();
assert!(
queries.iter().any(|(id, _)| id == "cfg-query"),
"Query 'cfg-query' should exist after build; got: {queries:?}"
);
let (_, status) = queries.iter().find(|(id, _)| id == "cfg-query").unwrap();
assert_eq!(*status, ComponentStatus::Added);
}
}