// Source file: drasi_lib/channels/events.rs

// Copyright 2025 The Drasi Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::profiling::ProfilingMetadata;
use drasi_core::models::SourceChange;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};

23/// Trait for types that have a timestamp, required for priority queue ordering
24pub trait Timestamped {
25    fn timestamp(&self) -> chrono::DateTime<chrono::Utc>;
26}
27
28/// Type of Drasi component
29///
30/// Used for identifying component types in events and monitoring.
31#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
32pub enum ComponentType {
33    Source,
34    Query,
35    Reaction,
36    BootstrapProvider,
37    IdentityProvider,
38}
39
40/// Execution status of a Drasi component
41///
42/// `ComponentStatus` represents the current lifecycle state of sources, queries, and reactions.
43/// Components transition through these states during their lifecycle, from creation through
44/// execution to shutdown.
45///
46/// # Status Lifecycle
47///
48/// A typical component lifecycle follows this progression:
49///
50/// ```text
51/// Added → Starting → Running → Stopping → Stopped
52///              ↓                              ↓
53///            Error                         Removed
54/// ```
55///
56/// # Status Values
57///
58/// - **Added**: Component has been registered in the graph but not yet started
59/// - **Starting**: Component is initializing (connecting to resources, loading data, etc.)
60/// - **Running**: Component is actively processing (ingesting, querying, or delivering)
61/// - **Stopping**: Component is shutting down gracefully
62/// - **Stopped**: Component is not running (stopped after previously running)
63/// - **Removed**: Component has been removed from the graph
64/// - **Error**: Component encountered an error and cannot continue (see error_message)
65///
66/// # Usage
67///
68/// Status is available through runtime information methods on [`DrasiLib`](crate::DrasiLib):
69///
70/// - [`get_source_status()`](crate::DrasiLib::get_source_status)
71/// - [`get_query_status()`](crate::DrasiLib::get_query_status)
72/// - [`get_reaction_status()`](crate::DrasiLib::get_reaction_status)
73///
74/// And through runtime info structs:
75///
76/// - [`SourceRuntime`](crate::SourceRuntime)
77/// - [`QueryRuntime`](crate::QueryRuntime)
78/// - [`ReactionRuntime`](crate::ReactionRuntime)
79///
80/// # Examples
81///
82/// ## Monitoring Component Status
83///
84/// ```no_run
85/// use drasi_lib::{DrasiLib, ComponentStatus};
86///
87/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
88/// let core = DrasiLib::builder().with_id("my-server").build().await?;
89/// core.start().await?;
90///
91/// // Check source status
92/// let source_status = core.get_source_status("orders_db").await?;
93/// match source_status {
94///     ComponentStatus::Running => println!("Source is running"),
95///     ComponentStatus::Error => println!("Source has errors"),
96///     ComponentStatus::Starting => println!("Source is starting up"),
97///     _ => println!("Source status: {:?}", source_status),
98/// }
99///
100/// // Get detailed info with status
101/// let source_info = core.get_source_info("orders_db").await?;
102/// if source_info.status == ComponentStatus::Error {
103///     if let Some(error) = source_info.error_message {
104///         eprintln!("Error: {}", error);
105///     }
106/// }
107/// # Ok(())
108/// # }
109/// ```
110///
111/// ## Waiting for Component to Start
112///
113/// ```no_run
114/// use drasi_lib::{DrasiLib, ComponentStatus};
115/// use tokio::time::{sleep, Duration};
116///
117/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
118/// let core = DrasiLib::builder().with_id("my-server").build().await?;
119/// core.start_source("orders_db").await?;
120///
121/// // Poll until source is running
122/// loop {
123///     let status = core.get_source_status("orders_db").await?;
124///     match status {
125///         ComponentStatus::Running => break,
126///         ComponentStatus::Error => return Err("Source failed to start".into()),
127///         _ => sleep(Duration::from_millis(100)).await,
128///     }
129/// }
130/// println!("Source is now running");
131/// # Ok(())
132/// # }
133/// ```
134///
135/// ## Checking All Components
136///
137/// ```no_run
138/// use drasi_lib::{DrasiLib, ComponentStatus};
139///
140/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
141/// let core = DrasiLib::builder().with_id("my-server").build().await?;
142/// core.start().await?;
143///
144/// // Check all sources
145/// let sources = core.list_sources().await?;
146/// for (id, status) in sources {
147///     println!("Source {}: {:?}", id, status);
148/// }
149///
150/// // Check all queries
151/// let queries = core.list_queries().await?;
152/// for (id, status) in queries {
153///     println!("Query {}: {:?}", id, status);
154/// }
155///
156/// // Check all reactions
157/// let reactions = core.list_reactions().await?;
158/// for (id, status) in reactions {
159///     println!("Reaction {}: {:?}", id, status);
160/// }
161/// # Ok(())
162/// # }
163/// ```
164#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
165pub enum ComponentStatus {
166    /// Component has been registered in the graph but not yet started.
167    Added,
168    Starting,
169    Running,
170    Stopping,
171    Stopped,
172    /// Component has been removed from the graph.
173    Removed,
174    Reconfiguring,
175    Error,
176}
177
178/// A source change event with metadata for dispatching to queries.
179#[derive(Debug, Clone)]
180pub struct SourceChangeEvent {
181    pub source_id: String,
182    pub change: SourceChange,
183    pub timestamp: chrono::DateTime<chrono::Utc>,
184}
185
186/// Control events from sources for query coordination
187#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
188pub enum SourceControl {
189    /// Query subscription control event
190    Subscription {
191        query_id: String,
192        query_node_id: String,
193        node_labels: Vec<String>,
194        rel_labels: Vec<String>,
195        operation: ControlOperation,
196    },
197    /// Signal from FutureQueueSource that one or more future items are due.
198    FuturesDue,
199}
200
201/// Control operation types
202#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
203pub enum ControlOperation {
204    Insert,
205    Update,
206    Delete,
207}
208
209/// Unified event envelope carrying both data changes and control messages
210#[derive(Debug, Clone)]
211pub enum SourceEvent {
212    /// Data change event from source
213    Change(SourceChange),
214    /// Control event for query coordination
215    Control(SourceControl),
216}
217
218/// Wrapper for source events with metadata
219#[derive(Debug, Clone)]
220pub struct SourceEventWrapper {
221    pub source_id: String,
222    pub event: SourceEvent,
223    pub timestamp: chrono::DateTime<chrono::Utc>,
224    /// Optional profiling metadata for performance tracking
225    pub profiling: Option<ProfilingMetadata>,
226    /// Monotonic, replayable sequence number stamped by the source.
227    /// `None` for volatile sources that don't support replay.
228    /// When present, must be strictly increasing per source.
229    pub sequence: Option<u64>,
230}
231
232impl SourceEventWrapper {
233    /// Create a new SourceEventWrapper without profiling
234    pub fn new(
235        source_id: String,
236        event: SourceEvent,
237        timestamp: chrono::DateTime<chrono::Utc>,
238    ) -> Self {
239        Self {
240            source_id,
241            event,
242            timestamp,
243            profiling: None,
244            sequence: None,
245        }
246    }
247
248    /// Create a new SourceEventWrapper with profiling metadata
249    pub fn with_profiling(
250        source_id: String,
251        event: SourceEvent,
252        timestamp: chrono::DateTime<chrono::Utc>,
253        profiling: ProfilingMetadata,
254    ) -> Self {
255        Self {
256            source_id,
257            event,
258            timestamp,
259            profiling: Some(profiling),
260            sequence: None,
261        }
262    }
263
264    /// Create a new SourceEventWrapper with a sequence number (and optional profiling)
265    pub fn with_sequence(
266        source_id: String,
267        event: SourceEvent,
268        timestamp: chrono::DateTime<chrono::Utc>,
269        sequence: u64,
270        profiling: Option<ProfilingMetadata>,
271    ) -> Self {
272        Self {
273            source_id,
274            event,
275            timestamp,
276            profiling,
277            sequence: Some(sequence),
278        }
279    }
280
281    /// Consume this wrapper and return its components.
282    /// This enables zero-copy extraction when the wrapper has sole ownership.
283    pub fn into_parts(
284        self,
285    ) -> (
286        String,
287        SourceEvent,
288        chrono::DateTime<chrono::Utc>,
289        Option<ProfilingMetadata>,
290        Option<u64>,
291    ) {
292        (
293            self.source_id,
294            self.event,
295            self.timestamp,
296            self.profiling,
297            self.sequence,
298        )
299    }
300
301    /// Try to extract components from an Arc<SourceEventWrapper>.
302    /// Uses Arc::try_unwrap to avoid cloning when we have sole ownership.
303    /// Returns Ok with owned components if sole owner, Err with Arc back if shared.
304    ///
305    /// This enables zero-copy in Channel dispatch mode (single consumer per event)
306    /// while still working correctly in Broadcast mode (cloning required).
307    pub fn try_unwrap_arc(
308        arc_self: Arc<Self>,
309    ) -> Result<
310        (
311            String,
312            SourceEvent,
313            chrono::DateTime<chrono::Utc>,
314            Option<ProfilingMetadata>,
315            Option<u64>,
316        ),
317        Arc<Self>,
318    > {
319        Arc::try_unwrap(arc_self).map(|wrapper| wrapper.into_parts())
320    }
321}
322
323// Implement Timestamped for SourceEventWrapper for use in generic priority queue
324impl Timestamped for SourceEventWrapper {
325    fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
326        self.timestamp
327    }
328}
329
330/// Arc-wrapped SourceEventWrapper for zero-copy distribution
331pub type ArcSourceEvent = Arc<SourceEventWrapper>;
332
333/// Bootstrap event wrapper for dedicated bootstrap channels
334#[derive(Debug, Clone)]
335pub struct BootstrapEvent {
336    pub source_id: String,
337    pub change: SourceChange,
338    pub timestamp: chrono::DateTime<chrono::Utc>,
339    pub sequence: u64,
340}
341
/// Subscription request from Query to Source.
#[derive(Debug, Clone)]
pub struct SubscriptionRequest {
    /// Identifier of the subscribing query.
    pub query_id: String,
    /// Identifier of the source being subscribed to.
    pub source_id: String,
    /// Whether bootstrap is enabled for this subscription.
    pub enable_bootstrap: bool,
    /// Node labels named in the request.
    pub node_labels: Vec<String>,
    /// Relation labels named in the request.
    pub relation_labels: Vec<String>,
}

352/// Subscription response from Source to Query
353pub struct SubscriptionResponse {
354    pub query_id: String,
355    pub source_id: String,
356    pub receiver: Box<dyn super::ChangeReceiver<SourceEventWrapper>>,
357    pub bootstrap_receiver: Option<BootstrapEventReceiver>,
358    /// Shared handle for the query to report its last durably-processed sequence position.
359    /// Created by replay-capable sources when `request_position_handle` is true.
360    /// The query writes to this atomically after each commit; the source reads the
361    /// minimum across all subscribers to advance its upstream cursor.
362    /// Sources should initialize this to `u64::MAX` (meaning "no position confirmed yet").
363    pub position_handle: Option<Arc<AtomicU64>>,
364}
365
366/// Subscription response from Query to Reaction
367pub struct QuerySubscriptionResponse {
368    pub query_id: String,
369    pub receiver: Box<dyn super::ChangeReceiver<QueryResult>>,
370}
371
372/// Typed result diff emitted by continuous queries.
373#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
374#[serde(tag = "type")]
375pub enum ResultDiff {
376    #[serde(rename = "ADD")]
377    Add { data: serde_json::Value },
378    #[serde(rename = "DELETE")]
379    Delete { data: serde_json::Value },
380    #[serde(rename = "UPDATE")]
381    Update {
382        data: serde_json::Value,
383        before: serde_json::Value,
384        after: serde_json::Value,
385        #[serde(skip_serializing_if = "Option::is_none")]
386        grouping_keys: Option<Vec<String>>,
387    },
388    #[serde(rename = "aggregation")]
389    Aggregation {
390        before: Option<serde_json::Value>,
391        after: serde_json::Value,
392    },
393    #[serde(rename = "noop")]
394    Noop,
395}
396
397/// Result emitted by a continuous query when data changes.
398///
399/// Contains the diff (added, updated, deleted rows) plus metadata and
400/// optional profiling information. Dispatched to reactions via the priority queue.
401#[derive(Debug, Clone, Serialize, Deserialize)]
402pub struct QueryResult {
403    pub query_id: String,
404    pub timestamp: chrono::DateTime<chrono::Utc>,
405    pub results: Vec<ResultDiff>,
406    pub metadata: HashMap<String, serde_json::Value>,
407    /// Optional profiling metadata for performance tracking
408    #[serde(skip_serializing_if = "Option::is_none")]
409    pub profiling: Option<ProfilingMetadata>,
410}
411
412impl QueryResult {
413    /// Create a new QueryResult without profiling
414    pub fn new(
415        query_id: String,
416        timestamp: chrono::DateTime<chrono::Utc>,
417        results: Vec<ResultDiff>,
418        metadata: HashMap<String, serde_json::Value>,
419    ) -> Self {
420        Self {
421            query_id,
422            timestamp,
423            results,
424            metadata,
425            profiling: None,
426        }
427    }
428
429    /// Create a new QueryResult with profiling metadata
430    pub fn with_profiling(
431        query_id: String,
432        timestamp: chrono::DateTime<chrono::Utc>,
433        results: Vec<ResultDiff>,
434        metadata: HashMap<String, serde_json::Value>,
435        profiling: ProfilingMetadata,
436    ) -> Self {
437        Self {
438            query_id,
439            timestamp,
440            results,
441            metadata,
442            profiling: Some(profiling),
443        }
444    }
445}
446
447// Implement Timestamped for QueryResult for use in generic priority queue
448impl Timestamped for QueryResult {
449    fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
450        self.timestamp
451    }
452}
453
454/// Arc-wrapped QueryResult for zero-copy distribution
455pub type ArcQueryResult = Arc<QueryResult>;
456
457/// Lifecycle event emitted when a component's status changes.
458///
459/// Broadcast via the component event channel to all subscribers.
460/// Used for monitoring, logging, and reactive lifecycle coordination.
461#[derive(Debug, Clone, Serialize, Deserialize)]
462pub struct ComponentEvent {
463    pub component_id: String,
464    pub component_type: ComponentType,
465    pub status: ComponentStatus,
466    pub timestamp: chrono::DateTime<chrono::Utc>,
467    pub message: Option<String>,
468}
469
470/// Control messages for component lifecycle management.
471#[derive(Debug, Clone, Serialize, Deserialize)]
472pub enum ControlMessage {
473    Start(String),
474    Stop(String),
475    Status(String),
476    Shutdown,
477}
478
479pub type ComponentEventBroadcastSender = broadcast::Sender<ComponentEvent>;
480pub type ComponentEventBroadcastReceiver = broadcast::Receiver<ComponentEvent>;
481/// Backward-compatible mpsc channel types used by host-sdk plugin callbacks.
482/// New code should use `ComponentUpdateSender` from `component_graph` instead.
483pub type ComponentEventSender = mpsc::Sender<ComponentEvent>;
484pub type ComponentEventReceiver = mpsc::Receiver<ComponentEvent>;
485pub type ControlMessageReceiver = mpsc::Receiver<ControlMessage>;
486pub type ControlMessageSender = mpsc::Sender<ControlMessage>;
487
488// Broadcast channel types for zero-copy event distribution
489pub type SourceBroadcastSender = broadcast::Sender<ArcSourceEvent>;
490pub type SourceBroadcastReceiver = broadcast::Receiver<ArcSourceEvent>;
491
492// Broadcast channel types for zero-copy query result distribution
493pub type QueryResultBroadcastSender = broadcast::Sender<ArcQueryResult>;
494pub type QueryResultBroadcastReceiver = broadcast::Receiver<ArcQueryResult>;
495
496// Bootstrap channel types for dedicated bootstrap data delivery
497pub type BootstrapEventSender = mpsc::Sender<BootstrapEvent>;
498pub type BootstrapEventReceiver = mpsc::Receiver<BootstrapEvent>;
499
500/// Control signals for coordination
501#[derive(Debug, Clone, Serialize, Deserialize)]
502pub enum ControlSignal {
503    /// Query has entered running state
504    Running { query_id: String },
505    /// Query has stopped
506    Stopped { query_id: String },
507    /// Query has been deleted
508    Deleted { query_id: String },
509}
510
511/// Wrapper for control signals with metadata
512#[derive(Debug, Clone)]
513pub struct ControlSignalWrapper {
514    pub signal: ControlSignal,
515    pub timestamp: chrono::DateTime<chrono::Utc>,
516    pub sequence_number: Option<u64>,
517}
518
519impl ControlSignalWrapper {
520    pub fn new(signal: ControlSignal) -> Self {
521        Self {
522            signal,
523            timestamp: chrono::Utc::now(),
524            sequence_number: None,
525        }
526    }
527
528    pub fn with_sequence(signal: ControlSignal, sequence_number: u64) -> Self {
529        Self {
530            signal,
531            timestamp: chrono::Utc::now(),
532            sequence_number: Some(sequence_number),
533        }
534    }
535}
536
537pub type ControlSignalReceiver = mpsc::Receiver<ControlSignalWrapper>;
538pub type ControlSignalSender = mpsc::Sender<ControlSignalWrapper>;
539
540pub struct EventChannels {
541    pub _control_tx: ControlMessageSender,
542    pub control_signal_tx: ControlSignalSender,
543}
544
545pub struct EventReceivers {
546    pub _control_rx: ControlMessageReceiver,
547    pub control_signal_rx: ControlSignalReceiver,
548}
549
550impl EventChannels {
551    pub fn new() -> (Self, EventReceivers) {
552        let (control_tx, control_rx) = mpsc::channel(100);
553        let (control_signal_tx, control_signal_rx) = mpsc::channel(100);
554
555        let channels = Self {
556            _control_tx: control_tx,
557            control_signal_tx,
558        };
559
560        let receivers = EventReceivers {
561            _control_rx: control_rx,
562            control_signal_rx,
563        };
564
565        (channels, receivers)
566    }
567}
568
#[cfg(test)]
mod tests {
    use super::*;
    use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};

    /// Builds a minimal `SourceChange::Insert` of a single node for use as a test payload.
    fn create_test_source_change() -> SourceChange {
        let element = Element::Node {
            metadata: ElementMetadata {
                reference: ElementReference::new("TestSource", "test-node-1"),
                labels: vec!["TestLabel".into()].into(),
                effective_from: 1000,
            },
            properties: Default::default(),
        };
        SourceChange::Insert { element }
    }

    /// Wraps the test change via `SourceEventWrapper::new` (no profiling, no sequence).
    fn create_test_wrapper() -> SourceEventWrapper {
        SourceEventWrapper::new(
            "test-source".to_string(),
            SourceEvent::Change(create_test_source_change()),
            chrono::Utc::now(),
        )
    }

    #[test]
    fn test_source_event_wrapper_into_parts() {
        let wrapper = create_test_wrapper();

        let (source_id, event, _timestamp, profiling, sequence) = wrapper.into_parts();

        assert_eq!(source_id, "test-source");
        assert!(matches!(event, SourceEvent::Change(_)));
        assert!(profiling.is_none());
        assert!(sequence.is_none());
    }

    #[test]
    fn test_try_unwrap_arc_sole_owner() {
        let arc = Arc::new(create_test_wrapper());

        // With sole ownership, try_unwrap_arc should succeed
        let result = SourceEventWrapper::try_unwrap_arc(arc);
        assert!(result.is_ok());

        let (source_id, event, _timestamp, _profiling, _sequence) = result.unwrap();
        assert_eq!(source_id, "test-source");
        assert!(matches!(event, SourceEvent::Change(_)));
    }

    #[test]
    fn test_try_unwrap_arc_shared() {
        let arc = Arc::new(create_test_wrapper());
        let _arc2 = arc.clone(); // Create another reference

        // With shared ownership, try_unwrap_arc should fail and return the Arc
        let result = SourceEventWrapper::try_unwrap_arc(arc);
        assert!(result.is_err());

        // The returned Arc should still be valid
        let returned_arc = result.unwrap_err();
        assert_eq!(returned_arc.source_id, "test-source");
    }

    #[test]
    fn test_zero_copy_extraction_path() {
        // Simulate the zero-copy extraction path used in query processing
        let arc = Arc::new(create_test_wrapper());

        // This is the zero-copy path - when we have sole ownership
        let (source_id, event, _timestamp, _profiling, _sequence) =
            match SourceEventWrapper::try_unwrap_arc(arc) {
                Ok(parts) => parts,
                Err(arc) => {
                    // Fallback to cloning (would be needed in broadcast mode)
                    (
                        arc.source_id.clone(),
                        arc.event.clone(),
                        arc.timestamp,
                        arc.profiling.clone(),
                        arc.sequence,
                    )
                }
            };

        // Extract SourceChange from owned event (no clone!)
        let source_change = match event {
            SourceEvent::Change(change) => Some(change),
            _ => None,
        };

        assert_eq!(source_id, "test-source");
        assert!(source_change.is_some());
    }

    #[test]
    fn test_source_event_wrapper_with_sequence() {
        let change = create_test_source_change();
        let wrapper = SourceEventWrapper::with_sequence(
            "test-source".to_string(),
            SourceEvent::Change(change),
            chrono::Utc::now(),
            42,
            None,
        );
        assert_eq!(wrapper.sequence, Some(42));
        assert!(wrapper.profiling.is_none());

        let (_source_id, _event, _timestamp, _profiling, sequence) = wrapper.into_parts();
        assert_eq!(sequence, Some(42));
    }

    #[test]
    fn test_source_event_wrapper_new_has_no_sequence() {
        let wrapper = create_test_wrapper();
        assert!(wrapper.sequence.is_none());
    }

    #[test]
    fn test_subscription_response_with_position_handle() {
        use std::sync::atomic::{AtomicU64, Ordering};
        use std::sync::Arc;

        // `u64::MAX` is the documented "no position confirmed yet" initial value.
        let handle = Arc::new(AtomicU64::new(u64::MAX));
        assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);

        // Verify the handle can be cloned and read (simulates source reading query's position)
        let handle_clone = handle.clone();
        handle.store(500, Ordering::Relaxed);
        assert_eq!(handle_clone.load(Ordering::Relaxed), 500);
    }

    #[test]
    fn test_subscription_settings_with_resume_from() {
        use std::collections::HashSet;
        let settings = crate::config::SourceSubscriptionSettings {
            source_id: "test-source".to_string(),
            enable_bootstrap: false,
            query_id: "test-query".to_string(),
            nodes: HashSet::new(),
            relations: HashSet::new(),
            resume_from: Some(500),
            request_position_handle: true,
        };
        assert_eq!(settings.resume_from, Some(500));
        assert!(settings.request_position_handle);
    }
}