use peat_schema::event::v1::PeatEvent;
use std::collections::HashMap;
use std::fmt::Debug;
pub trait SummaryStrategy: Send + Sync + Debug {
fn event_type(&self) -> &str;
fn summarize(&self, events: &[PeatEvent]) -> Vec<u8>;
}
#[derive(Debug)]
pub struct DefaultSummaryStrategy {
event_type: String,
}
impl DefaultSummaryStrategy {
pub fn new(event_type: &str) -> Self {
Self {
event_type: event_type.to_string(),
}
}
}
impl SummaryStrategy for DefaultSummaryStrategy {
fn event_type(&self) -> &str {
&self.event_type
}
fn summarize(&self, events: &[PeatEvent]) -> Vec<u8> {
let summary = serde_json::json!({
"event_type": self.event_type,
"event_count": events.len(),
"source_nodes": events.iter()
.map(|e| e.source_node_id.clone())
.collect::<std::collections::HashSet<_>>()
.into_iter()
.collect::<Vec<_>>(),
});
serde_json::to_vec(&summary).unwrap_or_default()
}
}
#[derive(Debug, Default)]
pub struct DetectionSummaryStrategy;
impl DetectionSummaryStrategy {
pub fn new() -> Self {
Self
}
}
impl SummaryStrategy for DetectionSummaryStrategy {
fn event_type(&self) -> &str {
"detection"
}
fn summarize(&self, events: &[PeatEvent]) -> Vec<u8> {
let mut counts_by_type: HashMap<String, u32> = HashMap::new();
let mut confidence_histogram = [0u32; 10];
let mut total_detections = 0u32;
for event in events {
total_detections += 1;
let subtype = event
.event_type
.strip_prefix("detection.")
.or_else(|| event.event_type.strip_prefix("product.detection."))
.unwrap_or("unknown");
*counts_by_type.entry(subtype.to_string()).or_default() += 1;
if !event.payload_value.is_empty() {
if let Ok(payload) =
serde_json::from_slice::<serde_json::Value>(&event.payload_value)
{
if let Some(conf) = payload.get("confidence").and_then(|v| v.as_f64()) {
let bucket = ((conf * 10.0).clamp(0.0, 9.0)) as usize;
confidence_histogram[bucket] += 1;
}
}
}
}
let summary = DetectionSummary {
counts_by_type,
confidence_histogram: confidence_histogram.to_vec(),
total_detections,
};
serde_json::to_vec(&summary).unwrap_or_default()
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DetectionSummary {
pub counts_by_type: HashMap<String, u32>,
pub confidence_histogram: Vec<u32>,
pub total_detections: u32,
}
#[derive(Debug, Default)]
pub struct TelemetrySummaryStrategy;
impl TelemetrySummaryStrategy {
pub fn new() -> Self {
Self
}
}
impl SummaryStrategy for TelemetrySummaryStrategy {
fn event_type(&self) -> &str {
"telemetry"
}
fn summarize(&self, events: &[PeatEvent]) -> Vec<u8> {
let mut metrics: HashMap<String, MetricStats> = HashMap::new();
for event in events {
if !event.payload_value.is_empty() {
if let Ok(payload) =
serde_json::from_slice::<serde_json::Value>(&event.payload_value)
{
if let Some(obj) = payload.as_object() {
for (key, value) in obj {
if let Some(v) = value.as_f64() {
let stats = metrics.entry(key.clone()).or_default();
stats.update(v);
}
}
}
}
}
let metric_name = event
.event_type
.strip_prefix("telemetry.")
.unwrap_or(&event.event_type);
let stats = metrics.entry(metric_name.to_string()).or_default();
if stats.count == 0 {
stats.count = 1;
} else {
stats.count += 1;
}
}
let summary = TelemetrySummary {
metrics: metrics
.into_iter()
.map(|(k, v)| (k, v.finalize()))
.collect(),
sample_count: events.len() as u32,
};
serde_json::to_vec(&summary).unwrap_or_default()
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TelemetrySummary {
pub metrics: HashMap<String, MetricSummaryStats>,
pub sample_count: u32,
}
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct MetricStats {
min: f64,
max: f64,
sum: f64,
count: u32,
}
impl MetricStats {
pub fn update(&mut self, value: f64) {
if self.count == 0 {
self.min = value;
self.max = value;
} else {
self.min = self.min.min(value);
self.max = self.max.max(value);
}
self.sum += value;
self.count += 1;
}
pub fn finalize(&self) -> MetricSummaryStats {
MetricSummaryStats {
min: self.min,
max: self.max,
avg: if self.count > 0 {
self.sum / self.count as f64
} else {
0.0
},
count: self.count,
}
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MetricSummaryStats {
pub min: f64,
pub max: f64,
pub avg: f64,
pub count: u32,
}
#[derive(Debug, Default)]
pub struct AnomalySummaryStrategy;
impl AnomalySummaryStrategy {
pub fn new() -> Self {
Self
}
}
impl SummaryStrategy for AnomalySummaryStrategy {
fn event_type(&self) -> &str {
"anomaly"
}
fn summarize(&self, events: &[PeatEvent]) -> Vec<u8> {
let mut counts_by_severity: HashMap<String, u32> = HashMap::new();
let mut anomaly_types: std::collections::HashSet<String> = std::collections::HashSet::new();
for event in events {
let severity = if let Some(routing) = &event.routing {
match routing.priority {
0 => "critical",
1 => "high",
2 => "normal",
3 => "low",
_ => "unknown",
}
} else {
"unknown"
};
*counts_by_severity.entry(severity.to_string()).or_default() += 1;
let anomaly_type = event
.event_type
.strip_prefix("anomaly.")
.unwrap_or(&event.event_type);
anomaly_types.insert(anomaly_type.to_string());
}
let summary = AnomalySummary {
counts_by_severity,
anomaly_types: anomaly_types.into_iter().collect(),
total_anomalies: events.len() as u32,
};
serde_json::to_vec(&summary).unwrap_or_default()
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct AnomalySummary {
pub counts_by_severity: HashMap<String, u32>,
pub anomaly_types: Vec<String>,
pub total_anomalies: u32,
}
#[cfg(test)]
mod tests {
use super::*;
use peat_schema::common::v1::Timestamp;
use peat_schema::event::v1::{AggregationPolicy, EventClass, EventPriority, PropagationMode};
fn make_event(event_type: &str, payload: Option<serde_json::Value>) -> PeatEvent {
PeatEvent {
event_id: "test-1".to_string(),
timestamp: Some(Timestamp {
seconds: 0,
nanos: 0,
}),
source_node_id: "node-1".to_string(),
source_formation_id: "squad-1".to_string(),
source_instance_id: None,
event_class: EventClass::Product as i32,
event_type: event_type.to_string(),
routing: Some(AggregationPolicy {
propagation: PropagationMode::PropagationSummary as i32,
priority: EventPriority::PriorityNormal as i32,
ttl_seconds: 300,
aggregation_window_ms: 1000,
}),
payload_type_url: String::new(),
payload_value: payload
.map(|p| serde_json::to_vec(&p).unwrap())
.unwrap_or_default(),
}
}
#[test]
fn test_default_strategy() {
let strategy = DefaultSummaryStrategy::new("test");
assert_eq!(strategy.event_type(), "test");
let events = vec![
make_event("test.a", None),
make_event("test.b", None),
make_event("test.a", None),
];
let summary_bytes = strategy.summarize(&events);
let summary: serde_json::Value = serde_json::from_slice(&summary_bytes).unwrap();
assert_eq!(summary["event_count"], 3);
assert_eq!(summary["event_type"], "test");
}
#[test]
fn test_detection_strategy_counts() {
let strategy = DetectionSummaryStrategy::new();
assert_eq!(strategy.event_type(), "detection");
let events = vec![
make_event("detection.vehicle", None),
make_event("detection.person", None),
make_event("detection.vehicle", None),
];
let summary_bytes = strategy.summarize(&events);
let summary: DetectionSummary = serde_json::from_slice(&summary_bytes).unwrap();
assert_eq!(summary.total_detections, 3);
assert_eq!(*summary.counts_by_type.get("vehicle").unwrap(), 2);
assert_eq!(*summary.counts_by_type.get("person").unwrap(), 1);
}
#[test]
fn test_detection_strategy_confidence() {
let strategy = DetectionSummaryStrategy::new();
let events = vec![
make_event(
"detection.vehicle",
Some(serde_json::json!({"confidence": 0.95})),
),
make_event(
"detection.vehicle",
Some(serde_json::json!({"confidence": 0.85})),
),
make_event(
"detection.vehicle",
Some(serde_json::json!({"confidence": 0.35})),
),
];
let summary_bytes = strategy.summarize(&events);
let summary: DetectionSummary = serde_json::from_slice(&summary_bytes).unwrap();
assert_eq!(summary.confidence_histogram[9], 1);
assert_eq!(summary.confidence_histogram[8], 1);
assert_eq!(summary.confidence_histogram[3], 1);
}
#[test]
fn test_telemetry_strategy() {
let strategy = TelemetrySummaryStrategy::new();
assert_eq!(strategy.event_type(), "telemetry");
let events = vec![
make_event(
"telemetry.cpu",
Some(serde_json::json!({"cpu_percent": 50.0, "memory_mb": 1024.0})),
),
make_event(
"telemetry.cpu",
Some(serde_json::json!({"cpu_percent": 75.0, "memory_mb": 2048.0})),
),
];
let summary_bytes = strategy.summarize(&events);
let summary: TelemetrySummary = serde_json::from_slice(&summary_bytes).unwrap();
assert_eq!(summary.sample_count, 2);
let cpu = summary.metrics.get("cpu_percent").unwrap();
assert_eq!(cpu.min, 50.0);
assert_eq!(cpu.max, 75.0);
assert!((cpu.avg - 62.5).abs() < 0.01);
assert_eq!(cpu.count, 2);
let mem = summary.metrics.get("memory_mb").unwrap();
assert_eq!(mem.min, 1024.0);
assert_eq!(mem.max, 2048.0);
}
#[test]
fn test_anomaly_strategy() {
let strategy = AnomalySummaryStrategy::new();
assert_eq!(strategy.event_type(), "anomaly");
let events = vec![
{
let mut e = make_event("anomaly.intrusion", None);
e.routing.as_mut().unwrap().priority = EventPriority::PriorityCritical as i32;
e
},
{
let mut e = make_event("anomaly.network_spike", None);
e.routing.as_mut().unwrap().priority = EventPriority::PriorityHigh as i32;
e
},
{
let mut e = make_event("anomaly.intrusion", None);
e.routing.as_mut().unwrap().priority = EventPriority::PriorityCritical as i32;
e
},
];
let summary_bytes = strategy.summarize(&events);
let summary: AnomalySummary = serde_json::from_slice(&summary_bytes).unwrap();
assert_eq!(summary.total_anomalies, 3);
assert_eq!(*summary.counts_by_severity.get("critical").unwrap(), 2);
assert_eq!(*summary.counts_by_severity.get("high").unwrap(), 1);
assert!(summary.anomaly_types.contains(&"intrusion".to_string()));
assert!(summary.anomaly_types.contains(&"network_spike".to_string()));
}
#[test]
fn test_empty_events() {
let strategy = DetectionSummaryStrategy::new();
let summary_bytes = strategy.summarize(&[]);
let summary: DetectionSummary = serde_json::from_slice(&summary_bytes).unwrap();
assert_eq!(summary.total_detections, 0);
assert!(summary.counts_by_type.is_empty());
}
}