Skip to main content

drasi_lib/channels/
events.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::profiling::ProfilingMetadata;
16use bytes::Bytes;
17use drasi_core::models::SourceChange;
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20use std::sync::atomic::AtomicU64;
21use std::sync::Arc;
22use tokio::sync::{broadcast, mpsc};
23
24/// Trait for types that have a timestamp, required for priority queue ordering
25pub trait Timestamped {
26    fn timestamp(&self) -> chrono::DateTime<chrono::Utc>;
27}
28
29/// Type of Drasi component
30///
31/// Used for identifying component types in events and monitoring.
32#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
33pub enum ComponentType {
34    Source,
35    Query,
36    Reaction,
37    BootstrapProvider,
38    IdentityProvider,
39}
40
41/// Execution status of a Drasi component
42///
43/// `ComponentStatus` represents the current lifecycle state of sources, queries, and reactions.
44/// Components transition through these states during their lifecycle, from creation through
45/// execution to shutdown.
46///
47/// # Status Lifecycle
48///
49/// A typical component lifecycle follows this progression:
50///
51/// ```text
52/// Added → Starting → Running → Stopping → Stopped
53///              ↓                              ↓
54///            Error                         Removed
55/// ```
56///
57/// # Status Values
58///
59/// - **Added**: Component has been registered in the graph but not yet started
60/// - **Starting**: Component is initializing (connecting to resources, loading data, etc.)
61/// - **Running**: Component is actively processing (ingesting, querying, or delivering)
62/// - **Stopping**: Component is shutting down gracefully
63/// - **Stopped**: Component is not running (stopped after previously running)
64/// - **Removed**: Component has been removed from the graph
65/// - **Error**: Component encountered an error and cannot continue (see error_message)
66///
67/// # Usage
68///
69/// Status is available through runtime information methods on [`DrasiLib`](crate::DrasiLib):
70///
71/// - [`get_source_status()`](crate::DrasiLib::get_source_status)
72/// - [`get_query_status()`](crate::DrasiLib::get_query_status)
73/// - [`get_reaction_status()`](crate::DrasiLib::get_reaction_status)
74///
75/// And through runtime info structs:
76///
77/// - [`SourceRuntime`](crate::SourceRuntime)
78/// - [`QueryRuntime`](crate::QueryRuntime)
79/// - [`ReactionRuntime`](crate::ReactionRuntime)
80///
81/// # Examples
82///
83/// ## Monitoring Component Status
84///
85/// ```no_run
86/// use drasi_lib::{DrasiLib, ComponentStatus};
87///
88/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
89/// let core = DrasiLib::builder().with_id("my-server").build().await?;
90/// core.start().await?;
91///
92/// // Check source status
93/// let source_status = core.get_source_status("orders_db").await?;
94/// match source_status {
95///     ComponentStatus::Running => println!("Source is running"),
96///     ComponentStatus::Error => println!("Source has errors"),
97///     ComponentStatus::Starting => println!("Source is starting up"),
98///     _ => println!("Source status: {:?}", source_status),
99/// }
100///
101/// // Get detailed info with status
102/// let source_info = core.get_source_info("orders_db").await?;
103/// if source_info.status == ComponentStatus::Error {
104///     if let Some(error) = source_info.error_message {
105///         eprintln!("Error: {}", error);
106///     }
107/// }
108/// # Ok(())
109/// # }
110/// ```
111///
112/// ## Waiting for Component to Start
113///
114/// ```no_run
115/// use drasi_lib::{DrasiLib, ComponentStatus};
116/// use tokio::time::{sleep, Duration};
117///
118/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
119/// let core = DrasiLib::builder().with_id("my-server").build().await?;
120/// core.start_source("orders_db").await?;
121///
122/// // Poll until source is running
123/// loop {
124///     let status = core.get_source_status("orders_db").await?;
125///     match status {
126///         ComponentStatus::Running => break,
127///         ComponentStatus::Error => return Err("Source failed to start".into()),
128///         _ => sleep(Duration::from_millis(100)).await,
129///     }
130/// }
131/// println!("Source is now running");
132/// # Ok(())
133/// # }
134/// ```
135///
136/// ## Checking All Components
137///
138/// ```no_run
139/// use drasi_lib::{DrasiLib, ComponentStatus};
140///
141/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
142/// let core = DrasiLib::builder().with_id("my-server").build().await?;
143/// core.start().await?;
144///
145/// // Check all sources
146/// let sources = core.list_sources().await?;
147/// for (id, status) in sources {
148///     println!("Source {}: {:?}", id, status);
149/// }
150///
151/// // Check all queries
152/// let queries = core.list_queries().await?;
153/// for (id, status) in queries {
154///     println!("Query {}: {:?}", id, status);
155/// }
156///
157/// // Check all reactions
158/// let reactions = core.list_reactions().await?;
159/// for (id, status) in reactions {
160///     println!("Reaction {}: {:?}", id, status);
161/// }
162/// # Ok(())
163/// # }
164/// ```
165#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
166pub enum ComponentStatus {
167    /// Component has been registered in the graph but not yet started.
168    Added,
169    Starting,
170    Running,
171    Stopping,
172    Stopped,
173    /// Component has been removed from the graph.
174    Removed,
175    Reconfiguring,
176    Error,
177}
178
179/// A source change event with metadata for dispatching to queries.
180#[derive(Debug, Clone)]
181pub struct SourceChangeEvent {
182    pub source_id: String,
183    pub change: SourceChange,
184    pub timestamp: chrono::DateTime<chrono::Utc>,
185}
186
187/// Control events from sources for query coordination
188#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
189pub enum SourceControl {
190    /// Query subscription control event
191    Subscription {
192        query_id: String,
193        query_node_id: String,
194        node_labels: Vec<String>,
195        rel_labels: Vec<String>,
196        operation: ControlOperation,
197    },
198    /// Signal from FutureQueueSource that one or more future items are due.
199    FuturesDue,
200}
201
202/// Control operation types
203#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
204pub enum ControlOperation {
205    Insert,
206    Update,
207    Delete,
208}
209
210/// Unified event envelope carrying both data changes and control messages
211#[derive(Debug, Clone)]
212pub enum SourceEvent {
213    /// Data change event from source
214    Change(SourceChange),
215    /// Control event for query coordination
216    Control(SourceControl),
217}
218
219/// Wrapper for source events with metadata
220#[derive(Debug, Clone)]
221pub struct SourceEventWrapper {
222    pub source_id: String,
223    pub event: SourceEvent,
224    pub timestamp: chrono::DateTime<chrono::Utc>,
225    /// Optional profiling metadata for performance tracking
226    pub profiling: Option<ProfilingMetadata>,
227    /// Monotonic sequence number assigned by the framework.
228    /// Used for ordering, watermarks, gap detection, and dedup.
229    /// `None` for volatile sources that don't support replay.
230    pub sequence: Option<u64>,
231    /// Opaque source position bytes for stream resumption on restart.
232    /// Only the source can interpret these bytes — the framework persists
233    /// them alongside the sequence and returns them on restart via
234    /// subscribe(resume_from: ...).
235    /// `None` for volatile sources that don't support replay.
236    pub source_position: Option<Bytes>,
237}
238
239/// Decomposed parts of a [`SourceEventWrapper`], returned by [`SourceEventWrapper::into_parts()`].
240///
241/// Using a named struct instead of a tuple makes call sites resilient to
242/// field reordering and easier to evolve with new fields.
243#[derive(Debug)]
244pub struct SourceEventParts {
245    pub source_id: String,
246    pub event: SourceEvent,
247    pub timestamp: chrono::DateTime<chrono::Utc>,
248    pub profiling: Option<ProfilingMetadata>,
249    pub sequence: Option<u64>,
250    pub source_position: Option<Bytes>,
251}
252
253impl SourceEventWrapper {
254    /// Create a new SourceEventWrapper without profiling
255    pub fn new(
256        source_id: String,
257        event: SourceEvent,
258        timestamp: chrono::DateTime<chrono::Utc>,
259    ) -> Self {
260        Self {
261            source_id,
262            event,
263            timestamp,
264            profiling: None,
265            sequence: None,
266            source_position: None,
267        }
268    }
269
270    /// Create a new SourceEventWrapper with profiling metadata
271    pub fn with_profiling(
272        source_id: String,
273        event: SourceEvent,
274        timestamp: chrono::DateTime<chrono::Utc>,
275        profiling: ProfilingMetadata,
276    ) -> Self {
277        Self {
278            source_id,
279            event,
280            timestamp,
281            profiling: Some(profiling),
282            sequence: None,
283            source_position: None,
284        }
285    }
286
287    /// Create a new SourceEventWrapper with a sequence number (and optional profiling)
288    pub fn with_sequence(
289        source_id: String,
290        event: SourceEvent,
291        timestamp: chrono::DateTime<chrono::Utc>,
292        sequence: u64,
293        profiling: Option<ProfilingMetadata>,
294    ) -> Self {
295        Self {
296            source_id,
297            event,
298            timestamp,
299            profiling,
300            sequence: Some(sequence),
301            source_position: None,
302        }
303    }
304
305    /// Set the opaque source position bytes for stream resumption.
306    /// Only called by source plugins to attach their native position token.
307    pub fn set_source_position(&mut self, position: Bytes) {
308        self.source_position = Some(position);
309    }
310
311    /// Consume this wrapper and return its components as a named struct.
312    /// This enables zero-copy extraction when the wrapper has sole ownership.
313    pub fn into_parts(self) -> SourceEventParts {
314        SourceEventParts {
315            source_id: self.source_id,
316            event: self.event,
317            timestamp: self.timestamp,
318            profiling: self.profiling,
319            sequence: self.sequence,
320            source_position: self.source_position,
321        }
322    }
323
324    /// Try to extract components from an Arc<SourceEventWrapper>.
325    /// Uses Arc::try_unwrap to avoid cloning when we have sole ownership.
326    /// Returns Ok with owned components if sole owner, Err with Arc back if shared.
327    ///
328    /// This enables zero-copy in Channel dispatch mode (single consumer per event)
329    /// while still working correctly in Broadcast mode (cloning required).
330    pub fn try_unwrap_arc(arc_self: Arc<Self>) -> Result<SourceEventParts, Arc<Self>> {
331        Arc::try_unwrap(arc_self).map(|wrapper| wrapper.into_parts())
332    }
333}
334
335// Implement Timestamped for SourceEventWrapper for use in generic priority queue
336impl Timestamped for SourceEventWrapper {
337    fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
338        self.timestamp
339    }
340}
341
342/// Arc-wrapped SourceEventWrapper for zero-copy distribution
343pub type ArcSourceEvent = Arc<SourceEventWrapper>;
344
345/// Bootstrap event wrapper for dedicated bootstrap channels
346#[derive(Debug, Clone)]
347pub struct BootstrapEvent {
348    pub source_id: String,
349    pub change: SourceChange,
350    pub timestamp: chrono::DateTime<chrono::Utc>,
351    pub sequence: u64,
352}
353
354/// Subscription request from Query to Source
355#[derive(Debug, Clone)]
356pub struct SubscriptionRequest {
357    pub query_id: String,
358    pub source_id: String,
359    pub enable_bootstrap: bool,
360    pub node_labels: Vec<String>,
361    pub relation_labels: Vec<String>,
362}
363
364/// Subscription response from Source to Query
365pub struct SubscriptionResponse {
366    pub query_id: String,
367    pub source_id: String,
368    pub receiver: Box<dyn super::ChangeReceiver<SourceEventWrapper>>,
369    pub bootstrap_receiver: Option<BootstrapEventReceiver>,
370    /// Shared handle for the query to report its last durably-processed sequence position.
371    /// Created by replay-capable sources when `request_position_handle` is true.
372    /// The query writes to this atomically after each commit; the source reads the
373    /// minimum across all subscribers to advance its upstream cursor.
374    /// Sources should initialize this to `u64::MAX` (meaning "no position confirmed yet").
375    pub position_handle: Option<Arc<AtomicU64>>,
376    /// Receives the `BootstrapResult` after bootstrap completes, carrying handover
377    /// metadata (`last_sequence`, `sequences_aligned`) for the bootstrap-to-streaming
378    /// transition. `None` when bootstrap is not active or for FFI/plugin sources.
379    pub bootstrap_result_receiver:
380        Option<tokio::sync::oneshot::Receiver<anyhow::Result<crate::bootstrap::BootstrapResult>>>,
381}
382
383/// Subscription response from Query to Reaction
384pub struct QuerySubscriptionResponse {
385    pub query_id: String,
386    pub receiver: Box<dyn super::ChangeReceiver<QueryResult>>,
387}
388
389/// Typed result diff emitted by continuous queries.
390///
391/// Each non-`Noop` variant carries a `row_signature` stamped by the core engine:
392/// the path-solver binding hash for non-aggregating rows, and the grouping-key hash
393/// for aggregations. Downstream consumers use it as the row identity in place of
394/// JSON-equality matching.
395#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
396#[serde(tag = "type")]
397pub enum ResultDiff {
398    #[serde(rename = "ADD")]
399    Add {
400        data: serde_json::Value,
401        #[serde(default)]
402        row_signature: u64,
403    },
404    #[serde(rename = "DELETE")]
405    Delete {
406        data: serde_json::Value,
407        #[serde(default)]
408        row_signature: u64,
409    },
410    #[serde(rename = "UPDATE")]
411    Update {
412        data: serde_json::Value,
413        before: serde_json::Value,
414        after: serde_json::Value,
415        #[serde(skip_serializing_if = "Option::is_none")]
416        grouping_keys: Option<Vec<String>>,
417        #[serde(default)]
418        row_signature: u64,
419    },
420    #[serde(rename = "aggregation")]
421    Aggregation {
422        before: Option<serde_json::Value>,
423        after: serde_json::Value,
424        #[serde(default)]
425        row_signature: u64,
426    },
427    #[serde(rename = "noop")]
428    Noop,
429}
430
431/// Result emitted by a continuous query when data changes.
432///
433/// Contains the diff (added, updated, deleted rows) plus metadata and
434/// optional profiling information. Dispatched to reactions via the priority queue.
435#[derive(Debug, Clone, Serialize, Deserialize)]
436pub struct QueryResult {
437    pub query_id: String,
438    /// Monotonic per-query sequence number identifying this emission.
439    /// Reactions persist this in their checkpoint, the outbox is keyed by it,
440    /// and the bootstrap APIs return it as `as_of_sequence`.
441    #[serde(default)]
442    pub sequence: u64,
443    pub timestamp: chrono::DateTime<chrono::Utc>,
444    pub results: Vec<ResultDiff>,
445    pub metadata: HashMap<String, serde_json::Value>,
446    /// Optional profiling metadata for performance tracking
447    #[serde(skip_serializing_if = "Option::is_none")]
448    pub profiling: Option<ProfilingMetadata>,
449}
450
451impl QueryResult {
452    /// Create a new QueryResult without profiling
453    pub fn new(
454        query_id: String,
455        sequence: u64,
456        timestamp: chrono::DateTime<chrono::Utc>,
457        results: Vec<ResultDiff>,
458        metadata: HashMap<String, serde_json::Value>,
459    ) -> Self {
460        Self {
461            query_id,
462            sequence,
463            timestamp,
464            results,
465            metadata,
466            profiling: None,
467        }
468    }
469
470    /// Create a new QueryResult with profiling metadata
471    pub fn with_profiling(
472        query_id: String,
473        sequence: u64,
474        timestamp: chrono::DateTime<chrono::Utc>,
475        results: Vec<ResultDiff>,
476        metadata: HashMap<String, serde_json::Value>,
477        profiling: ProfilingMetadata,
478    ) -> Self {
479        Self {
480            query_id,
481            sequence,
482            timestamp,
483            results,
484            metadata,
485            profiling: Some(profiling),
486        }
487    }
488}
489
490// Implement Timestamped for QueryResult for use in generic priority queue
491impl Timestamped for QueryResult {
492    fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
493        self.timestamp
494    }
495}
496
497/// Arc-wrapped QueryResult for zero-copy distribution
498pub type ArcQueryResult = Arc<QueryResult>;
499
500/// Lifecycle event emitted when a component's status changes.
501///
502/// Broadcast via the component event channel to all subscribers.
503/// Used for monitoring, logging, and reactive lifecycle coordination.
504#[derive(Debug, Clone, Serialize, Deserialize)]
505pub struct ComponentEvent {
506    pub component_id: String,
507    pub component_type: ComponentType,
508    pub status: ComponentStatus,
509    pub timestamp: chrono::DateTime<chrono::Utc>,
510    pub message: Option<String>,
511}
512
513/// Control messages for component lifecycle management.
514#[derive(Debug, Clone, Serialize, Deserialize)]
515pub enum ControlMessage {
516    Start(String),
517    Stop(String),
518    Status(String),
519    Shutdown,
520}
521
522pub type ComponentEventBroadcastSender = broadcast::Sender<ComponentEvent>;
523pub type ComponentEventBroadcastReceiver = broadcast::Receiver<ComponentEvent>;
524/// Backward-compatible mpsc channel types used by host-sdk plugin callbacks.
525/// New code should use `ComponentUpdateSender` from `component_graph` instead.
526pub type ComponentEventSender = mpsc::Sender<ComponentEvent>;
527pub type ComponentEventReceiver = mpsc::Receiver<ComponentEvent>;
528pub type ControlMessageReceiver = mpsc::Receiver<ControlMessage>;
529pub type ControlMessageSender = mpsc::Sender<ControlMessage>;
530
531// Broadcast channel types for zero-copy event distribution
532pub type SourceBroadcastSender = broadcast::Sender<ArcSourceEvent>;
533pub type SourceBroadcastReceiver = broadcast::Receiver<ArcSourceEvent>;
534
535// Broadcast channel types for zero-copy query result distribution
536pub type QueryResultBroadcastSender = broadcast::Sender<ArcQueryResult>;
537pub type QueryResultBroadcastReceiver = broadcast::Receiver<ArcQueryResult>;
538
539// Bootstrap channel types for dedicated bootstrap data delivery
540pub type BootstrapEventSender = mpsc::Sender<BootstrapEvent>;
541pub type BootstrapEventReceiver = mpsc::Receiver<BootstrapEvent>;
542
543/// Control signals for coordination
544#[derive(Debug, Clone, Serialize, Deserialize)]
545pub enum ControlSignal {
546    /// Query has entered running state
547    Running { query_id: String },
548    /// Query has stopped
549    Stopped { query_id: String },
550    /// Query has been deleted
551    Deleted { query_id: String },
552}
553
554/// Wrapper for control signals with metadata
555#[derive(Debug, Clone)]
556pub struct ControlSignalWrapper {
557    pub signal: ControlSignal,
558    pub timestamp: chrono::DateTime<chrono::Utc>,
559    pub sequence_number: Option<u64>,
560}
561
562impl ControlSignalWrapper {
563    pub fn new(signal: ControlSignal) -> Self {
564        Self {
565            signal,
566            timestamp: chrono::Utc::now(),
567            sequence_number: None,
568        }
569    }
570
571    pub fn with_sequence(signal: ControlSignal, sequence_number: u64) -> Self {
572        Self {
573            signal,
574            timestamp: chrono::Utc::now(),
575            sequence_number: Some(sequence_number),
576        }
577    }
578}
579
580pub type ControlSignalReceiver = mpsc::Receiver<ControlSignalWrapper>;
581pub type ControlSignalSender = mpsc::Sender<ControlSignalWrapper>;
582
583pub struct EventChannels {
584    pub _control_tx: ControlMessageSender,
585    pub control_signal_tx: ControlSignalSender,
586}
587
588pub struct EventReceivers {
589    pub _control_rx: ControlMessageReceiver,
590    pub control_signal_rx: ControlSignalReceiver,
591}
592
593impl EventChannels {
594    pub fn new() -> (Self, EventReceivers) {
595        let (control_tx, control_rx) = mpsc::channel(100);
596        let (control_signal_tx, control_signal_rx) = mpsc::channel(100);
597
598        let channels = Self {
599            _control_tx: control_tx,
600            control_signal_tx,
601        };
602
603        let receivers = EventReceivers {
604            _control_rx: control_rx,
605            control_signal_rx,
606        };
607
608        (channels, receivers)
609    }
610}
611
612#[cfg(test)]
613mod tests {
614    use super::*;
615    use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
616
617    fn create_test_source_change() -> SourceChange {
618        let element = Element::Node {
619            metadata: ElementMetadata {
620                reference: ElementReference::new("TestSource", "test-node-1"),
621                labels: vec!["TestLabel".into()].into(),
622                effective_from: 1000,
623            },
624            properties: Default::default(),
625        };
626        SourceChange::Insert { element }
627    }
628
629    #[test]
630    fn test_source_event_wrapper_into_parts() {
631        let change = create_test_source_change();
632        let wrapper = SourceEventWrapper::new(
633            "test-source".to_string(),
634            SourceEvent::Change(change),
635            chrono::Utc::now(),
636        );
637
638        let parts = wrapper.into_parts();
639
640        assert_eq!(parts.source_id, "test-source");
641        assert!(matches!(parts.event, SourceEvent::Change(_)));
642        assert!(parts.profiling.is_none());
643    }
644
645    #[test]
646    fn test_try_unwrap_arc_sole_owner() {
647        let change = create_test_source_change();
648        let wrapper = SourceEventWrapper::new(
649            "test-source".to_string(),
650            SourceEvent::Change(change),
651            chrono::Utc::now(),
652        );
653        let arc = Arc::new(wrapper);
654
655        // With sole ownership, try_unwrap_arc should succeed
656        let result = SourceEventWrapper::try_unwrap_arc(arc);
657        assert!(result.is_ok());
658
659        let parts = result.unwrap();
660        assert_eq!(parts.source_id, "test-source");
661        assert!(matches!(parts.event, SourceEvent::Change(_)));
662    }
663
664    #[test]
665    fn test_try_unwrap_arc_shared() {
666        let change = create_test_source_change();
667        let wrapper = SourceEventWrapper::new(
668            "test-source".to_string(),
669            SourceEvent::Change(change),
670            chrono::Utc::now(),
671        );
672        let arc = Arc::new(wrapper);
673        let _arc2 = arc.clone(); // Create another reference
674
675        // With shared ownership, try_unwrap_arc should fail and return the Arc
676        let result = SourceEventWrapper::try_unwrap_arc(arc);
677        assert!(result.is_err());
678
679        // The returned Arc should still be valid
680        let returned_arc = result.unwrap_err();
681        assert_eq!(returned_arc.source_id, "test-source");
682    }
683
684    #[test]
685    fn test_zero_copy_extraction_path() {
686        // Simulate the zero-copy extraction path used in query processing
687        let change = create_test_source_change();
688        let wrapper = SourceEventWrapper::new(
689            "test-source".to_string(),
690            SourceEvent::Change(change),
691            chrono::Utc::now(),
692        );
693        let arc = Arc::new(wrapper);
694
695        // This is the zero-copy path - when we have sole ownership
696        let parts = match SourceEventWrapper::try_unwrap_arc(arc) {
697            Ok(parts) => parts,
698            Err(arc) => {
699                // Fallback to cloning (would be needed in broadcast mode)
700                SourceEventParts {
701                    source_id: arc.source_id.clone(),
702                    event: arc.event.clone(),
703                    timestamp: arc.timestamp,
704                    profiling: arc.profiling.clone(),
705                    sequence: arc.sequence,
706                    source_position: arc.source_position.clone(),
707                }
708            }
709        };
710
711        // Extract SourceChange from owned event (no clone!)
712        let source_change = match parts.event {
713            SourceEvent::Change(change) => Some(change),
714            _ => None,
715        };
716
717        assert_eq!(parts.source_id, "test-source");
718        assert!(source_change.is_some());
719    }
720
721    #[test]
722    fn test_source_event_wrapper_with_sequence() {
723        let change = create_test_source_change();
724        let wrapper = SourceEventWrapper::with_sequence(
725            "test-source".to_string(),
726            SourceEvent::Change(change),
727            chrono::Utc::now(),
728            42,
729            None,
730        );
731        assert_eq!(wrapper.sequence, Some(42));
732        assert!(wrapper.profiling.is_none());
733
734        let parts = wrapper.into_parts();
735        assert_eq!(parts.sequence, Some(42));
736    }
737
738    #[test]
739    fn test_source_event_wrapper_new_has_no_sequence() {
740        let change = create_test_source_change();
741        let wrapper = SourceEventWrapper::new(
742            "test-source".to_string(),
743            SourceEvent::Change(change),
744            chrono::Utc::now(),
745        );
746        assert!(wrapper.sequence.is_none());
747    }
748
749    #[test]
750    fn test_subscription_response_with_position_handle() {
751        use std::sync::atomic::{AtomicU64, Ordering};
752        use std::sync::Arc;
753
754        let handle = Arc::new(AtomicU64::new(u64::MAX));
755        assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
756
757        // Verify the handle can be cloned and read (simulates source reading query's position)
758        let handle_clone = handle.clone();
759        handle.store(500, Ordering::Relaxed);
760        assert_eq!(handle_clone.load(Ordering::Relaxed), 500);
761    }
762
763    #[test]
764    fn test_subscription_settings_with_resume_from() {
765        use std::collections::HashSet;
766        let position_bytes = Bytes::from_static(&[0x01, 0x02, 0x03, 0x04]);
767        let settings = crate::config::SourceSubscriptionSettings {
768            source_id: "test-source".to_string(),
769            enable_bootstrap: false,
770            query_id: "test-query".to_string(),
771            nodes: HashSet::new(),
772            relations: HashSet::new(),
773            resume_from: Some(position_bytes.clone()),
774            request_position_handle: true,
775            last_sequence: None,
776        };
777        assert_eq!(settings.resume_from, Some(position_bytes));
778        assert!(settings.request_position_handle);
779    }
780}