Skip to main content

drasi_lib/component_graph/
graph.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use petgraph::stable_graph::{EdgeIndex, NodeIndex, StableGraph};
20use petgraph::visit::EdgeRef;
21use petgraph::Direction;
22use tokio::sync::{broadcast, mpsc, Notify};
23
24use crate::channels::{ComponentEvent, ComponentEventBroadcastReceiver, ComponentStatus};
25use crate::managers::ComponentEventHistory;
26
27use super::transaction::GraphTransaction;
28use super::{
29    ComponentKind, ComponentNode, ComponentUpdate, ComponentUpdateReceiver, ComponentUpdateSender,
30    GraphEdge, GraphSnapshot, RelationshipKind,
31};
32
33// ============================================================================
34// Component Graph
35// ============================================================================
36
37/// Central component dependency graph — the single source of truth.
38///
39/// Backed by `petgraph::stable_graph::StableGraph` which keeps node/edge indices
40/// stable across removals (critical for a graph that changes at runtime).
41///
42/// # Error Handling
43///
44/// Mutation methods on this struct return `anyhow::Result`, following the Layer 2
45/// (internal module) error convention. The public API boundary in `DrasiLib` and
46/// `lib_core_ops` wraps these into `DrasiError` variants before returning to callers.
47/// External consumers should use `DrasiLib` methods rather than calling graph
48/// mutations directly.
49///
50/// # Event Emission
51///
52/// The graph emits [`ComponentEvent`]s via a built-in broadcast channel whenever
53/// components are added, removed, or change status. Call [`subscribe()`](Self::subscribe)
54/// to receive events.
55///
56/// # Thread Safety
57///
58/// `ComponentGraph` is NOT `Send`/`Sync` by itself due to the underlying `StableGraph`.
59/// It must be wrapped in `Arc<RwLock<ComponentGraph>>` for multi-threaded access.
60/// All public APIs in `DrasiLib` handle this wrapping automatically.
61///
62/// # Instance Root
63///
64/// The graph always has the DrasiLib instance as its root node.
65/// All other components are connected to it via Owns/OwnedBy edges.
66pub struct ComponentGraph {
67    /// The underlying petgraph directed graph
68    pub(super) graph: StableGraph<ComponentNode, RelationshipKind>,
69    /// Fast lookup: component ID → node index (O(1) access)
70    pub(super) index: HashMap<String, NodeIndex>,
71    /// The instance node index (always present)
72    instance_idx: NodeIndex,
73    /// Broadcast sender for component lifecycle events (fan-out to subscribers).
74    /// Events are emitted by `add_component()`, `remove_component()`, `update_status()`,
75    /// and the graph update loop.
76    pub(super) event_tx: broadcast::Sender<ComponentEvent>,
77    /// mpsc sender for component status updates (fan-in from components).
78    /// Cloned and given to each component's Base struct. The graph update loop
79    /// owns the corresponding receiver.
80    update_tx: mpsc::Sender<ComponentUpdate>,
81    /// Notifies waiters when any component's status changes.
82    /// Used by `wait_for_status()` to replace polling loops with event-driven waits.
83    /// Follows the same pattern as `PriorityQueue::notify`.
84    status_notify: Arc<Notify>,
85    /// Type-erased runtime instances, keyed by component ID.
86    ///
87    /// Managers store their `Arc<dyn Source>`, `Arc<dyn Query>`, `Arc<dyn Reaction>`, etc.
88    /// here during provisioning. This eliminates the dual-registry pattern where each
89    /// manager maintained its own HashMap — the graph is now the single store for both
90    /// metadata (in the petgraph node) and runtime instances (here).
91    ///
92    /// Access via typed helpers: [`set_runtime()`], [`get_runtime()`], [`take_runtime()`].
93    runtimes: HashMap<String, Box<dyn Any + Send + Sync>>,
94    /// Centralized event history for all components.
95    ///
96    /// Stores lifecycle events (Starting, Running, Error, Stopped, etc.) for every
97    /// component in the graph. Events are recorded by [`apply_update()`] and
98    /// [`remove_component()`]. Managers delegate event queries here rather than
99    /// maintaining their own per-manager histories.
100    event_history: ComponentEventHistory,
101}
102
/// Buffer size of the broadcast channel that fans component lifecycle events
/// out to subscribers.
const EVENT_CHANNEL_CAPACITY: usize = 1_000;

/// Buffer size of the mpsc channel through which components report status updates.
const UPDATE_CHANNEL_CAPACITY: usize = 1_000;
108
109impl ComponentGraph {
110    /// Create a new component graph with the given instance as root node.
111    ///
112    /// Returns the graph and a [`ComponentUpdateReceiver`] that must be consumed by
113    /// a graph update loop task (see [`Self::apply_update`]).
114    pub fn new(instance_id: &str) -> (Self, ComponentUpdateReceiver) {
115        let mut graph = StableGraph::new();
116        let instance_node = ComponentNode {
117            id: instance_id.to_string(),
118            kind: ComponentKind::Instance,
119            status: ComponentStatus::Running,
120            metadata: HashMap::new(),
121        };
122        let instance_idx = graph.add_node(instance_node);
123
124        let mut index = HashMap::new();
125        index.insert(instance_id.to_string(), instance_idx);
126        let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
127        let (update_tx, update_rx) = mpsc::channel(UPDATE_CHANNEL_CAPACITY);
128
129        (
130            Self {
131                graph,
132                index,
133                instance_idx,
134                event_tx,
135                update_tx,
136                status_notify: Arc::new(Notify::new()),
137                runtimes: HashMap::new(),
138                event_history: ComponentEventHistory::new(),
139            },
140            update_rx,
141        )
142    }
143
144    /// Subscribe to component lifecycle events.
145    ///
146    /// Returns a broadcast receiver that gets a copy of every [`ComponentEvent`]
147    /// emitted by graph mutations (`add_component`, `remove_component`, `update_status`).
148    pub fn subscribe(&self) -> ComponentEventBroadcastReceiver {
149        self.event_tx.subscribe()
150    }
151
152    /// Get a reference to the broadcast sender for component events.
153    ///
154    /// This allows callers to clone the sender before the graph is wrapped in
155    /// `Arc<RwLock<>>`, enabling subscription without needing to acquire the lock.
156    /// The returned sender is the same one used by the graph for event emission.
157    pub fn event_sender(&self) -> &broadcast::Sender<ComponentEvent> {
158        &self.event_tx
159    }
160
161    /// Get a clone of the mpsc update sender.
162    ///
163    /// This is the sender that components use to report status changes without
164    /// acquiring any graph lock. Clone this and pass to `SourceBase`/`ReactionBase`.
165    pub fn update_sender(&self) -> ComponentUpdateSender {
166        self.update_tx.clone()
167    }
168
169    /// Get a clone of the status change notifier.
170    ///
171    /// This `Notify` is signalled whenever any component's status changes in the graph.
172    /// Use it with [`wait_for_status`] or build custom wait loops that avoid polling
173    /// with sleep.
174    ///
175    /// # Pattern
176    ///
177    /// ```ignore
178    /// let notify = graph.status_notifier();
179    /// // Register interest BEFORE releasing the graph lock
180    /// let notified = notify.notified();
181    /// drop(graph); // release lock
182    /// notified.await; // woken when any status changes
183    /// ```
184    pub fn status_notifier(&self) -> Arc<Notify> {
185        self.status_notify.clone()
186    }
187
188    /// Apply a [`ComponentUpdate`] received from the mpsc channel.
189    ///
190    /// Called by the graph update loop task. Updates the graph, emits a
191    /// broadcast event, and records the event in the centralized event history.
192    /// Returns the event for external logging/processing.
193    pub fn apply_update(&mut self, update: ComponentUpdate) -> Option<ComponentEvent> {
194        match update {
195            ComponentUpdate::Status {
196                component_id,
197                status,
198                message,
199            } => match self.update_status_with_message(&component_id, status, message) {
200                Ok(event) => event,
201                Err(e) => {
202                    tracing::debug!(
203                        "Graph update loop: status update skipped for '{}': {e}",
204                        component_id
205                    );
206                    None
207                }
208            },
209        }
210    }
211
212    /// Emit a [`ComponentEvent`] to all broadcast subscribers and return it.
213    ///
214    /// Returns `Some(event)` if the component kind maps to a [`ComponentType`],
215    /// `None` for kinds like `Instance` that have no external type.
216    /// Silently ignores broadcast send failures (no subscribers connected).
217    fn emit_event(
218        &self,
219        component_id: &str,
220        kind: &ComponentKind,
221        status: ComponentStatus,
222        message: Option<String>,
223    ) -> Option<ComponentEvent> {
224        if let Some(component_type) = kind.to_component_type() {
225            let event = ComponentEvent {
226                component_id: component_id.to_string(),
227                component_type,
228                status,
229                timestamp: chrono::Utc::now(),
230                message,
231            };
232            let _ = self.event_tx.send(event.clone());
233            Some(event)
234        } else {
235            None
236        }
237    }
238
239    /// Get the instance ID (root node)
240    pub fn instance_id(&self) -> &str {
241        &self.graph[self.instance_idx].id
242    }
243
244    // ========================================================================
245    // Runtime Instance Store
246    // ========================================================================
247
248    /// Store a runtime instance for a component.
249    ///
250    /// The component must already exist in the graph (registered via `add_component`
251    /// or one of the `register_*` methods). The runtime instance is stored in a
252    /// type-erased map keyed by component ID.
253    ///
254    /// Managers call this during provisioning after creating the runtime instance:
255    /// ```ignore
256    /// let source: Arc<dyn Source> = Arc::new(my_source);
257    /// source.initialize(context).await;
258    /// graph.set_runtime("source-1", Box::new(source))?;
259    /// ```
260    ///
261    /// # Errors
262    ///
263    /// Returns an error if the component ID is not present in the graph.
264    pub fn set_runtime(
265        &mut self,
266        id: &str,
267        runtime: Box<dyn Any + Send + Sync>,
268    ) -> anyhow::Result<()> {
269        if !self.index.contains_key(id) {
270            return Err(anyhow::anyhow!(
271                "set_runtime called for component '{id}' which is not in the graph"
272            ));
273        }
274
275        // Warn if the runtime type doesn't match the component kind.
276        // This catches type mismatches during development. Uses a warning rather
277        // than a hard assert to avoid breaking unit tests that use substitute types.
278        #[cfg(debug_assertions)]
279        if let Some(node) = self.get_component(id) {
280            let kind = &node.kind;
281            let type_ok = match kind {
282                ComponentKind::Source => runtime
283                    .downcast_ref::<std::sync::Arc<dyn crate::sources::Source>>()
284                    .is_some(),
285                ComponentKind::Query => runtime
286                    .downcast_ref::<std::sync::Arc<dyn crate::queries::manager::Query>>()
287                    .is_some(),
288                ComponentKind::Reaction => runtime
289                    .downcast_ref::<std::sync::Arc<dyn crate::reactions::Reaction>>()
290                    .is_some(),
291                _ => true,
292            };
293            if !type_ok {
294                tracing::warn!(
295                    "set_runtime: possible type mismatch for component '{id}' (kind={kind})"
296                );
297            }
298        }
299
300        self.runtimes.insert(id.to_string(), runtime);
301        Ok(())
302    }
303
304    /// Retrieve a reference to a component's runtime instance, downcasting to `T`.
305    ///
306    /// Returns `None` if the component has no runtime instance or if the stored
307    /// type doesn't match `T`.
308    ///
309    /// # Type Parameter
310    ///
311    /// `T` is typically `Arc<dyn Source>`, `Arc<dyn Query>`, or `Arc<dyn Reaction>`.
312    ///
313    /// # Example
314    ///
315    /// ```ignore
316    /// let source: &Arc<dyn Source> = graph.get_runtime::<Arc<dyn Source>>("source-1")
317    ///     .ok_or_else(|| anyhow!("Source not found"))?;
318    /// let source = source.clone(); // Clone the Arc for use outside the lock
319    /// ```
320    pub fn get_runtime<T: 'static>(&self, id: &str) -> Option<&T> {
321        self.runtimes.get(id).and_then(|r| r.downcast_ref::<T>())
322    }
323
324    /// Remove and return a component's runtime instance, downcasting to `T`.
325    ///
326    /// Returns `None` if the component has no runtime instance or if the stored
327    /// type doesn't match `T`. On type mismatch, the runtime is **put back** into
328    /// the store (not lost) and an error is logged.
329    ///
330    /// Used during teardown when the manager needs ownership of the instance
331    /// (e.g., to call `deprovision()`).
332    pub fn take_runtime<T: 'static>(&mut self, id: &str) -> Option<T> {
333        let runtime = self.runtimes.remove(id)?;
334        match runtime.downcast::<T>() {
335            Ok(boxed) => Some(*boxed),
336            Err(runtime) => {
337                tracing::error!(
338                    "take_runtime: type mismatch for component '{id}', putting runtime back"
339                );
340                self.runtimes.insert(id.to_string(), runtime);
341                None
342            }
343        }
344    }
345
346    /// Check if a component has a runtime instance stored.
347    pub fn has_runtime(&self, id: &str) -> bool {
348        self.runtimes.contains_key(id)
349    }
350
351    // ========================================================================
352    // Node Operations
353    // ========================================================================
354
355    /// Add a component node to the graph.
356    ///
357    /// Automatically creates bidirectional Owns/OwnedBy edges between the
358    /// instance root and the new component. Emits a [`ComponentEvent`] with
359    /// the node's current status to all subscribers.
360    ///
361    /// # Errors
362    ///
363    /// Returns an error if a component with the same ID already exists.
364    pub fn add_component(&mut self, node: ComponentNode) -> anyhow::Result<NodeIndex> {
365        let (node_idx, event, _, _) = self.add_component_internal(node)?;
366        if let Some(event) = event {
367            let _ = self.event_tx.send(event);
368        }
369        Ok(node_idx)
370    }
371
372    /// Internal: adds a component and returns the event without emitting it.
373    /// Used by both `add_component()` (emits immediately) and `GraphTransaction`
374    /// (defers emission to commit).
375    pub(super) fn add_component_internal(
376        &mut self,
377        node: ComponentNode,
378    ) -> anyhow::Result<(NodeIndex, Option<ComponentEvent>, EdgeIndex, EdgeIndex)> {
379        if self.index.contains_key(&node.id) {
380            return Err(anyhow::anyhow!(
381                "{} '{}' already exists in the graph",
382                node.kind,
383                node.id
384            ));
385        }
386
387        let id = node.id.clone();
388        let kind = node.kind.clone();
389        let status = node.status;
390        let node_idx = self.graph.add_node(node);
391        self.index.insert(id.clone(), node_idx);
392
393        // Create bidirectional ownership edges (Instance ↔ Component)
394        let owns_edge = self
395            .graph
396            .add_edge(self.instance_idx, node_idx, RelationshipKind::Owns);
397        let owned_by_edge =
398            self.graph
399                .add_edge(node_idx, self.instance_idx, RelationshipKind::OwnedBy);
400
401        let event = kind
402            .to_component_type()
403            .map(|component_type| ComponentEvent {
404                component_id: id,
405                component_type,
406                status,
407                timestamp: chrono::Utc::now(),
408                message: Some(format!("{kind} added")),
409            });
410
411        Ok((node_idx, event, owns_edge, owned_by_edge))
412    }
413
414    /// Remove a component node and all its edges from the graph.
415    ///
416    /// Emits a [`ComponentEvent`] with status [`ComponentStatus::Stopped`] to all
417    /// subscribers. Returns the removed node data, or an error if the component
418    /// doesn't exist. The instance root node cannot be removed.
419    pub fn remove_component(&mut self, id: &str) -> anyhow::Result<ComponentNode> {
420        let node_idx = self
421            .index
422            .get(id)
423            .copied()
424            .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in graph"))?;
425
426        if node_idx == self.instance_idx {
427            return Err(anyhow::anyhow!("Cannot remove the instance root node"));
428        }
429
430        // Capture kind before removing so we can emit an event
431        let kind = self.graph[node_idx].kind.clone();
432
433        self.index.remove(id);
434        // Remove runtime instance if present (atomic with node removal)
435        self.runtimes.remove(id);
436        // Remove event history for this component
437        self.event_history.remove_component(id);
438        // StableGraph::remove_node automatically removes all edges connected to this node
439        let removed = self
440            .graph
441            .remove_node(node_idx)
442            .ok_or_else(|| anyhow::anyhow!("Component '{id}' already removed"))?;
443
444        self.emit_event(
445            id,
446            &kind,
447            ComponentStatus::Stopped,
448            Some(format!("{kind} removed")),
449        );
450
451        Ok(removed)
452    }
453
454    /// Get a component node by ID.
455    pub fn get_component(&self, id: &str) -> Option<&ComponentNode> {
456        self.index
457            .get(id)
458            .and_then(|idx| self.graph.node_weight(*idx))
459    }
460
461    /// Get a mutable reference to a component node by ID.
462    pub fn get_component_mut(&mut self, id: &str) -> Option<&mut ComponentNode> {
463        self.index
464            .get(id)
465            .copied()
466            .and_then(|idx| self.graph.node_weight_mut(idx))
467    }
468
469    /// Check if a component exists in the graph.
470    pub fn contains(&self, id: &str) -> bool {
471        self.index.contains_key(id)
472    }
473
474    /// List all components of a specific kind with their status.
475    pub fn list_by_kind(&self, kind: &ComponentKind) -> Vec<(String, ComponentStatus)> {
476        self.graph
477            .node_weights()
478            .filter(|node| &node.kind == kind)
479            .map(|node| (node.id.clone(), node.status))
480            .collect()
481    }
482
483    /// Update a component's status.
484    ///
485    /// Emits a [`ComponentEvent`] with the new status to all subscribers.
486    /// Used internally by [`apply_update`] and tests.
487    pub(super) fn update_status(
488        &mut self,
489        id: &str,
490        status: ComponentStatus,
491    ) -> anyhow::Result<Option<ComponentEvent>> {
492        self.update_status_with_message(id, status, None)
493    }
494
495    /// Update a component's status with an optional message.
496    ///
497    /// Emits a [`ComponentEvent`] with the new status and message to all broadcast
498    /// subscribers AND records it in the centralized event history. This ensures
499    /// events are visible to both global subscribers (via broadcast) and per-component
500    /// subscribers (via event history channels).
501    ///
502    /// Called by [`apply_update`] in the graph update loop and by
503    /// [`validate_and_transition`] for command-initiated transitions.
504    fn update_status_with_message(
505        &mut self,
506        id: &str,
507        status: ComponentStatus,
508        message: Option<String>,
509    ) -> anyhow::Result<Option<ComponentEvent>> {
510        let node = self
511            .get_component_mut(id)
512            .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in graph"))?;
513        let kind = node.kind.clone();
514
515        // Same-state updates are idempotent no-ops (no event, no warning)
516        if node.status == status {
517            return Ok(None);
518        }
519
520        if !is_valid_transition(&node.status, &status) {
521            tracing::warn!(
522                "Invalid state transition for component '{}': {:?} → {:?}, ignoring update",
523                id,
524                node.status,
525                status
526            );
527            return Ok(None);
528        }
529
530        node.status = status;
531
532        // Wake up any waiters blocking on status changes (e.g., wait_for_status)
533        self.status_notify.notify_waiters();
534
535        let event = self.emit_event(id, &kind, status, message);
536        if let Some(ref event) = event {
537            self.event_history.record_event(event.clone());
538        }
539        Ok(event)
540    }
541
542    // ========================================================================
543    // Edge Operations
544    // ========================================================================
545
546    /// Add a bidirectional relationship between two components (idempotent).
547    ///
548    /// Creates both the forward edge (from → to with `forward` relationship) and
549    /// the reverse edge (to → from with the reverse of `forward`).
550    /// If the relationship already exists, this is a no-op and returns `Ok(())`.
551    ///
552    /// # Errors
553    ///
554    /// Returns an error if either component doesn't exist.
555    pub fn add_relationship(
556        &mut self,
557        from_id: &str,
558        to_id: &str,
559        forward: RelationshipKind,
560    ) -> anyhow::Result<()> {
561        let (_, _) = self.add_relationship_internal(from_id, to_id, forward)?;
562        Ok(())
563    }
564
565    /// Internal: adds a relationship and returns the edge indices for rollback.
566    /// Returns `(None, None)` if the relationship already exists (idempotent).
567    pub(super) fn add_relationship_internal(
568        &mut self,
569        from_id: &str,
570        to_id: &str,
571        forward: RelationshipKind,
572    ) -> anyhow::Result<(Option<EdgeIndex>, Option<EdgeIndex>)> {
573        let from_idx = self
574            .index
575            .get(from_id)
576            .copied()
577            .ok_or_else(|| anyhow::anyhow!("Component '{from_id}' not found in graph"))?;
578        let to_idx = self
579            .index
580            .get(to_id)
581            .copied()
582            .ok_or_else(|| anyhow::anyhow!("Component '{to_id}' not found in graph"))?;
583
584        // Validate the relationship is semantically valid for the node kinds
585        let from_kind = &self.graph[from_idx].kind;
586        let to_kind = &self.graph[to_idx].kind;
587        if !is_valid_relationship(from_kind, to_kind, &forward) {
588            return Err(anyhow::anyhow!(
589                "Invalid relationship: {forward:?} from {from_kind} '{from_id}' to {to_kind} '{to_id}'"
590            ));
591        }
592
593        // Idempotency: check if the forward edge already exists
594        let already_exists = self
595            .graph
596            .edges_directed(from_idx, Direction::Outgoing)
597            .any(|e| e.target() == to_idx && e.weight() == &forward);
598        if already_exists {
599            return Ok((None, None));
600        }
601
602        let reverse = forward.reverse();
603        let fwd_edge = self.graph.add_edge(from_idx, to_idx, forward);
604        let rev_edge = self.graph.add_edge(to_idx, from_idx, reverse);
605
606        Ok((Some(fwd_edge), Some(rev_edge)))
607    }
608
609    /// Remove a bidirectional relationship between two components.
610    ///
611    /// Removes both the forward edge (from → to with `forward` relationship) and
612    /// the reverse edge (to → from with the reverse of `forward`).
613    /// If the relationship doesn't exist, this is a no-op and returns `Ok(())`.
614    ///
615    /// # Errors
616    ///
617    /// Returns an error if either component doesn't exist in the graph.
618    pub fn remove_relationship(
619        &mut self,
620        from_id: &str,
621        to_id: &str,
622        forward: RelationshipKind,
623    ) -> anyhow::Result<()> {
624        let from_idx = self
625            .index
626            .get(from_id)
627            .copied()
628            .ok_or_else(|| anyhow::anyhow!("Component '{from_id}' not found in graph"))?;
629        let to_idx = self
630            .index
631            .get(to_id)
632            .copied()
633            .ok_or_else(|| anyhow::anyhow!("Component '{to_id}' not found in graph"))?;
634
635        let reverse = forward.reverse();
636
637        // Find and remove forward edge
638        let forward_edge = self
639            .graph
640            .edges_directed(from_idx, Direction::Outgoing)
641            .find(|e| e.target() == to_idx && e.weight() == &forward)
642            .map(|e| e.id());
643        if let Some(edge_id) = forward_edge {
644            self.graph.remove_edge(edge_id);
645        }
646
647        // Find and remove reverse edge
648        let reverse_edge = self
649            .graph
650            .edges_directed(to_idx, Direction::Outgoing)
651            .find(|e| e.target() == from_idx && e.weight() == &reverse)
652            .map(|e| e.id());
653        if let Some(edge_id) = reverse_edge {
654            self.graph.remove_edge(edge_id);
655        }
656
657        Ok(())
658    }
659
660    // ========================================================================
661    // Relationship Queries
662    // ========================================================================
663
664    /// Get all components that this component has outgoing edges to,
665    /// filtered by relationship kind.
666    pub fn get_neighbors(&self, id: &str, relationship: &RelationshipKind) -> Vec<&ComponentNode> {
667        let Some(&node_idx) = self.index.get(id) else {
668            return Vec::new();
669        };
670
671        self.graph
672            .edges_directed(node_idx, Direction::Outgoing)
673            .filter(|edge| edge.weight() == relationship)
674            .filter_map(|edge| self.graph.node_weight(edge.target()))
675            .collect()
676    }
677
678    /// Get all components that depend on the given component.
679    ///
680    /// "Dependents" are components that would be affected if this component
681    /// were removed or stopped. This follows Feeds edges (outgoing).
682    pub fn get_dependents(&self, id: &str) -> Vec<&ComponentNode> {
683        let Some(&node_idx) = self.index.get(id) else {
684            return Vec::new();
685        };
686
687        self.graph
688            .edges_directed(node_idx, Direction::Outgoing)
689            .filter(|edge| matches!(edge.weight(), RelationshipKind::Feeds))
690            .filter_map(|edge| self.graph.node_weight(edge.target()))
691            .collect()
692    }
693
694    /// Get all components that this component depends on.
695    ///
696    /// "Dependencies" are components that this component needs to function.
697    /// This follows SubscribesTo edges (outgoing).
698    pub fn get_dependencies(&self, id: &str) -> Vec<&ComponentNode> {
699        let Some(&node_idx) = self.index.get(id) else {
700            return Vec::new();
701        };
702
703        self.graph
704            .edges_directed(node_idx, Direction::Outgoing)
705            .filter(|edge| matches!(edge.weight(), RelationshipKind::SubscribesTo))
706            .filter_map(|edge| self.graph.node_weight(edge.target()))
707            .collect()
708    }
709
710    /// Check if a component can be safely removed (no dependents that would break).
711    ///
712    /// Returns Ok(()) if safe, or Err with the list of dependent component IDs.
713    pub fn can_remove(&self, id: &str) -> Result<(), Vec<String>> {
714        let dependents = self.get_dependents(id);
715        if dependents.is_empty() {
716            Ok(())
717        } else {
718            Err(dependents.iter().map(|n| n.id.clone()).collect())
719        }
720    }
721
722    // ========================================================================
723    // Lifecycle
724    // ========================================================================
725
726    /// Atomically validate and apply a commanded status transition.
727    ///
728    /// This is the **single canonical way** for managers to change a component's
729    /// status for command-initiated transitions (`Starting`, `Stopping`,
730    /// `Reconfiguring`). It combines validation and mutation under a single
731    /// `&mut self` borrow, eliminating the TOCTOU gap between checking status
732    /// and updating it.
733    ///
734    /// Components still report runtime-initiated transitions (`Running`,
735    /// `Stopped`, `Error`) via the mpsc channel → [`apply_update`].
736    ///
737    /// # Returns
738    ///
739    /// - `Ok(Some(event))` — transition applied, event emitted to broadcast subscribers
740    /// - `Ok(None)` — same-state no-op (component already in `target_status`)
741    /// - `Err(...)` — component not found or transition not valid from current state
742    ///
743    /// # Example
744    ///
745    /// ```ignore
746    /// let mut graph = self.graph.write().await;
747    /// graph.validate_and_transition("source-1", ComponentStatus::Starting, Some("Starting source"))?;
748    /// drop(graph); // release lock before calling source.start()
749    /// source.start().await?;
750    /// ```
751    pub fn validate_and_transition(
752        &mut self,
753        id: &str,
754        target_status: ComponentStatus,
755        message: Option<String>,
756    ) -> anyhow::Result<Option<ComponentEvent>> {
757        let node = self
758            .get_component(id)
759            .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in graph"))?;
760        let current = node.status;
761
762        // Same-state is an idempotent no-op
763        if current == target_status {
764            return Ok(None);
765        }
766
767        // Produce a descriptive error message for invalid transitions
768        if !is_valid_transition(&current, &target_status) {
769            let reason = describe_invalid_transition(id, &current, &target_status);
770            return Err(anyhow::anyhow!(reason));
771        }
772
773        // Transition is valid — apply it
774        self.update_status_with_message(id, target_status, message)
775    }
776
777    /// Get a topological ordering of components for lifecycle operations.
778    ///
779    /// Returns components in dependency order: sources first, then queries, then reactions.
780    /// Only follows Feeds edges for ordering (other edge types don't affect lifecycle order).
781    ///
782    /// The instance root node is excluded from the result.
783    pub fn topological_order(&self) -> anyhow::Result<Vec<&ComponentNode>> {
784        // Build a filtered subgraph with only Feeds edges for ordering
785        // (bidirectional edges like SubscribesTo/OwnedBy create cycles that
786        // would prevent toposort on the full graph)
787        let mut order_graph: StableGraph<(), ()> = StableGraph::new();
788        let mut idx_map: HashMap<NodeIndex, NodeIndex> = HashMap::new();
789
790        // Add all nodes
791        for node_idx in self.graph.node_indices() {
792            let new_idx = order_graph.add_node(());
793            idx_map.insert(node_idx, new_idx);
794        }
795
796        // Add only Feeds edges
797        for edge_idx in self.graph.edge_indices() {
798            if let Some(weight) = self.graph.edge_weight(edge_idx) {
799                if matches!(weight, RelationshipKind::Feeds) {
800                    if let Some((from, to)) = self.graph.edge_endpoints(edge_idx) {
801                        if let (Some(&new_from), Some(&new_to)) =
802                            (idx_map.get(&from), idx_map.get(&to))
803                        {
804                            order_graph.add_edge(new_from, new_to, ());
805                        }
806                    }
807                }
808            }
809        }
810
811        // Reverse map: new index → original index
812        let reverse_map: HashMap<NodeIndex, NodeIndex> =
813            idx_map.iter().map(|(&orig, &new)| (new, orig)).collect();
814
815        match petgraph::algo::toposort(&order_graph, None) {
816            Ok(sorted) => Ok(sorted
817                .into_iter()
818                .filter_map(|new_idx| reverse_map.get(&new_idx))
819                .filter(|idx| **idx != self.instance_idx)
820                .filter_map(|idx| self.graph.node_weight(*idx))
821                .collect()),
822            Err(_cycle) => Err(anyhow::anyhow!(
823                "Cycle detected in component graph — cannot determine lifecycle order"
824            )),
825        }
826    }
827
828    // ========================================================================
829    // Serialization
830    // ========================================================================
831
832    /// Create a serializable snapshot of the entire graph.
833    ///
834    /// The snapshot includes the instance root node and all components
835    /// with their relationships. Used for API responses and UI visualization.
836    pub fn snapshot(&self) -> GraphSnapshot {
837        let nodes: Vec<ComponentNode> = self.graph.node_weights().cloned().collect();
838
839        let edges: Vec<GraphEdge> = self
840            .graph
841            .edge_indices()
842            .filter_map(|edge_idx| {
843                let (from_idx, to_idx) = self.graph.edge_endpoints(edge_idx)?;
844                let from = self.graph.node_weight(from_idx)?;
845                let to = self.graph.node_weight(to_idx)?;
846                let relationship = self.graph.edge_weight(edge_idx)?;
847                Some(GraphEdge {
848                    from: from.id.clone(),
849                    to: to.id.clone(),
850                    relationship: relationship.clone(),
851                })
852            })
853            .collect();
854
855        GraphSnapshot {
856            instance_id: self.instance_id().to_string(),
857            nodes,
858            edges,
859        }
860    }
861
862    /// Get the total number of components (including the instance root).
863    pub fn node_count(&self) -> usize {
864        self.graph.node_count()
865    }
866
867    /// Get the total number of edges.
868    pub fn edge_count(&self) -> usize {
869        self.graph.edge_count()
870    }
871
872    // ========================================================================
873    // High-Level Registration (Source of Truth)
874    // ========================================================================
875
876    /// Register a source component in the graph.
877    ///
878    /// Creates the node and bidirectional ownership edges transactionally.
879    /// Events are emitted only on successful commit.
880    ///
881    /// # Errors
882    ///
883    /// Returns an error if a component with the same ID already exists.
884    pub fn register_source(
885        &mut self,
886        id: &str,
887        metadata: HashMap<String, String>,
888    ) -> anyhow::Result<()> {
889        let node = ComponentNode {
890            id: id.to_string(),
891            kind: ComponentKind::Source,
892            status: ComponentStatus::Stopped,
893            metadata,
894        };
895        let mut txn = self.begin();
896        txn.add_component(node)?;
897        txn.commit();
898        Ok(())
899    }
900
901    /// Register a query component with its source dependencies.
902    ///
903    /// Creates the node, ownership edges, and `Feeds` edges from each source.
904    /// All operations are transactional — if any dependency is missing or any
905    /// step fails, the entire registration is rolled back.
906    ///
907    /// # Errors
908    ///
909    /// Returns an error if:
910    /// - A component with the same ID already exists
911    /// - Any referenced source does not exist in the graph
912    pub fn register_query(
913        &mut self,
914        id: &str,
915        metadata: HashMap<String, String>,
916        source_ids: &[String],
917    ) -> anyhow::Result<()> {
918        // Validate all dependencies exist before starting the transaction
919        for source_id in source_ids {
920            if !self.contains(source_id) {
921                return Err(anyhow::anyhow!(
922                    "Cannot register query '{id}': referenced source '{source_id}' does not exist in the graph"
923                ));
924            }
925        }
926
927        let node = ComponentNode {
928            id: id.to_string(),
929            kind: ComponentKind::Query,
930            status: ComponentStatus::Stopped,
931            metadata,
932        };
933        let mut txn = self.begin();
934        txn.add_component(node)?;
935        for source_id in source_ids {
936            txn.add_relationship(source_id, id, RelationshipKind::Feeds)?;
937        }
938        txn.commit();
939        Ok(())
940    }
941
942    /// Register a reaction component with its query dependencies.
943    ///
944    /// Creates the node, ownership edges, and `Feeds` edges from each query.
945    /// All operations are transactional — if any dependency is missing or any
946    /// step fails, the entire registration is rolled back.
947    ///
948    /// # Errors
949    ///
950    /// Returns an error if:
951    /// - A component with the same ID already exists
952    /// - Any referenced query does not exist in the graph
953    pub fn register_reaction(
954        &mut self,
955        id: &str,
956        metadata: HashMap<String, String>,
957        query_ids: &[String],
958    ) -> anyhow::Result<()> {
959        // Validate all dependencies exist before starting the transaction
960        for query_id in query_ids {
961            if !self.contains(query_id) {
962                return Err(anyhow::anyhow!(
963                    "Cannot register reaction '{id}': referenced query '{query_id}' does not exist in the graph"
964                ));
965            }
966        }
967
968        let node = ComponentNode {
969            id: id.to_string(),
970            kind: ComponentKind::Reaction,
971            status: ComponentStatus::Stopped,
972            metadata,
973        };
974        let mut txn = self.begin();
975        txn.add_component(node)?;
976        for query_id in query_ids {
977            txn.add_relationship(query_id, id, RelationshipKind::Feeds)?;
978        }
979        txn.commit();
980        Ok(())
981    }
982
983    /// Deregister a component and all its edges from the graph.
984    ///
985    /// Validates that the component exists and has no dependents before removal.
986    /// The instance root node cannot be deregistered.
987    ///
988    /// # Errors
989    ///
990    /// Returns an error if:
991    /// - The component does not exist
992    /// - The component has dependents (use `can_remove()` to check first)
993    /// - The component is the instance root node
994    pub fn deregister(&mut self, id: &str) -> anyhow::Result<ComponentNode> {
995        // Validate no dependents
996        if let Err(dependent_ids) = self.can_remove(id) {
997            return Err(anyhow::anyhow!(
998                "Cannot deregister '{}': depended on by: {}",
999                id,
1000                dependent_ids.join(", ")
1001            ));
1002        }
1003        self.remove_component(id)
1004    }
1005
1006    /// Register a bootstrap provider in the graph for topology visibility.
1007    ///
1008    /// Creates the node and bidirectional ownership edges transactionally.
1009    /// Optionally links the provider to its target source via `Bootstraps` edges.
1010    ///
1011    /// # Usage
1012    ///
1013    /// Bootstrap providers are managed internally by source plugins (set via
1014    /// `Source::set_bootstrap_provider()`). Call this method to make a bootstrap
1015    /// provider visible in the component graph for topology visualization and
1016    /// dependency tracking.
1017    ///
1018    /// ```ignore
1019    /// // After adding a source with a bootstrap provider:
1020    /// let mut graph = core.component_graph().write().await;
1021    /// graph.register_bootstrap_provider("my-bootstrap", metadata, &["my-source".into()])?;
1022    /// ```
1023    ///
1024    /// # Errors
1025    ///
1026    /// Returns an error if:
1027    /// - A component with the same ID already exists
1028    /// - Any referenced source does not exist in the graph
1029    pub fn register_bootstrap_provider(
1030        &mut self,
1031        id: &str,
1032        metadata: HashMap<String, String>,
1033        source_ids: &[String],
1034    ) -> anyhow::Result<()> {
1035        for source_id in source_ids {
1036            if !self.contains(source_id) {
1037                return Err(anyhow::anyhow!(
1038                    "Cannot register bootstrap provider '{id}': referenced source '{source_id}' does not exist in the graph"
1039                ));
1040            }
1041        }
1042
1043        let node = ComponentNode {
1044            id: id.to_string(),
1045            kind: ComponentKind::BootstrapProvider,
1046            status: ComponentStatus::Stopped,
1047            metadata,
1048        };
1049        let mut txn = self.begin();
1050        txn.add_component(node)?;
1051        for source_id in source_ids {
1052            txn.add_relationship(id, source_id, RelationshipKind::Bootstraps)?;
1053        }
1054        txn.commit();
1055        Ok(())
1056    }
1057
1058    /// Register an identity provider in the graph for topology visibility.
1059    ///
1060    /// Creates the node and bidirectional ownership edges transactionally.
1061    /// Optionally links the provider to components it authenticates via
1062    /// `Authenticates` edges.
1063    ///
1064    /// # Current Status
1065    ///
1066    /// This method is **reserved for future use**. Identity provider support
1067    /// is not yet implemented in the component lifecycle. The registration
1068    /// infrastructure is in place for when authentication integration is added.
1069    ///
1070    /// # Errors
1071    ///
1072    /// Returns an error if:
1073    /// - A component with the same ID already exists
1074    /// - Any referenced component does not exist in the graph
1075    pub fn register_identity_provider(
1076        &mut self,
1077        id: &str,
1078        metadata: HashMap<String, String>,
1079        component_ids: &[String],
1080    ) -> anyhow::Result<()> {
1081        for component_id in component_ids {
1082            if !self.contains(component_id) {
1083                return Err(anyhow::anyhow!(
1084                    "Cannot register identity provider '{id}': referenced component '{component_id}' does not exist in the graph"
1085                ));
1086            }
1087        }
1088
1089        let node = ComponentNode {
1090            id: id.to_string(),
1091            kind: ComponentKind::IdentityProvider,
1092            status: ComponentStatus::Stopped,
1093            metadata,
1094        };
1095        let mut txn = self.begin();
1096        txn.add_component(node)?;
1097        for component_id in component_ids {
1098            txn.add_relationship(id, component_id, RelationshipKind::Authenticates)?;
1099        }
1100        txn.commit();
1101        Ok(())
1102    }
1103
1104    // ========================================================================
1105    // Transactions
1106    // ========================================================================
1107
1108    /// Begin a transactional mutation of the graph.
1109    ///
1110    /// Returns a [`GraphTransaction`] that collects mutations (nodes, edges)
1111    /// and defers event emission until [`commit()`](GraphTransaction::commit).
1112    /// If the transaction is dropped without being committed, all added nodes
1113    /// and edges are rolled back automatically.
1114    ///
1115    /// The `&mut self` borrow ensures compile-time exclusivity — no other code
1116    /// can access the graph while a transaction is in progress.
1117    ///
1118    /// # Example
1119    ///
1120    /// ```ignore
1121    /// let mut graph = self.graph.write().await;
1122    /// let mut txn = graph.begin();
1123    /// txn.add_component(source_node)?;
1124    /// txn.add_relationship("source-1", "query-1", RelationshipKind::Feeds)?;
1125    /// txn.commit(); // events emitted here; if this line is not reached, rollback on drop
1126    /// ```
1127    pub fn begin(&mut self) -> GraphTransaction<'_> {
1128        GraphTransaction::new(self)
1129    }
1130
1131    // ========================================================================
1132    // Event History (centralized)
1133    // ========================================================================
1134
1135    /// Record a component event in the centralized history.
1136    ///
1137    /// Called internally by [`apply_update()`]. Managers should NOT call this
1138    /// directly — status updates flow through the mpsc channel and are recorded
1139    /// automatically.
1140    pub fn record_event(&mut self, event: ComponentEvent) {
1141        self.event_history.record_event(event);
1142    }
1143
1144    /// Get all lifecycle events for a specific component.
1145    ///
1146    /// Returns events in chronological order (oldest first).
1147    /// Up to 100 most recent events are retained per component.
1148    pub fn get_events(&self, component_id: &str) -> Vec<ComponentEvent> {
1149        self.event_history.get_events(component_id)
1150    }
1151
1152    /// Get all lifecycle events across all components.
1153    ///
1154    /// Returns events sorted by timestamp (oldest first).
1155    pub fn get_all_events(&self) -> Vec<ComponentEvent> {
1156        self.event_history.get_all_events()
1157    }
1158
1159    /// Get the most recent error message for a component.
1160    pub fn get_last_error(&self, component_id: &str) -> Option<String> {
1161        self.event_history.get_last_error(component_id)
1162    }
1163
1164    /// Subscribe to live lifecycle events for a component.
1165    ///
1166    /// Returns the current history and a broadcast receiver for new events.
1167    /// Creates the component's event channel if it doesn't exist.
1168    pub fn subscribe_events(
1169        &mut self,
1170        component_id: &str,
1171    ) -> (Vec<ComponentEvent>, broadcast::Receiver<ComponentEvent>) {
1172        self.event_history.subscribe(component_id)
1173    }
1174}
1175
1176// ============================================================================
1177// Relationship Validation
1178// ============================================================================
1179
1180/// Check if a relationship kind is semantically valid between two component kinds.
1181///
1182/// This enforces the graph topology rules:
1183/// - **Feeds**: Source → Query, or Query → Reaction
1184/// - **Owns/OwnedBy**: Instance ↔ any component (created automatically)
1185/// - **Bootstraps**: BootstrapProvider → Source
1186/// - **Authenticates**: IdentityProvider → any component
1187pub(super) fn is_valid_relationship(
1188    from_kind: &ComponentKind,
1189    to_kind: &ComponentKind,
1190    relationship: &RelationshipKind,
1191) -> bool {
1192    use ComponentKind::*;
1193    use RelationshipKind::*;
1194    matches!(
1195        (from_kind, to_kind, relationship),
1196        // Data flow
1197        (Source, Query, Feeds)
1198            | (Query, Reaction, Feeds)
1199            // Ownership (auto-created by add_component_internal)
1200            | (Instance, _, Owns)
1201            // Bootstrap
1202            | (BootstrapProvider, Source, Bootstraps)
1203            // Authentication
1204            | (IdentityProvider, _, Authenticates)
1205    )
1206}
1207
1208// ============================================================================
1209// State Transition Validation
1210// ============================================================================
1211
1212/// Check if a status transition is valid according to the component lifecycle state machine.
1213///
1214/// ```text
1215/// Stopped ──→ Starting ──→ Running ──→ Stopping ──→ Stopped
1216///    │            │            │            │
1217///    │            ↓            ↓            ↓
1218///    │          Error        Error        Error
1219///    │            │
1220///    │            ↓
1221///    │         Stopped (aborted start)
1222///    │
1223///    ↓
1224/// Reconfiguring ──→ Stopped | Starting | Error
1225///
1226/// Error ──→ Starting (retry) | Stopped (reset)
1227/// ```
1228pub(super) fn is_valid_transition(from: &ComponentStatus, to: &ComponentStatus) -> bool {
1229    use ComponentStatus::*;
1230    matches!(
1231        (from, to),
1232        // Normal lifecycle
1233        (Stopped, Starting)
1234            | (Starting, Running)
1235            | (Starting, Error)
1236            | (Starting, Stopped) // aborted start
1237            | (Running, Stopping)
1238            | (Running, Stopped) // direct stop (async channel may skip Stopping)
1239            | (Running, Error)
1240            | (Stopping, Stopped)
1241            | (Stopping, Error)
1242            // Error recovery
1243            | (Error, Starting) // retry
1244            | (Error, Stopped) // reset
1245            // Reconfiguration (from any stable state)
1246            | (Stopped, Reconfiguring)
1247            | (Running, Reconfiguring)
1248            | (Error, Reconfiguring)
1249            | (Reconfiguring, Stopped)
1250            | (Reconfiguring, Starting)
1251            | (Reconfiguring, Error)
1252    )
1253}
1254
1255/// Produce a human-readable error message for an invalid transition.
1256///
1257/// These messages provide actionable feedback (e.g., "Component is already running"
1258/// instead of just "invalid transition").
1259fn describe_invalid_transition(id: &str, from: &ComponentStatus, to: &ComponentStatus) -> String {
1260    use ComponentStatus::*;
1261    match (from, to) {
1262        // Trying to start something that's already starting/running
1263        (Starting, Starting) => format!("Component '{id}' is already starting"),
1264        (Running, Starting) => format!("Component '{id}' is already running"),
1265        (Stopping, Starting) => {
1266            format!("Cannot start component '{id}' while it is stopping")
1267        }
1268        (Reconfiguring, Starting) => {
1269            // Reconfiguring → Starting is actually valid, so this shouldn't be reached,
1270            // but kept for safety
1271            format!("Cannot start component '{id}' while it is reconfiguring")
1272        }
1273        // Trying to stop something that's already stopped/stopping
1274        (Stopped, Stopping) => {
1275            format!("Cannot stop component '{id}': it is already stopped")
1276        }
1277        (Stopping, Stopping) => format!("Component '{id}' is already stopping"),
1278        (Error, Stopping) => {
1279            format!("Cannot stop component '{id}': it is in error state")
1280        }
1281        // Trying to reconfigure during a transition
1282        (Starting, Reconfiguring) => {
1283            format!("Cannot reconfigure component '{id}' while it is starting")
1284        }
1285        (Stopping, Reconfiguring) => {
1286            format!("Cannot reconfigure component '{id}' while it is stopping")
1287        }
1288        (Reconfiguring, Reconfiguring) => {
1289            format!("Component '{id}' is already reconfiguring")
1290        }
1291        // Generic fallback
1292        _ => format!("Invalid state transition for component '{id}': {from:?} → {to:?}"),
1293    }
1294}
1295
1296impl std::fmt::Debug for ComponentGraph {
1297    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1298        f.debug_struct("ComponentGraph")
1299            .field("instance_id", &self.instance_id())
1300            .field("node_count", &self.node_count())
1301            .field("edge_count", &self.edge_count())
1302            .finish()
1303    }
1304}