1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3use std::sync::Arc;
4use tokio::sync::broadcast;
5
6pub type EventKey = String;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
11#[serde(untagged)]
12pub enum EventPayload {
13 Empty,
14 String(String),
15 Map(HashMap<String, serde_json::Value>),
16}
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct BoxEvent {
21 pub key: EventKey,
23
24 pub payload: EventPayload,
26
27 pub timestamp: chrono::DateTime<chrono::Utc>,
29}
30
31impl BoxEvent {
32 pub fn new(key: impl Into<String>, payload: EventPayload) -> Self {
34 Self {
35 key: key.into(),
36 payload,
37 timestamp: chrono::Utc::now(),
38 }
39 }
40
41 pub fn empty(key: impl Into<String>) -> Self {
43 Self::new(key, EventPayload::Empty)
44 }
45
46 pub fn with_string(key: impl Into<String>, message: impl Into<String>) -> Self {
48 Self::new(key, EventPayload::String(message.into()))
49 }
50
51 pub fn with_map(key: impl Into<String>, map: HashMap<String, serde_json::Value>) -> Self {
53 Self::new(key, EventPayload::Map(map))
54 }
55}
56
57#[derive(Clone)]
59pub struct EventEmitter {
60 sender: Arc<broadcast::Sender<BoxEvent>>,
61}
62
63impl EventEmitter {
64 pub fn new(capacity: usize) -> Self {
66 let (sender, _) = broadcast::channel(capacity);
67 Self {
68 sender: Arc::new(sender),
69 }
70 }
71
72 pub fn emit(&self, event: BoxEvent) {
74 let _ = self.sender.send(event);
75 }
76
77 pub fn subscribe(&self) -> broadcast::Receiver<BoxEvent> {
79 self.sender.subscribe()
80 }
81
82 pub fn subscribe_filtered(
84 &self,
85 filter: impl Fn(&BoxEvent) -> bool + Send + Sync + 'static,
86 ) -> EventStream {
87 EventStream {
88 receiver: self.sender.subscribe(),
89 filter: Arc::new(filter),
90 }
91 }
92}
93
94pub struct EventStream {
96 receiver: broadcast::Receiver<BoxEvent>,
97 filter: Arc<dyn Fn(&BoxEvent) -> bool + Send + Sync>,
98}
99
100impl EventStream {
101 pub async fn recv(&mut self) -> Option<BoxEvent> {
103 loop {
104 match self.receiver.recv().await {
105 Ok(event) => {
106 if (self.filter)(&event) {
107 return Some(event);
108 }
109 }
110 Err(_) => return None,
111 }
112 }
113 }
114}
115
116pub mod events {
121 pub const BOX_READY: &str = "box.ready";
123 pub const BOX_ERROR: &str = "box.error";
124 pub const BOX_TIMEOUT: &str = "box.timeout";
125
126 pub const POOL_VM_CREATED: &str = "pool.vm.created";
128 pub const POOL_VM_ACQUIRED: &str = "pool.vm.acquired";
129 pub const POOL_VM_RELEASED: &str = "pool.vm.released";
130 pub const POOL_VM_EVICTED: &str = "pool.vm.evicted";
131 pub const POOL_REPLENISH: &str = "pool.replenish";
132 pub const POOL_DRAINED: &str = "pool.drained";
133
134 pub const CACHE_HIT: &str = "cache.hit";
136 pub const CACHE_MISS: &str = "cache.miss";
137 pub const CACHE_PRUNED: &str = "cache.pruned";
138
139 pub const EXEC_COMMAND_STARTED: &str = "exec.command.started";
141 pub const EXEC_COMMAND_COMPLETED: &str = "exec.command.completed";
142 pub const EXEC_COMMAND_FAILED: &str = "exec.command.failed";
143 pub const EXEC_COMMAND_TIMEOUT: &str = "exec.command.timeout";
144
145 pub const BOX_RESTARTING: &str = "box.restarting";
147 pub const BOX_RESTARTED: &str = "box.restarted";
148 pub const BOX_RESTART_FAILED: &str = "box.restart.failed";
149 pub const BOX_RESTART_BACKOFF: &str = "box.restart.backoff";
150}
151
152#[cfg(test)]
153mod tests {
154 use super::*;
155
156 #[test]
157 fn test_box_event_new() {
158 let event = BoxEvent::new("test.event", EventPayload::Empty);
159
160 assert_eq!(event.key, "test.event");
161 assert!(matches!(event.payload, EventPayload::Empty));
162 }
163
164 #[test]
165 fn test_box_event_empty() {
166 let event = BoxEvent::empty("box.ready");
167
168 assert_eq!(event.key, "box.ready");
169 assert!(matches!(event.payload, EventPayload::Empty));
170 }
171
172 #[test]
173 fn test_box_event_with_string() {
174 let event = BoxEvent::with_string("box.error", "Connection lost");
175
176 assert_eq!(event.key, "box.error");
177 if let EventPayload::String(msg) = &event.payload {
178 assert_eq!(msg, "Connection lost");
179 } else {
180 panic!("Expected string payload");
181 }
182 }
183
184 #[test]
185 fn test_box_event_with_map() {
186 let mut map = HashMap::new();
187 map.insert("box_id".to_string(), serde_json::json!("box-123"));
188 map.insert("vcpus".to_string(), serde_json::json!(4));
189
190 let event = BoxEvent::with_map("box.ready", map);
191
192 assert_eq!(event.key, "box.ready");
193 if let EventPayload::Map(m) = &event.payload {
194 assert_eq!(m.get("box_id").unwrap(), &serde_json::json!("box-123"));
195 assert_eq!(m.get("vcpus").unwrap(), &serde_json::json!(4));
196 } else {
197 panic!("Expected map payload");
198 }
199 }
200
201 #[test]
202 fn test_box_event_timestamp() {
203 let before = chrono::Utc::now();
204 let event = BoxEvent::empty("test.event");
205 let after = chrono::Utc::now();
206
207 assert!(event.timestamp >= before);
208 assert!(event.timestamp <= after);
209 }
210
211 #[test]
212 fn test_event_emitter_new() {
213 let emitter = EventEmitter::new(100);
214 let _receiver = emitter.subscribe();
216 }
217
218 #[test]
219 fn test_event_emitter_clone() {
220 let emitter = EventEmitter::new(100);
221 let cloned = emitter.clone();
222
223 emitter.emit(BoxEvent::empty("test.1"));
225 cloned.emit(BoxEvent::empty("test.2"));
226 }
227
228 #[tokio::test]
229 async fn test_event_emitter_subscribe() {
230 let emitter = EventEmitter::new(100);
231 let mut receiver = emitter.subscribe();
232
233 emitter.emit(BoxEvent::empty("test.event"));
234
235 let event = receiver.recv().await.unwrap();
236 assert_eq!(event.key, "test.event");
237 }
238
239 #[tokio::test]
240 async fn test_event_emitter_multiple_subscribers() {
241 let emitter = EventEmitter::new(100);
242 let mut receiver1 = emitter.subscribe();
243 let mut receiver2 = emitter.subscribe();
244
245 emitter.emit(BoxEvent::with_string("broadcast", "hello"));
246
247 let event1 = receiver1.recv().await.unwrap();
248 let event2 = receiver2.recv().await.unwrap();
249
250 assert_eq!(event1.key, "broadcast");
251 assert_eq!(event2.key, "broadcast");
252 }
253
254 #[tokio::test]
255 async fn test_event_emitter_multiple_events() {
256 let emitter = EventEmitter::new(100);
257 let mut receiver = emitter.subscribe();
258
259 emitter.emit(BoxEvent::empty("event.1"));
260 emitter.emit(BoxEvent::empty("event.2"));
261 emitter.emit(BoxEvent::empty("event.3"));
262
263 assert_eq!(receiver.recv().await.unwrap().key, "event.1");
264 assert_eq!(receiver.recv().await.unwrap().key, "event.2");
265 assert_eq!(receiver.recv().await.unwrap().key, "event.3");
266 }
267
268 #[tokio::test]
269 async fn test_event_stream_filtered() {
270 let emitter = EventEmitter::new(100);
271 let mut stream = emitter.subscribe_filtered(|e| e.key.starts_with("box."));
272
273 emitter.emit(BoxEvent::empty("box.ready"));
274 emitter.emit(BoxEvent::empty("other.event"));
275 emitter.emit(BoxEvent::empty("box.error"));
276
277 let event1 = stream.recv().await.unwrap();
279 assert_eq!(event1.key, "box.ready");
280
281 let event2 = stream.recv().await.unwrap();
282 assert_eq!(event2.key, "box.error");
283 }
284
285 #[tokio::test]
286 async fn test_event_stream_filter_by_key() {
287 let emitter = EventEmitter::new(100);
288 let mut stream = emitter.subscribe_filtered(|e| e.key == events::BOX_READY);
289
290 emitter.emit(BoxEvent::empty(events::BOX_ERROR));
291 emitter.emit(BoxEvent::empty(events::BOX_READY));
292 emitter.emit(BoxEvent::empty(events::BOX_TIMEOUT));
293
294 let event = stream.recv().await.unwrap();
295 assert_eq!(event.key, events::BOX_READY);
296 }
297
298 #[test]
299 fn test_event_payload_empty_serialization() {
300 let payload = EventPayload::Empty;
301 let json = serde_json::to_string(&payload).unwrap();
302 let parsed: EventPayload = serde_json::from_str(&json).unwrap();
303 assert!(matches!(parsed, EventPayload::Empty));
304 }
305
306 #[test]
307 fn test_event_payload_string_serialization() {
308 let payload = EventPayload::String("test message".to_string());
309 let json = serde_json::to_string(&payload).unwrap();
310 let parsed: EventPayload = serde_json::from_str(&json).unwrap();
311
312 if let EventPayload::String(s) = parsed {
313 assert_eq!(s, "test message");
314 } else {
315 panic!("Expected string payload");
316 }
317 }
318
319 #[test]
320 fn test_event_payload_map_serialization() {
321 let mut map = HashMap::new();
322 map.insert("key1".to_string(), serde_json::json!("value1"));
323 map.insert("key2".to_string(), serde_json::json!(42));
324
325 let payload = EventPayload::Map(map);
326 let json = serde_json::to_string(&payload).unwrap();
327 let parsed: EventPayload = serde_json::from_str(&json).unwrap();
328
329 if let EventPayload::Map(m) = parsed {
330 assert_eq!(m.get("key1").unwrap(), &serde_json::json!("value1"));
331 assert_eq!(m.get("key2").unwrap(), &serde_json::json!(42));
332 } else {
333 panic!("Expected map payload");
334 }
335 }
336
337 #[test]
338 fn test_box_event_serialization() {
339 let event = BoxEvent::with_string("test.event", "hello");
340 let json = serde_json::to_string(&event).unwrap();
341
342 assert!(json.contains("test.event"));
343 assert!(json.contains("hello"));
344 assert!(json.contains("timestamp"));
345
346 let parsed: BoxEvent = serde_json::from_str(&json).unwrap();
347 assert_eq!(parsed.key, "test.event");
348 }
349
350 #[test]
351 fn test_box_event_debug() {
352 let event = BoxEvent::empty("debug.test");
353 let debug_str = format!("{:?}", event);
354
355 assert!(debug_str.contains("BoxEvent"));
356 assert!(debug_str.contains("debug.test"));
357 }
358
359 #[test]
360 fn test_box_event_clone() {
361 let event = BoxEvent::with_string("clone.test", "original");
362 let cloned = event.clone();
363
364 assert_eq!(event.key, cloned.key);
365 assert_eq!(event.timestamp, cloned.timestamp);
366 }
367
368 #[test]
369 fn test_event_catalog_box_events() {
370 assert_eq!(events::BOX_READY, "box.ready");
371 assert_eq!(events::BOX_ERROR, "box.error");
372 assert_eq!(events::BOX_TIMEOUT, "box.timeout");
373 }
374
375 #[test]
376 fn test_event_catalog_pool_events() {
377 assert_eq!(events::POOL_VM_CREATED, "pool.vm.created");
378 assert_eq!(events::POOL_VM_ACQUIRED, "pool.vm.acquired");
379 assert_eq!(events::POOL_VM_RELEASED, "pool.vm.released");
380 assert_eq!(events::POOL_VM_EVICTED, "pool.vm.evicted");
381 assert_eq!(events::POOL_REPLENISH, "pool.replenish");
382 assert_eq!(events::POOL_DRAINED, "pool.drained");
383 }
384
385 #[test]
386 fn test_event_catalog_cache_events() {
387 assert_eq!(events::CACHE_HIT, "cache.hit");
388 assert_eq!(events::CACHE_MISS, "cache.miss");
389 assert_eq!(events::CACHE_PRUNED, "cache.pruned");
390 }
391
392 #[test]
393 fn test_event_catalog_exec_events() {
394 assert_eq!(events::EXEC_COMMAND_STARTED, "exec.command.started");
395 assert_eq!(events::EXEC_COMMAND_COMPLETED, "exec.command.completed");
396 assert_eq!(events::EXEC_COMMAND_FAILED, "exec.command.failed");
397 assert_eq!(events::EXEC_COMMAND_TIMEOUT, "exec.command.timeout");
398 }
399
400 #[test]
401 fn test_event_catalog_restart_events() {
402 assert_eq!(events::BOX_RESTARTING, "box.restarting");
403 assert_eq!(events::BOX_RESTARTED, "box.restarted");
404 assert_eq!(events::BOX_RESTART_FAILED, "box.restart.failed");
405 assert_eq!(events::BOX_RESTART_BACKOFF, "box.restart.backoff");
406 }
407
408 #[test]
409 fn test_event_key_naming_convention() {
410 let all_events = vec![
412 events::BOX_READY,
413 events::BOX_ERROR,
414 events::BOX_TIMEOUT,
415 events::BOX_RESTARTING,
416 events::BOX_RESTARTED,
417 events::BOX_RESTART_FAILED,
418 events::BOX_RESTART_BACKOFF,
419 events::POOL_VM_CREATED,
420 events::POOL_VM_ACQUIRED,
421 events::POOL_VM_RELEASED,
422 events::POOL_VM_EVICTED,
423 events::POOL_REPLENISH,
424 events::POOL_DRAINED,
425 events::CACHE_HIT,
426 events::CACHE_MISS,
427 events::CACHE_PRUNED,
428 events::EXEC_COMMAND_STARTED,
429 events::EXEC_COMMAND_COMPLETED,
430 events::EXEC_COMMAND_FAILED,
431 events::EXEC_COMMAND_TIMEOUT,
432 ];
433
434 for event_key in all_events {
435 assert!(event_key.chars().all(|c| c.is_lowercase() || c == '.'));
436 assert!(event_key.contains('.'));
437 }
438 }
439}