1#![warn(missing_docs)]
9
10use async_trait::async_trait;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize, de::DeserializeOwned};
13use std::{
14 collections::HashMap,
15 sync::Arc,
16 time::{Duration, SystemTime, UNIX_EPOCH},
17};
18use tokio::sync::mpsc;
19use tracing::{debug, error, info, instrument, warn};
20use wae_types::{WaeError, WaeResult};
21
/// Unique identifier of an event.
pub type EventId = String;

/// Unique identifier of a subscription registered on an [`EventBus`].
pub type SubscriptionId = String;

/// Name used to classify events and to route them to subscribers.
pub type EventTypeName = String;
30
/// Contract every publishable event must satisfy.
///
/// [`EventData::new`] reads these three accessors to build the envelope that
/// travels through the bus.
pub trait Event: Send + Sync + 'static {
    /// Returns the type name under which this event is routed.
    fn event_type(&self) -> EventTypeName;

    /// Returns this event's identifier.
    fn event_id(&self) -> &EventId;

    /// Returns the event's timestamp.
    ///
    /// [`BaseEvent`] reports milliseconds since the Unix epoch; other
    /// implementations presumably should use the same unit.
    fn timestamp(&self) -> u64;
}
44
/// Type-erased, serializable envelope for an event.
///
/// This is the value that is queued on the bus, handed to handlers and
/// persisted by event stores.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventData {
    /// Identifier copied from the source event.
    pub id: EventId,
    /// Type name copied from the source event; used for routing.
    pub event_type: EventTypeName,
    /// Timestamp copied from the source event.
    pub timestamp: u64,
    /// The source event serialized to JSON.
    pub payload: serde_json::Value,
    /// Free-form string key/value pairs attached to the event.
    pub metadata: HashMap<String, String>,
}
61
62impl EventData {
63 pub fn new<E: Event + Serialize>(event: &E) -> WaeResult<Self> {
65 let payload = serde_json::to_value(event).map_err(|_e| WaeError::serialization_failed("Event"))?;
66 Ok(Self {
67 id: event.event_id().clone(),
68 event_type: event.event_type(),
69 timestamp: event.timestamp(),
70 payload,
71 metadata: HashMap::new(),
72 })
73 }
74
75 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
77 self.metadata.insert(key.into(), value.into());
78 self
79 }
80
81 pub fn into_event<E: DeserializeOwned>(self) -> WaeResult<E> {
83 serde_json::from_value(self.payload).map_err(|_e| WaeError::deserialization_failed("Event"))
84 }
85
86 pub fn id(&self) -> &str {
88 &self.id
89 }
90
91 pub fn event_type(&self) -> &str {
93 &self.event_type
94 }
95
96 pub fn timestamp(&self) -> u64 {
98 self.timestamp
99 }
100}
101
/// Asynchronous event handler that can be subscribed to an [`EventBus`].
#[async_trait]
pub trait EventHandler: Send + Sync {
    /// Processes one event.
    ///
    /// When invoked by the bus dispatcher, an `Err` result or an invocation
    /// that exceeds the configured handler timeout counts as a failed attempt
    /// and may be retried.
    async fn handle(&self, event: EventData) -> WaeResult<()>;

    /// Returns the event types this handler is interested in.
    ///
    /// [`EventBus::replay_events`] uses this list to skip events. Live
    /// dispatch routes by the types passed to `subscribe` instead.
    fn event_types(&self) -> Vec<EventTypeName>;
}
113
/// Synchronous counterpart of [`EventHandler`].
///
/// NOTE(review): nothing in the code visible here wires this trait into
/// [`EventBus`]; it is presumably meant for callers that dispatch manually.
pub trait SyncEventHandler: Send + Sync {
    /// Processes one event synchronously.
    fn handle(&self, event: EventData) -> WaeResult<()>;

    /// Returns the event types this handler is interested in.
    fn event_types(&self) -> Vec<EventTypeName>;
}
124
/// Adapter that turns a closure returning a boxed future into an
/// [`EventHandler`].
pub struct AsyncEventHandler<F> {
    /// Closure invoked for every event.
    handler: F,
    /// Event types reported by `event_types()`.
    event_types: Vec<EventTypeName>,
}
132
133impl<F> AsyncEventHandler<F>
134where
135 F: Fn(EventData) -> std::pin::Pin<Box<dyn std::future::Future<Output = WaeResult<()>> + Send + Sync>>
136 + Send
137 + Sync
138 + 'static,
139{
140 pub fn new(event_types: Vec<EventTypeName>, handler: F) -> Self {
142 Self { handler, event_types }
143 }
144}
145
146#[async_trait]
147impl<F> EventHandler for AsyncEventHandler<F>
148where
149 F: Fn(EventData) -> std::pin::Pin<Box<dyn std::future::Future<Output = WaeResult<()>> + Send + Sync>>
150 + Send
151 + Sync
152 + 'static,
153{
154 async fn handle(&self, event: EventData) -> WaeResult<()> {
155 (self.handler)(event).await
156 }
157
158 fn event_types(&self) -> Vec<EventTypeName> {
159 self.event_types.clone()
160 }
161}
162
/// Adapter that turns a plain closure into a [`SyncEventHandler`].
pub struct SyncEventHandlerWrapper<F> {
    /// Closure invoked for every event.
    handler: F,
    /// Event types reported by `event_types()`.
    event_types: Vec<EventTypeName>,
}
170
171impl<F> SyncEventHandlerWrapper<F>
172where
173 F: Fn(EventData) -> WaeResult<()> + Send + Sync + 'static,
174{
175 pub fn new(event_types: Vec<EventTypeName>, handler: F) -> Self {
177 Self { handler, event_types }
178 }
179}
180
181impl<F> SyncEventHandler for SyncEventHandlerWrapper<F>
182where
183 F: Fn(EventData) -> WaeResult<()> + Send + Sync + 'static,
184{
185 fn handle(&self, event: EventData) -> WaeResult<()> {
186 (self.handler)(event)
187 }
188
189 fn event_types(&self) -> Vec<EventTypeName> {
190 self.event_types.clone()
191 }
192}
193
/// Predicate that decides whether a subscription should receive an event.
///
/// The bus dispatcher evaluates it after the subscription's event-type check.
pub trait EventFilter: Send + Sync {
    /// Returns `true` when `event` should be delivered to the handler.
    fn should_handle(&self, event: &EventData) -> bool;
}
201
/// [`EventFilter`] that accepts events whose type is in an allow-list.
pub struct TypeEventFilter {
    /// Event types that pass the filter.
    allowed_types: Vec<EventTypeName>,
}
208
209impl TypeEventFilter {
210 pub fn new(allowed_types: Vec<EventTypeName>) -> Self {
212 Self { allowed_types }
213 }
214}
215
216impl EventFilter for TypeEventFilter {
217 fn should_handle(&self, event: &EventData) -> bool {
218 self.allowed_types.contains(&event.event_type)
219 }
220}
221
/// [`EventFilter`] that accepts events carrying one specific metadata entry.
pub struct MetadataEventFilter {
    /// Metadata key that must be present.
    key: String,
    /// Value the metadata entry must equal.
    value: String,
}
229
230impl MetadataEventFilter {
231 pub fn new(key: impl Into<String>, value: impl Into<String>) -> Self {
233 Self { key: key.into(), value: value.into() }
234 }
235}
236
237impl EventFilter for MetadataEventFilter {
238 fn should_handle(&self, event: &EventData) -> bool {
239 event.metadata.get(&self.key).map(|v| v == &self.value).unwrap_or(false)
240 }
241}
242
243#[derive(Debug, Clone)]
247pub struct Subscription {
248 pub id: SubscriptionId,
250 pub event_types: Vec<EventTypeName>,
252 pub created_at: u64,
254 pub active: bool,
256}
257
258impl Subscription {
259 pub fn new(id: SubscriptionId, event_types: Vec<EventTypeName>) -> Self {
261 Self {
262 id,
263 event_types,
264 created_at: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64,
265 active: true,
266 }
267 }
268}
269
/// Asynchronous storage backend for published events.
#[async_trait]
pub trait EventStore: Send + Sync {
    /// Persists a single event.
    async fn append(&self, event: &EventData) -> WaeResult<()>;

    /// Persists several events in one call.
    async fn append_batch(&self, events: &[EventData]) -> WaeResult<()>;

    /// Returns the stored events whose type equals `event_type`.
    async fn get_events(&self, event_type: &str) -> WaeResult<Vec<EventData>>;

    /// Returns the stored events whose timestamp lies between `start` and `end`.
    ///
    /// [`InMemoryEventStore`] treats both bounds as inclusive.
    async fn get_events_by_time(&self, start: u64, end: u64) -> WaeResult<Vec<EventData>>;

    /// Returns every stored event.
    async fn get_all_events(&self) -> WaeResult<Vec<EventData>>;

    /// Returns the number of stored events.
    async fn count(&self) -> WaeResult<u64>;

    /// Removes every stored event.
    async fn clear(&self) -> WaeResult<()>;
}
296
/// [`EventStore`] that keeps events in a lock-protected in-process `Vec`.
pub struct InMemoryEventStore {
    /// Stored events, in the order they were appended.
    events: RwLock<Vec<EventData>>,
}
303
304impl InMemoryEventStore {
305 pub fn new() -> Self {
307 Self { events: RwLock::new(Vec::new()) }
308 }
309
310 pub fn with_capacity(capacity: usize) -> Self {
312 Self { events: RwLock::new(Vec::with_capacity(capacity)) }
313 }
314}
315
316impl Default for InMemoryEventStore {
317 fn default() -> Self {
318 Self::new()
319 }
320}
321
322#[async_trait]
323impl EventStore for InMemoryEventStore {
324 async fn append(&self, event: &EventData) -> WaeResult<()> {
325 let mut events = self.events.write();
326 events.push(event.clone());
327 debug!("Event appended: {} [{}]", event.id, event.event_type);
328 Ok(())
329 }
330
331 async fn append_batch(&self, new_events: &[EventData]) -> WaeResult<()> {
332 let mut events = self.events.write();
333 events.extend(new_events.iter().cloned());
334 debug!("Batch appended: {} events", new_events.len());
335 Ok(())
336 }
337
338 async fn get_events(&self, event_type: &str) -> WaeResult<Vec<EventData>> {
339 let events = self.events.read();
340 let filtered: Vec<EventData> = events.iter().filter(|e| e.event_type == event_type).cloned().collect();
341 Ok(filtered)
342 }
343
344 async fn get_events_by_time(&self, start: u64, end: u64) -> WaeResult<Vec<EventData>> {
345 let events = self.events.read();
346 let filtered: Vec<EventData> = events.iter().filter(|e| e.timestamp >= start && e.timestamp <= end).cloned().collect();
347 Ok(filtered)
348 }
349
350 async fn get_all_events(&self) -> WaeResult<Vec<EventData>> {
351 let events = self.events.read();
352 Ok(events.clone())
353 }
354
355 async fn count(&self) -> WaeResult<u64> {
356 let events = self.events.read();
357 Ok(events.len() as u64)
358 }
359
360 async fn clear(&self) -> WaeResult<()> {
361 let mut events = self.events.write();
362 events.clear();
363 info!("Event store cleared");
364 Ok(())
365 }
366}
367
/// Tunable settings of an event bus.
#[derive(Debug, Clone)]
pub struct EventBusConfig {
    /// Capacity of the bounded channel that queues published events.
    pub queue_capacity: usize,
    /// Maximum time one handler invocation may take before the attempt is
    /// treated as timed out.
    pub handler_timeout: Duration,
    /// Whether the dispatcher appends each event to the configured store.
    pub enable_store: bool,
    /// Number of delivery attempts the dispatcher makes per handler.
    pub max_retries: u32,
    /// Pause between two consecutive delivery attempts.
    pub retry_interval: Duration,
}

impl Default for EventBusConfig {
    /// Returns the default settings: a queue of 1000 events, a 30 s handler
    /// timeout, the store disabled, 3 attempts and 100 ms between attempts.
    fn default() -> Self {
        let handler_timeout = Duration::from_secs(30);
        let retry_interval = Duration::from_millis(100);

        EventBusConfig {
            queue_capacity: 1000,
            handler_timeout,
            enable_store: false,
            max_retries: 3,
            retry_interval,
        }
    }
}
394
/// Internal record the bus keeps for every subscription.
struct SubscriptionInfo {
    /// Public description of the subscription (id, types, active flag).
    subscription: Subscription,
    /// Handler invoked for matching events.
    handler: Arc<dyn EventHandler>,
    /// Optional extra predicate evaluated after the event-type check.
    filter: Option<Arc<dyn EventFilter>>,
}
401
/// Publish/subscribe event bus backed by a background dispatcher task.
///
/// Published events are queued on a bounded channel; the dispatcher takes
/// them off the queue and invokes every matching subscription's handler.
pub struct EventBus {
    /// Settings the bus was created with.
    config: EventBusConfig,
    /// Optional store that receives events when `config.enable_store` is set.
    store: Option<Arc<dyn EventStore>>,
    /// Registered subscriptions, shared with the dispatcher task.
    subscriptions: Arc<RwLock<HashMap<SubscriptionId, SubscriptionInfo>>>,
    /// Sending half of the event queue.
    event_sender: mpsc::Sender<EventData>,
    /// Sender used to stop the dispatcher; taken (set to `None`) on shutdown.
    shutdown_sender: RwLock<Option<mpsc::Sender<()>>>,
}
412
413impl EventBus {
414 pub fn new(config: EventBusConfig) -> Self {
416 Self::with_store(config, None)
417 }
418
419 pub fn with_store(config: EventBusConfig, store: Option<Arc<dyn EventStore>>) -> Self {
421 let (event_sender, event_receiver) = mpsc::channel::<EventData>(config.queue_capacity);
422 let (shutdown_sender, shutdown_receiver) = mpsc::channel::<()>(1);
423
424 let subscriptions = Arc::new(RwLock::new(HashMap::new()));
425
426 let bus = Self {
427 config,
428 store,
429 subscriptions: subscriptions.clone(),
430 event_sender,
431 shutdown_sender: RwLock::new(Some(shutdown_sender)),
432 };
433
434 let store_clone = bus.store.clone();
435 let config_clone = bus.config.clone();
436
437 tokio::spawn(Self::dispatcher(event_receiver, shutdown_receiver, subscriptions, store_clone, config_clone));
438
439 bus
440 }
441
442 async fn dispatcher(
443 mut receiver: mpsc::Receiver<EventData>,
444 mut shutdown: mpsc::Receiver<()>,
445 subscriptions: Arc<RwLock<HashMap<SubscriptionId, SubscriptionInfo>>>,
446 store: Option<Arc<dyn EventStore>>,
447 config: EventBusConfig,
448 ) {
449 loop {
450 tokio::select! {
451 Some(event) = receiver.recv() => {
452 if config.enable_store
453 && let Some(ref event_store) = store
454 && let Err(e) = event_store.append(&event).await
455 {
456 error!("Failed to store event: {}", e);
457 }
458
459 let subs = subscriptions.read();
460 for (_, info) in subs.iter() {
461 if !info.subscription.active {
462 continue;
463 }
464
465 if !info.subscription.event_types.contains(&event.event_type) {
466 continue;
467 }
468
469 if let Some(ref filter) = info.filter
470 && !filter.should_handle(&event)
471 {
472 continue;
473 }
474
475 let event_clone = event.clone();
476 let handler_clone = info.handler.clone();
477 let event_type = event.event_type.clone();
478 let sub_id = info.subscription.id.clone();
479 let timeout = config.handler_timeout;
480 let max_retries = config.max_retries;
481 let retry_interval = config.retry_interval;
482
483 tokio::spawn(async move {
484 for attempt in 0..max_retries {
485 match tokio::time::timeout(
486 timeout,
487 handler_clone.handle(event_clone.clone()),
488 )
489 .await
490 {
491 Ok(Ok(())) => {
492 debug!(
493 "Event handled successfully: {} [subscription: {}]",
494 event_type, sub_id
495 );
496 return;
497 }
498 Ok(Err(e)) => {
499 warn!(
500 "Event handler failed (attempt {}/{}): {}",
501 attempt + 1,
502 max_retries,
503 e
504 );
505 }
506 Err(_) => {
507 warn!(
508 "Event handler timeout (attempt {}/{})",
509 attempt + 1,
510 max_retries
511 );
512 }
513 }
514
515 if attempt + 1 < max_retries {
516 tokio::time::sleep(retry_interval).await;
517 }
518 }
519 error!(
520 "Event handler failed after {} retries: {} [subscription: {}]",
521 max_retries, event_type, sub_id
522 );
523 });
524 }
525 }
526 _ = shutdown.recv() => {
527 info!("Event bus dispatcher shutting down");
528 break;
529 }
530 else => break,
531 }
532 }
533 }
534
535 #[instrument(skip(self, handler))]
539 pub fn subscribe<H: EventHandler + 'static>(&self, event_types: Vec<EventTypeName>, handler: H) -> WaeResult<Subscription> {
540 self.subscribe_with_filter(event_types, handler, None::<TypeEventFilter>)
541 }
542
543 #[instrument(skip(self, handler, filter))]
547 pub fn subscribe_with_filter<H: EventHandler + 'static, F: EventFilter + 'static>(
548 &self,
549 event_types: Vec<EventTypeName>,
550 handler: H,
551 filter: Option<F>,
552 ) -> WaeResult<Subscription> {
553 let subscription_id = uuid::Uuid::new_v4().to_string();
554 let subscription = Subscription::new(subscription_id.clone(), event_types.clone());
555
556 let info = SubscriptionInfo {
557 subscription: subscription.clone(),
558 handler: Arc::new(handler),
559 filter: filter.map(|f| Arc::new(f) as Arc<dyn EventFilter>),
560 };
561
562 {
563 let mut subs = self.subscriptions.write();
564 subs.insert(subscription_id, info);
565 }
566
567 info!("Subscription created: {} for types: {:?}", subscription.id, event_types);
568 Ok(subscription)
569 }
570
571 #[instrument(skip(self))]
575 pub fn unsubscribe(&self, subscription_id: &str) -> WaeResult<bool> {
576 let mut subs = self.subscriptions.write();
577 if subs.remove(subscription_id).is_some() {
578 info!("Subscription removed: {}", subscription_id);
579 Ok(true)
580 }
581 else {
582 warn!("Subscription not found: {}", subscription_id);
583 Ok(false)
584 }
585 }
586
587 #[instrument(skip(self, event))]
591 pub fn publish<E: Event + Serialize>(&self, event: &E) -> WaeResult<()> {
592 let event_data = EventData::new(event)?;
593 self.publish_data(event_data)
594 }
595
596 #[instrument(skip(self, event_data))]
600 pub fn publish_data(&self, event_data: EventData) -> WaeResult<()> {
601 match self.event_sender.blocking_send(event_data.clone()) {
602 Ok(()) => {
603 debug!("Event published: {} [{}]", event_data.id, event_data.event_type);
604 Ok(())
605 }
606 Err(_) => {
607 error!("Event bus channel closed");
608 Err(WaeError::internal("Event bus channel closed"))
609 }
610 }
611 }
612
613 #[instrument(skip(self, event))]
617 pub async fn publish_async<E: Event + Serialize>(&self, event: &E) -> WaeResult<()> {
618 let event_data = EventData::new(event)?;
619 self.publish_data_async(event_data).await
620 }
621
622 #[instrument(skip(self, event_data))]
626 pub async fn publish_data_async(&self, event_data: EventData) -> WaeResult<()> {
627 match self.event_sender.send(event_data.clone()).await {
628 Ok(()) => {
629 debug!("Event published async: {} [{}]", event_data.id, event_data.event_type);
630 Ok(())
631 }
632 Err(_) => {
633 error!("Event bus channel closed");
634 Err(WaeError::internal("Event bus channel closed"))
635 }
636 }
637 }
638
639 pub fn subscription_count(&self) -> usize {
641 self.subscriptions.read().len()
642 }
643
644 pub fn subscription_count_for_type(&self, event_type: &str) -> usize {
646 let subs = self.subscriptions.read();
647 subs.values().filter(|info| info.subscription.event_types.contains(&event_type.to_string())).count()
648 }
649
650 pub async fn shutdown(&self) -> WaeResult<()> {
652 let tx = {
653 let mut sender = self.shutdown_sender.write();
654 sender.take()
655 };
656 if let Some(tx) = tx {
657 let _ = tx.send(()).await;
658 info!("Event bus shutdown initiated");
659 }
660 Ok(())
661 }
662
663 pub async fn replay_events<H: EventHandler + 'static>(
667 &self,
668 store: &dyn EventStore,
669 handler: &H,
670 event_type: Option<&str>,
671 ) -> WaeResult<u64> {
672 let events = match event_type {
673 Some(t) => store.get_events(t).await?,
674 None => store.get_all_events().await?,
675 };
676
677 let mut count = 0u64;
678 for event in events {
679 if handler.event_types().contains(&event.event_type) {
680 handler.handle(event).await?;
681 count += 1;
682 }
683 }
684
685 info!("Replayed {} events", count);
686 Ok(count)
687 }
688}
689
/// Generic ready-made event that wraps an arbitrary payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BaseEvent<T> {
    /// Identifier of the event.
    pub id: EventId,
    /// Type name under which the event is routed.
    pub event_type: EventTypeName,
    /// Creation time in milliseconds since the Unix epoch.
    pub timestamp: u64,
    /// User-defined payload.
    pub payload: T,
}
704
705impl<T> BaseEvent<T>
706where
707 T: Send + Sync + 'static,
708{
709 pub fn new(event_type: impl Into<String>, payload: T) -> Self {
711 Self {
712 id: uuid::Uuid::new_v4().to_string(),
713 event_type: event_type.into(),
714 timestamp: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64,
715 payload,
716 }
717 }
718}
719
720impl<T: Serialize + Send + Sync + 'static> Event for BaseEvent<T> {
721 fn event_type(&self) -> EventTypeName {
722 self.event_type.clone()
723 }
724
725 fn event_id(&self) -> &EventId {
726 &self.id
727 }
728
729 fn timestamp(&self) -> u64 {
730 self.timestamp
731 }
732}
733
734pub fn memory_event_store() -> Arc<InMemoryEventStore> {
736 Arc::new(InMemoryEventStore::new())
737}
738
739pub fn event_bus(config: EventBusConfig) -> Arc<EventBus> {
741 Arc::new(EventBus::new(config))
742}
743
744pub fn event_bus_with_store(config: EventBusConfig, store: Arc<dyn EventStore>) -> Arc<EventBus> {
746 Arc::new(EventBus::with_store(config, Some(store)))
747}