1use std::sync::Arc;
6use std::collections::HashMap;
7use chrono::{DateTime, Duration, Utc};
8use tokio::sync::RwLock;
9use sa_token_adapter::storage::SaStorage;
10use crate::config::SaTokenConfig;
11use crate::error::{SaTokenError, SaTokenResult};
12use crate::token::{TokenInfo, TokenValue, TokenGenerator};
13use crate::session::SaSession;
14use crate::event::{SaTokenEventBus, SaTokenEvent};
15use crate::online::OnlineManager;
16use crate::distributed::DistributedSessionManager;
17
/// Central manager for token issuance, validation, renewal and revocation,
/// plus per-login session persistence.
///
/// Cloning is cheap with respect to shared state: the storage backend, the
/// permission/role maps and the optional managers are all behind `Arc`.
#[derive(Clone)]
pub struct SaTokenManager {
    /// Storage backend holding token records (`sa:token:*`), login-to-token
    /// mappings (`sa:login:token:*`) and sessions (`sa:session:*`).
    pub(crate) storage: Arc<dyn SaStorage>,
    /// Configuration consulted for token generation, timeouts, concurrent
    /// login policy and auto-renewal.
    pub config: SaTokenConfig,
    /// Shared map from a string key to a list of permission strings.
    /// Presumably keyed by login id; it is not read in this part of the file.
    pub(crate) user_permissions: Arc<RwLock<HashMap<String, Vec<String>>>>,
    /// Shared map from a string key to a list of role strings.
    /// Presumably keyed by login id; it is not read in this part of the file.
    pub(crate) user_roles: Arc<RwLock<HashMap<String, Vec<String>>>>,
    /// Event bus on which login, logout and kick-out events are published.
    pub(crate) event_bus: SaTokenEventBus,
    /// Optional online-user manager; notified on logout and kick-out when set.
    online_manager: Option<Arc<OnlineManager>>,
    /// Optional distributed-session manager; only stored and exposed here.
    distributed_manager: Option<Arc<DistributedSessionManager>>,
}
34
35impl SaTokenManager {
36 pub fn new(storage: Arc<dyn SaStorage>, config: SaTokenConfig) -> Self {
38 Self {
39 storage,
40 config,
41 user_permissions: Arc::new(RwLock::new(HashMap::new())),
42 user_roles: Arc::new(RwLock::new(HashMap::new())),
43 event_bus: SaTokenEventBus::new(),
44 online_manager: None,
45 distributed_manager: None,
46 }
47 }
48
49 pub fn with_online_manager(mut self, manager: Arc<OnlineManager>) -> Self {
50 self.online_manager = Some(manager);
51 self
52 }
53
54 pub fn with_distributed_manager(mut self, manager: Arc<DistributedSessionManager>) -> Self {
55 self.distributed_manager = Some(manager);
56 self
57 }
58
59 pub fn online_manager(&self) -> Option<&Arc<OnlineManager>> {
60 self.online_manager.as_ref()
61 }
62
63 pub fn distributed_manager(&self) -> Option<&Arc<DistributedSessionManager>> {
64 self.distributed_manager.as_ref()
65 }
66
67 pub fn event_bus(&self) -> &SaTokenEventBus {
69 &self.event_bus
70 }
71
72 pub async fn login(&self, login_id: impl Into<String>) -> SaTokenResult<TokenValue> {
74 self.login_with_options(login_id, None, None, None, None, None).await
75 }
76
77 pub async fn login_with_options(
99 &self,
100 login_id: impl Into<String>,
101 login_type: Option<String>,
102 device: Option<String>,
103 extra_data: Option<serde_json::Value>,
104 nonce: Option<String>,
105 expire_time: Option<DateTime<Utc>>,
106 ) -> SaTokenResult<TokenValue> {
107 let login_id = login_id.into();
108
109 let token = match &extra_data {
111 Some(extra) => TokenGenerator::generate_with_login_id_and_extra(&self.config, &login_id, extra),
112 None => TokenGenerator::generate_with_login_id(&self.config, &login_id),
113 };
114
115 let mut token_info = TokenInfo::new(token.clone(), login_id.clone());
117
118 token_info.login_type = login_type.unwrap_or_else(|| "default".to_string());
120
121 if let Some(device_str) = device {
123 token_info.device = Some(device_str);
124 }
125
126 if let Some(extra) = extra_data {
128 token_info.extra_data = Some(extra);
129 }
130
131 if let Some(nonce_str) = nonce {
133 token_info.nonce = Some(nonce_str);
134 }
135
136 if let Some(custom_expire_time) = expire_time {
138 token_info.expire_time = Some(custom_expire_time);
139 }
140 self.login_with_token_info(token_info).await
144 }
145
146 pub async fn login_with_token_info(&self, mut token_info: TokenInfo) -> SaTokenResult<TokenValue> {
175 let login_id = token_info.login_id.clone();
176
177 let token = if token_info.token.as_str().is_empty() {
179 TokenGenerator::generate_with_login_id(&self.config, &login_id)
180 } else {
181 token_info.token.clone()
182 };
183
184 token_info.token = token.clone();
186
187 token_info.update_active_time();
189
190 let now = Utc::now();
192 if token_info.expire_time.is_none()
193 && let Some(timeout) = self.config.timeout_duration() {
194 token_info.expire_time = Some(now + Duration::from_std(timeout).unwrap());
195 }
196
197 if token_info.login_type.is_empty() {
199 token_info.login_type = "default".to_string();
200 }
201
202 let key = format!("sa:token:{}", token.as_str());
204 let value = serde_json::to_string(&token_info)
205 .map_err(SaTokenError::SerializationError)?;
206
207 self.storage.set(&key, &value, self.config.timeout_duration()).await
208 .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
209
210 let login_token_key = if !token_info.login_type.is_empty() && token_info.login_type != "default" {
214 format!("sa:login:token:{}:{}", login_id, token_info.login_type)
215 } else {
216 format!("sa:login:token:{}", login_id)
217 };
218 self.storage.set(&login_token_key, token.as_str(), self.config.timeout_duration()).await
219 .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
220
221 if !self.config.is_concurrent {
223 self.logout_by_login_id(&login_id).await?;
224 }
225
226 let event = SaTokenEvent::login(login_id.clone(), token.as_str())
228 .with_login_type(&token_info.login_type);
229 self.event_bus.publish(event).await;
230
231 Ok(token)
232 }
233
234 pub async fn logout(&self, token: &TokenValue) -> SaTokenResult<()> {
236 tracing::debug!("Manager: 开始 logout,token: {}", token);
237
238 let key = format!("sa:token:{}", token.as_str());
240 tracing::debug!("Manager: 查询 token 信息,key: {}", key);
241
242 let token_info_str = self.storage.get(&key).await
243 .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
244
245 let token_info = if let Some(value) = token_info_str {
246 tracing::debug!("Manager: 找到 token 信息: {}", value);
247 serde_json::from_str::<TokenInfo>(&value).ok()
248 } else {
249 tracing::debug!("Manager: 未找到 token 信息");
250 None
251 };
252
253 tracing::debug!("Manager: 删除 token,key: {}", key);
255 self.storage.delete(&key).await
256 .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
257 tracing::debug!("Manager: token 已从存储中删除");
258
259 if let Some(info) = token_info.clone() {
261 tracing::debug!("Manager: 触发登出事件,login_id: {}, login_type: {}", info.login_id, info.login_type);
262 let event = SaTokenEvent::logout(&info.login_id, token.as_str())
263 .with_login_type(&info.login_type);
264 self.event_bus.publish(event).await;
265
266 if let Some(online_mgr) = &self.online_manager {
268 tracing::debug!("Manager: 标记用户下线,login_id: {}", info.login_id);
269 online_mgr.mark_offline(&info.login_id, token.as_str()).await;
270 }
271 }
272
273 tracing::debug!("Manager: logout 完成,token: {}", token);
274 Ok(())
275 }
276
277 pub async fn logout_by_login_id(&self, login_id: &str) -> SaTokenResult<()> {
279 let token_prefix = "sa:token:";
281
282 if let Ok(keys) = self.storage.keys(&format!("{}*", token_prefix)).await {
284 for key in keys {
286 if let Ok(Some(token_info_str)) = self.storage.get(&key).await {
288 if let Ok(token_info) = serde_json::from_str::<TokenInfo>(&token_info_str) {
290 if token_info.login_id == login_id {
292 let token_str = key[token_prefix.len()..].to_string();
294 let token = TokenValue::new(token_str);
295
296 let _ = self.logout(&token).await;
298 }
299 }
300 }
301 }
302 }
303
304 Ok(())
305 }
306
307 pub async fn get_token_info(&self, token: &TokenValue) -> SaTokenResult<TokenInfo> {
309 let key = format!("sa:token:{}", token.as_str());
310 let value = self.storage.get(&key).await
311 .map_err(|e| SaTokenError::StorageError(e.to_string()))?
312 .ok_or(SaTokenError::TokenNotFound)?;
313
314 let token_info: TokenInfo = serde_json::from_str(&value)
315 .map_err(SaTokenError::SerializationError)?;
316
317 if token_info.is_expired() {
319 self.logout(token).await?;
321 return Err(SaTokenError::TokenExpired);
322 }
323
324 if self.config.auto_renew {
327 let renew_timeout = if self.config.active_timeout > 0 {
328 self.config.active_timeout
329 } else {
330 self.config.timeout
331 };
332
333 let _ = self.renew_timeout_internal(token, renew_timeout, &token_info).await;
335 }
336
337 Ok(token_info)
338 }
339
340 pub async fn is_valid(&self, token: &TokenValue) -> bool {
342 self.get_token_info(token).await.is_ok()
343 }
344
345 pub async fn get_session(&self, login_id: &str) -> SaTokenResult<SaSession> {
347 let key = format!("sa:session:{}", login_id);
348 let value = self.storage.get(&key).await
349 .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
350
351 if let Some(value) = value {
352 let session: SaSession = serde_json::from_str(&value)
353 .map_err(SaTokenError::SerializationError)?;
354 Ok(session)
355 } else {
356 Ok(SaSession::new(login_id))
357 }
358 }
359
360 pub async fn save_session(&self, session: &SaSession) -> SaTokenResult<()> {
362 let key = format!("sa:session:{}", session.id);
363 let value = serde_json::to_string(session)
364 .map_err(SaTokenError::SerializationError)?;
365
366 self.storage.set(&key, &value, None).await
367 .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
368
369 Ok(())
370 }
371
372 pub async fn delete_session(&self, login_id: &str) -> SaTokenResult<()> {
374 let key = format!("sa:session:{}", login_id);
375 self.storage.delete(&key).await
376 .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
377 Ok(())
378 }
379
380 pub async fn renew_timeout(
382 &self,
383 token: &TokenValue,
384 timeout_seconds: i64,
385 ) -> SaTokenResult<()> {
386 let token_info = self.get_token_info(token).await?;
387 self.renew_timeout_internal(token, timeout_seconds, &token_info).await
388 }
389
390 async fn renew_timeout_internal(
392 &self,
393 token: &TokenValue,
394 timeout_seconds: i64,
395 token_info: &TokenInfo,
396 ) -> SaTokenResult<()> {
397 let mut new_token_info = token_info.clone();
398
399 use chrono::{Utc, Duration};
401 let new_expire_time = Utc::now() + Duration::seconds(timeout_seconds);
402 new_token_info.expire_time = Some(new_expire_time);
403
404 let key = format!("sa:token:{}", token.as_str());
406 let value = serde_json::to_string(&new_token_info)
407 .map_err(SaTokenError::SerializationError)?;
408
409 let timeout = std::time::Duration::from_secs(timeout_seconds as u64);
410 self.storage.set(&key, &value, Some(timeout)).await
411 .map_err(|e| SaTokenError::StorageError(e.to_string()))?;
412
413 Ok(())
414 }
415
416 pub async fn kick_out(&self, login_id: &str) -> SaTokenResult<()> {
418 let token_result = self.storage.get(&format!("sa:login:token:{}", login_id)).await;
419
420 if let Some(online_mgr) = &self.online_manager {
421 let _ = online_mgr.kick_out_notify(login_id, "Account kicked out".to_string()).await;
422 }
423
424 self.logout_by_login_id(login_id).await?;
425 self.delete_session(login_id).await?;
426
427 if let Ok(Some(token_str)) = token_result {
428 let event = SaTokenEvent::kick_out(login_id, token_str);
429 self.event_bus.publish(event).await;
430 }
431
432 Ok(())
433 }
434}