1use std::{
2 borrow::Borrow,
3 collections::HashMap,
4 hash::Hash,
5};
6
7use crate::{
8 BasicProperties,
9 consumer::Consumer,
10 message::BasicGetMessage,
11 types::ShortString,
12 wait::WaitHandle,
13};
14
15#[derive(Clone, Debug)]
16#[deprecated(note = "use lapin instead")]
17pub struct Queue {
18 name: ShortString,
19 message_count: u32,
20 consumer_count: u32,
21}
22
23impl Queue {
24 #[deprecated(note = "use lapin instead")]
25 pub fn name(&self) -> &ShortString {
26 &self.name
27 }
28
29 #[deprecated(note = "use lapin instead")]
30 pub fn message_count(&self) -> u32 {
31 self.message_count
32 }
33
34 #[deprecated(note = "use lapin instead")]
35 pub fn consumer_count(&self) -> u32 {
36 self.consumer_count
37 }
38}
39
40#[derive(Debug)]
41pub(crate) struct QueueState {
42 name: ShortString,
43 consumers: HashMap<ShortString, Consumer>,
44 current_get_message: Option<(BasicGetMessage, WaitHandle<Option<BasicGetMessage>>)>,
45}
46
47impl Queue {
48 pub(crate) fn new(name: ShortString, message_count: u32, consumer_count: u32) -> Self {
49 Self { name, message_count, consumer_count }
50 }
51}
52
53impl Borrow<str> for Queue {
54 fn borrow(&self) -> &str {
55 self.name.as_str()
56 }
57}
58
59impl QueueState {
60 pub(crate) fn register_consumer(&mut self, consumer_tag: ShortString, consumer: Consumer) {
61 self.consumers.insert(consumer_tag, consumer);
62 }
63
64 pub(crate) fn deregister_consumer<S: Hash + Eq + ?Sized>(&mut self, consumer_tag: &S) -> Option<Consumer> where ShortString: Borrow<S> {
65 self.consumers.remove(consumer_tag)
66 }
67
68 pub(crate) fn get_consumer<S: Hash + Eq + ?Sized>(&mut self, consumer_tag: &S) -> Option<&mut Consumer> where ShortString: Borrow<S> {
69 self.consumers.get_mut(consumer_tag.borrow())
70 }
71
72 pub(crate) fn name(&self) -> ShortString {
73 self.name.clone()
74 }
75
76 pub(crate) fn drop_prefetched_messages(&mut self) {
77 for consumer in self.consumers.values() {
78 consumer.drop_prefetched_messages();
79 }
80 }
81
82 pub(crate) fn start_new_delivery(&mut self, delivery: BasicGetMessage, wait_handle: WaitHandle<Option<BasicGetMessage>>) {
83 self.current_get_message = Some((delivery, wait_handle));
84 }
85
86 pub(crate) fn set_delivery_properties(&mut self, properties: BasicProperties) {
87 if let Some(delivery) = self.current_get_message.as_mut() {
88 delivery.0.delivery.properties = properties;
89 }
90 }
91
92 pub(crate) fn receive_delivery_content(&mut self, payload: Vec<u8>) {
93 if let Some(delivery) = self.current_get_message.as_mut() {
94 delivery.0.delivery.receive_content(payload);
95 }
96 }
97
98 pub(crate) fn new_delivery_complete(&mut self) {
99 if let Some((message, wait_handle)) = self.current_get_message.take() {
100 wait_handle.finish(Some(message));
101 }
102 }
103}
104
105impl From<Queue> for QueueState {
106 fn from(queue: Queue) -> Self {
107 Self {
108 name: queue.name,
109 consumers: HashMap::new(),
110 current_get_message: None,
111 }
112 }
113}