#[cfg(test)]
mod tests {
use drasi_lib::channels::{ComponentStatus, ResultDiff};
use drasi_lib::config::{ReactionConfig, ReactionSpecificConfig};
use drasi_lib::reactions::common::AdaptiveBatchConfig as ConfigAdaptiveBatchConfig;
use drasi_lib::reactions::http_adaptive::HttpAdaptiveReactionConfig;
use drasi_lib::reactions::http_adaptive::{AdaptiveHttpReaction, BatchResult};
use drasi_lib::reactions::Reaction;
use serde_json::json;
use std::collections::HashMap;
use tokio::sync::mpsc;
fn create_test_config(base_url: String) -> ReactionConfig {
let routes = HashMap::new();
let http_config = HttpAdaptiveReactionConfig {
base_url,
token: None,
timeout_ms: 5000,
routes,
adaptive: ConfigAdaptiveBatchConfig {
adaptive_min_batch_size: 1,
adaptive_max_batch_size: 100,
adaptive_window_size: 10,
adaptive_batch_timeout_ms: 1000,
},
};
ReactionConfig {
id: "test-adaptive-http".to_string(),
queries: vec!["test-query".to_string()],
auto_start: true,
config: ReactionSpecificConfig::HttpAdaptive(http_config),
priority_queue_capacity: None,
}
}
#[tokio::test]
async fn test_adaptive_http_reaction_creation() {
let (event_tx, _event_rx) = mpsc::channel(100);
let config = create_test_config("http://localhost:8080".to_string());
let reaction = AdaptiveHttpReaction::new(config.clone(), event_tx);
assert_eq!(reaction.status().await, ComponentStatus::Stopped);
assert_eq!(reaction.get_config().id, config.id);
assert_eq!(reaction.get_config().queries, config.queries);
}
#[tokio::test]
async fn test_adaptive_config_defaults() {
let (event_tx, _event_rx) = mpsc::channel(100);
let config = create_test_config("http://localhost:8080".to_string());
let _reaction = AdaptiveHttpReaction::new(config, event_tx);
}
#[tokio::test]
async fn test_adaptive_config_custom() {
let (event_tx, _event_rx) = mpsc::channel(100);
let config = ReactionConfig {
id: "test-custom-adaptive".to_string(),
queries: vec!["test-query".to_string()],
auto_start: true,
config: ReactionSpecificConfig::HttpAdaptive(HttpAdaptiveReactionConfig {
base_url: "http://localhost:8080".to_string(),
token: None,
timeout_ms: 5000,
routes: HashMap::new(),
adaptive: ConfigAdaptiveBatchConfig {
adaptive_min_batch_size: 20,
adaptive_max_batch_size: 500,
adaptive_window_size: 100, adaptive_batch_timeout_ms: 50,
},
}),
priority_queue_capacity: None,
};
let _reaction = AdaptiveHttpReaction::new(config, event_tx);
}
#[test]
fn test_batch_result_serialization() {
let batch_result = BatchResult {
query_id: "test-query".to_string(),
results: vec![
ResultDiff::Add {
data: json!({"id": 1, "name": "Alice"}),
},
ResultDiff::Update {
data: json!({"id": 2, "name": "Bob Updated"}),
before: json!({"id": 2, "name": "Bob"}),
after: json!({"id": 2, "name": "Bob Updated"}),
grouping_keys: None,
},
],
timestamp: "2025-10-19T12:34:56.789Z".to_string(),
count: 2,
};
let json = serde_json::to_string(&batch_result).unwrap();
assert!(json.contains("test-query"));
assert!(json.contains("\"count\":2"));
let deserialized: BatchResult = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.query_id, "test-query");
assert_eq!(deserialized.count, 2);
assert_eq!(deserialized.results.len(), 2);
}
#[test]
fn test_batch_result_array_serialization() {
let batches = vec![
BatchResult {
query_id: "query1".to_string(),
results: vec![ResultDiff::Add {
data: json!({"id": 1}),
}],
timestamp: "2025-10-19T12:34:56Z".to_string(),
count: 1,
},
BatchResult {
query_id: "query2".to_string(),
results: vec![
ResultDiff::Update {
data: json!({"id": 2}),
before: json!({"id": 2}),
after: json!({"id": 2}),
grouping_keys: None,
},
ResultDiff::Delete {
data: json!({"id": 3}),
},
],
timestamp: "2025-10-19T12:34:57Z".to_string(),
count: 2,
},
];
let json = serde_json::to_string(&batches).unwrap();
assert!(json.contains("query1"));
assert!(json.contains("query2"));
let deserialized: Vec<BatchResult> = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.len(), 2);
assert_eq!(deserialized[0].count, 1);
assert_eq!(deserialized[1].count, 2);
}
#[test]
fn test_batch_result_matches_specification() {
let batch_result = BatchResult {
query_id: "user-changes".to_string(),
results: vec![
ResultDiff::Add {
data: json!({"id": "user_123", "name": "John Doe"}),
},
ResultDiff::Update {
data: json!({"id": "user_456", "name": "Jane Smith"}),
before: json!({"id": "user_456", "name": "Jane Doe"}),
after: json!({"id": "user_456", "name": "Jane Smith"}),
grouping_keys: None,
},
ResultDiff::Delete {
data: json!({"id": "user_789", "name": "Bob Wilson"}),
},
],
timestamp: "2025-10-19T12:34:56.789Z".to_string(),
count: 3,
};
assert_eq!(batch_result.query_id, "user-changes");
assert_eq!(batch_result.count, 3);
assert_eq!(batch_result.results.len(), 3);
assert!(!batch_result.timestamp.is_empty());
match &batch_result.results[0] {
ResultDiff::Add { data } => assert!(data.is_object()),
_ => panic!("Expected add result"),
}
match &batch_result.results[1] {
ResultDiff::Update {
data,
before,
after,
..
} => {
assert!(data.is_object());
assert!(before.is_object());
assert!(after.is_object());
}
_ => panic!("Expected update result"),
}
match &batch_result.results[2] {
ResultDiff::Delete { data } => assert!(data.is_object()),
_ => panic!("Expected delete result"),
}
}
#[tokio::test]
async fn test_batch_endpoint_enabled_by_default() {
let (event_tx, _event_rx) = mpsc::channel(100);
let config = create_test_config("http://localhost:8080".to_string());
let _reaction = AdaptiveHttpReaction::new(config, event_tx);
}
#[tokio::test]
async fn test_reaction_type_identification() {
let (event_tx, _event_rx) = mpsc::channel(100);
let config = create_test_config("http://localhost:8080".to_string());
let reaction = AdaptiveHttpReaction::new(config, event_tx);
let config = reaction.get_config();
assert_eq!(config.reaction_type(), "http_adaptive");
}
#[test]
fn test_batch_result_count_matches_results_length() {
let results = vec![
ResultDiff::Add {
data: json!({"id": 1}),
},
ResultDiff::Add {
data: json!({"id": 2}),
},
ResultDiff::Add {
data: json!({"id": 3}),
},
];
let batch_result = BatchResult {
query_id: "test".to_string(),
results: results.clone(),
timestamp: chrono::Utc::now().to_rfc3339(),
count: results.len(),
};
assert_eq!(batch_result.count, batch_result.results.len());
assert_eq!(batch_result.count, 3);
}
#[test]
fn test_batch_result_timestamp_format() {
let batch_result = BatchResult {
query_id: "test".to_string(),
results: vec![],
timestamp: chrono::Utc::now().to_rfc3339(),
count: 0,
};
assert!(chrono::DateTime::parse_from_rfc3339(&batch_result.timestamp).is_ok());
}
#[tokio::test]
async fn test_multiple_queries_support() {
let (event_tx, _event_rx) = mpsc::channel(100);
let config = ReactionConfig {
id: "test-multi-query".to_string(),
queries: vec!["query1".to_string(), "query2".to_string(), "query3".to_string()],
auto_start: true,
config: ReactionSpecificConfig::HttpAdaptive(HttpAdaptiveReactionConfig {
base_url: "http://localhost:8080".to_string(),
token: None,
timeout_ms: 5000,
routes: HashMap::new(),
adaptive: ConfigAdaptiveBatchConfig {
adaptive_min_batch_size: 1,
adaptive_max_batch_size: 100,
adaptive_window_size: 10,
adaptive_batch_timeout_ms: 1000,
},
}),
priority_queue_capacity: None,
};
let reaction = AdaptiveHttpReaction::new(config, event_tx);
assert_eq!(reaction.get_config().queries.len(), 3);
}
#[tokio::test]
async fn test_http2_client_configuration() {
let (event_tx, _event_rx) = mpsc::channel(100);
let config = create_test_config("http://localhost:8080".to_string());
let _reaction = AdaptiveHttpReaction::new(config, event_tx);
}
#[tokio::test]
async fn test_token_authentication_optional() {
let (event_tx, _event_rx) = mpsc::channel(100);
let config_no_token = create_test_config("http://localhost:8080".to_string());
let _reaction1 = AdaptiveHttpReaction::new(config_no_token, event_tx.clone());
let config_with_token = ReactionConfig {
id: "test-with-token".to_string(),
queries: vec!["test-query".to_string()],
auto_start: true,
config: ReactionSpecificConfig::HttpAdaptive(HttpAdaptiveReactionConfig {
base_url: "http://localhost:8080".to_string(),
token: Some("test-token".to_string()),
timeout_ms: 5000,
routes: HashMap::new(),
adaptive: ConfigAdaptiveBatchConfig {
adaptive_min_batch_size: 1,
adaptive_max_batch_size: 100,
adaptive_window_size: 10,
adaptive_batch_timeout_ms: 1000,
},
}),
priority_queue_capacity: None,
};
let _reaction2 = AdaptiveHttpReaction::new(config_with_token, event_tx);
}
#[test]
fn test_empty_batch_result() {
let batch_result = BatchResult {
query_id: "test".to_string(),
results: vec![],
timestamp: chrono::Utc::now().to_rfc3339(),
count: 0,
};
assert_eq!(batch_result.count, 0);
assert!(batch_result.results.is_empty());
let json = serde_json::to_string(&batch_result).unwrap();
assert!(json.contains("\"count\":0"));
}
#[test]
fn test_large_batch_result() {
let mut results = Vec::new();
for i in 0..1000 {
results.push(ResultDiff::Add {
data: json!({"id": i, "value": format!("item_{}", i)}),
});
}
let batch_result = BatchResult {
query_id: "large-batch".to_string(),
results: results.clone(),
timestamp: chrono::Utc::now().to_rfc3339(),
count: results.len(),
};
assert_eq!(batch_result.count, 1000);
assert_eq!(batch_result.results.len(), 1000);
let json = serde_json::to_string(&batch_result).unwrap();
assert!(json.contains("large-batch"));
assert!(json.contains("\"count\":1000"));
}
}