Skip to main content

wae_event/
lib.rs

1//! WAE Event - 事件驱动模块
2//!
3//! 提供统一的事件驱动能力抽象,支持事件发布/订阅、事件存储和事件回放。
4//!
5//! 深度融合 tokio 运行时,所有 API 都是异步优先设计。
6//! 微服务架构友好,支持事件溯源、CQRS 等模式。
7
8#![warn(missing_docs)]
9
10use async_trait::async_trait;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize, de::DeserializeOwned};
13use std::{
14    collections::HashMap,
15    fmt,
16    sync::Arc,
17    time::{Duration, SystemTime, UNIX_EPOCH},
18};
19use tokio::sync::mpsc;
20use tracing::{debug, error, info, instrument, warn};
21
22/// 事件错误类型
23#[derive(Debug)]
24pub enum EventError {
25    /// 事件序列化失败
26    SerializationFailed(String),
27
28    /// 事件反序列化失败
29    DeserializationFailed(String),
30
31    /// 事件处理器执行失败
32    HandlerFailed(String),
33
34    /// 订阅不存在
35    SubscriptionNotFound(String),
36
37    /// 事件类型未注册
38    EventTypeNotRegistered(String),
39
40    /// 事件存储失败
41    StoreFailed(String),
42
43    /// 事件总线已关闭
44    BusClosed,
45
46    /// 操作超时
47    Timeout(String),
48
49    /// 内部错误
50    Internal(String),
51}
52
53impl fmt::Display for EventError {
54    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55        match self {
56            EventError::SerializationFailed(msg) => write!(f, "Event serialization failed: {}", msg),
57            EventError::DeserializationFailed(msg) => write!(f, "Event deserialization failed: {}", msg),
58            EventError::HandlerFailed(msg) => write!(f, "Event handler failed: {}", msg),
59            EventError::SubscriptionNotFound(msg) => write!(f, "Subscription not found: {}", msg),
60            EventError::EventTypeNotRegistered(msg) => write!(f, "Event type not registered: {}", msg),
61            EventError::StoreFailed(msg) => write!(f, "Event store failed: {}", msg),
62            EventError::BusClosed => write!(f, "Event bus is closed"),
63            EventError::Timeout(msg) => write!(f, "Operation timeout: {}", msg),
64            EventError::Internal(msg) => write!(f, "Internal error: {}", msg),
65        }
66    }
67}
68
69impl std::error::Error for EventError {}
70
71/// 事件操作结果类型
72pub type EventResult<T> = Result<T, EventError>;
73
74/// 事件 ID 类型
75pub type EventId = String;
76
77/// 订阅 ID 类型
78pub type SubscriptionId = String;
79
80/// 事件类型名称
81pub type EventTypeName = String;
82
83/// 事件基础 trait
84///
85/// 所有事件必须实现此 trait,定义事件的基本属性。
86pub trait Event: Send + Sync + 'static {
87    /// 获取事件类型名称
88    fn event_type(&self) -> EventTypeName;
89
90    /// 获取事件 ID
91    fn event_id(&self) -> &EventId;
92
93    /// 获取事件时间戳
94    fn timestamp(&self) -> u64;
95}
96
97/// 事件数据封装
98///
99/// 用于序列化和传输事件的通用容器。
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct EventData {
102    /// 事件 ID
103    pub id: EventId,
104    /// 事件类型名称
105    pub event_type: EventTypeName,
106    /// 事件时间戳 (Unix 毫秒)
107    pub timestamp: u64,
108    /// 事件载荷 (JSON 序列化)
109    pub payload: serde_json::Value,
110    /// 事件元数据
111    pub metadata: HashMap<String, String>,
112}
113
114impl EventData {
115    /// 创建新的事件数据
116    pub fn new<E: Event + Serialize>(event: &E) -> EventResult<Self> {
117        let payload = serde_json::to_value(event).map_err(|e| EventError::SerializationFailed(e.to_string()))?;
118        Ok(Self {
119            id: event.event_id().clone(),
120            event_type: event.event_type(),
121            timestamp: event.timestamp(),
122            payload,
123            metadata: HashMap::new(),
124        })
125    }
126
127    /// 添加元数据
128    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
129        self.metadata.insert(key.into(), value.into());
130        self
131    }
132
133    /// 反序列化为具体事件类型
134    pub fn into_event<E: DeserializeOwned>(self) -> EventResult<E> {
135        serde_json::from_value(self.payload).map_err(|e| EventError::DeserializationFailed(e.to_string()))
136    }
137
138    /// 获取事件 ID
139    pub fn id(&self) -> &str {
140        &self.id
141    }
142
143    /// 获取事件类型
144    pub fn event_type(&self) -> &str {
145        &self.event_type
146    }
147
148    /// 获取时间戳
149    pub fn timestamp(&self) -> u64 {
150        self.timestamp
151    }
152}
153
154/// 事件处理器 trait (异步)
155///
156/// 定义异步事件处理器的接口。
157#[async_trait]
158pub trait EventHandler: Send + Sync {
159    /// 处理事件
160    async fn handle(&self, event: EventData) -> EventResult<()>;
161
162    /// 获取处理器感兴趣的事件类型
163    fn event_types(&self) -> Vec<EventTypeName>;
164}
165
166/// 同步事件处理器 trait
167///
168/// 定义同步事件处理器的接口。
169pub trait SyncEventHandler: Send + Sync {
170    /// 处理事件
171    fn handle(&self, event: EventData) -> EventResult<()>;
172
173    /// 获取处理器感兴趣的事件类型
174    fn event_types(&self) -> Vec<EventTypeName>;
175}
176
177/// 异步事件处理器包装器
178///
179/// 将闭包包装为 EventHandler trait 对象。
180pub struct AsyncEventHandler<F> {
181    handler: F,
182    event_types: Vec<EventTypeName>,
183}
184
185impl<F> AsyncEventHandler<F>
186where
187    F: Fn(EventData) -> std::pin::Pin<Box<dyn std::future::Future<Output = EventResult<()>> + Send + Sync>>
188        + Send
189        + Sync
190        + 'static,
191{
192    /// 创建新的异步事件处理器
193    pub fn new(event_types: Vec<EventTypeName>, handler: F) -> Self {
194        Self { handler, event_types }
195    }
196}
197
198#[async_trait]
199impl<F> EventHandler for AsyncEventHandler<F>
200where
201    F: Fn(EventData) -> std::pin::Pin<Box<dyn std::future::Future<Output = EventResult<()>> + Send + Sync>>
202        + Send
203        + Sync
204        + 'static,
205{
206    async fn handle(&self, event: EventData) -> EventResult<()> {
207        (self.handler)(event).await
208    }
209
210    fn event_types(&self) -> Vec<EventTypeName> {
211        self.event_types.clone()
212    }
213}
214
215/// 同步事件处理器包装器
216///
217/// 将闭包包装为 SyncEventHandler trait 对象。
218pub struct SyncEventHandlerWrapper<F> {
219    handler: F,
220    event_types: Vec<EventTypeName>,
221}
222
223impl<F> SyncEventHandlerWrapper<F>
224where
225    F: Fn(EventData) -> EventResult<()> + Send + Sync + 'static,
226{
227    /// 创建新的同步事件处理器
228    pub fn new(event_types: Vec<EventTypeName>, handler: F) -> Self {
229        Self { handler, event_types }
230    }
231}
232
233impl<F> SyncEventHandler for SyncEventHandlerWrapper<F>
234where
235    F: Fn(EventData) -> EventResult<()> + Send + Sync + 'static,
236{
237    fn handle(&self, event: EventData) -> EventResult<()> {
238        (self.handler)(event)
239    }
240
241    fn event_types(&self) -> Vec<EventTypeName> {
242        self.event_types.clone()
243    }
244}
245
246/// 事件过滤器 trait
247///
248/// 用于过滤事件,决定是否处理某个事件。
249pub trait EventFilter: Send + Sync {
250    /// 判断是否应该处理该事件
251    fn should_handle(&self, event: &EventData) -> bool;
252}
253
254/// 类型事件过滤器
255///
256/// 只处理指定类型的事件。
257pub struct TypeEventFilter {
258    allowed_types: Vec<EventTypeName>,
259}
260
261impl TypeEventFilter {
262    /// 创建新的类型过滤器
263    pub fn new(allowed_types: Vec<EventTypeName>) -> Self {
264        Self { allowed_types }
265    }
266}
267
268impl EventFilter for TypeEventFilter {
269    fn should_handle(&self, event: &EventData) -> bool {
270        self.allowed_types.contains(&event.event_type)
271    }
272}
273
274/// 元数据事件过滤器
275///
276/// 根据元数据过滤事件。
277pub struct MetadataEventFilter {
278    key: String,
279    value: String,
280}
281
282impl MetadataEventFilter {
283    /// 创建新的元数据过滤器
284    pub fn new(key: impl Into<String>, value: impl Into<String>) -> Self {
285        Self { key: key.into(), value: value.into() }
286    }
287}
288
289impl EventFilter for MetadataEventFilter {
290    fn should_handle(&self, event: &EventData) -> bool {
291        event.metadata.get(&self.key).map(|v| v == &self.value).unwrap_or(false)
292    }
293}
294
295/// 订阅句柄
296///
297/// 用于管理订阅生命周期。
298#[derive(Debug, Clone)]
299pub struct Subscription {
300    /// 订阅 ID
301    pub id: SubscriptionId,
302    /// 订阅的事件类型
303    pub event_types: Vec<EventTypeName>,
304    /// 订阅时间
305    pub created_at: u64,
306    /// 是否激活
307    pub active: bool,
308}
309
310impl Subscription {
311    /// 创建新的订阅
312    pub fn new(id: SubscriptionId, event_types: Vec<EventTypeName>) -> Self {
313        Self {
314            id,
315            event_types,
316            created_at: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64,
317            active: true,
318        }
319    }
320}
321
322/// 事件存储 trait
323///
324/// 定义事件持久化存储的接口。
325#[async_trait]
326pub trait EventStore: Send + Sync {
327    /// 追加事件到存储
328    async fn append(&self, event: &EventData) -> EventResult<()>;
329
330    /// 批量追加事件
331    async fn append_batch(&self, events: &[EventData]) -> EventResult<()>;
332
333    /// 获取指定类型的事件流
334    async fn get_events(&self, event_type: &str) -> EventResult<Vec<EventData>>;
335
336    /// 获取指定时间范围的事件
337    async fn get_events_by_time(&self, start: u64, end: u64) -> EventResult<Vec<EventData>>;
338
339    /// 获取所有事件
340    async fn get_all_events(&self) -> EventResult<Vec<EventData>>;
341
342    /// 获取事件数量
343    async fn count(&self) -> EventResult<u64>;
344
345    /// 清空所有事件
346    async fn clear(&self) -> EventResult<()>;
347}
348
349/// 内存事件存储
350///
351/// 基于内存的事件存储实现,用于开发和测试。
352pub struct InMemoryEventStore {
353    events: RwLock<Vec<EventData>>,
354}
355
356impl InMemoryEventStore {
357    /// 创建新的内存事件存储
358    pub fn new() -> Self {
359        Self { events: RwLock::new(Vec::new()) }
360    }
361
362    /// 创建带初始容量的内存事件存储
363    pub fn with_capacity(capacity: usize) -> Self {
364        Self { events: RwLock::new(Vec::with_capacity(capacity)) }
365    }
366}
367
368impl Default for InMemoryEventStore {
369    fn default() -> Self {
370        Self::new()
371    }
372}
373
374#[async_trait]
375impl EventStore for InMemoryEventStore {
376    async fn append(&self, event: &EventData) -> EventResult<()> {
377        let mut events = self.events.write();
378        events.push(event.clone());
379        debug!("Event appended: {} [{}]", event.id, event.event_type);
380        Ok(())
381    }
382
383    async fn append_batch(&self, new_events: &[EventData]) -> EventResult<()> {
384        let mut events = self.events.write();
385        events.extend(new_events.iter().cloned());
386        debug!("Batch appended: {} events", new_events.len());
387        Ok(())
388    }
389
390    async fn get_events(&self, event_type: &str) -> EventResult<Vec<EventData>> {
391        let events = self.events.read();
392        let filtered: Vec<EventData> = events.iter().filter(|e| e.event_type == event_type).cloned().collect();
393        Ok(filtered)
394    }
395
396    async fn get_events_by_time(&self, start: u64, end: u64) -> EventResult<Vec<EventData>> {
397        let events = self.events.read();
398        let filtered: Vec<EventData> = events.iter().filter(|e| e.timestamp >= start && e.timestamp <= end).cloned().collect();
399        Ok(filtered)
400    }
401
402    async fn get_all_events(&self) -> EventResult<Vec<EventData>> {
403        let events = self.events.read();
404        Ok(events.clone())
405    }
406
407    async fn count(&self) -> EventResult<u64> {
408        let events = self.events.read();
409        Ok(events.len() as u64)
410    }
411
412    async fn clear(&self) -> EventResult<()> {
413        let mut events = self.events.write();
414        events.clear();
415        info!("Event store cleared");
416        Ok(())
417    }
418}
419
420/// 事件总线配置
421#[derive(Debug, Clone)]
422pub struct EventBusConfig {
423    /// 事件队列容量
424    pub queue_capacity: usize,
425    /// 处理器超时时间
426    pub handler_timeout: Duration,
427    /// 是否启用事件存储
428    pub enable_store: bool,
429    /// 最大重试次数
430    pub max_retries: u32,
431    /// 重试间隔
432    pub retry_interval: Duration,
433}
434
435impl Default for EventBusConfig {
436    fn default() -> Self {
437        Self {
438            queue_capacity: 1000,
439            handler_timeout: Duration::from_secs(30),
440            enable_store: false,
441            max_retries: 3,
442            retry_interval: Duration::from_millis(100),
443        }
444    }
445}
446
447/// 内部订阅信息
448struct SubscriptionInfo {
449    subscription: Subscription,
450    handler: Arc<dyn EventHandler>,
451    filter: Option<Arc<dyn EventFilter>>,
452}
453
454/// 事件总线
455///
456/// 事件驱动的核心组件,负责事件的发布和分发。
457pub struct EventBus {
458    config: EventBusConfig,
459    store: Option<Arc<dyn EventStore>>,
460    subscriptions: Arc<RwLock<HashMap<SubscriptionId, SubscriptionInfo>>>,
461    event_sender: mpsc::Sender<EventData>,
462    shutdown_sender: RwLock<Option<mpsc::Sender<()>>>,
463}
464
465impl EventBus {
466    /// 创建新的事件总线
467    pub fn new(config: EventBusConfig) -> Self {
468        Self::with_store(config, None)
469    }
470
471    /// 创建带事件存储的事件总线
472    pub fn with_store(config: EventBusConfig, store: Option<Arc<dyn EventStore>>) -> Self {
473        let (event_sender, event_receiver) = mpsc::channel::<EventData>(config.queue_capacity);
474        let (shutdown_sender, shutdown_receiver) = mpsc::channel::<()>(1);
475
476        let subscriptions = Arc::new(RwLock::new(HashMap::new()));
477
478        let bus = Self {
479            config,
480            store,
481            subscriptions: subscriptions.clone(),
482            event_sender,
483            shutdown_sender: RwLock::new(Some(shutdown_sender)),
484        };
485
486        let store_clone = bus.store.clone();
487        let config_clone = bus.config.clone();
488
489        tokio::spawn(Self::dispatcher(event_receiver, shutdown_receiver, subscriptions, store_clone, config_clone));
490
491        bus
492    }
493
494    async fn dispatcher(
495        mut receiver: mpsc::Receiver<EventData>,
496        mut shutdown: mpsc::Receiver<()>,
497        subscriptions: Arc<RwLock<HashMap<SubscriptionId, SubscriptionInfo>>>,
498        store: Option<Arc<dyn EventStore>>,
499        config: EventBusConfig,
500    ) {
501        loop {
502            tokio::select! {
503                Some(event) = receiver.recv() => {
504                    if config.enable_store
505                        && let Some(ref event_store) = store
506                        && let Err(e) = event_store.append(&event).await
507                    {
508                        error!("Failed to store event: {}", e);
509                    }
510
511                    let subs = subscriptions.read();
512                    for (_, info) in subs.iter() {
513                        if !info.subscription.active {
514                            continue;
515                        }
516
517                        if !info.subscription.event_types.contains(&event.event_type) {
518                            continue;
519                        }
520
521                        if let Some(ref filter) = info.filter
522                            && !filter.should_handle(&event)
523                        {
524                            continue;
525                        }
526
527                        let event_clone = event.clone();
528                        let handler_clone = info.handler.clone();
529                        let event_type = event.event_type.clone();
530                        let sub_id = info.subscription.id.clone();
531                        let timeout = config.handler_timeout;
532                        let max_retries = config.max_retries;
533                        let retry_interval = config.retry_interval;
534
535                        tokio::spawn(async move {
536                            for attempt in 0..max_retries {
537                                match tokio::time::timeout(
538                                    timeout,
539                                    handler_clone.handle(event_clone.clone()),
540                                )
541                                .await
542                                {
543                                    Ok(Ok(())) => {
544                                        debug!(
545                                            "Event handled successfully: {} [subscription: {}]",
546                                            event_type, sub_id
547                                        );
548                                        return;
549                                    }
550                                    Ok(Err(e)) => {
551                                        warn!(
552                                            "Event handler failed (attempt {}/{}): {}",
553                                            attempt + 1,
554                                            max_retries,
555                                            e
556                                        );
557                                    }
558                                    Err(_) => {
559                                        warn!(
560                                            "Event handler timeout (attempt {}/{})",
561                                            attempt + 1,
562                                            max_retries
563                                        );
564                                    }
565                                }
566
567                                if attempt + 1 < max_retries {
568                                    tokio::time::sleep(retry_interval).await;
569                                }
570                            }
571                            error!(
572                                "Event handler failed after {} retries: {} [subscription: {}]",
573                                max_retries, event_type, sub_id
574                            );
575                        });
576                    }
577                }
578                _ = shutdown.recv() => {
579                    info!("Event bus dispatcher shutting down");
580                    break;
581                }
582                else => break,
583            }
584        }
585    }
586
587    /// 订阅事件
588    ///
589    /// 注册一个事件处理器,订阅指定类型的事件。
590    #[instrument(skip(self, handler))]
591    pub fn subscribe<H: EventHandler + 'static>(
592        &self,
593        event_types: Vec<EventTypeName>,
594        handler: H,
595    ) -> EventResult<Subscription> {
596        self.subscribe_with_filter(event_types, handler, None::<TypeEventFilter>)
597    }
598
599    /// 订阅事件 (带过滤器)
600    ///
601    /// 注册一个事件处理器,订阅指定类型的事件,并应用过滤器。
602    #[instrument(skip(self, handler, filter))]
603    pub fn subscribe_with_filter<H: EventHandler + 'static, F: EventFilter + 'static>(
604        &self,
605        event_types: Vec<EventTypeName>,
606        handler: H,
607        filter: Option<F>,
608    ) -> EventResult<Subscription> {
609        let subscription_id = uuid::Uuid::new_v4().to_string();
610        let subscription = Subscription::new(subscription_id.clone(), event_types.clone());
611
612        let info = SubscriptionInfo {
613            subscription: subscription.clone(),
614            handler: Arc::new(handler),
615            filter: filter.map(|f| Arc::new(f) as Arc<dyn EventFilter>),
616        };
617
618        {
619            let mut subs = self.subscriptions.write();
620            subs.insert(subscription_id, info);
621        }
622
623        info!("Subscription created: {} for types: {:?}", subscription.id, event_types);
624        Ok(subscription)
625    }
626
627    /// 取消订阅
628    ///
629    /// 移除指定 ID 的订阅。
630    #[instrument(skip(self))]
631    pub fn unsubscribe(&self, subscription_id: &str) -> EventResult<bool> {
632        let mut subs = self.subscriptions.write();
633        if subs.remove(subscription_id).is_some() {
634            info!("Subscription removed: {}", subscription_id);
635            Ok(true)
636        }
637        else {
638            warn!("Subscription not found: {}", subscription_id);
639            Ok(false)
640        }
641    }
642
643    /// 发布事件
644    ///
645    /// 同步发布事件到事件总线。
646    #[instrument(skip(self, event))]
647    pub fn publish<E: Event + Serialize>(&self, event: &E) -> EventResult<()> {
648        let event_data = EventData::new(event)?;
649        self.publish_data(event_data)
650    }
651
652    /// 发布事件数据
653    ///
654    /// 发布已封装的事件数据。
655    #[instrument(skip(self, event_data))]
656    pub fn publish_data(&self, event_data: EventData) -> EventResult<()> {
657        match self.event_sender.blocking_send(event_data.clone()) {
658            Ok(()) => {
659                debug!("Event published: {} [{}]", event_data.id, event_data.event_type);
660                Ok(())
661            }
662            Err(_) => {
663                error!("Event bus channel closed");
664                Err(EventError::BusClosed)
665            }
666        }
667    }
668
669    /// 异步发布事件
670    ///
671    /// 异步发布事件到事件总线。
672    #[instrument(skip(self, event))]
673    pub async fn publish_async<E: Event + Serialize>(&self, event: &E) -> EventResult<()> {
674        let event_data = EventData::new(event)?;
675        self.publish_data_async(event_data).await
676    }
677
678    /// 异步发布事件数据
679    ///
680    /// 异步发布已封装的事件数据。
681    #[instrument(skip(self, event_data))]
682    pub async fn publish_data_async(&self, event_data: EventData) -> EventResult<()> {
683        match self.event_sender.send(event_data.clone()).await {
684            Ok(()) => {
685                debug!("Event published async: {} [{}]", event_data.id, event_data.event_type);
686                Ok(())
687            }
688            Err(_) => {
689                error!("Event bus channel closed");
690                Err(EventError::BusClosed)
691            }
692        }
693    }
694
695    /// 获取订阅数量
696    pub fn subscription_count(&self) -> usize {
697        self.subscriptions.read().len()
698    }
699
700    /// 获取指定事件类型的订阅数量
701    pub fn subscription_count_for_type(&self, event_type: &str) -> usize {
702        let subs = self.subscriptions.read();
703        subs.values().filter(|info| info.subscription.event_types.contains(&event_type.to_string())).count()
704    }
705
706    /// 关闭事件总线
707    pub async fn shutdown(&self) -> EventResult<()> {
708        let tx = {
709            let mut sender = self.shutdown_sender.write();
710            sender.take()
711        };
712        if let Some(tx) = tx {
713            let _ = tx.send(()).await;
714            info!("Event bus shutdown initiated");
715        }
716        Ok(())
717    }
718
719    /// 事件回放
720    ///
721    /// 从事件存储中回放事件到指定处理器。
722    pub async fn replay_events<H: EventHandler + 'static>(
723        &self,
724        store: &dyn EventStore,
725        handler: &H,
726        event_type: Option<&str>,
727    ) -> EventResult<u64> {
728        let events = match event_type {
729            Some(t) => store.get_events(t).await?,
730            None => store.get_all_events().await?,
731        };
732
733        let mut count = 0u64;
734        for event in events {
735            if handler.event_types().contains(&event.event_type) {
736                handler.handle(event).await?;
737                count += 1;
738            }
739        }
740
741        info!("Replayed {} events", count);
742        Ok(count)
743    }
744}
745
746/// 基础事件实现
747///
748/// 提供一个简单的事件基类,方便快速创建事件。
749#[derive(Debug, Clone, Serialize, Deserialize)]
750pub struct BaseEvent<T> {
751    /// 事件 ID
752    pub id: EventId,
753    /// 事件类型
754    pub event_type: EventTypeName,
755    /// 时间戳
756    pub timestamp: u64,
757    /// 事件载荷
758    pub payload: T,
759}
760
761impl<T> BaseEvent<T>
762where
763    T: Send + Sync + 'static,
764{
765    /// 创建新的基础事件
766    pub fn new(event_type: impl Into<String>, payload: T) -> Self {
767        Self {
768            id: uuid::Uuid::new_v4().to_string(),
769            event_type: event_type.into(),
770            timestamp: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64,
771            payload,
772        }
773    }
774}
775
776impl<T: Serialize + Send + Sync + 'static> Event for BaseEvent<T> {
777    fn event_type(&self) -> EventTypeName {
778        self.event_type.clone()
779    }
780
781    fn event_id(&self) -> &EventId {
782        &self.id
783    }
784
785    fn timestamp(&self) -> u64 {
786        self.timestamp
787    }
788}
789
790/// 便捷函数:创建内存事件存储
791pub fn memory_event_store() -> Arc<InMemoryEventStore> {
792    Arc::new(InMemoryEventStore::new())
793}
794
795/// 便捷函数:创建事件总线
796pub fn event_bus(config: EventBusConfig) -> Arc<EventBus> {
797    Arc::new(EventBus::new(config))
798}
799
800/// 便捷函数:创建带存储的事件总线
801pub fn event_bus_with_store(config: EventBusConfig, store: Arc<dyn EventStore>) -> Arc<EventBus> {
802    Arc::new(EventBus::with_store(config, Some(store)))
803}