1use anyhow::Result;
23use async_trait::async_trait;
24use drasi_core::models::{ElementPropertyMap, ElementValue, SourceChange};
25use log::info;
26use std::sync::Arc;
27use tokio::sync::RwLock;
28
29use crate::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult};
30use crate::channels::*;
31use crate::component_graph::{ComponentGraph, ComponentKind, RelationshipKind};
32use crate::config::SourceSubscriptionSettings;
33use crate::sources::component_graph_source::COMPONENT_GRAPH_SOURCE_ID;
34use crate::sources::graph_elements::{make_node, make_relation, status_str};
35
36pub struct ComponentGraphBootstrapProvider {
42 graph: Arc<RwLock<ComponentGraph>>,
43}
44
45impl ComponentGraphBootstrapProvider {
46 pub fn new(graph: Arc<RwLock<ComponentGraph>>) -> Self {
47 Self { graph }
48 }
49}
50
51#[async_trait]
52impl BootstrapProvider for ComponentGraphBootstrapProvider {
53 async fn bootstrap(
54 &self,
55 request: BootstrapRequest,
56 _context: &BootstrapContext,
57 event_tx: BootstrapEventSender,
58 _settings: Option<&SourceSubscriptionSettings>,
59 ) -> Result<BootstrapResult> {
60 info!(
61 "Component graph bootstrap for query '{}' starting",
62 request.query_id
63 );
64
65 let snapshot = self.graph.read().await.snapshot();
67 let mut count: u64 = 0;
68
69 for node in &snapshot.nodes {
71 if node.id == COMPONENT_GRAPH_SOURCE_ID {
73 continue;
74 }
75
76 let (label, prefix) = match node.kind {
77 ComponentKind::Instance => ("DrasiInstance", "instance"),
78 ComponentKind::Source => ("Source", "source"),
79 ComponentKind::Query => ("Query", "query"),
80 ComponentKind::Reaction => ("Reaction", "reaction"),
81 ComponentKind::BootstrapProvider => continue,
82 ComponentKind::IdentityProvider => continue,
83 };
84
85 let node_id = format!("{prefix}:{}", node.id);
86
87 let mut props = ElementPropertyMap::new();
89 props.insert("id", ElementValue::String(Arc::from(node.id.as_str())));
90 props.insert(
91 "status",
92 ElementValue::String(Arc::from(status_str(&node.status))),
93 );
94
95 for (k, v) in &node.metadata {
97 props.insert(k, ElementValue::String(Arc::from(v.as_str())));
98 }
99
100 if matches!(node.kind, ComponentKind::Instance) {
102 props.insert("running", ElementValue::String(Arc::from("true")));
103 }
104
105 let element = make_node(&node_id, &[label], props);
106 if event_tx
107 .send(BootstrapEvent {
108 source_id: COMPONENT_GRAPH_SOURCE_ID.to_string(),
109 change: SourceChange::Insert { element },
110 timestamp: chrono::Utc::now(),
111 sequence: count,
112 })
113 .await
114 .is_err()
115 {
116 log::warn!(
117 "Bootstrap node event dropped (channel closed) for source '{COMPONENT_GRAPH_SOURCE_ID}'"
118 );
119 }
120 count += 1;
121 }
122
123 for edge in &snapshot.edges {
125 if edge.from == COMPONENT_GRAPH_SOURCE_ID || edge.to == COMPONENT_GRAPH_SOURCE_ID {
127 continue;
128 }
129
130 let (label, rel_id, from_prefix, to_prefix) = match &edge.relationship {
131 RelationshipKind::Owns => {
132 let to_node = snapshot.nodes.iter().find(|n| n.id == edge.to);
134 let (has_label, rel_prefix) = match to_node.map(|n| &n.kind) {
135 Some(ComponentKind::Source) => ("HAS_SOURCE", "has_source"),
136 Some(ComponentKind::Query) => ("HAS_QUERY", "has_query"),
137 Some(ComponentKind::Reaction) => ("HAS_REACTION", "has_reaction"),
138 _ => continue,
139 };
140 let rel_id = format!("rel:{rel_prefix}:{}:{}", edge.from, edge.to);
141 (has_label, rel_id, "instance", "")
142 }
143 RelationshipKind::Feeds => {
144 let from_node = snapshot.nodes.iter().find(|n| n.id == edge.from);
146 match from_node.map(|n| &n.kind) {
147 Some(ComponentKind::Source) => {
148 let rel_id = format!("rel:subscribes:{}:{}", edge.to, edge.from);
150 ("SUBSCRIBES_TO", rel_id, "query", "source")
151 }
152 Some(ComponentKind::Query) => {
153 let rel_id = format!("rel:listens:{}:{}", edge.to, edge.from);
155 ("LISTENS_TO", rel_id, "reaction", "query")
156 }
157 _ => continue,
158 }
159 }
160 _ => continue,
162 };
163
164 let (in_node_id, out_node_id) = match &edge.relationship {
165 RelationshipKind::Owns => {
166 let to_node = snapshot.nodes.iter().find(|n| n.id == edge.to);
167 let to_prefix = match to_node.map(|n| &n.kind) {
168 Some(ComponentKind::Source) => "source",
169 Some(ComponentKind::Query) => "query",
170 Some(ComponentKind::Reaction) => "reaction",
171 _ => continue,
172 };
173 (
174 format!("{from_prefix}:{}", edge.from),
175 format!("{to_prefix}:{}", edge.to),
176 )
177 }
178 RelationshipKind::Feeds => (
179 format!("{from_prefix}:{}", edge.to),
180 format!("{to_prefix}:{}", edge.from),
181 ),
182 _ => continue,
183 };
184
185 let rel = make_relation(
186 &rel_id,
187 &[label],
188 &in_node_id,
189 &out_node_id,
190 ElementPropertyMap::new(),
191 );
192 if event_tx
193 .send(BootstrapEvent {
194 source_id: COMPONENT_GRAPH_SOURCE_ID.to_string(),
195 change: SourceChange::Insert { element: rel },
196 timestamp: chrono::Utc::now(),
197 sequence: count,
198 })
199 .await
200 .is_err()
201 {
202 log::warn!(
203 "Bootstrap edge event dropped (channel closed) for source '{COMPONENT_GRAPH_SOURCE_ID}'"
204 );
205 }
206 count += 1;
207 }
208
209 info!(
210 "Component graph bootstrap complete: {} elements for query '{}'",
211 count, request.query_id
212 );
213 Ok(BootstrapResult {
214 event_count: count as usize,
215 last_sequence: None,
216 sequences_aligned: false,
217 source_position: None,
218 })
219 }
220}
221
222#[cfg(test)]
223mod tests {
224 use super::*;
225 use drasi_core::models::Element;
226 use std::collections::HashMap;
227 use tokio::sync::mpsc;
228
229 fn make_request(query_id: &str) -> BootstrapRequest {
230 BootstrapRequest {
231 query_id: query_id.to_string(),
232 node_labels: vec![],
233 relation_labels: vec![],
234 request_id: "test-request".to_string(),
235 }
236 }
237
238 fn make_context() -> BootstrapContext {
239 BootstrapContext::new_minimal(
240 "test-server".to_string(),
241 COMPONENT_GRAPH_SOURCE_ID.to_string(),
242 )
243 }
244
245 #[test]
246 fn test_new_creates_provider_with_graph_reference() {
247 let (graph, _rx) = ComponentGraph::new("test-instance");
248 let graph = Arc::new(RwLock::new(graph));
249 let _provider = ComponentGraphBootstrapProvider::new(graph);
250 }
251
252 #[tokio::test]
253 async fn test_bootstrap_empty_graph() {
254 let (graph, _rx) = ComponentGraph::new("test-instance");
255 let graph = Arc::new(RwLock::new(graph));
256 let provider = ComponentGraphBootstrapProvider::new(graph);
257
258 let (tx, mut rx) = mpsc::channel::<BootstrapEvent>(100);
259 let request = make_request("test-query");
260 let context = make_context();
261
262 let result = provider
263 .bootstrap(request, &context, tx, None)
264 .await
265 .unwrap();
266
267 assert_eq!(result.event_count, 1);
269
270 let event = rx.recv().await.unwrap();
271 match &event.change {
272 SourceChange::Insert { element } => match element {
273 Element::Node { metadata, .. } => {
274 let labels: Vec<&str> = metadata.labels.iter().map(|l| l.as_ref()).collect();
275 assert!(labels.contains(&"DrasiInstance"));
276 }
277 _ => panic!("Expected Node element for instance"),
278 },
279 _ => panic!("Expected Insert change"),
280 }
281 }
282
283 #[tokio::test]
284 async fn test_bootstrap_with_sources_and_queries() {
285 let (mut graph, _rx) = ComponentGraph::new("test-instance");
286 graph.register_source("src1", HashMap::new()).unwrap();
287 graph.register_source("src2", HashMap::new()).unwrap();
288 graph
289 .register_query(
290 "q1",
291 HashMap::new(),
292 &["src1".to_string(), "src2".to_string()],
293 )
294 .unwrap();
295
296 let graph = Arc::new(RwLock::new(graph));
297 let provider = ComponentGraphBootstrapProvider::new(graph);
298
299 let (tx, mut rx) = mpsc::channel::<BootstrapEvent>(100);
300 let request = make_request("test-query");
301 let context = make_context();
302
303 let result = provider
304 .bootstrap(request, &context, tx, None)
305 .await
306 .unwrap();
307
308 let mut events = Vec::new();
309 while let Ok(event) = rx.try_recv() {
310 events.push(event);
311 }
312
313 assert_eq!(result.event_count, events.len());
314
315 let mut node_count = 0;
316 let mut relation_count = 0;
317 for event in &events {
318 match &event.change {
319 SourceChange::Insert { element } => match element {
320 Element::Node { .. } => node_count += 1,
321 Element::Relation { .. } => relation_count += 1,
322 },
323 _ => panic!("Expected Insert change"),
324 }
325 }
326
327 assert_eq!(node_count, 4);
329 assert_eq!(relation_count, 5);
331 assert_eq!(result.event_count, 9);
332 }
333
334 #[tokio::test]
335 async fn test_bootstrap_emitted_labels_match_component_kind() {
336 let (mut graph, _rx) = ComponentGraph::new("test-instance");
337 graph.register_source("my-source", HashMap::new()).unwrap();
338 graph
339 .register_query("my-query", HashMap::new(), &["my-source".to_string()])
340 .unwrap();
341 graph
342 .register_reaction("my-reaction", HashMap::new(), &["my-query".to_string()])
343 .unwrap();
344
345 let graph = Arc::new(RwLock::new(graph));
346 let provider = ComponentGraphBootstrapProvider::new(graph);
347
348 let (tx, mut rx) = mpsc::channel::<BootstrapEvent>(100);
349 let request = make_request("test-query");
350 let context = make_context();
351
352 provider
353 .bootstrap(request, &context, tx, None)
354 .await
355 .unwrap();
356
357 let mut events = Vec::new();
358 while let Ok(event) = rx.try_recv() {
359 events.push(event);
360 }
361
362 let mut found_instance = false;
363 let mut found_source = false;
364 let mut found_query = false;
365 let mut found_reaction = false;
366
367 for event in &events {
368 if let SourceChange::Insert {
369 element: Element::Node { metadata, .. },
370 } = &event.change
371 {
372 let labels: Vec<&str> = metadata.labels.iter().map(|l| l.as_ref()).collect();
373 if labels.contains(&"DrasiInstance") {
374 found_instance = true;
375 }
376 if labels.contains(&"Source") {
377 found_source = true;
378 }
379 if labels.contains(&"Query") {
380 found_query = true;
381 }
382 if labels.contains(&"Reaction") {
383 found_reaction = true;
384 }
385 }
386 }
387
388 assert!(found_instance, "Should emit DrasiInstance label");
389 assert!(found_source, "Should emit Source label");
390 assert!(found_query, "Should emit Query label");
391 assert!(found_reaction, "Should emit Reaction label");
392 }
393}