Skip to main content

drasi_lib/bootstrap/
component_graph.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Component Graph Bootstrap Provider
16//!
17//! Generates bootstrap data from the [`ComponentGraph`] for the component graph source.
18//! Takes a single atomic snapshot of the graph and translates all nodes and edges
19//! into `SourceChange::Insert` elements, guaranteeing a consistent view of the
20//! component topology.
21
22use anyhow::Result;
23use async_trait::async_trait;
24use drasi_core::models::{ElementPropertyMap, ElementValue, SourceChange};
25use log::info;
26use std::sync::Arc;
27use tokio::sync::RwLock;
28
29use crate::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult};
30use crate::channels::*;
31use crate::component_graph::{ComponentGraph, ComponentKind, RelationshipKind};
32use crate::config::SourceSubscriptionSettings;
33use crate::sources::component_graph_source::COMPONENT_GRAPH_SOURCE_ID;
34use crate::sources::graph_elements::{make_node, make_relation, status_str};
35
36/// Bootstrap provider that generates a consistent snapshot from the [`ComponentGraph`].
37///
38/// Takes a single read lock on the graph, iterates all nodes and edges, and
39/// translates them into `SourceChange::Insert` elements. This replaces the
40/// previous approach of querying three separate managers independently.
41pub struct ComponentGraphBootstrapProvider {
42    graph: Arc<RwLock<ComponentGraph>>,
43}
44
45impl ComponentGraphBootstrapProvider {
46    pub fn new(graph: Arc<RwLock<ComponentGraph>>) -> Self {
47        Self { graph }
48    }
49}
50
51#[async_trait]
52impl BootstrapProvider for ComponentGraphBootstrapProvider {
53    async fn bootstrap(
54        &self,
55        request: BootstrapRequest,
56        _context: &BootstrapContext,
57        event_tx: BootstrapEventSender,
58        _settings: Option<&SourceSubscriptionSettings>,
59    ) -> Result<BootstrapResult> {
60        info!(
61            "Component graph bootstrap for query '{}' starting",
62            request.query_id
63        );
64
65        // Take a single atomic snapshot of the graph
66        let snapshot = self.graph.read().await.snapshot();
67        let mut count: u64 = 0;
68
69        // Emit all nodes
70        for node in &snapshot.nodes {
71            // Skip the component graph source itself to avoid self-reference
72            if node.id == COMPONENT_GRAPH_SOURCE_ID {
73                continue;
74            }
75
76            let (label, prefix) = match node.kind {
77                ComponentKind::Instance => ("DrasiInstance", "instance"),
78                ComponentKind::Source => ("Source", "source"),
79                ComponentKind::Query => ("Query", "query"),
80                ComponentKind::Reaction => ("Reaction", "reaction"),
81                ComponentKind::BootstrapProvider => continue,
82                ComponentKind::IdentityProvider => continue,
83            };
84
85            let node_id = format!("{prefix}:{}", node.id);
86
87            // Build properties from status + metadata
88            let mut props = ElementPropertyMap::new();
89            props.insert("id", ElementValue::String(Arc::from(node.id.as_str())));
90            props.insert(
91                "status",
92                ElementValue::String(Arc::from(status_str(&node.status))),
93            );
94
95            // Add metadata properties (kind, query, autoStart, etc.)
96            for (k, v) in &node.metadata {
97                props.insert(k, ElementValue::String(Arc::from(v.as_str())));
98            }
99
100            // Special case: DrasiInstance gets "running" property
101            if matches!(node.kind, ComponentKind::Instance) {
102                props.insert("running", ElementValue::String(Arc::from("true")));
103            }
104
105            let element = make_node(&node_id, &[label], props);
106            if event_tx
107                .send(BootstrapEvent {
108                    source_id: COMPONENT_GRAPH_SOURCE_ID.to_string(),
109                    change: SourceChange::Insert { element },
110                    timestamp: chrono::Utc::now(),
111                    sequence: count,
112                })
113                .await
114                .is_err()
115            {
116                log::warn!(
117                    "Bootstrap node event dropped (channel closed) for source '{COMPONENT_GRAPH_SOURCE_ID}'"
118                );
119            }
120            count += 1;
121        }
122
123        // Emit all edges as relations
124        for edge in &snapshot.edges {
125            // Skip edges involving the component graph source itself
126            if edge.from == COMPONENT_GRAPH_SOURCE_ID || edge.to == COMPONENT_GRAPH_SOURCE_ID {
127                continue;
128            }
129
130            let (label, rel_id, from_prefix, to_prefix) = match &edge.relationship {
131                RelationshipKind::Owns => {
132                    // Instance → Component: emit HAS_SOURCE/HAS_QUERY/HAS_REACTION
133                    let to_node = snapshot.nodes.iter().find(|n| n.id == edge.to);
134                    let (has_label, rel_prefix) = match to_node.map(|n| &n.kind) {
135                        Some(ComponentKind::Source) => ("HAS_SOURCE", "has_source"),
136                        Some(ComponentKind::Query) => ("HAS_QUERY", "has_query"),
137                        Some(ComponentKind::Reaction) => ("HAS_REACTION", "has_reaction"),
138                        _ => continue,
139                    };
140                    let rel_id = format!("rel:{rel_prefix}:{}:{}", edge.from, edge.to);
141                    (has_label, rel_id, "instance", "")
142                }
143                RelationshipKind::Feeds => {
144                    // Source→Query or Query→Reaction
145                    let from_node = snapshot.nodes.iter().find(|n| n.id == edge.from);
146                    match from_node.map(|n| &n.kind) {
147                        Some(ComponentKind::Source) => {
148                            // Source feeds Query → SUBSCRIBES_TO (Query→Source in Cypher)
149                            let rel_id = format!("rel:subscribes:{}:{}", edge.to, edge.from);
150                            ("SUBSCRIBES_TO", rel_id, "query", "source")
151                        }
152                        Some(ComponentKind::Query) => {
153                            // Query feeds Reaction → LISTENS_TO (Reaction→Query in Cypher)
154                            let rel_id = format!("rel:listens:{}:{}", edge.to, edge.from);
155                            ("LISTENS_TO", rel_id, "reaction", "query")
156                        }
157                        _ => continue,
158                    }
159                }
160                // Skip reverse/non-data edges (OwnedBy, SubscribesTo, etc.)
161                _ => continue,
162            };
163
164            let (in_node_id, out_node_id) = match &edge.relationship {
165                RelationshipKind::Owns => {
166                    let to_node = snapshot.nodes.iter().find(|n| n.id == edge.to);
167                    let to_prefix = match to_node.map(|n| &n.kind) {
168                        Some(ComponentKind::Source) => "source",
169                        Some(ComponentKind::Query) => "query",
170                        Some(ComponentKind::Reaction) => "reaction",
171                        _ => continue,
172                    };
173                    (
174                        format!("{from_prefix}:{}", edge.from),
175                        format!("{to_prefix}:{}", edge.to),
176                    )
177                }
178                RelationshipKind::Feeds => (
179                    format!("{from_prefix}:{}", edge.to),
180                    format!("{to_prefix}:{}", edge.from),
181                ),
182                _ => continue,
183            };
184
185            let rel = make_relation(
186                &rel_id,
187                &[label],
188                &in_node_id,
189                &out_node_id,
190                ElementPropertyMap::new(),
191            );
192            if event_tx
193                .send(BootstrapEvent {
194                    source_id: COMPONENT_GRAPH_SOURCE_ID.to_string(),
195                    change: SourceChange::Insert { element: rel },
196                    timestamp: chrono::Utc::now(),
197                    sequence: count,
198                })
199                .await
200                .is_err()
201            {
202                log::warn!(
203                    "Bootstrap edge event dropped (channel closed) for source '{COMPONENT_GRAPH_SOURCE_ID}'"
204                );
205            }
206            count += 1;
207        }
208
209        info!(
210            "Component graph bootstrap complete: {} elements for query '{}'",
211            count, request.query_id
212        );
213        Ok(BootstrapResult {
214            event_count: count as usize,
215            last_sequence: None,
216            sequences_aligned: false,
217            source_position: None,
218        })
219    }
220}
221
#[cfg(test)]
mod tests {
    use super::*;
    use drasi_core::models::Element;
    use std::collections::HashMap;
    use tokio::sync::mpsc;

    /// Builds a bootstrap request for `query_id` with no label filters.
    fn make_request(query_id: &str) -> BootstrapRequest {
        BootstrapRequest {
            request_id: "test-request".to_string(),
            query_id: query_id.to_string(),
            node_labels: Vec::new(),
            relation_labels: Vec::new(),
        }
    }

    /// Builds a minimal bootstrap context bound to the component graph source.
    fn make_context() -> BootstrapContext {
        let server_id = "test-server".to_string();
        let source_id = COMPONENT_GRAPH_SOURCE_ID.to_string();
        BootstrapContext::new_minimal(server_id, source_id)
    }

    /// Bootstraps from `graph` and returns the result together with every
    /// event that was buffered in the channel.
    async fn run_bootstrap(graph: ComponentGraph) -> (BootstrapResult, Vec<BootstrapEvent>) {
        let provider = ComponentGraphBootstrapProvider::new(Arc::new(RwLock::new(graph)));
        let (tx, mut rx) = mpsc::channel::<BootstrapEvent>(100);
        let request = make_request("test-query");
        let context = make_context();

        let result = provider
            .bootstrap(request, &context, tx, None)
            .await
            .unwrap();

        let mut events = Vec::new();
        while let Ok(event) = rx.try_recv() {
            events.push(event);
        }
        (result, events)
    }

    /// Labels of the node inserted by `event`, or `None` when the event is not
    /// a node insert.
    fn inserted_node_labels(event: &BootstrapEvent) -> Option<Vec<&str>> {
        match &event.change {
            SourceChange::Insert {
                element: Element::Node { metadata, .. },
            } => Some(metadata.labels.iter().map(|l| l.as_ref()).collect()),
            _ => None,
        }
    }

    #[test]
    fn test_new_creates_provider_with_graph_reference() {
        let (graph, _rx) = ComponentGraph::new("test-instance");
        let shared = Arc::new(RwLock::new(graph));
        let _provider = ComponentGraphBootstrapProvider::new(shared);
    }

    #[tokio::test]
    async fn test_bootstrap_empty_graph() {
        let (graph, _rx) = ComponentGraph::new("test-instance");

        let (result, events) = run_bootstrap(graph).await;

        // Only the instance root node is present; it is emitted as DrasiInstance
        assert_eq!(result.event_count, 1);

        let first = events.first().unwrap();
        match &first.change {
            SourceChange::Insert {
                element: Element::Node { metadata, .. },
            } => {
                let labels: Vec<&str> = metadata.labels.iter().map(|l| l.as_ref()).collect();
                assert!(labels.contains(&"DrasiInstance"));
            }
            SourceChange::Insert { .. } => panic!("Expected Node element for instance"),
            _ => panic!("Expected Insert change"),
        }
    }

    #[tokio::test]
    async fn test_bootstrap_with_sources_and_queries() {
        let (mut graph, _rx) = ComponentGraph::new("test-instance");
        for source_id in ["src1", "src2"] {
            graph.register_source(source_id, HashMap::new()).unwrap();
        }
        graph
            .register_query(
                "q1",
                HashMap::new(),
                &["src1".to_string(), "src2".to_string()],
            )
            .unwrap();

        let (result, events) = run_bootstrap(graph).await;

        assert_eq!(result.event_count, events.len());

        let mut node_count = 0;
        let mut relation_count = 0;
        for event in &events {
            match &event.change {
                SourceChange::Insert {
                    element: Element::Node { .. },
                } => node_count += 1,
                SourceChange::Insert {
                    element: Element::Relation { .. },
                } => relation_count += 1,
                _ => panic!("Expected Insert change"),
            }
        }

        // 4 nodes: instance + src1 + src2 + q1
        assert_eq!(node_count, 4);
        // 5 relations: HAS_SOURCE×2 + HAS_QUERY×1 + SUBSCRIBES_TO×2
        assert_eq!(relation_count, 5);
        assert_eq!(result.event_count, 9);
    }

    #[tokio::test]
    async fn test_bootstrap_emitted_labels_match_component_kind() {
        let (mut graph, _rx) = ComponentGraph::new("test-instance");
        graph.register_source("my-source", HashMap::new()).unwrap();
        graph
            .register_query("my-query", HashMap::new(), &["my-source".to_string()])
            .unwrap();
        graph
            .register_reaction("my-reaction", HashMap::new(), &["my-query".to_string()])
            .unwrap();

        let (_result, events) = run_bootstrap(graph).await;

        // Every label carried by any inserted node, in emission order.
        let emitted_labels: Vec<&str> = events
            .iter()
            .filter_map(inserted_node_labels)
            .flatten()
            .collect();

        for expected in ["DrasiInstance", "Source", "Query", "Reaction"] {
            assert!(
                emitted_labels.contains(&expected),
                "Should emit {} label",
                expected
            );
        }
    }
}
393}