Skip to main content

drasi_lib/component_graph/
node.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use serde::{Deserialize, Serialize};
19use tokio::sync::{mpsc, RwLock};
20
21use crate::channels::{ComponentStatus, ComponentType};
22
23// ============================================================================
24// Component Update Messages (mpsc fan-in from components to graph)
25// ============================================================================
26
27/// A status or metric update sent from a component to the graph update loop.
28///
29/// Components hold a cloned `mpsc::Sender<ComponentUpdate>` and call `send()` or
30/// `try_send()` to report status changes without acquiring any locks. The graph
31/// update loop is the sole consumer.
32#[derive(Debug, Clone)]
33pub enum ComponentUpdate {
34    /// A component status change (e.g., Running, Error, Stopped).
35    Status {
36        /// The component ID reporting the update
37        component_id: String,
38        /// The new status
39        status: ComponentStatus,
40        /// Optional human-readable message
41        message: Option<String>,
42    },
43    // Future variants:
44    // Metric { component_id: String, name: String, value: f64 },
45    // LifecycleTransition { component_id: String, from: ComponentStatus, to: ComponentStatus },
46}
47
48/// Sender half of the component update channel.
49///
50/// Cloned and given to each component's `SourceBase`/`ReactionBase`/`QueryBase`.
51/// Uses `send().await` which applies backpressure if the channel is full,
52/// ensuring status transitions are never silently dropped.
53pub type ComponentUpdateSender = mpsc::Sender<ComponentUpdate>;
54
55/// Receiver half of the component update channel.
56///
57/// Owned by the graph update loop task, which is the sole consumer.
58pub type ComponentUpdateReceiver = mpsc::Receiver<ComponentUpdate>;
59
60/// A clonable handle for reading, writing, and reporting component status.
61///
62/// `ComponentStatusHandle` owns the component's local status (`Arc<RwLock<ComponentStatus>>`)
63/// **and** the mpsc sender to the graph update loop. This means a single cloned handle is
64/// all a spawned task needs to both read the current status and update it (with automatic
65/// graph notification).
66///
67/// # Obtaining a handle
68///
69/// Source and reaction plugins obtain this from their base class:
70/// ```ignore
71/// let handle = self.base.status_handle();
72/// ```
73///
74/// # Usage in spawned tasks
75///
76/// ```ignore
77/// let handle = self.base.status_handle();
78/// tokio::spawn(async move {
79///     let current = handle.get_status().await;
80///     if let Err(e) = do_work().await {
81///         handle.set_status(ComponentStatus::Error, Some(format!("Failed: {e}"))).await;
82///     }
83/// });
84/// ```
85#[derive(Clone)]
86pub struct ComponentStatusHandle {
87    component_id: String,
88    status: Arc<RwLock<ComponentStatus>>,
89    update_tx: Arc<tokio::sync::OnceCell<ComponentUpdateSender>>,
90}
91
92impl ComponentStatusHandle {
93    /// Create a handle without a graph channel.
94    ///
95    /// The handle is fully functional for local status reads/writes immediately.
96    /// Call [`wire`] later to connect to the graph update loop.
97    pub fn new(component_id: impl Into<String>) -> Self {
98        Self {
99            component_id: component_id.into(),
100            status: Arc::new(RwLock::new(ComponentStatus::Stopped)),
101            update_tx: Arc::new(tokio::sync::OnceCell::new()),
102        }
103    }
104
105    /// Create a handle already connected to the graph update loop.
106    ///
107    /// Use this when the update channel is available at construction time
108    /// (e.g., in `QueryManager` where queries are created with full context).
109    pub fn new_wired(component_id: impl Into<String>, update_tx: ComponentUpdateSender) -> Self {
110        let cell = tokio::sync::OnceCell::new();
111        let _ = cell.set(update_tx);
112        Self {
113            component_id: component_id.into(),
114            status: Arc::new(RwLock::new(ComponentStatus::Stopped)),
115            update_tx: Arc::new(cell),
116        }
117    }
118
119    /// Connect this handle to the graph update loop.
120    ///
121    /// After wiring, every [`set_status`](Self::set_status) call will also
122    /// send a fire-and-forget notification to the graph.
123    pub async fn wire(&self, update_tx: ComponentUpdateSender) {
124        let _ = self.update_tx.set(update_tx);
125    }
126
127    /// Set the component's status — updates local state AND notifies the graph.
128    ///
129    /// This is the single canonical way to change a component's status. It writes
130    /// to the local `Arc<RwLock<ComponentStatus>>` and sends the update to the
131    /// graph update loop (if wired). The send awaits backpressure if the channel
132    /// is full, ensuring status transitions are never silently dropped.
133    pub async fn set_status(&self, status: ComponentStatus, message: Option<String>) {
134        // Update local state first, then release the lock before sending
135        // to avoid holding the RwLock during a potential channel backpressure wait.
136        {
137            *self.status.write().await = status;
138        }
139
140        if let Some(tx) = self.update_tx.get() {
141            if let Err(e) = tx
142                .send(ComponentUpdate::Status {
143                    component_id: self.component_id.clone(),
144                    status,
145                    message,
146                })
147                .await
148            {
149                log::warn!(
150                    "Status update for '{}' dropped (channel closed): {e}",
151                    self.component_id
152                );
153            }
154        }
155    }
156
157    /// Read the current status.
158    pub async fn get_status(&self) -> ComponentStatus {
159        *self.status.read().await
160    }
161}
162
163// ============================================================================
164// Graph Type Definitions
165// ============================================================================
166
167/// Type of component in the graph
168#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
169pub enum ComponentKind {
170    /// The DrasiLib instance itself (root node)
171    Instance,
172    /// A data source
173    Source,
174    /// A continuous query
175    Query,
176    /// A reaction/output
177    Reaction,
178    /// A bootstrap provider
179    BootstrapProvider,
180    /// An identity provider
181    IdentityProvider,
182}
183
184impl std::fmt::Display for ComponentKind {
185    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186        match self {
187            ComponentKind::Instance => write!(f, "instance"),
188            ComponentKind::Source => write!(f, "source"),
189            ComponentKind::Query => write!(f, "query"),
190            ComponentKind::Reaction => write!(f, "reaction"),
191            ComponentKind::BootstrapProvider => write!(f, "bootstrap_provider"),
192            ComponentKind::IdentityProvider => write!(f, "identity_provider"),
193        }
194    }
195}
196
197impl ComponentKind {
198    /// Convert to [`ComponentType`] for event emission.
199    ///
200    /// Returns `None` only for the Instance kind (the root node), which has no
201    /// corresponding event type. All other component kinds map to a `ComponentType`.
202    pub fn to_component_type(&self) -> Option<ComponentType> {
203        match self {
204            ComponentKind::Source => Some(ComponentType::Source),
205            ComponentKind::Query => Some(ComponentType::Query),
206            ComponentKind::Reaction => Some(ComponentType::Reaction),
207            ComponentKind::BootstrapProvider => Some(ComponentType::BootstrapProvider),
208            ComponentKind::IdentityProvider => Some(ComponentType::IdentityProvider),
209            ComponentKind::Instance => None,
210        }
211    }
212}
213
214/// Type of relationship between components (bidirectional pairs)
215#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
216pub enum RelationshipKind {
217    // Ownership
218    /// Instance → Component
219    Owns,
220    /// Component → Instance
221    OwnedBy,
222
223    // Data flow
224    /// Source → Query, or Query → Reaction
225    Feeds,
226    /// Query → Source, or Reaction → Query
227    SubscribesTo,
228
229    // Bootstrap
230    /// BootstrapProvider → Source
231    Bootstraps,
232    /// Source → BootstrapProvider
233    BootstrappedBy,
234
235    // Authentication
236    /// IdentityProvider → Source/Reaction
237    Authenticates,
238    /// Source/Reaction → IdentityProvider
239    AuthenticatedBy,
240}
241
242impl RelationshipKind {
243    /// Get the reverse of this relationship
244    pub fn reverse(&self) -> Self {
245        match self {
246            RelationshipKind::Owns => RelationshipKind::OwnedBy,
247            RelationshipKind::OwnedBy => RelationshipKind::Owns,
248            RelationshipKind::Feeds => RelationshipKind::SubscribesTo,
249            RelationshipKind::SubscribesTo => RelationshipKind::Feeds,
250            RelationshipKind::Bootstraps => RelationshipKind::BootstrappedBy,
251            RelationshipKind::BootstrappedBy => RelationshipKind::Bootstraps,
252            RelationshipKind::Authenticates => RelationshipKind::AuthenticatedBy,
253            RelationshipKind::AuthenticatedBy => RelationshipKind::Authenticates,
254        }
255    }
256}
257
258// ============================================================================
259// Graph Node Data
260// ============================================================================
261
262/// A node in the component graph.
263///
264/// Contains metadata (ID, kind, status, properties) for each component.
265/// Runtime component instances (`Arc<dyn Source>`, etc.) are stored in the
266/// managers' HashMaps alongside the graph, keyed by the same component ID.
267/// The graph and HashMaps are always updated together in the same code paths.
268#[derive(Debug, Clone, Serialize, Deserialize)]
269pub struct ComponentNode {
270    /// Unique identifier for this component
271    pub id: String,
272    /// Type of component
273    pub kind: ComponentKind,
274    /// Current lifecycle status
275    pub status: ComponentStatus,
276    /// Optional metadata (e.g., source_type, query_language, reaction_type)
277    #[serde(default)]
278    pub metadata: HashMap<String, String>,
279}
280
281// ============================================================================
282// Serializable Graph Snapshot
283// ============================================================================
284
285/// Serializable snapshot of the entire component graph.
286///
287/// Used for API responses, UI visualization, and debugging.
288/// Contains only metadata — no runtime component instances.
289#[derive(Debug, Clone, Serialize, Deserialize)]
290pub struct GraphSnapshot {
291    /// The instance ID (root node)
292    pub instance_id: String,
293    /// All nodes in the graph
294    pub nodes: Vec<ComponentNode>,
295    /// All edges in the graph
296    pub edges: Vec<GraphEdge>,
297}
298
299/// A serializable edge in the graph snapshot
300#[derive(Debug, Clone, Serialize, Deserialize)]
301pub struct GraphEdge {
302    /// Source component ID
303    pub from: String,
304    /// Target component ID
305    pub to: String,
306    /// Type of relationship
307    pub relationship: RelationshipKind,
308}