Skip to main content

peat_protocol/event/
summary.rs

1//! Summary Strategies for Event Aggregation (ADR-027 Phase 2)
2//!
3//! Different event types require different summarization strategies.
4//! This module provides a trait and implementations for generating
5//! summaries from aggregated events.
6//!
7//! ## Strategy Pattern
8//!
9//! ```text
10//! Events → SummaryStrategy → Summary Payload (bytes)
11//!              ↓
//!    ┌─────────┴─────────────────────┐
//!    │  Detection: counts, histogram │
//!    │  Telemetry: min/max/avg       │
//!    │  Anomaly: severity counts     │
//!    │  Custom: user-defined         │
//!    └───────────────────────────────┘
17//! ```
18
19use peat_schema::event::v1::PeatEvent;
20use std::collections::HashMap;
21use std::fmt::Debug;
22
23/// Strategy for summarizing events of a given type
24///
25/// Implementations should be stateless and thread-safe.
26pub trait SummaryStrategy: Send + Sync + Debug {
27    /// Event type this strategy handles (e.g., "detection", "telemetry")
28    fn event_type(&self) -> &str;
29
30    /// Generate summary payload from collected events
31    ///
32    /// Returns a byte vector containing the summarized data.
33    /// The format is application-specific but should be consistent.
34    fn summarize(&self, events: &[PeatEvent]) -> Vec<u8>;
35}
36
/// Fallback strategy for event types that have no specialised strategy.
///
/// Its summaries are count-based only.
#[derive(Debug)]
pub struct DefaultSummaryStrategy {
    /// Event type label this instance reports and echoes in its summaries.
    event_type: String,
}

impl DefaultSummaryStrategy {
    /// Builds a default strategy bound to `event_type`.
    pub fn new(event_type: &str) -> Self {
        let event_type = String::from(event_type);
        Self { event_type }
    }
}
53
54impl SummaryStrategy for DefaultSummaryStrategy {
55    fn event_type(&self) -> &str {
56        &self.event_type
57    }
58
59    fn summarize(&self, events: &[PeatEvent]) -> Vec<u8> {
60        // Simple JSON summary with counts
61        let summary = serde_json::json!({
62            "event_type": self.event_type,
63            "event_count": events.len(),
64            "source_nodes": events.iter()
65                .map(|e| e.source_node_id.clone())
66                .collect::<std::collections::HashSet<_>>()
67                .into_iter()
68                .collect::<Vec<_>>(),
69        });
70
71        serde_json::to_vec(&summary).unwrap_or_default()
72    }
73}
74
/// Summary strategy for detection events.
///
/// The generated summary contains:
/// - the number of detections per detection type
/// - a 10-bucket confidence histogram
/// - the total number of detections
#[derive(Debug, Default)]
pub struct DetectionSummaryStrategy;

impl DetectionSummaryStrategy {
    /// Builds a detection summary strategy.
    pub fn new() -> Self {
        Self::default()
    }
}
90
91impl SummaryStrategy for DetectionSummaryStrategy {
92    fn event_type(&self) -> &str {
93        "detection"
94    }
95
96    fn summarize(&self, events: &[PeatEvent]) -> Vec<u8> {
97        let mut counts_by_type: HashMap<String, u32> = HashMap::new();
98        let mut confidence_histogram = [0u32; 10];
99        let mut total_detections = 0u32;
100
101        for event in events {
102            total_detections += 1;
103
104            // Parse event type for detection subtype
105            let subtype = event
106                .event_type
107                .strip_prefix("detection.")
108                .or_else(|| event.event_type.strip_prefix("product.detection."))
109                .unwrap_or("unknown");
110
111            *counts_by_type.entry(subtype.to_string()).or_default() += 1;
112
113            // Try to extract confidence from payload if present
114            if !event.payload_value.is_empty() {
115                // Attempt to parse confidence from JSON payload
116                if let Ok(payload) =
117                    serde_json::from_slice::<serde_json::Value>(&event.payload_value)
118                {
119                    if let Some(conf) = payload.get("confidence").and_then(|v| v.as_f64()) {
120                        let bucket = ((conf * 10.0).clamp(0.0, 9.0)) as usize;
121                        confidence_histogram[bucket] += 1;
122                    }
123                }
124            }
125        }
126
127        let summary = DetectionSummary {
128            counts_by_type,
129            confidence_histogram: confidence_histogram.to_vec(),
130            total_detections,
131        };
132
133        serde_json::to_vec(&summary).unwrap_or_default()
134    }
135}
136
137/// Summary data for detection events
138#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
139pub struct DetectionSummary {
140    /// Counts of detections by type
141    pub counts_by_type: HashMap<String, u32>,
142
143    /// Confidence histogram (10 buckets: 0.0-0.1, 0.1-0.2, ..., 0.9-1.0)
144    pub confidence_histogram: Vec<u32>,
145
146    /// Total number of detections
147    pub total_detections: u32,
148}
149
/// Summary strategy for telemetry events.
///
/// The generated summary contains:
/// - min/max/avg per metric
/// - the number of samples
#[derive(Debug, Default)]
pub struct TelemetrySummaryStrategy;

impl TelemetrySummaryStrategy {
    /// Builds a telemetry summary strategy.
    pub fn new() -> Self {
        Self::default()
    }
}
164
165impl SummaryStrategy for TelemetrySummaryStrategy {
166    fn event_type(&self) -> &str {
167        "telemetry"
168    }
169
170    fn summarize(&self, events: &[PeatEvent]) -> Vec<u8> {
171        let mut metrics: HashMap<String, MetricStats> = HashMap::new();
172
173        for event in events {
174            // Try to parse metrics from payload
175            if !event.payload_value.is_empty() {
176                if let Ok(payload) =
177                    serde_json::from_slice::<serde_json::Value>(&event.payload_value)
178                {
179                    // Look for metrics in the payload
180                    if let Some(obj) = payload.as_object() {
181                        for (key, value) in obj {
182                            if let Some(v) = value.as_f64() {
183                                let stats = metrics.entry(key.clone()).or_default();
184                                stats.update(v);
185                            }
186                        }
187                    }
188                }
189            }
190
191            // Also track by event type (e.g., "telemetry.cpu" -> "cpu")
192            let metric_name = event
193                .event_type
194                .strip_prefix("telemetry.")
195                .unwrap_or(&event.event_type);
196
197            // Track at least the count for this metric type
198            let stats = metrics.entry(metric_name.to_string()).or_default();
199            if stats.count == 0 {
200                stats.count = 1;
201            } else {
202                stats.count += 1;
203            }
204        }
205
206        let summary = TelemetrySummary {
207            metrics: metrics
208                .into_iter()
209                .map(|(k, v)| (k, v.finalize()))
210                .collect(),
211            sample_count: events.len() as u32,
212        };
213
214        serde_json::to_vec(&summary).unwrap_or_default()
215    }
216}
217
218/// Summary data for telemetry events
219#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
220pub struct TelemetrySummary {
221    /// Statistics for each metric
222    pub metrics: HashMap<String, MetricSummaryStats>,
223
224    /// Total number of samples
225    pub sample_count: u32,
226}
227
228/// Statistics for a single metric
229#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
230pub struct MetricStats {
231    min: f64,
232    max: f64,
233    sum: f64,
234    count: u32,
235}
236
237impl MetricStats {
238    /// Update stats with a new value
239    pub fn update(&mut self, value: f64) {
240        if self.count == 0 {
241            self.min = value;
242            self.max = value;
243        } else {
244            self.min = self.min.min(value);
245            self.max = self.max.max(value);
246        }
247        self.sum += value;
248        self.count += 1;
249    }
250
251    /// Finalize into a summary stats structure
252    pub fn finalize(&self) -> MetricSummaryStats {
253        MetricSummaryStats {
254            min: self.min,
255            max: self.max,
256            avg: if self.count > 0 {
257                self.sum / self.count as f64
258            } else {
259                0.0
260            },
261            count: self.count,
262        }
263    }
264}
265
266/// Final statistics for a metric
267#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
268pub struct MetricSummaryStats {
269    /// Minimum value
270    pub min: f64,
271
272    /// Maximum value
273    pub max: f64,
274
275    /// Average value
276    pub avg: f64,
277
278    /// Sample count
279    pub count: u32,
280}
281
/// Summary strategy for anomaly events.
///
/// The generated summary contains:
/// - the number of anomalies per severity
/// - the distinct anomaly types that occurred
/// - the total number of anomalies
#[derive(Debug, Default)]
pub struct AnomalySummaryStrategy;

impl AnomalySummaryStrategy {
    /// Builds an anomaly summary strategy.
    pub fn new() -> Self {
        Self::default()
    }
}
297
298impl SummaryStrategy for AnomalySummaryStrategy {
299    fn event_type(&self) -> &str {
300        "anomaly"
301    }
302
303    fn summarize(&self, events: &[PeatEvent]) -> Vec<u8> {
304        let mut counts_by_severity: HashMap<String, u32> = HashMap::new();
305        let mut anomaly_types: std::collections::HashSet<String> = std::collections::HashSet::new();
306
307        for event in events {
308            // Extract severity from priority
309            let severity = if let Some(routing) = &event.routing {
310                match routing.priority {
311                    0 => "critical",
312                    1 => "high",
313                    2 => "normal",
314                    3 => "low",
315                    _ => "unknown",
316                }
317            } else {
318                "unknown"
319            };
320
321            *counts_by_severity.entry(severity.to_string()).or_default() += 1;
322
323            // Extract anomaly type
324            let anomaly_type = event
325                .event_type
326                .strip_prefix("anomaly.")
327                .unwrap_or(&event.event_type);
328            anomaly_types.insert(anomaly_type.to_string());
329        }
330
331        let summary = AnomalySummary {
332            counts_by_severity,
333            anomaly_types: anomaly_types.into_iter().collect(),
334            total_anomalies: events.len() as u32,
335        };
336
337        serde_json::to_vec(&summary).unwrap_or_default()
338    }
339}
340
341/// Summary data for anomaly events
342#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
343pub struct AnomalySummary {
344    /// Counts by severity level
345    pub counts_by_severity: HashMap<String, u32>,
346
347    /// Unique anomaly types observed
348    pub anomaly_types: Vec<String>,
349
350    /// Total anomaly count
351    pub total_anomalies: u32,
352}
353
#[cfg(test)]
mod tests {
    use super::*;
    use peat_schema::common::v1::Timestamp;
    use peat_schema::event::v1::{AggregationPolicy, EventClass, EventPriority, PropagationMode};

    /// Builds a product event of `event_type` from a fixed node, with normal
    /// priority routing and `payload` (if any) encoded as JSON bytes.
    fn make_event(event_type: &str, payload: Option<serde_json::Value>) -> PeatEvent {
        let payload_value = match payload {
            Some(value) => serde_json::to_vec(&value).unwrap(),
            None => Vec::new(),
        };

        let routing = AggregationPolicy {
            propagation: PropagationMode::PropagationSummary as i32,
            priority: EventPriority::PriorityNormal as i32,
            ttl_seconds: 300,
            aggregation_window_ms: 1000,
        };

        PeatEvent {
            event_id: String::from("test-1"),
            timestamp: Some(Timestamp {
                seconds: 0,
                nanos: 0,
            }),
            source_node_id: String::from("node-1"),
            source_formation_id: String::from("squad-1"),
            source_instance_id: None,
            event_class: EventClass::Product as i32,
            event_type: String::from(event_type),
            routing: Some(routing),
            payload_type_url: String::new(),
            payload_value,
        }
    }

    #[test]
    fn test_default_strategy() {
        let strategy = DefaultSummaryStrategy::new("test");
        assert_eq!(strategy.event_type(), "test");

        let events: Vec<PeatEvent> = ["test.a", "test.b", "test.a"]
            .iter()
            .map(|&name| make_event(name, None))
            .collect();

        let bytes = strategy.summarize(&events);
        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();

        assert_eq!(parsed["event_count"], 3);
        assert_eq!(parsed["event_type"], "test");
    }

    #[test]
    fn test_detection_strategy_counts() {
        let strategy = DetectionSummaryStrategy::new();
        assert_eq!(strategy.event_type(), "detection");

        let events: Vec<PeatEvent> = ["detection.vehicle", "detection.person", "detection.vehicle"]
            .iter()
            .map(|&name| make_event(name, None))
            .collect();

        let bytes = strategy.summarize(&events);
        let parsed: DetectionSummary = serde_json::from_slice(&bytes).unwrap();

        assert_eq!(parsed.total_detections, 3);
        assert_eq!(parsed.counts_by_type["vehicle"], 2);
        assert_eq!(parsed.counts_by_type["person"], 1);
    }

    #[test]
    fn test_detection_strategy_confidence() {
        let strategy = DetectionSummaryStrategy::new();

        let events: Vec<PeatEvent> = [0.95, 0.85, 0.35]
            .iter()
            .map(|&confidence| {
                make_event(
                    "detection.vehicle",
                    Some(serde_json::json!({ "confidence": confidence })),
                )
            })
            .collect();

        let bytes = strategy.summarize(&events);
        let parsed: DetectionSummary = serde_json::from_slice(&bytes).unwrap();

        // 0.95 lands in bucket 9, 0.85 in bucket 8 and 0.35 in bucket 3.
        for &bucket in &[9usize, 8, 3] {
            assert_eq!(parsed.confidence_histogram[bucket], 1);
        }
    }

    #[test]
    fn test_telemetry_strategy() {
        let strategy = TelemetrySummaryStrategy::new();
        assert_eq!(strategy.event_type(), "telemetry");

        let sample = |cpu: f64, mem: f64| {
            make_event(
                "telemetry.cpu",
                Some(serde_json::json!({ "cpu_percent": cpu, "memory_mb": mem })),
            )
        };
        let events = vec![sample(50.0, 1024.0), sample(75.0, 2048.0)];

        let bytes = strategy.summarize(&events);
        let parsed: TelemetrySummary = serde_json::from_slice(&bytes).unwrap();

        assert_eq!(parsed.sample_count, 2);

        let cpu = &parsed.metrics["cpu_percent"];
        assert_eq!(cpu.min, 50.0);
        assert_eq!(cpu.max, 75.0);
        assert!((cpu.avg - 62.5).abs() < 0.01);
        assert_eq!(cpu.count, 2);

        let mem = &parsed.metrics["memory_mb"];
        assert_eq!(mem.min, 1024.0);
        assert_eq!(mem.max, 2048.0);
    }

    #[test]
    fn test_anomaly_strategy() {
        let strategy = AnomalySummaryStrategy::new();
        assert_eq!(strategy.event_type(), "anomaly");

        // Same as make_event, but with the routing priority overridden.
        let with_priority = |event_type: &str, priority: EventPriority| {
            let mut event = make_event(event_type, None);
            event.routing.as_mut().unwrap().priority = priority as i32;
            event
        };

        let events = vec![
            with_priority("anomaly.intrusion", EventPriority::PriorityCritical),
            with_priority("anomaly.network_spike", EventPriority::PriorityHigh),
            with_priority("anomaly.intrusion", EventPriority::PriorityCritical),
        ];

        let bytes = strategy.summarize(&events);
        let parsed: AnomalySummary = serde_json::from_slice(&bytes).unwrap();

        assert_eq!(parsed.total_anomalies, 3);
        assert_eq!(parsed.counts_by_severity["critical"], 2);
        assert_eq!(parsed.counts_by_severity["high"], 1);
        assert!(parsed.anomaly_types.iter().any(|t| t == "intrusion"));
        assert!(parsed.anomaly_types.iter().any(|t| t == "network_spike"));
    }

    #[test]
    fn test_empty_events() {
        let strategy = DetectionSummaryStrategy::new();
        let bytes = strategy.summarize(&[]);
        let parsed: DetectionSummary = serde_json::from_slice(&bytes).unwrap();

        assert_eq!(parsed.total_detections, 0);
        assert!(parsed.counts_by_type.is_empty());
    }
}