Skip to main content

queue_runtime/
sessions.rs

1//! Session management for ordered message processing.
2//!
3//! This module provides a generic framework for session key generation that enables
4//! ordered message processing for any domain. Session keys group related messages
5//! to ensure they are processed in FIFO order.
6//!
7//! # Design Philosophy
8//!
9//! This module is intentionally **domain-agnostic**. It provides the infrastructure
10//! for session-based ordering without assuming any specific message structure or
11//! business domain (GitHub events, e-commerce orders, IoT telemetry, etc.).
12//!
13//! # Core Concepts
14//!
15//! - **SessionKeyGenerator**: Trait for extracting session keys from messages
16//! - **Session Keys**: Strings that group related messages for ordered processing
17//! - **Message Metadata**: Messages provide metadata via the `SessionKeyExtractor` trait
18//!
19//! # Usage Pattern
20//!
21//! 1. Implement `SessionKeyExtractor` for your message type
22//! 2. Implement `SessionKeyGenerator` for your domain-specific strategy
23//! 3. Use the generator to produce session IDs when sending messages to queues
24//!
25//! # Example
26//!
27//! ```rust
28//! use queue_runtime::sessions::{SessionKeyGenerator, SessionKeyExtractor};
29//! use queue_runtime::message::SessionId;
30//! use std::collections::HashMap;
31//!
32//! // Your domain message type
33//! struct OrderEvent {
34//!     order_id: String,
35//!     customer_id: String,
36//! }
37//!
38//! // Implement metadata extraction
39//! impl SessionKeyExtractor for OrderEvent {
40//!     fn get_metadata(&self, key: &str) -> Option<String> {
41//!         match key {
42//!             "order_id" => Some(self.order_id.clone()),
43//!             "customer_id" => Some(self.customer_id.clone()),
44//!             _ => None,
45//!         }
46//!     }
47//! }
48//!
49//! // Implement your session strategy
50//! struct OrderSessionStrategy;
51//!
52//! impl SessionKeyGenerator for OrderSessionStrategy {
53//!     fn generate_key(&self, extractor: &dyn SessionKeyExtractor) -> Option<SessionId> {
54//!         extractor.get_metadata("order_id")
55//!             .and_then(|id| SessionId::new(format!("order-{}", id)).ok())
56//!     }
57//! }
58//! ```
59
60use crate::error::ValidationError;
61use crate::message::{SessionId, Timestamp};
62use crate::QueueError;
63use std::collections::HashMap;
64use std::sync::Arc;
65use std::time::Duration;
66use tokio::sync::RwLock;
67use tokio::time::Instant;
68
69#[cfg(test)]
70#[path = "sessions_tests.rs"]
71mod tests;
72
73// ============================================================================
74// Session Key Extractor Trait
75// ============================================================================
76
/// Source of named metadata values used to derive session keys.
///
/// A message type implements this trait to expose whichever of its fields are
/// relevant for grouping. The interface is a plain string key/value lookup, so
/// it carries no assumptions about message layout or business domain; a
/// `SessionKeyGenerator` simply asks for the keys it needs.
///
/// # Example
///
/// ```rust
/// use queue_runtime::sessions::SessionKeyExtractor;
///
/// struct MyMessage {
///     user_id: String,
///     resource_id: String,
/// }
///
/// impl SessionKeyExtractor for MyMessage {
///     fn get_metadata(&self, key: &str) -> Option<String> {
///         match key {
///             "user_id" => Some(self.user_id.clone()),
///             "resource_id" => Some(self.resource_id.clone()),
///             _ => None,
///         }
///     }
///
///     fn list_metadata_keys(&self) -> Vec<String> {
///         vec!["user_id".to_string(), "resource_id".to_string()]
///     }
/// }
/// ```
pub trait SessionKeyExtractor {
    /// Look up a single metadata value by name.
    ///
    /// # Arguments
    ///
    /// * `key` - Name of the metadata field to fetch
    ///
    /// # Returns
    ///
    /// `Some(value)` when this message has a value for `key`, `None` when the
    /// key is unknown or has no value.
    fn get_metadata(&self, key: &str) -> Option<String>;

    /// Enumerate the metadata keys this message can provide.
    ///
    /// Intended for debugging and introspection. Implementors that do not
    /// override it get the default, which reports no keys at all.
    ///
    /// # Returns
    ///
    /// The names of the available metadata fields
    fn list_metadata_keys(&self) -> Vec<String> {
        vec![]
    }

    /// Collect every advertised metadata entry into a map.
    ///
    /// The default walks `list_metadata_keys()` and queries `get_metadata()`
    /// for each name; names for which no value comes back are left out. Since
    /// it is driven by `list_metadata_keys()`, the result is empty unless that
    /// method is overridden.
    ///
    /// # Returns
    ///
    /// A `HashMap` from key name to value
    fn get_all_metadata(&self) -> HashMap<String, String> {
        let mut all = HashMap::new();
        for name in self.list_metadata_keys() {
            if let Some(value) = self.get_metadata(&name) {
                all.insert(name, value);
            }
        }
        all
    }
}
153
154// ============================================================================
155// Session Key Generator Trait
156// ============================================================================
157
158/// Strategy trait for generating session keys from messages.
159///
160/// Implementations define how messages are grouped for ordered processing.
161/// The generator extracts relevant metadata from messages and produces
162/// session keys that group related messages together.
163///
164/// # Design Principles
165///
166/// - **Domain-Agnostic**: Works with any message structure via `SessionKeyExtractor`
167/// - **Strategy Pattern**: Different strategies provide different ordering semantics
168/// - **Composable**: Strategies can be combined or chained
169/// - **Optional Ordering**: Returning `None` allows concurrent processing
170///
171/// # Common Patterns
172///
173/// - **Entity-based**: Group by entity ID (order-123, user-456)
174/// - **Hierarchical**: Group by parent/child relationships
175/// - **Temporal**: Group by time windows
176/// - **Custom**: Domain-specific grouping logic
177///
178/// # Example
179///
180/// ```rust
181/// use queue_runtime::sessions::{SessionKeyGenerator, SessionKeyExtractor};
182/// use queue_runtime::message::SessionId;
183///
184/// struct ResourceIdStrategy;
185///
186/// impl SessionKeyGenerator for ResourceIdStrategy {
187///     fn generate_key(&self, extractor: &dyn SessionKeyExtractor) -> Option<SessionId> {
188///         extractor.get_metadata("resource_id")
189///             .and_then(|id| SessionId::new(format!("resource-{}", id)).ok())
190///     }
191/// }
192/// ```
193pub trait SessionKeyGenerator: Send + Sync {
194    /// Generate a session key for the given message.
195    ///
196    /// Returns `None` if the message should not be session-ordered, allowing
197    /// it to be processed concurrently without ordering constraints.
198    ///
199    /// # Arguments
200    ///
201    /// * `extractor` - Message implementing SessionKeyExtractor trait
202    ///
203    /// # Returns
204    ///
205    /// Optional session ID for grouping related messages
206    fn generate_key(&self, extractor: &dyn SessionKeyExtractor) -> Option<SessionId>;
207}
208
209// ============================================================================
210// Composite Key Strategy
211// ============================================================================
212
213/// Generates session keys by composing multiple metadata fields.
214///
215/// This strategy builds session keys from a list of metadata fields in order,
216/// joining them with a separator. This is useful for creating hierarchical
217/// or compound session keys.
218///
219/// # Example
220///
221/// ```rust
222/// use queue_runtime::sessions::CompositeKeyStrategy;
223///
224/// // Create session keys like "tenant-123-resource-456"
225/// let strategy = CompositeKeyStrategy::new(vec![
226///     "tenant_id".to_string(),
227///     "resource_id".to_string(),
228/// ], "-");
229/// ```
230pub struct CompositeKeyStrategy {
231    fields: Vec<String>,
232    separator: String,
233}
234
235impl CompositeKeyStrategy {
236    /// Create a new composite key strategy.
237    ///
238    /// # Arguments
239    ///
240    /// * `fields` - Ordered list of metadata field names to compose
241    /// * `separator` - String to join field values with
242    ///
243    /// # Example
244    ///
245    /// ```rust
246    /// use queue_runtime::sessions::CompositeKeyStrategy;
247    ///
248    /// let strategy = CompositeKeyStrategy::new(
249    ///     vec!["region".to_string(), "customer_id".to_string()],
250    ///     "-"
251    /// );
252    /// ```
253    pub fn new(fields: Vec<String>, separator: &str) -> Self {
254        Self {
255            fields,
256            separator: separator.to_string(),
257        }
258    }
259}
260
261impl SessionKeyGenerator for CompositeKeyStrategy {
262    fn generate_key(&self, extractor: &dyn SessionKeyExtractor) -> Option<SessionId> {
263        // Return None if no fields specified
264        if self.fields.is_empty() {
265            return None;
266        }
267
268        // Collect all field values
269        let values: Vec<String> = self
270            .fields
271            .iter()
272            .filter_map(|field| extractor.get_metadata(field))
273            .collect();
274
275        // Return None if any required field is missing
276        if values.len() != self.fields.len() {
277            return None;
278        }
279
280        // Join values with separator
281        let key = values.join(&self.separator);
282
283        // Create session ID
284        SessionId::new(key).ok()
285    }
286}
287
288// ============================================================================
289// Single Field Strategy
290// ============================================================================
291
292/// Generates session keys from a single metadata field.
293///
294/// This is the simplest strategy: extract one field and use it as the session key.
295/// Optionally adds a prefix for namespacing.
296///
297/// # Example
298///
299/// ```rust
300/// use queue_runtime::sessions::SingleFieldStrategy;
301///
302/// // Create session keys from "user_id" like "user-12345"
303/// let strategy = SingleFieldStrategy::new("user_id", Some("user"));
304/// ```
305pub struct SingleFieldStrategy {
306    field_name: String,
307    prefix: Option<String>,
308}
309
310impl SingleFieldStrategy {
311    /// Create a new single field strategy.
312    ///
313    /// # Arguments
314    ///
315    /// * `field_name` - The metadata field to use for the session key
316    /// * `prefix` - Optional prefix to add before the field value
317    ///
318    /// # Example
319    ///
320    /// ```rust
321    /// use queue_runtime::sessions::SingleFieldStrategy;
322    ///
323    /// let strategy = SingleFieldStrategy::new("order_id", Some("order"));
324    /// // Produces keys like "order-123"
325    /// ```
326    pub fn new(field_name: &str, prefix: Option<&str>) -> Self {
327        Self {
328            field_name: field_name.to_string(),
329            prefix: prefix.map(|s| s.to_string()),
330        }
331    }
332}
333
334impl SessionKeyGenerator for SingleFieldStrategy {
335    fn generate_key(&self, extractor: &dyn SessionKeyExtractor) -> Option<SessionId> {
336        // Get the field value
337        let value = extractor.get_metadata(&self.field_name)?;
338
339        // Build key with optional prefix
340        let key = if let Some(ref prefix) = self.prefix {
341            format!("{}-{}", prefix, value)
342        } else {
343            value
344        };
345
346        // Create session ID
347        SessionId::new(key).ok()
348    }
349}
350
351// ============================================================================
352// No Ordering Strategy
353// ============================================================================
354
355/// Strategy that disables session-based ordering.
356///
357/// Always returns `None`, allowing concurrent message processing without
358/// ordering guarantees. Use for stateless operations that don't require
359/// message ordering.
360///
361/// # Example
362///
363/// ```rust
364/// use queue_runtime::sessions::NoOrderingStrategy;
365///
366/// let strategy = NoOrderingStrategy;
367/// // All messages can be processed concurrently
368/// ```
369pub struct NoOrderingStrategy;
370
371impl SessionKeyGenerator for NoOrderingStrategy {
372    fn generate_key(&self, _extractor: &dyn SessionKeyExtractor) -> Option<SessionId> {
373        None
374    }
375}
376
377// ============================================================================
378// Fallback Strategy
379// ============================================================================
380
381/// Strategy that tries multiple generators in order, using the first success.
382///
383/// This implements a fallback chain: try the primary strategy first, and if it
384/// returns `None`, try the next strategy, and so on. Useful for providing
385/// fine-grained ordering when possible, with coarser fallbacks.
386///
387/// # Example
388///
389/// ```rust
390/// use queue_runtime::sessions::{FallbackStrategy, SingleFieldStrategy, CompositeKeyStrategy};
391///
392/// // Try specific entity ID first, fall back to tenant-level ordering
393/// let primary = SingleFieldStrategy::new("entity_id", Some("entity"));
394/// let fallback = SingleFieldStrategy::new("tenant_id", Some("tenant"));
395///
396/// let strategy = FallbackStrategy::new(vec![
397///     Box::new(primary),
398///     Box::new(fallback),
399/// ]);
400/// ```
401pub struct FallbackStrategy {
402    strategies: Vec<Box<dyn SessionKeyGenerator>>,
403}
404
405impl FallbackStrategy {
406    /// Create a new fallback strategy with ordered generators.
407    ///
408    /// # Arguments
409    ///
410    /// * `strategies` - Ordered list of generators to try
411    ///
412    /// # Example
413    ///
414    /// ```rust
415    /// use queue_runtime::sessions::{FallbackStrategy, SingleFieldStrategy, NoOrderingStrategy};
416    ///
417    /// let strategy = FallbackStrategy::new(vec![
418    ///     Box::new(SingleFieldStrategy::new("user_id", Some("user"))),
419    ///     Box::new(NoOrderingStrategy), // Ultimate fallback: no ordering
420    /// ]);
421    /// ```
422    pub fn new(strategies: Vec<Box<dyn SessionKeyGenerator>>) -> Self {
423        Self { strategies }
424    }
425}
426
427impl SessionKeyGenerator for FallbackStrategy {
428    fn generate_key(&self, extractor: &dyn SessionKeyExtractor) -> Option<SessionId> {
429        // Try each strategy in order until one succeeds
430        for strategy in &self.strategies {
431            if let Some(session_id) = strategy.generate_key(extractor) {
432                return Some(session_id);
433            }
434        }
435
436        // All strategies failed
437        None
438    }
439}
440
441// ============================================================================
442// Session Lock Management
443// ============================================================================
444
445/// Represents a lock on a session for exclusive message processing.
446///
447/// A session lock ensures that only one consumer can process messages from
448/// a session at a time, maintaining FIFO ordering guarantees. Locks have
449/// an expiration time and can be renewed to extend processing time.
450///
451/// # Design
452///
453/// - **Expiration**: Locks automatically expire after a timeout period
454/// - **Renewal**: Locks can be renewed before expiration to extend processing
455/// - **Owner Tracking**: Each lock tracks which consumer owns it
456/// - **Timeout Handling**: Expired locks can be acquired by other consumers
457///
458/// # Example
459///
460/// ```rust
461/// use queue_runtime::sessions::SessionLock;
462/// use queue_runtime::message::SessionId;
463/// use std::time::Duration;
464///
465/// # tokio_test::block_on(async {
466/// let session_id = SessionId::new("user-123".to_string()).unwrap();
467/// let lock = SessionLock::new(session_id.clone(), "consumer-1".to_string(), Duration::from_secs(30));
468///
469/// assert!(!lock.is_expired());
470/// assert_eq!(lock.owner(), "consumer-1");
471/// # });
472/// ```
473#[derive(Debug, Clone)]
474pub struct SessionLock {
475    session_id: SessionId,
476    owner: String,
477    acquired_at: Instant,
478    expires_at: Instant,
479    lock_duration: Duration,
480}
481
482impl SessionLock {
483    /// Create a new session lock.
484    ///
485    /// # Arguments
486    ///
487    /// * `session_id` - The session being locked
488    /// * `owner` - Identifier of the consumer owning this lock
489    /// * `lock_duration` - How long the lock is valid before expiration
490    ///
491    /// # Returns
492    ///
493    /// A new session lock that expires after `lock_duration`
494    pub fn new(session_id: SessionId, owner: String, lock_duration: Duration) -> Self {
495        let now = Instant::now();
496        Self {
497            session_id,
498            owner,
499            acquired_at: now,
500            expires_at: now + lock_duration,
501            lock_duration,
502        }
503    }
504
505    /// Get the session ID this lock is for.
506    pub fn session_id(&self) -> &SessionId {
507        &self.session_id
508    }
509
510    /// Get the owner of this lock.
511    pub fn owner(&self) -> &str {
512        &self.owner
513    }
514
515    /// Get when this lock was acquired.
516    pub fn acquired_at(&self) -> Instant {
517        self.acquired_at
518    }
519
520    /// Get when this lock expires.
521    pub fn expires_at(&self) -> Instant {
522        self.expires_at
523    }
524
525    /// Get the configured lock duration.
526    pub fn lock_duration(&self) -> Duration {
527        self.lock_duration
528    }
529
530    /// Check if this lock has expired.
531    ///
532    /// # Returns
533    ///
534    /// `true` if the current time is past the expiration time
535    pub fn is_expired(&self) -> bool {
536        Instant::now() >= self.expires_at
537    }
538
539    /// Get the remaining time before this lock expires.
540    ///
541    /// # Returns
542    ///
543    /// Duration until expiration, or zero if already expired
544    pub fn time_remaining(&self) -> Duration {
545        let now = Instant::now();
546        if now >= self.expires_at {
547            Duration::ZERO
548        } else {
549            self.expires_at - now
550        }
551    }
552
553    /// Renew this lock, extending its expiration time.
554    ///
555    /// # Arguments
556    ///
557    /// * `extension` - How long to extend the lock by
558    ///
559    /// # Returns
560    ///
561    /// A new lock with updated expiration time
562    pub fn renew(&self, extension: Duration) -> Self {
563        Self {
564            session_id: self.session_id.clone(),
565            owner: self.owner.clone(),
566            acquired_at: self.acquired_at,
567            expires_at: Instant::now() + extension,
568            lock_duration: extension,
569        }
570    }
571}
572
573/// Manages session locks for concurrent message processing.
574///
575/// The lock manager coordinates exclusive access to sessions, ensuring that
576/// only one consumer processes messages from a session at a time. It handles
577/// lock acquisition, renewal, release, and automatic expiration cleanup.
578///
579/// # Thread Safety
580///
581/// This type is thread-safe and can be shared across async tasks using `Arc`.
582///
583/// # Example
584///
585/// ```rust
586/// use queue_runtime::sessions::SessionLockManager;
587/// use queue_runtime::message::SessionId;
588/// use std::time::Duration;
589///
590/// # tokio_test::block_on(async {
591/// let manager = SessionLockManager::new(Duration::from_secs(30));
592/// let session_id = SessionId::new("order-456".to_string()).unwrap();
593///
594/// // Acquire lock
595/// let lock = manager.acquire_lock(session_id.clone(), "consumer-1".to_string()).await?;
596/// assert_eq!(lock.owner(), "consumer-1");
597///
598/// // Try to acquire same session with different consumer - should fail
599/// let result = manager.try_acquire_lock(session_id.clone(), "consumer-2".to_string()).await;
600/// assert!(result.is_err());
601///
602/// // Release lock
603/// manager.release_lock(&session_id, "consumer-1").await?;
604/// # Ok::<(), queue_runtime::QueueError>(())
605/// # });
606/// ```
607pub struct SessionLockManager {
608    locks: Arc<RwLock<HashMap<SessionId, SessionLock>>>,
609    default_lock_duration: Duration,
610}
611
612impl SessionLockManager {
613    /// Create a new session lock manager.
614    ///
615    /// # Arguments
616    ///
617    /// * `default_lock_duration` - Default duration for session locks
618    ///
619    /// # Example
620    ///
621    /// ```rust
622    /// use queue_runtime::sessions::SessionLockManager;
623    /// use std::time::Duration;
624    ///
625    /// let manager = SessionLockManager::new(Duration::from_secs(60));
626    /// ```
627    pub fn new(default_lock_duration: Duration) -> Self {
628        Self {
629            locks: Arc::new(RwLock::new(HashMap::new())),
630            default_lock_duration,
631        }
632    }
633
634    /// Try to acquire a lock on a session (non-blocking).
635    ///
636    /// Returns immediately with an error if the session is already locked
637    /// by another consumer.
638    ///
639    /// # Arguments
640    ///
641    /// * `session_id` - The session to lock
642    /// * `owner` - Identifier of the consumer requesting the lock
643    ///
644    /// # Returns
645    ///
646    /// The acquired lock if successful, or an error if the session is locked
647    ///
648    /// # Errors
649    ///
650    /// Returns `QueueError::SessionLocked` if the session is already locked
651    /// by another consumer and the lock has not expired.
652    pub async fn try_acquire_lock(
653        &self,
654        session_id: SessionId,
655        owner: String,
656    ) -> Result<SessionLock, QueueError> {
657        let mut locks = self.locks.write().await;
658
659        // Check if session is already locked
660        if let Some(existing_lock) = locks.get(&session_id) {
661            if !existing_lock.is_expired() {
662                // Lock is still valid and owned by someone else
663                if existing_lock.owner() != owner {
664                    return Err(QueueError::SessionLocked {
665                        session_id: session_id.to_string(),
666                        locked_until: Timestamp::now(),
667                    });
668                }
669                // Same owner - return existing lock
670                return Ok(existing_lock.clone());
671            }
672            // Lock expired - remove it and acquire new lock below
673        }
674
675        // Acquire new lock
676        let lock = SessionLock::new(session_id.clone(), owner, self.default_lock_duration);
677        locks.insert(session_id, lock.clone());
678
679        Ok(lock)
680    }
681
682    /// Acquire a lock on a session (blocking with timeout).
683    ///
684    /// Waits for the lock to become available if it's currently held by
685    /// another consumer, up to the specified timeout.
686    ///
687    /// # Arguments
688    ///
689    /// * `session_id` - The session to lock
690    /// * `owner` - Identifier of the consumer requesting the lock
691    ///
692    /// # Returns
693    ///
694    /// The acquired lock if successful within the timeout period
695    ///
696    /// # Errors
697    ///
698    /// Returns `QueueError::SessionLocked` if unable to acquire the lock
699    /// within the timeout period.
700    pub async fn acquire_lock(
701        &self,
702        session_id: SessionId,
703        owner: String,
704    ) -> Result<SessionLock, QueueError> {
705        // For now, just try once - future enhancement could add retry logic
706        self.try_acquire_lock(session_id, owner).await
707    }
708
709    /// Renew an existing session lock.
710    ///
711    /// Extends the lock's expiration time, allowing the consumer to continue
712    /// processing messages from the session.
713    ///
714    /// # Arguments
715    ///
716    /// * `session_id` - The session whose lock to renew
717    /// * `owner` - Identifier of the consumer owning the lock
718    /// * `extension` - How long to extend the lock by (if None, uses default duration)
719    ///
720    /// # Returns
721    ///
722    /// The renewed lock with updated expiration time
723    ///
724    /// # Errors
725    ///
726    /// Returns `QueueError::SessionNotFound` if no lock exists for the session.
727    /// Returns `QueueError::SessionLocked` if the lock is owned by a different consumer.
728    pub async fn renew_lock(
729        &self,
730        session_id: &SessionId,
731        owner: &str,
732        extension: Option<Duration>,
733    ) -> Result<SessionLock, QueueError> {
734        let mut locks = self.locks.write().await;
735
736        let existing_lock = locks
737            .get(session_id)
738            .ok_or_else(|| QueueError::SessionNotFound {
739                session_id: session_id.to_string(),
740            })?;
741
742        // Verify ownership
743        if existing_lock.owner() != owner {
744            return Err(QueueError::SessionLocked {
745                session_id: session_id.to_string(),
746                locked_until: Timestamp::now(),
747            });
748        }
749
750        // Renew the lock
751        let renewed_lock = existing_lock.renew(extension.unwrap_or(self.default_lock_duration));
752        locks.insert(session_id.clone(), renewed_lock.clone());
753
754        Ok(renewed_lock)
755    }
756
757    /// Release a session lock.
758    ///
759    /// Removes the lock, allowing other consumers to acquire it.
760    ///
761    /// # Arguments
762    ///
763    /// * `session_id` - The session whose lock to release
764    /// * `owner` - Identifier of the consumer releasing the lock
765    ///
766    /// # Returns
767    ///
768    /// `Ok(())` if the lock was successfully released
769    ///
770    /// # Errors
771    ///
772    /// Returns `QueueError::SessionNotFound` if no lock exists for the session.
773    /// Returns `QueueError::SessionLocked` if the lock is owned by a different consumer.
774    pub async fn release_lock(
775        &self,
776        session_id: &SessionId,
777        owner: &str,
778    ) -> Result<(), QueueError> {
779        let mut locks = self.locks.write().await;
780
781        let existing_lock = locks
782            .get(session_id)
783            .ok_or_else(|| QueueError::SessionNotFound {
784                session_id: session_id.to_string(),
785            })?;
786
787        // Verify ownership
788        if existing_lock.owner() != owner {
789            return Err(QueueError::SessionLocked {
790                session_id: session_id.to_string(),
791                locked_until: Timestamp::now(),
792            });
793        }
794
795        // Remove the lock
796        locks.remove(session_id);
797
798        Ok(())
799    }
800
801    /// Check if a session is currently locked.
802    ///
803    /// # Arguments
804    ///
805    /// * `session_id` - The session to check
806    ///
807    /// # Returns
808    ///
809    /// `true` if the session has a valid (non-expired) lock
810    pub async fn is_locked(&self, session_id: &SessionId) -> bool {
811        let locks = self.locks.read().await;
812        locks
813            .get(session_id)
814            .map(|lock| !lock.is_expired())
815            .unwrap_or(false)
816    }
817
818    /// Get information about a session lock.
819    ///
820    /// # Arguments
821    ///
822    /// * `session_id` - The session to query
823    ///
824    /// # Returns
825    ///
826    /// The lock information if it exists and is not expired
827    pub async fn get_lock(&self, session_id: &SessionId) -> Option<SessionLock> {
828        let locks = self.locks.read().await;
829        locks
830            .get(session_id)
831            .filter(|lock| !lock.is_expired())
832            .cloned()
833    }
834
835    /// Clean up expired locks.
836    ///
837    /// Removes all locks that have passed their expiration time.
838    ///
839    /// # Returns
840    ///
841    /// The number of expired locks that were removed
842    pub async fn cleanup_expired_locks(&self) -> usize {
843        let mut locks = self.locks.write().await;
844
845        let expired: Vec<SessionId> = locks
846            .iter()
847            .filter(|(_, lock)| lock.is_expired())
848            .map(|(id, _)| id.clone())
849            .collect();
850
851        let count = expired.len();
852        for session_id in expired {
853            locks.remove(&session_id);
854        }
855
856        count
857    }
858
859    /// Get the number of currently held locks (including expired).
860    ///
861    /// # Returns
862    ///
863    /// Total number of locks in the manager
864    pub async fn lock_count(&self) -> usize {
865        let locks = self.locks.read().await;
866        locks.len()
867    }
868
869    /// Get the number of active (non-expired) locks.
870    ///
871    /// # Returns
872    ///
873    /// Number of locks that have not expired
874    pub async fn active_lock_count(&self) -> usize {
875        let locks = self.locks.read().await;
876        locks.values().filter(|lock| !lock.is_expired()).count()
877    }
878}
879
880// ============================================================================
881// Session Affinity Tracking
882// ============================================================================
883
884/// Mapping of a session to its assigned consumer.
885///
886/// Session affinity ensures that all messages for a given session are processed
887/// by the same consumer, maintaining ordering and state consistency.
888///
889/// # Examples
890///
891/// ```
892/// use queue_runtime::sessions::SessionAffinity;
893/// use queue_runtime::message::SessionId;
894/// use std::time::{SystemTime, Duration};
895///
896/// # tokio_test::block_on(async {
897/// let session_id = SessionId::new("order-789".to_string()).unwrap();
898/// let affinity = SessionAffinity::new(
899///     session_id.clone(),
900///     "worker-3".to_string(),
901///     Duration::from_secs(300)
902/// );
903///
904/// assert_eq!(affinity.session_id(), &session_id);
905/// assert_eq!(affinity.consumer_id(), "worker-3");
906/// assert!(!affinity.is_expired());
907/// # });
908/// ```
909#[derive(Debug, Clone)]
910pub struct SessionAffinity {
911    session_id: SessionId,
912    consumer_id: String,
913    assigned_at: Instant,
914    expires_at: Instant,
915    affinity_duration: Duration,
916    last_activity: Instant,
917}
918
919impl SessionAffinity {
920    /// Create a new session affinity mapping.
921    ///
922    /// # Arguments
923    ///
924    /// * `session_id` - The session being tracked
925    /// * `consumer_id` - Identifier of the consumer assigned to this session
926    /// * `affinity_duration` - How long the affinity is valid
927    ///
928    /// # Returns
929    ///
930    /// A new `SessionAffinity` instance
931    pub fn new(session_id: SessionId, consumer_id: String, affinity_duration: Duration) -> Self {
932        let now = Instant::now();
933        Self {
934            session_id,
935            consumer_id,
936            assigned_at: now,
937            expires_at: now + affinity_duration,
938            affinity_duration,
939            last_activity: now,
940        }
941    }
942
943    /// Get the session ID.
944    pub fn session_id(&self) -> &SessionId {
945        &self.session_id
946    }
947
948    /// Get the consumer ID.
949    pub fn consumer_id(&self) -> &str {
950        &self.consumer_id
951    }
952
953    /// Get the affinity duration.
954    pub fn affinity_duration(&self) -> Duration {
955        self.affinity_duration
956    }
957
958    /// Get when the affinity was assigned.
959    pub fn assigned_at(&self) -> Instant {
960        self.assigned_at
961    }
962
963    /// Check if the affinity has expired.
964    ///
965    /// # Returns
966    ///
967    /// `true` if the affinity has expired, `false` otherwise
968    pub fn is_expired(&self) -> bool {
969        Instant::now() >= self.expires_at
970    }
971
972    /// Get the remaining time before expiration.
973    ///
974    /// # Returns
975    ///
976    /// Duration remaining, or zero if expired
977    pub fn time_remaining(&self) -> Duration {
978        let now = Instant::now();
979        if now >= self.expires_at {
980            Duration::ZERO
981        } else {
982            self.expires_at - now
983        }
984    }
985
986    /// Update the last activity time.
987    ///
988    /// This is called when a message is processed for the session,
989    /// keeping the affinity fresh.
990    pub fn touch(&mut self) {
991        self.last_activity = Instant::now();
992    }
993
994    /// Get the time since last activity.
995    pub fn idle_time(&self) -> Duration {
996        Instant::now().duration_since(self.last_activity)
997    }
998
999    /// Extend the affinity expiration.
1000    ///
1001    /// # Arguments
1002    ///
1003    /// * `additional_duration` - Additional time to add to expiration
1004    ///
1005    /// # Returns
1006    ///
1007    /// A new `SessionAffinity` with extended expiration
1008    pub fn extend(&self, additional_duration: Duration) -> Self {
1009        let mut extended = self.clone();
1010        extended.expires_at = Instant::now() + additional_duration;
1011        extended
1012    }
1013}
1014
1015/// Tracks session-to-consumer affinity mappings for ordered processing.
1016///
1017/// The affinity tracker ensures that all messages for a given session are
1018/// routed to the same consumer, maintaining message ordering and processing
1019/// consistency within sessions.
1020///
1021/// # Thread Safety
1022///
1023/// This type uses `Arc<RwLock<>>` internally and can be safely shared across
1024/// threads and tasks.
1025///
1026/// # Examples
1027///
1028/// ```
1029/// use queue_runtime::sessions::SessionAffinityTracker;
1030/// use queue_runtime::message::SessionId;
1031/// use std::time::Duration;
1032///
1033/// # tokio_test::block_on(async {
1034/// let tracker = SessionAffinityTracker::new(Duration::from_secs(600));
1035/// let session_id = SessionId::new("session-123".to_string()).unwrap();
1036///
1037/// // Assign session to consumer
1038/// let affinity = tracker.assign_session(session_id.clone(), "worker-1".to_string()).await.unwrap();
1039/// assert_eq!(affinity.consumer_id(), "worker-1");
1040///
1041/// // Query affinity
1042/// let consumer = tracker.get_consumer(&session_id).await;
1043/// assert_eq!(consumer, Some("worker-1".to_string()));
1044/// # });
1045/// ```
1046#[derive(Clone)]
1047pub struct SessionAffinityTracker {
1048    affinities: Arc<RwLock<HashMap<SessionId, SessionAffinity>>>,
1049    default_affinity_duration: Duration,
1050}
1051
1052impl SessionAffinityTracker {
1053    /// Create a new session affinity tracker.
1054    ///
1055    /// # Arguments
1056    ///
1057    /// * `default_affinity_duration` - Default duration for affinity mappings
1058    ///
1059    /// # Returns
1060    ///
1061    /// A new `SessionAffinityTracker` instance
1062    pub fn new(default_affinity_duration: Duration) -> Self {
1063        Self {
1064            affinities: Arc::new(RwLock::new(HashMap::new())),
1065            default_affinity_duration,
1066        }
1067    }
1068
1069    /// Assign a session to a consumer.
1070    ///
1071    /// If the session is already assigned and not expired, returns an error.
1072    /// If the session affinity has expired, reassigns to the new consumer.
1073    ///
1074    /// # Arguments
1075    ///
1076    /// * `session_id` - The session to assign
1077    /// * `consumer_id` - The consumer to assign the session to
1078    ///
1079    /// # Returns
1080    ///
1081    /// The created affinity mapping on success, or an error if the session
1082    /// is already assigned to a different consumer.
1083    ///
1084    /// # Errors
1085    ///
1086    /// Returns `QueueError::SessionLocked` if the session is already assigned
1087    /// to a different consumer and the affinity has not expired.
1088    pub async fn assign_session(
1089        &self,
1090        session_id: SessionId,
1091        consumer_id: String,
1092    ) -> Result<SessionAffinity, QueueError> {
1093        let mut affinities = self.affinities.write().await;
1094
1095        // Check if session is already assigned
1096        if let Some(existing) = affinities.get(&session_id) {
1097            if !existing.is_expired() {
1098                if existing.consumer_id() != consumer_id {
1099                    // Session assigned to different consumer
1100                    return Err(QueueError::SessionLocked {
1101                        session_id: session_id.to_string(),
1102                        locked_until: Timestamp::now(), // Approximate
1103                    });
1104                }
1105                // Same consumer - return existing affinity
1106                return Ok(existing.clone());
1107            }
1108            // Expired - will reassign below
1109        }
1110
1111        // Create new affinity
1112        let affinity = SessionAffinity::new(
1113            session_id.clone(),
1114            consumer_id,
1115            self.default_affinity_duration,
1116        );
1117
1118        affinities.insert(session_id, affinity.clone());
1119        Ok(affinity)
1120    }
1121
1122    /// Get the consumer assigned to a session.
1123    ///
1124    /// # Arguments
1125    ///
1126    /// * `session_id` - The session to query
1127    ///
1128    /// # Returns
1129    ///
1130    /// The consumer ID if the session has an active affinity, `None` otherwise
1131    pub async fn get_consumer(&self, session_id: &SessionId) -> Option<String> {
1132        let affinities = self.affinities.read().await;
1133        affinities
1134            .get(session_id)
1135            .filter(|affinity| !affinity.is_expired())
1136            .map(|affinity| affinity.consumer_id().to_string())
1137    }
1138
1139    /// Get the full affinity information for a session.
1140    ///
1141    /// # Arguments
1142    ///
1143    /// * `session_id` - The session to query
1144    ///
1145    /// # Returns
1146    ///
1147    /// The affinity information if the session has an active affinity
1148    pub async fn get_affinity(&self, session_id: &SessionId) -> Option<SessionAffinity> {
1149        let affinities = self.affinities.read().await;
1150        affinities
1151            .get(session_id)
1152            .filter(|affinity| !affinity.is_expired())
1153            .cloned()
1154    }
1155
1156    /// Check if a session has an active affinity.
1157    ///
1158    /// # Arguments
1159    ///
1160    /// * `session_id` - The session to check
1161    ///
1162    /// # Returns
1163    ///
1164    /// `true` if the session has an active affinity
1165    pub async fn has_affinity(&self, session_id: &SessionId) -> bool {
1166        self.get_consumer(session_id).await.is_some()
1167    }
1168
1169    /// Update the last activity time for a session.
1170    ///
1171    /// This should be called when a message is processed for the session.
1172    ///
1173    /// # Arguments
1174    ///
1175    /// * `session_id` - The session to update
1176    ///
1177    /// # Returns
1178    ///
1179    /// `Ok(())` if successful, error if session not found or expired
1180    pub async fn touch_session(&self, session_id: &SessionId) -> Result<(), QueueError> {
1181        let mut affinities = self.affinities.write().await;
1182
1183        if let Some(affinity) = affinities.get_mut(session_id) {
1184            if !affinity.is_expired() {
1185                affinity.touch();
1186                return Ok(());
1187            }
1188        }
1189
1190        Err(QueueError::SessionNotFound {
1191            session_id: session_id.to_string(),
1192        })
1193    }
1194
1195    /// Release a session affinity.
1196    ///
1197    /// # Arguments
1198    ///
1199    /// * `session_id` - The session to release
1200    /// * `consumer_id` - The consumer releasing the session (for validation)
1201    ///
1202    /// # Returns
1203    ///
1204    /// `Ok(())` if successful
1205    ///
1206    /// # Errors
1207    ///
1208    /// Returns error if the consumer doesn't own the session
1209    pub async fn release_session(
1210        &self,
1211        session_id: &SessionId,
1212        consumer_id: &str,
1213    ) -> Result<(), QueueError> {
1214        let mut affinities = self.affinities.write().await;
1215
1216        if let Some(affinity) = affinities.get(session_id) {
1217            if affinity.consumer_id() != consumer_id {
1218                return Err(QueueError::ValidationError(
1219                    ValidationError::InvalidFormat {
1220                        field: "consumer_id".to_string(),
1221                        message: format!(
1222                            "Session owned by {}, cannot release from {}",
1223                            affinity.consumer_id(),
1224                            consumer_id
1225                        ),
1226                    },
1227                ));
1228            }
1229        }
1230
1231        affinities.remove(session_id);
1232        Ok(())
1233    }
1234
1235    /// Extend the affinity duration for a session.
1236    ///
1237    /// # Arguments
1238    ///
1239    /// * `session_id` - The session to extend
1240    /// * `consumer_id` - The consumer requesting the extension (for validation)
1241    /// * `additional_duration` - Additional time to add
1242    ///
1243    /// # Returns
1244    ///
1245    /// The updated affinity on success
1246    ///
1247    /// # Errors
1248    ///
1249    /// Returns error if consumer doesn't own the session or session not found
1250    pub async fn extend_affinity(
1251        &self,
1252        session_id: &SessionId,
1253        consumer_id: &str,
1254        additional_duration: Duration,
1255    ) -> Result<SessionAffinity, QueueError> {
1256        let mut affinities = self.affinities.write().await;
1257
1258        if let Some(affinity) = affinities.get(session_id) {
1259            if affinity.consumer_id() != consumer_id {
1260                return Err(QueueError::ValidationError(
1261                    ValidationError::InvalidFormat {
1262                        field: "consumer_id".to_string(),
1263                        message: format!(
1264                            "Session owned by {}, cannot extend from {}",
1265                            affinity.consumer_id(),
1266                            consumer_id
1267                        ),
1268                    },
1269                ));
1270            }
1271
1272            let extended = affinity.extend(additional_duration);
1273            affinities.insert(session_id.clone(), extended.clone());
1274            return Ok(extended);
1275        }
1276
1277        Err(QueueError::SessionNotFound {
1278            session_id: session_id.to_string(),
1279        })
1280    }
1281
1282    /// Get all sessions assigned to a consumer.
1283    ///
1284    /// # Arguments
1285    ///
1286    /// * `consumer_id` - The consumer to query
1287    ///
1288    /// # Returns
1289    ///
1290    /// List of session IDs assigned to the consumer
1291    pub async fn get_consumer_sessions(&self, consumer_id: &str) -> Vec<SessionId> {
1292        let affinities = self.affinities.read().await;
1293        affinities
1294            .iter()
1295            .filter(|(_, affinity)| !affinity.is_expired() && affinity.consumer_id() == consumer_id)
1296            .map(|(session_id, _)| session_id.clone())
1297            .collect()
1298    }
1299
1300    /// Clean up expired affinities.
1301    ///
1302    /// # Returns
1303    ///
1304    /// Number of affinities removed
1305    pub async fn cleanup_expired(&self) -> usize {
1306        let mut affinities = self.affinities.write().await;
1307
1308        let expired: Vec<SessionId> = affinities
1309            .iter()
1310            .filter(|(_, affinity)| affinity.is_expired())
1311            .map(|(session_id, _)| session_id.clone())
1312            .collect();
1313
1314        let count = expired.len();
1315        for session_id in expired {
1316            affinities.remove(&session_id);
1317        }
1318
1319        count
1320    }
1321
1322    /// Get the total number of affinity mappings (including expired).
1323    pub async fn affinity_count(&self) -> usize {
1324        let affinities = self.affinities.read().await;
1325        affinities.len()
1326    }
1327
1328    /// Get the number of active (non-expired) affinities.
1329    pub async fn active_affinity_count(&self) -> usize {
1330        let affinities = self.affinities.read().await;
1331        affinities
1332            .values()
1333            .filter(|affinity| !affinity.is_expired())
1334            .count()
1335    }
1336}
1337
1338// ============================================================================
1339// Session Lifecycle Management
1340// ============================================================================
1341
1342/// Information about an active session's lifecycle state.
1343///
1344/// Tracks activity metrics used for determining when to close sessions
1345/// based on duration limits, message counts, or inactivity timeouts.
1346///
1347/// # Examples
1348///
1349/// ```
1350/// use queue_runtime::{SessionInfo, SessionId};
1351/// use std::time::Duration;
1352///
1353/// let session_id = SessionId::new("order-123".to_string()).unwrap();
1354/// let mut info = SessionInfo::new(session_id.clone(), "worker-1".to_string());
1355///
1356/// // Record message processing
1357/// info.increment_message_count();
1358/// info.increment_message_count();
1359///
1360/// // Check duration
1361/// assert!(info.duration() < Duration::from_secs(1));
1362///
1363/// // Check message count
1364/// assert_eq!(info.message_count(), 2);
1365/// ```
1366#[derive(Debug, Clone)]
1367pub struct SessionInfo {
1368    session_id: SessionId,
1369    consumer_id: String,
1370    started_at: Instant,
1371    last_activity: Instant,
1372    message_count: u32,
1373}
1374
1375impl SessionInfo {
1376    /// Create a new session info tracker.
1377    pub fn new(session_id: SessionId, consumer_id: String) -> Self {
1378        let now = Instant::now();
1379        Self {
1380            session_id,
1381            consumer_id,
1382            started_at: now,
1383            last_activity: now,
1384            message_count: 0,
1385        }
1386    }
1387
1388    /// Get the session ID.
1389    pub fn session_id(&self) -> &SessionId {
1390        &self.session_id
1391    }
1392
1393    /// Get the consumer ID.
1394    pub fn consumer_id(&self) -> &str {
1395        &self.consumer_id
1396    }
1397
1398    /// Get the time when this session was started.
1399    pub fn started_at(&self) -> Instant {
1400        self.started_at
1401    }
1402
1403    /// Get the time of last activity in this session.
1404    pub fn last_activity(&self) -> Instant {
1405        self.last_activity
1406    }
1407
1408    /// Get the number of messages processed in this session.
1409    pub fn message_count(&self) -> u32 {
1410        self.message_count
1411    }
1412
1413    /// Calculate how long this session has been active.
1414    pub fn duration(&self) -> Duration {
1415        Instant::now().saturating_duration_since(self.started_at)
1416    }
1417
1418    /// Calculate how long since last activity.
1419    pub fn idle_time(&self) -> Duration {
1420        Instant::now().saturating_duration_since(self.last_activity)
1421    }
1422
1423    /// Record message processing activity.
1424    pub fn increment_message_count(&mut self) {
1425        self.message_count += 1;
1426        self.last_activity = Instant::now();
1427    }
1428
1429    /// Update last activity timestamp without incrementing message count.
1430    pub fn touch(&mut self) {
1431        self.last_activity = Instant::now();
1432    }
1433}
1434
/// Limits that govern how long a session may stay open.
///
/// A session is a candidate for automatic closure once it exceeds any of the
/// three limits below. Bounding sessions this way prevents resource
/// exhaustion and keeps processing fairly distributed.
///
/// # Examples
///
/// ```
/// use queue_runtime::SessionLifecycleConfig;
/// use std::time::Duration;
///
/// // Explicit configuration
/// let config = SessionLifecycleConfig {
///     max_session_duration: Duration::from_secs(60 * 60), // 1 hour
///     max_messages_per_session: 500,
///     session_timeout: Duration::from_secs(10 * 60), // 10 minutes
/// };
/// assert_eq!(config.max_messages_per_session, 500);
///
/// // Defaults: 2 hours, 1000 messages, 30 minutes idle
/// let defaults = SessionLifecycleConfig::default();
/// assert_eq!(defaults.max_session_duration, Duration::from_secs(7200));
/// assert_eq!(defaults.max_messages_per_session, 1000);
/// assert_eq!(defaults.session_timeout, Duration::from_secs(1800));
/// ```
#[derive(Debug, Clone)]
pub struct SessionLifecycleConfig {
    /// Maximum duration a session can be active before forced closure.
    pub max_session_duration: Duration,

    /// Maximum number of messages processed per session before forced closure.
    pub max_messages_per_session: u32,

    /// Maximum idle time before session is considered timed out.
    pub session_timeout: Duration,
}

impl Default for SessionLifecycleConfig {
    /// Defaults: 2 hours total duration, 1000 messages, 30 minutes idle.
    fn default() -> Self {
        const TWO_HOURS: Duration = Duration::from_secs(2 * 60 * 60);
        const THIRTY_MINUTES: Duration = Duration::from_secs(30 * 60);

        Self {
            session_timeout: THIRTY_MINUTES,
            max_messages_per_session: 1_000,
            max_session_duration: TWO_HOURS,
        }
    }
}
1477
1478/// Manages session lifecycles with automatic cleanup and recovery.
1479///
1480/// Tracks active sessions and enforces limits on duration, message count,
1481/// and inactivity. Integrates with lock and affinity management to ensure
1482/// proper resource cleanup when sessions are forcibly closed.
1483///
1484/// # Thread Safety
1485///
1486/// All operations are async and use `Arc<RwLock<>>` for thread-safe access.
1487///
1488/// # Examples
1489///
1490/// ```
1491/// use queue_runtime::{SessionLifecycleManager, SessionId, SessionLifecycleConfig};
1492/// use std::time::Duration;
1493///
1494/// # #[tokio::main]
1495/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1496/// let config = SessionLifecycleConfig::default();
1497/// let manager = SessionLifecycleManager::new(config);
1498///
1499/// let session_id = SessionId::new("order-456".to_string())?;
1500///
1501/// // Start tracking a session
1502/// manager.start_session(session_id.clone(), "worker-1".to_string()).await?;
1503///
1504/// // Record activity
1505/// manager.record_message(&session_id).await?;
1506///
1507/// // Check if session should be closed
1508/// let should_close = manager.should_close_session(&session_id).await;
1509/// # Ok(())
1510/// # }
1511/// ```
1512#[derive(Debug, Clone)]
1513pub struct SessionLifecycleManager {
1514    active_sessions: Arc<RwLock<HashMap<SessionId, SessionInfo>>>,
1515    config: SessionLifecycleConfig,
1516}
1517
1518impl SessionLifecycleManager {
1519    /// Create a new session lifecycle manager with the given configuration.
1520    pub fn new(config: SessionLifecycleConfig) -> Self {
1521        Self {
1522            active_sessions: Arc::new(RwLock::new(HashMap::new())),
1523            config,
1524        }
1525    }
1526
1527    /// Start tracking a new session.
1528    ///
1529    /// # Errors
1530    ///
1531    /// Returns `QueueError::ValidationError` if session is already being tracked.
1532    pub async fn start_session(
1533        &self,
1534        session_id: SessionId,
1535        consumer_id: String,
1536    ) -> Result<(), QueueError> {
1537        let mut sessions = self.active_sessions.write().await;
1538
1539        if sessions.contains_key(&session_id) {
1540            return Err(QueueError::ValidationError(
1541                ValidationError::InvalidFormat {
1542                    field: "session_id".to_string(),
1543                    message: format!("Session {} is already active", session_id),
1544                },
1545            ));
1546        }
1547
1548        sessions.insert(
1549            session_id.clone(),
1550            SessionInfo::new(session_id, consumer_id),
1551        );
1552        Ok(())
1553    }
1554
1555    /// Stop tracking a session.
1556    ///
1557    /// # Errors
1558    ///
1559    /// Returns `QueueError::SessionNotFound` if session is not being tracked.
1560    pub async fn stop_session(&self, session_id: &SessionId) -> Result<(), QueueError> {
1561        let mut sessions = self.active_sessions.write().await;
1562
1563        if sessions.remove(session_id).is_none() {
1564            return Err(QueueError::SessionNotFound {
1565                session_id: session_id.to_string(),
1566            });
1567        }
1568
1569        Ok(())
1570    }
1571
1572    /// Record message processing activity for a session.
1573    ///
1574    /// Increments message count and updates last activity timestamp.
1575    ///
1576    /// # Errors
1577    ///
1578    /// Returns `QueueError::SessionNotFound` if session is not being tracked.
1579    pub async fn record_message(&self, session_id: &SessionId) -> Result<(), QueueError> {
1580        let mut sessions = self.active_sessions.write().await;
1581
1582        let session_info =
1583            sessions
1584                .get_mut(session_id)
1585                .ok_or_else(|| QueueError::SessionNotFound {
1586                    session_id: session_id.to_string(),
1587                })?;
1588
1589        session_info.increment_message_count();
1590        Ok(())
1591    }
1592
1593    /// Update last activity timestamp without incrementing message count.
1594    ///
1595    /// # Errors
1596    ///
1597    /// Returns `QueueError::SessionNotFound` if session is not being tracked.
1598    pub async fn touch_session(&self, session_id: &SessionId) -> Result<(), QueueError> {
1599        let mut sessions = self.active_sessions.write().await;
1600
1601        let session_info =
1602            sessions
1603                .get_mut(session_id)
1604                .ok_or_else(|| QueueError::SessionNotFound {
1605                    session_id: session_id.to_string(),
1606                })?;
1607
1608        session_info.touch();
1609        Ok(())
1610    }
1611
1612    /// Get information about a session.
1613    ///
1614    /// Returns `None` if session is not being tracked.
1615    pub async fn get_session_info(&self, session_id: &SessionId) -> Option<SessionInfo> {
1616        let sessions = self.active_sessions.read().await;
1617        sessions.get(session_id).cloned()
1618    }
1619
1620    /// Check if a session should be closed based on configured limits.
1621    ///
1622    /// A session should be closed if:
1623    /// - It has exceeded the maximum duration
1624    /// - It has processed more than the maximum message count
1625    /// - It has been idle longer than the timeout
1626    ///
1627    /// Returns `false` if session is not being tracked.
1628    pub async fn should_close_session(&self, session_id: &SessionId) -> bool {
1629        let sessions = self.active_sessions.read().await;
1630
1631        if let Some(session_info) = sessions.get(session_id) {
1632            // Check duration limit
1633            if session_info.duration() > self.config.max_session_duration {
1634                return true;
1635            }
1636
1637            // Check message count limit
1638            if session_info.message_count > self.config.max_messages_per_session {
1639                return true;
1640            }
1641
1642            // Check timeout
1643            if session_info.idle_time() > self.config.session_timeout {
1644                return true;
1645            }
1646
1647            false
1648        } else {
1649            false
1650        }
1651    }
1652
1653    /// Get all sessions that should be closed based on configured limits.
1654    ///
1655    /// Returns a list of session IDs that have exceeded limits.
1656    pub async fn get_sessions_to_close(&self) -> Vec<SessionId> {
1657        let sessions = self.active_sessions.read().await;
1658
1659        sessions
1660            .iter()
1661            .filter(|(_session_id, session_info)| {
1662                session_info.duration() > self.config.max_session_duration
1663                    || session_info.message_count > self.config.max_messages_per_session
1664                    || session_info.idle_time() > self.config.session_timeout
1665            })
1666            .map(|(session_id, _)| session_id.clone())
1667            .collect()
1668    }
1669
1670    /// Clean up sessions that have exceeded limits.
1671    ///
1672    /// # Returns
1673    ///
1674    /// Vector of session IDs that were cleaned up
1675    pub async fn cleanup_expired_sessions(&self) -> Vec<SessionId> {
1676        let expired_sessions = self.get_sessions_to_close().await;
1677
1678        if !expired_sessions.is_empty() {
1679            let mut sessions = self.active_sessions.write().await;
1680            for session_id in &expired_sessions {
1681                sessions.remove(session_id);
1682            }
1683        }
1684
1685        expired_sessions
1686    }
1687
1688    /// Get the total number of active sessions.
1689    pub async fn session_count(&self) -> usize {
1690        let sessions = self.active_sessions.read().await;
1691        sessions.len()
1692    }
1693
1694    /// Get all active session IDs.
1695    pub async fn get_active_sessions(&self) -> Vec<SessionId> {
1696        let sessions = self.active_sessions.read().await;
1697        sessions.keys().cloned().collect()
1698    }
1699
1700    /// Get all sessions for a specific consumer.
1701    pub async fn get_consumer_sessions(&self, consumer_id: &str) -> Vec<SessionId> {
1702        let sessions = self.active_sessions.read().await;
1703        sessions
1704            .iter()
1705            .filter(|(_, info)| info.consumer_id() == consumer_id)
1706            .map(|(session_id, _)| session_id.clone())
1707            .collect()
1708    }
1709}