1use crate::client::{QueueProvider, SessionProvider};
15use crate::error::QueueError;
16use crate::message::{
17 Message, MessageId, QueueName, ReceiptHandle, ReceivedMessage, SessionId, Timestamp,
18};
19use crate::provider::{InMemoryConfig, ProviderType, SessionSupport};
20use async_trait::async_trait;
21use bytes::Bytes;
22use chrono::Duration;
23use std::collections::{HashMap, VecDeque};
24use std::sync::{Arc, RwLock};
25
26#[cfg(test)]
27#[path = "memory_tests.rs"]
28mod tests;
29
30struct QueueStorage {
36 queues: HashMap<QueueName, InMemoryQueue>,
37 config: InMemoryConfig,
38}
39
40impl QueueStorage {
41 fn new(config: InMemoryConfig) -> Self {
42 Self {
43 queues: HashMap::new(),
44 config,
45 }
46 }
47
48 fn get_or_create_queue(&mut self, queue_name: &QueueName) -> &mut InMemoryQueue {
50 self.queues
51 .entry(queue_name.clone())
52 .or_insert_with(|| InMemoryQueue::new(self.config.clone()))
53 }
54}
55
56struct InMemoryQueue {
58 messages: VecDeque<StoredMessage>,
60 dead_letter: VecDeque<StoredMessage>,
62 in_flight: HashMap<String, InFlightMessage>,
64 sessions: HashMap<SessionId, SessionState>,
66 config: InMemoryConfig,
68}
69
70impl InMemoryQueue {
71 fn new(config: InMemoryConfig) -> Self {
72 Self {
73 messages: VecDeque::new(),
74 dead_letter: VecDeque::new(),
75 in_flight: HashMap::new(),
76 sessions: HashMap::new(),
77 config,
78 }
79 }
80}
81
82#[derive(Clone)]
84struct StoredMessage {
85 message_id: MessageId,
86 body: Bytes,
87 attributes: HashMap<String, String>,
88 session_id: Option<SessionId>,
89 correlation_id: Option<String>,
90 enqueued_at: Timestamp,
91 delivery_count: u32,
92 available_at: Timestamp,
93 expires_at: Option<Timestamp>,
94}
95
96impl StoredMessage {
97 fn from_message(message: &Message, message_id: MessageId, config: &InMemoryConfig) -> Self {
98 let now = Timestamp::now();
99
100 let ttl = message.time_to_live.or(config.default_message_ttl);
102 let expires_at = ttl.map(|ttl| Timestamp::from_datetime(now.as_datetime() + ttl));
103
104 Self {
105 message_id,
106 body: message.body.clone(),
107 attributes: message.attributes.clone(),
108 session_id: message.session_id.clone(),
109 correlation_id: message.correlation_id.clone(),
110 enqueued_at: now,
111 delivery_count: 0,
112 available_at: now,
113 expires_at,
114 }
115 }
116
117 fn is_expired(&self) -> bool {
119 if let Some(ref expires_at) = self.expires_at {
120 Timestamp::now() >= *expires_at
121 } else {
122 false
123 }
124 }
125
126 fn is_available(&self) -> bool {
128 Timestamp::now() >= self.available_at
129 }
130}
131
132#[allow(dead_code)]
134struct InFlightMessage {
135 message: StoredMessage,
136 receipt_handle: String,
137 lock_expires_at: Timestamp,
138}
139
140#[allow(dead_code)]
141impl InFlightMessage {
142 fn is_expired(&self) -> bool {
143 Timestamp::now() >= self.lock_expires_at
144 }
145}
146
147struct SessionState {
149 locked: bool,
150 lock_expires_at: Option<Timestamp>,
151 locked_by: Option<String>, }
153
154impl SessionState {
155 fn new() -> Self {
156 Self {
157 locked: false,
158 lock_expires_at: None,
159 locked_by: None,
160 }
161 }
162
163 fn is_locked(&self) -> bool {
164 if !self.locked {
165 return false;
166 }
167
168 if let Some(ref expires_at) = self.lock_expires_at {
170 if Timestamp::now() >= *expires_at {
171 return false;
172 }
173 }
174
175 true
176 }
177}
178
179pub struct InMemoryProvider {
185 storage: Arc<RwLock<QueueStorage>>,
186}
187
188impl InMemoryProvider {
189 pub fn new(config: InMemoryConfig) -> Self {
191 Self {
192 storage: Arc::new(RwLock::new(QueueStorage::new(config))),
193 }
194 }
195
196 pub async fn accept_session(
201 &self,
202 queue: &QueueName,
203 session_id: Option<SessionId>,
204 ) -> Result<Box<dyn crate::client::SessionClient>, QueueError> {
205 use crate::client::SessionProvider;
206
207 let provider = self.create_session_client(queue, session_id).await?;
208
209 struct StandardSessionClient {
211 provider: Box<dyn SessionProvider>,
212 }
213
214 #[async_trait]
215 impl crate::client::SessionClient for StandardSessionClient {
216 async fn receive_message(
217 &self,
218 timeout: Duration,
219 ) -> Result<Option<ReceivedMessage>, QueueError> {
220 self.provider.receive_message(timeout).await
221 }
222
223 async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
224 self.provider.complete_message(&receipt).await
225 }
226
227 async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
228 self.provider.abandon_message(&receipt).await
229 }
230
231 async fn dead_letter_message(
232 &self,
233 receipt: ReceiptHandle,
234 reason: String,
235 ) -> Result<(), QueueError> {
236 self.provider.dead_letter_message(&receipt, &reason).await
237 }
238
239 async fn renew_session_lock(&self) -> Result<(), QueueError> {
240 self.provider.renew_session_lock().await
241 }
242
243 async fn close_session(&self) -> Result<(), QueueError> {
244 self.provider.close_session().await
245 }
246
247 fn session_id(&self) -> &SessionId {
248 self.provider.session_id()
249 }
250
251 fn session_expires_at(&self) -> Timestamp {
252 self.provider.session_expires_at()
253 }
254 }
255
256 Ok(Box::new(StandardSessionClient { provider }))
257 }
258
259 fn return_expired_messages(queue: &mut InMemoryQueue) {
261 let now = Timestamp::now();
262 let mut expired_handles = Vec::new();
263
264 for (handle, inflight) in &queue.in_flight {
266 if now >= inflight.lock_expires_at {
267 expired_handles.push(handle.clone());
268 }
269 }
270
271 for handle in expired_handles {
273 if let Some(inflight) = queue.in_flight.remove(&handle) {
274 let mut message = inflight.message;
275 message.available_at = now;
277 queue.messages.push_back(message);
278 }
279 }
280 }
281
282 fn clean_expired_messages(queue: &mut InMemoryQueue) {
284 let mut i = 0;
285 while i < queue.messages.len() {
286 if queue.messages[i].is_expired() {
287 if let Some(expired_msg) = queue.messages.remove(i) {
289 if queue.config.enable_dead_letter_queue {
291 queue.dead_letter.push_back(expired_msg);
292 }
293 }
294 } else {
296 i += 1;
297 }
298 }
299 }
300
301 fn is_session_locked(queue: &InMemoryQueue, session_id: &Option<SessionId>) -> bool {
303 if let Some(ref sid) = session_id {
304 if let Some(session_state) = queue.sessions.get(sid) {
305 return session_state.is_locked();
306 }
307 }
308 false
309 }
310}
311
312impl Default for InMemoryProvider {
313 fn default() -> Self {
314 Self::new(InMemoryConfig::default())
315 }
316}
317
318#[async_trait]
319impl QueueProvider for InMemoryProvider {
320 async fn send_message(
321 &self,
322 queue: &QueueName,
323 message: &Message,
324 ) -> Result<MessageId, QueueError> {
325 let message_size = message.body.len();
327 let max_size = self.provider_type().max_message_size();
328 if message_size > max_size {
329 return Err(QueueError::MessageTooLarge {
330 size: message_size,
331 max_size,
332 });
333 }
334
335 let message_id = MessageId::new();
337
338 let mut storage = self.storage.write().unwrap();
340 let queue_state = storage.get_or_create_queue(queue);
341 let stored_message =
342 StoredMessage::from_message(message, message_id.clone(), &queue_state.config);
343 queue_state.messages.push_back(stored_message);
344
345 Ok(message_id)
346 }
347
348 async fn send_messages(
349 &self,
350 queue: &QueueName,
351 messages: &[Message],
352 ) -> Result<Vec<MessageId>, QueueError> {
353 if messages.len() > self.max_batch_size() as usize {
355 return Err(QueueError::BatchTooLarge {
356 size: messages.len(),
357 max_size: self.max_batch_size() as usize,
358 });
359 }
360
361 let mut message_ids = Vec::with_capacity(messages.len());
363 for message in messages {
364 let message_id = self.send_message(queue, message).await?;
365 message_ids.push(message_id);
366 }
367
368 Ok(message_ids)
369 }
370
371 async fn receive_message(
372 &self,
373 queue: &QueueName,
374 timeout: Duration,
375 ) -> Result<Option<ReceivedMessage>, QueueError> {
376 let start_time = std::time::Instant::now();
377 let timeout_duration = timeout
378 .to_std()
379 .unwrap_or(std::time::Duration::from_secs(30));
380
381 loop {
382 let received_message = {
384 let mut storage = self.storage.write().unwrap();
385 let queue_state = storage.get_or_create_queue(queue);
386
387 Self::return_expired_messages(queue_state);
389
390 Self::clean_expired_messages(queue_state);
392
393 let now = Timestamp::now();
395 let message_index = queue_state.messages.iter().position(|msg| {
396 !msg.is_expired()
397 && msg.is_available()
398 && !Self::is_session_locked(queue_state, &msg.session_id)
399 });
400
401 if let Some(index) = message_index {
402 let mut stored_message = queue_state.messages.remove(index).unwrap();
404
405 if queue_state.config.enable_dead_letter_queue
407 && stored_message.delivery_count >= queue_state.config.max_delivery_count
408 {
409 queue_state.dead_letter.push_back(stored_message);
411 None
412 } else {
413 stored_message.delivery_count += 1;
415
416 let receipt_handle_str = uuid::Uuid::new_v4().to_string();
418 let lock_expires_at =
419 Timestamp::from_datetime(now.as_datetime() + Duration::seconds(30));
420 let receipt_handle = ReceiptHandle::new(
421 receipt_handle_str.clone(),
422 lock_expires_at,
423 ProviderType::InMemory,
424 );
425
426 let received_message = ReceivedMessage {
428 message_id: stored_message.message_id.clone(),
429 body: stored_message.body.clone(),
430 attributes: stored_message.attributes.clone(),
431 session_id: stored_message.session_id.clone(),
432 correlation_id: stored_message.correlation_id.clone(),
433 receipt_handle: receipt_handle.clone(),
434 delivery_count: stored_message.delivery_count,
435 first_delivered_at: stored_message.enqueued_at,
436 delivered_at: now,
437 };
438
439 let inflight = InFlightMessage {
441 message: stored_message,
442 receipt_handle: receipt_handle_str.clone(),
443 lock_expires_at,
444 };
445 queue_state.in_flight.insert(receipt_handle_str, inflight);
446
447 Some(received_message)
448 }
449 } else {
450 None
451 }
452 }; if let Some(msg) = received_message {
455 return Ok(Some(msg));
456 }
457
458 if start_time.elapsed() >= timeout_duration {
460 return Ok(None);
461 }
462
463 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
465 }
466 }
467
468 async fn receive_messages(
469 &self,
470 queue: &QueueName,
471 max_messages: u32,
472 timeout: Duration,
473 ) -> Result<Vec<ReceivedMessage>, QueueError> {
474 let mut messages = Vec::new();
475 let start_time = std::time::Instant::now();
476 let timeout_duration = timeout
477 .to_std()
478 .unwrap_or(std::time::Duration::from_secs(30));
479
480 while messages.len() < max_messages as usize {
481 let remaining_timeout = timeout_duration
482 .checked_sub(start_time.elapsed())
483 .unwrap_or(std::time::Duration::ZERO);
484
485 if remaining_timeout.is_zero() {
486 break;
487 }
488
489 let remaining_duration =
490 Duration::from_std(remaining_timeout).unwrap_or(Duration::zero());
491 let received = self.receive_message(queue, remaining_duration).await?;
492
493 match received {
494 Some(msg) => messages.push(msg),
495 None => break,
496 }
497 }
498
499 Ok(messages)
500 }
501
502 async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
503 let mut storage = self.storage.write().unwrap();
504 let now = Timestamp::now();
505
506 for queue in storage.queues.values_mut() {
508 if let Some(inflight) = queue.in_flight.get(receipt.handle()) {
509 if inflight.lock_expires_at <= now {
511 queue.in_flight.remove(receipt.handle());
512 return Err(QueueError::MessageNotFound {
513 receipt: receipt.handle().to_string(),
514 });
515 }
516
517 queue.in_flight.remove(receipt.handle());
519 return Ok(());
520 }
521 }
522
523 Err(QueueError::MessageNotFound {
525 receipt: receipt.handle().to_string(),
526 })
527 }
528
529 async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
530 let mut storage = self.storage.write().unwrap();
531 let now = Timestamp::now();
532
533 for queue in storage.queues.values_mut() {
535 if let Some(inflight) = queue.in_flight.remove(receipt.handle()) {
536 if inflight.lock_expires_at <= now {
538 return Err(QueueError::MessageNotFound {
539 receipt: receipt.handle().to_string(),
540 });
541 }
542
543 let mut returned_message = inflight.message;
545 returned_message.available_at = now;
546
547 if returned_message.session_id.is_some() {
549 queue.messages.push_front(returned_message);
550 } else {
551 queue.messages.push_back(returned_message);
552 }
553
554 return Ok(());
555 }
556 }
557
558 Err(QueueError::MessageNotFound {
560 receipt: receipt.handle().to_string(),
561 })
562 }
563
564 async fn dead_letter_message(
565 &self,
566 _receipt: &ReceiptHandle,
567 _reason: &str,
568 ) -> Result<(), QueueError> {
569 unimplemented!("dead_letter_message will be implemented in subtask 10.4")
571 }
572
573 async fn create_session_client(
574 &self,
575 queue: &QueueName,
576 session_id: Option<SessionId>,
577 ) -> Result<Box<dyn SessionProvider>, QueueError> {
578 let target_session_id = if let Some(sid) = session_id {
580 sid
581 } else {
582 let storage = self.storage.read().unwrap();
584 let queue_state =
585 storage
586 .queues
587 .get(queue)
588 .ok_or_else(|| QueueError::QueueNotFound {
589 queue_name: queue.as_str().to_string(),
590 })?;
591
592 let mut sessions_with_messages = std::collections::HashSet::new();
594 for msg in &queue_state.messages {
595 if let Some(ref sid) = msg.session_id {
596 sessions_with_messages.insert(sid.clone());
597 }
598 }
599
600 let mut found_session = None;
602 for sid in sessions_with_messages {
603 let session_state = queue_state.sessions.get(&sid);
604 if session_state.map(|s| !s.is_locked()).unwrap_or(true) {
605 found_session = Some(sid);
607 break;
608 }
609 }
610
611 found_session.ok_or_else(|| QueueError::SessionNotFound {
612 session_id: "<any>".to_string(),
613 })?
614 };
615
616 let mut storage = self.storage.write().unwrap();
618 let queue_state = storage.get_or_create_queue(queue);
619 let config = queue_state.config.clone();
620
621 let session_state = queue_state
623 .sessions
624 .entry(target_session_id.clone())
625 .or_insert_with(SessionState::new);
626
627 if session_state.is_locked() {
628 let locked_until = session_state.lock_expires_at.unwrap_or_else(Timestamp::now);
629 return Err(QueueError::SessionLocked {
630 session_id: target_session_id.as_str().to_string(),
631 locked_until,
632 });
633 }
634
635 let lock_duration = config.session_lock_duration;
637 let now = Timestamp::now();
638 let lock_expires_at = Timestamp::from_datetime(now.as_datetime() + lock_duration);
639 let client_id = uuid::Uuid::new_v4().to_string();
640
641 session_state.locked = true;
642 session_state.lock_expires_at = Some(lock_expires_at);
643 session_state.locked_by = Some(client_id.clone());
644
645 Ok(Box::new(InMemorySessionProvider::new(
647 self.storage.clone(),
648 queue.clone(),
649 target_session_id,
650 client_id,
651 lock_expires_at,
652 )))
653 }
654
655 fn provider_type(&self) -> ProviderType {
656 ProviderType::InMemory
657 }
658
659 fn supports_sessions(&self) -> SessionSupport {
660 SessionSupport::Native
661 }
662
663 fn supports_batching(&self) -> bool {
664 true
665 }
666
667 fn max_batch_size(&self) -> u32 {
668 100
669 }
670}
671
672pub struct InMemorySessionProvider {
678 storage: Arc<RwLock<QueueStorage>>,
679 queue_name: QueueName,
680 session_id: SessionId,
681 client_id: String,
682 lock_expires_at: Timestamp,
683}
684
685impl InMemorySessionProvider {
686 fn new(
687 storage: Arc<RwLock<QueueStorage>>,
688 queue_name: QueueName,
689 session_id: SessionId,
690 client_id: String,
691 lock_expires_at: Timestamp,
692 ) -> Self {
693 Self {
694 storage,
695 queue_name,
696 session_id,
697 client_id,
698 lock_expires_at,
699 }
700 }
701}
702
703#[async_trait]
704impl SessionProvider for InMemorySessionProvider {
705 async fn receive_message(
706 &self,
707 timeout: Duration,
708 ) -> Result<Option<ReceivedMessage>, QueueError> {
709 {
711 let storage = self.storage.read().unwrap();
712 if let Some(queue_state) = storage.queues.get(&self.queue_name) {
713 if let Some(session_state) = queue_state.sessions.get(&self.session_id) {
714 if !session_state.is_locked()
715 || session_state.locked_by.as_ref() != Some(&self.client_id)
716 {
717 return Err(QueueError::SessionLocked {
718 session_id: self.session_id.as_str().to_string(),
719 locked_until: session_state
720 .lock_expires_at
721 .unwrap_or_else(Timestamp::now),
722 });
723 }
724 }
725 }
726 }
727
728 let start_time = std::time::Instant::now();
730 let timeout_duration = timeout
731 .to_std()
732 .unwrap_or(std::time::Duration::from_secs(30));
733
734 loop {
735 let received_message = {
737 let mut storage = self.storage.write().unwrap();
738 if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
739 InMemoryProvider::clean_expired_messages(queue_state);
741
742 let now = Timestamp::now();
744 let message_index = queue_state.messages.iter().position(|msg| {
745 !msg.is_expired()
746 && msg.is_available()
747 && msg.session_id.as_ref() == Some(&self.session_id)
748 });
749
750 if let Some(index) = message_index {
751 let mut message = queue_state.messages.remove(index).unwrap();
753
754 let receipt = uuid::Uuid::new_v4().to_string();
756
757 let visibility_timeout = Duration::seconds(30);
759 let lock_expires_at =
760 Timestamp::from_datetime(now.as_datetime() + visibility_timeout);
761
762 message.delivery_count += 1;
764 let first_delivered_at = if message.delivery_count == 1 {
765 now
766 } else {
767 message.enqueued_at
768 };
769
770 queue_state.in_flight.insert(
772 receipt.clone(),
773 InFlightMessage {
774 message: message.clone(),
775 receipt_handle: receipt.clone(),
776 lock_expires_at,
777 },
778 );
779
780 Some(ReceivedMessage {
782 message_id: message.message_id.clone(),
783 body: message.body.clone(),
784 attributes: message.attributes.clone(),
785 receipt_handle: ReceiptHandle::new(
786 receipt,
787 lock_expires_at,
788 ProviderType::InMemory,
789 ),
790 session_id: message.session_id.clone(),
791 correlation_id: message.correlation_id.clone(),
792 delivery_count: message.delivery_count,
793 first_delivered_at,
794 delivered_at: now,
795 })
796 } else {
797 None
798 }
799 } else {
800 None
801 }
802 };
803
804 if let Some(msg) = received_message {
805 return Ok(Some(msg));
806 }
807
808 if start_time.elapsed() >= timeout_duration {
810 return Ok(None);
811 }
812
813 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
815 }
816 }
817
818 async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
819 {
821 let storage = self.storage.read().unwrap();
822 if let Some(queue_state) = storage.queues.get(&self.queue_name) {
823 if let Some(session_state) = queue_state.sessions.get(&self.session_id) {
824 if !session_state.is_locked()
825 || session_state.locked_by.as_ref() != Some(&self.client_id)
826 {
827 return Err(QueueError::SessionLocked {
828 session_id: self.session_id.as_str().to_string(),
829 locked_until: session_state
830 .lock_expires_at
831 .unwrap_or_else(Timestamp::now),
832 });
833 }
834 }
835 }
836 }
837
838 let mut storage = self.storage.write().unwrap();
840 if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
841 if queue_state.in_flight.remove(receipt.handle()).is_some() {
842 return Ok(());
843 }
844 }
845
846 Err(QueueError::MessageNotFound {
847 receipt: receipt.handle().to_string(),
848 })
849 }
850
851 async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
852 {
854 let storage = self.storage.read().unwrap();
855 if let Some(queue_state) = storage.queues.get(&self.queue_name) {
856 if let Some(session_state) = queue_state.sessions.get(&self.session_id) {
857 if !session_state.is_locked()
858 || session_state.locked_by.as_ref() != Some(&self.client_id)
859 {
860 return Err(QueueError::SessionLocked {
861 session_id: self.session_id.as_str().to_string(),
862 locked_until: session_state
863 .lock_expires_at
864 .unwrap_or_else(Timestamp::now),
865 });
866 }
867 }
868 }
869 }
870
871 let mut storage = self.storage.write().unwrap();
873 if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
874 if let Some(inflight) = queue_state.in_flight.remove(receipt.handle()) {
875 let mut message = inflight.message;
876
877 if message.delivery_count >= queue_state.config.max_delivery_count {
879 if queue_state.config.enable_dead_letter_queue {
881 queue_state.dead_letter.push_back(message);
882 return Ok(());
883 }
884 }
885
886 message.available_at = Timestamp::now();
888 queue_state.messages.push_front(message);
889 return Ok(());
890 }
891 }
892
893 Err(QueueError::MessageNotFound {
894 receipt: receipt.handle().to_string(),
895 })
896 }
897
898 async fn dead_letter_message(
899 &self,
900 receipt: &ReceiptHandle,
901 _reason: &str,
902 ) -> Result<(), QueueError> {
903 {
905 let storage = self.storage.read().unwrap();
906 if let Some(queue_state) = storage.queues.get(&self.queue_name) {
907 if let Some(session_state) = queue_state.sessions.get(&self.session_id) {
908 if !session_state.is_locked()
909 || session_state.locked_by.as_ref() != Some(&self.client_id)
910 {
911 return Err(QueueError::SessionLocked {
912 session_id: self.session_id.as_str().to_string(),
913 locked_until: session_state
914 .lock_expires_at
915 .unwrap_or_else(Timestamp::now),
916 });
917 }
918 }
919 }
920 }
921
922 let mut storage = self.storage.write().unwrap();
924 if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
925 if let Some(inflight) = queue_state.in_flight.remove(receipt.handle()) {
926 queue_state.dead_letter.push_back(inflight.message);
927 return Ok(());
928 }
929 }
930
931 Err(QueueError::MessageNotFound {
932 receipt: receipt.handle().to_string(),
933 })
934 }
935
936 async fn renew_session_lock(&self) -> Result<(), QueueError> {
937 let mut storage = self.storage.write().unwrap();
938 if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
939 if let Some(session_state) = queue_state.sessions.get_mut(&self.session_id) {
940 if session_state.locked_by.as_ref() != Some(&self.client_id) {
942 return Err(QueueError::SessionLocked {
943 session_id: self.session_id.as_str().to_string(),
944 locked_until: session_state.lock_expires_at.unwrap_or_else(Timestamp::now),
945 });
946 }
947
948 let lock_duration = queue_state.config.session_lock_duration;
950 let new_expires_at =
951 Timestamp::from_datetime(Timestamp::now().as_datetime() + lock_duration);
952 session_state.lock_expires_at = Some(new_expires_at);
953
954 return Ok(());
955 }
956 }
957
958 Err(QueueError::SessionNotFound {
959 session_id: self.session_id.as_str().to_string(),
960 })
961 }
962
963 async fn close_session(&self) -> Result<(), QueueError> {
964 let mut storage = self.storage.write().unwrap();
965 if let Some(queue_state) = storage.queues.get_mut(&self.queue_name) {
966 if let Some(session_state) = queue_state.sessions.get_mut(&self.session_id) {
967 session_state.locked = false;
969 session_state.lock_expires_at = None;
970 session_state.locked_by = None;
971 return Ok(());
972 }
973 }
974
975 Ok(()) }
977
978 fn session_id(&self) -> &SessionId {
979 &self.session_id
980 }
981
982 fn session_expires_at(&self) -> Timestamp {
983 self.lock_expires_at
984 }
985}