1use crate::profiling::ProfilingMetadata;
16use bytes::Bytes;
17use drasi_core::models::SourceChange;
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20use std::sync::atomic::AtomicU64;
21use std::sync::Arc;
22use tokio::sync::{broadcast, mpsc};
23
24pub trait Timestamped {
26 fn timestamp(&self) -> chrono::DateTime<chrono::Utc>;
27}
28
29#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
33pub enum ComponentType {
34 Source,
35 Query,
36 Reaction,
37 BootstrapProvider,
38 IdentityProvider,
39}
40
41#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
166pub enum ComponentStatus {
167 Added,
169 Starting,
170 Running,
171 Stopping,
172 Stopped,
173 Removed,
175 Reconfiguring,
176 Error,
177}
178
179#[derive(Debug, Clone)]
181pub struct SourceChangeEvent {
182 pub source_id: String,
183 pub change: SourceChange,
184 pub timestamp: chrono::DateTime<chrono::Utc>,
185}
186
187#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
189pub enum SourceControl {
190 Subscription {
192 query_id: String,
193 query_node_id: String,
194 node_labels: Vec<String>,
195 rel_labels: Vec<String>,
196 operation: ControlOperation,
197 },
198 FuturesDue,
200}
201
202#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
204pub enum ControlOperation {
205 Insert,
206 Update,
207 Delete,
208}
209
210#[derive(Debug, Clone)]
212pub enum SourceEvent {
213 Change(SourceChange),
215 Control(SourceControl),
217}
218
219#[derive(Debug, Clone)]
221pub struct SourceEventWrapper {
222 pub source_id: String,
223 pub event: SourceEvent,
224 pub timestamp: chrono::DateTime<chrono::Utc>,
225 pub profiling: Option<ProfilingMetadata>,
227 pub sequence: Option<u64>,
231 pub source_position: Option<Bytes>,
237}
238
239#[derive(Debug)]
244pub struct SourceEventParts {
245 pub source_id: String,
246 pub event: SourceEvent,
247 pub timestamp: chrono::DateTime<chrono::Utc>,
248 pub profiling: Option<ProfilingMetadata>,
249 pub sequence: Option<u64>,
250 pub source_position: Option<Bytes>,
251}
252
253impl SourceEventWrapper {
254 pub fn new(
256 source_id: String,
257 event: SourceEvent,
258 timestamp: chrono::DateTime<chrono::Utc>,
259 ) -> Self {
260 Self {
261 source_id,
262 event,
263 timestamp,
264 profiling: None,
265 sequence: None,
266 source_position: None,
267 }
268 }
269
270 pub fn with_profiling(
272 source_id: String,
273 event: SourceEvent,
274 timestamp: chrono::DateTime<chrono::Utc>,
275 profiling: ProfilingMetadata,
276 ) -> Self {
277 Self {
278 source_id,
279 event,
280 timestamp,
281 profiling: Some(profiling),
282 sequence: None,
283 source_position: None,
284 }
285 }
286
287 pub fn with_sequence(
289 source_id: String,
290 event: SourceEvent,
291 timestamp: chrono::DateTime<chrono::Utc>,
292 sequence: u64,
293 profiling: Option<ProfilingMetadata>,
294 ) -> Self {
295 Self {
296 source_id,
297 event,
298 timestamp,
299 profiling,
300 sequence: Some(sequence),
301 source_position: None,
302 }
303 }
304
305 pub fn set_source_position(&mut self, position: Bytes) {
308 self.source_position = Some(position);
309 }
310
311 pub fn into_parts(self) -> SourceEventParts {
314 SourceEventParts {
315 source_id: self.source_id,
316 event: self.event,
317 timestamp: self.timestamp,
318 profiling: self.profiling,
319 sequence: self.sequence,
320 source_position: self.source_position,
321 }
322 }
323
324 pub fn try_unwrap_arc(arc_self: Arc<Self>) -> Result<SourceEventParts, Arc<Self>> {
331 Arc::try_unwrap(arc_self).map(|wrapper| wrapper.into_parts())
332 }
333}
334
335impl Timestamped for SourceEventWrapper {
337 fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
338 self.timestamp
339 }
340}
341
342pub type ArcSourceEvent = Arc<SourceEventWrapper>;
344
345#[derive(Debug, Clone)]
347pub struct BootstrapEvent {
348 pub source_id: String,
349 pub change: SourceChange,
350 pub timestamp: chrono::DateTime<chrono::Utc>,
351 pub sequence: u64,
352}
353
354#[derive(Debug, Clone)]
356pub struct SubscriptionRequest {
357 pub query_id: String,
358 pub source_id: String,
359 pub enable_bootstrap: bool,
360 pub node_labels: Vec<String>,
361 pub relation_labels: Vec<String>,
362}
363
364pub struct SubscriptionResponse {
366 pub query_id: String,
367 pub source_id: String,
368 pub receiver: Box<dyn super::ChangeReceiver<SourceEventWrapper>>,
369 pub bootstrap_receiver: Option<BootstrapEventReceiver>,
370 pub position_handle: Option<Arc<AtomicU64>>,
376 pub bootstrap_result_receiver:
380 Option<tokio::sync::oneshot::Receiver<anyhow::Result<crate::bootstrap::BootstrapResult>>>,
381}
382
383pub struct QuerySubscriptionResponse {
385 pub query_id: String,
386 pub receiver: Box<dyn super::ChangeReceiver<QueryResult>>,
387}
388
389#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
396#[serde(tag = "type")]
397pub enum ResultDiff {
398 #[serde(rename = "ADD")]
399 Add {
400 data: serde_json::Value,
401 #[serde(default)]
402 row_signature: u64,
403 },
404 #[serde(rename = "DELETE")]
405 Delete {
406 data: serde_json::Value,
407 #[serde(default)]
408 row_signature: u64,
409 },
410 #[serde(rename = "UPDATE")]
411 Update {
412 data: serde_json::Value,
413 before: serde_json::Value,
414 after: serde_json::Value,
415 #[serde(skip_serializing_if = "Option::is_none")]
416 grouping_keys: Option<Vec<String>>,
417 #[serde(default)]
418 row_signature: u64,
419 },
420 #[serde(rename = "aggregation")]
421 Aggregation {
422 before: Option<serde_json::Value>,
423 after: serde_json::Value,
424 #[serde(default)]
425 row_signature: u64,
426 },
427 #[serde(rename = "noop")]
428 Noop,
429}
430
431#[derive(Debug, Clone, Serialize, Deserialize)]
436pub struct QueryResult {
437 pub query_id: String,
438 #[serde(default)]
442 pub sequence: u64,
443 pub timestamp: chrono::DateTime<chrono::Utc>,
444 pub results: Vec<ResultDiff>,
445 pub metadata: HashMap<String, serde_json::Value>,
446 #[serde(skip_serializing_if = "Option::is_none")]
448 pub profiling: Option<ProfilingMetadata>,
449}
450
451impl QueryResult {
452 pub fn new(
454 query_id: String,
455 sequence: u64,
456 timestamp: chrono::DateTime<chrono::Utc>,
457 results: Vec<ResultDiff>,
458 metadata: HashMap<String, serde_json::Value>,
459 ) -> Self {
460 Self {
461 query_id,
462 sequence,
463 timestamp,
464 results,
465 metadata,
466 profiling: None,
467 }
468 }
469
470 pub fn with_profiling(
472 query_id: String,
473 sequence: u64,
474 timestamp: chrono::DateTime<chrono::Utc>,
475 results: Vec<ResultDiff>,
476 metadata: HashMap<String, serde_json::Value>,
477 profiling: ProfilingMetadata,
478 ) -> Self {
479 Self {
480 query_id,
481 sequence,
482 timestamp,
483 results,
484 metadata,
485 profiling: Some(profiling),
486 }
487 }
488}
489
490impl Timestamped for QueryResult {
492 fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
493 self.timestamp
494 }
495}
496
497pub type ArcQueryResult = Arc<QueryResult>;
499
500#[derive(Debug, Clone, Serialize, Deserialize)]
505pub struct ComponentEvent {
506 pub component_id: String,
507 pub component_type: ComponentType,
508 pub status: ComponentStatus,
509 pub timestamp: chrono::DateTime<chrono::Utc>,
510 pub message: Option<String>,
511}
512
513#[derive(Debug, Clone, Serialize, Deserialize)]
515pub enum ControlMessage {
516 Start(String),
517 Stop(String),
518 Status(String),
519 Shutdown,
520}
521
522pub type ComponentEventBroadcastSender = broadcast::Sender<ComponentEvent>;
523pub type ComponentEventBroadcastReceiver = broadcast::Receiver<ComponentEvent>;
524pub type ComponentEventSender = mpsc::Sender<ComponentEvent>;
527pub type ComponentEventReceiver = mpsc::Receiver<ComponentEvent>;
528pub type ControlMessageReceiver = mpsc::Receiver<ControlMessage>;
529pub type ControlMessageSender = mpsc::Sender<ControlMessage>;
530
531pub type SourceBroadcastSender = broadcast::Sender<ArcSourceEvent>;
533pub type SourceBroadcastReceiver = broadcast::Receiver<ArcSourceEvent>;
534
535pub type QueryResultBroadcastSender = broadcast::Sender<ArcQueryResult>;
537pub type QueryResultBroadcastReceiver = broadcast::Receiver<ArcQueryResult>;
538
539pub type BootstrapEventSender = mpsc::Sender<BootstrapEvent>;
541pub type BootstrapEventReceiver = mpsc::Receiver<BootstrapEvent>;
542
543#[derive(Debug, Clone, Serialize, Deserialize)]
545pub enum ControlSignal {
546 Running { query_id: String },
548 Stopped { query_id: String },
550 Deleted { query_id: String },
552}
553
554#[derive(Debug, Clone)]
556pub struct ControlSignalWrapper {
557 pub signal: ControlSignal,
558 pub timestamp: chrono::DateTime<chrono::Utc>,
559 pub sequence_number: Option<u64>,
560}
561
562impl ControlSignalWrapper {
563 pub fn new(signal: ControlSignal) -> Self {
564 Self {
565 signal,
566 timestamp: chrono::Utc::now(),
567 sequence_number: None,
568 }
569 }
570
571 pub fn with_sequence(signal: ControlSignal, sequence_number: u64) -> Self {
572 Self {
573 signal,
574 timestamp: chrono::Utc::now(),
575 sequence_number: Some(sequence_number),
576 }
577 }
578}
579
580pub type ControlSignalReceiver = mpsc::Receiver<ControlSignalWrapper>;
581pub type ControlSignalSender = mpsc::Sender<ControlSignalWrapper>;
582
583pub struct EventChannels {
584 pub _control_tx: ControlMessageSender,
585 pub control_signal_tx: ControlSignalSender,
586}
587
588pub struct EventReceivers {
589 pub _control_rx: ControlMessageReceiver,
590 pub control_signal_rx: ControlSignalReceiver,
591}
592
593impl EventChannels {
594 pub fn new() -> (Self, EventReceivers) {
595 let (control_tx, control_rx) = mpsc::channel(100);
596 let (control_signal_tx, control_signal_rx) = mpsc::channel(100);
597
598 let channels = Self {
599 _control_tx: control_tx,
600 control_signal_tx,
601 };
602
603 let receivers = EventReceivers {
604 _control_rx: control_rx,
605 control_signal_rx,
606 };
607
608 (channels, receivers)
609 }
610}
611
612#[cfg(test)]
613mod tests {
614 use super::*;
615 use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
616
617 fn create_test_source_change() -> SourceChange {
618 let element = Element::Node {
619 metadata: ElementMetadata {
620 reference: ElementReference::new("TestSource", "test-node-1"),
621 labels: vec!["TestLabel".into()].into(),
622 effective_from: 1000,
623 },
624 properties: Default::default(),
625 };
626 SourceChange::Insert { element }
627 }
628
629 #[test]
630 fn test_source_event_wrapper_into_parts() {
631 let change = create_test_source_change();
632 let wrapper = SourceEventWrapper::new(
633 "test-source".to_string(),
634 SourceEvent::Change(change),
635 chrono::Utc::now(),
636 );
637
638 let parts = wrapper.into_parts();
639
640 assert_eq!(parts.source_id, "test-source");
641 assert!(matches!(parts.event, SourceEvent::Change(_)));
642 assert!(parts.profiling.is_none());
643 }
644
645 #[test]
646 fn test_try_unwrap_arc_sole_owner() {
647 let change = create_test_source_change();
648 let wrapper = SourceEventWrapper::new(
649 "test-source".to_string(),
650 SourceEvent::Change(change),
651 chrono::Utc::now(),
652 );
653 let arc = Arc::new(wrapper);
654
655 let result = SourceEventWrapper::try_unwrap_arc(arc);
657 assert!(result.is_ok());
658
659 let parts = result.unwrap();
660 assert_eq!(parts.source_id, "test-source");
661 assert!(matches!(parts.event, SourceEvent::Change(_)));
662 }
663
664 #[test]
665 fn test_try_unwrap_arc_shared() {
666 let change = create_test_source_change();
667 let wrapper = SourceEventWrapper::new(
668 "test-source".to_string(),
669 SourceEvent::Change(change),
670 chrono::Utc::now(),
671 );
672 let arc = Arc::new(wrapper);
673 let _arc2 = arc.clone(); let result = SourceEventWrapper::try_unwrap_arc(arc);
677 assert!(result.is_err());
678
679 let returned_arc = result.unwrap_err();
681 assert_eq!(returned_arc.source_id, "test-source");
682 }
683
684 #[test]
685 fn test_zero_copy_extraction_path() {
686 let change = create_test_source_change();
688 let wrapper = SourceEventWrapper::new(
689 "test-source".to_string(),
690 SourceEvent::Change(change),
691 chrono::Utc::now(),
692 );
693 let arc = Arc::new(wrapper);
694
695 let parts = match SourceEventWrapper::try_unwrap_arc(arc) {
697 Ok(parts) => parts,
698 Err(arc) => {
699 SourceEventParts {
701 source_id: arc.source_id.clone(),
702 event: arc.event.clone(),
703 timestamp: arc.timestamp,
704 profiling: arc.profiling.clone(),
705 sequence: arc.sequence,
706 source_position: arc.source_position.clone(),
707 }
708 }
709 };
710
711 let source_change = match parts.event {
713 SourceEvent::Change(change) => Some(change),
714 _ => None,
715 };
716
717 assert_eq!(parts.source_id, "test-source");
718 assert!(source_change.is_some());
719 }
720
721 #[test]
722 fn test_source_event_wrapper_with_sequence() {
723 let change = create_test_source_change();
724 let wrapper = SourceEventWrapper::with_sequence(
725 "test-source".to_string(),
726 SourceEvent::Change(change),
727 chrono::Utc::now(),
728 42,
729 None,
730 );
731 assert_eq!(wrapper.sequence, Some(42));
732 assert!(wrapper.profiling.is_none());
733
734 let parts = wrapper.into_parts();
735 assert_eq!(parts.sequence, Some(42));
736 }
737
738 #[test]
739 fn test_source_event_wrapper_new_has_no_sequence() {
740 let change = create_test_source_change();
741 let wrapper = SourceEventWrapper::new(
742 "test-source".to_string(),
743 SourceEvent::Change(change),
744 chrono::Utc::now(),
745 );
746 assert!(wrapper.sequence.is_none());
747 }
748
749 #[test]
750 fn test_subscription_response_with_position_handle() {
751 use std::sync::atomic::{AtomicU64, Ordering};
752 use std::sync::Arc;
753
754 let handle = Arc::new(AtomicU64::new(u64::MAX));
755 assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
756
757 let handle_clone = handle.clone();
759 handle.store(500, Ordering::Relaxed);
760 assert_eq!(handle_clone.load(Ordering::Relaxed), 500);
761 }
762
763 #[test]
764 fn test_subscription_settings_with_resume_from() {
765 use std::collections::HashSet;
766 let position_bytes = Bytes::from_static(&[0x01, 0x02, 0x03, 0x04]);
767 let settings = crate::config::SourceSubscriptionSettings {
768 source_id: "test-source".to_string(),
769 enable_bootstrap: false,
770 query_id: "test-query".to_string(),
771 nodes: HashSet::new(),
772 relations: HashSet::new(),
773 resume_from: Some(position_bytes.clone()),
774 request_position_handle: true,
775 last_sequence: None,
776 };
777 assert_eq!(settings.resume_from, Some(position_bytes));
778 assert!(settings.request_position_handle);
779 }
780}