Skip to main content

peat_protocol/event/
emitter.rs

1//! Event emission with routing policy enforcement (ADR-027)
2//!
3//! The EventEmitter is responsible for:
4//! 1. Accepting events from application code
5//! 2. Enforcing propagation policies
6//! 3. Queueing events by priority for transmission
7
8use super::priority_queue::PriorityEventQueue;
9use crate::Result;
10use peat_schema::common::v1::Timestamp;
11use peat_schema::event::v1::{
12    AggregationPolicy, EventClass, EventPriority, PeatEvent, PropagationMode,
13};
14use std::collections::HashMap;
15use std::sync::{Arc, RwLock};
16use std::time::SystemTime;
17
18/// Event emitter for a single node
19///
20/// Handles event creation, policy enforcement, and priority queuing.
21/// Events with `PropagationMode::Local` are stored locally and not queued
22/// for transmission.
23#[derive(Debug)]
24pub struct EventEmitter {
25    /// Node identifier
26    node_id: String,
27
28    /// Formation this node belongs to
29    formation_id: String,
30
31    /// Outbound priority queue for events to transmit
32    outbound_queue: Arc<RwLock<PriorityEventQueue>>,
33
34    /// Local event storage for non-propagating events (keyed by event_id)
35    local_store: Arc<RwLock<HashMap<String, PeatEvent>>>,
36
37    /// Counter for generating unique event IDs
38    event_counter: Arc<RwLock<u64>>,
39}
40
41impl EventEmitter {
42    /// Create a new event emitter for a node
43    pub fn new(node_id: String, formation_id: String) -> Self {
44        Self {
45            node_id,
46            formation_id,
47            outbound_queue: Arc::new(RwLock::new(PriorityEventQueue::new())),
48            local_store: Arc::new(RwLock::new(HashMap::new())),
49            event_counter: Arc::new(RwLock::new(0)),
50        }
51    }
52
53    /// Emit an event with automatic routing based on policy
54    ///
55    /// Events are routed according to their `AggregationPolicy`:
56    /// - `PropagationMode::Local`: Stored locally, not transmitted
57    /// - `PropagationMode::Query`: Stored locally for query access
58    /// - `PropagationMode::Summary` or `Full`: Queued for transmission
59    pub fn emit(&self, event: PeatEvent) -> Result<()> {
60        // Validate event has required fields
61        if event.event_id.is_empty() {
62            return Err(crate::Error::EventOp {
63                message: "Event must have an event_id".to_string(),
64                operation: "emit".to_string(),
65                source: None,
66            });
67        }
68
69        let routing = event.routing.as_ref();
70
71        // Check propagation mode
72        let propagation = routing
73            .map(|r| {
74                PropagationMode::try_from(r.propagation).unwrap_or(PropagationMode::PropagationFull)
75            })
76            .unwrap_or(PropagationMode::PropagationFull);
77
78        match propagation {
79            PropagationMode::PropagationLocal => {
80                // Store locally only, do not queue for transmission
81                self.store_local(event)?;
82            }
83            PropagationMode::PropagationQuery => {
84                // Store locally for query access, do not transmit
85                self.store_local(event)?;
86            }
87            PropagationMode::PropagationSummary | PropagationMode::PropagationFull => {
88                // Queue for transmission to parent echelon
89                let mut queue = self.outbound_queue.write().unwrap();
90                queue.push(event);
91            }
92        }
93
94        Ok(())
95    }
96
97    /// Create and emit an event with the given parameters
98    ///
99    /// Automatically generates event_id and timestamp.
100    pub fn emit_new(
101        &self,
102        event_class: EventClass,
103        event_type: String,
104        routing: AggregationPolicy,
105        payload_type_url: String,
106        payload_value: Vec<u8>,
107        source_instance_id: Option<String>,
108    ) -> Result<String> {
109        let event_id = self.generate_event_id();
110
111        let event = PeatEvent {
112            event_id: event_id.clone(),
113            timestamp: Some(current_timestamp()),
114            source_node_id: self.node_id.clone(),
115            source_formation_id: self.formation_id.clone(),
116            source_instance_id,
117            event_class: event_class as i32,
118            event_type,
119            routing: Some(routing),
120            payload_type_url,
121            payload_value,
122        };
123
124        self.emit(event)?;
125        Ok(event_id)
126    }
127
128    /// Emit a product event (output from software processing)
129    ///
130    /// Products are the primary output events from software instances.
131    pub fn emit_product(
132        &self,
133        product_type: &str,
134        payload: Vec<u8>,
135        propagation: PropagationMode,
136        priority: EventPriority,
137    ) -> Result<String> {
138        let routing = AggregationPolicy {
139            propagation: propagation as i32,
140            priority: priority as i32,
141            ttl_seconds: 300,
142            aggregation_window_ms: if propagation == PropagationMode::PropagationSummary {
143                1000
144            } else {
145                0
146            },
147        };
148
149        self.emit_new(
150            EventClass::Product,
151            format!("product.{}", product_type),
152            routing,
153            format!("type.peat/product.{}", product_type),
154            payload,
155            None,
156        )
157    }
158
159    /// Emit a telemetry event (metrics, health, diagnostics)
160    ///
161    /// Telemetry defaults to Query propagation (stored locally, queryable)
162    pub fn emit_telemetry(&self, metric_name: &str, payload: Vec<u8>) -> Result<String> {
163        let routing = AggregationPolicy {
164            propagation: PropagationMode::PropagationQuery as i32,
165            priority: EventPriority::PriorityLow as i32,
166            ttl_seconds: 3600, // 1 hour TTL for telemetry
167            aggregation_window_ms: 0,
168        };
169
170        self.emit_new(
171            EventClass::Telemetry,
172            format!("telemetry.{}", metric_name),
173            routing,
174            format!("type.peat/telemetry.{}", metric_name),
175            payload,
176            None,
177        )
178    }
179
180    /// Emit an anomaly event (unusual patterns, alerts)
181    ///
182    /// Anomalies default to Full propagation with High priority
183    pub fn emit_anomaly(&self, anomaly_type: &str, payload: Vec<u8>) -> Result<String> {
184        let routing = AggregationPolicy {
185            propagation: PropagationMode::PropagationFull as i32,
186            priority: EventPriority::PriorityHigh as i32,
187            ttl_seconds: 600, // 10 minutes
188            aggregation_window_ms: 0,
189        };
190
191        self.emit_new(
192            EventClass::Anomaly,
193            format!("anomaly.{}", anomaly_type),
194            routing,
195            format!("type.peat/anomaly.{}", anomaly_type),
196            payload,
197            None,
198        )
199    }
200
201    /// Emit a critical event (immediate attention required)
202    ///
203    /// Critical events have CRITICAL priority and Full propagation
204    pub fn emit_critical(&self, event_type: &str, payload: Vec<u8>) -> Result<String> {
205        let routing = AggregationPolicy {
206            propagation: PropagationMode::PropagationFull as i32,
207            priority: EventPriority::PriorityCritical as i32,
208            ttl_seconds: 300,
209            aggregation_window_ms: 0,
210        };
211
212        self.emit_new(
213            EventClass::Anomaly,
214            format!("critical.{}", event_type),
215            routing,
216            format!("type.peat/critical.{}", event_type),
217            payload,
218            None,
219        )
220    }
221
222    /// Pop all critical events for immediate transmission
223    pub fn pop_critical(&self) -> Vec<PeatEvent> {
224        let mut queue = self.outbound_queue.write().unwrap();
225        queue.pop_critical()
226    }
227
228    /// Pop events for transmission using weighted fair queuing
229    pub fn pop_events(&self, max_events: usize) -> Vec<PeatEvent> {
230        let mut queue = self.outbound_queue.write().unwrap();
231
232        // Always drain critical first
233        let mut events = queue.pop_critical();
234
235        // Then weighted fair queue for the rest
236        let remaining = max_events.saturating_sub(events.len());
237        if remaining > 0 {
238            events.extend(queue.pop_weighted(remaining));
239        }
240
241        events
242    }
243
244    /// Check if there are critical events pending
245    pub fn has_critical(&self) -> bool {
246        let queue = self.outbound_queue.read().unwrap();
247        queue.has_critical()
248    }
249
250    /// Get count of pending outbound events
251    pub fn pending_count(&self) -> usize {
252        let queue = self.outbound_queue.read().unwrap();
253        queue.len()
254    }
255
256    /// Get count of locally stored events
257    pub fn local_count(&self) -> usize {
258        let store = self.local_store.read().unwrap();
259        store.len()
260    }
261
262    /// Query locally stored events by event type
263    pub fn query_local(&self, event_type: Option<&str>) -> Vec<PeatEvent> {
264        let store = self.local_store.read().unwrap();
265        store
266            .values()
267            .filter(|e| event_type.is_none() || Some(e.event_type.as_str()) == event_type)
268            .cloned()
269            .collect()
270    }
271
272    /// Get a specific locally stored event by ID
273    pub fn get_local(&self, event_id: &str) -> Option<PeatEvent> {
274        let store = self.local_store.read().unwrap();
275        store.get(event_id).cloned()
276    }
277
278    /// Get the node ID
279    pub fn node_id(&self) -> &str {
280        &self.node_id
281    }
282
283    /// Get the formation ID
284    pub fn formation_id(&self) -> &str {
285        &self.formation_id
286    }
287
288    // Internal helpers
289
290    fn store_local(&self, event: PeatEvent) -> Result<()> {
291        let mut store = self.local_store.write().unwrap();
292        store.insert(event.event_id.clone(), event);
293        Ok(())
294    }
295
296    fn generate_event_id(&self) -> String {
297        let mut counter = self.event_counter.write().unwrap();
298        *counter += 1;
299        format!("{}-{}", self.node_id, *counter)
300    }
301}
302
303/// Get current timestamp
304fn current_timestamp() -> Timestamp {
305    let now = SystemTime::now()
306        .duration_since(SystemTime::UNIX_EPOCH)
307        .unwrap();
308    Timestamp {
309        seconds: now.as_secs(),
310        nanos: now.subsec_nanos(),
311    }
312}
313
314#[cfg(test)]
315mod tests {
316    use super::*;
317
318    #[test]
319    fn test_emit_event_full_propagation() {
320        let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
321
322        let event = PeatEvent {
323            event_id: "evt-1".to_string(),
324            timestamp: None,
325            source_node_id: "node-1".to_string(),
326            source_formation_id: "squad-1".to_string(),
327            source_instance_id: None,
328            event_class: EventClass::Product as i32,
329            event_type: "detection".to_string(),
330            routing: Some(AggregationPolicy {
331                propagation: PropagationMode::PropagationFull as i32,
332                priority: EventPriority::PriorityNormal as i32,
333                ttl_seconds: 300,
334                aggregation_window_ms: 0,
335            }),
336            payload_type_url: String::new(),
337            payload_value: vec![],
338        };
339
340        emitter.emit(event).unwrap();
341
342        assert_eq!(emitter.pending_count(), 1);
343        assert_eq!(emitter.local_count(), 0);
344    }
345
346    #[test]
347    fn test_emit_event_local_propagation() {
348        let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
349
350        let event = PeatEvent {
351            event_id: "evt-1".to_string(),
352            timestamp: None,
353            source_node_id: "node-1".to_string(),
354            source_formation_id: "squad-1".to_string(),
355            source_instance_id: None,
356            event_class: EventClass::Telemetry as i32,
357            event_type: "debug".to_string(),
358            routing: Some(AggregationPolicy {
359                propagation: PropagationMode::PropagationLocal as i32,
360                priority: EventPriority::PriorityLow as i32,
361                ttl_seconds: 60,
362                aggregation_window_ms: 0,
363            }),
364            payload_type_url: String::new(),
365            payload_value: vec![],
366        };
367
368        emitter.emit(event).unwrap();
369
370        assert_eq!(emitter.pending_count(), 0);
371        assert_eq!(emitter.local_count(), 1);
372    }
373
374    #[test]
375    fn test_emit_event_query_propagation() {
376        let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
377
378        let event = PeatEvent {
379            event_id: "evt-1".to_string(),
380            timestamp: None,
381            source_node_id: "node-1".to_string(),
382            source_formation_id: "squad-1".to_string(),
383            source_instance_id: None,
384            event_class: EventClass::Telemetry as i32,
385            event_type: "metrics".to_string(),
386            routing: Some(AggregationPolicy {
387                propagation: PropagationMode::PropagationQuery as i32,
388                priority: EventPriority::PriorityLow as i32,
389                ttl_seconds: 3600,
390                aggregation_window_ms: 0,
391            }),
392            payload_type_url: String::new(),
393            payload_value: vec![],
394        };
395
396        emitter.emit(event).unwrap();
397
398        // Query mode stores locally, doesn't transmit
399        assert_eq!(emitter.pending_count(), 0);
400        assert_eq!(emitter.local_count(), 1);
401
402        // Should be queryable
403        let local = emitter.query_local(Some("metrics"));
404        assert_eq!(local.len(), 1);
405    }
406
407    #[test]
408    fn test_emit_new_generates_id() {
409        let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
410
411        let routing = AggregationPolicy {
412            propagation: PropagationMode::PropagationFull as i32,
413            priority: EventPriority::PriorityNormal as i32,
414            ttl_seconds: 300,
415            aggregation_window_ms: 0,
416        };
417
418        let event_id = emitter
419            .emit_new(
420                EventClass::Product,
421                "test".to_string(),
422                routing,
423                String::new(),
424                vec![],
425                None,
426            )
427            .unwrap();
428
429        assert!(event_id.starts_with("node-1-"));
430        assert_eq!(emitter.pending_count(), 1);
431    }
432
433    #[test]
434    fn test_emit_product() {
435        let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
436
437        let event_id = emitter
438            .emit_product(
439                "output_v1",
440                vec![1, 2, 3],
441                PropagationMode::PropagationSummary,
442                EventPriority::PriorityNormal,
443            )
444            .unwrap();
445
446        assert!(!event_id.is_empty());
447        assert_eq!(emitter.pending_count(), 1);
448    }
449
450    #[test]
451    fn test_emit_telemetry_stored_locally() {
452        let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
453
454        emitter.emit_telemetry("cpu_usage", vec![42]).unwrap();
455
456        // Telemetry defaults to QUERY mode - stored locally
457        assert_eq!(emitter.pending_count(), 0);
458        assert_eq!(emitter.local_count(), 1);
459    }
460
461    #[test]
462    fn test_emit_critical() {
463        let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
464
465        emitter.emit_critical("urgent_condition", vec![]).unwrap();
466
467        assert!(emitter.has_critical());
468
469        let critical = emitter.pop_critical();
470        assert_eq!(critical.len(), 1);
471        assert!(critical[0].event_type.starts_with("critical."));
472    }
473
474    #[test]
475    fn test_pop_events_critical_first() {
476        let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
477
478        // Add normal event first
479        emitter
480            .emit_product(
481                "normal_output",
482                vec![],
483                PropagationMode::PropagationFull,
484                EventPriority::PriorityNormal,
485            )
486            .unwrap();
487
488        // Add critical event second
489        emitter
490            .emit_critical("immediate_attention", vec![])
491            .unwrap();
492
493        // Pop should return critical first
494        let events = emitter.pop_events(10);
495        assert_eq!(events.len(), 2);
496        assert!(events[0].event_type.starts_with("critical."));
497    }
498
499    #[test]
500    fn test_emit_without_event_id_fails() {
501        let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
502
503        let event = PeatEvent {
504            event_id: String::new(), // Empty ID should fail
505            timestamp: None,
506            source_node_id: "node-1".to_string(),
507            source_formation_id: "squad-1".to_string(),
508            source_instance_id: None,
509            event_class: EventClass::Product as i32,
510            event_type: "test".to_string(),
511            routing: None,
512            payload_type_url: String::new(),
513            payload_value: vec![],
514        };
515
516        let result = emitter.emit(event);
517        assert!(result.is_err());
518    }
519
520    #[test]
521    fn test_query_local_by_type() {
522        let emitter = EventEmitter::new("node-1".to_string(), "squad-1".to_string());
523
524        // Emit different telemetry types
525        emitter.emit_telemetry("cpu", vec![]).unwrap();
526        emitter.emit_telemetry("memory", vec![]).unwrap();
527        emitter.emit_telemetry("cpu", vec![]).unwrap();
528
529        // Query all
530        let all = emitter.query_local(None);
531        assert_eq!(all.len(), 3);
532
533        // Query by type
534        let cpu = emitter.query_local(Some("telemetry.cpu"));
535        assert_eq!(cpu.len(), 2);
536
537        let memory = emitter.query_local(Some("telemetry.memory"));
538        assert_eq!(memory.len(), 1);
539    }
540}