1use super::priority_queue::PriorityEventQueue;
21use super::summary::{DefaultSummaryStrategy, SummaryStrategy};
22use crate::Result;
23use peat_schema::common::v1::Timestamp;
24use peat_schema::event::v1::{
25 AggregationPolicy, EventClass, EventPriority, EventSummary, PeatEvent, PropagationMode,
26};
27use std::collections::{HashMap, HashSet};
28use std::sync::{Arc, RwLock};
29use std::time::{Duration, Instant, SystemTime};
30
/// Kind of echelon an [`EchelonAggregator`] is labelled with.
///
/// The value is stored on the aggregator and returned by
/// [`EchelonAggregator::echelon_type`]; its [`Display`](std::fmt::Display)
/// form is the lower-case variant name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EchelonType {
    /// Displayed as `"squad"`.
    Squad,
    /// Displayed as `"platoon"`.
    Platoon,
    /// Displayed as `"company"`.
    Company,
}

impl std::fmt::Display for EchelonType {
    /// Writes the lower-case label of the variant.
    ///
    /// Width, fill and alignment flags on the formatter are not applied; the
    /// label is written verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            EchelonType::Squad => "squad",
            EchelonType::Platoon => "platoon",
            EchelonType::Company => "company",
        };
        f.write_str(label)
    }
}
51
/// A time-bounded buffer of events that share an event class and event type.
///
/// Events are appended with `add`; once `window_duration` has elapsed since
/// `window_start` the window reports that it should be flushed. `reset`
/// empties the buffer and restarts the timer.
#[derive(Debug)]
pub struct AggregationWindow {
    /// Event class the window was created for.
    event_class: EventClass,

    /// Event type string the window was created for.
    event_type: String,

    /// How long after `window_start` the window becomes due for flushing.
    window_duration: Duration,

    /// Monotonic instant at which the window was created or last reset.
    window_start: Instant,

    /// Events added since creation or the last reset, in arrival order.
    events: Vec<PeatEvent>,

    /// Distinct `source_node_id` values of the buffered events.
    source_nodes: HashSet<String>,
}
73
74impl AggregationWindow {
75 pub fn new(event_class: EventClass, event_type: &str, window_duration: Duration) -> Self {
77 Self {
78 event_class,
79 event_type: event_type.to_string(),
80 window_duration,
81 window_start: Instant::now(),
82 events: Vec::new(),
83 source_nodes: HashSet::new(),
84 }
85 }
86
87 pub fn add(&mut self, event: PeatEvent) {
89 self.source_nodes.insert(event.source_node_id.clone());
90 self.events.push(event);
91 }
92
93 pub fn should_flush(&self) -> bool {
95 self.window_start.elapsed() >= self.window_duration
96 }
97
98 pub fn event_count(&self) -> usize {
100 self.events.len()
101 }
102
103 pub fn events(&self) -> &[PeatEvent] {
105 &self.events
106 }
107
108 pub fn source_nodes(&self) -> &HashSet<String> {
110 &self.source_nodes
111 }
112
113 pub fn event_class(&self) -> EventClass {
115 self.event_class
116 }
117
118 pub fn event_type(&self) -> &str {
120 &self.event_type
121 }
122
123 pub fn window_start(&self) -> Instant {
125 self.window_start
126 }
127
128 pub fn reset(&mut self) {
130 self.window_start = Instant::now();
131 self.events.clear();
132 self.source_nodes.clear();
133 }
134}
135
/// Key identifying one aggregation window: the event's raw `event_class`
/// value paired with its `event_type` string.
type WindowKey = (i32, String);

/// Routes incoming events according to their propagation mode.
///
/// Depending on the mode found in an event's routing policy, `receive` puts
/// the event into the passthrough queue, an aggregation window, or the local
/// queryable store, or drops it. Flushing turns buffered windows into summary
/// events.
///
/// All mutable state sits behind `Arc<RwLock<..>>`, so every method takes
/// `&self`.
#[derive(Debug)]
pub struct EchelonAggregator {
    /// Identifier of this echelon; used as the source of generated summaries
    /// and as the prefix of summary event IDs.
    echelon_id: String,

    /// Kind of echelon this aggregator represents.
    echelon_type: EchelonType,

    /// Open aggregation windows for summary-mode events, one per `WindowKey`.
    windows: Arc<RwLock<HashMap<WindowKey, AggregationWindow>>>,

    /// Events received with full propagation, waiting for `pop_passthrough`.
    passthrough_queue: Arc<RwLock<PriorityEventQueue>>,

    /// Events received with query propagation, keyed by `event_id`.
    queryable_store: Arc<RwLock<HashMap<String, PeatEvent>>>,

    /// Summary strategies keyed by the value of their `event_type()`.
    summary_strategies: Arc<RwLock<HashMap<String, Box<dyn SummaryStrategy>>>>,

    /// Summary events produced by flushing, waiting for `pop_summaries`.
    summary_queue: Arc<RwLock<Vec<PeatEvent>>>,

    /// Counter incremented for every generated summary ID.
    summary_counter: Arc<RwLock<u64>>,

    /// Window duration used when an event's routing policy does not carry a
    /// positive `aggregation_window_ms`.
    default_window_duration: Duration,
}
176
177impl EchelonAggregator {
178 pub fn new(echelon_id: String, echelon_type: EchelonType) -> Self {
180 let mut strategies: HashMap<String, Box<dyn SummaryStrategy>> = HashMap::new();
181
182 strategies.insert(
184 "detection".to_string(),
185 Box::new(DefaultSummaryStrategy::new("detection")),
186 );
187 strategies.insert(
188 "telemetry".to_string(),
189 Box::new(DefaultSummaryStrategy::new("telemetry")),
190 );
191
192 Self {
193 echelon_id,
194 echelon_type,
195 windows: Arc::new(RwLock::new(HashMap::new())),
196 passthrough_queue: Arc::new(RwLock::new(PriorityEventQueue::new())),
197 queryable_store: Arc::new(RwLock::new(HashMap::new())),
198 summary_strategies: Arc::new(RwLock::new(strategies)),
199 summary_queue: Arc::new(RwLock::new(Vec::new())),
200 summary_counter: Arc::new(RwLock::new(0)),
201 default_window_duration: Duration::from_secs(1),
202 }
203 }
204
205 pub fn with_default_window_duration(mut self, duration: Duration) -> Self {
207 self.default_window_duration = duration;
208 self
209 }
210
211 pub fn register_strategy(&self, strategy: Box<dyn SummaryStrategy>) {
213 let mut strategies = self.summary_strategies.write().unwrap();
214 strategies.insert(strategy.event_type().to_string(), strategy);
215 }
216
217 pub fn receive(&self, event: PeatEvent) -> Result<()> {
225 let routing = event.routing.as_ref();
226
227 let propagation = routing
228 .map(|r| {
229 PropagationMode::try_from(r.propagation).unwrap_or(PropagationMode::PropagationFull)
230 })
231 .unwrap_or(PropagationMode::PropagationFull);
232
233 match propagation {
234 PropagationMode::PropagationFull => {
235 let priority = routing
237 .map(|r| {
238 EventPriority::try_from(r.priority).unwrap_or(EventPriority::PriorityNormal)
239 })
240 .unwrap_or(EventPriority::PriorityNormal);
241
242 let mut queue = self.passthrough_queue.write().unwrap();
243 queue.push(event);
244 let _ = priority; }
246
247 PropagationMode::PropagationSummary => {
248 let key = (event.event_class, event.event_type.clone());
250 let window_duration = routing
251 .map(|r| {
252 if r.aggregation_window_ms > 0 {
253 Duration::from_millis(r.aggregation_window_ms as u64)
254 } else {
255 self.default_window_duration
256 }
257 })
258 .unwrap_or(self.default_window_duration);
259
260 let event_class =
261 EventClass::try_from(event.event_class).unwrap_or(EventClass::Unspecified);
262
263 let mut windows = self.windows.write().unwrap();
264 let window = windows.entry(key).or_insert_with(|| {
265 AggregationWindow::new(event_class, &event.event_type, window_duration)
266 });
267
268 window.add(event);
269 }
270
271 PropagationMode::PropagationQuery => {
272 let mut store = self.queryable_store.write().unwrap();
274 store.insert(event.event_id.clone(), event);
275 }
276
277 PropagationMode::PropagationLocal => {
278 }
280 }
281
282 Ok(())
283 }
284
285 pub fn flush_expired_windows(&self) -> usize {
289 let mut windows = self.windows.write().unwrap();
290 let mut summaries_generated = 0;
291
292 let expired_keys: Vec<WindowKey> = windows
293 .iter()
294 .filter(|(_, w)| w.should_flush() && !w.events.is_empty())
295 .map(|(k, _)| k.clone())
296 .collect();
297
298 for key in expired_keys {
299 if let Some(window) = windows.get_mut(&key) {
300 if let Some(summary_event) = self.generate_summary(window) {
301 let mut queue = self.summary_queue.write().unwrap();
302 queue.push(summary_event);
303 summaries_generated += 1;
304 }
305 window.reset();
306 }
307 }
308
309 summaries_generated
310 }
311
312 pub fn flush_all_windows(&self) -> usize {
316 let mut windows = self.windows.write().unwrap();
317 let mut summaries_generated = 0;
318
319 let non_empty_keys: Vec<WindowKey> = windows
320 .iter()
321 .filter(|(_, w)| !w.events.is_empty())
322 .map(|(k, _)| k.clone())
323 .collect();
324
325 for key in non_empty_keys {
326 if let Some(window) = windows.get_mut(&key) {
327 if let Some(summary_event) = self.generate_summary(window) {
328 let mut queue = self.summary_queue.write().unwrap();
329 queue.push(summary_event);
330 summaries_generated += 1;
331 }
332 window.reset();
333 }
334 }
335
336 summaries_generated
337 }
338
339 fn generate_summary(&self, window: &AggregationWindow) -> Option<PeatEvent> {
341 if window.events.is_empty() {
342 return None;
343 }
344
345 let summary_id = self.generate_summary_id();
346 let now = current_timestamp();
347
348 let strategies = self.summary_strategies.read().unwrap();
350
351 let event_type_base = window
353 .event_type
354 .split('.')
355 .next()
356 .unwrap_or(&window.event_type);
357
358 let summary_payload = if let Some(strategy) = strategies.get(event_type_base) {
359 strategy.summarize(window.events())
360 } else {
361 DefaultSummaryStrategy::new(&window.event_type).summarize(window.events())
363 };
364
365 let event_summary = EventSummary {
367 formation_id: self.echelon_id.clone(),
368 window_start: Some(now), window_end: Some(now),
370 event_class: window.event_class as i32,
371 event_type: window.event_type.clone(),
372 event_count: window.event_count() as u32,
373 source_node_ids: window.source_nodes().iter().cloned().collect(),
374 summary_type_url: format!("type.peat/summary.{}", window.event_type),
375 summary_value: summary_payload,
376 };
377
378 Some(PeatEvent {
380 event_id: summary_id,
381 timestamp: Some(now),
382 source_node_id: self.echelon_id.clone(),
383 source_formation_id: self.echelon_id.clone(),
384 source_instance_id: None,
385 event_class: window.event_class as i32,
386 event_type: format!("{}_summary", window.event_type),
387 routing: Some(AggregationPolicy {
388 propagation: PropagationMode::PropagationFull as i32, priority: EventPriority::PriorityNormal as i32,
390 ttl_seconds: 300,
391 aggregation_window_ms: 0,
392 }),
393 payload_type_url: format!("type.peat/event_summary.{}", window.event_type),
394 payload_value: prost::Message::encode_to_vec(&event_summary),
395 })
396 }
397
398 pub fn pop_passthrough(&self) -> Vec<PeatEvent> {
400 let mut queue = self.passthrough_queue.write().unwrap();
401 let mut events = Vec::new();
402
403 events.extend(queue.pop_critical());
405
406 events.extend(queue.pop_weighted(100)); events
410 }
411
412 pub fn pop_summaries(&self) -> Vec<PeatEvent> {
414 let mut queue = self.summary_queue.write().unwrap();
415 queue.drain(..).collect()
416 }
417
418 pub fn pop_all(&self) -> Vec<PeatEvent> {
420 let mut events = self.pop_passthrough();
421 events.extend(self.pop_summaries());
422 events
423 }
424
425 pub fn query_local(&self, event_type: Option<&str>) -> Vec<PeatEvent> {
427 let store = self.queryable_store.read().unwrap();
428 store
429 .values()
430 .filter(|e| event_type.is_none() || Some(e.event_type.as_str()) == event_type)
431 .cloned()
432 .collect()
433 }
434
435 pub fn get_local(&self, event_id: &str) -> Option<PeatEvent> {
437 let store = self.queryable_store.read().unwrap();
438 store.get(event_id).cloned()
439 }
440
441 pub fn passthrough_count(&self) -> usize {
443 let queue = self.passthrough_queue.read().unwrap();
444 queue.len()
445 }
446
447 pub fn queryable_count(&self) -> usize {
449 let store = self.queryable_store.read().unwrap();
450 store.len()
451 }
452
453 pub fn summary_count(&self) -> usize {
455 let queue = self.summary_queue.read().unwrap();
456 queue.len()
457 }
458
459 pub fn window_count(&self) -> usize {
461 let windows = self.windows.read().unwrap();
462 windows.len()
463 }
464
465 pub fn echelon_id(&self) -> &str {
467 &self.echelon_id
468 }
469
470 pub fn echelon_type(&self) -> EchelonType {
472 self.echelon_type
473 }
474
475 pub fn enforce_ttl(&self) {
477 let now = SystemTime::now()
478 .duration_since(SystemTime::UNIX_EPOCH)
479 .unwrap();
480 let now_secs = now.as_secs();
481
482 let mut store = self.queryable_store.write().unwrap();
483 store.retain(|_, event| {
484 if let Some(routing) = &event.routing {
485 if routing.ttl_seconds > 0 {
486 if let Some(ts) = &event.timestamp {
487 let event_secs = ts.seconds;
488 let expiry = event_secs + routing.ttl_seconds as u64;
489 return now_secs < expiry;
490 }
491 }
492 }
493 true });
495 }
496
497 fn generate_summary_id(&self) -> String {
498 let mut counter = self.summary_counter.write().unwrap();
499 *counter += 1;
500 format!("{}-summary-{}", self.echelon_id, *counter)
501 }
502}
503
504fn current_timestamp() -> Timestamp {
506 let now = SystemTime::now()
507 .duration_since(SystemTime::UNIX_EPOCH)
508 .unwrap();
509 Timestamp {
510 seconds: now.as_secs(),
511 nanos: now.subsec_nanos(),
512 }
513}
514
#[cfg(test)]
mod tests {
    use super::*;

    /// Window length used by tests that wait for a window to expire.
    const SHORT_WINDOW: Duration = Duration::from_millis(50);

    /// Sleep long enough for a `SHORT_WINDOW` window to be past its duration.
    const PAST_SHORT_WINDOW: Duration = Duration::from_millis(100);

    /// Builds a product-class event with the given routing; the source node is
    /// `node-<id>` and the TTL is 300 seconds.
    fn make_event(
        id: &str,
        event_type: &str,
        propagation: PropagationMode,
        priority: EventPriority,
    ) -> PeatEvent {
        let routing = AggregationPolicy {
            propagation: propagation as i32,
            priority: priority as i32,
            ttl_seconds: 300,
            // Zero means "use the aggregator's default window".
            aggregation_window_ms: 0,
        };

        PeatEvent {
            event_id: id.to_string(),
            timestamp: Some(current_timestamp()),
            source_node_id: format!("node-{}", id),
            source_formation_id: "squad-1".to_string(),
            source_instance_id: None,
            event_class: EventClass::Product as i32,
            event_type: event_type.to_string(),
            routing: Some(routing),
            payload_type_url: String::new(),
            payload_value: vec![],
        }
    }

    /// Shorthand for a normal-priority, summary-mode event.
    fn summary_event(id: &str, event_type: &str) -> PeatEvent {
        make_event(
            id,
            event_type,
            PropagationMode::PropagationSummary,
            EventPriority::PriorityNormal,
        )
    }

    /// Aggregator for `squad-1` with the default window duration.
    fn squad() -> EchelonAggregator {
        EchelonAggregator::new("squad-1".to_string(), EchelonType::Squad)
    }

    /// Aggregator for `squad-1` with the given default window duration.
    fn squad_with_window(window: Duration) -> EchelonAggregator {
        squad().with_default_window_duration(window)
    }

    #[test]
    fn test_aggregator_creation() {
        let aggregator = squad();

        assert_eq!(aggregator.echelon_id(), "squad-1");
        assert_eq!(aggregator.echelon_type(), EchelonType::Squad);
    }

    #[test]
    fn test_full_propagation_passthrough() {
        let aggregator = squad();

        aggregator
            .receive(make_event(
                "evt-1",
                "detection",
                PropagationMode::PropagationFull,
                EventPriority::PriorityNormal,
            ))
            .unwrap();

        assert_eq!(aggregator.passthrough_count(), 1);
        assert_eq!(aggregator.queryable_count(), 0);

        let popped = aggregator.pop_passthrough();
        assert_eq!(popped.len(), 1);
        assert_eq!(popped[0].event_id, "evt-1");
    }

    #[test]
    fn test_query_propagation_stored_locally() {
        let aggregator = squad();

        aggregator
            .receive(make_event(
                "evt-1",
                "telemetry.cpu",
                PropagationMode::PropagationQuery,
                EventPriority::PriorityLow,
            ))
            .unwrap();

        assert_eq!(aggregator.passthrough_count(), 0);
        assert_eq!(aggregator.queryable_count(), 1);

        let matches = aggregator.query_local(Some("telemetry.cpu"));
        assert_eq!(matches.len(), 1);
        assert_eq!(matches[0].event_id, "evt-1");
    }

    #[test]
    fn test_summary_propagation_aggregated() {
        let aggregator = squad_with_window(SHORT_WINDOW);

        // Five events of one type land in a single window.
        (0..5)
            .map(|i| summary_event(&format!("evt-{}", i), "detection.vehicle"))
            .for_each(|event| aggregator.receive(event).unwrap());

        assert_eq!(aggregator.window_count(), 1);
        assert_eq!(aggregator.passthrough_count(), 0);

        // Let the window run past its duration before flushing.
        std::thread::sleep(PAST_SHORT_WINDOW);

        let flushed = aggregator.flush_expired_windows();
        assert_eq!(flushed, 1);

        let produced = aggregator.pop_summaries();
        assert_eq!(produced.len(), 1);
        assert!(produced[0].event_type.contains("detection.vehicle_summary"));
    }

    #[test]
    fn test_local_propagation_ignored() {
        let aggregator = squad();

        aggregator
            .receive(make_event(
                "evt-1",
                "debug.trace",
                PropagationMode::PropagationLocal,
                EventPriority::PriorityLow,
            ))
            .unwrap();

        // Nothing is retained anywhere for local-only events.
        assert_eq!(aggregator.passthrough_count(), 0);
        assert_eq!(aggregator.queryable_count(), 0);
        assert_eq!(aggregator.window_count(), 0);
    }

    #[test]
    fn test_critical_events_passthrough() {
        let aggregator = squad();

        let critical = make_event(
            "critical-1",
            "anomaly.urgent",
            PropagationMode::PropagationFull,
            EventPriority::PriorityCritical,
        );
        let normal = make_event(
            "normal-1",
            "detection",
            PropagationMode::PropagationFull,
            EventPriority::PriorityNormal,
        );
        aggregator.receive(critical).unwrap();
        aggregator.receive(normal).unwrap();

        let popped = aggregator.pop_passthrough();
        assert_eq!(popped.len(), 2);
        // The critical event must come out ahead of the normal one.
        assert_eq!(popped[0].event_id, "critical-1");
    }

    #[test]
    fn test_flush_all_windows() {
        // A window this long never expires during the test.
        let aggregator = squad_with_window(Duration::from_secs(3600));

        for i in 0..3 {
            aggregator
                .receive(summary_event(&format!("evt-{}", i), "detection"))
                .unwrap();
        }

        // A forced flush ignores the remaining window time.
        let flushed = aggregator.flush_all_windows();
        assert_eq!(flushed, 1);

        let produced = aggregator.pop_summaries();
        assert_eq!(produced.len(), 1);
    }

    #[test]
    fn test_multiple_event_types_separate_windows() {
        let aggregator = squad_with_window(SHORT_WINDOW);

        for (id, event_type) in [("det-1", "detection.vehicle"), ("tel-1", "telemetry.cpu")] {
            aggregator.receive(summary_event(id, event_type)).unwrap();
        }

        assert_eq!(aggregator.window_count(), 2);

        std::thread::sleep(PAST_SHORT_WINDOW);
        let flushed = aggregator.flush_expired_windows();
        assert_eq!(flushed, 2);
    }

    #[test]
    fn test_pop_all_includes_passthrough_and_summaries() {
        let aggregator = squad_with_window(SHORT_WINDOW);

        let passthrough = make_event(
            "pass-1",
            "anomaly",
            PropagationMode::PropagationFull,
            EventPriority::PriorityHigh,
        );
        aggregator.receive(passthrough).unwrap();
        aggregator
            .receive(summary_event("sum-1", "detection"))
            .unwrap();

        std::thread::sleep(PAST_SHORT_WINDOW);
        aggregator.flush_expired_windows();

        // One passthrough event plus one generated summary.
        let drained = aggregator.pop_all();
        assert_eq!(drained.len(), 2);
    }

    #[test]
    fn test_source_nodes_tracked_in_window() {
        let aggregator = squad_with_window(SHORT_WINDOW);

        for i in 0..3 {
            let mut event = summary_event(&format!("evt-{}", i), "detection");
            event.source_node_id = format!("node-{}", i);
            aggregator.receive(event).unwrap();
        }

        std::thread::sleep(PAST_SHORT_WINDOW);
        aggregator.flush_expired_windows();

        let produced = aggregator.pop_summaries();
        assert_eq!(produced.len(), 1);

        // The payload of a summary event is an encoded `EventSummary`.
        let decoded =
            <EventSummary as prost::Message>::decode(produced[0].payload_value.as_slice())
                .unwrap();
        assert_eq!(decoded.source_node_ids.len(), 3);
        assert_eq!(decoded.event_count, 3);
    }
}