#[cfg(test)]
mod tests {
    use crate::channels::*;
    use drasi_core::models::{
        Element, ElementMetadata, ElementPropertyMap, ElementReference, SourceChange,
    };
    use std::collections::HashMap;
    use std::sync::Arc;

    // ---------------------------------------------------------------------
    // Shared fixtures
    //
    // Each helper builds one of the values that several tests need, so the
    // literal field lists live in exactly one place.
    // ---------------------------------------------------------------------

    /// Builds a `ComponentEvent` of type `ComponentType::Source` with the given
    /// id, status and optional message, timestamped with `chrono::Utc::now()`.
    fn source_component_event(
        component_id: impl Into<String>,
        status: ComponentStatus,
        message: Option<&str>,
    ) -> ComponentEvent {
        ComponentEvent {
            component_id: component_id.into(),
            component_type: ComponentType::Source,
            status,
            timestamp: chrono::Utc::now(),
            message: message.map(str::to_owned),
        }
    }

    /// Builds a `SourceChange::Insert` carrying a node labelled `Person` with
    /// reference `("source1", "node1")` and an empty property map.
    fn person_insert_change() -> SourceChange {
        let metadata = ElementMetadata {
            reference: ElementReference::new("source1", "node1"),
            labels: Arc::from(vec![Arc::from("Person")]),
            effective_from: 12345,
        };
        SourceChange::Insert {
            element: Element::Node {
                metadata,
                properties: ElementPropertyMap::new(),
            },
        }
    }

    /// Builds the `SourceControl::Subscription` value used by the control tests.
    fn person_subscription() -> SourceControl {
        SourceControl::Subscription {
            query_id: "query-1".to_string(),
            query_node_id: "node-1".to_string(),
            node_labels: vec!["Person".to_string()],
            rel_labels: vec!["KNOWS".to_string()],
            operation: ControlOperation::Insert,
        }
    }

    /// Builds a `QueryResult` for `"query-1"` with no result rows, no profiling
    /// data and the supplied metadata map (moved in, not cloned).
    fn query_result_with(metadata: HashMap<String, serde_json::Value>) -> QueryResult {
        QueryResult {
            query_id: "query-1".to_string(),
            results: vec![],
            metadata,
            profiling: None,
            timestamp: chrono::Utc::now(),
        }
    }

    // ---------------------------------------------------------------------
    // Construction tests
    // ---------------------------------------------------------------------

    /// A `ComponentEvent` keeps the id, type, status and message it was built with.
    #[test]
    fn test_component_event_source_starting() {
        let event = source_component_event(
            "source-1",
            ComponentStatus::Starting,
            Some("Starting source"),
        );

        assert_eq!(event.component_id, "source-1");
        assert!(matches!(event.component_type, ComponentType::Source));
        assert!(matches!(event.status, ComponentStatus::Starting));
        // Compare through `as_deref` so no `String` is allocated for the check.
        assert_eq!(event.message.as_deref(), Some("Starting source"));
    }

    /// Wrapping a `SourceChange` yields the `SourceEvent::Change` variant.
    #[test]
    fn test_source_event_change() {
        let event = SourceEvent::Change(person_insert_change());

        assert!(matches!(event, SourceEvent::Change(_)));
    }

    /// Wrapping a subscription yields `SourceEvent::Control(Subscription { .. })`.
    #[test]
    fn test_source_event_control() {
        let event = SourceEvent::Control(person_subscription());

        assert!(matches!(
            event,
            SourceEvent::Control(SourceControl::Subscription { .. })
        ));
    }

    /// A `SourceEventWrapper` exposes the source id and event it was built with.
    #[test]
    fn test_source_event_wrapper() {
        let wrapper = SourceEventWrapper {
            source_id: "test-source".to_string(),
            event: SourceEvent::Change(person_insert_change()),
            timestamp: chrono::Utc::now(),
            profiling: None,
            sequence: None,
        };

        assert_eq!(wrapper.source_id, "test-source");
        assert!(matches!(wrapper.event, SourceEvent::Change(_)));
    }

    /// A `QueryResult` built with empty collections reports them as empty.
    #[test]
    fn test_query_result_creation() {
        let result = query_result_with(HashMap::new());

        assert_eq!(result.query_id, "query-1");
        assert!(result.results.is_empty());
        assert!(result.metadata.is_empty());
    }

    /// Metadata entries placed in a `QueryResult` can be read back by key.
    #[test]
    fn test_query_result_with_metadata() {
        let mut metadata = HashMap::new();
        metadata.insert("key".to_string(), serde_json::json!("value"));

        // The map is moved into the result; nothing else reads it afterwards.
        let result = query_result_with(metadata);

        assert!(!result.metadata.is_empty());
        assert_eq!(
            result.metadata.get("key"),
            Some(&serde_json::json!("value"))
        );
    }

    /// `EventChannels::new()` returns a channels/receivers pair that can be
    /// dropped; the test passes as long as nothing panics along the way.
    #[test]
    fn test_event_channels_creation() {
        let (channels, receivers) = EventChannels::new();
        drop(channels);
        drop(receivers.control_signal_rx);
    }

    /// Compile-time check that the listed `ComponentStatus` variants exist.
    /// The length assertion only keeps the array in use.
    #[test]
    fn test_component_status_variants() {
        let statuses = [
            ComponentStatus::Starting,
            ComponentStatus::Running,
            ComponentStatus::Stopping,
            ComponentStatus::Stopped,
            ComponentStatus::Error,
        ];
        assert_eq!(statuses.len(), 5);
    }

    /// Compile-time check that the listed `ComponentType` variants exist.
    /// The length assertion only keeps the array in use.
    #[test]
    fn test_component_type_variants() {
        let types = [
            ComponentType::Source,
            ComponentType::Query,
            ComponentType::Reaction,
        ];
        assert_eq!(types.len(), 3);
    }

    /// A subscription control value matches the `Subscription` variant.
    #[test]
    fn test_source_control_subscription() {
        let control = person_subscription();

        assert!(matches!(control, SourceControl::Subscription { .. }));
    }

    // ---------------------------------------------------------------------
    // Channel round-trip tests (tokio mpsc)
    // ---------------------------------------------------------------------

    /// A `ComponentEvent` sent over an mpsc channel is received with the same
    /// id and status.
    #[tokio::test]
    async fn test_send_receive_component_event() {
        let (tx, mut rx) = tokio::sync::mpsc::channel(10);

        tx.send(source_component_event(
            "test",
            ComponentStatus::Running,
            None,
        ))
        .await
        .expect("receiver is still alive, send must succeed");

        let received = rx
            .recv()
            .await
            .expect("one event was sent, recv must yield it");
        assert_eq!(received.component_id, "test");
        assert!(matches!(received.status, ComponentStatus::Running));
    }

    /// A `QueryResult` sent over an mpsc channel is received with the same
    /// query id and empty results.
    #[tokio::test]
    async fn test_send_receive_query_result() {
        let (tx, mut rx) = tokio::sync::mpsc::channel(10);

        tx.send(query_result_with(HashMap::new()))
            .await
            .expect("receiver is still alive, send must succeed");

        let received = rx
            .recv()
            .await
            .expect("one result was sent, recv must yield it");
        assert_eq!(received.query_id, "query-1");
        assert!(received.results.is_empty());
    }

    /// Once the only sender is dropped, `recv` returns `None`.
    #[tokio::test]
    async fn test_channel_closes_when_sender_dropped() {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<ComponentEvent>(10);
        drop(tx);

        let result = rx.recv().await;
        assert!(
            result.is_none(),
            "Channel should be closed after sender dropped"
        );
    }

    /// Five events sent back-to-back are received in the order they were sent.
    #[tokio::test]
    async fn test_multiple_events_in_order() {
        let (tx, mut rx) = tokio::sync::mpsc::channel(10);

        // Capacity (10) exceeds the number of events, so every send completes
        // before the first receive.
        for i in 1..=5 {
            tx.send(source_component_event(
                format!("component-{i}"),
                ComponentStatus::Running,
                None,
            ))
            .await
            .expect("receiver is still alive, send must succeed");
        }

        for i in 1..=5 {
            let received = rx
                .recv()
                .await
                .expect("five events were sent, recv must yield each");
            assert_eq!(received.component_id, format!("component-{i}"));
        }
    }
}