1use crate::{Error, Result};
7use async_trait::async_trait;
8use serde::{Deserialize, Serialize};
9use std::any::{Any, TypeId};
10use std::collections::HashMap;
11use std::sync::Arc;
12use uuid::Uuid;
13
14#[cfg(feature = "native")]
15use tokio::sync::{broadcast, RwLock};
16
17#[cfg(feature = "wasm")]
18use parking_lot::RwLock;
19
20pub trait Event: Send + Sync + Clone + std::fmt::Debug + 'static {
22 fn event_type(&self) -> &'static str;
24
25 fn priority(&self) -> EventPriority {
27 EventPriority::Normal
28 }
29
30 fn persistent(&self) -> bool {
32 false
33 }
34}
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
38pub enum EventPriority {
39 Low = 0,
40 Normal = 1,
41 High = 2,
42 Critical = 3,
43}
44
45#[async_trait]
47pub trait EventHandler<E: Event>: Send + Sync {
48 async fn handle(&self, event: &E) -> Result<()>;
50
51 fn priority(&self) -> i32 {
53 0
54 }
55
56 fn early(&self) -> bool {
58 false
59 }
60}
61
62type BoxedHandler = Box<dyn EventHandlerDyn + Send + Sync>;
64
65#[async_trait]
67trait EventHandlerDyn {
68 async fn handle_dyn(&self, event: &(dyn Any + Send + Sync)) -> Result<()>;
69 fn priority(&self) -> i32;
70 fn early(&self) -> bool;
71}
72
73struct EventHandlerWrapper<E: Event, H: EventHandler<E>> {
75 handler: H,
76 _phantom: std::marker::PhantomData<E>,
77}
78
79#[async_trait]
80impl<E: Event, H: EventHandler<E>> EventHandlerDyn for EventHandlerWrapper<E, H> {
81 async fn handle_dyn(&self, event: &(dyn Any + Send + Sync)) -> Result<()> {
82 if let Some(typed_event) = event.downcast_ref::<E>() {
83 self.handler.handle(typed_event).await
84 } else {
85 Err(Error::Other(anyhow::anyhow!("Event type mismatch")))
86 }
87 }
88
89 fn priority(&self) -> i32 {
90 self.handler.priority()
91 }
92
93 fn early(&self) -> bool {
94 self.handler.early()
95 }
96}
97
98pub struct EventBus {
100 #[cfg(feature = "native")]
101 handlers: RwLock<HashMap<TypeId, Vec<BoxedHandler>>>,
102
103 #[cfg(feature = "native")]
104 broadcast_senders: RwLock<HashMap<TypeId, broadcast::Sender<Arc<dyn Any + Send + Sync>>>>,
105
106 #[cfg(feature = "wasm")]
107 handlers: RwLock<HashMap<TypeId, Vec<BoxedHandler>>>,
108
109 max_queue_size: usize,
111
112 tracing_enabled: bool,
114}
115
116impl EventBus {
117 pub fn new() -> Self {
119 Self {
120 handlers: RwLock::new(HashMap::new()),
121 #[cfg(feature = "native")]
122 broadcast_senders: RwLock::new(HashMap::new()),
123 max_queue_size: 1000,
124 tracing_enabled: true,
125 }
126 }
127
128 pub fn with_config(max_queue_size: usize, tracing_enabled: bool) -> Self {
130 Self {
131 handlers: RwLock::new(HashMap::new()),
132 #[cfg(feature = "native")]
133 broadcast_senders: RwLock::new(HashMap::new()),
134 max_queue_size,
135 tracing_enabled,
136 }
137 }
138
139 pub async fn subscribe<E: Event, H: EventHandler<E> + 'static>(&self, handler: H) -> Result<()> {
141 let type_id = TypeId::of::<E>();
142 let boxed_handler = Box::new(EventHandlerWrapper {
143 handler,
144 _phantom: std::marker::PhantomData::<E>,
145 });
146
147 #[cfg(feature = "native")]
148 {
149 let mut handlers = self.handlers.write().await;
150 let handlers_list = handlers.entry(type_id).or_insert_with(Vec::new);
151 handlers_list.push(boxed_handler);
152
153 handlers_list.sort_by(|a, b| {
155 match (a.early(), b.early()) {
156 (true, false) => std::cmp::Ordering::Less,
157 (false, true) => std::cmp::Ordering::Greater,
158 _ => b.priority().cmp(&a.priority()),
159 }
160 });
161 }
162
163 #[cfg(feature = "wasm")]
164 {
165 let mut handlers = self.handlers.write();
166 let handlers_list = handlers.entry(type_id).or_insert_with(Vec::new);
167 handlers_list.push(boxed_handler);
168
169 handlers_list.sort_by(|a, b| {
171 match (a.early(), b.early()) {
172 (true, false) => std::cmp::Ordering::Less,
173 (false, true) => std::cmp::Ordering::Greater,
174 _ => b.priority().cmp(&a.priority()),
175 }
176 });
177 }
178
179 if self.tracing_enabled {
180 tracing::debug!("Subscribed to event type: {}", std::any::type_name::<E>());
181 }
182
183 Ok(())
184 }
185
186 pub async fn publish<E: Event>(&self, event: E) -> Result<()> {
188 let type_id = TypeId::of::<E>();
189
190 if self.tracing_enabled {
191 tracing::debug!(
192 "Publishing event: {} with priority: {:?}",
193 event.event_type(),
194 event.priority()
195 );
196 }
197
198 #[cfg(feature = "native")]
200 {
201 let handlers = self.handlers.read().await;
202 if let Some(handlers_list) = handlers.get(&type_id) {
203 for handler in handlers_list {
204 if let Err(e) = handler.handle_dyn(&event as &(dyn Any + Send + Sync)).await {
205 tracing::error!("Error handling event: {}", e);
206 }
208 }
209 }
210
211 let senders = self.broadcast_senders.read().await;
213 if let Some(sender) = senders.get(&type_id) {
214 let arc_event: Arc<dyn Any + Send + Sync> = Arc::new(event.clone());
215 if sender.send(arc_event).is_err() {
216 }
218 }
219 }
220
221 #[cfg(feature = "wasm")]
222 {
223 let handlers = self.handlers.read();
224 if let Some(handlers_list) = handlers.get(&type_id) {
225 for handler in handlers_list {
226 if let Err(e) = handler.handle_dyn(&event as &(dyn Any + Send + Sync)).await {
227 tracing::error!("Error handling event: {}", e);
228 }
230 }
231 }
232 }
233
234 Ok(())
235 }
236
237 #[cfg(feature = "native")]
239 pub async fn create_stream<E: Event>(&self) -> broadcast::Receiver<Arc<dyn Any + Send + Sync>> {
240 let type_id = TypeId::of::<E>();
241
242 let mut senders = self.broadcast_senders.write().await;
243 let sender = senders.entry(type_id).or_insert_with(|| {
244 let (sender, _) = broadcast::channel(self.max_queue_size);
245 sender
246 });
247
248 sender.subscribe()
249 }
250
251 pub async fn clear(&self) {
253 #[cfg(feature = "native")]
254 {
255 self.handlers.write().await.clear();
256 self.broadcast_senders.write().await.clear();
257 }
258
259 #[cfg(feature = "wasm")]
260 {
261 self.handlers.write().clear();
262 }
263 }
264
265 pub async fn handler_count<E: Event>(&self) -> usize {
267 let type_id = TypeId::of::<E>();
268
269 #[cfg(feature = "native")]
270 {
271 self.handlers.read().await
272 .get(&type_id)
273 .map(|h| h.len())
274 .unwrap_or(0)
275 }
276
277 #[cfg(feature = "wasm")]
278 {
279 self.handlers.read()
280 .get(&type_id)
281 .map(|h| h.len())
282 .unwrap_or(0)
283 }
284 }
285}
286
287impl Default for EventBus {
288 fn default() -> Self {
289 Self::new()
290 }
291}
292
293pub mod events {
295 use super::*;
296 use chrono::{DateTime, Utc};
297
298 #[derive(Debug, Clone, Serialize, Deserialize)]
300 pub struct SessionCreated {
301 pub session_id: String,
302 pub timestamp: DateTime<Utc>,
303 pub metadata: HashMap<String, serde_json::Value>,
304 }
305
306 impl Event for SessionCreated {
307 fn event_type(&self) -> &'static str {
308 "session.created"
309 }
310
311 fn persistent(&self) -> bool {
312 true
313 }
314 }
315
316 #[derive(Debug, Clone, Serialize, Deserialize)]
317 pub struct SessionEnded {
318 pub session_id: String,
319 pub timestamp: DateTime<Utc>,
320 pub reason: String,
321 }
322
323 impl Event for SessionEnded {
324 fn event_type(&self) -> &'static str {
325 "session.ended"
326 }
327
328 fn persistent(&self) -> bool {
329 true
330 }
331 }
332
333 #[derive(Debug, Clone, Serialize, Deserialize)]
335 pub struct MessageSent {
336 pub session_id: String,
337 pub message_id: String,
338 pub role: String,
339 pub content: String,
340 pub timestamp: DateTime<Utc>,
341 }
342
343 impl Event for MessageSent {
344 fn event_type(&self) -> &'static str {
345 "message.sent"
346 }
347
348 fn priority(&self) -> EventPriority {
349 EventPriority::Normal
350 }
351 }
352
353 #[derive(Debug, Clone, Serialize, Deserialize)]
354 pub struct MessageReceived {
355 pub session_id: String,
356 pub message_id: String,
357 pub role: String,
358 pub content: String,
359 pub timestamp: DateTime<Utc>,
360 pub tokens_used: Option<u32>,
361 }
362
363 impl Event for MessageReceived {
364 fn event_type(&self) -> &'static str {
365 "message.received"
366 }
367
368 fn priority(&self) -> EventPriority {
369 EventPriority::Normal
370 }
371 }
372
373 #[derive(Debug, Clone, Serialize, Deserialize)]
375 pub struct ToolExecuted {
376 pub session_id: String,
377 pub tool_id: String,
378 pub tool_name: String,
379 pub arguments: serde_json::Value,
380 pub result: serde_json::Value,
381 pub duration_ms: u64,
382 pub timestamp: DateTime<Utc>,
383 }
384
385 impl Event for ToolExecuted {
386 fn event_type(&self) -> &'static str {
387 "tool.executed"
388 }
389
390 fn priority(&self) -> EventPriority {
391 EventPriority::Normal
392 }
393
394 fn persistent(&self) -> bool {
395 true
396 }
397 }
398
399 #[derive(Debug, Clone, Serialize, Deserialize)]
400 pub struct ToolFailed {
401 pub session_id: String,
402 pub tool_id: String,
403 pub tool_name: String,
404 pub arguments: serde_json::Value,
405 pub error: String,
406 pub duration_ms: u64,
407 pub timestamp: DateTime<Utc>,
408 }
409
410 impl Event for ToolFailed {
411 fn event_type(&self) -> &'static str {
412 "tool.failed"
413 }
414
415 fn priority(&self) -> EventPriority {
416 EventPriority::High
417 }
418
419 fn persistent(&self) -> bool {
420 true
421 }
422 }
423
424 #[derive(Debug, Clone, Serialize, Deserialize)]
426 pub struct ProviderConnected {
427 pub provider_id: String,
428 pub provider_name: String,
429 pub timestamp: DateTime<Utc>,
430 }
431
432 impl Event for ProviderConnected {
433 fn event_type(&self) -> &'static str {
434 "provider.connected"
435 }
436
437 fn priority(&self) -> EventPriority {
438 EventPriority::High
439 }
440 }
441
442 #[derive(Debug, Clone, Serialize, Deserialize)]
443 pub struct ProviderDisconnected {
444 pub provider_id: String,
445 pub provider_name: String,
446 pub reason: String,
447 pub timestamp: DateTime<Utc>,
448 }
449
450 impl Event for ProviderDisconnected {
451 fn event_type(&self) -> &'static str {
452 "provider.disconnected"
453 }
454
455 fn priority(&self) -> EventPriority {
456 EventPriority::High
457 }
458 }
459
460 #[derive(Debug, Clone, Serialize, Deserialize)]
462 pub struct DataStored {
463 pub key: String,
464 pub size_bytes: u64,
465 pub timestamp: DateTime<Utc>,
466 }
467
468 impl Event for DataStored {
469 fn event_type(&self) -> &'static str {
470 "storage.stored"
471 }
472 }
473
474 #[derive(Debug, Clone, Serialize, Deserialize)]
475 pub struct DataRetrieved {
476 pub key: String,
477 pub size_bytes: u64,
478 pub timestamp: DateTime<Utc>,
479 }
480
481 impl Event for DataRetrieved {
482 fn event_type(&self) -> &'static str {
483 "storage.retrieved"
484 }
485 }
486
487 #[derive(Debug, Clone, Serialize, Deserialize)]
489 pub struct ErrorOccurred {
490 pub error_id: String,
491 pub component: String,
492 pub error_message: String,
493 pub error_code: Option<String>,
494 pub context: HashMap<String, serde_json::Value>,
495 pub timestamp: DateTime<Utc>,
496 }
497
498 impl Event for ErrorOccurred {
499 fn event_type(&self) -> &'static str {
500 "error.occurred"
501 }
502
503 fn priority(&self) -> EventPriority {
504 EventPriority::Critical
505 }
506
507 fn persistent(&self) -> bool {
508 true
509 }
510 }
511
512 #[derive(Debug, Clone, Serialize, Deserialize)]
514 pub struct SystemStarted {
515 pub version: String,
516 pub features: Vec<String>,
517 pub timestamp: DateTime<Utc>,
518 }
519
520 impl Event for SystemStarted {
521 fn event_type(&self) -> &'static str {
522 "system.started"
523 }
524
525 fn priority(&self) -> EventPriority {
526 EventPriority::High
527 }
528
529 fn persistent(&self) -> bool {
530 true
531 }
532 }
533
534 #[derive(Debug, Clone, Serialize, Deserialize)]
535 pub struct SystemShutdown {
536 pub reason: String,
537 pub timestamp: DateTime<Utc>,
538 }
539
540 impl Event for SystemShutdown {
541 fn event_type(&self) -> &'static str {
542 "system.shutdown"
543 }
544
545 fn priority(&self) -> EventPriority {
546 EventPriority::Critical
547 }
548
549 fn persistent(&self) -> bool {
550 true
551 }
552 }
553}
554
/// Builds an [`EventHandler`] for `$event_type` from a plain function or
/// non-capturing closure of type `fn(&$event_type) -> Result<()>`.
///
/// The expansion is a single block expression, so the macro can be used
/// wherever a value is expected, e.g.
/// `let handler = event_handler!(MyEvent, |event| Ok(()));`.
/// The helper struct is local to that block and does not leak into the
/// caller's scope.
///
/// `Result` is resolved through `$crate`, so a different `Result` alias in
/// scope at the call site cannot change the handler signature.
///
/// NOTE(review): `async_trait` and `EventHandler` are still resolved at the
/// call site and must be in scope there; this file's module path is not
/// visible from here, so they could not be `$crate`-qualified.
#[macro_export]
macro_rules! event_handler {
    ($event_type:ty, $handler_fn:expr) => {{
        struct SimpleEventHandler {
            handler: fn(&$event_type) -> $crate::Result<()>,
        }

        #[async_trait]
        impl EventHandler<$event_type> for SimpleEventHandler {
            async fn handle(&self, event: &$event_type) -> $crate::Result<()> {
                (self.handler)(event)
            }
        }

        SimpleEventHandler {
            handler: $handler_fn,
        }
    }};
}
575
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Minimal event used to exercise the bus.
    #[derive(Debug, Clone)]
    struct TestEvent {
        message: String,
    }

    impl Event for TestEvent {
        fn event_type(&self) -> &'static str {
            "test.event"
        }
    }

    /// Handler that counts how many events it has handled.
    struct TestHandler {
        counter: Arc<AtomicU32>,
    }

    #[async_trait]
    impl EventHandler<TestEvent> for TestHandler {
        async fn handle(&self, _event: &TestEvent) -> Result<()> {
            self.counter.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    /// One subscribed handler must see exactly one published event.
    #[cfg(feature = "native")]
    #[tokio::test]
    async fn test_event_bus() {
        let bus = EventBus::new();
        let hits = Arc::new(AtomicU32::new(0));

        bus.subscribe(TestHandler {
            counter: Arc::clone(&hits),
        })
        .await
        .unwrap();

        bus.publish(TestEvent {
            message: String::from("Hello, World!"),
        })
        .await
        .unwrap();

        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }
}