1use crate::audit::{
2 ActorInfo, AuditEvent, AuditEventType, EventOutcome, RequestMetadata, ResourceInfo, RiskLevel,
3};
4use crate::errors::Result;
17use crate::storage::core::{AuthStorage, SessionData};
18use crate::tokens::AuthToken;
19use async_trait::async_trait;
20use dashmap::DashMap;
21use std::time::Duration;
22
/// An [`AuthToken`] together with the bookkeeping timestamps this store keeps for it.
#[derive(Debug, Clone)]
struct TimestampedToken {
    /// The stored token.
    token: AuthToken,
    /// When this entry was created in the store.
    created_at: chrono::DateTime<chrono::Utc>,
    /// Storage-level expiry derived from the TTL passed to `new`; `None` means no TTL.
    /// This is separate from the token's own `expires_at` field.
    expires_at: Option<chrono::DateTime<chrono::Utc>>,
}
30
/// A [`SessionData`] together with the time it was placed in the store.
#[derive(Debug, Clone)]
struct TimestampedSession {
    /// The stored session; expiry is delegated to `SessionData::is_expired`.
    session: SessionData,
    /// When this entry was created in the store.
    created_at: chrono::DateTime<chrono::Utc>,
}
37
/// A raw key-value payload together with its creation time and optional expiry.
#[derive(Debug, Clone)]
struct TimestampedValue {
    /// The stored bytes.
    data: Vec<u8>,
    /// When this entry was created in the store.
    created_at: chrono::DateTime<chrono::Utc>,
    /// Expiry derived from the TTL passed to `new`; `None` means no TTL.
    expires_at: Option<chrono::DateTime<chrono::Utc>>,
}
45
46impl TimestampedToken {
47 fn new(token: AuthToken, ttl: Option<Duration>) -> Self {
48 let now = chrono::Utc::now();
49 let expires_at = ttl.map(|d| now + chrono::Duration::from_std(d).unwrap());
50
51 Self {
52 token,
53 created_at: now,
54 expires_at,
55 }
56 }
57
58 fn is_expired(&self) -> bool {
59 self.expires_at
60 .map(|exp| chrono::Utc::now() > exp)
61 .unwrap_or(false)
62 }
63}
64
65impl TimestampedSession {
66 fn new(session: SessionData) -> Self {
67 Self {
68 session,
69 created_at: chrono::Utc::now(),
70 }
71 }
72
73 fn is_expired(&self) -> bool {
74 self.session.is_expired()
75 }
76}
77
78impl TimestampedValue {
79 fn new(data: Vec<u8>, ttl: Option<Duration>) -> Self {
80 let now = chrono::Utc::now();
81 let expires_at = ttl.map(|d| now + chrono::Duration::from_std(d).unwrap());
82
83 Self {
84 data,
85 created_at: now,
86 expires_at,
87 }
88 }
89
90 fn is_expired(&self) -> bool {
91 self.expires_at
92 .map(|exp| chrono::Utc::now() > exp)
93 .unwrap_or(false)
94 }
95}
96
/// In-memory storage for tokens, sessions and key-value data, backed by `DashMap`s.
#[derive(Debug, Clone)]
pub struct DashMapMemoryStorage {
    /// Tokens keyed by token id.
    tokens: DashMap<String, TimestampedToken>,
    /// Sessions keyed by session id.
    sessions: DashMap<String, TimestampedSession>,
    /// Arbitrary byte values keyed by caller-supplied key.
    kv_store: DashMap<String, TimestampedValue>,

    /// Secondary index: access token string -> token id.
    access_token_to_id: DashMap<String, String>,
    /// Secondary index: user id -> ids of that user's tokens.
    user_to_tokens: DashMap<String, Vec<String>>,
    /// Secondary index: user id -> ids of that user's sessions.
    user_to_sessions: DashMap<String, Vec<String>>,

    /// TTL applied to stored tokens, and to key-value entries stored without an explicit TTL.
    default_ttl: Option<Duration>,
}
113
impl Default for DashMapMemoryStorage {
    /// Equivalent to [`DashMapMemoryStorage::new`]: an empty store with no default TTL.
    fn default() -> Self {
        Self::new()
    }
}
119
120impl DashMapMemoryStorage {
121 pub fn new() -> Self {
123 Self {
124 tokens: DashMap::new(),
125 sessions: DashMap::new(),
126 kv_store: DashMap::new(),
127 access_token_to_id: DashMap::new(),
128 user_to_tokens: DashMap::new(),
129 user_to_sessions: DashMap::new(),
130 default_ttl: None,
131 }
132 }
133
134 pub fn with_ttl(ttl: Duration) -> Self {
136 Self {
137 tokens: DashMap::new(),
138 sessions: DashMap::new(),
139 kv_store: DashMap::new(),
140 access_token_to_id: DashMap::new(),
141 user_to_tokens: DashMap::new(),
142 user_to_sessions: DashMap::new(),
143 default_ttl: Some(ttl),
144 }
145 }
146
147 fn create_audit_event(
149 &self,
150 event_type: AuditEventType,
151 user_id: &str,
152 resource_id: &str,
153 resource_type: &str,
154 outcome: EventOutcome,
155 details_str: Option<&str>,
156 ) -> AuditEvent {
157 let mut details = std::collections::HashMap::new();
158 if let Some(detail) = details_str {
159 details.insert("operation_details".to_string(), detail.to_string());
160 }
161 details.insert("resource_type".to_string(), resource_type.to_string());
162 details.insert("resource_id".to_string(), resource_id.to_string());
163
164 AuditEvent {
165 id: uuid::Uuid::new_v4().to_string(),
166 event_type: event_type.clone(),
167 timestamp: std::time::SystemTime::now(),
168 user_id: Some(user_id.to_string()),
169 session_id: None,
170 outcome,
171 risk_level: match &event_type {
172 AuditEventType::TokenRevoked | AuditEventType::TokenExpired => RiskLevel::Medium,
173 AuditEventType::SuspiciousActivity => RiskLevel::High,
174 _ => RiskLevel::Low,
175 },
176 description: format!(
177 "{:?} operation on {} {}",
178 event_type, resource_type, resource_id
179 ),
180 details,
181 request_metadata: RequestMetadata {
182 ip_address: None,
183 user_agent: None,
184 request_id: None,
185 endpoint: Some("storage".to_string()),
186 http_method: None,
187 geolocation: None,
188 device_info: None,
189 },
190 resource: Some(ResourceInfo {
191 resource_type: resource_type.to_string(),
192 resource_id: resource_id.to_string(),
193 resource_name: None,
194 attributes: std::collections::HashMap::new(),
195 }),
196 actor: ActorInfo {
197 actor_type: "storage_system".to_string(),
198 actor_id: user_id.to_string(),
199 actor_name: Some(user_id.to_string()),
200 roles: vec!["storage_user".to_string()],
201 },
202 correlation_id: None,
203 }
204 }
205
206 async fn log_storage_operation(
208 &self,
209 event_type: AuditEventType,
210 user_id: &str,
211 resource_id: &str,
212 resource_type: &str,
213 created_at: Option<chrono::DateTime<chrono::Utc>>,
214 outcome: EventOutcome,
215 ) {
216 let details = if let Some(created) = created_at {
217 let age = chrono::Utc::now().signed_duration_since(created);
218 format!(
219 "{:?} operation on {} {} (age: {} seconds)",
220 event_type,
221 resource_type,
222 resource_id,
223 age.num_seconds()
224 )
225 } else {
226 format!(
227 "{:?} operation on {} {}",
228 event_type, resource_type, resource_id
229 )
230 };
231
232 let audit_event = self.create_audit_event(
233 event_type,
234 user_id,
235 resource_id,
236 resource_type,
237 outcome,
238 Some(&details),
239 );
240
241 log::info!(
243 "STORAGE AUDIT: {}",
244 serde_json::to_string(&audit_event).unwrap_or_default()
245 );
246 }
247
248 pub fn get_storage_statistics(&self) -> std::collections::HashMap<String, serde_json::Value> {
250 let mut stats = std::collections::HashMap::new();
251 stats.insert(
252 "total_tokens".to_string(),
253 serde_json::Value::from(self.tokens.len()),
254 );
255 stats.insert(
256 "total_sessions".to_string(),
257 serde_json::Value::from(self.sessions.len()),
258 );
259 stats.insert(
260 "total_kv_pairs".to_string(),
261 serde_json::Value::from(self.kv_store.len()),
262 );
263 stats.insert(
264 "total_users_with_tokens".to_string(),
265 serde_json::Value::from(self.user_to_tokens.len()),
266 );
267 stats.insert(
268 "total_users_with_sessions".to_string(),
269 serde_json::Value::from(self.user_to_sessions.len()),
270 );
271 stats.insert(
272 "timestamp".to_string(),
273 serde_json::Value::from(chrono::Utc::now().to_rfc3339()),
274 );
275 stats
276 }
277
278 pub fn audit_token_ages(&self) -> Vec<(String, String, i64)> {
280 let mut aged_tokens = Vec::new();
281 let now = chrono::Utc::now();
282
283 for entry in self.tokens.iter() {
284 let age_seconds = now.signed_duration_since(entry.created_at).num_seconds();
285 aged_tokens.push((
286 entry.key().clone(),
287 entry.token.user_id.clone(),
288 age_seconds,
289 ));
290 }
291
292 aged_tokens.sort_by(|a, b| b.2.cmp(&a.2)); aged_tokens
294 }
295
296 pub fn audit_session_ages(&self) -> Vec<(String, String, i64)> {
298 let mut aged_sessions = Vec::new();
299 let now = chrono::Utc::now();
300
301 for entry in self.sessions.iter() {
302 let age_seconds = now.signed_duration_since(entry.created_at).num_seconds();
303 aged_sessions.push((
304 entry.key().clone(),
305 entry.session.user_id.clone(),
306 age_seconds,
307 ));
308 }
309
310 aged_sessions.sort_by(|a, b| b.2.cmp(&a.2)); aged_sessions
312 }
313
314 pub fn generate_audit_report(&self) -> std::collections::HashMap<String, serde_json::Value> {
316 let mut report = self.get_storage_statistics();
317
318 let token_ages = self.audit_token_ages();
319 let session_ages = self.audit_session_ages();
320
321 if !token_ages.is_empty() {
323 report.insert(
324 "oldest_token_age_seconds".to_string(),
325 serde_json::Value::from(token_ages[0].2),
326 );
327 report.insert(
328 "tokens_older_than_24h".to_string(),
329 serde_json::Value::from(
330 token_ages.iter().filter(|(_, _, age)| *age > 86400).count(),
331 ),
332 );
333 }
334
335 if !session_ages.is_empty() {
336 report.insert(
337 "oldest_session_age_seconds".to_string(),
338 serde_json::Value::from(session_ages[0].2),
339 );
340 report.insert(
341 "sessions_older_than_24h".to_string(),
342 serde_json::Value::from(
343 session_ages
344 .iter()
345 .filter(|(_, _, age)| *age > 86400)
346 .count(),
347 ),
348 );
349 }
350
351 report
352 }
353
354 fn add_token_to_user_index(&self, user_id: &str, token_id: &str) {
357 self.user_to_tokens
359 .entry(user_id.to_string())
360 .and_modify(|tokens| tokens.push(token_id.to_string()))
361 .or_insert_with(|| vec![token_id.to_string()]);
362 }
363
364 fn remove_token_from_user_index(&self, user_id: &str, token_id: &str) {
366 if let Some(mut entry) = self.user_to_tokens.get_mut(user_id) {
368 entry.retain(|id| id != token_id);
369 if entry.is_empty() {
370 drop(entry); self.user_to_tokens.remove(user_id);
372 }
373 }
374 }
375
376 fn add_session_to_user_index(&self, user_id: &str, session_id: &str) {
378 self.user_to_sessions
379 .entry(user_id.to_string())
380 .and_modify(|sessions| sessions.push(session_id.to_string()))
381 .or_insert_with(|| vec![session_id.to_string()]);
382 }
383
384 fn remove_session_from_user_index(&self, user_id: &str, session_id: &str) {
386 if let Some(mut entry) = self.user_to_sessions.get_mut(user_id) {
387 entry.retain(|id| id != session_id);
388 if entry.is_empty() {
389 drop(entry);
390 self.user_to_sessions.remove(user_id);
391 }
392 }
393 }
394}
395
396#[async_trait]
397impl AuthStorage for DashMapMemoryStorage {
398 async fn store_token(&self, token: &AuthToken) -> Result<()> {
399 let timestamped = TimestampedToken::new(token.clone(), self.default_ttl);
400 let created_at = timestamped.created_at;
401
402 self.tokens.insert(token.token_id.clone(), timestamped);
404
405 self.access_token_to_id
407 .insert(token.access_token.clone(), token.token_id.clone());
408
409 self.add_token_to_user_index(&token.user_id, &token.token_id);
411
412 self.log_storage_operation(
414 AuditEventType::LoginSuccess, &token.user_id,
416 &token.token_id,
417 "token",
418 Some(created_at),
419 EventOutcome::Success,
420 )
421 .await;
422
423 Ok(())
424 }
425
426 async fn get_token(&self, token_id: &str) -> Result<Option<AuthToken>> {
427 if let Some(timestamped) = self.tokens.get(token_id) {
429 let created_at = timestamped.created_at;
430 let user_id = timestamped.token.user_id.clone();
431
432 if timestamped.is_expired() {
433 drop(timestamped); self.tokens.remove(token_id); self.log_storage_operation(
438 AuditEventType::TokenExpired,
439 &user_id,
440 token_id,
441 "token",
442 Some(created_at),
443 EventOutcome::Failure,
444 )
445 .await;
446
447 return Ok(None);
448 }
449
450 let token = timestamped.token.clone();
451 drop(timestamped); self.log_storage_operation(
455 AuditEventType::LoginSuccess, &user_id,
457 token_id,
458 "token",
459 Some(created_at),
460 EventOutcome::Success,
461 )
462 .await;
463
464 Ok(Some(token))
465 } else {
466 Ok(None)
467 }
468 }
469
470 async fn get_token_by_access_token(&self, access_token: &str) -> Result<Option<AuthToken>> {
471 if let Some(token_id_entry) = self.access_token_to_id.get(access_token) {
473 let token_id = token_id_entry.clone(); drop(token_id_entry); self.get_token(&token_id).await } else {
477 Ok(None)
478 }
479 }
480
    /// Updates a token by storing it again; this simply delegates to `store_token`,
    /// which overwrites the entry with the same `token_id`.
    async fn update_token(&self, token: &AuthToken) -> Result<()> {
        self.store_token(token).await
    }
485
486 async fn delete_token(&self, token_id: &str) -> Result<()> {
487 let token_info = if let Some(timestamped) = self.tokens.get(token_id) {
489 Some((
490 timestamped.token.user_id.clone(),
491 timestamped.token.access_token.clone(),
492 timestamped.created_at,
493 ))
494 } else {
495 None
496 };
497
498 if let Some((user_id, access_token, created_at)) = token_info {
499 self.tokens.remove(token_id);
501 self.access_token_to_id.remove(&access_token);
502 self.remove_token_from_user_index(&user_id, token_id);
503
504 self.log_storage_operation(
506 AuditEventType::TokenRevoked,
507 &user_id,
508 token_id,
509 "token",
510 Some(created_at),
511 EventOutcome::Success,
512 )
513 .await;
514 }
515
516 Ok(())
517 }
518
519 async fn list_user_tokens(&self, user_id: &str) -> Result<Vec<AuthToken>> {
520 let token_ids = if let Some(ids) = self.user_to_tokens.get(user_id) {
522 ids.clone() } else {
524 return Ok(Vec::new());
525 };
526
527 let mut tokens = Vec::new();
528 let mut expired_tokens = Vec::new();
529
530 for token_id in token_ids {
532 if let Some(timestamped) = self.tokens.get(&token_id) {
533 if timestamped.is_expired() {
534 expired_tokens.push(token_id);
535 } else {
536 tokens.push(timestamped.token.clone());
537 }
538 } else {
539 expired_tokens.push(token_id); }
541 }
542
543 for token_id in expired_tokens {
545 self.delete_token(&token_id).await?;
546 }
547
548 Ok(tokens)
549 }
550
551 async fn store_session(&self, session_id: &str, data: &SessionData) -> Result<()> {
552 let timestamped = TimestampedSession::new(data.clone());
553 let created_at = timestamped.created_at;
554
555 self.sessions.insert(session_id.to_string(), timestamped);
557
558 self.add_session_to_user_index(&data.user_id, session_id);
560
561 self.log_storage_operation(
563 AuditEventType::LoginSuccess, &data.user_id,
565 session_id,
566 "session",
567 Some(created_at),
568 EventOutcome::Success,
569 )
570 .await;
571
572 Ok(())
573 }
574
575 async fn get_session(&self, session_id: &str) -> Result<Option<SessionData>> {
576 if let Some(timestamped) = self.sessions.get(session_id) {
578 let created_at = timestamped.created_at;
579 let user_id = timestamped.session.user_id.clone();
580
581 if timestamped.is_expired() {
582 drop(timestamped);
583 self.sessions.remove(session_id);
584
585 self.log_storage_operation(
587 AuditEventType::TokenExpired, &user_id,
589 session_id,
590 "session",
591 Some(created_at),
592 EventOutcome::Failure,
593 )
594 .await;
595
596 return Ok(None);
597 }
598
599 let session = timestamped.session.clone();
600 drop(timestamped);
601
602 self.log_storage_operation(
604 AuditEventType::LoginSuccess, &user_id,
606 session_id,
607 "session",
608 Some(created_at),
609 EventOutcome::Success,
610 )
611 .await;
612
613 Ok(Some(session))
614 } else {
615 Ok(None)
616 }
617 }
618
619 async fn delete_session(&self, session_id: &str) -> Result<()> {
620 let session_info = if let Some(timestamped) = self.sessions.get(session_id) {
622 Some((timestamped.session.user_id.clone(), timestamped.created_at))
623 } else {
624 None
625 };
626
627 if let Some((user_id, created_at)) = session_info {
628 self.sessions.remove(session_id);
629 self.remove_session_from_user_index(&user_id, session_id);
630
631 self.log_storage_operation(
633 AuditEventType::Logout,
634 &user_id,
635 session_id,
636 "session",
637 Some(created_at),
638 EventOutcome::Success,
639 )
640 .await;
641 }
642
643 Ok(())
644 }
645
646 async fn list_user_sessions(&self, user_id: &str) -> Result<Vec<SessionData>> {
647 let session_ids = if let Some(ids) = self.user_to_sessions.get(user_id) {
649 ids.clone()
650 } else {
651 return Ok(Vec::new());
652 };
653
654 let mut sessions = Vec::new();
655 let mut expired_sessions = Vec::new();
656
657 for session_id in session_ids {
659 if let Some(timestamped) = self.sessions.get(&session_id) {
660 if timestamped.is_expired() {
661 expired_sessions.push(session_id);
662 } else {
663 sessions.push(timestamped.session.clone());
664 }
665 } else {
666 expired_sessions.push(session_id);
667 }
668 }
669
670 for session_id in expired_sessions {
672 self.delete_session(&session_id).await?;
673 }
674
675 Ok(sessions)
676 }
677
678 async fn store_kv(&self, key: &str, value: &[u8], ttl: Option<Duration>) -> Result<()> {
679 let timestamped = TimestampedValue::new(value.to_vec(), ttl.or(self.default_ttl));
680 let created_at = timestamped.created_at;
681
682 self.kv_store.insert(key.to_string(), timestamped);
683
684 self.log_storage_operation(
686 AuditEventType::ConfigurationChanged, "system", key,
689 "kv_pair",
690 Some(created_at),
691 EventOutcome::Success,
692 )
693 .await;
694
695 Ok(())
696 }
697
698 async fn get_kv(&self, key: &str) -> Result<Option<Vec<u8>>> {
699 if let Some(timestamped) = self.kv_store.get(key) {
701 let created_at = timestamped.created_at;
702
703 if timestamped.is_expired() {
704 drop(timestamped);
705 self.kv_store.remove(key);
706
707 self.log_storage_operation(
709 AuditEventType::TokenExpired, "system",
711 key,
712 "kv_pair",
713 Some(created_at),
714 EventOutcome::Failure,
715 )
716 .await;
717
718 return Ok(None);
719 }
720
721 let data = timestamped.data.clone();
722 drop(timestamped);
723
724 self.log_storage_operation(
726 AuditEventType::ConfigurationChanged, "system",
728 key,
729 "kv_pair",
730 Some(created_at),
731 EventOutcome::Success,
732 )
733 .await;
734
735 Ok(Some(data))
736 } else {
737 Ok(None)
738 }
739 }
740
741 async fn delete_kv(&self, key: &str) -> Result<()> {
742 let created_at = if let Some(timestamped) = self.kv_store.get(key) {
744 Some(timestamped.created_at)
745 } else {
746 None
747 };
748
749 self.kv_store.remove(key);
750
751 if let Some(created_at) = created_at {
752 self.log_storage_operation(
754 AuditEventType::ConfigurationChanged, "system",
756 key,
757 "kv_pair",
758 Some(created_at),
759 EventOutcome::Success,
760 )
761 .await;
762 }
763
764 Ok(())
765 }
766
767 async fn cleanup_expired(&self) -> Result<()> {
768 let mut expired_tokens = Vec::new();
770 let mut expired_sessions = Vec::new();
771 let mut expired_kvs = Vec::new();
772
773 for entry in self.tokens.iter() {
775 if entry.is_expired() {
776 expired_tokens.push(entry.key().clone());
777 }
778 }
779
780 for entry in self.sessions.iter() {
781 if entry.is_expired() {
782 expired_sessions.push(entry.key().clone());
783 }
784 }
785
786 for entry in self.kv_store.iter() {
787 if entry.is_expired() {
788 expired_kvs.push(entry.key().clone());
789 }
790 }
791
792 for token_id in expired_tokens {
794 self.delete_token(&token_id).await?;
795 }
796
797 for session_id in expired_sessions {
798 self.delete_session(&session_id).await?;
799 }
800
801 for key in expired_kvs {
802 self.delete_kv(&key).await?;
803 }
804
805 Ok(())
806 }
807
808 async fn count_active_sessions(&self) -> Result<u64> {
809 let mut count = 0;
810
811 for entry in self.sessions.iter() {
813 if !entry.is_expired() {
814 count += 1;
815 }
816 }
817
818 Ok(count)
819 }
820}
821
822#[cfg(test)]
823mod tests {
824 use super::*;
825 use crate::{testing::test_infrastructure::TestEnvironmentGuard, tokens::TokenMetadata};
826 use std::collections::HashMap;
827 use tokio::task::JoinSet;
828
829 #[tokio::test]
830 async fn test_basic_token_operations() {
831 let _env = TestEnvironmentGuard::new().with_jwt_secret("dashmap-test");
832 let storage = DashMapMemoryStorage::new();
833
834 let token = AuthToken {
835 token_id: "test-token".to_string(),
836 user_id: "test-user".to_string(),
837 access_token: "access-123".to_string(),
838 token_type: Some("bearer".to_string()),
839 subject: Some("test-user".to_string()),
840 issuer: Some("test-issuer".to_string()),
841 refresh_token: None,
842 issued_at: chrono::Utc::now(),
843 expires_at: chrono::Utc::now() + chrono::Duration::hours(1),
844 scopes: vec!["read".to_string()],
845 auth_method: "password".to_string(),
846 client_id: Some("test-client".to_string()),
847 user_profile: None,
848 permissions: vec!["read:data".to_string()],
849 roles: vec!["user".to_string()],
850 metadata: TokenMetadata::default(),
851 };
852
853 storage.store_token(&token).await.unwrap();
855
856 let retrieved = storage.get_token("test-token").await.unwrap().unwrap();
858 assert_eq!(retrieved.user_id, "test-user");
859
860 let retrieved = storage
862 .get_token_by_access_token("access-123")
863 .await
864 .unwrap()
865 .unwrap();
866 assert_eq!(retrieved.token_id, "test-token");
867
868 let user_tokens = storage.list_user_tokens("test-user").await.unwrap();
870 assert_eq!(user_tokens.len(), 1);
871
872 storage.delete_token("test-token").await.unwrap();
874 let retrieved = storage.get_token("test-token").await.unwrap();
875 assert!(retrieved.is_none());
876 }
877
878 #[tokio::test]
879 async fn test_session_operations() {
880 let _env = TestEnvironmentGuard::new().with_jwt_secret("dashmap-session-test");
881 let storage = DashMapMemoryStorage::new();
882
883 let session = SessionData {
884 session_id: "test-session".to_string(),
885 user_id: "test-user".to_string(),
886 created_at: chrono::Utc::now(),
887 last_activity: chrono::Utc::now(),
888 expires_at: chrono::Utc::now() + chrono::Duration::hours(1),
889 ip_address: Some("192.168.1.1".to_string()),
890 user_agent: Some("test-agent".to_string()),
891 data: HashMap::new(),
892 };
893
894 storage
896 .store_session("test-session", &session)
897 .await
898 .unwrap();
899
900 let retrieved = storage.get_session("test-session").await.unwrap().unwrap();
902 assert_eq!(retrieved.user_id, "test-user");
903
904 let user_sessions = storage.list_user_sessions("test-user").await.unwrap();
906 assert_eq!(user_sessions.len(), 1);
907
908 storage.delete_session("test-session").await.unwrap();
910 let retrieved = storage.get_session("test-session").await.unwrap();
911 assert!(retrieved.is_none());
912 }
913
914 #[tokio::test]
915 async fn test_kv_operations() {
916 let _env = TestEnvironmentGuard::new().with_jwt_secret("dashmap-kv-test");
917 let storage = DashMapMemoryStorage::new();
918
919 let key = "test-key";
920 let value = b"test-value";
921
922 storage
924 .store_kv(key, value, Some(Duration::from_secs(3600)))
925 .await
926 .unwrap();
927
928 let retrieved = storage.get_kv(key).await.unwrap().unwrap();
930 assert_eq!(retrieved, value);
931
932 storage.delete_kv(key).await.unwrap();
934 let retrieved = storage.get_kv(key).await.unwrap();
935 assert!(retrieved.is_none());
936 }
937
938 #[tokio::test]
939 async fn test_concurrent_operations_no_deadlock() {
940 let _env = TestEnvironmentGuard::new().with_jwt_secret("dashmap-concurrent-test");
941 let storage = std::sync::Arc::new(DashMapMemoryStorage::new());
942
943 let mut join_set = JoinSet::new();
944
945 for i in 0..10 {
947 let storage = storage.clone();
948 join_set.spawn(async move {
949 for j in 0..50 {
950 let token = AuthToken {
951 token_id: format!("token-{}-{}", i, j),
952 user_id: format!("user-{}", i % 3), access_token: format!("access-{}-{}", i, j),
954 token_type: Some("bearer".to_string()),
955 subject: Some(format!("user-{}", i % 3)),
956 issuer: Some("test-issuer".to_string()),
957 refresh_token: None,
958 issued_at: chrono::Utc::now(),
959 expires_at: chrono::Utc::now() + chrono::Duration::hours(1),
960 scopes: vec!["read".to_string()],
961 auth_method: "password".to_string(),
962 client_id: Some("test-client".to_string()),
963 user_profile: None,
964 permissions: vec!["read:data".to_string()],
965 roles: vec!["user".to_string()],
966 metadata: TokenMetadata::default(),
967 };
968
969 storage.store_token(&token).await.unwrap();
971
972 let _user_tokens = storage.list_user_tokens(&token.user_id).await.unwrap();
974
975 let _retrieved = storage
977 .get_token_by_access_token(&token.access_token)
978 .await
979 .unwrap();
980 }
981 });
982 }
983
984 while join_set.join_next().await.is_some() {}
986
987 println!("✅ Concurrent operations test passed - no deadlocks detected!");
988 }
989
990 #[tokio::test]
991 async fn test_ttl_expiration() {
992 let _env = TestEnvironmentGuard::new().with_jwt_secret("dashmap-ttl-test");
993 let storage = DashMapMemoryStorage::with_ttl(Duration::from_millis(100));
994
995 storage
997 .store_kv("expiring-key", b"expiring-value", None)
998 .await
999 .unwrap();
1000
1001 let retrieved = storage.get_kv("expiring-key").await.unwrap();
1003 assert!(retrieved.is_some());
1004
1005 tokio::time::sleep(Duration::from_millis(150)).await;
1007
1008 let retrieved = storage.get_kv("expiring-key").await.unwrap();
1010 assert!(retrieved.is_none());
1011 }
1012
1013 #[tokio::test]
1014 async fn test_cleanup_expired() {
1015 let _env = TestEnvironmentGuard::new().with_jwt_secret("dashmap-cleanup-test");
1016 let storage = DashMapMemoryStorage::with_ttl(Duration::from_millis(50));
1017
1018 for i in 0..10 {
1020 storage
1021 .store_kv(&format!("key-{}", i), b"value", None)
1022 .await
1023 .unwrap();
1024 }
1025
1026 tokio::time::sleep(Duration::from_millis(100)).await;
1028
1029 storage.cleanup_expired().await.unwrap();
1031
1032 for i in 0..10 {
1034 let retrieved = storage.get_kv(&format!("key-{}", i)).await.unwrap();
1035 assert!(retrieved.is_none());
1036 }
1037 }
1038}
1039