use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use petgraph::stable_graph::{EdgeIndex, NodeIndex, StableGraph};
use petgraph::visit::EdgeRef;
use petgraph::Direction;
use tokio::sync::{broadcast, mpsc, Notify};
use crate::channels::{ComponentEvent, ComponentEventBroadcastReceiver, ComponentStatus};
use crate::managers::ComponentEventHistory;
use super::transaction::GraphTransaction;
use super::{
ComponentKind, ComponentNode, ComponentUpdate, ComponentUpdateReceiver, ComponentUpdateSender,
GraphEdge, GraphSnapshot, RelationshipKind,
};
pub struct ComponentGraph {
pub(super) graph: StableGraph<ComponentNode, RelationshipKind>,
pub(super) index: HashMap<String, NodeIndex>,
instance_idx: NodeIndex,
pub(super) event_tx: broadcast::Sender<ComponentEvent>,
update_tx: mpsc::Sender<ComponentUpdate>,
status_notify: Arc<Notify>,
runtimes: HashMap<String, Box<dyn Any + Send + Sync>>,
pub(super) event_history: ComponentEventHistory,
}
const EVENT_CHANNEL_CAPACITY: usize = 1000;
const UPDATE_CHANNEL_CAPACITY: usize = 1000;
impl ComponentGraph {
pub fn new(instance_id: &str) -> (Self, ComponentUpdateReceiver) {
let mut graph = StableGraph::new();
let instance_node = ComponentNode {
id: instance_id.to_string(),
kind: ComponentKind::Instance,
status: ComponentStatus::Running,
metadata: HashMap::new(),
};
let instance_idx = graph.add_node(instance_node);
let mut index = HashMap::new();
index.insert(instance_id.to_string(), instance_idx);
let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
let (update_tx, update_rx) = mpsc::channel(UPDATE_CHANNEL_CAPACITY);
(
Self {
graph,
index,
instance_idx,
event_tx,
update_tx,
status_notify: Arc::new(Notify::new()),
runtimes: HashMap::new(),
event_history: ComponentEventHistory::new(),
},
update_rx,
)
}
pub fn subscribe(&self) -> ComponentEventBroadcastReceiver {
self.event_tx.subscribe()
}
pub fn event_sender(&self) -> &broadcast::Sender<ComponentEvent> {
&self.event_tx
}
pub fn update_sender(&self) -> ComponentUpdateSender {
self.update_tx.clone()
}
pub fn status_notifier(&self) -> Arc<Notify> {
self.status_notify.clone()
}
pub fn apply_update(&mut self, update: ComponentUpdate) -> Option<ComponentEvent> {
match update {
ComponentUpdate::Status {
component_id,
status,
message,
} => match self.update_status_with_message(&component_id, status, message) {
Ok(event) => event,
Err(e) => {
tracing::debug!(
"Graph update loop: status update skipped for '{}': {e}",
component_id
);
None
}
},
}
}
fn emit_event(
&self,
component_id: &str,
kind: &ComponentKind,
status: ComponentStatus,
message: Option<String>,
) -> Option<ComponentEvent> {
if let Some(component_type) = kind.to_component_type() {
let event = ComponentEvent {
component_id: component_id.to_string(),
component_type,
status,
timestamp: chrono::Utc::now(),
message,
};
let _ = self.event_tx.send(event.clone());
Some(event)
} else {
None
}
}
pub fn instance_id(&self) -> &str {
&self.graph[self.instance_idx].id
}
pub fn set_runtime(
&mut self,
id: &str,
runtime: Box<dyn Any + Send + Sync>,
) -> anyhow::Result<()> {
if !self.index.contains_key(id) {
return Err(anyhow::anyhow!(
"set_runtime called for component '{id}' which is not in the graph"
));
}
#[cfg(debug_assertions)]
if let Some(node) = self.get_component(id) {
let kind = &node.kind;
let type_ok = match kind {
ComponentKind::Source => runtime
.downcast_ref::<std::sync::Arc<dyn crate::sources::Source>>()
.is_some(),
ComponentKind::Query => runtime
.downcast_ref::<std::sync::Arc<dyn crate::queries::manager::Query>>()
.is_some(),
ComponentKind::Reaction => runtime
.downcast_ref::<std::sync::Arc<dyn crate::reactions::Reaction>>()
.is_some(),
_ => true,
};
if !type_ok {
tracing::warn!(
"set_runtime: possible type mismatch for component '{id}' (kind={kind})"
);
}
}
self.runtimes.insert(id.to_string(), runtime);
Ok(())
}
pub fn get_runtime<T: 'static>(&self, id: &str) -> Option<&T> {
self.runtimes.get(id).and_then(|r| r.downcast_ref::<T>())
}
pub fn take_runtime<T: 'static>(&mut self, id: &str) -> Option<T> {
let runtime = self.runtimes.remove(id)?;
match runtime.downcast::<T>() {
Ok(boxed) => Some(*boxed),
Err(runtime) => {
tracing::error!(
"take_runtime: type mismatch for component '{id}', putting runtime back"
);
self.runtimes.insert(id.to_string(), runtime);
None
}
}
}
pub fn has_runtime(&self, id: &str) -> bool {
self.runtimes.contains_key(id)
}
pub fn add_component(&mut self, node: ComponentNode) -> anyhow::Result<NodeIndex> {
let (node_idx, event, _, _) = self.add_component_internal(node)?;
if let Some(event) = event {
let _ = self.event_tx.send(event.clone());
self.event_history.record_event(event);
}
Ok(node_idx)
}
pub(super) fn add_component_internal(
&mut self,
node: ComponentNode,
) -> anyhow::Result<(NodeIndex, Option<ComponentEvent>, EdgeIndex, EdgeIndex)> {
if self.index.contains_key(&node.id) {
return Err(anyhow::anyhow!(
"{} '{}' already exists in the graph",
node.kind,
node.id
));
}
let id = node.id.clone();
let kind = node.kind.clone();
let status = node.status;
let node_idx = self.graph.add_node(node);
self.index.insert(id.clone(), node_idx);
let owns_edge = self
.graph
.add_edge(self.instance_idx, node_idx, RelationshipKind::Owns);
let owned_by_edge =
self.graph
.add_edge(node_idx, self.instance_idx, RelationshipKind::OwnedBy);
let event = kind
.to_component_type()
.map(|component_type| ComponentEvent {
component_id: id,
component_type,
status,
timestamp: chrono::Utc::now(),
message: Some(format!("{kind} added")),
});
Ok((node_idx, event, owns_edge, owned_by_edge))
}
pub fn remove_component(&mut self, id: &str) -> anyhow::Result<ComponentNode> {
let node_idx = self
.index
.get(id)
.copied()
.ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in graph"))?;
if node_idx == self.instance_idx {
return Err(anyhow::anyhow!("Cannot remove the instance root node"));
}
let kind = self.graph[node_idx].kind.clone();
self.index.remove(id);
self.runtimes.remove(id);
self.event_history.remove_component(id);
let removed = self
.graph
.remove_node(node_idx)
.ok_or_else(|| anyhow::anyhow!("Component '{id}' already removed"))?;
self.emit_event(
id,
&kind,
ComponentStatus::Removed,
Some(format!("{kind} removed")),
);
Ok(removed)
}
pub fn get_component(&self, id: &str) -> Option<&ComponentNode> {
self.index
.get(id)
.and_then(|idx| self.graph.node_weight(*idx))
}
pub fn get_component_mut(&mut self, id: &str) -> Option<&mut ComponentNode> {
self.index
.get(id)
.copied()
.and_then(|idx| self.graph.node_weight_mut(idx))
}
pub fn contains(&self, id: &str) -> bool {
self.index.contains_key(id)
}
pub fn list_by_kind(&self, kind: &ComponentKind) -> Vec<(String, ComponentStatus)> {
self.graph
.node_weights()
.filter(|node| &node.kind == kind)
.map(|node| (node.id.clone(), node.status))
.collect()
}
pub(super) fn update_status(
&mut self,
id: &str,
status: ComponentStatus,
) -> anyhow::Result<Option<ComponentEvent>> {
self.update_status_with_message(id, status, None)
}
fn update_status_with_message(
&mut self,
id: &str,
status: ComponentStatus,
message: Option<String>,
) -> anyhow::Result<Option<ComponentEvent>> {
let node = self
.get_component_mut(id)
.ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in graph"))?;
let kind = node.kind.clone();
if node.status == status {
return Ok(None);
}
if !is_valid_transition(&node.status, &status) {
tracing::warn!(
"Invalid state transition for component '{}': {:?} → {:?}, ignoring update",
id,
node.status,
status
);
return Ok(None);
}
node.status = status;
self.status_notify.notify_waiters();
let event = self.emit_event(id, &kind, status, message);
if let Some(ref event) = event {
self.event_history.record_event(event.clone());
}
Ok(event)
}
pub fn add_relationship(
&mut self,
from_id: &str,
to_id: &str,
forward: RelationshipKind,
) -> anyhow::Result<()> {
let (_, _) = self.add_relationship_internal(from_id, to_id, forward)?;
Ok(())
}
pub(super) fn add_relationship_internal(
&mut self,
from_id: &str,
to_id: &str,
forward: RelationshipKind,
) -> anyhow::Result<(Option<EdgeIndex>, Option<EdgeIndex>)> {
let from_idx = self
.index
.get(from_id)
.copied()
.ok_or_else(|| anyhow::anyhow!("Component '{from_id}' not found in graph"))?;
let to_idx = self
.index
.get(to_id)
.copied()
.ok_or_else(|| anyhow::anyhow!("Component '{to_id}' not found in graph"))?;
let from_kind = &self.graph[from_idx].kind;
let to_kind = &self.graph[to_idx].kind;
if !is_valid_relationship(from_kind, to_kind, &forward) {
return Err(anyhow::anyhow!(
"Invalid relationship: {forward:?} from {from_kind} '{from_id}' to {to_kind} '{to_id}'"
));
}
let already_exists = self
.graph
.edges_directed(from_idx, Direction::Outgoing)
.any(|e| e.target() == to_idx && e.weight() == &forward);
if already_exists {
return Ok((None, None));
}
let reverse = forward.reverse();
let fwd_edge = self.graph.add_edge(from_idx, to_idx, forward);
let rev_edge = self.graph.add_edge(to_idx, from_idx, reverse);
Ok((Some(fwd_edge), Some(rev_edge)))
}
pub fn remove_relationship(
&mut self,
from_id: &str,
to_id: &str,
forward: RelationshipKind,
) -> anyhow::Result<()> {
let from_idx = self
.index
.get(from_id)
.copied()
.ok_or_else(|| anyhow::anyhow!("Component '{from_id}' not found in graph"))?;
let to_idx = self
.index
.get(to_id)
.copied()
.ok_or_else(|| anyhow::anyhow!("Component '{to_id}' not found in graph"))?;
let reverse = forward.reverse();
let forward_edge = self
.graph
.edges_directed(from_idx, Direction::Outgoing)
.find(|e| e.target() == to_idx && e.weight() == &forward)
.map(|e| e.id());
if let Some(edge_id) = forward_edge {
self.graph.remove_edge(edge_id);
}
let reverse_edge = self
.graph
.edges_directed(to_idx, Direction::Outgoing)
.find(|e| e.target() == from_idx && e.weight() == &reverse)
.map(|e| e.id());
if let Some(edge_id) = reverse_edge {
self.graph.remove_edge(edge_id);
}
Ok(())
}
pub fn get_neighbors(&self, id: &str, relationship: &RelationshipKind) -> Vec<&ComponentNode> {
let Some(&node_idx) = self.index.get(id) else {
return Vec::new();
};
self.graph
.edges_directed(node_idx, Direction::Outgoing)
.filter(|edge| edge.weight() == relationship)
.filter_map(|edge| self.graph.node_weight(edge.target()))
.collect()
}
pub fn get_dependents(&self, id: &str) -> Vec<&ComponentNode> {
let Some(&node_idx) = self.index.get(id) else {
return Vec::new();
};
self.graph
.edges_directed(node_idx, Direction::Outgoing)
.filter(|edge| matches!(edge.weight(), RelationshipKind::Feeds))
.filter_map(|edge| self.graph.node_weight(edge.target()))
.collect()
}
pub fn get_dependencies(&self, id: &str) -> Vec<&ComponentNode> {
let Some(&node_idx) = self.index.get(id) else {
return Vec::new();
};
self.graph
.edges_directed(node_idx, Direction::Outgoing)
.filter(|edge| matches!(edge.weight(), RelationshipKind::SubscribesTo))
.filter_map(|edge| self.graph.node_weight(edge.target()))
.collect()
}
pub fn can_remove(&self, id: &str) -> Result<(), Vec<String>> {
let dependents = self.get_dependents(id);
if dependents.is_empty() {
Ok(())
} else {
Err(dependents.iter().map(|n| n.id.clone()).collect())
}
}
pub fn validate_and_transition(
&mut self,
id: &str,
target_status: ComponentStatus,
message: Option<String>,
) -> anyhow::Result<Option<ComponentEvent>> {
let node = self
.get_component(id)
.ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in graph"))?;
let current = node.status;
if current == target_status {
return Ok(None);
}
if !is_valid_transition(¤t, &target_status) {
let reason = describe_invalid_transition(id, ¤t, &target_status);
return Err(anyhow::anyhow!(reason));
}
self.update_status_with_message(id, target_status, message)
}
pub fn topological_order(&self) -> anyhow::Result<Vec<&ComponentNode>> {
let mut order_graph: StableGraph<(), ()> = StableGraph::new();
let mut idx_map: HashMap<NodeIndex, NodeIndex> = HashMap::new();
for node_idx in self.graph.node_indices() {
let new_idx = order_graph.add_node(());
idx_map.insert(node_idx, new_idx);
}
for edge_idx in self.graph.edge_indices() {
if let Some(weight) = self.graph.edge_weight(edge_idx) {
if matches!(weight, RelationshipKind::Feeds) {
if let Some((from, to)) = self.graph.edge_endpoints(edge_idx) {
if let (Some(&new_from), Some(&new_to)) =
(idx_map.get(&from), idx_map.get(&to))
{
order_graph.add_edge(new_from, new_to, ());
}
}
}
}
}
let reverse_map: HashMap<NodeIndex, NodeIndex> =
idx_map.iter().map(|(&orig, &new)| (new, orig)).collect();
match petgraph::algo::toposort(&order_graph, None) {
Ok(sorted) => Ok(sorted
.into_iter()
.filter_map(|new_idx| reverse_map.get(&new_idx))
.filter(|idx| **idx != self.instance_idx)
.filter_map(|idx| self.graph.node_weight(*idx))
.collect()),
Err(_cycle) => Err(anyhow::anyhow!(
"Cycle detected in component graph — cannot determine lifecycle order"
)),
}
}
pub fn snapshot(&self) -> GraphSnapshot {
let nodes: Vec<ComponentNode> = self.graph.node_weights().cloned().collect();
let edges: Vec<GraphEdge> = self
.graph
.edge_indices()
.filter_map(|edge_idx| {
let (from_idx, to_idx) = self.graph.edge_endpoints(edge_idx)?;
let from = self.graph.node_weight(from_idx)?;
let to = self.graph.node_weight(to_idx)?;
let relationship = self.graph.edge_weight(edge_idx)?;
Some(GraphEdge {
from: from.id.clone(),
to: to.id.clone(),
relationship: relationship.clone(),
})
})
.collect();
GraphSnapshot {
instance_id: self.instance_id().to_string(),
nodes,
edges,
}
}
pub fn node_count(&self) -> usize {
self.graph.node_count()
}
pub fn edge_count(&self) -> usize {
self.graph.edge_count()
}
pub fn register_source(
&mut self,
id: &str,
metadata: HashMap<String, String>,
) -> anyhow::Result<()> {
let node = ComponentNode {
id: id.to_string(),
kind: ComponentKind::Source,
status: ComponentStatus::Added,
metadata,
};
let mut txn = self.begin();
txn.add_component(node)?;
txn.commit();
Ok(())
}
pub fn register_query(
&mut self,
id: &str,
metadata: HashMap<String, String>,
source_ids: &[String],
) -> anyhow::Result<()> {
for source_id in source_ids {
if !self.contains(source_id) {
return Err(anyhow::anyhow!(
"Cannot register query '{id}': referenced source '{source_id}' does not exist in the graph"
));
}
}
let node = ComponentNode {
id: id.to_string(),
kind: ComponentKind::Query,
status: ComponentStatus::Added,
metadata,
};
let mut txn = self.begin();
txn.add_component(node)?;
for source_id in source_ids {
txn.add_relationship(source_id, id, RelationshipKind::Feeds)?;
}
txn.commit();
Ok(())
}
pub fn register_reaction(
&mut self,
id: &str,
metadata: HashMap<String, String>,
query_ids: &[String],
) -> anyhow::Result<()> {
for query_id in query_ids {
if !self.contains(query_id) {
return Err(anyhow::anyhow!(
"Cannot register reaction '{id}': referenced query '{query_id}' does not exist in the graph"
));
}
}
let node = ComponentNode {
id: id.to_string(),
kind: ComponentKind::Reaction,
status: ComponentStatus::Added,
metadata,
};
let mut txn = self.begin();
txn.add_component(node)?;
for query_id in query_ids {
txn.add_relationship(query_id, id, RelationshipKind::Feeds)?;
}
txn.commit();
Ok(())
}
pub fn deregister(&mut self, id: &str) -> anyhow::Result<ComponentNode> {
if let Err(dependent_ids) = self.can_remove(id) {
return Err(anyhow::anyhow!(
"Cannot deregister '{}': depended on by: {}",
id,
dependent_ids.join(", ")
));
}
self.remove_component(id)
}
pub fn register_bootstrap_provider(
&mut self,
id: &str,
metadata: HashMap<String, String>,
source_ids: &[String],
) -> anyhow::Result<()> {
for source_id in source_ids {
if !self.contains(source_id) {
return Err(anyhow::anyhow!(
"Cannot register bootstrap provider '{id}': referenced source '{source_id}' does not exist in the graph"
));
}
}
let node = ComponentNode {
id: id.to_string(),
kind: ComponentKind::BootstrapProvider,
status: ComponentStatus::Added,
metadata,
};
let mut txn = self.begin();
txn.add_component(node)?;
for source_id in source_ids {
txn.add_relationship(id, source_id, RelationshipKind::Bootstraps)?;
}
txn.commit();
Ok(())
}
pub fn register_identity_provider(
&mut self,
id: &str,
metadata: HashMap<String, String>,
component_ids: &[String],
) -> anyhow::Result<()> {
for component_id in component_ids {
if !self.contains(component_id) {
return Err(anyhow::anyhow!(
"Cannot register identity provider '{id}': referenced component '{component_id}' does not exist in the graph"
));
}
}
let node = ComponentNode {
id: id.to_string(),
kind: ComponentKind::IdentityProvider,
status: ComponentStatus::Added,
metadata,
};
let mut txn = self.begin();
txn.add_component(node)?;
for component_id in component_ids {
txn.add_relationship(id, component_id, RelationshipKind::Authenticates)?;
}
txn.commit();
Ok(())
}
pub fn begin(&mut self) -> GraphTransaction<'_> {
GraphTransaction::new(self)
}
pub fn record_event(&mut self, event: ComponentEvent) {
self.event_history.record_event(event);
}
pub fn get_events(&self, component_id: &str) -> Vec<ComponentEvent> {
self.event_history.get_events(component_id)
}
pub fn get_all_events(&self) -> Vec<ComponentEvent> {
self.event_history.get_all_events()
}
pub fn get_last_error(&self, component_id: &str) -> Option<String> {
self.event_history.get_last_error(component_id)
}
pub fn subscribe_events(
&self,
component_id: &str,
) -> Option<(Vec<ComponentEvent>, broadcast::Receiver<ComponentEvent>)> {
self.event_history.try_subscribe(component_id)
}
}
pub(super) fn is_valid_relationship(
from_kind: &ComponentKind,
to_kind: &ComponentKind,
relationship: &RelationshipKind,
) -> bool {
use ComponentKind::*;
use RelationshipKind::*;
matches!(
(from_kind, to_kind, relationship),
(Source, Query, Feeds)
| (Query, Reaction, Feeds)
| (Instance, _, Owns)
| (BootstrapProvider, Source, Bootstraps)
| (IdentityProvider, _, Authenticates)
)
}
pub(super) fn is_valid_transition(from: &ComponentStatus, to: &ComponentStatus) -> bool {
use ComponentStatus::*;
matches!(
(from, to),
(Added, Starting)
| (Added, Stopped) | (Stopped, Starting)
| (Starting, Running)
| (Starting, Error)
| (Starting, Stopped) | (Running, Stopping)
| (Running, Stopped) | (Running, Error)
| (Stopping, Stopped)
| (Stopping, Error)
| (Error, Starting) | (Error, Stopped) | (Added, Reconfiguring)
| (Stopped, Reconfiguring)
| (Running, Reconfiguring)
| (Error, Reconfiguring)
| (Reconfiguring, Stopped)
| (Reconfiguring, Starting)
| (Reconfiguring, Error)
)
}
fn describe_invalid_transition(id: &str, from: &ComponentStatus, to: &ComponentStatus) -> String {
use ComponentStatus::*;
match (from, to) {
(Starting, Starting) => format!("Component '{id}' is already starting"),
(Running, Starting) => format!("Component '{id}' is already running"),
(Stopping, Starting) => {
format!("Cannot start component '{id}' while it is stopping")
}
(Reconfiguring, Starting) => {
format!("Cannot start component '{id}' while it is reconfiguring")
}
(Stopped, Stopping) => {
format!("Cannot stop component '{id}': it is already stopped")
}
(Stopping, Stopping) => format!("Component '{id}' is already stopping"),
(Error, Stopping) => {
format!("Cannot stop component '{id}': it is in error state")
}
(Starting, Reconfiguring) => {
format!("Cannot reconfigure component '{id}' while it is starting")
}
(Stopping, Reconfiguring) => {
format!("Cannot reconfigure component '{id}' while it is stopping")
}
(Reconfiguring, Reconfiguring) => {
format!("Component '{id}' is already reconfiguring")
}
_ => format!("Invalid state transition for component '{id}': {from:?} → {to:?}"),
}
}
impl std::fmt::Debug for ComponentGraph {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ComponentGraph")
.field("instance_id", &self.instance_id())
.field("node_count", &self.node_count())
.field("edge_count", &self.edge_count())
.finish()
}
}