code_mesh_core/
events.rs

1//! Event system for Code Mesh Core
2//!
3//! This module provides a comprehensive event system for communication
4//! between different components of the Code Mesh ecosystem.
5
6use crate::{Error, Result};
7use async_trait::async_trait;
8use serde::{Deserialize, Serialize};
9use std::any::{Any, TypeId};
10use std::collections::HashMap;
11use std::sync::Arc;
12use uuid::Uuid;
13
14#[cfg(feature = "native")]
15use tokio::sync::{broadcast, RwLock};
16
17#[cfg(feature = "wasm")]
18use parking_lot::RwLock;
19
20/// Event trait that all events must implement
21pub trait Event: Send + Sync + Clone + std::fmt::Debug + 'static {
22    /// Event type identifier
23    fn event_type(&self) -> &'static str;
24    
25    /// Event priority
26    fn priority(&self) -> EventPriority {
27        EventPriority::Normal
28    }
29    
30    /// Whether this event should be persisted
31    fn persistent(&self) -> bool {
32        false
33    }
34}
35
36/// Event priority levels
37#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
38pub enum EventPriority {
39    Low = 0,
40    Normal = 1,
41    High = 2,
42    Critical = 3,
43}
44
45/// Event handler trait
46#[async_trait]
47pub trait EventHandler<E: Event>: Send + Sync {
48    /// Handle an event
49    async fn handle(&self, event: &E) -> Result<()>;
50    
51    /// Handler priority (higher values are called first)
52    fn priority(&self) -> i32 {
53        0
54    }
55    
56    /// Whether this handler should receive events before others
57    fn early(&self) -> bool {
58        false
59    }
60}
61
62/// Boxed event handler that can handle any event
63type BoxedHandler = Box<dyn EventHandlerDyn + Send + Sync>;
64
65/// Dynamic event handler trait for type erasure
66#[async_trait]
67trait EventHandlerDyn {
68    async fn handle_dyn(&self, event: &(dyn Any + Send + Sync)) -> Result<()>;
69    fn priority(&self) -> i32;
70    fn early(&self) -> bool;
71}
72
73/// Wrapper to implement EventHandlerDyn for concrete handlers
74struct EventHandlerWrapper<E: Event, H: EventHandler<E>> {
75    handler: H,
76    _phantom: std::marker::PhantomData<E>,
77}
78
79#[async_trait]
80impl<E: Event, H: EventHandler<E>> EventHandlerDyn for EventHandlerWrapper<E, H> {
81    async fn handle_dyn(&self, event: &(dyn Any + Send + Sync)) -> Result<()> {
82        if let Some(typed_event) = event.downcast_ref::<E>() {
83            self.handler.handle(typed_event).await
84        } else {
85            Err(Error::Other(anyhow::anyhow!("Event type mismatch")))
86        }
87    }
88
89    fn priority(&self) -> i32 {
90        self.handler.priority()
91    }
92
93    fn early(&self) -> bool {
94        self.handler.early()
95    }
96}
97
98/// Event bus for managing event distribution
99pub struct EventBus {
100    #[cfg(feature = "native")]
101    handlers: RwLock<HashMap<TypeId, Vec<BoxedHandler>>>,
102    
103    #[cfg(feature = "native")]
104    broadcast_senders: RwLock<HashMap<TypeId, broadcast::Sender<Arc<dyn Any + Send + Sync>>>>,
105    
106    #[cfg(feature = "wasm")]
107    handlers: RwLock<HashMap<TypeId, Vec<BoxedHandler>>>,
108    
109    /// Maximum number of queued events per type
110    max_queue_size: usize,
111    
112    /// Whether to enable event tracing
113    tracing_enabled: bool,
114}
115
116impl EventBus {
117    /// Create a new event bus
118    pub fn new() -> Self {
119        Self {
120            handlers: RwLock::new(HashMap::new()),
121            #[cfg(feature = "native")]
122            broadcast_senders: RwLock::new(HashMap::new()),
123            max_queue_size: 1000,
124            tracing_enabled: true,
125        }
126    }
127
128    /// Create a new event bus with custom configuration
129    pub fn with_config(max_queue_size: usize, tracing_enabled: bool) -> Self {
130        Self {
131            handlers: RwLock::new(HashMap::new()),
132            #[cfg(feature = "native")]
133            broadcast_senders: RwLock::new(HashMap::new()),
134            max_queue_size,
135            tracing_enabled,
136        }
137    }
138
139    /// Subscribe to events of a specific type
140    pub async fn subscribe<E: Event, H: EventHandler<E> + 'static>(&self, handler: H) -> Result<()> {
141        let type_id = TypeId::of::<E>();
142        let boxed_handler = Box::new(EventHandlerWrapper {
143            handler,
144            _phantom: std::marker::PhantomData::<E>,
145        });
146
147        #[cfg(feature = "native")]
148        {
149            let mut handlers = self.handlers.write().await;
150            let handlers_list = handlers.entry(type_id).or_insert_with(Vec::new);
151            handlers_list.push(boxed_handler);
152            
153            // Sort by priority and early flag
154            handlers_list.sort_by(|a, b| {
155                match (a.early(), b.early()) {
156                    (true, false) => std::cmp::Ordering::Less,
157                    (false, true) => std::cmp::Ordering::Greater,
158                    _ => b.priority().cmp(&a.priority()),
159                }
160            });
161        }
162
163        #[cfg(feature = "wasm")]
164        {
165            let mut handlers = self.handlers.write();
166            let handlers_list = handlers.entry(type_id).or_insert_with(Vec::new);
167            handlers_list.push(boxed_handler);
168            
169            // Sort by priority and early flag
170            handlers_list.sort_by(|a, b| {
171                match (a.early(), b.early()) {
172                    (true, false) => std::cmp::Ordering::Less,
173                    (false, true) => std::cmp::Ordering::Greater,
174                    _ => b.priority().cmp(&a.priority()),
175                }
176            });
177        }
178
179        if self.tracing_enabled {
180            tracing::debug!("Subscribed to event type: {}", std::any::type_name::<E>());
181        }
182
183        Ok(())
184    }
185
186    /// Publish an event to all subscribers
187    pub async fn publish<E: Event>(&self, event: E) -> Result<()> {
188        let type_id = TypeId::of::<E>();
189        
190        if self.tracing_enabled {
191            tracing::debug!(
192                "Publishing event: {} with priority: {:?}",
193                event.event_type(),
194                event.priority()
195            );
196        }
197
198        // Handle direct subscriptions
199        #[cfg(feature = "native")]
200        {
201            let handlers = self.handlers.read().await;
202            if let Some(handlers_list) = handlers.get(&type_id) {
203                for handler in handlers_list {
204                    if let Err(e) = handler.handle_dyn(&event as &(dyn Any + Send + Sync)).await {
205                        tracing::error!("Error handling event: {}", e);
206                        // Continue processing other handlers
207                    }
208                }
209            }
210
211            // Handle broadcast subscriptions
212            let senders = self.broadcast_senders.read().await;
213            if let Some(sender) = senders.get(&type_id) {
214                let arc_event: Arc<dyn Any + Send + Sync> = Arc::new(event.clone());
215                if sender.send(arc_event).is_err() {
216                    // No receivers, which is fine
217                }
218            }
219        }
220
221        #[cfg(feature = "wasm")]
222        {
223            let handlers = self.handlers.read();
224            if let Some(handlers_list) = handlers.get(&type_id) {
225                for handler in handlers_list {
226                    if let Err(e) = handler.handle_dyn(&event as &(dyn Any + Send + Sync)).await {
227                        tracing::error!("Error handling event: {}", e);
228                        // Continue processing other handlers
229                    }
230                }
231            }
232        }
233
234        Ok(())
235    }
236
237    /// Create a broadcast channel for streaming events
238    #[cfg(feature = "native")]
239    pub async fn create_stream<E: Event>(&self) -> broadcast::Receiver<Arc<dyn Any + Send + Sync>> {
240        let type_id = TypeId::of::<E>();
241        
242        let mut senders = self.broadcast_senders.write().await;
243        let sender = senders.entry(type_id).or_insert_with(|| {
244            let (sender, _) = broadcast::channel(self.max_queue_size);
245            sender
246        });
247        
248        sender.subscribe()
249    }
250
251    /// Unsubscribe from all events (clears all handlers)
252    pub async fn clear(&self) {
253        #[cfg(feature = "native")]
254        {
255            self.handlers.write().await.clear();
256            self.broadcast_senders.write().await.clear();
257        }
258
259        #[cfg(feature = "wasm")]
260        {
261            self.handlers.write().clear();
262        }
263    }
264
265    /// Get the number of handlers for a specific event type
266    pub async fn handler_count<E: Event>(&self) -> usize {
267        let type_id = TypeId::of::<E>();
268        
269        #[cfg(feature = "native")]
270        {
271            self.handlers.read().await
272                .get(&type_id)
273                .map(|h| h.len())
274                .unwrap_or(0)
275        }
276
277        #[cfg(feature = "wasm")]
278        {
279            self.handlers.read()
280                .get(&type_id)
281                .map(|h| h.len())
282                .unwrap_or(0)
283        }
284    }
285}
286
287impl Default for EventBus {
288    fn default() -> Self {
289        Self::new()
290    }
291}
292
293/// Common events used throughout Code Mesh
294pub mod events {
295    use super::*;
296    use chrono::{DateTime, Utc};
297
298    /// Session-related events
299    #[derive(Debug, Clone, Serialize, Deserialize)]
300    pub struct SessionCreated {
301        pub session_id: String,
302        pub timestamp: DateTime<Utc>,
303        pub metadata: HashMap<String, serde_json::Value>,
304    }
305
306    impl Event for SessionCreated {
307        fn event_type(&self) -> &'static str {
308            "session.created"
309        }
310
311        fn persistent(&self) -> bool {
312            true
313        }
314    }
315
316    #[derive(Debug, Clone, Serialize, Deserialize)]
317    pub struct SessionEnded {
318        pub session_id: String,
319        pub timestamp: DateTime<Utc>,
320        pub reason: String,
321    }
322
323    impl Event for SessionEnded {
324        fn event_type(&self) -> &'static str {
325            "session.ended"
326        }
327
328        fn persistent(&self) -> bool {
329            true
330        }
331    }
332
333    /// Message-related events
334    #[derive(Debug, Clone, Serialize, Deserialize)]
335    pub struct MessageSent {
336        pub session_id: String,
337        pub message_id: String,
338        pub role: String,
339        pub content: String,
340        pub timestamp: DateTime<Utc>,
341    }
342
343    impl Event for MessageSent {
344        fn event_type(&self) -> &'static str {
345            "message.sent"
346        }
347
348        fn priority(&self) -> EventPriority {
349            EventPriority::Normal
350        }
351    }
352
353    #[derive(Debug, Clone, Serialize, Deserialize)]
354    pub struct MessageReceived {
355        pub session_id: String,
356        pub message_id: String,
357        pub role: String,
358        pub content: String,
359        pub timestamp: DateTime<Utc>,
360        pub tokens_used: Option<u32>,
361    }
362
363    impl Event for MessageReceived {
364        fn event_type(&self) -> &'static str {
365            "message.received"
366        }
367
368        fn priority(&self) -> EventPriority {
369            EventPriority::Normal
370        }
371    }
372
373    /// Tool-related events
374    #[derive(Debug, Clone, Serialize, Deserialize)]
375    pub struct ToolExecuted {
376        pub session_id: String,
377        pub tool_id: String,
378        pub tool_name: String,
379        pub arguments: serde_json::Value,
380        pub result: serde_json::Value,
381        pub duration_ms: u64,
382        pub timestamp: DateTime<Utc>,
383    }
384
385    impl Event for ToolExecuted {
386        fn event_type(&self) -> &'static str {
387            "tool.executed"
388        }
389
390        fn priority(&self) -> EventPriority {
391            EventPriority::Normal
392        }
393
394        fn persistent(&self) -> bool {
395            true
396        }
397    }
398
399    #[derive(Debug, Clone, Serialize, Deserialize)]
400    pub struct ToolFailed {
401        pub session_id: String,
402        pub tool_id: String,
403        pub tool_name: String,
404        pub arguments: serde_json::Value,
405        pub error: String,
406        pub duration_ms: u64,
407        pub timestamp: DateTime<Utc>,
408    }
409
410    impl Event for ToolFailed {
411        fn event_type(&self) -> &'static str {
412            "tool.failed"
413        }
414
415        fn priority(&self) -> EventPriority {
416            EventPriority::High
417        }
418
419        fn persistent(&self) -> bool {
420            true
421        }
422    }
423
424    /// Provider-related events
425    #[derive(Debug, Clone, Serialize, Deserialize)]
426    pub struct ProviderConnected {
427        pub provider_id: String,
428        pub provider_name: String,
429        pub timestamp: DateTime<Utc>,
430    }
431
432    impl Event for ProviderConnected {
433        fn event_type(&self) -> &'static str {
434            "provider.connected"
435        }
436
437        fn priority(&self) -> EventPriority {
438            EventPriority::High
439        }
440    }
441
442    #[derive(Debug, Clone, Serialize, Deserialize)]
443    pub struct ProviderDisconnected {
444        pub provider_id: String,
445        pub provider_name: String,
446        pub reason: String,
447        pub timestamp: DateTime<Utc>,
448    }
449
450    impl Event for ProviderDisconnected {
451        fn event_type(&self) -> &'static str {
452            "provider.disconnected"
453        }
454
455        fn priority(&self) -> EventPriority {
456            EventPriority::High
457        }
458    }
459
460    /// Storage-related events
461    #[derive(Debug, Clone, Serialize, Deserialize)]
462    pub struct DataStored {
463        pub key: String,
464        pub size_bytes: u64,
465        pub timestamp: DateTime<Utc>,
466    }
467
468    impl Event for DataStored {
469        fn event_type(&self) -> &'static str {
470            "storage.stored"
471        }
472    }
473
474    #[derive(Debug, Clone, Serialize, Deserialize)]
475    pub struct DataRetrieved {
476        pub key: String,
477        pub size_bytes: u64,
478        pub timestamp: DateTime<Utc>,
479    }
480
481    impl Event for DataRetrieved {
482        fn event_type(&self) -> &'static str {
483            "storage.retrieved"
484        }
485    }
486
487    /// Error-related events
488    #[derive(Debug, Clone, Serialize, Deserialize)]
489    pub struct ErrorOccurred {
490        pub error_id: String,
491        pub component: String,
492        pub error_message: String,
493        pub error_code: Option<String>,
494        pub context: HashMap<String, serde_json::Value>,
495        pub timestamp: DateTime<Utc>,
496    }
497
498    impl Event for ErrorOccurred {
499        fn event_type(&self) -> &'static str {
500            "error.occurred"
501        }
502
503        fn priority(&self) -> EventPriority {
504            EventPriority::Critical
505        }
506
507        fn persistent(&self) -> bool {
508            true
509        }
510    }
511
512    /// System-related events
513    #[derive(Debug, Clone, Serialize, Deserialize)]
514    pub struct SystemStarted {
515        pub version: String,
516        pub features: Vec<String>,
517        pub timestamp: DateTime<Utc>,
518    }
519
520    impl Event for SystemStarted {
521        fn event_type(&self) -> &'static str {
522            "system.started"
523        }
524
525        fn priority(&self) -> EventPriority {
526            EventPriority::High
527        }
528
529        fn persistent(&self) -> bool {
530            true
531        }
532    }
533
534    #[derive(Debug, Clone, Serialize, Deserialize)]
535    pub struct SystemShutdown {
536        pub reason: String,
537        pub timestamp: DateTime<Utc>,
538    }
539
540    impl Event for SystemShutdown {
541        fn event_type(&self) -> &'static str {
542            "system.shutdown"
543        }
544
545        fn priority(&self) -> EventPriority {
546            EventPriority::Critical
547        }
548
549        fn persistent(&self) -> bool {
550            true
551        }
552    }
553}
554
555/// Convenience macro for creating simple event handlers
556#[macro_export]
557macro_rules! event_handler {
558    ($event_type:ty, $handler_fn:expr) => {
559        struct SimpleEventHandler {
560            handler: fn(&$event_type) -> Result<()>,
561        }
562
563        #[async_trait]
564        impl EventHandler<$event_type> for SimpleEventHandler {
565            async fn handle(&self, event: &$event_type) -> Result<()> {
566                (self.handler)(event)
567            }
568        }
569
570        SimpleEventHandler {
571            handler: $handler_fn,
572        }
573    };
574}
575
576#[cfg(test)]
577mod tests {
578    use super::*;
579    use std::sync::atomic::{AtomicU32, Ordering};
580
581    #[derive(Debug, Clone)]
582    struct TestEvent {
583        message: String,
584    }
585
586    impl Event for TestEvent {
587        fn event_type(&self) -> &'static str {
588            "test.event"
589        }
590    }
591
592    struct TestHandler {
593        counter: Arc<AtomicU32>,
594    }
595
596    #[async_trait]
597    impl EventHandler<TestEvent> for TestHandler {
598        async fn handle(&self, _event: &TestEvent) -> Result<()> {
599            self.counter.fetch_add(1, Ordering::SeqCst);
600            Ok(())
601        }
602    }
603
604    #[cfg(feature = "native")]
605    #[tokio::test]
606    async fn test_event_bus() {
607        let bus = EventBus::new();
608        let counter = Arc::new(AtomicU32::new(0));
609        
610        let handler = TestHandler {
611            counter: counter.clone(),
612        };
613
614        bus.subscribe(handler).await.unwrap();
615
616        let event = TestEvent {
617            message: "Hello, World!".to_string(),
618        };
619
620        bus.publish(event).await.unwrap();
621
622        assert_eq!(counter.load(Ordering::SeqCst), 1);
623    }
624}