sa_token_core/online.rs
1//! Online User Management and Real-time Push Module | 在线用户管理和实时推送模块
2//!
3//! # Code Flow Logic | 代码流程逻辑
4//!
5//! ## English
6//!
7//! ### Overview
8//! This module provides comprehensive online user management and real-time message
9//! push capabilities. It tracks user online status, manages connections, and
10//! delivers messages to users in real-time.
11//!
12//! ### Online User Management Flow
13//! ```text
14//! 1. User Connects (e.g., WebSocket, SSE)
15//! ↓
16//! 2. OnlineManager.mark_online(OnlineUser)
17//! ├─→ Store user info in online_users HashMap
18//! ├─→ Key: login_id
19//! └─→ Value: Vec<OnlineUser> (supports multiple devices)
20//! ↓
21//! 3. User Activity Updates
22//! OnlineManager.update_activity(login_id, token)
23//! └─→ Update last_activity timestamp
24//! ↓
25//! 4. User Disconnects
26//! OnlineManager.mark_offline(login_id, token)
27//! ├─→ Remove from online_users
28//! └─→ Clean up if no more sessions
29//! ```
30//!
31//! ### Message Push Flow
32//! ```text
33//! 1. Create PushMessage
34//! ├─→ Generate unique message_id
35//! ├─→ Set content and message_type
36//! └─→ Add timestamp and metadata
37//! ↓
38//! 2. Select Push Method
39//! ├─→ push_to_user(login_id, content) - Single user
40//! ├─→ push_to_users(vec![ids], content) - Multiple users
41//! └─→ broadcast(content) - All online users
42//! ↓
43//! 3. OnlineManager Dispatches to Pushers
44//! For each registered MessagePusher:
45//! └─→ pusher.push(login_id, message)
46//! ↓
47//! 4. Pusher Delivers Message
48//! ├─→ InMemoryPusher: Store in memory
49//! ├─→ WebSocketPusher: Send via WS
50//! └─→ Custom: Your implementation
51//! ```
52//!
53//! ### Kick-Out Flow
54//! ```text
55//! 1. Trigger kick_out_notify(login_id, reason)
56//! ↓
57//! 2. Create KickOut message
58//! └─→ message_type: MessageType::KickOut
59//! ↓
60//! 3. Push notification to user
61//! └─→ All registered pushers receive message
62//! ↓
63//! 4. Mark user offline
64//! └─→ mark_offline_all(login_id)
65//! ```
66//!
67//! ### Message Types
68//! - Text: Plain text messages
69//! - Binary: Binary data
70//! - KickOut: Force logout notification
71//! - Notification: System notifications
72//! - Custom: User-defined types
73//!
74//! ## 中文
75//!
76//! ### 概述
77//! 本模块提供全面的在线用户管理和实时消息推送功能。
78//! 它跟踪用户在线状态、管理连接,并实时向用户推送消息。
79//!
80//! ### 在线用户管理流程
81//! ```text
82//! 1. 用户连接(如 WebSocket、SSE)
83//! ↓
84//! 2. OnlineManager.mark_online(OnlineUser)
85//! ├─→ 在 online_users HashMap 中存储用户信息
86//! ├─→ 键: login_id
87//! └─→ 值: Vec<OnlineUser>(支持多设备)
88//! ↓
89//! 3. 用户活跃度更新
90//! OnlineManager.update_activity(login_id, token)
91//! └─→ 更新 last_activity 时间戳
92//! ↓
93//! 4. 用户断开连接
94//! OnlineManager.mark_offline(login_id, token)
95//! ├─→ 从 online_users 中移除
96//! └─→ 如果没有更多会话则清理
97//! ```
98//!
99//! ### 消息推送流程
100//! ```text
101//! 1. 创建 PushMessage
102//! ├─→ 生成唯一的 message_id
103//! ├─→ 设置 content 和 message_type
104//! └─→ 添加时间戳和元数据
105//! ↓
106//! 2. 选择推送方法
107//! ├─→ push_to_user(login_id, content) - 单个用户
108//! ├─→ push_to_users(vec![ids], content) - 多个用户
109//! └─→ broadcast(content) - 所有在线用户
110//! ↓
111//! 3. OnlineManager 分发给推送器
112//! 对每个已注册的 MessagePusher:
113//! └─→ pusher.push(login_id, message)
114//! ↓
115//! 4. 推送器传递消息
116//! ├─→ InMemoryPusher: 存储在内存
117//! ├─→ WebSocketPusher: 通过 WS 发送
118//! └─→ Custom: 你的实现
119//! ```
120//!
121//! ### 强制下线流程
122//! ```text
123//! 1. 触发 kick_out_notify(login_id, reason)
124//! ↓
125//! 2. 创建 KickOut 消息
126//! └─→ message_type: MessageType::KickOut
127//! ↓
128//! 3. 推送通知给用户
129//! └─→ 所有已注册的推送器接收消息
130//! ↓
131//! 4. 标记用户离线
132//! └─→ mark_offline_all(login_id)
133//! ```
134//!
135//! ### 消息类型
136//! - Text: 纯文本消息
137//! - Binary: 二进制数据
138//! - KickOut: 强制登出通知
139//! - Notification: 系统通知
140//! - Custom: 用户自定义类型
141
142use crate::error::SaTokenError;
143use async_trait::async_trait;
144use std::collections::HashMap;
145use std::sync::Arc;
146use tokio::sync::RwLock;
147use chrono::{DateTime, Utc};
148
149/// Online user information
150/// 在线用户信息
151///
152/// Represents an active user connection with device and activity tracking
153/// 表示具有设备和活动跟踪的活跃用户连接
154#[derive(Debug, Clone)]
155pub struct OnlineUser {
156 /// User login ID | 用户登录 ID
157 pub login_id: String,
158
159 /// Authentication token | 认证 Token
160 pub token: String,
161
162 /// Device identifier (e.g., "web", "mobile", "ios", "android") | 设备标识
163 pub device: String,
164
165 /// Connection establishment time | 连接建立时间
166 pub connect_time: DateTime<Utc>,
167
168 /// Last activity timestamp | 最后活跃时间戳
169 pub last_activity: DateTime<Utc>,
170
171 /// Custom metadata for this connection | 该连接的自定义元数据
172 pub metadata: HashMap<String, String>,
173}
174
175/// Push message structure
176/// 推送消息结构
177///
178/// Represents a message to be delivered to online users
179/// 表示要传递给在线用户的消息
180#[derive(Debug, Clone)]
181pub struct PushMessage {
182 /// Unique message identifier | 唯一消息标识符
183 pub message_id: String,
184
185 /// Message content | 消息内容
186 pub content: String,
187
188 /// Message type | 消息类型
189 pub message_type: MessageType,
190
191 /// Message timestamp | 消息时间戳
192 pub timestamp: DateTime<Utc>,
193
194 /// Additional metadata | 额外元数据
195 pub metadata: HashMap<String, String>,
196}
197
198/// Message type enumeration
199/// 消息类型枚举
200///
201/// Defines different types of messages that can be sent
202/// 定义可以发送的不同类型的消息
203#[derive(Debug, Clone, PartialEq)]
204pub enum MessageType {
205 /// Plain text message | 纯文本消息
206 Text,
207
208 /// Binary data message | 二进制数据消息
209 Binary,
210
211 /// Force logout notification | 强制登出通知
212 KickOut,
213
214 /// System notification | 系统通知
215 Notification,
216
217 /// Custom message type | 自定义消息类型
218 Custom(String),
219}
220
221/// Message pusher trait
222/// 消息推送器 trait
223///
224/// Implement this trait to create custom message delivery mechanisms
225/// 实现此 trait 以创建自定义消息传递机制
226#[async_trait]
227pub trait MessagePusher: Send + Sync {
228 /// Push a message to a specific user
229 /// 向特定用户推送消息
230 ///
231 /// # Arguments | 参数
232 /// * `login_id` - User login ID | 用户登录 ID
233 /// * `message` - Message to push | 要推送的消息
234 async fn push(&self, login_id: &str, message: PushMessage) -> Result<(), SaTokenError>;
235}
236
237/// Online user manager
238/// 在线用户管理器
239///
240/// Manages online users and handles real-time message pushing
241/// 管理在线用户并处理实时消息推送
242pub struct OnlineManager {
243 /// Online users map: login_id -> Vec<OnlineUser>
244 /// 在线用户映射: login_id -> Vec<OnlineUser>
245 /// Supports multiple devices per user
246 /// 支持每个用户多设备
247 online_users: Arc<RwLock<HashMap<String, Vec<OnlineUser>>>>,
248
249 /// Registered message pushers | 已注册的消息推送器
250 pushers: Arc<RwLock<Vec<Arc<dyn MessagePusher>>>>,
251}
252
253impl OnlineManager {
254 /// Create a new online manager
255 /// 创建新的在线管理器
256 ///
257 /// # Example | 示例
258 /// ```rust,ignore
259 /// let manager = OnlineManager::new();
260 /// ```
261 pub fn new() -> Self {
262 Self {
263 online_users: Arc::new(RwLock::new(HashMap::new())),
264 pushers: Arc::new(RwLock::new(Vec::new())),
265 }
266 }
267
268 /// Register a message pusher
269 /// 注册消息推送器
270 ///
271 /// # Arguments | 参数
272 /// * `pusher` - Message pusher implementation | 消息推送器实现
273 ///
274 /// # Example | 示例
275 /// ```rust,ignore
276 /// let pusher = Arc::new(MyCustomPusher::new());
277 /// manager.register_pusher(pusher).await;
278 /// ```
279 pub async fn register_pusher(&self, pusher: Arc<dyn MessagePusher>) {
280 let mut pushers = self.pushers.write().await;
281 pushers.push(pusher);
282 }
283
284 /// Mark a user as online
285 /// 标记用户上线
286 ///
287 /// # Arguments | 参数
288 /// * `user` - Online user information | 在线用户信息
289 ///
290 /// # Example | 示例
291 /// ```rust,ignore
292 /// let user = OnlineUser {
293 /// login_id: "user123".to_string(),
294 /// token: "token123".to_string(),
295 /// device: "web".to_string(),
296 /// connect_time: Utc::now(),
297 /// last_activity: Utc::now(),
298 /// metadata: HashMap::new(),
299 /// };
300 /// manager.mark_online(user).await;
301 /// ```
302 pub async fn mark_online(&self, user: OnlineUser) {
303 let mut users = self.online_users.write().await;
304 users.entry(user.login_id.clone())
305 .or_insert_with(Vec::new)
306 .push(user);
307 }
308
309 /// Mark a specific user session as offline
310 /// 标记特定用户会话离线
311 ///
312 /// # Arguments | 参数
313 /// * `login_id` - User login ID | 用户登录 ID
314 /// * `token` - Session token to remove | 要移除的会话 Token
315 ///
316 /// # Example | 示例
317 /// ```rust,ignore
318 /// manager.mark_offline("user123", "token123").await;
319 /// ```
320 pub async fn mark_offline(&self, login_id: &str, token: &str) {
321 let mut users = self.online_users.write().await;
322 if let Some(user_sessions) = users.get_mut(login_id) {
323 user_sessions.retain(|u| u.token != token);
324 if user_sessions.is_empty() {
325 users.remove(login_id);
326 }
327 }
328 }
329
330 /// Mark all sessions of a user as offline
331 /// 标记用户的所有会话离线
332 ///
333 /// # Arguments | 参数
334 /// * `login_id` - User login ID | 用户登录 ID
335 ///
336 /// # Example | 示例
337 /// ```rust,ignore
338 /// manager.mark_offline_all("user123").await;
339 /// ```
340 pub async fn mark_offline_all(&self, login_id: &str) {
341 let mut users = self.online_users.write().await;
342 users.remove(login_id);
343 }
344
345 /// Check if a user is online
346 /// 检查用户是否在线
347 ///
348 /// # Arguments | 参数
349 /// * `login_id` - User login ID | 用户登录 ID
350 ///
351 /// # Returns | 返回值
352 /// * `true` - User is online | 用户在线
353 /// * `false` - User is offline | 用户离线
354 ///
355 /// # Example | 示例
356 /// ```rust,ignore
357 /// if manager.is_online("user123").await {
358 /// println!("User is online");
359 /// }
360 /// ```
361 pub async fn is_online(&self, login_id: &str) -> bool {
362 let users = self.online_users.read().await;
363 users.contains_key(login_id)
364 }
365
366 /// Get online user count
367 /// 获取在线用户数量
368 ///
369 /// # Returns | 返回值
370 /// Number of online users | 在线用户数量
371 ///
372 /// # Example | 示例
373 /// ```rust,ignore
374 /// let count = manager.get_online_count().await;
375 /// println!("{} users online", count);
376 /// ```
377 pub async fn get_online_count(&self) -> usize {
378 let users = self.online_users.read().await;
379 users.len()
380 }
381
382 /// Get list of online user IDs
383 /// 获取在线用户 ID 列表
384 ///
385 /// # Returns | 返回值
386 /// Vector of login IDs | 登录 ID 向量
387 ///
388 /// # Example | 示例
389 /// ```rust,ignore
390 /// let users = manager.get_online_users().await;
391 /// for user_id in users {
392 /// println!("User {} is online", user_id);
393 /// }
394 /// ```
395 pub async fn get_online_users(&self) -> Vec<String> {
396 let users = self.online_users.read().await;
397 users.keys().cloned().collect()
398 }
399
400 /// Get all sessions for a specific user
401 /// 获取特定用户的所有会话
402 ///
403 /// # Arguments | 参数
404 /// * `login_id` - User login ID | 用户登录 ID
405 ///
406 /// # Returns | 返回值
407 /// Vector of online user sessions | 在线用户会话向量
408 ///
409 /// # Example | 示例
410 /// ```rust,ignore
411 /// let sessions = manager.get_user_sessions("user123").await;
412 /// println!("User has {} active sessions", sessions.len());
413 /// ```
414 pub async fn get_user_sessions(&self, login_id: &str) -> Vec<OnlineUser> {
415 let users = self.online_users.read().await;
416 users.get(login_id).cloned().unwrap_or_default()
417 }
418
419 /// Update user activity timestamp
420 /// 更新用户活跃时间戳
421 ///
422 /// # Arguments | 参数
423 /// * `login_id` - User login ID | 用户登录 ID
424 /// * `token` - Session token | 会话 Token
425 ///
426 /// # Example | 示例
427 /// ```rust,ignore
428 /// manager.update_activity("user123", "token123").await;
429 /// ```
430 pub async fn update_activity(&self, login_id: &str, token: &str) {
431 let mut users = self.online_users.write().await;
432 if let Some(user_sessions) = users.get_mut(login_id) {
433 for user in user_sessions.iter_mut() {
434 if user.token == token {
435 user.last_activity = Utc::now();
436 break;
437 }
438 }
439 }
440 }
441
442 /// Push a text message to a specific user
443 /// 向特定用户推送文本消息
444 ///
445 /// # Arguments | 参数
446 /// * `login_id` - User login ID | 用户登录 ID
447 /// * `content` - Message content | 消息内容
448 ///
449 /// # Example | 示例
450 /// ```rust,ignore
451 /// manager.push_to_user("user123", "Hello!".to_string()).await?;
452 /// ```
453 pub async fn push_to_user(&self, login_id: &str, content: String) -> Result<(), SaTokenError> {
454 let message = PushMessage {
455 message_id: uuid::Uuid::new_v4().to_string(),
456 content,
457 message_type: MessageType::Text,
458 timestamp: Utc::now(),
459 metadata: HashMap::new(),
460 };
461
462 let pushers = self.pushers.read().await;
463 for pusher in pushers.iter() {
464 pusher.push(login_id, message.clone()).await?;
465 }
466
467 Ok(())
468 }
469
470 /// Push a message to multiple users
471 /// 向多个用户推送消息
472 ///
473 /// # Arguments | 参数
474 /// * `login_ids` - List of user login IDs | 用户登录 ID 列表
475 /// * `content` - Message content | 消息内容
476 ///
477 /// # Example | 示例
478 /// ```rust,ignore
479 /// let users = vec!["user1".to_string(), "user2".to_string()];
480 /// manager.push_to_users(users, "Broadcast!".to_string()).await?;
481 /// ```
482 pub async fn push_to_users(&self, login_ids: Vec<String>, content: String) -> Result<(), SaTokenError> {
483 for login_id in login_ids {
484 self.push_to_user(&login_id, content.clone()).await?;
485 }
486 Ok(())
487 }
488
489 /// Broadcast a message to all online users
490 /// 向所有在线用户广播消息
491 ///
492 /// # Arguments | 参数
493 /// * `content` - Message content | 消息内容
494 ///
495 /// # Example | 示例
496 /// ```rust,ignore
497 /// manager.broadcast("System maintenance in 5 minutes".to_string()).await?;
498 /// ```
499 pub async fn broadcast(&self, content: String) -> Result<(), SaTokenError> {
500 let login_ids = self.get_online_users().await;
501 self.push_to_users(login_ids, content).await
502 }
503
504 /// Push a custom message to a user
505 /// 向用户推送自定义消息
506 ///
507 /// # Arguments | 参数
508 /// * `login_id` - User login ID | 用户登录 ID
509 /// * `message` - Custom push message | 自定义推送消息
510 ///
511 /// # Example | 示例
512 /// ```rust,ignore
513 /// let message = PushMessage {
514 /// message_id: uuid::Uuid::new_v4().to_string(),
515 /// content: "Custom content".to_string(),
516 /// message_type: MessageType::Custom("event".to_string()),
517 /// timestamp: Utc::now(),
518 /// metadata: HashMap::new(),
519 /// };
520 /// manager.push_message_to_user("user123", message).await?;
521 /// ```
522 pub async fn push_message_to_user(&self, login_id: &str, message: PushMessage) -> Result<(), SaTokenError> {
523 let pushers = self.pushers.read().await;
524 for pusher in pushers.iter() {
525 pusher.push(login_id, message.clone()).await?;
526 }
527 Ok(())
528 }
529
530 /// Kick out a user and send notification
531 /// 踢出用户并发送通知
532 ///
533 /// # Arguments | 参数
534 /// * `login_id` - User login ID | 用户登录 ID
535 /// * `reason` - Kick-out reason | 踢出原因
536 ///
537 /// # Example | 示例
538 /// ```rust,ignore
539 /// manager.kick_out_notify("user123", "Duplicate login detected".to_string()).await?;
540 /// ```
541 pub async fn kick_out_notify(&self, login_id: &str, reason: String) -> Result<(), SaTokenError> {
542 // Create kick-out message | 创建踢出消息
543 let message = PushMessage {
544 message_id: uuid::Uuid::new_v4().to_string(),
545 content: reason,
546 message_type: MessageType::KickOut,
547 timestamp: Utc::now(),
548 metadata: HashMap::new(),
549 };
550
551 // Push notification | 推送通知
552 self.push_message_to_user(login_id, message).await?;
553
554 // Mark user offline | 标记用户离线
555 self.mark_offline_all(login_id).await;
556 Ok(())
557 }
558}
559
560impl Default for OnlineManager {
561 fn default() -> Self {
562 Self::new()
563 }
564}
565
566/// In-memory message pusher implementation
567/// 内存消息推送器实现
568///
569/// Stores messages in memory for testing and development
570/// 在内存中存储消息用于测试和开发
571pub struct InMemoryPusher {
572 /// Messages storage: login_id -> Vec<PushMessage>
573 /// 消息存储: login_id -> Vec<PushMessage>
574 messages: Arc<RwLock<HashMap<String, Vec<PushMessage>>>>,
575}
576
577impl InMemoryPusher {
578 /// Create a new in-memory pusher
579 /// 创建新的内存推送器
580 pub fn new() -> Self {
581 Self {
582 messages: Arc::new(RwLock::new(HashMap::new())),
583 }
584 }
585
586 /// Get all messages for a user
587 /// 获取用户的所有消息
588 ///
589 /// # Arguments | 参数
590 /// * `login_id` - User login ID | 用户登录 ID
591 ///
592 /// # Returns | 返回值
593 /// Vector of push messages | 推送消息向量
594 pub async fn get_messages(&self, login_id: &str) -> Vec<PushMessage> {
595 let messages = self.messages.read().await;
596 messages.get(login_id).cloned().unwrap_or_default()
597 }
598
599 /// Clear all messages for a user
600 /// 清除用户的所有消息
601 ///
602 /// # Arguments | 参数
603 /// * `login_id` - User login ID | 用户登录 ID
604 pub async fn clear_messages(&self, login_id: &str) {
605 let mut messages = self.messages.write().await;
606 messages.remove(login_id);
607 }
608}
609
610impl Default for InMemoryPusher {
611 fn default() -> Self {
612 Self::new()
613 }
614}
615
616#[async_trait]
617impl MessagePusher for InMemoryPusher {
618 async fn push(&self, login_id: &str, message: PushMessage) -> Result<(), SaTokenError> {
619 let mut messages = self.messages.write().await;
620 messages.entry(login_id.to_string())
621 .or_insert_with(Vec::new)
622 .push(message);
623 Ok(())
624 }
625}
626
627#[cfg(test)]
628mod tests {
629 use super::*;
630
631 #[tokio::test]
632 async fn test_online_manager() {
633 let manager = OnlineManager::new();
634
635 let user = OnlineUser {
636 login_id: "user1".to_string(),
637 token: "token1".to_string(),
638 device: "web".to_string(),
639 connect_time: Utc::now(),
640 last_activity: Utc::now(),
641 metadata: HashMap::new(),
642 };
643
644 manager.mark_online(user).await;
645
646 assert!(manager.is_online("user1").await);
647 assert_eq!(manager.get_online_count().await, 1);
648 }
649
650 #[tokio::test]
651 async fn test_mark_offline() {
652 let manager = OnlineManager::new();
653
654 let user = OnlineUser {
655 login_id: "user2".to_string(),
656 token: "token2".to_string(),
657 device: "mobile".to_string(),
658 connect_time: Utc::now(),
659 last_activity: Utc::now(),
660 metadata: HashMap::new(),
661 };
662
663 manager.mark_online(user).await;
664 assert!(manager.is_online("user2").await);
665
666 manager.mark_offline("user2", "token2").await;
667 assert!(!manager.is_online("user2").await);
668 }
669
670 #[tokio::test]
671 async fn test_push_message() {
672 let manager = OnlineManager::new();
673 let pusher = Arc::new(InMemoryPusher::new());
674
675 manager.register_pusher(pusher.clone()).await;
676
677 let user = OnlineUser {
678 login_id: "user3".to_string(),
679 token: "token3".to_string(),
680 device: "web".to_string(),
681 connect_time: Utc::now(),
682 last_activity: Utc::now(),
683 metadata: HashMap::new(),
684 };
685
686 manager.mark_online(user).await;
687 manager.push_to_user("user3", "Hello".to_string()).await.unwrap();
688
689 let messages = pusher.get_messages("user3").await;
690 assert_eq!(messages.len(), 1);
691 assert_eq!(messages[0].content, "Hello");
692 }
693
694 #[tokio::test]
695 async fn test_broadcast() {
696 let manager = OnlineManager::new();
697 let pusher = Arc::new(InMemoryPusher::new());
698
699 manager.register_pusher(pusher.clone()).await;
700
701 for i in 1..=3 {
702 let user = OnlineUser {
703 login_id: format!("user{}", i),
704 token: format!("token{}", i),
705 device: "web".to_string(),
706 connect_time: Utc::now(),
707 last_activity: Utc::now(),
708 metadata: HashMap::new(),
709 };
710 manager.mark_online(user).await;
711 }
712
713 manager.broadcast("Broadcast message".to_string()).await.unwrap();
714
715 for i in 1..=3 {
716 let messages = pusher.get_messages(&format!("user{}", i)).await;
717 assert_eq!(messages.len(), 1);
718 }
719 }
720
721 #[tokio::test]
722 async fn test_kick_out_notify() {
723 let manager = OnlineManager::new();
724 let pusher = Arc::new(InMemoryPusher::new());
725
726 manager.register_pusher(pusher.clone()).await;
727
728 let user = OnlineUser {
729 login_id: "user4".to_string(),
730 token: "token4".to_string(),
731 device: "web".to_string(),
732 connect_time: Utc::now(),
733 last_activity: Utc::now(),
734 metadata: HashMap::new(),
735 };
736
737 manager.mark_online(user).await;
738 assert!(manager.is_online("user4").await);
739
740 manager.kick_out_notify("user4", "Kicked out".to_string()).await.unwrap();
741
742 assert!(!manager.is_online("user4").await);
743
744 let messages = pusher.get_messages("user4").await;
745 assert_eq!(messages.len(), 1);
746 assert_eq!(messages[0].message_type, MessageType::KickOut);
747 }
748}