1#![warn(missing_docs)]
9
10use async_trait::async_trait;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize, de::DeserializeOwned};
13use std::{
14 collections::HashMap,
15 fmt,
16 sync::Arc,
17 time::{Duration, SystemTime, UNIX_EPOCH},
18};
19use tokio::sync::mpsc;
20use tracing::{debug, error, info, instrument, warn};
21
22#[derive(Debug)]
24pub enum EventError {
25 SerializationFailed(String),
27
28 DeserializationFailed(String),
30
31 HandlerFailed(String),
33
34 SubscriptionNotFound(String),
36
37 EventTypeNotRegistered(String),
39
40 StoreFailed(String),
42
43 BusClosed,
45
46 Timeout(String),
48
49 Internal(String),
51}
52
53impl fmt::Display for EventError {
54 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55 match self {
56 EventError::SerializationFailed(msg) => write!(f, "Event serialization failed: {}", msg),
57 EventError::DeserializationFailed(msg) => write!(f, "Event deserialization failed: {}", msg),
58 EventError::HandlerFailed(msg) => write!(f, "Event handler failed: {}", msg),
59 EventError::SubscriptionNotFound(msg) => write!(f, "Subscription not found: {}", msg),
60 EventError::EventTypeNotRegistered(msg) => write!(f, "Event type not registered: {}", msg),
61 EventError::StoreFailed(msg) => write!(f, "Event store failed: {}", msg),
62 EventError::BusClosed => write!(f, "Event bus is closed"),
63 EventError::Timeout(msg) => write!(f, "Operation timeout: {}", msg),
64 EventError::Internal(msg) => write!(f, "Internal error: {}", msg),
65 }
66 }
67}
68
69impl std::error::Error for EventError {}
70
71pub type EventResult<T> = Result<T, EventError>;
73
74pub type EventId = String;
76
77pub type SubscriptionId = String;
79
80pub type EventTypeName = String;
82
83pub trait Event: Send + Sync + 'static {
87 fn event_type(&self) -> EventTypeName;
89
90 fn event_id(&self) -> &EventId;
92
93 fn timestamp(&self) -> u64;
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct EventData {
102 pub id: EventId,
104 pub event_type: EventTypeName,
106 pub timestamp: u64,
108 pub payload: serde_json::Value,
110 pub metadata: HashMap<String, String>,
112}
113
114impl EventData {
115 pub fn new<E: Event + Serialize>(event: &E) -> EventResult<Self> {
117 let payload = serde_json::to_value(event).map_err(|e| EventError::SerializationFailed(e.to_string()))?;
118 Ok(Self {
119 id: event.event_id().clone(),
120 event_type: event.event_type(),
121 timestamp: event.timestamp(),
122 payload,
123 metadata: HashMap::new(),
124 })
125 }
126
127 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
129 self.metadata.insert(key.into(), value.into());
130 self
131 }
132
133 pub fn into_event<E: DeserializeOwned>(self) -> EventResult<E> {
135 serde_json::from_value(self.payload).map_err(|e| EventError::DeserializationFailed(e.to_string()))
136 }
137
138 pub fn id(&self) -> &str {
140 &self.id
141 }
142
143 pub fn event_type(&self) -> &str {
145 &self.event_type
146 }
147
148 pub fn timestamp(&self) -> u64 {
150 self.timestamp
151 }
152}
153
154#[async_trait]
158pub trait EventHandler: Send + Sync {
159 async fn handle(&self, event: EventData) -> EventResult<()>;
161
162 fn event_types(&self) -> Vec<EventTypeName>;
164}
165
166pub trait SyncEventHandler: Send + Sync {
170 fn handle(&self, event: EventData) -> EventResult<()>;
172
173 fn event_types(&self) -> Vec<EventTypeName>;
175}
176
177pub struct AsyncEventHandler<F> {
181 handler: F,
182 event_types: Vec<EventTypeName>,
183}
184
185impl<F> AsyncEventHandler<F>
186where
187 F: Fn(EventData) -> std::pin::Pin<Box<dyn std::future::Future<Output = EventResult<()>> + Send + Sync>>
188 + Send
189 + Sync
190 + 'static,
191{
192 pub fn new(event_types: Vec<EventTypeName>, handler: F) -> Self {
194 Self { handler, event_types }
195 }
196}
197
198#[async_trait]
199impl<F> EventHandler for AsyncEventHandler<F>
200where
201 F: Fn(EventData) -> std::pin::Pin<Box<dyn std::future::Future<Output = EventResult<()>> + Send + Sync>>
202 + Send
203 + Sync
204 + 'static,
205{
206 async fn handle(&self, event: EventData) -> EventResult<()> {
207 (self.handler)(event).await
208 }
209
210 fn event_types(&self) -> Vec<EventTypeName> {
211 self.event_types.clone()
212 }
213}
214
215pub struct SyncEventHandlerWrapper<F> {
219 handler: F,
220 event_types: Vec<EventTypeName>,
221}
222
223impl<F> SyncEventHandlerWrapper<F>
224where
225 F: Fn(EventData) -> EventResult<()> + Send + Sync + 'static,
226{
227 pub fn new(event_types: Vec<EventTypeName>, handler: F) -> Self {
229 Self { handler, event_types }
230 }
231}
232
233impl<F> SyncEventHandler for SyncEventHandlerWrapper<F>
234where
235 F: Fn(EventData) -> EventResult<()> + Send + Sync + 'static,
236{
237 fn handle(&self, event: EventData) -> EventResult<()> {
238 (self.handler)(event)
239 }
240
241 fn event_types(&self) -> Vec<EventTypeName> {
242 self.event_types.clone()
243 }
244}
245
246pub trait EventFilter: Send + Sync {
250 fn should_handle(&self, event: &EventData) -> bool;
252}
253
254pub struct TypeEventFilter {
258 allowed_types: Vec<EventTypeName>,
259}
260
261impl TypeEventFilter {
262 pub fn new(allowed_types: Vec<EventTypeName>) -> Self {
264 Self { allowed_types }
265 }
266}
267
268impl EventFilter for TypeEventFilter {
269 fn should_handle(&self, event: &EventData) -> bool {
270 self.allowed_types.contains(&event.event_type)
271 }
272}
273
274pub struct MetadataEventFilter {
278 key: String,
279 value: String,
280}
281
282impl MetadataEventFilter {
283 pub fn new(key: impl Into<String>, value: impl Into<String>) -> Self {
285 Self { key: key.into(), value: value.into() }
286 }
287}
288
289impl EventFilter for MetadataEventFilter {
290 fn should_handle(&self, event: &EventData) -> bool {
291 event.metadata.get(&self.key).map(|v| v == &self.value).unwrap_or(false)
292 }
293}
294
295#[derive(Debug, Clone)]
299pub struct Subscription {
300 pub id: SubscriptionId,
302 pub event_types: Vec<EventTypeName>,
304 pub created_at: u64,
306 pub active: bool,
308}
309
310impl Subscription {
311 pub fn new(id: SubscriptionId, event_types: Vec<EventTypeName>) -> Self {
313 Self {
314 id,
315 event_types,
316 created_at: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64,
317 active: true,
318 }
319 }
320}
321
322#[async_trait]
326pub trait EventStore: Send + Sync {
327 async fn append(&self, event: &EventData) -> EventResult<()>;
329
330 async fn append_batch(&self, events: &[EventData]) -> EventResult<()>;
332
333 async fn get_events(&self, event_type: &str) -> EventResult<Vec<EventData>>;
335
336 async fn get_events_by_time(&self, start: u64, end: u64) -> EventResult<Vec<EventData>>;
338
339 async fn get_all_events(&self) -> EventResult<Vec<EventData>>;
341
342 async fn count(&self) -> EventResult<u64>;
344
345 async fn clear(&self) -> EventResult<()>;
347}
348
349pub struct InMemoryEventStore {
353 events: RwLock<Vec<EventData>>,
354}
355
356impl InMemoryEventStore {
357 pub fn new() -> Self {
359 Self { events: RwLock::new(Vec::new()) }
360 }
361
362 pub fn with_capacity(capacity: usize) -> Self {
364 Self { events: RwLock::new(Vec::with_capacity(capacity)) }
365 }
366}
367
368impl Default for InMemoryEventStore {
369 fn default() -> Self {
370 Self::new()
371 }
372}
373
374#[async_trait]
375impl EventStore for InMemoryEventStore {
376 async fn append(&self, event: &EventData) -> EventResult<()> {
377 let mut events = self.events.write();
378 events.push(event.clone());
379 debug!("Event appended: {} [{}]", event.id, event.event_type);
380 Ok(())
381 }
382
383 async fn append_batch(&self, new_events: &[EventData]) -> EventResult<()> {
384 let mut events = self.events.write();
385 events.extend(new_events.iter().cloned());
386 debug!("Batch appended: {} events", new_events.len());
387 Ok(())
388 }
389
390 async fn get_events(&self, event_type: &str) -> EventResult<Vec<EventData>> {
391 let events = self.events.read();
392 let filtered: Vec<EventData> = events.iter().filter(|e| e.event_type == event_type).cloned().collect();
393 Ok(filtered)
394 }
395
396 async fn get_events_by_time(&self, start: u64, end: u64) -> EventResult<Vec<EventData>> {
397 let events = self.events.read();
398 let filtered: Vec<EventData> = events.iter().filter(|e| e.timestamp >= start && e.timestamp <= end).cloned().collect();
399 Ok(filtered)
400 }
401
402 async fn get_all_events(&self) -> EventResult<Vec<EventData>> {
403 let events = self.events.read();
404 Ok(events.clone())
405 }
406
407 async fn count(&self) -> EventResult<u64> {
408 let events = self.events.read();
409 Ok(events.len() as u64)
410 }
411
412 async fn clear(&self) -> EventResult<()> {
413 let mut events = self.events.write();
414 events.clear();
415 info!("Event store cleared");
416 Ok(())
417 }
418}
419
420#[derive(Debug, Clone)]
422pub struct EventBusConfig {
423 pub queue_capacity: usize,
425 pub handler_timeout: Duration,
427 pub enable_store: bool,
429 pub max_retries: u32,
431 pub retry_interval: Duration,
433}
434
435impl Default for EventBusConfig {
436 fn default() -> Self {
437 Self {
438 queue_capacity: 1000,
439 handler_timeout: Duration::from_secs(30),
440 enable_store: false,
441 max_retries: 3,
442 retry_interval: Duration::from_millis(100),
443 }
444 }
445}
446
447struct SubscriptionInfo {
449 subscription: Subscription,
450 handler: Arc<dyn EventHandler>,
451 filter: Option<Arc<dyn EventFilter>>,
452}
453
454pub struct EventBus {
458 config: EventBusConfig,
459 store: Option<Arc<dyn EventStore>>,
460 subscriptions: Arc<RwLock<HashMap<SubscriptionId, SubscriptionInfo>>>,
461 event_sender: mpsc::Sender<EventData>,
462 shutdown_sender: RwLock<Option<mpsc::Sender<()>>>,
463}
464
465impl EventBus {
466 pub fn new(config: EventBusConfig) -> Self {
468 Self::with_store(config, None)
469 }
470
471 pub fn with_store(config: EventBusConfig, store: Option<Arc<dyn EventStore>>) -> Self {
473 let (event_sender, event_receiver) = mpsc::channel::<EventData>(config.queue_capacity);
474 let (shutdown_sender, shutdown_receiver) = mpsc::channel::<()>(1);
475
476 let subscriptions = Arc::new(RwLock::new(HashMap::new()));
477
478 let bus = Self {
479 config,
480 store,
481 subscriptions: subscriptions.clone(),
482 event_sender,
483 shutdown_sender: RwLock::new(Some(shutdown_sender)),
484 };
485
486 let store_clone = bus.store.clone();
487 let config_clone = bus.config.clone();
488
489 tokio::spawn(Self::dispatcher(event_receiver, shutdown_receiver, subscriptions, store_clone, config_clone));
490
491 bus
492 }
493
494 async fn dispatcher(
495 mut receiver: mpsc::Receiver<EventData>,
496 mut shutdown: mpsc::Receiver<()>,
497 subscriptions: Arc<RwLock<HashMap<SubscriptionId, SubscriptionInfo>>>,
498 store: Option<Arc<dyn EventStore>>,
499 config: EventBusConfig,
500 ) {
501 loop {
502 tokio::select! {
503 Some(event) = receiver.recv() => {
504 if config.enable_store
505 && let Some(ref event_store) = store
506 && let Err(e) = event_store.append(&event).await
507 {
508 error!("Failed to store event: {}", e);
509 }
510
511 let subs = subscriptions.read();
512 for (_, info) in subs.iter() {
513 if !info.subscription.active {
514 continue;
515 }
516
517 if !info.subscription.event_types.contains(&event.event_type) {
518 continue;
519 }
520
521 if let Some(ref filter) = info.filter
522 && !filter.should_handle(&event)
523 {
524 continue;
525 }
526
527 let event_clone = event.clone();
528 let handler_clone = info.handler.clone();
529 let event_type = event.event_type.clone();
530 let sub_id = info.subscription.id.clone();
531 let timeout = config.handler_timeout;
532 let max_retries = config.max_retries;
533 let retry_interval = config.retry_interval;
534
535 tokio::spawn(async move {
536 for attempt in 0..max_retries {
537 match tokio::time::timeout(
538 timeout,
539 handler_clone.handle(event_clone.clone()),
540 )
541 .await
542 {
543 Ok(Ok(())) => {
544 debug!(
545 "Event handled successfully: {} [subscription: {}]",
546 event_type, sub_id
547 );
548 return;
549 }
550 Ok(Err(e)) => {
551 warn!(
552 "Event handler failed (attempt {}/{}): {}",
553 attempt + 1,
554 max_retries,
555 e
556 );
557 }
558 Err(_) => {
559 warn!(
560 "Event handler timeout (attempt {}/{})",
561 attempt + 1,
562 max_retries
563 );
564 }
565 }
566
567 if attempt + 1 < max_retries {
568 tokio::time::sleep(retry_interval).await;
569 }
570 }
571 error!(
572 "Event handler failed after {} retries: {} [subscription: {}]",
573 max_retries, event_type, sub_id
574 );
575 });
576 }
577 }
578 _ = shutdown.recv() => {
579 info!("Event bus dispatcher shutting down");
580 break;
581 }
582 else => break,
583 }
584 }
585 }
586
587 #[instrument(skip(self, handler))]
591 pub fn subscribe<H: EventHandler + 'static>(
592 &self,
593 event_types: Vec<EventTypeName>,
594 handler: H,
595 ) -> EventResult<Subscription> {
596 self.subscribe_with_filter(event_types, handler, None::<TypeEventFilter>)
597 }
598
599 #[instrument(skip(self, handler, filter))]
603 pub fn subscribe_with_filter<H: EventHandler + 'static, F: EventFilter + 'static>(
604 &self,
605 event_types: Vec<EventTypeName>,
606 handler: H,
607 filter: Option<F>,
608 ) -> EventResult<Subscription> {
609 let subscription_id = uuid::Uuid::new_v4().to_string();
610 let subscription = Subscription::new(subscription_id.clone(), event_types.clone());
611
612 let info = SubscriptionInfo {
613 subscription: subscription.clone(),
614 handler: Arc::new(handler),
615 filter: filter.map(|f| Arc::new(f) as Arc<dyn EventFilter>),
616 };
617
618 {
619 let mut subs = self.subscriptions.write();
620 subs.insert(subscription_id, info);
621 }
622
623 info!("Subscription created: {} for types: {:?}", subscription.id, event_types);
624 Ok(subscription)
625 }
626
627 #[instrument(skip(self))]
631 pub fn unsubscribe(&self, subscription_id: &str) -> EventResult<bool> {
632 let mut subs = self.subscriptions.write();
633 if subs.remove(subscription_id).is_some() {
634 info!("Subscription removed: {}", subscription_id);
635 Ok(true)
636 }
637 else {
638 warn!("Subscription not found: {}", subscription_id);
639 Ok(false)
640 }
641 }
642
643 #[instrument(skip(self, event))]
647 pub fn publish<E: Event + Serialize>(&self, event: &E) -> EventResult<()> {
648 let event_data = EventData::new(event)?;
649 self.publish_data(event_data)
650 }
651
652 #[instrument(skip(self, event_data))]
656 pub fn publish_data(&self, event_data: EventData) -> EventResult<()> {
657 match self.event_sender.blocking_send(event_data.clone()) {
658 Ok(()) => {
659 debug!("Event published: {} [{}]", event_data.id, event_data.event_type);
660 Ok(())
661 }
662 Err(_) => {
663 error!("Event bus channel closed");
664 Err(EventError::BusClosed)
665 }
666 }
667 }
668
669 #[instrument(skip(self, event))]
673 pub async fn publish_async<E: Event + Serialize>(&self, event: &E) -> EventResult<()> {
674 let event_data = EventData::new(event)?;
675 self.publish_data_async(event_data).await
676 }
677
678 #[instrument(skip(self, event_data))]
682 pub async fn publish_data_async(&self, event_data: EventData) -> EventResult<()> {
683 match self.event_sender.send(event_data.clone()).await {
684 Ok(()) => {
685 debug!("Event published async: {} [{}]", event_data.id, event_data.event_type);
686 Ok(())
687 }
688 Err(_) => {
689 error!("Event bus channel closed");
690 Err(EventError::BusClosed)
691 }
692 }
693 }
694
695 pub fn subscription_count(&self) -> usize {
697 self.subscriptions.read().len()
698 }
699
700 pub fn subscription_count_for_type(&self, event_type: &str) -> usize {
702 let subs = self.subscriptions.read();
703 subs.values().filter(|info| info.subscription.event_types.contains(&event_type.to_string())).count()
704 }
705
706 pub async fn shutdown(&self) -> EventResult<()> {
708 let tx = {
709 let mut sender = self.shutdown_sender.write();
710 sender.take()
711 };
712 if let Some(tx) = tx {
713 let _ = tx.send(()).await;
714 info!("Event bus shutdown initiated");
715 }
716 Ok(())
717 }
718
719 pub async fn replay_events<H: EventHandler + 'static>(
723 &self,
724 store: &dyn EventStore,
725 handler: &H,
726 event_type: Option<&str>,
727 ) -> EventResult<u64> {
728 let events = match event_type {
729 Some(t) => store.get_events(t).await?,
730 None => store.get_all_events().await?,
731 };
732
733 let mut count = 0u64;
734 for event in events {
735 if handler.event_types().contains(&event.event_type) {
736 handler.handle(event).await?;
737 count += 1;
738 }
739 }
740
741 info!("Replayed {} events", count);
742 Ok(count)
743 }
744}
745
746#[derive(Debug, Clone, Serialize, Deserialize)]
750pub struct BaseEvent<T> {
751 pub id: EventId,
753 pub event_type: EventTypeName,
755 pub timestamp: u64,
757 pub payload: T,
759}
760
761impl<T> BaseEvent<T>
762where
763 T: Send + Sync + 'static,
764{
765 pub fn new(event_type: impl Into<String>, payload: T) -> Self {
767 Self {
768 id: uuid::Uuid::new_v4().to_string(),
769 event_type: event_type.into(),
770 timestamp: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64,
771 payload,
772 }
773 }
774}
775
776impl<T: Serialize + Send + Sync + 'static> Event for BaseEvent<T> {
777 fn event_type(&self) -> EventTypeName {
778 self.event_type.clone()
779 }
780
781 fn event_id(&self) -> &EventId {
782 &self.id
783 }
784
785 fn timestamp(&self) -> u64 {
786 self.timestamp
787 }
788}
789
790pub fn memory_event_store() -> Arc<InMemoryEventStore> {
792 Arc::new(InMemoryEventStore::new())
793}
794
795pub fn event_bus(config: EventBusConfig) -> Arc<EventBus> {
797 Arc::new(EventBus::new(config))
798}
799
800pub fn event_bus_with_store(config: EventBusConfig, store: Arc<dyn EventStore>) -> Arc<EventBus> {
802 Arc::new(EventBus::with_store(config, Some(store)))
803}