1use super::priority_queue::PriorityEventQueue;
9use crate::Result;
10use peat_schema::common::v1::Timestamp;
11use peat_schema::event::v1::{
12 AggregationPolicy, EventClass, EventPriority, PeatEvent, PropagationMode,
13};
14use std::collections::HashMap;
15use std::sync::{Arc, RwLock};
16use std::time::SystemTime;
17
18#[derive(Debug)]
24pub struct EventEmitter {
25 node_id: String,
27
28 formation_id: String,
30
31 outbound_queue: Arc<RwLock<PriorityEventQueue>>,
33
34 local_store: Arc<RwLock<HashMap<String, PeatEvent>>>,
36
37 event_counter: Arc<RwLock<u64>>,
39}
40
41impl EventEmitter {
42 pub fn new(node_id: String, formation_id: String) -> Self {
44 Self {
45 node_id,
46 formation_id,
47 outbound_queue: Arc::new(RwLock::new(PriorityEventQueue::new())),
48 local_store: Arc::new(RwLock::new(HashMap::new())),
49 event_counter: Arc::new(RwLock::new(0)),
50 }
51 }
52
53 pub fn emit(&self, event: PeatEvent) -> Result<()> {
60 if event.event_id.is_empty() {
62 return Err(crate::Error::EventOp {
63 message: "Event must have an event_id".to_string(),
64 operation: "emit".to_string(),
65 source: None,
66 });
67 }
68
69 let routing = event.routing.as_ref();
70
71 let propagation = routing
73 .map(|r| {
74 PropagationMode::try_from(r.propagation).unwrap_or(PropagationMode::PropagationFull)
75 })
76 .unwrap_or(PropagationMode::PropagationFull);
77
78 match propagation {
79 PropagationMode::PropagationLocal => {
80 self.store_local(event)?;
82 }
83 PropagationMode::PropagationQuery => {
84 self.store_local(event)?;
86 }
87 PropagationMode::PropagationSummary | PropagationMode::PropagationFull => {
88 let mut queue = self.outbound_queue.write().unwrap();
90 queue.push(event);
91 }
92 }
93
94 Ok(())
95 }
96
97 pub fn emit_new(
101 &self,
102 event_class: EventClass,
103 event_type: String,
104 routing: AggregationPolicy,
105 payload_type_url: String,
106 payload_value: Vec<u8>,
107 source_instance_id: Option<String>,
108 ) -> Result<String> {
109 let event_id = self.generate_event_id();
110
111 let event = PeatEvent {
112 event_id: event_id.clone(),
113 timestamp: Some(current_timestamp()),
114 source_node_id: self.node_id.clone(),
115 source_formation_id: self.formation_id.clone(),
116 source_instance_id,
117 event_class: event_class as i32,
118 event_type,
119 routing: Some(routing),
120 payload_type_url,
121 payload_value,
122 };
123
124 self.emit(event)?;
125 Ok(event_id)
126 }
127
128 pub fn emit_product(
132 &self,
133 product_type: &str,
134 payload: Vec<u8>,
135 propagation: PropagationMode,
136 priority: EventPriority,
137 ) -> Result<String> {
138 let routing = AggregationPolicy {
139 propagation: propagation as i32,
140 priority: priority as i32,
141 ttl_seconds: 300,
142 aggregation_window_ms: if propagation == PropagationMode::PropagationSummary {
143 1000
144 } else {
145 0
146 },
147 };
148
149 self.emit_new(
150 EventClass::Product,
151 format!("product.{}", product_type),
152 routing,
153 format!("type.peat/product.{}", product_type),
154 payload,
155 None,
156 )
157 }
158
159 pub fn emit_telemetry(&self, metric_name: &str, payload: Vec<u8>) -> Result<String> {
163 let routing = AggregationPolicy {
164 propagation: PropagationMode::PropagationQuery as i32,
165 priority: EventPriority::PriorityLow as i32,
166 ttl_seconds: 3600, aggregation_window_ms: 0,
168 };
169
170 self.emit_new(
171 EventClass::Telemetry,
172 format!("telemetry.{}", metric_name),
173 routing,
174 format!("type.peat/telemetry.{}", metric_name),
175 payload,
176 None,
177 )
178 }
179
180 pub fn emit_anomaly(&self, anomaly_type: &str, payload: Vec<u8>) -> Result<String> {
184 let routing = AggregationPolicy {
185 propagation: PropagationMode::PropagationFull as i32,
186 priority: EventPriority::PriorityHigh as i32,
187 ttl_seconds: 600, aggregation_window_ms: 0,
189 };
190
191 self.emit_new(
192 EventClass::Anomaly,
193 format!("anomaly.{}", anomaly_type),
194 routing,
195 format!("type.peat/anomaly.{}", anomaly_type),
196 payload,
197 None,
198 )
199 }
200
201 pub fn emit_critical(&self, event_type: &str, payload: Vec<u8>) -> Result<String> {
205 let routing = AggregationPolicy {
206 propagation: PropagationMode::PropagationFull as i32,
207 priority: EventPriority::PriorityCritical as i32,
208 ttl_seconds: 300,
209 aggregation_window_ms: 0,
210 };
211
212 self.emit_new(
213 EventClass::Anomaly,
214 format!("critical.{}", event_type),
215 routing,
216 format!("type.peat/critical.{}", event_type),
217 payload,
218 None,
219 )
220 }
221
222 pub fn pop_critical(&self) -> Vec<PeatEvent> {
224 let mut queue = self.outbound_queue.write().unwrap();
225 queue.pop_critical()
226 }
227
228 pub fn pop_events(&self, max_events: usize) -> Vec<PeatEvent> {
230 let mut queue = self.outbound_queue.write().unwrap();
231
232 let mut events = queue.pop_critical();
234
235 let remaining = max_events.saturating_sub(events.len());
237 if remaining > 0 {
238 events.extend(queue.pop_weighted(remaining));
239 }
240
241 events
242 }
243
244 pub fn has_critical(&self) -> bool {
246 let queue = self.outbound_queue.read().unwrap();
247 queue.has_critical()
248 }
249
250 pub fn pending_count(&self) -> usize {
252 let queue = self.outbound_queue.read().unwrap();
253 queue.len()
254 }
255
256 pub fn local_count(&self) -> usize {
258 let store = self.local_store.read().unwrap();
259 store.len()
260 }
261
262 pub fn query_local(&self, event_type: Option<&str>) -> Vec<PeatEvent> {
264 let store = self.local_store.read().unwrap();
265 store
266 .values()
267 .filter(|e| event_type.is_none() || Some(e.event_type.as_str()) == event_type)
268 .cloned()
269 .collect()
270 }
271
272 pub fn get_local(&self, event_id: &str) -> Option<PeatEvent> {
274 let store = self.local_store.read().unwrap();
275 store.get(event_id).cloned()
276 }
277
278 pub fn node_id(&self) -> &str {
280 &self.node_id
281 }
282
283 pub fn formation_id(&self) -> &str {
285 &self.formation_id
286 }
287
288 fn store_local(&self, event: PeatEvent) -> Result<()> {
291 let mut store = self.local_store.write().unwrap();
292 store.insert(event.event_id.clone(), event);
293 Ok(())
294 }
295
296 fn generate_event_id(&self) -> String {
297 let mut counter = self.event_counter.write().unwrap();
298 *counter += 1;
299 format!("{}-{}", self.node_id, *counter)
300 }
301}
302
303fn current_timestamp() -> Timestamp {
305 let now = SystemTime::now()
306 .duration_since(SystemTime::UNIX_EPOCH)
307 .unwrap();
308 Timestamp {
309 seconds: now.as_secs(),
310 nanos: now.subsec_nanos(),
311 }
312}
313
314#[cfg(test)]
315mod tests {
316 use super::*;
317
318 #[test]
319 fn test_emit_event_full_propagation() {
320 let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
321
322 let event = PeatEvent {
323 event_id: "evt-1".to_string(),
324 timestamp: None,
325 source_node_id: "node-1".to_string(),
326 source_formation_id: "squad-1".to_string(),
327 source_instance_id: None,
328 event_class: EventClass::Product as i32,
329 event_type: "detection".to_string(),
330 routing: Some(AggregationPolicy {
331 propagation: PropagationMode::PropagationFull as i32,
332 priority: EventPriority::PriorityNormal as i32,
333 ttl_seconds: 300,
334 aggregation_window_ms: 0,
335 }),
336 payload_type_url: String::new(),
337 payload_value: vec![],
338 };
339
340 emitter.emit(event).unwrap();
341
342 assert_eq!(emitter.pending_count(), 1);
343 assert_eq!(emitter.local_count(), 0);
344 }
345
346 #[test]
347 fn test_emit_event_local_propagation() {
348 let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
349
350 let event = PeatEvent {
351 event_id: "evt-1".to_string(),
352 timestamp: None,
353 source_node_id: "node-1".to_string(),
354 source_formation_id: "squad-1".to_string(),
355 source_instance_id: None,
356 event_class: EventClass::Telemetry as i32,
357 event_type: "debug".to_string(),
358 routing: Some(AggregationPolicy {
359 propagation: PropagationMode::PropagationLocal as i32,
360 priority: EventPriority::PriorityLow as i32,
361 ttl_seconds: 60,
362 aggregation_window_ms: 0,
363 }),
364 payload_type_url: String::new(),
365 payload_value: vec![],
366 };
367
368 emitter.emit(event).unwrap();
369
370 assert_eq!(emitter.pending_count(), 0);
371 assert_eq!(emitter.local_count(), 1);
372 }
373
374 #[test]
375 fn test_emit_event_query_propagation() {
376 let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
377
378 let event = PeatEvent {
379 event_id: "evt-1".to_string(),
380 timestamp: None,
381 source_node_id: "node-1".to_string(),
382 source_formation_id: "squad-1".to_string(),
383 source_instance_id: None,
384 event_class: EventClass::Telemetry as i32,
385 event_type: "metrics".to_string(),
386 routing: Some(AggregationPolicy {
387 propagation: PropagationMode::PropagationQuery as i32,
388 priority: EventPriority::PriorityLow as i32,
389 ttl_seconds: 3600,
390 aggregation_window_ms: 0,
391 }),
392 payload_type_url: String::new(),
393 payload_value: vec![],
394 };
395
396 emitter.emit(event).unwrap();
397
398 assert_eq!(emitter.pending_count(), 0);
400 assert_eq!(emitter.local_count(), 1);
401
402 let local = emitter.query_local(Some("metrics"));
404 assert_eq!(local.len(), 1);
405 }
406
407 #[test]
408 fn test_emit_new_generates_id() {
409 let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
410
411 let routing = AggregationPolicy {
412 propagation: PropagationMode::PropagationFull as i32,
413 priority: EventPriority::PriorityNormal as i32,
414 ttl_seconds: 300,
415 aggregation_window_ms: 0,
416 };
417
418 let event_id = emitter
419 .emit_new(
420 EventClass::Product,
421 "test".to_string(),
422 routing,
423 String::new(),
424 vec![],
425 None,
426 )
427 .unwrap();
428
429 assert!(event_id.starts_with("node-1-"));
430 assert_eq!(emitter.pending_count(), 1);
431 }
432
433 #[test]
434 fn test_emit_product() {
435 let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
436
437 let event_id = emitter
438 .emit_product(
439 "output_v1",
440 vec![1, 2, 3],
441 PropagationMode::PropagationSummary,
442 EventPriority::PriorityNormal,
443 )
444 .unwrap();
445
446 assert!(!event_id.is_empty());
447 assert_eq!(emitter.pending_count(), 1);
448 }
449
450 #[test]
451 fn test_emit_telemetry_stored_locally() {
452 let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
453
454 emitter.emit_telemetry("cpu_usage", vec![42]).unwrap();
455
456 assert_eq!(emitter.pending_count(), 0);
458 assert_eq!(emitter.local_count(), 1);
459 }
460
461 #[test]
462 fn test_emit_critical() {
463 let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
464
465 emitter.emit_critical("urgent_condition", vec![]).unwrap();
466
467 assert!(emitter.has_critical());
468
469 let critical = emitter.pop_critical();
470 assert_eq!(critical.len(), 1);
471 assert!(critical[0].event_type.starts_with("critical."));
472 }
473
474 #[test]
475 fn test_pop_events_critical_first() {
476 let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
477
478 emitter
480 .emit_product(
481 "normal_output",
482 vec![],
483 PropagationMode::PropagationFull,
484 EventPriority::PriorityNormal,
485 )
486 .unwrap();
487
488 emitter
490 .emit_critical("immediate_attention", vec![])
491 .unwrap();
492
493 let events = emitter.pop_events(10);
495 assert_eq!(events.len(), 2);
496 assert!(events[0].event_type.starts_with("critical."));
497 }
498
499 #[test]
500 fn test_emit_without_event_id_fails() {
501 let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
502
503 let event = PeatEvent {
504 event_id: String::new(), timestamp: None,
506 source_node_id: "node-1".to_string(),
507 source_formation_id: "squad-1".to_string(),
508 source_instance_id: None,
509 event_class: EventClass::Product as i32,
510 event_type: "test".to_string(),
511 routing: None,
512 payload_type_url: String::new(),
513 payload_value: vec![],
514 };
515
516 let result = emitter.emit(event);
517 assert!(result.is_err());
518 }
519
520 #[test]
521 fn test_query_local_by_type() {
522 let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
523
524 emitter.emit_telemetry("cpu", vec![]).unwrap();
526 emitter.emit_telemetry("memory", vec![]).unwrap();
527 emitter.emit_telemetry("cpu", vec![]).unwrap();
528
529 let all = emitter.query_local(None);
531 assert_eq!(all.len(), 3);
532
533 let cpu = emitter.query_local(Some("telemetry.cpu"));
535 assert_eq!(cpu.len(), 2);
536
537 let memory = emitter.query_local(Some("telemetry.memory"));
538 assert_eq!(memory.len(), 1);
539 }
540}