use crate::profiling::ProfilingMetadata;
use drasi_core::models::SourceChange;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
pub trait Timestamped {
fn timestamp(&self) -> chrono::DateTime<chrono::Utc>;
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum ComponentType {
Source,
Query,
Reaction,
BootstrapProvider,
IdentityProvider,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum ComponentStatus {
Added,
Starting,
Running,
Stopping,
Stopped,
Removed,
Reconfiguring,
Error,
}
#[derive(Debug, Clone)]
pub struct SourceChangeEvent {
pub source_id: String,
pub change: SourceChange,
pub timestamp: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SourceControl {
Subscription {
query_id: String,
query_node_id: String,
node_labels: Vec<String>,
rel_labels: Vec<String>,
operation: ControlOperation,
},
FuturesDue,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ControlOperation {
Insert,
Update,
Delete,
}
#[derive(Debug, Clone)]
pub enum SourceEvent {
Change(SourceChange),
Control(SourceControl),
}
#[derive(Debug, Clone)]
pub struct SourceEventWrapper {
pub source_id: String,
pub event: SourceEvent,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub profiling: Option<ProfilingMetadata>,
pub sequence: Option<u64>,
}
impl SourceEventWrapper {
pub fn new(
source_id: String,
event: SourceEvent,
timestamp: chrono::DateTime<chrono::Utc>,
) -> Self {
Self {
source_id,
event,
timestamp,
profiling: None,
sequence: None,
}
}
pub fn with_profiling(
source_id: String,
event: SourceEvent,
timestamp: chrono::DateTime<chrono::Utc>,
profiling: ProfilingMetadata,
) -> Self {
Self {
source_id,
event,
timestamp,
profiling: Some(profiling),
sequence: None,
}
}
pub fn with_sequence(
source_id: String,
event: SourceEvent,
timestamp: chrono::DateTime<chrono::Utc>,
sequence: u64,
profiling: Option<ProfilingMetadata>,
) -> Self {
Self {
source_id,
event,
timestamp,
profiling,
sequence: Some(sequence),
}
}
pub fn into_parts(
self,
) -> (
String,
SourceEvent,
chrono::DateTime<chrono::Utc>,
Option<ProfilingMetadata>,
Option<u64>,
) {
(
self.source_id,
self.event,
self.timestamp,
self.profiling,
self.sequence,
)
}
pub fn try_unwrap_arc(
arc_self: Arc<Self>,
) -> Result<
(
String,
SourceEvent,
chrono::DateTime<chrono::Utc>,
Option<ProfilingMetadata>,
Option<u64>,
),
Arc<Self>,
> {
Arc::try_unwrap(arc_self).map(|wrapper| wrapper.into_parts())
}
}
impl Timestamped for SourceEventWrapper {
fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
self.timestamp
}
}
pub type ArcSourceEvent = Arc<SourceEventWrapper>;
#[derive(Debug, Clone)]
pub struct BootstrapEvent {
pub source_id: String,
pub change: SourceChange,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub sequence: u64,
}
#[derive(Debug, Clone)]
pub struct SubscriptionRequest {
pub query_id: String,
pub source_id: String,
pub enable_bootstrap: bool,
pub node_labels: Vec<String>,
pub relation_labels: Vec<String>,
}
pub struct SubscriptionResponse {
pub query_id: String,
pub source_id: String,
pub receiver: Box<dyn super::ChangeReceiver<SourceEventWrapper>>,
pub bootstrap_receiver: Option<BootstrapEventReceiver>,
pub position_handle: Option<Arc<AtomicU64>>,
}
pub struct QuerySubscriptionResponse {
pub query_id: String,
pub receiver: Box<dyn super::ChangeReceiver<QueryResult>>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum ResultDiff {
#[serde(rename = "ADD")]
Add { data: serde_json::Value },
#[serde(rename = "DELETE")]
Delete { data: serde_json::Value },
#[serde(rename = "UPDATE")]
Update {
data: serde_json::Value,
before: serde_json::Value,
after: serde_json::Value,
#[serde(skip_serializing_if = "Option::is_none")]
grouping_keys: Option<Vec<String>>,
},
#[serde(rename = "aggregation")]
Aggregation {
before: Option<serde_json::Value>,
after: serde_json::Value,
},
#[serde(rename = "noop")]
Noop,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResult {
pub query_id: String,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub results: Vec<ResultDiff>,
pub metadata: HashMap<String, serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub profiling: Option<ProfilingMetadata>,
}
impl QueryResult {
pub fn new(
query_id: String,
timestamp: chrono::DateTime<chrono::Utc>,
results: Vec<ResultDiff>,
metadata: HashMap<String, serde_json::Value>,
) -> Self {
Self {
query_id,
timestamp,
results,
metadata,
profiling: None,
}
}
pub fn with_profiling(
query_id: String,
timestamp: chrono::DateTime<chrono::Utc>,
results: Vec<ResultDiff>,
metadata: HashMap<String, serde_json::Value>,
profiling: ProfilingMetadata,
) -> Self {
Self {
query_id,
timestamp,
results,
metadata,
profiling: Some(profiling),
}
}
}
impl Timestamped for QueryResult {
fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
self.timestamp
}
}
pub type ArcQueryResult = Arc<QueryResult>;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentEvent {
pub component_id: String,
pub component_type: ComponentType,
pub status: ComponentStatus,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub message: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ControlMessage {
Start(String),
Stop(String),
Status(String),
Shutdown,
}
pub type ComponentEventBroadcastSender = broadcast::Sender<ComponentEvent>;
pub type ComponentEventBroadcastReceiver = broadcast::Receiver<ComponentEvent>;
pub type ComponentEventSender = mpsc::Sender<ComponentEvent>;
pub type ComponentEventReceiver = mpsc::Receiver<ComponentEvent>;
pub type ControlMessageReceiver = mpsc::Receiver<ControlMessage>;
pub type ControlMessageSender = mpsc::Sender<ControlMessage>;
pub type SourceBroadcastSender = broadcast::Sender<ArcSourceEvent>;
pub type SourceBroadcastReceiver = broadcast::Receiver<ArcSourceEvent>;
pub type QueryResultBroadcastSender = broadcast::Sender<ArcQueryResult>;
pub type QueryResultBroadcastReceiver = broadcast::Receiver<ArcQueryResult>;
pub type BootstrapEventSender = mpsc::Sender<BootstrapEvent>;
pub type BootstrapEventReceiver = mpsc::Receiver<BootstrapEvent>;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ControlSignal {
Running { query_id: String },
Stopped { query_id: String },
Deleted { query_id: String },
}
#[derive(Debug, Clone)]
pub struct ControlSignalWrapper {
pub signal: ControlSignal,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub sequence_number: Option<u64>,
}
impl ControlSignalWrapper {
pub fn new(signal: ControlSignal) -> Self {
Self {
signal,
timestamp: chrono::Utc::now(),
sequence_number: None,
}
}
pub fn with_sequence(signal: ControlSignal, sequence_number: u64) -> Self {
Self {
signal,
timestamp: chrono::Utc::now(),
sequence_number: Some(sequence_number),
}
}
}
pub type ControlSignalReceiver = mpsc::Receiver<ControlSignalWrapper>;
pub type ControlSignalSender = mpsc::Sender<ControlSignalWrapper>;
pub struct EventChannels {
pub _control_tx: ControlMessageSender,
pub control_signal_tx: ControlSignalSender,
}
pub struct EventReceivers {
pub _control_rx: ControlMessageReceiver,
pub control_signal_rx: ControlSignalReceiver,
}
impl EventChannels {
pub fn new() -> (Self, EventReceivers) {
let (control_tx, control_rx) = mpsc::channel(100);
let (control_signal_tx, control_signal_rx) = mpsc::channel(100);
let channels = Self {
_control_tx: control_tx,
control_signal_tx,
};
let receivers = EventReceivers {
_control_rx: control_rx,
control_signal_rx,
};
(channels, receivers)
}
}
#[cfg(test)]
mod tests {
use super::*;
use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
fn create_test_source_change() -> SourceChange {
let element = Element::Node {
metadata: ElementMetadata {
reference: ElementReference::new("TestSource", "test-node-1"),
labels: vec!["TestLabel".into()].into(),
effective_from: 1000,
},
properties: Default::default(),
};
SourceChange::Insert { element }
}
#[test]
fn test_source_event_wrapper_into_parts() {
let change = create_test_source_change();
let wrapper = SourceEventWrapper::new(
"test-source".to_string(),
SourceEvent::Change(change),
chrono::Utc::now(),
);
let (source_id, event, _timestamp, profiling, _sequence) = wrapper.into_parts();
assert_eq!(source_id, "test-source");
assert!(matches!(event, SourceEvent::Change(_)));
assert!(profiling.is_none());
}
#[test]
fn test_try_unwrap_arc_sole_owner() {
let change = create_test_source_change();
let wrapper = SourceEventWrapper::new(
"test-source".to_string(),
SourceEvent::Change(change),
chrono::Utc::now(),
);
let arc = Arc::new(wrapper);
let result = SourceEventWrapper::try_unwrap_arc(arc);
assert!(result.is_ok());
let (source_id, event, _timestamp, _profiling, _sequence) = result.unwrap();
assert_eq!(source_id, "test-source");
assert!(matches!(event, SourceEvent::Change(_)));
}
#[test]
fn test_try_unwrap_arc_shared() {
let change = create_test_source_change();
let wrapper = SourceEventWrapper::new(
"test-source".to_string(),
SourceEvent::Change(change),
chrono::Utc::now(),
);
let arc = Arc::new(wrapper);
let _arc2 = arc.clone();
let result = SourceEventWrapper::try_unwrap_arc(arc);
assert!(result.is_err());
let returned_arc = result.unwrap_err();
assert_eq!(returned_arc.source_id, "test-source");
}
#[test]
fn test_zero_copy_extraction_path() {
let change = create_test_source_change();
let wrapper = SourceEventWrapper::new(
"test-source".to_string(),
SourceEvent::Change(change),
chrono::Utc::now(),
);
let arc = Arc::new(wrapper);
let (source_id, event, _timestamp, _profiling, _sequence) =
match SourceEventWrapper::try_unwrap_arc(arc) {
Ok(parts) => parts,
Err(arc) => {
(
arc.source_id.clone(),
arc.event.clone(),
arc.timestamp,
arc.profiling.clone(),
arc.sequence,
)
}
};
let source_change = match event {
SourceEvent::Change(change) => Some(change),
_ => None,
};
assert_eq!(source_id, "test-source");
assert!(source_change.is_some());
}
#[test]
fn test_source_event_wrapper_with_sequence() {
let change = create_test_source_change();
let wrapper = SourceEventWrapper::with_sequence(
"test-source".to_string(),
SourceEvent::Change(change),
chrono::Utc::now(),
42,
None,
);
assert_eq!(wrapper.sequence, Some(42));
assert!(wrapper.profiling.is_none());
let (_source_id, _event, _timestamp, _profiling, sequence) = wrapper.into_parts();
assert_eq!(sequence, Some(42));
}
#[test]
fn test_source_event_wrapper_new_has_no_sequence() {
let change = create_test_source_change();
let wrapper = SourceEventWrapper::new(
"test-source".to_string(),
SourceEvent::Change(change),
chrono::Utc::now(),
);
assert!(wrapper.sequence.is_none());
}
#[test]
fn test_subscription_response_with_position_handle() {
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
let handle = Arc::new(AtomicU64::new(u64::MAX));
assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
let handle_clone = handle.clone();
handle.store(500, Ordering::Relaxed);
assert_eq!(handle_clone.load(Ordering::Relaxed), 500);
}
#[test]
fn test_subscription_settings_with_resume_from() {
use std::collections::HashSet;
let settings = crate::config::SourceSubscriptionSettings {
source_id: "test-source".to_string(),
enable_bootstrap: false,
query_id: "test-query".to_string(),
nodes: HashSet::new(),
relations: HashSet::new(),
resume_from: Some(500),
request_position_handle: true,
};
assert_eq!(settings.resume_from, Some(500));
assert!(settings.request_position_handle);
}
}