1use crate::audit::{
2 ActorInfo, AuditEvent, AuditEventType, EventOutcome, RequestMetadata, ResourceInfo, RiskLevel,
3};
4use crate::errors::Result;
17use crate::storage::core::{AuthStorage, SessionData};
18use crate::tokens::AuthToken;
19use async_trait::async_trait;
20use dashmap::DashMap;
21use std::time::Duration;
22
/// An [`AuthToken`] together with the bookkeeping timestamps kept by the storage layer.
#[derive(Debug, Clone)]
struct TimestampedToken {
    /// The stored token itself.
    token: AuthToken,
    /// UTC instant at which this entry was created in storage; used for age reporting.
    created_at: chrono::DateTime<chrono::Utc>,
    /// Storage-level expiry derived from the TTL supplied at creation.
    /// `None` means no storage-level TTL applies to this entry.
    expires_at: Option<chrono::DateTime<chrono::Utc>>,
}
30
/// A [`SessionData`] together with the instant at which it was placed in storage.
#[derive(Debug, Clone)]
struct TimestampedSession {
    /// The stored session payload.
    session: SessionData,
    /// UTC instant at which this entry was created in storage; used for age reporting.
    created_at: chrono::DateTime<chrono::Utc>,
}
37
/// A raw key-value payload together with its storage timestamps.
#[derive(Debug, Clone)]
struct TimestampedValue {
    /// The stored bytes.
    data: Vec<u8>,
    /// UTC instant at which this entry was created in storage.
    created_at: chrono::DateTime<chrono::Utc>,
    /// Expiry derived from the TTL supplied at creation; `None` means the value never expires.
    expires_at: Option<chrono::DateTime<chrono::Utc>>,
}
45
46impl TimestampedToken {
47 fn new(token: AuthToken, ttl: Option<Duration>) -> Self {
48 let now = chrono::Utc::now();
49 let expires_at =
50 ttl.map(|d| now + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::hours(1)));
51
52 Self {
53 token,
54 created_at: now,
55 expires_at,
56 }
57 }
58
59 fn is_expired(&self) -> bool {
60 self.expires_at
61 .map(|exp| chrono::Utc::now() > exp)
62 .unwrap_or(false)
63 }
64}
65
66impl TimestampedSession {
67 fn new(session: SessionData) -> Self {
68 Self {
69 session,
70 created_at: chrono::Utc::now(),
71 }
72 }
73
74 fn is_expired(&self) -> bool {
75 self.session.is_expired()
76 }
77}
78
79impl TimestampedValue {
80 fn new(data: Vec<u8>, ttl: Option<Duration>) -> Self {
81 let now = chrono::Utc::now();
82 let expires_at =
83 ttl.map(|d| now + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::hours(1)));
84
85 Self {
86 data,
87 created_at: now,
88 expires_at,
89 }
90 }
91
92 fn is_expired(&self) -> bool {
93 self.expires_at
94 .map(|exp| chrono::Utc::now() > exp)
95 .unwrap_or(false)
96 }
97}
98
/// In-memory storage for tokens, sessions and key-value pairs, backed by [`DashMap`]s.
///
/// Besides the three primary maps, secondary indexes are kept so that tokens can be
/// looked up by access token and so that all tokens/sessions of a user can be listed.
#[derive(Debug, Clone)]
pub struct DashMapMemoryStorage {
    /// Tokens keyed by token id.
    tokens: DashMap<String, TimestampedToken>,
    /// Sessions keyed by session id.
    sessions: DashMap<String, TimestampedSession>,
    /// Arbitrary byte values keyed by caller-chosen key.
    kv_store: DashMap<String, TimestampedValue>,

    /// Index: access token string -> token id.
    access_token_to_id: DashMap<String, String>,
    /// Index: user id -> ids of that user's tokens.
    user_to_tokens: DashMap<String, Vec<String>>,
    /// Index: user id -> ids of that user's sessions.
    user_to_sessions: DashMap<String, Vec<String>>,

    /// TTL applied to stored tokens, and to key-value entries stored without an
    /// explicit TTL. `None` disables storage-level expiry.
    default_ttl: Option<Duration>,
}
115
/// The default storage is whatever [`DashMapMemoryStorage::new`] produces.
impl Default for DashMapMemoryStorage {
    fn default() -> Self {
        // Delegate so that `Default` and `new` cannot drift apart.
        Self::new()
    }
}
121
122impl DashMapMemoryStorage {
123 pub fn new() -> Self {
125 Self {
126 tokens: DashMap::new(),
127 sessions: DashMap::new(),
128 kv_store: DashMap::new(),
129 access_token_to_id: DashMap::new(),
130 user_to_tokens: DashMap::new(),
131 user_to_sessions: DashMap::new(),
132 default_ttl: None,
133 }
134 }
135
136 pub fn with_ttl(ttl: Duration) -> Self {
138 Self {
139 tokens: DashMap::new(),
140 sessions: DashMap::new(),
141 kv_store: DashMap::new(),
142 access_token_to_id: DashMap::new(),
143 user_to_tokens: DashMap::new(),
144 user_to_sessions: DashMap::new(),
145 default_ttl: Some(ttl),
146 }
147 }
148
149 fn create_audit_event(
151 &self,
152 event_type: AuditEventType,
153 user_id: &str,
154 resource_id: &str,
155 resource_type: &str,
156 outcome: EventOutcome,
157 details_str: Option<&str>,
158 ) -> AuditEvent {
159 let mut details = std::collections::HashMap::new();
160 if let Some(detail) = details_str {
161 details.insert("operation_details".to_string(), detail.to_string());
162 }
163 details.insert("resource_type".to_string(), resource_type.to_string());
164 details.insert("resource_id".to_string(), resource_id.to_string());
165
166 let risk_level = match &event_type {
167 AuditEventType::TokenRevoked | AuditEventType::TokenExpired => RiskLevel::Medium,
168 AuditEventType::SuspiciousActivity => RiskLevel::High,
169 _ => RiskLevel::Low,
170 };
171
172 AuditEvent::builder(
173 event_type.clone(),
174 format!("{:?} operation on {} {}", event_type, resource_type, resource_id),
175 )
176 .user_id(user_id)
177 .outcome(outcome)
178 .risk_level(risk_level)
179 .details(details)
180 .request_metadata(
181 RequestMetadata::new().with_endpoint("storage"),
182 )
183 .resource(ResourceInfo {
184 resource_type: resource_type.to_string(),
185 resource_id: resource_id.to_string(),
186 resource_name: None,
187 attributes: std::collections::HashMap::new(),
188 })
189 .actor(ActorInfo {
190 actor_type: "storage_system".to_string(),
191 actor_id: user_id.to_string(),
192 actor_name: Some(user_id.to_string()),
193 roles: vec!["storage_user".to_string()],
194 })
195 .build()
196 }
197
198 async fn log_storage_operation(
200 &self,
201 event_type: AuditEventType,
202 user_id: &str,
203 resource_id: &str,
204 resource_type: &str,
205 created_at: Option<chrono::DateTime<chrono::Utc>>,
206 outcome: EventOutcome,
207 ) {
208 let details = if let Some(created) = created_at {
209 let age = chrono::Utc::now().signed_duration_since(created);
210 format!(
211 "{:?} operation on {} {} (age: {} seconds)",
212 event_type,
213 resource_type,
214 resource_id,
215 age.num_seconds()
216 )
217 } else {
218 format!(
219 "{:?} operation on {} {}",
220 event_type, resource_type, resource_id
221 )
222 };
223
224 let audit_event = self.create_audit_event(
225 event_type,
226 user_id,
227 resource_id,
228 resource_type,
229 outcome,
230 Some(&details),
231 );
232
233 tracing::info!(
235 "STORAGE AUDIT: {}",
236 serde_json::to_string(&audit_event).unwrap_or_default()
237 );
238 }
239
240 pub fn get_storage_statistics(&self) -> std::collections::HashMap<String, serde_json::Value> {
242 let mut stats = std::collections::HashMap::new();
243 stats.insert(
244 "total_tokens".to_string(),
245 serde_json::Value::from(self.tokens.len()),
246 );
247 stats.insert(
248 "total_sessions".to_string(),
249 serde_json::Value::from(self.sessions.len()),
250 );
251 stats.insert(
252 "total_kv_pairs".to_string(),
253 serde_json::Value::from(self.kv_store.len()),
254 );
255 stats.insert(
256 "total_users_with_tokens".to_string(),
257 serde_json::Value::from(self.user_to_tokens.len()),
258 );
259 stats.insert(
260 "total_users_with_sessions".to_string(),
261 serde_json::Value::from(self.user_to_sessions.len()),
262 );
263 stats.insert(
264 "timestamp".to_string(),
265 serde_json::Value::from(chrono::Utc::now().to_rfc3339()),
266 );
267 stats
268 }
269
270 pub fn audit_token_ages(&self) -> Vec<(String, String, i64)> {
272 let mut aged_tokens = Vec::new();
273 let now = chrono::Utc::now();
274
275 for entry in self.tokens.iter() {
276 let age_seconds = now.signed_duration_since(entry.created_at).num_seconds();
277 aged_tokens.push((
278 entry.key().clone(),
279 entry.token.user_id.clone(),
280 age_seconds,
281 ));
282 }
283
284 aged_tokens.sort_by(|a, b| b.2.cmp(&a.2)); aged_tokens
286 }
287
288 pub fn audit_session_ages(&self) -> Vec<(String, String, i64)> {
290 let mut aged_sessions = Vec::new();
291 let now = chrono::Utc::now();
292
293 for entry in self.sessions.iter() {
294 let age_seconds = now.signed_duration_since(entry.created_at).num_seconds();
295 aged_sessions.push((
296 entry.key().clone(),
297 entry.session.user_id.clone(),
298 age_seconds,
299 ));
300 }
301
302 aged_sessions.sort_by(|a, b| b.2.cmp(&a.2)); aged_sessions
304 }
305
306 pub fn generate_audit_report(&self) -> std::collections::HashMap<String, serde_json::Value> {
308 let mut report = self.get_storage_statistics();
309
310 let token_ages = self.audit_token_ages();
311 let session_ages = self.audit_session_ages();
312
313 if !token_ages.is_empty() {
315 report.insert(
316 "oldest_token_age_seconds".to_string(),
317 serde_json::Value::from(token_ages[0].2),
318 );
319 report.insert(
320 "tokens_older_than_24h".to_string(),
321 serde_json::Value::from(
322 token_ages.iter().filter(|(_, _, age)| *age > 86400).count(),
323 ),
324 );
325 }
326
327 if !session_ages.is_empty() {
328 report.insert(
329 "oldest_session_age_seconds".to_string(),
330 serde_json::Value::from(session_ages[0].2),
331 );
332 report.insert(
333 "sessions_older_than_24h".to_string(),
334 serde_json::Value::from(
335 session_ages
336 .iter()
337 .filter(|(_, _, age)| *age > 86400)
338 .count(),
339 ),
340 );
341 }
342
343 report
344 }
345
346 fn add_token_to_user_index(&self, user_id: &str, token_id: &str) {
349 self.user_to_tokens
351 .entry(user_id.to_string())
352 .and_modify(|tokens| tokens.push(token_id.to_string()))
353 .or_insert_with(|| vec![token_id.to_string()]);
354 }
355
356 fn remove_token_from_user_index(&self, user_id: &str, token_id: &str) {
358 if let Some(mut entry) = self.user_to_tokens.get_mut(user_id) {
360 entry.retain(|id| id != token_id);
361 if entry.is_empty() {
362 drop(entry); self.user_to_tokens.remove(user_id);
364 }
365 }
366 }
367
368 fn add_session_to_user_index(&self, user_id: &str, session_id: &str) {
370 self.user_to_sessions
371 .entry(user_id.to_string())
372 .and_modify(|sessions| sessions.push(session_id.to_string()))
373 .or_insert_with(|| vec![session_id.to_string()]);
374 }
375
376 fn remove_session_from_user_index(&self, user_id: &str, session_id: &str) {
378 if let Some(mut entry) = self.user_to_sessions.get_mut(user_id) {
379 entry.retain(|id| id != session_id);
380 if entry.is_empty() {
381 drop(entry);
382 self.user_to_sessions.remove(user_id);
383 }
384 }
385 }
386
387 pub fn list_kv_keys_by_prefix(&self, prefix: &str) -> Vec<String> {
389 self.kv_store
390 .iter()
391 .filter_map(|entry| {
392 if entry.key().starts_with(prefix) {
393 Some(entry.key().clone())
394 } else {
395 None
396 }
397 })
398 .collect()
399 }
400}
401
402#[async_trait]
403impl AuthStorage for DashMapMemoryStorage {
404 async fn store_token(&self, token: &AuthToken) -> Result<()> {
405 let timestamped = TimestampedToken::new(token.clone(), self.default_ttl);
406 let created_at = timestamped.created_at;
407
408 self.tokens.insert(token.token_id.clone(), timestamped);
410
411 self.access_token_to_id
413 .insert(token.access_token.clone(), token.token_id.clone());
414
415 self.add_token_to_user_index(&token.user_id, &token.token_id);
417
418 self.log_storage_operation(
420 AuditEventType::LoginSuccess, &token.user_id,
422 &token.token_id,
423 "token",
424 Some(created_at),
425 EventOutcome::Success,
426 )
427 .await;
428
429 Ok(())
430 }
431
432 async fn get_token(&self, token_id: &str) -> Result<Option<AuthToken>> {
433 if let Some(timestamped) = self.tokens.get(token_id) {
435 let created_at = timestamped.created_at;
436 let user_id = timestamped.token.user_id.clone();
437
438 if timestamped.is_expired() {
439 drop(timestamped); self.tokens.remove(token_id); self.log_storage_operation(
444 AuditEventType::TokenExpired,
445 &user_id,
446 token_id,
447 "token",
448 Some(created_at),
449 EventOutcome::Failure,
450 )
451 .await;
452
453 return Ok(None);
454 }
455
456 let token = timestamped.token.clone();
457 drop(timestamped); self.log_storage_operation(
461 AuditEventType::LoginSuccess, &user_id,
463 token_id,
464 "token",
465 Some(created_at),
466 EventOutcome::Success,
467 )
468 .await;
469
470 Ok(Some(token))
471 } else {
472 Ok(None)
473 }
474 }
475
476 async fn get_token_by_access_token(&self, access_token: &str) -> Result<Option<AuthToken>> {
477 if let Some(token_id_entry) = self.access_token_to_id.get(access_token) {
479 let token_id = token_id_entry.clone(); drop(token_id_entry); self.get_token(&token_id).await } else {
483 Ok(None)
484 }
485 }
486
    /// Updates a stored token by delegating to `store_token`, which inserts the token
    /// under its `token_id`.
    async fn update_token(&self, token: &AuthToken) -> Result<()> {
        self.store_token(token).await
    }
491
492 async fn delete_token(&self, token_id: &str) -> Result<()> {
493 let token_info = if let Some(timestamped) = self.tokens.get(token_id) {
495 Some((
496 timestamped.token.user_id.clone(),
497 timestamped.token.access_token.clone(),
498 timestamped.created_at,
499 ))
500 } else {
501 None
502 };
503
504 if let Some((user_id, access_token, created_at)) = token_info {
505 self.tokens.remove(token_id);
507 self.access_token_to_id.remove(&access_token);
508 self.remove_token_from_user_index(&user_id, token_id);
509
510 self.log_storage_operation(
512 AuditEventType::TokenRevoked,
513 &user_id,
514 token_id,
515 "token",
516 Some(created_at),
517 EventOutcome::Success,
518 )
519 .await;
520 }
521
522 Ok(())
523 }
524
525 async fn list_user_tokens(&self, user_id: &str) -> Result<Vec<AuthToken>> {
526 let token_ids = if let Some(ids) = self.user_to_tokens.get(user_id) {
528 ids.clone() } else {
530 return Ok(Vec::new());
531 };
532
533 let mut tokens = Vec::new();
534 let mut expired_tokens = Vec::new();
535
536 for token_id in token_ids {
538 if let Some(timestamped) = self.tokens.get(&token_id) {
539 if timestamped.is_expired() {
540 expired_tokens.push(token_id);
541 } else {
542 tokens.push(timestamped.token.clone());
543 }
544 } else {
545 expired_tokens.push(token_id); }
547 }
548
549 for token_id in expired_tokens {
551 self.delete_token(&token_id).await?;
552 }
553
554 Ok(tokens)
555 }
556
557 async fn store_session(&self, session_id: &str, data: &SessionData) -> Result<()> {
558 let timestamped = TimestampedSession::new(data.clone());
559 let created_at = timestamped.created_at;
560
561 self.sessions.insert(session_id.to_string(), timestamped);
563
564 self.add_session_to_user_index(&data.user_id, session_id);
566
567 self.log_storage_operation(
569 AuditEventType::LoginSuccess, &data.user_id,
571 session_id,
572 "session",
573 Some(created_at),
574 EventOutcome::Success,
575 )
576 .await;
577
578 Ok(())
579 }
580
581 async fn get_session(&self, session_id: &str) -> Result<Option<SessionData>> {
582 if let Some(timestamped) = self.sessions.get(session_id) {
584 let created_at = timestamped.created_at;
585 let user_id = timestamped.session.user_id.clone();
586
587 if timestamped.is_expired() {
588 drop(timestamped);
589 self.sessions.remove(session_id);
590
591 self.log_storage_operation(
593 AuditEventType::TokenExpired, &user_id,
595 session_id,
596 "session",
597 Some(created_at),
598 EventOutcome::Failure,
599 )
600 .await;
601
602 return Ok(None);
603 }
604
605 let session = timestamped.session.clone();
606 drop(timestamped);
607
608 self.log_storage_operation(
610 AuditEventType::LoginSuccess, &user_id,
612 session_id,
613 "session",
614 Some(created_at),
615 EventOutcome::Success,
616 )
617 .await;
618
619 Ok(Some(session))
620 } else {
621 Ok(None)
622 }
623 }
624
625 async fn delete_session(&self, session_id: &str) -> Result<()> {
626 let session_info = if let Some(timestamped) = self.sessions.get(session_id) {
628 Some((timestamped.session.user_id.clone(), timestamped.created_at))
629 } else {
630 None
631 };
632
633 if let Some((user_id, created_at)) = session_info {
634 self.sessions.remove(session_id);
635 self.remove_session_from_user_index(&user_id, session_id);
636
637 self.log_storage_operation(
639 AuditEventType::Logout,
640 &user_id,
641 session_id,
642 "session",
643 Some(created_at),
644 EventOutcome::Success,
645 )
646 .await;
647 }
648
649 Ok(())
650 }
651
652 async fn list_user_sessions(&self, user_id: &str) -> Result<Vec<SessionData>> {
653 let session_ids = if let Some(ids) = self.user_to_sessions.get(user_id) {
655 ids.clone()
656 } else {
657 return Ok(Vec::new());
658 };
659
660 let mut sessions = Vec::new();
661 let mut expired_sessions = Vec::new();
662
663 for session_id in session_ids {
665 if let Some(timestamped) = self.sessions.get(&session_id) {
666 if timestamped.is_expired() {
667 expired_sessions.push(session_id);
668 } else {
669 sessions.push(timestamped.session.clone());
670 }
671 } else {
672 expired_sessions.push(session_id);
673 }
674 }
675
676 for session_id in expired_sessions {
678 self.delete_session(&session_id).await?;
679 }
680
681 Ok(sessions)
682 }
683
684 async fn store_kv(&self, key: &str, value: &[u8], ttl: Option<Duration>) -> Result<()> {
685 let timestamped = TimestampedValue::new(value.to_vec(), ttl.or(self.default_ttl));
686 let created_at = timestamped.created_at;
687
688 self.kv_store.insert(key.to_string(), timestamped);
689
690 self.log_storage_operation(
692 AuditEventType::ConfigurationChanged, "system", key,
695 "kv_pair",
696 Some(created_at),
697 EventOutcome::Success,
698 )
699 .await;
700
701 Ok(())
702 }
703
704 async fn get_kv(&self, key: &str) -> Result<Option<Vec<u8>>> {
705 if let Some(timestamped) = self.kv_store.get(key) {
707 let created_at = timestamped.created_at;
708
709 if timestamped.is_expired() {
710 drop(timestamped);
711 self.kv_store.remove(key);
712
713 self.log_storage_operation(
715 AuditEventType::TokenExpired, "system",
717 key,
718 "kv_pair",
719 Some(created_at),
720 EventOutcome::Failure,
721 )
722 .await;
723
724 return Ok(None);
725 }
726
727 let data = timestamped.data.clone();
728 drop(timestamped);
729
730 self.log_storage_operation(
732 AuditEventType::ConfigurationChanged, "system",
734 key,
735 "kv_pair",
736 Some(created_at),
737 EventOutcome::Success,
738 )
739 .await;
740
741 Ok(Some(data))
742 } else {
743 Ok(None)
744 }
745 }
746
747 async fn delete_kv(&self, key: &str) -> Result<()> {
748 let created_at = if let Some(timestamped) = self.kv_store.get(key) {
750 Some(timestamped.created_at)
751 } else {
752 None
753 };
754
755 self.kv_store.remove(key);
756
757 if let Some(created_at) = created_at {
758 self.log_storage_operation(
760 AuditEventType::ConfigurationChanged, "system",
762 key,
763 "kv_pair",
764 Some(created_at),
765 EventOutcome::Success,
766 )
767 .await;
768 }
769
770 Ok(())
771 }
772
773 async fn cleanup_expired(&self) -> Result<()> {
774 let mut expired_tokens = Vec::new();
776 let mut expired_sessions = Vec::new();
777 let mut expired_kvs = Vec::new();
778
779 for entry in self.tokens.iter() {
781 if entry.is_expired() {
782 expired_tokens.push(entry.key().clone());
783 }
784 }
785
786 for entry in self.sessions.iter() {
787 if entry.is_expired() {
788 expired_sessions.push(entry.key().clone());
789 }
790 }
791
792 for entry in self.kv_store.iter() {
793 if entry.is_expired() {
794 expired_kvs.push(entry.key().clone());
795 }
796 }
797
798 for token_id in expired_tokens {
800 self.delete_token(&token_id).await?;
801 }
802
803 for session_id in expired_sessions {
804 self.delete_session(&session_id).await?;
805 }
806
807 for key in expired_kvs {
808 self.delete_kv(&key).await?;
809 }
810
811 Ok(())
812 }
813
814 async fn count_active_sessions(&self) -> Result<u64> {
815 let mut count = 0;
816
817 for entry in self.sessions.iter() {
819 if !entry.is_expired() {
820 count += 1;
821 }
822 }
823
824 Ok(count)
825 }
826}
827
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        testing::test_infrastructure::TestEnvironmentGuard,
        tokens::TokenMetadata,
        types::{Permissions, Roles, Scopes},
    };
    use std::collections::HashMap;
    use tokio::task::JoinSet;

    /// Builds a bearer-token fixture that is valid for one hour from now.
    fn sample_token(token_id: &str, user_id: &str, access_token: &str) -> AuthToken {
        let now = chrono::Utc::now();
        AuthToken {
            token_id: token_id.to_string(),
            user_id: user_id.to_string(),
            access_token: access_token.to_string(),
            token_type: Some("bearer".to_string()),
            subject: Some(user_id.to_string()),
            issuer: Some("test-issuer".to_string()),
            refresh_token: None,
            issued_at: now,
            expires_at: now + chrono::Duration::hours(1),
            scopes: Scopes::new(vec!["read".to_string()]),
            auth_method: "password".to_string(),
            client_id: Some("test-client".to_string()),
            user_profile: None,
            permissions: Permissions::new(vec!["read:data".to_string()]),
            roles: Roles::new(vec!["user".to_string()]),
            metadata: TokenMetadata::default(),
        }
    }

    /// Builds a session fixture that is valid for one hour from now.
    fn sample_session(session_id: &str, user_id: &str) -> SessionData {
        let now = chrono::Utc::now();
        SessionData {
            session_id: session_id.to_string(),
            user_id: user_id.to_string(),
            created_at: now,
            last_activity: now,
            expires_at: now + chrono::Duration::hours(1),
            ip_address: Some("192.168.1.1".to_string()),
            user_agent: Some("test-agent".to_string()),
            data: HashMap::new(),
        }
    }

    #[tokio::test]
    async fn test_basic_token_operations() {
        let _env = TestEnvironmentGuard::new().with_jwt_secret("dashmap-test");
        let storage = DashMapMemoryStorage::new();

        let token = sample_token("test-token", "test-user", "access-123");
        storage.store_token(&token).await.unwrap();

        // Lookup by id.
        let retrieved = storage.get_token("test-token").await.unwrap().unwrap();
        assert_eq!(retrieved.user_id, "test-user");

        // Lookup by access token.
        let retrieved = storage
            .get_token_by_access_token("access-123")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(retrieved.token_id, "test-token");

        // Listing by user.
        let user_tokens = storage.list_user_tokens("test-user").await.unwrap();
        assert_eq!(user_tokens.len(), 1);

        // Deletion.
        storage.delete_token("test-token").await.unwrap();
        let retrieved = storage.get_token("test-token").await.unwrap();
        assert!(retrieved.is_none());
    }

    #[tokio::test]
    async fn test_session_operations() {
        let _env = TestEnvironmentGuard::new().with_jwt_secret("dashmap-session-test");
        let storage = DashMapMemoryStorage::new();

        let session = sample_session("test-session", "test-user");
        storage
            .store_session("test-session", &session)
            .await
            .unwrap();

        let retrieved = storage.get_session("test-session").await.unwrap().unwrap();
        assert_eq!(retrieved.user_id, "test-user");

        let user_sessions = storage.list_user_sessions("test-user").await.unwrap();
        assert_eq!(user_sessions.len(), 1);

        storage.delete_session("test-session").await.unwrap();
        let retrieved = storage.get_session("test-session").await.unwrap();
        assert!(retrieved.is_none());
    }

    #[tokio::test]
    async fn test_kv_operations() {
        let _env = TestEnvironmentGuard::new().with_jwt_secret("dashmap-kv-test");
        let storage = DashMapMemoryStorage::new();

        let key = "test-key";
        let value = b"test-value";

        storage
            .store_kv(key, value, Some(Duration::from_secs(3600)))
            .await
            .unwrap();

        let retrieved = storage.get_kv(key).await.unwrap().unwrap();
        assert_eq!(retrieved, value);

        storage.delete_kv(key).await.unwrap();
        let retrieved = storage.get_kv(key).await.unwrap();
        assert!(retrieved.is_none());
    }

    #[tokio::test]
    async fn test_concurrent_operations_no_deadlock() {
        let _env = TestEnvironmentGuard::new().with_jwt_secret("dashmap-concurrent-test");
        let storage = std::sync::Arc::new(DashMapMemoryStorage::new());

        let mut join_set = JoinSet::new();

        // Ten tasks, fifty tokens each, spread over three users so that tasks contend
        // on the same user-index entries.
        for i in 0..10 {
            let storage = storage.clone();
            join_set.spawn(async move {
                for j in 0..50 {
                    let token = sample_token(
                        &format!("token-{}-{}", i, j),
                        &format!("user-{}", i % 3),
                        &format!("access-{}-{}", i, j),
                    );

                    storage.store_token(&token).await.unwrap();

                    let _user_tokens = storage.list_user_tokens(&token.user_id).await.unwrap();

                    let _retrieved = storage
                        .get_token_by_access_token(&token.access_token)
                        .await
                        .unwrap();
                }
            });
        }

        // Surface task failures: a panic inside a spawned task only shows up as an
        // `Err` from `join_next`, so ignoring the result would let the test pass.
        while let Some(joined) = join_set.join_next().await {
            joined.expect("concurrent storage task panicked or was cancelled");
        }

        println!("✅ Concurrent operations test passed - no deadlocks detected!");
    }

    #[tokio::test]
    async fn test_ttl_expiration() {
        let _env = TestEnvironmentGuard::new().with_jwt_secret("dashmap-ttl-test");
        let storage = DashMapMemoryStorage::with_ttl(Duration::from_millis(100));

        // No explicit TTL: the storage default applies.
        storage
            .store_kv("expiring-key", b"expiring-value", None)
            .await
            .unwrap();

        let retrieved = storage.get_kv("expiring-key").await.unwrap();
        assert!(retrieved.is_some());

        // Wait past the TTL.
        tokio::time::sleep(Duration::from_millis(150)).await;

        let retrieved = storage.get_kv("expiring-key").await.unwrap();
        assert!(retrieved.is_none());
    }

    #[tokio::test]
    async fn test_cleanup_expired() {
        let _env = TestEnvironmentGuard::new().with_jwt_secret("dashmap-cleanup-test");
        let storage = DashMapMemoryStorage::with_ttl(Duration::from_millis(50));

        for i in 0..10 {
            storage
                .store_kv(&format!("key-{}", i), b"value", None)
                .await
                .unwrap();
        }

        // Wait past the TTL.
        tokio::time::sleep(Duration::from_millis(100)).await;

        storage.cleanup_expired().await.unwrap();

        for i in 0..10 {
            let retrieved = storage.get_kv(&format!("key-{}", i)).await.unwrap();
            assert!(retrieved.is_none());
        }
    }
}