Skip to main content

auth_framework/storage/
dashmap_memory.rs

1use crate::audit::{
2    ActorInfo, AuditEvent, AuditEventType, EventOutcome, RequestMetadata, ResourceInfo, RiskLevel,
3};
/// DashMap-based storage implementation with deadlock-safe patterns
///
/// This implementation replaces a single `RwLock<HashMap>` with `DashMap` to provide:
/// - Concurrent access without one global lock
/// - Operations structured so that map guards are never nested
/// - Better performance under high concurrency
///
/// Key safety principles:
/// 1. Never hold multiple DashMap references simultaneously
/// 2. Always extract values immediately rather than holding references
/// 3. Update each map in its own short, self-contained step
/// 4. Scope all operations to prevent reference leaks
16use crate::errors::Result;
17use crate::storage::core::{AuthStorage, SessionData};
18use crate::tokens::AuthToken;
19use async_trait::async_trait;
20use dashmap::DashMap;
21use std::time::Duration;
22
/// Wrapper for tokens with expiration tracking
#[derive(Debug, Clone)]
struct TimestampedToken {
    /// The stored token.
    token: AuthToken,
    /// UTC time at which this wrapper was built by `new`.
    created_at: chrono::DateTime<chrono::Utc>,
    /// Storage-level expiry derived from the TTL given to `new`;
    /// `None` means `is_expired` always reports `false`.
    expires_at: Option<chrono::DateTime<chrono::Utc>>,
}
30
/// Wrapper for sessions with expiration tracking
#[derive(Debug, Clone)]
struct TimestampedSession {
    /// The stored session; expiry is decided by its own `is_expired` method.
    session: SessionData,
    /// UTC time at which this wrapper was built by `new`.
    created_at: chrono::DateTime<chrono::Utc>,
}
37
/// Wrapper for KV values with expiration
#[derive(Debug, Clone)]
struct TimestampedValue {
    /// Raw bytes stored under the key.
    data: Vec<u8>,
    /// UTC time at which this wrapper was built by `new`.
    created_at: chrono::DateTime<chrono::Utc>,
    /// Expiry derived from the TTL given to `new`;
    /// `None` means `is_expired` always reports `false`.
    expires_at: Option<chrono::DateTime<chrono::Utc>>,
}
45
46impl TimestampedToken {
47    fn new(token: AuthToken, ttl: Option<Duration>) -> Self {
48        let now = chrono::Utc::now();
49        let expires_at =
50            ttl.map(|d| now + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::hours(1)));
51
52        Self {
53            token,
54            created_at: now,
55            expires_at,
56        }
57    }
58
59    fn is_expired(&self) -> bool {
60        self.expires_at
61            .map(|exp| chrono::Utc::now() > exp)
62            .unwrap_or(false)
63    }
64}
65
66impl TimestampedSession {
67    fn new(session: SessionData) -> Self {
68        Self {
69            session,
70            created_at: chrono::Utc::now(),
71        }
72    }
73
74    fn is_expired(&self) -> bool {
75        self.session.is_expired()
76    }
77}
78
79impl TimestampedValue {
80    fn new(data: Vec<u8>, ttl: Option<Duration>) -> Self {
81        let now = chrono::Utc::now();
82        let expires_at =
83            ttl.map(|d| now + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::hours(1)));
84
85        Self {
86            data,
87            created_at: now,
88            expires_at,
89        }
90    }
91
92    fn is_expired(&self) -> bool {
93        self.expires_at
94            .map(|exp| chrono::Utc::now() > exp)
95            .unwrap_or(false)
96    }
97}
98
/// DashMap-based storage with deadlock-safe patterns
#[derive(Debug, Clone)]
pub struct DashMapMemoryStorage {
    // Core storage maps
    /// Token id -> stored token.
    tokens: DashMap<String, TimestampedToken>,
    /// Session id -> stored session.
    sessions: DashMap<String, TimestampedSession>,
    /// Caller-chosen key -> raw bytes.
    kv_store: DashMap<String, TimestampedValue>,

    // Index maps for efficient lookups
    /// Access-token string -> token id (key into `tokens`).
    access_token_to_id: DashMap<String, String>,
    /// User id -> ids of that user's tokens.
    user_to_tokens: DashMap<String, Vec<String>>,
    /// User id -> ids of that user's sessions.
    user_to_sessions: DashMap<String, Vec<String>>,

    // Configuration
    /// TTL applied to stored tokens, and to KV values stored without an
    /// explicit TTL. `None` disables storage-level expiry for those entries.
    default_ttl: Option<Duration>,
}
115
116impl Default for DashMapMemoryStorage {
117    fn default() -> Self {
118        Self::new()
119    }
120}
121
122impl DashMapMemoryStorage {
123    /// Create a new DashMap-based storage
124    pub fn new() -> Self {
125        Self {
126            tokens: DashMap::new(),
127            sessions: DashMap::new(),
128            kv_store: DashMap::new(),
129            access_token_to_id: DashMap::new(),
130            user_to_tokens: DashMap::new(),
131            user_to_sessions: DashMap::new(),
132            default_ttl: None,
133        }
134    }
135
136    /// Create storage with default TTL for all stored items
137    pub fn with_ttl(ttl: Duration) -> Self {
138        Self {
139            tokens: DashMap::new(),
140            sessions: DashMap::new(),
141            kv_store: DashMap::new(),
142            access_token_to_id: DashMap::new(),
143            user_to_tokens: DashMap::new(),
144            user_to_sessions: DashMap::new(),
145            default_ttl: Some(ttl),
146        }
147    }
148
149    /// Generate audit event for storage operations
150    fn create_audit_event(
151        &self,
152        event_type: AuditEventType,
153        user_id: &str,
154        resource_id: &str,
155        resource_type: &str,
156        outcome: EventOutcome,
157        details_str: Option<&str>,
158    ) -> AuditEvent {
159        let mut details = std::collections::HashMap::new();
160        if let Some(detail) = details_str {
161            details.insert("operation_details".to_string(), detail.to_string());
162        }
163        details.insert("resource_type".to_string(), resource_type.to_string());
164        details.insert("resource_id".to_string(), resource_id.to_string());
165
166        let risk_level = match &event_type {
167            AuditEventType::TokenRevoked | AuditEventType::TokenExpired => RiskLevel::Medium,
168            AuditEventType::SuspiciousActivity => RiskLevel::High,
169            _ => RiskLevel::Low,
170        };
171
172        AuditEvent::builder(
173            event_type.clone(),
174            format!("{:?} operation on {} {}", event_type, resource_type, resource_id),
175        )
176        .user_id(user_id)
177        .outcome(outcome)
178        .risk_level(risk_level)
179        .details(details)
180        .request_metadata(
181            RequestMetadata::new().with_endpoint("storage"),
182        )
183        .resource(ResourceInfo {
184            resource_type: resource_type.to_string(),
185            resource_id: resource_id.to_string(),
186            resource_name: None,
187            attributes: std::collections::HashMap::new(),
188        })
189        .actor(ActorInfo {
190            actor_type: "storage_system".to_string(),
191            actor_id: user_id.to_string(),
192            actor_name: Some(user_id.to_string()),
193            roles: vec!["storage_user".to_string()],
194        })
195        .build()
196    }
197
198    /// Log storage operation with lifecycle information
199    async fn log_storage_operation(
200        &self,
201        event_type: AuditEventType,
202        user_id: &str,
203        resource_id: &str,
204        resource_type: &str,
205        created_at: Option<chrono::DateTime<chrono::Utc>>,
206        outcome: EventOutcome,
207    ) {
208        let details = if let Some(created) = created_at {
209            let age = chrono::Utc::now().signed_duration_since(created);
210            format!(
211                "{:?} operation on {} {} (age: {} seconds)",
212                event_type,
213                resource_type,
214                resource_id,
215                age.num_seconds()
216            )
217        } else {
218            format!(
219                "{:?} operation on {} {}",
220                event_type, resource_type, resource_id
221            )
222        };
223
224        let audit_event = self.create_audit_event(
225            event_type,
226            user_id,
227            resource_id,
228            resource_type,
229            outcome,
230            Some(&details),
231        );
232
233        // Log the audit event - in production this would go to the audit logger
234        tracing::info!(
235            "STORAGE AUDIT: {}",
236            serde_json::to_string(&audit_event).unwrap_or_default()
237        );
238    }
239
240    /// Get storage statistics for audit reporting
241    pub fn get_storage_statistics(&self) -> std::collections::HashMap<String, serde_json::Value> {
242        let mut stats = std::collections::HashMap::new();
243        stats.insert(
244            "total_tokens".to_string(),
245            serde_json::Value::from(self.tokens.len()),
246        );
247        stats.insert(
248            "total_sessions".to_string(),
249            serde_json::Value::from(self.sessions.len()),
250        );
251        stats.insert(
252            "total_kv_pairs".to_string(),
253            serde_json::Value::from(self.kv_store.len()),
254        );
255        stats.insert(
256            "total_users_with_tokens".to_string(),
257            serde_json::Value::from(self.user_to_tokens.len()),
258        );
259        stats.insert(
260            "total_users_with_sessions".to_string(),
261            serde_json::Value::from(self.user_to_sessions.len()),
262        );
263        stats.insert(
264            "timestamp".to_string(),
265            serde_json::Value::from(chrono::Utc::now().to_rfc3339()),
266        );
267        stats
268    }
269
270    /// Audit tokens by age - find old tokens for security review
271    pub fn audit_token_ages(&self) -> Vec<(String, String, i64)> {
272        let mut aged_tokens = Vec::new();
273        let now = chrono::Utc::now();
274
275        for entry in self.tokens.iter() {
276            let age_seconds = now.signed_duration_since(entry.created_at).num_seconds();
277            aged_tokens.push((
278                entry.key().clone(),
279                entry.token.user_id.clone(),
280                age_seconds,
281            ));
282        }
283
284        aged_tokens.sort_by(|a, b| b.2.cmp(&a.2)); // Sort by age descending
285        aged_tokens
286    }
287
288    /// Audit sessions by age - find old sessions for security review
289    pub fn audit_session_ages(&self) -> Vec<(String, String, i64)> {
290        let mut aged_sessions = Vec::new();
291        let now = chrono::Utc::now();
292
293        for entry in self.sessions.iter() {
294            let age_seconds = now.signed_duration_since(entry.created_at).num_seconds();
295            aged_sessions.push((
296                entry.key().clone(),
297                entry.session.user_id.clone(),
298                age_seconds,
299            ));
300        }
301
302        aged_sessions.sort_by(|a, b| b.2.cmp(&a.2)); // Sort by age descending
303        aged_sessions
304    }
305
306    /// Generate comprehensive audit report
307    pub fn generate_audit_report(&self) -> std::collections::HashMap<String, serde_json::Value> {
308        let mut report = self.get_storage_statistics();
309
310        let token_ages = self.audit_token_ages();
311        let session_ages = self.audit_session_ages();
312
313        // Add age analysis
314        if !token_ages.is_empty() {
315            report.insert(
316                "oldest_token_age_seconds".to_string(),
317                serde_json::Value::from(token_ages[0].2),
318            );
319            report.insert(
320                "tokens_older_than_24h".to_string(),
321                serde_json::Value::from(
322                    token_ages.iter().filter(|(_, _, age)| *age > 86400).count(),
323                ),
324            );
325        }
326
327        if !session_ages.is_empty() {
328            report.insert(
329                "oldest_session_age_seconds".to_string(),
330                serde_json::Value::from(session_ages[0].2),
331            );
332            report.insert(
333                "sessions_older_than_24h".to_string(),
334                serde_json::Value::from(
335                    session_ages
336                        .iter()
337                        .filter(|(_, _, age)| *age > 86400)
338                        .count(),
339                ),
340            );
341        }
342
343        report
344    }
345
346    /// DEADLOCK-SAFE: Add token to user index
347    /// Uses atomic operations to prevent cross-map deadlocks
348    fn add_token_to_user_index(&self, user_id: &str, token_id: &str) {
349        // SAFE: Scoped operation that doesn't hold references across map operations
350        self.user_to_tokens
351            .entry(user_id.to_string())
352            .and_modify(|tokens| tokens.push(token_id.to_string()))
353            .or_insert_with(|| vec![token_id.to_string()]);
354    }
355
356    /// DEADLOCK-SAFE: Remove token from user index
357    fn remove_token_from_user_index(&self, user_id: &str, token_id: &str) {
358        // SAFE: Scoped operation with immediate value extraction
359        if let Some(mut entry) = self.user_to_tokens.get_mut(user_id) {
360            entry.retain(|id| id != token_id);
361            if entry.is_empty() {
362                drop(entry); // Release the entry before removal
363                self.user_to_tokens.remove(user_id);
364            }
365        }
366    }
367
368    /// DEADLOCK-SAFE: Add session to user index
369    fn add_session_to_user_index(&self, user_id: &str, session_id: &str) {
370        self.user_to_sessions
371            .entry(user_id.to_string())
372            .and_modify(|sessions| sessions.push(session_id.to_string()))
373            .or_insert_with(|| vec![session_id.to_string()]);
374    }
375
376    /// DEADLOCK-SAFE: Remove session from user index
377    fn remove_session_from_user_index(&self, user_id: &str, session_id: &str) {
378        if let Some(mut entry) = self.user_to_sessions.get_mut(user_id) {
379            entry.retain(|id| id != session_id);
380            if entry.is_empty() {
381                drop(entry);
382                self.user_to_sessions.remove(user_id);
383            }
384        }
385    }
386
387    /// Return all KV keys that start with `prefix`.
388    pub fn list_kv_keys_by_prefix(&self, prefix: &str) -> Vec<String> {
389        self.kv_store
390            .iter()
391            .filter_map(|entry| {
392                if entry.key().starts_with(prefix) {
393                    Some(entry.key().clone())
394                } else {
395                    None
396                }
397            })
398            .collect()
399    }
400}
401
402#[async_trait]
403impl AuthStorage for DashMapMemoryStorage {
404    async fn store_token(&self, token: &AuthToken) -> Result<()> {
405        let timestamped = TimestampedToken::new(token.clone(), self.default_ttl);
406        let created_at = timestamped.created_at;
407
408        // SAFE: Store in primary map first
409        self.tokens.insert(token.token_id.clone(), timestamped);
410
411        // SAFE: Update access token index (no cross-map references)
412        self.access_token_to_id
413            .insert(token.access_token.clone(), token.token_id.clone());
414
415        // SAFE: Update user index (atomic operation)
416        self.add_token_to_user_index(&token.user_id, &token.token_id);
417
418        // Log storage operation with creation timestamp
419        self.log_storage_operation(
420            AuditEventType::LoginSuccess, // Token storage represents successful authentication
421            &token.user_id,
422            &token.token_id,
423            "token",
424            Some(created_at),
425            EventOutcome::Success,
426        )
427        .await;
428
429        Ok(())
430    }
431
432    async fn get_token(&self, token_id: &str) -> Result<Option<AuthToken>> {
433        // SAFE: Immediate value extraction, no reference holding
434        if let Some(timestamped) = self.tokens.get(token_id) {
435            let created_at = timestamped.created_at;
436            let user_id = timestamped.token.user_id.clone();
437
438            if timestamped.is_expired() {
439                drop(timestamped); // Release reference
440                self.tokens.remove(token_id); // Cleanup expired
441
442                // Log expired token access attempt
443                self.log_storage_operation(
444                    AuditEventType::TokenExpired,
445                    &user_id,
446                    token_id,
447                    "token",
448                    Some(created_at),
449                    EventOutcome::Failure,
450                )
451                .await;
452
453                return Ok(None);
454            }
455
456            let token = timestamped.token.clone();
457            drop(timestamped); // Release reference
458
459            // Log successful token access
460            self.log_storage_operation(
461                AuditEventType::LoginSuccess, // Token access represents authentication validation
462                &user_id,
463                token_id,
464                "token",
465                Some(created_at),
466                EventOutcome::Success,
467            )
468            .await;
469
470            Ok(Some(token))
471        } else {
472            Ok(None)
473        }
474    }
475
476    async fn get_token_by_access_token(&self, access_token: &str) -> Result<Option<AuthToken>> {
477        // SAFE: Two-step lookup with immediate value extraction
478        if let Some(token_id_entry) = self.access_token_to_id.get(access_token) {
479            let token_id = token_id_entry.clone(); // Extract value immediately
480            drop(token_id_entry); // Release first map reference
481            self.get_token(&token_id).await // Use extracted value
482        } else {
483            Ok(None)
484        }
485    }
486
487    async fn update_token(&self, token: &AuthToken) -> Result<()> {
488        // SAFE: Update is same as store for this implementation
489        self.store_token(token).await
490    }
491
492    async fn delete_token(&self, token_id: &str) -> Result<()> {
493        // SAFE: Extract token info before removal to avoid reference issues
494        let token_info = if let Some(timestamped) = self.tokens.get(token_id) {
495            Some((
496                timestamped.token.user_id.clone(),
497                timestamped.token.access_token.clone(),
498                timestamped.created_at,
499            ))
500        } else {
501            None
502        };
503
504        if let Some((user_id, access_token, created_at)) = token_info {
505            // SAFE: All operations use extracted values, no cross-map references
506            self.tokens.remove(token_id);
507            self.access_token_to_id.remove(&access_token);
508            self.remove_token_from_user_index(&user_id, token_id);
509
510            // Log token deletion with creation timestamp for audit trail
511            self.log_storage_operation(
512                AuditEventType::TokenRevoked,
513                &user_id,
514                token_id,
515                "token",
516                Some(created_at),
517                EventOutcome::Success,
518            )
519            .await;
520        }
521
522        Ok(())
523    }
524
525    async fn list_user_tokens(&self, user_id: &str) -> Result<Vec<AuthToken>> {
526        // SAFE: Extract token IDs first, then lookup individually
527        let token_ids = if let Some(ids) = self.user_to_tokens.get(user_id) {
528            ids.clone() // Immediate extraction
529        } else {
530            return Ok(Vec::new());
531        };
532
533        let mut tokens = Vec::new();
534        let mut expired_tokens = Vec::new();
535
536        // SAFE: Iterate over extracted IDs, no cross-map reference holding
537        for token_id in token_ids {
538            if let Some(timestamped) = self.tokens.get(&token_id) {
539                if timestamped.is_expired() {
540                    expired_tokens.push(token_id);
541                } else {
542                    tokens.push(timestamped.token.clone());
543                }
544            } else {
545                expired_tokens.push(token_id); // Token was removed elsewhere
546            }
547        }
548
549        // SAFE: Cleanup expired tokens (uses extracted IDs)
550        for token_id in expired_tokens {
551            self.delete_token(&token_id).await?;
552        }
553
554        Ok(tokens)
555    }
556
557    async fn store_session(&self, session_id: &str, data: &SessionData) -> Result<()> {
558        let timestamped = TimestampedSession::new(data.clone());
559        let created_at = timestamped.created_at;
560
561        // SAFE: Store in primary map first
562        self.sessions.insert(session_id.to_string(), timestamped);
563
564        // SAFE: Update user index (atomic operation)
565        self.add_session_to_user_index(&data.user_id, session_id);
566
567        // Log session storage with creation timestamp
568        self.log_storage_operation(
569            AuditEventType::LoginSuccess, // Session creation represents successful login
570            &data.user_id,
571            session_id,
572            "session",
573            Some(created_at),
574            EventOutcome::Success,
575        )
576        .await;
577
578        Ok(())
579    }
580
581    async fn get_session(&self, session_id: &str) -> Result<Option<SessionData>> {
582        // SAFE: Immediate value extraction
583        if let Some(timestamped) = self.sessions.get(session_id) {
584            let created_at = timestamped.created_at;
585            let user_id = timestamped.session.user_id.clone();
586
587            if timestamped.is_expired() {
588                drop(timestamped);
589                self.sessions.remove(session_id);
590
591                // Log expired session access
592                self.log_storage_operation(
593                    AuditEventType::TokenExpired, // Session expiration
594                    &user_id,
595                    session_id,
596                    "session",
597                    Some(created_at),
598                    EventOutcome::Failure,
599                )
600                .await;
601
602                return Ok(None);
603            }
604
605            let session = timestamped.session.clone();
606            drop(timestamped);
607
608            // Log successful session access
609            self.log_storage_operation(
610                AuditEventType::LoginSuccess, // Session access represents continued authentication
611                &user_id,
612                session_id,
613                "session",
614                Some(created_at),
615                EventOutcome::Success,
616            )
617            .await;
618
619            Ok(Some(session))
620        } else {
621            Ok(None)
622        }
623    }
624
625    async fn delete_session(&self, session_id: &str) -> Result<()> {
626        // SAFE: Extract session info before removal
627        let session_info = if let Some(timestamped) = self.sessions.get(session_id) {
628            Some((timestamped.session.user_id.clone(), timestamped.created_at))
629        } else {
630            None
631        };
632
633        if let Some((user_id, created_at)) = session_info {
634            self.sessions.remove(session_id);
635            self.remove_session_from_user_index(&user_id, session_id);
636
637            // Log session deletion with creation timestamp
638            self.log_storage_operation(
639                AuditEventType::Logout,
640                &user_id,
641                session_id,
642                "session",
643                Some(created_at),
644                EventOutcome::Success,
645            )
646            .await;
647        }
648
649        Ok(())
650    }
651
652    async fn list_user_sessions(&self, user_id: &str) -> Result<Vec<SessionData>> {
653        // SAFE: Extract session IDs first
654        let session_ids = if let Some(ids) = self.user_to_sessions.get(user_id) {
655            ids.clone()
656        } else {
657            return Ok(Vec::new());
658        };
659
660        let mut sessions = Vec::new();
661        let mut expired_sessions = Vec::new();
662
663        // SAFE: Iterate over extracted IDs
664        for session_id in session_ids {
665            if let Some(timestamped) = self.sessions.get(&session_id) {
666                if timestamped.is_expired() {
667                    expired_sessions.push(session_id);
668                } else {
669                    sessions.push(timestamped.session.clone());
670                }
671            } else {
672                expired_sessions.push(session_id);
673            }
674        }
675
676        // SAFE: Cleanup expired sessions
677        for session_id in expired_sessions {
678            self.delete_session(&session_id).await?;
679        }
680
681        Ok(sessions)
682    }
683
684    async fn store_kv(&self, key: &str, value: &[u8], ttl: Option<Duration>) -> Result<()> {
685        let timestamped = TimestampedValue::new(value.to_vec(), ttl.or(self.default_ttl));
686        let created_at = timestamped.created_at;
687
688        self.kv_store.insert(key.to_string(), timestamped);
689
690        // Log KV storage operation with creation timestamp
691        self.log_storage_operation(
692            AuditEventType::ConfigurationChanged, // KV operations represent configuration/data changes
693            "system",                             // KV operations are typically system-level
694            key,
695            "kv_pair",
696            Some(created_at),
697            EventOutcome::Success,
698        )
699        .await;
700
701        Ok(())
702    }
703
704    async fn get_kv(&self, key: &str) -> Result<Option<Vec<u8>>> {
705        // SAFE: Immediate value extraction
706        if let Some(timestamped) = self.kv_store.get(key) {
707            let created_at = timestamped.created_at;
708
709            if timestamped.is_expired() {
710                drop(timestamped);
711                self.kv_store.remove(key);
712
713                // Log expired KV access
714                self.log_storage_operation(
715                    AuditEventType::TokenExpired, // Data expiration
716                    "system",
717                    key,
718                    "kv_pair",
719                    Some(created_at),
720                    EventOutcome::Failure,
721                )
722                .await;
723
724                return Ok(None);
725            }
726
727            let data = timestamped.data.clone();
728            drop(timestamped);
729
730            // Log successful KV access
731            self.log_storage_operation(
732                AuditEventType::ConfigurationChanged, // KV access
733                "system",
734                key,
735                "kv_pair",
736                Some(created_at),
737                EventOutcome::Success,
738            )
739            .await;
740
741            Ok(Some(data))
742        } else {
743            Ok(None)
744        }
745    }
746
747    async fn delete_kv(&self, key: &str) -> Result<()> {
748        // SAFE: Extract creation timestamp before removal
749        let created_at = if let Some(timestamped) = self.kv_store.get(key) {
750            Some(timestamped.created_at)
751        } else {
752            None
753        };
754
755        self.kv_store.remove(key);
756
757        if let Some(created_at) = created_at {
758            // Log KV deletion with creation timestamp
759            self.log_storage_operation(
760                AuditEventType::ConfigurationChanged, // KV deletion
761                "system",
762                key,
763                "kv_pair",
764                Some(created_at),
765                EventOutcome::Success,
766            )
767            .await;
768        }
769
770        Ok(())
771    }
772
773    async fn cleanup_expired(&self) -> Result<()> {
774        // SAFE: Collect expired keys first, then remove them
775        let mut expired_tokens = Vec::new();
776        let mut expired_sessions = Vec::new();
777        let mut expired_kvs = Vec::new();
778
779        // SAFE: Scan for expired items (no cross-map operations)
780        for entry in self.tokens.iter() {
781            if entry.is_expired() {
782                expired_tokens.push(entry.key().clone());
783            }
784        }
785
786        for entry in self.sessions.iter() {
787            if entry.is_expired() {
788                expired_sessions.push(entry.key().clone());
789            }
790        }
791
792        for entry in self.kv_store.iter() {
793            if entry.is_expired() {
794                expired_kvs.push(entry.key().clone());
795            }
796        }
797
798        // SAFE: Remove expired items using extracted keys
799        for token_id in expired_tokens {
800            self.delete_token(&token_id).await?;
801        }
802
803        for session_id in expired_sessions {
804            self.delete_session(&session_id).await?;
805        }
806
807        for key in expired_kvs {
808            self.delete_kv(&key).await?;
809        }
810
811        Ok(())
812    }
813
814    async fn count_active_sessions(&self) -> Result<u64> {
815        let mut count = 0;
816
817        // Count all non-expired sessions
818        for entry in self.sessions.iter() {
819            if !entry.is_expired() {
820                count += 1;
821            }
822        }
823
824        Ok(count)
825    }
826}
827
828#[cfg(test)]
829mod tests {
830    use super::*;
831    use crate::{
832        testing::test_infrastructure::TestEnvironmentGuard,
833        tokens::TokenMetadata,
834        types::{Permissions, Roles, Scopes},
835    };
836    use std::collections::HashMap;
837    use tokio::task::JoinSet;
838
839    #[tokio::test]
840    async fn test_basic_token_operations() {
841        let _env = TestEnvironmentGuard::new().with_jwt_secret("dashmap-test");
842        let storage = DashMapMemoryStorage::new();
843
844        let token = AuthToken {
845            token_id: "test-token".to_string(),
846            user_id: "test-user".to_string(),
847            access_token: "access-123".to_string(),
848            token_type: Some("bearer".to_string()),
849            subject: Some("test-user".to_string()),
850            issuer: Some("test-issuer".to_string()),
851            refresh_token: None,
852            issued_at: chrono::Utc::now(),
853            expires_at: chrono::Utc::now() + chrono::Duration::hours(1),
854            scopes: Scopes::new(vec!["read".to_string()]),
855            auth_method: "password".to_string(),
856            client_id: Some("test-client".to_string()),
857            user_profile: None,
858            permissions: Permissions::new(vec!["read:data".to_string()]),
859            roles: Roles::new(vec!["user".to_string()]),
860            metadata: TokenMetadata::default(),
861        };
862
863        // Store token
864        storage.store_token(&token).await.unwrap();
865
866        // Get token by ID
867        let retrieved = storage.get_token("test-token").await.unwrap().unwrap();
868        assert_eq!(retrieved.user_id, "test-user");
869
870        // Get token by access token
871        let retrieved = storage
872            .get_token_by_access_token("access-123")
873            .await
874            .unwrap()
875            .unwrap();
876        assert_eq!(retrieved.token_id, "test-token");
877
878        // List user tokens
879        let user_tokens = storage.list_user_tokens("test-user").await.unwrap();
880        assert_eq!(user_tokens.len(), 1);
881
882        // Delete token
883        storage.delete_token("test-token").await.unwrap();
884        let retrieved = storage.get_token("test-token").await.unwrap();
885        assert!(retrieved.is_none());
886    }
887
888    #[tokio::test]
889    async fn test_session_operations() {
890        let _env = TestEnvironmentGuard::new().with_jwt_secret("dashmap-session-test");
891        let storage = DashMapMemoryStorage::new();
892
893        let session = SessionData {
894            session_id: "test-session".to_string(),
895            user_id: "test-user".to_string(),
896            created_at: chrono::Utc::now(),
897            last_activity: chrono::Utc::now(),
898            expires_at: chrono::Utc::now() + chrono::Duration::hours(1),
899            ip_address: Some("192.168.1.1".to_string()),
900            user_agent: Some("test-agent".to_string()),
901            data: HashMap::new(),
902        };
903
904        // Store session
905        storage
906            .store_session("test-session", &session)
907            .await
908            .unwrap();
909
910        // Get session
911        let retrieved = storage.get_session("test-session").await.unwrap().unwrap();
912        assert_eq!(retrieved.user_id, "test-user");
913
914        // List user sessions
915        let user_sessions = storage.list_user_sessions("test-user").await.unwrap();
916        assert_eq!(user_sessions.len(), 1);
917
918        // Delete session
919        storage.delete_session("test-session").await.unwrap();
920        let retrieved = storage.get_session("test-session").await.unwrap();
921        assert!(retrieved.is_none());
922    }
923
924    #[tokio::test]
925    async fn test_kv_operations() {
926        let _env = TestEnvironmentGuard::new().with_jwt_secret("dashmap-kv-test");
927        let storage = DashMapMemoryStorage::new();
928
929        let key = "test-key";
930        let value = b"test-value";
931
932        // Store KV
933        storage
934            .store_kv(key, value, Some(Duration::from_secs(3600)))
935            .await
936            .unwrap();
937
938        // Get KV
939        let retrieved = storage.get_kv(key).await.unwrap().unwrap();
940        assert_eq!(retrieved, value);
941
942        // Delete KV
943        storage.delete_kv(key).await.unwrap();
944        let retrieved = storage.get_kv(key).await.unwrap();
945        assert!(retrieved.is_none());
946    }
947
948    #[tokio::test]
949    async fn test_concurrent_operations_no_deadlock() {
950        let _env = TestEnvironmentGuard::new().with_jwt_secret("dashmap-concurrent-test");
951        let storage = std::sync::Arc::new(DashMapMemoryStorage::new());
952
953        let mut join_set = JoinSet::new();
954
955        // Spawn multiple tasks doing concurrent operations
956        for i in 0..10 {
957            let storage = storage.clone();
958            join_set.spawn(async move {
959                for j in 0..50 {
960                    let token = AuthToken {
961                        token_id: format!("token-{}-{}", i, j),
962                        user_id: format!("user-{}", i % 3), // Multiple users per task
963                        access_token: format!("access-{}-{}", i, j),
964                        token_type: Some("bearer".to_string()),
965                        subject: Some(format!("user-{}", i % 3)),
966                        issuer: Some("test-issuer".to_string()),
967                        refresh_token: None,
968                        issued_at: chrono::Utc::now(),
969                        expires_at: chrono::Utc::now() + chrono::Duration::hours(1),
970                        scopes: Scopes::new(vec!["read".to_string()]),
971                        auth_method: "password".to_string(),
972                        client_id: Some("test-client".to_string()),
973                        user_profile: None,
974                        permissions: Permissions::new(vec!["read:data".to_string()]),
975                        roles: Roles::new(vec!["user".to_string()]),
976                        metadata: TokenMetadata::default(),
977                    };
978
979                    // Store token
980                    storage.store_token(&token).await.unwrap();
981
982                    // Immediately list user tokens (tests cross-map operations)
983                    let _user_tokens = storage.list_user_tokens(&token.user_id).await.unwrap();
984
985                    // Get by access token
986                    let _retrieved = storage
987                        .get_token_by_access_token(&token.access_token)
988                        .await
989                        .unwrap();
990                }
991            });
992        }
993
994        // Wait for all tasks to complete (should not deadlock)
995        while join_set.join_next().await.is_some() {}
996
997        println!("✅ Concurrent operations test passed - no deadlocks detected!");
998    }
999
1000    #[tokio::test]
1001    async fn test_ttl_expiration() {
1002        let _env = TestEnvironmentGuard::new().with_jwt_secret("dashmap-ttl-test");
1003        let storage = DashMapMemoryStorage::with_ttl(Duration::from_millis(100));
1004
1005        // Store a KV that should expire
1006        storage
1007            .store_kv("expiring-key", b"expiring-value", None)
1008            .await
1009            .unwrap();
1010
1011        // Should be available immediately
1012        let retrieved = storage.get_kv("expiring-key").await.unwrap();
1013        assert!(retrieved.is_some());
1014
1015        // Wait for expiration
1016        tokio::time::sleep(Duration::from_millis(150)).await;
1017
1018        // Should be expired and cleaned up
1019        let retrieved = storage.get_kv("expiring-key").await.unwrap();
1020        assert!(retrieved.is_none());
1021    }
1022
1023    #[tokio::test]
1024    async fn test_cleanup_expired() {
1025        let _env = TestEnvironmentGuard::new().with_jwt_secret("dashmap-cleanup-test");
1026        let storage = DashMapMemoryStorage::with_ttl(Duration::from_millis(50));
1027
1028        // Store multiple items that will expire
1029        for i in 0..10 {
1030            storage
1031                .store_kv(&format!("key-{}", i), b"value", None)
1032                .await
1033                .unwrap();
1034        }
1035
1036        // Wait for expiration
1037        tokio::time::sleep(Duration::from_millis(100)).await;
1038
1039        // Run cleanup
1040        storage.cleanup_expired().await.unwrap();
1041
1042        // Verify all items are cleaned up
1043        for i in 0..10 {
1044            let retrieved = storage.get_kv(&format!("key-{}", i)).await.unwrap();
1045            assert!(retrieved.is_none());
1046        }
1047    }
1048}