use anyhow::Result;
use async_trait::async_trait;
use drasi_core::models::{ElementPropertyMap, ElementValue, SourceChange};
use log::info;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult};
use crate::channels::*;
use crate::component_graph::{ComponentGraph, ComponentKind, RelationshipKind};
use crate::config::SourceSubscriptionSettings;
use crate::sources::component_graph_source::COMPONENT_GRAPH_SOURCE_ID;
use crate::sources::graph_elements::{make_node, make_relation, status_str};
/// Bootstrap provider that replays a snapshot of the shared
/// [`ComponentGraph`] as insert events for a subscribing query.
pub struct ComponentGraphBootstrapProvider {
/// Shared handle to the component graph; this file only ever takes the read lock on it.
graph: Arc<RwLock<ComponentGraph>>,
}
impl ComponentGraphBootstrapProvider {
pub fn new(graph: Arc<RwLock<ComponentGraph>>) -> Self {
Self { graph }
}
}
#[async_trait]
impl BootstrapProvider for ComponentGraphBootstrapProvider {
async fn bootstrap(
&self,
request: BootstrapRequest,
_context: &BootstrapContext,
event_tx: BootstrapEventSender,
_settings: Option<&SourceSubscriptionSettings>,
) -> Result<BootstrapResult> {
info!(
"Component graph bootstrap for query '{}' starting",
request.query_id
);
let snapshot = self.graph.read().await.snapshot();
let mut count: u64 = 0;
for node in &snapshot.nodes {
if node.id == COMPONENT_GRAPH_SOURCE_ID {
continue;
}
let (label, prefix) = match node.kind {
ComponentKind::Instance => ("DrasiInstance", "instance"),
ComponentKind::Source => ("Source", "source"),
ComponentKind::Query => ("Query", "query"),
ComponentKind::Reaction => ("Reaction", "reaction"),
ComponentKind::BootstrapProvider => continue,
ComponentKind::IdentityProvider => continue,
};
let node_id = format!("{prefix}:{}", node.id);
let mut props = ElementPropertyMap::new();
props.insert("id", ElementValue::String(Arc::from(node.id.as_str())));
props.insert(
"status",
ElementValue::String(Arc::from(status_str(&node.status))),
);
for (k, v) in &node.metadata {
props.insert(k, ElementValue::String(Arc::from(v.as_str())));
}
if matches!(node.kind, ComponentKind::Instance) {
props.insert("running", ElementValue::String(Arc::from("true")));
}
let element = make_node(&node_id, &[label], props);
if event_tx
.send(BootstrapEvent {
source_id: COMPONENT_GRAPH_SOURCE_ID.to_string(),
change: SourceChange::Insert { element },
timestamp: chrono::Utc::now(),
sequence: count,
})
.await
.is_err()
{
log::warn!(
"Bootstrap node event dropped (channel closed) for source '{COMPONENT_GRAPH_SOURCE_ID}'"
);
}
count += 1;
}
for edge in &snapshot.edges {
if edge.from == COMPONENT_GRAPH_SOURCE_ID || edge.to == COMPONENT_GRAPH_SOURCE_ID {
continue;
}
let (label, rel_id, from_prefix, to_prefix) = match &edge.relationship {
RelationshipKind::Owns => {
let to_node = snapshot.nodes.iter().find(|n| n.id == edge.to);
let (has_label, rel_prefix) = match to_node.map(|n| &n.kind) {
Some(ComponentKind::Source) => ("HAS_SOURCE", "has_source"),
Some(ComponentKind::Query) => ("HAS_QUERY", "has_query"),
Some(ComponentKind::Reaction) => ("HAS_REACTION", "has_reaction"),
_ => continue,
};
let rel_id = format!("rel:{rel_prefix}:{}:{}", edge.from, edge.to);
(has_label, rel_id, "instance", "")
}
RelationshipKind::Feeds => {
let from_node = snapshot.nodes.iter().find(|n| n.id == edge.from);
match from_node.map(|n| &n.kind) {
Some(ComponentKind::Source) => {
let rel_id = format!("rel:subscribes:{}:{}", edge.to, edge.from);
("SUBSCRIBES_TO", rel_id, "query", "source")
}
Some(ComponentKind::Query) => {
let rel_id = format!("rel:listens:{}:{}", edge.to, edge.from);
("LISTENS_TO", rel_id, "reaction", "query")
}
_ => continue,
}
}
_ => continue,
};
let (in_node_id, out_node_id) = match &edge.relationship {
RelationshipKind::Owns => {
let to_node = snapshot.nodes.iter().find(|n| n.id == edge.to);
let to_prefix = match to_node.map(|n| &n.kind) {
Some(ComponentKind::Source) => "source",
Some(ComponentKind::Query) => "query",
Some(ComponentKind::Reaction) => "reaction",
_ => continue,
};
(
format!("{from_prefix}:{}", edge.from),
format!("{to_prefix}:{}", edge.to),
)
}
RelationshipKind::Feeds => (
format!("{from_prefix}:{}", edge.to),
format!("{to_prefix}:{}", edge.from),
),
_ => continue,
};
let rel = make_relation(
&rel_id,
&[label],
&in_node_id,
&out_node_id,
ElementPropertyMap::new(),
);
if event_tx
.send(BootstrapEvent {
source_id: COMPONENT_GRAPH_SOURCE_ID.to_string(),
change: SourceChange::Insert { element: rel },
timestamp: chrono::Utc::now(),
sequence: count,
})
.await
.is_err()
{
log::warn!(
"Bootstrap edge event dropped (channel closed) for source '{COMPONENT_GRAPH_SOURCE_ID}'"
);
}
count += 1;
}
info!(
"Component graph bootstrap complete: {} elements for query '{}'",
count, request.query_id
);
Ok(BootstrapResult {
event_count: count as usize,
last_sequence: None,
sequences_aligned: false,
})
}
}
#[cfg(test)]
mod tests {
    //! Unit tests for `ComponentGraphBootstrapProvider`.

    use super::*;
    use drasi_core::models::Element;
    use std::collections::HashMap;
    use tokio::sync::mpsc;

    /// Builds a bootstrap request with no label filters for `query_id`.
    fn make_request(query_id: &str) -> BootstrapRequest {
        BootstrapRequest {
            query_id: query_id.to_string(),
            node_labels: vec![],
            relation_labels: vec![],
            request_id: "test-request".to_string(),
        }
    }

    /// Builds a minimal bootstrap context for the component graph source.
    fn make_context() -> BootstrapContext {
        BootstrapContext::new_minimal(
            "test-server".to_string(),
            COMPONENT_GRAPH_SOURCE_ID.to_string(),
        )
    }

    /// Runs a bootstrap over `graph` and returns the result together with
    /// every event that was placed on the channel, in send order.
    async fn run_bootstrap(graph: ComponentGraph) -> (BootstrapResult, Vec<BootstrapEvent>) {
        let provider = ComponentGraphBootstrapProvider::new(Arc::new(RwLock::new(graph)));
        let (tx, mut rx) = mpsc::channel::<BootstrapEvent>(100);
        let result = provider
            .bootstrap(make_request("test-query"), &make_context(), tx, None)
            .await
            .expect("bootstrap should succeed");

        // The sender was moved into `bootstrap`, so everything is already buffered.
        let mut events = Vec::new();
        while let Ok(event) = rx.try_recv() {
            events.push(event);
        }
        (result, events)
    }

    /// Returns the labels of `event` when it is a node insert, `None` otherwise.
    fn node_labels(event: &BootstrapEvent) -> Option<Vec<&str>> {
        match &event.change {
            SourceChange::Insert {
                element: Element::Node { metadata, .. },
            } => Some(metadata.labels.iter().map(|l| l.as_ref()).collect()),
            _ => None,
        }
    }

    #[test]
    fn test_new_creates_provider_with_graph_reference() {
        let (graph, _rx) = ComponentGraph::new("test-instance");
        let graph = Arc::new(RwLock::new(graph));
        let provider = ComponentGraphBootstrapProvider::new(Arc::clone(&graph));
        assert!(
            Arc::ptr_eq(&provider.graph, &graph),
            "provider should share the graph handle it was constructed with"
        );
    }

    #[tokio::test]
    async fn test_bootstrap_empty_graph() {
        let (graph, _rx) = ComponentGraph::new("test-instance");
        let (result, events) = run_bootstrap(graph).await;

        // A fresh graph contributes only its instance node.
        assert_eq!(result.event_count, 1);
        assert_eq!(events.len(), 1);
        let labels = node_labels(&events[0]).expect("Expected Node insert for instance");
        assert!(labels.contains(&"DrasiInstance"));
    }

    #[tokio::test]
    async fn test_bootstrap_with_sources_and_queries() {
        let (mut graph, _rx) = ComponentGraph::new("test-instance");
        graph.register_source("src1", HashMap::new()).unwrap();
        graph.register_source("src2", HashMap::new()).unwrap();
        graph
            .register_query(
                "q1",
                HashMap::new(),
                &["src1".to_string(), "src2".to_string()],
            )
            .unwrap();

        let (result, events) = run_bootstrap(graph).await;
        assert_eq!(result.event_count, events.len());

        let mut node_count = 0;
        let mut relation_count = 0;
        for event in &events {
            match &event.change {
                SourceChange::Insert { element } => match element {
                    Element::Node { .. } => node_count += 1,
                    Element::Relation { .. } => relation_count += 1,
                },
                _ => panic!("Expected Insert change"),
            }
        }
        // Nodes: instance + 2 sources + 1 query.
        assert_eq!(node_count, 4);
        // Relations: 3 ownership relations + 2 subscriptions.
        assert_eq!(relation_count, 5);
        assert_eq!(result.event_count, 9);

        // Sequence numbers are assigned contiguously from zero in send order.
        for (index, event) in events.iter().enumerate() {
            assert_eq!(
                event.sequence, index as u64,
                "event sequence numbers should be contiguous from zero"
            );
        }
    }

    #[tokio::test]
    async fn test_bootstrap_emitted_labels_match_component_kind() {
        let (mut graph, _rx) = ComponentGraph::new("test-instance");
        graph.register_source("my-source", HashMap::new()).unwrap();
        graph
            .register_query("my-query", HashMap::new(), &["my-source".to_string()])
            .unwrap();
        graph
            .register_reaction("my-reaction", HashMap::new(), &["my-query".to_string()])
            .unwrap();

        let (_result, events) = run_bootstrap(graph).await;

        let emitted: Vec<&str> = events.iter().filter_map(node_labels).flatten().collect();
        assert!(
            emitted.contains(&"DrasiInstance"),
            "Should emit DrasiInstance label"
        );
        assert!(emitted.contains(&"Source"), "Should emit Source label");
        assert!(emitted.contains(&"Query"), "Should emit Query label");
        assert!(emitted.contains(&"Reaction"), "Should emit Reaction label");
    }
}