drasi_lib/component_graph/node.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use serde::{Deserialize, Serialize};
19use tokio::sync::{mpsc, RwLock};
20
21use crate::channels::{ComponentStatus, ComponentType};
22
23// ============================================================================
24// Component Update Messages (mpsc fan-in from components to graph)
25// ============================================================================
26
27/// A status or metric update sent from a component to the graph update loop.
28///
29/// Components hold a cloned `mpsc::Sender<ComponentUpdate>` and call `send()` or
30/// `try_send()` to report status changes without acquiring any locks. The graph
31/// update loop is the sole consumer.
32#[derive(Debug, Clone)]
33pub enum ComponentUpdate {
34 /// A component status change (e.g., Running, Error, Stopped).
35 Status {
36 /// The component ID reporting the update
37 component_id: String,
38 /// The new status
39 status: ComponentStatus,
40 /// Optional human-readable message
41 message: Option<String>,
42 },
43 // Future variants:
44 // Metric { component_id: String, name: String, value: f64 },
45 // LifecycleTransition { component_id: String, from: ComponentStatus, to: ComponentStatus },
46}
47
48/// Sender half of the component update channel.
49///
50/// Cloned and given to each component's `SourceBase`/`ReactionBase`/`QueryBase`.
51/// Uses `send().await` which applies backpressure if the channel is full,
52/// ensuring status transitions are never silently dropped.
53pub type ComponentUpdateSender = mpsc::Sender<ComponentUpdate>;
54
55/// Receiver half of the component update channel.
56///
57/// Owned by the graph update loop task, which is the sole consumer.
58pub type ComponentUpdateReceiver = mpsc::Receiver<ComponentUpdate>;
59
60/// A clonable handle for reading, writing, and reporting component status.
61///
62/// `ComponentStatusHandle` owns the component's local status (`Arc<RwLock<ComponentStatus>>`)
63/// **and** the mpsc sender to the graph update loop. This means a single cloned handle is
64/// all a spawned task needs to both read the current status and update it (with automatic
65/// graph notification).
66///
67/// # Obtaining a handle
68///
69/// Source and reaction plugins obtain this from their base class:
70/// ```ignore
71/// let handle = self.base.status_handle();
72/// ```
73///
74/// # Usage in spawned tasks
75///
76/// ```ignore
77/// let handle = self.base.status_handle();
78/// tokio::spawn(async move {
79/// let current = handle.get_status().await;
80/// if let Err(e) = do_work().await {
81/// handle.set_status(ComponentStatus::Error, Some(format!("Failed: {e}"))).await;
82/// }
83/// });
84/// ```
85#[derive(Clone)]
86pub struct ComponentStatusHandle {
87 component_id: String,
88 status: Arc<RwLock<ComponentStatus>>,
89 update_tx: Arc<tokio::sync::OnceCell<ComponentUpdateSender>>,
90}
91
92impl ComponentStatusHandle {
93 /// Create a handle without a graph channel.
94 ///
95 /// The handle is fully functional for local status reads/writes immediately.
96 /// Call [`wire`] later to connect to the graph update loop.
97 pub fn new(component_id: impl Into<String>) -> Self {
98 Self {
99 component_id: component_id.into(),
100 status: Arc::new(RwLock::new(ComponentStatus::Stopped)),
101 update_tx: Arc::new(tokio::sync::OnceCell::new()),
102 }
103 }
104
105 /// Create a handle already connected to the graph update loop.
106 ///
107 /// Use this when the update channel is available at construction time
108 /// (e.g., in `QueryManager` where queries are created with full context).
109 pub fn new_wired(component_id: impl Into<String>, update_tx: ComponentUpdateSender) -> Self {
110 let cell = tokio::sync::OnceCell::new();
111 let _ = cell.set(update_tx);
112 Self {
113 component_id: component_id.into(),
114 status: Arc::new(RwLock::new(ComponentStatus::Stopped)),
115 update_tx: Arc::new(cell),
116 }
117 }
118
119 /// Connect this handle to the graph update loop.
120 ///
121 /// After wiring, every [`set_status`](Self::set_status) call will also
122 /// send a fire-and-forget notification to the graph.
123 pub async fn wire(&self, update_tx: ComponentUpdateSender) {
124 let _ = self.update_tx.set(update_tx);
125 }
126
127 /// Set the component's status — updates local state AND notifies the graph.
128 ///
129 /// This is the single canonical way to change a component's status. It writes
130 /// to the local `Arc<RwLock<ComponentStatus>>` and sends the update to the
131 /// graph update loop (if wired). The send awaits backpressure if the channel
132 /// is full, ensuring status transitions are never silently dropped.
133 pub async fn set_status(&self, status: ComponentStatus, message: Option<String>) {
134 // Update local state first, then release the lock before sending
135 // to avoid holding the RwLock during a potential channel backpressure wait.
136 {
137 *self.status.write().await = status;
138 }
139
140 if let Some(tx) = self.update_tx.get() {
141 if let Err(e) = tx
142 .send(ComponentUpdate::Status {
143 component_id: self.component_id.clone(),
144 status,
145 message,
146 })
147 .await
148 {
149 log::warn!(
150 "Status update for '{}' dropped (channel closed): {e}",
151 self.component_id
152 );
153 }
154 }
155 }
156
157 /// Read the current status.
158 pub async fn get_status(&self) -> ComponentStatus {
159 *self.status.read().await
160 }
161}
162
163// ============================================================================
164// Graph Type Definitions
165// ============================================================================
166
167/// Type of component in the graph
168#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
169pub enum ComponentKind {
170 /// The DrasiLib instance itself (root node)
171 Instance,
172 /// A data source
173 Source,
174 /// A continuous query
175 Query,
176 /// A reaction/output
177 Reaction,
178 /// A bootstrap provider
179 BootstrapProvider,
180 /// An identity provider
181 IdentityProvider,
182}
183
184impl std::fmt::Display for ComponentKind {
185 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 match self {
187 ComponentKind::Instance => write!(f, "instance"),
188 ComponentKind::Source => write!(f, "source"),
189 ComponentKind::Query => write!(f, "query"),
190 ComponentKind::Reaction => write!(f, "reaction"),
191 ComponentKind::BootstrapProvider => write!(f, "bootstrap_provider"),
192 ComponentKind::IdentityProvider => write!(f, "identity_provider"),
193 }
194 }
195}
196
197impl ComponentKind {
198 /// Convert to [`ComponentType`] for event emission.
199 ///
200 /// Returns `None` only for the Instance kind (the root node), which has no
201 /// corresponding event type. All other component kinds map to a `ComponentType`.
202 pub fn to_component_type(&self) -> Option<ComponentType> {
203 match self {
204 ComponentKind::Source => Some(ComponentType::Source),
205 ComponentKind::Query => Some(ComponentType::Query),
206 ComponentKind::Reaction => Some(ComponentType::Reaction),
207 ComponentKind::BootstrapProvider => Some(ComponentType::BootstrapProvider),
208 ComponentKind::IdentityProvider => Some(ComponentType::IdentityProvider),
209 ComponentKind::Instance => None,
210 }
211 }
212}
213
214/// Type of relationship between components (bidirectional pairs)
215#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
216pub enum RelationshipKind {
217 // Ownership
218 /// Instance → Component
219 Owns,
220 /// Component → Instance
221 OwnedBy,
222
223 // Data flow
224 /// Source → Query, or Query → Reaction
225 Feeds,
226 /// Query → Source, or Reaction → Query
227 SubscribesTo,
228
229 // Bootstrap
230 /// BootstrapProvider → Source
231 Bootstraps,
232 /// Source → BootstrapProvider
233 BootstrappedBy,
234
235 // Authentication
236 /// IdentityProvider → Source/Reaction
237 Authenticates,
238 /// Source/Reaction → IdentityProvider
239 AuthenticatedBy,
240}
241
242impl RelationshipKind {
243 /// Get the reverse of this relationship
244 pub fn reverse(&self) -> Self {
245 match self {
246 RelationshipKind::Owns => RelationshipKind::OwnedBy,
247 RelationshipKind::OwnedBy => RelationshipKind::Owns,
248 RelationshipKind::Feeds => RelationshipKind::SubscribesTo,
249 RelationshipKind::SubscribesTo => RelationshipKind::Feeds,
250 RelationshipKind::Bootstraps => RelationshipKind::BootstrappedBy,
251 RelationshipKind::BootstrappedBy => RelationshipKind::Bootstraps,
252 RelationshipKind::Authenticates => RelationshipKind::AuthenticatedBy,
253 RelationshipKind::AuthenticatedBy => RelationshipKind::Authenticates,
254 }
255 }
256}
257
258// ============================================================================
259// Graph Node Data
260// ============================================================================
261
262/// A node in the component graph.
263///
264/// Contains metadata (ID, kind, status, properties) for each component.
265/// Runtime component instances (`Arc<dyn Source>`, etc.) are stored in the
266/// managers' HashMaps alongside the graph, keyed by the same component ID.
267/// The graph and HashMaps are always updated together in the same code paths.
268#[derive(Debug, Clone, Serialize, Deserialize)]
269pub struct ComponentNode {
270 /// Unique identifier for this component
271 pub id: String,
272 /// Type of component
273 pub kind: ComponentKind,
274 /// Current lifecycle status
275 pub status: ComponentStatus,
276 /// Optional metadata (e.g., source_type, query_language, reaction_type)
277 #[serde(default)]
278 pub metadata: HashMap<String, String>,
279}
280
281// ============================================================================
282// Serializable Graph Snapshot
283// ============================================================================
284
285/// Serializable snapshot of the entire component graph.
286///
287/// Used for API responses, UI visualization, and debugging.
288/// Contains only metadata — no runtime component instances.
289#[derive(Debug, Clone, Serialize, Deserialize)]
290pub struct GraphSnapshot {
291 /// The instance ID (root node)
292 pub instance_id: String,
293 /// All nodes in the graph
294 pub nodes: Vec<ComponentNode>,
295 /// All edges in the graph
296 pub edges: Vec<GraphEdge>,
297}
298
299/// A serializable edge in the graph snapshot
300#[derive(Debug, Clone, Serialize, Deserialize)]
301pub struct GraphEdge {
302 /// Source component ID
303 pub from: String,
304 /// Target component ID
305 pub to: String,
306 /// Type of relationship
307 pub relationship: RelationshipKind,
308}