Skip to main content

sa_token_core/
distributed.rs

1//! Distributed Session Management Module | 分布式 Session 管理模块
2//!
3//! # Overview | 概述
4//!
5//! This module enables **distributed session management** for microservices architecture,
6//! allowing multiple services to share authentication sessions seamlessly.
7//! 本模块为微服务架构提供**分布式 Session 管理**,允许多个服务无缝共享认证会话。
8//!
9//! ## Architecture Context | 架构上下文
10//!
11//! ```text
12//! ┌────────────────────────────────────────────────────────────────────┐
13//! │                   Microservices Architecture                       │
14//! │                   微服务架构                                        │
15//! └────────────────────────────────────────────────────────────────────┘
16//!
17//!    ┌──────────────┐  ┌──────────────┐  ┌──────────────┐
18//!    │  Service A   │  │  Service B   │  │  Service C   │
19//!    │  (User API)  │  │  (Order API) │  │  (Pay API)   │
20//!    └──────┬───────┘  └──────┬───────┘  └──────┬───────┘
21//!           │                  │                  │
22//!           └──────────────────┼──────────────────┘
23//!                              │
24//!                    ┌─────────▼──────────┐
25//!                    │  Distributed       │
26//!                    │  Session Storage   │
27//!                    │  (Redis/Database)  │
28//!                    └────────────────────┘
29//!
30//! Each service can:
31//! 每个服务可以:
32//!   1. Create sessions for users
33//!      为用户创建会话
34//!   2. Access sessions created by other services
35//!      访问其他服务创建的会话
36//!   3. Share user authentication state
37//!      共享用户认证状态
38//! ```
39//!
40//! ## Key Use Cases | 关键使用场景
41//!
42//! ### 1. Single Sign-On (SSO) Across Services | 跨服务单点登录
43//!
44//! ```text
45//! Scenario: User logs in to Service A and accesses Service B
46//! 场景:用户登录服务 A 并访问服务 B
47//!
48//! 1. User → Service A: Login
49//!    用户 → 服务 A:登录
50//!    ├─ Service A creates session: session_id = "abc123"
51//!    │  服务 A 创建会话:session_id = "abc123"
52//!    └─ Saves to distributed storage
53//!       保存到分布式存储
54//!
55//! 2. User → Service B: Request with session_id = "abc123"
56//!    用户 → 服务 B:请求带 session_id = "abc123"
57//!    ├─ Service B retrieves session from storage
58//!    │  服务 B 从存储中获取会话
59//!    ├─ Validates user is authenticated
60//!    │  验证用户已认证
61//!    └─ Processes request ✅
62//!       处理请求 ✅
63//!
64//! No need to log in again! 无需再次登录!
65//! ```
66//!
67//! ### 2. Session Sharing for User Context | 会话共享用户上下文
68//!
69//! ```text
70//! Service A stores: { "user_role": "admin", "department": "IT" }
71//! 服务 A 存储:{ "user_role": "admin", "department": "IT" }
72//!
73//! Service B reads: Same session attributes available
74//! 服务 B 读取:相同的会话属性可用
75//!
76//! Service C updates: { "last_order": "order_123" }
77//! 服务 C 更新:{ "last_order": "order_123" }
78//!
79//! All services share the same session state!
80//! 所有服务共享相同的会话状态!
81//! ```
82//!
83//! ### 3. Multi-Device Session Management | 多设备会话管理
84//!
85//! ```text
86//! User: user_123
87//!   ├─ Session 1: Web (Service A)
88//!   │  会话 1:网页(服务 A)
89//!   ├─ Session 2: Mobile (Service B)
90//!   │  会话 2:移动端(服务 B)
91//!   └─ Session 3: Desktop (Service C)
92//!      会话 3:桌面端(服务 C)
93//!
94//! All sessions can be:
95//! 所有会话可以:
96//!   - Listed: get_sessions_by_login_id()
97//!   - Managed individually
98//!   - Terminated all at once: delete_all_sessions()
99//! ```
100//!
101//! ## Integration with Sa-Token | 与 Sa-Token 的集成
102//!
103//! ```text
104//! ┌─────────────────────────────────────────────────────────┐
105//! │               Sa-Token Core Flow                        │
106//! │               Sa-Token 核心流程                          │
107//! └─────────────────────────────────────────────────────────┘
108//!
109//! SaTokenManager::login()
110//!   ├─ 1. Generate token
111//!   │     生成 token
112//!   ├─ 2. Create TokenInfo
113//!   │     创建 TokenInfo
114//!   └─ 3. Create DistributedSession (if enabled)
115//!          创建 DistributedSession(如果启用)
116//!          ├─ session_id: UUID
117//!          ├─ login_id: user's login ID
118//!          ├─ token: access token
119//!          ├─ service_id: current service
120//!          └─ attributes: custom data
121//!
122//! StpUtil::get_session()
123//!   └─ Retrieves distributed session
124//!      获取分布式会话
125//!
126//! StpUtil::logout()
127//!   └─ Deletes distributed session(s)
128//!      删除分布式会话
129//! ```
130
131//!
132//! ## Workflow Diagrams | 工作流程图
133//!
134//! ### Complete Session Lifecycle | 完整会话生命周期
135//!
136//! ```text
137//! ┌──────────────────────────────────────────────────────────────────┐
138//! │                    Session Lifecycle                             │
139//! │                    会话生命周期                                   │
140//! └──────────────────────────────────────────────────────────────────┘
141//!
142//! User                Service A           Storage           Service B
143//! 用户                服务 A              存储              服务 B
144//!  │                     │                   │                   │
145//!  │  1. Login           │                   │                   │
146//!  │  登录               │                   │                   │
147//!  ├────────────────────▶│                   │                   │
148//!  │                     │  2. create_session()                  │
149//!  │                     │  创建会话          │                   │
150//!  │                     │  ├─ session_id: uuid                  │
151//!  │                     │  ├─ login_id: user_123                │
152//!  │                     │  ├─ token: access_token               │
153//!  │                     │  └─ service_id: service-a             │
154//!  │                     │                   │                   │
155//!  │                     │  3. save_session()│                   │
156//!  │                     │  保存会话          │                   │
157//!  │                     ├──────────────────▶│                   │
158//!  │                     │                   │  Store with TTL   │
159//!  │                     │                   │  存储并设置 TTL    │
160//!  │                     │                   │                   │
161//!  │  4. session_id      │                   │                   │
162//!  │  返回会话 ID        │                   │                   │
163//!  │◀────────────────────│                   │                   │
164//!  │                     │                   │                   │
165//!  │  5. Request to Service B with session_id                    │
166//!  │  带 session_id 请求服务 B                                    │
167//!  ├────────────────────────────────────────────────────────────▶│
168//!  │                     │                   │                   │
169//!  │                     │                   │  6. get_session() │
170//!  │                     │                   │  获取会话          │
171//!  │                     │                   │◀──────────────────│
172//!  │                     │                   │                   │
173//!  │                     │                   │  7. Return session│
174//!  │                     │                   │  返回会话数据      │
175//!  │                     │                   ├──────────────────▶│
176//!  │                     │                   │                   │
177//!  │                     │                   │  8. refresh_session()
178//!  │                     │                   │  刷新会话          │
179//!  │                     │                   │  (update last_access)
180//!  │                     │                   │◀──────────────────│
181//!  │                     │                   │                   │
182//!  │  9. Response        │                   │                   │
183//!  │  响应               │                   │                   │
184//!  │◀────────────────────────────────────────────────────────────│
185//!  │                     │                   │                   │
186//!  │  10. Logout         │                   │                   │
187//!  │  登出               │                   │                   │
188//!  ├────────────────────▶│                   │                   │
189//!  │                     │  11. delete_session()                 │
190//!  │                     │  删除会话          │                   │
191//!  │                     ├──────────────────▶│                   │
192//!  │                     │                   │  Remove from storage
193//!  │                     │                   │  从存储中移除      │
194//!  │                     │                   │                   │
195//!  │  12. Logout Success │                   │                   │
196//!  │  登出成功           │                   │                   │
197//!  │◀────────────────────│                   │                   │
198//!  │                     │                   │                   │
199//! ```
200//!
201//! ### Service Authentication Flow | 服务认证流程
202//!
203//! ```text
204//! ┌──────────────────────────────────────────────────────────────────┐
205//! │                Service Inter-Communication                       │
206//! │                服务间通信                                         │
207//! └──────────────────────────────────────────────────────────────────┘
208//!
209//! Service B          Service A (Session Manager)          Storage
210//! 服务 B             服务 A(会话管理器)                  存储
211//!   │                        │                               │
212//!   │  1. Register           │                               │
213//!   │  注册服务              │                               │
214//!   │  ├─ service_id         │                               │
215//!   │  ├─ service_name       │                               │
216//!   │  ├─ secret_key         │                               │
217//!   │  └─ permissions        │                               │
218//!   ├───────────────────────▶│                               │
219//!   │                        │  Store credentials            │
220//!   │                        │  存储凭证                      │
221//!   │                        │  (in memory)                  │
222//!   │                        │                               │
223//!   │  2. Registered ✅      │                               │
224//!   │◀───────────────────────│                               │
225//!   │                        │                               │
226//!   │  3. Access session     │                               │
227//!   │  访问会话              │                               │
228//!   │  ├─ service_id         │                               │
229//!   │  ├─ secret_key         │                               │
230//!   │  └─ session_id         │                               │
231//!   ├───────────────────────▶│                               │
232//!   │                        │  4. verify_service()          │
233//!   │                        │  验证服务                      │
234//!   │                        │  ├─ Lookup service            │
235//!   │                        │  └─ Compare secret_key        │
236//!   │                        │                               │
237//!   │                        │  5. get_session()             │
238//!   │                        │  获取会话                      │
239//!   │                        ├──────────────────────────────▶│
240//!   │                        │                               │
241//!   │                        │  6. Return session            │
242//!   │                        │  返回会话                      │
243//!   │                        │◀──────────────────────────────│
244//!   │                        │                               │
245//!   │  7. Session data ✅    │                               │
246//!   │◀───────────────────────│                               │
247//!   │                        │                               │
248//! ```
249//!
250//! ## Storage Backends | 存储后端
251//!
252//! The module is storage-agnostic. You can implement custom backends:
253//! 本模块与存储无关。您可以实现自定义后端:
254//!
255//! ### Redis Implementation (Recommended) | Redis 实现(推荐)
256//!
257//! ```rust,ignore
258//! use redis::AsyncCommands;
259//!
260//! pub struct RedisDistributedStorage {
261//!     client: redis::Client,
262//! }
263//!
264//! #[async_trait]
265//! impl DistributedSessionStorage for RedisDistributedStorage {
266//!     async fn save_session(&self, session: DistributedSession, ttl: Option<Duration>) 
267//!         -> Result<(), SaTokenError> 
268//!     {
269//!         let mut conn = self.client.get_async_connection().await?;
270//!         let key = format!("distributed:session:{}", session.session_id);
271//!         let value = serde_json::to_string(&session)?;
272//!         
273//!         if let Some(ttl) = ttl {
274//!             conn.set_ex(&key, value, ttl.as_secs() as usize).await?;
275//!         } else {
276//!             conn.set(&key, value).await?;
277//!         }
278//!         
279//!         // Index by login_id
280//!         let index_key = format!("distributed:login:{}", session.login_id);
281//!         conn.sadd(index_key, &session.session_id).await?;
282//!         
283//!         Ok(())
284//!     }
285//!     
286//!     // ... other methods
287//! }
288//! ```
289//!
290//! ### Database Implementation | 数据库实现
291//!
292//! ```rust,ignore
293//! use sqlx::PgPool;
294//!
295//! pub struct PostgresDistributedStorage {
296//!     pool: PgPool,
297//! }
298//!
299//! #[async_trait]
300//! impl DistributedSessionStorage for PostgresDistributedStorage {
301//!     async fn save_session(&self, session: DistributedSession, ttl: Option<Duration>) 
302//!         -> Result<(), SaTokenError> 
303//!     {
304//!         let expires_at = ttl.map(|t| Utc::now() + chrono::Duration::from_std(t).unwrap());
305//!         
306//!         sqlx::query!(
307//!             "INSERT INTO distributed_sessions 
308//!              (session_id, login_id, token, service_id, attributes, expires_at)
309//!              VALUES ($1, $2, $3, $4, $5, $6)
310//!              ON CONFLICT (session_id) DO UPDATE 
311//!              SET attributes = $5, last_access = NOW()",
312//!             session.session_id,
313//!             session.login_id,
314//!             session.token,
315//!             session.service_id,
316//!             serde_json::to_value(&session.attributes)?,
317//!             expires_at,
318//!         )
319//!         .execute(&self.pool)
320//!         .await?;
321//!         
322//!         Ok(())
323//!     }
324//!     
325//!     // ... other methods
326//! }
327//! ```
328//!
329//! ## Best Practices | 最佳实践
330//!
331//! ### 1. Service Registration | 服务注册
332//!
333//! ```rust,ignore
334//! // Initialize each service with unique credentials
335//! // 为每个服务初始化唯一凭证
336//! let credential = ServiceCredential {
337//!     service_id: "user-service".to_string(),
338//!     service_name: "User Management Service".to_string(),
339//!     secret_key: generate_secure_secret(), // Use crypto-secure generation
340//!     created_at: Utc::now(),
341//!     permissions: vec!["user.read".to_string(), "user.write".to_string()],
342//! };
343//! manager.register_service(credential).await;
344//! ```
345//!
346//! ### 2. Session Creation with Context | 带上下文的会话创建
347//!
348//! ```rust,ignore
349//! // Create session with user context
350//! // 创建带用户上下文的会话
351//! let session = manager.create_session(login_id, token).await?;
352//!
353//! // Add relevant attributes immediately
354//! // 立即添加相关属性
355//! manager.set_attribute(&session.session_id, "user_role".to_string(), "admin".to_string()).await?;
356//! manager.set_attribute(&session.session_id, "department".to_string(), "IT".to_string()).await?;
357//! manager.set_attribute(&session.session_id, "login_device".to_string(), "web".to_string()).await?;
358//! ```
359//!
360//! ### 3. Cross-Service Access Pattern | 跨服务访问模式
361//!
362//! ```rust,ignore
363//! // Service B accesses session created by Service A
364//! // 服务 B 访问服务 A 创建的会话
365//! 
366//! // 1. Verify service identity
367//! // 验证服务身份
368//! let service_cred = manager.verify_service("service-b", request.secret).await?;
369//!
370//! // 2. Check permissions
371//! // 检查权限
372//! if !service_cred.permissions.contains(&"session.read".to_string()) {
373//!     return Err(SaTokenError::PermissionDenied);
374//! }
375//!
376//! // 3. Access session
377//! // 访问会话
378//! let session = manager.get_session(&request.session_id).await?;
379//!
380//! // 4. Refresh to keep session alive
381//! // 刷新以保持会话活跃
382//! manager.refresh_session(&session.session_id).await?;
383//! ```
384//!
385//! ### 4. Multi-Device Logout | 多设备登出
386//!
387//! ```rust,ignore
388//! // Logout from all devices
389//! // 从所有设备登出
390//! manager.delete_all_sessions(&login_id).await?;
391//!
392//! // Or logout specific session
393//! // 或登出特定会话
394//! manager.delete_session(&session_id).await?;
395//! ```
396//!
397//! ### 5. Session Monitoring | 会话监控
398//!
399//! ```rust,ignore
400//! // Monitor user's active sessions
401//! // 监控用户的活跃会话
402//! let sessions = manager.get_sessions_by_login_id(&login_id).await?;
403//! 
404//! for session in sessions {
405//!     println!("Session: {} from service: {}, last active: {}", 
406//!         session.session_id,
407//!         session.service_id,
408//!         session.last_access
409//!     );
410//!     
411//!     // Check for suspicious activity
412//!     // 检查可疑活动
413//!     if is_suspicious(&session) {
414//!         manager.delete_session(&session.session_id).await?;
415//!     }
416//! }
417//! ```
418//!
419//! ## Security Considerations | 安全考虑
420//!
421//! ```text
422//! 1. ✅ Service Authentication | 服务认证
423//!    - Each service has unique secret_key
424//!    - Verify credentials before granting access
425//!    - Rotate keys periodically
426//!
427//! 2. ✅ Permission-Based Access | 基于权限的访问
428//!    - Services have explicit permissions
429//!    - Check permissions before operations
430//!    - Implement least-privilege principle
431//!
432//! 3. ✅ Session Timeout | 会话超时
433//!    - Configure appropriate TTL
434//!    - Auto-expire inactive sessions
435//!    - Refresh on active use
436//!
437//! 4. ✅ Data Encryption | 数据加密
438//!    - Encrypt sensitive session attributes
439//!    - Use TLS for inter-service communication
440//!    - Encrypt data at rest in storage
441//!
442//! 5. ✅ Audit Logging | 审计日志
443//!    - Log session creation/deletion
444//!    - Track cross-service access
445//!    - Monitor for anomalies
446//! ```
447
448use crate::error::SaTokenError;
449use async_trait::async_trait;
450use sa_token_adapter::storage::SaStorage;
451use serde::{Deserialize, Serialize};
452use std::collections::HashMap;
453use std::sync::Arc;
454use std::time::Duration;
455use chrono::{DateTime, Utc};
456use tokio::sync::RwLock;
457
458/// Distributed session data structure
459/// 分布式 Session 数据结构
460///
461/// Represents a session that can be shared across multiple services
462/// 表示可以在多个服务之间共享的 Session
463#[derive(Debug, Clone, Serialize, Deserialize)]
464pub struct DistributedSession {
465    /// Unique session identifier | 唯一 Session 标识符
466    pub session_id: String,
467    
468    /// User login ID | 用户登录 ID
469    pub login_id: String,
470    
471    /// Authentication token | 认证 Token
472    pub token: String,
473    
474    /// ID of the service that created this session | 创建此 Session 的服务 ID
475    pub service_id: String,
476    
477    /// Session creation time | Session 创建时间
478    pub create_time: DateTime<Utc>,
479    
480    /// Last access time | 最后访问时间
481    pub last_access: DateTime<Utc>,
482    
483    /// Session attributes (key-value pairs) | Session 属性(键值对)
484    pub attributes: HashMap<String, String>,
485}
486
487/// Service credential for inter-service authentication
488/// 服务间认证的服务凭证
489///
490/// Contains service identification and permission information
491/// 包含服务标识和权限信息
492#[derive(Debug, Clone, Serialize, Deserialize)]
493pub struct ServiceCredential {
494    /// Unique service identifier | 唯一服务标识符
495    pub service_id: String,
496    
497    /// Human-readable service name | 可读的服务名称
498    pub service_name: String,
499    
500    /// Service authentication secret key | 服务认证密钥
501    pub secret_key: String,
502    
503    /// Service registration time | 服务注册时间
504    pub created_at: DateTime<Utc>,
505    
506    /// List of permissions this service has | 该服务拥有的权限列表
507    pub permissions: Vec<String>,
508}
509
510/// Distributed session storage trait
511/// 分布式 Session 存储 trait
512///
513/// Implement this trait to provide custom storage backends
514/// 实现此 trait 以提供自定义存储后端
515#[async_trait]
516pub trait DistributedSessionStorage: Send + Sync {
517    /// Save a session to storage with optional TTL
518    /// 保存 Session 到存储,可选 TTL
519    ///
520    /// # Arguments | 参数
521    /// * `session` - Session to save | 要保存的 Session
522    /// * `ttl` - Time-to-live duration | 生存时间
523    async fn save_session(&self, session: DistributedSession, ttl: Option<Duration>) -> Result<(), SaTokenError>;
524    
525    /// Get a session from storage
526    /// 从存储获取 Session
527    ///
528    /// # Arguments | 参数
529    /// * `session_id` - Session identifier | Session 标识符
530    async fn get_session(&self, session_id: &str) -> Result<Option<DistributedSession>, SaTokenError>;
531    
532    /// Delete a session from storage
533    /// 从存储删除 Session
534    ///
535    /// # Arguments | 参数
536    /// * `session_id` - Session identifier | Session 标识符
537    async fn delete_session(&self, session_id: &str) -> Result<(), SaTokenError>;
538    
539    /// Get all sessions for a specific user
540    /// 获取特定用户的所有 Sessions
541    ///
542    /// # Arguments | 参数
543    /// * `login_id` - User login ID | 用户登录 ID
544    async fn get_sessions_by_login_id(&self, login_id: &str) -> Result<Vec<DistributedSession>, SaTokenError>;
545
546    /// 保存服务凭证 | Save a service credential
547    /// 用于把 register_service 的凭证持久化到存储
548    async fn save_credential(&self, credential: ServiceCredential) -> Result<(), SaTokenError>;
549
550    /// 按 service_id 获取服务凭证 | Get a service credential by service_id
551    /// 未找到返回 Ok(None)
552    async fn get_credential(&self, service_id: &str) -> Result<Option<ServiceCredential>, SaTokenError>;
553}
554
555/// Distributed session manager
556/// 分布式 Session 管理器
557///
558/// Manages distributed sessions and service authentication
559/// 管理分布式 Sessions 和服务认证
560pub struct DistributedSessionManager {
561    /// Session 存储后端
562    storage: Arc<dyn DistributedSessionStorage>,
563    /// 当前服务 ID
564    service_id: String,
565    /// 默认 Session 超时时间
566    session_timeout: Duration,
567}
568
569impl DistributedSessionManager {
570    /// Create a new distributed session manager
571    /// 创建新的分布式 Session 管理器
572    ///
573    /// # Arguments | 参数
574    /// * `storage` - Session storage implementation | Session 存储实现
575    /// * `service_id` - ID of this service | 此服务的 ID
576    /// * `session_timeout` - Default session timeout | 默认 Session 超时时间
577    ///
578    /// # Example | 示例
579    /// ```rust,ignore
580    /// let storage = Arc::new(MyDistributedStorage::new());
581    /// let manager = DistributedSessionManager::new(
582    ///     storage,
583    ///     "my-service".to_string(),
584    ///     Duration::from_secs(3600),
585    /// );
586    /// ```
587    pub fn new(
588        storage: Arc<dyn DistributedSessionStorage>,
589        service_id: String,
590        session_timeout: Duration,
591    ) -> Self {
592        Self {
593            storage,
594            service_id,
595            session_timeout,
596        }
597    }
598
599    /// 注册服务凭证(持久化到底层存储)
600    /// 返回 Result 以便调用方处理存储错误
601    pub async fn register_service(&self, credential: ServiceCredential) -> Result<(), SaTokenError> {
602        self.storage.save_credential(credential).await
603    }
604
605    /// Verify a service's credentials
606    /// 验证服务的凭证
607    ///
608    /// # Arguments | 参数
609    /// * `service_id` - Service identifier | 服务标识符
610    /// * `secret` - Service secret key | 服务密钥
611    ///
612    /// # Returns | 返回值
613    /// * `Ok(ServiceCredential)` - Service authenticated | 服务已认证
614    /// * `Err(PermissionDenied)` - Invalid credentials | 凭证无效
615    ///
616    /// # Example | 示例
617    /// ```rust,ignore
618    /// match manager.verify_service("api-gateway", "secret123").await {
619    ///     Ok(cred) => println!("Service {} verified", cred.service_name),
620    ///     Err(e) => println!("Verification failed: {}", e),
621    /// }
622    /// ```
623    /// 校验服务凭证
624    /// service_id 存在且 secret_key 匹配时返回凭证,否则返回 PermissionDenied
625    pub async fn verify_service(&self, service_id: &str, secret: &str) -> Result<ServiceCredential, SaTokenError> {
626        if let Some(cred) = self.storage.get_credential(service_id).await?
627            && cred.secret_key == secret
628        {
629            return Ok(cred);
630        }
631        Err(SaTokenError::PermissionDenied)
632    }
633
634    /// Create a new distributed session
635    /// 创建新的分布式 Session
636    ///
637    /// # Arguments | 参数
638    /// * `login_id` - User login ID | 用户登录 ID
639    /// * `token` - Authentication token | 认证 Token
640    ///
641    /// # Returns | 返回值
642    /// * `Ok(DistributedSession)` - Session created | Session 已创建
643    /// * `Err(SaTokenError)` - Creation failed | 创建失败
644    ///
645    /// # Example | 示例
646    /// ```rust,ignore
647    /// let session = manager.create_session(
648    ///     "user123".to_string(),
649    ///     "token456".to_string(),
650    /// ).await?;
651    /// println!("Session created: {}", session.session_id);
652    /// ```
653    pub async fn create_session(
654        &self,
655        login_id: String,
656        token: String,
657    ) -> Result<DistributedSession, SaTokenError> {
658        let session = DistributedSession {
659            session_id: uuid::Uuid::new_v4().to_string(),
660            login_id,
661            token,
662            service_id: self.service_id.clone(),
663            create_time: Utc::now(),
664            last_access: Utc::now(),
665            attributes: HashMap::new(),
666        };
667
668        self.storage.save_session(session.clone(), Some(self.session_timeout)).await?;
669        Ok(session)
670    }
671
672    /// Get a session by ID
673    /// 通过 ID 获取 Session
674    ///
675    /// # Arguments | 参数
676    /// * `session_id` - Session identifier | Session 标识符
677    ///
678    /// # Returns | 返回值
679    /// * `Ok(DistributedSession)` - Session found | 找到 Session
680    /// * `Err(SessionNotFound)` - Session not found | 未找到 Session
681    ///
682    /// # Example | 示例
683    /// ```rust,ignore
684    /// let session = manager.get_session("session-id-123").await?;
685    /// println!("User: {}", session.login_id);
686    /// ```
687    pub async fn get_session(&self, session_id: &str) -> Result<DistributedSession, SaTokenError> {
688        self.storage.get_session(session_id).await?
689            .ok_or(SaTokenError::SessionNotFound)
690    }
691
692    /// Update an existing session
693    /// 更新现有 Session
694    ///
695    /// # Arguments | 参数
696    /// * `session` - Updated session data | 更新后的 Session 数据
697    ///
698    /// # Example | 示例
699    /// ```rust,ignore
700    /// let mut session = manager.get_session("session-id").await?;
701    /// session.attributes.insert("role".to_string(), "admin".to_string());
702    /// manager.update_session(session).await?;
703    /// ```
704    pub async fn update_session(&self, session: DistributedSession) -> Result<(), SaTokenError> {
705        self.storage.save_session(session, Some(self.session_timeout)).await
706    }
707
708    /// Delete a session
709    /// 删除 Session
710    ///
711    /// # Arguments | 参数
712    /// * `session_id` - Session identifier | Session 标识符
713    ///
714    /// # Example | 示例
715    /// ```rust,ignore
716    /// manager.delete_session("session-id-123").await?;
717    /// ```
718    pub async fn delete_session(&self, session_id: &str) -> Result<(), SaTokenError> {
719        self.storage.delete_session(session_id).await
720    }
721
722    /// Refresh a session (update last access time)
723    /// 刷新 Session(更新最后访问时间)
724    ///
725    /// # Arguments | 参数
726    /// * `session_id` - Session identifier | Session 标识符
727    ///
728    /// # Example | 示例
729    /// ```rust,ignore
730    /// manager.refresh_session("session-id-123").await?;
731    /// ```
732    pub async fn refresh_session(&self, session_id: &str) -> Result<(), SaTokenError> {
733        let mut session = self.get_session(session_id).await?;
734        session.last_access = Utc::now();
735        self.update_session(session).await
736    }
737
738    /// Set a session attribute
739    /// 设置 Session 属性
740    ///
741    /// # Arguments | 参数
742    /// * `session_id` - Session identifier | Session 标识符
743    /// * `key` - Attribute key | 属性键
744    /// * `value` - Attribute value | 属性值
745    ///
746    /// # Example | 示例
747    /// ```rust,ignore
748    /// manager.set_attribute("session-id", "theme".to_string(), "dark".to_string()).await?;
749    /// ```
750    pub async fn set_attribute(
751        &self,
752        session_id: &str,
753        key: String,
754        value: String,
755    ) -> Result<(), SaTokenError> {
756        let mut session = self.get_session(session_id).await?;
757        session.attributes.insert(key, value);
758        session.last_access = Utc::now();
759        self.update_session(session).await
760    }
761
762    /// Get a session attribute
763    /// 获取 Session 属性
764    ///
765    /// # Arguments | 参数
766    /// * `session_id` - Session identifier | Session 标识符
767    /// * `key` - Attribute key | 属性键
768    ///
769    /// # Returns | 返回值
770    /// * `Some(value)` - Attribute found | 找到属性
771    /// * `None` - Attribute not found | 未找到属性
772    ///
773    /// # Example | 示例
774    /// ```rust,ignore
775    /// if let Some(theme) = manager.get_attribute("session-id", "theme").await? {
776    ///     println!("Theme: {}", theme);
777    /// }
778    /// ```
779    pub async fn get_attribute(
780        &self,
781        session_id: &str,
782        key: &str,
783    ) -> Result<Option<String>, SaTokenError> {
784        let session = self.get_session(session_id).await?;
785        Ok(session.attributes.get(key).cloned())
786    }
787
788    /// Remove a session attribute
789    /// 移除 Session 属性
790    ///
791    /// # Arguments | 参数
792    /// * `session_id` - Session identifier | Session 标识符
793    /// * `key` - Attribute key | 属性键
794    ///
795    /// # Example | 示例
796    /// ```rust,ignore
797    /// manager.remove_attribute("session-id", "temp_data").await?;
798    /// ```
799    pub async fn remove_attribute(
800        &self,
801        session_id: &str,
802        key: &str,
803    ) -> Result<(), SaTokenError> {
804        let mut session = self.get_session(session_id).await?;
805        session.attributes.remove(key);
806        session.last_access = Utc::now();
807        self.update_session(session).await
808    }
809
810    /// Get all sessions for a specific user
811    /// 获取特定用户的所有 Sessions
812    ///
813    /// # Arguments | 参数
814    /// * `login_id` - User login ID | 用户登录 ID
815    ///
816    /// # Returns | 返回值
817    /// Vector of sessions | Sessions 向量
818    ///
819    /// # Example | 示例
820    /// ```rust,ignore
821    /// let sessions = manager.get_sessions_by_login_id("user123").await?;
822    /// println!("User has {} active sessions", sessions.len());
823    /// ```
824    pub async fn get_sessions_by_login_id(&self, login_id: &str) -> Result<Vec<DistributedSession>, SaTokenError> {
825        self.storage.get_sessions_by_login_id(login_id).await
826    }
827
828    /// Delete all sessions for a specific user
829    /// 删除特定用户的所有 Sessions
830    ///
831    /// # Arguments | 参数
832    /// * `login_id` - User login ID | 用户登录 ID
833    ///
834    /// # Example | 示例
835    /// ```rust,ignore
836    /// manager.delete_all_sessions("user123").await?;
837    /// ```
838    pub async fn delete_all_sessions(&self, login_id: &str) -> Result<(), SaTokenError> {
839        let sessions = self.get_sessions_by_login_id(login_id).await?;
840        for session in sessions {
841            self.delete_session(&session.session_id).await?;
842        }
843        Ok(())
844    }
845}
846
847/// In-memory distributed session storage implementation
848/// 内存分布式 Session 存储实现
849///
850/// For testing and development purposes
851/// 用于测试和开发目的
852pub struct InMemoryDistributedStorage {
853    /// Sessions 存储: session_id -> DistributedSession
854    sessions: Arc<RwLock<HashMap<String, DistributedSession>>>,
855    /// 登录索引: login_id -> Vec<session_id>
856    login_index: Arc<RwLock<HashMap<String, Vec<String>>>>,
857    /// 服务凭证: service_id -> ServiceCredential
858    credentials: Arc<RwLock<HashMap<String, ServiceCredential>>>,
859}
860
861impl InMemoryDistributedStorage {
862    /// 创建新的内存存储
863    pub fn new() -> Self {
864        Self {
865            sessions: Arc::new(RwLock::new(HashMap::new())),
866            login_index: Arc::new(RwLock::new(HashMap::new())),
867            credentials: Arc::new(RwLock::new(HashMap::new())),
868        }
869    }
870}
871
872impl Default for InMemoryDistributedStorage {
873    fn default() -> Self {
874        Self::new()
875    }
876}
877
878#[async_trait]
879impl DistributedSessionStorage for InMemoryDistributedStorage {
880    /// Save session to memory storage | 保存会话到内存存储
881    ///
882    /// # Implementation Details | 实现细节
883    ///
884    /// 1. Stores session in main HashMap by session_id
885    ///    在主 HashMap 中按 session_id 存储会话
886    /// 2. Updates login_index for quick user lookup
887    ///    更新 login_index 以快速查找用户
888    ///
889    /// # Note | 注意
890    ///
891    /// TTL is ignored in memory storage (for simplicity).
892    /// In production, use Redis or similar with built-in TTL support.
893    /// 内存存储中忽略 TTL(为简化实现)。
894    /// 在生产环境中,使用 Redis 或类似的内置 TTL 支持的存储。
895    async fn save_session(&self, session: DistributedSession, _ttl: Option<Duration>) -> Result<(), SaTokenError> {
896        let session_id = session.session_id.clone();
897        let login_id = session.login_id.clone();
898        
899        // 1. Store session in main map
900        // 在主映射中存储会话
901        let mut sessions = self.sessions.write().await;
902        sessions.insert(session_id.clone(), session);
903        
904        // 2. Update login index for this user
905        // 更新此用户的登录索引
906        let mut index = self.login_index.write().await;
907        let session_list = index.entry(login_id).or_insert_with(Vec::new);
908        
909        // Add only if not already present (prevent duplicates)
910        // 仅在不存在时添加(防止重复)
911        if !session_list.contains(&session_id) {
912            session_list.push(session_id);
913        }
914        
915        Ok(())
916    }
917
918    /// Get session from memory storage | 从内存存储获取会话
919    ///
920    /// # Returns | 返回
921    ///
922    /// * `Ok(Some(session))` - Session found | 找到会话
923    /// * `Ok(None)` - Session not found | 未找到会话
924    async fn get_session(&self, session_id: &str) -> Result<Option<DistributedSession>, SaTokenError> {
925        let sessions = self.sessions.read().await;
926        Ok(sessions.get(session_id).cloned())
927    }
928
929    /// Delete session from memory storage | 从内存存储删除会话
930    ///
931    /// # Implementation Details | 实现细节
932    ///
933    /// 1. Removes session from main HashMap
934    ///    从主 HashMap 中移除会话
935    /// 2. Removes session_id from login_index
936    ///    从 login_index 中移除 session_id
937    /// 3. Cleans up empty index entries
938    ///    清理空的索引条目
939    async fn delete_session(&self, session_id: &str) -> Result<(), SaTokenError> {
940        // 1. Remove from main storage and get session data
941        // 从主存储中移除并获取会话数据
942        let mut sessions = self.sessions.write().await;
943        if let Some(session) = sessions.remove(session_id) {
944            // 2. Update login index
945            // 更新登录索引
946            let mut index = self.login_index.write().await;
947            if let Some(session_ids) = index.get_mut(&session.login_id) {
948                // Remove this session_id from the list
949                // 从列表中移除此 session_id
950                session_ids.retain(|id| id != session_id);
951                
952                // 3. Clean up: remove login_id entry if no sessions left
953                // 清理:如果没有剩余会话,移除 login_id 条目
954                if session_ids.is_empty() {
955                    index.remove(&session.login_id);
956                }
957            }
958        }
959        Ok(())
960    }
961
962    /// Get all sessions for a user | 获取用户的所有会话
963    ///
964    /// # Implementation Details | 实现细节
965    ///
966    /// 1. Looks up session_ids in login_index
967    ///    在 login_index 中查找 session_ids
968    /// 2. Retrieves full session data for each session_id
969    ///    为每个 session_id 检索完整的会话数据
970    /// 3. Filters out any missing sessions (cleanup)
971    ///    过滤掉任何缺失的会话(清理)
972    ///
973    /// # Returns | 返回
974    ///
975    /// Vector of all active sessions for the user
976    /// 用户所有活跃会话的向量
977    async fn get_sessions_by_login_id(&self, login_id: &str) -> Result<Vec<DistributedSession>, SaTokenError> {
978        // 1. Get session IDs from index
979        // 从索引中获取会话 IDs
980        let index = self.login_index.read().await;
981        let session_ids = index.get(login_id).cloned().unwrap_or_default();
982        
983        // 2. Retrieve full session data
984        // 检索完整的会话数据
985        let sessions = self.sessions.read().await;
986        let mut result = Vec::new();
987        
988        for session_id in session_ids {
989            if let Some(session) = sessions.get(&session_id) {
990                result.push(session.clone());
991            }
992            // Note: If session not found, it was deleted but index not updated
993            // This is a minor inconsistency acceptable in memory storage
994            // 注意:如果未找到会话,说明会话已删除但索引未更新
995            // 这是内存存储中可接受的小不一致
996        }
997        
998        Ok(result)
999    }
1000
1001    /// 保存服务凭证到内存
1002    async fn save_credential(&self, credential: ServiceCredential) -> Result<(), SaTokenError> {
1003        let mut creds = self.credentials.write().await;
1004        creds.insert(credential.service_id.clone(), credential);
1005        Ok(())
1006    }
1007
1008    /// 从内存获取服务凭证
1009    async fn get_credential(&self, service_id: &str) -> Result<Option<ServiceCredential>, SaTokenError> {
1010        let creds = self.credentials.read().await;
1011        Ok(creds.get(service_id).cloned())
1012    }
1013}
1014
1015/// 基于 SaStorage 的分布式 Session 存储实现
1016/// 把分布式 Session、登录索引、服务凭证统一持久化到任意 SaStorage 后端(Redis / 内存 / 数据库)
1017///
1018/// # 存储键格式
1019/// - Session: `{prefix}dsession:{session_id}`
1020/// - 登录索引: `{prefix}dsession:index:{login_id}`
1021/// - 服务凭证: `{prefix}dservice:{service_id}`
1022pub struct SaStorageDistributedStorage {
1023    /// 底层通用 KV 存储
1024    storage: Arc<dyn SaStorage>,
1025    /// 存储键前缀(应与 SaTokenConfig::storage_key_prefix 保持一致)
1026    key_prefix: String,
1027}
1028
1029impl SaStorageDistributedStorage {
1030    /// 创建适配器
1031    ///
1032    /// # 参数
1033    /// - `storage`: 底层存储实现(可直接复用全局 SaTokenManager 使用的同一个 storage)
1034    /// - `key_prefix`: 存储键前缀,建议传入 `config.storage_key_prefix.clone()` 以保持一致
1035    pub fn new(storage: Arc<dyn SaStorage>, key_prefix: impl Into<String>) -> Self {
1036        Self {
1037            storage,
1038            key_prefix: key_prefix.into(),
1039        }
1040    }
1041
1042    /// 构造 Session 键:{prefix}dsession:{session_id}
1043    fn session_key(&self, session_id: &str) -> String {
1044        format!("{}dsession:{}", self.key_prefix, session_id)
1045    }
1046
1047    /// 构造登录索引键:{prefix}dsession:index:{login_id}
1048    fn index_key(&self, login_id: &str) -> String {
1049        format!("{}dsession:index:{}", self.key_prefix, login_id)
1050    }
1051
1052    /// 构造凭证键:{prefix}dservice:{service_id}
1053    fn credential_key(&self, service_id: &str) -> String {
1054        format!("{}dservice:{}", self.key_prefix, service_id)
1055    }
1056
1057    /// 读取某用户的登录索引(session_id 列表)
1058    /// 不存在时返回空 Vec
1059    async fn load_index(&self, index_key: &str) -> Result<Vec<String>, SaTokenError> {
1060        match self
1061            .storage
1062            .get(index_key)
1063            .await
1064            .map_err(|e| SaTokenError::StorageError(e.to_string()))?
1065        {
1066            Some(value) => serde_json::from_str(&value).map_err(SaTokenError::SerializationError),
1067            None => Ok(Vec::new()),
1068        }
1069    }
1070
1071    /// 回写登录索引(永久保存,不设 TTL)
1072    async fn save_index(&self, index_key: &str, ids: &[String]) -> Result<(), SaTokenError> {
1073        let value = serde_json::to_string(ids).map_err(SaTokenError::SerializationError)?;
1074        self.storage
1075            .set(index_key, &value, None)
1076            .await
1077            .map_err(|e| SaTokenError::StorageError(e.to_string()))
1078    }
1079}
1080
1081#[async_trait]
1082impl DistributedSessionStorage for SaStorageDistributedStorage {
1083    /// 保存 Session
1084    /// 1. 写入会话本体(带 TTL,由后端控制过期)
1085    /// 2. 更新登录索引(永久保存,去重;过期 session 在读取时被过滤清理)
1086    async fn save_session(&self, session: DistributedSession, ttl: Option<Duration>) -> Result<(), SaTokenError> {
1087        let session_key = self.session_key(&session.session_id);
1088        let index_key = self.index_key(&session.login_id);
1089        let session_id = session.session_id.clone();
1090
1091        // 1. 写入会话本体
1092        let value = serde_json::to_string(&session).map_err(SaTokenError::SerializationError)?;
1093        self.storage
1094            .set(&session_key, &value, ttl)
1095            .await
1096            .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
1097
1098        // 2. 更新登录索引(去重)
1099        let mut ids = self.load_index(&index_key).await?;
1100        if !ids.contains(&session_id) {
1101            ids.push(session_id);
1102            self.save_index(&index_key, &ids).await?;
1103        }
1104        Ok(())
1105    }
1106
1107    /// 按 session_id 读取会话
1108    /// 未找到或已过期返回 None
1109    async fn get_session(&self, session_id: &str) -> Result<Option<DistributedSession>, SaTokenError> {
1110        match self
1111            .storage
1112            .get(&self.session_key(session_id))
1113            .await
1114            .map_err(|e| SaTokenError::StorageError(e.to_string()))?
1115        {
1116            Some(value) => Ok(Some(serde_json::from_str(&value).map_err(SaTokenError::SerializationError)?)),
1117            None => Ok(None),
1118        }
1119    }
1120
1121    /// 删除会话
1122    /// 1. 先读出会话以获得 login_id(用于维护索引)
1123    /// 2. 删除会话本体
1124    /// 3. 从登录索引中移除该 session_id(无剩余则删除整个索引键)
1125    async fn delete_session(&self, session_id: &str) -> Result<(), SaTokenError> {
1126        if let Some(session) = self.get_session(session_id).await? {
1127            // 1. 删除会话本体
1128            self.storage
1129                .delete(&self.session_key(session_id))
1130                .await
1131                .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
1132
1133            // 2. 从登录索引中移除
1134            let index_key = self.index_key(&session.login_id);
1135            let mut ids = self.load_index(&index_key).await?;
1136            let before = ids.len();
1137            ids.retain(|id| id != session_id);
1138            if ids.is_empty() {
1139                // 无剩余会话则删除整个索引键
1140                self.storage
1141                    .delete(&index_key)
1142                    .await
1143                    .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
1144            } else if ids.len() != before {
1145                self.save_index(&index_key, &ids).await?;
1146            }
1147        }
1148        Ok(())
1149    }
1150
1151    /// 获取某用户全部会话
1152    /// 顺带清理索引中已过期/丢失的 session_id(best-effort 清理,避免索引无限膨胀)
1153    async fn get_sessions_by_login_id(&self, login_id: &str) -> Result<Vec<DistributedSession>, SaTokenError> {
1154        let index_key = self.index_key(login_id);
1155        let ids = self.load_index(&index_key).await?;
1156        let original_len = ids.len();
1157
1158        let mut result = Vec::new();
1159        let mut alive_ids = Vec::new();
1160        for id in ids {
1161            // 会话本体可能因 TTL 已过期 → 读不到则视为失效
1162            if let Some(session) = self.get_session(&id).await? {
1163                result.push(session);
1164                alive_ids.push(id);
1165            }
1166        }
1167
1168        // 清理:索引发生收缩时回写
1169        if alive_ids.is_empty() {
1170            let _ = self.storage.delete(&index_key).await;
1171        } else if alive_ids.len() != original_len {
1172            let _ = self.save_index(&index_key, &alive_ids).await;
1173        }
1174
1175        Ok(result)
1176    }
1177
1178    /// 保存服务凭证(永久保存)
1179    async fn save_credential(&self, credential: ServiceCredential) -> Result<(), SaTokenError> {
1180        let key = self.credential_key(&credential.service_id);
1181        let value = serde_json::to_string(&credential).map_err(SaTokenError::SerializationError)?;
1182        self.storage
1183            .set(&key, &value, None)
1184            .await
1185            .map_err(|e| SaTokenError::StorageError(e.to_string()))
1186    }
1187
1188    /// 按 service_id 读取服务凭证
1189    /// 未找到返回 None
1190    async fn get_credential(&self, service_id: &str) -> Result<Option<ServiceCredential>, SaTokenError> {
1191        match self
1192            .storage
1193            .get(&self.credential_key(service_id))
1194            .await
1195            .map_err(|e| SaTokenError::StorageError(e.to_string()))?
1196        {
1197            Some(value) => Ok(Some(serde_json::from_str(&value).map_err(SaTokenError::SerializationError)?)),
1198            None => Ok(None),
1199        }
1200    }
1201}
1202
1203#[cfg(test)]
1204mod tests {
1205    use super::*;
1206
1207    #[tokio::test]
1208    async fn test_distributed_session_manager() {
1209        let storage = Arc::new(InMemoryDistributedStorage::new());
1210        let manager = DistributedSessionManager::new(
1211            storage,
1212            "service1".to_string(),
1213            Duration::from_secs(3600),
1214        );
1215
1216        let session = manager.create_session(
1217            "user1".to_string(),
1218            "token1".to_string(),
1219        ).await.unwrap();
1220
1221        let retrieved = manager.get_session(&session.session_id).await.unwrap();
1222        assert_eq!(retrieved.login_id, "user1");
1223    }
1224
1225    #[tokio::test]
1226    async fn test_session_attributes() {
1227        let storage = Arc::new(InMemoryDistributedStorage::new());
1228        let manager = DistributedSessionManager::new(
1229            storage,
1230            "service1".to_string(),
1231            Duration::from_secs(3600),
1232        );
1233
1234        let session = manager.create_session(
1235            "user2".to_string(),
1236            "token2".to_string(),
1237        ).await.unwrap();
1238
1239        manager.set_attribute(
1240            &session.session_id,
1241            "key1".to_string(),
1242            "value1".to_string(),
1243        ).await.unwrap();
1244
1245        let value = manager.get_attribute(&session.session_id, "key1").await.unwrap();
1246        assert_eq!(value, Some("value1".to_string()));
1247    }
1248
1249    #[tokio::test]
1250    async fn test_service_verification() {
1251        let storage = Arc::new(InMemoryDistributedStorage::new());
1252        let manager = DistributedSessionManager::new(
1253            storage,
1254            "service1".to_string(),
1255            Duration::from_secs(3600),
1256        );
1257
1258        let credential = ServiceCredential {
1259            service_id: "service2".to_string(),
1260            service_name: "Service 2".to_string(),
1261            secret_key: "secret123".to_string(),
1262            created_at: Utc::now(),
1263            permissions: vec!["read".to_string(), "write".to_string()],
1264        };
1265
1266        manager.register_service(credential.clone()).await.unwrap();
1267
1268        let verified = manager.verify_service("service2", "secret123").await.unwrap();
1269        assert_eq!(verified.service_id, "service2");
1270
1271        let result = manager.verify_service("service2", "wrong_secret").await;
1272        assert!(result.is_err());
1273    }
1274
1275    #[tokio::test]
1276    async fn test_delete_all_sessions() {
1277        let storage = Arc::new(InMemoryDistributedStorage::new());
1278        let manager = DistributedSessionManager::new(
1279            storage,
1280            "service1".to_string(),
1281            Duration::from_secs(3600),
1282        );
1283
1284        manager.create_session("user3".to_string(), "token1".to_string()).await.unwrap();
1285        manager.create_session("user3".to_string(), "token2".to_string()).await.unwrap();
1286
1287        let sessions = manager.get_sessions_by_login_id("user3").await.unwrap();
1288        assert_eq!(sessions.len(), 2);
1289
1290        manager.delete_all_sessions("user3").await.unwrap();
1291
1292        let sessions = manager.get_sessions_by_login_id("user3").await.unwrap();
1293        assert_eq!(sessions.len(), 0);
1294    }
1295}