Skip to main content

drasi_lib/channels/
events.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::profiling::ProfilingMetadata;
16use drasi_core::models::SourceChange;
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::{broadcast, mpsc};
21
/// Trait for types that have a timestamp, required for priority queue ordering.
///
/// In this module it is implemented by [`SourceEventWrapper`] and [`QueryResult`],
/// both of which return their own `timestamp` field.
pub trait Timestamped {
    /// Returns the UTC timestamp associated with this value.
    fn timestamp(&self) -> chrono::DateTime<chrono::Utc>;
}
26
/// Type of Drasi component
///
/// Used for identifying component types in events and monitoring.
/// Within this module it is carried by [`ComponentEvent::component_type`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum ComponentType {
    /// The component is a source.
    Source,
    /// The component is a query.
    Query,
    /// The component is a reaction.
    Reaction,
}
36
37/// Execution status of a Drasi component
38///
39/// `ComponentStatus` represents the current lifecycle state of sources, queries, and reactions.
40/// Components transition through these states during their lifecycle, from creation through
41/// execution to shutdown.
42///
43/// # Status Lifecycle
44///
45/// A typical component lifecycle follows this progression:
46///
47/// ```text
48/// Stopped → Starting → Running → Stopping → Stopped
49///                ↓
50///              Error
51/// ```
52///
53/// # Status Values
54///
55/// - **Starting**: Component is initializing (connecting to resources, loading data, etc.)
56/// - **Running**: Component is actively processing (ingesting, querying, or delivering)
57/// - **Stopping**: Component is shutting down gracefully
58/// - **Stopped**: Component is not running (initial or final state)
59/// - **Error**: Component encountered an error and cannot continue (see error_message)
60///
61/// # Usage
62///
63/// Status is available through runtime information methods on [`DrasiLib`](crate::DrasiLib):
64///
65/// - [`get_source_status()`](crate::DrasiLib::get_source_status)
66/// - [`get_query_status()`](crate::DrasiLib::get_query_status)
67/// - [`get_reaction_status()`](crate::DrasiLib::get_reaction_status)
68///
69/// And through runtime info structs:
70///
71/// - [`SourceRuntime`](crate::SourceRuntime)
72/// - [`QueryRuntime`](crate::QueryRuntime)
73/// - [`ReactionRuntime`](crate::ReactionRuntime)
74///
75/// # Examples
76///
77/// ## Monitoring Component Status
78///
79/// ```no_run
80/// use drasi_lib::{DrasiLib, ComponentStatus};
81///
82/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
83/// let core = DrasiLib::builder().with_id("my-server").build().await?;
84/// core.start().await?;
85///
86/// // Check source status
87/// let source_status = core.get_source_status("orders_db").await?;
88/// match source_status {
89///     ComponentStatus::Running => println!("Source is running"),
90///     ComponentStatus::Error => println!("Source has errors"),
91///     ComponentStatus::Starting => println!("Source is starting up"),
92///     _ => println!("Source status: {:?}", source_status),
93/// }
94///
95/// // Get detailed info with status
96/// let source_info = core.get_source_info("orders_db").await?;
97/// if source_info.status == ComponentStatus::Error {
98///     if let Some(error) = source_info.error_message {
99///         eprintln!("Error: {}", error);
100///     }
101/// }
102/// # Ok(())
103/// # }
104/// ```
105///
106/// ## Waiting for Component to Start
107///
108/// ```no_run
109/// use drasi_lib::{DrasiLib, ComponentStatus};
110/// use tokio::time::{sleep, Duration};
111///
112/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
113/// let core = DrasiLib::builder().with_id("my-server").build().await?;
114/// core.start_source("orders_db").await?;
115///
116/// // Poll until source is running
117/// loop {
118///     let status = core.get_source_status("orders_db").await?;
119///     match status {
120///         ComponentStatus::Running => break,
121///         ComponentStatus::Error => return Err("Source failed to start".into()),
122///         _ => sleep(Duration::from_millis(100)).await,
123///     }
124/// }
125/// println!("Source is now running");
126/// # Ok(())
127/// # }
128/// ```
129///
130/// ## Checking All Components
131///
132/// ```no_run
133/// use drasi_lib::{DrasiLib, ComponentStatus};
134///
135/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
136/// let core = DrasiLib::builder().with_id("my-server").build().await?;
137/// core.start().await?;
138///
139/// // Check all sources
140/// let sources = core.list_sources().await?;
141/// for (id, status) in sources {
142///     println!("Source {}: {:?}", id, status);
143/// }
144///
145/// // Check all queries
146/// let queries = core.list_queries().await?;
147/// for (id, status) in queries {
148///     println!("Query {}: {:?}", id, status);
149/// }
150///
151/// // Check all reactions
152/// let reactions = core.list_reactions().await?;
153/// for (id, status) in reactions {
154///     println!("Reaction {}: {:?}", id, status);
155/// }
156/// # Ok(())
157/// # }
158/// ```
159#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
160pub enum ComponentStatus {
161    Starting,
162    Running,
163    Stopping,
164    Stopped,
165    Reconfiguring,
166    Error,
167}
168
/// A single [`SourceChange`] paired with a source identifier and a timestamp.
#[derive(Debug, Clone)]
pub struct SourceChangeEvent {
    /// Identifier of the source this change is associated with.
    pub source_id: String,
    /// The data change itself.
    pub change: SourceChange,
    /// UTC timestamp attached to the event; set by whoever constructs the struct.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
175
/// Control events from sources for query coordination
///
/// Serializable and comparable; carried inside [`SourceEvent::Control`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SourceControl {
    /// Query subscription control event
    Subscription {
        /// Identifier of the query the subscription belongs to.
        query_id: String,
        /// NOTE(review): presumably identifies the node hosting the query — confirm.
        query_node_id: String,
        /// Node labels named by the subscription.
        node_labels: Vec<String>,
        /// Relation labels named by the subscription.
        rel_labels: Vec<String>,
        /// Whether the subscription is being inserted, updated, or deleted.
        operation: ControlOperation,
    },
}
188
/// Control operation types
///
/// Used as the `operation` field of [`SourceControl::Subscription`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ControlOperation {
    /// The subject of the control event is being added.
    Insert,
    /// The subject of the control event is being modified.
    Update,
    /// The subject of the control event is being removed.
    Delete,
}
196
/// Unified event envelope carrying both data changes and control messages
///
/// This is the payload type of [`SourceEventWrapper`]. Besides changes and control
/// events it also carries per-query bootstrap start/end markers.
#[derive(Debug, Clone)]
pub enum SourceEvent {
    /// Data change event from source
    Change(SourceChange),
    /// Control event for query coordination
    Control(SourceControl),
    /// Bootstrap start marker for a specific query (identified by `query_id`)
    BootstrapStart { query_id: String },
    /// Bootstrap end marker for a specific query (identified by `query_id`)
    BootstrapEnd { query_id: String },
}
209
/// Wrapper for source events with metadata
///
/// Pairs a [`SourceEvent`] with the id of its source, a timestamp, and optional
/// profiling data. The timestamp is what the [`Timestamped`] impl returns.
#[derive(Debug, Clone)]
pub struct SourceEventWrapper {
    /// Identifier of the source associated with the event.
    pub source_id: String,
    /// The wrapped event (change, control, or bootstrap marker).
    pub event: SourceEvent,
    /// UTC timestamp supplied by the caller at construction.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional profiling metadata for performance tracking
    pub profiling: Option<ProfilingMetadata>,
}
219
220impl SourceEventWrapper {
221    /// Create a new SourceEventWrapper without profiling
222    pub fn new(
223        source_id: String,
224        event: SourceEvent,
225        timestamp: chrono::DateTime<chrono::Utc>,
226    ) -> Self {
227        Self {
228            source_id,
229            event,
230            timestamp,
231            profiling: None,
232        }
233    }
234
235    /// Create a new SourceEventWrapper with profiling metadata
236    pub fn with_profiling(
237        source_id: String,
238        event: SourceEvent,
239        timestamp: chrono::DateTime<chrono::Utc>,
240        profiling: ProfilingMetadata,
241    ) -> Self {
242        Self {
243            source_id,
244            event,
245            timestamp,
246            profiling: Some(profiling),
247        }
248    }
249
250    /// Consume this wrapper and return its components.
251    /// This enables zero-copy extraction when the wrapper has sole ownership.
252    pub fn into_parts(
253        self,
254    ) -> (
255        String,
256        SourceEvent,
257        chrono::DateTime<chrono::Utc>,
258        Option<ProfilingMetadata>,
259    ) {
260        (self.source_id, self.event, self.timestamp, self.profiling)
261    }
262
263    /// Try to extract components from an Arc<SourceEventWrapper>.
264    /// Uses Arc::try_unwrap to avoid cloning when we have sole ownership.
265    /// Returns Ok with owned components if sole owner, Err with Arc back if shared.
266    ///
267    /// This enables zero-copy in Channel dispatch mode (single consumer per event)
268    /// while still working correctly in Broadcast mode (cloning required).
269    pub fn try_unwrap_arc(
270        arc_self: Arc<Self>,
271    ) -> Result<
272        (
273            String,
274            SourceEvent,
275            chrono::DateTime<chrono::Utc>,
276            Option<ProfilingMetadata>,
277        ),
278        Arc<Self>,
279    > {
280        Arc::try_unwrap(arc_self).map(|wrapper| wrapper.into_parts())
281    }
282}
283
284// Implement Timestamped for SourceEventWrapper for use in generic priority queue
285impl Timestamped for SourceEventWrapper {
286    fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
287        self.timestamp
288    }
289}
290
/// Arc-wrapped SourceEventWrapper for zero-copy distribution
///
/// Cloning the alias clones the `Arc` handle, not the wrapped event. This is the
/// item type of [`SourceBroadcastSender`] / [`SourceBroadcastReceiver`].
pub type ArcSourceEvent = Arc<SourceEventWrapper>;
293
/// Bootstrap event wrapper for dedicated bootstrap channels
///
/// This is the item type of [`BootstrapEventSender`] / [`BootstrapEventReceiver`].
#[derive(Debug, Clone)]
pub struct BootstrapEvent {
    /// Identifier of the source associated with the bootstrap data.
    pub source_id: String,
    /// The change delivered as part of bootstrap.
    pub change: SourceChange,
    /// UTC timestamp attached to the event by its creator.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Sequence number of the event.
    /// NOTE(review): presumably a per-bootstrap ordering counter — confirm with the producer.
    pub sequence: u64,
}
302
/// Bootstrap completion signal
///
/// This is the item type of [`BootstrapCompleteSender`] / [`BootstrapCompleteReceiver`].
#[derive(Debug, Clone)]
pub struct BootstrapComplete {
    /// Identifier of the source whose bootstrap completed.
    pub source_id: String,
    /// Event count reported with the completion signal.
    /// NOTE(review): presumably the number of `BootstrapEvent`s sent — confirm with the producer.
    pub total_events: u64,
}
309
/// Subscription request from Query to Source
#[derive(Debug, Clone)]
pub struct SubscriptionRequest {
    /// Identifier of the subscribing query.
    pub query_id: String,
    /// Identifier of the source being subscribed to.
    pub source_id: String,
    /// Whether the query asks for bootstrap data.
    /// NOTE(review): how the source honours this flag is not visible here — confirm.
    pub enable_bootstrap: bool,
    /// Node labels named by the request.
    pub node_labels: Vec<String>,
    /// Relation labels named by the request.
    pub relation_labels: Vec<String>,
}
319
/// Subscription response from Source to Query
///
/// Holds the receiving ends the query reads from. No traits are derived on this
/// struct, so it is neither `Clone` nor `Debug`.
pub struct SubscriptionResponse {
    /// Identifier of the query the response is for.
    pub query_id: String,
    /// Identifier of the responding source.
    pub source_id: String,
    /// Boxed receiver yielding [`SourceEventWrapper`] values.
    pub receiver: Box<dyn super::ChangeReceiver<SourceEventWrapper>>,
    /// Dedicated bootstrap channel receiver, when one is provided.
    pub bootstrap_receiver: Option<BootstrapEventReceiver>,
}
327
/// Subscription response from Query to Reaction
///
/// No traits are derived on this struct, so it is neither `Clone` nor `Debug`.
pub struct QuerySubscriptionResponse {
    /// Identifier of the query being subscribed to.
    pub query_id: String,
    /// Boxed receiver yielding [`QueryResult`] values.
    pub receiver: Box<dyn super::ChangeReceiver<QueryResult>>,
}
333
/// Typed result diff emitted by continuous queries.
///
/// Serialized with serde's internally tagged representation: each value carries a
/// `"type"` field holding the variant's renamed tag. Note that the tags are not
/// uniformly cased: `ADD`, `DELETE`, `UPDATE` are upper-case while `aggregation`
/// and `noop` are lower-case. These strings are part of the wire format.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum ResultDiff {
    /// A result was added; tagged `"ADD"`.
    #[serde(rename = "ADD")]
    Add { data: serde_json::Value },
    /// A result was removed; tagged `"DELETE"`.
    #[serde(rename = "DELETE")]
    Delete { data: serde_json::Value },
    /// A result changed; tagged `"UPDATE"`.
    #[serde(rename = "UPDATE")]
    Update {
        /// NOTE(review): relationship between `data` and `after` is not visible here — confirm.
        data: serde_json::Value,
        /// Value before the update.
        before: serde_json::Value,
        /// Value after the update.
        after: serde_json::Value,
        /// Optional grouping keys; left out of the serialized form when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        grouping_keys: Option<Vec<String>>,
    },
    /// An aggregation changed; tagged `"aggregation"`.
    #[serde(rename = "aggregation")]
    Aggregation {
        /// Previous aggregate value, if there was one.
        before: Option<serde_json::Value>,
        /// New aggregate value.
        after: serde_json::Value,
    },
    /// No change; tagged `"noop"`.
    #[serde(rename = "noop")]
    Noop,
}
358
/// A batch of result diffs produced by one query, with timestamp and metadata.
///
/// The `timestamp` field is what the [`Timestamped`] impl returns.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResult {
    /// Identifier of the query that produced the results.
    pub query_id: String,
    /// UTC timestamp supplied by the caller at construction.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// The result diffs carried by this value.
    pub results: Vec<ResultDiff>,
    /// Free-form metadata as string-keyed JSON values.
    pub metadata: HashMap<String, serde_json::Value>,
    /// Optional profiling metadata for performance tracking
    /// (left out of the serialized form when `None`)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub profiling: Option<ProfilingMetadata>,
}
369
370impl QueryResult {
371    /// Create a new QueryResult without profiling
372    pub fn new(
373        query_id: String,
374        timestamp: chrono::DateTime<chrono::Utc>,
375        results: Vec<ResultDiff>,
376        metadata: HashMap<String, serde_json::Value>,
377    ) -> Self {
378        Self {
379            query_id,
380            timestamp,
381            results,
382            metadata,
383            profiling: None,
384        }
385    }
386
387    /// Create a new QueryResult with profiling metadata
388    pub fn with_profiling(
389        query_id: String,
390        timestamp: chrono::DateTime<chrono::Utc>,
391        results: Vec<ResultDiff>,
392        metadata: HashMap<String, serde_json::Value>,
393        profiling: ProfilingMetadata,
394    ) -> Self {
395        Self {
396            query_id,
397            timestamp,
398            results,
399            metadata,
400            profiling: Some(profiling),
401        }
402    }
403}
404
405// Implement Timestamped for QueryResult for use in generic priority queue
406impl Timestamped for QueryResult {
407    fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
408        self.timestamp
409    }
410}
411
/// Arc-wrapped QueryResult for zero-copy distribution
///
/// Cloning the alias clones the `Arc` handle, not the wrapped result. This is the
/// item type of [`QueryResultBroadcastSender`] / [`QueryResultBroadcastReceiver`].
pub type ArcQueryResult = Arc<QueryResult>;
414
/// Status notification for a single component.
///
/// This is the item type of [`ComponentEventSender`] / [`ComponentEventReceiver`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentEvent {
    /// Identifier of the component the event is about.
    pub component_id: String,
    /// Whether the component is a source, query, or reaction.
    pub component_type: ComponentType,
    /// Status reported by the event.
    pub status: ComponentStatus,
    /// UTC timestamp attached to the event by its creator.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional free-text message accompanying the status.
    pub message: Option<String>,
}
423
/// Control command sent over [`ControlMessageSender`] / [`ControlMessageReceiver`].
///
/// NOTE(review): the `String` payload of `Start`/`Stop`/`Status` is presumably a
/// component id — this file does not show how it is interpreted; confirm with the consumer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ControlMessage {
    /// Start request carrying a string argument.
    Start(String),
    /// Stop request carrying a string argument.
    Stop(String),
    /// Status request carrying a string argument.
    Status(String),
    /// Shutdown request; carries no payload.
    Shutdown,
}
431
/// Receiving half of a tokio bounded mpsc channel of [`ComponentEvent`]s.
pub type ComponentEventReceiver = mpsc::Receiver<ComponentEvent>;
/// Sending half of a tokio bounded mpsc channel of [`ComponentEvent`]s.
pub type ComponentEventSender = mpsc::Sender<ComponentEvent>;
/// Receiving half of a tokio bounded mpsc channel of [`ControlMessage`]s.
pub type ControlMessageReceiver = mpsc::Receiver<ControlMessage>;
/// Sending half of a tokio bounded mpsc channel of [`ControlMessage`]s.
pub type ControlMessageSender = mpsc::Sender<ControlMessage>;
436
// Broadcast channel types for zero-copy event distribution
/// Sending half of a tokio broadcast channel of [`ArcSourceEvent`]s.
pub type SourceBroadcastSender = broadcast::Sender<ArcSourceEvent>;
/// Receiving half of a tokio broadcast channel of [`ArcSourceEvent`]s.
pub type SourceBroadcastReceiver = broadcast::Receiver<ArcSourceEvent>;

// Broadcast channel types for zero-copy query result distribution
/// Sending half of a tokio broadcast channel of [`ArcQueryResult`]s.
pub type QueryResultBroadcastSender = broadcast::Sender<ArcQueryResult>;
/// Receiving half of a tokio broadcast channel of [`ArcQueryResult`]s.
pub type QueryResultBroadcastReceiver = broadcast::Receiver<ArcQueryResult>;

// Bootstrap channel types for dedicated bootstrap data delivery
/// Sending half of a tokio bounded mpsc channel of [`BootstrapEvent`]s.
pub type BootstrapEventSender = mpsc::Sender<BootstrapEvent>;
/// Receiving half of a tokio bounded mpsc channel of [`BootstrapEvent`]s.
pub type BootstrapEventReceiver = mpsc::Receiver<BootstrapEvent>;
/// Sending half of a tokio bounded mpsc channel of [`BootstrapComplete`] signals.
pub type BootstrapCompleteSender = mpsc::Sender<BootstrapComplete>;
/// Receiving half of a tokio bounded mpsc channel of [`BootstrapComplete`] signals.
pub type BootstrapCompleteReceiver = mpsc::Receiver<BootstrapComplete>;
450
/// Control signals for coordination
///
/// Every variant identifies the query it refers to through `query_id`. Signals are
/// sent wrapped in a [`ControlSignalWrapper`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ControlSignal {
    /// Query has entered running state
    Running { query_id: String },
    /// Query has stopped
    Stopped { query_id: String },
    /// Query has been deleted
    Deleted { query_id: String },
}
461
/// Wrapper for control signals with metadata
///
/// This is the item type of [`ControlSignalSender`] / [`ControlSignalReceiver`].
#[derive(Debug, Clone)]
pub struct ControlSignalWrapper {
    /// The wrapped control signal.
    pub signal: ControlSignal,
    /// UTC timestamp; the constructors in this file set it to the creation time.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional sequence number; `None` unless built with `with_sequence`.
    pub sequence_number: Option<u64>,
}
469
470impl ControlSignalWrapper {
471    pub fn new(signal: ControlSignal) -> Self {
472        Self {
473            signal,
474            timestamp: chrono::Utc::now(),
475            sequence_number: None,
476        }
477    }
478
479    pub fn with_sequence(signal: ControlSignal, sequence_number: u64) -> Self {
480        Self {
481            signal,
482            timestamp: chrono::Utc::now(),
483            sequence_number: Some(sequence_number),
484        }
485    }
486}
487
/// Receiving half of a tokio bounded mpsc channel of [`ControlSignalWrapper`]s.
pub type ControlSignalReceiver = mpsc::Receiver<ControlSignalWrapper>;
/// Sending half of a tokio bounded mpsc channel of [`ControlSignalWrapper`]s.
pub type ControlSignalSender = mpsc::Sender<ControlSignalWrapper>;
490
/// Sending halves of the three channels created by `EventChannels::new`.
///
/// The matching receiving halves are returned separately in [`EventReceivers`].
pub struct EventChannels {
    /// Sender for component status events.
    pub component_event_tx: ComponentEventSender,
    /// Sender for control messages.
    /// NOTE(review): the leading underscore suggests this is currently unused — confirm.
    pub _control_tx: ControlMessageSender,
    /// Sender for control signals.
    pub control_signal_tx: ControlSignalSender,
}
496
/// Receiving halves of the three channels created by `EventChannels::new`.
///
/// The matching sending halves live in [`EventChannels`].
pub struct EventReceivers {
    /// Receiver for component status events.
    pub component_event_rx: ComponentEventReceiver,
    /// Receiver for control messages.
    /// NOTE(review): the leading underscore suggests this is currently unused — confirm.
    pub _control_rx: ControlMessageReceiver,
    /// Receiver for control signals.
    pub control_signal_rx: ControlSignalReceiver,
}
502
503impl EventChannels {
504    pub fn new() -> (Self, EventReceivers) {
505        let (component_event_tx, component_event_rx) = mpsc::channel(1000);
506        let (control_tx, control_rx) = mpsc::channel(100);
507        let (control_signal_tx, control_signal_rx) = mpsc::channel(100);
508
509        let channels = Self {
510            component_event_tx,
511            _control_tx: control_tx,
512            control_signal_tx,
513        };
514
515        let receivers = EventReceivers {
516            component_event_rx,
517            _control_rx: control_rx,
518            control_signal_rx,
519        };
520
521        (channels, receivers)
522    }
523}
524
#[cfg(test)]
mod tests {
    use super::*;
    use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};

    /// Source id given to every wrapper built by these tests.
    const TEST_SOURCE_ID: &str = "test-source";

    /// Build an `Insert` change for a single labelled node with default properties.
    fn create_test_source_change() -> SourceChange {
        let metadata = ElementMetadata {
            reference: ElementReference::new("TestSource", "test-node-1"),
            labels: vec!["TestLabel".into()].into(),
            effective_from: 1000,
        };
        SourceChange::Insert {
            element: Element::Node {
                metadata,
                properties: Default::default(),
            },
        }
    }

    /// Build a profiling-free wrapper around the test change, timestamped "now".
    fn create_test_wrapper() -> SourceEventWrapper {
        SourceEventWrapper::new(
            TEST_SOURCE_ID.to_string(),
            SourceEvent::Change(create_test_source_change()),
            chrono::Utc::now(),
        )
    }

    #[test]
    fn test_source_event_wrapper_into_parts() {
        let (source_id, event, _timestamp, profiling) = create_test_wrapper().into_parts();

        assert_eq!(source_id, TEST_SOURCE_ID);
        assert!(matches!(event, SourceEvent::Change(_)));
        assert!(profiling.is_none());
    }

    #[test]
    fn test_try_unwrap_arc_sole_owner() {
        let sole = Arc::new(create_test_wrapper());

        // Only one strong reference exists, so unwrapping must hand back the parts
        let (source_id, event, _timestamp, _profiling) = SourceEventWrapper::try_unwrap_arc(sole)
            .expect("sole owner should be able to unwrap the Arc");

        assert_eq!(source_id, TEST_SOURCE_ID);
        assert!(matches!(event, SourceEvent::Change(_)));
    }

    #[test]
    fn test_try_unwrap_arc_shared() {
        let first = Arc::new(create_test_wrapper());
        let _second = Arc::clone(&first); // Keep a second strong reference alive

        // A second owner exists, so the Arc must come back in the Err variant
        let returned_arc = SourceEventWrapper::try_unwrap_arc(first)
            .expect_err("shared Arc should not be unwrapped");

        // The Arc that was handed back still points at the original wrapper
        assert_eq!(returned_arc.source_id, TEST_SOURCE_ID);
    }

    #[test]
    fn test_zero_copy_extraction_path() {
        // Mirror the extraction path used in query processing
        let sole = Arc::new(create_test_wrapper());

        // Sole ownership takes the zero-copy branch; the closure is the clone
        // fallback that broadcast mode would need
        let (source_id, event, _timestamp, _profiling) = SourceEventWrapper::try_unwrap_arc(sole)
            .unwrap_or_else(|shared| {
                (
                    shared.source_id.clone(),
                    shared.event.clone(),
                    shared.timestamp,
                    shared.profiling.clone(),
                )
            });

        // The event is owned, so the change is moved out rather than cloned
        let source_change = if let SourceEvent::Change(change) = event {
            Some(change)
        } else {
            None
        };

        assert_eq!(source_id, TEST_SOURCE_ID);
        assert!(source_change.is_some());
    }
}