Skip to main content

drasi_lib/bootstrap/
component_graph.rs

// Copyright 2025 The Drasi Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
//! Component Graph Bootstrap Provider
//!
//! Generates bootstrap data from the [`ComponentGraph`] for the component graph source.
//! Takes a single atomic snapshot of the graph and translates its instance, source,
//! query and reaction nodes, together with the `Owns` and `Feeds` edges between them,
//! into `SourceChange::Insert` elements, guaranteeing a consistent view of the
//! component topology. The component graph source itself and provider nodes are
//! not emitted.
21
22use anyhow::Result;
23use async_trait::async_trait;
24use drasi_core::models::{ElementPropertyMap, ElementValue, SourceChange};
25use log::info;
26use std::sync::Arc;
27use tokio::sync::RwLock;
28
29use crate::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest};
30use crate::channels::*;
31use crate::component_graph::{ComponentGraph, ComponentKind, RelationshipKind};
32use crate::config::SourceSubscriptionSettings;
33use crate::sources::component_graph_source::COMPONENT_GRAPH_SOURCE_ID;
34use crate::sources::graph_elements::{make_node, make_relation, status_str};
35
36/// Bootstrap provider that generates a consistent snapshot from the [`ComponentGraph`].
37///
38/// Takes a single read lock on the graph, iterates all nodes and edges, and
39/// translates them into `SourceChange::Insert` elements. This replaces the
40/// previous approach of querying three separate managers independently.
41pub struct ComponentGraphBootstrapProvider {
42    graph: Arc<RwLock<ComponentGraph>>,
43}
44
45impl ComponentGraphBootstrapProvider {
46    pub fn new(graph: Arc<RwLock<ComponentGraph>>) -> Self {
47        Self { graph }
48    }
49}
50
51#[async_trait]
52impl BootstrapProvider for ComponentGraphBootstrapProvider {
53    async fn bootstrap(
54        &self,
55        request: BootstrapRequest,
56        _context: &BootstrapContext,
57        event_tx: BootstrapEventSender,
58        _settings: Option<&SourceSubscriptionSettings>,
59    ) -> Result<usize> {
60        info!(
61            "Component graph bootstrap for query '{}' starting",
62            request.query_id
63        );
64
65        // Take a single atomic snapshot of the graph
66        let snapshot = self.graph.read().await.snapshot();
67        let mut count: u64 = 0;
68
69        // Emit all nodes
70        for node in &snapshot.nodes {
71            // Skip the component graph source itself to avoid self-reference
72            if node.id == COMPONENT_GRAPH_SOURCE_ID {
73                continue;
74            }
75
76            let (label, prefix) = match node.kind {
77                ComponentKind::Instance => ("DrasiInstance", "instance"),
78                ComponentKind::Source => ("Source", "source"),
79                ComponentKind::Query => ("Query", "query"),
80                ComponentKind::Reaction => ("Reaction", "reaction"),
81                ComponentKind::BootstrapProvider => continue,
82                ComponentKind::IdentityProvider => continue,
83            };
84
85            let node_id = format!("{prefix}:{}", node.id);
86
87            // Build properties from status + metadata
88            let mut props = ElementPropertyMap::new();
89            props.insert("id", ElementValue::String(Arc::from(node.id.as_str())));
90            props.insert(
91                "status",
92                ElementValue::String(Arc::from(status_str(&node.status))),
93            );
94
95            // Add metadata properties (kind, query, autoStart, etc.)
96            for (k, v) in &node.metadata {
97                props.insert(k, ElementValue::String(Arc::from(v.as_str())));
98            }
99
100            // Special case: DrasiInstance gets "running" property
101            if matches!(node.kind, ComponentKind::Instance) {
102                props.insert("running", ElementValue::String(Arc::from("true")));
103            }
104
105            let element = make_node(&node_id, &[label], props);
106            if event_tx
107                .send(BootstrapEvent {
108                    source_id: COMPONENT_GRAPH_SOURCE_ID.to_string(),
109                    change: SourceChange::Insert { element },
110                    timestamp: chrono::Utc::now(),
111                    sequence: count,
112                })
113                .await
114                .is_err()
115            {
116                log::warn!(
117                    "Bootstrap node event dropped (channel closed) for source '{COMPONENT_GRAPH_SOURCE_ID}'"
118                );
119            }
120            count += 1;
121        }
122
123        // Emit all edges as relations
124        for edge in &snapshot.edges {
125            // Skip edges involving the component graph source itself
126            if edge.from == COMPONENT_GRAPH_SOURCE_ID || edge.to == COMPONENT_GRAPH_SOURCE_ID {
127                continue;
128            }
129
130            let (label, rel_id, from_prefix, to_prefix) = match &edge.relationship {
131                RelationshipKind::Owns => {
132                    // Instance → Component: emit HAS_SOURCE/HAS_QUERY/HAS_REACTION
133                    let to_node = snapshot.nodes.iter().find(|n| n.id == edge.to);
134                    let (has_label, rel_prefix) = match to_node.map(|n| &n.kind) {
135                        Some(ComponentKind::Source) => ("HAS_SOURCE", "has_source"),
136                        Some(ComponentKind::Query) => ("HAS_QUERY", "has_query"),
137                        Some(ComponentKind::Reaction) => ("HAS_REACTION", "has_reaction"),
138                        _ => continue,
139                    };
140                    let rel_id = format!("rel:{rel_prefix}:{}:{}", edge.from, edge.to);
141                    (has_label, rel_id, "instance", "")
142                }
143                RelationshipKind::Feeds => {
144                    // Source→Query or Query→Reaction
145                    let from_node = snapshot.nodes.iter().find(|n| n.id == edge.from);
146                    match from_node.map(|n| &n.kind) {
147                        Some(ComponentKind::Source) => {
148                            // Source feeds Query → SUBSCRIBES_TO (Query→Source in Cypher)
149                            let rel_id = format!("rel:subscribes:{}:{}", edge.to, edge.from);
150                            ("SUBSCRIBES_TO", rel_id, "query", "source")
151                        }
152                        Some(ComponentKind::Query) => {
153                            // Query feeds Reaction → LISTENS_TO (Reaction→Query in Cypher)
154                            let rel_id = format!("rel:listens:{}:{}", edge.to, edge.from);
155                            ("LISTENS_TO", rel_id, "reaction", "query")
156                        }
157                        _ => continue,
158                    }
159                }
160                // Skip reverse/non-data edges (OwnedBy, SubscribesTo, etc.)
161                _ => continue,
162            };
163
164            let (in_node_id, out_node_id) = match &edge.relationship {
165                RelationshipKind::Owns => {
166                    let to_node = snapshot.nodes.iter().find(|n| n.id == edge.to);
167                    let to_prefix = match to_node.map(|n| &n.kind) {
168                        Some(ComponentKind::Source) => "source",
169                        Some(ComponentKind::Query) => "query",
170                        Some(ComponentKind::Reaction) => "reaction",
171                        _ => continue,
172                    };
173                    (
174                        format!("{from_prefix}:{}", edge.from),
175                        format!("{to_prefix}:{}", edge.to),
176                    )
177                }
178                RelationshipKind::Feeds => (
179                    format!("{from_prefix}:{}", edge.to),
180                    format!("{to_prefix}:{}", edge.from),
181                ),
182                _ => continue,
183            };
184
185            let rel = make_relation(
186                &rel_id,
187                &[label],
188                &in_node_id,
189                &out_node_id,
190                ElementPropertyMap::new(),
191            );
192            if event_tx
193                .send(BootstrapEvent {
194                    source_id: COMPONENT_GRAPH_SOURCE_ID.to_string(),
195                    change: SourceChange::Insert { element: rel },
196                    timestamp: chrono::Utc::now(),
197                    sequence: count,
198                })
199                .await
200                .is_err()
201            {
202                log::warn!(
203                    "Bootstrap edge event dropped (channel closed) for source '{COMPONENT_GRAPH_SOURCE_ID}'"
204                );
205            }
206            count += 1;
207        }
208
209        info!(
210            "Component graph bootstrap complete: {} elements for query '{}'",
211            count, request.query_id
212        );
213        Ok(count as usize)
214    }
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use drasi_core::models::Element;
    use std::collections::HashMap;
    use tokio::sync::mpsc;

    /// Builds a bootstrap request for `query_id` with empty label filters.
    fn make_request(query_id: &str) -> BootstrapRequest {
        BootstrapRequest {
            query_id: query_id.to_owned(),
            node_labels: Vec::new(),
            relation_labels: Vec::new(),
            request_id: "test-request".to_owned(),
        }
    }

    /// Builds a minimal bootstrap context targeting the component graph source.
    fn make_context() -> BootstrapContext {
        let server_id = "test-server".to_string();
        let source_id = COMPONENT_GRAPH_SOURCE_ID.to_string();
        BootstrapContext::new_minimal(server_id, source_id)
    }

    /// Runs one bootstrap for query "test-query" against `graph`.
    ///
    /// Returns the count reported by the provider together with every event
    /// that was queued on the channel (capacity 100) by the time it returned.
    async fn run_bootstrap(graph: ComponentGraph) -> (usize, Vec<BootstrapEvent>) {
        let provider = ComponentGraphBootstrapProvider::new(Arc::new(RwLock::new(graph)));
        let (tx, mut rx) = mpsc::channel::<BootstrapEvent>(100);
        let request = make_request("test-query");
        let context = make_context();

        let count = provider
            .bootstrap(request, &context, tx, None)
            .await
            .unwrap();

        let mut events = Vec::new();
        while let Ok(event) = rx.try_recv() {
            events.push(event);
        }
        (count, events)
    }

    /// Labels of an inserted node, or `None` if the event is not a node insert.
    fn inserted_node_labels(event: &BootstrapEvent) -> Option<Vec<&str>> {
        match &event.change {
            SourceChange::Insert {
                element: Element::Node { metadata, .. },
            } => Some(metadata.labels.iter().map(|l| l.as_ref()).collect()),
            _ => None,
        }
    }

    #[test]
    fn test_new_creates_provider_with_graph_reference() {
        let (graph, _rx) = ComponentGraph::new("test-instance");
        let shared = Arc::new(RwLock::new(graph));
        let _provider = ComponentGraphBootstrapProvider::new(shared);
    }

    #[tokio::test]
    async fn test_bootstrap_empty_graph() {
        let (graph, _rx) = ComponentGraph::new("test-instance");

        let (count, events) = run_bootstrap(graph).await;

        // Only the instance root node is present; it is emitted as DrasiInstance
        assert_eq!(count, 1);

        let first = events.first().unwrap();
        match &first.change {
            SourceChange::Insert {
                element: Element::Node { metadata, .. },
            } => {
                let labels: Vec<&str> = metadata.labels.iter().map(|l| l.as_ref()).collect();
                assert!(labels.contains(&"DrasiInstance"));
            }
            SourceChange::Insert { .. } => panic!("Expected Node element for instance"),
            _ => panic!("Expected Insert change"),
        }
    }

    #[tokio::test]
    async fn test_bootstrap_with_sources_and_queries() {
        let (mut graph, _rx) = ComponentGraph::new("test-instance");
        for source_id in ["src1", "src2"] {
            graph.register_source(source_id, HashMap::new()).unwrap();
        }
        let subscribed = ["src1".to_string(), "src2".to_string()];
        graph
            .register_query("q1", HashMap::new(), &subscribed)
            .unwrap();

        let (count, events) = run_bootstrap(graph).await;

        assert_eq!(count, events.len());

        let (mut node_count, mut relation_count) = (0, 0);
        for event in &events {
            match &event.change {
                SourceChange::Insert {
                    element: Element::Node { .. },
                } => node_count += 1,
                SourceChange::Insert {
                    element: Element::Relation { .. },
                } => relation_count += 1,
                _ => panic!("Expected Insert change"),
            }
        }

        // 4 nodes: instance + src1 + src2 + q1
        assert_eq!(node_count, 4);
        // 5 relations: HAS_SOURCE×2 + HAS_QUERY×1 + SUBSCRIBES_TO×2
        assert_eq!(relation_count, 5);
        assert_eq!(count, 9);
    }

    #[tokio::test]
    async fn test_bootstrap_emitted_labels_match_component_kind() {
        let (mut graph, _rx) = ComponentGraph::new("test-instance");
        graph.register_source("my-source", HashMap::new()).unwrap();
        graph
            .register_query("my-query", HashMap::new(), &["my-source".to_string()])
            .unwrap();
        graph
            .register_reaction("my-reaction", HashMap::new(), &["my-query".to_string()])
            .unwrap();

        let (_count, events) = run_bootstrap(graph).await;

        // Label sets of every emitted node; relation events are ignored here.
        let emitted: Vec<Vec<&str>> = events.iter().filter_map(inserted_node_labels).collect();
        let has_label = |wanted: &str| {
            emitted
                .iter()
                .any(|labels| labels.iter().any(|label| *label == wanted))
        };

        assert!(has_label("DrasiInstance"), "Should emit DrasiInstance label");
        assert!(has_label("Source"), "Should emit Source label");
        assert!(has_label("Query"), "Should emit Query label");
        assert!(has_label("Reaction"), "Should emit Reaction label");
    }
}