Skip to main content

a3s_box_core/
event.rs

1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3use std::sync::Arc;
4use tokio::sync::broadcast;
5
/// Event key type.
///
/// Plain alias for `String`. The predefined keys in this file are
/// dot-separated names such as `"box.ready"`.
pub type EventKey = String;
8
/// Payload carried by a [`BoxEvent`].
///
/// The enum is marked `#[serde(untagged)]`, so serde does not write a variant
/// name when a payload is serialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum EventPayload {
    /// No payload data.
    Empty,
    /// A single text message.
    String(String),
    /// Arbitrary JSON values keyed by string.
    Map(HashMap<String, serde_json::Value>),
}
17
/// Box event: a key, a payload, and a timestamp.
///
/// Derives `Serialize`/`Deserialize`, so an event can be converted to and
/// from any serde format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BoxEvent {
    /// Event key (e.g., "box.ready", "box.error")
    pub key: EventKey,

    /// Data attached to the event; `EventPayload::Empty` when there is none
    pub payload: EventPayload,

    /// UTC timestamp; `BoxEvent::new` sets it to the time of construction
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
30
31impl BoxEvent {
32    /// Create a new event
33    pub fn new(key: impl Into<String>, payload: EventPayload) -> Self {
34        Self {
35            key: key.into(),
36            payload,
37            timestamp: chrono::Utc::now(),
38        }
39    }
40
41    /// Create an event with no payload
42    pub fn empty(key: impl Into<String>) -> Self {
43        Self::new(key, EventPayload::Empty)
44    }
45
46    /// Create an event with a string payload
47    pub fn with_string(key: impl Into<String>, message: impl Into<String>) -> Self {
48        Self::new(key, EventPayload::String(message.into()))
49    }
50
51    /// Create an event with a map payload
52    pub fn with_map(key: impl Into<String>, map: HashMap<String, serde_json::Value>) -> Self {
53        Self::new(key, EventPayload::Map(map))
54    }
55}
56
/// Event emitter
///
/// Wraps the sending half of a `tokio::sync::broadcast` channel of
/// [`BoxEvent`]s. The sender is held in an `Arc`, so cloning the emitter
/// yields another handle to the same sender.
#[derive(Clone)]
pub struct EventEmitter {
    /// Shared broadcast sender that emitted events are pushed into.
    sender: Arc<broadcast::Sender<BoxEvent>>,
}
62
63impl EventEmitter {
64    /// Create a new event emitter
65    pub fn new(capacity: usize) -> Self {
66        let (sender, _) = broadcast::channel(capacity);
67        Self {
68            sender: Arc::new(sender),
69        }
70    }
71
72    /// Emit an event
73    pub fn emit(&self, event: BoxEvent) {
74        let _ = self.sender.send(event);
75    }
76
77    /// Subscribe to events
78    pub fn subscribe(&self) -> broadcast::Receiver<BoxEvent> {
79        self.sender.subscribe()
80    }
81
82    /// Subscribe to events with a filter
83    pub fn subscribe_filtered(
84        &self,
85        filter: impl Fn(&BoxEvent) -> bool + Send + Sync + 'static,
86    ) -> EventStream {
87        EventStream {
88            receiver: self.sender.subscribe(),
89            filter: Arc::new(filter),
90        }
91    }
92}
93
/// Event stream with filtering
///
/// Pairs a broadcast receiver with a predicate; [`EventStream::recv`] yields
/// only the events the predicate accepts.
pub struct EventStream {
    /// Receiving half of the emitter's broadcast channel.
    receiver: broadcast::Receiver<BoxEvent>,
    /// Predicate deciding which received events are handed to the caller.
    filter: Arc<dyn Fn(&BoxEvent) -> bool + Send + Sync>,
}
99
100impl EventStream {
101    /// Receive the next matching event
102    pub async fn recv(&mut self) -> Option<BoxEvent> {
103        loop {
104            match self.receiver.recv().await {
105                Ok(event) => {
106                    if (self.filter)(&event) {
107                        return Some(event);
108                    }
109                }
110                Err(_) => return None,
111            }
112        }
113    }
114}
115
/// Event catalog - predefined event keys for Box runtime events.
///
/// Every key is a lowercase, dot-separated string whose first segment names
/// the area it belongs to (`box`, `pool`, `cache`, `exec`).
///
/// Agent-level events (session, prompt, skill, queue, context) belong
/// in the a3s-code crate, not the Box runtime.
pub mod events {
    // Box lifecycle events (`box.*`)
    pub const BOX_READY: &str = "box.ready";
    pub const BOX_ERROR: &str = "box.error";
    pub const BOX_TIMEOUT: &str = "box.timeout";

    // Warm pool events (`pool.*`)
    pub const POOL_VM_CREATED: &str = "pool.vm.created";
    pub const POOL_VM_ACQUIRED: &str = "pool.vm.acquired";
    pub const POOL_VM_RELEASED: &str = "pool.vm.released";
    pub const POOL_VM_EVICTED: &str = "pool.vm.evicted";
    pub const POOL_REPLENISH: &str = "pool.replenish";
    pub const POOL_DRAINED: &str = "pool.drained";

    // Cache events (`cache.*`)
    pub const CACHE_HIT: &str = "cache.hit";
    pub const CACHE_MISS: &str = "cache.miss";
    pub const CACHE_PRUNED: &str = "cache.pruned";

    // Exec events (`exec.command.*`)
    pub const EXEC_COMMAND_STARTED: &str = "exec.command.started";
    pub const EXEC_COMMAND_COMPLETED: &str = "exec.command.completed";
    pub const EXEC_COMMAND_FAILED: &str = "exec.command.failed";
    pub const EXEC_COMMAND_TIMEOUT: &str = "exec.command.timeout";

    // Restart events (these share the `box.` prefix with the lifecycle keys)
    pub const BOX_RESTARTING: &str = "box.restarting";
    pub const BOX_RESTARTED: &str = "box.restarted";
    pub const BOX_RESTART_FAILED: &str = "box.restart.failed";
    pub const BOX_RESTART_BACKOFF: &str = "box.restart.backoff";
}
151
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes a payload to a JSON string and parses it back.
    fn round_trip(payload: &EventPayload) -> EventPayload {
        let encoded = serde_json::to_string(payload).unwrap();
        serde_json::from_str(&encoded).unwrap()
    }

    /// Builds a JSON-valued map from `(key, value)` pairs.
    fn json_map(entries: Vec<(&str, serde_json::Value)>) -> HashMap<String, serde_json::Value> {
        entries
            .into_iter()
            .map(|(name, value)| (name.to_string(), value))
            .collect()
    }

    /// Asserts that every catalog constant equals its expected literal.
    fn assert_keys(pairs: &[(&str, &str)]) {
        for (actual, expected) in pairs {
            assert_eq!(actual, expected);
        }
    }

    // ---- BoxEvent constructors ----

    #[test]
    fn test_box_event_new() {
        let created = BoxEvent::new("test.event", EventPayload::Empty);

        assert_eq!(created.key.as_str(), "test.event");
        assert!(matches!(created.payload, EventPayload::Empty));
    }

    #[test]
    fn test_box_event_empty() {
        let created = BoxEvent::empty("box.ready");

        assert_eq!(created.key.as_str(), "box.ready");
        assert!(matches!(created.payload, EventPayload::Empty));
    }

    #[test]
    fn test_box_event_with_string() {
        let created = BoxEvent::with_string("box.error", "Connection lost");

        assert_eq!(created.key.as_str(), "box.error");
        match &created.payload {
            EventPayload::String(text) => assert_eq!(text.as_str(), "Connection lost"),
            EventPayload::Empty | EventPayload::Map(_) => panic!("Expected string payload"),
        }
    }

    #[test]
    fn test_box_event_with_map() {
        let fields = json_map(vec![
            ("box_id", serde_json::json!("box-123")),
            ("vcpus", serde_json::json!(4)),
        ]);

        let created = BoxEvent::with_map("box.ready", fields);

        assert_eq!(created.key.as_str(), "box.ready");
        match &created.payload {
            EventPayload::Map(stored) => {
                assert_eq!(stored.get("box_id"), Some(&serde_json::json!("box-123")));
                assert_eq!(stored.get("vcpus"), Some(&serde_json::json!(4)));
            }
            EventPayload::Empty | EventPayload::String(_) => panic!("Expected map payload"),
        }
    }

    #[test]
    fn test_box_event_timestamp() {
        let lower = chrono::Utc::now();
        let created = BoxEvent::empty("test.event");
        let upper = chrono::Utc::now();

        // The timestamp must fall inside the window bracketing construction.
        assert!(lower <= created.timestamp);
        assert!(upper >= created.timestamp);
    }

    // ---- EventEmitter ----

    #[test]
    fn test_event_emitter_new() {
        let emitter = EventEmitter::new(100);
        // Subscribing to a fresh emitter must not panic.
        let _rx = emitter.subscribe();
    }

    #[test]
    fn test_event_emitter_clone() {
        let original = EventEmitter::new(100);
        let duplicate = original.clone();

        // Emitting through either handle must work.
        for (handle, key) in [(&original, "test.1"), (&duplicate, "test.2")] {
            handle.emit(BoxEvent::empty(key));
        }
    }

    #[tokio::test]
    async fn test_event_emitter_subscribe() {
        let emitter = EventEmitter::new(100);
        let mut rx = emitter.subscribe();

        emitter.emit(BoxEvent::empty("test.event"));

        let received = rx.recv().await.unwrap();
        assert_eq!(received.key.as_str(), "test.event");
    }

    #[tokio::test]
    async fn test_event_emitter_multiple_subscribers() {
        let emitter = EventEmitter::new(100);
        let mut first = emitter.subscribe();
        let mut second = emitter.subscribe();

        emitter.emit(BoxEvent::with_string("broadcast", "hello"));

        let from_first = first.recv().await.unwrap();
        let from_second = second.recv().await.unwrap();

        // Every subscriber sees the same event.
        for received in [from_first, from_second] {
            assert_eq!(received.key.as_str(), "broadcast");
        }
    }

    #[tokio::test]
    async fn test_event_emitter_multiple_events() {
        let emitter = EventEmitter::new(100);
        let mut rx = emitter.subscribe();

        let keys = ["event.1", "event.2", "event.3"];
        for key in keys {
            emitter.emit(BoxEvent::empty(key));
        }

        // Events arrive in the order they were emitted.
        for expected in keys {
            assert_eq!(rx.recv().await.unwrap().key.as_str(), expected);
        }
    }

    // ---- EventStream ----

    #[tokio::test]
    async fn test_event_stream_filtered() {
        let emitter = EventEmitter::new(100);
        let mut stream = emitter.subscribe_filtered(|event| event.key.starts_with("box."));

        for key in ["box.ready", "other.event", "box.error"] {
            emitter.emit(BoxEvent::empty(key));
        }

        // Only the `box.` events come through, in emission order.
        for expected in ["box.ready", "box.error"] {
            let received = stream.recv().await.unwrap();
            assert_eq!(received.key.as_str(), expected);
        }
    }

    #[tokio::test]
    async fn test_event_stream_filter_by_key() {
        let emitter = EventEmitter::new(100);
        let mut stream = emitter.subscribe_filtered(|event| event.key == events::BOX_READY);

        for key in [events::BOX_ERROR, events::BOX_READY, events::BOX_TIMEOUT] {
            emitter.emit(BoxEvent::empty(key));
        }

        let matched = stream.recv().await.unwrap();
        assert_eq!(matched.key.as_str(), events::BOX_READY);
    }

    // ---- Serialization ----

    #[test]
    fn test_event_payload_empty_serialization() {
        let decoded = round_trip(&EventPayload::Empty);
        assert!(matches!(decoded, EventPayload::Empty));
    }

    #[test]
    fn test_event_payload_string_serialization() {
        let decoded = round_trip(&EventPayload::String("test message".to_string()));

        match decoded {
            EventPayload::String(text) => assert_eq!(text.as_str(), "test message"),
            EventPayload::Empty | EventPayload::Map(_) => panic!("Expected string payload"),
        }
    }

    #[test]
    fn test_event_payload_map_serialization() {
        let fields = json_map(vec![
            ("key1", serde_json::json!("value1")),
            ("key2", serde_json::json!(42)),
        ]);

        let decoded = round_trip(&EventPayload::Map(fields));

        match decoded {
            EventPayload::Map(stored) => {
                assert_eq!(stored.get("key1"), Some(&serde_json::json!("value1")));
                assert_eq!(stored.get("key2"), Some(&serde_json::json!(42)));
            }
            EventPayload::Empty | EventPayload::String(_) => panic!("Expected map payload"),
        }
    }

    #[test]
    fn test_box_event_serialization() {
        let source = BoxEvent::with_string("test.event", "hello");
        let encoded = serde_json::to_string(&source).unwrap();

        for needle in ["test.event", "hello", "timestamp"] {
            assert!(encoded.contains(needle));
        }

        let decoded = serde_json::from_str::<BoxEvent>(&encoded).unwrap();
        assert_eq!(decoded.key.as_str(), "test.event");
    }

    // ---- Derived traits ----

    #[test]
    fn test_box_event_debug() {
        let rendered = format!("{:?}", BoxEvent::empty("debug.test"));

        for needle in ["BoxEvent", "debug.test"] {
            assert!(rendered.contains(needle));
        }
    }

    #[test]
    fn test_box_event_clone() {
        let original = BoxEvent::with_string("clone.test", "original");
        let copy = original.clone();

        assert_eq!(original.key, copy.key);
        assert_eq!(original.timestamp, copy.timestamp);
    }

    // ---- Event catalog ----

    #[test]
    fn test_event_catalog_box_events() {
        assert_keys(&[
            (events::BOX_READY, "box.ready"),
            (events::BOX_ERROR, "box.error"),
            (events::BOX_TIMEOUT, "box.timeout"),
        ]);
    }

    #[test]
    fn test_event_catalog_pool_events() {
        assert_keys(&[
            (events::POOL_VM_CREATED, "pool.vm.created"),
            (events::POOL_VM_ACQUIRED, "pool.vm.acquired"),
            (events::POOL_VM_RELEASED, "pool.vm.released"),
            (events::POOL_VM_EVICTED, "pool.vm.evicted"),
            (events::POOL_REPLENISH, "pool.replenish"),
            (events::POOL_DRAINED, "pool.drained"),
        ]);
    }

    #[test]
    fn test_event_catalog_cache_events() {
        assert_keys(&[
            (events::CACHE_HIT, "cache.hit"),
            (events::CACHE_MISS, "cache.miss"),
            (events::CACHE_PRUNED, "cache.pruned"),
        ]);
    }

    #[test]
    fn test_event_catalog_exec_events() {
        assert_keys(&[
            (events::EXEC_COMMAND_STARTED, "exec.command.started"),
            (events::EXEC_COMMAND_COMPLETED, "exec.command.completed"),
            (events::EXEC_COMMAND_FAILED, "exec.command.failed"),
            (events::EXEC_COMMAND_TIMEOUT, "exec.command.timeout"),
        ]);
    }

    #[test]
    fn test_event_catalog_restart_events() {
        assert_keys(&[
            (events::BOX_RESTARTING, "box.restarting"),
            (events::BOX_RESTARTED, "box.restarted"),
            (events::BOX_RESTART_FAILED, "box.restart.failed"),
            (events::BOX_RESTART_BACKOFF, "box.restart.backoff"),
        ]);
    }

    #[test]
    fn test_event_key_naming_convention() {
        // Every catalog key must be lowercase letters and dots, with at least one dot.
        let catalog = [
            events::BOX_READY,
            events::BOX_ERROR,
            events::BOX_TIMEOUT,
            events::BOX_RESTARTING,
            events::BOX_RESTARTED,
            events::BOX_RESTART_FAILED,
            events::BOX_RESTART_BACKOFF,
            events::POOL_VM_CREATED,
            events::POOL_VM_ACQUIRED,
            events::POOL_VM_RELEASED,
            events::POOL_VM_EVICTED,
            events::POOL_REPLENISH,
            events::POOL_DRAINED,
            events::CACHE_HIT,
            events::CACHE_MISS,
            events::CACHE_PRUNED,
            events::EXEC_COMMAND_STARTED,
            events::EXEC_COMMAND_COMPLETED,
            events::EXEC_COMMAND_FAILED,
            events::EXEC_COMMAND_TIMEOUT,
        ];

        for key in catalog {
            assert!(!key.chars().any(|ch| ch != '.' && !ch.is_lowercase()));
            assert!(key.contains('.'));
        }
    }
}