// File: drasi_lib/channels/events.rs

// Copyright 2025 The Drasi Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use crate::profiling::ProfilingMetadata;
16use drasi_core::models::SourceChange;
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::{broadcast, mpsc};
21
22/// Trait for types that have a timestamp, required for priority queue ordering
23pub trait Timestamped {
24    fn timestamp(&self) -> chrono::DateTime<chrono::Utc>;
25}
26
27/// Type of Drasi component
28///
29/// Used for identifying component types in events and monitoring.
30#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
31pub enum ComponentType {
32    Source,
33    Query,
34    Reaction,
35}
36
37/// Execution status of a Drasi component
38///
39/// `ComponentStatus` represents the current lifecycle state of sources, queries, and reactions.
40/// Components transition through these states during their lifecycle, from creation through
41/// execution to shutdown.
42///
43/// # Status Lifecycle
44///
45/// A typical component lifecycle follows this progression:
46///
47/// ```text
48/// Stopped → Starting → Running → Stopping → Stopped
49///                ↓
50///              Error
51/// ```
52///
53/// # Status Values
54///
55/// - **Starting**: Component is initializing (connecting to resources, loading data, etc.)
56/// - **Running**: Component is actively processing (ingesting, querying, or delivering)
57/// - **Stopping**: Component is shutting down gracefully
58/// - **Stopped**: Component is not running (initial or final state)
59/// - **Error**: Component encountered an error and cannot continue (see error_message)
60///
61/// # Usage
62///
63/// Status is available through runtime information methods on [`DrasiLib`](crate::DrasiLib):
64///
65/// - [`get_source_status()`](crate::DrasiLib::get_source_status)
66/// - [`get_query_status()`](crate::DrasiLib::get_query_status)
67/// - [`get_reaction_status()`](crate::DrasiLib::get_reaction_status)
68///
69/// And through runtime info structs:
70///
71/// - [`SourceRuntime`](crate::SourceRuntime)
72/// - [`QueryRuntime`](crate::QueryRuntime)
73/// - [`ReactionRuntime`](crate::ReactionRuntime)
74///
75/// # Examples
76///
77/// ## Monitoring Component Status
78///
79/// ```no_run
80/// use drasi_lib::{DrasiLib, ComponentStatus};
81///
82/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
83/// let core = DrasiLib::builder().with_id("my-server").build().await?;
84/// core.start().await?;
85///
86/// // Check source status
87/// let source_status = core.get_source_status("orders_db").await?;
88/// match source_status {
89///     ComponentStatus::Running => println!("Source is running"),
90///     ComponentStatus::Error => println!("Source has errors"),
91///     ComponentStatus::Starting => println!("Source is starting up"),
92///     _ => println!("Source status: {:?}", source_status),
93/// }
94///
95/// // Get detailed info with status
96/// let source_info = core.get_source_info("orders_db").await?;
97/// if source_info.status == ComponentStatus::Error {
98///     if let Some(error) = source_info.error_message {
99///         eprintln!("Error: {}", error);
100///     }
101/// }
102/// # Ok(())
103/// # }
104/// ```
105///
106/// ## Waiting for Component to Start
107///
108/// ```no_run
109/// use drasi_lib::{DrasiLib, ComponentStatus};
110/// use tokio::time::{sleep, Duration};
111///
112/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
113/// let core = DrasiLib::builder().with_id("my-server").build().await?;
114/// core.start_source("orders_db").await?;
115///
116/// // Poll until source is running
117/// loop {
118///     let status = core.get_source_status("orders_db").await?;
119///     match status {
120///         ComponentStatus::Running => break,
121///         ComponentStatus::Error => return Err("Source failed to start".into()),
122///         _ => sleep(Duration::from_millis(100)).await,
123///     }
124/// }
125/// println!("Source is now running");
126/// # Ok(())
127/// # }
128/// ```
129///
130/// ## Checking All Components
131///
132/// ```no_run
133/// use drasi_lib::{DrasiLib, ComponentStatus};
134///
135/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
136/// let core = DrasiLib::builder().with_id("my-server").build().await?;
137/// core.start().await?;
138///
139/// // Check all sources
140/// let sources = core.list_sources().await?;
141/// for (id, status) in sources {
142///     println!("Source {}: {:?}", id, status);
143/// }
144///
145/// // Check all queries
146/// let queries = core.list_queries().await?;
147/// for (id, status) in queries {
148///     println!("Query {}: {:?}", id, status);
149/// }
150///
151/// // Check all reactions
152/// let reactions = core.list_reactions().await?;
153/// for (id, status) in reactions {
154///     println!("Reaction {}: {:?}", id, status);
155/// }
156/// # Ok(())
157/// # }
158/// ```
159#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
160pub enum ComponentStatus {
161    Starting,
162    Running,
163    Stopping,
164    Stopped,
165    Error,
166}
167
168#[derive(Debug, Clone)]
169pub struct SourceChangeEvent {
170    pub source_id: String,
171    pub change: SourceChange,
172    pub timestamp: chrono::DateTime<chrono::Utc>,
173}
174
175/// Control events from sources for query coordination
176#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
177pub enum SourceControl {
178    /// Query subscription control event
179    Subscription {
180        query_id: String,
181        query_node_id: String,
182        node_labels: Vec<String>,
183        rel_labels: Vec<String>,
184        operation: ControlOperation,
185    },
186}
187
188/// Control operation types
189#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
190pub enum ControlOperation {
191    Insert,
192    Update,
193    Delete,
194}
195
196/// Unified event envelope carrying both data changes and control messages
197#[derive(Debug, Clone)]
198pub enum SourceEvent {
199    /// Data change event from source
200    Change(SourceChange),
201    /// Control event for query coordination
202    Control(SourceControl),
203    /// Bootstrap start marker for a specific query
204    BootstrapStart { query_id: String },
205    /// Bootstrap end marker for a specific query
206    BootstrapEnd { query_id: String },
207}
208
209/// Wrapper for source events with metadata
210#[derive(Debug, Clone)]
211pub struct SourceEventWrapper {
212    pub source_id: String,
213    pub event: SourceEvent,
214    pub timestamp: chrono::DateTime<chrono::Utc>,
215    /// Optional profiling metadata for performance tracking
216    pub profiling: Option<ProfilingMetadata>,
217}
218
219impl SourceEventWrapper {
220    /// Create a new SourceEventWrapper without profiling
221    pub fn new(
222        source_id: String,
223        event: SourceEvent,
224        timestamp: chrono::DateTime<chrono::Utc>,
225    ) -> Self {
226        Self {
227            source_id,
228            event,
229            timestamp,
230            profiling: None,
231        }
232    }
233
234    /// Create a new SourceEventWrapper with profiling metadata
235    pub fn with_profiling(
236        source_id: String,
237        event: SourceEvent,
238        timestamp: chrono::DateTime<chrono::Utc>,
239        profiling: ProfilingMetadata,
240    ) -> Self {
241        Self {
242            source_id,
243            event,
244            timestamp,
245            profiling: Some(profiling),
246        }
247    }
248
249    /// Consume this wrapper and return its components.
250    /// This enables zero-copy extraction when the wrapper has sole ownership.
251    pub fn into_parts(
252        self,
253    ) -> (
254        String,
255        SourceEvent,
256        chrono::DateTime<chrono::Utc>,
257        Option<ProfilingMetadata>,
258    ) {
259        (self.source_id, self.event, self.timestamp, self.profiling)
260    }
261
262    /// Try to extract components from an Arc<SourceEventWrapper>.
263    /// Uses Arc::try_unwrap to avoid cloning when we have sole ownership.
264    /// Returns Ok with owned components if sole owner, Err with Arc back if shared.
265    ///
266    /// This enables zero-copy in Channel dispatch mode (single consumer per event)
267    /// while still working correctly in Broadcast mode (cloning required).
268    pub fn try_unwrap_arc(
269        arc_self: Arc<Self>,
270    ) -> Result<
271        (
272            String,
273            SourceEvent,
274            chrono::DateTime<chrono::Utc>,
275            Option<ProfilingMetadata>,
276        ),
277        Arc<Self>,
278    > {
279        Arc::try_unwrap(arc_self).map(|wrapper| wrapper.into_parts())
280    }
281}
282
283// Implement Timestamped for SourceEventWrapper for use in generic priority queue
284impl Timestamped for SourceEventWrapper {
285    fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
286        self.timestamp
287    }
288}
289
290/// Arc-wrapped SourceEventWrapper for zero-copy distribution
291pub type ArcSourceEvent = Arc<SourceEventWrapper>;
292
293/// Bootstrap event wrapper for dedicated bootstrap channels
294#[derive(Debug, Clone)]
295pub struct BootstrapEvent {
296    pub source_id: String,
297    pub change: SourceChange,
298    pub timestamp: chrono::DateTime<chrono::Utc>,
299    pub sequence: u64,
300}
301
/// Bootstrap completion signal
///
/// Derives `PartialEq`/`Eq` so signals can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BootstrapComplete {
    /// Identifier of the source whose bootstrap completed.
    pub source_id: String,
    /// Total number of events reported for the bootstrap.
    pub total_events: u64,
}
308
/// Subscription request from Query to Source
///
/// Derives `PartialEq`/`Eq` so requests can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscriptionRequest {
    /// Identifier of the subscribing query.
    pub query_id: String,
    /// Identifier of the source being subscribed to.
    pub source_id: String,
    /// Whether bootstrap is enabled for this subscription.
    pub enable_bootstrap: bool,
    /// Node labels requested by the query.
    pub node_labels: Vec<String>,
    /// Relation labels requested by the query.
    pub relation_labels: Vec<String>,
}
318
319/// Subscription response from Source to Query
320pub struct SubscriptionResponse {
321    pub query_id: String,
322    pub source_id: String,
323    pub receiver: Box<dyn super::ChangeReceiver<SourceEventWrapper>>,
324    pub bootstrap_receiver: Option<BootstrapEventReceiver>,
325}
326
327/// Subscription response from Query to Reaction
328pub struct QuerySubscriptionResponse {
329    pub query_id: String,
330    pub receiver: Box<dyn super::ChangeReceiver<QueryResult>>,
331}
332
333/// Typed result diff emitted by continuous queries.
334#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
335#[serde(tag = "type")]
336pub enum ResultDiff {
337    #[serde(rename = "ADD")]
338    Add { data: serde_json::Value },
339    #[serde(rename = "DELETE")]
340    Delete { data: serde_json::Value },
341    #[serde(rename = "UPDATE")]
342    Update {
343        data: serde_json::Value,
344        before: serde_json::Value,
345        after: serde_json::Value,
346        #[serde(skip_serializing_if = "Option::is_none")]
347        grouping_keys: Option<Vec<String>>,
348    },
349    #[serde(rename = "aggregation")]
350    Aggregation {
351        before: Option<serde_json::Value>,
352        after: serde_json::Value,
353    },
354    #[serde(rename = "noop")]
355    Noop,
356}
357
358#[derive(Debug, Clone, Serialize, Deserialize)]
359pub struct QueryResult {
360    pub query_id: String,
361    pub timestamp: chrono::DateTime<chrono::Utc>,
362    pub results: Vec<ResultDiff>,
363    pub metadata: HashMap<String, serde_json::Value>,
364    /// Optional profiling metadata for performance tracking
365    #[serde(skip_serializing_if = "Option::is_none")]
366    pub profiling: Option<ProfilingMetadata>,
367}
368
369impl QueryResult {
370    /// Create a new QueryResult without profiling
371    pub fn new(
372        query_id: String,
373        timestamp: chrono::DateTime<chrono::Utc>,
374        results: Vec<ResultDiff>,
375        metadata: HashMap<String, serde_json::Value>,
376    ) -> Self {
377        Self {
378            query_id,
379            timestamp,
380            results,
381            metadata,
382            profiling: None,
383        }
384    }
385
386    /// Create a new QueryResult with profiling metadata
387    pub fn with_profiling(
388        query_id: String,
389        timestamp: chrono::DateTime<chrono::Utc>,
390        results: Vec<ResultDiff>,
391        metadata: HashMap<String, serde_json::Value>,
392        profiling: ProfilingMetadata,
393    ) -> Self {
394        Self {
395            query_id,
396            timestamp,
397            results,
398            metadata,
399            profiling: Some(profiling),
400        }
401    }
402}
403
404// Implement Timestamped for QueryResult for use in generic priority queue
405impl Timestamped for QueryResult {
406    fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
407        self.timestamp
408    }
409}
410
411/// Arc-wrapped QueryResult for zero-copy distribution
412pub type ArcQueryResult = Arc<QueryResult>;
413
414#[derive(Debug, Clone, Serialize, Deserialize)]
415pub struct ComponentEvent {
416    pub component_id: String,
417    pub component_type: ComponentType,
418    pub status: ComponentStatus,
419    pub timestamp: chrono::DateTime<chrono::Utc>,
420    pub message: Option<String>,
421}
422
423#[derive(Debug, Clone, Serialize, Deserialize)]
424pub enum ControlMessage {
425    Start(String),
426    Stop(String),
427    Status(String),
428    Shutdown,
429}
430
431pub type ComponentEventReceiver = mpsc::Receiver<ComponentEvent>;
432pub type ComponentEventSender = mpsc::Sender<ComponentEvent>;
433pub type ControlMessageReceiver = mpsc::Receiver<ControlMessage>;
434pub type ControlMessageSender = mpsc::Sender<ControlMessage>;
435
436// Broadcast channel types for zero-copy event distribution
437pub type SourceBroadcastSender = broadcast::Sender<ArcSourceEvent>;
438pub type SourceBroadcastReceiver = broadcast::Receiver<ArcSourceEvent>;
439
440// Broadcast channel types for zero-copy query result distribution
441pub type QueryResultBroadcastSender = broadcast::Sender<ArcQueryResult>;
442pub type QueryResultBroadcastReceiver = broadcast::Receiver<ArcQueryResult>;
443
444// Bootstrap channel types for dedicated bootstrap data delivery
445pub type BootstrapEventSender = mpsc::Sender<BootstrapEvent>;
446pub type BootstrapEventReceiver = mpsc::Receiver<BootstrapEvent>;
447pub type BootstrapCompleteSender = mpsc::Sender<BootstrapComplete>;
448pub type BootstrapCompleteReceiver = mpsc::Receiver<BootstrapComplete>;
449
450/// Control signals for coordination
451#[derive(Debug, Clone, Serialize, Deserialize)]
452pub enum ControlSignal {
453    /// Query has entered running state
454    Running { query_id: String },
455    /// Query has stopped
456    Stopped { query_id: String },
457    /// Query has been deleted
458    Deleted { query_id: String },
459}
460
461/// Wrapper for control signals with metadata
462#[derive(Debug, Clone)]
463pub struct ControlSignalWrapper {
464    pub signal: ControlSignal,
465    pub timestamp: chrono::DateTime<chrono::Utc>,
466    pub sequence_number: Option<u64>,
467}
468
469impl ControlSignalWrapper {
470    pub fn new(signal: ControlSignal) -> Self {
471        Self {
472            signal,
473            timestamp: chrono::Utc::now(),
474            sequence_number: None,
475        }
476    }
477
478    pub fn with_sequence(signal: ControlSignal, sequence_number: u64) -> Self {
479        Self {
480            signal,
481            timestamp: chrono::Utc::now(),
482            sequence_number: Some(sequence_number),
483        }
484    }
485}
486
487pub type ControlSignalReceiver = mpsc::Receiver<ControlSignalWrapper>;
488pub type ControlSignalSender = mpsc::Sender<ControlSignalWrapper>;
489
490pub struct EventChannels {
491    pub component_event_tx: ComponentEventSender,
492    pub _control_tx: ControlMessageSender,
493    pub control_signal_tx: ControlSignalSender,
494}
495
496pub struct EventReceivers {
497    pub component_event_rx: ComponentEventReceiver,
498    pub _control_rx: ControlMessageReceiver,
499    pub control_signal_rx: ControlSignalReceiver,
500}
501
502impl EventChannels {
503    pub fn new() -> (Self, EventReceivers) {
504        let (component_event_tx, component_event_rx) = mpsc::channel(1000);
505        let (control_tx, control_rx) = mpsc::channel(100);
506        let (control_signal_tx, control_signal_rx) = mpsc::channel(100);
507
508        let channels = Self {
509            component_event_tx,
510            _control_tx: control_tx,
511            control_signal_tx,
512        };
513
514        let receivers = EventReceivers {
515            component_event_rx,
516            _control_rx: control_rx,
517            control_signal_rx,
518        };
519
520        (channels, receivers)
521    }
522}
523
#[cfg(test)]
mod tests {
    use super::*;
    use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};

    /// Build a minimal node-insert change used as the payload in every test.
    fn create_test_source_change() -> SourceChange {
        let element = Element::Node {
            metadata: ElementMetadata {
                reference: ElementReference::new("TestSource", "test-node-1"),
                labels: vec!["TestLabel".into()].into(),
                effective_from: 1000,
            },
            properties: Default::default(),
        };
        SourceChange::Insert { element }
    }

    #[test]
    fn test_source_event_wrapper_into_parts() {
        let change = create_test_source_change();
        let created_at = chrono::Utc::now();
        let wrapper = SourceEventWrapper::new(
            "test-source".to_string(),
            SourceEvent::Change(change),
            created_at,
        );

        let (source_id, event, timestamp, profiling) = wrapper.into_parts();

        assert_eq!(source_id, "test-source");
        assert!(matches!(event, SourceEvent::Change(_)));
        // The timestamp must come back unchanged.
        assert_eq!(timestamp, created_at);
        assert!(profiling.is_none());
    }

    #[test]
    fn test_try_unwrap_arc_sole_owner() {
        let change = create_test_source_change();
        let created_at = chrono::Utc::now();
        let wrapper = SourceEventWrapper::new(
            "test-source".to_string(),
            SourceEvent::Change(change),
            created_at,
        );
        let arc = Arc::new(wrapper);

        // With sole ownership, try_unwrap_arc should succeed
        let result = SourceEventWrapper::try_unwrap_arc(arc);
        assert!(result.is_ok());

        let (source_id, event, timestamp, profiling) = result.unwrap();
        assert_eq!(source_id, "test-source");
        assert!(matches!(event, SourceEvent::Change(_)));
        assert_eq!(timestamp, created_at);
        assert!(profiling.is_none());
    }

    #[test]
    fn test_try_unwrap_arc_shared() {
        let change = create_test_source_change();
        let wrapper = SourceEventWrapper::new(
            "test-source".to_string(),
            SourceEvent::Change(change),
            chrono::Utc::now(),
        );
        let arc = Arc::new(wrapper);
        let _arc2 = arc.clone(); // Create another reference

        // With shared ownership, try_unwrap_arc should fail and return the Arc
        let result = SourceEventWrapper::try_unwrap_arc(arc);
        assert!(result.is_err());

        // The returned Arc should still be valid
        let returned_arc = result.unwrap_err();
        assert_eq!(returned_arc.source_id, "test-source");
    }

    #[test]
    fn test_zero_copy_extraction_path() {
        // Simulate the zero-copy extraction path used in query processing
        let change = create_test_source_change();
        let wrapper = SourceEventWrapper::new(
            "test-source".to_string(),
            SourceEvent::Change(change),
            chrono::Utc::now(),
        );
        let arc = Arc::new(wrapper);

        // This is the zero-copy path - when we have sole ownership
        let (source_id, event, _timestamp, _profiling) =
            match SourceEventWrapper::try_unwrap_arc(arc) {
                Ok(parts) => parts,
                Err(arc) => {
                    // Fallback to cloning (would be needed in broadcast mode)
                    (
                        arc.source_id.clone(),
                        arc.event.clone(),
                        arc.timestamp,
                        arc.profiling.clone(),
                    )
                }
            };

        // Extract SourceChange from owned event (no clone!)
        let source_change = match event {
            SourceEvent::Change(change) => Some(change),
            _ => None,
        };

        assert_eq!(source_id, "test-source");
        assert!(source_change.is_some());
    }
}