Skip to main content

peat_protocol/event/
aggregator.rs

//! Echelon Aggregation for Event Routing (ADR-027 Phase 2)
//!
//! The EchelonAggregator processes incoming events from subordinates and applies
//! aggregation policies at echelon boundaries (squad → platoon → company).
//!
//! ## Aggregation Flow
//!
//! ```text
//! Subordinate Events → EchelonAggregator → Parent Echelon
//!                           ↓
//!                  ┌────────┴────────┐
//!                  │ PropagationMode │
//!                  └────────┬────────┘
//!        ┌─────────────┬────┴───────┬─────────────┐
//!        ↓             ↓            ↓             ↓
//!      Full         Summary       Query         Local
//!   (passthrough)  (aggregate)  (store only)  (ignored)
//! ```
19
20use super::priority_queue::PriorityEventQueue;
21use super::summary::{DefaultSummaryStrategy, SummaryStrategy};
22use crate::Result;
23use peat_schema::common::v1::Timestamp;
24use peat_schema::event::v1::{
25    AggregationPolicy, EventClass, EventPriority, EventSummary, PeatEvent, PropagationMode,
26};
27use std::collections::{HashMap, HashSet};
28use std::sync::{Arc, RwLock};
29use std::time::{Duration, Instant, SystemTime};
30
/// Echelon level at which an aggregator operates.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EchelonType {
    /// Squad level (8-12 platforms)
    Squad,
    /// Platoon level (3-4 squads)
    Platoon,
    /// Company level (3-4 platoons)
    Company,
}

impl std::fmt::Display for EchelonType {
    /// Writes the lowercase echelon name (`squad`, `platoon` or `company`).
    ///
    /// The name goes through [`std::fmt::Formatter::pad`], so width, fill and
    /// alignment flags such as `{:>8}` are honoured; a plain `{}` renders
    /// exactly the bare name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            EchelonType::Squad => "squad",
            EchelonType::Platoon => "platoon",
            EchelonType::Company => "company",
        };
        f.pad(name)
    }
}
51
52/// Aggregation window for collecting events before summarization
53#[derive(Debug)]
54pub struct AggregationWindow {
55    /// Event class being aggregated
56    event_class: EventClass,
57
58    /// Event type identifier
59    event_type: String,
60
61    /// Window duration for aggregation
62    window_duration: Duration,
63
64    /// When this window started
65    window_start: Instant,
66
67    /// Events collected in this window
68    events: Vec<PeatEvent>,
69
70    /// Source nodes that contributed events
71    source_nodes: HashSet<String>,
72}
73
74impl AggregationWindow {
75    /// Create a new aggregation window
76    pub fn new(event_class: EventClass, event_type: &str, window_duration: Duration) -> Self {
77        Self {
78            event_class,
79            event_type: event_type.to_string(),
80            window_duration,
81            window_start: Instant::now(),
82            events: Vec::new(),
83            source_nodes: HashSet::new(),
84        }
85    }
86
87    /// Add an event to this window
88    pub fn add(&mut self, event: PeatEvent) {
89        self.source_nodes.insert(event.source_node_id.clone());
90        self.events.push(event);
91    }
92
93    /// Check if the window should be flushed (time expired)
94    pub fn should_flush(&self) -> bool {
95        self.window_start.elapsed() >= self.window_duration
96    }
97
98    /// Get the number of events in this window
99    pub fn event_count(&self) -> usize {
100        self.events.len()
101    }
102
103    /// Get events in this window
104    pub fn events(&self) -> &[PeatEvent] {
105        &self.events
106    }
107
108    /// Get source node IDs
109    pub fn source_nodes(&self) -> &HashSet<String> {
110        &self.source_nodes
111    }
112
113    /// Get the event class
114    pub fn event_class(&self) -> EventClass {
115        self.event_class
116    }
117
118    /// Get the event type
119    pub fn event_type(&self) -> &str {
120        &self.event_type
121    }
122
123    /// Get window start time
124    pub fn window_start(&self) -> Instant {
125        self.window_start
126    }
127
128    /// Reset the window for a new aggregation period
129    pub fn reset(&mut self) {
130        self.window_start = Instant::now();
131        self.events.clear();
132        self.source_nodes.clear();
133    }
134}
135
136/// Key for identifying aggregation windows
137type WindowKey = (i32, String); // (event_class as i32, event_type)
138
139/// Event aggregator at echelon boundary
140///
141/// Processes incoming events from subordinates and applies aggregation policies.
142/// Events are routed based on their `PropagationMode`:
143/// - `Full`: Passed through immediately to the parent
144/// - `Summary`: Collected in aggregation windows and summarized
145/// - `Query`: Stored locally for query access
146/// - `Local`: Ignored (should not reach aggregator)
147#[derive(Debug)]
148pub struct EchelonAggregator {
149    /// Unique identifier for this echelon
150    echelon_id: String,
151
152    /// Type of echelon (Squad, Platoon, Company)
153    echelon_type: EchelonType,
154
155    /// Aggregation windows by (event_class, event_type)
156    windows: Arc<RwLock<HashMap<WindowKey, AggregationWindow>>>,
157
158    /// Events to forward without aggregation (passthrough)
159    passthrough_queue: Arc<RwLock<PriorityEventQueue>>,
160
161    /// Local event storage for Query mode events
162    queryable_store: Arc<RwLock<HashMap<String, PeatEvent>>>,
163
164    /// Summary strategies by event type
165    summary_strategies: Arc<RwLock<HashMap<String, Box<dyn SummaryStrategy>>>>,
166
167    /// Generated summaries ready for transmission
168    summary_queue: Arc<RwLock<Vec<PeatEvent>>>,
169
170    /// Counter for generating unique summary IDs
171    summary_counter: Arc<RwLock<u64>>,
172
173    /// Default aggregation window duration (used when not specified in policy)
174    default_window_duration: Duration,
175}
176
177impl EchelonAggregator {
178    /// Create a new echelon aggregator
179    pub fn new(echelon_id: String, echelon_type: EchelonType) -> Self {
180        let mut strategies: HashMap<String, Box<dyn SummaryStrategy>> = HashMap::new();
181
182        // Register default strategies
183        strategies.insert(
184            "detection".to_string(),
185            Box::new(DefaultSummaryStrategy::new("detection")),
186        );
187        strategies.insert(
188            "telemetry".to_string(),
189            Box::new(DefaultSummaryStrategy::new("telemetry")),
190        );
191
192        Self {
193            echelon_id,
194            echelon_type,
195            windows: Arc::new(RwLock::new(HashMap::new())),
196            passthrough_queue: Arc::new(RwLock::new(PriorityEventQueue::new())),
197            queryable_store: Arc::new(RwLock::new(HashMap::new())),
198            summary_strategies: Arc::new(RwLock::new(strategies)),
199            summary_queue: Arc::new(RwLock::new(Vec::new())),
200            summary_counter: Arc::new(RwLock::new(0)),
201            default_window_duration: Duration::from_secs(1),
202        }
203    }
204
205    /// Set the default aggregation window duration
206    pub fn with_default_window_duration(mut self, duration: Duration) -> Self {
207        self.default_window_duration = duration;
208        self
209    }
210
211    /// Register a custom summary strategy for an event type
212    pub fn register_strategy(&self, strategy: Box<dyn SummaryStrategy>) {
213        let mut strategies = self.summary_strategies.write().unwrap();
214        strategies.insert(strategy.event_type().to_string(), strategy);
215    }
216
217    /// Process an incoming event from a subordinate
218    ///
219    /// Routes the event based on its `PropagationMode`:
220    /// - `Full`: Added to passthrough queue for immediate forwarding
221    /// - `Summary`: Added to aggregation window
222    /// - `Query`: Stored locally
223    /// - `Local`: Ignored
224    pub fn receive(&self, event: PeatEvent) -> Result<()> {
225        let routing = event.routing.as_ref();
226
227        let propagation = routing
228            .map(|r| {
229                PropagationMode::try_from(r.propagation).unwrap_or(PropagationMode::PropagationFull)
230            })
231            .unwrap_or(PropagationMode::PropagationFull);
232
233        match propagation {
234            PropagationMode::PropagationFull => {
235                // Forward immediately without aggregation
236                let priority = routing
237                    .map(|r| {
238                        EventPriority::try_from(r.priority).unwrap_or(EventPriority::PriorityNormal)
239                    })
240                    .unwrap_or(EventPriority::PriorityNormal);
241
242                let mut queue = self.passthrough_queue.write().unwrap();
243                queue.push(event);
244                let _ = priority; // Used implicitly via event.routing in push
245            }
246
247            PropagationMode::PropagationSummary => {
248                // Add to aggregation window
249                let key = (event.event_class, event.event_type.clone());
250                let window_duration = routing
251                    .map(|r| {
252                        if r.aggregation_window_ms > 0 {
253                            Duration::from_millis(r.aggregation_window_ms as u64)
254                        } else {
255                            self.default_window_duration
256                        }
257                    })
258                    .unwrap_or(self.default_window_duration);
259
260                let event_class =
261                    EventClass::try_from(event.event_class).unwrap_or(EventClass::Unspecified);
262
263                let mut windows = self.windows.write().unwrap();
264                let window = windows.entry(key).or_insert_with(|| {
265                    AggregationWindow::new(event_class, &event.event_type, window_duration)
266                });
267
268                window.add(event);
269            }
270
271            PropagationMode::PropagationQuery => {
272                // Store locally for query access
273                let mut store = self.queryable_store.write().unwrap();
274                store.insert(event.event_id.clone(), event);
275            }
276
277            PropagationMode::PropagationLocal => {
278                // Should not reach aggregator, but handle gracefully by ignoring
279            }
280        }
281
282        Ok(())
283    }
284
285    /// Flush all windows that have expired and generate summaries
286    ///
287    /// Returns the number of summaries generated.
288    pub fn flush_expired_windows(&self) -> usize {
289        let mut windows = self.windows.write().unwrap();
290        let mut summaries_generated = 0;
291
292        let expired_keys: Vec<WindowKey> = windows
293            .iter()
294            .filter(|(_, w)| w.should_flush() && !w.events.is_empty())
295            .map(|(k, _)| k.clone())
296            .collect();
297
298        for key in expired_keys {
299            if let Some(window) = windows.get_mut(&key) {
300                if let Some(summary_event) = self.generate_summary(window) {
301                    let mut queue = self.summary_queue.write().unwrap();
302                    queue.push(summary_event);
303                    summaries_generated += 1;
304                }
305                window.reset();
306            }
307        }
308
309        summaries_generated
310    }
311
312    /// Force flush all windows regardless of expiry
313    ///
314    /// Useful for graceful shutdown or immediate summary generation.
315    pub fn flush_all_windows(&self) -> usize {
316        let mut windows = self.windows.write().unwrap();
317        let mut summaries_generated = 0;
318
319        let non_empty_keys: Vec<WindowKey> = windows
320            .iter()
321            .filter(|(_, w)| !w.events.is_empty())
322            .map(|(k, _)| k.clone())
323            .collect();
324
325        for key in non_empty_keys {
326            if let Some(window) = windows.get_mut(&key) {
327                if let Some(summary_event) = self.generate_summary(window) {
328                    let mut queue = self.summary_queue.write().unwrap();
329                    queue.push(summary_event);
330                    summaries_generated += 1;
331                }
332                window.reset();
333            }
334        }
335
336        summaries_generated
337    }
338
339    /// Generate a summary event from an aggregation window
340    fn generate_summary(&self, window: &AggregationWindow) -> Option<PeatEvent> {
341        if window.events.is_empty() {
342            return None;
343        }
344
345        let summary_id = self.generate_summary_id();
346        let now = current_timestamp();
347
348        // Get the appropriate strategy or use default
349        let strategies = self.summary_strategies.read().unwrap();
350
351        // Try to find a matching strategy by event type prefix
352        let event_type_base = window
353            .event_type
354            .split('.')
355            .next()
356            .unwrap_or(&window.event_type);
357
358        let summary_payload = if let Some(strategy) = strategies.get(event_type_base) {
359            strategy.summarize(window.events())
360        } else {
361            // Use default strategy
362            DefaultSummaryStrategy::new(&window.event_type).summarize(window.events())
363        };
364
365        // Create the EventSummary
366        let event_summary = EventSummary {
367            formation_id: self.echelon_id.clone(),
368            window_start: Some(now), // We'd need to track actual start time
369            window_end: Some(now),
370            event_class: window.event_class as i32,
371            event_type: window.event_type.clone(),
372            event_count: window.event_count() as u32,
373            source_node_ids: window.source_nodes().iter().cloned().collect(),
374            summary_type_url: format!("type.peat/summary.{}", window.event_type),
375            summary_value: summary_payload,
376        };
377
378        // Wrap summary in PeatEvent for transmission
379        Some(PeatEvent {
380            event_id: summary_id,
381            timestamp: Some(now),
382            source_node_id: self.echelon_id.clone(),
383            source_formation_id: self.echelon_id.clone(),
384            source_instance_id: None,
385            event_class: window.event_class as i32,
386            event_type: format!("{}_summary", window.event_type),
387            routing: Some(AggregationPolicy {
388                propagation: PropagationMode::PropagationFull as i32, // Summaries propagate fully
389                priority: EventPriority::PriorityNormal as i32,
390                ttl_seconds: 300,
391                aggregation_window_ms: 0,
392            }),
393            payload_type_url: format!("type.peat/event_summary.{}", window.event_type),
394            payload_value: prost::Message::encode_to_vec(&event_summary),
395        })
396    }
397
398    /// Pop all passthrough events (for transmission to parent)
399    pub fn pop_passthrough(&self) -> Vec<PeatEvent> {
400        let mut queue = self.passthrough_queue.write().unwrap();
401        let mut events = Vec::new();
402
403        // Pop critical first
404        events.extend(queue.pop_critical());
405
406        // Then weighted fair queue
407        events.extend(queue.pop_weighted(100)); // Configurable batch size
408
409        events
410    }
411
412    /// Pop all generated summaries (for transmission to parent)
413    pub fn pop_summaries(&self) -> Vec<PeatEvent> {
414        let mut queue = self.summary_queue.write().unwrap();
415        queue.drain(..).collect()
416    }
417
418    /// Pop all events ready for transmission (passthrough + summaries)
419    pub fn pop_all(&self) -> Vec<PeatEvent> {
420        let mut events = self.pop_passthrough();
421        events.extend(self.pop_summaries());
422        events
423    }
424
425    /// Query locally stored events
426    pub fn query_local(&self, event_type: Option<&str>) -> Vec<PeatEvent> {
427        let store = self.queryable_store.read().unwrap();
428        store
429            .values()
430            .filter(|e| event_type.is_none() || Some(e.event_type.as_str()) == event_type)
431            .cloned()
432            .collect()
433    }
434
435    /// Get a specific locally stored event by ID
436    pub fn get_local(&self, event_id: &str) -> Option<PeatEvent> {
437        let store = self.queryable_store.read().unwrap();
438        store.get(event_id).cloned()
439    }
440
441    /// Get count of events in passthrough queue
442    pub fn passthrough_count(&self) -> usize {
443        let queue = self.passthrough_queue.read().unwrap();
444        queue.len()
445    }
446
447    /// Get count of events in queryable store
448    pub fn queryable_count(&self) -> usize {
449        let store = self.queryable_store.read().unwrap();
450        store.len()
451    }
452
453    /// Get count of pending summaries
454    pub fn summary_count(&self) -> usize {
455        let queue = self.summary_queue.read().unwrap();
456        queue.len()
457    }
458
459    /// Get count of active aggregation windows
460    pub fn window_count(&self) -> usize {
461        let windows = self.windows.read().unwrap();
462        windows.len()
463    }
464
465    /// Get the echelon ID
466    pub fn echelon_id(&self) -> &str {
467        &self.echelon_id
468    }
469
470    /// Get the echelon type
471    pub fn echelon_type(&self) -> EchelonType {
472        self.echelon_type
473    }
474
475    /// Clear expired events from queryable store based on TTL
476    pub fn enforce_ttl(&self) {
477        let now = SystemTime::now()
478            .duration_since(SystemTime::UNIX_EPOCH)
479            .unwrap();
480        let now_secs = now.as_secs();
481
482        let mut store = self.queryable_store.write().unwrap();
483        store.retain(|_, event| {
484            if let Some(routing) = &event.routing {
485                if routing.ttl_seconds > 0 {
486                    if let Some(ts) = &event.timestamp {
487                        let event_secs = ts.seconds;
488                        let expiry = event_secs + routing.ttl_seconds as u64;
489                        return now_secs < expiry;
490                    }
491                }
492            }
493            true // Keep events without TTL or timestamp
494        });
495    }
496
497    fn generate_summary_id(&self) -> String {
498        let mut counter = self.summary_counter.write().unwrap();
499        *counter += 1;
500        format!("{}-summary-{}", self.echelon_id, *counter)
501    }
502}
503
504/// Get current timestamp
505fn current_timestamp() -> Timestamp {
506    let now = SystemTime::now()
507        .duration_since(SystemTime::UNIX_EPOCH)
508        .unwrap();
509    Timestamp {
510        seconds: now.as_secs(),
511        nanos: now.subsec_nanos(),
512    }
513}
514
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `Product`-class event with the given routing. The aggregation
    /// window is left at 0 so the aggregator's default duration applies.
    fn make_event(
        id: &str,
        event_type: &str,
        propagation: PropagationMode,
        priority: EventPriority,
    ) -> PeatEvent {
        let routing = AggregationPolicy {
            propagation: propagation as i32,
            priority: priority as i32,
            ttl_seconds: 300,
            aggregation_window_ms: 0, // Use default window duration from aggregator
        };

        PeatEvent {
            event_id: id.to_string(),
            timestamp: Some(current_timestamp()),
            source_node_id: format!("node-{}", id),
            source_formation_id: "squad-1".to_string(),
            source_instance_id: None,
            event_class: EventClass::Product as i32,
            event_type: event_type.to_string(),
            routing: Some(routing),
            payload_type_url: String::new(),
            payload_value: vec![],
        }
    }

    /// Squad-level aggregator with the default window duration.
    fn squad_aggregator() -> EchelonAggregator {
        EchelonAggregator::new("squad-1".to_string(), EchelonType::Squad)
    }

    /// Squad-level aggregator whose windows expire after 50 ms.
    fn short_window_aggregator() -> EchelonAggregator {
        squad_aggregator().with_default_window_duration(Duration::from_millis(50))
    }

    /// Sleep long enough for a 50 ms window to expire.
    fn wait_for_expiry() {
        std::thread::sleep(Duration::from_millis(100));
    }

    /// Feed a normal-priority Summary-mode event into `aggregator`.
    fn receive_summary(aggregator: &EchelonAggregator, id: &str, event_type: &str) {
        let event = make_event(
            id,
            event_type,
            PropagationMode::PropagationSummary,
            EventPriority::PriorityNormal,
        );
        aggregator.receive(event).unwrap();
    }

    #[test]
    fn test_aggregator_creation() {
        let aggregator = squad_aggregator();
        assert_eq!(aggregator.echelon_id(), "squad-1");
        assert_eq!(aggregator.echelon_type(), EchelonType::Squad);
    }

    #[test]
    fn test_full_propagation_passthrough() {
        let aggregator = squad_aggregator();

        aggregator
            .receive(make_event(
                "evt-1",
                "detection",
                PropagationMode::PropagationFull,
                EventPriority::PriorityNormal,
            ))
            .unwrap();

        assert_eq!(aggregator.passthrough_count(), 1);
        assert_eq!(aggregator.queryable_count(), 0);

        let events = aggregator.pop_passthrough();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_id, "evt-1");
    }

    #[test]
    fn test_query_propagation_stored_locally() {
        let aggregator = squad_aggregator();

        aggregator
            .receive(make_event(
                "evt-1",
                "telemetry.cpu",
                PropagationMode::PropagationQuery,
                EventPriority::PriorityLow,
            ))
            .unwrap();

        assert_eq!(aggregator.passthrough_count(), 0);
        assert_eq!(aggregator.queryable_count(), 1);

        let local = aggregator.query_local(Some("telemetry.cpu"));
        assert_eq!(local.len(), 1);
        assert_eq!(local[0].event_id, "evt-1");
    }

    #[test]
    fn test_summary_propagation_aggregated() {
        let aggregator = short_window_aggregator();

        // Add multiple events for aggregation
        for i in 0..5 {
            receive_summary(&aggregator, &format!("evt-{}", i), "detection.vehicle");
        }

        assert_eq!(aggregator.window_count(), 1);
        assert_eq!(aggregator.passthrough_count(), 0);

        // Wait for window to expire
        wait_for_expiry();

        let summaries = aggregator.flush_expired_windows();
        assert_eq!(summaries, 1);

        let summary_events = aggregator.pop_summaries();
        assert_eq!(summary_events.len(), 1);
        assert!(summary_events[0]
            .event_type
            .contains("detection.vehicle_summary"));
    }

    #[test]
    fn test_local_propagation_ignored() {
        let aggregator = squad_aggregator();

        aggregator
            .receive(make_event(
                "evt-1",
                "debug.trace",
                PropagationMode::PropagationLocal,
                EventPriority::PriorityLow,
            ))
            .unwrap();

        assert_eq!(aggregator.passthrough_count(), 0);
        assert_eq!(aggregator.queryable_count(), 0);
        assert_eq!(aggregator.window_count(), 0);
    }

    #[test]
    fn test_critical_events_passthrough() {
        let aggregator = squad_aggregator();

        // Critical event first, then a normal one
        let critical = make_event(
            "critical-1",
            "anomaly.urgent",
            PropagationMode::PropagationFull,
            EventPriority::PriorityCritical,
        );
        let normal = make_event(
            "normal-1",
            "detection",
            PropagationMode::PropagationFull,
            EventPriority::PriorityNormal,
        );
        aggregator.receive(critical).unwrap();
        aggregator.receive(normal).unwrap();

        let events = aggregator.pop_passthrough();
        assert_eq!(events.len(), 2);
        // Critical should be first
        assert_eq!(events[0].event_id, "critical-1");
    }

    #[test]
    fn test_flush_all_windows() {
        let aggregator =
            squad_aggregator().with_default_window_duration(Duration::from_secs(3600)); // Long window

        // Add events
        for i in 0..3 {
            receive_summary(&aggregator, &format!("evt-{}", i), "detection");
        }

        // Force flush all (even though window hasn't expired)
        let summaries = aggregator.flush_all_windows();
        assert_eq!(summaries, 1);

        let summary_events = aggregator.pop_summaries();
        assert_eq!(summary_events.len(), 1);
    }

    #[test]
    fn test_multiple_event_types_separate_windows() {
        let aggregator = short_window_aggregator();

        // One detection event and one telemetry event
        receive_summary(&aggregator, "det-1", "detection.vehicle");
        receive_summary(&aggregator, "tel-1", "telemetry.cpu");

        assert_eq!(aggregator.window_count(), 2);

        wait_for_expiry();
        let summaries = aggregator.flush_expired_windows();
        assert_eq!(summaries, 2);
    }

    #[test]
    fn test_pop_all_includes_passthrough_and_summaries() {
        let aggregator = short_window_aggregator();

        // Add passthrough event
        aggregator
            .receive(make_event(
                "pass-1",
                "anomaly",
                PropagationMode::PropagationFull,
                EventPriority::PriorityHigh,
            ))
            .unwrap();

        // Add summary event
        receive_summary(&aggregator, "sum-1", "detection");

        wait_for_expiry();
        aggregator.flush_expired_windows();

        let all = aggregator.pop_all();
        assert_eq!(all.len(), 2); // 1 passthrough + 1 summary
    }

    #[test]
    fn test_source_nodes_tracked_in_window() {
        let aggregator = short_window_aggregator();

        // Add events from different nodes
        for i in 0..3 {
            let mut event = make_event(
                &format!("evt-{}", i),
                "detection",
                PropagationMode::PropagationSummary,
                EventPriority::PriorityNormal,
            );
            event.source_node_id = format!("node-{}", i);
            aggregator.receive(event).unwrap();
        }

        wait_for_expiry();
        aggregator.flush_expired_windows();

        let summaries = aggregator.pop_summaries();
        assert_eq!(summaries.len(), 1);

        // Decode the summary and check source nodes
        let summary: EventSummary =
            prost::Message::decode(&summaries[0].payload_value[..]).unwrap();
        assert_eq!(summary.source_node_ids.len(), 3);
        assert_eq!(summary.event_count, 3);
    }
}