1use crate::client::{QueueProvider, SessionProvider};
15use crate::error::QueueError;
16use crate::message::{
17 Message, MessageId, QueueName, ReceiptHandle, ReceivedMessage, SessionId, Timestamp,
18};
19use crate::provider::{InMemoryConfig, ProviderType, SessionSupport};
20use async_trait::async_trait;
21use bytes::Bytes;
22use chrono::Duration;
23use std::collections::{HashMap, VecDeque};
24use std::sync::{Arc, RwLock};
25
26#[cfg(test)]
27#[path = "memory_tests.rs"]
28mod tests;
29
30struct QueueStorage {
36 queues: HashMap<QueueName, InMemoryQueue>,
37 config: InMemoryConfig,
38}
39
40impl QueueStorage {
41 fn new(config: InMemoryConfig) -> Self {
42 Self {
43 queues: HashMap::new(),
44 config,
45 }
46 }
47
48 fn get_or_create_queue(&mut self, queue_name: &QueueName) -> &mut InMemoryQueue {
50 self.queues
51 .entry(queue_name.clone())
52 .or_insert_with(|| InMemoryQueue::new(self.config.clone()))
53 }
54}
55
56struct InMemoryQueue {
58 messages: VecDeque<StoredMessage>,
60 dead_letter: VecDeque<StoredMessage>,
62 in_flight: HashMap<String, InFlightMessage>,
64 sessions: HashMap<SessionId, SessionState>,
66 config: InMemoryConfig,
68}
69
70impl InMemoryQueue {
71 fn new(config: InMemoryConfig) -> Self {
72 Self {
73 messages: VecDeque::new(),
74 dead_letter: VecDeque::new(),
75 in_flight: HashMap::new(),
76 sessions: HashMap::new(),
77 config,
78 }
79 }
80}
81
82#[derive(Clone)]
84struct StoredMessage {
85 message_id: MessageId,
86 body: Bytes,
87 attributes: HashMap<String, String>,
88 session_id: Option<SessionId>,
89 correlation_id: Option<String>,
90 enqueued_at: Timestamp,
91 delivery_count: u32,
92 available_at: Timestamp,
93 expires_at: Option<Timestamp>,
94}
95
96impl StoredMessage {
97 fn from_message(message: &Message, message_id: MessageId, config: &InMemoryConfig) -> Self {
98 let now = Timestamp::now();
99
100 let ttl = message.time_to_live.or(config.default_message_ttl);
102 let expires_at = ttl.map(|ttl| Timestamp::from_datetime(now.as_datetime() + ttl));
103
104 Self {
105 message_id,
106 body: message.body.clone(),
107 attributes: message.attributes.clone(),
108 session_id: message.session_id.clone(),
109 correlation_id: message.correlation_id.clone(),
110 enqueued_at: now,
111 delivery_count: 0,
112 available_at: now,
113 expires_at,
114 }
115 }
116
117 fn is_expired(&self) -> bool {
119 if let Some(ref expires_at) = self.expires_at {
120 Timestamp::now() >= *expires_at
121 } else {
122 false
123 }
124 }
125
126 fn is_available(&self) -> bool {
128 Timestamp::now() >= self.available_at
129 }
130}
131
132#[allow(dead_code)]
134struct InFlightMessage {
135 message: StoredMessage,
136 receipt_handle: String,
137 lock_expires_at: Timestamp,
138}
139
140#[allow(dead_code)]
141impl InFlightMessage {
142 fn is_expired(&self) -> bool {
143 Timestamp::now() >= self.lock_expires_at
144 }
145}
146
147struct SessionState {
149 locked: bool,
150 lock_expires_at: Option<Timestamp>,
151 locked_by: Option<String>, }
153
154impl SessionState {
155 fn new() -> Self {
156 Self {
157 locked: false,
158 lock_expires_at: None,
159 locked_by: None,
160 }
161 }
162
163 fn is_locked(&self) -> bool {
164 if !self.locked {
165 return false;
166 }
167
168 if let Some(ref expires_at) = self.lock_expires_at {
170 if Timestamp::now() >= *expires_at {
171 return false;
172 }
173 }
174
175 true
176 }
177}
178
179pub struct InMemoryProvider {
185 storage: Arc<RwLock<QueueStorage>>,
186}
187
188impl InMemoryProvider {
189 pub fn new(config: InMemoryConfig) -> Self {
191 Self {
192 storage: Arc::new(RwLock::new(QueueStorage::new(config))),
193 }
194 }
195
196 pub async fn accept_session(
201 &self,
202 queue: &QueueName,
203 session_id: Option<SessionId>,
204 ) -> Result<Box<dyn crate::client::SessionClient>, QueueError> {
205 use crate::client::SessionProvider;
206
207 let provider = self.create_session_client(queue, session_id).await?;
208
209 struct StandardSessionClient {
211 provider: Box<dyn SessionProvider>,
212 }
213
214 #[async_trait]
215 impl crate::client::SessionClient for StandardSessionClient {
216 async fn receive_message(
217 &self,
218 timeout: Duration,
219 ) -> Result<Option<ReceivedMessage>, QueueError> {
220 self.provider.receive_message(timeout).await
221 }
222
223 async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
224 self.provider.complete_message(&receipt).await
225 }
226
227 async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
228 self.provider.abandon_message(&receipt).await
229 }
230
231 async fn dead_letter_message(
232 &self,
233 receipt: ReceiptHandle,
234 reason: String,
235 ) -> Result<(), QueueError> {
236 self.provider.dead_letter_message(&receipt, &reason).await
237 }
238
239 async fn renew_session_lock(&self) -> Result<(), QueueError> {
240 self.provider.renew_session_lock().await
241 }
242
243 async fn close_session(&self) -> Result<(), QueueError> {
244 self.provider.close_session().await
245 }
246
247 fn session_id(&self) -> &SessionId {
248 self.provider.session_id()
249 }
250
251 fn session_expires_at(&self) -> Timestamp {
252 self.provider.session_expires_at()
253 }
254 }
255
256 Ok(Box::new(StandardSessionClient { provider }))
257 }
258
259 fn return_expired_messages(queue: &mut InMemoryQueue) {
261 let now = Timestamp::now();
262 let mut expired_handles = Vec::new();
263
264 for (handle, inflight) in &queue.in_flight {
266 if now >= inflight.lock_expires_at {
267 expired_handles.push(handle.clone());
268 }
269 }
270
271 for handle in expired_handles {
273 if let Some(inflight) = queue.in_flight.remove(&handle) {
274 let mut message = inflight.message;
275 message.available_at = now;
277 queue.messages.push_back(message);
278 }
279 }
280 }
281
282 fn clean_expired_messages(queue: &mut InMemoryQueue) {
284 let mut i = 0;
285 while i < queue.messages.len() {
286 if queue.messages[i].is_expired() {
287 if let Some(expired_msg) = queue.messages.remove(i) {
289 if queue.config.enable_dead_letter_queue {
291 queue.dead_letter.push_back(expired_msg);
292 }
293 }
294 } else {
296 i += 1;
297 }
298 }
299 }
300
301 fn is_session_locked(queue: &InMemoryQueue, session_id: &Option<SessionId>) -> bool {
303 if let Some(ref sid) = session_id {
304 if let Some(session_state) = queue.sessions.get(sid) {
305 return session_state.is_locked();
306 }
307 }
308 false
309 }
310}
311
312impl Default for InMemoryProvider {
313 fn default() -> Self {
314 Self::new(InMemoryConfig::default())
315 }
316}
317
318#[async_trait]
319impl QueueProvider for InMemoryProvider {
320 async fn send_message(
321 &self,
322 queue: &QueueName,
323 message: &Message,
324 ) -> Result<MessageId, QueueError> {
325 let message_size = message.body.len();
327 let max_size = self.provider_type().max_message_size();
328 if message_size > max_size {
329 return Err(QueueError::MessageTooLarge {
330 size: message_size,
331 max_size,
332 });
333 }
334
335 let message_id = MessageId::new();
337
338 let mut storage = self.storage.write().unwrap();
340 let queue_state = storage.get_or_create_queue(queue);
341 let stored_message =
342 StoredMessage::from_message(message, message_id.clone(), &queue_state.config);
343 queue_state.messages.push_back(stored_message);
344
345 Ok(message_id)
346 }
347
348 async fn send_messages(
349 &self,
350 queue: &QueueName,
351 messages: &[Message],
352 ) -> Result<Vec<MessageId>, QueueError> {
353 if messages.len() > self.max_batch_size() as usize {
355 return Err(QueueError::BatchTooLarge {
356 size: messages.len(),
357 max_size: self.max_batch_size() as usize,
358 });
359 }
360
361 let mut message_ids = Vec::with_capacity(messages.len());
363 for message in messages {
364 let message_id = self.send_message(queue, message).await?;
365 message_ids.push(message_id);
366 }
367
368 Ok(message_ids)
369 }
370
371 async fn receive_message(
372 &self,
373 queue: &QueueName,
374 timeout: Duration,
375 ) -> Result<Option<ReceivedMessage>, QueueError> {
376 let start_time = std::time::Instant::now();
377 let timeout_duration = timeout
378 .to_std()
379 .unwrap_or(std::time::Duration::from_secs(30));
380
381 loop {
382 let received_message = {
384 let mut storage = self.storage.write().unwrap();
385 let queue_state = storage.get_or_create_queue(queue);
386
387 Self::return_expired_messages(queue_state);
389
390 Self::clean_expired_messages(queue_state);
392
393 let now = Timestamp::now();
395 let message_index = queue_state.messages.iter().position(|msg| {
396 !msg.is_expired()
397 && msg.is_available()
398 && !Self::is_session_locked(queue_state, &msg.session_id)
399 });
400
401 if let Some(index) = message_index {
402 let mut stored_message = queue_state.messages.remove(index).unwrap();
404
405 if queue_state.config.enable_dead_letter_queue
407 && stored_message.delivery_count >= queue_state.config.max_delivery_count
408 {
409 queue_state.dead_letter.push_back(stored_message);
411 None
412 } else {
413 stored_message.delivery_count += 1;
415
416 let receipt_handle_str = uuid::Uuid::new_v4().to_string();
418 let lock_expires_at =
419 Timestamp::from_datetime(now.as_datetime() + Duration::seconds(30));
420 let receipt_handle = ReceiptHandle::new(
421 receipt_handle_str.clone(),
422 lock_expires_at,
423 ProviderType::InMemory,
424 );
425
426 let received_message = ReceivedMessage {
428 message_id: stored_message.message_id.clone(),
429 body: stored_message.body.clone(),
430 attributes: stored_message.attributes.clone(),
431 session_id: stored_message.session_id.clone(),
432 correlation_id: stored_message.correlation_id.clone(),
433 receipt_handle: receipt_handle.clone(),
434 delivery_count: stored_message.delivery_count,
435 first_delivered_at: stored_message.enqueued_at,
436 delivered_at: now,
437 };
438
439 let inflight = InFlightMessage {
441 message: stored_message,
442 receipt_handle: receipt_handle_str.clone(),
443 lock_expires_at,
444 };
445 queue_state.in_flight.insert(receipt_handle_str, inflight);
446
447 Some(received_message)
448 }
449 } else {
450 None
451 }
452 }; if let Some(msg) = received_message {
455 return Ok(Some(msg));
456 }
457
458 if start_time.elapsed() >= timeout_duration {
460 return Ok(None);
461 }
462
463 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
465 }
466 }
467
468 async fn receive_messages(
469 &self,
470 queue: &QueueName,
471 max_messages: u32,
472 timeout: Duration,
473 ) -> Result<Vec<ReceivedMessage>, QueueError> {
474 let mut messages = Vec::new();
475 let start_time = std::time::Instant::now();
476 let timeout_duration = timeout
477 .to_std()
478 .unwrap_or(std::time::Duration::from_secs(30));
479
480 while messages.len() < max_messages as usize {
481 let remaining_timeout = timeout_duration
482 .checked_sub(start_time.elapsed())
483 .unwrap_or(std::time::Duration::ZERO);
484
485 if remaining_timeout.is_zero() {
486 break;
487 }
488
489 let remaining_duration =
490 Duration::from_std(remaining_timeout).unwrap_or(Duration::zero());
491 let received = self.receive_message(queue, remaining_duration).await?;
492
493 match received {
494 Some(msg) => messages.push(msg),
495 None => break,
496 }
497 }
498
499 Ok(messages)
500 }
501
502 async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
503 let mut storage = self.storage.write().unwrap();
504 let now = Timestamp::now();
505
506 for queue in storage.queues.values_mut() {
508 if let Some(inflight) = queue.in_flight.get(receipt.handle()) {
509 if inflight.lock_expires_at <= now {
511 queue.in_flight.remove(receipt.handle());
512 return Err(QueueError::InvalidReceipt {
513 receipt: receipt.handle().to_string(),
514 });
515 }
516
517 queue.in_flight.remove(receipt.handle());
519 return Ok(());
520 }
521 }
522
523 Err(QueueError::InvalidReceipt {
525 receipt: receipt.handle().to_string(),
526 })
527 }
528
529 async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
530 let mut storage = self.storage.write().unwrap();
531 let now = Timestamp::now();
532
533 for queue in storage.queues.values_mut() {
535 if let Some(inflight) = queue.in_flight.remove(receipt.handle()) {
536 if inflight.lock_expires_at <= now {
538 return Err(QueueError::InvalidReceipt {
539 receipt: receipt.handle().to_string(),
540 });
541 }
542
543 let mut returned_message = inflight.message;
545 returned_message.available_at = now;
546
547 if returned_message.session_id.is_some() {
549 queue.messages.push_front(returned_message);
550 } else {
551 queue.messages.push_back(returned_message);
552 }
553
554 return Ok(());
555 }
556 }
557
558 Err(QueueError::InvalidReceipt {
560 receipt: receipt.handle().to_string(),
561 })
562 }
563
564 async fn dead_letter_message(
565 &self,
566 receipt: &ReceiptHandle,
567 _reason: &str,
568 ) -> Result<(), QueueError> {
569 let mut storage = self.storage.write().unwrap();
570 let now = Timestamp::now();
571
572 for queue in storage.queues.values_mut() {
574 if let Some(inflight) = queue.in_flight.remove(receipt.handle()) {
575 if inflight.lock_expires_at <= now {
577 return Err(QueueError::InvalidReceipt {
578 receipt: receipt.handle().to_string(),
579 });
580 }
581
582 queue.dead_letter.push_back(inflight.message);
583 return Ok(());
584 }
585 }
586
587 Err(QueueError::InvalidReceipt {
589 receipt: receipt.handle().to_string(),
590 })
591 }
592
593 async fn create_session_client(
594 &self,
595 queue: &QueueName,
596 session_id: Option<SessionId>,
597 ) -> Result<Box<dyn SessionProvider>, QueueError> {
598 let target_session_id = if let Some(sid) = session_id {
600 sid
601 } else {
602 let storage = self.storage.read().unwrap();
604 let queue_state =
605 storage
606 .queues
607 .get(queue)
608 .ok_or_else(|| QueueError::QueueNotFound {
609 queue_name: queue.as_str().to_string(),
610 })?;
611
612 let mut sessions_with_messages = std::collections::HashSet::new();
614 for msg in &queue_state.messages {
615 if let Some(ref sid) = msg.session_id {
616 sessions_with_messages.insert(sid.clone());
617 }
618 }
619
620 let mut found_session = None;
622 for sid in sessions_with_messages {
623 let session_state = queue_state.sessions.get(&sid);
624 if session_state.map(|s| !s.is_locked()).unwrap_or(true) {
625 found_session = Some(sid);
627 break;
628 }
629 }
630
631 found_session.ok_or_else(|| QueueError::SessionNotFound {
632 session_id: "<any>".to_string(),
633 })?
634 };
635
636 let mut storage = self.storage.write().unwrap();
638 let queue_state = storage.get_or_create_queue(queue);
639 let config = queue_state.config.clone();
640
641 let session_state = queue_state
643 .sessions
644 .entry(target_session_id.clone())
645 .or_insert_with(SessionState::new);
646
647 if session_state.is_locked() {
648 let locked_until = session_state.lock_expires_at.unwrap_or_else(Timestamp::now);
649 return Err(QueueError::SessionLocked {
650 session_id: target_session_id.as_str().to_string(),
651 locked_until,
652 });
653 }
654
655 let lock_duration = config.session_lock_duration;
657 let now = Timestamp::now();
658 let lock_expires_at = Timestamp::from_datetime(now.as_datetime() + lock_duration);
659 let client_id = uuid::Uuid::new_v4().to_string();
660
661 session_state.locked = true;
662 session_state.lock_expires_at = Some(lock_expires_at);
663 session_state.locked_by = Some(client_id.clone());
664
665 Ok(Box::new(InMemorySessionProvider::new(
667 self.storage.clone(),
668 queue.clone(),
669 target_session_id,
670 client_id,
671 lock_expires_at,
672 )))
673 }
674
675 fn provider_type(&self) -> ProviderType {
676 ProviderType::InMemory
677 }
678
679 fn supports_sessions(&self) -> SessionSupport {
680 SessionSupport::Native
681 }
682
683 fn supports_batching(&self) -> bool {
684 true
685 }
686
687 fn max_batch_size(&self) -> u32 {
688 100
689 }
690}
691
692pub struct InMemorySessionProvider {
698 storage: Arc<RwLock<QueueStorage>>,
699 queue_name: QueueName,
700 session_id: SessionId,
701 client_id: String,
702 lock_expires_at: Timestamp,
703}
704
705impl InMemorySessionProvider {
706 fn new(
707 storage: Arc<RwLock<QueueStorage>>,
708 queue_name: QueueName,
709 session_id: SessionId,
710 client_id: String,
711 lock_expires_at: Timestamp,
712 ) -> Self {
713 Self {
714 storage,
715 queue_name,
716 session_id,
717 client_id,
718 lock_expires_at,
719 }
720 }
721}
722
723#[async_trait]
724impl SessionProvider for InMemorySessionProvider {
725 async fn receive_message(
726 &self,
727 timeout: Duration,
728 ) -> Result<Option<ReceivedMessage>, QueueError> {
729 {
731 let storage = self.storage.read().unwrap();
732 if let Some(queue_state) = storage.queues.get(&self.queue_name) {
733 if let Some(session_state) = queue_state.sessions.get(&self.session_id) {
734 if !session_state.is_locked()
735 || session_state.locked_by.as_ref() != Some(&self.client_id)
736 {
737 return Err(QueueError::SessionLocked {
738 session_id: self.session_id.as_str().to_string(),
739 locked_until: session_state
740 .lock_expires_at
741 .unwrap_or_else(Timestamp::now),
742 });
743 }
744 }
745 }
746 }
747
748 let start_time = std::time::Instant::now();
750 let timeout_duration = timeout
751 .to_std()
752 .unwrap_or(std::time::Duration::from_secs(30));
753
754 loop {
755 let received_message = {
757 let mut storage = self.storage.write().unwrap();
758 if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
759 InMemoryProvider::clean_expired_messages(queue_state);
761
762 let now = Timestamp::now();
764 let message_index = queue_state.messages.iter().position(|msg| {
765 !msg.is_expired()
766 && msg.is_available()
767 && msg.session_id.as_ref() == Some(&self.session_id)
768 });
769
770 if let Some(index) = message_index {
771 let mut message = queue_state.messages.remove(index).unwrap();
773
774 let receipt = uuid::Uuid::new_v4().to_string();
776
777 let visibility_timeout = Duration::seconds(30);
779 let lock_expires_at =
780 Timestamp::from_datetime(now.as_datetime() + visibility_timeout);
781
782 message.delivery_count += 1;
784 let first_delivered_at = if message.delivery_count == 1 {
785 now
786 } else {
787 message.enqueued_at
788 };
789
790 queue_state.in_flight.insert(
792 receipt.clone(),
793 InFlightMessage {
794 message: message.clone(),
795 receipt_handle: receipt.clone(),
796 lock_expires_at,
797 },
798 );
799
800 Some(ReceivedMessage {
802 message_id: message.message_id.clone(),
803 body: message.body.clone(),
804 attributes: message.attributes.clone(),
805 receipt_handle: ReceiptHandle::new(
806 receipt,
807 lock_expires_at,
808 ProviderType::InMemory,
809 ),
810 session_id: message.session_id.clone(),
811 correlation_id: message.correlation_id.clone(),
812 delivery_count: message.delivery_count,
813 first_delivered_at,
814 delivered_at: now,
815 })
816 } else {
817 None
818 }
819 } else {
820 None
821 }
822 };
823
824 if let Some(msg) = received_message {
825 return Ok(Some(msg));
826 }
827
828 if start_time.elapsed() >= timeout_duration {
830 return Ok(None);
831 }
832
833 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
835 }
836 }
837
838 async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
839 {
841 let storage = self.storage.read().unwrap();
842 if let Some(queue_state) = storage.queues.get(&self.queue_name) {
843 if let Some(session_state) = queue_state.sessions.get(&self.session_id) {
844 if !session_state.is_locked()
845 || session_state.locked_by.as_ref() != Some(&self.client_id)
846 {
847 return Err(QueueError::SessionLocked {
848 session_id: self.session_id.as_str().to_string(),
849 locked_until: session_state
850 .lock_expires_at
851 .unwrap_or_else(Timestamp::now),
852 });
853 }
854 }
855 }
856 }
857
858 let mut storage = self.storage.write().unwrap();
860 if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
861 if queue_state.in_flight.remove(receipt.handle()).is_some() {
862 return Ok(());
863 }
864 }
865
866 Err(QueueError::InvalidReceipt {
867 receipt: receipt.handle().to_string(),
868 })
869 }
870
871 async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
872 {
874 let storage = self.storage.read().unwrap();
875 if let Some(queue_state) = storage.queues.get(&self.queue_name) {
876 if let Some(session_state) = queue_state.sessions.get(&self.session_id) {
877 if !session_state.is_locked()
878 || session_state.locked_by.as_ref() != Some(&self.client_id)
879 {
880 return Err(QueueError::SessionLocked {
881 session_id: self.session_id.as_str().to_string(),
882 locked_until: session_state
883 .lock_expires_at
884 .unwrap_or_else(Timestamp::now),
885 });
886 }
887 }
888 }
889 }
890
891 let mut storage = self.storage.write().unwrap();
893 if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
894 if let Some(inflight) = queue_state.in_flight.remove(receipt.handle()) {
895 let mut message = inflight.message;
896
897 if message.delivery_count >= queue_state.config.max_delivery_count {
899 if queue_state.config.enable_dead_letter_queue {
901 queue_state.dead_letter.push_back(message);
902 return Ok(());
903 }
904 }
905
906 message.available_at = Timestamp::now();
908 queue_state.messages.push_front(message);
909 return Ok(());
910 }
911 }
912
913 Err(QueueError::InvalidReceipt {
914 receipt: receipt.handle().to_string(),
915 })
916 }
917
918 async fn dead_letter_message(
919 &self,
920 receipt: &ReceiptHandle,
921 _reason: &str,
922 ) -> Result<(), QueueError> {
923 {
925 let storage = self.storage.read().unwrap();
926 if let Some(queue_state) = storage.queues.get(&self.queue_name) {
927 if let Some(session_state) = queue_state.sessions.get(&self.session_id) {
928 if !session_state.is_locked()
929 || session_state.locked_by.as_ref() != Some(&self.client_id)
930 {
931 return Err(QueueError::SessionLocked {
932 session_id: self.session_id.as_str().to_string(),
933 locked_until: session_state
934 .lock_expires_at
935 .unwrap_or_else(Timestamp::now),
936 });
937 }
938 }
939 }
940 }
941
942 let mut storage = self.storage.write().unwrap();
944 if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
945 if let Some(inflight) = queue_state.in_flight.remove(receipt.handle()) {
946 queue_state.dead_letter.push_back(inflight.message);
947 return Ok(());
948 }
949 }
950
951 Err(QueueError::InvalidReceipt {
952 receipt: receipt.handle().to_string(),
953 })
954 }
955
956 async fn renew_session_lock(&self) -> Result<(), QueueError> {
957 let mut storage = self.storage.write().unwrap();
958 if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
959 if let Some(session_state) = queue_state.sessions.get_mut(&self.session_id) {
960 if session_state.locked_by.as_ref() != Some(&self.client_id) {
962 return Err(QueueError::SessionLocked {
963 session_id: self.session_id.as_str().to_string(),
964 locked_until: session_state.lock_expires_at.unwrap_or_else(Timestamp::now),
965 });
966 }
967
968 let lock_duration = queue_state.config.session_lock_duration;
970 let new_expires_at =
971 Timestamp::from_datetime(Timestamp::now().as_datetime() + lock_duration);
972 session_state.lock_expires_at = Some(new_expires_at);
973
974 return Ok(());
975 }
976 }
977
978 Err(QueueError::SessionNotFound {
979 session_id: self.session_id.as_str().to_string(),
980 })
981 }
982
983 async fn close_session(&self) -> Result<(), QueueError> {
984 let mut storage = self.storage.write().unwrap();
985 if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
986 if let Some(session_state) = queue_state.sessions.get_mut(&self.session_id) {
987 session_state.locked = false;
989 session_state.lock_expires_at = None;
990 session_state.locked_by = None;
991 return Ok(());
992 }
993 }
994
995 Ok(()) }
997
998 fn session_id(&self) -> &SessionId {
999 &self.session_id
1000 }
1001
1002 fn session_expires_at(&self) -> Timestamp {
1003 self.lock_expires_at
1004 }
1005}