Skip to main content

drasi_lib/bootstrap/
component_graph.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Component Graph Bootstrap Provider
16//!
17//! Generates bootstrap data from the [`ComponentGraph`] for the component graph source.
18//! Takes a single atomic snapshot of the graph and translates all nodes and edges
19//! into `SourceChange::Insert` elements, guaranteeing a consistent view of the
20//! component topology.
21
22use anyhow::Result;
23use async_trait::async_trait;
24use drasi_core::models::{ElementPropertyMap, ElementValue, SourceChange};
25use log::info;
26use std::sync::Arc;
27use tokio::sync::RwLock;
28
29use crate::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult};
30use crate::channels::*;
31use crate::component_graph::{ComponentGraph, ComponentKind, RelationshipKind};
32use crate::config::SourceSubscriptionSettings;
33use crate::sources::component_graph_source::COMPONENT_GRAPH_SOURCE_ID;
34use crate::sources::graph_elements::{make_node, make_relation, status_str};
35
36/// Bootstrap provider that generates a consistent snapshot from the [`ComponentGraph`].
37///
38/// Takes a single read lock on the graph, iterates all nodes and edges, and
39/// translates them into `SourceChange::Insert` elements. This replaces the
40/// previous approach of querying three separate managers independently.
41pub struct ComponentGraphBootstrapProvider {
42    graph: Arc<RwLock<ComponentGraph>>,
43}
44
45impl ComponentGraphBootstrapProvider {
46    pub fn new(graph: Arc<RwLock<ComponentGraph>>) -> Self {
47        Self { graph }
48    }
49}
50
51#[async_trait]
52impl BootstrapProvider for ComponentGraphBootstrapProvider {
53    async fn bootstrap(
54        &self,
55        request: BootstrapRequest,
56        _context: &BootstrapContext,
57        event_tx: BootstrapEventSender,
58        _settings: Option<&SourceSubscriptionSettings>,
59    ) -> Result<BootstrapResult> {
60        info!(
61            "Component graph bootstrap for query '{}' starting",
62            request.query_id
63        );
64
65        // Take a single atomic snapshot of the graph
66        let snapshot = self.graph.read().await.snapshot();
67        let mut count: u64 = 0;
68
69        // Emit all nodes
70        for node in &snapshot.nodes {
71            // Skip the component graph source itself to avoid self-reference
72            if node.id == COMPONENT_GRAPH_SOURCE_ID {
73                continue;
74            }
75
76            let (label, prefix) = match node.kind {
77                ComponentKind::Instance => ("DrasiInstance", "instance"),
78                ComponentKind::Source => ("Source", "source"),
79                ComponentKind::Query => ("Query", "query"),
80                ComponentKind::Reaction => ("Reaction", "reaction"),
81                ComponentKind::BootstrapProvider => continue,
82                ComponentKind::IdentityProvider => continue,
83            };
84
85            let node_id = format!("{prefix}:{}", node.id);
86
87            // Build properties from status + metadata
88            let mut props = ElementPropertyMap::new();
89            props.insert("id", ElementValue::String(Arc::from(node.id.as_str())));
90            props.insert(
91                "status",
92                ElementValue::String(Arc::from(status_str(&node.status))),
93            );
94
95            // Add metadata properties (kind, query, autoStart, etc.)
96            for (k, v) in &node.metadata {
97                props.insert(k, ElementValue::String(Arc::from(v.as_str())));
98            }
99
100            // Special case: DrasiInstance gets "running" property
101            if matches!(node.kind, ComponentKind::Instance) {
102                props.insert("running", ElementValue::String(Arc::from("true")));
103            }
104
105            let element = make_node(&node_id, &[label], props);
106            if event_tx
107                .send(BootstrapEvent {
108                    source_id: COMPONENT_GRAPH_SOURCE_ID.to_string(),
109                    change: SourceChange::Insert { element },
110                    timestamp: chrono::Utc::now(),
111                    sequence: count,
112                })
113                .await
114                .is_err()
115            {
116                log::warn!(
117                    "Bootstrap node event dropped (channel closed) for source '{COMPONENT_GRAPH_SOURCE_ID}'"
118                );
119            }
120            count += 1;
121        }
122
123        // Emit all edges as relations
124        for edge in &snapshot.edges {
125            // Skip edges involving the component graph source itself
126            if edge.from == COMPONENT_GRAPH_SOURCE_ID || edge.to == COMPONENT_GRAPH_SOURCE_ID {
127                continue;
128            }
129
130            let (label, rel_id, from_prefix, to_prefix) = match &edge.relationship {
131                RelationshipKind::Owns => {
132                    // Instance → Component: emit HAS_SOURCE/HAS_QUERY/HAS_REACTION
133                    let to_node = snapshot.nodes.iter().find(|n| n.id == edge.to);
134                    let (has_label, rel_prefix) = match to_node.map(|n| &n.kind) {
135                        Some(ComponentKind::Source) => ("HAS_SOURCE", "has_source"),
136                        Some(ComponentKind::Query) => ("HAS_QUERY", "has_query"),
137                        Some(ComponentKind::Reaction) => ("HAS_REACTION", "has_reaction"),
138                        _ => continue,
139                    };
140                    let rel_id = format!("rel:{rel_prefix}:{}:{}", edge.from, edge.to);
141                    (has_label, rel_id, "instance", "")
142                }
143                RelationshipKind::Feeds => {
144                    // Source→Query or Query→Reaction
145                    let from_node = snapshot.nodes.iter().find(|n| n.id == edge.from);
146                    match from_node.map(|n| &n.kind) {
147                        Some(ComponentKind::Source) => {
148                            // Source feeds Query → SUBSCRIBES_TO (Query→Source in Cypher)
149                            let rel_id = format!("rel:subscribes:{}:{}", edge.to, edge.from);
150                            ("SUBSCRIBES_TO", rel_id, "query", "source")
151                        }
152                        Some(ComponentKind::Query) => {
153                            // Query feeds Reaction → LISTENS_TO (Reaction→Query in Cypher)
154                            let rel_id = format!("rel:listens:{}:{}", edge.to, edge.from);
155                            ("LISTENS_TO", rel_id, "reaction", "query")
156                        }
157                        _ => continue,
158                    }
159                }
160                // Skip reverse/non-data edges (OwnedBy, SubscribesTo, etc.)
161                _ => continue,
162            };
163
164            let (in_node_id, out_node_id) = match &edge.relationship {
165                RelationshipKind::Owns => {
166                    let to_node = snapshot.nodes.iter().find(|n| n.id == edge.to);
167                    let to_prefix = match to_node.map(|n| &n.kind) {
168                        Some(ComponentKind::Source) => "source",
169                        Some(ComponentKind::Query) => "query",
170                        Some(ComponentKind::Reaction) => "reaction",
171                        _ => continue,
172                    };
173                    (
174                        format!("{from_prefix}:{}", edge.from),
175                        format!("{to_prefix}:{}", edge.to),
176                    )
177                }
178                RelationshipKind::Feeds => (
179                    format!("{from_prefix}:{}", edge.to),
180                    format!("{to_prefix}:{}", edge.from),
181                ),
182                _ => continue,
183            };
184
185            let rel = make_relation(
186                &rel_id,
187                &[label],
188                &in_node_id,
189                &out_node_id,
190                ElementPropertyMap::new(),
191            );
192            if event_tx
193                .send(BootstrapEvent {
194                    source_id: COMPONENT_GRAPH_SOURCE_ID.to_string(),
195                    change: SourceChange::Insert { element: rel },
196                    timestamp: chrono::Utc::now(),
197                    sequence: count,
198                })
199                .await
200                .is_err()
201            {
202                log::warn!(
203                    "Bootstrap edge event dropped (channel closed) for source '{COMPONENT_GRAPH_SOURCE_ID}'"
204                );
205            }
206            count += 1;
207        }
208
209        info!(
210            "Component graph bootstrap complete: {} elements for query '{}'",
211            count, request.query_id
212        );
213        Ok(BootstrapResult {
214            event_count: count as usize,
215            last_sequence: None,
216            sequences_aligned: false,
217        })
218    }
219}
220
221#[cfg(test)]
222mod tests {
223    use super::*;
224    use drasi_core::models::Element;
225    use std::collections::HashMap;
226    use tokio::sync::mpsc;
227
228    fn make_request(query_id: &str) -> BootstrapRequest {
229        BootstrapRequest {
230            query_id: query_id.to_string(),
231            node_labels: vec![],
232            relation_labels: vec![],
233            request_id: "test-request".to_string(),
234        }
235    }
236
237    fn make_context() -> BootstrapContext {
238        BootstrapContext::new_minimal(
239            "test-server".to_string(),
240            COMPONENT_GRAPH_SOURCE_ID.to_string(),
241        )
242    }
243
244    #[test]
245    fn test_new_creates_provider_with_graph_reference() {
246        let (graph, _rx) = ComponentGraph::new("test-instance");
247        let graph = Arc::new(RwLock::new(graph));
248        let _provider = ComponentGraphBootstrapProvider::new(graph);
249    }
250
251    #[tokio::test]
252    async fn test_bootstrap_empty_graph() {
253        let (graph, _rx) = ComponentGraph::new("test-instance");
254        let graph = Arc::new(RwLock::new(graph));
255        let provider = ComponentGraphBootstrapProvider::new(graph);
256
257        let (tx, mut rx) = mpsc::channel::<BootstrapEvent>(100);
258        let request = make_request("test-query");
259        let context = make_context();
260
261        let result = provider
262            .bootstrap(request, &context, tx, None)
263            .await
264            .unwrap();
265
266        // Only the instance root node is present; it is emitted as DrasiInstance
267        assert_eq!(result.event_count, 1);
268
269        let event = rx.recv().await.unwrap();
270        match &event.change {
271            SourceChange::Insert { element } => match element {
272                Element::Node { metadata, .. } => {
273                    let labels: Vec<&str> = metadata.labels.iter().map(|l| l.as_ref()).collect();
274                    assert!(labels.contains(&"DrasiInstance"));
275                }
276                _ => panic!("Expected Node element for instance"),
277            },
278            _ => panic!("Expected Insert change"),
279        }
280    }
281
282    #[tokio::test]
283    async fn test_bootstrap_with_sources_and_queries() {
284        let (mut graph, _rx) = ComponentGraph::new("test-instance");
285        graph.register_source("src1", HashMap::new()).unwrap();
286        graph.register_source("src2", HashMap::new()).unwrap();
287        graph
288            .register_query(
289                "q1",
290                HashMap::new(),
291                &["src1".to_string(), "src2".to_string()],
292            )
293            .unwrap();
294
295        let graph = Arc::new(RwLock::new(graph));
296        let provider = ComponentGraphBootstrapProvider::new(graph);
297
298        let (tx, mut rx) = mpsc::channel::<BootstrapEvent>(100);
299        let request = make_request("test-query");
300        let context = make_context();
301
302        let result = provider
303            .bootstrap(request, &context, tx, None)
304            .await
305            .unwrap();
306
307        let mut events = Vec::new();
308        while let Ok(event) = rx.try_recv() {
309            events.push(event);
310        }
311
312        assert_eq!(result.event_count, events.len());
313
314        let mut node_count = 0;
315        let mut relation_count = 0;
316        for event in &events {
317            match &event.change {
318                SourceChange::Insert { element } => match element {
319                    Element::Node { .. } => node_count += 1,
320                    Element::Relation { .. } => relation_count += 1,
321                },
322                _ => panic!("Expected Insert change"),
323            }
324        }
325
326        // 4 nodes: instance + src1 + src2 + q1
327        assert_eq!(node_count, 4);
328        // 5 relations: HAS_SOURCE×2 + HAS_QUERY×1 + SUBSCRIBES_TO×2
329        assert_eq!(relation_count, 5);
330        assert_eq!(result.event_count, 9);
331    }
332
333    #[tokio::test]
334    async fn test_bootstrap_emitted_labels_match_component_kind() {
335        let (mut graph, _rx) = ComponentGraph::new("test-instance");
336        graph.register_source("my-source", HashMap::new()).unwrap();
337        graph
338            .register_query("my-query", HashMap::new(), &["my-source".to_string()])
339            .unwrap();
340        graph
341            .register_reaction("my-reaction", HashMap::new(), &["my-query".to_string()])
342            .unwrap();
343
344        let graph = Arc::new(RwLock::new(graph));
345        let provider = ComponentGraphBootstrapProvider::new(graph);
346
347        let (tx, mut rx) = mpsc::channel::<BootstrapEvent>(100);
348        let request = make_request("test-query");
349        let context = make_context();
350
351        provider
352            .bootstrap(request, &context, tx, None)
353            .await
354            .unwrap();
355
356        let mut events = Vec::new();
357        while let Ok(event) = rx.try_recv() {
358            events.push(event);
359        }
360
361        let mut found_instance = false;
362        let mut found_source = false;
363        let mut found_query = false;
364        let mut found_reaction = false;
365
366        for event in &events {
367            if let SourceChange::Insert {
368                element: Element::Node { metadata, .. },
369            } = &event.change
370            {
371                let labels: Vec<&str> = metadata.labels.iter().map(|l| l.as_ref()).collect();
372                if labels.contains(&"DrasiInstance") {
373                    found_instance = true;
374                }
375                if labels.contains(&"Source") {
376                    found_source = true;
377                }
378                if labels.contains(&"Query") {
379                    found_query = true;
380                }
381                if labels.contains(&"Reaction") {
382                    found_reaction = true;
383                }
384            }
385        }
386
387        assert!(found_instance, "Should emit DrasiInstance label");
388        assert!(found_source, "Should emit Source label");
389        assert!(found_query, "Should emit Query label");
390        assert!(found_reaction, "Should emit Reaction label");
391    }
392}