use anyhow::Result;
use async_trait::async_trait;
use drasi_core::models::{ElementPropertyMap, ElementValue, SourceChange};
use log::{debug, info, warn};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::bootstrap::{BootstrapProvider, ComponentGraphBootstrapProvider};
use crate::channels::*;
use crate::component_graph::ComponentGraph;
use crate::config::{
NodeSchema, PropertySchema, PropertyType, RelationSchema, SourceSchema,
SourceSubscriptionSettings,
};
use crate::context::SourceRuntimeContext;
use crate::sources::base::{SourceBase, SourceBaseParams};
use crate::sources::graph_elements::{
build_props, make_delete_metadata, make_node, make_relation, status_str,
};
use crate::sources::Source;
/// Fixed identifier of the component graph source.
///
/// `ComponentGraphSource::id` returns this value, and `handle_component_event`
/// ignores any event whose `component_id` equals it.
pub const COMPONENT_GRAPH_SOURCE_ID: &str = "__component_graph__";
/// Source that turns component lifecycle events into graph changes.
///
/// Once started it subscribes to the component event broadcast channel and
/// dispatches node/relation inserts, updates and deletes through its
/// `SourceBase`.
pub struct ComponentGraphSource {
/// Shared source plumbing: status tracking, subscriptions, bootstrap and
/// change dispatch are all delegated to it.
base: SourceBase,
/// Sender side of the component event broadcast channel; `start` calls
/// `subscribe()` on it to obtain a receiver.
broadcast_tx: ComponentEventBroadcastSender,
/// Identifier of the owning instance; used in log messages, exposed via
/// `properties`, and embedded in instance node / relation ids.
instance_id: String,
}
impl ComponentGraphSource {
    /// Builds a component graph source for the given instance.
    ///
    /// The underlying `SourceBase` is configured with the reserved id
    /// [`COMPONENT_GRAPH_SOURCE_ID`], broadcast dispatch, auto-start enabled,
    /// and a `ComponentGraphBootstrapProvider` constructed from `graph`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `SourceBase::new`.
    pub fn new(
        broadcast_tx: ComponentEventBroadcastSender,
        instance_id: String,
        graph: Arc<RwLock<ComponentGraph>>,
    ) -> Result<Self> {
        let base_params = SourceBaseParams::new(COMPONENT_GRAPH_SOURCE_ID)
            .with_dispatch_mode(DispatchMode::Broadcast)
            .with_auto_start(true)
            .with_bootstrap_provider(ComponentGraphBootstrapProvider::new(graph));
        let base = SourceBase::new(base_params)?;

        Ok(Self {
            base,
            broadcast_tx,
            instance_id,
        })
    }
}
#[async_trait]
impl Source for ComponentGraphSource {
fn id(&self) -> &str {
COMPONENT_GRAPH_SOURCE_ID
}
fn type_name(&self) -> &str {
"component_graph"
}
fn properties(&self) -> HashMap<String, serde_json::Value> {
let mut props = HashMap::new();
props.insert(
"instance_id".to_string(),
serde_json::json!(self.instance_id),
);
props
}
fn dispatch_mode(&self) -> DispatchMode {
DispatchMode::Broadcast
}
fn auto_start(&self) -> bool {
true
}
fn describe_schema(&self) -> Option<SourceSchema> {
let id_prop = PropertySchema {
name: "id".to_string(),
data_type: Some(PropertyType::String),
description: Some("Component identifier".to_string()),
};
let status_prop = PropertySchema {
name: "status".to_string(),
data_type: Some(PropertyType::String),
description: Some("Component lifecycle status".to_string()),
};
let error_prop = PropertySchema {
name: "error".to_string(),
data_type: Some(PropertyType::String),
description: Some("Error message (present when status is Error)".to_string()),
};
let component_props = vec![id_prop.clone(), status_prop.clone(), error_prop.clone()];
Some(SourceSchema {
nodes: vec![
NodeSchema {
label: "DrasiInstance".to_string(),
properties: vec![
id_prop,
status_prop,
error_prop,
PropertySchema {
name: "running".to_string(),
data_type: Some(PropertyType::String),
description: Some("Whether the instance is running".to_string()),
},
],
},
NodeSchema {
label: "Source".to_string(),
properties: component_props.clone(),
},
NodeSchema {
label: "Query".to_string(),
properties: component_props.clone(),
},
NodeSchema {
label: "Reaction".to_string(),
properties: component_props,
},
],
relations: vec![
RelationSchema {
label: "HAS_SOURCE".to_string(),
from: Some("DrasiInstance".to_string()),
to: Some("Source".to_string()),
properties: Vec::new(),
},
RelationSchema {
label: "HAS_QUERY".to_string(),
from: Some("DrasiInstance".to_string()),
to: Some("Query".to_string()),
properties: Vec::new(),
},
RelationSchema {
label: "HAS_REACTION".to_string(),
from: Some("DrasiInstance".to_string()),
to: Some("Reaction".to_string()),
properties: Vec::new(),
},
RelationSchema {
label: "SUBSCRIBES_TO".to_string(),
from: Some("Query".to_string()),
to: Some("Source".to_string()),
properties: Vec::new(),
},
RelationSchema {
label: "LISTENS_TO".to_string(),
from: Some("Reaction".to_string()),
to: Some("Query".to_string()),
properties: Vec::new(),
},
],
})
}
async fn start(&self) -> Result<()> {
let current = self.base.get_status().await;
if matches!(
current,
ComponentStatus::Running | ComponentStatus::Starting
) {
warn!(
"Component graph source for instance '{}' is already {current:?}, skipping start",
self.instance_id
);
return Ok(());
}
info!(
"Starting component graph source for instance '{}'",
self.instance_id
);
self.base
.set_status(
ComponentStatus::Starting,
Some("Starting component graph source".to_string()),
)
.await;
let mut rx = self.broadcast_tx.subscribe();
let base = self.base.clone_shared();
let instance_id = self.instance_id.clone();
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
self.base.set_shutdown_tx(shutdown_tx).await;
let handle = tokio::spawn(async move {
debug!("Component graph source event loop started");
loop {
tokio::select! {
_ = &mut shutdown_rx => {
info!("Component graph source received shutdown signal");
break;
}
result = rx.recv() => {
match result {
Ok(event) => {
if let Err(e) = handle_component_event(&base, &instance_id, &event).await {
warn!("Component graph source failed to process event: {e}");
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
warn!("Component graph source lagged by {n} events");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
info!("Component graph source broadcast channel closed");
break;
}
}
}
}
}
debug!("Component graph source event loop ended");
});
self.base.set_task_handle(handle).await;
self.base
.set_status(
ComponentStatus::Running,
Some("Component graph source running".to_string()),
)
.await;
info!(
"Component graph source started for instance '{}'",
self.instance_id
);
Ok(())
}
async fn stop(&self) -> Result<()> {
self.base.stop_common().await
}
async fn status(&self) -> ComponentStatus {
self.base.get_status().await
}
async fn subscribe(
&self,
settings: SourceSubscriptionSettings,
) -> Result<SubscriptionResponse> {
self.base
.subscribe_with_bootstrap(&settings, "component_graph")
.await
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
async fn initialize(&self, context: SourceRuntimeContext) {
self.base.initialize(context).await;
}
async fn set_bootstrap_provider(&self, provider: Box<dyn BootstrapProvider + 'static>) {
self.base.set_bootstrap_provider(provider).await;
}
}
/// Translates one component event into source changes and dispatches them.
///
/// Events emitted by the component graph source itself (matching
/// [`COMPONENT_GRAPH_SOURCE_ID`]) are skipped. `Added` and `Removed` events
/// produce insert / delete changes; every other status produces a node update.
///
/// # Errors
///
/// Returns the first error from `SourceBase::dispatch_source_change`; changes
/// after the failing one are not dispatched.
async fn handle_component_event(
    base: &SourceBase,
    instance_id: &str,
    event: &ComponentEvent,
) -> Result<()> {
    let is_own_event = event.component_id == COMPONENT_GRAPH_SOURCE_ID;
    if is_own_event {
        return Ok(());
    }

    let pending = if matches!(event.status, ComponentStatus::Added) {
        build_added_changes(instance_id, event)
    } else if matches!(event.status, ComponentStatus::Removed) {
        build_removed_changes(instance_id, event)
    } else {
        build_status_update_changes(event)
    };

    // Dispatch in order and stop at the first failure.
    for change in pending {
        base.dispatch_source_change(change).await?;
    }
    Ok(())
}
/// Builds the changes for a newly added component.
///
/// Returns two inserts, in order: the component node (id `"{prefix}:{id}"`,
/// with `id` and an initial `status` of `"Stopped"`), then the `HAS_*`
/// relation from the `instance:{instance_id}` node to that component node.
fn build_added_changes(instance_id: &str, event: &ComponentEvent) -> Vec<SourceChange> {
    let (label, prefix) = component_label_prefix(&event.component_type);
    // Relation label and relation-id prefix are kept side by side per type.
    let (has_label, rel_prefix) = match event.component_type {
        ComponentType::Source => ("HAS_SOURCE", "has_source"),
        ComponentType::Query => ("HAS_QUERY", "has_query"),
        ComponentType::Reaction => ("HAS_REACTION", "has_reaction"),
        ComponentType::BootstrapProvider => ("HAS_BOOTSTRAP_PROVIDER", "has_bootstrap_provider"),
        ComponentType::IdentityProvider => ("HAS_IDENTITY_PROVIDER", "has_identity_provider"),
    };

    let node_id = format!("{prefix}:{}", event.component_id);
    let component_node = make_node(
        &node_id,
        &[label],
        build_props(&[("id", &event.component_id), ("status", "Stopped")]),
    );

    let instance_node_id = format!("instance:{instance_id}");
    let rel_id = format!("rel:{rel_prefix}:{instance_id}:{}", event.component_id);
    let ownership = make_relation(
        &rel_id,
        &[has_label],
        &instance_node_id,
        &node_id,
        ElementPropertyMap::new(),
    );

    // The node goes first so the relation's target exists when it is inserted.
    vec![
        SourceChange::Insert {
            element: component_node,
        },
        SourceChange::Insert { element: ownership },
    ]
}
/// Builds the changes for a removed component.
///
/// Returns two deletes, in order: the `HAS_*` relation linking the instance to
/// the component, then the component node itself. The ids use the same
/// formats as `build_added_changes`.
fn build_removed_changes(instance_id: &str, event: &ComponentEvent) -> Vec<SourceChange> {
    let (label, prefix) = component_label_prefix(&event.component_type);
    // Relation-id prefix and relation label are kept side by side per type.
    let (rel_prefix, has_label) = match event.component_type {
        ComponentType::Source => ("has_source", "HAS_SOURCE"),
        ComponentType::Query => ("has_query", "HAS_QUERY"),
        ComponentType::Reaction => ("has_reaction", "HAS_REACTION"),
        ComponentType::BootstrapProvider => ("has_bootstrap_provider", "HAS_BOOTSTRAP_PROVIDER"),
        ComponentType::IdentityProvider => ("has_identity_provider", "HAS_IDENTITY_PROVIDER"),
    };

    let rel_id = format!("rel:{rel_prefix}:{instance_id}:{}", event.component_id);
    let node_id = format!("{prefix}:{}", event.component_id);

    // The relation is deleted before the node it points at.
    vec![
        SourceChange::Delete {
            metadata: make_delete_metadata(&rel_id, &[has_label]),
        },
        SourceChange::Delete {
            metadata: make_delete_metadata(&node_id, &[label]),
        },
    ]
}
fn build_status_update_changes(event: &ComponentEvent) -> Vec<SourceChange> {
let (label, prefix) = component_label_prefix(&event.component_type);
let node_id = format!("{prefix}:{}", event.component_id);
let mut props =
build_props(&[("id", &event.component_id), ("status", status_str(&event.status))]);
if let Some(ref msg) = event.message {
if event.status == ComponentStatus::Error {
props.insert("error", ElementValue::String(Arc::from(msg.as_str())));
}
}
let node = make_node(&node_id, &[label], props);
vec![SourceChange::Update { element: node }]
}
/// Maps a component type to `(node label, node-id prefix)`.
///
/// The label is used as the graph node label; the prefix is the part before
/// the colon in node ids such as `"source:<component_id>"`.
fn component_label_prefix(ct: &ComponentType) -> (&'static str, &'static str) {
    // No catch-all arm: a new `ComponentType` variant must be mapped here explicitly.
    match *ct {
        ComponentType::Source => ("Source", "source"),
        ComponentType::Query => ("Query", "query"),
        ComponentType::Reaction => ("Reaction", "reaction"),
        ComponentType::BootstrapProvider => ("BootstrapProvider", "bootstrap_provider"),
        ComponentType::IdentityProvider => ("IdentityProvider", "identity_provider"),
    }
}
// Unit tests for the static surface of `ComponentGraphSource`: identity,
// configuration accessors, initial status and the described schema.
// None of these tests call `start`.
#[cfg(test)]
mod tests {
use super::*;
use crate::component_graph::ComponentGraph;
// Builds a source wired to a fresh `ComponentGraph` named "test-instance".
// The update receiver returned by `ComponentGraph::new` is bound to an
// unused local for the duration of this helper.
fn make_source() -> ComponentGraphSource {
let (graph, _update_rx) = ComponentGraph::new("test-instance");
let broadcast_tx = graph.event_sender().clone();
let graph = Arc::new(RwLock::new(graph));
ComponentGraphSource::new(broadcast_tx, "test-instance".to_string(), graph).unwrap()
}
// Construction succeeds and the id matches both the constant and its literal value.
#[test]
fn new_creates_source_with_correct_id() {
let source = make_source();
assert_eq!(source.id(), COMPONENT_GRAPH_SOURCE_ID);
assert_eq!(source.id(), "__component_graph__");
}
// `id()` returns the literal reserved id.
#[test]
fn id_returns_component_graph_source_id() {
let source = make_source();
assert_eq!(source.id(), "__component_graph__");
}
// `type_name()` is the fixed string "component_graph".
#[test]
fn type_name_returns_component_graph() {
let source = make_source();
assert_eq!(source.type_name(), "component_graph");
}
// The source reports that it should be auto-started.
#[test]
fn auto_start_returns_true() {
let source = make_source();
assert!(source.auto_start());
}
// A freshly constructed source that has not been started reports `Stopped`.
#[tokio::test]
async fn initial_status_is_stopped() {
let source = make_source();
assert_eq!(source.status().await, ComponentStatus::Stopped);
}
// The dispatch mode is always broadcast.
#[test]
fn dispatch_mode_is_broadcast() {
let source = make_source();
assert_eq!(source.dispatch_mode(), DispatchMode::Broadcast);
}
// `properties()` exposes the instance id passed to `new`.
#[test]
fn properties_contains_instance_id() {
let source = make_source();
let props = source.properties();
assert_eq!(
props.get("instance_id"),
Some(&serde_json::json!("test-instance"))
);
}
// `as_any()` allows downcasting back to the concrete type.
#[test]
fn as_any_downcasts_to_self() {
let source = make_source();
assert!(source.as_any().is::<ComponentGraphSource>());
}
// The schema lists the expected node and relation labels, the endpoints of
// SUBSCRIBES_TO / LISTENS_TO, and the required properties on each node.
#[test]
fn describe_schema_returns_component_graph_schema() {
let source = make_source();
let schema = source.describe_schema().expect("should return Some");
// Node labels.
let node_labels: Vec<&str> = schema.nodes.iter().map(|n| n.label.as_str()).collect();
assert!(node_labels.contains(&"DrasiInstance"));
assert!(node_labels.contains(&"Source"));
assert!(node_labels.contains(&"Query"));
assert!(node_labels.contains(&"Reaction"));
// Relation labels.
let rel_labels: Vec<&str> = schema.relations.iter().map(|r| r.label.as_str()).collect();
assert!(rel_labels.contains(&"HAS_SOURCE"));
assert!(rel_labels.contains(&"HAS_QUERY"));
assert!(rel_labels.contains(&"HAS_REACTION"));
assert!(rel_labels.contains(&"SUBSCRIBES_TO"));
assert!(rel_labels.contains(&"LISTENS_TO"));
// SUBSCRIBES_TO goes from Query to Source.
let subscribes = schema
.relations
.iter()
.find(|r| r.label == "SUBSCRIBES_TO")
.unwrap();
assert_eq!(subscribes.from.as_deref(), Some("Query"));
assert_eq!(subscribes.to.as_deref(), Some("Source"));
// LISTENS_TO goes from Reaction to Query.
let listens = schema
.relations
.iter()
.find(|r| r.label == "LISTENS_TO")
.unwrap();
assert_eq!(listens.from.as_deref(), Some("Reaction"));
assert_eq!(listens.to.as_deref(), Some("Query"));
// Only the instance node is checked for the `running` property.
let instance = schema
.nodes
.iter()
.find(|n| n.label == "DrasiInstance")
.unwrap();
assert!(instance.properties.iter().any(|p| p.name == "running"));
// Every node type must expose `id` and `status`.
for node in &schema.nodes {
assert!(
node.properties.iter().any(|p| p.name == "id"),
"{} should have id property",
node.label
);
assert!(
node.properties.iter().any(|p| p.name == "status"),
"{} should have status property",
node.label
);
}
}
}