use anyhow::Result;
use async_trait::async_trait;
use drasi_core::models::{ElementPropertyMap, ElementValue, SourceChange};
use log::{debug, info, warn};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::bootstrap::{BootstrapProvider, ComponentGraphBootstrapProvider};
use crate::channels::*;
use crate::component_graph::ComponentGraph;
use crate::config::SourceSubscriptionSettings;
use crate::context::SourceRuntimeContext;
use crate::sources::base::{SourceBase, SourceBaseParams};
use crate::sources::graph_elements::{
build_props, make_delete_metadata, make_node, make_relation, status_str,
};
use crate::sources::Source;
pub const COMPONENT_GRAPH_SOURCE_ID: &str = "__component_graph__";
pub struct ComponentGraphSource {
base: SourceBase,
broadcast_tx: ComponentEventBroadcastSender,
instance_id: String,
}
impl ComponentGraphSource {
pub fn new(
broadcast_tx: ComponentEventBroadcastSender,
instance_id: String,
graph: Arc<RwLock<ComponentGraph>>,
) -> Result<Self> {
let params = SourceBaseParams::new(COMPONENT_GRAPH_SOURCE_ID)
.with_dispatch_mode(DispatchMode::Broadcast)
.with_auto_start(true)
.with_bootstrap_provider(ComponentGraphBootstrapProvider::new(graph));
Ok(Self {
base: SourceBase::new(params)?,
broadcast_tx,
instance_id,
})
}
}
#[async_trait]
impl Source for ComponentGraphSource {
fn id(&self) -> &str {
COMPONENT_GRAPH_SOURCE_ID
}
fn type_name(&self) -> &str {
"component_graph"
}
fn properties(&self) -> HashMap<String, serde_json::Value> {
let mut props = HashMap::new();
props.insert(
"instance_id".to_string(),
serde_json::json!(self.instance_id),
);
props
}
fn dispatch_mode(&self) -> DispatchMode {
DispatchMode::Broadcast
}
fn auto_start(&self) -> bool {
true
}
async fn start(&self) -> Result<()> {
let current = self.base.get_status().await;
if matches!(
current,
ComponentStatus::Running | ComponentStatus::Starting
) {
warn!(
"Component graph source for instance '{}' is already {current:?}, skipping start",
self.instance_id
);
return Ok(());
}
info!(
"Starting component graph source for instance '{}'",
self.instance_id
);
self.base
.set_status(
ComponentStatus::Starting,
Some("Starting component graph source".to_string()),
)
.await;
let mut rx = self.broadcast_tx.subscribe();
let base = self.base.clone_shared();
let instance_id = self.instance_id.clone();
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
self.base.set_shutdown_tx(shutdown_tx).await;
let handle = tokio::spawn(async move {
debug!("Component graph source event loop started");
loop {
tokio::select! {
_ = &mut shutdown_rx => {
info!("Component graph source received shutdown signal");
break;
}
result = rx.recv() => {
match result {
Ok(event) => {
if let Err(e) = handle_component_event(&base, &instance_id, &event).await {
warn!("Component graph source failed to process event: {e}");
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
warn!("Component graph source lagged by {n} events");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
info!("Component graph source broadcast channel closed");
break;
}
}
}
}
}
debug!("Component graph source event loop ended");
});
self.base.set_task_handle(handle).await;
self.base
.set_status(
ComponentStatus::Running,
Some("Component graph source running".to_string()),
)
.await;
info!(
"Component graph source started for instance '{}'",
self.instance_id
);
Ok(())
}
async fn stop(&self) -> Result<()> {
self.base.stop_common().await
}
async fn status(&self) -> ComponentStatus {
self.base.get_status().await
}
async fn subscribe(
&self,
settings: SourceSubscriptionSettings,
) -> Result<SubscriptionResponse> {
self.base
.subscribe_with_bootstrap(&settings, "component_graph")
.await
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
async fn initialize(&self, context: SourceRuntimeContext) {
self.base.initialize(context).await;
}
async fn set_bootstrap_provider(&self, provider: Box<dyn BootstrapProvider + 'static>) {
self.base.set_bootstrap_provider(provider).await;
}
}
async fn handle_component_event(
base: &SourceBase,
instance_id: &str,
event: &ComponentEvent,
) -> Result<()> {
if event.component_id == COMPONENT_GRAPH_SOURCE_ID {
return Ok(());
}
let changes = match event.status {
ComponentStatus::Added => build_added_changes(instance_id, event),
ComponentStatus::Removed => build_removed_changes(instance_id, event),
_ => build_status_update_changes(event),
};
for change in changes {
base.dispatch_source_change(change).await?;
}
Ok(())
}
fn build_added_changes(instance_id: &str, event: &ComponentEvent) -> Vec<SourceChange> {
let mut changes = Vec::new();
let (label, prefix) = component_label_prefix(&event.component_type);
let node_id = format!("{prefix}:{}", event.component_id);
let node = make_node(
&node_id,
&[label],
build_props(&[("id", &event.component_id), ("status", "Stopped")]),
);
changes.push(SourceChange::Insert { element: node });
let instance_node_id = format!("instance:{instance_id}");
let has_label = match event.component_type {
ComponentType::Source => "HAS_SOURCE",
ComponentType::Query => "HAS_QUERY",
ComponentType::Reaction => "HAS_REACTION",
ComponentType::BootstrapProvider => "HAS_BOOTSTRAP_PROVIDER",
ComponentType::IdentityProvider => "HAS_IDENTITY_PROVIDER",
};
let rel_prefix = match event.component_type {
ComponentType::Source => "has_source",
ComponentType::Query => "has_query",
ComponentType::Reaction => "has_reaction",
ComponentType::BootstrapProvider => "has_bootstrap_provider",
ComponentType::IdentityProvider => "has_identity_provider",
};
let rel_id = format!("rel:{rel_prefix}:{instance_id}:{}", event.component_id);
let rel = make_relation(
&rel_id,
&[has_label],
&instance_node_id,
&node_id,
ElementPropertyMap::new(),
);
changes.push(SourceChange::Insert { element: rel });
changes
}
fn build_removed_changes(instance_id: &str, event: &ComponentEvent) -> Vec<SourceChange> {
let mut changes = Vec::new();
let (label, prefix) = component_label_prefix(&event.component_type);
let rel_prefix = match event.component_type {
ComponentType::Source => "has_source",
ComponentType::Query => "has_query",
ComponentType::Reaction => "has_reaction",
ComponentType::BootstrapProvider => "has_bootstrap_provider",
ComponentType::IdentityProvider => "has_identity_provider",
};
let has_label = match event.component_type {
ComponentType::Source => "HAS_SOURCE",
ComponentType::Query => "HAS_QUERY",
ComponentType::Reaction => "HAS_REACTION",
ComponentType::BootstrapProvider => "HAS_BOOTSTRAP_PROVIDER",
ComponentType::IdentityProvider => "HAS_IDENTITY_PROVIDER",
};
let rel_id = format!("rel:{rel_prefix}:{instance_id}:{}", event.component_id);
changes.push(SourceChange::Delete {
metadata: make_delete_metadata(&rel_id, &[has_label]),
});
let node_id = format!("{prefix}:{}", event.component_id);
changes.push(SourceChange::Delete {
metadata: make_delete_metadata(&node_id, &[label]),
});
changes
}
fn build_status_update_changes(event: &ComponentEvent) -> Vec<SourceChange> {
let (label, prefix) = component_label_prefix(&event.component_type);
let node_id = format!("{prefix}:{}", event.component_id);
let mut props =
build_props(&[("id", &event.component_id), ("status", status_str(&event.status))]);
if let Some(ref msg) = event.message {
if event.status == ComponentStatus::Error {
props.insert("error", ElementValue::String(Arc::from(msg.as_str())));
}
}
let node = make_node(&node_id, &[label], props);
vec![SourceChange::Update { element: node }]
}
fn component_label_prefix(ct: &ComponentType) -> (&'static str, &'static str) {
match ct {
ComponentType::Source => ("Source", "source"),
ComponentType::Query => ("Query", "query"),
ComponentType::Reaction => ("Reaction", "reaction"),
ComponentType::BootstrapProvider => ("BootstrapProvider", "bootstrap_provider"),
ComponentType::IdentityProvider => ("IdentityProvider", "identity_provider"),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::component_graph::ComponentGraph;
fn make_source() -> ComponentGraphSource {
let (graph, _update_rx) = ComponentGraph::new("test-instance");
let broadcast_tx = graph.event_sender().clone();
let graph = Arc::new(RwLock::new(graph));
ComponentGraphSource::new(broadcast_tx, "test-instance".to_string(), graph).unwrap()
}
#[test]
fn new_creates_source_with_correct_id() {
let source = make_source();
assert_eq!(source.id(), COMPONENT_GRAPH_SOURCE_ID);
assert_eq!(source.id(), "__component_graph__");
}
#[test]
fn id_returns_component_graph_source_id() {
let source = make_source();
assert_eq!(source.id(), "__component_graph__");
}
#[test]
fn type_name_returns_component_graph() {
let source = make_source();
assert_eq!(source.type_name(), "component_graph");
}
#[test]
fn auto_start_returns_true() {
let source = make_source();
assert!(source.auto_start());
}
#[tokio::test]
async fn initial_status_is_stopped() {
let source = make_source();
assert_eq!(source.status().await, ComponentStatus::Stopped);
}
#[test]
fn dispatch_mode_is_broadcast() {
let source = make_source();
assert_eq!(source.dispatch_mode(), DispatchMode::Broadcast);
}
#[test]
fn properties_contains_instance_id() {
let source = make_source();
let props = source.properties();
assert_eq!(
props.get("instance_id"),
Some(&serde_json::json!("test-instance"))
);
}
#[test]
fn as_any_downcasts_to_self() {
let source = make_source();
assert!(source.as_any().is::<ComponentGraphSource>());
}
}