use std::{
borrow::Borrow,
collections::HashMap,
hash::Hash,
};
use crate::{
BasicProperties,
consumer::Consumer,
message::BasicGetMessage,
types::ShortString,
wait::WaitHandle,
};
#[derive(Clone, Debug)]
#[deprecated(note = "use lapin instead")]
pub struct Queue {
name: ShortString,
message_count: u32,
consumer_count: u32,
}
impl Queue {
#[deprecated(note = "use lapin instead")]
pub fn name(&self) -> &ShortString {
&self.name
}
#[deprecated(note = "use lapin instead")]
pub fn message_count(&self) -> u32 {
self.message_count
}
#[deprecated(note = "use lapin instead")]
pub fn consumer_count(&self) -> u32 {
self.consumer_count
}
}
#[derive(Debug)]
pub(crate) struct QueueState {
name: ShortString,
consumers: HashMap<ShortString, Consumer>,
current_get_message: Option<(BasicGetMessage, WaitHandle<Option<BasicGetMessage>>)>,
}
impl Queue {
pub(crate) fn new(name: ShortString, message_count: u32, consumer_count: u32) -> Self {
Self { name, message_count, consumer_count }
}
}
impl Borrow<str> for Queue {
fn borrow(&self) -> &str {
self.name.as_str()
}
}
impl QueueState {
pub(crate) fn register_consumer(&mut self, consumer_tag: ShortString, consumer: Consumer) {
self.consumers.insert(consumer_tag, consumer);
}
pub(crate) fn deregister_consumer<S: Hash + Eq + ?Sized>(&mut self, consumer_tag: &S) -> Option<Consumer> where ShortString: Borrow<S> {
self.consumers.remove(consumer_tag)
}
pub(crate) fn get_consumer<S: Hash + Eq + ?Sized>(&mut self, consumer_tag: &S) -> Option<&mut Consumer> where ShortString: Borrow<S> {
self.consumers.get_mut(consumer_tag.borrow())
}
pub(crate) fn name(&self) -> ShortString {
self.name.clone()
}
pub(crate) fn drop_prefetched_messages(&mut self) {
for consumer in self.consumers.values() {
consumer.drop_prefetched_messages();
}
}
pub(crate) fn start_new_delivery(&mut self, delivery: BasicGetMessage, wait_handle: WaitHandle<Option<BasicGetMessage>>) {
self.current_get_message = Some((delivery, wait_handle));
}
pub(crate) fn set_delivery_properties(&mut self, properties: BasicProperties) {
if let Some(delivery) = self.current_get_message.as_mut() {
delivery.0.delivery.properties = properties;
}
}
pub(crate) fn receive_delivery_content(&mut self, payload: Vec<u8>) {
if let Some(delivery) = self.current_get_message.as_mut() {
delivery.0.delivery.receive_content(payload);
}
}
pub(crate) fn new_delivery_complete(&mut self) {
if let Some((message, wait_handle)) = self.current_get_message.take() {
wait_handle.finish(Some(message));
}
}
}
impl From<Queue> for QueueState {
fn from(queue: Queue) -> Self {
Self {
name: queue.name,
consumers: HashMap::new(),
current_get_message: None,
}
}
}