Skip to main content

drasi_lib/component_graph/
graph.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use petgraph::stable_graph::{EdgeIndex, NodeIndex, StableGraph};
20use petgraph::visit::EdgeRef;
21use petgraph::Direction;
22use tokio::sync::{broadcast, mpsc, Notify};
23
24use crate::channels::{ComponentEvent, ComponentEventBroadcastReceiver, ComponentStatus};
25use crate::managers::ComponentEventHistory;
26
27use super::transaction::GraphTransaction;
28use super::{
29    ComponentKind, ComponentNode, ComponentUpdate, ComponentUpdateReceiver, ComponentUpdateSender,
30    GraphEdge, GraphSnapshot, RelationshipKind,
31};
32
33// ============================================================================
34// Component Graph
35// ============================================================================
36
/// Central component dependency graph — the single source of truth.
///
/// Backed by `petgraph::stable_graph::StableGraph`, which keeps node/edge indices
/// stable across removals (critical for a graph that changes at runtime).
///
/// # Error Handling
///
/// Mutation methods on this struct return `anyhow::Result`, following the Layer 2
/// (internal module) error convention. The public API boundary in `DrasiLib` and
/// `lib_core_ops` wraps these into `DrasiError` variants before returning to callers.
/// External consumers should use `DrasiLib` methods rather than calling graph
/// mutations directly.
///
/// # Event Emission
///
/// The graph emits [`ComponentEvent`]s via a built-in broadcast channel whenever
/// components are added, removed, or change status. Call [`subscribe()`](Self::subscribe)
/// to receive events. Add and status-change events are also recorded in the
/// centralized event history; a component's history is discarded when it is removed.
///
/// # Thread Safety
///
/// Every mutating method takes `&mut self`, so sharing one graph between tasks
/// requires external synchronization: wrap it in `Arc<RwLock<ComponentGraph>>` for
/// multi-threaded access. All public APIs in `DrasiLib` handle this wrapping
/// automatically.
///
/// Components do not need the lock to report status changes: they send
/// [`ComponentUpdate`]s through the mpsc sender handed out by
/// [`update_sender()`](Self::update_sender), and a graph update loop applies them
/// with [`apply_update()`](Self::apply_update).
///
/// # Instance Root
///
/// The graph always has the DrasiLib instance as its root node (created in
/// [`new()`](Self::new); [`remove_component()`](Self::remove_component) refuses to
/// delete it). Components added through [`add_component()`](Self::add_component)
/// are connected to it via Owns/OwnedBy edges.
// NOTE(review): whether this type is `Send`/`Sync` depends on its field types.
// `StableGraph` is `Send`/`Sync` whenever its node and edge weights are, and
// `runtimes` only stores `Send + Sync` values, so `StableGraph` alone does not make
// the graph non-thread-safe. Confirm the auto-traits of `ComponentNode` and
// `ComponentEventHistory` before documenting either way.
pub struct ComponentGraph {
    /// The underlying petgraph directed graph; edge weights are relationship kinds.
    pub(super) graph: StableGraph<ComponentNode, RelationshipKind>,
    /// Fast lookup: component ID → node index (hash-map access).
    /// Must be kept in sync with `graph` on every add/remove.
    pub(super) index: HashMap<String, NodeIndex>,
    /// Node index of the instance root (created in `new`, never removed).
    instance_idx: NodeIndex,
    /// Broadcast sender for component lifecycle events (fan-out to subscribers).
    /// Events are emitted by `add_component()`, `remove_component()`, `update_status()`,
    /// and the graph update loop.
    pub(super) event_tx: broadcast::Sender<ComponentEvent>,
    /// mpsc sender for component status updates (fan-in from components).
    /// Cloned and given to each component's Base struct. The graph update loop
    /// owns the corresponding receiver.
    update_tx: mpsc::Sender<ComponentUpdate>,
    /// Notifies waiters when a component's status changes.
    /// Used by `wait_for_status()` to replace polling loops with event-driven waits.
    /// Follows the same pattern as `PriorityQueue::notify`.
    status_notify: Arc<Notify>,
    /// Type-erased runtime instances, keyed by component ID.
    ///
    /// Managers store their `Arc<dyn Source>`, `Arc<dyn Query>`, `Arc<dyn Reaction>`, etc.
    /// here during provisioning. This eliminates the dual-registry pattern where each
    /// manager maintained its own HashMap — the graph is now the single store for both
    /// metadata (in the petgraph node) and runtime instances (here). Entries are
    /// dropped together with their node in `remove_component()`.
    ///
    /// Access via typed helpers: [`ComponentGraph::set_runtime`],
    /// [`ComponentGraph::get_runtime`], [`ComponentGraph::take_runtime`].
    runtimes: HashMap<String, Box<dyn Any + Send + Sync>>,
    /// Centralized event history for all components.
    ///
    /// Stores lifecycle events (Starting, Running, Error, Stopped, etc.) for every
    /// component in the graph. Events are recorded by `add_component()` and by status
    /// updates (`apply_update()` / `validate_and_transition()`); `remove_component()`
    /// discards the removed component's history. Managers delegate event queries here
    /// rather than maintaining their own per-manager histories.
    pub(super) event_history: ComponentEventHistory,
}
102
/// Capacity of the broadcast channel that fans component lifecycle events out to
/// subscribers; slow subscribers lag once this many events are buffered.
const EVENT_CHANNEL_CAPACITY: usize = 1_000;

/// Capacity of the mpsc channel through which components report status updates
/// to the graph update loop.
const UPDATE_CHANNEL_CAPACITY: usize = 1_000;
108
109impl ComponentGraph {
110    /// Create a new component graph with the given instance as root node.
111    ///
112    /// Returns the graph and a [`ComponentUpdateReceiver`] that must be consumed by
113    /// a graph update loop task (see [`Self::apply_update`]).
114    pub fn new(instance_id: &str) -> (Self, ComponentUpdateReceiver) {
115        let mut graph = StableGraph::new();
116        let instance_node = ComponentNode {
117            id: instance_id.to_string(),
118            kind: ComponentKind::Instance,
119            status: ComponentStatus::Running,
120            metadata: HashMap::new(),
121        };
122        let instance_idx = graph.add_node(instance_node);
123
124        let mut index = HashMap::new();
125        index.insert(instance_id.to_string(), instance_idx);
126        let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
127        let (update_tx, update_rx) = mpsc::channel(UPDATE_CHANNEL_CAPACITY);
128
129        (
130            Self {
131                graph,
132                index,
133                instance_idx,
134                event_tx,
135                update_tx,
136                status_notify: Arc::new(Notify::new()),
137                runtimes: HashMap::new(),
138                event_history: ComponentEventHistory::new(),
139            },
140            update_rx,
141        )
142    }
143
144    /// Subscribe to component lifecycle events.
145    ///
146    /// Returns a broadcast receiver that gets a copy of every [`ComponentEvent`]
147    /// emitted by graph mutations (`add_component`, `remove_component`, `update_status`).
148    pub fn subscribe(&self) -> ComponentEventBroadcastReceiver {
149        self.event_tx.subscribe()
150    }
151
152    /// Get a reference to the broadcast sender for component events.
153    ///
154    /// This allows callers to clone the sender before the graph is wrapped in
155    /// `Arc<RwLock<>>`, enabling subscription without needing to acquire the lock.
156    /// The returned sender is the same one used by the graph for event emission.
157    pub fn event_sender(&self) -> &broadcast::Sender<ComponentEvent> {
158        &self.event_tx
159    }
160
161    /// Get a clone of the mpsc update sender.
162    ///
163    /// This is the sender that components use to report status changes without
164    /// acquiring any graph lock. Clone this and pass to `SourceBase`/`ReactionBase`.
165    pub fn update_sender(&self) -> ComponentUpdateSender {
166        self.update_tx.clone()
167    }
168
169    /// Get a clone of the status change notifier.
170    ///
171    /// This `Notify` is signalled whenever any component's status changes in the graph.
172    /// Use it with [`wait_for_status`] or build custom wait loops that avoid polling
173    /// with sleep.
174    ///
175    /// # Pattern
176    ///
177    /// ```ignore
178    /// let notify = graph.status_notifier();
179    /// // Register interest BEFORE releasing the graph lock
180    /// let notified = notify.notified();
181    /// drop(graph); // release lock
182    /// notified.await; // woken when any status changes
183    /// ```
184    pub fn status_notifier(&self) -> Arc<Notify> {
185        self.status_notify.clone()
186    }
187
188    /// Apply a [`ComponentUpdate`] received from the mpsc channel.
189    ///
190    /// Called by the graph update loop task. Updates the graph, emits a
191    /// broadcast event, and records the event in the centralized event history.
192    /// Returns the event for external logging/processing.
193    pub fn apply_update(&mut self, update: ComponentUpdate) -> Option<ComponentEvent> {
194        match update {
195            ComponentUpdate::Status {
196                component_id,
197                status,
198                message,
199            } => match self.update_status_with_message(&component_id, status, message) {
200                Ok(event) => event,
201                Err(e) => {
202                    tracing::debug!(
203                        "Graph update loop: status update skipped for '{}': {e}",
204                        component_id
205                    );
206                    None
207                }
208            },
209        }
210    }
211
212    /// Emit a [`ComponentEvent`] to all broadcast subscribers and return it.
213    ///
214    /// Returns `Some(event)` if the component kind maps to a [`ComponentType`],
215    /// `None` for kinds like `Instance` that have no external type.
216    /// Silently ignores broadcast send failures (no subscribers connected).
217    fn emit_event(
218        &self,
219        component_id: &str,
220        kind: &ComponentKind,
221        status: ComponentStatus,
222        message: Option<String>,
223    ) -> Option<ComponentEvent> {
224        if let Some(component_type) = kind.to_component_type() {
225            let event = ComponentEvent {
226                component_id: component_id.to_string(),
227                component_type,
228                status,
229                timestamp: chrono::Utc::now(),
230                message,
231            };
232            let _ = self.event_tx.send(event.clone());
233            Some(event)
234        } else {
235            None
236        }
237    }
238
239    /// Get the instance ID (root node)
240    pub fn instance_id(&self) -> &str {
241        &self.graph[self.instance_idx].id
242    }
243
244    // ========================================================================
245    // Runtime Instance Store
246    // ========================================================================
247
248    /// Store a runtime instance for a component.
249    ///
250    /// The component must already exist in the graph (registered via `add_component`
251    /// or one of the `register_*` methods). The runtime instance is stored in a
252    /// type-erased map keyed by component ID.
253    ///
254    /// Managers call this during provisioning after creating the runtime instance:
255    /// ```ignore
256    /// let source: Arc<dyn Source> = Arc::new(my_source);
257    /// source.initialize(context).await;
258    /// graph.set_runtime("source-1", Box::new(source))?;
259    /// ```
260    ///
261    /// # Errors
262    ///
263    /// Returns an error if the component ID is not present in the graph.
264    pub fn set_runtime(
265        &mut self,
266        id: &str,
267        runtime: Box<dyn Any + Send + Sync>,
268    ) -> anyhow::Result<()> {
269        if !self.index.contains_key(id) {
270            return Err(anyhow::anyhow!(
271                "set_runtime called for component '{id}' which is not in the graph"
272            ));
273        }
274
275        // Warn if the runtime type doesn't match the component kind.
276        // This catches type mismatches during development. Uses a warning rather
277        // than a hard assert to avoid breaking unit tests that use substitute types.
278        #[cfg(debug_assertions)]
279        if let Some(node) = self.get_component(id) {
280            let kind = &node.kind;
281            let type_ok = match kind {
282                ComponentKind::Source => runtime
283                    .downcast_ref::<std::sync::Arc<dyn crate::sources::Source>>()
284                    .is_some(),
285                ComponentKind::Query => runtime
286                    .downcast_ref::<std::sync::Arc<dyn crate::queries::manager::Query>>()
287                    .is_some(),
288                ComponentKind::Reaction => runtime
289                    .downcast_ref::<std::sync::Arc<dyn crate::reactions::Reaction>>()
290                    .is_some(),
291                _ => true,
292            };
293            if !type_ok {
294                tracing::warn!(
295                    "set_runtime: possible type mismatch for component '{id}' (kind={kind})"
296                );
297            }
298        }
299
300        self.runtimes.insert(id.to_string(), runtime);
301        Ok(())
302    }
303
304    /// Retrieve a reference to a component's runtime instance, downcasting to `T`.
305    ///
306    /// Returns `None` if the component has no runtime instance or if the stored
307    /// type doesn't match `T`.
308    ///
309    /// # Type Parameter
310    ///
311    /// `T` is typically `Arc<dyn Source>`, `Arc<dyn Query>`, or `Arc<dyn Reaction>`.
312    ///
313    /// # Example
314    ///
315    /// ```ignore
316    /// let source: &Arc<dyn Source> = graph.get_runtime::<Arc<dyn Source>>("source-1")
317    ///     .ok_or_else(|| anyhow!("Source not found"))?;
318    /// let source = source.clone(); // Clone the Arc for use outside the lock
319    /// ```
320    pub fn get_runtime<T: 'static>(&self, id: &str) -> Option<&T> {
321        self.runtimes.get(id).and_then(|r| r.downcast_ref::<T>())
322    }
323
324    /// Remove and return a component's runtime instance, downcasting to `T`.
325    ///
326    /// Returns `None` if the component has no runtime instance or if the stored
327    /// type doesn't match `T`. On type mismatch, the runtime is **put back** into
328    /// the store (not lost) and an error is logged.
329    ///
330    /// Used during teardown when the manager needs ownership of the instance
331    /// (e.g., to call `deprovision()`).
332    pub fn take_runtime<T: 'static>(&mut self, id: &str) -> Option<T> {
333        let runtime = self.runtimes.remove(id)?;
334        match runtime.downcast::<T>() {
335            Ok(boxed) => Some(*boxed),
336            Err(runtime) => {
337                tracing::error!(
338                    "take_runtime: type mismatch for component '{id}', putting runtime back"
339                );
340                self.runtimes.insert(id.to_string(), runtime);
341                None
342            }
343        }
344    }
345
346    /// Check if a component has a runtime instance stored.
347    pub fn has_runtime(&self, id: &str) -> bool {
348        self.runtimes.contains_key(id)
349    }
350
351    // ========================================================================
352    // Node Operations
353    // ========================================================================
354
355    /// Add a component node to the graph.
356    ///
357    /// Automatically creates bidirectional Owns/OwnedBy edges between the
358    /// instance root and the new component. Emits a [`ComponentEvent`] with
359    /// the node's current status to all subscribers.
360    ///
361    /// # Errors
362    ///
363    /// Returns an error if a component with the same ID already exists.
364    pub fn add_component(&mut self, node: ComponentNode) -> anyhow::Result<NodeIndex> {
365        let (node_idx, event, _, _) = self.add_component_internal(node)?;
366        if let Some(event) = event {
367            let _ = self.event_tx.send(event.clone());
368            self.event_history.record_event(event);
369        }
370        Ok(node_idx)
371    }
372
373    /// Internal: adds a component and returns the event without emitting it.
374    /// Used by both `add_component()` (emits immediately) and `GraphTransaction`
375    /// (defers emission to commit).
376    pub(super) fn add_component_internal(
377        &mut self,
378        node: ComponentNode,
379    ) -> anyhow::Result<(NodeIndex, Option<ComponentEvent>, EdgeIndex, EdgeIndex)> {
380        if self.index.contains_key(&node.id) {
381            return Err(anyhow::anyhow!(
382                "{} '{}' already exists in the graph",
383                node.kind,
384                node.id
385            ));
386        }
387
388        let id = node.id.clone();
389        let kind = node.kind.clone();
390        let status = node.status;
391        let node_idx = self.graph.add_node(node);
392        self.index.insert(id.clone(), node_idx);
393
394        // Create bidirectional ownership edges (Instance ↔ Component)
395        let owns_edge = self
396            .graph
397            .add_edge(self.instance_idx, node_idx, RelationshipKind::Owns);
398        let owned_by_edge =
399            self.graph
400                .add_edge(node_idx, self.instance_idx, RelationshipKind::OwnedBy);
401
402        let event = kind
403            .to_component_type()
404            .map(|component_type| ComponentEvent {
405                component_id: id,
406                component_type,
407                status,
408                timestamp: chrono::Utc::now(),
409                message: Some(format!("{kind} added")),
410            });
411
412        Ok((node_idx, event, owns_edge, owned_by_edge))
413    }
414
415    /// Remove a component node and all its edges from the graph.
416    ///
417    /// Emits a [`ComponentEvent`] with status [`ComponentStatus::Removed`] to all
418    /// subscribers. Returns the removed node data, or an error if the component
419    /// doesn't exist. The instance root node cannot be removed.
420    pub fn remove_component(&mut self, id: &str) -> anyhow::Result<ComponentNode> {
421        let node_idx = self
422            .index
423            .get(id)
424            .copied()
425            .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in graph"))?;
426
427        if node_idx == self.instance_idx {
428            return Err(anyhow::anyhow!("Cannot remove the instance root node"));
429        }
430
431        // Capture kind before removing so we can emit an event
432        let kind = self.graph[node_idx].kind.clone();
433
434        self.index.remove(id);
435        // Remove runtime instance if present (atomic with node removal)
436        self.runtimes.remove(id);
437        // Remove event history for this component
438        self.event_history.remove_component(id);
439        // StableGraph::remove_node automatically removes all edges connected to this node
440        let removed = self
441            .graph
442            .remove_node(node_idx)
443            .ok_or_else(|| anyhow::anyhow!("Component '{id}' already removed"))?;
444
445        self.emit_event(
446            id,
447            &kind,
448            ComponentStatus::Removed,
449            Some(format!("{kind} removed")),
450        );
451
452        Ok(removed)
453    }
454
455    /// Get a component node by ID.
456    pub fn get_component(&self, id: &str) -> Option<&ComponentNode> {
457        self.index
458            .get(id)
459            .and_then(|idx| self.graph.node_weight(*idx))
460    }
461
462    /// Get a mutable reference to a component node by ID.
463    pub fn get_component_mut(&mut self, id: &str) -> Option<&mut ComponentNode> {
464        self.index
465            .get(id)
466            .copied()
467            .and_then(|idx| self.graph.node_weight_mut(idx))
468    }
469
470    /// Check if a component exists in the graph.
471    pub fn contains(&self, id: &str) -> bool {
472        self.index.contains_key(id)
473    }
474
475    /// List all components of a specific kind with their status.
476    pub fn list_by_kind(&self, kind: &ComponentKind) -> Vec<(String, ComponentStatus)> {
477        self.graph
478            .node_weights()
479            .filter(|node| &node.kind == kind)
480            .map(|node| (node.id.clone(), node.status))
481            .collect()
482    }
483
484    /// Update a component's status.
485    ///
486    /// Emits a [`ComponentEvent`] with the new status to all subscribers.
487    /// Used internally by [`apply_update`] and tests.
488    pub(super) fn update_status(
489        &mut self,
490        id: &str,
491        status: ComponentStatus,
492    ) -> anyhow::Result<Option<ComponentEvent>> {
493        self.update_status_with_message(id, status, None)
494    }
495
496    /// Update a component's status with an optional message.
497    ///
498    /// Emits a [`ComponentEvent`] with the new status and message to all broadcast
499    /// subscribers AND records it in the centralized event history. This ensures
500    /// events are visible to both global subscribers (via broadcast) and per-component
501    /// subscribers (via event history channels).
502    ///
503    /// Called by [`apply_update`] in the graph update loop and by
504    /// [`validate_and_transition`] for command-initiated transitions.
505    fn update_status_with_message(
506        &mut self,
507        id: &str,
508        status: ComponentStatus,
509        message: Option<String>,
510    ) -> anyhow::Result<Option<ComponentEvent>> {
511        let node = self
512            .get_component_mut(id)
513            .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in graph"))?;
514        let kind = node.kind.clone();
515
516        // Same-state updates are idempotent no-ops (no event, no warning)
517        if node.status == status {
518            return Ok(None);
519        }
520
521        if !is_valid_transition(&node.status, &status) {
522            tracing::warn!(
523                "Invalid state transition for component '{}': {:?} → {:?}, ignoring update",
524                id,
525                node.status,
526                status
527            );
528            return Ok(None);
529        }
530
531        node.status = status;
532
533        // Wake up any waiters blocking on status changes (e.g., wait_for_status)
534        self.status_notify.notify_waiters();
535
536        let event = self.emit_event(id, &kind, status, message);
537        if let Some(ref event) = event {
538            self.event_history.record_event(event.clone());
539        }
540        Ok(event)
541    }
542
543    // ========================================================================
544    // Edge Operations
545    // ========================================================================
546
547    /// Add a bidirectional relationship between two components (idempotent).
548    ///
549    /// Creates both the forward edge (from → to with `forward` relationship) and
550    /// the reverse edge (to → from with the reverse of `forward`).
551    /// If the relationship already exists, this is a no-op and returns `Ok(())`.
552    ///
553    /// # Errors
554    ///
555    /// Returns an error if either component doesn't exist.
556    pub fn add_relationship(
557        &mut self,
558        from_id: &str,
559        to_id: &str,
560        forward: RelationshipKind,
561    ) -> anyhow::Result<()> {
562        let (_, _) = self.add_relationship_internal(from_id, to_id, forward)?;
563        Ok(())
564    }
565
566    /// Internal: adds a relationship and returns the edge indices for rollback.
567    /// Returns `(None, None)` if the relationship already exists (idempotent).
568    pub(super) fn add_relationship_internal(
569        &mut self,
570        from_id: &str,
571        to_id: &str,
572        forward: RelationshipKind,
573    ) -> anyhow::Result<(Option<EdgeIndex>, Option<EdgeIndex>)> {
574        let from_idx = self
575            .index
576            .get(from_id)
577            .copied()
578            .ok_or_else(|| anyhow::anyhow!("Component '{from_id}' not found in graph"))?;
579        let to_idx = self
580            .index
581            .get(to_id)
582            .copied()
583            .ok_or_else(|| anyhow::anyhow!("Component '{to_id}' not found in graph"))?;
584
585        // Validate the relationship is semantically valid for the node kinds
586        let from_kind = &self.graph[from_idx].kind;
587        let to_kind = &self.graph[to_idx].kind;
588        if !is_valid_relationship(from_kind, to_kind, &forward) {
589            return Err(anyhow::anyhow!(
590                "Invalid relationship: {forward:?} from {from_kind} '{from_id}' to {to_kind} '{to_id}'"
591            ));
592        }
593
594        // Idempotency: check if the forward edge already exists
595        let already_exists = self
596            .graph
597            .edges_directed(from_idx, Direction::Outgoing)
598            .any(|e| e.target() == to_idx && e.weight() == &forward);
599        if already_exists {
600            return Ok((None, None));
601        }
602
603        let reverse = forward.reverse();
604        let fwd_edge = self.graph.add_edge(from_idx, to_idx, forward);
605        let rev_edge = self.graph.add_edge(to_idx, from_idx, reverse);
606
607        Ok((Some(fwd_edge), Some(rev_edge)))
608    }
609
610    /// Remove a bidirectional relationship between two components.
611    ///
612    /// Removes both the forward edge (from → to with `forward` relationship) and
613    /// the reverse edge (to → from with the reverse of `forward`).
614    /// If the relationship doesn't exist, this is a no-op and returns `Ok(())`.
615    ///
616    /// # Errors
617    ///
618    /// Returns an error if either component doesn't exist in the graph.
619    pub fn remove_relationship(
620        &mut self,
621        from_id: &str,
622        to_id: &str,
623        forward: RelationshipKind,
624    ) -> anyhow::Result<()> {
625        let from_idx = self
626            .index
627            .get(from_id)
628            .copied()
629            .ok_or_else(|| anyhow::anyhow!("Component '{from_id}' not found in graph"))?;
630        let to_idx = self
631            .index
632            .get(to_id)
633            .copied()
634            .ok_or_else(|| anyhow::anyhow!("Component '{to_id}' not found in graph"))?;
635
636        let reverse = forward.reverse();
637
638        // Find and remove forward edge
639        let forward_edge = self
640            .graph
641            .edges_directed(from_idx, Direction::Outgoing)
642            .find(|e| e.target() == to_idx && e.weight() == &forward)
643            .map(|e| e.id());
644        if let Some(edge_id) = forward_edge {
645            self.graph.remove_edge(edge_id);
646        }
647
648        // Find and remove reverse edge
649        let reverse_edge = self
650            .graph
651            .edges_directed(to_idx, Direction::Outgoing)
652            .find(|e| e.target() == from_idx && e.weight() == &reverse)
653            .map(|e| e.id());
654        if let Some(edge_id) = reverse_edge {
655            self.graph.remove_edge(edge_id);
656        }
657
658        Ok(())
659    }
660
661    // ========================================================================
662    // Relationship Queries
663    // ========================================================================
664
665    /// Get all components that this component has outgoing edges to,
666    /// filtered by relationship kind.
667    pub fn get_neighbors(&self, id: &str, relationship: &RelationshipKind) -> Vec<&ComponentNode> {
668        let Some(&node_idx) = self.index.get(id) else {
669            return Vec::new();
670        };
671
672        self.graph
673            .edges_directed(node_idx, Direction::Outgoing)
674            .filter(|edge| edge.weight() == relationship)
675            .filter_map(|edge| self.graph.node_weight(edge.target()))
676            .collect()
677    }
678
679    /// Get all components that depend on the given component.
680    ///
681    /// "Dependents" are components that would be affected if this component
682    /// were removed or stopped. This follows Feeds edges (outgoing).
683    pub fn get_dependents(&self, id: &str) -> Vec<&ComponentNode> {
684        let Some(&node_idx) = self.index.get(id) else {
685            return Vec::new();
686        };
687
688        self.graph
689            .edges_directed(node_idx, Direction::Outgoing)
690            .filter(|edge| matches!(edge.weight(), RelationshipKind::Feeds))
691            .filter_map(|edge| self.graph.node_weight(edge.target()))
692            .collect()
693    }
694
695    /// Get all components that this component depends on.
696    ///
697    /// "Dependencies" are components that this component needs to function.
698    /// This follows SubscribesTo edges (outgoing).
699    pub fn get_dependencies(&self, id: &str) -> Vec<&ComponentNode> {
700        let Some(&node_idx) = self.index.get(id) else {
701            return Vec::new();
702        };
703
704        self.graph
705            .edges_directed(node_idx, Direction::Outgoing)
706            .filter(|edge| matches!(edge.weight(), RelationshipKind::SubscribesTo))
707            .filter_map(|edge| self.graph.node_weight(edge.target()))
708            .collect()
709    }
710
711    /// Check if a component can be safely removed (no dependents that would break).
712    ///
713    /// Returns Ok(()) if safe, or Err with the list of dependent component IDs.
714    pub fn can_remove(&self, id: &str) -> Result<(), Vec<String>> {
715        let dependents = self.get_dependents(id);
716        if dependents.is_empty() {
717            Ok(())
718        } else {
719            Err(dependents.iter().map(|n| n.id.clone()).collect())
720        }
721    }
722
723    // ========================================================================
724    // Lifecycle
725    // ========================================================================
726
727    /// Atomically validate and apply a commanded status transition.
728    ///
729    /// This is the **single canonical way** for managers to change a component's
730    /// status for command-initiated transitions (`Starting`, `Stopping`,
731    /// `Reconfiguring`). It combines validation and mutation under a single
732    /// `&mut self` borrow, eliminating the TOCTOU gap between checking status
733    /// and updating it.
734    ///
735    /// Components still report runtime-initiated transitions (`Running`,
736    /// `Stopped`, `Error`) via the mpsc channel → [`apply_update`].
737    ///
738    /// # Returns
739    ///
740    /// - `Ok(Some(event))` — transition applied, event emitted to broadcast subscribers
741    /// - `Ok(None)` — same-state no-op (component already in `target_status`)
742    /// - `Err(...)` — component not found or transition not valid from current state
743    ///
744    /// # Example
745    ///
746    /// ```ignore
747    /// let mut graph = self.graph.write().await;
748    /// graph.validate_and_transition("source-1", ComponentStatus::Starting, Some("Starting source"))?;
749    /// drop(graph); // release lock before calling source.start()
750    /// source.start().await?;
751    /// ```
752    pub fn validate_and_transition(
753        &mut self,
754        id: &str,
755        target_status: ComponentStatus,
756        message: Option<String>,
757    ) -> anyhow::Result<Option<ComponentEvent>> {
758        let node = self
759            .get_component(id)
760            .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in graph"))?;
761        let current = node.status;
762
763        // Same-state is an idempotent no-op
764        if current == target_status {
765            return Ok(None);
766        }
767
768        // Produce a descriptive error message for invalid transitions
769        if !is_valid_transition(&current, &target_status) {
770            let reason = describe_invalid_transition(id, &current, &target_status);
771            return Err(anyhow::anyhow!(reason));
772        }
773
774        // Transition is valid — apply it
775        self.update_status_with_message(id, target_status, message)
776    }
777
778    /// Get a topological ordering of components for lifecycle operations.
779    ///
780    /// Returns components in dependency order: sources first, then queries, then reactions.
781    /// Only follows Feeds edges for ordering (other edge types don't affect lifecycle order).
782    ///
783    /// The instance root node is excluded from the result.
784    pub fn topological_order(&self) -> anyhow::Result<Vec<&ComponentNode>> {
785        // Build a filtered subgraph with only Feeds edges for ordering
786        // (bidirectional edges like SubscribesTo/OwnedBy create cycles that
787        // would prevent toposort on the full graph)
788        let mut order_graph: StableGraph<(), ()> = StableGraph::new();
789        let mut idx_map: HashMap<NodeIndex, NodeIndex> = HashMap::new();
790
791        // Add all nodes
792        for node_idx in self.graph.node_indices() {
793            let new_idx = order_graph.add_node(());
794            idx_map.insert(node_idx, new_idx);
795        }
796
797        // Add only Feeds edges
798        for edge_idx in self.graph.edge_indices() {
799            if let Some(weight) = self.graph.edge_weight(edge_idx) {
800                if matches!(weight, RelationshipKind::Feeds) {
801                    if let Some((from, to)) = self.graph.edge_endpoints(edge_idx) {
802                        if let (Some(&new_from), Some(&new_to)) =
803                            (idx_map.get(&from), idx_map.get(&to))
804                        {
805                            order_graph.add_edge(new_from, new_to, ());
806                        }
807                    }
808                }
809            }
810        }
811
812        // Reverse map: new index → original index
813        let reverse_map: HashMap<NodeIndex, NodeIndex> =
814            idx_map.iter().map(|(&orig, &new)| (new, orig)).collect();
815
816        match petgraph::algo::toposort(&order_graph, None) {
817            Ok(sorted) => Ok(sorted
818                .into_iter()
819                .filter_map(|new_idx| reverse_map.get(&new_idx))
820                .filter(|idx| **idx != self.instance_idx)
821                .filter_map(|idx| self.graph.node_weight(*idx))
822                .collect()),
823            Err(_cycle) => Err(anyhow::anyhow!(
824                "Cycle detected in component graph — cannot determine lifecycle order"
825            )),
826        }
827    }
828
829    // ========================================================================
830    // Serialization
831    // ========================================================================
832
833    /// Create a serializable snapshot of the entire graph.
834    ///
835    /// The snapshot includes the instance root node and all components
836    /// with their relationships. Used for API responses and UI visualization.
837    pub fn snapshot(&self) -> GraphSnapshot {
838        let nodes: Vec<ComponentNode> = self.graph.node_weights().cloned().collect();
839
840        let edges: Vec<GraphEdge> = self
841            .graph
842            .edge_indices()
843            .filter_map(|edge_idx| {
844                let (from_idx, to_idx) = self.graph.edge_endpoints(edge_idx)?;
845                let from = self.graph.node_weight(from_idx)?;
846                let to = self.graph.node_weight(to_idx)?;
847                let relationship = self.graph.edge_weight(edge_idx)?;
848                Some(GraphEdge {
849                    from: from.id.clone(),
850                    to: to.id.clone(),
851                    relationship: relationship.clone(),
852                })
853            })
854            .collect();
855
856        GraphSnapshot {
857            instance_id: self.instance_id().to_string(),
858            nodes,
859            edges,
860        }
861    }
862
863    /// Get the total number of components (including the instance root).
864    pub fn node_count(&self) -> usize {
865        self.graph.node_count()
866    }
867
868    /// Get the total number of edges.
869    pub fn edge_count(&self) -> usize {
870        self.graph.edge_count()
871    }
872
873    // ========================================================================
874    // High-Level Registration (Source of Truth)
875    // ========================================================================
876
877    /// Register a source component in the graph.
878    ///
879    /// Creates the node and bidirectional ownership edges transactionally.
880    /// Events are emitted only on successful commit.
881    ///
882    /// # Errors
883    ///
884    /// Returns an error if a component with the same ID already exists.
885    pub fn register_source(
886        &mut self,
887        id: &str,
888        metadata: HashMap<String, String>,
889    ) -> anyhow::Result<()> {
890        let node = ComponentNode {
891            id: id.to_string(),
892            kind: ComponentKind::Source,
893            status: ComponentStatus::Added,
894            metadata,
895        };
896        let mut txn = self.begin();
897        txn.add_component(node)?;
898        txn.commit();
899        Ok(())
900    }
901
902    /// Register a query component with its source dependencies.
903    ///
904    /// Creates the node, ownership edges, and `Feeds` edges from each source.
905    /// All operations are transactional — if any dependency is missing or any
906    /// step fails, the entire registration is rolled back.
907    ///
908    /// # Errors
909    ///
910    /// Returns an error if:
911    /// - A component with the same ID already exists
912    /// - Any referenced source does not exist in the graph
913    pub fn register_query(
914        &mut self,
915        id: &str,
916        metadata: HashMap<String, String>,
917        source_ids: &[String],
918    ) -> anyhow::Result<()> {
919        // Validate all dependencies exist before starting the transaction
920        for source_id in source_ids {
921            if !self.contains(source_id) {
922                return Err(anyhow::anyhow!(
923                    "Cannot register query '{id}': referenced source '{source_id}' does not exist in the graph"
924                ));
925            }
926        }
927
928        let node = ComponentNode {
929            id: id.to_string(),
930            kind: ComponentKind::Query,
931            status: ComponentStatus::Added,
932            metadata,
933        };
934        let mut txn = self.begin();
935        txn.add_component(node)?;
936        for source_id in source_ids {
937            txn.add_relationship(source_id, id, RelationshipKind::Feeds)?;
938        }
939        txn.commit();
940        Ok(())
941    }
942
943    /// Register a reaction component with its query dependencies.
944    ///
945    /// Creates the node, ownership edges, and `Feeds` edges from each query.
946    /// All operations are transactional — if any dependency is missing or any
947    /// step fails, the entire registration is rolled back.
948    ///
949    /// # Errors
950    ///
951    /// Returns an error if:
952    /// - A component with the same ID already exists
953    /// - Any referenced query does not exist in the graph
954    pub fn register_reaction(
955        &mut self,
956        id: &str,
957        metadata: HashMap<String, String>,
958        query_ids: &[String],
959    ) -> anyhow::Result<()> {
960        // Validate all dependencies exist before starting the transaction
961        for query_id in query_ids {
962            if !self.contains(query_id) {
963                return Err(anyhow::anyhow!(
964                    "Cannot register reaction '{id}': referenced query '{query_id}' does not exist in the graph"
965                ));
966            }
967        }
968
969        let node = ComponentNode {
970            id: id.to_string(),
971            kind: ComponentKind::Reaction,
972            status: ComponentStatus::Added,
973            metadata,
974        };
975        let mut txn = self.begin();
976        txn.add_component(node)?;
977        for query_id in query_ids {
978            txn.add_relationship(query_id, id, RelationshipKind::Feeds)?;
979        }
980        txn.commit();
981        Ok(())
982    }
983
984    /// Deregister a component and all its edges from the graph.
985    ///
986    /// Validates that the component exists and has no dependents before removal.
987    /// The instance root node cannot be deregistered.
988    ///
989    /// # Errors
990    ///
991    /// Returns an error if:
992    /// - The component does not exist
993    /// - The component has dependents (use `can_remove()` to check first)
994    /// - The component is the instance root node
995    pub fn deregister(&mut self, id: &str) -> anyhow::Result<ComponentNode> {
996        // Validate no dependents
997        if let Err(dependent_ids) = self.can_remove(id) {
998            return Err(anyhow::anyhow!(
999                "Cannot deregister '{}': depended on by: {}",
1000                id,
1001                dependent_ids.join(", ")
1002            ));
1003        }
1004        self.remove_component(id)
1005    }
1006
1007    /// Register a bootstrap provider in the graph for topology visibility.
1008    ///
1009    /// Creates the node and bidirectional ownership edges transactionally.
1010    /// Optionally links the provider to its target source via `Bootstraps` edges.
1011    ///
1012    /// # Usage
1013    ///
1014    /// Bootstrap providers are managed internally by source plugins (set via
1015    /// `Source::set_bootstrap_provider()`). Call this method to make a bootstrap
1016    /// provider visible in the component graph for topology visualization and
1017    /// dependency tracking.
1018    ///
1019    /// ```ignore
1020    /// // After adding a source with a bootstrap provider:
1021    /// let mut graph = core.component_graph().write().await;
1022    /// graph.register_bootstrap_provider("my-bootstrap", metadata, &["my-source".into()])?;
1023    /// ```
1024    ///
1025    /// # Errors
1026    ///
1027    /// Returns an error if:
1028    /// - A component with the same ID already exists
1029    /// - Any referenced source does not exist in the graph
1030    pub fn register_bootstrap_provider(
1031        &mut self,
1032        id: &str,
1033        metadata: HashMap<String, String>,
1034        source_ids: &[String],
1035    ) -> anyhow::Result<()> {
1036        for source_id in source_ids {
1037            if !self.contains(source_id) {
1038                return Err(anyhow::anyhow!(
1039                    "Cannot register bootstrap provider '{id}': referenced source '{source_id}' does not exist in the graph"
1040                ));
1041            }
1042        }
1043
1044        let node = ComponentNode {
1045            id: id.to_string(),
1046            kind: ComponentKind::BootstrapProvider,
1047            status: ComponentStatus::Added,
1048            metadata,
1049        };
1050        let mut txn = self.begin();
1051        txn.add_component(node)?;
1052        for source_id in source_ids {
1053            txn.add_relationship(id, source_id, RelationshipKind::Bootstraps)?;
1054        }
1055        txn.commit();
1056        Ok(())
1057    }
1058
1059    /// Register an identity provider in the graph for topology visibility.
1060    ///
1061    /// Creates the node and bidirectional ownership edges transactionally.
1062    /// Optionally links the provider to components it authenticates via
1063    /// `Authenticates` edges.
1064    ///
1065    /// # Current Status
1066    ///
1067    /// This method is **reserved for future use**. Identity provider support
1068    /// is not yet implemented in the component lifecycle. The registration
1069    /// infrastructure is in place for when authentication integration is added.
1070    ///
1071    /// # Errors
1072    ///
1073    /// Returns an error if:
1074    /// - A component with the same ID already exists
1075    /// - Any referenced component does not exist in the graph
1076    pub fn register_identity_provider(
1077        &mut self,
1078        id: &str,
1079        metadata: HashMap<String, String>,
1080        component_ids: &[String],
1081    ) -> anyhow::Result<()> {
1082        for component_id in component_ids {
1083            if !self.contains(component_id) {
1084                return Err(anyhow::anyhow!(
1085                    "Cannot register identity provider '{id}': referenced component '{component_id}' does not exist in the graph"
1086                ));
1087            }
1088        }
1089
1090        let node = ComponentNode {
1091            id: id.to_string(),
1092            kind: ComponentKind::IdentityProvider,
1093            status: ComponentStatus::Added,
1094            metadata,
1095        };
1096        let mut txn = self.begin();
1097        txn.add_component(node)?;
1098        for component_id in component_ids {
1099            txn.add_relationship(id, component_id, RelationshipKind::Authenticates)?;
1100        }
1101        txn.commit();
1102        Ok(())
1103    }
1104
1105    // ========================================================================
1106    // Transactions
1107    // ========================================================================
1108
1109    /// Begin a transactional mutation of the graph.
1110    ///
1111    /// Returns a [`GraphTransaction`] that collects mutations (nodes, edges)
1112    /// and defers event emission until [`commit()`](GraphTransaction::commit).
1113    /// If the transaction is dropped without being committed, all added nodes
1114    /// and edges are rolled back automatically.
1115    ///
1116    /// The `&mut self` borrow ensures compile-time exclusivity — no other code
1117    /// can access the graph while a transaction is in progress.
1118    ///
1119    /// # Example
1120    ///
1121    /// ```ignore
1122    /// let mut graph = self.graph.write().await;
1123    /// let mut txn = graph.begin();
1124    /// txn.add_component(source_node)?;
1125    /// txn.add_relationship("source-1", "query-1", RelationshipKind::Feeds)?;
1126    /// txn.commit(); // events emitted here; if this line is not reached, rollback on drop
1127    /// ```
1128    pub fn begin(&mut self) -> GraphTransaction<'_> {
1129        GraphTransaction::new(self)
1130    }
1131
1132    // ========================================================================
1133    // Event History (centralized)
1134    // ========================================================================
1135
1136    /// Record a component event in the centralized history.
1137    ///
1138    /// Called internally by [`apply_update()`]. Managers should NOT call this
1139    /// directly — status updates flow through the mpsc channel and are recorded
1140    /// automatically.
1141    pub fn record_event(&mut self, event: ComponentEvent) {
1142        self.event_history.record_event(event);
1143    }
1144
1145    /// Get all lifecycle events for a specific component.
1146    ///
1147    /// Returns events in chronological order (oldest first).
1148    /// Up to 100 most recent events are retained per component.
1149    pub fn get_events(&self, component_id: &str) -> Vec<ComponentEvent> {
1150        self.event_history.get_events(component_id)
1151    }
1152
1153    /// Get all lifecycle events across all components.
1154    ///
1155    /// Returns events sorted by timestamp (oldest first).
1156    pub fn get_all_events(&self) -> Vec<ComponentEvent> {
1157        self.event_history.get_all_events()
1158    }
1159
1160    /// Get the most recent error message for a component.
1161    pub fn get_last_error(&self, component_id: &str) -> Option<String> {
1162        self.event_history.get_last_error(component_id)
1163    }
1164
1165    /// Subscribe to live lifecycle events for a component.
1166    ///
1167    /// Returns the current history and a broadcast receiver for new events,
1168    /// or `None` if the component has no event channel yet.
1169    pub fn subscribe_events(
1170        &self,
1171        component_id: &str,
1172    ) -> Option<(Vec<ComponentEvent>, broadcast::Receiver<ComponentEvent>)> {
1173        self.event_history.try_subscribe(component_id)
1174    }
1175}
1176
1177// ============================================================================
1178// Relationship Validation
1179// ============================================================================
1180
1181/// Check if a relationship kind is semantically valid between two component kinds.
1182///
1183/// This enforces the graph topology rules:
1184/// - **Feeds**: Source → Query, or Query → Reaction
1185/// - **Owns/OwnedBy**: Instance ↔ any component (created automatically)
1186/// - **Bootstraps**: BootstrapProvider → Source
1187/// - **Authenticates**: IdentityProvider → any component
1188pub(super) fn is_valid_relationship(
1189    from_kind: &ComponentKind,
1190    to_kind: &ComponentKind,
1191    relationship: &RelationshipKind,
1192) -> bool {
1193    use ComponentKind::*;
1194    use RelationshipKind::*;
1195    matches!(
1196        (from_kind, to_kind, relationship),
1197        // Data flow
1198        (Source, Query, Feeds)
1199            | (Query, Reaction, Feeds)
1200            // Ownership (auto-created by add_component_internal)
1201            | (Instance, _, Owns)
1202            // Bootstrap
1203            | (BootstrapProvider, Source, Bootstraps)
1204            // Authentication
1205            | (IdentityProvider, _, Authenticates)
1206    )
1207}
1208
1209// ============================================================================
1210// State Transition Validation
1211// ============================================================================
1212
1213/// Check if a status transition is valid according to the component lifecycle state machine.
1214///
1215/// ```text
1216/// Added ──→ Starting ──→ Running ──→ Stopping ──→ Stopped
1217///   │           │            │            │
1218///   │           ↓            ↓            ↓
1219///   │         Error        Error        Error
1220///   │           │
1221///   │           ↓
1222///   │        Stopped (aborted start)
1223///   │
1224///   ↓
1225/// Reconfiguring ──→ Stopped | Starting | Error
1226///
1227/// Error ──→ Starting (retry) | Stopped (reset)
1228///
1229/// Note: Added and Removed are set by the graph on add/remove_component()
1230/// and are NOT valid targets for validate_and_transition().
1231/// ```
1232pub(super) fn is_valid_transition(from: &ComponentStatus, to: &ComponentStatus) -> bool {
1233    use ComponentStatus::*;
1234    matches!(
1235        (from, to),
1236        // Normal lifecycle
1237        (Added, Starting)
1238            | (Added, Stopped) // immediate deactivation without starting
1239            | (Stopped, Starting)
1240            | (Starting, Running)
1241            | (Starting, Error)
1242            | (Starting, Stopped) // aborted start
1243            | (Running, Stopping)
1244            | (Running, Stopped) // direct stop (async channel may skip Stopping)
1245            | (Running, Error)
1246            | (Stopping, Stopped)
1247            | (Stopping, Error)
1248            // Error recovery
1249            | (Error, Starting) // retry
1250            | (Error, Stopped) // reset
1251            // Reconfiguration (from any stable state)
1252            | (Added, Reconfiguring)
1253            | (Stopped, Reconfiguring)
1254            | (Running, Reconfiguring)
1255            | (Error, Reconfiguring)
1256            | (Reconfiguring, Stopped)
1257            | (Reconfiguring, Starting)
1258            | (Reconfiguring, Error)
1259    )
1260}
1261
1262/// Produce a human-readable error message for an invalid transition.
1263///
1264/// These messages provide actionable feedback (e.g., "Component is already running"
1265/// instead of just "invalid transition").
1266fn describe_invalid_transition(id: &str, from: &ComponentStatus, to: &ComponentStatus) -> String {
1267    use ComponentStatus::*;
1268    match (from, to) {
1269        // Trying to start something that's already starting/running
1270        (Starting, Starting) => format!("Component '{id}' is already starting"),
1271        (Running, Starting) => format!("Component '{id}' is already running"),
1272        (Stopping, Starting) => {
1273            format!("Cannot start component '{id}' while it is stopping")
1274        }
1275        (Reconfiguring, Starting) => {
1276            // Reconfiguring → Starting is actually valid, so this shouldn't be reached,
1277            // but kept for safety
1278            format!("Cannot start component '{id}' while it is reconfiguring")
1279        }
1280        // Trying to stop something that's already stopped/stopping
1281        (Stopped, Stopping) => {
1282            format!("Cannot stop component '{id}': it is already stopped")
1283        }
1284        (Stopping, Stopping) => format!("Component '{id}' is already stopping"),
1285        (Error, Stopping) => {
1286            format!("Cannot stop component '{id}': it is in error state")
1287        }
1288        // Trying to reconfigure during a transition
1289        (Starting, Reconfiguring) => {
1290            format!("Cannot reconfigure component '{id}' while it is starting")
1291        }
1292        (Stopping, Reconfiguring) => {
1293            format!("Cannot reconfigure component '{id}' while it is stopping")
1294        }
1295        (Reconfiguring, Reconfiguring) => {
1296            format!("Component '{id}' is already reconfiguring")
1297        }
1298        // Generic fallback
1299        _ => format!("Invalid state transition for component '{id}': {from:?} → {to:?}"),
1300    }
1301}
1302
1303impl std::fmt::Debug for ComponentGraph {
1304    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1305        f.debug_struct("ComponentGraph")
1306            .field("instance_id", &self.instance_id())
1307            .field("node_count", &self.node_count())
1308            .field("edge_count", &self.edge_count())
1309            .finish()
1310    }
1311}