Skip to main content

drasi_lib/channels/
events.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::profiling::ProfilingMetadata;
16use drasi_core::models::SourceChange;
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::{broadcast, mpsc};
21
22/// Trait for types that have a timestamp, required for priority queue ordering
23pub trait Timestamped {
24    fn timestamp(&self) -> chrono::DateTime<chrono::Utc>;
25}
26
27/// Type of Drasi component
28///
29/// Used for identifying component types in events and monitoring.
30#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
31pub enum ComponentType {
32    Source,
33    Query,
34    Reaction,
35}
36
37/// Execution status of a Drasi component
38///
39/// `ComponentStatus` represents the current lifecycle state of sources, queries, and reactions.
40/// Components transition through these states during their lifecycle, from creation through
41/// execution to shutdown.
42///
43/// # Status Lifecycle
44///
45/// A typical component lifecycle follows this progression:
46///
47/// ```text
48/// Stopped → Starting → Running → Stopping → Stopped
49///                ↓
50///              Error
51/// ```
52///
53/// # Status Values
54///
55/// - **Starting**: Component is initializing (connecting to resources, loading data, etc.)
56/// - **Running**: Component is actively processing (ingesting, querying, or delivering)
57/// - **Stopping**: Component is shutting down gracefully
58/// - **Stopped**: Component is not running (initial or final state)
59/// - **Error**: Component encountered an error and cannot continue (see error_message)
60///
61/// # Usage
62///
63/// Status is available through runtime information methods on [`DrasiLib`](crate::DrasiLib):
64///
65/// - [`get_source_status()`](crate::DrasiLib::get_source_status)
66/// - [`get_query_status()`](crate::DrasiLib::get_query_status)
67/// - [`get_reaction_status()`](crate::DrasiLib::get_reaction_status)
68///
69/// And through runtime info structs:
70///
71/// - [`SourceRuntime`](crate::SourceRuntime)
72/// - [`QueryRuntime`](crate::QueryRuntime)
73/// - [`ReactionRuntime`](crate::ReactionRuntime)
74///
75/// # Examples
76///
77/// ## Monitoring Component Status
78///
79/// ```no_run
80/// use drasi_lib::{DrasiLib, ComponentStatus};
81///
82/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
83/// let core = DrasiLib::builder().with_id("my-server").build().await?;
84/// core.start().await?;
85///
86/// // Check source status
87/// let source_status = core.get_source_status("orders_db").await?;
88/// match source_status {
89///     ComponentStatus::Running => println!("Source is running"),
90///     ComponentStatus::Error => println!("Source has errors"),
91///     ComponentStatus::Starting => println!("Source is starting up"),
92///     _ => println!("Source status: {:?}", source_status),
93/// }
94///
95/// // Get detailed info with status
96/// let source_info = core.get_source_info("orders_db").await?;
97/// if source_info.status == ComponentStatus::Error {
98///     if let Some(error) = source_info.error_message {
99///         eprintln!("Error: {}", error);
100///     }
101/// }
102/// # Ok(())
103/// # }
104/// ```
105///
106/// ## Waiting for Component to Start
107///
108/// ```no_run
109/// use drasi_lib::{DrasiLib, ComponentStatus};
110/// use tokio::time::{sleep, Duration};
111///
112/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
113/// let core = DrasiLib::builder().with_id("my-server").build().await?;
114/// core.start_source("orders_db").await?;
115///
116/// // Poll until source is running
117/// loop {
118///     let status = core.get_source_status("orders_db").await?;
119///     match status {
120///         ComponentStatus::Running => break,
121///         ComponentStatus::Error => return Err("Source failed to start".into()),
122///         _ => sleep(Duration::from_millis(100)).await,
123///     }
124/// }
125/// println!("Source is now running");
126/// # Ok(())
127/// # }
128/// ```
129///
130/// ## Checking All Components
131///
132/// ```no_run
133/// use drasi_lib::{DrasiLib, ComponentStatus};
134///
135/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
136/// let core = DrasiLib::builder().with_id("my-server").build().await?;
137/// core.start().await?;
138///
139/// // Check all sources
140/// let sources = core.list_sources().await?;
141/// for (id, status) in sources {
142///     println!("Source {}: {:?}", id, status);
143/// }
144///
145/// // Check all queries
146/// let queries = core.list_queries().await?;
147/// for (id, status) in queries {
148///     println!("Query {}: {:?}", id, status);
149/// }
150///
151/// // Check all reactions
152/// let reactions = core.list_reactions().await?;
153/// for (id, status) in reactions {
154///     println!("Reaction {}: {:?}", id, status);
155/// }
156/// # Ok(())
157/// # }
158/// ```
159#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
160pub enum ComponentStatus {
161    Starting,
162    Running,
163    Stopping,
164    Stopped,
165    Reconfiguring,
166    Error,
167}
168
169#[derive(Debug, Clone)]
170pub struct SourceChangeEvent {
171    pub source_id: String,
172    pub change: SourceChange,
173    pub timestamp: chrono::DateTime<chrono::Utc>,
174}
175
176/// Control events from sources for query coordination
177#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
178pub enum SourceControl {
179    /// Query subscription control event
180    Subscription {
181        query_id: String,
182        query_node_id: String,
183        node_labels: Vec<String>,
184        rel_labels: Vec<String>,
185        operation: ControlOperation,
186    },
187}
188
189/// Control operation types
190#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
191pub enum ControlOperation {
192    Insert,
193    Update,
194    Delete,
195}
196
197/// Unified event envelope carrying both data changes and control messages
198#[derive(Debug, Clone)]
199pub enum SourceEvent {
200    /// Data change event from source
201    Change(SourceChange),
202    /// Control event for query coordination
203    Control(SourceControl),
204}
205
206/// Wrapper for source events with metadata
207#[derive(Debug, Clone)]
208pub struct SourceEventWrapper {
209    pub source_id: String,
210    pub event: SourceEvent,
211    pub timestamp: chrono::DateTime<chrono::Utc>,
212    /// Optional profiling metadata for performance tracking
213    pub profiling: Option<ProfilingMetadata>,
214}
215
216impl SourceEventWrapper {
217    /// Create a new SourceEventWrapper without profiling
218    pub fn new(
219        source_id: String,
220        event: SourceEvent,
221        timestamp: chrono::DateTime<chrono::Utc>,
222    ) -> Self {
223        Self {
224            source_id,
225            event,
226            timestamp,
227            profiling: None,
228        }
229    }
230
231    /// Create a new SourceEventWrapper with profiling metadata
232    pub fn with_profiling(
233        source_id: String,
234        event: SourceEvent,
235        timestamp: chrono::DateTime<chrono::Utc>,
236        profiling: ProfilingMetadata,
237    ) -> Self {
238        Self {
239            source_id,
240            event,
241            timestamp,
242            profiling: Some(profiling),
243        }
244    }
245
246    /// Consume this wrapper and return its components.
247    /// This enables zero-copy extraction when the wrapper has sole ownership.
248    pub fn into_parts(
249        self,
250    ) -> (
251        String,
252        SourceEvent,
253        chrono::DateTime<chrono::Utc>,
254        Option<ProfilingMetadata>,
255    ) {
256        (self.source_id, self.event, self.timestamp, self.profiling)
257    }
258
259    /// Try to extract components from an Arc<SourceEventWrapper>.
260    /// Uses Arc::try_unwrap to avoid cloning when we have sole ownership.
261    /// Returns Ok with owned components if sole owner, Err with Arc back if shared.
262    ///
263    /// This enables zero-copy in Channel dispatch mode (single consumer per event)
264    /// while still working correctly in Broadcast mode (cloning required).
265    pub fn try_unwrap_arc(
266        arc_self: Arc<Self>,
267    ) -> Result<
268        (
269            String,
270            SourceEvent,
271            chrono::DateTime<chrono::Utc>,
272            Option<ProfilingMetadata>,
273        ),
274        Arc<Self>,
275    > {
276        Arc::try_unwrap(arc_self).map(|wrapper| wrapper.into_parts())
277    }
278}
279
280// Implement Timestamped for SourceEventWrapper for use in generic priority queue
281impl Timestamped for SourceEventWrapper {
282    fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
283        self.timestamp
284    }
285}
286
287/// Arc-wrapped SourceEventWrapper for zero-copy distribution
288pub type ArcSourceEvent = Arc<SourceEventWrapper>;
289
290/// Bootstrap event wrapper for dedicated bootstrap channels
291#[derive(Debug, Clone)]
292pub struct BootstrapEvent {
293    pub source_id: String,
294    pub change: SourceChange,
295    pub timestamp: chrono::DateTime<chrono::Utc>,
296    pub sequence: u64,
297}
298
299/// Bootstrap completion signal
300#[derive(Debug, Clone)]
301pub struct BootstrapComplete {
302    pub source_id: String,
303    pub total_events: u64,
304}
305
306/// Subscription request from Query to Source
307#[derive(Debug, Clone)]
308pub struct SubscriptionRequest {
309    pub query_id: String,
310    pub source_id: String,
311    pub enable_bootstrap: bool,
312    pub node_labels: Vec<String>,
313    pub relation_labels: Vec<String>,
314}
315
316/// Subscription response from Source to Query
317pub struct SubscriptionResponse {
318    pub query_id: String,
319    pub source_id: String,
320    pub receiver: Box<dyn super::ChangeReceiver<SourceEventWrapper>>,
321    pub bootstrap_receiver: Option<BootstrapEventReceiver>,
322}
323
324/// Subscription response from Query to Reaction
325pub struct QuerySubscriptionResponse {
326    pub query_id: String,
327    pub receiver: Box<dyn super::ChangeReceiver<QueryResult>>,
328}
329
330/// Typed result diff emitted by continuous queries.
331#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
332#[serde(tag = "type")]
333pub enum ResultDiff {
334    #[serde(rename = "ADD")]
335    Add { data: serde_json::Value },
336    #[serde(rename = "DELETE")]
337    Delete { data: serde_json::Value },
338    #[serde(rename = "UPDATE")]
339    Update {
340        data: serde_json::Value,
341        before: serde_json::Value,
342        after: serde_json::Value,
343        #[serde(skip_serializing_if = "Option::is_none")]
344        grouping_keys: Option<Vec<String>>,
345    },
346    #[serde(rename = "aggregation")]
347    Aggregation {
348        before: Option<serde_json::Value>,
349        after: serde_json::Value,
350    },
351    #[serde(rename = "noop")]
352    Noop,
353}
354
355#[derive(Debug, Clone, Serialize, Deserialize)]
356pub struct QueryResult {
357    pub query_id: String,
358    pub timestamp: chrono::DateTime<chrono::Utc>,
359    pub results: Vec<ResultDiff>,
360    pub metadata: HashMap<String, serde_json::Value>,
361    /// Optional profiling metadata for performance tracking
362    #[serde(skip_serializing_if = "Option::is_none")]
363    pub profiling: Option<ProfilingMetadata>,
364}
365
366impl QueryResult {
367    /// Create a new QueryResult without profiling
368    pub fn new(
369        query_id: String,
370        timestamp: chrono::DateTime<chrono::Utc>,
371        results: Vec<ResultDiff>,
372        metadata: HashMap<String, serde_json::Value>,
373    ) -> Self {
374        Self {
375            query_id,
376            timestamp,
377            results,
378            metadata,
379            profiling: None,
380        }
381    }
382
383    /// Create a new QueryResult with profiling metadata
384    pub fn with_profiling(
385        query_id: String,
386        timestamp: chrono::DateTime<chrono::Utc>,
387        results: Vec<ResultDiff>,
388        metadata: HashMap<String, serde_json::Value>,
389        profiling: ProfilingMetadata,
390    ) -> Self {
391        Self {
392            query_id,
393            timestamp,
394            results,
395            metadata,
396            profiling: Some(profiling),
397        }
398    }
399}
400
401// Implement Timestamped for QueryResult for use in generic priority queue
402impl Timestamped for QueryResult {
403    fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
404        self.timestamp
405    }
406}
407
408/// Arc-wrapped QueryResult for zero-copy distribution
409pub type ArcQueryResult = Arc<QueryResult>;
410
411#[derive(Debug, Clone, Serialize, Deserialize)]
412pub struct ComponentEvent {
413    pub component_id: String,
414    pub component_type: ComponentType,
415    pub status: ComponentStatus,
416    pub timestamp: chrono::DateTime<chrono::Utc>,
417    pub message: Option<String>,
418}
419
420#[derive(Debug, Clone, Serialize, Deserialize)]
421pub enum ControlMessage {
422    Start(String),
423    Stop(String),
424    Status(String),
425    Shutdown,
426}
427
428pub type ComponentEventReceiver = mpsc::Receiver<ComponentEvent>;
429pub type ComponentEventSender = mpsc::Sender<ComponentEvent>;
430pub type ControlMessageReceiver = mpsc::Receiver<ControlMessage>;
431pub type ControlMessageSender = mpsc::Sender<ControlMessage>;
432
433// Broadcast channel types for zero-copy event distribution
434pub type SourceBroadcastSender = broadcast::Sender<ArcSourceEvent>;
435pub type SourceBroadcastReceiver = broadcast::Receiver<ArcSourceEvent>;
436
437// Broadcast channel types for zero-copy query result distribution
438pub type QueryResultBroadcastSender = broadcast::Sender<ArcQueryResult>;
439pub type QueryResultBroadcastReceiver = broadcast::Receiver<ArcQueryResult>;
440
441// Bootstrap channel types for dedicated bootstrap data delivery
442pub type BootstrapEventSender = mpsc::Sender<BootstrapEvent>;
443pub type BootstrapEventReceiver = mpsc::Receiver<BootstrapEvent>;
444pub type BootstrapCompleteSender = mpsc::Sender<BootstrapComplete>;
445pub type BootstrapCompleteReceiver = mpsc::Receiver<BootstrapComplete>;
446
447/// Control signals for coordination
448#[derive(Debug, Clone, Serialize, Deserialize)]
449pub enum ControlSignal {
450    /// Query has entered running state
451    Running { query_id: String },
452    /// Query has stopped
453    Stopped { query_id: String },
454    /// Query has been deleted
455    Deleted { query_id: String },
456}
457
458/// Wrapper for control signals with metadata
459#[derive(Debug, Clone)]
460pub struct ControlSignalWrapper {
461    pub signal: ControlSignal,
462    pub timestamp: chrono::DateTime<chrono::Utc>,
463    pub sequence_number: Option<u64>,
464}
465
466impl ControlSignalWrapper {
467    pub fn new(signal: ControlSignal) -> Self {
468        Self {
469            signal,
470            timestamp: chrono::Utc::now(),
471            sequence_number: None,
472        }
473    }
474
475    pub fn with_sequence(signal: ControlSignal, sequence_number: u64) -> Self {
476        Self {
477            signal,
478            timestamp: chrono::Utc::now(),
479            sequence_number: Some(sequence_number),
480        }
481    }
482}
483
484pub type ControlSignalReceiver = mpsc::Receiver<ControlSignalWrapper>;
485pub type ControlSignalSender = mpsc::Sender<ControlSignalWrapper>;
486
487pub struct EventChannels {
488    pub component_event_tx: ComponentEventSender,
489    pub _control_tx: ControlMessageSender,
490    pub control_signal_tx: ControlSignalSender,
491}
492
493pub struct EventReceivers {
494    pub component_event_rx: ComponentEventReceiver,
495    pub _control_rx: ControlMessageReceiver,
496    pub control_signal_rx: ControlSignalReceiver,
497}
498
499impl EventChannels {
500    pub fn new() -> (Self, EventReceivers) {
501        let (component_event_tx, component_event_rx) = mpsc::channel(1000);
502        let (control_tx, control_rx) = mpsc::channel(100);
503        let (control_signal_tx, control_signal_rx) = mpsc::channel(100);
504
505        let channels = Self {
506            component_event_tx,
507            _control_tx: control_tx,
508            control_signal_tx,
509        };
510
511        let receivers = EventReceivers {
512            component_event_rx,
513            _control_rx: control_rx,
514            control_signal_rx,
515        };
516
517        (channels, receivers)
518    }
519}
520
521#[cfg(test)]
522mod tests {
523    use super::*;
524    use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
525
526    fn create_test_source_change() -> SourceChange {
527        let element = Element::Node {
528            metadata: ElementMetadata {
529                reference: ElementReference::new("TestSource", "test-node-1"),
530                labels: vec!["TestLabel".into()].into(),
531                effective_from: 1000,
532            },
533            properties: Default::default(),
534        };
535        SourceChange::Insert { element }
536    }
537
538    #[test]
539    fn test_source_event_wrapper_into_parts() {
540        let change = create_test_source_change();
541        let wrapper = SourceEventWrapper::new(
542            "test-source".to_string(),
543            SourceEvent::Change(change),
544            chrono::Utc::now(),
545        );
546
547        let (source_id, event, _timestamp, profiling) = wrapper.into_parts();
548
549        assert_eq!(source_id, "test-source");
550        assert!(matches!(event, SourceEvent::Change(_)));
551        assert!(profiling.is_none());
552    }
553
554    #[test]
555    fn test_try_unwrap_arc_sole_owner() {
556        let change = create_test_source_change();
557        let wrapper = SourceEventWrapper::new(
558            "test-source".to_string(),
559            SourceEvent::Change(change),
560            chrono::Utc::now(),
561        );
562        let arc = Arc::new(wrapper);
563
564        // With sole ownership, try_unwrap_arc should succeed
565        let result = SourceEventWrapper::try_unwrap_arc(arc);
566        assert!(result.is_ok());
567
568        let (source_id, event, _timestamp, _profiling) = result.unwrap();
569        assert_eq!(source_id, "test-source");
570        assert!(matches!(event, SourceEvent::Change(_)));
571    }
572
573    #[test]
574    fn test_try_unwrap_arc_shared() {
575        let change = create_test_source_change();
576        let wrapper = SourceEventWrapper::new(
577            "test-source".to_string(),
578            SourceEvent::Change(change),
579            chrono::Utc::now(),
580        );
581        let arc = Arc::new(wrapper);
582        let _arc2 = arc.clone(); // Create another reference
583
584        // With shared ownership, try_unwrap_arc should fail and return the Arc
585        let result = SourceEventWrapper::try_unwrap_arc(arc);
586        assert!(result.is_err());
587
588        // The returned Arc should still be valid
589        let returned_arc = result.unwrap_err();
590        assert_eq!(returned_arc.source_id, "test-source");
591    }
592
593    #[test]
594    fn test_zero_copy_extraction_path() {
595        // Simulate the zero-copy extraction path used in query processing
596        let change = create_test_source_change();
597        let wrapper = SourceEventWrapper::new(
598            "test-source".to_string(),
599            SourceEvent::Change(change),
600            chrono::Utc::now(),
601        );
602        let arc = Arc::new(wrapper);
603
604        // This is the zero-copy path - when we have sole ownership
605        let (source_id, event, _timestamp, _profiling) =
606            match SourceEventWrapper::try_unwrap_arc(arc) {
607                Ok(parts) => parts,
608                Err(arc) => {
609                    // Fallback to cloning (would be needed in broadcast mode)
610                    (
611                        arc.source_id.clone(),
612                        arc.event.clone(),
613                        arc.timestamp,
614                        arc.profiling.clone(),
615                    )
616                }
617            };
618
619        // Extract SourceChange from owned event (no clone!)
620        let source_change = match event {
621            SourceEvent::Change(change) => Some(change),
622            _ => None,
623        };
624
625        assert_eq!(source_id, "test-source");
626        assert!(source_change.is_some());
627    }
628}