Skip to main content

drasi_lib/channels/
events.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::profiling::ProfilingMetadata;
16use drasi_core::models::SourceChange;
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::{broadcast, mpsc};
21
/// Trait for types that have a timestamp, required for priority queue ordering
pub trait Timestamped {
    /// Returns the UTC timestamp associated with this value.
    fn timestamp(&self) -> chrono::DateTime<chrono::Utc>;
}
26
/// Type of Drasi component
///
/// Used for identifying component types in events and monitoring.
/// Carried by [`ComponentEvent`] in its `component_type` field.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum ComponentType {
    /// A source component.
    Source,
    /// A query component.
    Query,
    /// A reaction component.
    Reaction,
}
36
37/// Execution status of a Drasi component
38///
39/// `ComponentStatus` represents the current lifecycle state of sources, queries, and reactions.
40/// Components transition through these states during their lifecycle, from creation through
41/// execution to shutdown.
42///
43/// # Status Lifecycle
44///
45/// A typical component lifecycle follows this progression:
46///
47/// ```text
48/// Stopped → Starting → Running → Stopping → Stopped
49///                ↓
50///              Error
51/// ```
52///
53/// # Status Values
54///
55/// - **Starting**: Component is initializing (connecting to resources, loading data, etc.)
56/// - **Running**: Component is actively processing (ingesting, querying, or delivering)
57/// - **Stopping**: Component is shutting down gracefully
58/// - **Stopped**: Component is not running (initial or final state)
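/// - **Reconfiguring**: Component is being reconfigured (transitions into and out of this state are not shown in the diagram above)
/// - **Error**: Component encountered an error and cannot continue (see error_message)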
60///
61/// # Usage
62///
63/// Status is available through runtime information methods on [`DrasiLib`](crate::DrasiLib):
64///
65/// - [`get_source_status()`](crate::DrasiLib::get_source_status)
66/// - [`get_query_status()`](crate::DrasiLib::get_query_status)
67/// - [`get_reaction_status()`](crate::DrasiLib::get_reaction_status)
68///
69/// And through runtime info structs:
70///
71/// - [`SourceRuntime`](crate::SourceRuntime)
72/// - [`QueryRuntime`](crate::QueryRuntime)
73/// - [`ReactionRuntime`](crate::ReactionRuntime)
74///
75/// # Examples
76///
77/// ## Monitoring Component Status
78///
79/// ```no_run
80/// use drasi_lib::{DrasiLib, ComponentStatus};
81///
82/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
83/// let core = DrasiLib::builder().with_id("my-server").build().await?;
84/// core.start().await?;
85///
86/// // Check source status
87/// let source_status = core.get_source_status("orders_db").await?;
88/// match source_status {
89///     ComponentStatus::Running => println!("Source is running"),
90///     ComponentStatus::Error => println!("Source has errors"),
91///     ComponentStatus::Starting => println!("Source is starting up"),
92///     _ => println!("Source status: {:?}", source_status),
93/// }
94///
95/// // Get detailed info with status
96/// let source_info = core.get_source_info("orders_db").await?;
97/// if source_info.status == ComponentStatus::Error {
98///     if let Some(error) = source_info.error_message {
99///         eprintln!("Error: {}", error);
100///     }
101/// }
102/// # Ok(())
103/// # }
104/// ```
105///
106/// ## Waiting for Component to Start
107///
108/// ```no_run
109/// use drasi_lib::{DrasiLib, ComponentStatus};
110/// use tokio::time::{sleep, Duration};
111///
112/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
113/// let core = DrasiLib::builder().with_id("my-server").build().await?;
114/// core.start_source("orders_db").await?;
115///
116/// // Poll until source is running
117/// loop {
118///     let status = core.get_source_status("orders_db").await?;
119///     match status {
120///         ComponentStatus::Running => break,
121///         ComponentStatus::Error => return Err("Source failed to start".into()),
122///         _ => sleep(Duration::from_millis(100)).await,
123///     }
124/// }
125/// println!("Source is now running");
126/// # Ok(())
127/// # }
128/// ```
129///
130/// ## Checking All Components
131///
132/// ```no_run
133/// use drasi_lib::{DrasiLib, ComponentStatus};
134///
135/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
136/// let core = DrasiLib::builder().with_id("my-server").build().await?;
137/// core.start().await?;
138///
139/// // Check all sources
140/// let sources = core.list_sources().await?;
141/// for (id, status) in sources {
142///     println!("Source {}: {:?}", id, status);
143/// }
144///
145/// // Check all queries
146/// let queries = core.list_queries().await?;
147/// for (id, status) in queries {
148///     println!("Query {}: {:?}", id, status);
149/// }
150///
151/// // Check all reactions
152/// let reactions = core.list_reactions().await?;
153/// for (id, status) in reactions {
154///     println!("Reaction {}: {:?}", id, status);
155/// }
156/// # Ok(())
157/// # }
158/// ```
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ComponentStatus {
    /// Component is initializing.
    Starting,
    /// Component is actively processing.
    Running,
    /// Component is shutting down gracefully.
    Stopping,
    /// Component is not running (initial or final state).
    Stopped,
    /// NOTE(review): this state does not appear in the lifecycle diagram in the
    /// enum documentation; presumably the component is applying a configuration
    /// change — confirm the intended transitions.
    Reconfiguring,
    /// Component encountered an error and cannot continue.
    Error,
}
168
/// A single source change together with a source id and a UTC timestamp.
#[derive(Debug, Clone)]
pub struct SourceChangeEvent {
    /// Identifier of the source associated with this change.
    pub source_id: String,
    /// The change payload.
    pub change: SourceChange,
    /// UTC timestamp attached to this event.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
175
/// Control events from sources for query coordination
///
/// Wrapped by [`SourceEvent::Control`] when sent through the unified event envelope.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SourceControl {
    /// Query subscription control event
    Subscription {
        /// Identifier of the query the subscription refers to.
        query_id: String,
        /// Identifier of the query node.
        /// NOTE(review): presumably the node hosting the query — confirm.
        query_node_id: String,
        /// Node labels named by the subscription.
        node_labels: Vec<String>,
        /// Relation labels named by the subscription.
        rel_labels: Vec<String>,
        /// Operation applied to the subscription (insert, update, or delete).
        operation: ControlOperation,
    },
    /// Signal from FutureQueueSource that one or more future items are due.
    FuturesDue,
}
190
/// Control operation types
///
/// Carried by [`SourceControl::Subscription`] in its `operation` field.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ControlOperation {
    /// Insert operation.
    Insert,
    /// Update operation.
    Update,
    /// Delete operation.
    Delete,
}
198
/// Unified event envelope carrying both data changes and control messages
///
/// [`SourceEventWrapper`] pairs a `SourceEvent` with a source id, a timestamp,
/// and optional profiling metadata.
#[derive(Debug, Clone)]
pub enum SourceEvent {
    /// Data change event from source
    Change(SourceChange),
    /// Control event for query coordination
    Control(SourceControl),
}
207
/// Wrapper for source events with metadata
#[derive(Debug, Clone)]
pub struct SourceEventWrapper {
    /// Identifier of the source associated with the event.
    pub source_id: String,
    /// The wrapped event (data change or control message).
    pub event: SourceEvent,
    /// UTC timestamp attached to the event; returned by the `Timestamped` impl.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional profiling metadata for performance tracking
    pub profiling: Option<ProfilingMetadata>,
}
217
218impl SourceEventWrapper {
219    /// Create a new SourceEventWrapper without profiling
220    pub fn new(
221        source_id: String,
222        event: SourceEvent,
223        timestamp: chrono::DateTime<chrono::Utc>,
224    ) -> Self {
225        Self {
226            source_id,
227            event,
228            timestamp,
229            profiling: None,
230        }
231    }
232
233    /// Create a new SourceEventWrapper with profiling metadata
234    pub fn with_profiling(
235        source_id: String,
236        event: SourceEvent,
237        timestamp: chrono::DateTime<chrono::Utc>,
238        profiling: ProfilingMetadata,
239    ) -> Self {
240        Self {
241            source_id,
242            event,
243            timestamp,
244            profiling: Some(profiling),
245        }
246    }
247
248    /// Consume this wrapper and return its components.
249    /// This enables zero-copy extraction when the wrapper has sole ownership.
250    pub fn into_parts(
251        self,
252    ) -> (
253        String,
254        SourceEvent,
255        chrono::DateTime<chrono::Utc>,
256        Option<ProfilingMetadata>,
257    ) {
258        (self.source_id, self.event, self.timestamp, self.profiling)
259    }
260
261    /// Try to extract components from an Arc<SourceEventWrapper>.
262    /// Uses Arc::try_unwrap to avoid cloning when we have sole ownership.
263    /// Returns Ok with owned components if sole owner, Err with Arc back if shared.
264    ///
265    /// This enables zero-copy in Channel dispatch mode (single consumer per event)
266    /// while still working correctly in Broadcast mode (cloning required).
267    pub fn try_unwrap_arc(
268        arc_self: Arc<Self>,
269    ) -> Result<
270        (
271            String,
272            SourceEvent,
273            chrono::DateTime<chrono::Utc>,
274            Option<ProfilingMetadata>,
275        ),
276        Arc<Self>,
277    > {
278        Arc::try_unwrap(arc_self).map(|wrapper| wrapper.into_parts())
279    }
280}
281
282// Implement Timestamped for SourceEventWrapper for use in generic priority queue
283impl Timestamped for SourceEventWrapper {
284    fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
285        self.timestamp
286    }
287}
288
/// Arc-wrapped SourceEventWrapper for zero-copy distribution
///
/// Cloning the `Arc` shares one wrapper instead of copying it. Use
/// [`SourceEventWrapper::try_unwrap_arc`] to recover owned parts when the
/// `Arc` is no longer shared.
pub type ArcSourceEvent = Arc<SourceEventWrapper>;
291
/// Bootstrap event wrapper for dedicated bootstrap channels
#[derive(Debug, Clone)]
pub struct BootstrapEvent {
    /// Identifier of the source associated with the event.
    pub source_id: String,
    /// The change payload.
    pub change: SourceChange,
    /// UTC timestamp attached to the event.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Sequence number of the event.
    /// NOTE(review): presumably its position within the bootstrap stream — confirm.
    pub sequence: u64,
}
300
/// Bootstrap completion signal
#[derive(Debug, Clone)]
pub struct BootstrapComplete {
    /// Identifier of the source the signal refers to.
    pub source_id: String,
    /// Total event count reported with the signal.
    /// NOTE(review): presumably the number of bootstrap events delivered — confirm.
    pub total_events: u64,
}
307
/// Subscription request from Query to Source
#[derive(Debug, Clone)]
pub struct SubscriptionRequest {
    /// Identifier of the requesting query.
    pub query_id: String,
    /// Identifier of the source being subscribed to.
    pub source_id: String,
    /// Whether bootstrap is enabled for this subscription.
    pub enable_bootstrap: bool,
    /// Node labels named by the request.
    pub node_labels: Vec<String>,
    /// Relation labels named by the request.
    pub relation_labels: Vec<String>,
}
317
/// Subscription response from Source to Query
pub struct SubscriptionResponse {
    /// Identifier of the query the response is for.
    pub query_id: String,
    /// Identifier of the responding source.
    pub source_id: String,
    /// Receiver of [`SourceEventWrapper`] values, boxed as a `ChangeReceiver` trait object.
    pub receiver: Box<dyn super::ChangeReceiver<SourceEventWrapper>>,
    /// Receiver of [`BootstrapEvent`]s; `None` when no bootstrap channel is supplied.
    pub bootstrap_receiver: Option<BootstrapEventReceiver>,
}
325
/// Subscription response from Query to Reaction
pub struct QuerySubscriptionResponse {
    /// Identifier of the query the response is for.
    pub query_id: String,
    /// Receiver of [`QueryResult`] values, boxed as a `ChangeReceiver` trait object.
    pub receiver: Box<dyn super::ChangeReceiver<QueryResult>>,
}
331
/// Typed result diff emitted by continuous queries.
///
/// Serialized as an internally tagged enum: the variant is identified by a
/// `type` field whose value is the `rename` string on each variant. Note that
/// the tag values are not uniformly cased (`ADD`/`DELETE`/`UPDATE` versus
/// `aggregation`/`noop`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum ResultDiff {
    /// Addition diff; tagged `"ADD"`.
    #[serde(rename = "ADD")]
    Add { data: serde_json::Value },
    /// Deletion diff; tagged `"DELETE"`.
    #[serde(rename = "DELETE")]
    Delete { data: serde_json::Value },
    /// Update diff carrying `before` and `after` values; tagged `"UPDATE"`.
    #[serde(rename = "UPDATE")]
    Update {
        data: serde_json::Value,
        before: serde_json::Value,
        after: serde_json::Value,
        // Left out of the serialized output entirely when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        grouping_keys: Option<Vec<String>>,
    },
    /// Aggregation diff with an optional `before` value; tagged `"aggregation"`.
    #[serde(rename = "aggregation")]
    Aggregation {
        before: Option<serde_json::Value>,
        after: serde_json::Value,
    },
    /// Diff variant with no payload; tagged `"noop"`.
    #[serde(rename = "noop")]
    Noop,
}
356
/// A batch of result diffs for one query, with a timestamp and free-form metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResult {
    /// Identifier of the query the results belong to.
    pub query_id: String,
    /// UTC timestamp attached to the result; returned by the `Timestamped` impl.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// The result diffs.
    pub results: Vec<ResultDiff>,
    /// Additional JSON values keyed by name.
    pub metadata: HashMap<String, serde_json::Value>,
    /// Optional profiling metadata for performance tracking
    // Left out of the serialized output entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub profiling: Option<ProfilingMetadata>,
}
367
368impl QueryResult {
369    /// Create a new QueryResult without profiling
370    pub fn new(
371        query_id: String,
372        timestamp: chrono::DateTime<chrono::Utc>,
373        results: Vec<ResultDiff>,
374        metadata: HashMap<String, serde_json::Value>,
375    ) -> Self {
376        Self {
377            query_id,
378            timestamp,
379            results,
380            metadata,
381            profiling: None,
382        }
383    }
384
385    /// Create a new QueryResult with profiling metadata
386    pub fn with_profiling(
387        query_id: String,
388        timestamp: chrono::DateTime<chrono::Utc>,
389        results: Vec<ResultDiff>,
390        metadata: HashMap<String, serde_json::Value>,
391        profiling: ProfilingMetadata,
392    ) -> Self {
393        Self {
394            query_id,
395            timestamp,
396            results,
397            metadata,
398            profiling: Some(profiling),
399        }
400    }
401}
402
403// Implement Timestamped for QueryResult for use in generic priority queue
404impl Timestamped for QueryResult {
405    fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
406        self.timestamp
407    }
408}
409
/// Arc-wrapped QueryResult for zero-copy distribution
///
/// Cloning the `Arc` shares one [`QueryResult`] instead of copying it.
pub type ArcQueryResult = Arc<QueryResult>;
412
/// Event reporting a component's status at a point in time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentEvent {
    /// Identifier of the component the event refers to.
    pub component_id: String,
    /// Whether the component is a source, query, or reaction.
    pub component_type: ComponentType,
    /// Status reported by the event.
    pub status: ComponentStatus,
    /// UTC timestamp attached to the event.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional free-form message accompanying the event.
    pub message: Option<String>,
}
421
/// Control messages sent over the control message channel.
///
/// NOTE(review): the `String` payload of `Start`, `Stop`, and `Status` is
/// presumably the id of the targeted component — confirm against the senders.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ControlMessage {
    /// Start request.
    Start(String),
    /// Stop request.
    Stop(String),
    /// Status request.
    Status(String),
    /// Shutdown request; carries no payload.
    Shutdown,
}
429
/// Receiving half of an `mpsc` channel carrying [`ComponentEvent`]s.
pub type ComponentEventReceiver = mpsc::Receiver<ComponentEvent>;
/// Sending half of an `mpsc` channel carrying [`ComponentEvent`]s.
pub type ComponentEventSender = mpsc::Sender<ComponentEvent>;
/// Receiving half of an `mpsc` channel carrying [`ControlMessage`]s.
pub type ControlMessageReceiver = mpsc::Receiver<ControlMessage>;
/// Sending half of an `mpsc` channel carrying [`ControlMessage`]s.
pub type ControlMessageSender = mpsc::Sender<ControlMessage>;

// Broadcast channel types for zero-copy event distribution
/// Broadcast sender of [`ArcSourceEvent`]s.
pub type SourceBroadcastSender = broadcast::Sender<ArcSourceEvent>;
/// Broadcast receiver of [`ArcSourceEvent`]s.
pub type SourceBroadcastReceiver = broadcast::Receiver<ArcSourceEvent>;

// Broadcast channel types for zero-copy query result distribution
/// Broadcast sender of [`ArcQueryResult`]s.
pub type QueryResultBroadcastSender = broadcast::Sender<ArcQueryResult>;
/// Broadcast receiver of [`ArcQueryResult`]s.
pub type QueryResultBroadcastReceiver = broadcast::Receiver<ArcQueryResult>;

// Bootstrap channel types for dedicated bootstrap data delivery
/// Sending half of an `mpsc` channel carrying [`BootstrapEvent`]s.
pub type BootstrapEventSender = mpsc::Sender<BootstrapEvent>;
/// Receiving half of an `mpsc` channel carrying [`BootstrapEvent`]s.
pub type BootstrapEventReceiver = mpsc::Receiver<BootstrapEvent>;
/// Sending half of an `mpsc` channel carrying [`BootstrapComplete`] signals.
pub type BootstrapCompleteSender = mpsc::Sender<BootstrapComplete>;
/// Receiving half of an `mpsc` channel carrying [`BootstrapComplete`] signals.
pub type BootstrapCompleteReceiver = mpsc::Receiver<BootstrapComplete>;
448
/// Control signals for coordination
///
/// Every variant carries the id of the query it refers to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ControlSignal {
    /// Query has entered running state
    Running { query_id: String },
    /// Query has stopped
    Stopped { query_id: String },
    /// Query has been deleted
    Deleted { query_id: String },
}
459
/// Wrapper for control signals with metadata
#[derive(Debug, Clone)]
pub struct ControlSignalWrapper {
    /// The wrapped control signal.
    pub signal: ControlSignal,
    /// UTC timestamp attached to the signal; the constructors set it to the
    /// time of construction.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional sequence number; `None` unless one is supplied at construction.
    pub sequence_number: Option<u64>,
}
467
468impl ControlSignalWrapper {
469    pub fn new(signal: ControlSignal) -> Self {
470        Self {
471            signal,
472            timestamp: chrono::Utc::now(),
473            sequence_number: None,
474        }
475    }
476
477    pub fn with_sequence(signal: ControlSignal, sequence_number: u64) -> Self {
478        Self {
479            signal,
480            timestamp: chrono::Utc::now(),
481            sequence_number: Some(sequence_number),
482        }
483    }
484}
485
/// Receiving half of an `mpsc` channel carrying [`ControlSignalWrapper`]s.
pub type ControlSignalReceiver = mpsc::Receiver<ControlSignalWrapper>;
/// Sending half of an `mpsc` channel carrying [`ControlSignalWrapper`]s.
pub type ControlSignalSender = mpsc::Sender<ControlSignalWrapper>;
488
/// Sender halves of the channels created by [`EventChannels::new`].
///
/// The matching receiver halves are returned in [`EventReceivers`].
pub struct EventChannels {
    /// Sender for [`ComponentEvent`]s.
    pub component_event_tx: ComponentEventSender,
    /// Sender for [`ControlMessage`]s.
    /// NOTE(review): the leading underscore suggests this is currently unused — confirm.
    pub _control_tx: ControlMessageSender,
    /// Sender for [`ControlSignalWrapper`]s.
    pub control_signal_tx: ControlSignalSender,
}
494
/// Receiver halves of the channels created by [`EventChannels::new`].
///
/// The matching sender halves are held by [`EventChannels`].
pub struct EventReceivers {
    /// Receiver for [`ComponentEvent`]s.
    pub component_event_rx: ComponentEventReceiver,
    /// Receiver for [`ControlMessage`]s.
    /// NOTE(review): the leading underscore suggests this is currently unused — confirm.
    pub _control_rx: ControlMessageReceiver,
    /// Receiver for [`ControlSignalWrapper`]s.
    pub control_signal_rx: ControlSignalReceiver,
}
500
501impl EventChannels {
502    pub fn new() -> (Self, EventReceivers) {
503        let (component_event_tx, component_event_rx) = mpsc::channel(1000);
504        let (control_tx, control_rx) = mpsc::channel(100);
505        let (control_signal_tx, control_signal_rx) = mpsc::channel(100);
506
507        let channels = Self {
508            component_event_tx,
509            _control_tx: control_tx,
510            control_signal_tx,
511        };
512
513        let receivers = EventReceivers {
514            component_event_rx,
515            _control_rx: control_rx,
516            control_signal_rx,
517        };
518
519        (channels, receivers)
520    }
521}
522
#[cfg(test)]
mod tests {
    use super::*;
    use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};

    /// Source id used by every wrapper built in these tests.
    const SOURCE_ID: &str = "test-source";

    /// Builds an insert change for a single labelled node with default properties.
    fn create_test_source_change() -> SourceChange {
        let metadata = ElementMetadata {
            reference: ElementReference::new("TestSource", "test-node-1"),
            labels: vec!["TestLabel".into()].into(),
            effective_from: 1000,
        };
        SourceChange::Insert {
            element: Element::Node {
                metadata,
                properties: Default::default(),
            },
        }
    }

    /// Wraps a fresh test change in a `SourceEventWrapper` without profiling.
    fn create_test_wrapper() -> SourceEventWrapper {
        SourceEventWrapper::new(
            SOURCE_ID.to_string(),
            SourceEvent::Change(create_test_source_change()),
            chrono::Utc::now(),
        )
    }

    #[test]
    fn test_source_event_wrapper_into_parts() {
        let (source_id, event, _timestamp, profiling) = create_test_wrapper().into_parts();

        assert_eq!(source_id, SOURCE_ID);
        assert!(matches!(event, SourceEvent::Change(_)));
        assert!(profiling.is_none());
    }

    #[test]
    fn test_try_unwrap_arc_sole_owner() {
        let sole = Arc::new(create_test_wrapper());

        // Only one strong reference exists, so the parts come back by value
        let outcome = SourceEventWrapper::try_unwrap_arc(sole);
        assert!(outcome.is_ok());

        let (source_id, event, _timestamp, _profiling) = outcome.unwrap();
        assert_eq!(source_id, SOURCE_ID);
        assert!(matches!(event, SourceEvent::Change(_)));
    }

    #[test]
    fn test_try_unwrap_arc_shared() {
        let first = Arc::new(create_test_wrapper());
        // Hold a second strong reference for the rest of the test
        let _second = Arc::clone(&first);

        // A shared Arc cannot be unwrapped; it must be handed back untouched
        let outcome = SourceEventWrapper::try_unwrap_arc(first);
        assert!(outcome.is_err());

        // The Arc returned in the error is still usable
        let handed_back = outcome.unwrap_err();
        assert_eq!(handed_back.source_id, SOURCE_ID);
    }

    #[test]
    fn test_zero_copy_extraction_path() {
        // Mirrors the extraction path used in query processing
        let arc = Arc::new(create_test_wrapper());

        // Sole ownership takes the move-out path; a shared Arc (broadcast mode)
        // would fall back to cloning each field
        let (source_id, event, _timestamp, _profiling) = SourceEventWrapper::try_unwrap_arc(arc)
            .unwrap_or_else(|shared| {
                (
                    shared.source_id.clone(),
                    shared.event.clone(),
                    shared.timestamp,
                    shared.profiling.clone(),
                )
            });

        // The event is owned here, so the change is moved out rather than cloned
        let source_change = if let SourceEvent::Change(change) = event {
            Some(change)
        } else {
            None
        };

        assert_eq!(source_id, SOURCE_ID);
        assert!(source_change.is_some());
    }
}