Skip to main content

wae_event/
lib.rs

1//! WAE Event - 事件驱动模块
2//!
3//! 提供统一的事件驱动能力抽象,支持事件发布/订阅、事件存储和事件回放。
4//!
5//! 深度融合 tokio 运行时,所有 API 都是异步优先设计。
6//! 微服务架构友好,支持事件溯源、CQRS 等模式。
7
8#![warn(missing_docs)]
9
10use async_trait::async_trait;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize, de::DeserializeOwned};
13use std::{
14    collections::HashMap,
15    sync::Arc,
16    time::{Duration, SystemTime, UNIX_EPOCH},
17};
18use tokio::sync::mpsc;
19use tracing::{debug, error, info, instrument, warn};
20use wae_types::{WaeError, WaeResult};
21
22/// 事件 ID 类型
23pub type EventId = String;
24
25/// 订阅 ID 类型
26pub type SubscriptionId = String;
27
28/// 事件类型名称
29pub type EventTypeName = String;
30
31/// 事件基础 trait
32///
33/// 所有事件必须实现此 trait,定义事件的基本属性。
34pub trait Event: Send + Sync + 'static {
35    /// 获取事件类型名称
36    fn event_type(&self) -> EventTypeName;
37
38    /// 获取事件 ID
39    fn event_id(&self) -> &EventId;
40
41    /// 获取事件时间戳
42    fn timestamp(&self) -> u64;
43}
44
45/// 事件数据封装
46///
47/// 用于序列化和传输事件的通用容器。
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct EventData {
50    /// 事件 ID
51    pub id: EventId,
52    /// 事件类型名称
53    pub event_type: EventTypeName,
54    /// 事件时间戳 (Unix 毫秒)
55    pub timestamp: u64,
56    /// 事件载荷 (JSON 序列化)
57    pub payload: serde_json::Value,
58    /// 事件元数据
59    pub metadata: HashMap<String, String>,
60}
61
62impl EventData {
63    /// 创建新的事件数据
64    pub fn new<E: Event + Serialize>(event: &E) -> WaeResult<Self> {
65        let payload = serde_json::to_value(event).map_err(|_e| WaeError::serialization_failed("Event"))?;
66        Ok(Self {
67            id: event.event_id().clone(),
68            event_type: event.event_type(),
69            timestamp: event.timestamp(),
70            payload,
71            metadata: HashMap::new(),
72        })
73    }
74
75    /// 添加元数据
76    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
77        self.metadata.insert(key.into(), value.into());
78        self
79    }
80
81    /// 反序列化为具体事件类型
82    pub fn into_event<E: DeserializeOwned>(self) -> WaeResult<E> {
83        serde_json::from_value(self.payload).map_err(|_e| WaeError::deserialization_failed("Event"))
84    }
85
86    /// 获取事件 ID
87    pub fn id(&self) -> &str {
88        &self.id
89    }
90
91    /// 获取事件类型
92    pub fn event_type(&self) -> &str {
93        &self.event_type
94    }
95
96    /// 获取时间戳
97    pub fn timestamp(&self) -> u64 {
98        self.timestamp
99    }
100}
101
102/// 事件处理器 trait (异步)
103///
104/// 定义异步事件处理器的接口。
105#[async_trait]
106pub trait EventHandler: Send + Sync {
107    /// 处理事件
108    async fn handle(&self, event: EventData) -> WaeResult<()>;
109
110    /// 获取处理器感兴趣的事件类型
111    fn event_types(&self) -> Vec<EventTypeName>;
112}
113
114/// 同步事件处理器 trait
115///
116/// 定义同步事件处理器的接口。
117pub trait SyncEventHandler: Send + Sync {
118    /// 处理事件
119    fn handle(&self, event: EventData) -> WaeResult<()>;
120
121    /// 获取处理器感兴趣的事件类型
122    fn event_types(&self) -> Vec<EventTypeName>;
123}
124
125/// 异步事件处理器包装器
126///
127/// 将闭包包装为 EventHandler trait 对象。
128pub struct AsyncEventHandler<F> {
129    handler: F,
130    event_types: Vec<EventTypeName>,
131}
132
133impl<F> AsyncEventHandler<F>
134where
135    F: Fn(EventData) -> std::pin::Pin<Box<dyn std::future::Future<Output = WaeResult<()>> + Send + Sync>>
136        + Send
137        + Sync
138        + 'static,
139{
140    /// 创建新的异步事件处理器
141    pub fn new(event_types: Vec<EventTypeName>, handler: F) -> Self {
142        Self { handler, event_types }
143    }
144}
145
146#[async_trait]
147impl<F> EventHandler for AsyncEventHandler<F>
148where
149    F: Fn(EventData) -> std::pin::Pin<Box<dyn std::future::Future<Output = WaeResult<()>> + Send + Sync>>
150        + Send
151        + Sync
152        + 'static,
153{
154    async fn handle(&self, event: EventData) -> WaeResult<()> {
155        (self.handler)(event).await
156    }
157
158    fn event_types(&self) -> Vec<EventTypeName> {
159        self.event_types.clone()
160    }
161}
162
163/// 同步事件处理器包装器
164///
165/// 将闭包包装为 SyncEventHandler trait 对象。
166pub struct SyncEventHandlerWrapper<F> {
167    handler: F,
168    event_types: Vec<EventTypeName>,
169}
170
171impl<F> SyncEventHandlerWrapper<F>
172where
173    F: Fn(EventData) -> WaeResult<()> + Send + Sync + 'static,
174{
175    /// 创建新的同步事件处理器
176    pub fn new(event_types: Vec<EventTypeName>, handler: F) -> Self {
177        Self { handler, event_types }
178    }
179}
180
181impl<F> SyncEventHandler for SyncEventHandlerWrapper<F>
182where
183    F: Fn(EventData) -> WaeResult<()> + Send + Sync + 'static,
184{
185    fn handle(&self, event: EventData) -> WaeResult<()> {
186        (self.handler)(event)
187    }
188
189    fn event_types(&self) -> Vec<EventTypeName> {
190        self.event_types.clone()
191    }
192}
193
194/// 事件过滤器 trait
195///
196/// 用于过滤事件,决定是否处理某个事件。
197pub trait EventFilter: Send + Sync {
198    /// 判断是否应该处理该事件
199    fn should_handle(&self, event: &EventData) -> bool;
200}
201
202/// 类型事件过滤器
203///
204/// 只处理指定类型的事件。
205pub struct TypeEventFilter {
206    allowed_types: Vec<EventTypeName>,
207}
208
209impl TypeEventFilter {
210    /// 创建新的类型过滤器
211    pub fn new(allowed_types: Vec<EventTypeName>) -> Self {
212        Self { allowed_types }
213    }
214}
215
216impl EventFilter for TypeEventFilter {
217    fn should_handle(&self, event: &EventData) -> bool {
218        self.allowed_types.contains(&event.event_type)
219    }
220}
221
222/// 元数据事件过滤器
223///
224/// 根据元数据过滤事件。
225pub struct MetadataEventFilter {
226    key: String,
227    value: String,
228}
229
230impl MetadataEventFilter {
231    /// 创建新的元数据过滤器
232    pub fn new(key: impl Into<String>, value: impl Into<String>) -> Self {
233        Self { key: key.into(), value: value.into() }
234    }
235}
236
237impl EventFilter for MetadataEventFilter {
238    fn should_handle(&self, event: &EventData) -> bool {
239        event.metadata.get(&self.key).map(|v| v == &self.value).unwrap_or(false)
240    }
241}
242
243/// 订阅句柄
244///
245/// 用于管理订阅生命周期。
246#[derive(Debug, Clone)]
247pub struct Subscription {
248    /// 订阅 ID
249    pub id: SubscriptionId,
250    /// 订阅的事件类型
251    pub event_types: Vec<EventTypeName>,
252    /// 订阅时间
253    pub created_at: u64,
254    /// 是否激活
255    pub active: bool,
256}
257
258impl Subscription {
259    /// 创建新的订阅
260    pub fn new(id: SubscriptionId, event_types: Vec<EventTypeName>) -> Self {
261        Self {
262            id,
263            event_types,
264            created_at: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64,
265            active: true,
266        }
267    }
268}
269
270/// 事件存储 trait
271///
272/// 定义事件持久化存储的接口。
273#[async_trait]
274pub trait EventStore: Send + Sync {
275    /// 追加事件到存储
276    async fn append(&self, event: &EventData) -> WaeResult<()>;
277
278    /// 批量追加事件
279    async fn append_batch(&self, events: &[EventData]) -> WaeResult<()>;
280
281    /// 获取指定类型的事件流
282    async fn get_events(&self, event_type: &str) -> WaeResult<Vec<EventData>>;
283
284    /// 获取指定时间范围的事件
285    async fn get_events_by_time(&self, start: u64, end: u64) -> WaeResult<Vec<EventData>>;
286
287    /// 获取所有事件
288    async fn get_all_events(&self) -> WaeResult<Vec<EventData>>;
289
290    /// 获取事件数量
291    async fn count(&self) -> WaeResult<u64>;
292
293    /// 清空所有事件
294    async fn clear(&self) -> WaeResult<()>;
295}
296
297/// 内存事件存储
298///
299/// 基于内存的事件存储实现,用于开发和测试。
300pub struct InMemoryEventStore {
301    events: RwLock<Vec<EventData>>,
302}
303
304impl InMemoryEventStore {
305    /// 创建新的内存事件存储
306    pub fn new() -> Self {
307        Self { events: RwLock::new(Vec::new()) }
308    }
309
310    /// 创建带初始容量的内存事件存储
311    pub fn with_capacity(capacity: usize) -> Self {
312        Self { events: RwLock::new(Vec::with_capacity(capacity)) }
313    }
314}
315
316impl Default for InMemoryEventStore {
317    fn default() -> Self {
318        Self::new()
319    }
320}
321
322#[async_trait]
323impl EventStore for InMemoryEventStore {
324    async fn append(&self, event: &EventData) -> WaeResult<()> {
325        let mut events = self.events.write();
326        events.push(event.clone());
327        debug!("Event appended: {} [{}]", event.id, event.event_type);
328        Ok(())
329    }
330
331    async fn append_batch(&self, new_events: &[EventData]) -> WaeResult<()> {
332        let mut events = self.events.write();
333        events.extend(new_events.iter().cloned());
334        debug!("Batch appended: {} events", new_events.len());
335        Ok(())
336    }
337
338    async fn get_events(&self, event_type: &str) -> WaeResult<Vec<EventData>> {
339        let events = self.events.read();
340        let filtered: Vec<EventData> = events.iter().filter(|e| e.event_type == event_type).cloned().collect();
341        Ok(filtered)
342    }
343
344    async fn get_events_by_time(&self, start: u64, end: u64) -> WaeResult<Vec<EventData>> {
345        let events = self.events.read();
346        let filtered: Vec<EventData> = events.iter().filter(|e| e.timestamp >= start && e.timestamp <= end).cloned().collect();
347        Ok(filtered)
348    }
349
350    async fn get_all_events(&self) -> WaeResult<Vec<EventData>> {
351        let events = self.events.read();
352        Ok(events.clone())
353    }
354
355    async fn count(&self) -> WaeResult<u64> {
356        let events = self.events.read();
357        Ok(events.len() as u64)
358    }
359
360    async fn clear(&self) -> WaeResult<()> {
361        let mut events = self.events.write();
362        events.clear();
363        info!("Event store cleared");
364        Ok(())
365    }
366}
367
368/// 事件总线配置
369#[derive(Debug, Clone)]
370pub struct EventBusConfig {
371    /// 事件队列容量
372    pub queue_capacity: usize,
373    /// 处理器超时时间
374    pub handler_timeout: Duration,
375    /// 是否启用事件存储
376    pub enable_store: bool,
377    /// 最大重试次数
378    pub max_retries: u32,
379    /// 重试间隔
380    pub retry_interval: Duration,
381}
382
383impl Default for EventBusConfig {
384    fn default() -> Self {
385        Self {
386            queue_capacity: 1000,
387            handler_timeout: Duration::from_secs(30),
388            enable_store: false,
389            max_retries: 3,
390            retry_interval: Duration::from_millis(100),
391        }
392    }
393}
394
395/// 内部订阅信息
396struct SubscriptionInfo {
397    subscription: Subscription,
398    handler: Arc<dyn EventHandler>,
399    filter: Option<Arc<dyn EventFilter>>,
400}
401
402/// 事件总线
403///
404/// 事件驱动的核心组件,负责事件的发布和分发。
405pub struct EventBus {
406    config: EventBusConfig,
407    store: Option<Arc<dyn EventStore>>,
408    subscriptions: Arc<RwLock<HashMap<SubscriptionId, SubscriptionInfo>>>,
409    event_sender: mpsc::Sender<EventData>,
410    shutdown_sender: RwLock<Option<mpsc::Sender<()>>>,
411}
412
413impl EventBus {
414    /// 创建新的事件总线
415    pub fn new(config: EventBusConfig) -> Self {
416        Self::with_store(config, None)
417    }
418
419    /// 创建带事件存储的事件总线
420    pub fn with_store(config: EventBusConfig, store: Option<Arc<dyn EventStore>>) -> Self {
421        let (event_sender, event_receiver) = mpsc::channel::<EventData>(config.queue_capacity);
422        let (shutdown_sender, shutdown_receiver) = mpsc::channel::<()>(1);
423
424        let subscriptions = Arc::new(RwLock::new(HashMap::new()));
425
426        let bus = Self {
427            config,
428            store,
429            subscriptions: subscriptions.clone(),
430            event_sender,
431            shutdown_sender: RwLock::new(Some(shutdown_sender)),
432        };
433
434        let store_clone = bus.store.clone();
435        let config_clone = bus.config.clone();
436
437        tokio::spawn(Self::dispatcher(event_receiver, shutdown_receiver, subscriptions, store_clone, config_clone));
438
439        bus
440    }
441
442    async fn dispatcher(
443        mut receiver: mpsc::Receiver<EventData>,
444        mut shutdown: mpsc::Receiver<()>,
445        subscriptions: Arc<RwLock<HashMap<SubscriptionId, SubscriptionInfo>>>,
446        store: Option<Arc<dyn EventStore>>,
447        config: EventBusConfig,
448    ) {
449        loop {
450            tokio::select! {
451                Some(event) = receiver.recv() => {
452                    if config.enable_store
453                        && let Some(ref event_store) = store
454                        && let Err(e) = event_store.append(&event).await
455                    {
456                        error!("Failed to store event: {}", e);
457                    }
458
459                    let subs = subscriptions.read();
460                    for (_, info) in subs.iter() {
461                        if !info.subscription.active {
462                            continue;
463                        }
464
465                        if !info.subscription.event_types.contains(&event.event_type) {
466                            continue;
467                        }
468
469                        if let Some(ref filter) = info.filter
470                            && !filter.should_handle(&event)
471                        {
472                            continue;
473                        }
474
475                        let event_clone = event.clone();
476                        let handler_clone = info.handler.clone();
477                        let event_type = event.event_type.clone();
478                        let sub_id = info.subscription.id.clone();
479                        let timeout = config.handler_timeout;
480                        let max_retries = config.max_retries;
481                        let retry_interval = config.retry_interval;
482
483                        tokio::spawn(async move {
484                            for attempt in 0..max_retries {
485                                match tokio::time::timeout(
486                                    timeout,
487                                    handler_clone.handle(event_clone.clone()),
488                                )
489                                .await
490                                {
491                                    Ok(Ok(())) => {
492                                        debug!(
493                                            "Event handled successfully: {} [subscription: {}]",
494                                            event_type, sub_id
495                                        );
496                                        return;
497                                    }
498                                    Ok(Err(e)) => {
499                                        warn!(
500                                            "Event handler failed (attempt {}/{}): {}",
501                                            attempt + 1,
502                                            max_retries,
503                                            e
504                                        );
505                                    }
506                                    Err(_) => {
507                                        warn!(
508                                            "Event handler timeout (attempt {}/{})",
509                                            attempt + 1,
510                                            max_retries
511                                        );
512                                    }
513                                }
514
515                                if attempt + 1 < max_retries {
516                                    tokio::time::sleep(retry_interval).await;
517                                }
518                            }
519                            error!(
520                                "Event handler failed after {} retries: {} [subscription: {}]",
521                                max_retries, event_type, sub_id
522                            );
523                        });
524                    }
525                }
526                _ = shutdown.recv() => {
527                    info!("Event bus dispatcher shutting down");
528                    break;
529                }
530                else => break,
531            }
532        }
533    }
534
535    /// 订阅事件
536    ///
537    /// 注册一个事件处理器,订阅指定类型的事件。
538    #[instrument(skip(self, handler))]
539    pub fn subscribe<H: EventHandler + 'static>(&self, event_types: Vec<EventTypeName>, handler: H) -> WaeResult<Subscription> {
540        self.subscribe_with_filter(event_types, handler, None::<TypeEventFilter>)
541    }
542
543    /// 订阅事件 (带过滤器)
544    ///
545    /// 注册一个事件处理器,订阅指定类型的事件,并应用过滤器。
546    #[instrument(skip(self, handler, filter))]
547    pub fn subscribe_with_filter<H: EventHandler + 'static, F: EventFilter + 'static>(
548        &self,
549        event_types: Vec<EventTypeName>,
550        handler: H,
551        filter: Option<F>,
552    ) -> WaeResult<Subscription> {
553        let subscription_id = uuid::Uuid::new_v4().to_string();
554        let subscription = Subscription::new(subscription_id.clone(), event_types.clone());
555
556        let info = SubscriptionInfo {
557            subscription: subscription.clone(),
558            handler: Arc::new(handler),
559            filter: filter.map(|f| Arc::new(f) as Arc<dyn EventFilter>),
560        };
561
562        {
563            let mut subs = self.subscriptions.write();
564            subs.insert(subscription_id, info);
565        }
566
567        info!("Subscription created: {} for types: {:?}", subscription.id, event_types);
568        Ok(subscription)
569    }
570
571    /// 取消订阅
572    ///
573    /// 移除指定 ID 的订阅。
574    #[instrument(skip(self))]
575    pub fn unsubscribe(&self, subscription_id: &str) -> WaeResult<bool> {
576        let mut subs = self.subscriptions.write();
577        if subs.remove(subscription_id).is_some() {
578            info!("Subscription removed: {}", subscription_id);
579            Ok(true)
580        }
581        else {
582            warn!("Subscription not found: {}", subscription_id);
583            Ok(false)
584        }
585    }
586
587    /// 发布事件
588    ///
589    /// 同步发布事件到事件总线。
590    #[instrument(skip(self, event))]
591    pub fn publish<E: Event + Serialize>(&self, event: &E) -> WaeResult<()> {
592        let event_data = EventData::new(event)?;
593        self.publish_data(event_data)
594    }
595
596    /// 发布事件数据
597    ///
598    /// 发布已封装的事件数据。
599    #[instrument(skip(self, event_data))]
600    pub fn publish_data(&self, event_data: EventData) -> WaeResult<()> {
601        match self.event_sender.blocking_send(event_data.clone()) {
602            Ok(()) => {
603                debug!("Event published: {} [{}]", event_data.id, event_data.event_type);
604                Ok(())
605            }
606            Err(_) => {
607                error!("Event bus channel closed");
608                Err(WaeError::internal("Event bus channel closed"))
609            }
610        }
611    }
612
613    /// 异步发布事件
614    ///
615    /// 异步发布事件到事件总线。
616    #[instrument(skip(self, event))]
617    pub async fn publish_async<E: Event + Serialize>(&self, event: &E) -> WaeResult<()> {
618        let event_data = EventData::new(event)?;
619        self.publish_data_async(event_data).await
620    }
621
622    /// 异步发布事件数据
623    ///
624    /// 异步发布已封装的事件数据。
625    #[instrument(skip(self, event_data))]
626    pub async fn publish_data_async(&self, event_data: EventData) -> WaeResult<()> {
627        match self.event_sender.send(event_data.clone()).await {
628            Ok(()) => {
629                debug!("Event published async: {} [{}]", event_data.id, event_data.event_type);
630                Ok(())
631            }
632            Err(_) => {
633                error!("Event bus channel closed");
634                Err(WaeError::internal("Event bus channel closed"))
635            }
636        }
637    }
638
639    /// 获取订阅数量
640    pub fn subscription_count(&self) -> usize {
641        self.subscriptions.read().len()
642    }
643
644    /// 获取指定事件类型的订阅数量
645    pub fn subscription_count_for_type(&self, event_type: &str) -> usize {
646        let subs = self.subscriptions.read();
647        subs.values().filter(|info| info.subscription.event_types.contains(&event_type.to_string())).count()
648    }
649
650    /// 关闭事件总线
651    pub async fn shutdown(&self) -> WaeResult<()> {
652        let tx = {
653            let mut sender = self.shutdown_sender.write();
654            sender.take()
655        };
656        if let Some(tx) = tx {
657            let _ = tx.send(()).await;
658            info!("Event bus shutdown initiated");
659        }
660        Ok(())
661    }
662
663    /// 事件回放
664    ///
665    /// 从事件存储中回放事件到指定处理器。
666    pub async fn replay_events<H: EventHandler + 'static>(
667        &self,
668        store: &dyn EventStore,
669        handler: &H,
670        event_type: Option<&str>,
671    ) -> WaeResult<u64> {
672        let events = match event_type {
673            Some(t) => store.get_events(t).await?,
674            None => store.get_all_events().await?,
675        };
676
677        let mut count = 0u64;
678        for event in events {
679            if handler.event_types().contains(&event.event_type) {
680                handler.handle(event).await?;
681                count += 1;
682            }
683        }
684
685        info!("Replayed {} events", count);
686        Ok(count)
687    }
688}
689
690/// 基础事件实现
691///
692/// 提供一个简单的事件基类,方便快速创建事件。
693#[derive(Debug, Clone, Serialize, Deserialize)]
694pub struct BaseEvent<T> {
695    /// 事件 ID
696    pub id: EventId,
697    /// 事件类型
698    pub event_type: EventTypeName,
699    /// 时间戳
700    pub timestamp: u64,
701    /// 事件载荷
702    pub payload: T,
703}
704
705impl<T> BaseEvent<T>
706where
707    T: Send + Sync + 'static,
708{
709    /// 创建新的基础事件
710    pub fn new(event_type: impl Into<String>, payload: T) -> Self {
711        Self {
712            id: uuid::Uuid::new_v4().to_string(),
713            event_type: event_type.into(),
714            timestamp: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64,
715            payload,
716        }
717    }
718}
719
720impl<T: Serialize + Send + Sync + 'static> Event for BaseEvent<T> {
721    fn event_type(&self) -> EventTypeName {
722        self.event_type.clone()
723    }
724
725    fn event_id(&self) -> &EventId {
726        &self.id
727    }
728
729    fn timestamp(&self) -> u64 {
730        self.timestamp
731    }
732}
733
734/// 便捷函数:创建内存事件存储
735pub fn memory_event_store() -> Arc<InMemoryEventStore> {
736    Arc::new(InMemoryEventStore::new())
737}
738
739/// 便捷函数:创建事件总线
740pub fn event_bus(config: EventBusConfig) -> Arc<EventBus> {
741    Arc::new(EventBus::new(config))
742}
743
744/// 便捷函数:创建带存储的事件总线
745pub fn event_bus_with_store(config: EventBusConfig, store: Arc<dyn EventStore>) -> Arc<EventBus> {
746    Arc::new(EventBus::with_store(config, Some(store)))
747}