1use peat_schema::event::v1::PeatEvent;
20use std::collections::HashMap;
21use std::fmt::Debug;
22
23pub trait SummaryStrategy: Send + Sync + Debug {
27 fn event_type(&self) -> &str;
29
30 fn summarize(&self, events: &[PeatEvent]) -> Vec<u8>;
35}
36
/// Generic strategy that reports only basic statistics for a caller-chosen event type.
#[derive(Debug)]
pub struct DefaultSummaryStrategy {
    /// Label handed back by `event_type()` and echoed in the generated summary.
    event_type: String,
}

impl DefaultSummaryStrategy {
    /// Creates a strategy for `event_type`, keeping an owned copy of the label.
    pub fn new(event_type: &str) -> Self {
        let event_type = String::from(event_type);
        Self { event_type }
    }
}
53
54impl SummaryStrategy for DefaultSummaryStrategy {
55 fn event_type(&self) -> &str {
56 &self.event_type
57 }
58
59 fn summarize(&self, events: &[PeatEvent]) -> Vec<u8> {
60 let summary = serde_json::json!({
62 "event_type": self.event_type,
63 "event_count": events.len(),
64 "source_nodes": events.iter()
65 .map(|e| e.source_node_id.clone())
66 .collect::<std::collections::HashSet<_>>()
67 .into_iter()
68 .collect::<Vec<_>>(),
69 });
70
71 serde_json::to_vec(&summary).unwrap_or_default()
72 }
73}
74
/// Stateless strategy that summarizes detection events.
#[derive(Debug, Default)]
pub struct DetectionSummaryStrategy;

impl DetectionSummaryStrategy {
    /// Creates the strategy; equivalent to `DetectionSummaryStrategy::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
90
91impl SummaryStrategy for DetectionSummaryStrategy {
92 fn event_type(&self) -> &str {
93 "detection"
94 }
95
96 fn summarize(&self, events: &[PeatEvent]) -> Vec<u8> {
97 let mut counts_by_type: HashMap<String, u32> = HashMap::new();
98 let mut confidence_histogram = [0u32; 10];
99 let mut total_detections = 0u32;
100
101 for event in events {
102 total_detections += 1;
103
104 let subtype = event
106 .event_type
107 .strip_prefix("detection.")
108 .or_else(|| event.event_type.strip_prefix("product.detection."))
109 .unwrap_or("unknown");
110
111 *counts_by_type.entry(subtype.to_string()).or_default() += 1;
112
113 if !event.payload_value.is_empty() {
115 if let Ok(payload) =
117 serde_json::from_slice::<serde_json::Value>(&event.payload_value)
118 {
119 if let Some(conf) = payload.get("confidence").and_then(|v| v.as_f64()) {
120 let bucket = ((conf * 10.0).clamp(0.0, 9.0)) as usize;
121 confidence_histogram[bucket] += 1;
122 }
123 }
124 }
125 }
126
127 let summary = DetectionSummary {
128 counts_by_type,
129 confidence_histogram: confidence_histogram.to_vec(),
130 total_detections,
131 };
132
133 serde_json::to_vec(&summary).unwrap_or_default()
134 }
135}
136
137#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
139pub struct DetectionSummary {
140 pub counts_by_type: HashMap<String, u32>,
142
143 pub confidence_histogram: Vec<u32>,
145
146 pub total_detections: u32,
148}
149
/// Stateless strategy that summarizes telemetry events.
#[derive(Debug, Default)]
pub struct TelemetrySummaryStrategy;

impl TelemetrySummaryStrategy {
    /// Creates the strategy; equivalent to `TelemetrySummaryStrategy::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
164
165impl SummaryStrategy for TelemetrySummaryStrategy {
166 fn event_type(&self) -> &str {
167 "telemetry"
168 }
169
170 fn summarize(&self, events: &[PeatEvent]) -> Vec<u8> {
171 let mut metrics: HashMap<String, MetricStats> = HashMap::new();
172
173 for event in events {
174 if !event.payload_value.is_empty() {
176 if let Ok(payload) =
177 serde_json::from_slice::<serde_json::Value>(&event.payload_value)
178 {
179 if let Some(obj) = payload.as_object() {
181 for (key, value) in obj {
182 if let Some(v) = value.as_f64() {
183 let stats = metrics.entry(key.clone()).or_default();
184 stats.update(v);
185 }
186 }
187 }
188 }
189 }
190
191 let metric_name = event
193 .event_type
194 .strip_prefix("telemetry.")
195 .unwrap_or(&event.event_type);
196
197 let stats = metrics.entry(metric_name.to_string()).or_default();
199 if stats.count == 0 {
200 stats.count = 1;
201 } else {
202 stats.count += 1;
203 }
204 }
205
206 let summary = TelemetrySummary {
207 metrics: metrics
208 .into_iter()
209 .map(|(k, v)| (k, v.finalize()))
210 .collect(),
211 sample_count: events.len() as u32,
212 };
213
214 serde_json::to_vec(&summary).unwrap_or_default()
215 }
216}
217
218#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
220pub struct TelemetrySummary {
221 pub metrics: HashMap<String, MetricSummaryStats>,
223
224 pub sample_count: u32,
226}
227
228#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
230pub struct MetricStats {
231 min: f64,
232 max: f64,
233 sum: f64,
234 count: u32,
235}
236
237impl MetricStats {
238 pub fn update(&mut self, value: f64) {
240 if self.count == 0 {
241 self.min = value;
242 self.max = value;
243 } else {
244 self.min = self.min.min(value);
245 self.max = self.max.max(value);
246 }
247 self.sum += value;
248 self.count += 1;
249 }
250
251 pub fn finalize(&self) -> MetricSummaryStats {
253 MetricSummaryStats {
254 min: self.min,
255 max: self.max,
256 avg: if self.count > 0 {
257 self.sum / self.count as f64
258 } else {
259 0.0
260 },
261 count: self.count,
262 }
263 }
264}
265
266#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
268pub struct MetricSummaryStats {
269 pub min: f64,
271
272 pub max: f64,
274
275 pub avg: f64,
277
278 pub count: u32,
280}
281
/// Stateless strategy that summarizes anomaly events.
#[derive(Debug, Default)]
pub struct AnomalySummaryStrategy;

impl AnomalySummaryStrategy {
    /// Creates the strategy; equivalent to `AnomalySummaryStrategy::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
297
298impl SummaryStrategy for AnomalySummaryStrategy {
299 fn event_type(&self) -> &str {
300 "anomaly"
301 }
302
303 fn summarize(&self, events: &[PeatEvent]) -> Vec<u8> {
304 let mut counts_by_severity: HashMap<String, u32> = HashMap::new();
305 let mut anomaly_types: std::collections::HashSet<String> = std::collections::HashSet::new();
306
307 for event in events {
308 let severity = if let Some(routing) = &event.routing {
310 match routing.priority {
311 0 => "critical",
312 1 => "high",
313 2 => "normal",
314 3 => "low",
315 _ => "unknown",
316 }
317 } else {
318 "unknown"
319 };
320
321 *counts_by_severity.entry(severity.to_string()).or_default() += 1;
322
323 let anomaly_type = event
325 .event_type
326 .strip_prefix("anomaly.")
327 .unwrap_or(&event.event_type);
328 anomaly_types.insert(anomaly_type.to_string());
329 }
330
331 let summary = AnomalySummary {
332 counts_by_severity,
333 anomaly_types: anomaly_types.into_iter().collect(),
334 total_anomalies: events.len() as u32,
335 };
336
337 serde_json::to_vec(&summary).unwrap_or_default()
338 }
339}
340
341#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
343pub struct AnomalySummary {
344 pub counts_by_severity: HashMap<String, u32>,
346
347 pub anomaly_types: Vec<String>,
349
350 pub total_anomalies: u32,
352}
353
#[cfg(test)]
mod tests {
    use super::*;
    use peat_schema::common::v1::Timestamp;
    use peat_schema::event::v1::{AggregationPolicy, EventClass, EventPriority, PropagationMode};

    /// Builds a product event of `event_type` with normal priority and, when `payload`
    /// is given, that value JSON-encoded as the payload bytes.
    fn make_event(event_type: &str, payload: Option<serde_json::Value>) -> PeatEvent {
        let payload_value = match payload {
            Some(value) => serde_json::to_vec(&value).unwrap(),
            None => Vec::new(),
        };
        let routing = AggregationPolicy {
            propagation: PropagationMode::PropagationSummary as i32,
            priority: EventPriority::PriorityNormal as i32,
            ttl_seconds: 300,
            aggregation_window_ms: 1000,
        };

        PeatEvent {
            event_id: "test-1".to_string(),
            timestamp: Some(Timestamp {
                seconds: 0,
                nanos: 0,
            }),
            source_node_id: "node-1".to_string(),
            source_formation_id: "squad-1".to_string(),
            source_instance_id: None,
            event_class: EventClass::Product as i32,
            event_type: event_type.to_string(),
            routing: Some(routing),
            payload_type_url: String::new(),
            payload_value,
        }
    }

    /// Builds a payload-free event of `event_type` whose routing carries `priority`.
    fn make_event_with_priority(event_type: &str, priority: EventPriority) -> PeatEvent {
        let mut event = make_event(event_type, None);
        event.routing.as_mut().unwrap().priority = priority as i32;
        event
    }

    #[test]
    fn test_default_strategy() {
        let strategy = DefaultSummaryStrategy::new("test");
        assert_eq!(strategy.event_type(), "test");

        let events: Vec<PeatEvent> = ["test.a", "test.b", "test.a"]
            .iter()
            .map(|event_type| make_event(event_type, None))
            .collect();

        let encoded = strategy.summarize(&events);
        let summary: serde_json::Value = serde_json::from_slice(&encoded).unwrap();

        assert_eq!(summary["event_count"], 3);
        assert_eq!(summary["event_type"], "test");
    }

    #[test]
    fn test_detection_strategy_counts() {
        let strategy = DetectionSummaryStrategy::new();
        assert_eq!(strategy.event_type(), "detection");

        let events: Vec<PeatEvent> = ["detection.vehicle", "detection.person", "detection.vehicle"]
            .iter()
            .map(|event_type| make_event(event_type, None))
            .collect();

        let encoded = strategy.summarize(&events);
        let summary: DetectionSummary = serde_json::from_slice(&encoded).unwrap();

        assert_eq!(summary.total_detections, 3);
        assert_eq!(summary.counts_by_type.get("vehicle"), Some(&2));
        assert_eq!(summary.counts_by_type.get("person"), Some(&1));
    }

    #[test]
    fn test_detection_strategy_confidence() {
        let strategy = DetectionSummaryStrategy::new();

        let events: Vec<PeatEvent> = [0.95, 0.85, 0.35]
            .iter()
            .map(|confidence| {
                make_event(
                    "detection.vehicle",
                    Some(serde_json::json!({ "confidence": confidence })),
                )
            })
            .collect();

        let encoded = strategy.summarize(&events);
        let summary: DetectionSummary = serde_json::from_slice(&encoded).unwrap();

        // One hit each in the 0.9, 0.8 and 0.3 buckets.
        for bucket in [9usize, 8, 3] {
            assert_eq!(summary.confidence_histogram[bucket], 1);
        }
    }

    #[test]
    fn test_telemetry_strategy() {
        let strategy = TelemetrySummaryStrategy::new();
        assert_eq!(strategy.event_type(), "telemetry");

        let first = serde_json::json!({"cpu_percent": 50.0, "memory_mb": 1024.0});
        let second = serde_json::json!({"cpu_percent": 75.0, "memory_mb": 2048.0});
        let events = vec![
            make_event("telemetry.cpu", Some(first)),
            make_event("telemetry.cpu", Some(second)),
        ];

        let encoded = strategy.summarize(&events);
        let summary: TelemetrySummary = serde_json::from_slice(&encoded).unwrap();

        assert_eq!(summary.sample_count, 2);

        let cpu = summary.metrics.get("cpu_percent").unwrap();
        assert_eq!(cpu.min, 50.0);
        assert_eq!(cpu.max, 75.0);
        assert!((cpu.avg - 62.5).abs() < 0.01);
        assert_eq!(cpu.count, 2);

        let mem = summary.metrics.get("memory_mb").unwrap();
        assert_eq!(mem.min, 1024.0);
        assert_eq!(mem.max, 2048.0);
    }

    #[test]
    fn test_anomaly_strategy() {
        let strategy = AnomalySummaryStrategy::new();
        assert_eq!(strategy.event_type(), "anomaly");

        let events = vec![
            make_event_with_priority("anomaly.intrusion", EventPriority::PriorityCritical),
            make_event_with_priority("anomaly.network_spike", EventPriority::PriorityHigh),
            make_event_with_priority("anomaly.intrusion", EventPriority::PriorityCritical),
        ];

        let encoded = strategy.summarize(&events);
        let summary: AnomalySummary = serde_json::from_slice(&encoded).unwrap();

        assert_eq!(summary.total_anomalies, 3);
        assert_eq!(summary.counts_by_severity.get("critical"), Some(&2));
        assert_eq!(summary.counts_by_severity.get("high"), Some(&1));
        assert!(summary.anomaly_types.iter().any(|t| t == "intrusion"));
        assert!(summary.anomaly_types.iter().any(|t| t == "network_spike"));
    }

    #[test]
    fn test_empty_events() {
        let strategy = DetectionSummaryStrategy::new();
        let encoded = strategy.summarize(&[]);
        let summary: DetectionSummary = serde_json::from_slice(&encoded).unwrap();

        assert_eq!(summary.total_detections, 0);
        assert!(summary.counts_by_type.is_empty());
    }
}