queue_runtime/sessions.rs
1//! Session management for ordered message processing.
2//!
3//! This module provides a generic framework for session key generation that enables
4//! ordered message processing for any domain. Session keys group related messages
5//! to ensure they are processed in FIFO order.
6//!
7//! # Design Philosophy
8//!
9//! This module is intentionally **domain-agnostic**. It provides the infrastructure
10//! for session-based ordering without assuming any specific message structure or
11//! business domain (GitHub events, e-commerce orders, IoT telemetry, etc.).
12//!
13//! # Core Concepts
14//!
15//! - **SessionKeyGenerator**: Trait for extracting session keys from messages
16//! - **Session Keys**: Strings that group related messages for ordered processing
17//! - **Message Metadata**: Messages provide metadata via the `SessionKeyExtractor` trait
18//!
19//! # Usage Pattern
20//!
21//! 1. Implement `SessionKeyExtractor` for your message type
22//! 2. Implement `SessionKeyGenerator` for your domain-specific strategy
23//! 3. Use the generator to produce session IDs when sending messages to queues
24//!
25//! # Example
26//!
27//! ```rust
28//! use queue_runtime::sessions::{SessionKeyGenerator, SessionKeyExtractor};
29//! use queue_runtime::message::SessionId;
30//! use std::collections::HashMap;
31//!
32//! // Your domain message type
33//! struct OrderEvent {
34//! order_id: String,
35//! customer_id: String,
36//! }
37//!
38//! // Implement metadata extraction
39//! impl SessionKeyExtractor for OrderEvent {
40//! fn get_metadata(&self, key: &str) -> Option<String> {
41//! match key {
42//! "order_id" => Some(self.order_id.clone()),
43//! "customer_id" => Some(self.customer_id.clone()),
44//! _ => None,
45//! }
46//! }
47//! }
48//!
49//! // Implement your session strategy
50//! struct OrderSessionStrategy;
51//!
52//! impl SessionKeyGenerator for OrderSessionStrategy {
53//! fn generate_key(&self, extractor: &dyn SessionKeyExtractor) -> Option<SessionId> {
54//! extractor.get_metadata("order_id")
55//! .and_then(|id| SessionId::new(format!("order-{}", id)).ok())
56//! }
57//! }
58//! ```
59
60use crate::error::ValidationError;
61use crate::message::{SessionId, Timestamp};
62use crate::QueueError;
63use std::collections::HashMap;
64use std::sync::Arc;
65use std::time::Duration;
66use tokio::sync::RwLock;
67use tokio::time::Instant;
68
69#[cfg(test)]
70#[path = "sessions_tests.rs"]
71mod tests;
72
73// ============================================================================
74// Session Key Extractor Trait
75// ============================================================================
76
77/// Trait for extracting metadata from messages for session key generation.
78///
79/// This trait provides a completely generic interface for messages to expose
80/// metadata that can be used to generate session keys. It makes no assumptions
81/// about the message structure or domain.
82///
83/// # Design
84///
85/// The trait uses a key-value interface where messages expose named metadata
86/// fields. Session key generators query for the metadata they need.
87///
88/// # Example
89///
90/// ```rust
91/// use queue_runtime::sessions::SessionKeyExtractor;
92///
93/// struct MyMessage {
94/// user_id: String,
95/// resource_id: String,
96/// }
97///
98/// impl SessionKeyExtractor for MyMessage {
99/// fn get_metadata(&self, key: &str) -> Option<String> {
100/// match key {
101/// "user_id" => Some(self.user_id.clone()),
102/// "resource_id" => Some(self.resource_id.clone()),
103/// _ => None,
104/// }
105/// }
106///
107/// fn list_metadata_keys(&self) -> Vec<String> {
108/// vec!["user_id".to_string(), "resource_id".to_string()]
109/// }
110/// }
111/// ```
pub trait SessionKeyExtractor {
    /// Look up a single metadata value by name.
    ///
    /// # Arguments
    ///
    /// * `key` - Name of the metadata field to read
    ///
    /// # Returns
    ///
    /// `Some(value)` when the message has a value under `key`, otherwise `None`.
    fn get_metadata(&self, key: &str) -> Option<String>;

    /// Enumerate the metadata keys this message exposes.
    ///
    /// Intended for debugging and introspection. The default implementation
    /// reports no keys at all.
    ///
    /// # Returns
    ///
    /// The names of the available metadata fields
    fn list_metadata_keys(&self) -> Vec<String> {
        vec![]
    }

    /// Gather every available metadata entry into a map (for bulk operations).
    ///
    /// The default implementation walks `list_metadata_keys()` and queries
    /// `get_metadata()` for each name; names that yield no value are left out.
    ///
    /// # Returns
    ///
    /// Map from metadata key to its value
    fn get_all_metadata(&self) -> HashMap<String, String> {
        let mut all = HashMap::new();
        for name in self.list_metadata_keys() {
            if let Some(value) = self.get_metadata(&name) {
                all.insert(name, value);
            }
        }
        all
    }
}
153
154// ============================================================================
155// Session Key Generator Trait
156// ============================================================================
157
158/// Strategy trait for generating session keys from messages.
159///
160/// Implementations define how messages are grouped for ordered processing.
161/// The generator extracts relevant metadata from messages and produces
162/// session keys that group related messages together.
163///
164/// # Design Principles
165///
166/// - **Domain-Agnostic**: Works with any message structure via `SessionKeyExtractor`
167/// - **Strategy Pattern**: Different strategies provide different ordering semantics
168/// - **Composable**: Strategies can be combined or chained
169/// - **Optional Ordering**: Returning `None` allows concurrent processing
170///
171/// # Common Patterns
172///
173/// - **Entity-based**: Group by entity ID (order-123, user-456)
174/// - **Hierarchical**: Group by parent/child relationships
175/// - **Temporal**: Group by time windows
176/// - **Custom**: Domain-specific grouping logic
177///
178/// # Example
179///
180/// ```rust
181/// use queue_runtime::sessions::{SessionKeyGenerator, SessionKeyExtractor};
182/// use queue_runtime::message::SessionId;
183///
184/// struct ResourceIdStrategy;
185///
186/// impl SessionKeyGenerator for ResourceIdStrategy {
187/// fn generate_key(&self, extractor: &dyn SessionKeyExtractor) -> Option<SessionId> {
188/// extractor.get_metadata("resource_id")
189/// .and_then(|id| SessionId::new(format!("resource-{}", id)).ok())
190/// }
191/// }
192/// ```
193pub trait SessionKeyGenerator: Send + Sync {
194 /// Generate a session key for the given message.
195 ///
196 /// Returns `None` if the message should not be session-ordered, allowing
197 /// it to be processed concurrently without ordering constraints.
198 ///
199 /// # Arguments
200 ///
201 /// * `extractor` - Message implementing SessionKeyExtractor trait
202 ///
203 /// # Returns
204 ///
205 /// Optional session ID for grouping related messages
206 fn generate_key(&self, extractor: &dyn SessionKeyExtractor) -> Option<SessionId>;
207}
208
209// ============================================================================
210// Composite Key Strategy
211// ============================================================================
212
213/// Generates session keys by composing multiple metadata fields.
214///
215/// This strategy builds session keys from a list of metadata fields in order,
216/// joining them with a separator. This is useful for creating hierarchical
217/// or compound session keys.
218///
219/// # Example
220///
221/// ```rust
222/// use queue_runtime::sessions::CompositeKeyStrategy;
223///
224/// // Create session keys like "tenant-123-resource-456"
225/// let strategy = CompositeKeyStrategy::new(vec![
226/// "tenant_id".to_string(),
227/// "resource_id".to_string(),
228/// ], "-");
229/// ```
pub struct CompositeKeyStrategy {
    /// Metadata field names, in the order their values appear in the key.
    fields: Vec<String>,
    /// Text placed between consecutive field values.
    separator: String,
}

impl CompositeKeyStrategy {
    /// Build a composite key strategy.
    ///
    /// # Arguments
    ///
    /// * `fields` - Metadata field names to combine, in key order
    /// * `separator` - Text inserted between the field values
    ///
    /// # Example
    ///
    /// ```rust
    /// use queue_runtime::sessions::CompositeKeyStrategy;
    ///
    /// let strategy = CompositeKeyStrategy::new(
    ///     vec!["region".to_string(), "customer_id".to_string()],
    ///     "-"
    /// );
    /// ```
    pub fn new(fields: Vec<String>, separator: &str) -> Self {
        let separator = separator.to_owned();
        Self { fields, separator }
    }
}
260
261impl SessionKeyGenerator for CompositeKeyStrategy {
262 fn generate_key(&self, extractor: &dyn SessionKeyExtractor) -> Option<SessionId> {
263 // Return None if no fields specified
264 if self.fields.is_empty() {
265 return None;
266 }
267
268 // Collect all field values
269 let values: Vec<String> = self
270 .fields
271 .iter()
272 .filter_map(|field| extractor.get_metadata(field))
273 .collect();
274
275 // Return None if any required field is missing
276 if values.len() != self.fields.len() {
277 return None;
278 }
279
280 // Join values with separator
281 let key = values.join(&self.separator);
282
283 // Create session ID
284 SessionId::new(key).ok()
285 }
286}
287
288// ============================================================================
289// Single Field Strategy
290// ============================================================================
291
292/// Generates session keys from a single metadata field.
293///
294/// This is the simplest strategy: extract one field and use it as the session key.
295/// Optionally adds a prefix for namespacing.
296///
297/// # Example
298///
299/// ```rust
300/// use queue_runtime::sessions::SingleFieldStrategy;
301///
302/// // Create session keys from "user_id" like "user-12345"
303/// let strategy = SingleFieldStrategy::new("user_id", Some("user"));
304/// ```
pub struct SingleFieldStrategy {
    /// Name of the metadata field whose value becomes the key.
    field_name: String,
    /// Optional namespace placed before the value.
    prefix: Option<String>,
}

impl SingleFieldStrategy {
    /// Build a single field strategy.
    ///
    /// # Arguments
    ///
    /// * `field_name` - Metadata field that supplies the session key value
    /// * `prefix` - Optional text to put in front of the field value
    ///
    /// # Example
    ///
    /// ```rust
    /// use queue_runtime::sessions::SingleFieldStrategy;
    ///
    /// let strategy = SingleFieldStrategy::new("order_id", Some("order"));
    /// // Produces keys like "order-123"
    /// ```
    pub fn new(field_name: &str, prefix: Option<&str>) -> Self {
        let field_name = field_name.to_owned();
        let prefix = prefix.map(str::to_owned);
        Self { field_name, prefix }
    }
}
333
334impl SessionKeyGenerator for SingleFieldStrategy {
335 fn generate_key(&self, extractor: &dyn SessionKeyExtractor) -> Option<SessionId> {
336 // Get the field value
337 let value = extractor.get_metadata(&self.field_name)?;
338
339 // Build key with optional prefix
340 let key = if let Some(ref prefix) = self.prefix {
341 format!("{}-{}", prefix, value)
342 } else {
343 value
344 };
345
346 // Create session ID
347 SessionId::new(key).ok()
348 }
349}
350
351// ============================================================================
352// No Ordering Strategy
353// ============================================================================
354
355/// Strategy that disables session-based ordering.
356///
357/// Always returns `None`, allowing concurrent message processing without
358/// ordering guarantees. Use for stateless operations that don't require
359/// message ordering.
360///
361/// # Example
362///
363/// ```rust
364/// use queue_runtime::sessions::NoOrderingStrategy;
365///
366/// let strategy = NoOrderingStrategy;
367/// // All messages can be processed concurrently
368/// ```
369pub struct NoOrderingStrategy;
370
371impl SessionKeyGenerator for NoOrderingStrategy {
372 fn generate_key(&self, _extractor: &dyn SessionKeyExtractor) -> Option<SessionId> {
373 None
374 }
375}
376
377// ============================================================================
378// Fallback Strategy
379// ============================================================================
380
381/// Strategy that tries multiple generators in order, using the first success.
382///
383/// This implements a fallback chain: try the primary strategy first, and if it
384/// returns `None`, try the next strategy, and so on. Useful for providing
385/// fine-grained ordering when possible, with coarser fallbacks.
386///
387/// # Example
388///
389/// ```rust
390/// use queue_runtime::sessions::{FallbackStrategy, SingleFieldStrategy, CompositeKeyStrategy};
391///
392/// // Try specific entity ID first, fall back to tenant-level ordering
393/// let primary = SingleFieldStrategy::new("entity_id", Some("entity"));
394/// let fallback = SingleFieldStrategy::new("tenant_id", Some("tenant"));
395///
396/// let strategy = FallbackStrategy::new(vec![
397/// Box::new(primary),
398/// Box::new(fallback),
399/// ]);
400/// ```
401pub struct FallbackStrategy {
402 strategies: Vec<Box<dyn SessionKeyGenerator>>,
403}
404
405impl FallbackStrategy {
406 /// Create a new fallback strategy with ordered generators.
407 ///
408 /// # Arguments
409 ///
410 /// * `strategies` - Ordered list of generators to try
411 ///
412 /// # Example
413 ///
414 /// ```rust
415 /// use queue_runtime::sessions::{FallbackStrategy, SingleFieldStrategy, NoOrderingStrategy};
416 ///
417 /// let strategy = FallbackStrategy::new(vec![
418 /// Box::new(SingleFieldStrategy::new("user_id", Some("user"))),
419 /// Box::new(NoOrderingStrategy), // Ultimate fallback: no ordering
420 /// ]);
421 /// ```
422 pub fn new(strategies: Vec<Box<dyn SessionKeyGenerator>>) -> Self {
423 Self { strategies }
424 }
425}
426
427impl SessionKeyGenerator for FallbackStrategy {
428 fn generate_key(&self, extractor: &dyn SessionKeyExtractor) -> Option<SessionId> {
429 // Try each strategy in order until one succeeds
430 for strategy in &self.strategies {
431 if let Some(session_id) = strategy.generate_key(extractor) {
432 return Some(session_id);
433 }
434 }
435
436 // All strategies failed
437 None
438 }
439}
440
441// ============================================================================
442// Session Lock Management
443// ============================================================================
444
445/// Represents a lock on a session for exclusive message processing.
446///
447/// A session lock ensures that only one consumer can process messages from
448/// a session at a time, maintaining FIFO ordering guarantees. Locks have
449/// an expiration time and can be renewed to extend processing time.
450///
451/// # Design
452///
453/// - **Expiration**: Locks automatically expire after a timeout period
454/// - **Renewal**: Locks can be renewed before expiration to extend processing
455/// - **Owner Tracking**: Each lock tracks which consumer owns it
456/// - **Timeout Handling**: Expired locks can be acquired by other consumers
457///
458/// # Example
459///
460/// ```rust
461/// use queue_runtime::sessions::SessionLock;
462/// use queue_runtime::message::SessionId;
463/// use std::time::Duration;
464///
465/// # tokio_test::block_on(async {
466/// let session_id = SessionId::new("user-123".to_string()).unwrap();
467/// let lock = SessionLock::new(session_id.clone(), "consumer-1".to_string(), Duration::from_secs(30));
468///
469/// assert!(!lock.is_expired());
470/// assert_eq!(lock.owner(), "consumer-1");
471/// # });
472/// ```
473#[derive(Debug, Clone)]
474pub struct SessionLock {
475 session_id: SessionId,
476 owner: String,
477 acquired_at: Instant,
478 expires_at: Instant,
479 lock_duration: Duration,
480}
481
482impl SessionLock {
483 /// Create a new session lock.
484 ///
485 /// # Arguments
486 ///
487 /// * `session_id` - The session being locked
488 /// * `owner` - Identifier of the consumer owning this lock
489 /// * `lock_duration` - How long the lock is valid before expiration
490 ///
491 /// # Returns
492 ///
493 /// A new session lock that expires after `lock_duration`
494 pub fn new(session_id: SessionId, owner: String, lock_duration: Duration) -> Self {
495 let now = Instant::now();
496 Self {
497 session_id,
498 owner,
499 acquired_at: now,
500 expires_at: now + lock_duration,
501 lock_duration,
502 }
503 }
504
505 /// Get the session ID this lock is for.
506 pub fn session_id(&self) -> &SessionId {
507 &self.session_id
508 }
509
510 /// Get the owner of this lock.
511 pub fn owner(&self) -> &str {
512 &self.owner
513 }
514
515 /// Get when this lock was acquired.
516 pub fn acquired_at(&self) -> Instant {
517 self.acquired_at
518 }
519
520 /// Get when this lock expires.
521 pub fn expires_at(&self) -> Instant {
522 self.expires_at
523 }
524
525 /// Get the configured lock duration.
526 pub fn lock_duration(&self) -> Duration {
527 self.lock_duration
528 }
529
530 /// Check if this lock has expired.
531 ///
532 /// # Returns
533 ///
534 /// `true` if the current time is past the expiration time
535 pub fn is_expired(&self) -> bool {
536 Instant::now() >= self.expires_at
537 }
538
539 /// Get the remaining time before this lock expires.
540 ///
541 /// # Returns
542 ///
543 /// Duration until expiration, or zero if already expired
544 pub fn time_remaining(&self) -> Duration {
545 let now = Instant::now();
546 if now >= self.expires_at {
547 Duration::ZERO
548 } else {
549 self.expires_at - now
550 }
551 }
552
553 /// Renew this lock, extending its expiration time.
554 ///
555 /// # Arguments
556 ///
557 /// * `extension` - How long to extend the lock by
558 ///
559 /// # Returns
560 ///
561 /// A new lock with updated expiration time
562 pub fn renew(&self, extension: Duration) -> Self {
563 Self {
564 session_id: self.session_id.clone(),
565 owner: self.owner.clone(),
566 acquired_at: self.acquired_at,
567 expires_at: Instant::now() + extension,
568 lock_duration: extension,
569 }
570 }
571}
572
573/// Manages session locks for concurrent message processing.
574///
575/// The lock manager coordinates exclusive access to sessions, ensuring that
576/// only one consumer processes messages from a session at a time. It handles
577/// lock acquisition, renewal, release, and automatic expiration cleanup.
578///
579/// # Thread Safety
580///
581/// This type is thread-safe and can be shared across async tasks using `Arc`.
582///
583/// # Example
584///
585/// ```rust
586/// use queue_runtime::sessions::SessionLockManager;
587/// use queue_runtime::message::SessionId;
588/// use std::time::Duration;
589///
590/// # tokio_test::block_on(async {
591/// let manager = SessionLockManager::new(Duration::from_secs(30));
592/// let session_id = SessionId::new("order-456".to_string()).unwrap();
593///
594/// // Acquire lock
595/// let lock = manager.acquire_lock(session_id.clone(), "consumer-1".to_string()).await?;
596/// assert_eq!(lock.owner(), "consumer-1");
597///
598/// // Try to acquire same session with different consumer - should fail
599/// let result = manager.try_acquire_lock(session_id.clone(), "consumer-2".to_string()).await;
600/// assert!(result.is_err());
601///
602/// // Release lock
603/// manager.release_lock(&session_id, "consumer-1").await?;
604/// # Ok::<(), queue_runtime::QueueError>(())
605/// # });
606/// ```
607pub struct SessionLockManager {
608 locks: Arc<RwLock<HashMap<SessionId, SessionLock>>>,
609 default_lock_duration: Duration,
610}
611
612impl SessionLockManager {
613 /// Create a new session lock manager.
614 ///
615 /// # Arguments
616 ///
617 /// * `default_lock_duration` - Default duration for session locks
618 ///
619 /// # Example
620 ///
621 /// ```rust
622 /// use queue_runtime::sessions::SessionLockManager;
623 /// use std::time::Duration;
624 ///
625 /// let manager = SessionLockManager::new(Duration::from_secs(60));
626 /// ```
627 pub fn new(default_lock_duration: Duration) -> Self {
628 Self {
629 locks: Arc::new(RwLock::new(HashMap::new())),
630 default_lock_duration,
631 }
632 }
633
634 /// Try to acquire a lock on a session (non-blocking).
635 ///
636 /// Returns immediately with an error if the session is already locked
637 /// by another consumer.
638 ///
639 /// # Arguments
640 ///
641 /// * `session_id` - The session to lock
642 /// * `owner` - Identifier of the consumer requesting the lock
643 ///
644 /// # Returns
645 ///
646 /// The acquired lock if successful, or an error if the session is locked
647 ///
648 /// # Errors
649 ///
650 /// Returns `QueueError::SessionLocked` if the session is already locked
651 /// by another consumer and the lock has not expired.
652 pub async fn try_acquire_lock(
653 &self,
654 session_id: SessionId,
655 owner: String,
656 ) -> Result<SessionLock, QueueError> {
657 let mut locks = self.locks.write().await;
658
659 // Check if session is already locked
660 if let Some(existing_lock) = locks.get(&session_id) {
661 if !existing_lock.is_expired() {
662 // Lock is still valid and owned by someone else
663 if existing_lock.owner() != owner {
664 return Err(QueueError::SessionLocked {
665 session_id: session_id.to_string(),
666 locked_until: Timestamp::now(),
667 });
668 }
669 // Same owner - return existing lock
670 return Ok(existing_lock.clone());
671 }
672 // Lock expired - remove it and acquire new lock below
673 }
674
675 // Acquire new lock
676 let lock = SessionLock::new(session_id.clone(), owner, self.default_lock_duration);
677 locks.insert(session_id, lock.clone());
678
679 Ok(lock)
680 }
681
682 /// Acquire a lock on a session (blocking with timeout).
683 ///
684 /// Waits for the lock to become available if it's currently held by
685 /// another consumer, up to the specified timeout.
686 ///
687 /// # Arguments
688 ///
689 /// * `session_id` - The session to lock
690 /// * `owner` - Identifier of the consumer requesting the lock
691 ///
692 /// # Returns
693 ///
694 /// The acquired lock if successful within the timeout period
695 ///
696 /// # Errors
697 ///
698 /// Returns `QueueError::SessionLocked` if unable to acquire the lock
699 /// within the timeout period.
700 pub async fn acquire_lock(
701 &self,
702 session_id: SessionId,
703 owner: String,
704 ) -> Result<SessionLock, QueueError> {
705 // For now, just try once - future enhancement could add retry logic
706 self.try_acquire_lock(session_id, owner).await
707 }
708
709 /// Renew an existing session lock.
710 ///
711 /// Extends the lock's expiration time, allowing the consumer to continue
712 /// processing messages from the session.
713 ///
714 /// # Arguments
715 ///
716 /// * `session_id` - The session whose lock to renew
717 /// * `owner` - Identifier of the consumer owning the lock
718 /// * `extension` - How long to extend the lock by (if None, uses default duration)
719 ///
720 /// # Returns
721 ///
722 /// The renewed lock with updated expiration time
723 ///
724 /// # Errors
725 ///
726 /// Returns `QueueError::SessionNotFound` if no lock exists for the session.
727 /// Returns `QueueError::SessionLocked` if the lock is owned by a different consumer.
728 pub async fn renew_lock(
729 &self,
730 session_id: &SessionId,
731 owner: &str,
732 extension: Option<Duration>,
733 ) -> Result<SessionLock, QueueError> {
734 let mut locks = self.locks.write().await;
735
736 let existing_lock = locks
737 .get(session_id)
738 .ok_or_else(|| QueueError::SessionNotFound {
739 session_id: session_id.to_string(),
740 })?;
741
742 // Verify ownership
743 if existing_lock.owner() != owner {
744 return Err(QueueError::SessionLocked {
745 session_id: session_id.to_string(),
746 locked_until: Timestamp::now(),
747 });
748 }
749
750 // Renew the lock
751 let renewed_lock = existing_lock.renew(extension.unwrap_or(self.default_lock_duration));
752 locks.insert(session_id.clone(), renewed_lock.clone());
753
754 Ok(renewed_lock)
755 }
756
757 /// Release a session lock.
758 ///
759 /// Removes the lock, allowing other consumers to acquire it.
760 ///
761 /// # Arguments
762 ///
763 /// * `session_id` - The session whose lock to release
764 /// * `owner` - Identifier of the consumer releasing the lock
765 ///
766 /// # Returns
767 ///
768 /// `Ok(())` if the lock was successfully released
769 ///
770 /// # Errors
771 ///
772 /// Returns `QueueError::SessionNotFound` if no lock exists for the session.
773 /// Returns `QueueError::SessionLocked` if the lock is owned by a different consumer.
774 pub async fn release_lock(
775 &self,
776 session_id: &SessionId,
777 owner: &str,
778 ) -> Result<(), QueueError> {
779 let mut locks = self.locks.write().await;
780
781 let existing_lock = locks
782 .get(session_id)
783 .ok_or_else(|| QueueError::SessionNotFound {
784 session_id: session_id.to_string(),
785 })?;
786
787 // Verify ownership
788 if existing_lock.owner() != owner {
789 return Err(QueueError::SessionLocked {
790 session_id: session_id.to_string(),
791 locked_until: Timestamp::now(),
792 });
793 }
794
795 // Remove the lock
796 locks.remove(session_id);
797
798 Ok(())
799 }
800
801 /// Check if a session is currently locked.
802 ///
803 /// # Arguments
804 ///
805 /// * `session_id` - The session to check
806 ///
807 /// # Returns
808 ///
809 /// `true` if the session has a valid (non-expired) lock
810 pub async fn is_locked(&self, session_id: &SessionId) -> bool {
811 let locks = self.locks.read().await;
812 locks
813 .get(session_id)
814 .map(|lock| !lock.is_expired())
815 .unwrap_or(false)
816 }
817
818 /// Get information about a session lock.
819 ///
820 /// # Arguments
821 ///
822 /// * `session_id` - The session to query
823 ///
824 /// # Returns
825 ///
826 /// The lock information if it exists and is not expired
827 pub async fn get_lock(&self, session_id: &SessionId) -> Option<SessionLock> {
828 let locks = self.locks.read().await;
829 locks
830 .get(session_id)
831 .filter(|lock| !lock.is_expired())
832 .cloned()
833 }
834
835 /// Clean up expired locks.
836 ///
837 /// Removes all locks that have passed their expiration time.
838 ///
839 /// # Returns
840 ///
841 /// The number of expired locks that were removed
842 pub async fn cleanup_expired_locks(&self) -> usize {
843 let mut locks = self.locks.write().await;
844
845 let expired: Vec<SessionId> = locks
846 .iter()
847 .filter(|(_, lock)| lock.is_expired())
848 .map(|(id, _)| id.clone())
849 .collect();
850
851 let count = expired.len();
852 for session_id in expired {
853 locks.remove(&session_id);
854 }
855
856 count
857 }
858
859 /// Get the number of currently held locks (including expired).
860 ///
861 /// # Returns
862 ///
863 /// Total number of locks in the manager
864 pub async fn lock_count(&self) -> usize {
865 let locks = self.locks.read().await;
866 locks.len()
867 }
868
869 /// Get the number of active (non-expired) locks.
870 ///
871 /// # Returns
872 ///
873 /// Number of locks that have not expired
874 pub async fn active_lock_count(&self) -> usize {
875 let locks = self.locks.read().await;
876 locks.values().filter(|lock| !lock.is_expired()).count()
877 }
878}
879
880// ============================================================================
881// Session Affinity Tracking
882// ============================================================================
883
884/// Mapping of a session to its assigned consumer.
885///
886/// Session affinity ensures that all messages for a given session are processed
887/// by the same consumer, maintaining ordering and state consistency.
888///
889/// # Examples
890///
891/// ```
892/// use queue_runtime::sessions::SessionAffinity;
893/// use queue_runtime::message::SessionId;
894/// use std::time::{SystemTime, Duration};
895///
896/// # tokio_test::block_on(async {
897/// let session_id = SessionId::new("order-789".to_string()).unwrap();
898/// let affinity = SessionAffinity::new(
899/// session_id.clone(),
900/// "worker-3".to_string(),
901/// Duration::from_secs(300)
902/// );
903///
904/// assert_eq!(affinity.session_id(), &session_id);
905/// assert_eq!(affinity.consumer_id(), "worker-3");
906/// assert!(!affinity.is_expired());
907/// # });
908/// ```
909#[derive(Debug, Clone)]
910pub struct SessionAffinity {
911 session_id: SessionId,
912 consumer_id: String,
913 assigned_at: Instant,
914 expires_at: Instant,
915 affinity_duration: Duration,
916 last_activity: Instant,
917}
918
919impl SessionAffinity {
920 /// Create a new session affinity mapping.
921 ///
922 /// # Arguments
923 ///
924 /// * `session_id` - The session being tracked
925 /// * `consumer_id` - Identifier of the consumer assigned to this session
926 /// * `affinity_duration` - How long the affinity is valid
927 ///
928 /// # Returns
929 ///
930 /// A new `SessionAffinity` instance
931 pub fn new(session_id: SessionId, consumer_id: String, affinity_duration: Duration) -> Self {
932 let now = Instant::now();
933 Self {
934 session_id,
935 consumer_id,
936 assigned_at: now,
937 expires_at: now + affinity_duration,
938 affinity_duration,
939 last_activity: now,
940 }
941 }
942
943 /// Get the session ID.
944 pub fn session_id(&self) -> &SessionId {
945 &self.session_id
946 }
947
948 /// Get the consumer ID.
949 pub fn consumer_id(&self) -> &str {
950 &self.consumer_id
951 }
952
953 /// Get the affinity duration.
954 pub fn affinity_duration(&self) -> Duration {
955 self.affinity_duration
956 }
957
958 /// Get when the affinity was assigned.
959 pub fn assigned_at(&self) -> Instant {
960 self.assigned_at
961 }
962
963 /// Check if the affinity has expired.
964 ///
965 /// # Returns
966 ///
967 /// `true` if the affinity has expired, `false` otherwise
968 pub fn is_expired(&self) -> bool {
969 Instant::now() >= self.expires_at
970 }
971
972 /// Get the remaining time before expiration.
973 ///
974 /// # Returns
975 ///
976 /// Duration remaining, or zero if expired
977 pub fn time_remaining(&self) -> Duration {
978 let now = Instant::now();
979 if now >= self.expires_at {
980 Duration::ZERO
981 } else {
982 self.expires_at - now
983 }
984 }
985
986 /// Update the last activity time.
987 ///
988 /// This is called when a message is processed for the session,
989 /// keeping the affinity fresh.
990 pub fn touch(&mut self) {
991 self.last_activity = Instant::now();
992 }
993
994 /// Get the time since last activity.
995 pub fn idle_time(&self) -> Duration {
996 Instant::now().duration_since(self.last_activity)
997 }
998
999 /// Extend the affinity expiration.
1000 ///
1001 /// # Arguments
1002 ///
1003 /// * `additional_duration` - Additional time to add to expiration
1004 ///
1005 /// # Returns
1006 ///
1007 /// A new `SessionAffinity` with extended expiration
1008 pub fn extend(&self, additional_duration: Duration) -> Self {
1009 let mut extended = self.clone();
1010 extended.expires_at = Instant::now() + additional_duration;
1011 extended
1012 }
1013}
1014
1015/// Tracks session-to-consumer affinity mappings for ordered processing.
1016///
1017/// The affinity tracker ensures that all messages for a given session are
1018/// routed to the same consumer, maintaining message ordering and processing
1019/// consistency within sessions.
1020///
1021/// # Thread Safety
1022///
1023/// This type uses `Arc<RwLock<>>` internally and can be safely shared across
1024/// threads and tasks.
1025///
1026/// # Examples
1027///
1028/// ```
1029/// use queue_runtime::sessions::SessionAffinityTracker;
1030/// use queue_runtime::message::SessionId;
1031/// use std::time::Duration;
1032///
1033/// # tokio_test::block_on(async {
1034/// let tracker = SessionAffinityTracker::new(Duration::from_secs(600));
1035/// let session_id = SessionId::new("session-123".to_string()).unwrap();
1036///
1037/// // Assign session to consumer
1038/// let affinity = tracker.assign_session(session_id.clone(), "worker-1".to_string()).await.unwrap();
1039/// assert_eq!(affinity.consumer_id(), "worker-1");
1040///
1041/// // Query affinity
1042/// let consumer = tracker.get_consumer(&session_id).await;
1043/// assert_eq!(consumer, Some("worker-1".to_string()));
1044/// # });
1045/// ```
1046#[derive(Clone)]
1047pub struct SessionAffinityTracker {
1048 affinities: Arc<RwLock<HashMap<SessionId, SessionAffinity>>>,
1049 default_affinity_duration: Duration,
1050}
1051
1052impl SessionAffinityTracker {
1053 /// Create a new session affinity tracker.
1054 ///
1055 /// # Arguments
1056 ///
1057 /// * `default_affinity_duration` - Default duration for affinity mappings
1058 ///
1059 /// # Returns
1060 ///
1061 /// A new `SessionAffinityTracker` instance
1062 pub fn new(default_affinity_duration: Duration) -> Self {
1063 Self {
1064 affinities: Arc::new(RwLock::new(HashMap::new())),
1065 default_affinity_duration,
1066 }
1067 }
1068
1069 /// Assign a session to a consumer.
1070 ///
1071 /// If the session is already assigned and not expired, returns an error.
1072 /// If the session affinity has expired, reassigns to the new consumer.
1073 ///
1074 /// # Arguments
1075 ///
1076 /// * `session_id` - The session to assign
1077 /// * `consumer_id` - The consumer to assign the session to
1078 ///
1079 /// # Returns
1080 ///
1081 /// The created affinity mapping on success, or an error if the session
1082 /// is already assigned to a different consumer.
1083 ///
1084 /// # Errors
1085 ///
1086 /// Returns `QueueError::SessionLocked` if the session is already assigned
1087 /// to a different consumer and the affinity has not expired.
1088 pub async fn assign_session(
1089 &self,
1090 session_id: SessionId,
1091 consumer_id: String,
1092 ) -> Result<SessionAffinity, QueueError> {
1093 let mut affinities = self.affinities.write().await;
1094
1095 // Check if session is already assigned
1096 if let Some(existing) = affinities.get(&session_id) {
1097 if !existing.is_expired() {
1098 if existing.consumer_id() != consumer_id {
1099 // Session assigned to different consumer
1100 return Err(QueueError::SessionLocked {
1101 session_id: session_id.to_string(),
1102 locked_until: Timestamp::now(), // Approximate
1103 });
1104 }
1105 // Same consumer - return existing affinity
1106 return Ok(existing.clone());
1107 }
1108 // Expired - will reassign below
1109 }
1110
1111 // Create new affinity
1112 let affinity = SessionAffinity::new(
1113 session_id.clone(),
1114 consumer_id,
1115 self.default_affinity_duration,
1116 );
1117
1118 affinities.insert(session_id, affinity.clone());
1119 Ok(affinity)
1120 }
1121
1122 /// Get the consumer assigned to a session.
1123 ///
1124 /// # Arguments
1125 ///
1126 /// * `session_id` - The session to query
1127 ///
1128 /// # Returns
1129 ///
1130 /// The consumer ID if the session has an active affinity, `None` otherwise
1131 pub async fn get_consumer(&self, session_id: &SessionId) -> Option<String> {
1132 let affinities = self.affinities.read().await;
1133 affinities
1134 .get(session_id)
1135 .filter(|affinity| !affinity.is_expired())
1136 .map(|affinity| affinity.consumer_id().to_string())
1137 }
1138
1139 /// Get the full affinity information for a session.
1140 ///
1141 /// # Arguments
1142 ///
1143 /// * `session_id` - The session to query
1144 ///
1145 /// # Returns
1146 ///
1147 /// The affinity information if the session has an active affinity
1148 pub async fn get_affinity(&self, session_id: &SessionId) -> Option<SessionAffinity> {
1149 let affinities = self.affinities.read().await;
1150 affinities
1151 .get(session_id)
1152 .filter(|affinity| !affinity.is_expired())
1153 .cloned()
1154 }
1155
1156 /// Check if a session has an active affinity.
1157 ///
1158 /// # Arguments
1159 ///
1160 /// * `session_id` - The session to check
1161 ///
1162 /// # Returns
1163 ///
1164 /// `true` if the session has an active affinity
1165 pub async fn has_affinity(&self, session_id: &SessionId) -> bool {
1166 self.get_consumer(session_id).await.is_some()
1167 }
1168
1169 /// Update the last activity time for a session.
1170 ///
1171 /// This should be called when a message is processed for the session.
1172 ///
1173 /// # Arguments
1174 ///
1175 /// * `session_id` - The session to update
1176 ///
1177 /// # Returns
1178 ///
1179 /// `Ok(())` if successful, error if session not found or expired
1180 pub async fn touch_session(&self, session_id: &SessionId) -> Result<(), QueueError> {
1181 let mut affinities = self.affinities.write().await;
1182
1183 if let Some(affinity) = affinities.get_mut(session_id) {
1184 if !affinity.is_expired() {
1185 affinity.touch();
1186 return Ok(());
1187 }
1188 }
1189
1190 Err(QueueError::SessionNotFound {
1191 session_id: session_id.to_string(),
1192 })
1193 }
1194
1195 /// Release a session affinity.
1196 ///
1197 /// # Arguments
1198 ///
1199 /// * `session_id` - The session to release
1200 /// * `consumer_id` - The consumer releasing the session (for validation)
1201 ///
1202 /// # Returns
1203 ///
1204 /// `Ok(())` if successful
1205 ///
1206 /// # Errors
1207 ///
1208 /// Returns error if the consumer doesn't own the session
1209 pub async fn release_session(
1210 &self,
1211 session_id: &SessionId,
1212 consumer_id: &str,
1213 ) -> Result<(), QueueError> {
1214 let mut affinities = self.affinities.write().await;
1215
1216 if let Some(affinity) = affinities.get(session_id) {
1217 if affinity.consumer_id() != consumer_id {
1218 return Err(QueueError::ValidationError(
1219 ValidationError::InvalidFormat {
1220 field: "consumer_id".to_string(),
1221 message: format!(
1222 "Session owned by {}, cannot release from {}",
1223 affinity.consumer_id(),
1224 consumer_id
1225 ),
1226 },
1227 ));
1228 }
1229 }
1230
1231 affinities.remove(session_id);
1232 Ok(())
1233 }
1234
1235 /// Extend the affinity duration for a session.
1236 ///
1237 /// # Arguments
1238 ///
1239 /// * `session_id` - The session to extend
1240 /// * `consumer_id` - The consumer requesting the extension (for validation)
1241 /// * `additional_duration` - Additional time to add
1242 ///
1243 /// # Returns
1244 ///
1245 /// The updated affinity on success
1246 ///
1247 /// # Errors
1248 ///
1249 /// Returns error if consumer doesn't own the session or session not found
1250 pub async fn extend_affinity(
1251 &self,
1252 session_id: &SessionId,
1253 consumer_id: &str,
1254 additional_duration: Duration,
1255 ) -> Result<SessionAffinity, QueueError> {
1256 let mut affinities = self.affinities.write().await;
1257
1258 if let Some(affinity) = affinities.get(session_id) {
1259 if affinity.consumer_id() != consumer_id {
1260 return Err(QueueError::ValidationError(
1261 ValidationError::InvalidFormat {
1262 field: "consumer_id".to_string(),
1263 message: format!(
1264 "Session owned by {}, cannot extend from {}",
1265 affinity.consumer_id(),
1266 consumer_id
1267 ),
1268 },
1269 ));
1270 }
1271
1272 let extended = affinity.extend(additional_duration);
1273 affinities.insert(session_id.clone(), extended.clone());
1274 return Ok(extended);
1275 }
1276
1277 Err(QueueError::SessionNotFound {
1278 session_id: session_id.to_string(),
1279 })
1280 }
1281
1282 /// Get all sessions assigned to a consumer.
1283 ///
1284 /// # Arguments
1285 ///
1286 /// * `consumer_id` - The consumer to query
1287 ///
1288 /// # Returns
1289 ///
1290 /// List of session IDs assigned to the consumer
1291 pub async fn get_consumer_sessions(&self, consumer_id: &str) -> Vec<SessionId> {
1292 let affinities = self.affinities.read().await;
1293 affinities
1294 .iter()
1295 .filter(|(_, affinity)| !affinity.is_expired() && affinity.consumer_id() == consumer_id)
1296 .map(|(session_id, _)| session_id.clone())
1297 .collect()
1298 }
1299
1300 /// Clean up expired affinities.
1301 ///
1302 /// # Returns
1303 ///
1304 /// Number of affinities removed
1305 pub async fn cleanup_expired(&self) -> usize {
1306 let mut affinities = self.affinities.write().await;
1307
1308 let expired: Vec<SessionId> = affinities
1309 .iter()
1310 .filter(|(_, affinity)| affinity.is_expired())
1311 .map(|(session_id, _)| session_id.clone())
1312 .collect();
1313
1314 let count = expired.len();
1315 for session_id in expired {
1316 affinities.remove(&session_id);
1317 }
1318
1319 count
1320 }
1321
1322 /// Get the total number of affinity mappings (including expired).
1323 pub async fn affinity_count(&self) -> usize {
1324 let affinities = self.affinities.read().await;
1325 affinities.len()
1326 }
1327
1328 /// Get the number of active (non-expired) affinities.
1329 pub async fn active_affinity_count(&self) -> usize {
1330 let affinities = self.affinities.read().await;
1331 affinities
1332 .values()
1333 .filter(|affinity| !affinity.is_expired())
1334 .count()
1335 }
1336}
1337
1338// ============================================================================
1339// Session Lifecycle Management
1340// ============================================================================
1341
1342/// Information about an active session's lifecycle state.
1343///
1344/// Tracks activity metrics used for determining when to close sessions
1345/// based on duration limits, message counts, or inactivity timeouts.
1346///
1347/// # Examples
1348///
1349/// ```
1350/// use queue_runtime::{SessionInfo, SessionId};
1351/// use std::time::Duration;
1352///
1353/// let session_id = SessionId::new("order-123".to_string()).unwrap();
1354/// let mut info = SessionInfo::new(session_id.clone(), "worker-1".to_string());
1355///
1356/// // Record message processing
1357/// info.increment_message_count();
1358/// info.increment_message_count();
1359///
1360/// // Check duration
1361/// assert!(info.duration() < Duration::from_secs(1));
1362///
1363/// // Check message count
1364/// assert_eq!(info.message_count(), 2);
1365/// ```
1366#[derive(Debug, Clone)]
1367pub struct SessionInfo {
1368 session_id: SessionId,
1369 consumer_id: String,
1370 started_at: Instant,
1371 last_activity: Instant,
1372 message_count: u32,
1373}
1374
1375impl SessionInfo {
1376 /// Create a new session info tracker.
1377 pub fn new(session_id: SessionId, consumer_id: String) -> Self {
1378 let now = Instant::now();
1379 Self {
1380 session_id,
1381 consumer_id,
1382 started_at: now,
1383 last_activity: now,
1384 message_count: 0,
1385 }
1386 }
1387
1388 /// Get the session ID.
1389 pub fn session_id(&self) -> &SessionId {
1390 &self.session_id
1391 }
1392
1393 /// Get the consumer ID.
1394 pub fn consumer_id(&self) -> &str {
1395 &self.consumer_id
1396 }
1397
1398 /// Get the time when this session was started.
1399 pub fn started_at(&self) -> Instant {
1400 self.started_at
1401 }
1402
1403 /// Get the time of last activity in this session.
1404 pub fn last_activity(&self) -> Instant {
1405 self.last_activity
1406 }
1407
1408 /// Get the number of messages processed in this session.
1409 pub fn message_count(&self) -> u32 {
1410 self.message_count
1411 }
1412
1413 /// Calculate how long this session has been active.
1414 pub fn duration(&self) -> Duration {
1415 Instant::now().saturating_duration_since(self.started_at)
1416 }
1417
1418 /// Calculate how long since last activity.
1419 pub fn idle_time(&self) -> Duration {
1420 Instant::now().saturating_duration_since(self.last_activity)
1421 }
1422
1423 /// Record message processing activity.
1424 pub fn increment_message_count(&mut self) {
1425 self.message_count += 1;
1426 self.last_activity = Instant::now();
1427 }
1428
1429 /// Update last activity timestamp without incrementing message count.
1430 pub fn touch(&mut self) {
1431 self.last_activity = Instant::now();
1432 }
1433}
1434
/// Limits that govern how long a tracked session may stay open.
///
/// A session is a candidate for closing once it has run too long, handled
/// too many messages, or sat idle past the timeout. Closing such sessions
/// prevents resource exhaustion and keeps processing fairly distributed.
///
/// # Examples
///
/// ```
/// use queue_runtime::SessionLifecycleConfig;
/// use std::time::Duration;
///
/// let config = SessionLifecycleConfig {
///     max_session_duration: Duration::from_secs(2 * 60 * 60), // 2 hours
///     max_messages_per_session: 1000,
///     session_timeout: Duration::from_secs(30 * 60), // 30 minutes
/// };
///
/// // Two hours expressed in seconds
/// assert_eq!(config.max_session_duration, Duration::from_secs(7200));
/// ```
#[derive(Debug, Clone)]
pub struct SessionLifecycleConfig {
    /// Longest time a session may remain active before it is force-closed.
    pub max_session_duration: Duration,

    /// Number of messages a session may process before it is force-closed.
    pub max_messages_per_session: u32,

    /// Longest idle period before the session counts as timed out.
    pub session_timeout: Duration,
}

impl Default for SessionLifecycleConfig {
    /// Defaults: 2 hour maximum duration, 1000 messages, 30 minute idle timeout.
    fn default() -> Self {
        const SECS_PER_MINUTE: u64 = 60;
        const SECS_PER_HOUR: u64 = 60 * SECS_PER_MINUTE;

        let max_session_duration = Duration::from_secs(2 * SECS_PER_HOUR);
        let session_timeout = Duration::from_secs(30 * SECS_PER_MINUTE);

        Self {
            max_session_duration,
            max_messages_per_session: 1_000,
            session_timeout,
        }
    }
}
1477
1478/// Manages session lifecycles with automatic cleanup and recovery.
1479///
1480/// Tracks active sessions and enforces limits on duration, message count,
1481/// and inactivity. Integrates with lock and affinity management to ensure
1482/// proper resource cleanup when sessions are forcibly closed.
1483///
1484/// # Thread Safety
1485///
1486/// All operations are async and use `Arc<RwLock<>>` for thread-safe access.
1487///
1488/// # Examples
1489///
1490/// ```
1491/// use queue_runtime::{SessionLifecycleManager, SessionId, SessionLifecycleConfig};
1492/// use std::time::Duration;
1493///
1494/// # #[tokio::main]
1495/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1496/// let config = SessionLifecycleConfig::default();
1497/// let manager = SessionLifecycleManager::new(config);
1498///
1499/// let session_id = SessionId::new("order-456".to_string())?;
1500///
1501/// // Start tracking a session
1502/// manager.start_session(session_id.clone(), "worker-1".to_string()).await?;
1503///
1504/// // Record activity
1505/// manager.record_message(&session_id).await?;
1506///
1507/// // Check if session should be closed
1508/// let should_close = manager.should_close_session(&session_id).await;
1509/// # Ok(())
1510/// # }
1511/// ```
1512#[derive(Debug, Clone)]
1513pub struct SessionLifecycleManager {
1514 active_sessions: Arc<RwLock<HashMap<SessionId, SessionInfo>>>,
1515 config: SessionLifecycleConfig,
1516}
1517
1518impl SessionLifecycleManager {
1519 /// Create a new session lifecycle manager with the given configuration.
1520 pub fn new(config: SessionLifecycleConfig) -> Self {
1521 Self {
1522 active_sessions: Arc::new(RwLock::new(HashMap::new())),
1523 config,
1524 }
1525 }
1526
1527 /// Start tracking a new session.
1528 ///
1529 /// # Errors
1530 ///
1531 /// Returns `QueueError::ValidationError` if session is already being tracked.
1532 pub async fn start_session(
1533 &self,
1534 session_id: SessionId,
1535 consumer_id: String,
1536 ) -> Result<(), QueueError> {
1537 let mut sessions = self.active_sessions.write().await;
1538
1539 if sessions.contains_key(&session_id) {
1540 return Err(QueueError::ValidationError(
1541 ValidationError::InvalidFormat {
1542 field: "session_id".to_string(),
1543 message: format!("Session {} is already active", session_id),
1544 },
1545 ));
1546 }
1547
1548 sessions.insert(
1549 session_id.clone(),
1550 SessionInfo::new(session_id, consumer_id),
1551 );
1552 Ok(())
1553 }
1554
1555 /// Stop tracking a session.
1556 ///
1557 /// # Errors
1558 ///
1559 /// Returns `QueueError::SessionNotFound` if session is not being tracked.
1560 pub async fn stop_session(&self, session_id: &SessionId) -> Result<(), QueueError> {
1561 let mut sessions = self.active_sessions.write().await;
1562
1563 if sessions.remove(session_id).is_none() {
1564 return Err(QueueError::SessionNotFound {
1565 session_id: session_id.to_string(),
1566 });
1567 }
1568
1569 Ok(())
1570 }
1571
1572 /// Record message processing activity for a session.
1573 ///
1574 /// Increments message count and updates last activity timestamp.
1575 ///
1576 /// # Errors
1577 ///
1578 /// Returns `QueueError::SessionNotFound` if session is not being tracked.
1579 pub async fn record_message(&self, session_id: &SessionId) -> Result<(), QueueError> {
1580 let mut sessions = self.active_sessions.write().await;
1581
1582 let session_info =
1583 sessions
1584 .get_mut(session_id)
1585 .ok_or_else(|| QueueError::SessionNotFound {
1586 session_id: session_id.to_string(),
1587 })?;
1588
1589 session_info.increment_message_count();
1590 Ok(())
1591 }
1592
1593 /// Update last activity timestamp without incrementing message count.
1594 ///
1595 /// # Errors
1596 ///
1597 /// Returns `QueueError::SessionNotFound` if session is not being tracked.
1598 pub async fn touch_session(&self, session_id: &SessionId) -> Result<(), QueueError> {
1599 let mut sessions = self.active_sessions.write().await;
1600
1601 let session_info =
1602 sessions
1603 .get_mut(session_id)
1604 .ok_or_else(|| QueueError::SessionNotFound {
1605 session_id: session_id.to_string(),
1606 })?;
1607
1608 session_info.touch();
1609 Ok(())
1610 }
1611
1612 /// Get information about a session.
1613 ///
1614 /// Returns `None` if session is not being tracked.
1615 pub async fn get_session_info(&self, session_id: &SessionId) -> Option<SessionInfo> {
1616 let sessions = self.active_sessions.read().await;
1617 sessions.get(session_id).cloned()
1618 }
1619
1620 /// Check if a session should be closed based on configured limits.
1621 ///
1622 /// A session should be closed if:
1623 /// - It has exceeded the maximum duration
1624 /// - It has processed more than the maximum message count
1625 /// - It has been idle longer than the timeout
1626 ///
1627 /// Returns `false` if session is not being tracked.
1628 pub async fn should_close_session(&self, session_id: &SessionId) -> bool {
1629 let sessions = self.active_sessions.read().await;
1630
1631 if let Some(session_info) = sessions.get(session_id) {
1632 // Check duration limit
1633 if session_info.duration() > self.config.max_session_duration {
1634 return true;
1635 }
1636
1637 // Check message count limit
1638 if session_info.message_count > self.config.max_messages_per_session {
1639 return true;
1640 }
1641
1642 // Check timeout
1643 if session_info.idle_time() > self.config.session_timeout {
1644 return true;
1645 }
1646
1647 false
1648 } else {
1649 false
1650 }
1651 }
1652
1653 /// Get all sessions that should be closed based on configured limits.
1654 ///
1655 /// Returns a list of session IDs that have exceeded limits.
1656 pub async fn get_sessions_to_close(&self) -> Vec<SessionId> {
1657 let sessions = self.active_sessions.read().await;
1658
1659 sessions
1660 .iter()
1661 .filter(|(_session_id, session_info)| {
1662 session_info.duration() > self.config.max_session_duration
1663 || session_info.message_count > self.config.max_messages_per_session
1664 || session_info.idle_time() > self.config.session_timeout
1665 })
1666 .map(|(session_id, _)| session_id.clone())
1667 .collect()
1668 }
1669
1670 /// Clean up sessions that have exceeded limits.
1671 ///
1672 /// # Returns
1673 ///
1674 /// Vector of session IDs that were cleaned up
1675 pub async fn cleanup_expired_sessions(&self) -> Vec<SessionId> {
1676 let expired_sessions = self.get_sessions_to_close().await;
1677
1678 if !expired_sessions.is_empty() {
1679 let mut sessions = self.active_sessions.write().await;
1680 for session_id in &expired_sessions {
1681 sessions.remove(session_id);
1682 }
1683 }
1684
1685 expired_sessions
1686 }
1687
1688 /// Get the total number of active sessions.
1689 pub async fn session_count(&self) -> usize {
1690 let sessions = self.active_sessions.read().await;
1691 sessions.len()
1692 }
1693
1694 /// Get all active session IDs.
1695 pub async fn get_active_sessions(&self) -> Vec<SessionId> {
1696 let sessions = self.active_sessions.read().await;
1697 sessions.keys().cloned().collect()
1698 }
1699
1700 /// Get all sessions for a specific consumer.
1701 pub async fn get_consumer_sessions(&self, consumer_id: &str) -> Vec<SessionId> {
1702 let sessions = self.active_sessions.read().await;
1703 sessions
1704 .iter()
1705 .filter(|(_, info)| info.consumer_id() == consumer_id)
1706 .map(|(session_id, _)| session_id.clone())
1707 .collect()
1708 }
1709}