sa_token_core/distributed.rs
1//! Distributed Session Management Module | 分布式 Session 管理模块
2//!
3//! # Overview | 概述
4//!
5//! This module enables **distributed session management** for microservices architecture,
6//! allowing multiple services to share authentication sessions seamlessly.
7//! 本模块为微服务架构提供**分布式 Session 管理**,允许多个服务无缝共享认证会话。
8//!
9//! ## Architecture Context | 架构上下文
10//!
11//! ```text
12//! ┌────────────────────────────────────────────────────────────────────┐
13//! │ Microservices Architecture │
14//! │ 微服务架构 │
15//! └────────────────────────────────────────────────────────────────────┘
16//!
17//! ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
18//! │ Service A │ │ Service B │ │ Service C │
19//! │ (User API) │ │ (Order API) │ │ (Pay API) │
20//! └──────┬───────┘ └──────┬───────┘ └──────┬───────┘
21//! │ │ │
22//! └──────────────────┼──────────────────┘
23//! │
24//! ┌─────────▼──────────┐
25//! │ Distributed │
26//! │ Session Storage │
27//! │ (Redis/Database) │
28//! └────────────────────┘
29//!
30//! Each service can:
31//! 每个服务可以:
32//! 1. Create sessions for users
33//! 为用户创建会话
34//! 2. Access sessions created by other services
35//! 访问其他服务创建的会话
36//! 3. Share user authentication state
37//! 共享用户认证状态
38//! ```
39//!
40//! ## Key Use Cases | 关键使用场景
41//!
42//! ### 1. Single Sign-On (SSO) Across Services | 跨服务单点登录
43//!
44//! ```text
45//! Scenario: User logs in to Service A and accesses Service B
46//! 场景:用户登录服务 A 并访问服务 B
47//!
48//! 1. User → Service A: Login
49//! 用户 → 服务 A:登录
50//! ├─ Service A creates session: session_id = "abc123"
51//! │ 服务 A 创建会话:session_id = "abc123"
52//! └─ Saves to distributed storage
53//! 保存到分布式存储
54//!
55//! 2. User → Service B: Request with session_id = "abc123"
56//! 用户 → 服务 B:请求带 session_id = "abc123"
57//! ├─ Service B retrieves session from storage
58//! │ 服务 B 从存储中获取会话
59//! ├─ Validates user is authenticated
60//! │ 验证用户已认证
61//! └─ Processes request ✅
62//! 处理请求 ✅
63//!
64//! No need to log in again! 无需再次登录!
65//! ```
66//!
67//! ### 2. Session Sharing for User Context | 会话共享用户上下文
68//!
69//! ```text
70//! Service A stores: { "user_role": "admin", "department": "IT" }
71//! 服务 A 存储:{ "user_role": "admin", "department": "IT" }
72//!
73//! Service B reads: Same session attributes available
74//! 服务 B 读取:相同的会话属性可用
75//!
76//! Service C updates: { "last_order": "order_123" }
77//! 服务 C 更新:{ "last_order": "order_123" }
78//!
79//! All services share the same session state!
80//! 所有服务共享相同的会话状态!
81//! ```
82//!
83//! ### 3. Multi-Device Session Management | 多设备会话管理
84//!
85//! ```text
86//! User: user_123
87//! ├─ Session 1: Web (Service A)
88//! │ 会话 1:网页(服务 A)
89//! ├─ Session 2: Mobile (Service B)
90//! │ 会话 2:移动端(服务 B)
91//! └─ Session 3: Desktop (Service C)
92//! 会话 3:桌面端(服务 C)
93//!
94//! All sessions can be:
95//! 所有会话可以:
96//! - Listed: get_sessions_by_login_id()
97//! - Managed individually
98//! - Terminated all at once: delete_all_sessions()
99//! ```
100//!
101//! ## Integration with Sa-Token | 与 Sa-Token 的集成
102//!
103//! ```text
104//! ┌─────────────────────────────────────────────────────────┐
105//! │ Sa-Token Core Flow │
106//! │ Sa-Token 核心流程 │
107//! └─────────────────────────────────────────────────────────┘
108//!
109//! SaTokenManager::login()
110//! ├─ 1. Generate token
111//! │ 生成 token
112//! ├─ 2. Create TokenInfo
113//! │ 创建 TokenInfo
114//! └─ 3. Create DistributedSession (if enabled)
115//! 创建 DistributedSession(如果启用)
116//! ├─ session_id: UUID
117//! ├─ login_id: user's login ID
118//! ├─ token: access token
119//! ├─ service_id: current service
120//! └─ attributes: custom data
121//!
122//! StpUtil::get_session()
123//! └─ Retrieves distributed session
124//! 获取分布式会话
125//!
126//! StpUtil::logout()
127//! └─ Deletes distributed session(s)
128//! 删除分布式会话
129//! ```
130
131//!
132//! ## Workflow Diagrams | 工作流程图
133//!
134//! ### Complete Session Lifecycle | 完整会话生命周期
135//!
136//! ```text
137//! ┌──────────────────────────────────────────────────────────────────┐
138//! │ Session Lifecycle │
139//! │ 会话生命周期 │
140//! └──────────────────────────────────────────────────────────────────┘
141//!
142//! User Service A Storage Service B
143//! 用户 服务 A 存储 服务 B
144//! │ │ │ │
145//! │ 1. Login │ │ │
146//! │ 登录 │ │ │
147//! ├────────────────────▶│ │ │
148//! │ │ 2. create_session() │
149//! │ │ 创建会话 │ │
150//! │ │ ├─ session_id: uuid │
151//! │ │ ├─ login_id: user_123 │
152//! │ │ ├─ token: access_token │
153//! │ │ └─ service_id: service-a │
154//! │ │ │ │
155//! │ │ 3. save_session()│ │
156//! │ │ 保存会话 │ │
157//! │ ├──────────────────▶│ │
158//! │ │ │ Store with TTL │
159//! │ │ │ 存储并设置 TTL │
160//! │ │ │ │
161//! │ 4. session_id │ │ │
162//! │ 返回会话 ID │ │ │
163//! │◀────────────────────│ │ │
164//! │ │ │ │
165//! │ 5. Request to Service B with session_id │
166//! │ 带 session_id 请求服务 B │
167//! ├────────────────────────────────────────────────────────────▶│
168//! │ │ │ │
169//! │ │ │ 6. get_session() │
170//! │ │ │ 获取会话 │
171//! │ │ │◀──────────────────│
172//! │ │ │ │
173//! │ │ │ 7. Return session│
174//! │ │ │ 返回会话数据 │
175//! │ │ ├──────────────────▶│
176//! │ │ │ │
177//! │ │ │ 8. refresh_session()
178//! │ │ │ 刷新会话 │
179//! │ │ │ (update last_access)
180//! │ │ │◀──────────────────│
181//! │ │ │ │
182//! │ 9. Response │ │ │
183//! │ 响应 │ │ │
184//! │◀────────────────────────────────────────────────────────────│
185//! │ │ │ │
186//! │ 10. Logout │ │ │
187//! │ 登出 │ │ │
188//! ├────────────────────▶│ │ │
189//! │ │ 11. delete_session() │
190//! │ │ 删除会话 │ │
191//! │ ├──────────────────▶│ │
192//! │ │ │ Remove from storage
193//! │ │ │ 从存储中移除 │
194//! │ │ │ │
195//! │ 12. Logout Success │ │ │
196//! │ 登出成功 │ │ │
197//! │◀────────────────────│ │ │
198//! │ │ │ │
199//! ```
200//!
201//! ### Service Authentication Flow | 服务认证流程
202//!
203//! ```text
204//! ┌──────────────────────────────────────────────────────────────────┐
205//! │ Service Inter-Communication │
206//! │ 服务间通信 │
207//! └──────────────────────────────────────────────────────────────────┘
208//!
209//! Service B Service A (Session Manager) Storage
210//! 服务 B 服务 A(会话管理器) 存储
211//! │ │ │
212//! │ 1. Register │ │
213//! │ 注册服务 │ │
214//! │ ├─ service_id │ │
215//! │ ├─ service_name │ │
216//! │ ├─ secret_key │ │
217//! │ └─ permissions │ │
218//! ├───────────────────────▶│ │
219//! │ │ Store credentials │
220//! │ │ 存储凭证 │
221//! │ │ (in memory) │
222//! │ │ │
223//! │ 2. Registered ✅ │ │
224//! │◀───────────────────────│ │
225//! │ │ │
226//! │ 3. Access session │ │
227//! │ 访问会话 │ │
228//! │ ├─ service_id │ │
229//! │ ├─ secret_key │ │
230//! │ └─ session_id │ │
231//! ├───────────────────────▶│ │
232//! │ │ 4. verify_service() │
233//! │ │ 验证服务 │
234//! │ │ ├─ Lookup service │
235//! │ │ └─ Compare secret_key │
236//! │ │ │
237//! │ │ 5. get_session() │
238//! │ │ 获取会话 │
239//! │ ├──────────────────────────────▶│
240//! │ │ │
241//! │ │ 6. Return session │
242//! │ │ 返回会话 │
243//! │ │◀──────────────────────────────│
244//! │ │ │
245//! │ 7. Session data ✅ │ │
246//! │◀───────────────────────│ │
247//! │ │ │
248//! ```
249//!
250//! ## Storage Backends | 存储后端
251//!
252//! The module is storage-agnostic. You can implement custom backends:
253//! 本模块与存储无关。您可以实现自定义后端:
254//!
255//! ### Redis Implementation (Recommended) | Redis 实现(推荐)
256//!
257//! ```rust,ignore
258//! use redis::AsyncCommands;
259//!
260//! pub struct RedisDistributedStorage {
261//! client: redis::Client,
262//! }
263//!
264//! #[async_trait]
265//! impl DistributedSessionStorage for RedisDistributedStorage {
266//! async fn save_session(&self, session: DistributedSession, ttl: Option<Duration>)
267//! -> Result<(), SaTokenError>
268//! {
269//! let mut conn = self.client.get_async_connection().await?;
270//! let key = format!("distributed:session:{}", session.session_id);
271//! let value = serde_json::to_string(&session)?;
272//!
273//! if let Some(ttl) = ttl {
274//! conn.set_ex(&key, value, ttl.as_secs() as usize).await?;
275//! } else {
276//! conn.set(&key, value).await?;
277//! }
278//!
279//! // Index by login_id
280//! let index_key = format!("distributed:login:{}", session.login_id);
281//! conn.sadd(index_key, &session.session_id).await?;
282//!
283//! Ok(())
284//! }
285//!
286//! // ... other methods
287//! }
288//! ```
289//!
290//! ### Database Implementation | 数据库实现
291//!
292//! ```rust,ignore
293//! use sqlx::PgPool;
294//!
295//! pub struct PostgresDistributedStorage {
296//! pool: PgPool,
297//! }
298//!
299//! #[async_trait]
300//! impl DistributedSessionStorage for PostgresDistributedStorage {
301//! async fn save_session(&self, session: DistributedSession, ttl: Option<Duration>)
302//! -> Result<(), SaTokenError>
303//! {
304//! let expires_at = ttl.map(|t| Utc::now() + chrono::Duration::from_std(t).unwrap());
305//!
306//! sqlx::query!(
307//! "INSERT INTO distributed_sessions
308//! (session_id, login_id, token, service_id, attributes, expires_at)
309//! VALUES ($1, $2, $3, $4, $5, $6)
310//! ON CONFLICT (session_id) DO UPDATE
311//! SET attributes = $5, last_access = NOW()",
312//! session.session_id,
313//! session.login_id,
314//! session.token,
315//! session.service_id,
316//! serde_json::to_value(&session.attributes)?,
317//! expires_at,
318//! )
319//! .execute(&self.pool)
320//! .await?;
321//!
322//! Ok(())
323//! }
324//!
325//! // ... other methods
326//! }
327//! ```
328//!
329//! ## Best Practices | 最佳实践
330//!
331//! ### 1. Service Registration | 服务注册
332//!
333//! ```rust,ignore
334//! // Initialize each service with unique credentials
335//! // 为每个服务初始化唯一凭证
336//! let credential = ServiceCredential {
337//! service_id: "user-service".to_string(),
338//! service_name: "User Management Service".to_string(),
339//! secret_key: generate_secure_secret(), // Use crypto-secure generation
340//! created_at: Utc::now(),
341//! permissions: vec!["user.read".to_string(), "user.write".to_string()],
342//! };
343//! manager.register_service(credential).await;
344//! ```
345//!
346//! ### 2. Session Creation with Context | 带上下文的会话创建
347//!
348//! ```rust,ignore
349//! // Create session with user context
350//! // 创建带用户上下文的会话
351//! let session = manager.create_session(login_id, token).await?;
352//!
353//! // Add relevant attributes immediately
354//! // 立即添加相关属性
355//! manager.set_attribute(&session.session_id, "user_role".to_string(), "admin".to_string()).await?;
356//! manager.set_attribute(&session.session_id, "department".to_string(), "IT".to_string()).await?;
357//! manager.set_attribute(&session.session_id, "login_device".to_string(), "web".to_string()).await?;
358//! ```
359//!
360//! ### 3. Cross-Service Access Pattern | 跨服务访问模式
361//!
362//! ```rust,ignore
363//! // Service B accesses session created by Service A
364//! // 服务 B 访问服务 A 创建的会话
365//!
366//! // 1. Verify service identity
367//! // 验证服务身份
368//! let service_cred = manager.verify_service("service-b", request.secret).await?;
369//!
370//! // 2. Check permissions
371//! // 检查权限
372//! if !service_cred.permissions.contains(&"session.read".to_string()) {
373//! return Err(SaTokenError::PermissionDenied);
374//! }
375//!
376//! // 3. Access session
377//! // 访问会话
378//! let session = manager.get_session(&request.session_id).await?;
379//!
380//! // 4. Refresh to keep session alive
381//! // 刷新以保持会话活跃
382//! manager.refresh_session(&session.session_id).await?;
383//! ```
384//!
385//! ### 4. Multi-Device Logout | 多设备登出
386//!
387//! ```rust,ignore
388//! // Logout from all devices
389//! // 从所有设备登出
390//! manager.delete_all_sessions(&login_id).await?;
391//!
392//! // Or logout specific session
393//! // 或登出特定会话
394//! manager.delete_session(&session_id).await?;
395//! ```
396//!
397//! ### 5. Session Monitoring | 会话监控
398//!
399//! ```rust,ignore
400//! // Monitor user's active sessions
401//! // 监控用户的活跃会话
402//! let sessions = manager.get_sessions_by_login_id(&login_id).await?;
403//!
404//! for session in sessions {
405//! println!("Session: {} from service: {}, last active: {}",
406//! session.session_id,
407//! session.service_id,
408//! session.last_access
409//! );
410//!
411//! // Check for suspicious activity
412//! // 检查可疑活动
413//! if is_suspicious(&session) {
414//! manager.delete_session(&session.session_id).await?;
415//! }
416//! }
417//! ```
418//!
419//! ## Security Considerations | 安全考虑
420//!
421//! ```text
422//! 1. ✅ Service Authentication | 服务认证
423//! - Each service has unique secret_key
424//! - Verify credentials before granting access
425//! - Rotate keys periodically
426//!
427//! 2. ✅ Permission-Based Access | 基于权限的访问
428//! - Services have explicit permissions
429//! - Check permissions before operations
430//! - Implement least-privilege principle
431//!
432//! 3. ✅ Session Timeout | 会话超时
433//! - Configure appropriate TTL
434//! - Auto-expire inactive sessions
435//! - Refresh on active use
436//!
437//! 4. ✅ Data Encryption | 数据加密
438//! - Encrypt sensitive session attributes
439//! - Use TLS for inter-service communication
440//! - Encrypt data at rest in storage
441//!
442//! 5. ✅ Audit Logging | 审计日志
443//! - Log session creation/deletion
444//! - Track cross-service access
445//! - Monitor for anomalies
446//! ```
447
448use crate::error::SaTokenError;
449use async_trait::async_trait;
450use sa_token_adapter::storage::SaStorage;
451use serde::{Deserialize, Serialize};
452use std::collections::HashMap;
453use std::sync::Arc;
454use std::time::Duration;
455use chrono::{DateTime, Utc};
456use tokio::sync::RwLock;
457
458/// Distributed session data structure
459/// 分布式 Session 数据结构
460///
461/// Represents a session that can be shared across multiple services
462/// 表示可以在多个服务之间共享的 Session
463#[derive(Debug, Clone, Serialize, Deserialize)]
464pub struct DistributedSession {
465 /// Unique session identifier | 唯一 Session 标识符
466 pub session_id: String,
467
468 /// User login ID | 用户登录 ID
469 pub login_id: String,
470
471 /// Authentication token | 认证 Token
472 pub token: String,
473
474 /// ID of the service that created this session | 创建此 Session 的服务 ID
475 pub service_id: String,
476
477 /// Session creation time | Session 创建时间
478 pub create_time: DateTime<Utc>,
479
480 /// Last access time | 最后访问时间
481 pub last_access: DateTime<Utc>,
482
483 /// Session attributes (key-value pairs) | Session 属性(键值对)
484 pub attributes: HashMap<String, String>,
485}
486
487/// Service credential for inter-service authentication
488/// 服务间认证的服务凭证
489///
490/// Contains service identification and permission information
491/// 包含服务标识和权限信息
492#[derive(Debug, Clone, Serialize, Deserialize)]
493pub struct ServiceCredential {
494 /// Unique service identifier | 唯一服务标识符
495 pub service_id: String,
496
497 /// Human-readable service name | 可读的服务名称
498 pub service_name: String,
499
500 /// Service authentication secret key | 服务认证密钥
501 pub secret_key: String,
502
503 /// Service registration time | 服务注册时间
504 pub created_at: DateTime<Utc>,
505
506 /// List of permissions this service has | 该服务拥有的权限列表
507 pub permissions: Vec<String>,
508}
509
510/// Distributed session storage trait
511/// 分布式 Session 存储 trait
512///
513/// Implement this trait to provide custom storage backends
514/// 实现此 trait 以提供自定义存储后端
515#[async_trait]
516pub trait DistributedSessionStorage: Send + Sync {
517 /// Save a session to storage with optional TTL
518 /// 保存 Session 到存储,可选 TTL
519 ///
520 /// # Arguments | 参数
521 /// * `session` - Session to save | 要保存的 Session
522 /// * `ttl` - Time-to-live duration | 生存时间
523 async fn save_session(&self, session: DistributedSession, ttl: Option<Duration>) -> Result<(), SaTokenError>;
524
525 /// Get a session from storage
526 /// 从存储获取 Session
527 ///
528 /// # Arguments | 参数
529 /// * `session_id` - Session identifier | Session 标识符
530 async fn get_session(&self, session_id: &str) -> Result<Option<DistributedSession>, SaTokenError>;
531
532 /// Delete a session from storage
533 /// 从存储删除 Session
534 ///
535 /// # Arguments | 参数
536 /// * `session_id` - Session identifier | Session 标识符
537 async fn delete_session(&self, session_id: &str) -> Result<(), SaTokenError>;
538
539 /// Get all sessions for a specific user
540 /// 获取特定用户的所有 Sessions
541 ///
542 /// # Arguments | 参数
543 /// * `login_id` - User login ID | 用户登录 ID
544 async fn get_sessions_by_login_id(&self, login_id: &str) -> Result<Vec<DistributedSession>, SaTokenError>;
545
546 /// 保存服务凭证 | Save a service credential
547 /// 用于把 register_service 的凭证持久化到存储
548 async fn save_credential(&self, credential: ServiceCredential) -> Result<(), SaTokenError>;
549
550 /// 按 service_id 获取服务凭证 | Get a service credential by service_id
551 /// 未找到返回 Ok(None)
552 async fn get_credential(&self, service_id: &str) -> Result<Option<ServiceCredential>, SaTokenError>;
553}
554
555/// Distributed session manager
556/// 分布式 Session 管理器
557///
558/// Manages distributed sessions and service authentication
559/// 管理分布式 Sessions 和服务认证
560pub struct DistributedSessionManager {
561 /// Session 存储后端
562 storage: Arc<dyn DistributedSessionStorage>,
563 /// 当前服务 ID
564 service_id: String,
565 /// 默认 Session 超时时间
566 session_timeout: Duration,
567}
568
569impl DistributedSessionManager {
570 /// Create a new distributed session manager
571 /// 创建新的分布式 Session 管理器
572 ///
573 /// # Arguments | 参数
574 /// * `storage` - Session storage implementation | Session 存储实现
575 /// * `service_id` - ID of this service | 此服务的 ID
576 /// * `session_timeout` - Default session timeout | 默认 Session 超时时间
577 ///
578 /// # Example | 示例
579 /// ```rust,ignore
580 /// let storage = Arc::new(MyDistributedStorage::new());
581 /// let manager = DistributedSessionManager::new(
582 /// storage,
583 /// "my-service".to_string(),
584 /// Duration::from_secs(3600),
585 /// );
586 /// ```
587 pub fn new(
588 storage: Arc<dyn DistributedSessionStorage>,
589 service_id: String,
590 session_timeout: Duration,
591 ) -> Self {
592 Self {
593 storage,
594 service_id,
595 session_timeout,
596 }
597 }
598
599 /// 注册服务凭证(持久化到底层存储)
600 /// 返回 Result 以便调用方处理存储错误
601 pub async fn register_service(&self, credential: ServiceCredential) -> Result<(), SaTokenError> {
602 self.storage.save_credential(credential).await
603 }
604
605 /// Verify a service's credentials
606 /// 验证服务的凭证
607 ///
608 /// # Arguments | 参数
609 /// * `service_id` - Service identifier | 服务标识符
610 /// * `secret` - Service secret key | 服务密钥
611 ///
612 /// # Returns | 返回值
613 /// * `Ok(ServiceCredential)` - Service authenticated | 服务已认证
614 /// * `Err(PermissionDenied)` - Invalid credentials | 凭证无效
615 ///
616 /// # Example | 示例
617 /// ```rust,ignore
618 /// match manager.verify_service("api-gateway", "secret123").await {
619 /// Ok(cred) => println!("Service {} verified", cred.service_name),
620 /// Err(e) => println!("Verification failed: {}", e),
621 /// }
622 /// ```
623 /// 校验服务凭证
624 /// service_id 存在且 secret_key 匹配时返回凭证,否则返回 PermissionDenied
625 pub async fn verify_service(&self, service_id: &str, secret: &str) -> Result<ServiceCredential, SaTokenError> {
626 if let Some(cred) = self.storage.get_credential(service_id).await?
627 && cred.secret_key == secret
628 {
629 return Ok(cred);
630 }
631 Err(SaTokenError::PermissionDenied)
632 }
633
634 /// Create a new distributed session
635 /// 创建新的分布式 Session
636 ///
637 /// # Arguments | 参数
638 /// * `login_id` - User login ID | 用户登录 ID
639 /// * `token` - Authentication token | 认证 Token
640 ///
641 /// # Returns | 返回值
642 /// * `Ok(DistributedSession)` - Session created | Session 已创建
643 /// * `Err(SaTokenError)` - Creation failed | 创建失败
644 ///
645 /// # Example | 示例
646 /// ```rust,ignore
647 /// let session = manager.create_session(
648 /// "user123".to_string(),
649 /// "token456".to_string(),
650 /// ).await?;
651 /// println!("Session created: {}", session.session_id);
652 /// ```
653 pub async fn create_session(
654 &self,
655 login_id: String,
656 token: String,
657 ) -> Result<DistributedSession, SaTokenError> {
658 let session = DistributedSession {
659 session_id: uuid::Uuid::new_v4().to_string(),
660 login_id,
661 token,
662 service_id: self.service_id.clone(),
663 create_time: Utc::now(),
664 last_access: Utc::now(),
665 attributes: HashMap::new(),
666 };
667
668 self.storage.save_session(session.clone(), Some(self.session_timeout)).await?;
669 Ok(session)
670 }
671
672 /// Get a session by ID
673 /// 通过 ID 获取 Session
674 ///
675 /// # Arguments | 参数
676 /// * `session_id` - Session identifier | Session 标识符
677 ///
678 /// # Returns | 返回值
679 /// * `Ok(DistributedSession)` - Session found | 找到 Session
680 /// * `Err(SessionNotFound)` - Session not found | 未找到 Session
681 ///
682 /// # Example | 示例
683 /// ```rust,ignore
684 /// let session = manager.get_session("session-id-123").await?;
685 /// println!("User: {}", session.login_id);
686 /// ```
687 pub async fn get_session(&self, session_id: &str) -> Result<DistributedSession, SaTokenError> {
688 self.storage.get_session(session_id).await?
689 .ok_or(SaTokenError::SessionNotFound)
690 }
691
692 /// Update an existing session
693 /// 更新现有 Session
694 ///
695 /// # Arguments | 参数
696 /// * `session` - Updated session data | 更新后的 Session 数据
697 ///
698 /// # Example | 示例
699 /// ```rust,ignore
700 /// let mut session = manager.get_session("session-id").await?;
701 /// session.attributes.insert("role".to_string(), "admin".to_string());
702 /// manager.update_session(session).await?;
703 /// ```
704 pub async fn update_session(&self, session: DistributedSession) -> Result<(), SaTokenError> {
705 self.storage.save_session(session, Some(self.session_timeout)).await
706 }
707
708 /// Delete a session
709 /// 删除 Session
710 ///
711 /// # Arguments | 参数
712 /// * `session_id` - Session identifier | Session 标识符
713 ///
714 /// # Example | 示例
715 /// ```rust,ignore
716 /// manager.delete_session("session-id-123").await?;
717 /// ```
718 pub async fn delete_session(&self, session_id: &str) -> Result<(), SaTokenError> {
719 self.storage.delete_session(session_id).await
720 }
721
722 /// Refresh a session (update last access time)
723 /// 刷新 Session(更新最后访问时间)
724 ///
725 /// # Arguments | 参数
726 /// * `session_id` - Session identifier | Session 标识符
727 ///
728 /// # Example | 示例
729 /// ```rust,ignore
730 /// manager.refresh_session("session-id-123").await?;
731 /// ```
732 pub async fn refresh_session(&self, session_id: &str) -> Result<(), SaTokenError> {
733 let mut session = self.get_session(session_id).await?;
734 session.last_access = Utc::now();
735 self.update_session(session).await
736 }
737
738 /// Set a session attribute
739 /// 设置 Session 属性
740 ///
741 /// # Arguments | 参数
742 /// * `session_id` - Session identifier | Session 标识符
743 /// * `key` - Attribute key | 属性键
744 /// * `value` - Attribute value | 属性值
745 ///
746 /// # Example | 示例
747 /// ```rust,ignore
748 /// manager.set_attribute("session-id", "theme".to_string(), "dark".to_string()).await?;
749 /// ```
750 pub async fn set_attribute(
751 &self,
752 session_id: &str,
753 key: String,
754 value: String,
755 ) -> Result<(), SaTokenError> {
756 let mut session = self.get_session(session_id).await?;
757 session.attributes.insert(key, value);
758 session.last_access = Utc::now();
759 self.update_session(session).await
760 }
761
762 /// Get a session attribute
763 /// 获取 Session 属性
764 ///
765 /// # Arguments | 参数
766 /// * `session_id` - Session identifier | Session 标识符
767 /// * `key` - Attribute key | 属性键
768 ///
769 /// # Returns | 返回值
770 /// * `Some(value)` - Attribute found | 找到属性
771 /// * `None` - Attribute not found | 未找到属性
772 ///
773 /// # Example | 示例
774 /// ```rust,ignore
775 /// if let Some(theme) = manager.get_attribute("session-id", "theme").await? {
776 /// println!("Theme: {}", theme);
777 /// }
778 /// ```
779 pub async fn get_attribute(
780 &self,
781 session_id: &str,
782 key: &str,
783 ) -> Result<Option<String>, SaTokenError> {
784 let session = self.get_session(session_id).await?;
785 Ok(session.attributes.get(key).cloned())
786 }
787
788 /// Remove a session attribute
789 /// 移除 Session 属性
790 ///
791 /// # Arguments | 参数
792 /// * `session_id` - Session identifier | Session 标识符
793 /// * `key` - Attribute key | 属性键
794 ///
795 /// # Example | 示例
796 /// ```rust,ignore
797 /// manager.remove_attribute("session-id", "temp_data").await?;
798 /// ```
799 pub async fn remove_attribute(
800 &self,
801 session_id: &str,
802 key: &str,
803 ) -> Result<(), SaTokenError> {
804 let mut session = self.get_session(session_id).await?;
805 session.attributes.remove(key);
806 session.last_access = Utc::now();
807 self.update_session(session).await
808 }
809
810 /// Get all sessions for a specific user
811 /// 获取特定用户的所有 Sessions
812 ///
813 /// # Arguments | 参数
814 /// * `login_id` - User login ID | 用户登录 ID
815 ///
816 /// # Returns | 返回值
817 /// Vector of sessions | Sessions 向量
818 ///
819 /// # Example | 示例
820 /// ```rust,ignore
821 /// let sessions = manager.get_sessions_by_login_id("user123").await?;
822 /// println!("User has {} active sessions", sessions.len());
823 /// ```
824 pub async fn get_sessions_by_login_id(&self, login_id: &str) -> Result<Vec<DistributedSession>, SaTokenError> {
825 self.storage.get_sessions_by_login_id(login_id).await
826 }
827
828 /// Delete all sessions for a specific user
829 /// 删除特定用户的所有 Sessions
830 ///
831 /// # Arguments | 参数
832 /// * `login_id` - User login ID | 用户登录 ID
833 ///
834 /// # Example | 示例
835 /// ```rust,ignore
836 /// manager.delete_all_sessions("user123").await?;
837 /// ```
838 pub async fn delete_all_sessions(&self, login_id: &str) -> Result<(), SaTokenError> {
839 let sessions = self.get_sessions_by_login_id(login_id).await?;
840 for session in sessions {
841 self.delete_session(&session.session_id).await?;
842 }
843 Ok(())
844 }
845}
846
847/// In-memory distributed session storage implementation
848/// 内存分布式 Session 存储实现
849///
850/// For testing and development purposes
851/// 用于测试和开发目的
852pub struct InMemoryDistributedStorage {
853 /// Sessions 存储: session_id -> DistributedSession
854 sessions: Arc<RwLock<HashMap<String, DistributedSession>>>,
855 /// 登录索引: login_id -> Vec<session_id>
856 login_index: Arc<RwLock<HashMap<String, Vec<String>>>>,
857 /// 服务凭证: service_id -> ServiceCredential
858 credentials: Arc<RwLock<HashMap<String, ServiceCredential>>>,
859}
860
861impl InMemoryDistributedStorage {
862 /// 创建新的内存存储
863 pub fn new() -> Self {
864 Self {
865 sessions: Arc::new(RwLock::new(HashMap::new())),
866 login_index: Arc::new(RwLock::new(HashMap::new())),
867 credentials: Arc::new(RwLock::new(HashMap::new())),
868 }
869 }
870}
871
872impl Default for InMemoryDistributedStorage {
873 fn default() -> Self {
874 Self::new()
875 }
876}
877
878#[async_trait]
879impl DistributedSessionStorage for InMemoryDistributedStorage {
880 /// Save session to memory storage | 保存会话到内存存储
881 ///
882 /// # Implementation Details | 实现细节
883 ///
884 /// 1. Stores session in main HashMap by session_id
885 /// 在主 HashMap 中按 session_id 存储会话
886 /// 2. Updates login_index for quick user lookup
887 /// 更新 login_index 以快速查找用户
888 ///
889 /// # Note | 注意
890 ///
891 /// TTL is ignored in memory storage (for simplicity).
892 /// In production, use Redis or similar with built-in TTL support.
893 /// 内存存储中忽略 TTL(为简化实现)。
894 /// 在生产环境中,使用 Redis 或类似的内置 TTL 支持的存储。
895 async fn save_session(&self, session: DistributedSession, _ttl: Option<Duration>) -> Result<(), SaTokenError> {
896 let session_id = session.session_id.clone();
897 let login_id = session.login_id.clone();
898
899 // 1. Store session in main map
900 // 在主映射中存储会话
901 let mut sessions = self.sessions.write().await;
902 sessions.insert(session_id.clone(), session);
903
904 // 2. Update login index for this user
905 // 更新此用户的登录索引
906 let mut index = self.login_index.write().await;
907 let session_list = index.entry(login_id).or_insert_with(Vec::new);
908
909 // Add only if not already present (prevent duplicates)
910 // 仅在不存在时添加(防止重复)
911 if !session_list.contains(&session_id) {
912 session_list.push(session_id);
913 }
914
915 Ok(())
916 }
917
918 /// Get session from memory storage | 从内存存储获取会话
919 ///
920 /// # Returns | 返回
921 ///
922 /// * `Ok(Some(session))` - Session found | 找到会话
923 /// * `Ok(None)` - Session not found | 未找到会话
924 async fn get_session(&self, session_id: &str) -> Result<Option<DistributedSession>, SaTokenError> {
925 let sessions = self.sessions.read().await;
926 Ok(sessions.get(session_id).cloned())
927 }
928
929 /// Delete session from memory storage | 从内存存储删除会话
930 ///
931 /// # Implementation Details | 实现细节
932 ///
933 /// 1. Removes session from main HashMap
934 /// 从主 HashMap 中移除会话
935 /// 2. Removes session_id from login_index
936 /// 从 login_index 中移除 session_id
937 /// 3. Cleans up empty index entries
938 /// 清理空的索引条目
939 async fn delete_session(&self, session_id: &str) -> Result<(), SaTokenError> {
940 // 1. Remove from main storage and get session data
941 // 从主存储中移除并获取会话数据
942 let mut sessions = self.sessions.write().await;
943 if let Some(session) = sessions.remove(session_id) {
944 // 2. Update login index
945 // 更新登录索引
946 let mut index = self.login_index.write().await;
947 if let Some(session_ids) = index.get_mut(&session.login_id) {
948 // Remove this session_id from the list
949 // 从列表中移除此 session_id
950 session_ids.retain(|id| id != session_id);
951
952 // 3. Clean up: remove login_id entry if no sessions left
953 // 清理:如果没有剩余会话,移除 login_id 条目
954 if session_ids.is_empty() {
955 index.remove(&session.login_id);
956 }
957 }
958 }
959 Ok(())
960 }
961
962 /// Get all sessions for a user | 获取用户的所有会话
963 ///
964 /// # Implementation Details | 实现细节
965 ///
966 /// 1. Looks up session_ids in login_index
967 /// 在 login_index 中查找 session_ids
968 /// 2. Retrieves full session data for each session_id
969 /// 为每个 session_id 检索完整的会话数据
970 /// 3. Filters out any missing sessions (cleanup)
971 /// 过滤掉任何缺失的会话(清理)
972 ///
973 /// # Returns | 返回
974 ///
975 /// Vector of all active sessions for the user
976 /// 用户所有活跃会话的向量
977 async fn get_sessions_by_login_id(&self, login_id: &str) -> Result<Vec<DistributedSession>, SaTokenError> {
978 // 1. Get session IDs from index
979 // 从索引中获取会话 IDs
980 let index = self.login_index.read().await;
981 let session_ids = index.get(login_id).cloned().unwrap_or_default();
982
983 // 2. Retrieve full session data
984 // 检索完整的会话数据
985 let sessions = self.sessions.read().await;
986 let mut result = Vec::new();
987
988 for session_id in session_ids {
989 if let Some(session) = sessions.get(&session_id) {
990 result.push(session.clone());
991 }
992 // Note: If session not found, it was deleted but index not updated
993 // This is a minor inconsistency acceptable in memory storage
994 // 注意:如果未找到会话,说明会话已删除但索引未更新
995 // 这是内存存储中可接受的小不一致
996 }
997
998 Ok(result)
999 }
1000
1001 /// 保存服务凭证到内存
1002 async fn save_credential(&self, credential: ServiceCredential) -> Result<(), SaTokenError> {
1003 let mut creds = self.credentials.write().await;
1004 creds.insert(credential.service_id.clone(), credential);
1005 Ok(())
1006 }
1007
1008 /// 从内存获取服务凭证
1009 async fn get_credential(&self, service_id: &str) -> Result<Option<ServiceCredential>, SaTokenError> {
1010 let creds = self.credentials.read().await;
1011 Ok(creds.get(service_id).cloned())
1012 }
1013}
1014
1015/// 基于 SaStorage 的分布式 Session 存储实现
1016/// 把分布式 Session、登录索引、服务凭证统一持久化到任意 SaStorage 后端(Redis / 内存 / 数据库)
1017///
1018/// # 存储键格式
1019/// - Session: `{prefix}dsession:{session_id}`
1020/// - 登录索引: `{prefix}dsession:index:{login_id}`
1021/// - 服务凭证: `{prefix}dservice:{service_id}`
1022pub struct SaStorageDistributedStorage {
1023 /// 底层通用 KV 存储
1024 storage: Arc<dyn SaStorage>,
1025 /// 存储键前缀(应与 SaTokenConfig::storage_key_prefix 保持一致)
1026 key_prefix: String,
1027}
1028
1029impl SaStorageDistributedStorage {
1030 /// 创建适配器
1031 ///
1032 /// # 参数
1033 /// - `storage`: 底层存储实现(可直接复用全局 SaTokenManager 使用的同一个 storage)
1034 /// - `key_prefix`: 存储键前缀,建议传入 `config.storage_key_prefix.clone()` 以保持一致
1035 pub fn new(storage: Arc<dyn SaStorage>, key_prefix: impl Into<String>) -> Self {
1036 Self {
1037 storage,
1038 key_prefix: key_prefix.into(),
1039 }
1040 }
1041
1042 /// 构造 Session 键:{prefix}dsession:{session_id}
1043 fn session_key(&self, session_id: &str) -> String {
1044 format!("{}dsession:{}", self.key_prefix, session_id)
1045 }
1046
1047 /// 构造登录索引键:{prefix}dsession:index:{login_id}
1048 fn index_key(&self, login_id: &str) -> String {
1049 format!("{}dsession:index:{}", self.key_prefix, login_id)
1050 }
1051
1052 /// 构造凭证键:{prefix}dservice:{service_id}
1053 fn credential_key(&self, service_id: &str) -> String {
1054 format!("{}dservice:{}", self.key_prefix, service_id)
1055 }
1056
1057 /// 读取某用户的登录索引(session_id 列表)
1058 /// 不存在时返回空 Vec
1059 async fn load_index(&self, index_key: &str) -> Result<Vec<String>, SaTokenError> {
1060 match self
1061 .storage
1062 .get(index_key)
1063 .await
1064 .map_err(|e| SaTokenError::StorageError(e.to_string()))?
1065 {
1066 Some(value) => serde_json::from_str(&value).map_err(SaTokenError::SerializationError),
1067 None => Ok(Vec::new()),
1068 }
1069 }
1070
1071 /// 回写登录索引(永久保存,不设 TTL)
1072 async fn save_index(&self, index_key: &str, ids: &[String]) -> Result<(), SaTokenError> {
1073 let value = serde_json::to_string(ids).map_err(SaTokenError::SerializationError)?;
1074 self.storage
1075 .set(index_key, &value, None)
1076 .await
1077 .map_err(|e| SaTokenError::StorageError(e.to_string()))
1078 }
1079}
1080
1081#[async_trait]
1082impl DistributedSessionStorage for SaStorageDistributedStorage {
1083 /// 保存 Session
1084 /// 1. 写入会话本体(带 TTL,由后端控制过期)
1085 /// 2. 更新登录索引(永久保存,去重;过期 session 在读取时被过滤清理)
1086 async fn save_session(&self, session: DistributedSession, ttl: Option<Duration>) -> Result<(), SaTokenError> {
1087 let session_key = self.session_key(&session.session_id);
1088 let index_key = self.index_key(&session.login_id);
1089 let session_id = session.session_id.clone();
1090
1091 // 1. 写入会话本体
1092 let value = serde_json::to_string(&session).map_err(SaTokenError::SerializationError)?;
1093 self.storage
1094 .set(&session_key, &value, ttl)
1095 .await
1096 .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
1097
1098 // 2. 更新登录索引(去重)
1099 let mut ids = self.load_index(&index_key).await?;
1100 if !ids.contains(&session_id) {
1101 ids.push(session_id);
1102 self.save_index(&index_key, &ids).await?;
1103 }
1104 Ok(())
1105 }
1106
1107 /// 按 session_id 读取会话
1108 /// 未找到或已过期返回 None
1109 async fn get_session(&self, session_id: &str) -> Result<Option<DistributedSession>, SaTokenError> {
1110 match self
1111 .storage
1112 .get(&self.session_key(session_id))
1113 .await
1114 .map_err(|e| SaTokenError::StorageError(e.to_string()))?
1115 {
1116 Some(value) => Ok(Some(serde_json::from_str(&value).map_err(SaTokenError::SerializationError)?)),
1117 None => Ok(None),
1118 }
1119 }
1120
1121 /// 删除会话
1122 /// 1. 先读出会话以获得 login_id(用于维护索引)
1123 /// 2. 删除会话本体
1124 /// 3. 从登录索引中移除该 session_id(无剩余则删除整个索引键)
1125 async fn delete_session(&self, session_id: &str) -> Result<(), SaTokenError> {
1126 if let Some(session) = self.get_session(session_id).await? {
1127 // 1. 删除会话本体
1128 self.storage
1129 .delete(&self.session_key(session_id))
1130 .await
1131 .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
1132
1133 // 2. 从登录索引中移除
1134 let index_key = self.index_key(&session.login_id);
1135 let mut ids = self.load_index(&index_key).await?;
1136 let before = ids.len();
1137 ids.retain(|id| id != session_id);
1138 if ids.is_empty() {
1139 // 无剩余会话则删除整个索引键
1140 self.storage
1141 .delete(&index_key)
1142 .await
1143 .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
1144 } else if ids.len() != before {
1145 self.save_index(&index_key, &ids).await?;
1146 }
1147 }
1148 Ok(())
1149 }
1150
1151 /// 获取某用户全部会话
1152 /// 顺带清理索引中已过期/丢失的 session_id(best-effort 清理,避免索引无限膨胀)
1153 async fn get_sessions_by_login_id(&self, login_id: &str) -> Result<Vec<DistributedSession>, SaTokenError> {
1154 let index_key = self.index_key(login_id);
1155 let ids = self.load_index(&index_key).await?;
1156 let original_len = ids.len();
1157
1158 let mut result = Vec::new();
1159 let mut alive_ids = Vec::new();
1160 for id in ids {
1161 // 会话本体可能因 TTL 已过期 → 读不到则视为失效
1162 if let Some(session) = self.get_session(&id).await? {
1163 result.push(session);
1164 alive_ids.push(id);
1165 }
1166 }
1167
1168 // 清理:索引发生收缩时回写
1169 if alive_ids.is_empty() {
1170 let _ = self.storage.delete(&index_key).await;
1171 } else if alive_ids.len() != original_len {
1172 let _ = self.save_index(&index_key, &alive_ids).await;
1173 }
1174
1175 Ok(result)
1176 }
1177
1178 /// 保存服务凭证(永久保存)
1179 async fn save_credential(&self, credential: ServiceCredential) -> Result<(), SaTokenError> {
1180 let key = self.credential_key(&credential.service_id);
1181 let value = serde_json::to_string(&credential).map_err(SaTokenError::SerializationError)?;
1182 self.storage
1183 .set(&key, &value, None)
1184 .await
1185 .map_err(|e| SaTokenError::StorageError(e.to_string()))
1186 }
1187
1188 /// 按 service_id 读取服务凭证
1189 /// 未找到返回 None
1190 async fn get_credential(&self, service_id: &str) -> Result<Option<ServiceCredential>, SaTokenError> {
1191 match self
1192 .storage
1193 .get(&self.credential_key(service_id))
1194 .await
1195 .map_err(|e| SaTokenError::StorageError(e.to_string()))?
1196 {
1197 Some(value) => Ok(Some(serde_json::from_str(&value).map_err(SaTokenError::SerializationError)?)),
1198 None => Ok(None),
1199 }
1200 }
1201}
1202
1203#[cfg(test)]
1204mod tests {
1205 use super::*;
1206
1207 #[tokio::test]
1208 async fn test_distributed_session_manager() {
1209 let storage = Arc::new(InMemoryDistributedStorage::new());
1210 let manager = DistributedSessionManager::new(
1211 storage,
1212 "service1".to_string(),
1213 Duration::from_secs(3600),
1214 );
1215
1216 let session = manager.create_session(
1217 "user1".to_string(),
1218 "token1".to_string(),
1219 ).await.unwrap();
1220
1221 let retrieved = manager.get_session(&session.session_id).await.unwrap();
1222 assert_eq!(retrieved.login_id, "user1");
1223 }
1224
1225 #[tokio::test]
1226 async fn test_session_attributes() {
1227 let storage = Arc::new(InMemoryDistributedStorage::new());
1228 let manager = DistributedSessionManager::new(
1229 storage,
1230 "service1".to_string(),
1231 Duration::from_secs(3600),
1232 );
1233
1234 let session = manager.create_session(
1235 "user2".to_string(),
1236 "token2".to_string(),
1237 ).await.unwrap();
1238
1239 manager.set_attribute(
1240 &session.session_id,
1241 "key1".to_string(),
1242 "value1".to_string(),
1243 ).await.unwrap();
1244
1245 let value = manager.get_attribute(&session.session_id, "key1").await.unwrap();
1246 assert_eq!(value, Some("value1".to_string()));
1247 }
1248
1249 #[tokio::test]
1250 async fn test_service_verification() {
1251 let storage = Arc::new(InMemoryDistributedStorage::new());
1252 let manager = DistributedSessionManager::new(
1253 storage,
1254 "service1".to_string(),
1255 Duration::from_secs(3600),
1256 );
1257
1258 let credential = ServiceCredential {
1259 service_id: "service2".to_string(),
1260 service_name: "Service 2".to_string(),
1261 secret_key: "secret123".to_string(),
1262 created_at: Utc::now(),
1263 permissions: vec!["read".to_string(), "write".to_string()],
1264 };
1265
1266 manager.register_service(credential.clone()).await.unwrap();
1267
1268 let verified = manager.verify_service("service2", "secret123").await.unwrap();
1269 assert_eq!(verified.service_id, "service2");
1270
1271 let result = manager.verify_service("service2", "wrong_secret").await;
1272 assert!(result.is_err());
1273 }
1274
1275 #[tokio::test]
1276 async fn test_delete_all_sessions() {
1277 let storage = Arc::new(InMemoryDistributedStorage::new());
1278 let manager = DistributedSessionManager::new(
1279 storage,
1280 "service1".to_string(),
1281 Duration::from_secs(3600),
1282 );
1283
1284 manager.create_session("user3".to_string(), "token1".to_string()).await.unwrap();
1285 manager.create_session("user3".to_string(), "token2".to_string()).await.unwrap();
1286
1287 let sessions = manager.get_sessions_by_login_id("user3").await.unwrap();
1288 assert_eq!(sessions.len(), 2);
1289
1290 manager.delete_all_sessions("user3").await.unwrap();
1291
1292 let sessions = manager.get_sessions_by_login_id("user3").await.unwrap();
1293 assert_eq!(sessions.len(), 0);
1294 }
1295}