Skip to main content

drasi_lib/component_graph/
node.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use serde::{Deserialize, Serialize};
19use tokio::sync::{mpsc, RwLock};
20
21use crate::channels::{ComponentStatus, ComponentType};
22
23// ============================================================================
24// Component Update Messages (mpsc fan-in from components to graph)
25// ============================================================================
26
27/// A status or metric update sent from a component to the graph update loop.
28///
29/// Components hold a cloned `mpsc::Sender<ComponentUpdate>` and call `send()` or
30/// `try_send()` to report status changes without acquiring any locks. The graph
31/// update loop is the sole consumer.
32#[derive(Debug, Clone)]
33pub enum ComponentUpdate {
34    /// A component status change (e.g., Running, Error, Stopped).
35    Status {
36        /// The component ID reporting the update
37        component_id: String,
38        /// The new status
39        status: ComponentStatus,
40        /// Optional human-readable message
41        message: Option<String>,
42    },
43    // Future variants:
44    // Metric { component_id: String, name: String, value: f64 },
45    // LifecycleTransition { component_id: String, from: ComponentStatus, to: ComponentStatus },
46}
47
48/// Sender half of the component update channel.
49///
50/// Cloned and given to each component's `SourceBase`/`ReactionBase`/`QueryBase`.
51/// Uses `send().await` which applies backpressure if the channel is full,
52/// ensuring status transitions are never silently dropped.
53pub type ComponentUpdateSender = mpsc::Sender<ComponentUpdate>;
54
55/// Receiver half of the component update channel.
56///
57/// Owned by the graph update loop task, which is the sole consumer.
58pub type ComponentUpdateReceiver = mpsc::Receiver<ComponentUpdate>;
59
60/// A clonable handle for reading, writing, and reporting component status.
61///
62/// `ComponentStatusHandle` owns the component's local status (`Arc<RwLock<ComponentStatus>>`)
63/// **and** the mpsc sender to the graph update loop. This means a single cloned handle is
64/// all a spawned task needs to both read the current status and update it (with automatic
65/// graph notification).
66///
67/// # Obtaining a handle
68///
69/// Source and reaction plugins obtain this from their base class:
70/// ```ignore
71/// let handle = self.base.status_handle();
72/// ```
73///
74/// # Usage in spawned tasks
75///
76/// ```ignore
77/// let handle = self.base.status_handle();
78/// tokio::spawn(async move {
79///     let current = handle.get_status().await;
80///     if let Err(e) = do_work().await {
81///         handle.set_status(ComponentStatus::Error, Some(format!("Failed: {e}"))).await;
82///     }
83/// });
84/// ```
85#[derive(Clone)]
86pub struct ComponentStatusHandle {
87    component_id: String,
88    status: Arc<RwLock<ComponentStatus>>,
89    update_tx: Arc<tokio::sync::OnceCell<ComponentUpdateSender>>,
90    /// Watch channel for event-driven status observation.
91    /// Updated on every `set_status` call, enabling zero-latency `wait_for` patterns.
92    status_watch_tx: Arc<tokio::sync::watch::Sender<ComponentStatus>>,
93}
94
95impl ComponentStatusHandle {
96    /// Create a handle without a graph channel.
97    ///
98    /// The handle is fully functional for local status reads/writes immediately.
99    /// Call [`wire`] later to connect to the graph update loop.
100    pub fn new(component_id: impl Into<String>) -> Self {
101        let (watch_tx, _) = tokio::sync::watch::channel(ComponentStatus::Stopped);
102        Self {
103            component_id: component_id.into(),
104            status: Arc::new(RwLock::new(ComponentStatus::Stopped)),
105            update_tx: Arc::new(tokio::sync::OnceCell::new()),
106            status_watch_tx: Arc::new(watch_tx),
107        }
108    }
109
110    /// Create a handle already connected to the graph update loop.
111    ///
112    /// Use this when the update channel is available at construction time
113    /// (e.g., in `QueryManager` where queries are created with full context).
114    pub fn new_wired(component_id: impl Into<String>, update_tx: ComponentUpdateSender) -> Self {
115        let cell = tokio::sync::OnceCell::new();
116        let _ = cell.set(update_tx);
117        let (watch_tx, _) = tokio::sync::watch::channel(ComponentStatus::Stopped);
118        Self {
119            component_id: component_id.into(),
120            status: Arc::new(RwLock::new(ComponentStatus::Stopped)),
121            update_tx: Arc::new(cell),
122            status_watch_tx: Arc::new(watch_tx),
123        }
124    }
125
126    /// Connect this handle to the graph update loop.
127    ///
128    /// After wiring, every [`set_status`](Self::set_status) call will also
129    /// send a fire-and-forget notification to the graph.
130    pub async fn wire(&self, update_tx: ComponentUpdateSender) {
131        let _ = self.update_tx.set(update_tx);
132    }
133
134    /// Set the component's status — updates local state AND notifies the graph.
135    ///
136    /// This is the single canonical way to change a component's status. It writes
137    /// to the local `Arc<RwLock<ComponentStatus>>` and sends the update to the
138    /// graph update loop (if wired). The send awaits backpressure if the channel
139    /// is full, ensuring status transitions are never silently dropped.
140    pub async fn set_status(&self, status: ComponentStatus, message: Option<String>) {
141        // Update local state first, then release the lock before sending
142        // to avoid holding the RwLock during a potential channel backpressure wait.
143        {
144            *self.status.write().await = status;
145        }
146
147        // Notify watch subscribers — send_modify always updates the stored value
148        // even when no receivers currently exist (unlike send() which is a no-op
149        // without receivers).
150        self.status_watch_tx.send_modify(|s| *s = status);
151
152        if let Some(tx) = self.update_tx.get() {
153            if let Err(e) = tx
154                .send(ComponentUpdate::Status {
155                    component_id: self.component_id.clone(),
156                    status,
157                    message,
158                })
159                .await
160            {
161                log::warn!(
162                    "Status update for '{}' dropped (channel closed): {e}",
163                    self.component_id
164                );
165            }
166        }
167    }
168
169    /// Read the current status.
170    pub async fn get_status(&self) -> ComponentStatus {
171        *self.status.read().await
172    }
173
174    /// Subscribe to status changes via a `tokio::sync::watch::Receiver`.
175    ///
176    /// The receiver can be used with `wait_for()` for event-driven waiting
177    /// without polling. Each call returns a fresh receiver; multiple
178    /// subscribers are supported.
179    pub fn subscribe_status(&self) -> tokio::sync::watch::Receiver<ComponentStatus> {
180        self.status_watch_tx.subscribe()
181    }
182}
183
184// ============================================================================
185// Graph Type Definitions
186// ============================================================================
187
188/// Type of component in the graph
189#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
190pub enum ComponentKind {
191    /// The DrasiLib instance itself (root node)
192    Instance,
193    /// A data source
194    Source,
195    /// A continuous query
196    Query,
197    /// A reaction/output
198    Reaction,
199    /// A bootstrap provider
200    BootstrapProvider,
201    /// An identity provider
202    IdentityProvider,
203}
204
205impl std::fmt::Display for ComponentKind {
206    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207        match self {
208            ComponentKind::Instance => write!(f, "instance"),
209            ComponentKind::Source => write!(f, "source"),
210            ComponentKind::Query => write!(f, "query"),
211            ComponentKind::Reaction => write!(f, "reaction"),
212            ComponentKind::BootstrapProvider => write!(f, "bootstrap_provider"),
213            ComponentKind::IdentityProvider => write!(f, "identity_provider"),
214        }
215    }
216}
217
218impl ComponentKind {
219    /// Convert to [`ComponentType`] for event emission.
220    ///
221    /// Returns `None` only for the Instance kind (the root node), which has no
222    /// corresponding event type. All other component kinds map to a `ComponentType`.
223    pub fn to_component_type(&self) -> Option<ComponentType> {
224        match self {
225            ComponentKind::Source => Some(ComponentType::Source),
226            ComponentKind::Query => Some(ComponentType::Query),
227            ComponentKind::Reaction => Some(ComponentType::Reaction),
228            ComponentKind::BootstrapProvider => Some(ComponentType::BootstrapProvider),
229            ComponentKind::IdentityProvider => Some(ComponentType::IdentityProvider),
230            ComponentKind::Instance => None,
231        }
232    }
233}
234
235/// Type of relationship between components (bidirectional pairs)
236#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
237pub enum RelationshipKind {
238    // Ownership
239    /// Instance → Component
240    Owns,
241    /// Component → Instance
242    OwnedBy,
243
244    // Data flow
245    /// Source → Query, or Query → Reaction
246    Feeds,
247    /// Query → Source, or Reaction → Query
248    SubscribesTo,
249
250    // Bootstrap
251    /// BootstrapProvider → Source
252    Bootstraps,
253    /// Source → BootstrapProvider
254    BootstrappedBy,
255
256    // Authentication
257    /// IdentityProvider → Source/Reaction
258    Authenticates,
259    /// Source/Reaction → IdentityProvider
260    AuthenticatedBy,
261}
262
263impl RelationshipKind {
264    /// Get the reverse of this relationship
265    pub fn reverse(&self) -> Self {
266        match self {
267            RelationshipKind::Owns => RelationshipKind::OwnedBy,
268            RelationshipKind::OwnedBy => RelationshipKind::Owns,
269            RelationshipKind::Feeds => RelationshipKind::SubscribesTo,
270            RelationshipKind::SubscribesTo => RelationshipKind::Feeds,
271            RelationshipKind::Bootstraps => RelationshipKind::BootstrappedBy,
272            RelationshipKind::BootstrappedBy => RelationshipKind::Bootstraps,
273            RelationshipKind::Authenticates => RelationshipKind::AuthenticatedBy,
274            RelationshipKind::AuthenticatedBy => RelationshipKind::Authenticates,
275        }
276    }
277}
278
279// ============================================================================
280// Graph Node Data
281// ============================================================================
282
283/// A node in the component graph.
284///
285/// Contains metadata (ID, kind, status, properties) for each component.
286/// Runtime component instances (`Arc<dyn Source>`, etc.) are stored in the
287/// managers' HashMaps alongside the graph, keyed by the same component ID.
288/// The graph and HashMaps are always updated together in the same code paths.
289#[derive(Debug, Clone, Serialize, Deserialize)]
290pub struct ComponentNode {
291    /// Unique identifier for this component
292    pub id: String,
293    /// Type of component
294    pub kind: ComponentKind,
295    /// Current lifecycle status
296    pub status: ComponentStatus,
297    /// Optional metadata (e.g., source_type, query_language, reaction_type)
298    #[serde(default)]
299    pub metadata: HashMap<String, String>,
300}
301
302// ============================================================================
303// Serializable Graph Snapshot
304// ============================================================================
305
306/// Serializable snapshot of the entire component graph.
307///
308/// Used for API responses, UI visualization, and debugging.
309/// Contains only metadata — no runtime component instances.
310#[derive(Debug, Clone, Serialize, Deserialize)]
311pub struct GraphSnapshot {
312    /// The instance ID (root node)
313    pub instance_id: String,
314    /// All nodes in the graph
315    pub nodes: Vec<ComponentNode>,
316    /// All edges in the graph
317    pub edges: Vec<GraphEdge>,
318}
319
320/// A serializable edge in the graph snapshot
321#[derive(Debug, Clone, Serialize, Deserialize)]
322pub struct GraphEdge {
323    /// Source component ID
324    pub from: String,
325    /// Target component ID
326    pub to: String,
327    /// Type of relationship
328    pub relationship: RelationshipKind,
329}