1use crate::profiling::ProfilingMetadata;
16use drasi_core::models::SourceChange;
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::sync::atomic::AtomicU64;
20use std::sync::Arc;
21use tokio::sync::{broadcast, mpsc};
22
23pub trait Timestamped {
25 fn timestamp(&self) -> chrono::DateTime<chrono::Utc>;
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
32pub enum ComponentType {
33 Source,
34 Query,
35 Reaction,
36 BootstrapProvider,
37 IdentityProvider,
38}
39
40#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
165pub enum ComponentStatus {
166 Added,
168 Starting,
169 Running,
170 Stopping,
171 Stopped,
172 Removed,
174 Reconfiguring,
175 Error,
176}
177
178#[derive(Debug, Clone)]
180pub struct SourceChangeEvent {
181 pub source_id: String,
182 pub change: SourceChange,
183 pub timestamp: chrono::DateTime<chrono::Utc>,
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
188pub enum SourceControl {
189 Subscription {
191 query_id: String,
192 query_node_id: String,
193 node_labels: Vec<String>,
194 rel_labels: Vec<String>,
195 operation: ControlOperation,
196 },
197 FuturesDue,
199}
200
201#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
203pub enum ControlOperation {
204 Insert,
205 Update,
206 Delete,
207}
208
209#[derive(Debug, Clone)]
211pub enum SourceEvent {
212 Change(SourceChange),
214 Control(SourceControl),
216}
217
218#[derive(Debug, Clone)]
220pub struct SourceEventWrapper {
221 pub source_id: String,
222 pub event: SourceEvent,
223 pub timestamp: chrono::DateTime<chrono::Utc>,
224 pub profiling: Option<ProfilingMetadata>,
226 pub sequence: Option<u64>,
230}
231
232impl SourceEventWrapper {
233 pub fn new(
235 source_id: String,
236 event: SourceEvent,
237 timestamp: chrono::DateTime<chrono::Utc>,
238 ) -> Self {
239 Self {
240 source_id,
241 event,
242 timestamp,
243 profiling: None,
244 sequence: None,
245 }
246 }
247
248 pub fn with_profiling(
250 source_id: String,
251 event: SourceEvent,
252 timestamp: chrono::DateTime<chrono::Utc>,
253 profiling: ProfilingMetadata,
254 ) -> Self {
255 Self {
256 source_id,
257 event,
258 timestamp,
259 profiling: Some(profiling),
260 sequence: None,
261 }
262 }
263
264 pub fn with_sequence(
266 source_id: String,
267 event: SourceEvent,
268 timestamp: chrono::DateTime<chrono::Utc>,
269 sequence: u64,
270 profiling: Option<ProfilingMetadata>,
271 ) -> Self {
272 Self {
273 source_id,
274 event,
275 timestamp,
276 profiling,
277 sequence: Some(sequence),
278 }
279 }
280
281 pub fn into_parts(
284 self,
285 ) -> (
286 String,
287 SourceEvent,
288 chrono::DateTime<chrono::Utc>,
289 Option<ProfilingMetadata>,
290 Option<u64>,
291 ) {
292 (
293 self.source_id,
294 self.event,
295 self.timestamp,
296 self.profiling,
297 self.sequence,
298 )
299 }
300
301 pub fn try_unwrap_arc(
308 arc_self: Arc<Self>,
309 ) -> Result<
310 (
311 String,
312 SourceEvent,
313 chrono::DateTime<chrono::Utc>,
314 Option<ProfilingMetadata>,
315 Option<u64>,
316 ),
317 Arc<Self>,
318 > {
319 Arc::try_unwrap(arc_self).map(|wrapper| wrapper.into_parts())
320 }
321}
322
323impl Timestamped for SourceEventWrapper {
325 fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
326 self.timestamp
327 }
328}
329
330pub type ArcSourceEvent = Arc<SourceEventWrapper>;
332
333#[derive(Debug, Clone)]
335pub struct BootstrapEvent {
336 pub source_id: String,
337 pub change: SourceChange,
338 pub timestamp: chrono::DateTime<chrono::Utc>,
339 pub sequence: u64,
340}
341
/// Parameters describing a query's subscription to a source.
#[derive(Clone, Debug)]
pub struct SubscriptionRequest {
    /// Id of the subscribing query.
    pub query_id: String,
    /// Id of the source being subscribed to.
    pub source_id: String,
    /// Whether bootstrap is enabled for this subscription.
    pub enable_bootstrap: bool,
    /// Node labels named by the subscription.
    pub node_labels: Vec<String>,
    /// Relation labels named by the subscription.
    pub relation_labels: Vec<String>,
}
351
352pub struct SubscriptionResponse {
354 pub query_id: String,
355 pub source_id: String,
356 pub receiver: Box<dyn super::ChangeReceiver<SourceEventWrapper>>,
357 pub bootstrap_receiver: Option<BootstrapEventReceiver>,
358 pub position_handle: Option<Arc<AtomicU64>>,
364}
365
366pub struct QuerySubscriptionResponse {
368 pub query_id: String,
369 pub receiver: Box<dyn super::ChangeReceiver<QueryResult>>,
370}
371
372#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
374#[serde(tag = "type")]
375pub enum ResultDiff {
376 #[serde(rename = "ADD")]
377 Add { data: serde_json::Value },
378 #[serde(rename = "DELETE")]
379 Delete { data: serde_json::Value },
380 #[serde(rename = "UPDATE")]
381 Update {
382 data: serde_json::Value,
383 before: serde_json::Value,
384 after: serde_json::Value,
385 #[serde(skip_serializing_if = "Option::is_none")]
386 grouping_keys: Option<Vec<String>>,
387 },
388 #[serde(rename = "aggregation")]
389 Aggregation {
390 before: Option<serde_json::Value>,
391 after: serde_json::Value,
392 },
393 #[serde(rename = "noop")]
394 Noop,
395}
396
397#[derive(Debug, Clone, Serialize, Deserialize)]
402pub struct QueryResult {
403 pub query_id: String,
404 pub timestamp: chrono::DateTime<chrono::Utc>,
405 pub results: Vec<ResultDiff>,
406 pub metadata: HashMap<String, serde_json::Value>,
407 #[serde(skip_serializing_if = "Option::is_none")]
409 pub profiling: Option<ProfilingMetadata>,
410}
411
412impl QueryResult {
413 pub fn new(
415 query_id: String,
416 timestamp: chrono::DateTime<chrono::Utc>,
417 results: Vec<ResultDiff>,
418 metadata: HashMap<String, serde_json::Value>,
419 ) -> Self {
420 Self {
421 query_id,
422 timestamp,
423 results,
424 metadata,
425 profiling: None,
426 }
427 }
428
429 pub fn with_profiling(
431 query_id: String,
432 timestamp: chrono::DateTime<chrono::Utc>,
433 results: Vec<ResultDiff>,
434 metadata: HashMap<String, serde_json::Value>,
435 profiling: ProfilingMetadata,
436 ) -> Self {
437 Self {
438 query_id,
439 timestamp,
440 results,
441 metadata,
442 profiling: Some(profiling),
443 }
444 }
445}
446
447impl Timestamped for QueryResult {
449 fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
450 self.timestamp
451 }
452}
453
454pub type ArcQueryResult = Arc<QueryResult>;
456
457#[derive(Debug, Clone, Serialize, Deserialize)]
462pub struct ComponentEvent {
463 pub component_id: String,
464 pub component_type: ComponentType,
465 pub status: ComponentStatus,
466 pub timestamp: chrono::DateTime<chrono::Utc>,
467 pub message: Option<String>,
468}
469
470#[derive(Debug, Clone, Serialize, Deserialize)]
472pub enum ControlMessage {
473 Start(String),
474 Stop(String),
475 Status(String),
476 Shutdown,
477}
478
479pub type ComponentEventBroadcastSender = broadcast::Sender<ComponentEvent>;
480pub type ComponentEventBroadcastReceiver = broadcast::Receiver<ComponentEvent>;
481pub type ComponentEventSender = mpsc::Sender<ComponentEvent>;
484pub type ComponentEventReceiver = mpsc::Receiver<ComponentEvent>;
485pub type ControlMessageReceiver = mpsc::Receiver<ControlMessage>;
486pub type ControlMessageSender = mpsc::Sender<ControlMessage>;
487
488pub type SourceBroadcastSender = broadcast::Sender<ArcSourceEvent>;
490pub type SourceBroadcastReceiver = broadcast::Receiver<ArcSourceEvent>;
491
492pub type QueryResultBroadcastSender = broadcast::Sender<ArcQueryResult>;
494pub type QueryResultBroadcastReceiver = broadcast::Receiver<ArcQueryResult>;
495
496pub type BootstrapEventSender = mpsc::Sender<BootstrapEvent>;
498pub type BootstrapEventReceiver = mpsc::Receiver<BootstrapEvent>;
499
500#[derive(Debug, Clone, Serialize, Deserialize)]
502pub enum ControlSignal {
503 Running { query_id: String },
505 Stopped { query_id: String },
507 Deleted { query_id: String },
509}
510
511#[derive(Debug, Clone)]
513pub struct ControlSignalWrapper {
514 pub signal: ControlSignal,
515 pub timestamp: chrono::DateTime<chrono::Utc>,
516 pub sequence_number: Option<u64>,
517}
518
519impl ControlSignalWrapper {
520 pub fn new(signal: ControlSignal) -> Self {
521 Self {
522 signal,
523 timestamp: chrono::Utc::now(),
524 sequence_number: None,
525 }
526 }
527
528 pub fn with_sequence(signal: ControlSignal, sequence_number: u64) -> Self {
529 Self {
530 signal,
531 timestamp: chrono::Utc::now(),
532 sequence_number: Some(sequence_number),
533 }
534 }
535}
536
537pub type ControlSignalReceiver = mpsc::Receiver<ControlSignalWrapper>;
538pub type ControlSignalSender = mpsc::Sender<ControlSignalWrapper>;
539
540pub struct EventChannels {
541 pub _control_tx: ControlMessageSender,
542 pub control_signal_tx: ControlSignalSender,
543}
544
545pub struct EventReceivers {
546 pub _control_rx: ControlMessageReceiver,
547 pub control_signal_rx: ControlSignalReceiver,
548}
549
550impl EventChannels {
551 pub fn new() -> (Self, EventReceivers) {
552 let (control_tx, control_rx) = mpsc::channel(100);
553 let (control_signal_tx, control_signal_rx) = mpsc::channel(100);
554
555 let channels = Self {
556 _control_tx: control_tx,
557 control_signal_tx,
558 };
559
560 let receivers = EventReceivers {
561 _control_rx: control_rx,
562 control_signal_rx,
563 };
564
565 (channels, receivers)
566 }
567}
568
#[cfg(test)]
mod tests {
    use super::*;
    use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};

    /// Builds an insert change for a single labelled node with default properties.
    fn create_test_source_change() -> SourceChange {
        let metadata = ElementMetadata {
            reference: ElementReference::new("TestSource", "test-node-1"),
            labels: vec!["TestLabel".into()].into(),
            effective_from: 1000,
        };
        SourceChange::Insert {
            element: Element::Node {
                metadata,
                properties: Default::default(),
            },
        }
    }

    /// Wraps the test change via `SourceEventWrapper::new`, stamped with the
    /// current time.
    fn new_test_wrapper() -> SourceEventWrapper {
        SourceEventWrapper::new(
            "test-source".to_string(),
            SourceEvent::Change(create_test_source_change()),
            chrono::Utc::now(),
        )
    }

    #[test]
    fn test_source_event_wrapper_into_parts() {
        let (source_id, event, _timestamp, profiling, _sequence) =
            new_test_wrapper().into_parts();

        assert_eq!(source_id, "test-source");
        assert!(matches!(event, SourceEvent::Change(_)));
        assert!(profiling.is_none());
    }

    #[test]
    fn test_try_unwrap_arc_sole_owner() {
        // A single strong reference: unwrapping must succeed.
        let sole = Arc::new(new_test_wrapper());

        let outcome = SourceEventWrapper::try_unwrap_arc(sole);
        assert!(outcome.is_ok());

        let (source_id, event, _timestamp, _profiling, _sequence) = outcome.unwrap();
        assert_eq!(source_id, "test-source");
        assert!(matches!(event, SourceEvent::Change(_)));
    }

    #[test]
    fn test_try_unwrap_arc_shared() {
        let first = Arc::new(new_test_wrapper());
        // Keep a second strong reference alive so unwrapping must fail.
        let _second = first.clone();

        let outcome = SourceEventWrapper::try_unwrap_arc(first);
        assert!(outcome.is_err());

        // The original Arc is handed back intact.
        let returned_arc = outcome.unwrap_err();
        assert_eq!(returned_arc.source_id, "test-source");
    }

    #[test]
    fn test_zero_copy_extraction_path() {
        let arc = Arc::new(new_test_wrapper());

        // Take ownership when possible; otherwise clone out of the shared Arc.
        let (source_id, event, _timestamp, _profiling, _sequence) =
            SourceEventWrapper::try_unwrap_arc(arc).unwrap_or_else(|shared| {
                (
                    shared.source_id.clone(),
                    shared.event.clone(),
                    shared.timestamp,
                    shared.profiling.clone(),
                    shared.sequence,
                )
            });

        let source_change = if let SourceEvent::Change(change) = event {
            Some(change)
        } else {
            None
        };

        assert_eq!(source_id, "test-source");
        assert!(source_change.is_some());
    }

    #[test]
    fn test_source_event_wrapper_with_sequence() {
        let wrapper = SourceEventWrapper::with_sequence(
            "test-source".to_string(),
            SourceEvent::Change(create_test_source_change()),
            chrono::Utc::now(),
            42,
            None,
        );
        assert_eq!(wrapper.sequence, Some(42));
        assert!(wrapper.profiling.is_none());

        // The sequence number survives decomposition.
        let (_source_id, _event, _timestamp, _profiling, sequence) = wrapper.into_parts();
        assert_eq!(sequence, Some(42));
    }

    #[test]
    fn test_source_event_wrapper_new_has_no_sequence() {
        assert!(new_test_wrapper().sequence.is_none());
    }

    #[test]
    fn test_subscription_response_with_position_handle() {
        use std::sync::atomic::{AtomicU64, Ordering};
        use std::sync::Arc;

        let handle = Arc::new(AtomicU64::new(u64::MAX));
        assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);

        // A store through one handle is visible through its clone.
        let handle_clone = Arc::clone(&handle);
        handle.store(500, Ordering::Relaxed);
        assert_eq!(handle_clone.load(Ordering::Relaxed), 500);
    }

    #[test]
    fn test_subscription_settings_with_resume_from() {
        use std::collections::HashSet;

        let settings = crate::config::SourceSubscriptionSettings {
            source_id: "test-source".to_string(),
            enable_bootstrap: false,
            query_id: "test-query".to_string(),
            nodes: HashSet::new(),
            relations: HashSet::new(),
            resume_from: Some(500),
            request_position_handle: true,
        };
        assert_eq!(settings.resume_from, Some(500));
        assert!(settings.request_position_handle);
    }
}