1use std::sync::Arc;
6use chrono::{DateTime, Duration, Utc};
7use sa_token_adapter::storage::SaStorage;
8use crate::config::SaTokenConfig;
9use crate::error::{SaTokenError, SaTokenResult};
10use crate::token::{TokenInfo, TokenValue, TokenGenerator};
11use crate::session::SaSession;
12use crate::event::{SaTokenEventBus, SaTokenEvent};
13use crate::online::OnlineManager;
14use crate::distributed::DistributedSessionManager;
15
16#[derive(Clone)]
18pub struct SaTokenManager {
19 pub(crate) storage: Arc<dyn SaStorage>,
21 pub config: SaTokenConfig,
23 pub(crate) event_bus: SaTokenEventBus,
25 online_manager: Option<Arc<OnlineManager>>,
27 distributed_manager: Option<Arc<DistributedSessionManager>>,
29}
30
31impl SaTokenManager {
32 pub fn new(storage: Arc<dyn SaStorage>, config: SaTokenConfig) -> Self {
34 Self {
35 storage,
36 config,
37 event_bus: SaTokenEventBus::new(),
38 online_manager: None,
39 distributed_manager: None,
40 }
41 }
42
43 pub fn with_online_manager(mut self, manager: Arc<OnlineManager>) -> Self {
44 self.online_manager = Some(manager);
45 self
46 }
47
48 pub fn with_distributed_manager(mut self, manager: Arc<DistributedSessionManager>) -> Self {
49 self.distributed_manager = Some(manager);
50 self
51 }
52
53 pub fn online_manager(&self) -> Option<&Arc<OnlineManager>> {
54 self.online_manager.as_ref()
55 }
56
57 pub fn distributed_manager(&self) -> Option<&Arc<DistributedSessionManager>> {
58 self.distributed_manager.as_ref()
59 }
60
61 pub fn event_bus(&self) -> &SaTokenEventBus {
63 &self.event_bus
64 }
65
66 pub async fn login(&self, login_id: impl Into<String>) -> SaTokenResult<TokenValue> {
68 self.login_with_options(login_id, None, None, None, None, None).await
69 }
70
71 pub async fn login_with_options(
93 &self,
94 login_id: impl Into<String>,
95 login_type: Option<String>,
96 device: Option<String>,
97 extra_data: Option<serde_json::Value>,
98 nonce: Option<String>,
99 expire_time: Option<DateTime<Utc>>,
100 ) -> SaTokenResult<TokenValue> {
101 let login_id = login_id.into();
102
103 let token = match &extra_data {
105 Some(extra) => TokenGenerator::generate_with_login_id_and_extra(&self.config, &login_id, extra),
106 None => TokenGenerator::generate_with_login_id(&self.config, &login_id),
107 };
108
109 let mut token_info = TokenInfo::new(token.clone(), login_id.clone());
111
112 token_info.login_type = login_type.unwrap_or_else(|| "default".to_string());
114
115 if let Some(device_str) = device {
117 token_info.device = Some(device_str);
118 }
119
120 if let Some(extra) = extra_data {
122 token_info.extra_data = Some(extra);
123 }
124
125 if let Some(nonce_str) = nonce {
127 token_info.nonce = Some(nonce_str);
128 }
129
130 if let Some(custom_expire_time) = expire_time {
132 token_info.expire_time = Some(custom_expire_time);
133 }
134 self.login_with_token_info(token_info).await
138 }
139
140 pub async fn login_with_token_info(&self, mut token_info: TokenInfo) -> SaTokenResult<TokenValue> {
169 let login_id = token_info.login_id.clone();
170
171 let token = if token_info.token.as_str().is_empty() {
173 TokenGenerator::generate_with_login_id(&self.config, &login_id)
174 } else {
175 token_info.token.clone()
176 };
177
178 token_info.token = token.clone();
180
181 token_info.update_active_time();
183
184 let now = Utc::now();
186 if token_info.expire_time.is_none()
187 && let Some(timeout) = self.config.timeout_duration() {
188 token_info.expire_time = Some(now + Duration::from_std(timeout).unwrap());
189 }
190
191 if token_info.login_type.is_empty() {
193 token_info.login_type = "default".to_string();
194 }
195
196 let key = self.config.make_key("token:", token.as_str());
198 let value = serde_json::to_string(&token_info)
199 .map_err(SaTokenError::SerializationError)?;
200
201 self.storage.set(&key, &value, self.config.timeout_duration()).await
202 .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
203
204 let login_token_key = if !token_info.login_type.is_empty() && token_info.login_type != "default" {
208 self.config.make_key("login:token:", &format!("{}:{}", login_id, token_info.login_type))
209 } else {
210 self.config.make_key("login:token:", &login_id)
211 };
212 self.storage.set(&login_token_key, token.as_str(), self.config.timeout_duration()).await
213 .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
214
215 if !self.config.is_concurrent {
217 self.logout_by_login_id(&login_id).await?;
218 }
219
220 let event = SaTokenEvent::login(login_id.clone(), token.as_str())
222 .with_login_type(&token_info.login_type);
223 self.event_bus.publish(event).await;
224
225 Ok(token)
226 }
227
228 pub async fn logout(&self, token: &TokenValue) -> SaTokenResult<()> {
230 tracing::debug!("Manager: 开始 logout,token: {}", token);
231
232 let key = self.config.make_key("token:", token.as_str());
234 tracing::debug!("Manager: 查询 token 信息,key: {}", key);
235
236 let token_info_str = self.storage.get(&key).await
237 .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
238
239 let token_info = if let Some(value) = token_info_str {
240 tracing::debug!("Manager: 找到 token 信息: {}", value);
241 serde_json::from_str::<TokenInfo>(&value).ok()
242 } else {
243 tracing::debug!("Manager: 未找到 token 信息");
244 None
245 };
246
247 tracing::debug!("Manager: 删除 token,key: {}", key);
249 self.storage.delete(&key).await
250 .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
251 tracing::debug!("Manager: token 已从存储中删除");
252
253 if let Some(info) = token_info.clone() {
255 tracing::debug!("Manager: 触发登出事件,login_id: {}, login_type: {}", info.login_id, info.login_type);
256 let event = SaTokenEvent::logout(&info.login_id, token.as_str())
257 .with_login_type(&info.login_type);
258 self.event_bus.publish(event).await;
259
260 if let Some(online_mgr) = &self.online_manager {
262 tracing::debug!("Manager: 标记用户下线,login_id: {}", info.login_id);
263 online_mgr.mark_offline(&info.login_id, token.as_str()).await;
264 }
265 }
266
267 tracing::debug!("Manager: logout 完成,token: {}", token);
268 Ok(())
269 }
270
271 pub async fn logout_by_login_id(&self, login_id: &str) -> SaTokenResult<()> {
273 let token_prefix = format!("{}token:", self.config.key_prefix());
275
276 if let Ok(keys) = self.storage.keys(&format!("{}*", token_prefix)).await {
278 for key in keys {
280 if let Ok(Some(token_info_str)) = self.storage.get(&key).await {
282 if let Ok(token_info) = serde_json::from_str::<TokenInfo>(&token_info_str) {
284 if token_info.login_id == login_id {
286 let token_str = key[token_prefix.len()..].to_string();
288 let token = TokenValue::new(token_str);
289
290 let _ = self.logout(&token).await;
292 }
293 }
294 }
295 }
296 }
297
298 Ok(())
299 }
300
301 pub async fn get_token_info(&self, token: &TokenValue) -> SaTokenResult<TokenInfo> {
303 let key = self.config.make_key("token:", token.as_str());
304 let value = self.storage.get(&key).await
305 .map_err(|e| SaTokenError::StorageError(e.to_string()))?
306 .ok_or(SaTokenError::TokenNotFound)?;
307
308 let token_info: TokenInfo = serde_json::from_str(&value)
309 .map_err(SaTokenError::SerializationError)?;
310
311 if token_info.is_expired() {
313 self.logout(token).await?;
315 return Err(SaTokenError::TokenExpired);
316 }
317
318 if self.config.auto_renew {
321 let renew_timeout = if self.config.active_timeout > 0 {
322 self.config.active_timeout
323 } else {
324 self.config.timeout
325 };
326
327 let _ = self.renew_timeout_internal(token, renew_timeout, &token_info).await;
329 }
330
331 Ok(token_info)
332 }
333
334 pub async fn is_valid(&self, token: &TokenValue) -> bool {
336 self.get_token_info(token).await.is_ok()
337 }
338
339 pub async fn get_session(&self, login_id: &str) -> SaTokenResult<SaSession> {
341 let key = self.config.make_key("session:", login_id);
342 let value = self.storage.get(&key).await
343 .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
344
345 if let Some(value) = value {
346 let session: SaSession = serde_json::from_str(&value)
347 .map_err(SaTokenError::SerializationError)?;
348 Ok(session)
349 } else {
350 Ok(SaSession::new(login_id))
351 }
352 }
353
354 pub async fn save_session(&self, session: &SaSession) -> SaTokenResult<()> {
356 let key = self.config.make_key("session:", &session.id);
357 let value = serde_json::to_string(session)
358 .map_err(SaTokenError::SerializationError)?;
359
360 self.storage.set(&key, &value, None).await
361 .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
362
363 Ok(())
364 }
365
366 pub async fn delete_session(&self, login_id: &str) -> SaTokenResult<()> {
368 let key = self.config.make_key("session:", login_id);
369 self.storage.delete(&key).await
370 .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
371 Ok(())
372 }
373
374 pub async fn renew_timeout(
376 &self,
377 token: &TokenValue,
378 timeout_seconds: i64,
379 ) -> SaTokenResult<()> {
380 let token_info = self.get_token_info(token).await?;
381 self.renew_timeout_internal(token, timeout_seconds, &token_info).await
382 }
383
384 async fn renew_timeout_internal(
386 &self,
387 token: &TokenValue,
388 timeout_seconds: i64,
389 token_info: &TokenInfo,
390 ) -> SaTokenResult<()> {
391 let mut new_token_info = token_info.clone();
392
393 use chrono::{Utc, Duration};
395 let new_expire_time = Utc::now() + Duration::seconds(timeout_seconds);
396 new_token_info.expire_time = Some(new_expire_time);
397
398 let key = self.config.make_key("token:", token.as_str());
400 let value = serde_json::to_string(&new_token_info)
401 .map_err(SaTokenError::SerializationError)?;
402
403 let timeout = std::time::Duration::from_secs(timeout_seconds as u64);
404 self.storage.set(&key, &value, Some(timeout)).await
405 .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
406
407 Ok(())
408 }
409
410 pub async fn kick_out(&self, login_id: &str) -> SaTokenResult<()> {
412 let token_result = self.storage.get(&self.config.make_key("login:token:", login_id)).await;
413
414 if let Some(online_mgr) = &self.online_manager {
415 let _ = online_mgr.kick_out_notify(login_id, "Account kicked out".to_string()).await;
416 }
417
418 self.logout_by_login_id(login_id).await?;
419 self.delete_session(login_id).await?;
420
421 if let Ok(Some(token_str)) = token_result {
422 let event = SaTokenEvent::kick_out(login_id, token_str);
423 self.event_bus.publish(event).await;
424 }
425
426 Ok(())
427 }
428}
429
430impl SaTokenManager {
433 fn permission_key(&self, login_id: &str) -> String {
435 self.config.make_key("permission:", login_id)
436 }
437
438 fn role_key(&self, login_id: &str) -> String {
440 self.config.make_key("role:", login_id)
441 }
442
443 async fn save_string_list(&self, key: &str, list: &[String]) -> SaTokenResult<()> {
446 let value = serde_json::to_string(list).map_err(SaTokenError::SerializationError)?;
447 self.storage
448 .set(key, &value, None)
449 .await
450 .map_err(|e| SaTokenError::StorageError(e.to_string()))
451 }
452
453 async fn load_string_list(&self, key: &str) -> SaTokenResult<Vec<String>> {
456 match self
457 .storage
458 .get(key)
459 .await
460 .map_err(|e| SaTokenError::StorageError(e.to_string()))?
461 {
462 Some(value) => serde_json::from_str(&value).map_err(SaTokenError::SerializationError),
463 None => Ok(Vec::new()),
464 }
465 }
466
467 pub async fn set_permissions(&self, login_id: &str, permissions: Vec<String>) -> SaTokenResult<()> {
470 self.save_string_list(&self.permission_key(login_id), &permissions).await
471 }
472
473 pub async fn get_permissions(&self, login_id: &str) -> SaTokenResult<Vec<String>> {
476 self.load_string_list(&self.permission_key(login_id)).await
477 }
478
479 pub async fn add_permission(&self, login_id: &str, permission: String) -> SaTokenResult<()> {
482 let key = self.permission_key(login_id);
483 let mut list = self.load_string_list(&key).await?;
484 if !list.contains(&permission) {
485 list.push(permission);
486 self.save_string_list(&key, &list).await?;
487 }
488 Ok(())
489 }
490
491 pub async fn remove_permission(&self, login_id: &str, permission: &str) -> SaTokenResult<()> {
494 let key = self.permission_key(login_id);
495 let mut list = self.load_string_list(&key).await?;
496 let before = list.len();
497 list.retain(|p| p != permission);
498 if list.len() != before {
499 self.save_string_list(&key, &list).await?;
500 }
501 Ok(())
502 }
503
504 pub async fn clear_permissions(&self, login_id: &str) -> SaTokenResult<()> {
507 self.storage
508 .delete(&self.permission_key(login_id))
509 .await
510 .map_err(|e| SaTokenError::StorageError(e.to_string()))
511 }
512
513 pub async fn set_roles(&self, login_id: &str, roles: Vec<String>) -> SaTokenResult<()> {
516 self.save_string_list(&self.role_key(login_id), &roles).await
517 }
518
519 pub async fn get_roles(&self, login_id: &str) -> SaTokenResult<Vec<String>> {
522 self.load_string_list(&self.role_key(login_id)).await
523 }
524
525 pub async fn add_role(&self, login_id: &str, role: String) -> SaTokenResult<()> {
528 let key = self.role_key(login_id);
529 let mut list = self.load_string_list(&key).await?;
530 if !list.contains(&role) {
531 list.push(role);
532 self.save_string_list(&key, &list).await?;
533 }
534 Ok(())
535 }
536
537 pub async fn remove_role(&self, login_id: &str, role: &str) -> SaTokenResult<()> {
540 let key = self.role_key(login_id);
541 let mut list = self.load_string_list(&key).await?;
542 let before = list.len();
543 list.retain(|r| r != role);
544 if list.len() != before {
545 self.save_string_list(&key, &list).await?;
546 }
547 Ok(())
548 }
549
550 pub async fn clear_roles(&self, login_id: &str) -> SaTokenResult<()> {
553 self.storage
554 .delete(&self.role_key(login_id))
555 .await
556 .map_err(|e| SaTokenError::StorageError(e.to_string()))
557 }
558}