use anyhow::Result;
use log::{info, warn};
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::channels::*;
use crate::component_graph::{ComponentGraph, GraphSnapshot};
use crate::config::{DrasiLibConfig, RuntimeConfig};
use crate::error::DrasiError;
use crate::inspection::InspectionAPI;
use crate::lifecycle::LifecycleManager;
use crate::managers::ComponentLogRegistry;
use crate::queries::QueryManager;
use crate::reactions::ReactionManager;
use crate::sources::SourceManager;
use crate::state_guard::StateGuard;
use drasi_core::middleware::MiddlewareTypeRegistry;
pub struct DrasiLib {
pub(crate) config: Arc<RuntimeConfig>,
pub(crate) source_manager: Arc<SourceManager>,
pub(crate) query_manager: Arc<QueryManager>,
pub(crate) reaction_manager: Arc<ReactionManager>,
pub(crate) running: Arc<RwLock<bool>>,
pub(crate) is_shutdown: Arc<std::sync::atomic::AtomicBool>,
pub(crate) state_guard: StateGuard,
pub(crate) inspection: InspectionAPI,
pub(crate) lifecycle: Arc<LifecycleManager>,
pub(crate) middleware_registry: Arc<MiddlewareTypeRegistry>,
pub(crate) log_registry: Arc<ComponentLogRegistry>,
pub(crate) component_event_broadcast_tx: ComponentEventBroadcastSender,
pub(crate) component_graph: Arc<RwLock<ComponentGraph>>,
pub(crate) graph_update_handle: Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>>,
}
impl Clone for DrasiLib {
fn clone(&self) -> Self {
Self {
config: Arc::clone(&self.config),
source_manager: Arc::clone(&self.source_manager),
query_manager: Arc::clone(&self.query_manager),
reaction_manager: Arc::clone(&self.reaction_manager),
running: Arc::clone(&self.running),
is_shutdown: Arc::clone(&self.is_shutdown),
state_guard: self.state_guard.clone(),
inspection: self.inspection.clone(),
lifecycle: Arc::clone(&self.lifecycle),
middleware_registry: Arc::clone(&self.middleware_registry),
log_registry: Arc::clone(&self.log_registry),
component_event_broadcast_tx: self.component_event_broadcast_tx.clone(),
component_graph: Arc::clone(&self.component_graph),
graph_update_handle: Arc::clone(&self.graph_update_handle),
}
}
}
impl DrasiLib {
pub(crate) fn to_arc(&self) -> Arc<Self> {
Arc::new(self.clone())
}
pub fn subscribe_all_component_events(&self) -> ComponentEventBroadcastReceiver {
self.component_event_broadcast_tx.subscribe()
}
pub(crate) fn new(config: Arc<RuntimeConfig>) -> Self {
let log_registry = crate::managers::get_or_init_global_registry();
let instance_id = config.id.clone();
let (graph, update_rx) = ComponentGraph::new(&instance_id);
let component_event_broadcast_tx = graph.event_sender().clone();
let update_tx = graph.update_sender();
let component_graph = Arc::new(RwLock::new(graph));
let source_manager = Arc::new(SourceManager::new(
&instance_id,
log_registry.clone(),
component_graph.clone(),
update_tx.clone(),
));
let mut middleware_registry = MiddlewareTypeRegistry::new();
#[cfg(feature = "middleware-jq")]
middleware_registry.register(Arc::new(drasi_middleware::jq::JQFactory::new()));
#[cfg(feature = "middleware-map")]
middleware_registry.register(Arc::new(drasi_middleware::map::MapFactory::new()));
#[cfg(feature = "middleware-unwind")]
middleware_registry.register(Arc::new(drasi_middleware::unwind::UnwindFactory::new()));
#[cfg(feature = "middleware-relabel")]
middleware_registry.register(Arc::new(
drasi_middleware::relabel::RelabelMiddlewareFactory::new(),
));
#[cfg(feature = "middleware-decoder")]
middleware_registry.register(Arc::new(drasi_middleware::decoder::DecoderFactory::new()));
#[cfg(feature = "middleware-parse-json")]
middleware_registry.register(Arc::new(
drasi_middleware::parse_json::ParseJsonFactory::new(),
));
#[cfg(feature = "middleware-promote")]
middleware_registry.register(Arc::new(
drasi_middleware::promote::PromoteMiddlewareFactory::new(),
));
let middleware_registry = Arc::new(middleware_registry);
let query_manager = Arc::new(QueryManager::new(
&instance_id,
source_manager.clone(),
config.index_factory.clone(),
middleware_registry.clone(),
log_registry.clone(),
component_graph.clone(),
update_tx.clone(),
));
let reaction_manager = Arc::new(ReactionManager::new(
&instance_id,
log_registry.clone(),
component_graph.clone(),
update_tx.clone(),
));
let state_guard = StateGuard::new();
let inspection = InspectionAPI::new(
source_manager.clone(),
query_manager.clone(),
reaction_manager.clone(),
state_guard.clone(),
config.clone(),
);
let lifecycle = Arc::new(LifecycleManager::new(
config.clone(),
source_manager.clone(),
query_manager.clone(),
reaction_manager.clone(),
component_graph.clone(),
));
let graph_update_handle = {
let graph = component_graph.clone();
let handle = tokio::spawn(async move {
let mut update_rx = update_rx;
while let Some(first) = update_rx.recv().await {
let mut batch = vec![first];
while let Ok(update) = update_rx.try_recv() {
batch.push(update);
}
let events: Vec<_> = {
let mut g = graph.write().await;
batch
.into_iter()
.filter_map(|update| g.apply_update(update))
.collect()
};
for event in events {
log::info!(
"Component Event - {:?} {}: {:?} - {}",
event.component_type,
event.component_id,
event.status,
event.message.clone().unwrap_or_default()
);
}
}
tracing::debug!("Graph update loop exited — all senders dropped");
});
Arc::new(tokio::sync::Mutex::new(Some(handle)))
};
Self {
config,
source_manager,
query_manager,
reaction_manager,
running: Arc::new(RwLock::new(false)),
is_shutdown: Arc::new(std::sync::atomic::AtomicBool::new(false)),
state_guard,
inspection,
lifecycle,
middleware_registry,
log_registry,
component_event_broadcast_tx,
component_graph,
graph_update_handle,
}
}
pub(crate) async fn initialize(&mut self) -> Result<()> {
let already_initialized = self.state_guard.is_initialized();
if already_initialized {
info!("Server already initialized, skipping initialization");
return Ok(());
}
info!("Initializing drasi-lib");
self.reaction_manager
.inject_query_provider(
Arc::clone(&self.query_manager) as Arc<dyn crate::reactions::QueryProvider>
)
.await;
let state_store = self.config.state_store_provider.clone();
self.source_manager
.inject_state_store(state_store.clone())
.await;
self.reaction_manager.inject_state_store(state_store).await;
if let Some(identity_provider) = &self.config.identity_provider {
self.source_manager
.inject_identity_provider(identity_provider.clone())
.await;
self.reaction_manager
.inject_identity_provider(identity_provider.clone())
.await;
}
self.lifecycle.load_configuration().await?;
self.state_guard.mark_initialized();
info!("drasi-lib initialized successfully");
Ok(())
}
pub async fn start(&self) -> crate::error::Result<()> {
if self.is_shutdown.load(std::sync::atomic::Ordering::Acquire) {
return Err(DrasiError::invalid_state(
"Server has been shut down and cannot be restarted",
));
}
{
let running = self.running.read().await;
if *running {
warn!("Server is already running");
return Err(DrasiError::invalid_state("Server is already running"));
}
}
if !self.state_guard.is_initialized() {
return Err(DrasiError::invalid_state(
"Server must be initialized before starting",
));
}
info!("Starting drasi-lib");
self.lifecycle.start_components().await?;
*self.running.write().await = true;
info!("drasi-lib started successfully");
Ok(())
}
pub async fn stop(&self) -> crate::error::Result<()> {
{
let running = self.running.read().await;
if !*running {
warn!("Server is already stopped");
return Err(DrasiError::invalid_state("Server is already stopped"));
}
}
info!("Stopping drasi-lib");
let result = self.lifecycle.stop_all_components().await;
*self.running.write().await = false;
match result {
Ok(()) => {
info!("drasi-lib stopped successfully");
Ok(())
}
Err(e) => {
warn!("drasi-lib stopped with errors: {e}");
Err(DrasiError::Internal(e))
}
}
}
pub async fn shutdown(&self) -> crate::error::Result<()> {
if self
.is_shutdown
.swap(true, std::sync::atomic::Ordering::AcqRel)
{
return Ok(());
}
if self.is_running().await {
if let Err(e) = self.stop().await {
warn!("Errors during shutdown stop: {e}");
}
}
if let Some(handle) = self.graph_update_handle.lock().await.take() {
handle.abort();
let _ = handle.await;
}
info!("drasi-lib shut down permanently");
Ok(())
}
pub fn query_manager(&self) -> &QueryManager {
&self.query_manager
}
pub fn middleware_registry(&self) -> Arc<MiddlewareTypeRegistry> {
Arc::clone(&self.middleware_registry)
}
pub fn log_registry(&self) -> Arc<ComponentLogRegistry> {
Arc::clone(&self.log_registry)
}
pub fn component_graph(&self) -> Arc<RwLock<ComponentGraph>> {
Arc::clone(&self.component_graph)
}
pub async fn get_current_config(&self) -> crate::error::Result<DrasiLibConfig> {
self.inspection.get_current_config().await
}
pub async fn snapshot_configuration(
&self,
) -> crate::error::Result<crate::config::snapshot::ConfigurationSnapshot> {
use crate::component_graph::ComponentKind;
use crate::config::snapshot::{
BootstrapSnapshot, ConfigurationSnapshot, QuerySnapshot, ReactionSnapshot,
SourceSnapshot,
};
self.state_guard.require_initialized()?;
let graph = self.component_graph.read().await;
let graph_snapshot = graph.snapshot();
let mut sources = Vec::new();
let mut queries = Vec::new();
let mut reactions = Vec::new();
for node in &graph_snapshot.nodes {
match node.kind {
ComponentKind::Source => {
if let Some(source) =
graph.get_runtime::<std::sync::Arc<dyn crate::sources::Source>>(&node.id)
{
let bootstrap_provider = graph
.get_neighbors(
&node.id,
&crate::component_graph::RelationshipKind::BootstrappedBy,
)
.into_iter()
.find(|n| n.kind == ComponentKind::BootstrapProvider)
.map(|bp_node| {
let kind =
bp_node.metadata.get("kind").cloned().unwrap_or_default();
let properties: std::collections::HashMap<
String,
serde_json::Value,
> = bp_node
.metadata
.iter()
.filter(|(k, _)| *k != "kind")
.filter_map(|(k, v)| {
serde_json::from_str(v)
.ok()
.map(|parsed| (k.clone(), parsed))
})
.collect();
BootstrapSnapshot { kind, properties }
});
sources.push(SourceSnapshot {
id: node.id.clone(),
source_type: source.type_name().to_string(),
status: node.status,
auto_start: node
.metadata
.get("autoStart")
.map(|v| v == "true")
.unwrap_or(false),
properties: source.properties(),
bootstrap_provider,
});
}
}
ComponentKind::Query => {
if let Some(query) = graph
.get_runtime::<std::sync::Arc<dyn crate::queries::manager::Query>>(&node.id)
{
queries.push(QuerySnapshot {
id: node.id.clone(),
config: query.get_config().clone(),
status: node.status,
});
}
}
ComponentKind::Reaction => {
if let Some(reaction) = graph
.get_runtime::<std::sync::Arc<dyn crate::reactions::Reaction>>(&node.id)
{
reactions.push(ReactionSnapshot {
id: node.id.clone(),
reaction_type: reaction.type_name().to_string(),
status: node.status,
auto_start: node
.metadata
.get("autoStart")
.map(|v| v == "true")
.unwrap_or(true),
queries: reaction.query_ids(),
properties: reaction.properties(),
});
}
}
_ => {}
}
}
Ok(ConfigurationSnapshot {
instance_id: graph_snapshot.instance_id,
timestamp: chrono::Utc::now().to_rfc3339(),
sources,
queries,
reactions,
edges: graph_snapshot.edges,
})
}
pub fn builder() -> crate::builder::DrasiLibBuilder {
crate::builder::DrasiLibBuilder::new()
}
pub async fn is_running(&self) -> bool {
*self.running.read().await
}
pub fn get_config(&self) -> &RuntimeConfig {
&self.config
}
}
#[cfg(test)]
mod tests {
use super::*;
async fn create_test_server() -> DrasiLib {
DrasiLib::builder()
.with_id("test-server")
.build()
.await
.expect("Failed to build server")
}
#[tokio::test]
async fn test_middleware_registry_is_initialized() {
let core = create_test_server().await;
let registry = core.middleware_registry();
#[cfg(feature = "middleware-jq")]
assert!(
registry.get("jq").is_some(),
"JQ factory should be registered"
);
#[cfg(feature = "middleware-map")]
assert!(
registry.get("map").is_some(),
"Map factory should be registered"
);
#[cfg(feature = "middleware-unwind")]
assert!(
registry.get("unwind").is_some(),
"Unwind factory should be registered"
);
#[cfg(feature = "middleware-relabel")]
assert!(
registry.get("relabel").is_some(),
"Relabel factory should be registered"
);
#[cfg(feature = "middleware-decoder")]
assert!(
registry.get("decoder").is_some(),
"Decoder factory should be registered"
);
#[cfg(feature = "middleware-parse-json")]
assert!(
registry.get("parse_json").is_some(),
"ParseJson factory should be registered"
);
#[cfg(feature = "middleware-promote")]
assert!(
registry.get("promote").is_some(),
"Promote factory should be registered"
);
}
#[tokio::test]
async fn test_middleware_registry_arc_sharing() {
let core = create_test_server().await;
let registry1 = core.middleware_registry();
let registry2 = core.middleware_registry();
#[cfg(feature = "middleware-jq")]
{
assert!(registry1.get("jq").is_some());
assert!(registry2.get("jq").is_some());
}
#[cfg(all(feature = "middleware-map", not(feature = "middleware-jq")))]
{
assert!(registry1.get("map").is_some());
assert!(registry2.get("map").is_some());
}
#[cfg(all(
feature = "middleware-decoder",
not(feature = "middleware-jq"),
not(feature = "middleware-map")
))]
{
assert!(registry1.get("decoder").is_some());
assert!(registry2.get("decoder").is_some());
}
}
#[tokio::test]
async fn test_middleware_registry_accessible_before_start() {
let core = create_test_server().await;
assert!(!core.is_running().await);
let registry = core.middleware_registry();
#[cfg(feature = "middleware-jq")]
assert!(registry.get("jq").is_some());
#[cfg(all(feature = "middleware-map", not(feature = "middleware-jq")))]
assert!(registry.get("map").is_some());
#[cfg(not(any(
feature = "middleware-jq",
feature = "middleware-map",
feature = "middleware-decoder",
feature = "middleware-parse-json",
feature = "middleware-promote",
feature = "middleware-relabel",
feature = "middleware-unwind"
)))]
{
let _ = registry;
}
}
#[tokio::test]
async fn test_middleware_registry_accessible_after_start() {
let core = create_test_server().await;
core.start().await.expect("Failed to start server");
assert!(core.is_running().await);
let registry = core.middleware_registry();
#[cfg(feature = "middleware-jq")]
assert!(registry.get("jq").is_some());
#[cfg(all(feature = "middleware-map", not(feature = "middleware-jq")))]
assert!(registry.get("map").is_some());
#[cfg(not(any(
feature = "middleware-jq",
feature = "middleware-map",
feature = "middleware-decoder",
feature = "middleware-parse-json",
feature = "middleware-promote",
feature = "middleware-relabel",
feature = "middleware-unwind"
)))]
{
let _ = registry;
}
let _ = core.stop().await;
}
#[tokio::test]
async fn test_build_minimal_instance() {
let result = DrasiLib::builder().with_id("test").build().await;
assert!(result.is_ok(), "Minimal DrasiLib build should succeed");
}
#[tokio::test]
async fn test_is_running_lifecycle() {
let core = create_test_server().await;
assert!(
!core.is_running().await,
"Should not be running before start"
);
core.start().await.expect("Failed to start");
assert!(core.is_running().await, "Should be running after start");
let _ = core.stop().await;
assert!(!core.is_running().await, "Should not be running after stop");
}
#[tokio::test]
async fn test_start_then_stop() {
let core = create_test_server().await;
let start_result = core.start().await;
assert!(start_result.is_ok(), "start() should succeed");
let _ = core.stop().await;
assert!(
!core.is_running().await,
"Server should be stopped after stop()"
);
}
#[tokio::test]
async fn test_get_config_returns_runtime_config() {
let core = DrasiLib::builder()
.with_id("config-test")
.build()
.await
.expect("Failed to build");
let config = core.get_config();
assert_eq!(config.id, "config-test");
}
#[tokio::test]
async fn test_component_graph_returns_arc_rwlock() {
let core = DrasiLib::builder()
.with_id("graph-test")
.build()
.await
.expect("Failed to build");
let graph = core.component_graph();
let graph_read = graph.read().await;
assert_eq!(graph_read.instance_id(), "graph-test");
}
#[tokio::test]
async fn test_builder_returns_drasi_lib_builder() {
let builder = DrasiLib::builder();
let result = builder.with_id("builder-test").build().await;
assert!(
result.is_ok(),
"Builder should produce a valid DrasiLib instance"
);
}
mod snapshot_tests {
use super::*;
use crate::builder::Query;
use crate::component_graph::ComponentKind;
use crate::config::snapshot::ConfigurationSnapshot;
use crate::reactions::Reaction;
use crate::sources::tests::{create_test_mock_source, TestMockSource};
use async_trait::async_trait;
use std::collections::HashMap;
struct PropertiedSource {
id: String,
status_handle: crate::component_graph::ComponentStatusHandle,
dispatchers: Arc<
RwLock<
Vec<
Box<
dyn crate::channels::ChangeDispatcher<
crate::channels::SourceEventWrapper,
>,
>,
>,
>,
>,
}
impl PropertiedSource {
fn new(id: &str) -> Self {
Self {
id: id.to_string(),
status_handle: crate::component_graph::ComponentStatusHandle::new(id),
dispatchers: Arc::new(RwLock::new(Vec::new())),
}
}
}
#[async_trait]
impl crate::sources::Source for PropertiedSource {
fn id(&self) -> &str {
&self.id
}
fn type_name(&self) -> &str {
"test-postgres"
}
fn properties(&self) -> HashMap<String, serde_json::Value> {
let mut props = HashMap::new();
props.insert(
"host".to_string(),
serde_json::Value::String("localhost".to_string()),
);
props.insert("port".to_string(), serde_json::json!(5432));
props
}
fn auto_start(&self) -> bool {
false
}
async fn start(&self) -> anyhow::Result<()> {
self.status_handle
.set_status(
crate::channels::ComponentStatus::Starting,
Some("Starting".into()),
)
.await;
self.status_handle
.set_status(
crate::channels::ComponentStatus::Running,
Some("Running".into()),
)
.await;
Ok(())
}
async fn stop(&self) -> anyhow::Result<()> {
self.status_handle
.set_status(
crate::channels::ComponentStatus::Stopping,
Some("Stopping".into()),
)
.await;
self.status_handle
.set_status(
crate::channels::ComponentStatus::Stopped,
Some("Stopped".into()),
)
.await;
Ok(())
}
async fn status(&self) -> crate::channels::ComponentStatus {
self.status_handle.get_status().await
}
async fn subscribe(
&self,
settings: crate::config::SourceSubscriptionSettings,
) -> anyhow::Result<crate::channels::SubscriptionResponse> {
use crate::channels::ChannelChangeDispatcher;
let dispatcher =
ChannelChangeDispatcher::<crate::channels::SourceEventWrapper>::new(100);
let receiver = dispatcher.create_receiver().await?;
self.dispatchers.write().await.push(Box::new(dispatcher));
Ok(crate::channels::SubscriptionResponse {
query_id: settings.query_id,
source_id: self.id.clone(),
receiver,
bootstrap_receiver: None,
position_handle: None,
})
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
async fn initialize(&self, ctx: crate::context::SourceRuntimeContext) {
self.status_handle.wire(ctx.update_tx.clone()).await;
}
}
struct PropertiedReaction {
id: String,
queries: Vec<String>,
status_handle: crate::component_graph::ComponentStatusHandle,
}
impl PropertiedReaction {
fn new(id: &str, queries: Vec<String>) -> Self {
Self {
id: id.to_string(),
queries,
status_handle: crate::component_graph::ComponentStatusHandle::new(id),
}
}
}
#[async_trait]
impl Reaction for PropertiedReaction {
fn id(&self) -> &str {
&self.id
}
fn type_name(&self) -> &str {
"test-http"
}
fn properties(&self) -> HashMap<String, serde_json::Value> {
let mut props = HashMap::new();
props.insert(
"base_url".to_string(),
serde_json::Value::String("https://example.com/webhook".to_string()),
);
props.insert("timeout_ms".to_string(), serde_json::json!(5000));
props
}
fn query_ids(&self) -> Vec<String> {
self.queries.clone()
}
fn auto_start(&self) -> bool {
false
}
async fn initialize(&self, ctx: crate::context::ReactionRuntimeContext) {
self.status_handle.wire(ctx.update_tx.clone()).await;
}
async fn start(&self) -> anyhow::Result<()> {
self.status_handle
.set_status(
crate::channels::ComponentStatus::Starting,
Some("Starting".into()),
)
.await;
self.status_handle
.set_status(
crate::channels::ComponentStatus::Running,
Some("Running".into()),
)
.await;
Ok(())
}
async fn stop(&self) -> anyhow::Result<()> {
self.status_handle
.set_status(
crate::channels::ComponentStatus::Stopping,
Some("Stopping".into()),
)
.await;
self.status_handle
.set_status(
crate::channels::ComponentStatus::Stopped,
Some("Stopped".into()),
)
.await;
Ok(())
}
async fn status(&self) -> crate::channels::ComponentStatus {
self.status_handle.get_status().await
}
async fn enqueue_query_result(
&self,
_result: crate::channels::QueryResult,
) -> anyhow::Result<()> {
Ok(())
}
}
async fn build_core_with_components() -> DrasiLib {
let source = PropertiedSource::new("pg-source");
let core = DrasiLib::builder()
.with_id("snapshot-test")
.with_source(source)
.with_query(
Query::cypher("q1")
.query("MATCH (n:Person) RETURN n.name")
.from_source("pg-source")
.auto_start(false)
.build(),
)
.build()
.await
.unwrap();
core.start().await.unwrap();
let reaction = PropertiedReaction::new("http-reaction", vec!["q1".to_string()]);
core.add_reaction(reaction).await.unwrap();
core
}
#[tokio::test]
async fn snapshot_empty_instance() {
let core = create_test_server().await;
core.start().await.unwrap();
let snapshot = core.snapshot_configuration().await.unwrap();
assert_eq!(snapshot.instance_id, "test-server");
assert!(!snapshot.timestamp.is_empty());
assert!(snapshot.queries.is_empty());
assert!(snapshot.reactions.is_empty());
}
#[tokio::test]
async fn snapshot_requires_initialization() {
let core = DrasiLib::builder().with_id("uninit").build().await.unwrap();
let result = core.snapshot_configuration().await;
assert!(
result.is_ok(),
"Snapshot should succeed after build/initialization"
);
}
#[tokio::test]
async fn snapshot_captures_source_properties() {
let core = build_core_with_components().await;
let snapshot = core.snapshot_configuration().await.unwrap();
let pg_source = snapshot
.sources
.iter()
.find(|s| s.id == "pg-source")
.expect("pg-source should be in snapshot");
assert_eq!(pg_source.source_type, "test-postgres");
assert!(!pg_source.auto_start);
assert_eq!(
pg_source.properties.get("host"),
Some(&serde_json::Value::String("localhost".to_string()))
);
assert_eq!(
pg_source.properties.get("port"),
Some(&serde_json::json!(5432))
);
}
#[tokio::test]
async fn snapshot_captures_query_config() {
let core = build_core_with_components().await;
let snapshot = core.snapshot_configuration().await.unwrap();
let q1 = snapshot
.queries
.iter()
.find(|q| q.id == "q1")
.expect("q1 should be in snapshot");
assert_eq!(q1.config.query, "MATCH (n:Person) RETURN n.name");
assert!(
q1.config.sources.iter().any(|s| s.source_id == "pg-source"),
"Query should reference pg-source"
);
}
#[tokio::test]
async fn snapshot_captures_reaction_properties() {
let core = build_core_with_components().await;
let snapshot = core.snapshot_configuration().await.unwrap();
let reaction = snapshot
.reactions
.iter()
.find(|r| r.id == "http-reaction")
.expect("http-reaction should be in snapshot");
assert_eq!(reaction.reaction_type, "test-http");
assert_eq!(reaction.queries, vec!["q1".to_string()]);
assert_eq!(
reaction.properties.get("base_url"),
Some(&serde_json::Value::String(
"https://example.com/webhook".to_string()
))
);
assert_eq!(
reaction.properties.get("timeout_ms"),
Some(&serde_json::json!(5000))
);
}
#[tokio::test]
async fn snapshot_includes_dependency_edges() {
let core = build_core_with_components().await;
let snapshot = core.snapshot_configuration().await.unwrap();
assert!(
!snapshot.edges.is_empty(),
"Snapshot should include dependency edges"
);
let has_source_to_query = snapshot
.edges
.iter()
.any(|e| e.from == "pg-source" && e.to == "q1");
assert!(has_source_to_query, "Should have edge from pg-source to q1");
let has_query_to_reaction = snapshot
.edges
.iter()
.any(|e| e.from == "q1" && e.to == "http-reaction");
assert!(
has_query_to_reaction,
"Should have edge from q1 to http-reaction"
);
}
#[tokio::test]
async fn snapshot_json_round_trip() {
let core = build_core_with_components().await;
let snapshot = core.snapshot_configuration().await.unwrap();
let json = serde_json::to_string_pretty(&snapshot).expect("Should serialize to JSON");
let restored: ConfigurationSnapshot =
serde_json::from_str(&json).expect("Should deserialize from JSON");
assert_eq!(restored.instance_id, snapshot.instance_id);
assert_eq!(restored.timestamp, snapshot.timestamp);
assert_eq!(restored.sources.len(), snapshot.sources.len());
assert_eq!(restored.queries.len(), snapshot.queries.len());
assert_eq!(restored.reactions.len(), snapshot.reactions.len());
assert_eq!(restored.edges.len(), snapshot.edges.len());
let src = restored
.sources
.iter()
.find(|s| s.id == "pg-source")
.unwrap();
assert_eq!(src.properties.get("port"), Some(&serde_json::json!(5432)));
let q = restored.queries.iter().find(|q| q.id == "q1").unwrap();
assert_eq!(q.config.query, "MATCH (n:Person) RETURN n.name");
let rxn = restored
.reactions
.iter()
.find(|r| r.id == "http-reaction")
.unwrap();
assert_eq!(
rxn.properties.get("timeout_ms"),
Some(&serde_json::json!(5000))
);
}
#[tokio::test]
async fn snapshot_captures_component_status() {
let core = build_core_with_components().await;
let snapshot = core.snapshot_configuration().await.unwrap();
let src = snapshot
.sources
.iter()
.find(|s| s.id == "pg-source")
.unwrap();
assert!(
matches!(
src.status,
crate::channels::ComponentStatus::Added
| crate::channels::ComponentStatus::Running
),
"Source status should be captured: {:?}",
src.status
);
}
#[tokio::test]
async fn snapshot_multiple_sources_and_reactions() {
let s1 = PropertiedSource::new("src-a");
let s2 = TestMockSource::with_auto_start("src-b".to_string(), false).unwrap();
let core = DrasiLib::builder()
.with_id("multi-test")
.with_source(s1)
.with_source(s2)
.with_query(
Query::cypher("qa")
.query("MATCH (n:A) RETURN n")
.from_source("src-a")
.auto_start(false)
.build(),
)
.with_query(
Query::cypher("qb")
.query("MATCH (n:B) RETURN n")
.from_source("src-b")
.auto_start(false)
.build(),
)
.build()
.await
.unwrap();
core.start().await.unwrap();
let r1 = PropertiedReaction::new("rxn-1", vec!["qa".to_string()]);
let r2 = PropertiedReaction::new("rxn-2", vec!["qb".to_string()]);
core.add_reaction(r1).await.unwrap();
core.add_reaction(r2).await.unwrap();
let snapshot = core.snapshot_configuration().await.unwrap();
let user_sources: Vec<_> = snapshot
.sources
.iter()
.filter(|s| s.id == "src-a" || s.id == "src-b")
.collect();
assert_eq!(user_sources.len(), 2, "Should have 2 user sources");
assert_eq!(snapshot.queries.len(), 2, "Should have 2 queries");
assert_eq!(snapshot.reactions.len(), 2, "Should have 2 reactions");
let src_a = snapshot.sources.iter().find(|s| s.id == "src-a").unwrap();
assert!(!src_a.properties.is_empty(), "src-a should have properties");
let src_b = snapshot.sources.iter().find(|s| s.id == "src-b").unwrap();
assert!(
src_b.properties.is_empty(),
"src-b (TestMockSource) should have empty properties"
);
}
#[tokio::test]
async fn snapshot_captures_reaction_auto_start() {
let core = build_core_with_components().await;
let snapshot = core.snapshot_configuration().await.unwrap();
let reaction = snapshot
.reactions
.iter()
.find(|r| r.id == "http-reaction")
.expect("http-reaction should be in snapshot");
assert!(
!reaction.auto_start,
"Reaction auto_start should be captured as false"
);
}
#[tokio::test]
async fn snapshot_source_without_bootstrap_has_none() {
let core = build_core_with_components().await;
let snapshot = core.snapshot_configuration().await.unwrap();
let src = snapshot
.sources
.iter()
.find(|s| s.id == "pg-source")
.expect("pg-source should be in snapshot");
assert!(
src.bootstrap_provider.is_none(),
"Source without bootstrap should have bootstrap_provider=None"
);
}
#[tokio::test]
async fn snapshot_captures_bootstrap_provider_from_graph() {
let source = PropertiedSource::new("bp-source");
let core = DrasiLib::builder()
.with_id("bootstrap-test")
.with_source(source)
.with_query(
Query::cypher("bp-query")
.query("MATCH (n) RETURN n")
.from_source("bp-source")
.auto_start(false)
.build(),
)
.build()
.await
.unwrap();
core.start().await.unwrap();
{
let graph = core.component_graph();
let mut g = graph.write().await;
let mut metadata = HashMap::new();
metadata.insert("kind".to_string(), "postgres".to_string());
metadata.insert(
"timeout_seconds".to_string(),
serde_json::json!(300).to_string(),
);
g.register_bootstrap_provider(
"bp-source-bootstrap",
metadata,
&["bp-source".to_string()],
)
.unwrap();
}
let snapshot = core.snapshot_configuration().await.unwrap();
let src = snapshot
.sources
.iter()
.find(|s| s.id == "bp-source")
.expect("bp-source should be in snapshot");
let bp = src
.bootstrap_provider
.as_ref()
.expect("Source should have bootstrap_provider");
assert_eq!(bp.kind, "postgres");
assert!(
bp.properties.contains_key("timeout_seconds"),
"Bootstrap properties should include timeout_seconds"
);
}
#[tokio::test]
async fn snapshot_bootstrap_json_round_trip() {
let source = PropertiedSource::new("rt-source");
let core = DrasiLib::builder()
.with_id("bp-roundtrip")
.with_source(source)
.build()
.await
.unwrap();
core.start().await.unwrap();
{
let graph = core.component_graph();
let mut g = graph.write().await;
let mut metadata = HashMap::new();
metadata.insert("kind".to_string(), "scriptfile".to_string());
metadata.insert(
"file_paths".to_string(),
serde_json::json!(["data.jsonl", "init.jsonl"]).to_string(),
);
g.register_bootstrap_provider(
"rt-source-bootstrap",
metadata,
&["rt-source".to_string()],
)
.unwrap();
}
let snapshot = core.snapshot_configuration().await.unwrap();
let json = serde_json::to_string_pretty(&snapshot).expect("Should serialize");
let restored: ConfigurationSnapshot =
serde_json::from_str(&json).expect("Should deserialize");
let src = restored
.sources
.iter()
.find(|s| s.id == "rt-source")
.unwrap();
let bp = src.bootstrap_provider.as_ref().unwrap();
assert_eq!(bp.kind, "scriptfile");
assert!(bp.properties.contains_key("file_paths"));
}
#[tokio::test]
async fn snapshot_excludes_introspection_source() {
let core = build_core_with_components().await;
let snapshot = core.snapshot_configuration().await.unwrap();
let has_internal = snapshot.sources.iter().any(|s| s.id.starts_with("__"));
if has_internal {
let internal: Vec<_> = snapshot
.sources
.iter()
.filter(|s| s.id.starts_with("__"))
.map(|s| s.id.as_str())
.collect();
eprintln!(
"Note: Internal sources present in snapshot (server should filter): {internal:?}"
);
}
}
}
}