use super::priority_queue::PriorityEventQueue;
use crate::Result;
use peat_schema::common::v1::Timestamp;
use peat_schema::event::v1::{
AggregationPolicy, EventClass, EventPriority, PeatEvent, PropagationMode,
};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::SystemTime;
#[derive(Debug)]
pub struct EventEmitter {
node_id: String,
formation_id: String,
outbound_queue: Arc<RwLock<PriorityEventQueue>>,
local_store: Arc<RwLock<HashMap<String, PeatEvent>>>,
event_counter: Arc<RwLock<u64>>,
}
impl EventEmitter {
pub fn new(node_id: String, formation_id: String) -> Self {
Self {
node_id,
formation_id,
outbound_queue: Arc::new(RwLock::new(PriorityEventQueue::new())),
local_store: Arc::new(RwLock::new(HashMap::new())),
event_counter: Arc::new(RwLock::new(0)),
}
}
pub fn emit(&self, event: PeatEvent) -> Result<()> {
if event.event_id.is_empty() {
return Err(crate::Error::EventOp {
message: "Event must have an event_id".to_string(),
operation: "emit".to_string(),
source: None,
});
}
let routing = event.routing.as_ref();
let propagation = routing
.map(|r| {
PropagationMode::try_from(r.propagation).unwrap_or(PropagationMode::PropagationFull)
})
.unwrap_or(PropagationMode::PropagationFull);
match propagation {
PropagationMode::PropagationLocal => {
self.store_local(event)?;
}
PropagationMode::PropagationQuery => {
self.store_local(event)?;
}
PropagationMode::PropagationSummary | PropagationMode::PropagationFull => {
let mut queue = self.outbound_queue.write().unwrap();
queue.push(event);
}
}
Ok(())
}
pub fn emit_new(
&self,
event_class: EventClass,
event_type: String,
routing: AggregationPolicy,
payload_type_url: String,
payload_value: Vec<u8>,
source_instance_id: Option<String>,
) -> Result<String> {
let event_id = self.generate_event_id();
let event = PeatEvent {
event_id: event_id.clone(),
timestamp: Some(current_timestamp()),
source_node_id: self.node_id.clone(),
source_formation_id: self.formation_id.clone(),
source_instance_id,
event_class: event_class as i32,
event_type,
routing: Some(routing),
payload_type_url,
payload_value,
};
self.emit(event)?;
Ok(event_id)
}
pub fn emit_product(
&self,
product_type: &str,
payload: Vec<u8>,
propagation: PropagationMode,
priority: EventPriority,
) -> Result<String> {
let routing = AggregationPolicy {
propagation: propagation as i32,
priority: priority as i32,
ttl_seconds: 300,
aggregation_window_ms: if propagation == PropagationMode::PropagationSummary {
1000
} else {
0
},
};
self.emit_new(
EventClass::Product,
format!("product.{}", product_type),
routing,
format!("type.peat/product.{}", product_type),
payload,
None,
)
}
pub fn emit_telemetry(&self, metric_name: &str, payload: Vec<u8>) -> Result<String> {
let routing = AggregationPolicy {
propagation: PropagationMode::PropagationQuery as i32,
priority: EventPriority::PriorityLow as i32,
ttl_seconds: 3600, aggregation_window_ms: 0,
};
self.emit_new(
EventClass::Telemetry,
format!("telemetry.{}", metric_name),
routing,
format!("type.peat/telemetry.{}", metric_name),
payload,
None,
)
}
pub fn emit_anomaly(&self, anomaly_type: &str, payload: Vec<u8>) -> Result<String> {
let routing = AggregationPolicy {
propagation: PropagationMode::PropagationFull as i32,
priority: EventPriority::PriorityHigh as i32,
ttl_seconds: 600, aggregation_window_ms: 0,
};
self.emit_new(
EventClass::Anomaly,
format!("anomaly.{}", anomaly_type),
routing,
format!("type.peat/anomaly.{}", anomaly_type),
payload,
None,
)
}
pub fn emit_critical(&self, event_type: &str, payload: Vec<u8>) -> Result<String> {
let routing = AggregationPolicy {
propagation: PropagationMode::PropagationFull as i32,
priority: EventPriority::PriorityCritical as i32,
ttl_seconds: 300,
aggregation_window_ms: 0,
};
self.emit_new(
EventClass::Anomaly,
format!("critical.{}", event_type),
routing,
format!("type.peat/critical.{}", event_type),
payload,
None,
)
}
pub fn pop_critical(&self) -> Vec<PeatEvent> {
let mut queue = self.outbound_queue.write().unwrap();
queue.pop_critical()
}
pub fn pop_events(&self, max_events: usize) -> Vec<PeatEvent> {
let mut queue = self.outbound_queue.write().unwrap();
let mut events = queue.pop_critical();
let remaining = max_events.saturating_sub(events.len());
if remaining > 0 {
events.extend(queue.pop_weighted(remaining));
}
events
}
pub fn has_critical(&self) -> bool {
let queue = self.outbound_queue.read().unwrap();
queue.has_critical()
}
pub fn pending_count(&self) -> usize {
let queue = self.outbound_queue.read().unwrap();
queue.len()
}
pub fn local_count(&self) -> usize {
let store = self.local_store.read().unwrap();
store.len()
}
pub fn query_local(&self, event_type: Option<&str>) -> Vec<PeatEvent> {
let store = self.local_store.read().unwrap();
store
.values()
.filter(|e| event_type.is_none() || Some(e.event_type.as_str()) == event_type)
.cloned()
.collect()
}
pub fn get_local(&self, event_id: &str) -> Option<PeatEvent> {
let store = self.local_store.read().unwrap();
store.get(event_id).cloned()
}
pub fn node_id(&self) -> &str {
&self.node_id
}
pub fn formation_id(&self) -> &str {
&self.formation_id
}
fn store_local(&self, event: PeatEvent) -> Result<()> {
let mut store = self.local_store.write().unwrap();
store.insert(event.event_id.clone(), event);
Ok(())
}
fn generate_event_id(&self) -> String {
let mut counter = self.event_counter.write().unwrap();
*counter += 1;
format!("{}-{}", self.node_id, *counter)
}
}
fn current_timestamp() -> Timestamp {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
Timestamp {
seconds: now.as_secs(),
nanos: now.subsec_nanos(),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_emit_event_full_propagation() {
let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
let event = PeatEvent {
event_id: "evt-1".to_string(),
timestamp: None,
source_node_id: "node-1".to_string(),
source_formation_id: "squad-1".to_string(),
source_instance_id: None,
event_class: EventClass::Product as i32,
event_type: "detection".to_string(),
routing: Some(AggregationPolicy {
propagation: PropagationMode::PropagationFull as i32,
priority: EventPriority::PriorityNormal as i32,
ttl_seconds: 300,
aggregation_window_ms: 0,
}),
payload_type_url: String::new(),
payload_value: vec![],
};
emitter.emit(event).unwrap();
assert_eq!(emitter.pending_count(), 1);
assert_eq!(emitter.local_count(), 0);
}
#[test]
fn test_emit_event_local_propagation() {
let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
let event = PeatEvent {
event_id: "evt-1".to_string(),
timestamp: None,
source_node_id: "node-1".to_string(),
source_formation_id: "squad-1".to_string(),
source_instance_id: None,
event_class: EventClass::Telemetry as i32,
event_type: "debug".to_string(),
routing: Some(AggregationPolicy {
propagation: PropagationMode::PropagationLocal as i32,
priority: EventPriority::PriorityLow as i32,
ttl_seconds: 60,
aggregation_window_ms: 0,
}),
payload_type_url: String::new(),
payload_value: vec![],
};
emitter.emit(event).unwrap();
assert_eq!(emitter.pending_count(), 0);
assert_eq!(emitter.local_count(), 1);
}
#[test]
fn test_emit_event_query_propagation() {
let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
let event = PeatEvent {
event_id: "evt-1".to_string(),
timestamp: None,
source_node_id: "node-1".to_string(),
source_formation_id: "squad-1".to_string(),
source_instance_id: None,
event_class: EventClass::Telemetry as i32,
event_type: "metrics".to_string(),
routing: Some(AggregationPolicy {
propagation: PropagationMode::PropagationQuery as i32,
priority: EventPriority::PriorityLow as i32,
ttl_seconds: 3600,
aggregation_window_ms: 0,
}),
payload_type_url: String::new(),
payload_value: vec![],
};
emitter.emit(event).unwrap();
assert_eq!(emitter.pending_count(), 0);
assert_eq!(emitter.local_count(), 1);
let local = emitter.query_local(Some("metrics"));
assert_eq!(local.len(), 1);
}
#[test]
fn test_emit_new_generates_id() {
let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
let routing = AggregationPolicy {
propagation: PropagationMode::PropagationFull as i32,
priority: EventPriority::PriorityNormal as i32,
ttl_seconds: 300,
aggregation_window_ms: 0,
};
let event_id = emitter
.emit_new(
EventClass::Product,
"test".to_string(),
routing,
String::new(),
vec![],
None,
)
.unwrap();
assert!(event_id.starts_with("node-1-"));
assert_eq!(emitter.pending_count(), 1);
}
#[test]
fn test_emit_product() {
let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
let event_id = emitter
.emit_product(
"output_v1",
vec![1, 2, 3],
PropagationMode::PropagationSummary,
EventPriority::PriorityNormal,
)
.unwrap();
assert!(!event_id.is_empty());
assert_eq!(emitter.pending_count(), 1);
}
#[test]
fn test_emit_telemetry_stored_locally() {
let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
emitter.emit_telemetry("cpu_usage", vec![42]).unwrap();
assert_eq!(emitter.pending_count(), 0);
assert_eq!(emitter.local_count(), 1);
}
#[test]
fn test_emit_critical() {
let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
emitter.emit_critical("urgent_condition", vec![]).unwrap();
assert!(emitter.has_critical());
let critical = emitter.pop_critical();
assert_eq!(critical.len(), 1);
assert!(critical[0].event_type.starts_with("critical."));
}
#[test]
fn test_pop_events_critical_first() {
let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
emitter
.emit_product(
"normal_output",
vec![],
PropagationMode::PropagationFull,
EventPriority::PriorityNormal,
)
.unwrap();
emitter
.emit_critical("immediate_attention", vec![])
.unwrap();
let events = emitter.pop_events(10);
assert_eq!(events.len(), 2);
assert!(events[0].event_type.starts_with("critical."));
}
#[test]
fn test_emit_without_event_id_fails() {
let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
let event = PeatEvent {
event_id: String::new(), timestamp: None,
source_node_id: "node-1".to_string(),
source_formation_id: "squad-1".to_string(),
source_instance_id: None,
event_class: EventClass::Product as i32,
event_type: "test".to_string(),
routing: None,
payload_type_url: String::new(),
payload_value: vec![],
};
let result = emitter.emit(event);
assert!(result.is_err());
}
#[test]
fn test_query_local_by_type() {
let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
emitter.emit_telemetry("cpu", vec![]).unwrap();
emitter.emit_telemetry("memory", vec![]).unwrap();
emitter.emit_telemetry("cpu", vec![]).unwrap();
let all = emitter.query_local(None);
assert_eq!(all.len(), 3);
let cpu = emitter.query_local(Some("telemetry.cpu"));
assert_eq!(cpu.len(), 2);
let memory = emitter.query_local(Some("telemetry.memory"));
assert_eq!(memory.len(), 1);
}
}