drasi_lib/component_graph/node.rs
// Copyright 2025 The Drasi Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use serde::{Deserialize, Serialize};
19use tokio::sync::{mpsc, RwLock};
20
21use crate::channels::{ComponentStatus, ComponentType};
22
// ============================================================================
// Component Update Messages (mpsc fan-in from components to graph)
// ============================================================================
26
27/// A status or metric update sent from a component to the graph update loop.
28///
29/// Components hold a cloned `mpsc::Sender<ComponentUpdate>` and call `send()` or
30/// `try_send()` to report status changes without acquiring any locks. The graph
31/// update loop is the sole consumer.
32#[derive(Debug, Clone)]
33pub enum ComponentUpdate {
34 /// A component status change (e.g., Running, Error, Stopped).
35 Status {
36 /// The component ID reporting the update
37 component_id: String,
38 /// The new status
39 status: ComponentStatus,
40 /// Optional human-readable message
41 message: Option<String>,
42 },
43 // Future variants:
44 // Metric { component_id: String, name: String, value: f64 },
45 // LifecycleTransition { component_id: String, from: ComponentStatus, to: ComponentStatus },
46}
47
48/// Sender half of the component update channel.
49///
50/// Cloned and given to each component's `SourceBase`/`ReactionBase`/`QueryBase`.
51/// Uses `send().await` which applies backpressure if the channel is full,
52/// ensuring status transitions are never silently dropped.
53pub type ComponentUpdateSender = mpsc::Sender<ComponentUpdate>;
54
55/// Receiver half of the component update channel.
56///
57/// Owned by the graph update loop task, which is the sole consumer.
58pub type ComponentUpdateReceiver = mpsc::Receiver<ComponentUpdate>;
59
60/// A cloneable handle for reading, writing, and reporting component status.
61///
62/// `ComponentStatusHandle` owns the component's local status (`Arc<RwLock<ComponentStatus>>`)
63/// **and** the mpsc sender to the graph update loop. This means a single cloned handle is
64/// all a spawned task needs to both read the current status and update it (with automatic
65/// graph notification).
66///
67/// # Obtaining a handle
68///
69/// Source and reaction plugins obtain this from their base class:
70/// ```ignore
71/// let handle = self.base.status_handle();
72/// ```
73///
74/// # Usage in spawned tasks
75///
76/// ```ignore
77/// let handle = self.base.status_handle();
78/// tokio::spawn(async move {
79/// let current = handle.get_status().await;
80/// if let Err(e) = do_work().await {
81/// handle.set_status(ComponentStatus::Error, Some(format!("Failed: {e}"))).await;
82/// }
83/// });
84/// ```
85#[derive(Clone)]
86pub struct ComponentStatusHandle {
87 component_id: String,
88 status: Arc<RwLock<ComponentStatus>>,
89 update_tx: Arc<tokio::sync::OnceCell<ComponentUpdateSender>>,
90 /// Watch channel for event-driven status observation.
91 /// Updated on every `set_status` call, enabling zero-latency `wait_for` patterns.
92 status_watch_tx: Arc<tokio::sync::watch::Sender<ComponentStatus>>,
93}
94
95impl ComponentStatusHandle {
96 /// Create a handle without a graph channel.
97 ///
98 /// The handle is fully functional for local status reads/writes immediately.
99 /// Call [`wire`] later to connect to the graph update loop.
100 pub fn new(component_id: impl Into<String>) -> Self {
101 let (watch_tx, _) = tokio::sync::watch::channel(ComponentStatus::Stopped);
102 Self {
103 component_id: component_id.into(),
104 status: Arc::new(RwLock::new(ComponentStatus::Stopped)),
105 update_tx: Arc::new(tokio::sync::OnceCell::new()),
106 status_watch_tx: Arc::new(watch_tx),
107 }
108 }
109
110 /// Create a handle already connected to the graph update loop.
111 ///
112 /// Use this when the update channel is available at construction time
113 /// (e.g., in `QueryManager` where queries are created with full context).
114 pub fn new_wired(component_id: impl Into<String>, update_tx: ComponentUpdateSender) -> Self {
115 let cell = tokio::sync::OnceCell::new();
116 let _ = cell.set(update_tx);
117 let (watch_tx, _) = tokio::sync::watch::channel(ComponentStatus::Stopped);
118 Self {
119 component_id: component_id.into(),
120 status: Arc::new(RwLock::new(ComponentStatus::Stopped)),
121 update_tx: Arc::new(cell),
122 status_watch_tx: Arc::new(watch_tx),
123 }
124 }
125
126 /// Connect this handle to the graph update loop.
127 ///
128 /// After wiring, every [`set_status`](Self::set_status) call will also
129 /// send a fire-and-forget notification to the graph.
130 pub async fn wire(&self, update_tx: ComponentUpdateSender) {
131 let _ = self.update_tx.set(update_tx);
132 }
133
134 /// Set the component's status — updates local state AND notifies the graph.
135 ///
136 /// This is the single canonical way to change a component's status. It writes
137 /// to the local `Arc<RwLock<ComponentStatus>>` and sends the update to the
138 /// graph update loop (if wired). The send awaits backpressure if the channel
139 /// is full, ensuring status transitions are never silently dropped.
140 pub async fn set_status(&self, status: ComponentStatus, message: Option<String>) {
141 // Update local state first, then release the lock before sending
142 // to avoid holding the RwLock during a potential channel backpressure wait.
143 {
144 *self.status.write().await = status;
145 }
146
147 // Notify watch subscribers — send_modify always updates the stored value
148 // even when no receivers currently exist (unlike send() which is a no-op
149 // without receivers).
150 self.status_watch_tx.send_modify(|s| *s = status);
151
152 if let Some(tx) = self.update_tx.get() {
153 if let Err(e) = tx
154 .send(ComponentUpdate::Status {
155 component_id: self.component_id.clone(),
156 status,
157 message,
158 })
159 .await
160 {
161 log::warn!(
162 "Status update for '{}' dropped (channel closed): {e}",
163 self.component_id
164 );
165 }
166 }
167 }
168
169 /// Read the current status.
170 pub async fn get_status(&self) -> ComponentStatus {
171 *self.status.read().await
172 }
173
174 /// Subscribe to status changes via a `tokio::sync::watch::Receiver`.
175 ///
176 /// The receiver can be used with `wait_for()` for event-driven waiting
177 /// without polling. Each call returns a fresh receiver; multiple
178 /// subscribers are supported.
179 pub fn subscribe_status(&self) -> tokio::sync::watch::Receiver<ComponentStatus> {
180 self.status_watch_tx.subscribe()
181 }
182}
183
// ============================================================================
// Graph Type Definitions
// ============================================================================
187
188/// Type of component in the graph
189#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
190pub enum ComponentKind {
191 /// The DrasiLib instance itself (root node)
192 Instance,
193 /// A data source
194 Source,
195 /// A continuous query
196 Query,
197 /// A reaction/output
198 Reaction,
199 /// A bootstrap provider
200 BootstrapProvider,
201 /// An identity provider
202 IdentityProvider,
203}
204
205impl std::fmt::Display for ComponentKind {
206 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207 match self {
208 ComponentKind::Instance => write!(f, "instance"),
209 ComponentKind::Source => write!(f, "source"),
210 ComponentKind::Query => write!(f, "query"),
211 ComponentKind::Reaction => write!(f, "reaction"),
212 ComponentKind::BootstrapProvider => write!(f, "bootstrap_provider"),
213 ComponentKind::IdentityProvider => write!(f, "identity_provider"),
214 }
215 }
216}
217
218impl ComponentKind {
219 /// Convert to [`ComponentType`] for event emission.
220 ///
221 /// Returns `None` only for the Instance kind (the root node), which has no
222 /// corresponding event type. All other component kinds map to a `ComponentType`.
223 pub fn to_component_type(&self) -> Option<ComponentType> {
224 match self {
225 ComponentKind::Source => Some(ComponentType::Source),
226 ComponentKind::Query => Some(ComponentType::Query),
227 ComponentKind::Reaction => Some(ComponentType::Reaction),
228 ComponentKind::BootstrapProvider => Some(ComponentType::BootstrapProvider),
229 ComponentKind::IdentityProvider => Some(ComponentType::IdentityProvider),
230 ComponentKind::Instance => None,
231 }
232 }
233}
234
235/// Type of relationship between components (bidirectional pairs)
236#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
237pub enum RelationshipKind {
238 // Ownership
239 /// Instance → Component
240 Owns,
241 /// Component → Instance
242 OwnedBy,
243
244 // Data flow
245 /// Source → Query, or Query → Reaction
246 Feeds,
247 /// Query → Source, or Reaction → Query
248 SubscribesTo,
249
250 // Bootstrap
251 /// BootstrapProvider → Source
252 Bootstraps,
253 /// Source → BootstrapProvider
254 BootstrappedBy,
255
256 // Authentication
257 /// IdentityProvider → Source/Reaction
258 Authenticates,
259 /// Source/Reaction → IdentityProvider
260 AuthenticatedBy,
261}
262
263impl RelationshipKind {
264 /// Get the reverse of this relationship
265 pub fn reverse(&self) -> Self {
266 match self {
267 RelationshipKind::Owns => RelationshipKind::OwnedBy,
268 RelationshipKind::OwnedBy => RelationshipKind::Owns,
269 RelationshipKind::Feeds => RelationshipKind::SubscribesTo,
270 RelationshipKind::SubscribesTo => RelationshipKind::Feeds,
271 RelationshipKind::Bootstraps => RelationshipKind::BootstrappedBy,
272 RelationshipKind::BootstrappedBy => RelationshipKind::Bootstraps,
273 RelationshipKind::Authenticates => RelationshipKind::AuthenticatedBy,
274 RelationshipKind::AuthenticatedBy => RelationshipKind::Authenticates,
275 }
276 }
277}
278
// ============================================================================
// Graph Node Data
// ============================================================================
282
283/// A node in the component graph.
284///
285/// Contains metadata (ID, kind, status, properties) for each component.
286/// Runtime component instances (`Arc<dyn Source>`, etc.) are stored in the
287/// managers' HashMaps alongside the graph, keyed by the same component ID.
288/// The graph and HashMaps are always updated together in the same code paths.
289#[derive(Debug, Clone, Serialize, Deserialize)]
290pub struct ComponentNode {
291 /// Unique identifier for this component
292 pub id: String,
293 /// Type of component
294 pub kind: ComponentKind,
295 /// Current lifecycle status
296 pub status: ComponentStatus,
297 /// Optional metadata (e.g., source_type, query_language, reaction_type)
298 #[serde(default)]
299 pub metadata: HashMap<String, String>,
300}
301
// ============================================================================
// Serializable Graph Snapshot
// ============================================================================
305
306/// Serializable snapshot of the entire component graph.
307///
308/// Used for API responses, UI visualization, and debugging.
309/// Contains only metadata — no runtime component instances.
310#[derive(Debug, Clone, Serialize, Deserialize)]
311pub struct GraphSnapshot {
312 /// The instance ID (root node)
313 pub instance_id: String,
314 /// All nodes in the graph
315 pub nodes: Vec<ComponentNode>,
316 /// All edges in the graph
317 pub edges: Vec<GraphEdge>,
318}
319
320/// A serializable edge in the graph snapshot
321#[derive(Debug, Clone, Serialize, Deserialize)]
322pub struct GraphEdge {
323 /// Source component ID
324 pub from: String,
325 /// Target component ID
326 pub to: String,
327 /// Type of relationship
328 pub relationship: RelationshipKind,
329}