use peat_protocol::event::{
AggregationPolicy, BandwidthAllocation, EchelonAggregator, EchelonType, EventPriority,
EventTransmitter, OverflowPolicy, PeatEvent, PropagationMode,
};
use peat_schema::event::v1::EventClass;
use std::time::{Duration, Instant};
fn make_test_event(
id: &str,
event_type: &str,
propagation: PropagationMode,
priority: EventPriority,
payload_size: usize,
) -> PeatEvent {
PeatEvent {
event_id: id.to_string(),
timestamp: Some(peat_schema::common::v1::Timestamp {
seconds: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
nanos: 0,
}),
source_node_id: format!("node-{}", id),
source_formation_id: "squad-1".to_string(),
source_instance_id: None,
event_class: EventClass::Product as i32,
event_type: event_type.to_string(),
routing: Some(AggregationPolicy {
propagation: propagation as i32,
priority: priority as i32,
ttl_seconds: 300,
aggregation_window_ms: 0, }),
payload_type_url: format!("type.peat/{}", event_type),
payload_value: vec![0u8; payload_size],
}
}
#[test]
fn test_bandwidth_reduction_through_aggregation() {
let aggregator = EchelonAggregator::new("squad-1".to_string(), EchelonType::Squad)
.with_default_window_duration(Duration::from_millis(100));
let platforms_per_squad = 8;
let detections_per_platform = 10; let test_seconds = 1;
let total_detections = platforms_per_squad * detections_per_platform * test_seconds;
for i in 0..total_detections {
let event = make_test_event(
&format!("det-{}", i),
"detection.vehicle",
PropagationMode::PropagationSummary,
EventPriority::PriorityNormal,
100, );
aggregator.receive(event).unwrap();
}
let telemetry_per_platform = 1; let total_telemetry = platforms_per_squad * telemetry_per_platform * test_seconds;
for i in 0..total_telemetry {
let event = make_test_event(
&format!("tel-{}", i),
"telemetry.cpu",
PropagationMode::PropagationQuery,
EventPriority::PriorityLow,
50,
);
aggregator.receive(event).unwrap();
}
std::thread::sleep(Duration::from_millis(300));
let summaries_generated = aggregator.flush_expired_windows();
let passthrough_events = aggregator.pop_passthrough();
let summary_events = aggregator.pop_summaries();
let raw_events = total_detections + total_telemetry;
let aggregated_events = passthrough_events.len() + summary_events.len();
println!("Raw events: {}", raw_events);
println!("Summaries generated: {}", summaries_generated);
println!("Passthrough events: {}", passthrough_events.len());
println!("Summary events: {}", summary_events.len());
println!(
"Queryable (stored locally): {}",
aggregator.queryable_count()
);
assert!(
summaries_generated >= 1,
"Expected at least 1 summary, got {}",
summaries_generated
);
assert_eq!(
aggregator.queryable_count(),
total_telemetry,
"Expected {} telemetry events stored locally",
total_telemetry
);
if raw_events > 0 && aggregated_events > 0 {
let reduction = (1.0 - (aggregated_events as f64 / raw_events as f64)) * 100.0;
println!("Bandwidth reduction: {:.1}%", reduction);
assert!(
reduction >= 90.0,
"Expected >=90% reduction, got {:.1}%",
reduction
);
}
}
#[test]
fn test_critical_event_priority_preemption() {
let mut transmitter = EventTransmitter::with_defaults();
transmitter.set_max_queue_size(EventPriority::PriorityLow, 100);
transmitter.set_max_queue_size(EventPriority::PriorityNormal, 100);
transmitter.set_max_queue_size(EventPriority::PriorityHigh, 100);
for i in 0..50 {
let event = make_test_event(
&format!("low-{}", i),
"telemetry",
PropagationMode::PropagationFull,
EventPriority::PriorityLow,
100,
);
transmitter.enqueue(event);
}
for i in 0..30 {
let event = make_test_event(
&format!("normal-{}", i),
"detection",
PropagationMode::PropagationFull,
EventPriority::PriorityNormal,
100,
);
transmitter.enqueue(event);
}
let critical_event = make_test_event(
"critical-1",
"anomaly.urgent",
PropagationMode::PropagationFull,
EventPriority::PriorityCritical,
50,
);
let start_time = Instant::now();
transmitter.enqueue(critical_event);
let transmitted = transmitter.transmit(10);
assert!(
!transmitted.is_empty(),
"Expected at least 1 event transmitted"
);
assert_eq!(
transmitted[0].event_id, "critical-1",
"CRITICAL event should be transmitted first"
);
let latency = start_time.elapsed();
println!("CRITICAL event transmission latency: {:?}", latency);
assert!(
latency < Duration::from_millis(10),
"CRITICAL latency should be < 10ms, was {:?}",
latency
);
}
#[test]
fn test_weighted_fair_queuing_distribution() {
let mut transmitter = EventTransmitter::with_defaults();
let events_per_priority = 100;
for i in 0..events_per_priority {
transmitter.enqueue(make_test_event(
&format!("high-{}", i),
"detection",
PropagationMode::PropagationFull,
EventPriority::PriorityHigh,
50,
));
transmitter.enqueue(make_test_event(
&format!("normal-{}", i),
"status",
PropagationMode::PropagationFull,
EventPriority::PriorityNormal,
50,
));
transmitter.enqueue(make_test_event(
&format!("low-{}", i),
"telemetry",
PropagationMode::PropagationFull,
EventPriority::PriorityLow,
50,
));
}
let batch_size = 100;
let transmitted = transmitter.transmit(batch_size);
let high_count = transmitted
.iter()
.filter(|e| e.event_id.starts_with("high"))
.count();
let normal_count = transmitted
.iter()
.filter(|e| e.event_id.starts_with("normal"))
.count();
let low_count = transmitted
.iter()
.filter(|e| e.event_id.starts_with("low"))
.count();
println!(
"Distribution - HIGH: {}, NORMAL: {}, LOW: {}",
high_count, normal_count, low_count
);
let total = high_count + normal_count + low_count;
if total > 0 {
let high_pct = (high_count as f64 / total as f64) * 100.0;
let normal_pct = (normal_count as f64 / total as f64) * 100.0;
let low_pct = (low_count as f64 / total as f64) * 100.0;
println!(
"Percentages - HIGH: {:.1}%, NORMAL: {:.1}%, LOW: {:.1}%",
high_pct, normal_pct, low_pct
);
assert!(
high_count >= normal_count,
"HIGH should get >= NORMAL events"
);
assert!(normal_count >= low_count, "NORMAL should get >= LOW events");
}
}
#[test]
fn test_overflow_drops_lowest_priority() {
let mut transmitter = EventTransmitter::with_defaults();
transmitter.set_max_queue_size(EventPriority::PriorityHigh, 10);
transmitter.set_overflow_policy(OverflowPolicy::RemoveLowestPriority);
for i in 0..20 {
transmitter.enqueue(make_test_event(
&format!("low-{}", i),
"telemetry",
PropagationMode::PropagationFull,
EventPriority::PriorityLow,
50,
));
}
for i in 0..15 {
transmitter.enqueue(make_test_event(
&format!("high-{}", i),
"detection",
PropagationMode::PropagationFull,
EventPriority::PriorityHigh,
50,
));
}
let stats = transmitter.stats();
println!("LOW dropped: {}", stats.dropped[3]);
assert!(
stats.dropped[3] > 0,
"Expected LOW priority events to be dropped"
);
assert_eq!(stats.dropped[0], 0, "CRITICAL should never be dropped");
assert_eq!(
stats.dropped[1], 0,
"HIGH should not be dropped when LOW available"
);
}
#[test]
fn test_aggregation_window_summarization() {
let aggregator = EchelonAggregator::new("squad-alpha".to_string(), EchelonType::Squad)
.with_default_window_duration(Duration::from_millis(50));
let source_nodes = ["node-1", "node-2", "node-3", "node-4"];
for (i, node) in source_nodes.iter().enumerate() {
let mut event = make_test_event(
&format!("det-{}", i),
"detection.vehicle",
PropagationMode::PropagationSummary,
EventPriority::PriorityNormal,
100,
);
event.source_node_id = node.to_string();
aggregator.receive(event).unwrap();
}
std::thread::sleep(Duration::from_millis(200));
let summaries = aggregator.flush_expired_windows();
assert_eq!(summaries, 1, "Expected exactly 1 summary");
let summary_events = aggregator.pop_summaries();
assert_eq!(summary_events.len(), 1, "Expected 1 summary event");
let summary_event = &summary_events[0];
assert!(
summary_event.event_type.contains("summary"),
"Event type should indicate summary"
);
assert_eq!(
summary_event.source_formation_id, "squad-alpha",
"Summary should be from the aggregator echelon"
);
}
#[test]
fn test_query_mode_local_storage() {
let aggregator = EchelonAggregator::new("squad-bravo".to_string(), EchelonType::Squad);
let telemetry_events = 50;
for i in 0..telemetry_events {
let event = make_test_event(
&format!("tel-{}", i),
"telemetry.cpu",
PropagationMode::PropagationQuery,
EventPriority::PriorityLow,
30,
);
aggregator.receive(event).unwrap();
}
assert_eq!(
aggregator.queryable_count(),
telemetry_events,
"All Query mode events should be stored"
);
assert_eq!(
aggregator.passthrough_count(),
0,
"Query mode events should not be in passthrough queue"
);
let queried = aggregator.query_local(Some("telemetry.cpu"));
assert_eq!(
queried.len(),
telemetry_events,
"Query should return all stored telemetry events"
);
let empty_query = aggregator.query_local(Some("nonexistent"));
assert!(
empty_query.is_empty(),
"Query for non-existent type should be empty"
);
}
#[test]
fn test_full_mode_passthrough() {
let aggregator = EchelonAggregator::new("squad-charlie".to_string(), EchelonType::Squad);
let anomaly_events = 10;
for i in 0..anomaly_events {
let event = make_test_event(
&format!("anomaly-{}", i),
"anomaly.detection",
PropagationMode::PropagationFull,
EventPriority::PriorityHigh,
80,
);
aggregator.receive(event).unwrap();
}
assert_eq!(
aggregator.passthrough_count(),
anomaly_events,
"All Full mode events should be in passthrough"
);
let passed_through = aggregator.pop_passthrough();
assert_eq!(
passed_through.len(),
anomaly_events,
"All events should be returned on pop"
);
}
#[test]
fn test_separate_aggregation_windows_per_event_type() {
let aggregator = EchelonAggregator::new("squad-delta".to_string(), EchelonType::Squad)
.with_default_window_duration(Duration::from_millis(50));
let event_types = ["detection.vehicle", "detection.person", "sensor.radar"];
for event_type in event_types.iter() {
for i in 0..5 {
let event = make_test_event(
&format!("{}-{}", event_type, i),
event_type,
PropagationMode::PropagationSummary,
EventPriority::PriorityNormal,
50,
);
aggregator.receive(event).unwrap();
}
}
assert_eq!(
aggregator.window_count(),
event_types.len(),
"Each event type should have its own window"
);
std::thread::sleep(Duration::from_millis(200));
let summaries = aggregator.flush_expired_windows();
assert_eq!(
summaries,
event_types.len(),
"Each window should produce one summary"
);
}
#[test]
fn test_custom_bandwidth_allocation() {
let custom_allocation = BandwidthAllocation::with_percentages(
10_000_000, 20, 40, 25, 15, );
let mut transmitter = EventTransmitter::new(custom_allocation);
let available = transmitter.available_bandwidth();
assert!(available[0] > 0.0, "CRITICAL bucket should have capacity");
assert!(available[1] > 0.0, "HIGH bucket should have capacity");
assert!(available[2] > 0.0, "NORMAL bucket should have capacity");
assert!(available[3] > 0.0, "LOW bucket should have capacity");
}
#[test]
fn test_echelon_type_hierarchy() {
let squad_agg = EchelonAggregator::new("squad-echo".to_string(), EchelonType::Squad);
let platoon_agg = EchelonAggregator::new("platoon-1".to_string(), EchelonType::Platoon);
let company_agg = EchelonAggregator::new("company-alpha".to_string(), EchelonType::Company);
assert_eq!(squad_agg.echelon_type(), EchelonType::Squad);
assert_eq!(platoon_agg.echelon_type(), EchelonType::Platoon);
assert_eq!(company_agg.echelon_type(), EchelonType::Company);
assert_eq!(squad_agg.echelon_id(), "squad-echo");
assert_eq!(platoon_agg.echelon_id(), "platoon-1");
assert_eq!(company_agg.echelon_id(), "company-alpha");
}
#[test]
fn test_transmitter_statistics() {
let mut transmitter = EventTransmitter::with_defaults();
for i in 0..10 {
transmitter.enqueue(make_test_event(
&format!("critical-{}", i),
"urgent",
PropagationMode::PropagationFull,
EventPriority::PriorityCritical,
100,
));
transmitter.enqueue(make_test_event(
&format!("high-{}", i),
"detection",
PropagationMode::PropagationFull,
EventPriority::PriorityHigh,
100,
));
}
let _transmitted = transmitter.transmit(100);
let stats = transmitter.stats();
assert_eq!(
stats.transmitted[0], 10,
"Should have transmitted 10 CRITICAL"
);
assert_eq!(stats.transmitted[1], 10, "Should have transmitted 10 HIGH");
assert!(
stats.bytes_transmitted[0] > 0,
"Should track CRITICAL bytes"
);
assert!(stats.bytes_transmitted[1] > 0, "Should track HIGH bytes");
transmitter.reset_stats();
let reset_stats = transmitter.stats();
assert_eq!(reset_stats.transmitted[0], 0, "Stats should be reset");
}