Skip to main content

queue_runtime/providers/
memory.rs

1//! In-memory queue provider implementation for testing and development.
2//!
3//! This module provides a fully functional in-memory queue implementation that:
4//! - Supports session-based ordered message processing
5//! - Implements visibility timeouts and message TTL
6//! - Simulates dead letter queue behavior
7//! - Provides thread-safe concurrent access
8//!
9//! This provider is intended for:
10//! - Unit testing of queue-runtime consumers
11//! - Development and prototyping
12//! - Reference implementation for cloud providers
13
14use crate::client::{QueueProvider, SessionProvider};
15use crate::error::QueueError;
16use crate::message::{
17    Message, MessageId, QueueName, ReceiptHandle, ReceivedMessage, SessionId, Timestamp,
18};
19use crate::provider::{InMemoryConfig, ProviderType, SessionSupport};
20use async_trait::async_trait;
21use bytes::Bytes;
22use chrono::Duration;
23use std::collections::{HashMap, VecDeque};
24use std::sync::{Arc, RwLock};
25
26#[cfg(test)]
27#[path = "memory_tests.rs"]
28mod tests;
29
30// ============================================================================
31// Internal Storage Structures
32// ============================================================================
33
34/// Thread-safe storage for all queues
35struct QueueStorage {
36    queues: HashMap<QueueName, InMemoryQueue>,
37    config: InMemoryConfig,
38}
39
40impl QueueStorage {
41    fn new(config: InMemoryConfig) -> Self {
42        Self {
43            queues: HashMap::new(),
44            config,
45        }
46    }
47
48    /// Get or create a queue
49    fn get_or_create_queue(&mut self, queue_name: &QueueName) -> &mut InMemoryQueue {
50        self.queues
51            .entry(queue_name.clone())
52            .or_insert_with(|| InMemoryQueue::new(self.config.clone()))
53    }
54}
55
56/// Internal queue state for a single queue
57struct InMemoryQueue {
58    /// Main message queue (FIFO order)
59    messages: VecDeque<StoredMessage>,
60    /// Dead letter queue for failed messages
61    dead_letter: VecDeque<StoredMessage>,
62    /// In-flight messages being processed
63    in_flight: HashMap<String, InFlightMessage>,
64    /// Session state tracking
65    sessions: HashMap<SessionId, SessionState>,
66    /// Configuration
67    config: InMemoryConfig,
68}
69
70impl InMemoryQueue {
71    fn new(config: InMemoryConfig) -> Self {
72        Self {
73            messages: VecDeque::new(),
74            dead_letter: VecDeque::new(),
75            in_flight: HashMap::new(),
76            sessions: HashMap::new(),
77            config,
78        }
79    }
80}
81
82/// A message stored in the queue with metadata
83#[derive(Clone)]
84struct StoredMessage {
85    message_id: MessageId,
86    body: Bytes,
87    attributes: HashMap<String, String>,
88    session_id: Option<SessionId>,
89    correlation_id: Option<String>,
90    enqueued_at: Timestamp,
91    delivery_count: u32,
92    available_at: Timestamp,
93    expires_at: Option<Timestamp>,
94}
95
96impl StoredMessage {
97    fn from_message(message: &Message, message_id: MessageId, config: &InMemoryConfig) -> Self {
98        let now = Timestamp::now();
99
100        // Apply TTL: use message TTL if provided, otherwise use default from config
101        let ttl = message.time_to_live.or(config.default_message_ttl);
102        let expires_at = ttl.map(|ttl| Timestamp::from_datetime(now.as_datetime() + ttl));
103
104        Self {
105            message_id,
106            body: message.body.clone(),
107            attributes: message.attributes.clone(),
108            session_id: message.session_id.clone(),
109            correlation_id: message.correlation_id.clone(),
110            enqueued_at: now,
111            delivery_count: 0,
112            available_at: now,
113            expires_at,
114        }
115    }
116
117    /// Check if message is expired based on TTL
118    fn is_expired(&self) -> bool {
119        if let Some(ref expires_at) = self.expires_at {
120            Timestamp::now() >= *expires_at
121        } else {
122            false
123        }
124    }
125
126    /// Check if message is available for receiving
127    fn is_available(&self) -> bool {
128        Timestamp::now() >= self.available_at
129    }
130}
131
132/// A message currently being processed
133#[allow(dead_code)]
134struct InFlightMessage {
135    message: StoredMessage,
136    receipt_handle: String,
137    lock_expires_at: Timestamp,
138}
139
140#[allow(dead_code)]
141impl InFlightMessage {
142    fn is_expired(&self) -> bool {
143        Timestamp::now() >= self.lock_expires_at
144    }
145}
146
147/// State tracking for a message session
148struct SessionState {
149    locked: bool,
150    lock_expires_at: Option<Timestamp>,
151    locked_by: Option<String>, // Session client ID
152}
153
154impl SessionState {
155    fn new() -> Self {
156        Self {
157            locked: false,
158            lock_expires_at: None,
159            locked_by: None,
160        }
161    }
162
163    fn is_locked(&self) -> bool {
164        if !self.locked {
165            return false;
166        }
167
168        // Check if lock has expired
169        if let Some(ref expires_at) = self.lock_expires_at {
170            if Timestamp::now() >= *expires_at {
171                return false;
172            }
173        }
174
175        true
176    }
177}
178
179// ============================================================================
180// InMemoryProvider
181// ============================================================================
182
183/// In-memory queue provider implementation
184pub struct InMemoryProvider {
185    storage: Arc<RwLock<QueueStorage>>,
186}
187
188impl InMemoryProvider {
189    /// Create new in-memory provider with configuration
190    pub fn new(config: InMemoryConfig) -> Self {
191        Self {
192            storage: Arc::new(RwLock::new(QueueStorage::new(config))),
193        }
194    }
195
196    /// Helper method to accept a session and return a SessionClient.
197    ///
198    /// This is a convenience method for testing that wraps create_session_client.
199    /// In production code, use QueueClient::accept_session() instead.
200    pub async fn accept_session(
201        &self,
202        queue: &QueueName,
203        session_id: Option<SessionId>,
204    ) -> Result<Box<dyn crate::client::SessionClient>, QueueError> {
205        use crate::client::SessionProvider;
206
207        let provider = self.create_session_client(queue, session_id).await?;
208
209        // Wrap in StandardSessionClient
210        struct StandardSessionClient {
211            provider: Box<dyn SessionProvider>,
212        }
213
214        #[async_trait]
215        impl crate::client::SessionClient for StandardSessionClient {
216            async fn receive_message(
217                &self,
218                timeout: Duration,
219            ) -> Result<Option<ReceivedMessage>, QueueError> {
220                self.provider.receive_message(timeout).await
221            }
222
223            async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
224                self.provider.complete_message(&receipt).await
225            }
226
227            async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
228                self.provider.abandon_message(&receipt).await
229            }
230
231            async fn dead_letter_message(
232                &self,
233                receipt: ReceiptHandle,
234                reason: String,
235            ) -> Result<(), QueueError> {
236                self.provider.dead_letter_message(&receipt, &reason).await
237            }
238
239            async fn renew_session_lock(&self) -> Result<(), QueueError> {
240                self.provider.renew_session_lock().await
241            }
242
243            async fn close_session(&self) -> Result<(), QueueError> {
244                self.provider.close_session().await
245            }
246
247            fn session_id(&self) -> &SessionId {
248                self.provider.session_id()
249            }
250
251            fn session_expires_at(&self) -> Timestamp {
252                self.provider.session_expires_at()
253            }
254        }
255
256        Ok(Box::new(StandardSessionClient { provider }))
257    }
258
259    /// Return expired in-flight messages back to the queue
260    fn return_expired_messages(queue: &mut InMemoryQueue) {
261        let now = Timestamp::now();
262        let mut expired_handles = Vec::new();
263
264        // Find expired messages
265        for (handle, inflight) in &queue.in_flight {
266            if now >= inflight.lock_expires_at {
267                expired_handles.push(handle.clone());
268            }
269        }
270
271        // Return them to the queue
272        for handle in expired_handles {
273            if let Some(inflight) = queue.in_flight.remove(&handle) {
274                let mut message = inflight.message;
275                // Make immediately available
276                message.available_at = now;
277                queue.messages.push_back(message);
278            }
279        }
280    }
281
282    /// Clean up expired messages (based on TTL)
283    fn clean_expired_messages(queue: &mut InMemoryQueue) {
284        let mut i = 0;
285        while i < queue.messages.len() {
286            if queue.messages[i].is_expired() {
287                // Remove expired message
288                if let Some(expired_msg) = queue.messages.remove(i) {
289                    // Move to DLQ if enabled, otherwise just discard
290                    if queue.config.enable_dead_letter_queue {
291                        queue.dead_letter.push_back(expired_msg);
292                    }
293                }
294                // Don't increment i since we removed an element
295            } else {
296                i += 1;
297            }
298        }
299    }
300
301    /// Check if a session is locked
302    fn is_session_locked(queue: &InMemoryQueue, session_id: &Option<SessionId>) -> bool {
303        if let Some(ref sid) = session_id {
304            if let Some(session_state) = queue.sessions.get(sid) {
305                return session_state.is_locked();
306            }
307        }
308        false
309    }
310}
311
312impl Default for InMemoryProvider {
313    fn default() -> Self {
314        Self::new(InMemoryConfig::default())
315    }
316}
317
318#[async_trait]
319impl QueueProvider for InMemoryProvider {
320    async fn send_message(
321        &self,
322        queue: &QueueName,
323        message: &Message,
324    ) -> Result<MessageId, QueueError> {
325        // Validate message size (10MB for in-memory provider)
326        let message_size = message.body.len();
327        let max_size = self.provider_type().max_message_size();
328        if message_size > max_size {
329            return Err(QueueError::MessageTooLarge {
330                size: message_size,
331                max_size,
332            });
333        }
334
335        // Generate message ID
336        let message_id = MessageId::new();
337
338        // Store message with config for default TTL
339        let mut storage = self.storage.write().unwrap();
340        let queue_state = storage.get_or_create_queue(queue);
341        let stored_message =
342            StoredMessage::from_message(message, message_id.clone(), &queue_state.config);
343        queue_state.messages.push_back(stored_message);
344
345        Ok(message_id)
346    }
347
348    async fn send_messages(
349        &self,
350        queue: &QueueName,
351        messages: &[Message],
352    ) -> Result<Vec<MessageId>, QueueError> {
353        // Validate batch size
354        if messages.len() > self.max_batch_size() as usize {
355            return Err(QueueError::BatchTooLarge {
356                size: messages.len(),
357                max_size: self.max_batch_size() as usize,
358            });
359        }
360
361        // Validate individual message sizes and send all
362        let mut message_ids = Vec::with_capacity(messages.len());
363        for message in messages {
364            let message_id = self.send_message(queue, message).await?;
365            message_ids.push(message_id);
366        }
367
368        Ok(message_ids)
369    }
370
371    async fn receive_message(
372        &self,
373        queue: &QueueName,
374        timeout: Duration,
375    ) -> Result<Option<ReceivedMessage>, QueueError> {
376        let start_time = std::time::Instant::now();
377        let timeout_duration = timeout
378            .to_std()
379            .unwrap_or(std::time::Duration::from_secs(30));
380
381        loop {
382            // Try to receive a message
383            let received_message = {
384                let mut storage = self.storage.write().unwrap();
385                let queue_state = storage.get_or_create_queue(queue);
386
387                // First, return any expired in-flight messages back to the queue
388                Self::return_expired_messages(queue_state);
389
390                // Clean up expired messages (move to DLQ or discard)
391                Self::clean_expired_messages(queue_state);
392
393                // Find first available message (not expired, visibility timeout passed, not in a locked session)
394                let now = Timestamp::now();
395                let message_index = queue_state.messages.iter().position(|msg| {
396                    !msg.is_expired()
397                        && msg.is_available()
398                        && !Self::is_session_locked(queue_state, &msg.session_id)
399                });
400
401                if let Some(index) = message_index {
402                    // Remove message from queue
403                    let mut stored_message = queue_state.messages.remove(index).unwrap();
404
405                    // Check if message should go to DLQ before delivery
406                    if queue_state.config.enable_dead_letter_queue
407                        && stored_message.delivery_count >= queue_state.config.max_delivery_count
408                    {
409                        // Move to DLQ instead of delivering
410                        queue_state.dead_letter.push_back(stored_message);
411                        None
412                    } else {
413                        // Increment delivery count
414                        stored_message.delivery_count += 1;
415
416                        // Create receipt handle
417                        let receipt_handle_str = uuid::Uuid::new_v4().to_string();
418                        let lock_expires_at =
419                            Timestamp::from_datetime(now.as_datetime() + Duration::seconds(30));
420                        let receipt_handle = ReceiptHandle::new(
421                            receipt_handle_str.clone(),
422                            lock_expires_at,
423                            ProviderType::InMemory,
424                        );
425
426                        // Create received message
427                        let received_message = ReceivedMessage {
428                            message_id: stored_message.message_id.clone(),
429                            body: stored_message.body.clone(),
430                            attributes: stored_message.attributes.clone(),
431                            session_id: stored_message.session_id.clone(),
432                            correlation_id: stored_message.correlation_id.clone(),
433                            receipt_handle: receipt_handle.clone(),
434                            delivery_count: stored_message.delivery_count,
435                            first_delivered_at: stored_message.enqueued_at,
436                            delivered_at: now,
437                        };
438
439                        // Move to in-flight
440                        let inflight = InFlightMessage {
441                            message: stored_message,
442                            receipt_handle: receipt_handle_str.clone(),
443                            lock_expires_at,
444                        };
445                        queue_state.in_flight.insert(receipt_handle_str, inflight);
446
447                        Some(received_message)
448                    }
449                } else {
450                    None
451                }
452            }; // Lock is released here
453
454            if let Some(msg) = received_message {
455                return Ok(Some(msg));
456            }
457
458            // No message available - check timeout
459            if start_time.elapsed() >= timeout_duration {
460                return Ok(None);
461            }
462
463            // Small sleep before retry
464            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
465        }
466    }
467
468    async fn receive_messages(
469        &self,
470        queue: &QueueName,
471        max_messages: u32,
472        timeout: Duration,
473    ) -> Result<Vec<ReceivedMessage>, QueueError> {
474        let mut messages = Vec::new();
475        let start_time = std::time::Instant::now();
476        let timeout_duration = timeout
477            .to_std()
478            .unwrap_or(std::time::Duration::from_secs(30));
479
480        while messages.len() < max_messages as usize {
481            let remaining_timeout = timeout_duration
482                .checked_sub(start_time.elapsed())
483                .unwrap_or(std::time::Duration::ZERO);
484
485            if remaining_timeout.is_zero() {
486                break;
487            }
488
489            let remaining_duration =
490                Duration::from_std(remaining_timeout).unwrap_or(Duration::zero());
491            let received = self.receive_message(queue, remaining_duration).await?;
492
493            match received {
494                Some(msg) => messages.push(msg),
495                None => break,
496            }
497        }
498
499        Ok(messages)
500    }
501
502    async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
503        let mut storage = self.storage.write().unwrap();
504        let now = Timestamp::now();
505
506        // Find the queue containing this receipt
507        for queue in storage.queues.values_mut() {
508            if let Some(inflight) = queue.in_flight.get(receipt.handle()) {
509                // Check if receipt is expired
510                if inflight.lock_expires_at <= now {
511                    queue.in_flight.remove(receipt.handle());
512                    return Err(QueueError::MessageNotFound {
513                        receipt: receipt.handle().to_string(),
514                    });
515                }
516
517                // Remove from in-flight (permanently deletes the message)
518                queue.in_flight.remove(receipt.handle());
519                return Ok(());
520            }
521        }
522
523        // Receipt not found in any queue
524        Err(QueueError::MessageNotFound {
525            receipt: receipt.handle().to_string(),
526        })
527    }
528
529    async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
530        let mut storage = self.storage.write().unwrap();
531        let now = Timestamp::now();
532
533        // Find the queue containing this receipt
534        for queue in storage.queues.values_mut() {
535            if let Some(inflight) = queue.in_flight.remove(receipt.handle()) {
536                // Check if receipt is expired
537                if inflight.lock_expires_at <= now {
538                    return Err(QueueError::MessageNotFound {
539                        receipt: receipt.handle().to_string(),
540                    });
541                }
542
543                // Return message to queue with immediate availability
544                let mut returned_message = inflight.message;
545                returned_message.available_at = now;
546
547                // Add back to queue (front for sessions to maintain ordering, back for others)
548                if returned_message.session_id.is_some() {
549                    queue.messages.push_front(returned_message);
550                } else {
551                    queue.messages.push_back(returned_message);
552                }
553
554                return Ok(());
555            }
556        }
557
558        // Receipt not found in any queue
559        Err(QueueError::MessageNotFound {
560            receipt: receipt.handle().to_string(),
561        })
562    }
563
564    async fn dead_letter_message(
565        &self,
566        _receipt: &ReceiptHandle,
567        _reason: &str,
568    ) -> Result<(), QueueError> {
569        // TODO: Implement in subtask 10.4
570        unimplemented!("dead_letter_message will be implemented in subtask 10.4")
571    }
572
573    async fn create_session_client(
574        &self,
575        queue: &QueueName,
576        session_id: Option<SessionId>,
577    ) -> Result<Box<dyn SessionProvider>, QueueError> {
578        // Determine which session to use
579        let target_session_id = if let Some(sid) = session_id {
580            sid
581        } else {
582            // Find first available session (one with messages but not locked)
583            let storage = self.storage.read().unwrap();
584            let queue_state =
585                storage
586                    .queues
587                    .get(queue)
588                    .ok_or_else(|| QueueError::QueueNotFound {
589                        queue_name: queue.as_str().to_string(),
590                    })?;
591
592            // Find first session with available messages
593            let mut sessions_with_messages = std::collections::HashSet::new();
594            for msg in &queue_state.messages {
595                if let Some(ref sid) = msg.session_id {
596                    sessions_with_messages.insert(sid.clone());
597                }
598            }
599
600            // Check which sessions are not locked
601            let mut found_session = None;
602            for sid in sessions_with_messages {
603                let session_state = queue_state.sessions.get(&sid);
604                if session_state.map(|s| !s.is_locked()).unwrap_or(true) {
605                    // Session has messages and is not locked
606                    found_session = Some(sid);
607                    break;
608                }
609            }
610
611            found_session.ok_or_else(|| QueueError::SessionNotFound {
612                session_id: "<any>".to_string(),
613            })?
614        };
615
616        // Try to acquire lock on the session
617        let mut storage = self.storage.write().unwrap();
618        let queue_state = storage.get_or_create_queue(queue);
619        let config = queue_state.config.clone();
620
621        // Check if session is already locked
622        let session_state = queue_state
623            .sessions
624            .entry(target_session_id.clone())
625            .or_insert_with(SessionState::new);
626
627        if session_state.is_locked() {
628            let locked_until = session_state.lock_expires_at.unwrap_or_else(Timestamp::now);
629            return Err(QueueError::SessionLocked {
630                session_id: target_session_id.as_str().to_string(),
631                locked_until,
632            });
633        }
634
635        // Acquire lock
636        let lock_duration = config.session_lock_duration;
637        let now = Timestamp::now();
638        let lock_expires_at = Timestamp::from_datetime(now.as_datetime() + lock_duration);
639        let client_id = uuid::Uuid::new_v4().to_string();
640
641        session_state.locked = true;
642        session_state.lock_expires_at = Some(lock_expires_at);
643        session_state.locked_by = Some(client_id.clone());
644
645        // Create session provider
646        Ok(Box::new(InMemorySessionProvider::new(
647            self.storage.clone(),
648            queue.clone(),
649            target_session_id,
650            client_id,
651            lock_expires_at,
652        )))
653    }
654
655    fn provider_type(&self) -> ProviderType {
656        ProviderType::InMemory
657    }
658
659    fn supports_sessions(&self) -> SessionSupport {
660        SessionSupport::Native
661    }
662
663    fn supports_batching(&self) -> bool {
664        true
665    }
666
667    fn max_batch_size(&self) -> u32 {
668        100
669    }
670}
671
672// ============================================================================
673// InMemorySessionProvider
674// ============================================================================
675
676/// In-memory session provider implementation
677pub struct InMemorySessionProvider {
678    storage: Arc<RwLock<QueueStorage>>,
679    queue_name: QueueName,
680    session_id: SessionId,
681    client_id: String,
682    lock_expires_at: Timestamp,
683}
684
685impl InMemorySessionProvider {
686    fn new(
687        storage: Arc<RwLock<QueueStorage>>,
688        queue_name: QueueName,
689        session_id: SessionId,
690        client_id: String,
691        lock_expires_at: Timestamp,
692    ) -> Self {
693        Self {
694            storage,
695            queue_name,
696            session_id,
697            client_id,
698            lock_expires_at,
699        }
700    }
701}
702
703#[async_trait]
704impl SessionProvider for InMemorySessionProvider {
705    async fn receive_message(
706        &self,
707        timeout: Duration,
708    ) -> Result<Option<ReceivedMessage>, QueueError> {
709        // Check if we still hold the lock
710        {
711            let storage = self.storage.read().unwrap();
712            if let Some(queue_state) = storage.queues.get(&self.queue_name) {
713                if let Some(session_state) = queue_state.sessions.get(&self.session_id) {
714                    if !session_state.is_locked()
715                        || session_state.locked_by.as_ref() != Some(&self.client_id)
716                    {
717                        return Err(QueueError::SessionLocked {
718                            session_id: self.session_id.as_str().to_string(),
719                            locked_until: session_state
720                                .lock_expires_at
721                                .unwrap_or_else(Timestamp::now),
722                        });
723                    }
724                }
725            }
726        }
727
728        // Use a similar approach to regular receive, but filtered for this session
729        let start_time = std::time::Instant::now();
730        let timeout_duration = timeout
731            .to_std()
732            .unwrap_or(std::time::Duration::from_secs(30));
733
734        loop {
735            // Try to receive a message for this session
736            let received_message = {
737                let mut storage = self.storage.write().unwrap();
738                if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
739                    // Clean up expired messages
740                    InMemoryProvider::clean_expired_messages(queue_state);
741
742                    // Find first available message for this session
743                    let now = Timestamp::now();
744                    let message_index = queue_state.messages.iter().position(|msg| {
745                        !msg.is_expired()
746                            && msg.is_available()
747                            && msg.session_id.as_ref() == Some(&self.session_id)
748                    });
749
750                    if let Some(index) = message_index {
751                        // Remove message from queue
752                        let mut message = queue_state.messages.remove(index).unwrap();
753
754                        // Generate receipt handle
755                        let receipt = uuid::Uuid::new_v4().to_string();
756
757                        // Calculate visibility timeout
758                        let visibility_timeout = Duration::seconds(30);
759                        let lock_expires_at =
760                            Timestamp::from_datetime(now.as_datetime() + visibility_timeout);
761
762                        // Track delivery
763                        message.delivery_count += 1;
764                        let first_delivered_at = if message.delivery_count == 1 {
765                            now
766                        } else {
767                            message.enqueued_at
768                        };
769
770                        // Add to in-flight
771                        queue_state.in_flight.insert(
772                            receipt.clone(),
773                            InFlightMessage {
774                                message: message.clone(),
775                                receipt_handle: receipt.clone(),
776                                lock_expires_at,
777                            },
778                        );
779
780                        // Build received message
781                        Some(ReceivedMessage {
782                            message_id: message.message_id.clone(),
783                            body: message.body.clone(),
784                            attributes: message.attributes.clone(),
785                            receipt_handle: ReceiptHandle::new(
786                                receipt,
787                                lock_expires_at,
788                                ProviderType::InMemory,
789                            ),
790                            session_id: message.session_id.clone(),
791                            correlation_id: message.correlation_id.clone(),
792                            delivery_count: message.delivery_count,
793                            first_delivered_at,
794                            delivered_at: now,
795                        })
796                    } else {
797                        None
798                    }
799                } else {
800                    None
801                }
802            };
803
804            if let Some(msg) = received_message {
805                return Ok(Some(msg));
806            }
807
808            // Check timeout
809            if start_time.elapsed() >= timeout_duration {
810                return Ok(None);
811            }
812
813            // Brief sleep before retry
814            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
815        }
816    }
817
818    async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
819        // Verify we still hold the session lock
820        {
821            let storage = self.storage.read().unwrap();
822            if let Some(queue_state) = storage.queues.get(&self.queue_name) {
823                if let Some(session_state) = queue_state.sessions.get(&self.session_id) {
824                    if !session_state.is_locked()
825                        || session_state.locked_by.as_ref() != Some(&self.client_id)
826                    {
827                        return Err(QueueError::SessionLocked {
828                            session_id: self.session_id.as_str().to_string(),
829                            locked_until: session_state
830                                .lock_expires_at
831                                .unwrap_or_else(Timestamp::now),
832                        });
833                    }
834                }
835            }
836        }
837
838        // Delegate to storage to remove message
839        let mut storage = self.storage.write().unwrap();
840        if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
841            if queue_state.in_flight.remove(receipt.handle()).is_some() {
842                return Ok(());
843            }
844        }
845
846        Err(QueueError::MessageNotFound {
847            receipt: receipt.handle().to_string(),
848        })
849    }
850
851    async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
852        // Verify we still hold the session lock
853        {
854            let storage = self.storage.read().unwrap();
855            if let Some(queue_state) = storage.queues.get(&self.queue_name) {
856                if let Some(session_state) = queue_state.sessions.get(&self.session_id) {
857                    if !session_state.is_locked()
858                        || session_state.locked_by.as_ref() != Some(&self.client_id)
859                    {
860                        return Err(QueueError::SessionLocked {
861                            session_id: self.session_id.as_str().to_string(),
862                            locked_until: session_state
863                                .lock_expires_at
864                                .unwrap_or_else(Timestamp::now),
865                        });
866                    }
867                }
868            }
869        }
870
871        // Return message to queue
872        let mut storage = self.storage.write().unwrap();
873        if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
874            if let Some(inflight) = queue_state.in_flight.remove(receipt.handle()) {
875                let mut message = inflight.message;
876
877                // Check if max delivery count reached
878                if message.delivery_count >= queue_state.config.max_delivery_count {
879                    // Move to DLQ if enabled
880                    if queue_state.config.enable_dead_letter_queue {
881                        queue_state.dead_letter.push_back(message);
882                        return Ok(());
883                    }
884                }
885
886                // Make immediately available and add back to front for session ordering
887                message.available_at = Timestamp::now();
888                queue_state.messages.push_front(message);
889                return Ok(());
890            }
891        }
892
893        Err(QueueError::MessageNotFound {
894            receipt: receipt.handle().to_string(),
895        })
896    }
897
898    async fn dead_letter_message(
899        &self,
900        receipt: &ReceiptHandle,
901        _reason: &str,
902    ) -> Result<(), QueueError> {
903        // Verify we still hold the session lock
904        {
905            let storage = self.storage.read().unwrap();
906            if let Some(queue_state) = storage.queues.get(&self.queue_name) {
907                if let Some(session_state) = queue_state.sessions.get(&self.session_id) {
908                    if !session_state.is_locked()
909                        || session_state.locked_by.as_ref() != Some(&self.client_id)
910                    {
911                        return Err(QueueError::SessionLocked {
912                            session_id: self.session_id.as_str().to_string(),
913                            locked_until: session_state
914                                .lock_expires_at
915                                .unwrap_or_else(Timestamp::now),
916                        });
917                    }
918                }
919            }
920        }
921
922        // Move message to DLQ
923        let mut storage = self.storage.write().unwrap();
924        if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
925            if let Some(inflight) = queue_state.in_flight.remove(receipt.handle()) {
926                queue_state.dead_letter.push_back(inflight.message);
927                return Ok(());
928            }
929        }
930
931        Err(QueueError::MessageNotFound {
932            receipt: receipt.handle().to_string(),
933        })
934    }
935
936    async fn renew_session_lock(&self) -> Result<(), QueueError> {
937        let mut storage = self.storage.write().unwrap();
938        if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
939            if let Some(session_state) = queue_state.sessions.get_mut(&self.session_id) {
940                // Verify we hold the lock
941                if session_state.locked_by.as_ref() != Some(&self.client_id) {
942                    return Err(QueueError::SessionLocked {
943                        session_id: self.session_id.as_str().to_string(),
944                        locked_until: session_state.lock_expires_at.unwrap_or_else(Timestamp::now),
945                    });
946                }
947
948                // Renew lock
949                let lock_duration = queue_state.config.session_lock_duration;
950                let new_expires_at =
951                    Timestamp::from_datetime(Timestamp::now().as_datetime() + lock_duration);
952                session_state.lock_expires_at = Some(new_expires_at);
953
954                return Ok(());
955            }
956        }
957
958        Err(QueueError::SessionNotFound {
959            session_id: self.session_id.as_str().to_string(),
960        })
961    }
962
963    async fn close_session(&self) -> Result<(), QueueError> {
964        let mut storage = self.storage.write().unwrap();
965        if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
966            if let Some(session_state) = queue_state.sessions.get_mut(&self.session_id) {
967                // Release lock
968                session_state.locked = false;
969                session_state.lock_expires_at = None;
970                session_state.locked_by = None;
971                return Ok(());
972            }
973        }
974
975        Ok(()) // Session already released or doesn't exist - that's fine
976    }
977
978    fn session_id(&self) -> &SessionId {
979        &self.session_id
980    }
981
982    fn session_expires_at(&self) -> Timestamp {
983        self.lock_expires_at
984    }
985}