sa_token_core/event/mod.rs
1// Author: 金书记
2//
3//! Event Listener Module | 事件监听模块
4//!
5//! Provides event listening capabilities for sa-token, supporting monitoring of login, logout, kick-out, and other operations.
6//!
7//! 提供 sa-token 的事件监听功能,支持监听登录、登出、踢出等操作。
8//!
9//! ## EventBus Code Flow Logic | EventBus 代码流程逻辑
10//!
11//! ### Overall Architecture | 整体架构
12//!
13//! ```text
14//! ┌─────────────────────────────────────────────────────────────┐
15//! │ SaTokenEventBus │
16//! │ ┌────────────────────────────────────────────────────┐ │
17//! │ │ listeners: Arc<RwLock<Vec<Arc<dyn SaTokenListener>>>> │
18//! │ │ - Stores all registered listeners │
19//! │ │ 存储所有注册的监听器 │
20//! │ │ - Uses RwLock for thread safety │
21//! │ │ 使用 RwLock 保证线程安全 │
22//! │ │ - Arc wrapping allows multi-thread sharing │
23//! │ │ Arc 包装允许多线程共享 │
24//! │ └────────────────────────────────────────────────────┘ │
25//! └─────────────────────────────────────────────────────────────┘
26//! ```
27//!
28//! ### Core Processes | 核心流程
29//!
30//! #### 1. Listener Registration Process | 监听器注册流程
31//!
32//! ```text
33//! ┌──────────┐ ┌──────────────┐ ┌─────────────┐
34//! │User Code │────▶│ register() │────▶│Acquire Write│
35//! │用户代码 │ │ │ │Lock 写锁获取│
36//! └──────────┘ │ - Receive │ │ │
37//! │ listener │ │ - Get lock │
38//! │ 接收监听器 │ │ 获取写锁 │
39//! │ - Arc wrap │ │ - Add to │
40//! │ Arc包装 │ │ list │
41//! └──────────────┘ │ 添加到列表 │
42//! │ - Release │
43//! │ 释放写锁 │
44//! └─────────────┘
45//!
46//! Steps | 步骤:
47//! 1. User creates custom listener instance
48//! 用户创建自定义监听器实例
49//! 2. Wrap listener with Arc::new()
50//! 使用 Arc::new() 包装监听器
51//! 3. Call event_bus.register(listener).await
52//! 调用 event_bus.register(listener).await
53//! 4. EventBus acquires write lock, adds listener to Vec
54//! EventBus 获取写锁,将监听器添加到 Vec 中
55//! 5. Registration complete, waiting for event triggers
56//! 监听器注册完成,等待事件触发
57//! ```
58//!
59//! #### 2. Event Publishing Process | 事件发布流程
60//!
61//! ```text
62//! ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
63//! │SaTokenManager│────▶│ publish() │────▶│Acquire Read │
64//! │(login) │ │ │ │Lock 读锁获取 │
65//! └──────────────┘ │ 1.Create │ │ │
66//! │ event │ │ 2.Iterate │
67//! │ 创建事件 │ │ listeners │
68//! │ 2.Call │ │ 遍历监听器 │
69//! │ publish │ │ 3.Invoke │
70//! │ 调用publish │ │ callbacks │
71//! └──────────────┘ │ 调用回调 │
72//! │ └──────────────┘
73//! ▼ │
74//! ┌──────────────┐ ▼
75//! │ SaTokenEvent │ ┌──────────────┐
76//! │ - event_type │ │ Listener 1 │
77//! │ - login_id │ │ on_login() │
78//! │ - token │ ├──────────────┤
79//! │ - timestamp │ │ Listener 2 │
80//! └──────────────┘ │ on_login() │
81//! ├──────────────┤
82//! │ Listener N │
83//! │ on_login() │
84//! └──────────────┘
85//!
86//! Steps | 步骤:
87//! 1. After business operation (e.g., login) completes, create corresponding event object
88//! 业务操作(如 login)完成后,创建对应的事件对象
89//! 2. Call event_bus.publish(event).await
90//! 调用 event_bus.publish(event).await
91//! 3. EventBus acquires read lock, accesses listener list
92//! EventBus 获取读锁,访问监听器列表
93//! 4. Call each listener's corresponding method in registration order
94//! 按注册顺序依次调用每个监听器的对应方法
95//! 5. After all listeners complete, event publishing process ends
96//! 所有监听器执行完成后,事件发布流程结束
97//! ```
98//!
99//! #### 3. Event Dispatching Process | 事件分发流程
100//!
101//! ```text
102//! ┌──────────────────┐
103//! │ publish(event) │
104//! └────────┬─────────┘
105//! │
106//! ▼
107//! ┌──────────────────────────────────────┐
108//! │ 1. Get all listeners (read lock) │
109//! │ 获取所有监听器(读锁) │
110//! └────────┬─────────────────────────────┘
111//! │
112//! ▼
113//! ┌──────────────────────────────────────┐
114//! │ 2. Iterate listener list │
115//! │ 遍历监听器列表 │
116//! └────────┬─────────────────────────────┘
117//! │
118//! ├──▶ on_event(event) ──▶ Generic event handler
119//! │ 通用事件处理
120//! │
121//! └──▶ Dispatch by event type | 根据事件类型分发:
122//! │
123//! ├─ Login ──────▶ on_login(...)
124//! ├─ Logout ─────▶ on_logout(...)
125//! ├─ KickOut ────▶ on_kick_out(...)
126//! ├─ RenewTimeout ▶ on_renew_timeout(...)
127//! ├─ Replaced ───▶ on_replaced(...)
128//! └─ Banned ─────▶ on_banned(...)
129//!
130//! Notes | 注意:
131//! - Listeners execute in registration order
132//! 监听器按注册顺序执行
133//! - Each listener executes asynchronously
134//! 每个监听器都是异步执行的
135//! - Errors in listeners don't interrupt event propagation
136//! 监听器中的错误不会中断事件传播
137//! ```
138//!
139//! ### Thread Safety Guarantees | 线程安全保证
140//!
141//! ```text
142//! Arc<RwLock<Vec<Arc<dyn SaTokenListener>>>>
143//! │ │ │ │
144//! │ │ │ └─ Listener trait object | 监听器 trait 对象
145//! │ │ └────── Listener collection | 监听器集合
146//! │ └──────────── Read-write lock protection | 读写锁保护
147//! └───────────────── Cross-thread sharing | 跨线程共享
148//!
149//! - Arc: Allows EventBus to be shared across multiple Manager instances
150//! 允许 EventBus 被多个 Manager 实例共享
151//! - RwLock: Allows multiple readers to publish events concurrently, writer has exclusive registration
152//! 允许多个读者同时发布事件,写者独占注册
153//! - Inner Arc: Listeners can be shared across multiple EventBus instances
154//! 监听器可以被多个 EventBus 共享
155//! ```
156//!
157//! ### Complete Call Chain Example | 完整调用链示例
158//!
159//! ```text
160//! User Code | 用户代码
161//! │
162//! └─▶ StpUtil::login("user_123")
163//! │
164//! └─▶ SaTokenManager::login(...)
165//! │
166//! ├─ 1. Generate token | 生成 token
167//! ├─ 2. Save to storage | 保存到存储
168//! └─ 3. Trigger event | 触发事件
169//! │
170//! └─▶ event_bus.publish(
171//! SaTokenEvent::login("user_123", "token_abc")
172//! )
173//! │
174//! ├─▶ LoggingListener::on_login()
175//! │ └─ Log to file | 记录日志
176//! │
177//! ├─▶ DatabaseListener::on_login()
178//! │ └─ Save to database | 保存到数据库
179//! │
180//! └─▶ StatisticsListener::on_login()
181//! └─ Update statistics | 更新统计信息
182//!
183//! After all listeners complete, login() returns token
184//! 所有监听器执行完成后,login() 方法返回 token
185//! ```
186//!
187//! ### Performance Considerations | 性能考虑
188//!
189//! 1. **Async Execution | 异步执行**: All listener methods are async, but execute sequentially
190//! 所有监听器方法都是异步的,但按顺序执行
191//! 2. **Read-Write Lock | 读写锁**: Multiple events can be published concurrently (read lock), registration requires exclusive access (write lock)
192//! 多个事件可以并发发布(读锁),注册需要独占(写锁)
193//! 3. **Zero-Copy | 零拷贝**: Event objects are passed by reference, avoiding unnecessary cloning
194//! 事件对象通过引用传递,避免不必要的克隆
195//! 4. **Error Isolation | 错误隔离**: Errors in one listener don't affect other listeners
196//! 单个监听器的错误不会影响其他监听器
197//!
198//! ### Error Handling | 错误处理
199//!
200//! ```text
201//! ┌────────────────┐
202//! │ Listener 1 │ ─▶ Success ✓ | 成功 ✓
203//! ├────────────────┤
204//! │ Listener 2 │ ─▶ Error ✗ (handled internally, doesn't affect others)
205//! │ │ 错误 ✗ (内部处理,不影响后续)
206//! ├────────────────┤
207//! │ Listener 3 │ ─▶ Success ✓ (still executes) | 成功 ✓ (仍然执行)
208//! └────────────────┘
209//!
210//! Recommendation | 建议:
211//! Listeners should catch all errors internally and handle them appropriately
212//! 监听器内部应捕获所有错误并适当处理
213//! ```
214//!
215//! ## Usage Example | 使用示例
216//!
217//! ```rust,ignore
218//! use sa_token_core::event::{SaTokenEvent, SaTokenListener, SaTokenEventBus};
219//!
220//! // Custom listener | 自定义监听器
221//! struct MyListener;
222//!
223//! #[async_trait]
224//! impl SaTokenListener for MyListener {
225//! async fn on_login(&self, login_id: &str, token: &str, login_type: &str) {
226//! println!("User {} logged in, token: {}", login_id, token);
227//! // 用户 {} 登录了,token: {}
228//! }
229//!
230//! async fn on_logout(&self, login_id: &str, token: &str, login_type: &str) {
231//! println!("User {} logged out", login_id);
232//! // 用户 {} 登出了
233//! }
234//! }
235//!
236//! // Register listener | 注册监听器
237//! let event_bus = SaTokenEventBus::new();
238//! event_bus.register(Arc::new(MyListener)).await;
239//! ```
240
241use async_trait::async_trait;
242use std::sync::Arc;
243use tokio::sync::RwLock;
244use chrono::{DateTime, Utc};
245use serde::{Serialize, Deserialize};
246
247/// 事件类型
248#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
249pub enum SaTokenEventType {
250 /// 登录事件
251 Login,
252 /// 登出事件
253 Logout,
254 /// 踢出下线事件
255 KickOut,
256 /// Token 续期事件
257 RenewTimeout,
258 /// 被顶下线事件(被其他设备登录)
259 Replaced,
260 /// 被封禁事件
261 Banned,
262}
263
264/// 事件数据
265#[derive(Debug, Clone, Serialize, Deserialize)]
266pub struct SaTokenEvent {
267 /// 事件类型
268 pub event_type: SaTokenEventType,
269 /// 登录ID
270 pub login_id: String,
271 /// Token 值
272 pub token: String,
273 /// 登录类型(如 "default", "admin" 等)
274 pub login_type: String,
275 /// 事件发生时间
276 pub timestamp: DateTime<Utc>,
277 /// 额外数据(用于扩展)
278 pub extra: Option<serde_json::Value>,
279}
280
281impl SaTokenEvent {
282 /// 创建登录事件
283 pub fn login(login_id: impl Into<String>, token: impl Into<String>) -> Self {
284 Self {
285 event_type: SaTokenEventType::Login,
286 login_id: login_id.into(),
287 token: token.into(),
288 login_type: "default".to_string(),
289 timestamp: Utc::now(),
290 extra: None,
291 }
292 }
293
294 /// 创建登出事件
295 pub fn logout(login_id: impl Into<String>, token: impl Into<String>) -> Self {
296 Self {
297 event_type: SaTokenEventType::Logout,
298 login_id: login_id.into(),
299 token: token.into(),
300 login_type: "default".to_string(),
301 timestamp: Utc::now(),
302 extra: None,
303 }
304 }
305
306 /// 创建踢出下线事件
307 pub fn kick_out(login_id: impl Into<String>, token: impl Into<String>) -> Self {
308 Self {
309 event_type: SaTokenEventType::KickOut,
310 login_id: login_id.into(),
311 token: token.into(),
312 login_type: "default".to_string(),
313 timestamp: Utc::now(),
314 extra: None,
315 }
316 }
317
318 /// 创建 Token 续期事件
319 pub fn renew_timeout(login_id: impl Into<String>, token: impl Into<String>) -> Self {
320 Self {
321 event_type: SaTokenEventType::RenewTimeout,
322 login_id: login_id.into(),
323 token: token.into(),
324 login_type: "default".to_string(),
325 timestamp: Utc::now(),
326 extra: None,
327 }
328 }
329
330 /// 创建被顶下线事件
331 pub fn replaced(login_id: impl Into<String>, token: impl Into<String>) -> Self {
332 Self {
333 event_type: SaTokenEventType::Replaced,
334 login_id: login_id.into(),
335 token: token.into(),
336 login_type: "default".to_string(),
337 timestamp: Utc::now(),
338 extra: None,
339 }
340 }
341
342 /// 创建被封禁事件
343 pub fn banned(login_id: impl Into<String>) -> Self {
344 Self {
345 event_type: SaTokenEventType::Banned,
346 login_id: login_id.into(),
347 token: String::new(),
348 login_type: "default".to_string(),
349 timestamp: Utc::now(),
350 extra: None,
351 }
352 }
353
354 /// 设置登录类型
355 pub fn with_login_type(mut self, login_type: impl Into<String>) -> Self {
356 self.login_type = login_type.into();
357 self
358 }
359
360 /// 设置额外数据
361 pub fn with_extra(mut self, extra: serde_json::Value) -> Self {
362 self.extra = Some(extra);
363 self
364 }
365}
366
367/// 事件监听器 trait
368///
369/// 实现此 trait 来自定义事件处理逻辑
370#[async_trait]
371pub trait SaTokenListener: Send + Sync {
372 /// 登录事件
373 async fn on_login(&self, login_id: &str, token: &str, login_type: &str) {
374 let _ = (login_id, token, login_type);
375 }
376
377 /// 登出事件
378 async fn on_logout(&self, login_id: &str, token: &str, login_type: &str) {
379 let _ = (login_id, token, login_type);
380 }
381
382 /// 踢出下线事件
383 async fn on_kick_out(&self, login_id: &str, token: &str, login_type: &str) {
384 let _ = (login_id, token, login_type);
385 }
386
387 /// Token 续期事件
388 async fn on_renew_timeout(&self, login_id: &str, token: &str, login_type: &str) {
389 let _ = (login_id, token, login_type);
390 }
391
392 /// 被顶下线事件
393 async fn on_replaced(&self, login_id: &str, token: &str, login_type: &str) {
394 let _ = (login_id, token, login_type);
395 }
396
397 /// 被封禁事件
398 async fn on_banned(&self, login_id: &str, login_type: &str) {
399 let _ = (login_id, login_type);
400 }
401
402 /// 通用事件处理(所有事件都会触发此方法)
403 async fn on_event(&self, event: &SaTokenEvent) {
404 let _ = event;
405 }
406}
407
408/// 事件总线 - 管理所有监听器并分发事件
409#[derive(Clone)]
410pub struct SaTokenEventBus {
411 listeners: Arc<RwLock<Vec<Arc<dyn SaTokenListener>>>>,
412}
413
414impl SaTokenEventBus {
415 /// 创建新的事件总线
416 pub fn new() -> Self {
417 Self {
418 listeners: Arc::new(RwLock::new(Vec::new())),
419 }
420 }
421
422 /// 注册监听器
423 pub async fn register(&self, listener: Arc<dyn SaTokenListener>) {
424 let mut listeners = self.listeners.write().await;
425 listeners.push(listener);
426 }
427
428 /// 移除所有监听器
429 pub async fn clear(&self) {
430 let mut listeners = self.listeners.write().await;
431 listeners.clear();
432 }
433
434 /// 获取监听器数量
435 pub async fn listener_count(&self) -> usize {
436 let listeners = self.listeners.read().await;
437 listeners.len()
438 }
439
440 /// 发布事件
441 pub async fn publish(&self, event: SaTokenEvent) {
442 let listeners = self.listeners.read().await;
443
444 for listener in listeners.iter() {
445 // 触发通用事件处理
446 listener.on_event(&event).await;
447
448 // 根据事件类型触发特定处理
449 match event.event_type {
450 SaTokenEventType::Login => {
451 listener.on_login(&event.login_id, &event.token, &event.login_type).await;
452 }
453 SaTokenEventType::Logout => {
454 listener.on_logout(&event.login_id, &event.token, &event.login_type).await;
455 }
456 SaTokenEventType::KickOut => {
457 listener.on_kick_out(&event.login_id, &event.token, &event.login_type).await;
458 }
459 SaTokenEventType::RenewTimeout => {
460 listener.on_renew_timeout(&event.login_id, &event.token, &event.login_type).await;
461 }
462 SaTokenEventType::Replaced => {
463 listener.on_replaced(&event.login_id, &event.token, &event.login_type).await;
464 }
465 SaTokenEventType::Banned => {
466 listener.on_banned(&event.login_id, &event.login_type).await;
467 }
468 }
469 }
470 }
471}
472
473impl Default for SaTokenEventBus {
474 fn default() -> Self {
475 Self::new()
476 }
477}
478
479/// 简单的日志监听器示例
480pub struct LoggingListener;
481
482#[async_trait]
483impl SaTokenListener for LoggingListener {
484 async fn on_login(&self, login_id: &str, token: &str, login_type: &str) {
485 tracing::info!(
486 login_id = %login_id,
487 token = %token,
488 login_type = %login_type,
489 "用户登录"
490 );
491 }
492
493 async fn on_logout(&self, login_id: &str, token: &str, login_type: &str) {
494 tracing::info!(
495 login_id = %login_id,
496 token = %token,
497 login_type = %login_type,
498 "用户登出"
499 );
500 }
501
502 async fn on_kick_out(&self, login_id: &str, token: &str, login_type: &str) {
503 tracing::warn!(
504 login_id = %login_id,
505 token = %token,
506 login_type = %login_type,
507 "用户被踢出下线"
508 );
509 }
510
511 async fn on_renew_timeout(&self, login_id: &str, token: &str, login_type: &str) {
512 tracing::debug!(
513 login_id = %login_id,
514 token = %token,
515 login_type = %login_type,
516 "Token 续期"
517 );
518 }
519
520 async fn on_replaced(&self, login_id: &str, token: &str, login_type: &str) {
521 tracing::warn!(
522 login_id = %login_id,
523 token = %token,
524 login_type = %login_type,
525 "用户被顶下线"
526 );
527 }
528
529 async fn on_banned(&self, login_id: &str, login_type: &str) {
530 tracing::warn!(
531 login_id = %login_id,
532 login_type = %login_type,
533 "用户被封禁"
534 );
535 }
536}
537
538#[cfg(test)]
539mod tests {
540 use super::*;
541
542 struct TestListener {
543 login_count: Arc<RwLock<i32>>,
544 }
545
546 impl TestListener {
547 fn new() -> Self {
548 Self {
549 login_count: Arc::new(RwLock::new(0)),
550 }
551 }
552 }
553
554 #[async_trait]
555 impl SaTokenListener for TestListener {
556 async fn on_login(&self, _login_id: &str, _token: &str, _login_type: &str) {
557 let mut count = self.login_count.write().await;
558 *count += 1;
559 }
560 }
561
562 #[tokio::test]
563 async fn test_event_bus() {
564 let bus = SaTokenEventBus::new();
565 let listener = Arc::new(TestListener::new());
566 let login_count = Arc::clone(&listener.login_count);
567
568 bus.register(listener).await;
569
570 // 发布登录事件
571 let event = SaTokenEvent::login("user_123", "token_abc");
572 bus.publish(event).await;
573
574 // 验证监听器被调用
575 let count = login_count.read().await;
576 assert_eq!(*count, 1);
577 }
578
579 #[test]
580 fn test_event_creation() {
581 let event = SaTokenEvent::login("user_123", "token_abc");
582 assert_eq!(event.event_type, SaTokenEventType::Login);
583 assert_eq!(event.login_id, "user_123");
584 assert_eq!(event.token, "token_abc");
585 }
586}
587