Skip to main content

drasi_lib/channels/
events.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::profiling::ProfilingMetadata;
16use bytes::Bytes;
17use drasi_core::models::SourceChange;
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20use std::sync::atomic::AtomicU64;
21use std::sync::Arc;
22use tokio::sync::{broadcast, mpsc};
23
/// Trait for types that have a timestamp, required for priority queue ordering.
///
/// Implemented in this module by [`SourceEventWrapper`] and [`QueryResult`],
/// both of which return their public `timestamp` field.
pub trait Timestamped {
    /// Returns the UTC timestamp associated with this value.
    fn timestamp(&self) -> chrono::DateTime<chrono::Utc>;
}
28
/// Type of Drasi component
///
/// Used for identifying component types in events and monitoring.
/// Carried in the `component_type` field of [`ComponentEvent`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum ComponentType {
    /// The component is a source.
    Source,
    /// The component is a query.
    Query,
    /// The component is a reaction.
    Reaction,
    /// The component is a bootstrap provider.
    BootstrapProvider,
    /// The component is an identity provider.
    IdentityProvider,
}
40
41/// Execution status of a Drasi component
42///
43/// `ComponentStatus` represents the current lifecycle state of sources, queries, and reactions.
44/// Components transition through these states during their lifecycle, from creation through
45/// execution to shutdown.
46///
47/// # Status Lifecycle
48///
49/// A typical component lifecycle follows this progression:
50///
51/// ```text
52/// Added → Starting → Running → Stopping → Stopped
53///              ↓                              ↓
54///            Error                         Removed
55/// ```
56///
57/// # Status Values
58///
59/// - **Added**: Component has been registered in the graph but not yet started
60/// - **Starting**: Component is initializing (connecting to resources, loading data, etc.)
61/// - **Running**: Component is actively processing (ingesting, querying, or delivering)
62/// - **Stopping**: Component is shutting down gracefully
63/// - **Stopped**: Component is not running (stopped after previously running)
64/// - **Removed**: Component has been removed from the graph
65/// - **Error**: Component encountered an error and cannot continue (see error_message)
66///
67/// # Usage
68///
69/// Status is available through runtime information methods on [`DrasiLib`](crate::DrasiLib):
70///
71/// - [`get_source_status()`](crate::DrasiLib::get_source_status)
72/// - [`get_query_status()`](crate::DrasiLib::get_query_status)
73/// - [`get_reaction_status()`](crate::DrasiLib::get_reaction_status)
74///
75/// And through runtime info structs:
76///
77/// - [`SourceRuntime`](crate::SourceRuntime)
78/// - [`QueryRuntime`](crate::QueryRuntime)
79/// - [`ReactionRuntime`](crate::ReactionRuntime)
80///
81/// # Examples
82///
83/// ## Monitoring Component Status
84///
85/// ```no_run
86/// use drasi_lib::{DrasiLib, ComponentStatus};
87///
88/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
89/// let core = DrasiLib::builder().with_id("my-server").build().await?;
90/// core.start().await?;
91///
92/// // Check source status
93/// let source_status = core.get_source_status("orders_db").await?;
94/// match source_status {
95///     ComponentStatus::Running => println!("Source is running"),
96///     ComponentStatus::Error => println!("Source has errors"),
97///     ComponentStatus::Starting => println!("Source is starting up"),
98///     _ => println!("Source status: {:?}", source_status),
99/// }
100///
101/// // Get detailed info with status
102/// let source_info = core.get_source_info("orders_db").await?;
103/// if source_info.status == ComponentStatus::Error {
104///     if let Some(error) = source_info.error_message {
105///         eprintln!("Error: {}", error);
106///     }
107/// }
108/// # Ok(())
109/// # }
110/// ```
111///
112/// ## Waiting for Component to Start
113///
114/// ```no_run
115/// use drasi_lib::{DrasiLib, ComponentStatus};
116/// use tokio::time::{sleep, Duration};
117///
118/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
119/// let core = DrasiLib::builder().with_id("my-server").build().await?;
120/// core.start_source("orders_db").await?;
121///
122/// // Poll until source is running
123/// loop {
124///     let status = core.get_source_status("orders_db").await?;
125///     match status {
126///         ComponentStatus::Running => break,
127///         ComponentStatus::Error => return Err("Source failed to start".into()),
128///         _ => sleep(Duration::from_millis(100)).await,
129///     }
130/// }
131/// println!("Source is now running");
132/// # Ok(())
133/// # }
134/// ```
135///
136/// ## Checking All Components
137///
138/// ```no_run
139/// use drasi_lib::{DrasiLib, ComponentStatus};
140///
141/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
142/// let core = DrasiLib::builder().with_id("my-server").build().await?;
143/// core.start().await?;
144///
145/// // Check all sources
146/// let sources = core.list_sources().await?;
147/// for (id, status) in sources {
148///     println!("Source {}: {:?}", id, status);
149/// }
150///
151/// // Check all queries
152/// let queries = core.list_queries().await?;
153/// for (id, status) in queries {
154///     println!("Query {}: {:?}", id, status);
155/// }
156///
157/// // Check all reactions
158/// let reactions = core.list_reactions().await?;
159/// for (id, status) in reactions {
160///     println!("Reaction {}: {:?}", id, status);
161/// }
162/// # Ok(())
163/// # }
164/// ```
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum ComponentStatus {
    /// Component has been registered in the graph but not yet started.
    Added,
    /// Component is initializing (connecting to resources, loading data, etc.).
    Starting,
    /// Component is actively processing (ingesting, querying, or delivering).
    Running,
    /// Component is shutting down gracefully.
    Stopping,
    /// Component is not running (stopped after previously running).
    Stopped,
    /// Component has been removed from the graph.
    Removed,
    /// NOTE(review): this variant is not covered by the "Status Values" list or
    /// the lifecycle diagram in the enum-level documentation. Presumably the
    /// component is applying a configuration change — confirm and document.
    Reconfiguring,
    /// Component encountered an error and cannot continue.
    Error,
}
178
/// A source change event with metadata for dispatching to queries.
#[derive(Debug, Clone)]
pub struct SourceChangeEvent {
    /// Identifier of the source this change is associated with.
    pub source_id: String,
    /// The data change itself.
    pub change: SourceChange,
    /// UTC timestamp attached to the event.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
186
/// Control events from sources for query coordination
///
/// Travels inside [`SourceEvent::Control`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SourceControl {
    /// Query subscription control event
    Subscription {
        /// Identifier of the query the subscription belongs to.
        query_id: String,
        /// NOTE(review): presumably the id of the node hosting the query — confirm.
        query_node_id: String,
        /// Node labels named by the subscription.
        node_labels: Vec<String>,
        /// Relation labels named by the subscription.
        rel_labels: Vec<String>,
        /// Which kind of change this control event represents.
        operation: ControlOperation,
    },
    /// Signal from FutureQueueSource that one or more future items are due.
    FuturesDue,
}
201
202/// Control operation types
203#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
204pub enum ControlOperation {
205    Insert,
206    Update,
207    Delete,
208}
209
/// Unified event envelope carrying both data changes and control messages
///
/// This is the payload type of [`SourceEventWrapper::event`].
#[derive(Debug, Clone)]
pub enum SourceEvent {
    /// Data change event from source
    Change(SourceChange),
    /// Control event for query coordination
    Control(SourceControl),
}
218
/// Wrapper for source events with metadata
///
/// Built with [`SourceEventWrapper::new`], [`SourceEventWrapper::with_profiling`]
/// or [`SourceEventWrapper::with_sequence`]; all three leave `source_position`
/// unset, and it can be attached afterwards with
/// [`SourceEventWrapper::set_source_position`].
#[derive(Debug, Clone)]
pub struct SourceEventWrapper {
    /// Identifier of the source this event is associated with.
    pub source_id: String,
    /// The wrapped event: either a data change or a control message.
    pub event: SourceEvent,
    /// UTC timestamp of the event; this is the value returned by the
    /// [`Timestamped`] implementation.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional profiling metadata for performance tracking
    pub profiling: Option<ProfilingMetadata>,
    /// Monotonic sequence number assigned by the framework.
    /// Used for ordering, watermarks, gap detection, and dedup.
    /// `None` for volatile sources that don't support replay.
    pub sequence: Option<u64>,
    /// Opaque source position bytes for stream resumption on restart.
    /// Only the source can interpret these bytes — the framework persists
    /// them alongside the sequence and returns them on restart via
    /// subscribe(resume_from: ...).
    /// `None` for volatile sources that don't support replay.
    pub source_position: Option<Bytes>,
}
238
/// Decomposed parts of a [`SourceEventWrapper`], returned by [`SourceEventWrapper::into_parts()`].
///
/// Using a named struct instead of a tuple makes call sites resilient to
/// field reordering and easier to evolve with new fields.
///
/// Each field mirrors the field of the same name on [`SourceEventWrapper`].
#[derive(Debug)]
pub struct SourceEventParts {
    /// Moved from `SourceEventWrapper::source_id`.
    pub source_id: String,
    /// Moved from `SourceEventWrapper::event`.
    pub event: SourceEvent,
    /// Copied from `SourceEventWrapper::timestamp`.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Moved from `SourceEventWrapper::profiling`.
    pub profiling: Option<ProfilingMetadata>,
    /// Copied from `SourceEventWrapper::sequence`.
    pub sequence: Option<u64>,
    /// Moved from `SourceEventWrapper::source_position`.
    pub source_position: Option<Bytes>,
}
252
253impl SourceEventWrapper {
254    /// Create a new SourceEventWrapper without profiling
255    pub fn new(
256        source_id: String,
257        event: SourceEvent,
258        timestamp: chrono::DateTime<chrono::Utc>,
259    ) -> Self {
260        Self {
261            source_id,
262            event,
263            timestamp,
264            profiling: None,
265            sequence: None,
266            source_position: None,
267        }
268    }
269
270    /// Create a new SourceEventWrapper with profiling metadata
271    pub fn with_profiling(
272        source_id: String,
273        event: SourceEvent,
274        timestamp: chrono::DateTime<chrono::Utc>,
275        profiling: ProfilingMetadata,
276    ) -> Self {
277        Self {
278            source_id,
279            event,
280            timestamp,
281            profiling: Some(profiling),
282            sequence: None,
283            source_position: None,
284        }
285    }
286
287    /// Create a new SourceEventWrapper with a sequence number (and optional profiling)
288    pub fn with_sequence(
289        source_id: String,
290        event: SourceEvent,
291        timestamp: chrono::DateTime<chrono::Utc>,
292        sequence: u64,
293        profiling: Option<ProfilingMetadata>,
294    ) -> Self {
295        Self {
296            source_id,
297            event,
298            timestamp,
299            profiling,
300            sequence: Some(sequence),
301            source_position: None,
302        }
303    }
304
305    /// Set the opaque source position bytes for stream resumption.
306    /// Only called by source plugins to attach their native position token.
307    pub fn set_source_position(&mut self, position: Bytes) {
308        self.source_position = Some(position);
309    }
310
311    /// Consume this wrapper and return its components as a named struct.
312    /// This enables zero-copy extraction when the wrapper has sole ownership.
313    pub fn into_parts(self) -> SourceEventParts {
314        SourceEventParts {
315            source_id: self.source_id,
316            event: self.event,
317            timestamp: self.timestamp,
318            profiling: self.profiling,
319            sequence: self.sequence,
320            source_position: self.source_position,
321        }
322    }
323
324    /// Try to extract components from an Arc<SourceEventWrapper>.
325    /// Uses Arc::try_unwrap to avoid cloning when we have sole ownership.
326    /// Returns Ok with owned components if sole owner, Err with Arc back if shared.
327    ///
328    /// This enables zero-copy in Channel dispatch mode (single consumer per event)
329    /// while still working correctly in Broadcast mode (cloning required).
330    pub fn try_unwrap_arc(arc_self: Arc<Self>) -> Result<SourceEventParts, Arc<Self>> {
331        Arc::try_unwrap(arc_self).map(|wrapper| wrapper.into_parts())
332    }
333}
334
// Implement Timestamped for SourceEventWrapper for use in generic priority queue
impl Timestamped for SourceEventWrapper {
    /// Returns the wrapper's `timestamp` field.
    fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
        self.timestamp
    }
}
341
/// Arc-wrapped SourceEventWrapper for zero-copy distribution
///
/// This is the item type of [`SourceBroadcastSender`] / [`SourceBroadcastReceiver`].
pub type ArcSourceEvent = Arc<SourceEventWrapper>;
344
/// Bootstrap event wrapper for dedicated bootstrap channels
///
/// This is the item type of [`BootstrapEventSender`] / [`BootstrapEventReceiver`].
#[derive(Debug, Clone)]
pub struct BootstrapEvent {
    /// Identifier of the source this bootstrap data is associated with.
    pub source_id: String,
    /// The data change carried by this bootstrap event.
    pub change: SourceChange,
    /// UTC timestamp attached to the event.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Sequence number of this event.
    /// NOTE(review): unlike `SourceEventWrapper::sequence` this is not optional;
    /// the numbering scheme is not visible here — confirm against producers.
    pub sequence: u64,
}
353
/// Subscription request from Query to Source
#[derive(Debug, Clone)]
pub struct SubscriptionRequest {
    /// Identifier of the subscribing query.
    pub query_id: String,
    /// Identifier of the source being subscribed to.
    pub source_id: String,
    /// Whether bootstrap is enabled for this subscription.
    pub enable_bootstrap: bool,
    /// Node labels named by the request.
    pub node_labels: Vec<String>,
    /// Relation labels named by the request.
    pub relation_labels: Vec<String>,
}
363
/// Subscription response from Source to Query
///
/// Holds a boxed trait-object receiver, so it does not derive `Debug` or `Clone`.
pub struct SubscriptionResponse {
    /// Identifier of the subscribing query.
    pub query_id: String,
    /// Identifier of the source that was subscribed to.
    pub source_id: String,
    /// Receiver over which the query gets [`SourceEventWrapper`] items.
    pub receiver: Box<dyn super::ChangeReceiver<SourceEventWrapper>>,
    /// Optional dedicated receiver for [`BootstrapEvent`] items.
    pub bootstrap_receiver: Option<BootstrapEventReceiver>,
    /// Shared handle for the query to report its last durably-processed sequence position.
    /// Created by replay-capable sources when `request_position_handle` is true.
    /// The query writes to this atomically after each commit; the source reads the
    /// minimum across all subscribers to advance its upstream cursor.
    /// Sources should initialize this to `u64::MAX` (meaning "no position confirmed yet").
    pub position_handle: Option<Arc<AtomicU64>>,
}
377
/// Subscription response from Query to Reaction
///
/// Holds a boxed trait-object receiver, so it does not derive `Debug` or `Clone`.
pub struct QuerySubscriptionResponse {
    /// Identifier of the query that was subscribed to.
    pub query_id: String,
    /// Receiver over which the reaction gets [`QueryResult`] items.
    pub receiver: Box<dyn super::ChangeReceiver<QueryResult>>,
}
383
/// Typed result diff emitted by continuous queries.
///
/// Each non-`Noop` variant carries a `row_signature` stamped by the core engine:
/// the path-solver binding hash for non-aggregating rows, and the grouping-key hash
/// for aggregations. Downstream consumers use it as the row identity in place of
/// JSON-equality matching.
///
/// # Serialized form
///
/// The enum is internally tagged through a `type` field. Note that the tag
/// values are not uniformly cased: `"ADD"`, `"DELETE"` and `"UPDATE"` are
/// upper-case while `"aggregation"` and `"noop"` are lower-case. These strings
/// are part of the wire format.
///
/// `row_signature` is marked `#[serde(default)]`, so input that omits it
/// deserializes with a signature of `0`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum ResultDiff {
    /// A row was added; tagged `"ADD"`.
    #[serde(rename = "ADD")]
    Add {
        /// JSON payload of the added row.
        data: serde_json::Value,
        /// Row identity stamped by the core engine; `0` when absent from input.
        #[serde(default)]
        row_signature: u64,
    },
    /// A row was deleted; tagged `"DELETE"`.
    #[serde(rename = "DELETE")]
    Delete {
        /// JSON payload of the deleted row.
        data: serde_json::Value,
        /// Row identity stamped by the core engine; `0` when absent from input.
        #[serde(default)]
        row_signature: u64,
    },
    /// A row was updated; tagged `"UPDATE"`.
    #[serde(rename = "UPDATE")]
    Update {
        /// JSON payload for the update.
        /// NOTE(review): how this relates to `before`/`after` is not visible here — confirm.
        data: serde_json::Value,
        /// Row value before the update.
        before: serde_json::Value,
        /// Row value after the update.
        after: serde_json::Value,
        /// Optional grouping keys; omitted from serialized output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        grouping_keys: Option<Vec<String>>,
        /// Row identity stamped by the core engine; `0` when absent from input.
        #[serde(default)]
        row_signature: u64,
    },
    /// An aggregation result changed; tagged `"aggregation"`.
    #[serde(rename = "aggregation")]
    Aggregation {
        /// Previous aggregate value, if there was one.
        before: Option<serde_json::Value>,
        /// New aggregate value.
        after: serde_json::Value,
        /// Row identity stamped by the core engine; `0` when absent from input.
        #[serde(default)]
        row_signature: u64,
    },
    /// No change; tagged `"noop"`. Carries no `row_signature`.
    #[serde(rename = "noop")]
    Noop,
}
425
/// Result emitted by a continuous query when data changes.
///
/// Contains the diff (added, updated, deleted rows) plus metadata and
/// optional profiling information. Dispatched to reactions via the priority queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResult {
    /// Identifier of the query that produced this result.
    pub query_id: String,
    /// Monotonic per-query sequence number identifying this emission.
    /// Reactions persist this in their checkpoint, the outbox is keyed by it,
    /// and the bootstrap APIs return it as `as_of_sequence`.
    ///
    /// Deserializes as `0` when the field is absent from the input.
    #[serde(default)]
    pub sequence: u64,
    /// UTC timestamp of the result; this is the value returned by the
    /// [`Timestamped`] implementation.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// The individual row diffs that make up this emission.
    pub results: Vec<ResultDiff>,
    /// Free-form JSON metadata keyed by string.
    pub metadata: HashMap<String, serde_json::Value>,
    /// Optional profiling metadata for performance tracking
    ///
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub profiling: Option<ProfilingMetadata>,
}
445
446impl QueryResult {
447    /// Create a new QueryResult without profiling
448    pub fn new(
449        query_id: String,
450        sequence: u64,
451        timestamp: chrono::DateTime<chrono::Utc>,
452        results: Vec<ResultDiff>,
453        metadata: HashMap<String, serde_json::Value>,
454    ) -> Self {
455        Self {
456            query_id,
457            sequence,
458            timestamp,
459            results,
460            metadata,
461            profiling: None,
462        }
463    }
464
465    /// Create a new QueryResult with profiling metadata
466    pub fn with_profiling(
467        query_id: String,
468        sequence: u64,
469        timestamp: chrono::DateTime<chrono::Utc>,
470        results: Vec<ResultDiff>,
471        metadata: HashMap<String, serde_json::Value>,
472        profiling: ProfilingMetadata,
473    ) -> Self {
474        Self {
475            query_id,
476            sequence,
477            timestamp,
478            results,
479            metadata,
480            profiling: Some(profiling),
481        }
482    }
483}
484
// Implement Timestamped for QueryResult for use in generic priority queue
impl Timestamped for QueryResult {
    /// Returns the result's `timestamp` field.
    fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
        self.timestamp
    }
}
491
/// Arc-wrapped QueryResult for zero-copy distribution
///
/// This is the item type of [`QueryResultBroadcastSender`] /
/// [`QueryResultBroadcastReceiver`].
pub type ArcQueryResult = Arc<QueryResult>;
494
/// Lifecycle event emitted when a component's status changes.
///
/// Broadcast via the component event channel to all subscribers.
/// Used for monitoring, logging, and reactive lifecycle coordination.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentEvent {
    /// Identifier of the component whose status changed.
    pub component_id: String,
    /// Kind of component (source, query, reaction, ...).
    pub component_type: ComponentType,
    /// The status reported by this event.
    pub status: ComponentStatus,
    /// UTC timestamp attached to the event.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional human-readable detail accompanying the status.
    pub message: Option<String>,
}
507
/// Control messages for component lifecycle management.
///
/// NOTE(review): the `String` payload of `Start`, `Stop` and `Status` is
/// presumably the id of the target component — confirm against the consumer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ControlMessage {
    /// Request to start; carries a string payload (see the note on the enum).
    Start(String),
    /// Request to stop; carries a string payload (see the note on the enum).
    Stop(String),
    /// Request for status; carries a string payload (see the note on the enum).
    Status(String),
    /// Request to shut down; carries no payload.
    Shutdown,
}
516
/// Sending half of a tokio broadcast channel of [`ComponentEvent`]s.
pub type ComponentEventBroadcastSender = broadcast::Sender<ComponentEvent>;
/// Receiving half of a tokio broadcast channel of [`ComponentEvent`]s.
pub type ComponentEventBroadcastReceiver = broadcast::Receiver<ComponentEvent>;
/// Backward-compatible mpsc channel types used by host-sdk plugin callbacks.
/// New code should use `ComponentUpdateSender` from `component_graph` instead.
pub type ComponentEventSender = mpsc::Sender<ComponentEvent>;
/// Receiving half matching [`ComponentEventSender`] (tokio mpsc).
pub type ComponentEventReceiver = mpsc::Receiver<ComponentEvent>;
/// Receiving half of a tokio mpsc channel of [`ControlMessage`]s.
pub type ControlMessageReceiver = mpsc::Receiver<ControlMessage>;
/// Sending half of a tokio mpsc channel of [`ControlMessage`]s.
pub type ControlMessageSender = mpsc::Sender<ControlMessage>;

// Broadcast channel types for zero-copy event distribution
/// Sending half of a tokio broadcast channel of [`ArcSourceEvent`]s.
pub type SourceBroadcastSender = broadcast::Sender<ArcSourceEvent>;
/// Receiving half of a tokio broadcast channel of [`ArcSourceEvent`]s.
pub type SourceBroadcastReceiver = broadcast::Receiver<ArcSourceEvent>;

// Broadcast channel types for zero-copy query result distribution
/// Sending half of a tokio broadcast channel of [`ArcQueryResult`]s.
pub type QueryResultBroadcastSender = broadcast::Sender<ArcQueryResult>;
/// Receiving half of a tokio broadcast channel of [`ArcQueryResult`]s.
pub type QueryResultBroadcastReceiver = broadcast::Receiver<ArcQueryResult>;

// Bootstrap channel types for dedicated bootstrap data delivery
/// Sending half of a tokio mpsc channel of [`BootstrapEvent`]s.
pub type BootstrapEventSender = mpsc::Sender<BootstrapEvent>;
/// Receiving half of a tokio mpsc channel of [`BootstrapEvent`]s.
pub type BootstrapEventReceiver = mpsc::Receiver<BootstrapEvent>;
537
/// Control signals for coordination
///
/// Every variant names the query it refers to through `query_id`. Signals are
/// sent wrapped in a [`ControlSignalWrapper`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ControlSignal {
    /// Query has entered running state
    Running { query_id: String },
    /// Query has stopped
    Stopped { query_id: String },
    /// Query has been deleted
    Deleted { query_id: String },
}
548
/// Wrapper for control signals with metadata
///
/// This is the item type of [`ControlSignalSender`] / [`ControlSignalReceiver`].
#[derive(Debug, Clone)]
pub struct ControlSignalWrapper {
    /// The wrapped control signal.
    pub signal: ControlSignal,
    /// UTC time captured when the wrapper was built by `new` / `with_sequence`.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional sequence number; `None` when built with `new`.
    pub sequence_number: Option<u64>,
}
556
557impl ControlSignalWrapper {
558    pub fn new(signal: ControlSignal) -> Self {
559        Self {
560            signal,
561            timestamp: chrono::Utc::now(),
562            sequence_number: None,
563        }
564    }
565
566    pub fn with_sequence(signal: ControlSignal, sequence_number: u64) -> Self {
567        Self {
568            signal,
569            timestamp: chrono::Utc::now(),
570            sequence_number: Some(sequence_number),
571        }
572    }
573}
574
/// Receiving half of a tokio mpsc channel of [`ControlSignalWrapper`]s.
pub type ControlSignalReceiver = mpsc::Receiver<ControlSignalWrapper>;
/// Sending half of a tokio mpsc channel of [`ControlSignalWrapper`]s.
pub type ControlSignalSender = mpsc::Sender<ControlSignalWrapper>;
577
/// Sending halves of the control channels created by [`EventChannels::new`].
///
/// The matching receiving halves are returned alongside it in [`EventReceivers`].
pub struct EventChannels {
    /// Sender for [`ControlMessage`]s.
    /// NOTE(review): the leading underscore suggests this half is currently
    /// unused and only kept alive — confirm.
    pub _control_tx: ControlMessageSender,
    /// Sender for [`ControlSignalWrapper`]s.
    pub control_signal_tx: ControlSignalSender,
}
582
/// Receiving halves of the control channels created by [`EventChannels::new`].
pub struct EventReceivers {
    /// Receiver for [`ControlMessage`]s.
    /// NOTE(review): the leading underscore suggests this half is currently
    /// unused and only kept alive — confirm.
    pub _control_rx: ControlMessageReceiver,
    /// Receiver for [`ControlSignalWrapper`]s.
    pub control_signal_rx: ControlSignalReceiver,
}
587
588impl EventChannels {
589    pub fn new() -> (Self, EventReceivers) {
590        let (control_tx, control_rx) = mpsc::channel(100);
591        let (control_signal_tx, control_signal_rx) = mpsc::channel(100);
592
593        let channels = Self {
594            _control_tx: control_tx,
595            control_signal_tx,
596        };
597
598        let receivers = EventReceivers {
599            _control_rx: control_rx,
600            control_signal_rx,
601        };
602
603        (channels, receivers)
604    }
605}
606
#[cfg(test)]
mod tests {
    use super::*;
    use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};

    /// Source id shared by every wrapper built in these tests.
    const TEST_SOURCE_ID: &str = "test-source";

    /// Builds an `Insert` change for a single labelled node with no properties.
    fn create_test_source_change() -> SourceChange {
        let metadata = ElementMetadata {
            reference: ElementReference::new("TestSource", "test-node-1"),
            labels: vec!["TestLabel".into()].into(),
            effective_from: 1000,
        };
        SourceChange::Insert {
            element: Element::Node {
                metadata,
                properties: Default::default(),
            },
        }
    }

    /// Builds a change wrapper via `SourceEventWrapper::new`, timestamped "now".
    fn plain_wrapper() -> SourceEventWrapper {
        SourceEventWrapper::new(
            TEST_SOURCE_ID.to_string(),
            SourceEvent::Change(create_test_source_change()),
            chrono::Utc::now(),
        )
    }

    /// `into_parts` hands back the id and event, with no profiling attached.
    #[test]
    fn test_source_event_wrapper_into_parts() {
        let parts = plain_wrapper().into_parts();

        assert_eq!(parts.source_id, TEST_SOURCE_ID);
        assert!(matches!(parts.event, SourceEvent::Change(_)));
        assert!(parts.profiling.is_none());
    }

    /// A uniquely-owned Arc can be unwrapped into its parts.
    #[test]
    fn test_try_unwrap_arc_sole_owner() {
        let sole = Arc::new(plain_wrapper());

        // With sole ownership, try_unwrap_arc should succeed
        let outcome = SourceEventWrapper::try_unwrap_arc(sole);
        assert!(outcome.is_ok());

        let parts = outcome.unwrap();
        assert_eq!(parts.source_id, TEST_SOURCE_ID);
        assert!(matches!(parts.event, SourceEvent::Change(_)));
    }

    /// A shared Arc is handed back untouched instead of being unwrapped.
    #[test]
    fn test_try_unwrap_arc_shared() {
        let first = Arc::new(plain_wrapper());
        let _second = Arc::clone(&first); // Create another reference

        // With shared ownership, try_unwrap_arc should fail and return the Arc
        let outcome = SourceEventWrapper::try_unwrap_arc(first);
        assert!(outcome.is_err());

        // The returned Arc should still be valid
        let handed_back = outcome.unwrap_err();
        assert_eq!(handed_back.source_id, TEST_SOURCE_ID);
    }

    /// Simulates the zero-copy extraction path used in query processing.
    #[test]
    fn test_zero_copy_extraction_path() {
        let sole = Arc::new(plain_wrapper());

        // This is the zero-copy path - when we have sole ownership.
        // The closure is the cloning fallback that broadcast mode would need.
        let parts = SourceEventWrapper::try_unwrap_arc(sole).unwrap_or_else(|shared| {
            SourceEventParts {
                source_id: shared.source_id.clone(),
                event: shared.event.clone(),
                timestamp: shared.timestamp,
                profiling: shared.profiling.clone(),
                sequence: shared.sequence,
                source_position: shared.source_position.clone(),
            }
        });

        // Extract SourceChange from owned event (no clone!)
        let SourceEventParts {
            source_id, event, ..
        } = parts;
        let source_change = if let SourceEvent::Change(change) = event {
            Some(change)
        } else {
            None
        };

        assert_eq!(source_id, TEST_SOURCE_ID);
        assert!(source_change.is_some());
    }

    /// `with_sequence` stores the sequence and it survives `into_parts`.
    #[test]
    fn test_source_event_wrapper_with_sequence() {
        let sequenced = SourceEventWrapper::with_sequence(
            TEST_SOURCE_ID.to_string(),
            SourceEvent::Change(create_test_source_change()),
            chrono::Utc::now(),
            42,
            None,
        );
        assert_eq!(sequenced.sequence, Some(42));
        assert!(sequenced.profiling.is_none());

        let parts = sequenced.into_parts();
        assert_eq!(parts.sequence, Some(42));
    }

    /// `new` leaves the sequence unset.
    #[test]
    fn test_source_event_wrapper_new_has_no_sequence() {
        assert!(plain_wrapper().sequence.is_none());
    }

    /// A position handle starts at `u64::MAX` and writes are visible through clones.
    #[test]
    fn test_subscription_response_with_position_handle() {
        use std::sync::atomic::Ordering;

        let position = Arc::new(AtomicU64::new(u64::MAX));
        assert_eq!(position.load(Ordering::Relaxed), u64::MAX);

        // Verify the handle can be cloned and read (simulates source reading query's position)
        let reader = Arc::clone(&position);
        position.store(500, Ordering::Relaxed);
        assert_eq!(reader.load(Ordering::Relaxed), 500);
    }

    /// Subscription settings carry the resume position and the handle request flag.
    #[test]
    fn test_subscription_settings_with_resume_from() {
        use std::collections::HashSet;

        let resume_token = Bytes::from_static(&[0x01, 0x02, 0x03, 0x04]);
        let settings = crate::config::SourceSubscriptionSettings {
            source_id: "test-source".to_string(),
            enable_bootstrap: false,
            query_id: "test-query".to_string(),
            nodes: HashSet::new(),
            relations: HashSet::new(),
            resume_from: Some(resume_token.clone()),
            request_position_handle: true,
            last_sequence: None,
        };

        assert_eq!(settings.resume_from, Some(resume_token));
        assert!(settings.request_position_handle);
    }
}