1use anyhow::Result;
23use async_trait::async_trait;
24use drasi_core::models::{ElementPropertyMap, ElementValue, SourceChange};
25use log::info;
26use std::sync::Arc;
27use tokio::sync::RwLock;
28
29use crate::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult};
30use crate::channels::*;
31use crate::component_graph::{ComponentGraph, ComponentKind, RelationshipKind};
32use crate::config::SourceSubscriptionSettings;
33use crate::sources::component_graph_source::COMPONENT_GRAPH_SOURCE_ID;
34use crate::sources::graph_elements::{make_node, make_relation, status_str};
35
36pub struct ComponentGraphBootstrapProvider {
42 graph: Arc<RwLock<ComponentGraph>>,
43}
44
45impl ComponentGraphBootstrapProvider {
46 pub fn new(graph: Arc<RwLock<ComponentGraph>>) -> Self {
47 Self { graph }
48 }
49}
50
51#[async_trait]
52impl BootstrapProvider for ComponentGraphBootstrapProvider {
53 async fn bootstrap(
54 &self,
55 request: BootstrapRequest,
56 _context: &BootstrapContext,
57 event_tx: BootstrapEventSender,
58 _settings: Option<&SourceSubscriptionSettings>,
59 ) -> Result<BootstrapResult> {
60 info!(
61 "Component graph bootstrap for query '{}' starting",
62 request.query_id
63 );
64
65 let snapshot = self.graph.read().await.snapshot();
67 let mut count: u64 = 0;
68
69 for node in &snapshot.nodes {
71 if node.id == COMPONENT_GRAPH_SOURCE_ID {
73 continue;
74 }
75
76 let (label, prefix) = match node.kind {
77 ComponentKind::Instance => ("DrasiInstance", "instance"),
78 ComponentKind::Source => ("Source", "source"),
79 ComponentKind::Query => ("Query", "query"),
80 ComponentKind::Reaction => ("Reaction", "reaction"),
81 ComponentKind::BootstrapProvider => continue,
82 ComponentKind::IdentityProvider => continue,
83 };
84
85 let node_id = format!("{prefix}:{}", node.id);
86
87 let mut props = ElementPropertyMap::new();
89 props.insert("id", ElementValue::String(Arc::from(node.id.as_str())));
90 props.insert(
91 "status",
92 ElementValue::String(Arc::from(status_str(&node.status))),
93 );
94
95 for (k, v) in &node.metadata {
97 props.insert(k, ElementValue::String(Arc::from(v.as_str())));
98 }
99
100 if matches!(node.kind, ComponentKind::Instance) {
102 props.insert("running", ElementValue::String(Arc::from("true")));
103 }
104
105 let element = make_node(&node_id, &[label], props);
106 if event_tx
107 .send(BootstrapEvent {
108 source_id: COMPONENT_GRAPH_SOURCE_ID.to_string(),
109 change: SourceChange::Insert { element },
110 timestamp: chrono::Utc::now(),
111 sequence: count,
112 })
113 .await
114 .is_err()
115 {
116 log::warn!(
117 "Bootstrap node event dropped (channel closed) for source '{COMPONENT_GRAPH_SOURCE_ID}'"
118 );
119 }
120 count += 1;
121 }
122
123 for edge in &snapshot.edges {
125 if edge.from == COMPONENT_GRAPH_SOURCE_ID || edge.to == COMPONENT_GRAPH_SOURCE_ID {
127 continue;
128 }
129
130 let (label, rel_id, from_prefix, to_prefix) = match &edge.relationship {
131 RelationshipKind::Owns => {
132 let to_node = snapshot.nodes.iter().find(|n| n.id == edge.to);
134 let (has_label, rel_prefix) = match to_node.map(|n| &n.kind) {
135 Some(ComponentKind::Source) => ("HAS_SOURCE", "has_source"),
136 Some(ComponentKind::Query) => ("HAS_QUERY", "has_query"),
137 Some(ComponentKind::Reaction) => ("HAS_REACTION", "has_reaction"),
138 _ => continue,
139 };
140 let rel_id = format!("rel:{rel_prefix}:{}:{}", edge.from, edge.to);
141 (has_label, rel_id, "instance", "")
142 }
143 RelationshipKind::Feeds => {
144 let from_node = snapshot.nodes.iter().find(|n| n.id == edge.from);
146 match from_node.map(|n| &n.kind) {
147 Some(ComponentKind::Source) => {
148 let rel_id = format!("rel:subscribes:{}:{}", edge.to, edge.from);
150 ("SUBSCRIBES_TO", rel_id, "query", "source")
151 }
152 Some(ComponentKind::Query) => {
153 let rel_id = format!("rel:listens:{}:{}", edge.to, edge.from);
155 ("LISTENS_TO", rel_id, "reaction", "query")
156 }
157 _ => continue,
158 }
159 }
160 _ => continue,
162 };
163
164 let (in_node_id, out_node_id) = match &edge.relationship {
165 RelationshipKind::Owns => {
166 let to_node = snapshot.nodes.iter().find(|n| n.id == edge.to);
167 let to_prefix = match to_node.map(|n| &n.kind) {
168 Some(ComponentKind::Source) => "source",
169 Some(ComponentKind::Query) => "query",
170 Some(ComponentKind::Reaction) => "reaction",
171 _ => continue,
172 };
173 (
174 format!("{from_prefix}:{}", edge.from),
175 format!("{to_prefix}:{}", edge.to),
176 )
177 }
178 RelationshipKind::Feeds => (
179 format!("{from_prefix}:{}", edge.to),
180 format!("{to_prefix}:{}", edge.from),
181 ),
182 _ => continue,
183 };
184
185 let rel = make_relation(
186 &rel_id,
187 &[label],
188 &in_node_id,
189 &out_node_id,
190 ElementPropertyMap::new(),
191 );
192 if event_tx
193 .send(BootstrapEvent {
194 source_id: COMPONENT_GRAPH_SOURCE_ID.to_string(),
195 change: SourceChange::Insert { element: rel },
196 timestamp: chrono::Utc::now(),
197 sequence: count,
198 })
199 .await
200 .is_err()
201 {
202 log::warn!(
203 "Bootstrap edge event dropped (channel closed) for source '{COMPONENT_GRAPH_SOURCE_ID}'"
204 );
205 }
206 count += 1;
207 }
208
209 info!(
210 "Component graph bootstrap complete: {} elements for query '{}'",
211 count, request.query_id
212 );
213 Ok(BootstrapResult {
214 event_count: count as usize,
215 last_sequence: None,
216 sequences_aligned: false,
217 })
218 }
219}
220
221#[cfg(test)]
222mod tests {
223 use super::*;
224 use drasi_core::models::Element;
225 use std::collections::HashMap;
226 use tokio::sync::mpsc;
227
228 fn make_request(query_id: &str) -> BootstrapRequest {
229 BootstrapRequest {
230 query_id: query_id.to_string(),
231 node_labels: vec![],
232 relation_labels: vec![],
233 request_id: "test-request".to_string(),
234 }
235 }
236
237 fn make_context() -> BootstrapContext {
238 BootstrapContext::new_minimal(
239 "test-server".to_string(),
240 COMPONENT_GRAPH_SOURCE_ID.to_string(),
241 )
242 }
243
244 #[test]
245 fn test_new_creates_provider_with_graph_reference() {
246 let (graph, _rx) = ComponentGraph::new("test-instance");
247 let graph = Arc::new(RwLock::new(graph));
248 let _provider = ComponentGraphBootstrapProvider::new(graph);
249 }
250
251 #[tokio::test]
252 async fn test_bootstrap_empty_graph() {
253 let (graph, _rx) = ComponentGraph::new("test-instance");
254 let graph = Arc::new(RwLock::new(graph));
255 let provider = ComponentGraphBootstrapProvider::new(graph);
256
257 let (tx, mut rx) = mpsc::channel::<BootstrapEvent>(100);
258 let request = make_request("test-query");
259 let context = make_context();
260
261 let result = provider
262 .bootstrap(request, &context, tx, None)
263 .await
264 .unwrap();
265
266 assert_eq!(result.event_count, 1);
268
269 let event = rx.recv().await.unwrap();
270 match &event.change {
271 SourceChange::Insert { element } => match element {
272 Element::Node { metadata, .. } => {
273 let labels: Vec<&str> = metadata.labels.iter().map(|l| l.as_ref()).collect();
274 assert!(labels.contains(&"DrasiInstance"));
275 }
276 _ => panic!("Expected Node element for instance"),
277 },
278 _ => panic!("Expected Insert change"),
279 }
280 }
281
282 #[tokio::test]
283 async fn test_bootstrap_with_sources_and_queries() {
284 let (mut graph, _rx) = ComponentGraph::new("test-instance");
285 graph.register_source("src1", HashMap::new()).unwrap();
286 graph.register_source("src2", HashMap::new()).unwrap();
287 graph
288 .register_query(
289 "q1",
290 HashMap::new(),
291 &["src1".to_string(), "src2".to_string()],
292 )
293 .unwrap();
294
295 let graph = Arc::new(RwLock::new(graph));
296 let provider = ComponentGraphBootstrapProvider::new(graph);
297
298 let (tx, mut rx) = mpsc::channel::<BootstrapEvent>(100);
299 let request = make_request("test-query");
300 let context = make_context();
301
302 let result = provider
303 .bootstrap(request, &context, tx, None)
304 .await
305 .unwrap();
306
307 let mut events = Vec::new();
308 while let Ok(event) = rx.try_recv() {
309 events.push(event);
310 }
311
312 assert_eq!(result.event_count, events.len());
313
314 let mut node_count = 0;
315 let mut relation_count = 0;
316 for event in &events {
317 match &event.change {
318 SourceChange::Insert { element } => match element {
319 Element::Node { .. } => node_count += 1,
320 Element::Relation { .. } => relation_count += 1,
321 },
322 _ => panic!("Expected Insert change"),
323 }
324 }
325
326 assert_eq!(node_count, 4);
328 assert_eq!(relation_count, 5);
330 assert_eq!(result.event_count, 9);
331 }
332
333 #[tokio::test]
334 async fn test_bootstrap_emitted_labels_match_component_kind() {
335 let (mut graph, _rx) = ComponentGraph::new("test-instance");
336 graph.register_source("my-source", HashMap::new()).unwrap();
337 graph
338 .register_query("my-query", HashMap::new(), &["my-source".to_string()])
339 .unwrap();
340 graph
341 .register_reaction("my-reaction", HashMap::new(), &["my-query".to_string()])
342 .unwrap();
343
344 let graph = Arc::new(RwLock::new(graph));
345 let provider = ComponentGraphBootstrapProvider::new(graph);
346
347 let (tx, mut rx) = mpsc::channel::<BootstrapEvent>(100);
348 let request = make_request("test-query");
349 let context = make_context();
350
351 provider
352 .bootstrap(request, &context, tx, None)
353 .await
354 .unwrap();
355
356 let mut events = Vec::new();
357 while let Ok(event) = rx.try_recv() {
358 events.push(event);
359 }
360
361 let mut found_instance = false;
362 let mut found_source = false;
363 let mut found_query = false;
364 let mut found_reaction = false;
365
366 for event in &events {
367 if let SourceChange::Insert {
368 element: Element::Node { metadata, .. },
369 } = &event.change
370 {
371 let labels: Vec<&str> = metadata.labels.iter().map(|l| l.as_ref()).collect();
372 if labels.contains(&"DrasiInstance") {
373 found_instance = true;
374 }
375 if labels.contains(&"Source") {
376 found_source = true;
377 }
378 if labels.contains(&"Query") {
379 found_query = true;
380 }
381 if labels.contains(&"Reaction") {
382 found_reaction = true;
383 }
384 }
385 }
386
387 assert!(found_instance, "Should emit DrasiInstance label");
388 assert!(found_source, "Should emit Source label");
389 assert!(found_query, "Should emit Query label");
390 assert!(found_reaction, "Should emit Reaction label");
391 }
392}