Skip to main content

queue_runtime/providers/
memory.rs

1//! In-memory queue provider implementation for testing and development.
2//!
3//! This module provides a fully functional in-memory queue implementation that:
4//! - Supports session-based ordered message processing
5//! - Implements visibility timeouts and message TTL
6//! - Simulates dead letter queue behavior
7//! - Provides thread-safe concurrent access
8//!
9//! This provider is intended for:
10//! - Unit testing of queue-runtime consumers
11//! - Development and prototyping
12//! - Reference implementation for cloud providers
13
14use crate::client::{QueueProvider, SessionProvider};
15use crate::error::QueueError;
16use crate::message::{
17    Message, MessageId, QueueName, ReceiptHandle, ReceivedMessage, SessionId, Timestamp,
18};
19use crate::provider::{InMemoryConfig, ProviderType, SessionSupport};
20use async_trait::async_trait;
21use bytes::Bytes;
22use chrono::Duration;
23use std::collections::{HashMap, VecDeque};
24use std::sync::{Arc, RwLock};
25
26#[cfg(test)]
27#[path = "memory_tests.rs"]
28mod tests;
29
30// ============================================================================
31// Internal Storage Structures
32// ============================================================================
33
34/// Thread-safe storage for all queues
35struct QueueStorage {
36    queues: HashMap<QueueName, InMemoryQueue>,
37    config: InMemoryConfig,
38}
39
40impl QueueStorage {
41    fn new(config: InMemoryConfig) -> Self {
42        Self {
43            queues: HashMap::new(),
44            config,
45        }
46    }
47
48    /// Get or create a queue
49    fn get_or_create_queue(&mut self, queue_name: &QueueName) -> &mut InMemoryQueue {
50        self.queues
51            .entry(queue_name.clone())
52            .or_insert_with(|| InMemoryQueue::new(self.config.clone()))
53    }
54}
55
56/// Internal queue state for a single queue
57struct InMemoryQueue {
58    /// Main message queue (FIFO order)
59    messages: VecDeque<StoredMessage>,
60    /// Dead letter queue for failed messages
61    dead_letter: VecDeque<StoredMessage>,
62    /// In-flight messages being processed
63    in_flight: HashMap<String, InFlightMessage>,
64    /// Session state tracking
65    sessions: HashMap<SessionId, SessionState>,
66    /// Configuration
67    config: InMemoryConfig,
68}
69
70impl InMemoryQueue {
71    fn new(config: InMemoryConfig) -> Self {
72        Self {
73            messages: VecDeque::new(),
74            dead_letter: VecDeque::new(),
75            in_flight: HashMap::new(),
76            sessions: HashMap::new(),
77            config,
78        }
79    }
80}
81
82/// A message stored in the queue with metadata
83#[derive(Clone)]
84struct StoredMessage {
85    message_id: MessageId,
86    body: Bytes,
87    attributes: HashMap<String, String>,
88    session_id: Option<SessionId>,
89    correlation_id: Option<String>,
90    enqueued_at: Timestamp,
91    delivery_count: u32,
92    available_at: Timestamp,
93    expires_at: Option<Timestamp>,
94}
95
96impl StoredMessage {
97    fn from_message(message: &Message, message_id: MessageId, config: &InMemoryConfig) -> Self {
98        let now = Timestamp::now();
99
100        // Apply TTL: use message TTL if provided, otherwise use default from config
101        let ttl = message.time_to_live.or(config.default_message_ttl);
102        let expires_at = ttl.map(|ttl| Timestamp::from_datetime(now.as_datetime() + ttl));
103
104        Self {
105            message_id,
106            body: message.body.clone(),
107            attributes: message.attributes.clone(),
108            session_id: message.session_id.clone(),
109            correlation_id: message.correlation_id.clone(),
110            enqueued_at: now,
111            delivery_count: 0,
112            available_at: now,
113            expires_at,
114        }
115    }
116
117    /// Check if message is expired based on TTL
118    fn is_expired(&self) -> bool {
119        if let Some(ref expires_at) = self.expires_at {
120            Timestamp::now() >= *expires_at
121        } else {
122            false
123        }
124    }
125
126    /// Check if message is available for receiving
127    fn is_available(&self) -> bool {
128        Timestamp::now() >= self.available_at
129    }
130}
131
132/// A message currently being processed
133#[allow(dead_code)]
134struct InFlightMessage {
135    message: StoredMessage,
136    receipt_handle: String,
137    lock_expires_at: Timestamp,
138}
139
140#[allow(dead_code)]
141impl InFlightMessage {
142    fn is_expired(&self) -> bool {
143        Timestamp::now() >= self.lock_expires_at
144    }
145}
146
147/// State tracking for a message session
148struct SessionState {
149    locked: bool,
150    lock_expires_at: Option<Timestamp>,
151    locked_by: Option<String>, // Session client ID
152}
153
154impl SessionState {
155    fn new() -> Self {
156        Self {
157            locked: false,
158            lock_expires_at: None,
159            locked_by: None,
160        }
161    }
162
163    fn is_locked(&self) -> bool {
164        if !self.locked {
165            return false;
166        }
167
168        // Check if lock has expired
169        if let Some(ref expires_at) = self.lock_expires_at {
170            if Timestamp::now() >= *expires_at {
171                return false;
172            }
173        }
174
175        true
176    }
177}
178
179// ============================================================================
180// InMemoryProvider
181// ============================================================================
182
183/// In-memory queue provider implementation
184pub struct InMemoryProvider {
185    storage: Arc<RwLock<QueueStorage>>,
186}
187
188impl InMemoryProvider {
189    /// Create new in-memory provider with configuration
190    pub fn new(config: InMemoryConfig) -> Self {
191        Self {
192            storage: Arc::new(RwLock::new(QueueStorage::new(config))),
193        }
194    }
195
196    /// Helper method to accept a session and return a SessionClient.
197    ///
198    /// This is a convenience method for testing that wraps create_session_client.
199    /// In production code, use QueueClient::accept_session() instead.
200    pub async fn accept_session(
201        &self,
202        queue: &QueueName,
203        session_id: Option<SessionId>,
204    ) -> Result<Box<dyn crate::client::SessionClient>, QueueError> {
205        use crate::client::SessionProvider;
206
207        let provider = self.create_session_client(queue, session_id).await?;
208
209        // Wrap in StandardSessionClient
210        struct StandardSessionClient {
211            provider: Box<dyn SessionProvider>,
212        }
213
214        #[async_trait]
215        impl crate::client::SessionClient for StandardSessionClient {
216            async fn receive_message(
217                &self,
218                timeout: Duration,
219            ) -> Result<Option<ReceivedMessage>, QueueError> {
220                self.provider.receive_message(timeout).await
221            }
222
223            async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
224                self.provider.complete_message(&receipt).await
225            }
226
227            async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
228                self.provider.abandon_message(&receipt).await
229            }
230
231            async fn dead_letter_message(
232                &self,
233                receipt: ReceiptHandle,
234                reason: String,
235            ) -> Result<(), QueueError> {
236                self.provider.dead_letter_message(&receipt, &reason).await
237            }
238
239            async fn renew_session_lock(&self) -> Result<(), QueueError> {
240                self.provider.renew_session_lock().await
241            }
242
243            async fn close_session(&self) -> Result<(), QueueError> {
244                self.provider.close_session().await
245            }
246
247            fn session_id(&self) -> &SessionId {
248                self.provider.session_id()
249            }
250
251            fn session_expires_at(&self) -> Timestamp {
252                self.provider.session_expires_at()
253            }
254        }
255
256        Ok(Box::new(StandardSessionClient { provider }))
257    }
258
259    /// Return expired in-flight messages back to the queue
260    fn return_expired_messages(queue: &mut InMemoryQueue) {
261        let now = Timestamp::now();
262        let mut expired_handles = Vec::new();
263
264        // Find expired messages
265        for (handle, inflight) in &queue.in_flight {
266            if now >= inflight.lock_expires_at {
267                expired_handles.push(handle.clone());
268            }
269        }
270
271        // Return them to the queue
272        for handle in expired_handles {
273            if let Some(inflight) = queue.in_flight.remove(&handle) {
274                let mut message = inflight.message;
275                // Make immediately available
276                message.available_at = now;
277                queue.messages.push_back(message);
278            }
279        }
280    }
281
282    /// Clean up expired messages (based on TTL)
283    fn clean_expired_messages(queue: &mut InMemoryQueue) {
284        let mut i = 0;
285        while i < queue.messages.len() {
286            if queue.messages[i].is_expired() {
287                // Remove expired message
288                if let Some(expired_msg) = queue.messages.remove(i) {
289                    // Move to DLQ if enabled, otherwise just discard
290                    if queue.config.enable_dead_letter_queue {
291                        queue.dead_letter.push_back(expired_msg);
292                    }
293                }
294                // Don't increment i since we removed an element
295            } else {
296                i += 1;
297            }
298        }
299    }
300
301    /// Check if a session is locked
302    fn is_session_locked(queue: &InMemoryQueue, session_id: &Option<SessionId>) -> bool {
303        if let Some(ref sid) = session_id {
304            if let Some(session_state) = queue.sessions.get(sid) {
305                return session_state.is_locked();
306            }
307        }
308        false
309    }
310}
311
312impl Default for InMemoryProvider {
313    fn default() -> Self {
314        Self::new(InMemoryConfig::default())
315    }
316}
317
318#[async_trait]
319impl QueueProvider for InMemoryProvider {
320    async fn send_message(
321        &self,
322        queue: &QueueName,
323        message: &Message,
324    ) -> Result<MessageId, QueueError> {
325        // Validate message size (10MB for in-memory provider)
326        let message_size = message.body.len();
327        let max_size = self.provider_type().max_message_size();
328        if message_size > max_size {
329            return Err(QueueError::MessageTooLarge {
330                size: message_size,
331                max_size,
332            });
333        }
334
335        // Generate message ID
336        let message_id = MessageId::new();
337
338        // Store message with config for default TTL
339        let mut storage = self.storage.write().unwrap();
340        let queue_state = storage.get_or_create_queue(queue);
341        let stored_message =
342            StoredMessage::from_message(message, message_id.clone(), &queue_state.config);
343        queue_state.messages.push_back(stored_message);
344
345        Ok(message_id)
346    }
347
348    async fn send_messages(
349        &self,
350        queue: &QueueName,
351        messages: &[Message],
352    ) -> Result<Vec<MessageId>, QueueError> {
353        // Validate batch size
354        if messages.len() > self.max_batch_size() as usize {
355            return Err(QueueError::BatchTooLarge {
356                size: messages.len(),
357                max_size: self.max_batch_size() as usize,
358            });
359        }
360
361        // Validate individual message sizes and send all
362        let mut message_ids = Vec::with_capacity(messages.len());
363        for message in messages {
364            let message_id = self.send_message(queue, message).await?;
365            message_ids.push(message_id);
366        }
367
368        Ok(message_ids)
369    }
370
371    async fn receive_message(
372        &self,
373        queue: &QueueName,
374        timeout: Duration,
375    ) -> Result<Option<ReceivedMessage>, QueueError> {
376        let start_time = std::time::Instant::now();
377        let timeout_duration = timeout
378            .to_std()
379            .unwrap_or(std::time::Duration::from_secs(30));
380
381        loop {
382            // Try to receive a message
383            let received_message = {
384                let mut storage = self.storage.write().unwrap();
385                let queue_state = storage.get_or_create_queue(queue);
386
387                // First, return any expired in-flight messages back to the queue
388                Self::return_expired_messages(queue_state);
389
390                // Clean up expired messages (move to DLQ or discard)
391                Self::clean_expired_messages(queue_state);
392
393                // Find first available message (not expired, visibility timeout passed, not in a locked session)
394                let now = Timestamp::now();
395                let message_index = queue_state.messages.iter().position(|msg| {
396                    !msg.is_expired()
397                        && msg.is_available()
398                        && !Self::is_session_locked(queue_state, &msg.session_id)
399                });
400
401                if let Some(index) = message_index {
402                    // Remove message from queue
403                    let mut stored_message = queue_state.messages.remove(index).unwrap();
404
405                    // Check if message should go to DLQ before delivery
406                    if queue_state.config.enable_dead_letter_queue
407                        && stored_message.delivery_count >= queue_state.config.max_delivery_count
408                    {
409                        // Move to DLQ instead of delivering
410                        queue_state.dead_letter.push_back(stored_message);
411                        None
412                    } else {
413                        // Increment delivery count
414                        stored_message.delivery_count += 1;
415
416                        // Create receipt handle
417                        let receipt_handle_str = uuid::Uuid::new_v4().to_string();
418                        let lock_expires_at =
419                            Timestamp::from_datetime(now.as_datetime() + Duration::seconds(30));
420                        let receipt_handle = ReceiptHandle::new(
421                            receipt_handle_str.clone(),
422                            lock_expires_at,
423                            ProviderType::InMemory,
424                        );
425
426                        // Create received message
427                        let received_message = ReceivedMessage {
428                            message_id: stored_message.message_id.clone(),
429                            body: stored_message.body.clone(),
430                            attributes: stored_message.attributes.clone(),
431                            session_id: stored_message.session_id.clone(),
432                            correlation_id: stored_message.correlation_id.clone(),
433                            receipt_handle: receipt_handle.clone(),
434                            delivery_count: stored_message.delivery_count,
435                            first_delivered_at: stored_message.enqueued_at,
436                            delivered_at: now,
437                        };
438
439                        // Move to in-flight
440                        let inflight = InFlightMessage {
441                            message: stored_message,
442                            receipt_handle: receipt_handle_str.clone(),
443                            lock_expires_at,
444                        };
445                        queue_state.in_flight.insert(receipt_handle_str, inflight);
446
447                        Some(received_message)
448                    }
449                } else {
450                    None
451                }
452            }; // Lock is released here
453
454            if let Some(msg) = received_message {
455                return Ok(Some(msg));
456            }
457
458            // No message available - check timeout
459            if start_time.elapsed() >= timeout_duration {
460                return Ok(None);
461            }
462
463            // Small sleep before retry
464            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
465        }
466    }
467
468    async fn receive_messages(
469        &self,
470        queue: &QueueName,
471        max_messages: u32,
472        timeout: Duration,
473    ) -> Result<Vec<ReceivedMessage>, QueueError> {
474        let mut messages = Vec::new();
475        let start_time = std::time::Instant::now();
476        let timeout_duration = timeout
477            .to_std()
478            .unwrap_or(std::time::Duration::from_secs(30));
479
480        while messages.len() < max_messages as usize {
481            let remaining_timeout = timeout_duration
482                .checked_sub(start_time.elapsed())
483                .unwrap_or(std::time::Duration::ZERO);
484
485            if remaining_timeout.is_zero() {
486                break;
487            }
488
489            let remaining_duration =
490                Duration::from_std(remaining_timeout).unwrap_or(Duration::zero());
491            let received = self.receive_message(queue, remaining_duration).await?;
492
493            match received {
494                Some(msg) => messages.push(msg),
495                None => break,
496            }
497        }
498
499        Ok(messages)
500    }
501
502    async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
503        let mut storage = self.storage.write().unwrap();
504        let now = Timestamp::now();
505
506        // Find the queue containing this receipt
507        for queue in storage.queues.values_mut() {
508            if let Some(inflight) = queue.in_flight.get(receipt.handle()) {
509                // Check if receipt is expired
510                if inflight.lock_expires_at <= now {
511                    queue.in_flight.remove(receipt.handle());
512                    return Err(QueueError::InvalidReceipt {
513                        receipt: receipt.handle().to_string(),
514                    });
515                }
516
517                // Remove from in-flight (permanently deletes the message)
518                queue.in_flight.remove(receipt.handle());
519                return Ok(());
520            }
521        }
522
523        // Receipt not found in any queue
524        Err(QueueError::InvalidReceipt {
525            receipt: receipt.handle().to_string(),
526        })
527    }
528
529    async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
530        let mut storage = self.storage.write().unwrap();
531        let now = Timestamp::now();
532
533        // Find the queue containing this receipt
534        for queue in storage.queues.values_mut() {
535            if let Some(inflight) = queue.in_flight.remove(receipt.handle()) {
536                // Check if receipt is expired
537                if inflight.lock_expires_at <= now {
538                    return Err(QueueError::InvalidReceipt {
539                        receipt: receipt.handle().to_string(),
540                    });
541                }
542
543                // Return message to queue with immediate availability
544                let mut returned_message = inflight.message;
545                returned_message.available_at = now;
546
547                // Add back to queue (front for sessions to maintain ordering, back for others)
548                if returned_message.session_id.is_some() {
549                    queue.messages.push_front(returned_message);
550                } else {
551                    queue.messages.push_back(returned_message);
552                }
553
554                return Ok(());
555            }
556        }
557
558        // Receipt not found in any queue
559        Err(QueueError::InvalidReceipt {
560            receipt: receipt.handle().to_string(),
561        })
562    }
563
564    async fn dead_letter_message(
565        &self,
566        receipt: &ReceiptHandle,
567        _reason: &str,
568    ) -> Result<(), QueueError> {
569        let mut storage = self.storage.write().unwrap();
570        let now = Timestamp::now();
571
572        // Find the queue containing this receipt and move the message to DLQ
573        for queue in storage.queues.values_mut() {
574            if let Some(inflight) = queue.in_flight.remove(receipt.handle()) {
575                // Check if receipt is expired
576                if inflight.lock_expires_at <= now {
577                    return Err(QueueError::InvalidReceipt {
578                        receipt: receipt.handle().to_string(),
579                    });
580                }
581
582                queue.dead_letter.push_back(inflight.message);
583                return Ok(());
584            }
585        }
586
587        // Receipt not found in any queue
588        Err(QueueError::InvalidReceipt {
589            receipt: receipt.handle().to_string(),
590        })
591    }
592
593    async fn create_session_client(
594        &self,
595        queue: &QueueName,
596        session_id: Option<SessionId>,
597    ) -> Result<Box<dyn SessionProvider>, QueueError> {
598        // Determine which session to use
599        let target_session_id = if let Some(sid) = session_id {
600            sid
601        } else {
602            // Find first available session (one with messages but not locked)
603            let storage = self.storage.read().unwrap();
604            let queue_state =
605                storage
606                    .queues
607                    .get(queue)
608                    .ok_or_else(|| QueueError::QueueNotFound {
609                        queue_name: queue.as_str().to_string(),
610                    })?;
611
612            // Find first session with available messages
613            let mut sessions_with_messages = std::collections::HashSet::new();
614            for msg in &queue_state.messages {
615                if let Some(ref sid) = msg.session_id {
616                    sessions_with_messages.insert(sid.clone());
617                }
618            }
619
620            // Check which sessions are not locked
621            let mut found_session = None;
622            for sid in sessions_with_messages {
623                let session_state = queue_state.sessions.get(&sid);
624                if session_state.map(|s| !s.is_locked()).unwrap_or(true) {
625                    // Session has messages and is not locked
626                    found_session = Some(sid);
627                    break;
628                }
629            }
630
631            found_session.ok_or_else(|| QueueError::SessionNotFound {
632                session_id: "<any>".to_string(),
633            })?
634        };
635
636        // Try to acquire lock on the session
637        let mut storage = self.storage.write().unwrap();
638        let queue_state = storage.get_or_create_queue(queue);
639        let config = queue_state.config.clone();
640
641        // Check if session is already locked
642        let session_state = queue_state
643            .sessions
644            .entry(target_session_id.clone())
645            .or_insert_with(SessionState::new);
646
647        if session_state.is_locked() {
648            let locked_until = session_state.lock_expires_at.unwrap_or_else(Timestamp::now);
649            return Err(QueueError::SessionLocked {
650                session_id: target_session_id.as_str().to_string(),
651                locked_until,
652            });
653        }
654
655        // Acquire lock
656        let lock_duration = config.session_lock_duration;
657        let now = Timestamp::now();
658        let lock_expires_at = Timestamp::from_datetime(now.as_datetime() + lock_duration);
659        let client_id = uuid::Uuid::new_v4().to_string();
660
661        session_state.locked = true;
662        session_state.lock_expires_at = Some(lock_expires_at);
663        session_state.locked_by = Some(client_id.clone());
664
665        // Create session provider
666        Ok(Box::new(InMemorySessionProvider::new(
667            self.storage.clone(),
668            queue.clone(),
669            target_session_id,
670            client_id,
671            lock_expires_at,
672        )))
673    }
674
675    fn provider_type(&self) -> ProviderType {
676        ProviderType::InMemory
677    }
678
679    fn supports_sessions(&self) -> SessionSupport {
680        SessionSupport::Native
681    }
682
683    fn supports_batching(&self) -> bool {
684        true
685    }
686
687    fn max_batch_size(&self) -> u32 {
688        100
689    }
690}
691
692// ============================================================================
693// InMemorySessionProvider
694// ============================================================================
695
696/// In-memory session provider implementation
697pub struct InMemorySessionProvider {
698    storage: Arc<RwLock<QueueStorage>>,
699    queue_name: QueueName,
700    session_id: SessionId,
701    client_id: String,
702    lock_expires_at: Timestamp,
703}
704
705impl InMemorySessionProvider {
706    fn new(
707        storage: Arc<RwLock<QueueStorage>>,
708        queue_name: QueueName,
709        session_id: SessionId,
710        client_id: String,
711        lock_expires_at: Timestamp,
712    ) -> Self {
713        Self {
714            storage,
715            queue_name,
716            session_id,
717            client_id,
718            lock_expires_at,
719        }
720    }
721}
722
723#[async_trait]
724impl SessionProvider for InMemorySessionProvider {
725    async fn receive_message(
726        &self,
727        timeout: Duration,
728    ) -> Result<Option<ReceivedMessage>, QueueError> {
729        // Check if we still hold the lock
730        {
731            let storage = self.storage.read().unwrap();
732            if let Some(queue_state) = storage.queues.get(&self.queue_name) {
733                if let Some(session_state) = queue_state.sessions.get(&self.session_id) {
734                    if !session_state.is_locked()
735                        || session_state.locked_by.as_ref() != Some(&self.client_id)
736                    {
737                        return Err(QueueError::SessionLocked {
738                            session_id: self.session_id.as_str().to_string(),
739                            locked_until: session_state
740                                .lock_expires_at
741                                .unwrap_or_else(Timestamp::now),
742                        });
743                    }
744                }
745            }
746        }
747
748        // Use a similar approach to regular receive, but filtered for this session
749        let start_time = std::time::Instant::now();
750        let timeout_duration = timeout
751            .to_std()
752            .unwrap_or(std::time::Duration::from_secs(30));
753
754        loop {
755            // Try to receive a message for this session
756            let received_message = {
757                let mut storage = self.storage.write().unwrap();
758                if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
759                    // Clean up expired messages
760                    InMemoryProvider::clean_expired_messages(queue_state);
761
762                    // Find first available message for this session
763                    let now = Timestamp::now();
764                    let message_index = queue_state.messages.iter().position(|msg| {
765                        !msg.is_expired()
766                            && msg.is_available()
767                            && msg.session_id.as_ref() == Some(&self.session_id)
768                    });
769
770                    if let Some(index) = message_index {
771                        // Remove message from queue
772                        let mut message = queue_state.messages.remove(index).unwrap();
773
774                        // Generate receipt handle
775                        let receipt = uuid::Uuid::new_v4().to_string();
776
777                        // Calculate visibility timeout
778                        let visibility_timeout = Duration::seconds(30);
779                        let lock_expires_at =
780                            Timestamp::from_datetime(now.as_datetime() + visibility_timeout);
781
782                        // Track delivery
783                        message.delivery_count += 1;
784                        let first_delivered_at = if message.delivery_count == 1 {
785                            now
786                        } else {
787                            message.enqueued_at
788                        };
789
790                        // Add to in-flight
791                        queue_state.in_flight.insert(
792                            receipt.clone(),
793                            InFlightMessage {
794                                message: message.clone(),
795                                receipt_handle: receipt.clone(),
796                                lock_expires_at,
797                            },
798                        );
799
800                        // Build received message
801                        Some(ReceivedMessage {
802                            message_id: message.message_id.clone(),
803                            body: message.body.clone(),
804                            attributes: message.attributes.clone(),
805                            receipt_handle: ReceiptHandle::new(
806                                receipt,
807                                lock_expires_at,
808                                ProviderType::InMemory,
809                            ),
810                            session_id: message.session_id.clone(),
811                            correlation_id: message.correlation_id.clone(),
812                            delivery_count: message.delivery_count,
813                            first_delivered_at,
814                            delivered_at: now,
815                        })
816                    } else {
817                        None
818                    }
819                } else {
820                    None
821                }
822            };
823
824            if let Some(msg) = received_message {
825                return Ok(Some(msg));
826            }
827
828            // Check timeout
829            if start_time.elapsed() >= timeout_duration {
830                return Ok(None);
831            }
832
833            // Brief sleep before retry
834            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
835        }
836    }
837
838    async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
839        // Verify we still hold the session lock
840        {
841            let storage = self.storage.read().unwrap();
842            if let Some(queue_state) = storage.queues.get(&self.queue_name) {
843                if let Some(session_state) = queue_state.sessions.get(&self.session_id) {
844                    if !session_state.is_locked()
845                        || session_state.locked_by.as_ref() != Some(&self.client_id)
846                    {
847                        return Err(QueueError::SessionLocked {
848                            session_id: self.session_id.as_str().to_string(),
849                            locked_until: session_state
850                                .lock_expires_at
851                                .unwrap_or_else(Timestamp::now),
852                        });
853                    }
854                }
855            }
856        }
857
858        // Delegate to storage to remove message
859        let mut storage = self.storage.write().unwrap();
860        if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
861            if queue_state.in_flight.remove(receipt.handle()).is_some() {
862                return Ok(());
863            }
864        }
865
866        Err(QueueError::InvalidReceipt {
867            receipt: receipt.handle().to_string(),
868        })
869    }
870
871    async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
872        // Verify we still hold the session lock
873        {
874            let storage = self.storage.read().unwrap();
875            if let Some(queue_state) = storage.queues.get(&self.queue_name) {
876                if let Some(session_state) = queue_state.sessions.get(&self.session_id) {
877                    if !session_state.is_locked()
878                        || session_state.locked_by.as_ref() != Some(&self.client_id)
879                    {
880                        return Err(QueueError::SessionLocked {
881                            session_id: self.session_id.as_str().to_string(),
882                            locked_until: session_state
883                                .lock_expires_at
884                                .unwrap_or_else(Timestamp::now),
885                        });
886                    }
887                }
888            }
889        }
890
891        // Return message to queue
892        let mut storage = self.storage.write().unwrap();
893        if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
894            if let Some(inflight) = queue_state.in_flight.remove(receipt.handle()) {
895                let mut message = inflight.message;
896
897                // Check if max delivery count reached
898                if message.delivery_count >= queue_state.config.max_delivery_count {
899                    // Move to DLQ if enabled
900                    if queue_state.config.enable_dead_letter_queue {
901                        queue_state.dead_letter.push_back(message);
902                        return Ok(());
903                    }
904                }
905
906                // Make immediately available and add back to front for session ordering
907                message.available_at = Timestamp::now();
908                queue_state.messages.push_front(message);
909                return Ok(());
910            }
911        }
912
913        Err(QueueError::InvalidReceipt {
914            receipt: receipt.handle().to_string(),
915        })
916    }
917
918    async fn dead_letter_message(
919        &self,
920        receipt: &ReceiptHandle,
921        _reason: &str,
922    ) -> Result<(), QueueError> {
923        // Verify we still hold the session lock
924        {
925            let storage = self.storage.read().unwrap();
926            if let Some(queue_state) = storage.queues.get(&self.queue_name) {
927                if let Some(session_state) = queue_state.sessions.get(&self.session_id) {
928                    if !session_state.is_locked()
929                        || session_state.locked_by.as_ref() != Some(&self.client_id)
930                    {
931                        return Err(QueueError::SessionLocked {
932                            session_id: self.session_id.as_str().to_string(),
933                            locked_until: session_state
934                                .lock_expires_at
935                                .unwrap_or_else(Timestamp::now),
936                        });
937                    }
938                }
939            }
940        }
941
942        // Move message to DLQ
943        let mut storage = self.storage.write().unwrap();
944        if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
945            if let Some(inflight) = queue_state.in_flight.remove(receipt.handle()) {
946                queue_state.dead_letter.push_back(inflight.message);
947                return Ok(());
948            }
949        }
950
951        Err(QueueError::InvalidReceipt {
952            receipt: receipt.handle().to_string(),
953        })
954    }
955
956    async fn renew_session_lock(&self) -> Result<(), QueueError> {
957        let mut storage = self.storage.write().unwrap();
958        if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
959            if let Some(session_state) = queue_state.sessions.get_mut(&self.session_id) {
960                // Verify we hold the lock
961                if session_state.locked_by.as_ref() != Some(&self.client_id) {
962                    return Err(QueueError::SessionLocked {
963                        session_id: self.session_id.as_str().to_string(),
964                        locked_until: session_state.lock_expires_at.unwrap_or_else(Timestamp::now),
965                    });
966                }
967
968                // Renew lock
969                let lock_duration = queue_state.config.session_lock_duration;
970                let new_expires_at =
971                    Timestamp::from_datetime(Timestamp::now().as_datetime() + lock_duration);
972                session_state.lock_expires_at = Some(new_expires_at);
973
974                return Ok(());
975            }
976        }
977
978        Err(QueueError::SessionNotFound {
979            session_id: self.session_id.as_str().to_string(),
980        })
981    }
982
983    async fn close_session(&self) -> Result<(), QueueError> {
984        let mut storage = self.storage.write().unwrap();
985        if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
986            if let Some(session_state) = queue_state.sessions.get_mut(&self.session_id) {
987                // Release lock
988                session_state.locked = false;
989                session_state.lock_expires_at = None;
990                session_state.locked_by = None;
991                return Ok(());
992            }
993        }
994
995        Ok(()) // Session already released or doesn't exist - that's fine
996    }
997
998    fn session_id(&self) -> &SessionId {
999        &self.session_id
1000    }
1001
1002    fn session_expires_at(&self) -> Timestamp {
1003        self.lock_expires_at
1004    }
1005}