1use anyhow::Result;
23use async_trait::async_trait;
24use drasi_core::models::{ElementPropertyMap, ElementValue, SourceChange};
25use log::info;
26use std::sync::Arc;
27use tokio::sync::RwLock;
28
29use crate::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest};
30use crate::channels::*;
31use crate::component_graph::{ComponentGraph, ComponentKind, RelationshipKind};
32use crate::config::SourceSubscriptionSettings;
33use crate::sources::component_graph_source::COMPONENT_GRAPH_SOURCE_ID;
34use crate::sources::graph_elements::{make_node, make_relation, status_str};
35
36pub struct ComponentGraphBootstrapProvider {
42 graph: Arc<RwLock<ComponentGraph>>,
43}
44
45impl ComponentGraphBootstrapProvider {
46 pub fn new(graph: Arc<RwLock<ComponentGraph>>) -> Self {
47 Self { graph }
48 }
49}
50
51#[async_trait]
52impl BootstrapProvider for ComponentGraphBootstrapProvider {
53 async fn bootstrap(
54 &self,
55 request: BootstrapRequest,
56 _context: &BootstrapContext,
57 event_tx: BootstrapEventSender,
58 _settings: Option<&SourceSubscriptionSettings>,
59 ) -> Result<usize> {
60 info!(
61 "Component graph bootstrap for query '{}' starting",
62 request.query_id
63 );
64
65 let snapshot = self.graph.read().await.snapshot();
67 let mut count: u64 = 0;
68
69 for node in &snapshot.nodes {
71 if node.id == COMPONENT_GRAPH_SOURCE_ID {
73 continue;
74 }
75
76 let (label, prefix) = match node.kind {
77 ComponentKind::Instance => ("DrasiInstance", "instance"),
78 ComponentKind::Source => ("Source", "source"),
79 ComponentKind::Query => ("Query", "query"),
80 ComponentKind::Reaction => ("Reaction", "reaction"),
81 ComponentKind::BootstrapProvider => continue,
82 ComponentKind::IdentityProvider => continue,
83 };
84
85 let node_id = format!("{prefix}:{}", node.id);
86
87 let mut props = ElementPropertyMap::new();
89 props.insert("id", ElementValue::String(Arc::from(node.id.as_str())));
90 props.insert(
91 "status",
92 ElementValue::String(Arc::from(status_str(&node.status))),
93 );
94
95 for (k, v) in &node.metadata {
97 props.insert(k, ElementValue::String(Arc::from(v.as_str())));
98 }
99
100 if matches!(node.kind, ComponentKind::Instance) {
102 props.insert("running", ElementValue::String(Arc::from("true")));
103 }
104
105 let element = make_node(&node_id, &[label], props);
106 if event_tx
107 .send(BootstrapEvent {
108 source_id: COMPONENT_GRAPH_SOURCE_ID.to_string(),
109 change: SourceChange::Insert { element },
110 timestamp: chrono::Utc::now(),
111 sequence: count,
112 })
113 .await
114 .is_err()
115 {
116 log::warn!(
117 "Bootstrap node event dropped (channel closed) for source '{COMPONENT_GRAPH_SOURCE_ID}'"
118 );
119 }
120 count += 1;
121 }
122
123 for edge in &snapshot.edges {
125 if edge.from == COMPONENT_GRAPH_SOURCE_ID || edge.to == COMPONENT_GRAPH_SOURCE_ID {
127 continue;
128 }
129
130 let (label, rel_id, from_prefix, to_prefix) = match &edge.relationship {
131 RelationshipKind::Owns => {
132 let to_node = snapshot.nodes.iter().find(|n| n.id == edge.to);
134 let (has_label, rel_prefix) = match to_node.map(|n| &n.kind) {
135 Some(ComponentKind::Source) => ("HAS_SOURCE", "has_source"),
136 Some(ComponentKind::Query) => ("HAS_QUERY", "has_query"),
137 Some(ComponentKind::Reaction) => ("HAS_REACTION", "has_reaction"),
138 _ => continue,
139 };
140 let rel_id = format!("rel:{rel_prefix}:{}:{}", edge.from, edge.to);
141 (has_label, rel_id, "instance", "")
142 }
143 RelationshipKind::Feeds => {
144 let from_node = snapshot.nodes.iter().find(|n| n.id == edge.from);
146 match from_node.map(|n| &n.kind) {
147 Some(ComponentKind::Source) => {
148 let rel_id = format!("rel:subscribes:{}:{}", edge.to, edge.from);
150 ("SUBSCRIBES_TO", rel_id, "query", "source")
151 }
152 Some(ComponentKind::Query) => {
153 let rel_id = format!("rel:listens:{}:{}", edge.to, edge.from);
155 ("LISTENS_TO", rel_id, "reaction", "query")
156 }
157 _ => continue,
158 }
159 }
160 _ => continue,
162 };
163
164 let (in_node_id, out_node_id) = match &edge.relationship {
165 RelationshipKind::Owns => {
166 let to_node = snapshot.nodes.iter().find(|n| n.id == edge.to);
167 let to_prefix = match to_node.map(|n| &n.kind) {
168 Some(ComponentKind::Source) => "source",
169 Some(ComponentKind::Query) => "query",
170 Some(ComponentKind::Reaction) => "reaction",
171 _ => continue,
172 };
173 (
174 format!("{from_prefix}:{}", edge.from),
175 format!("{to_prefix}:{}", edge.to),
176 )
177 }
178 RelationshipKind::Feeds => (
179 format!("{from_prefix}:{}", edge.to),
180 format!("{to_prefix}:{}", edge.from),
181 ),
182 _ => continue,
183 };
184
185 let rel = make_relation(
186 &rel_id,
187 &[label],
188 &in_node_id,
189 &out_node_id,
190 ElementPropertyMap::new(),
191 );
192 if event_tx
193 .send(BootstrapEvent {
194 source_id: COMPONENT_GRAPH_SOURCE_ID.to_string(),
195 change: SourceChange::Insert { element: rel },
196 timestamp: chrono::Utc::now(),
197 sequence: count,
198 })
199 .await
200 .is_err()
201 {
202 log::warn!(
203 "Bootstrap edge event dropped (channel closed) for source '{COMPONENT_GRAPH_SOURCE_ID}'"
204 );
205 }
206 count += 1;
207 }
208
209 info!(
210 "Component graph bootstrap complete: {} elements for query '{}'",
211 count, request.query_id
212 );
213 Ok(count as usize)
214 }
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use drasi_core::models::Element;
    use std::collections::HashMap;
    use tokio::sync::mpsc;

    /// Builds a bootstrap request with no label filters for the given query id.
    fn make_request(query_id: &str) -> BootstrapRequest {
        BootstrapRequest {
            query_id: query_id.to_string(),
            node_labels: vec![],
            relation_labels: vec![],
            request_id: "test-request".to_string(),
        }
    }

    /// Builds a minimal bootstrap context bound to the component-graph source.
    fn make_context() -> BootstrapContext {
        BootstrapContext::new_minimal(
            "test-server".to_string(),
            COMPONENT_GRAPH_SOURCE_ID.to_string(),
        )
    }

    /// Runs a full bootstrap over `graph` and returns the reported count along
    /// with every event that arrived on the channel.
    async fn run_bootstrap(graph: ComponentGraph) -> (usize, Vec<BootstrapEvent>) {
        let provider = ComponentGraphBootstrapProvider::new(Arc::new(RwLock::new(graph)));
        let (tx, mut rx) = mpsc::channel::<BootstrapEvent>(100);

        let reported = provider
            .bootstrap(make_request("test-query"), &make_context(), tx, None)
            .await
            .unwrap();

        // The sender was moved into `bootstrap`, so everything it sent is
        // already buffered and a non-blocking drain collects all of it.
        let mut received = Vec::new();
        while let Ok(event) = rx.try_recv() {
            received.push(event);
        }
        (reported, received)
    }

    /// Labels of the node carried by `event`, or an empty list when the event
    /// is not a node insert.
    fn node_labels(event: &BootstrapEvent) -> Vec<&str> {
        match &event.change {
            SourceChange::Insert {
                element: Element::Node { metadata, .. },
            } => metadata.labels.iter().map(|l| l.as_ref()).collect(),
            _ => Vec::new(),
        }
    }

    #[test]
    fn test_new_creates_provider_with_graph_reference() {
        // Construction alone must succeed; nothing is read from the graph yet.
        let (graph, _rx) = ComponentGraph::new("test-instance");
        let _provider = ComponentGraphBootstrapProvider::new(Arc::new(RwLock::new(graph)));
    }

    #[tokio::test]
    async fn test_bootstrap_empty_graph() {
        let (graph, _rx) = ComponentGraph::new("test-instance");

        let (count, events) = run_bootstrap(graph).await;

        // A graph with no registered components still yields the instance node.
        assert_eq!(count, 1);

        let first = events.first().unwrap();
        match &first.change {
            SourceChange::Insert {
                element: Element::Node { metadata, .. },
            } => {
                let labels: Vec<&str> = metadata.labels.iter().map(|l| l.as_ref()).collect();
                assert!(labels.contains(&"DrasiInstance"));
            }
            SourceChange::Insert { .. } => panic!("Expected Node element for instance"),
            _ => panic!("Expected Insert change"),
        }
    }

    #[tokio::test]
    async fn test_bootstrap_with_sources_and_queries() {
        let (mut graph, _rx) = ComponentGraph::new("test-instance");
        for source_id in ["src1", "src2"] {
            graph.register_source(source_id, HashMap::new()).unwrap();
        }
        graph
            .register_query(
                "q1",
                HashMap::new(),
                &["src1".to_string(), "src2".to_string()],
            )
            .unwrap();

        let (count, events) = run_bootstrap(graph).await;

        assert_eq!(count, events.len());

        let (node_count, relation_count) =
            events
                .iter()
                .fold((0, 0), |(nodes, relations), event| match &event.change {
                    SourceChange::Insert {
                        element: Element::Node { .. },
                    } => (nodes + 1, relations),
                    SourceChange::Insert {
                        element: Element::Relation { .. },
                    } => (nodes, relations + 1),
                    _ => panic!("Expected Insert change"),
                });

        // Nodes: instance + 2 sources + 1 query.
        assert_eq!(node_count, 4);
        // Relations: 3 ownership relations + 2 subscriptions.
        assert_eq!(relation_count, 5);
        assert_eq!(count, 9);
    }

    #[tokio::test]
    async fn test_bootstrap_emitted_labels_match_component_kind() {
        let (mut graph, _rx) = ComponentGraph::new("test-instance");
        graph.register_source("my-source", HashMap::new()).unwrap();
        graph
            .register_query("my-query", HashMap::new(), &["my-source".to_string()])
            .unwrap();
        graph
            .register_reaction("my-reaction", HashMap::new(), &["my-query".to_string()])
            .unwrap();

        let (_count, events) = run_bootstrap(graph).await;

        let emitted: Vec<&str> = events.iter().flat_map(node_labels).collect();

        let expectations = [
            ("DrasiInstance", "Should emit DrasiInstance label"),
            ("Source", "Should emit Source label"),
            ("Query", "Should emit Query label"),
            ("Reaction", "Should emit Reaction label"),
        ];
        for (label, message) in expectations {
            assert!(emitted.contains(&label), "{}", message);
        }
    }
}