drasi_lib/component_graph/graph.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use petgraph::stable_graph::{EdgeIndex, NodeIndex, StableGraph};
20use petgraph::visit::EdgeRef;
21use petgraph::Direction;
22use tokio::sync::{broadcast, mpsc, Notify};
23
24use crate::channels::{ComponentEvent, ComponentEventBroadcastReceiver, ComponentStatus};
25use crate::managers::ComponentEventHistory;
26
27use super::transaction::GraphTransaction;
28use super::{
29 ComponentKind, ComponentNode, ComponentUpdate, ComponentUpdateReceiver, ComponentUpdateSender,
30 GraphEdge, GraphSnapshot, RelationshipKind,
31};
32
33// ============================================================================
34// Component Graph
35// ============================================================================
36
/// Central component dependency graph — the single source of truth.
///
/// Backed by `petgraph::stable_graph::StableGraph` which keeps node/edge indices
/// stable across removals (critical for a graph that changes at runtime).
///
/// # Error Handling
///
/// Mutation methods on this struct return `anyhow::Result`, following the Layer 2
/// (internal module) error convention. The public API boundary in `DrasiLib` and
/// `lib_core_ops` wraps these into `DrasiError` variants before returning to callers.
/// External consumers should use `DrasiLib` methods rather than calling graph
/// mutations directly.
///
/// # Event Emission
///
/// The graph emits [`ComponentEvent`]s via a built-in broadcast channel whenever
/// components are added, removed, or change status. Call [`subscribe()`](Self::subscribe)
/// to receive events.
///
/// # Thread Safety
///
/// `ComponentGraph` performs no internal locking and every mutating method takes
/// `&mut self`, so shared access must go through `Arc<RwLock<ComponentGraph>>`.
/// All public APIs in `DrasiLib` handle this wrapping automatically.
///
/// NOTE(review): this section used to say the type is not `Send`/`Sync` "due to
/// the underlying `StableGraph`". `StableGraph` is `Send`/`Sync` whenever its node
/// and edge weights are, and an `Arc<RwLock<T>>` can only be shared across threads
/// if `T` is `Send + Sync` — so the old wording looks inaccurate. Confirm against
/// the field types declared outside this file.
///
/// # Instance Root
///
/// The graph always has the DrasiLib instance as its root node.
/// All other components are connected to it via Owns/OwnedBy edges.
pub struct ComponentGraph {
    /// The underlying petgraph directed graph
    pub(super) graph: StableGraph<ComponentNode, RelationshipKind>,
    /// Fast lookup: component ID → node index (O(1) access)
    pub(super) index: HashMap<String, NodeIndex>,
    /// The instance node index (always present)
    instance_idx: NodeIndex,
    /// Broadcast sender for component lifecycle events (fan-out to subscribers).
    /// Events are emitted by `add_component()`, `remove_component()`, `update_status()`,
    /// and the graph update loop.
    pub(super) event_tx: broadcast::Sender<ComponentEvent>,
    /// mpsc sender for component status updates (fan-in from components).
    /// Cloned and given to each component's Base struct. The graph update loop
    /// owns the corresponding receiver.
    update_tx: mpsc::Sender<ComponentUpdate>,
    /// Notifies waiters when any component's status changes.
    /// Used by `wait_for_status()` to replace polling loops with event-driven waits.
    /// Follows the same pattern as `PriorityQueue::notify`.
    status_notify: Arc<Notify>,
    /// Type-erased runtime instances, keyed by component ID.
    ///
    /// Managers store their `Arc<dyn Source>`, `Arc<dyn Query>`, `Arc<dyn Reaction>`, etc.
    /// here during provisioning. This eliminates the dual-registry pattern where each
    /// manager maintained its own HashMap — the graph is now the single store for both
    /// metadata (in the petgraph node) and runtime instances (here).
    ///
    /// Access via typed helpers: [`set_runtime()`](Self::set_runtime),
    /// [`get_runtime()`](Self::get_runtime), [`take_runtime()`](Self::take_runtime).
    runtimes: HashMap<String, Box<dyn Any + Send + Sync>>,
    /// Centralized event history for all components.
    ///
    /// Stores lifecycle events (Starting, Running, Error, Stopped, etc.) for every
    /// component in the graph. An event is recorded each time a status change is
    /// actually applied (via [`apply_update()`](Self::apply_update) or
    /// [`validate_and_transition()`](Self::validate_and_transition)).
    /// [`remove_component()`](Self::remove_component) does not record anything here;
    /// it drops the removed component's history instead. Managers delegate event
    /// queries here rather than maintaining their own per-manager histories.
    event_history: ComponentEventHistory,
}
102
/// Buffer size of the broadcast channel that fans component events out to subscribers.
const EVENT_CHANNEL_CAPACITY: usize = 1_000;

/// Buffer size of the mpsc channel that funnels component updates into the graph.
const UPDATE_CHANNEL_CAPACITY: usize = 1_000;
108
109impl ComponentGraph {
110 /// Create a new component graph with the given instance as root node.
111 ///
112 /// Returns the graph and a [`ComponentUpdateReceiver`] that must be consumed by
113 /// a graph update loop task (see [`Self::apply_update`]).
114 pub fn new(instance_id: &str) -> (Self, ComponentUpdateReceiver) {
115 let mut graph = StableGraph::new();
116 let instance_node = ComponentNode {
117 id: instance_id.to_string(),
118 kind: ComponentKind::Instance,
119 status: ComponentStatus::Running,
120 metadata: HashMap::new(),
121 };
122 let instance_idx = graph.add_node(instance_node);
123
124 let mut index = HashMap::new();
125 index.insert(instance_id.to_string(), instance_idx);
126 let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
127 let (update_tx, update_rx) = mpsc::channel(UPDATE_CHANNEL_CAPACITY);
128
129 (
130 Self {
131 graph,
132 index,
133 instance_idx,
134 event_tx,
135 update_tx,
136 status_notify: Arc::new(Notify::new()),
137 runtimes: HashMap::new(),
138 event_history: ComponentEventHistory::new(),
139 },
140 update_rx,
141 )
142 }
143
/// Subscribe to component lifecycle events.
///
/// Returns a new broadcast receiver created from the graph's event sender.
/// The receiver gets a copy of each [`ComponentEvent`] the graph sends after
/// this call (events from `add_component`, `remove_component`, and applied
/// status changes); events sent before subscribing are not replayed.
pub fn subscribe(&self) -> ComponentEventBroadcastReceiver {
    self.event_tx.subscribe()
}
151
/// Get a reference to the broadcast sender for component events.
///
/// This allows callers to clone the sender before the graph is wrapped in
/// `Arc<RwLock<>>`, enabling subscription without needing to acquire the lock.
/// The returned sender is the very same `event_tx` the graph uses for its own
/// event emission, so anything sent through it reaches the same subscribers.
pub fn event_sender(&self) -> &broadcast::Sender<ComponentEvent> {
    &self.event_tx
}
160
/// Get a clone of the mpsc update sender.
///
/// This is the sender that components use to report status changes without
/// acquiring any graph lock. Clone this and pass to `SourceBase`/`ReactionBase`.
/// Updates sent through it arrive at the receiver returned by [`Self::new`].
pub fn update_sender(&self) -> ComponentUpdateSender {
    self.update_tx.clone()
}
168
169 /// Get a clone of the status change notifier.
170 ///
171 /// This `Notify` is signalled whenever any component's status changes in the graph.
172 /// Use it with [`wait_for_status`] or build custom wait loops that avoid polling
173 /// with sleep.
174 ///
175 /// # Pattern
176 ///
177 /// ```ignore
178 /// let notify = graph.status_notifier();
179 /// // Register interest BEFORE releasing the graph lock
180 /// let notified = notify.notified();
181 /// drop(graph); // release lock
182 /// notified.await; // woken when any status changes
183 /// ```
184 pub fn status_notifier(&self) -> Arc<Notify> {
185 self.status_notify.clone()
186 }
187
188 /// Apply a [`ComponentUpdate`] received from the mpsc channel.
189 ///
190 /// Called by the graph update loop task. Updates the graph, emits a
191 /// broadcast event, and records the event in the centralized event history.
192 /// Returns the event for external logging/processing.
193 pub fn apply_update(&mut self, update: ComponentUpdate) -> Option<ComponentEvent> {
194 match update {
195 ComponentUpdate::Status {
196 component_id,
197 status,
198 message,
199 } => match self.update_status_with_message(&component_id, status, message) {
200 Ok(event) => event,
201 Err(e) => {
202 tracing::debug!(
203 "Graph update loop: status update skipped for '{}': {e}",
204 component_id
205 );
206 None
207 }
208 },
209 }
210 }
211
212 /// Emit a [`ComponentEvent`] to all broadcast subscribers and return it.
213 ///
214 /// Returns `Some(event)` if the component kind maps to a [`ComponentType`],
215 /// `None` for kinds like `Instance` that have no external type.
216 /// Silently ignores broadcast send failures (no subscribers connected).
217 fn emit_event(
218 &self,
219 component_id: &str,
220 kind: &ComponentKind,
221 status: ComponentStatus,
222 message: Option<String>,
223 ) -> Option<ComponentEvent> {
224 if let Some(component_type) = kind.to_component_type() {
225 let event = ComponentEvent {
226 component_id: component_id.to_string(),
227 component_type,
228 status,
229 timestamp: chrono::Utc::now(),
230 message,
231 };
232 let _ = self.event_tx.send(event.clone());
233 Some(event)
234 } else {
235 None
236 }
237 }
238
239 /// Get the instance ID (root node)
240 pub fn instance_id(&self) -> &str {
241 &self.graph[self.instance_idx].id
242 }
243
244 // ========================================================================
245 // Runtime Instance Store
246 // ========================================================================
247
248 /// Store a runtime instance for a component.
249 ///
250 /// The component must already exist in the graph (registered via `add_component`
251 /// or one of the `register_*` methods). The runtime instance is stored in a
252 /// type-erased map keyed by component ID.
253 ///
254 /// Managers call this during provisioning after creating the runtime instance:
255 /// ```ignore
256 /// let source: Arc<dyn Source> = Arc::new(my_source);
257 /// source.initialize(context).await;
258 /// graph.set_runtime("source-1", Box::new(source))?;
259 /// ```
260 ///
261 /// # Errors
262 ///
263 /// Returns an error if the component ID is not present in the graph.
264 pub fn set_runtime(
265 &mut self,
266 id: &str,
267 runtime: Box<dyn Any + Send + Sync>,
268 ) -> anyhow::Result<()> {
269 if !self.index.contains_key(id) {
270 return Err(anyhow::anyhow!(
271 "set_runtime called for component '{id}' which is not in the graph"
272 ));
273 }
274
275 // Warn if the runtime type doesn't match the component kind.
276 // This catches type mismatches during development. Uses a warning rather
277 // than a hard assert to avoid breaking unit tests that use substitute types.
278 #[cfg(debug_assertions)]
279 if let Some(node) = self.get_component(id) {
280 let kind = &node.kind;
281 let type_ok = match kind {
282 ComponentKind::Source => runtime
283 .downcast_ref::<std::sync::Arc<dyn crate::sources::Source>>()
284 .is_some(),
285 ComponentKind::Query => runtime
286 .downcast_ref::<std::sync::Arc<dyn crate::queries::manager::Query>>()
287 .is_some(),
288 ComponentKind::Reaction => runtime
289 .downcast_ref::<std::sync::Arc<dyn crate::reactions::Reaction>>()
290 .is_some(),
291 _ => true,
292 };
293 if !type_ok {
294 tracing::warn!(
295 "set_runtime: possible type mismatch for component '{id}' (kind={kind})"
296 );
297 }
298 }
299
300 self.runtimes.insert(id.to_string(), runtime);
301 Ok(())
302 }
303
304 /// Retrieve a reference to a component's runtime instance, downcasting to `T`.
305 ///
306 /// Returns `None` if the component has no runtime instance or if the stored
307 /// type doesn't match `T`.
308 ///
309 /// # Type Parameter
310 ///
311 /// `T` is typically `Arc<dyn Source>`, `Arc<dyn Query>`, or `Arc<dyn Reaction>`.
312 ///
313 /// # Example
314 ///
315 /// ```ignore
316 /// let source: &Arc<dyn Source> = graph.get_runtime::<Arc<dyn Source>>("source-1")
317 /// .ok_or_else(|| anyhow!("Source not found"))?;
318 /// let source = source.clone(); // Clone the Arc for use outside the lock
319 /// ```
320 pub fn get_runtime<T: 'static>(&self, id: &str) -> Option<&T> {
321 self.runtimes.get(id).and_then(|r| r.downcast_ref::<T>())
322 }
323
324 /// Remove and return a component's runtime instance, downcasting to `T`.
325 ///
326 /// Returns `None` if the component has no runtime instance or if the stored
327 /// type doesn't match `T`. On type mismatch, the runtime is **put back** into
328 /// the store (not lost) and an error is logged.
329 ///
330 /// Used during teardown when the manager needs ownership of the instance
331 /// (e.g., to call `deprovision()`).
332 pub fn take_runtime<T: 'static>(&mut self, id: &str) -> Option<T> {
333 let runtime = self.runtimes.remove(id)?;
334 match runtime.downcast::<T>() {
335 Ok(boxed) => Some(*boxed),
336 Err(runtime) => {
337 tracing::error!(
338 "take_runtime: type mismatch for component '{id}', putting runtime back"
339 );
340 self.runtimes.insert(id.to_string(), runtime);
341 None
342 }
343 }
344 }
345
/// Check if a component has a runtime instance stored.
///
/// Only looks at the runtime store; it says nothing about the stored value's
/// concrete type.
pub fn has_runtime(&self, id: &str) -> bool {
    self.runtimes.contains_key(id)
}
350
351 // ========================================================================
352 // Node Operations
353 // ========================================================================
354
355 /// Add a component node to the graph.
356 ///
357 /// Automatically creates bidirectional Owns/OwnedBy edges between the
358 /// instance root and the new component. Emits a [`ComponentEvent`] with
359 /// the node's current status to all subscribers.
360 ///
361 /// # Errors
362 ///
363 /// Returns an error if a component with the same ID already exists.
364 pub fn add_component(&mut self, node: ComponentNode) -> anyhow::Result<NodeIndex> {
365 let (node_idx, event, _, _) = self.add_component_internal(node)?;
366 if let Some(event) = event {
367 let _ = self.event_tx.send(event);
368 }
369 Ok(node_idx)
370 }
371
372 /// Internal: adds a component and returns the event without emitting it.
373 /// Used by both `add_component()` (emits immediately) and `GraphTransaction`
374 /// (defers emission to commit).
375 pub(super) fn add_component_internal(
376 &mut self,
377 node: ComponentNode,
378 ) -> anyhow::Result<(NodeIndex, Option<ComponentEvent>, EdgeIndex, EdgeIndex)> {
379 if self.index.contains_key(&node.id) {
380 return Err(anyhow::anyhow!(
381 "{} '{}' already exists in the graph",
382 node.kind,
383 node.id
384 ));
385 }
386
387 let id = node.id.clone();
388 let kind = node.kind.clone();
389 let status = node.status;
390 let node_idx = self.graph.add_node(node);
391 self.index.insert(id.clone(), node_idx);
392
393 // Create bidirectional ownership edges (Instance ↔ Component)
394 let owns_edge = self
395 .graph
396 .add_edge(self.instance_idx, node_idx, RelationshipKind::Owns);
397 let owned_by_edge =
398 self.graph
399 .add_edge(node_idx, self.instance_idx, RelationshipKind::OwnedBy);
400
401 let event = kind
402 .to_component_type()
403 .map(|component_type| ComponentEvent {
404 component_id: id,
405 component_type,
406 status,
407 timestamp: chrono::Utc::now(),
408 message: Some(format!("{kind} added")),
409 });
410
411 Ok((node_idx, event, owns_edge, owned_by_edge))
412 }
413
414 /// Remove a component node and all its edges from the graph.
415 ///
416 /// Emits a [`ComponentEvent`] with status [`ComponentStatus::Stopped`] to all
417 /// subscribers. Returns the removed node data, or an error if the component
418 /// doesn't exist. The instance root node cannot be removed.
419 pub fn remove_component(&mut self, id: &str) -> anyhow::Result<ComponentNode> {
420 let node_idx = self
421 .index
422 .get(id)
423 .copied()
424 .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in graph"))?;
425
426 if node_idx == self.instance_idx {
427 return Err(anyhow::anyhow!("Cannot remove the instance root node"));
428 }
429
430 // Capture kind before removing so we can emit an event
431 let kind = self.graph[node_idx].kind.clone();
432
433 self.index.remove(id);
434 // Remove runtime instance if present (atomic with node removal)
435 self.runtimes.remove(id);
436 // Remove event history for this component
437 self.event_history.remove_component(id);
438 // StableGraph::remove_node automatically removes all edges connected to this node
439 let removed = self
440 .graph
441 .remove_node(node_idx)
442 .ok_or_else(|| anyhow::anyhow!("Component '{id}' already removed"))?;
443
444 self.emit_event(
445 id,
446 &kind,
447 ComponentStatus::Stopped,
448 Some(format!("{kind} removed")),
449 );
450
451 Ok(removed)
452 }
453
454 /// Get a component node by ID.
455 pub fn get_component(&self, id: &str) -> Option<&ComponentNode> {
456 self.index
457 .get(id)
458 .and_then(|idx| self.graph.node_weight(*idx))
459 }
460
461 /// Get a mutable reference to a component node by ID.
462 pub fn get_component_mut(&mut self, id: &str) -> Option<&mut ComponentNode> {
463 self.index
464 .get(id)
465 .copied()
466 .and_then(|idx| self.graph.node_weight_mut(idx))
467 }
468
/// Check if a component exists in the graph.
///
/// Answers from the ID index only; the instance root counts as a component
/// because it is indexed too.
pub fn contains(&self, id: &str) -> bool {
    self.index.contains_key(id)
}
473
474 /// List all components of a specific kind with their status.
475 pub fn list_by_kind(&self, kind: &ComponentKind) -> Vec<(String, ComponentStatus)> {
476 self.graph
477 .node_weights()
478 .filter(|node| &node.kind == kind)
479 .map(|node| (node.id.clone(), node.status))
480 .collect()
481 }
482
/// Update a component's status without attaching a message.
///
/// Thin convenience wrapper: forwards to `update_status_with_message` with
/// `message = None`, so the same no-op, validation, event-emission and
/// error rules apply.
///
/// NOTE(review): within this file `apply_update` calls
/// `update_status_with_message` directly rather than this wrapper, so the
/// remaining callers are presumably tests and sibling modules — confirm.
pub(super) fn update_status(
    &mut self,
    id: &str,
    status: ComponentStatus,
) -> anyhow::Result<Option<ComponentEvent>> {
    self.update_status_with_message(id, status, None)
}
494
495 /// Update a component's status with an optional message.
496 ///
497 /// Emits a [`ComponentEvent`] with the new status and message to all broadcast
498 /// subscribers AND records it in the centralized event history. This ensures
499 /// events are visible to both global subscribers (via broadcast) and per-component
500 /// subscribers (via event history channels).
501 ///
502 /// Called by [`apply_update`] in the graph update loop and by
503 /// [`validate_and_transition`] for command-initiated transitions.
504 fn update_status_with_message(
505 &mut self,
506 id: &str,
507 status: ComponentStatus,
508 message: Option<String>,
509 ) -> anyhow::Result<Option<ComponentEvent>> {
510 let node = self
511 .get_component_mut(id)
512 .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in graph"))?;
513 let kind = node.kind.clone();
514
515 // Same-state updates are idempotent no-ops (no event, no warning)
516 if node.status == status {
517 return Ok(None);
518 }
519
520 if !is_valid_transition(&node.status, &status) {
521 tracing::warn!(
522 "Invalid state transition for component '{}': {:?} → {:?}, ignoring update",
523 id,
524 node.status,
525 status
526 );
527 return Ok(None);
528 }
529
530 node.status = status;
531
532 // Wake up any waiters blocking on status changes (e.g., wait_for_status)
533 self.status_notify.notify_waiters();
534
535 let event = self.emit_event(id, &kind, status, message);
536 if let Some(ref event) = event {
537 self.event_history.record_event(event.clone());
538 }
539 Ok(event)
540 }
541
542 // ========================================================================
543 // Edge Operations
544 // ========================================================================
545
546 /// Add a bidirectional relationship between two components (idempotent).
547 ///
548 /// Creates both the forward edge (from → to with `forward` relationship) and
549 /// the reverse edge (to → from with the reverse of `forward`).
550 /// If the relationship already exists, this is a no-op and returns `Ok(())`.
551 ///
552 /// # Errors
553 ///
554 /// Returns an error if either component doesn't exist.
555 pub fn add_relationship(
556 &mut self,
557 from_id: &str,
558 to_id: &str,
559 forward: RelationshipKind,
560 ) -> anyhow::Result<()> {
561 let (_, _) = self.add_relationship_internal(from_id, to_id, forward)?;
562 Ok(())
563 }
564
565 /// Internal: adds a relationship and returns the edge indices for rollback.
566 /// Returns `(None, None)` if the relationship already exists (idempotent).
567 pub(super) fn add_relationship_internal(
568 &mut self,
569 from_id: &str,
570 to_id: &str,
571 forward: RelationshipKind,
572 ) -> anyhow::Result<(Option<EdgeIndex>, Option<EdgeIndex>)> {
573 let from_idx = self
574 .index
575 .get(from_id)
576 .copied()
577 .ok_or_else(|| anyhow::anyhow!("Component '{from_id}' not found in graph"))?;
578 let to_idx = self
579 .index
580 .get(to_id)
581 .copied()
582 .ok_or_else(|| anyhow::anyhow!("Component '{to_id}' not found in graph"))?;
583
584 // Validate the relationship is semantically valid for the node kinds
585 let from_kind = &self.graph[from_idx].kind;
586 let to_kind = &self.graph[to_idx].kind;
587 if !is_valid_relationship(from_kind, to_kind, &forward) {
588 return Err(anyhow::anyhow!(
589 "Invalid relationship: {forward:?} from {from_kind} '{from_id}' to {to_kind} '{to_id}'"
590 ));
591 }
592
593 // Idempotency: check if the forward edge already exists
594 let already_exists = self
595 .graph
596 .edges_directed(from_idx, Direction::Outgoing)
597 .any(|e| e.target() == to_idx && e.weight() == &forward);
598 if already_exists {
599 return Ok((None, None));
600 }
601
602 let reverse = forward.reverse();
603 let fwd_edge = self.graph.add_edge(from_idx, to_idx, forward);
604 let rev_edge = self.graph.add_edge(to_idx, from_idx, reverse);
605
606 Ok((Some(fwd_edge), Some(rev_edge)))
607 }
608
609 /// Remove a bidirectional relationship between two components.
610 ///
611 /// Removes both the forward edge (from → to with `forward` relationship) and
612 /// the reverse edge (to → from with the reverse of `forward`).
613 /// If the relationship doesn't exist, this is a no-op and returns `Ok(())`.
614 ///
615 /// # Errors
616 ///
617 /// Returns an error if either component doesn't exist in the graph.
618 pub fn remove_relationship(
619 &mut self,
620 from_id: &str,
621 to_id: &str,
622 forward: RelationshipKind,
623 ) -> anyhow::Result<()> {
624 let from_idx = self
625 .index
626 .get(from_id)
627 .copied()
628 .ok_or_else(|| anyhow::anyhow!("Component '{from_id}' not found in graph"))?;
629 let to_idx = self
630 .index
631 .get(to_id)
632 .copied()
633 .ok_or_else(|| anyhow::anyhow!("Component '{to_id}' not found in graph"))?;
634
635 let reverse = forward.reverse();
636
637 // Find and remove forward edge
638 let forward_edge = self
639 .graph
640 .edges_directed(from_idx, Direction::Outgoing)
641 .find(|e| e.target() == to_idx && e.weight() == &forward)
642 .map(|e| e.id());
643 if let Some(edge_id) = forward_edge {
644 self.graph.remove_edge(edge_id);
645 }
646
647 // Find and remove reverse edge
648 let reverse_edge = self
649 .graph
650 .edges_directed(to_idx, Direction::Outgoing)
651 .find(|e| e.target() == from_idx && e.weight() == &reverse)
652 .map(|e| e.id());
653 if let Some(edge_id) = reverse_edge {
654 self.graph.remove_edge(edge_id);
655 }
656
657 Ok(())
658 }
659
660 // ========================================================================
661 // Relationship Queries
662 // ========================================================================
663
664 /// Get all components that this component has outgoing edges to,
665 /// filtered by relationship kind.
666 pub fn get_neighbors(&self, id: &str, relationship: &RelationshipKind) -> Vec<&ComponentNode> {
667 let Some(&node_idx) = self.index.get(id) else {
668 return Vec::new();
669 };
670
671 self.graph
672 .edges_directed(node_idx, Direction::Outgoing)
673 .filter(|edge| edge.weight() == relationship)
674 .filter_map(|edge| self.graph.node_weight(edge.target()))
675 .collect()
676 }
677
678 /// Get all components that depend on the given component.
679 ///
680 /// "Dependents" are components that would be affected if this component
681 /// were removed or stopped. This follows Feeds edges (outgoing).
682 pub fn get_dependents(&self, id: &str) -> Vec<&ComponentNode> {
683 let Some(&node_idx) = self.index.get(id) else {
684 return Vec::new();
685 };
686
687 self.graph
688 .edges_directed(node_idx, Direction::Outgoing)
689 .filter(|edge| matches!(edge.weight(), RelationshipKind::Feeds))
690 .filter_map(|edge| self.graph.node_weight(edge.target()))
691 .collect()
692 }
693
694 /// Get all components that this component depends on.
695 ///
696 /// "Dependencies" are components that this component needs to function.
697 /// This follows SubscribesTo edges (outgoing).
698 pub fn get_dependencies(&self, id: &str) -> Vec<&ComponentNode> {
699 let Some(&node_idx) = self.index.get(id) else {
700 return Vec::new();
701 };
702
703 self.graph
704 .edges_directed(node_idx, Direction::Outgoing)
705 .filter(|edge| matches!(edge.weight(), RelationshipKind::SubscribesTo))
706 .filter_map(|edge| self.graph.node_weight(edge.target()))
707 .collect()
708 }
709
710 /// Check if a component can be safely removed (no dependents that would break).
711 ///
712 /// Returns Ok(()) if safe, or Err with the list of dependent component IDs.
713 pub fn can_remove(&self, id: &str) -> Result<(), Vec<String>> {
714 let dependents = self.get_dependents(id);
715 if dependents.is_empty() {
716 Ok(())
717 } else {
718 Err(dependents.iter().map(|n| n.id.clone()).collect())
719 }
720 }
721
722 // ========================================================================
723 // Lifecycle
724 // ========================================================================
725
726 /// Atomically validate and apply a commanded status transition.
727 ///
728 /// This is the **single canonical way** for managers to change a component's
729 /// status for command-initiated transitions (`Starting`, `Stopping`,
730 /// `Reconfiguring`). It combines validation and mutation under a single
731 /// `&mut self` borrow, eliminating the TOCTOU gap between checking status
732 /// and updating it.
733 ///
734 /// Components still report runtime-initiated transitions (`Running`,
735 /// `Stopped`, `Error`) via the mpsc channel → [`apply_update`].
736 ///
737 /// # Returns
738 ///
739 /// - `Ok(Some(event))` — transition applied, event emitted to broadcast subscribers
740 /// - `Ok(None)` — same-state no-op (component already in `target_status`)
741 /// - `Err(...)` — component not found or transition not valid from current state
742 ///
743 /// # Example
744 ///
745 /// ```ignore
746 /// let mut graph = self.graph.write().await;
747 /// graph.validate_and_transition("source-1", ComponentStatus::Starting, Some("Starting source"))?;
748 /// drop(graph); // release lock before calling source.start()
749 /// source.start().await?;
750 /// ```
751 pub fn validate_and_transition(
752 &mut self,
753 id: &str,
754 target_status: ComponentStatus,
755 message: Option<String>,
756 ) -> anyhow::Result<Option<ComponentEvent>> {
757 let node = self
758 .get_component(id)
759 .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in graph"))?;
760 let current = node.status;
761
762 // Same-state is an idempotent no-op
763 if current == target_status {
764 return Ok(None);
765 }
766
767 // Produce a descriptive error message for invalid transitions
768 if !is_valid_transition(¤t, &target_status) {
769 let reason = describe_invalid_transition(id, ¤t, &target_status);
770 return Err(anyhow::anyhow!(reason));
771 }
772
773 // Transition is valid — apply it
774 self.update_status_with_message(id, target_status, message)
775 }
776
777 /// Get a topological ordering of components for lifecycle operations.
778 ///
779 /// Returns components in dependency order: sources first, then queries, then reactions.
780 /// Only follows Feeds edges for ordering (other edge types don't affect lifecycle order).
781 ///
782 /// The instance root node is excluded from the result.
783 pub fn topological_order(&self) -> anyhow::Result<Vec<&ComponentNode>> {
784 // Build a filtered subgraph with only Feeds edges for ordering
785 // (bidirectional edges like SubscribesTo/OwnedBy create cycles that
786 // would prevent toposort on the full graph)
787 let mut order_graph: StableGraph<(), ()> = StableGraph::new();
788 let mut idx_map: HashMap<NodeIndex, NodeIndex> = HashMap::new();
789
790 // Add all nodes
791 for node_idx in self.graph.node_indices() {
792 let new_idx = order_graph.add_node(());
793 idx_map.insert(node_idx, new_idx);
794 }
795
796 // Add only Feeds edges
797 for edge_idx in self.graph.edge_indices() {
798 if let Some(weight) = self.graph.edge_weight(edge_idx) {
799 if matches!(weight, RelationshipKind::Feeds) {
800 if let Some((from, to)) = self.graph.edge_endpoints(edge_idx) {
801 if let (Some(&new_from), Some(&new_to)) =
802 (idx_map.get(&from), idx_map.get(&to))
803 {
804 order_graph.add_edge(new_from, new_to, ());
805 }
806 }
807 }
808 }
809 }
810
811 // Reverse map: new index → original index
812 let reverse_map: HashMap<NodeIndex, NodeIndex> =
813 idx_map.iter().map(|(&orig, &new)| (new, orig)).collect();
814
815 match petgraph::algo::toposort(&order_graph, None) {
816 Ok(sorted) => Ok(sorted
817 .into_iter()
818 .filter_map(|new_idx| reverse_map.get(&new_idx))
819 .filter(|idx| **idx != self.instance_idx)
820 .filter_map(|idx| self.graph.node_weight(*idx))
821 .collect()),
822 Err(_cycle) => Err(anyhow::anyhow!(
823 "Cycle detected in component graph — cannot determine lifecycle order"
824 )),
825 }
826 }
827
828 // ========================================================================
829 // Serialization
830 // ========================================================================
831
832 /// Create a serializable snapshot of the entire graph.
833 ///
834 /// The snapshot includes the instance root node and all components
835 /// with their relationships. Used for API responses and UI visualization.
836 pub fn snapshot(&self) -> GraphSnapshot {
837 let nodes: Vec<ComponentNode> = self.graph.node_weights().cloned().collect();
838
839 let edges: Vec<GraphEdge> = self
840 .graph
841 .edge_indices()
842 .filter_map(|edge_idx| {
843 let (from_idx, to_idx) = self.graph.edge_endpoints(edge_idx)?;
844 let from = self.graph.node_weight(from_idx)?;
845 let to = self.graph.node_weight(to_idx)?;
846 let relationship = self.graph.edge_weight(edge_idx)?;
847 Some(GraphEdge {
848 from: from.id.clone(),
849 to: to.id.clone(),
850 relationship: relationship.clone(),
851 })
852 })
853 .collect();
854
855 GraphSnapshot {
856 instance_id: self.instance_id().to_string(),
857 nodes,
858 edges,
859 }
860 }
861
/// Get the total number of components (including the instance root).
///
/// This is the node count reported by the underlying graph.
pub fn node_count(&self) -> usize {
    self.graph.node_count()
}
866
/// Get the total number of edges.
///
/// Relationships are stored as a forward and a reverse edge, so each
/// ownership link or added relationship contributes two to this count.
pub fn edge_count(&self) -> usize {
    self.graph.edge_count()
}
871
872 // ========================================================================
873 // High-Level Registration (Source of Truth)
874 // ========================================================================
875
876 /// Register a source component in the graph.
877 ///
878 /// Creates the node and bidirectional ownership edges transactionally.
879 /// Events are emitted only on successful commit.
880 ///
881 /// # Errors
882 ///
883 /// Returns an error if a component with the same ID already exists.
884 pub fn register_source(
885 &mut self,
886 id: &str,
887 metadata: HashMap<String, String>,
888 ) -> anyhow::Result<()> {
889 let node = ComponentNode {
890 id: id.to_string(),
891 kind: ComponentKind::Source,
892 status: ComponentStatus::Stopped,
893 metadata,
894 };
895 let mut txn = self.begin();
896 txn.add_component(node)?;
897 txn.commit();
898 Ok(())
899 }
900
901 /// Register a query component with its source dependencies.
902 ///
903 /// Creates the node, ownership edges, and `Feeds` edges from each source.
904 /// All operations are transactional — if any dependency is missing or any
905 /// step fails, the entire registration is rolled back.
906 ///
907 /// # Errors
908 ///
909 /// Returns an error if:
910 /// - A component with the same ID already exists
911 /// - Any referenced source does not exist in the graph
912 pub fn register_query(
913 &mut self,
914 id: &str,
915 metadata: HashMap<String, String>,
916 source_ids: &[String],
917 ) -> anyhow::Result<()> {
918 // Validate all dependencies exist before starting the transaction
919 for source_id in source_ids {
920 if !self.contains(source_id) {
921 return Err(anyhow::anyhow!(
922 "Cannot register query '{id}': referenced source '{source_id}' does not exist in the graph"
923 ));
924 }
925 }
926
927 let node = ComponentNode {
928 id: id.to_string(),
929 kind: ComponentKind::Query,
930 status: ComponentStatus::Stopped,
931 metadata,
932 };
933 let mut txn = self.begin();
934 txn.add_component(node)?;
935 for source_id in source_ids {
936 txn.add_relationship(source_id, id, RelationshipKind::Feeds)?;
937 }
938 txn.commit();
939 Ok(())
940 }
941
942 /// Register a reaction component with its query dependencies.
943 ///
944 /// Creates the node, ownership edges, and `Feeds` edges from each query.
945 /// All operations are transactional — if any dependency is missing or any
946 /// step fails, the entire registration is rolled back.
947 ///
948 /// # Errors
949 ///
950 /// Returns an error if:
951 /// - A component with the same ID already exists
952 /// - Any referenced query does not exist in the graph
953 pub fn register_reaction(
954 &mut self,
955 id: &str,
956 metadata: HashMap<String, String>,
957 query_ids: &[String],
958 ) -> anyhow::Result<()> {
959 // Validate all dependencies exist before starting the transaction
960 for query_id in query_ids {
961 if !self.contains(query_id) {
962 return Err(anyhow::anyhow!(
963 "Cannot register reaction '{id}': referenced query '{query_id}' does not exist in the graph"
964 ));
965 }
966 }
967
968 let node = ComponentNode {
969 id: id.to_string(),
970 kind: ComponentKind::Reaction,
971 status: ComponentStatus::Stopped,
972 metadata,
973 };
974 let mut txn = self.begin();
975 txn.add_component(node)?;
976 for query_id in query_ids {
977 txn.add_relationship(query_id, id, RelationshipKind::Feeds)?;
978 }
979 txn.commit();
980 Ok(())
981 }
982
983 /// Deregister a component and all its edges from the graph.
984 ///
985 /// Validates that the component exists and has no dependents before removal.
986 /// The instance root node cannot be deregistered.
987 ///
988 /// # Errors
989 ///
990 /// Returns an error if:
991 /// - The component does not exist
992 /// - The component has dependents (use `can_remove()` to check first)
993 /// - The component is the instance root node
994 pub fn deregister(&mut self, id: &str) -> anyhow::Result<ComponentNode> {
995 // Validate no dependents
996 if let Err(dependent_ids) = self.can_remove(id) {
997 return Err(anyhow::anyhow!(
998 "Cannot deregister '{}': depended on by: {}",
999 id,
1000 dependent_ids.join(", ")
1001 ));
1002 }
1003 self.remove_component(id)
1004 }
1005
1006 /// Register a bootstrap provider in the graph for topology visibility.
1007 ///
1008 /// Creates the node and bidirectional ownership edges transactionally.
1009 /// Optionally links the provider to its target source via `Bootstraps` edges.
1010 ///
1011 /// # Usage
1012 ///
1013 /// Bootstrap providers are managed internally by source plugins (set via
1014 /// `Source::set_bootstrap_provider()`). Call this method to make a bootstrap
1015 /// provider visible in the component graph for topology visualization and
1016 /// dependency tracking.
1017 ///
1018 /// ```ignore
1019 /// // After adding a source with a bootstrap provider:
1020 /// let mut graph = core.component_graph().write().await;
1021 /// graph.register_bootstrap_provider("my-bootstrap", metadata, &["my-source".into()])?;
1022 /// ```
1023 ///
1024 /// # Errors
1025 ///
1026 /// Returns an error if:
1027 /// - A component with the same ID already exists
1028 /// - Any referenced source does not exist in the graph
1029 pub fn register_bootstrap_provider(
1030 &mut self,
1031 id: &str,
1032 metadata: HashMap<String, String>,
1033 source_ids: &[String],
1034 ) -> anyhow::Result<()> {
1035 for source_id in source_ids {
1036 if !self.contains(source_id) {
1037 return Err(anyhow::anyhow!(
1038 "Cannot register bootstrap provider '{id}': referenced source '{source_id}' does not exist in the graph"
1039 ));
1040 }
1041 }
1042
1043 let node = ComponentNode {
1044 id: id.to_string(),
1045 kind: ComponentKind::BootstrapProvider,
1046 status: ComponentStatus::Stopped,
1047 metadata,
1048 };
1049 let mut txn = self.begin();
1050 txn.add_component(node)?;
1051 for source_id in source_ids {
1052 txn.add_relationship(id, source_id, RelationshipKind::Bootstraps)?;
1053 }
1054 txn.commit();
1055 Ok(())
1056 }
1057
1058 /// Register an identity provider in the graph for topology visibility.
1059 ///
1060 /// Creates the node and bidirectional ownership edges transactionally.
1061 /// Optionally links the provider to components it authenticates via
1062 /// `Authenticates` edges.
1063 ///
1064 /// # Current Status
1065 ///
1066 /// This method is **reserved for future use**. Identity provider support
1067 /// is not yet implemented in the component lifecycle. The registration
1068 /// infrastructure is in place for when authentication integration is added.
1069 ///
1070 /// # Errors
1071 ///
1072 /// Returns an error if:
1073 /// - A component with the same ID already exists
1074 /// - Any referenced component does not exist in the graph
1075 pub fn register_identity_provider(
1076 &mut self,
1077 id: &str,
1078 metadata: HashMap<String, String>,
1079 component_ids: &[String],
1080 ) -> anyhow::Result<()> {
1081 for component_id in component_ids {
1082 if !self.contains(component_id) {
1083 return Err(anyhow::anyhow!(
1084 "Cannot register identity provider '{id}': referenced component '{component_id}' does not exist in the graph"
1085 ));
1086 }
1087 }
1088
1089 let node = ComponentNode {
1090 id: id.to_string(),
1091 kind: ComponentKind::IdentityProvider,
1092 status: ComponentStatus::Stopped,
1093 metadata,
1094 };
1095 let mut txn = self.begin();
1096 txn.add_component(node)?;
1097 for component_id in component_ids {
1098 txn.add_relationship(id, component_id, RelationshipKind::Authenticates)?;
1099 }
1100 txn.commit();
1101 Ok(())
1102 }
1103
1104 // ========================================================================
1105 // Transactions
1106 // ========================================================================
1107
1108 /// Begin a transactional mutation of the graph.
1109 ///
1110 /// Returns a [`GraphTransaction`] that collects mutations (nodes, edges)
1111 /// and defers event emission until [`commit()`](GraphTransaction::commit).
1112 /// If the transaction is dropped without being committed, all added nodes
1113 /// and edges are rolled back automatically.
1114 ///
1115 /// The `&mut self` borrow ensures compile-time exclusivity — no other code
1116 /// can access the graph while a transaction is in progress.
1117 ///
1118 /// # Example
1119 ///
1120 /// ```ignore
1121 /// let mut graph = self.graph.write().await;
1122 /// let mut txn = graph.begin();
1123 /// txn.add_component(source_node)?;
1124 /// txn.add_relationship("source-1", "query-1", RelationshipKind::Feeds)?;
1125 /// txn.commit(); // events emitted here; if this line is not reached, rollback on drop
1126 /// ```
1127 pub fn begin(&mut self) -> GraphTransaction<'_> {
1128 GraphTransaction::new(self)
1129 }
1130
1131 // ========================================================================
1132 // Event History (centralized)
1133 // ========================================================================
1134
1135 /// Record a component event in the centralized history.
1136 ///
1137 /// Called internally by [`apply_update()`]. Managers should NOT call this
1138 /// directly — status updates flow through the mpsc channel and are recorded
1139 /// automatically.
1140 pub fn record_event(&mut self, event: ComponentEvent) {
1141 self.event_history.record_event(event);
1142 }
1143
1144 /// Get all lifecycle events for a specific component.
1145 ///
1146 /// Returns events in chronological order (oldest first).
1147 /// Up to 100 most recent events are retained per component.
1148 pub fn get_events(&self, component_id: &str) -> Vec<ComponentEvent> {
1149 self.event_history.get_events(component_id)
1150 }
1151
1152 /// Get all lifecycle events across all components.
1153 ///
1154 /// Returns events sorted by timestamp (oldest first).
1155 pub fn get_all_events(&self) -> Vec<ComponentEvent> {
1156 self.event_history.get_all_events()
1157 }
1158
1159 /// Get the most recent error message for a component.
1160 pub fn get_last_error(&self, component_id: &str) -> Option<String> {
1161 self.event_history.get_last_error(component_id)
1162 }
1163
1164 /// Subscribe to live lifecycle events for a component.
1165 ///
1166 /// Returns the current history and a broadcast receiver for new events.
1167 /// Creates the component's event channel if it doesn't exist.
1168 pub fn subscribe_events(
1169 &mut self,
1170 component_id: &str,
1171 ) -> (Vec<ComponentEvent>, broadcast::Receiver<ComponentEvent>) {
1172 self.event_history.subscribe(component_id)
1173 }
1174}
1175
1176// ============================================================================
1177// Relationship Validation
1178// ============================================================================
1179
1180/// Check if a relationship kind is semantically valid between two component kinds.
1181///
1182/// This enforces the graph topology rules:
1183/// - **Feeds**: Source → Query, or Query → Reaction
1184/// - **Owns/OwnedBy**: Instance ↔ any component (created automatically)
1185/// - **Bootstraps**: BootstrapProvider → Source
1186/// - **Authenticates**: IdentityProvider → any component
1187pub(super) fn is_valid_relationship(
1188 from_kind: &ComponentKind,
1189 to_kind: &ComponentKind,
1190 relationship: &RelationshipKind,
1191) -> bool {
1192 use ComponentKind::*;
1193 use RelationshipKind::*;
1194 matches!(
1195 (from_kind, to_kind, relationship),
1196 // Data flow
1197 (Source, Query, Feeds)
1198 | (Query, Reaction, Feeds)
1199 // Ownership (auto-created by add_component_internal)
1200 | (Instance, _, Owns)
1201 // Bootstrap
1202 | (BootstrapProvider, Source, Bootstraps)
1203 // Authentication
1204 | (IdentityProvider, _, Authenticates)
1205 )
1206}
1207
1208// ============================================================================
1209// State Transition Validation
1210// ============================================================================
1211
1212/// Check if a status transition is valid according to the component lifecycle state machine.
1213///
1214/// ```text
1215/// Stopped ──→ Starting ──→ Running ──→ Stopping ──→ Stopped
1216/// │ │ │ │
1217/// │ ↓ ↓ ↓
1218/// │ Error Error Error
1219/// │ │
1220/// │ ↓
1221/// │ Stopped (aborted start)
1222/// │
1223/// ↓
1224/// Reconfiguring ──→ Stopped | Starting | Error
1225///
1226/// Error ──→ Starting (retry) | Stopped (reset)
1227/// ```
1228pub(super) fn is_valid_transition(from: &ComponentStatus, to: &ComponentStatus) -> bool {
1229 use ComponentStatus::*;
1230 matches!(
1231 (from, to),
1232 // Normal lifecycle
1233 (Stopped, Starting)
1234 | (Starting, Running)
1235 | (Starting, Error)
1236 | (Starting, Stopped) // aborted start
1237 | (Running, Stopping)
1238 | (Running, Stopped) // direct stop (async channel may skip Stopping)
1239 | (Running, Error)
1240 | (Stopping, Stopped)
1241 | (Stopping, Error)
1242 // Error recovery
1243 | (Error, Starting) // retry
1244 | (Error, Stopped) // reset
1245 // Reconfiguration (from any stable state)
1246 | (Stopped, Reconfiguring)
1247 | (Running, Reconfiguring)
1248 | (Error, Reconfiguring)
1249 | (Reconfiguring, Stopped)
1250 | (Reconfiguring, Starting)
1251 | (Reconfiguring, Error)
1252 )
1253}
1254
1255/// Produce a human-readable error message for an invalid transition.
1256///
1257/// These messages provide actionable feedback (e.g., "Component is already running"
1258/// instead of just "invalid transition").
1259fn describe_invalid_transition(id: &str, from: &ComponentStatus, to: &ComponentStatus) -> String {
1260 use ComponentStatus::*;
1261 match (from, to) {
1262 // Trying to start something that's already starting/running
1263 (Starting, Starting) => format!("Component '{id}' is already starting"),
1264 (Running, Starting) => format!("Component '{id}' is already running"),
1265 (Stopping, Starting) => {
1266 format!("Cannot start component '{id}' while it is stopping")
1267 }
1268 (Reconfiguring, Starting) => {
1269 // Reconfiguring → Starting is actually valid, so this shouldn't be reached,
1270 // but kept for safety
1271 format!("Cannot start component '{id}' while it is reconfiguring")
1272 }
1273 // Trying to stop something that's already stopped/stopping
1274 (Stopped, Stopping) => {
1275 format!("Cannot stop component '{id}': it is already stopped")
1276 }
1277 (Stopping, Stopping) => format!("Component '{id}' is already stopping"),
1278 (Error, Stopping) => {
1279 format!("Cannot stop component '{id}': it is in error state")
1280 }
1281 // Trying to reconfigure during a transition
1282 (Starting, Reconfiguring) => {
1283 format!("Cannot reconfigure component '{id}' while it is starting")
1284 }
1285 (Stopping, Reconfiguring) => {
1286 format!("Cannot reconfigure component '{id}' while it is stopping")
1287 }
1288 (Reconfiguring, Reconfiguring) => {
1289 format!("Component '{id}' is already reconfiguring")
1290 }
1291 // Generic fallback
1292 _ => format!("Invalid state transition for component '{id}': {from:?} → {to:?}"),
1293 }
1294}
1295
1296impl std::fmt::Debug for ComponentGraph {
1297 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1298 f.debug_struct("ComponentGraph")
1299 .field("instance_id", &self.instance_id())
1300 .field("node_count", &self.node_count())
1301 .field("edge_count", &self.edge_count())
1302 .finish()
1303 }
1304}