Skip to main content

drasi_lib/channels/
events.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::profiling::ProfilingMetadata;
16use drasi_core::models::SourceChange;
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::{broadcast, mpsc};
21
/// Trait for types that have a timestamp, required for priority queue ordering.
///
/// In this file it is implemented by [`SourceEventWrapper`] and [`QueryResult`],
/// both of which return their `timestamp` field.
pub trait Timestamped {
    /// Returns the UTC timestamp associated with this value.
    fn timestamp(&self) -> chrono::DateTime<chrono::Utc>;
}
26
/// Type of Drasi component
///
/// Used for identifying component types in events and monitoring;
/// [`ComponentEvent`] carries one of these in its `component_type` field.
///
/// The derived `Eq` and `Hash` impls allow values to be used as hash-map or
/// hash-set keys.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum ComponentType {
    /// A source component.
    Source,
    /// A query component.
    Query,
    /// A reaction component.
    Reaction,
    /// A bootstrap provider component.
    BootstrapProvider,
    /// An identity provider component.
    IdentityProvider,
}
38
39/// Execution status of a Drasi component
40///
41/// `ComponentStatus` represents the current lifecycle state of sources, queries, and reactions.
42/// Components transition through these states during their lifecycle, from creation through
43/// execution to shutdown.
44///
45/// # Status Lifecycle
46///
47/// A typical component lifecycle follows this progression:
48///
49/// ```text
50/// Stopped → Starting → Running → Stopping → Stopped
51///                ↓
52///              Error
53/// ```
54///
55/// # Status Values
56///
57/// - **Starting**: Component is initializing (connecting to resources, loading data, etc.)
58/// - **Running**: Component is actively processing (ingesting, querying, or delivering)
59/// - **Stopping**: Component is shutting down gracefully
60/// - **Stopped**: Component is not running (initial or final state)
61/// - **Error**: Component encountered an error and cannot continue (see error_message)
62///
63/// # Usage
64///
65/// Status is available through runtime information methods on [`DrasiLib`](crate::DrasiLib):
66///
67/// - [`get_source_status()`](crate::DrasiLib::get_source_status)
68/// - [`get_query_status()`](crate::DrasiLib::get_query_status)
69/// - [`get_reaction_status()`](crate::DrasiLib::get_reaction_status)
70///
71/// And through runtime info structs:
72///
73/// - [`SourceRuntime`](crate::SourceRuntime)
74/// - [`QueryRuntime`](crate::QueryRuntime)
75/// - [`ReactionRuntime`](crate::ReactionRuntime)
76///
77/// # Examples
78///
79/// ## Monitoring Component Status
80///
81/// ```no_run
82/// use drasi_lib::{DrasiLib, ComponentStatus};
83///
84/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
85/// let core = DrasiLib::builder().with_id("my-server").build().await?;
86/// core.start().await?;
87///
88/// // Check source status
89/// let source_status = core.get_source_status("orders_db").await?;
90/// match source_status {
91///     ComponentStatus::Running => println!("Source is running"),
92///     ComponentStatus::Error => println!("Source has errors"),
93///     ComponentStatus::Starting => println!("Source is starting up"),
94///     _ => println!("Source status: {:?}", source_status),
95/// }
96///
97/// // Get detailed info with status
98/// let source_info = core.get_source_info("orders_db").await?;
99/// if source_info.status == ComponentStatus::Error {
100///     if let Some(error) = source_info.error_message {
101///         eprintln!("Error: {}", error);
102///     }
103/// }
104/// # Ok(())
105/// # }
106/// ```
107///
108/// ## Waiting for Component to Start
109///
110/// ```no_run
111/// use drasi_lib::{DrasiLib, ComponentStatus};
112/// use tokio::time::{sleep, Duration};
113///
114/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
115/// let core = DrasiLib::builder().with_id("my-server").build().await?;
116/// core.start_source("orders_db").await?;
117///
118/// // Poll until source is running
119/// loop {
120///     let status = core.get_source_status("orders_db").await?;
121///     match status {
122///         ComponentStatus::Running => break,
123///         ComponentStatus::Error => return Err("Source failed to start".into()),
124///         _ => sleep(Duration::from_millis(100)).await,
125///     }
126/// }
127/// println!("Source is now running");
128/// # Ok(())
129/// # }
130/// ```
131///
132/// ## Checking All Components
133///
134/// ```no_run
135/// use drasi_lib::{DrasiLib, ComponentStatus};
136///
137/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
138/// let core = DrasiLib::builder().with_id("my-server").build().await?;
139/// core.start().await?;
140///
141/// // Check all sources
142/// let sources = core.list_sources().await?;
143/// for (id, status) in sources {
144///     println!("Source {}: {:?}", id, status);
145/// }
146///
147/// // Check all queries
148/// let queries = core.list_queries().await?;
149/// for (id, status) in queries {
150///     println!("Query {}: {:?}", id, status);
151/// }
152///
153/// // Check all reactions
154/// let reactions = core.list_reactions().await?;
155/// for (id, status) in reactions {
156///     println!("Reaction {}: {:?}", id, status);
157/// }
158/// # Ok(())
159/// # }
160/// ```
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum ComponentStatus {
    /// Component is initializing (connecting to resources, loading data, etc.).
    Starting,
    /// Component is actively processing (ingesting, querying, or delivering).
    Running,
    /// Component is shutting down gracefully.
    Stopping,
    /// Component is not running (initial or final state).
    Stopped,
    // NOTE(review): this variant is missing from both the lifecycle diagram and
    // the "Status Values" list in the type-level docs. Presumably it marks a
    // component whose configuration is being changed — confirm and document.
    Reconfiguring,
    /// Component encountered an error and cannot continue.
    Error,
}
170
/// A source change event with metadata for dispatching to queries.
///
/// Pairs a [`SourceChange`] with the id of its source and a UTC timestamp.
#[derive(Debug, Clone)]
pub struct SourceChangeEvent {
    /// Identifier of the source this change is associated with.
    pub source_id: String,
    /// The change payload.
    pub change: SourceChange,
    /// UTC timestamp attached to the event.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
178
/// Control events from sources for query coordination
///
/// Carried inside [`SourceEvent::Control`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SourceControl {
    /// Query subscription control event
    Subscription {
        /// Identifier of the query the subscription refers to.
        query_id: String,
        /// Identifier of the query node.
        // NOTE(review): presumably the node hosting the query — confirm.
        query_node_id: String,
        /// Node labels named by the subscription.
        node_labels: Vec<String>,
        /// Relation labels named by the subscription.
        rel_labels: Vec<String>,
        /// Operation being applied (see [`ControlOperation`]).
        operation: ControlOperation,
    },
    /// Signal from FutureQueueSource that one or more future items are due.
    FuturesDue,
}
193
/// Control operation types
///
/// Used as the `operation` field of [`SourceControl::Subscription`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ControlOperation {
    /// Insert operation.
    Insert,
    /// Update operation.
    Update,
    /// Delete operation.
    Delete,
}
201
/// Unified event envelope carrying both data changes and control messages
///
/// This is the payload type of [`SourceEventWrapper`]'s `event` field.
#[derive(Debug, Clone)]
pub enum SourceEvent {
    /// Data change event from source
    Change(SourceChange),
    /// Control event for query coordination
    Control(SourceControl),
}
210
/// Wrapper for source events with metadata
///
/// Build one with [`SourceEventWrapper::new`] (no profiling) or
/// [`SourceEventWrapper::with_profiling`]; take it apart again with
/// [`SourceEventWrapper::into_parts`].
#[derive(Debug, Clone)]
pub struct SourceEventWrapper {
    /// Identifier of the source associated with the event.
    pub source_id: String,
    /// The wrapped change or control event.
    pub event: SourceEvent,
    /// UTC timestamp of the event; returned by the `Timestamped` impl.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional profiling metadata for performance tracking
    pub profiling: Option<ProfilingMetadata>,
}
220
221impl SourceEventWrapper {
222    /// Create a new SourceEventWrapper without profiling
223    pub fn new(
224        source_id: String,
225        event: SourceEvent,
226        timestamp: chrono::DateTime<chrono::Utc>,
227    ) -> Self {
228        Self {
229            source_id,
230            event,
231            timestamp,
232            profiling: None,
233        }
234    }
235
236    /// Create a new SourceEventWrapper with profiling metadata
237    pub fn with_profiling(
238        source_id: String,
239        event: SourceEvent,
240        timestamp: chrono::DateTime<chrono::Utc>,
241        profiling: ProfilingMetadata,
242    ) -> Self {
243        Self {
244            source_id,
245            event,
246            timestamp,
247            profiling: Some(profiling),
248        }
249    }
250
251    /// Consume this wrapper and return its components.
252    /// This enables zero-copy extraction when the wrapper has sole ownership.
253    pub fn into_parts(
254        self,
255    ) -> (
256        String,
257        SourceEvent,
258        chrono::DateTime<chrono::Utc>,
259        Option<ProfilingMetadata>,
260    ) {
261        (self.source_id, self.event, self.timestamp, self.profiling)
262    }
263
264    /// Try to extract components from an Arc<SourceEventWrapper>.
265    /// Uses Arc::try_unwrap to avoid cloning when we have sole ownership.
266    /// Returns Ok with owned components if sole owner, Err with Arc back if shared.
267    ///
268    /// This enables zero-copy in Channel dispatch mode (single consumer per event)
269    /// while still working correctly in Broadcast mode (cloning required).
270    pub fn try_unwrap_arc(
271        arc_self: Arc<Self>,
272    ) -> Result<
273        (
274            String,
275            SourceEvent,
276            chrono::DateTime<chrono::Utc>,
277            Option<ProfilingMetadata>,
278        ),
279        Arc<Self>,
280    > {
281        Arc::try_unwrap(arc_self).map(|wrapper| wrapper.into_parts())
282    }
283}
284
285// Implement Timestamped for SourceEventWrapper for use in generic priority queue
286impl Timestamped for SourceEventWrapper {
287    fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
288        self.timestamp
289    }
290}
291
/// Arc-wrapped SourceEventWrapper for zero-copy distribution
///
/// Cloning the `Arc` shares the same wrapper instead of copying it. This is the
/// item type of [`SourceBroadcastSender`] / [`SourceBroadcastReceiver`].
pub type ArcSourceEvent = Arc<SourceEventWrapper>;
294
/// Bootstrap event wrapper for dedicated bootstrap channels
///
/// Item type of [`BootstrapEventSender`] / [`BootstrapEventReceiver`].
#[derive(Debug, Clone)]
pub struct BootstrapEvent {
    /// Identifier of the source associated with the event.
    pub source_id: String,
    /// The change payload.
    pub change: SourceChange,
    /// UTC timestamp attached to the event.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Sequence number of the event.
    // NOTE(review): presumably its position within the bootstrap stream — confirm.
    pub sequence: u64,
}
303
/// Bootstrap completion signal
///
/// Item type of [`BootstrapCompleteSender`] / [`BootstrapCompleteReceiver`].
#[derive(Debug, Clone)]
pub struct BootstrapComplete {
    /// Identifier of the source associated with the signal.
    pub source_id: String,
    /// Event count reported with the completion signal.
    // NOTE(review): presumably the number of bootstrap events delivered — confirm.
    pub total_events: u64,
}
310
/// Subscription request from Query to Source
#[derive(Debug, Clone)]
pub struct SubscriptionRequest {
    /// Identifier of the requesting query.
    pub query_id: String,
    /// Identifier of the source being subscribed to.
    pub source_id: String,
    /// Whether bootstrap is enabled for this subscription.
    pub enable_bootstrap: bool,
    /// Node labels named by the request.
    pub node_labels: Vec<String>,
    /// Relation labels named by the request.
    pub relation_labels: Vec<String>,
}
320
/// Subscription response from Source to Query
///
/// No traits are derived for this type; it owns the boxed receiver through
/// which the query obtains source events.
pub struct SubscriptionResponse {
    /// Identifier of the query.
    pub query_id: String,
    /// Identifier of the source.
    pub source_id: String,
    /// Boxed receiver of [`SourceEventWrapper`] values.
    pub receiver: Box<dyn super::ChangeReceiver<SourceEventWrapper>>,
    /// Optional dedicated receiver for [`BootstrapEvent`]s.
    pub bootstrap_receiver: Option<BootstrapEventReceiver>,
}
328
/// Subscription response from Query to Reaction
///
/// No traits are derived for this type; it owns the boxed receiver through
/// which the reaction obtains query results.
pub struct QuerySubscriptionResponse {
    /// Identifier of the query.
    pub query_id: String,
    /// Boxed receiver of [`QueryResult`] values.
    pub receiver: Box<dyn super::ChangeReceiver<QueryResult>>,
}
334
/// Typed result diff emitted by continuous queries.
///
/// Serialized with serde's internally tagged representation: the variant is
/// identified by a `"type"` field whose value is the per-variant rename below
/// (`"ADD"`, `"DELETE"`, `"UPDATE"`, `"aggregation"`, `"noop"`).
// NOTE(review): the tag values mix upper- and lower-case; presumably this is
// intentional for wire compatibility — confirm before normalizing.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum ResultDiff {
    /// Add diff; tagged `"ADD"`.
    #[serde(rename = "ADD")]
    Add { data: serde_json::Value },
    /// Delete diff; tagged `"DELETE"`.
    #[serde(rename = "DELETE")]
    Delete { data: serde_json::Value },
    /// Update diff carrying `before` and `after` values; tagged `"UPDATE"`.
    #[serde(rename = "UPDATE")]
    Update {
        data: serde_json::Value,
        before: serde_json::Value,
        after: serde_json::Value,
        // Left out of the serialized output entirely when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        grouping_keys: Option<Vec<String>>,
    },
    /// Aggregation diff with an optional `before` value; tagged `"aggregation"`.
    #[serde(rename = "aggregation")]
    Aggregation {
        before: Option<serde_json::Value>,
        after: serde_json::Value,
    },
    /// Diff variant carrying no data; tagged `"noop"`.
    #[serde(rename = "noop")]
    Noop,
}
359
/// Result emitted by a continuous query when data changes.
///
/// Contains the diff (added, updated, deleted rows) plus metadata and
/// optional profiling information. Dispatched to reactions via the priority queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResult {
    /// Identifier of the query that produced the result.
    pub query_id: String,
    /// UTC timestamp of the result; returned by the `Timestamped` impl.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// The individual diffs that make up this result.
    pub results: Vec<ResultDiff>,
    /// Metadata as string keys mapped to JSON values.
    pub metadata: HashMap<String, serde_json::Value>,
    /// Optional profiling metadata for performance tracking
    // Left out of the serialized output entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub profiling: Option<ProfilingMetadata>,
}
374
375impl QueryResult {
376    /// Create a new QueryResult without profiling
377    pub fn new(
378        query_id: String,
379        timestamp: chrono::DateTime<chrono::Utc>,
380        results: Vec<ResultDiff>,
381        metadata: HashMap<String, serde_json::Value>,
382    ) -> Self {
383        Self {
384            query_id,
385            timestamp,
386            results,
387            metadata,
388            profiling: None,
389        }
390    }
391
392    /// Create a new QueryResult with profiling metadata
393    pub fn with_profiling(
394        query_id: String,
395        timestamp: chrono::DateTime<chrono::Utc>,
396        results: Vec<ResultDiff>,
397        metadata: HashMap<String, serde_json::Value>,
398        profiling: ProfilingMetadata,
399    ) -> Self {
400        Self {
401            query_id,
402            timestamp,
403            results,
404            metadata,
405            profiling: Some(profiling),
406        }
407    }
408}
409
410// Implement Timestamped for QueryResult for use in generic priority queue
411impl Timestamped for QueryResult {
412    fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
413        self.timestamp
414    }
415}
416
/// Arc-wrapped QueryResult for zero-copy distribution
///
/// Cloning the `Arc` shares the same result instead of copying it. This is the
/// item type of [`QueryResultBroadcastSender`] / [`QueryResultBroadcastReceiver`].
pub type ArcQueryResult = Arc<QueryResult>;
419
/// Lifecycle event emitted when a component's status changes.
///
/// Broadcast via the component event channel to all subscribers.
/// Used for monitoring, logging, and reactive lifecycle coordination.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentEvent {
    /// Identifier of the component the event is about.
    pub component_id: String,
    /// Kind of component (source, query, reaction, ...).
    pub component_type: ComponentType,
    /// Status carried by the event.
    pub status: ComponentStatus,
    /// UTC timestamp attached to the event.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional message text accompanying the event.
    pub message: Option<String>,
}
432
/// Control messages for component lifecycle management.
///
/// Item type of [`ControlMessageSender`] / [`ControlMessageReceiver`].
// NOTE(review): the `String` payloads are presumably component ids — confirm
// against the code that sends and handles these messages.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ControlMessage {
    /// Start request carrying a string payload.
    Start(String),
    /// Stop request carrying a string payload.
    Stop(String),
    /// Status request carrying a string payload.
    Status(String),
    /// Shutdown request; carries no payload.
    Shutdown,
}
441
/// Sending half of a tokio broadcast channel of [`ComponentEvent`]s.
pub type ComponentEventBroadcastSender = broadcast::Sender<ComponentEvent>;
/// Receiving half of a tokio broadcast channel of [`ComponentEvent`]s.
pub type ComponentEventBroadcastReceiver = broadcast::Receiver<ComponentEvent>;
/// Backward-compatible mpsc channel types used by host-sdk plugin callbacks.
/// New code should use `ComponentUpdateSender` from `component_graph` instead.
pub type ComponentEventSender = mpsc::Sender<ComponentEvent>;
/// Receiving counterpart of [`ComponentEventSender`] (tokio mpsc).
pub type ComponentEventReceiver = mpsc::Receiver<ComponentEvent>;
/// Receiving half of a tokio mpsc channel of [`ControlMessage`]s.
pub type ControlMessageReceiver = mpsc::Receiver<ControlMessage>;
/// Sending half of a tokio mpsc channel of [`ControlMessage`]s.
pub type ControlMessageSender = mpsc::Sender<ControlMessage>;

// Broadcast channel types for zero-copy event distribution
/// Sending half of a tokio broadcast channel of [`ArcSourceEvent`]s.
pub type SourceBroadcastSender = broadcast::Sender<ArcSourceEvent>;
/// Receiving half of a tokio broadcast channel of [`ArcSourceEvent`]s.
pub type SourceBroadcastReceiver = broadcast::Receiver<ArcSourceEvent>;

// Broadcast channel types for zero-copy query result distribution
/// Sending half of a tokio broadcast channel of [`ArcQueryResult`]s.
pub type QueryResultBroadcastSender = broadcast::Sender<ArcQueryResult>;
/// Receiving half of a tokio broadcast channel of [`ArcQueryResult`]s.
pub type QueryResultBroadcastReceiver = broadcast::Receiver<ArcQueryResult>;

// Bootstrap channel types for dedicated bootstrap data delivery
/// Sending half of a tokio mpsc channel of [`BootstrapEvent`]s.
pub type BootstrapEventSender = mpsc::Sender<BootstrapEvent>;
/// Receiving half of a tokio mpsc channel of [`BootstrapEvent`]s.
pub type BootstrapEventReceiver = mpsc::Receiver<BootstrapEvent>;
/// Sending half of a tokio mpsc channel of [`BootstrapComplete`] signals.
pub type BootstrapCompleteSender = mpsc::Sender<BootstrapComplete>;
/// Receiving half of a tokio mpsc channel of [`BootstrapComplete`] signals.
pub type BootstrapCompleteReceiver = mpsc::Receiver<BootstrapComplete>;
464
/// Control signals for coordination
///
/// Every variant identifies the query it refers to through `query_id`.
/// Wrapped with a timestamp and optional sequence number by
/// [`ControlSignalWrapper`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ControlSignal {
    /// Query has entered running state
    Running { query_id: String },
    /// Query has stopped
    Stopped { query_id: String },
    /// Query has been deleted
    Deleted { query_id: String },
}
475
/// Wrapper for control signals with metadata
///
/// Item type of [`ControlSignalSender`] / [`ControlSignalReceiver`].
#[derive(Debug, Clone)]
pub struct ControlSignalWrapper {
    /// The wrapped signal.
    pub signal: ControlSignal,
    /// UTC timestamp; both constructors set it to the time of construction.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional sequence number; `None` when built with `new`.
    pub sequence_number: Option<u64>,
}
483
484impl ControlSignalWrapper {
485    pub fn new(signal: ControlSignal) -> Self {
486        Self {
487            signal,
488            timestamp: chrono::Utc::now(),
489            sequence_number: None,
490        }
491    }
492
493    pub fn with_sequence(signal: ControlSignal, sequence_number: u64) -> Self {
494        Self {
495            signal,
496            timestamp: chrono::Utc::now(),
497            sequence_number: Some(sequence_number),
498        }
499    }
500}
501
/// Receiving half of a tokio mpsc channel of [`ControlSignalWrapper`]s.
pub type ControlSignalReceiver = mpsc::Receiver<ControlSignalWrapper>;
/// Sending half of a tokio mpsc channel of [`ControlSignalWrapper`]s.
pub type ControlSignalSender = mpsc::Sender<ControlSignalWrapper>;
504
/// Sending halves of the channels created by [`EventChannels::new`].
///
/// The matching receiving halves are returned in [`EventReceivers`].
pub struct EventChannels {
    /// Sender for [`ControlMessage`]s.
    // NOTE(review): the leading underscore suggests this is currently unused — confirm.
    pub _control_tx: ControlMessageSender,
    /// Sender for [`ControlSignalWrapper`]s.
    pub control_signal_tx: ControlSignalSender,
}
509
/// Receiving halves of the channels created by [`EventChannels::new`].
///
/// The matching sending halves are held by [`EventChannels`].
pub struct EventReceivers {
    /// Receiver for [`ControlMessage`]s.
    // NOTE(review): the leading underscore suggests this is currently unused — confirm.
    pub _control_rx: ControlMessageReceiver,
    /// Receiver for [`ControlSignalWrapper`]s.
    pub control_signal_rx: ControlSignalReceiver,
}
514
515impl EventChannels {
516    pub fn new() -> (Self, EventReceivers) {
517        let (control_tx, control_rx) = mpsc::channel(100);
518        let (control_signal_tx, control_signal_rx) = mpsc::channel(100);
519
520        let channels = Self {
521            _control_tx: control_tx,
522            control_signal_tx,
523        };
524
525        let receivers = EventReceivers {
526            _control_rx: control_rx,
527            control_signal_rx,
528        };
529
530        (channels, receivers)
531    }
532}
533
#[cfg(test)]
mod tests {
    use super::*;
    use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};

    /// Source id used by every wrapper built in these tests.
    const TEST_SOURCE_ID: &str = "test-source";

    /// Builds an `Insert` change for a single node with fixed metadata and
    /// default (empty) properties.
    fn create_test_source_change() -> SourceChange {
        let metadata = ElementMetadata {
            reference: ElementReference::new("TestSource", "test-node-1"),
            labels: vec!["TestLabel".into()].into(),
            effective_from: 1000,
        };
        SourceChange::Insert {
            element: Element::Node {
                metadata,
                properties: Default::default(),
            },
        }
    }

    /// Builds a profiling-free wrapper around the test change, stamped "now".
    fn create_test_wrapper() -> SourceEventWrapper {
        SourceEventWrapper::new(
            TEST_SOURCE_ID.to_string(),
            SourceEvent::Change(create_test_source_change()),
            chrono::Utc::now(),
        )
    }

    #[test]
    fn test_source_event_wrapper_into_parts() {
        let (source_id, event, _timestamp, profiling) = create_test_wrapper().into_parts();

        assert_eq!(source_id, TEST_SOURCE_ID);
        assert!(matches!(event, SourceEvent::Change(_)));
        assert!(profiling.is_none());
    }

    #[test]
    fn test_try_unwrap_arc_sole_owner() {
        let sole_owner = Arc::new(create_test_wrapper());

        // Only one strong reference exists, so the unwrap must succeed.
        let outcome = SourceEventWrapper::try_unwrap_arc(sole_owner);
        assert!(outcome.is_ok());

        let (source_id, event, _timestamp, _profiling) = outcome.unwrap();
        assert_eq!(source_id, TEST_SOURCE_ID);
        assert!(matches!(event, SourceEvent::Change(_)));
    }

    #[test]
    fn test_try_unwrap_arc_shared() {
        let first_handle = Arc::new(create_test_wrapper());
        // Keep a second strong reference alive for the duration of the test.
        let _second_handle = Arc::clone(&first_handle);

        // A shared Arc cannot be unwrapped; it must come back in `Err`.
        let outcome = SourceEventWrapper::try_unwrap_arc(first_handle);
        assert!(outcome.is_err());

        // The Arc handed back must still point at the original wrapper.
        let returned_arc = outcome.unwrap_err();
        assert_eq!(returned_arc.source_id, TEST_SOURCE_ID);
    }

    #[test]
    fn test_zero_copy_extraction_path() {
        // Mirrors the extraction path used in query processing.
        let sole_owner = Arc::new(create_test_wrapper());

        // Sole ownership takes the zero-copy branch; a shared Arc (as in
        // broadcast mode) would fall back to cloning the fields.
        let (source_id, event, _timestamp, _profiling) =
            SourceEventWrapper::try_unwrap_arc(sole_owner).unwrap_or_else(|shared| {
                (
                    shared.source_id.clone(),
                    shared.event.clone(),
                    shared.timestamp,
                    shared.profiling.clone(),
                )
            });

        // The event is owned here, so the change is moved out rather than cloned.
        let source_change = if let SourceEvent::Change(change) = event {
            Some(change)
        } else {
            None
        };

        assert_eq!(source_id, TEST_SOURCE_ID);
        assert!(source_change.is_some());
    }
}
637
638        assert_eq!(source_id, "test-source");
639        assert!(source_change.is_some());
640    }
641}