sa_token_core/event/mod.rs
1// Author: 金书记
2//
3//! Event Listener Module | 事件监听模块
4//!
5//! Provides event listening capabilities for sa-token, supporting monitoring of login, logout, kick-out, and other operations.
6//!
7//! 提供 sa-token 的事件监听功能,支持监听登录、登出、踢出等操作。
8//!
9//! ## EventBus Code Flow Logic | EventBus 代码流程逻辑
10//!
11//! ### Overall Architecture | 整体架构
12//!
13//! ```text
14//! ┌─────────────────────────────────────────────────────────────┐
15//! │ SaTokenEventBus │
16//! │ ┌────────────────────────────────────────────────────┐ │
17//! │ │ listeners: Arc<RwLock<Vec<Arc<dyn SaTokenListener>>>> │
18//! │ │ - Stores all registered listeners │
19//! │ │ 存储所有注册的监听器 │
20//! │ │ - Uses RwLock for thread safety │
21//! │ │ 使用 RwLock 保证线程安全 │
22//! │ │ - Arc wrapping allows multi-thread sharing │
23//! │ │ Arc 包装允许多线程共享 │
24//! │ └────────────────────────────────────────────────────┘ │
25//! └─────────────────────────────────────────────────────────────┘
26//! ```
27//!
28//! ### Core Processes | 核心流程
29//!
30//! #### 1. Listener Registration Process | 监听器注册流程
31//!
32//! ```text
33//! ┌──────────┐ ┌──────────────┐ ┌─────────────┐
34//! │User Code │────▶│ register() │────▶│Acquire Write│
35//! │用户代码 │ │ │ │Lock 写锁获取│
36//! └──────────┘ │ - Receive │ │ │
37//! │ listener │ │ - Get lock │
38//! │ 接收监听器 │ │ 获取写锁 │
39//! │ - Arc wrap │ │ - Add to │
40//! │ Arc包装 │ │ list │
41//! └──────────────┘ │ 添加到列表 │
42//! │ - Release │
43//! │ 释放写锁 │
44//! └─────────────┘
45//!
46//! Steps | 步骤:
47//! 1. User creates custom listener instance
48//! 用户创建自定义监听器实例
49//! 2. Wrap listener with Arc::new()
50//! 使用 Arc::new() 包装监听器
51//! 3. Call event_bus.register(listener).await
52//! 调用 event_bus.register(listener).await
53//! 4. EventBus acquires write lock, adds listener to Vec
54//! EventBus 获取写锁,将监听器添加到 Vec 中
55//! 5. Registration complete, waiting for event triggers
56//! 监听器注册完成,等待事件触发
57//! ```
58//!
59//! #### 2. Event Publishing Process | 事件发布流程
60//!
61//! ```text
62//! ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
63//! │SaTokenManager│────▶│ publish() │────▶│Acquire Read │
64//! │(login) │ │ │ │Lock 读锁获取 │
65//! └──────────────┘ │ 1.Create │ │ │
66//! │ event │ │ 2.Iterate │
67//! │ 创建事件 │ │ listeners │
68//! │ 2.Call │ │ 遍历监听器 │
69//! │ publish │ │ 3.Invoke │
70//! │ 调用publish │ │ callbacks │
71//! └──────────────┘ │ 调用回调 │
72//! │ └──────────────┘
73//! ▼ │
74//! ┌──────────────┐ ▼
75//! │ SaTokenEvent │ ┌──────────────┐
76//! │ - event_type │ │ Listener 1 │
77//! │ - login_id │ │ on_login() │
78//! │ - token │ ├──────────────┤
79//! │ - timestamp │ │ Listener 2 │
80//! └──────────────┘ │ on_login() │
81//! ├──────────────┤
82//! │ Listener N │
83//! │ on_login() │
84//! └──────────────┘
85//!
86//! Steps | 步骤:
87//! 1. After business operation (e.g., login) completes, create corresponding event object
88//! 业务操作(如 login)完成后,创建对应的事件对象
89//! 2. Call event_bus.publish(event).await
90//! 调用 event_bus.publish(event).await
91//! 3. EventBus acquires read lock, accesses listener list
92//! EventBus 获取读锁,访问监听器列表
93//! 4. Call each listener's corresponding method in registration order
94//! 按注册顺序依次调用每个监听器的对应方法
95//! 5. After all listeners complete, event publishing process ends
96//! 所有监听器执行完成后,事件发布流程结束
97//! ```
98//!
99//! #### 3. Event Dispatching Process | 事件分发流程
100//!
101//! ```text
102//! ┌──────────────────┐
103//! │ publish(event) │
104//! └────────┬─────────┘
105//! │
106//! ▼
107//! ┌──────────────────────────────────────┐
108//! │ 1. Get all listeners (read lock) │
109//! │ 获取所有监听器(读锁) │
110//! └────────┬─────────────────────────────┘
111//! │
112//! ▼
113//! ┌──────────────────────────────────────┐
114//! │ 2. Iterate listener list │
115//! │ 遍历监听器列表 │
116//! └────────┬─────────────────────────────┘
117//! │
118//! ├──▶ on_event(event) ──▶ Generic event handler
119//! │ 通用事件处理
120//! │
121//! └──▶ Dispatch by event type | 根据事件类型分发:
122//! │
123//! ├─ Login ──────▶ on_login(...)
124//! ├─ Logout ─────▶ on_logout(...)
125//! ├─ KickOut ────▶ on_kick_out(...)
126//! ├─ RenewTimeout ▶ on_renew_timeout(...)
127//! ├─ Replaced ───▶ on_replaced(...)
128//! └─ Banned ─────▶ on_banned(...)
129//!
130//! Notes | 注意:
131//! - Listeners execute in registration order
132//! 监听器按注册顺序执行
133//! - Each listener executes asynchronously
134//! 每个监听器都是异步执行的
135//! - Errors in listeners don't interrupt event propagation
136//! 监听器中的错误不会中断事件传播
137//! ```
138//!
139//! ### Thread Safety Guarantees | 线程安全保证
140//!
141//! ```text
142//! Arc<RwLock<Vec<Arc<dyn SaTokenListener>>>>
143//! │ │ │ │
144//! │ │ │ └─ Listener trait object | 监听器 trait 对象
145//! │ │ └────── Listener collection | 监听器集合
146//! │ └──────────── Read-write lock protection | 读写锁保护
147//! └───────────────── Cross-thread sharing | 跨线程共享
148//!
149//! - Arc: Allows EventBus to be shared across multiple Manager instances
150//! 允许 EventBus 被多个 Manager 实例共享
151//! - RwLock: Allows multiple readers to publish events concurrently, writer has exclusive registration
152//! 允许多个读者同时发布事件,写者独占注册
153//! - Inner Arc: Listeners can be shared across multiple EventBus instances
154//! 监听器可以被多个 EventBus 共享
155//! ```
156//!
157//! ### Complete Call Chain Example | 完整调用链示例
158//!
159//! ```text
160//! User Code | 用户代码
161//! │
162//! └─▶ StpUtil::login("user_123")
163//! │
164//! └─▶ SaTokenManager::login(...)
165//! │
166//! ├─ 1. Generate token | 生成 token
167//! ├─ 2. Save to storage | 保存到存储
168//! └─ 3. Trigger event | 触发事件
169//! │
170//! └─▶ event_bus.publish(
171//! SaTokenEvent::login("user_123", "token_abc")
172//! )
173//! │
174//! ├─▶ LoggingListener::on_login()
175//! │ └─ Log to file | 记录日志
176//! │
177//! ├─▶ DatabaseListener::on_login()
178//! │ └─ Save to database | 保存到数据库
179//! │
180//! └─▶ StatisticsListener::on_login()
181//! └─ Update statistics | 更新统计信息
182//!
183//! After all listeners complete, login() returns token
184//! 所有监听器执行完成后,login() 方法返回 token
185//! ```
186//!
187//! ### Performance Considerations | 性能考虑
188//!
189//! 1. **Async Execution | 异步执行**: All listener methods are async, but execute sequentially
190//! 所有监听器方法都是异步的,但按顺序执行
191//! 2. **Read-Write Lock | 读写锁**: Multiple events can be published concurrently (read lock), registration requires exclusive access (write lock)
192//! 多个事件可以并发发布(读锁),注册需要独占(写锁)
193//! 3. **Zero-Copy | 零拷贝**: Event objects are passed by reference, avoiding unnecessary cloning
194//! 事件对象通过引用传递,避免不必要的克隆
195//! 4. **Error Isolation | 错误隔离**: Errors in one listener don't affect other listeners
196//! 单个监听器的错误不会影响其他监听器
197//!
198//! ### Error Handling | 错误处理
199//!
200//! ```text
201//! ┌────────────────┐
202//! │ Listener 1 │ ─▶ Success ✓ | 成功 ✓
203//! ├────────────────┤
204//! │ Listener 2 │ ─▶ Error ✗ (handled internally, doesn't affect others)
205//! │ │ 错误 ✗ (内部处理,不影响后续)
206//! ├────────────────┤
207//! │ Listener 3 │ ─▶ Success ✓ (still executes) | 成功 ✓ (仍然执行)
208//! └────────────────┘
209//!
210//! Recommendation | 建议:
211//! Listeners should catch all errors internally and handle them appropriately
212//! 监听器内部应捕获所有错误并适当处理
213//! ```
214//!
215//! ## Usage Example | 使用示例
216//!
217//! ```rust,ignore
218//! use sa_token_core::event::{SaTokenEvent, SaTokenListener, SaTokenEventBus};
219//!
220//! // Custom listener | 自定义监听器
221//! struct MyListener;
222//!
223//! #[async_trait]
224//! impl SaTokenListener for MyListener {
225//! async fn on_login(&self, login_id: &str, token: &str, login_type: &str) {
226//! println!("User {} logged in, token: {}", login_id, token);
227//! // 用户 {} 登录了,token: {}
228//! }
229//!
230//! async fn on_logout(&self, login_id: &str, token: &str, login_type: &str) {
231//! println!("User {} logged out", login_id);
232//! // 用户 {} 登出了
233//! }
234//! }
235//!
236//! // Register listener | 注册监听器
237//! let event_bus = SaTokenEventBus::new();
238//! event_bus.register(Arc::new(MyListener)).await;
239//! ```
240
241use async_trait::async_trait;
242use std::sync::Arc;
243use std::sync::RwLock;
244use chrono::{DateTime, Utc};
245use serde::{Serialize, Deserialize};
246
247/// 事件类型
248#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
249pub enum SaTokenEventType {
250 /// 登录事件
251 Login,
252 /// 登出事件
253 Logout,
254 /// 踢出下线事件
255 KickOut,
256 /// Token 续期事件
257 RenewTimeout,
258 /// 被顶下线事件(被其他设备登录)
259 Replaced,
260 /// 被封禁事件
261 Banned,
262 /// 开启二级认证
263 OpenSafe,
264 /// 关闭二级认证
265 CloseSafe,
266}
267
268/// 事件数据
269#[derive(Debug, Clone, Serialize, Deserialize)]
270pub struct SaTokenEvent {
271 /// 事件类型
272 pub event_type: SaTokenEventType,
273 /// 登录ID
274 pub login_id: String,
275 /// Token 值
276 pub token: String,
277 /// 登录类型(如 "default", "admin" 等)
278 pub login_type: String,
279 /// 事件发生时间
280 pub timestamp: DateTime<Utc>,
281 /// 额外数据(用于扩展)
282 pub extra: Option<serde_json::Value>,
283}
284
285impl SaTokenEvent {
286 /// 创建登录事件
287 pub fn login(login_id: impl Into<String>, token: impl Into<String>) -> Self {
288 Self {
289 event_type: SaTokenEventType::Login,
290 login_id: login_id.into(),
291 token: token.into(),
292 login_type: "default".to_string(),
293 timestamp: Utc::now(),
294 extra: None,
295 }
296 }
297
298 /// 创建登出事件
299 pub fn logout(login_id: impl Into<String>, token: impl Into<String>) -> Self {
300 Self {
301 event_type: SaTokenEventType::Logout,
302 login_id: login_id.into(),
303 token: token.into(),
304 login_type: "default".to_string(),
305 timestamp: Utc::now(),
306 extra: None,
307 }
308 }
309
310 /// 创建踢出下线事件
311 pub fn kick_out(login_id: impl Into<String>, token: impl Into<String>) -> Self {
312 Self {
313 event_type: SaTokenEventType::KickOut,
314 login_id: login_id.into(),
315 token: token.into(),
316 login_type: "default".to_string(),
317 timestamp: Utc::now(),
318 extra: None,
319 }
320 }
321
322 /// 创建 Token 续期事件
323 pub fn renew_timeout(login_id: impl Into<String>, token: impl Into<String>) -> Self {
324 Self {
325 event_type: SaTokenEventType::RenewTimeout,
326 login_id: login_id.into(),
327 token: token.into(),
328 login_type: "default".to_string(),
329 timestamp: Utc::now(),
330 extra: None,
331 }
332 }
333
334 /// 创建被顶下线事件
335 pub fn replaced(login_id: impl Into<String>, token: impl Into<String>) -> Self {
336 Self {
337 event_type: SaTokenEventType::Replaced,
338 login_id: login_id.into(),
339 token: token.into(),
340 login_type: "default".to_string(),
341 timestamp: Utc::now(),
342 extra: None,
343 }
344 }
345
346 /// 创建被封禁事件
347 pub fn banned(login_id: impl Into<String>) -> Self {
348 Self {
349 event_type: SaTokenEventType::Banned,
350 login_id: login_id.into(),
351 token: String::new(),
352 login_type: "default".to_string(),
353 timestamp: Utc::now(),
354 extra: None,
355 }
356 }
357
358 pub fn open_safe(token: impl Into<String>, service: impl Into<String>) -> Self {
359 Self {
360 event_type: SaTokenEventType::OpenSafe,
361 login_id: String::new(),
362 token: token.into(),
363 login_type: service.into(),
364 timestamp: Utc::now(),
365 extra: None,
366 }
367 }
368
369 pub fn close_safe(token: impl Into<String>, service: impl Into<String>) -> Self {
370 Self {
371 event_type: SaTokenEventType::CloseSafe,
372 login_id: String::new(),
373 token: token.into(),
374 login_type: service.into(),
375 timestamp: Utc::now(),
376 extra: None,
377 }
378 }
379
380 /// 设置登录类型
381 pub fn with_login_type(mut self, login_type: impl Into<String>) -> Self {
382 self.login_type = login_type.into();
383 self
384 }
385
386 /// 设置额外数据
387 pub fn with_extra(mut self, extra: serde_json::Value) -> Self {
388 self.extra = Some(extra);
389 self
390 }
391}
392
393/// 事件监听器 trait | Event Listener Trait
394///
395/// 实现此 trait 来自定义事件处理逻辑
396/// Implement this trait to customize event handling logic
397///
398/// # 使用示例 | Usage Example
399///
400/// ```rust,ignore
401/// use async_trait::async_trait;
402/// use sa_token_core::SaTokenListener;
403///
404/// struct MyListener;
405///
406/// #[async_trait]
407/// impl SaTokenListener for MyListener {
408/// async fn on_login(&self, login_id: &str, token: &str, login_type: &str) {
409/// // 自定义登录处理 | Custom login handling
410/// println!("User {} logged in", login_id);
411/// }
412/// }
413/// ```
414#[async_trait]
415pub trait SaTokenListener: Send + Sync {
416 /// 登录事件 | Login Event
417 ///
418 /// 当用户成功登录时触发 | Triggered when user successfully logs in
419 ///
420 /// # 参数 | Parameters
421 /// - `login_id`: 登录 ID | Login ID
422 /// - `token`: Token 值 | Token value
423 /// - `login_type`: 登录类型(如 "web", "websocket")| Login type (e.g., "web", "websocket")
424 async fn on_login(&self, login_id: &str, token: &str, login_type: &str) {
425 let _ = (login_id, token, login_type);
426 }
427
428 /// 登出事件 | Logout Event
429 ///
430 /// 当用户主动登出时触发 | Triggered when user actively logs out
431 ///
432 /// # 参数 | Parameters
433 /// - `login_id`: 登录 ID | Login ID
434 /// - `token`: Token 值 | Token value
435 /// - `login_type`: 登录类型 | Login type
436 async fn on_logout(&self, login_id: &str, token: &str, login_type: &str) {
437 let _ = (login_id, token, login_type);
438 }
439
440 /// 踢出下线事件 | Kick Out Event
441 ///
442 /// 当用户被强制踢出下线时触发 | Triggered when user is forcefully kicked out
443 ///
444 /// # 参数 | Parameters
445 /// - `login_id`: 登录 ID | Login ID
446 /// - `token`: Token 值 | Token value
447 /// - `login_type`: 登录类型 | Login type
448 async fn on_kick_out(&self, login_id: &str, token: &str, login_type: &str) {
449 let _ = (login_id, token, login_type);
450 }
451
452 /// Token 续期事件 | Token Renewal Event
453 ///
454 /// 当 Token 有效期被延长时触发 | Triggered when token validity is extended
455 ///
456 /// # 参数 | Parameters
457 /// - `login_id`: 登录 ID | Login ID
458 /// - `token`: Token 值 | Token value
459 /// - `login_type`: 登录类型 | Login type
460 async fn on_renew_timeout(&self, login_id: &str, token: &str, login_type: &str) {
461 let _ = (login_id, token, login_type);
462 }
463
464 /// 被顶下线事件 | Replaced Event
465 ///
466 /// 当用户在其他设备登录导致当前设备被顶下线时触发
467 /// Triggered when user logs in on another device and current device is replaced
468 ///
469 /// # 参数 | Parameters
470 /// - `login_id`: 登录 ID | Login ID
471 /// - `token`: Token 值 | Token value
472 /// - `login_type`: 登录类型 | Login type
473 async fn on_replaced(&self, login_id: &str, token: &str, login_type: &str) {
474 let _ = (login_id, token, login_type);
475 }
476
477 /// 被封禁事件 | Banned Event
478 ///
479 /// 当用户账号被封禁时触发 | Triggered when user account is banned
480 ///
481 /// # 参数 | Parameters
482 /// - `login_id`: 登录 ID | Login ID
483 /// - `login_type`: 登录类型 | Login type
484 async fn on_banned(&self, login_id: &str, login_type: &str) {
485 let _ = (login_id, login_type);
486 }
487
488 async fn on_open_safe(&self, token: &str, service: &str) {
489 let _ = (token, service);
490 }
491
492 async fn on_close_safe(&self, token: &str, service: &str) {
493 let _ = (token, service);
494 }
495
496 /// 通用事件处理(所有事件都会触发此方法)
497 /// Generic Event Handler (triggered by all events)
498 ///
499 /// # 参数 | Parameters
500 /// - `event`: 事件对象 | Event object
501 async fn on_event(&self, event: &SaTokenEvent) {
502 let _ = event;
503 }
504}
505
506/// 事件总线 - 管理所有监听器并分发事件
507#[derive(Clone)]
508pub struct SaTokenEventBus {
509 listeners: Arc<RwLock<Vec<Arc<dyn SaTokenListener>>>>,
510}
511
512impl SaTokenEventBus {
513 /// 创建新的事件总线
514 pub fn new() -> Self {
515 Self {
516 listeners: Arc::new(RwLock::new(Vec::new())),
517 }
518 }
519
520 /// 注册监听器
521 /// Register a listener
522 pub fn register(&self, listener: Arc<dyn SaTokenListener>) {
523 let mut listeners = self.listeners.write().unwrap();
524 listeners.push(listener);
525 }
526
527 /// 异步注册监听器(为了保持 API 兼容性)
528 /// Register a listener asynchronously (for API compatibility)
529 pub async fn register_async(&self, listener: Arc<dyn SaTokenListener>) {
530 self.register(listener);
531 }
532
533 /// 移除所有监听器
534 /// Clear all listeners
535 pub fn clear(&self) {
536 let mut listeners = self.listeners.write().unwrap();
537 listeners.clear();
538 }
539
540 /// 获取监听器数量
541 /// Get listener count
542 pub fn listener_count(&self) -> usize {
543 let listeners = self.listeners.read().unwrap();
544 listeners.len()
545 }
546
547 /// 发布事件
548 /// Publish an event to all listeners
549 pub async fn publish(&self, event: SaTokenEvent) {
550 // 克隆监听器列表以避免持有锁时异步等待
551 // Clone listener list to avoid holding lock during async operations
552 let listeners = {
553 let guard = self.listeners.read().unwrap();
554 guard.clone()
555 };
556
557 for listener in listeners.iter() {
558 // 触发通用事件处理
559 listener.on_event(&event).await;
560
561 // 根据事件类型触发特定处理
562 match event.event_type {
563 SaTokenEventType::Login => {
564 listener.on_login(&event.login_id, &event.token, &event.login_type).await;
565 }
566 SaTokenEventType::Logout => {
567 listener.on_logout(&event.login_id, &event.token, &event.login_type).await;
568 }
569 SaTokenEventType::KickOut => {
570 listener.on_kick_out(&event.login_id, &event.token, &event.login_type).await;
571 }
572 SaTokenEventType::RenewTimeout => {
573 listener.on_renew_timeout(&event.login_id, &event.token, &event.login_type).await;
574 }
575 SaTokenEventType::Replaced => {
576 listener.on_replaced(&event.login_id, &event.token, &event.login_type).await;
577 }
578 SaTokenEventType::Banned => {
579 listener.on_banned(&event.login_id, &event.login_type).await;
580 }
581 SaTokenEventType::OpenSafe => {
582 listener.on_open_safe(&event.token, &event.login_type).await;
583 }
584 SaTokenEventType::CloseSafe => {
585 listener.on_close_safe(&event.token, &event.login_type).await;
586 }
587 }
588 }
589 }
590}
591
592impl Default for SaTokenEventBus {
593 fn default() -> Self {
594 Self::new()
595 }
596}
597
598/// 简单的日志监听器示例
599pub struct LoggingListener;
600
601#[async_trait]
602impl SaTokenListener for LoggingListener {
603 async fn on_login(&self, login_id: &str, token: &str, login_type: &str) {
604 tracing::info!(
605 login_id = %login_id,
606 token = %token,
607 login_type = %login_type,
608 "用户登录"
609 );
610 }
611
612 async fn on_logout(&self, login_id: &str, token: &str, login_type: &str) {
613 tracing::info!(
614 login_id = %login_id,
615 token = %token,
616 login_type = %login_type,
617 "用户登出"
618 );
619 }
620
621 async fn on_kick_out(&self, login_id: &str, token: &str, login_type: &str) {
622 tracing::warn!(
623 login_id = %login_id,
624 token = %token,
625 login_type = %login_type,
626 "用户被踢出下线"
627 );
628 }
629
630 async fn on_renew_timeout(&self, login_id: &str, token: &str, login_type: &str) {
631 tracing::debug!(
632 login_id = %login_id,
633 token = %token,
634 login_type = %login_type,
635 "Token 续期"
636 );
637 }
638
639 async fn on_replaced(&self, login_id: &str, token: &str, login_type: &str) {
640 tracing::warn!(
641 login_id = %login_id,
642 token = %token,
643 login_type = %login_type,
644 "用户被顶下线"
645 );
646 }
647
648 async fn on_banned(&self, login_id: &str, login_type: &str) {
649 tracing::warn!(
650 login_id = %login_id,
651 login_type = %login_type,
652 "用户被封禁"
653 );
654 }
655}
656
657#[cfg(test)]
658mod tests {
659 use super::*;
660
661 struct TestListener {
662 login_count: Arc<RwLock<i32>>,
663 }
664
665 impl TestListener {
666 fn new() -> Self {
667 Self {
668 login_count: Arc::new(RwLock::new(0)),
669 }
670 }
671 }
672
673 #[async_trait]
674 impl SaTokenListener for TestListener {
675 async fn on_login(&self, _login_id: &str, _token: &str, _login_type: &str) {
676 let mut count = self.login_count.write().unwrap();
677 *count += 1;
678 }
679 }
680
681 #[tokio::test]
682 async fn test_event_bus() {
683 let bus = SaTokenEventBus::new();
684 let listener = Arc::new(TestListener::new());
685 let login_count = Arc::clone(&listener.login_count);
686
687 bus.register(listener);
688
689 // 发布登录事件
690 let event = SaTokenEvent::login("user_123", "token_abc");
691 bus.publish(event).await;
692
693 // 验证监听器被调用
694 let count = login_count.read().unwrap();
695 assert_eq!(*count, 1);
696 }
697
698 #[test]
699 fn test_event_creation() {
700 let event = SaTokenEvent::login("user_123", "token_abc");
701 assert_eq!(event.event_type, SaTokenEventType::Login);
702 assert_eq!(event.login_id, "user_123");
703 assert_eq!(event.token, "token_abc");
704 }
705}
706