drasi_lib/component_graph/graph.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use petgraph::stable_graph::{EdgeIndex, NodeIndex, StableGraph};
20use petgraph::visit::EdgeRef;
21use petgraph::Direction;
22use tokio::sync::{broadcast, mpsc, Notify};
23
24use crate::channels::{ComponentEvent, ComponentEventBroadcastReceiver, ComponentStatus};
25use crate::managers::ComponentEventHistory;
26
27use super::transaction::GraphTransaction;
28use super::{
29 ComponentKind, ComponentNode, ComponentUpdate, ComponentUpdateReceiver, ComponentUpdateSender,
30 GraphEdge, GraphSnapshot, RelationshipKind,
31};
32
33// ============================================================================
34// Component Graph
35// ============================================================================
36
37/// Central component dependency graph — the single source of truth.
38///
39/// Backed by `petgraph::stable_graph::StableGraph` which keeps node/edge indices
40/// stable across removals (critical for a graph that changes at runtime).
41///
42/// # Error Handling
43///
44/// Mutation methods on this struct return `anyhow::Result`, following the Layer 2
45/// (internal module) error convention. The public API boundary in `DrasiLib` and
46/// `lib_core_ops` wraps these into `DrasiError` variants before returning to callers.
47/// External consumers should use `DrasiLib` methods rather than calling graph
48/// mutations directly.
49///
50/// # Event Emission
51///
52/// The graph emits [`ComponentEvent`]s via a built-in broadcast channel whenever
53/// components are added, removed, or change status. Call [`subscribe()`](Self::subscribe)
54/// to receive events.
55///
56/// # Thread Safety
57///
58/// `ComponentGraph` is NOT `Send`/`Sync` by itself due to the underlying `StableGraph`.
59/// It must be wrapped in `Arc<RwLock<ComponentGraph>>` for multi-threaded access.
60/// All public APIs in `DrasiLib` handle this wrapping automatically.
61///
62/// # Instance Root
63///
64/// The graph always has the DrasiLib instance as its root node.
65/// All other components are connected to it via Owns/OwnedBy edges.
66pub struct ComponentGraph {
67 /// The underlying petgraph directed graph
68 pub(super) graph: StableGraph<ComponentNode, RelationshipKind>,
69 /// Fast lookup: component ID → node index (O(1) access)
70 pub(super) index: HashMap<String, NodeIndex>,
71 /// The instance node index (always present)
72 instance_idx: NodeIndex,
73 /// Broadcast sender for component lifecycle events (fan-out to subscribers).
74 /// Events are emitted by `add_component()`, `remove_component()`, `update_status()`,
75 /// and the graph update loop.
76 pub(super) event_tx: broadcast::Sender<ComponentEvent>,
77 /// mpsc sender for component status updates (fan-in from components).
78 /// Cloned and given to each component's Base struct. The graph update loop
79 /// owns the corresponding receiver.
80 update_tx: mpsc::Sender<ComponentUpdate>,
81 /// Notifies waiters when any component's status changes.
82 /// Used by `wait_for_status()` to replace polling loops with event-driven waits.
83 /// Follows the same pattern as `PriorityQueue::notify`.
84 status_notify: Arc<Notify>,
85 /// Type-erased runtime instances, keyed by component ID.
86 ///
87 /// Managers store their `Arc<dyn Source>`, `Arc<dyn Query>`, `Arc<dyn Reaction>`, etc.
88 /// here during provisioning. This eliminates the dual-registry pattern where each
89 /// manager maintained its own HashMap — the graph is now the single store for both
90 /// metadata (in the petgraph node) and runtime instances (here).
91 ///
92 /// Access via typed helpers: [`set_runtime()`], [`get_runtime()`], [`take_runtime()`].
93 runtimes: HashMap<String, Box<dyn Any + Send + Sync>>,
94 /// Centralized event history for all components.
95 ///
96 /// Stores lifecycle events (Starting, Running, Error, Stopped, etc.) for every
97 /// component in the graph. Events are recorded by [`apply_update()`] and
98 /// [`remove_component()`]. Managers delegate event queries here rather than
99 /// maintaining their own per-manager histories.
100 pub(super) event_history: ComponentEventHistory,
101}
102
/// Capacity of the broadcast channel that fans component events out to subscribers.
const EVENT_CHANNEL_CAPACITY: usize = 1_000;

/// Capacity of the mpsc channel that carries component status updates into the graph.
const UPDATE_CHANNEL_CAPACITY: usize = 1_000;
108
109impl ComponentGraph {
110 /// Create a new component graph with the given instance as root node.
111 ///
112 /// Returns the graph and a [`ComponentUpdateReceiver`] that must be consumed by
113 /// a graph update loop task (see [`Self::apply_update`]).
114 pub fn new(instance_id: &str) -> (Self, ComponentUpdateReceiver) {
115 let mut graph = StableGraph::new();
116 let instance_node = ComponentNode {
117 id: instance_id.to_string(),
118 kind: ComponentKind::Instance,
119 status: ComponentStatus::Running,
120 metadata: HashMap::new(),
121 };
122 let instance_idx = graph.add_node(instance_node);
123
124 let mut index = HashMap::new();
125 index.insert(instance_id.to_string(), instance_idx);
126 let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
127 let (update_tx, update_rx) = mpsc::channel(UPDATE_CHANNEL_CAPACITY);
128
129 (
130 Self {
131 graph,
132 index,
133 instance_idx,
134 event_tx,
135 update_tx,
136 status_notify: Arc::new(Notify::new()),
137 runtimes: HashMap::new(),
138 event_history: ComponentEventHistory::new(),
139 },
140 update_rx,
141 )
142 }
143
144 /// Subscribe to component lifecycle events.
145 ///
146 /// Returns a broadcast receiver that gets a copy of every [`ComponentEvent`]
147 /// emitted by graph mutations (`add_component`, `remove_component`, `update_status`).
148 pub fn subscribe(&self) -> ComponentEventBroadcastReceiver {
149 self.event_tx.subscribe()
150 }
151
152 /// Get a reference to the broadcast sender for component events.
153 ///
154 /// This allows callers to clone the sender before the graph is wrapped in
155 /// `Arc<RwLock<>>`, enabling subscription without needing to acquire the lock.
156 /// The returned sender is the same one used by the graph for event emission.
157 pub fn event_sender(&self) -> &broadcast::Sender<ComponentEvent> {
158 &self.event_tx
159 }
160
161 /// Get a clone of the mpsc update sender.
162 ///
163 /// This is the sender that components use to report status changes without
164 /// acquiring any graph lock. Clone this and pass to `SourceBase`/`ReactionBase`.
165 pub fn update_sender(&self) -> ComponentUpdateSender {
166 self.update_tx.clone()
167 }
168
169 /// Get a clone of the status change notifier.
170 ///
171 /// This `Notify` is signalled whenever any component's status changes in the graph.
172 /// Use it with [`wait_for_status`] or build custom wait loops that avoid polling
173 /// with sleep.
174 ///
175 /// # Pattern
176 ///
177 /// ```ignore
178 /// let notify = graph.status_notifier();
179 /// // Register interest BEFORE releasing the graph lock
180 /// let notified = notify.notified();
181 /// drop(graph); // release lock
182 /// notified.await; // woken when any status changes
183 /// ```
184 pub fn status_notifier(&self) -> Arc<Notify> {
185 self.status_notify.clone()
186 }
187
188 /// Apply a [`ComponentUpdate`] received from the mpsc channel.
189 ///
190 /// Called by the graph update loop task. Updates the graph, emits a
191 /// broadcast event, and records the event in the centralized event history.
192 /// Returns the event for external logging/processing.
193 pub fn apply_update(&mut self, update: ComponentUpdate) -> Option<ComponentEvent> {
194 match update {
195 ComponentUpdate::Status {
196 component_id,
197 status,
198 message,
199 } => match self.update_status_with_message(&component_id, status, message) {
200 Ok(event) => event,
201 Err(e) => {
202 tracing::debug!(
203 "Graph update loop: status update skipped for '{}': {e}",
204 component_id
205 );
206 None
207 }
208 },
209 }
210 }
211
212 /// Emit a [`ComponentEvent`] to all broadcast subscribers and return it.
213 ///
214 /// Returns `Some(event)` if the component kind maps to a [`ComponentType`],
215 /// `None` for kinds like `Instance` that have no external type.
216 /// Silently ignores broadcast send failures (no subscribers connected).
217 fn emit_event(
218 &self,
219 component_id: &str,
220 kind: &ComponentKind,
221 status: ComponentStatus,
222 message: Option<String>,
223 ) -> Option<ComponentEvent> {
224 if let Some(component_type) = kind.to_component_type() {
225 let event = ComponentEvent {
226 component_id: component_id.to_string(),
227 component_type,
228 status,
229 timestamp: chrono::Utc::now(),
230 message,
231 };
232 let _ = self.event_tx.send(event.clone());
233 Some(event)
234 } else {
235 None
236 }
237 }
238
239 /// Get the instance ID (root node)
240 pub fn instance_id(&self) -> &str {
241 &self.graph[self.instance_idx].id
242 }
243
244 // ========================================================================
245 // Runtime Instance Store
246 // ========================================================================
247
248 /// Store a runtime instance for a component.
249 ///
250 /// The component must already exist in the graph (registered via `add_component`
251 /// or one of the `register_*` methods). The runtime instance is stored in a
252 /// type-erased map keyed by component ID.
253 ///
254 /// Managers call this during provisioning after creating the runtime instance:
255 /// ```ignore
256 /// let source: Arc<dyn Source> = Arc::new(my_source);
257 /// source.initialize(context).await;
258 /// graph.set_runtime("source-1", Box::new(source))?;
259 /// ```
260 ///
261 /// # Errors
262 ///
263 /// Returns an error if the component ID is not present in the graph.
264 pub fn set_runtime(
265 &mut self,
266 id: &str,
267 runtime: Box<dyn Any + Send + Sync>,
268 ) -> anyhow::Result<()> {
269 if !self.index.contains_key(id) {
270 return Err(anyhow::anyhow!(
271 "set_runtime called for component '{id}' which is not in the graph"
272 ));
273 }
274
275 // Warn if the runtime type doesn't match the component kind.
276 // This catches type mismatches during development. Uses a warning rather
277 // than a hard assert to avoid breaking unit tests that use substitute types.
278 #[cfg(debug_assertions)]
279 if let Some(node) = self.get_component(id) {
280 let kind = &node.kind;
281 let type_ok = match kind {
282 ComponentKind::Source => runtime
283 .downcast_ref::<std::sync::Arc<dyn crate::sources::Source>>()
284 .is_some(),
285 ComponentKind::Query => runtime
286 .downcast_ref::<std::sync::Arc<dyn crate::queries::manager::Query>>()
287 .is_some(),
288 ComponentKind::Reaction => runtime
289 .downcast_ref::<std::sync::Arc<dyn crate::reactions::Reaction>>()
290 .is_some(),
291 _ => true,
292 };
293 if !type_ok {
294 tracing::warn!(
295 "set_runtime: possible type mismatch for component '{id}' (kind={kind})"
296 );
297 }
298 }
299
300 self.runtimes.insert(id.to_string(), runtime);
301 Ok(())
302 }
303
304 /// Retrieve a reference to a component's runtime instance, downcasting to `T`.
305 ///
306 /// Returns `None` if the component has no runtime instance or if the stored
307 /// type doesn't match `T`.
308 ///
309 /// # Type Parameter
310 ///
311 /// `T` is typically `Arc<dyn Source>`, `Arc<dyn Query>`, or `Arc<dyn Reaction>`.
312 ///
313 /// # Example
314 ///
315 /// ```ignore
316 /// let source: &Arc<dyn Source> = graph.get_runtime::<Arc<dyn Source>>("source-1")
317 /// .ok_or_else(|| anyhow!("Source not found"))?;
318 /// let source = source.clone(); // Clone the Arc for use outside the lock
319 /// ```
320 pub fn get_runtime<T: 'static>(&self, id: &str) -> Option<&T> {
321 self.runtimes.get(id).and_then(|r| r.downcast_ref::<T>())
322 }
323
324 /// Remove and return a component's runtime instance, downcasting to `T`.
325 ///
326 /// Returns `None` if the component has no runtime instance or if the stored
327 /// type doesn't match `T`. On type mismatch, the runtime is **put back** into
328 /// the store (not lost) and an error is logged.
329 ///
330 /// Used during teardown when the manager needs ownership of the instance
331 /// (e.g., to call `deprovision()`).
332 pub fn take_runtime<T: 'static>(&mut self, id: &str) -> Option<T> {
333 let runtime = self.runtimes.remove(id)?;
334 match runtime.downcast::<T>() {
335 Ok(boxed) => Some(*boxed),
336 Err(runtime) => {
337 tracing::error!(
338 "take_runtime: type mismatch for component '{id}', putting runtime back"
339 );
340 self.runtimes.insert(id.to_string(), runtime);
341 None
342 }
343 }
344 }
345
346 /// Check if a component has a runtime instance stored.
347 pub fn has_runtime(&self, id: &str) -> bool {
348 self.runtimes.contains_key(id)
349 }
350
351 // ========================================================================
352 // Node Operations
353 // ========================================================================
354
355 /// Add a component node to the graph.
356 ///
357 /// Automatically creates bidirectional Owns/OwnedBy edges between the
358 /// instance root and the new component. Emits a [`ComponentEvent`] with
359 /// the node's current status to all subscribers.
360 ///
361 /// # Errors
362 ///
363 /// Returns an error if a component with the same ID already exists.
364 pub fn add_component(&mut self, node: ComponentNode) -> anyhow::Result<NodeIndex> {
365 let (node_idx, event, _, _) = self.add_component_internal(node)?;
366 if let Some(event) = event {
367 let _ = self.event_tx.send(event.clone());
368 self.event_history.record_event(event);
369 }
370 Ok(node_idx)
371 }
372
373 /// Internal: adds a component and returns the event without emitting it.
374 /// Used by both `add_component()` (emits immediately) and `GraphTransaction`
375 /// (defers emission to commit).
376 pub(super) fn add_component_internal(
377 &mut self,
378 node: ComponentNode,
379 ) -> anyhow::Result<(NodeIndex, Option<ComponentEvent>, EdgeIndex, EdgeIndex)> {
380 if self.index.contains_key(&node.id) {
381 return Err(anyhow::anyhow!(
382 "{} '{}' already exists in the graph",
383 node.kind,
384 node.id
385 ));
386 }
387
388 let id = node.id.clone();
389 let kind = node.kind.clone();
390 let status = node.status;
391 let node_idx = self.graph.add_node(node);
392 self.index.insert(id.clone(), node_idx);
393
394 // Create bidirectional ownership edges (Instance ↔ Component)
395 let owns_edge = self
396 .graph
397 .add_edge(self.instance_idx, node_idx, RelationshipKind::Owns);
398 let owned_by_edge =
399 self.graph
400 .add_edge(node_idx, self.instance_idx, RelationshipKind::OwnedBy);
401
402 let event = kind
403 .to_component_type()
404 .map(|component_type| ComponentEvent {
405 component_id: id,
406 component_type,
407 status,
408 timestamp: chrono::Utc::now(),
409 message: Some(format!("{kind} added")),
410 });
411
412 Ok((node_idx, event, owns_edge, owned_by_edge))
413 }
414
415 /// Remove a component node and all its edges from the graph.
416 ///
417 /// Emits a [`ComponentEvent`] with status [`ComponentStatus::Removed`] to all
418 /// subscribers. Returns the removed node data, or an error if the component
419 /// doesn't exist. The instance root node cannot be removed.
420 pub fn remove_component(&mut self, id: &str) -> anyhow::Result<ComponentNode> {
421 let node_idx = self
422 .index
423 .get(id)
424 .copied()
425 .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in graph"))?;
426
427 if node_idx == self.instance_idx {
428 return Err(anyhow::anyhow!("Cannot remove the instance root node"));
429 }
430
431 // Capture kind before removing so we can emit an event
432 let kind = self.graph[node_idx].kind.clone();
433
434 self.index.remove(id);
435 // Remove runtime instance if present (atomic with node removal)
436 self.runtimes.remove(id);
437 // Remove event history for this component
438 self.event_history.remove_component(id);
439 // StableGraph::remove_node automatically removes all edges connected to this node
440 let removed = self
441 .graph
442 .remove_node(node_idx)
443 .ok_or_else(|| anyhow::anyhow!("Component '{id}' already removed"))?;
444
445 self.emit_event(
446 id,
447 &kind,
448 ComponentStatus::Removed,
449 Some(format!("{kind} removed")),
450 );
451
452 Ok(removed)
453 }
454
455 /// Get a component node by ID.
456 pub fn get_component(&self, id: &str) -> Option<&ComponentNode> {
457 self.index
458 .get(id)
459 .and_then(|idx| self.graph.node_weight(*idx))
460 }
461
462 /// Get a mutable reference to a component node by ID.
463 pub fn get_component_mut(&mut self, id: &str) -> Option<&mut ComponentNode> {
464 self.index
465 .get(id)
466 .copied()
467 .and_then(|idx| self.graph.node_weight_mut(idx))
468 }
469
470 /// Check if a component exists in the graph.
471 pub fn contains(&self, id: &str) -> bool {
472 self.index.contains_key(id)
473 }
474
475 /// List all components of a specific kind with their status.
476 pub fn list_by_kind(&self, kind: &ComponentKind) -> Vec<(String, ComponentStatus)> {
477 self.graph
478 .node_weights()
479 .filter(|node| &node.kind == kind)
480 .map(|node| (node.id.clone(), node.status))
481 .collect()
482 }
483
484 /// Update a component's status.
485 ///
486 /// Emits a [`ComponentEvent`] with the new status to all subscribers.
487 /// Used internally by [`apply_update`] and tests.
488 pub(super) fn update_status(
489 &mut self,
490 id: &str,
491 status: ComponentStatus,
492 ) -> anyhow::Result<Option<ComponentEvent>> {
493 self.update_status_with_message(id, status, None)
494 }
495
496 /// Update a component's status with an optional message.
497 ///
498 /// Emits a [`ComponentEvent`] with the new status and message to all broadcast
499 /// subscribers AND records it in the centralized event history. This ensures
500 /// events are visible to both global subscribers (via broadcast) and per-component
501 /// subscribers (via event history channels).
502 ///
503 /// Called by [`apply_update`] in the graph update loop and by
504 /// [`validate_and_transition`] for command-initiated transitions.
505 fn update_status_with_message(
506 &mut self,
507 id: &str,
508 status: ComponentStatus,
509 message: Option<String>,
510 ) -> anyhow::Result<Option<ComponentEvent>> {
511 let node = self
512 .get_component_mut(id)
513 .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in graph"))?;
514 let kind = node.kind.clone();
515
516 // Same-state updates are idempotent no-ops (no event, no warning)
517 if node.status == status {
518 return Ok(None);
519 }
520
521 if !is_valid_transition(&node.status, &status) {
522 tracing::warn!(
523 "Invalid state transition for component '{}': {:?} → {:?}, ignoring update",
524 id,
525 node.status,
526 status
527 );
528 return Ok(None);
529 }
530
531 node.status = status;
532
533 // Wake up any waiters blocking on status changes (e.g., wait_for_status)
534 self.status_notify.notify_waiters();
535
536 let event = self.emit_event(id, &kind, status, message);
537 if let Some(ref event) = event {
538 self.event_history.record_event(event.clone());
539 }
540 Ok(event)
541 }
542
543 // ========================================================================
544 // Edge Operations
545 // ========================================================================
546
547 /// Add a bidirectional relationship between two components (idempotent).
548 ///
549 /// Creates both the forward edge (from → to with `forward` relationship) and
550 /// the reverse edge (to → from with the reverse of `forward`).
551 /// If the relationship already exists, this is a no-op and returns `Ok(())`.
552 ///
553 /// # Errors
554 ///
555 /// Returns an error if either component doesn't exist.
556 pub fn add_relationship(
557 &mut self,
558 from_id: &str,
559 to_id: &str,
560 forward: RelationshipKind,
561 ) -> anyhow::Result<()> {
562 let (_, _) = self.add_relationship_internal(from_id, to_id, forward)?;
563 Ok(())
564 }
565
566 /// Internal: adds a relationship and returns the edge indices for rollback.
567 /// Returns `(None, None)` if the relationship already exists (idempotent).
568 pub(super) fn add_relationship_internal(
569 &mut self,
570 from_id: &str,
571 to_id: &str,
572 forward: RelationshipKind,
573 ) -> anyhow::Result<(Option<EdgeIndex>, Option<EdgeIndex>)> {
574 let from_idx = self
575 .index
576 .get(from_id)
577 .copied()
578 .ok_or_else(|| anyhow::anyhow!("Component '{from_id}' not found in graph"))?;
579 let to_idx = self
580 .index
581 .get(to_id)
582 .copied()
583 .ok_or_else(|| anyhow::anyhow!("Component '{to_id}' not found in graph"))?;
584
585 // Validate the relationship is semantically valid for the node kinds
586 let from_kind = &self.graph[from_idx].kind;
587 let to_kind = &self.graph[to_idx].kind;
588 if !is_valid_relationship(from_kind, to_kind, &forward) {
589 return Err(anyhow::anyhow!(
590 "Invalid relationship: {forward:?} from {from_kind} '{from_id}' to {to_kind} '{to_id}'"
591 ));
592 }
593
594 // Idempotency: check if the forward edge already exists
595 let already_exists = self
596 .graph
597 .edges_directed(from_idx, Direction::Outgoing)
598 .any(|e| e.target() == to_idx && e.weight() == &forward);
599 if already_exists {
600 return Ok((None, None));
601 }
602
603 let reverse = forward.reverse();
604 let fwd_edge = self.graph.add_edge(from_idx, to_idx, forward);
605 let rev_edge = self.graph.add_edge(to_idx, from_idx, reverse);
606
607 Ok((Some(fwd_edge), Some(rev_edge)))
608 }
609
610 /// Remove a bidirectional relationship between two components.
611 ///
612 /// Removes both the forward edge (from → to with `forward` relationship) and
613 /// the reverse edge (to → from with the reverse of `forward`).
614 /// If the relationship doesn't exist, this is a no-op and returns `Ok(())`.
615 ///
616 /// # Errors
617 ///
618 /// Returns an error if either component doesn't exist in the graph.
619 pub fn remove_relationship(
620 &mut self,
621 from_id: &str,
622 to_id: &str,
623 forward: RelationshipKind,
624 ) -> anyhow::Result<()> {
625 let from_idx = self
626 .index
627 .get(from_id)
628 .copied()
629 .ok_or_else(|| anyhow::anyhow!("Component '{from_id}' not found in graph"))?;
630 let to_idx = self
631 .index
632 .get(to_id)
633 .copied()
634 .ok_or_else(|| anyhow::anyhow!("Component '{to_id}' not found in graph"))?;
635
636 let reverse = forward.reverse();
637
638 // Find and remove forward edge
639 let forward_edge = self
640 .graph
641 .edges_directed(from_idx, Direction::Outgoing)
642 .find(|e| e.target() == to_idx && e.weight() == &forward)
643 .map(|e| e.id());
644 if let Some(edge_id) = forward_edge {
645 self.graph.remove_edge(edge_id);
646 }
647
648 // Find and remove reverse edge
649 let reverse_edge = self
650 .graph
651 .edges_directed(to_idx, Direction::Outgoing)
652 .find(|e| e.target() == from_idx && e.weight() == &reverse)
653 .map(|e| e.id());
654 if let Some(edge_id) = reverse_edge {
655 self.graph.remove_edge(edge_id);
656 }
657
658 Ok(())
659 }
660
661 // ========================================================================
662 // Relationship Queries
663 // ========================================================================
664
665 /// Get all components that this component has outgoing edges to,
666 /// filtered by relationship kind.
667 pub fn get_neighbors(&self, id: &str, relationship: &RelationshipKind) -> Vec<&ComponentNode> {
668 let Some(&node_idx) = self.index.get(id) else {
669 return Vec::new();
670 };
671
672 self.graph
673 .edges_directed(node_idx, Direction::Outgoing)
674 .filter(|edge| edge.weight() == relationship)
675 .filter_map(|edge| self.graph.node_weight(edge.target()))
676 .collect()
677 }
678
679 /// Get all components that depend on the given component.
680 ///
681 /// "Dependents" are components that would be affected if this component
682 /// were removed or stopped. This follows Feeds edges (outgoing).
683 pub fn get_dependents(&self, id: &str) -> Vec<&ComponentNode> {
684 let Some(&node_idx) = self.index.get(id) else {
685 return Vec::new();
686 };
687
688 self.graph
689 .edges_directed(node_idx, Direction::Outgoing)
690 .filter(|edge| matches!(edge.weight(), RelationshipKind::Feeds))
691 .filter_map(|edge| self.graph.node_weight(edge.target()))
692 .collect()
693 }
694
695 /// Get all components that this component depends on.
696 ///
697 /// "Dependencies" are components that this component needs to function.
698 /// This follows SubscribesTo edges (outgoing).
699 pub fn get_dependencies(&self, id: &str) -> Vec<&ComponentNode> {
700 let Some(&node_idx) = self.index.get(id) else {
701 return Vec::new();
702 };
703
704 self.graph
705 .edges_directed(node_idx, Direction::Outgoing)
706 .filter(|edge| matches!(edge.weight(), RelationshipKind::SubscribesTo))
707 .filter_map(|edge| self.graph.node_weight(edge.target()))
708 .collect()
709 }
710
711 /// Check if a component can be safely removed (no dependents that would break).
712 ///
713 /// Returns Ok(()) if safe, or Err with the list of dependent component IDs.
714 pub fn can_remove(&self, id: &str) -> Result<(), Vec<String>> {
715 let dependents = self.get_dependents(id);
716 if dependents.is_empty() {
717 Ok(())
718 } else {
719 Err(dependents.iter().map(|n| n.id.clone()).collect())
720 }
721 }
722
723 // ========================================================================
724 // Lifecycle
725 // ========================================================================
726
727 /// Atomically validate and apply a commanded status transition.
728 ///
729 /// This is the **single canonical way** for managers to change a component's
730 /// status for command-initiated transitions (`Starting`, `Stopping`,
731 /// `Reconfiguring`). It combines validation and mutation under a single
732 /// `&mut self` borrow, eliminating the TOCTOU gap between checking status
733 /// and updating it.
734 ///
735 /// Components still report runtime-initiated transitions (`Running`,
736 /// `Stopped`, `Error`) via the mpsc channel → [`apply_update`].
737 ///
738 /// # Returns
739 ///
740 /// - `Ok(Some(event))` — transition applied, event emitted to broadcast subscribers
741 /// - `Ok(None)` — same-state no-op (component already in `target_status`)
742 /// - `Err(...)` — component not found or transition not valid from current state
743 ///
744 /// # Example
745 ///
746 /// ```ignore
747 /// let mut graph = self.graph.write().await;
748 /// graph.validate_and_transition("source-1", ComponentStatus::Starting, Some("Starting source"))?;
749 /// drop(graph); // release lock before calling source.start()
750 /// source.start().await?;
751 /// ```
752 pub fn validate_and_transition(
753 &mut self,
754 id: &str,
755 target_status: ComponentStatus,
756 message: Option<String>,
757 ) -> anyhow::Result<Option<ComponentEvent>> {
758 let node = self
759 .get_component(id)
760 .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in graph"))?;
761 let current = node.status;
762
763 // Same-state is an idempotent no-op
764 if current == target_status {
765 return Ok(None);
766 }
767
768 // Produce a descriptive error message for invalid transitions
769 if !is_valid_transition(¤t, &target_status) {
770 let reason = describe_invalid_transition(id, ¤t, &target_status);
771 return Err(anyhow::anyhow!(reason));
772 }
773
774 // Transition is valid — apply it
775 self.update_status_with_message(id, target_status, message)
776 }
777
778 /// Get a topological ordering of components for lifecycle operations.
779 ///
780 /// Returns components in dependency order: sources first, then queries, then reactions.
781 /// Only follows Feeds edges for ordering (other edge types don't affect lifecycle order).
782 ///
783 /// The instance root node is excluded from the result.
784 pub fn topological_order(&self) -> anyhow::Result<Vec<&ComponentNode>> {
785 // Build a filtered subgraph with only Feeds edges for ordering
786 // (bidirectional edges like SubscribesTo/OwnedBy create cycles that
787 // would prevent toposort on the full graph)
788 let mut order_graph: StableGraph<(), ()> = StableGraph::new();
789 let mut idx_map: HashMap<NodeIndex, NodeIndex> = HashMap::new();
790
791 // Add all nodes
792 for node_idx in self.graph.node_indices() {
793 let new_idx = order_graph.add_node(());
794 idx_map.insert(node_idx, new_idx);
795 }
796
797 // Add only Feeds edges
798 for edge_idx in self.graph.edge_indices() {
799 if let Some(weight) = self.graph.edge_weight(edge_idx) {
800 if matches!(weight, RelationshipKind::Feeds) {
801 if let Some((from, to)) = self.graph.edge_endpoints(edge_idx) {
802 if let (Some(&new_from), Some(&new_to)) =
803 (idx_map.get(&from), idx_map.get(&to))
804 {
805 order_graph.add_edge(new_from, new_to, ());
806 }
807 }
808 }
809 }
810 }
811
812 // Reverse map: new index → original index
813 let reverse_map: HashMap<NodeIndex, NodeIndex> =
814 idx_map.iter().map(|(&orig, &new)| (new, orig)).collect();
815
816 match petgraph::algo::toposort(&order_graph, None) {
817 Ok(sorted) => Ok(sorted
818 .into_iter()
819 .filter_map(|new_idx| reverse_map.get(&new_idx))
820 .filter(|idx| **idx != self.instance_idx)
821 .filter_map(|idx| self.graph.node_weight(*idx))
822 .collect()),
823 Err(_cycle) => Err(anyhow::anyhow!(
824 "Cycle detected in component graph — cannot determine lifecycle order"
825 )),
826 }
827 }
828
829 // ========================================================================
830 // Serialization
831 // ========================================================================
832
833 /// Create a serializable snapshot of the entire graph.
834 ///
835 /// The snapshot includes the instance root node and all components
836 /// with their relationships. Used for API responses and UI visualization.
837 pub fn snapshot(&self) -> GraphSnapshot {
838 let nodes: Vec<ComponentNode> = self.graph.node_weights().cloned().collect();
839
840 let edges: Vec<GraphEdge> = self
841 .graph
842 .edge_indices()
843 .filter_map(|edge_idx| {
844 let (from_idx, to_idx) = self.graph.edge_endpoints(edge_idx)?;
845 let from = self.graph.node_weight(from_idx)?;
846 let to = self.graph.node_weight(to_idx)?;
847 let relationship = self.graph.edge_weight(edge_idx)?;
848 Some(GraphEdge {
849 from: from.id.clone(),
850 to: to.id.clone(),
851 relationship: relationship.clone(),
852 })
853 })
854 .collect();
855
856 GraphSnapshot {
857 instance_id: self.instance_id().to_string(),
858 nodes,
859 edges,
860 }
861 }
862
863 /// Get the total number of components (including the instance root).
864 pub fn node_count(&self) -> usize {
865 self.graph.node_count()
866 }
867
868 /// Get the total number of edges.
869 pub fn edge_count(&self) -> usize {
870 self.graph.edge_count()
871 }
872
873 // ========================================================================
874 // High-Level Registration (Source of Truth)
875 // ========================================================================
876
877 /// Register a source component in the graph.
878 ///
879 /// Creates the node and bidirectional ownership edges transactionally.
880 /// Events are emitted only on successful commit.
881 ///
882 /// # Errors
883 ///
884 /// Returns an error if a component with the same ID already exists.
885 pub fn register_source(
886 &mut self,
887 id: &str,
888 metadata: HashMap<String, String>,
889 ) -> anyhow::Result<()> {
890 let node = ComponentNode {
891 id: id.to_string(),
892 kind: ComponentKind::Source,
893 status: ComponentStatus::Added,
894 metadata,
895 };
896 let mut txn = self.begin();
897 txn.add_component(node)?;
898 txn.commit();
899 Ok(())
900 }
901
902 /// Register a query component with its source dependencies.
903 ///
904 /// Creates the node, ownership edges, and `Feeds` edges from each source.
905 /// All operations are transactional — if any dependency is missing or any
906 /// step fails, the entire registration is rolled back.
907 ///
908 /// # Errors
909 ///
910 /// Returns an error if:
911 /// - A component with the same ID already exists
912 /// - Any referenced source does not exist in the graph
913 pub fn register_query(
914 &mut self,
915 id: &str,
916 metadata: HashMap<String, String>,
917 source_ids: &[String],
918 ) -> anyhow::Result<()> {
919 // Validate all dependencies exist before starting the transaction
920 for source_id in source_ids {
921 if !self.contains(source_id) {
922 return Err(anyhow::anyhow!(
923 "Cannot register query '{id}': referenced source '{source_id}' does not exist in the graph"
924 ));
925 }
926 }
927
928 let node = ComponentNode {
929 id: id.to_string(),
930 kind: ComponentKind::Query,
931 status: ComponentStatus::Added,
932 metadata,
933 };
934 let mut txn = self.begin();
935 txn.add_component(node)?;
936 for source_id in source_ids {
937 txn.add_relationship(source_id, id, RelationshipKind::Feeds)?;
938 }
939 txn.commit();
940 Ok(())
941 }
942
943 /// Register a reaction component with its query dependencies.
944 ///
945 /// Creates the node, ownership edges, and `Feeds` edges from each query.
946 /// All operations are transactional — if any dependency is missing or any
947 /// step fails, the entire registration is rolled back.
948 ///
949 /// # Errors
950 ///
951 /// Returns an error if:
952 /// - A component with the same ID already exists
953 /// - Any referenced query does not exist in the graph
954 pub fn register_reaction(
955 &mut self,
956 id: &str,
957 metadata: HashMap<String, String>,
958 query_ids: &[String],
959 ) -> anyhow::Result<()> {
960 // Validate all dependencies exist before starting the transaction
961 for query_id in query_ids {
962 if !self.contains(query_id) {
963 return Err(anyhow::anyhow!(
964 "Cannot register reaction '{id}': referenced query '{query_id}' does not exist in the graph"
965 ));
966 }
967 }
968
969 let node = ComponentNode {
970 id: id.to_string(),
971 kind: ComponentKind::Reaction,
972 status: ComponentStatus::Added,
973 metadata,
974 };
975 let mut txn = self.begin();
976 txn.add_component(node)?;
977 for query_id in query_ids {
978 txn.add_relationship(query_id, id, RelationshipKind::Feeds)?;
979 }
980 txn.commit();
981 Ok(())
982 }
983
984 /// Deregister a component and all its edges from the graph.
985 ///
986 /// Validates that the component exists and has no dependents before removal.
987 /// The instance root node cannot be deregistered.
988 ///
989 /// # Errors
990 ///
991 /// Returns an error if:
992 /// - The component does not exist
993 /// - The component has dependents (use `can_remove()` to check first)
994 /// - The component is the instance root node
995 pub fn deregister(&mut self, id: &str) -> anyhow::Result<ComponentNode> {
996 // Validate no dependents
997 if let Err(dependent_ids) = self.can_remove(id) {
998 return Err(anyhow::anyhow!(
999 "Cannot deregister '{}': depended on by: {}",
1000 id,
1001 dependent_ids.join(", ")
1002 ));
1003 }
1004 self.remove_component(id)
1005 }
1006
1007 /// Register a bootstrap provider in the graph for topology visibility.
1008 ///
1009 /// Creates the node and bidirectional ownership edges transactionally.
1010 /// Optionally links the provider to its target source via `Bootstraps` edges.
1011 ///
1012 /// # Usage
1013 ///
1014 /// Bootstrap providers are managed internally by source plugins (set via
1015 /// `Source::set_bootstrap_provider()`). Call this method to make a bootstrap
1016 /// provider visible in the component graph for topology visualization and
1017 /// dependency tracking.
1018 ///
1019 /// ```ignore
1020 /// // After adding a source with a bootstrap provider:
1021 /// let mut graph = core.component_graph().write().await;
1022 /// graph.register_bootstrap_provider("my-bootstrap", metadata, &["my-source".into()])?;
1023 /// ```
1024 ///
1025 /// # Errors
1026 ///
1027 /// Returns an error if:
1028 /// - A component with the same ID already exists
1029 /// - Any referenced source does not exist in the graph
1030 pub fn register_bootstrap_provider(
1031 &mut self,
1032 id: &str,
1033 metadata: HashMap<String, String>,
1034 source_ids: &[String],
1035 ) -> anyhow::Result<()> {
1036 for source_id in source_ids {
1037 if !self.contains(source_id) {
1038 return Err(anyhow::anyhow!(
1039 "Cannot register bootstrap provider '{id}': referenced source '{source_id}' does not exist in the graph"
1040 ));
1041 }
1042 }
1043
1044 let node = ComponentNode {
1045 id: id.to_string(),
1046 kind: ComponentKind::BootstrapProvider,
1047 status: ComponentStatus::Added,
1048 metadata,
1049 };
1050 let mut txn = self.begin();
1051 txn.add_component(node)?;
1052 for source_id in source_ids {
1053 txn.add_relationship(id, source_id, RelationshipKind::Bootstraps)?;
1054 }
1055 txn.commit();
1056 Ok(())
1057 }
1058
1059 /// Register an identity provider in the graph for topology visibility.
1060 ///
1061 /// Creates the node and bidirectional ownership edges transactionally.
1062 /// Optionally links the provider to components it authenticates via
1063 /// `Authenticates` edges.
1064 ///
1065 /// # Current Status
1066 ///
1067 /// This method is **reserved for future use**. Identity provider support
1068 /// is not yet implemented in the component lifecycle. The registration
1069 /// infrastructure is in place for when authentication integration is added.
1070 ///
1071 /// # Errors
1072 ///
1073 /// Returns an error if:
1074 /// - A component with the same ID already exists
1075 /// - Any referenced component does not exist in the graph
1076 pub fn register_identity_provider(
1077 &mut self,
1078 id: &str,
1079 metadata: HashMap<String, String>,
1080 component_ids: &[String],
1081 ) -> anyhow::Result<()> {
1082 for component_id in component_ids {
1083 if !self.contains(component_id) {
1084 return Err(anyhow::anyhow!(
1085 "Cannot register identity provider '{id}': referenced component '{component_id}' does not exist in the graph"
1086 ));
1087 }
1088 }
1089
1090 let node = ComponentNode {
1091 id: id.to_string(),
1092 kind: ComponentKind::IdentityProvider,
1093 status: ComponentStatus::Added,
1094 metadata,
1095 };
1096 let mut txn = self.begin();
1097 txn.add_component(node)?;
1098 for component_id in component_ids {
1099 txn.add_relationship(id, component_id, RelationshipKind::Authenticates)?;
1100 }
1101 txn.commit();
1102 Ok(())
1103 }
1104
1105 // ========================================================================
1106 // Transactions
1107 // ========================================================================
1108
1109 /// Begin a transactional mutation of the graph.
1110 ///
1111 /// Returns a [`GraphTransaction`] that collects mutations (nodes, edges)
1112 /// and defers event emission until [`commit()`](GraphTransaction::commit).
1113 /// If the transaction is dropped without being committed, all added nodes
1114 /// and edges are rolled back automatically.
1115 ///
1116 /// The `&mut self` borrow ensures compile-time exclusivity — no other code
1117 /// can access the graph while a transaction is in progress.
1118 ///
1119 /// # Example
1120 ///
1121 /// ```ignore
1122 /// let mut graph = self.graph.write().await;
1123 /// let mut txn = graph.begin();
1124 /// txn.add_component(source_node)?;
1125 /// txn.add_relationship("source-1", "query-1", RelationshipKind::Feeds)?;
1126 /// txn.commit(); // events emitted here; if this line is not reached, rollback on drop
1127 /// ```
1128 pub fn begin(&mut self) -> GraphTransaction<'_> {
1129 GraphTransaction::new(self)
1130 }
1131
1132 // ========================================================================
1133 // Event History (centralized)
1134 // ========================================================================
1135
1136 /// Record a component event in the centralized history.
1137 ///
1138 /// Called internally by [`apply_update()`]. Managers should NOT call this
1139 /// directly — status updates flow through the mpsc channel and are recorded
1140 /// automatically.
1141 pub fn record_event(&mut self, event: ComponentEvent) {
1142 self.event_history.record_event(event);
1143 }
1144
1145 /// Get all lifecycle events for a specific component.
1146 ///
1147 /// Returns events in chronological order (oldest first).
1148 /// Up to 100 most recent events are retained per component.
1149 pub fn get_events(&self, component_id: &str) -> Vec<ComponentEvent> {
1150 self.event_history.get_events(component_id)
1151 }
1152
1153 /// Get all lifecycle events across all components.
1154 ///
1155 /// Returns events sorted by timestamp (oldest first).
1156 pub fn get_all_events(&self) -> Vec<ComponentEvent> {
1157 self.event_history.get_all_events()
1158 }
1159
1160 /// Get the most recent error message for a component.
1161 pub fn get_last_error(&self, component_id: &str) -> Option<String> {
1162 self.event_history.get_last_error(component_id)
1163 }
1164
1165 /// Subscribe to live lifecycle events for a component.
1166 ///
1167 /// Returns the current history and a broadcast receiver for new events,
1168 /// or `None` if the component has no event channel yet.
1169 pub fn subscribe_events(
1170 &self,
1171 component_id: &str,
1172 ) -> Option<(Vec<ComponentEvent>, broadcast::Receiver<ComponentEvent>)> {
1173 self.event_history.try_subscribe(component_id)
1174 }
1175}
1176
1177// ============================================================================
1178// Relationship Validation
1179// ============================================================================
1180
1181/// Check if a relationship kind is semantically valid between two component kinds.
1182///
1183/// This enforces the graph topology rules:
1184/// - **Feeds**: Source → Query, or Query → Reaction
1185/// - **Owns/OwnedBy**: Instance ↔ any component (created automatically)
1186/// - **Bootstraps**: BootstrapProvider → Source
1187/// - **Authenticates**: IdentityProvider → any component
1188pub(super) fn is_valid_relationship(
1189 from_kind: &ComponentKind,
1190 to_kind: &ComponentKind,
1191 relationship: &RelationshipKind,
1192) -> bool {
1193 use ComponentKind::*;
1194 use RelationshipKind::*;
1195 matches!(
1196 (from_kind, to_kind, relationship),
1197 // Data flow
1198 (Source, Query, Feeds)
1199 | (Query, Reaction, Feeds)
1200 // Ownership (auto-created by add_component_internal)
1201 | (Instance, _, Owns)
1202 // Bootstrap
1203 | (BootstrapProvider, Source, Bootstraps)
1204 // Authentication
1205 | (IdentityProvider, _, Authenticates)
1206 )
1207}
1208
1209// ============================================================================
1210// State Transition Validation
1211// ============================================================================
1212
1213/// Check if a status transition is valid according to the component lifecycle state machine.
1214///
1215/// ```text
1216/// Added ──→ Starting ──→ Running ──→ Stopping ──→ Stopped
1217/// │ │ │ │
1218/// │ ↓ ↓ ↓
1219/// │ Error Error Error
1220/// │ │
1221/// │ ↓
1222/// │ Stopped (aborted start)
1223/// │
1224/// ↓
1225/// Reconfiguring ──→ Stopped | Starting | Error
1226///
1227/// Error ──→ Starting (retry) | Stopped (reset)
1228///
1229/// Note: Added and Removed are set by the graph on add/remove_component()
1230/// and are NOT valid targets for validate_and_transition().
1231/// ```
1232pub(super) fn is_valid_transition(from: &ComponentStatus, to: &ComponentStatus) -> bool {
1233 use ComponentStatus::*;
1234 matches!(
1235 (from, to),
1236 // Normal lifecycle
1237 (Added, Starting)
1238 | (Added, Stopped) // immediate deactivation without starting
1239 | (Stopped, Starting)
1240 | (Starting, Running)
1241 | (Starting, Error)
1242 | (Starting, Stopped) // aborted start
1243 | (Running, Stopping)
1244 | (Running, Stopped) // direct stop (async channel may skip Stopping)
1245 | (Running, Error)
1246 | (Stopping, Stopped)
1247 | (Stopping, Error)
1248 // Error recovery
1249 | (Error, Starting) // retry
1250 | (Error, Stopped) // reset
1251 // Reconfiguration (from any stable state)
1252 | (Added, Reconfiguring)
1253 | (Stopped, Reconfiguring)
1254 | (Running, Reconfiguring)
1255 | (Error, Reconfiguring)
1256 | (Reconfiguring, Stopped)
1257 | (Reconfiguring, Starting)
1258 | (Reconfiguring, Error)
1259 )
1260}
1261
1262/// Produce a human-readable error message for an invalid transition.
1263///
1264/// These messages provide actionable feedback (e.g., "Component is already running"
1265/// instead of just "invalid transition").
1266fn describe_invalid_transition(id: &str, from: &ComponentStatus, to: &ComponentStatus) -> String {
1267 use ComponentStatus::*;
1268 match (from, to) {
1269 // Trying to start something that's already starting/running
1270 (Starting, Starting) => format!("Component '{id}' is already starting"),
1271 (Running, Starting) => format!("Component '{id}' is already running"),
1272 (Stopping, Starting) => {
1273 format!("Cannot start component '{id}' while it is stopping")
1274 }
1275 (Reconfiguring, Starting) => {
1276 // Reconfiguring → Starting is actually valid, so this shouldn't be reached,
1277 // but kept for safety
1278 format!("Cannot start component '{id}' while it is reconfiguring")
1279 }
1280 // Trying to stop something that's already stopped/stopping
1281 (Stopped, Stopping) => {
1282 format!("Cannot stop component '{id}': it is already stopped")
1283 }
1284 (Stopping, Stopping) => format!("Component '{id}' is already stopping"),
1285 (Error, Stopping) => {
1286 format!("Cannot stop component '{id}': it is in error state")
1287 }
1288 // Trying to reconfigure during a transition
1289 (Starting, Reconfiguring) => {
1290 format!("Cannot reconfigure component '{id}' while it is starting")
1291 }
1292 (Stopping, Reconfiguring) => {
1293 format!("Cannot reconfigure component '{id}' while it is stopping")
1294 }
1295 (Reconfiguring, Reconfiguring) => {
1296 format!("Component '{id}' is already reconfiguring")
1297 }
1298 // Generic fallback
1299 _ => format!("Invalid state transition for component '{id}': {from:?} → {to:?}"),
1300 }
1301}
1302
1303impl std::fmt::Debug for ComponentGraph {
1304 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1305 f.debug_struct("ComponentGraph")
1306 .field("instance_id", &self.instance_id())
1307 .field("node_count", &self.node_count())
1308 .field("edge_count", &self.edge_count())
1309 .finish()
1310 }
1311}