1use crate::profiling::ProfilingMetadata;
16use bytes::Bytes;
17use drasi_core::models::SourceChange;
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20use std::sync::atomic::AtomicU64;
21use std::sync::Arc;
22use tokio::sync::{broadcast, mpsc};
23
24pub trait Timestamped {
26 fn timestamp(&self) -> chrono::DateTime<chrono::Utc>;
27}
28
29#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
33pub enum ComponentType {
34 Source,
35 Query,
36 Reaction,
37 BootstrapProvider,
38 IdentityProvider,
39}
40
41#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
166pub enum ComponentStatus {
167 Added,
169 Starting,
170 Running,
171 Stopping,
172 Stopped,
173 Removed,
175 Reconfiguring,
176 Error,
177}
178
179#[derive(Debug, Clone)]
181pub struct SourceChangeEvent {
182 pub source_id: String,
183 pub change: SourceChange,
184 pub timestamp: chrono::DateTime<chrono::Utc>,
185}
186
187#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
189pub enum SourceControl {
190 Subscription {
192 query_id: String,
193 query_node_id: String,
194 node_labels: Vec<String>,
195 rel_labels: Vec<String>,
196 operation: ControlOperation,
197 },
198 FuturesDue,
200}
201
202#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
204pub enum ControlOperation {
205 Insert,
206 Update,
207 Delete,
208}
209
210#[derive(Debug, Clone)]
212pub enum SourceEvent {
213 Change(SourceChange),
215 Control(SourceControl),
217}
218
219#[derive(Debug, Clone)]
221pub struct SourceEventWrapper {
222 pub source_id: String,
223 pub event: SourceEvent,
224 pub timestamp: chrono::DateTime<chrono::Utc>,
225 pub profiling: Option<ProfilingMetadata>,
227 pub sequence: Option<u64>,
231 pub source_position: Option<Bytes>,
237}
238
239#[derive(Debug)]
244pub struct SourceEventParts {
245 pub source_id: String,
246 pub event: SourceEvent,
247 pub timestamp: chrono::DateTime<chrono::Utc>,
248 pub profiling: Option<ProfilingMetadata>,
249 pub sequence: Option<u64>,
250 pub source_position: Option<Bytes>,
251}
252
253impl SourceEventWrapper {
254 pub fn new(
256 source_id: String,
257 event: SourceEvent,
258 timestamp: chrono::DateTime<chrono::Utc>,
259 ) -> Self {
260 Self {
261 source_id,
262 event,
263 timestamp,
264 profiling: None,
265 sequence: None,
266 source_position: None,
267 }
268 }
269
270 pub fn with_profiling(
272 source_id: String,
273 event: SourceEvent,
274 timestamp: chrono::DateTime<chrono::Utc>,
275 profiling: ProfilingMetadata,
276 ) -> Self {
277 Self {
278 source_id,
279 event,
280 timestamp,
281 profiling: Some(profiling),
282 sequence: None,
283 source_position: None,
284 }
285 }
286
287 pub fn with_sequence(
289 source_id: String,
290 event: SourceEvent,
291 timestamp: chrono::DateTime<chrono::Utc>,
292 sequence: u64,
293 profiling: Option<ProfilingMetadata>,
294 ) -> Self {
295 Self {
296 source_id,
297 event,
298 timestamp,
299 profiling,
300 sequence: Some(sequence),
301 source_position: None,
302 }
303 }
304
305 pub fn set_source_position(&mut self, position: Bytes) {
308 self.source_position = Some(position);
309 }
310
311 pub fn into_parts(self) -> SourceEventParts {
314 SourceEventParts {
315 source_id: self.source_id,
316 event: self.event,
317 timestamp: self.timestamp,
318 profiling: self.profiling,
319 sequence: self.sequence,
320 source_position: self.source_position,
321 }
322 }
323
324 pub fn try_unwrap_arc(arc_self: Arc<Self>) -> Result<SourceEventParts, Arc<Self>> {
331 Arc::try_unwrap(arc_self).map(|wrapper| wrapper.into_parts())
332 }
333}
334
335impl Timestamped for SourceEventWrapper {
337 fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
338 self.timestamp
339 }
340}
341
342pub type ArcSourceEvent = Arc<SourceEventWrapper>;
344
345#[derive(Debug, Clone)]
347pub struct BootstrapEvent {
348 pub source_id: String,
349 pub change: SourceChange,
350 pub timestamp: chrono::DateTime<chrono::Utc>,
351 pub sequence: u64,
352}
353
/// Parameters describing a query's request to subscribe to a source.
#[derive(Debug, Clone)]
pub struct SubscriptionRequest {
    /// Identifier of the subscribing query.
    pub query_id: String,
    /// Identifier of the source being subscribed to.
    pub source_id: String,
    /// Whether bootstrap is enabled for this subscription.
    pub enable_bootstrap: bool,
    /// Node labels named by the subscription.
    pub node_labels: Vec<String>,
    /// Relation labels named by the subscription.
    pub relation_labels: Vec<String>,
}
363
364pub struct SubscriptionResponse {
366 pub query_id: String,
367 pub source_id: String,
368 pub receiver: Box<dyn super::ChangeReceiver<SourceEventWrapper>>,
369 pub bootstrap_receiver: Option<BootstrapEventReceiver>,
370 pub position_handle: Option<Arc<AtomicU64>>,
376}
377
378pub struct QuerySubscriptionResponse {
380 pub query_id: String,
381 pub receiver: Box<dyn super::ChangeReceiver<QueryResult>>,
382}
383
384#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
391#[serde(tag = "type")]
392pub enum ResultDiff {
393 #[serde(rename = "ADD")]
394 Add {
395 data: serde_json::Value,
396 #[serde(default)]
397 row_signature: u64,
398 },
399 #[serde(rename = "DELETE")]
400 Delete {
401 data: serde_json::Value,
402 #[serde(default)]
403 row_signature: u64,
404 },
405 #[serde(rename = "UPDATE")]
406 Update {
407 data: serde_json::Value,
408 before: serde_json::Value,
409 after: serde_json::Value,
410 #[serde(skip_serializing_if = "Option::is_none")]
411 grouping_keys: Option<Vec<String>>,
412 #[serde(default)]
413 row_signature: u64,
414 },
415 #[serde(rename = "aggregation")]
416 Aggregation {
417 before: Option<serde_json::Value>,
418 after: serde_json::Value,
419 #[serde(default)]
420 row_signature: u64,
421 },
422 #[serde(rename = "noop")]
423 Noop,
424}
425
426#[derive(Debug, Clone, Serialize, Deserialize)]
431pub struct QueryResult {
432 pub query_id: String,
433 #[serde(default)]
437 pub sequence: u64,
438 pub timestamp: chrono::DateTime<chrono::Utc>,
439 pub results: Vec<ResultDiff>,
440 pub metadata: HashMap<String, serde_json::Value>,
441 #[serde(skip_serializing_if = "Option::is_none")]
443 pub profiling: Option<ProfilingMetadata>,
444}
445
446impl QueryResult {
447 pub fn new(
449 query_id: String,
450 sequence: u64,
451 timestamp: chrono::DateTime<chrono::Utc>,
452 results: Vec<ResultDiff>,
453 metadata: HashMap<String, serde_json::Value>,
454 ) -> Self {
455 Self {
456 query_id,
457 sequence,
458 timestamp,
459 results,
460 metadata,
461 profiling: None,
462 }
463 }
464
465 pub fn with_profiling(
467 query_id: String,
468 sequence: u64,
469 timestamp: chrono::DateTime<chrono::Utc>,
470 results: Vec<ResultDiff>,
471 metadata: HashMap<String, serde_json::Value>,
472 profiling: ProfilingMetadata,
473 ) -> Self {
474 Self {
475 query_id,
476 sequence,
477 timestamp,
478 results,
479 metadata,
480 profiling: Some(profiling),
481 }
482 }
483}
484
485impl Timestamped for QueryResult {
487 fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
488 self.timestamp
489 }
490}
491
492pub type ArcQueryResult = Arc<QueryResult>;
494
495#[derive(Debug, Clone, Serialize, Deserialize)]
500pub struct ComponentEvent {
501 pub component_id: String,
502 pub component_type: ComponentType,
503 pub status: ComponentStatus,
504 pub timestamp: chrono::DateTime<chrono::Utc>,
505 pub message: Option<String>,
506}
507
508#[derive(Debug, Clone, Serialize, Deserialize)]
510pub enum ControlMessage {
511 Start(String),
512 Stop(String),
513 Status(String),
514 Shutdown,
515}
516
517pub type ComponentEventBroadcastSender = broadcast::Sender<ComponentEvent>;
518pub type ComponentEventBroadcastReceiver = broadcast::Receiver<ComponentEvent>;
519pub type ComponentEventSender = mpsc::Sender<ComponentEvent>;
522pub type ComponentEventReceiver = mpsc::Receiver<ComponentEvent>;
523pub type ControlMessageReceiver = mpsc::Receiver<ControlMessage>;
524pub type ControlMessageSender = mpsc::Sender<ControlMessage>;
525
526pub type SourceBroadcastSender = broadcast::Sender<ArcSourceEvent>;
528pub type SourceBroadcastReceiver = broadcast::Receiver<ArcSourceEvent>;
529
530pub type QueryResultBroadcastSender = broadcast::Sender<ArcQueryResult>;
532pub type QueryResultBroadcastReceiver = broadcast::Receiver<ArcQueryResult>;
533
534pub type BootstrapEventSender = mpsc::Sender<BootstrapEvent>;
536pub type BootstrapEventReceiver = mpsc::Receiver<BootstrapEvent>;
537
538#[derive(Debug, Clone, Serialize, Deserialize)]
540pub enum ControlSignal {
541 Running { query_id: String },
543 Stopped { query_id: String },
545 Deleted { query_id: String },
547}
548
549#[derive(Debug, Clone)]
551pub struct ControlSignalWrapper {
552 pub signal: ControlSignal,
553 pub timestamp: chrono::DateTime<chrono::Utc>,
554 pub sequence_number: Option<u64>,
555}
556
557impl ControlSignalWrapper {
558 pub fn new(signal: ControlSignal) -> Self {
559 Self {
560 signal,
561 timestamp: chrono::Utc::now(),
562 sequence_number: None,
563 }
564 }
565
566 pub fn with_sequence(signal: ControlSignal, sequence_number: u64) -> Self {
567 Self {
568 signal,
569 timestamp: chrono::Utc::now(),
570 sequence_number: Some(sequence_number),
571 }
572 }
573}
574
575pub type ControlSignalReceiver = mpsc::Receiver<ControlSignalWrapper>;
576pub type ControlSignalSender = mpsc::Sender<ControlSignalWrapper>;
577
578pub struct EventChannels {
579 pub _control_tx: ControlMessageSender,
580 pub control_signal_tx: ControlSignalSender,
581}
582
583pub struct EventReceivers {
584 pub _control_rx: ControlMessageReceiver,
585 pub control_signal_rx: ControlSignalReceiver,
586}
587
588impl EventChannels {
589 pub fn new() -> (Self, EventReceivers) {
590 let (control_tx, control_rx) = mpsc::channel(100);
591 let (control_signal_tx, control_signal_rx) = mpsc::channel(100);
592
593 let channels = Self {
594 _control_tx: control_tx,
595 control_signal_tx,
596 };
597
598 let receivers = EventReceivers {
599 _control_rx: control_rx,
600 control_signal_rx,
601 };
602
603 (channels, receivers)
604 }
605}
606
#[cfg(test)]
mod tests {
    use super::*;
    use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};

    /// Builds a minimal node-insert change used as the payload in these tests.
    fn create_test_source_change() -> SourceChange {
        let element = Element::Node {
            metadata: ElementMetadata {
                reference: ElementReference::new("TestSource", "test-node-1"),
                labels: vec!["TestLabel".into()].into(),
                effective_from: 1000,
            },
            properties: Default::default(),
        };
        SourceChange::Insert { element }
    }

    /// Wraps the fixture change in a plain `SourceEventWrapper` for `"test-source"`.
    fn create_test_wrapper() -> SourceEventWrapper {
        SourceEventWrapper::new(
            "test-source".to_string(),
            SourceEvent::Change(create_test_source_change()),
            chrono::Utc::now(),
        )
    }

    /// `into_parts` hands back the same fields the wrapper was built with.
    #[test]
    fn test_source_event_wrapper_into_parts() {
        let parts = create_test_wrapper().into_parts();

        assert_eq!(parts.source_id, "test-source");
        assert!(matches!(parts.event, SourceEvent::Change(_)));
        assert!(parts.profiling.is_none());
    }

    /// With a single strong reference, `try_unwrap_arc` succeeds.
    #[test]
    fn test_try_unwrap_arc_sole_owner() {
        let arc = Arc::new(create_test_wrapper());

        let result = SourceEventWrapper::try_unwrap_arc(arc);
        assert!(result.is_ok());

        let parts = result.unwrap();
        assert_eq!(parts.source_id, "test-source");
        assert!(matches!(parts.event, SourceEvent::Change(_)));
    }

    /// With a second strong reference alive, `try_unwrap_arc` returns the `Arc` back.
    #[test]
    fn test_try_unwrap_arc_shared() {
        let arc = Arc::new(create_test_wrapper());
        // Keep a second strong reference alive so unwrapping must fail.
        let _arc2 = arc.clone();

        let result = SourceEventWrapper::try_unwrap_arc(arc);
        assert!(result.is_err());

        let returned_arc = result.unwrap_err();
        assert_eq!(returned_arc.source_id, "test-source");
    }

    /// Exercises the "unwrap if sole owner, otherwise clone the fields" pattern.
    #[test]
    fn test_zero_copy_extraction_path() {
        let arc = Arc::new(create_test_wrapper());

        let parts = match SourceEventWrapper::try_unwrap_arc(arc) {
            Ok(parts) => parts,
            // Fallback for a shared Arc: clone each field out of the wrapper.
            Err(shared) => SourceEventParts {
                source_id: shared.source_id.clone(),
                event: shared.event.clone(),
                timestamp: shared.timestamp,
                profiling: shared.profiling.clone(),
                sequence: shared.sequence,
                source_position: shared.source_position.clone(),
            },
        };

        let source_change = match parts.event {
            SourceEvent::Change(change) => Some(change),
            SourceEvent::Control(_) => None,
        };

        assert_eq!(parts.source_id, "test-source");
        assert!(source_change.is_some());
    }

    /// `with_sequence` stores the sequence and it survives `into_parts`.
    #[test]
    fn test_source_event_wrapper_with_sequence() {
        let wrapper = SourceEventWrapper::with_sequence(
            "test-source".to_string(),
            SourceEvent::Change(create_test_source_change()),
            chrono::Utc::now(),
            42,
            None,
        );
        assert_eq!(wrapper.sequence, Some(42));
        assert!(wrapper.profiling.is_none());

        let parts = wrapper.into_parts();
        assert_eq!(parts.sequence, Some(42));
    }

    /// `new` leaves the sequence unset.
    #[test]
    fn test_source_event_wrapper_new_has_no_sequence() {
        let wrapper = create_test_wrapper();
        assert!(wrapper.sequence.is_none());
    }

    /// A store through one `Arc<AtomicU64>` handle is visible through its clone.
    ///
    /// Only the handle type used by `SubscriptionResponse::position_handle` is
    /// exercised here; no `SubscriptionResponse` is constructed.
    #[test]
    fn test_subscription_response_with_position_handle() {
        use std::sync::atomic::{AtomicU64, Ordering};
        use std::sync::Arc;

        let handle = Arc::new(AtomicU64::new(u64::MAX));
        assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);

        let handle_clone = handle.clone();
        handle.store(500, Ordering::Relaxed);
        assert_eq!(handle_clone.load(Ordering::Relaxed), 500);
    }

    /// Subscription settings keep the `resume_from` bytes and the handle-request flag.
    #[test]
    fn test_subscription_settings_with_resume_from() {
        use std::collections::HashSet;

        let position_bytes = Bytes::from_static(&[0x01, 0x02, 0x03, 0x04]);
        let settings = crate::config::SourceSubscriptionSettings {
            source_id: "test-source".to_string(),
            enable_bootstrap: false,
            query_id: "test-query".to_string(),
            nodes: HashSet::new(),
            relations: HashSet::new(),
            resume_from: Some(position_bytes.clone()),
            request_position_handle: true,
            last_sequence: None,
        };
        assert_eq!(settings.resume_from, Some(position_bytes));
        assert!(settings.request_position_handle);
    }
}