lapin_async/
queue.rs

1use std::{
2  borrow::Borrow,
3  collections::HashMap,
4  hash::Hash,
5};
6
7use crate::{
8  BasicProperties,
9  consumer::Consumer,
10  message::BasicGetMessage,
11  types::ShortString,
12  wait::WaitHandle,
13};
14
15#[derive(Clone, Debug)]
16#[deprecated(note = "use lapin instead")]
17pub struct Queue {
18  name:           ShortString,
19  message_count:  u32,
20  consumer_count: u32,
21}
22
23impl Queue {
24  #[deprecated(note = "use lapin instead")]
25  pub fn name(&self) -> &ShortString {
26    &self.name
27  }
28
29  #[deprecated(note = "use lapin instead")]
30  pub fn message_count(&self) -> u32 {
31    self.message_count
32  }
33
34  #[deprecated(note = "use lapin instead")]
35  pub fn consumer_count(&self) -> u32 {
36    self.consumer_count
37  }
38}
39
40#[derive(Debug)]
41pub(crate) struct QueueState {
42  name:                ShortString,
43  consumers:           HashMap<ShortString, Consumer>,
44  current_get_message: Option<(BasicGetMessage, WaitHandle<Option<BasicGetMessage>>)>,
45}
46
47impl Queue {
48  pub(crate) fn new(name: ShortString, message_count: u32, consumer_count: u32) -> Self {
49    Self { name, message_count, consumer_count }
50  }
51}
52
53impl Borrow<str> for Queue {
54  fn borrow(&self) -> &str {
55    self.name.as_str()
56  }
57}
58
59impl QueueState {
60  pub(crate) fn register_consumer(&mut self, consumer_tag: ShortString, consumer: Consumer) {
61    self.consumers.insert(consumer_tag, consumer);
62  }
63
64  pub(crate) fn deregister_consumer<S: Hash + Eq + ?Sized>(&mut self, consumer_tag: &S) -> Option<Consumer> where ShortString: Borrow<S> {
65    self.consumers.remove(consumer_tag)
66  }
67
68  pub(crate) fn get_consumer<S: Hash + Eq + ?Sized>(&mut self, consumer_tag: &S) -> Option<&mut Consumer> where ShortString: Borrow<S> {
69    self.consumers.get_mut(consumer_tag.borrow())
70  }
71
72  pub(crate) fn name(&self) -> ShortString {
73    self.name.clone()
74  }
75
76  pub(crate) fn drop_prefetched_messages(&mut self) {
77    for consumer in self.consumers.values() {
78      consumer.drop_prefetched_messages();
79    }
80  }
81
82  pub(crate) fn start_new_delivery(&mut self, delivery: BasicGetMessage, wait_handle: WaitHandle<Option<BasicGetMessage>>) {
83    self.current_get_message = Some((delivery, wait_handle));
84  }
85
86  pub(crate) fn set_delivery_properties(&mut self, properties: BasicProperties) {
87    if let Some(delivery) = self.current_get_message.as_mut() {
88      delivery.0.delivery.properties = properties;
89    }
90  }
91
92  pub(crate) fn receive_delivery_content(&mut self, payload: Vec<u8>) {
93    if let Some(delivery) = self.current_get_message.as_mut() {
94      delivery.0.delivery.receive_content(payload);
95    }
96  }
97
98  pub(crate) fn new_delivery_complete(&mut self) {
99    if let Some((message, wait_handle)) = self.current_get_message.take() {
100      wait_handle.finish(Some(message));
101    }
102  }
103}
104
105impl From<Queue> for QueueState {
106  fn from(queue: Queue) -> Self {
107    Self {
108      name:                queue.name,
109      consumers:           HashMap::new(),
110      current_get_message: None,
111    }
112  }
113}