#[cfg(test)]
mod tests {
use crate::lag_monitor::*;
use crate::sharding::{BackpressureAction, BackpressureManager};
use chrono::Utc;
use std::time::Duration;
use tokio::sync::mpsc;
fn create_mock_lag_stats(
consumer_name: &str,
stream_name: &str,
message_lag: u64,
pending_acks: i64,
backpressure_active: bool,
) -> ConsumerLagStats {
ConsumerLagStats {
consumer_name: consumer_name.to_string(),
stream_name: stream_name.to_string(),
message_lag,
pending_acks,
throughput_mps: 25.5,
last_updated: Utc::now(),
backpressure_active,
utilization_percent: (pending_acks as f64 / 1000.0) * 100.0,
}
}
fn create_mock_backpressure_manager(
lag_threshold: u64,
pending_ack_threshold: i64,
) -> BackpressureManager {
BackpressureManager {
lag_threshold,
pending_ack_threshold,
response_actions: vec![
BackpressureAction::RouteToQuarantine,
BackpressureAction::ReduceBatchSize(10),
BackpressureAction::ExtendAckWait(Duration::from_secs(30)),
BackpressureAction::AlertOps("High lag detected in test".to_string()),
],
}
}
#[test]
fn test_consumer_lag_stats_creation_comprehensive() {
let stats = ConsumerLagStats {
consumer_name: "test-consumer-comprehensive".to_string(),
stream_name: "ATOMS_VETTED".to_string(),
message_lag: 1500,
pending_acks: 300,
throughput_mps: 45.2,
last_updated: Utc::now(),
backpressure_active: true,
utilization_percent: 87.5,
};
assert_eq!(stats.consumer_name, "test-consumer-comprehensive");
assert_eq!(stats.stream_name, "ATOMS_VETTED");
assert_eq!(stats.message_lag, 1500);
assert_eq!(stats.pending_acks, 300);
assert_eq!(stats.throughput_mps, 45.2);
assert!(stats.backpressure_active);
assert_eq!(stats.utilization_percent, 87.5);
let now = Utc::now();
let time_diff = now.signed_duration_since(stats.last_updated);
assert!(time_diff.num_seconds() < 60);
}
#[test]
fn test_consumer_lag_stats_serialization() {
let stats = create_mock_lag_stats("serialization-test", "TEST_STREAM", 500, 100, false);
let json = serde_json::to_string(&stats).expect("Should serialize to JSON");
assert!(json.contains("serialization-test"));
assert!(json.contains("TEST_STREAM"));
assert!(json.contains("500"));
let deserialized: ConsumerLagStats =
serde_json::from_str(&json).expect("Should deserialize from JSON");
assert_eq!(deserialized.consumer_name, stats.consumer_name);
assert_eq!(deserialized.message_lag, stats.message_lag);
assert_eq!(deserialized.pending_acks, stats.pending_acks);
}
#[test]
fn test_backpressure_alert_comprehensive() {
let timestamp = Utc::now();
let alert = BackpressureAlert {
consumer_name: "alert-test-consumer".to_string(),
stream_name: "SDLC_RAW".to_string(),
alert_type: BackpressureAlertType::HighLag,
message_lag: 2500,
pending_acks: 800,
actions_taken: vec![
"Routed to quarantine".to_string(),
"Reduced batch size to 5".to_string(),
"Extended ack wait to 60s".to_string(),
],
timestamp,
};
assert_eq!(alert.consumer_name, "alert-test-consumer");
assert_eq!(alert.stream_name, "SDLC_RAW");
assert!(matches!(alert.alert_type, BackpressureAlertType::HighLag));
assert_eq!(alert.message_lag, 2500);
assert_eq!(alert.pending_acks, 800);
assert_eq!(alert.actions_taken.len(), 3);
assert_eq!(alert.timestamp, timestamp);
}
#[test]
fn test_backpressure_alert_types() {
let high_lag_alert = BackpressureAlert {
consumer_name: "test".to_string(),
stream_name: "TEST".to_string(),
alert_type: BackpressureAlertType::HighLag,
message_lag: 3000,
pending_acks: 500,
actions_taken: vec![],
timestamp: Utc::now(),
};
let high_acks_alert = BackpressureAlert {
alert_type: BackpressureAlertType::HighPendingAcks,
..high_lag_alert.clone()
};
let stalled_alert = BackpressureAlert {
alert_type: BackpressureAlertType::ConsumerStalled,
..high_lag_alert.clone()
};
let resolved_alert = BackpressureAlert {
alert_type: BackpressureAlertType::BackpressureResolved,
..high_lag_alert.clone()
};
assert!(matches!(
high_lag_alert.alert_type,
BackpressureAlertType::HighLag
));
assert!(matches!(
high_acks_alert.alert_type,
BackpressureAlertType::HighPendingAcks
));
assert!(matches!(
stalled_alert.alert_type,
BackpressureAlertType::ConsumerStalled
));
assert!(matches!(
resolved_alert.alert_type,
BackpressureAlertType::BackpressureResolved
));
}
#[test]
fn test_backpressure_alert_serialization() {
let alert = BackpressureAlert {
consumer_name: "serialize-test".to_string(),
stream_name: "SERIALIZE_STREAM".to_string(),
alert_type: BackpressureAlertType::ConsumerStalled,
message_lag: 5000,
pending_acks: 1200,
actions_taken: vec!["Emergency alert sent".to_string()],
timestamp: Utc::now(),
};
let json = serde_json::to_string(&alert).expect("Should serialize");
let deserialized: BackpressureAlert =
serde_json::from_str(&json).expect("Should deserialize");
assert_eq!(deserialized.consumer_name, alert.consumer_name);
assert_eq!(deserialized.message_lag, alert.message_lag);
assert!(matches!(
deserialized.alert_type,
BackpressureAlertType::ConsumerStalled
));
}
#[tokio::test]
async fn test_backpressure_alert_handler() {
let (tx, rx) = mpsc::channel(10);
let mut handler = BackpressureAlertHandler::new(rx);
let alert1 = BackpressureAlert {
consumer_name: "handler-test-1".to_string(),
stream_name: "TEST_STREAM".to_string(),
alert_type: BackpressureAlertType::HighLag,
message_lag: 1500,
pending_acks: 400,
actions_taken: vec!["Test action".to_string()],
timestamp: Utc::now(),
};
let alert2 = BackpressureAlert {
consumer_name: "handler-test-2".to_string(),
stream_name: "TEST_STREAM".to_string(),
alert_type: BackpressureAlertType::BackpressureResolved,
message_lag: 100,
pending_acks: 50,
actions_taken: vec!["Resolved".to_string()],
timestamp: Utc::now(),
};
tx.send(alert1.clone()).await.expect("Should send alert");
tx.send(alert2.clone()).await.expect("Should send alert");
drop(tx);
handler.start_handling().await;
}
#[test]
fn test_backpressure_manager_integration() {
let manager = create_mock_backpressure_manager(1000, 500);
assert!(!manager.should_apply_backpressure(500, 250));
let actions = manager.generate_backpressure_response(500, 250);
assert!(actions.is_empty());
assert!(manager.should_apply_backpressure(1500, 250));
let actions = manager.generate_backpressure_response(1500, 250);
assert_eq!(actions.len(), 4);
assert!(manager.should_apply_backpressure(500, 700));
let actions = manager.generate_backpressure_response(500, 700);
assert_eq!(actions.len(), 4);
assert!(manager.should_apply_backpressure(2000, 800));
let actions = manager.generate_backpressure_response(2000, 800);
assert_eq!(actions.len(), 4);
}
#[test]
fn test_smith_stream_names() {
let expected_streams = vec![
"SDLC_RAW",
"ATOMS_VETTED",
"ATOMS_RESULTS",
"AUDIT_SECURITY",
"SDLC_QUARANTINE_BACKPRESSURE",
];
for stream in &expected_streams {
assert!(!stream.is_empty());
assert!(stream.chars().all(|c| c.is_ascii_uppercase() || c == '_'));
}
assert_eq!(expected_streams.len(), 5);
}
#[test]
fn test_consumer_lag_stats_edge_cases() {
let zero_stats = ConsumerLagStats {
consumer_name: "zero-test".to_string(),
stream_name: "ZERO_STREAM".to_string(),
message_lag: 0,
pending_acks: 0,
throughput_mps: 0.0,
last_updated: Utc::now(),
backpressure_active: false,
utilization_percent: 0.0,
};
assert_eq!(zero_stats.message_lag, 0);
assert_eq!(zero_stats.pending_acks, 0);
assert_eq!(zero_stats.throughput_mps, 0.0);
assert_eq!(zero_stats.utilization_percent, 0.0);
let max_stats = ConsumerLagStats {
consumer_name: "max-test".to_string(),
stream_name: "MAX_STREAM".to_string(),
message_lag: u64::MAX,
pending_acks: i64::MAX,
throughput_mps: f64::MAX,
last_updated: Utc::now(),
backpressure_active: true,
utilization_percent: 100.0,
};
assert_eq!(max_stats.message_lag, u64::MAX);
assert_eq!(max_stats.pending_acks, i64::MAX);
assert_eq!(max_stats.throughput_mps, f64::MAX);
assert_eq!(max_stats.utilization_percent, 100.0);
}
#[test]
fn test_utilization_calculation() {
let stats_25_percent = ConsumerLagStats {
consumer_name: "util-test".to_string(),
stream_name: "UTIL_STREAM".to_string(),
message_lag: 100,
pending_acks: 250,
throughput_mps: 10.0,
last_updated: Utc::now(),
backpressure_active: false,
utilization_percent: 25.0, };
assert_eq!(stats_25_percent.utilization_percent, 25.0);
let stats_100_percent = ConsumerLagStats {
consumer_name: "util-test-full".to_string(),
stream_name: "UTIL_STREAM".to_string(),
message_lag: 100,
pending_acks: 1000,
throughput_mps: 10.0,
last_updated: Utc::now(),
backpressure_active: true,
utilization_percent: 100.0, };
assert_eq!(stats_100_percent.utilization_percent, 100.0);
assert!(stats_100_percent.backpressure_active);
}
#[test]
fn test_timestamp_ordering() {
let timestamp1 = Utc::now();
std::thread::sleep(Duration::from_millis(10));
let timestamp2 = Utc::now();
let stats1 = ConsumerLagStats {
consumer_name: "time-test-1".to_string(),
stream_name: "TIME_STREAM".to_string(),
message_lag: 100,
pending_acks: 50,
throughput_mps: 10.0,
last_updated: timestamp1,
backpressure_active: false,
utilization_percent: 50.0,
};
let stats2 = ConsumerLagStats {
consumer_name: "time-test-2".to_string(),
stream_name: "TIME_STREAM".to_string(),
message_lag: 200,
pending_acks: 100,
throughput_mps: 15.0,
last_updated: timestamp2,
backpressure_active: false,
utilization_percent: 75.0,
};
assert!(stats2.last_updated > stats1.last_updated);
let time_diff = stats2
.last_updated
.signed_duration_since(stats1.last_updated);
assert!(time_diff.num_milliseconds() >= 10);
}
#[test]
fn test_error_conditions() {
let negative_stats = ConsumerLagStats {
consumer_name: "negative-test".to_string(),
stream_name: "NEG_STREAM".to_string(),
message_lag: 100,
pending_acks: -50, throughput_mps: 10.0,
last_updated: Utc::now(),
backpressure_active: false,
utilization_percent: -5.0, };
assert_eq!(negative_stats.pending_acks, -50);
assert_eq!(negative_stats.utilization_percent, -5.0);
let manager = BackpressureManager::default();
let should_apply = manager.should_apply_backpressure(100, -50);
assert!(!should_apply);
}
#[test]
fn test_backpressure_actions_comprehensive() {
let manager = BackpressureManager {
lag_threshold: 500,
pending_ack_threshold: 200,
response_actions: vec![
BackpressureAction::RouteToQuarantine,
BackpressureAction::ReduceBatchSize(5),
BackpressureAction::ExtendAckWait(Duration::from_secs(45)),
BackpressureAction::AlertOps("Critical lag detected".to_string()),
],
};
let actions = manager.generate_backpressure_response(100, 50);
assert!(actions.is_empty());
let actions = manager.generate_backpressure_response(1000, 400);
assert_eq!(actions.len(), 4);
assert!(matches!(actions[0], BackpressureAction::RouteToQuarantine));
assert!(matches!(actions[1], BackpressureAction::ReduceBatchSize(5)));
if let BackpressureAction::ExtendAckWait(duration) = &actions[2] {
assert_eq!(*duration, Duration::from_secs(45));
} else {
panic!("Expected ExtendAckWait action");
}
if let BackpressureAction::AlertOps(message) = &actions[3] {
assert_eq!(message, "Critical lag detected");
} else {
panic!("Expected AlertOps action");
}
}
}