lapin 0.36.2

AMQP client library
Documentation
use crate::{
    consumer::Consumer,
    message::{BasicGetMessage, Delivery},
    pinky_swear::Pinky,
    queue::{Queue, QueueState},
    types::ShortString,
    BasicProperties, Error, Result,
};
use parking_lot::Mutex;
use std::{collections::HashMap, sync::Arc};

#[derive(Clone, Debug, Default)]
pub(crate) struct Queues {
    queues: Arc<Mutex<HashMap<ShortString, QueueState>>>,
}

impl Queues {
    pub(crate) fn register(&self, queue: QueueState) {
        self.queues.lock().insert(queue.name(), queue);
    }

    pub(crate) fn deregister(&self, queue: &str) {
        self.queues.lock().remove(queue);
    }

    fn with_queue<F: FnOnce(&mut QueueState) -> Result<()>>(
        &self,
        queue: &str,
        f: F,
    ) -> Result<()> {
        f(self
            .queues
            .lock()
            .entry(queue.into())
            .or_insert_with(|| Queue::new(queue.into(), 0, 0).into()))
    }

    pub(crate) fn register_consumer(
        &self,
        queue: &str,
        consumer_tag: ShortString,
        consumer: Consumer,
    ) {
        self.with_queue(queue, |queue| {
            queue.register_consumer(consumer_tag, consumer);
            Ok(())
        })
        .expect("register_consumer cannot fail");
    }

    pub(crate) fn deregister_consumer(&self, consumer_tag: &str) -> Result<()> {
        self.queues
            .lock()
            .values_mut()
            .map(|queue| queue.deregister_consumer(consumer_tag))
            .fold(Ok(()), Result::and)
    }

    pub(crate) fn drop_prefetched_messages(&self) -> Result<()> {
        self.queues
            .lock()
            .values_mut()
            .map(QueueState::drop_prefetched_messages)
            .fold(Ok(()), Result::and)
    }

    pub(crate) fn cancel_consumers(&self) -> Result<()> {
        self.queues
            .lock()
            .values_mut()
            .map(QueueState::cancel_consumers)
            .fold(Ok(()), Result::and)
    }

    pub(crate) fn error_consumers(&self, error: Error) -> Result<()> {
        self.queues
            .lock()
            .values_mut()
            .map(|queue| queue.error_consumers(error.clone()))
            .fold(Ok(()), Result::and)
    }

    pub(crate) fn start_consumer_delivery(
        &self,
        consumer_tag: &str,
        message: Delivery,
    ) -> Option<ShortString> {
        for queue in self.queues.lock().values_mut() {
            if let Some(consumer) = queue.get_consumer(consumer_tag) {
                consumer.start_new_delivery(message);
                return Some(queue.name());
            }
        }
        None
    }

    pub(crate) fn start_basic_get_delivery(
        &self,
        queue: &str,
        message: BasicGetMessage,
        pinky: Pinky<Result<Option<BasicGetMessage>>>,
    ) {
        self.with_queue(queue, |queue| {
            queue.start_new_delivery(message, pinky);
            Ok(())
        })
        .expect("start_basic_get_delivery cannot fail");
    }

    pub(crate) fn handle_content_header_frame(
        &self,
        queue: &str,
        consumer_tag: Option<ShortString>,
        size: u64,
        properties: BasicProperties,
    ) -> Result<()> {
        self.with_queue(queue, |queue| {
            match consumer_tag {
                Some(consumer_tag) => {
                    if let Some(consumer) = queue.get_consumer(&consumer_tag) {
                        consumer.set_delivery_properties(properties);
                        if size == 0 {
                            consumer.new_delivery_complete()?;
                        }
                    }
                }
                None => {
                    queue.set_delivery_properties(properties);
                    if size == 0 {
                        queue.new_delivery_complete();
                    }
                }
            }
            Ok(())
        })
    }

    pub(crate) fn handle_body_frame(
        &self,
        queue: &str,
        consumer_tag: Option<ShortString>,
        remaining_size: usize,
        payload_size: usize,
        payload: Vec<u8>,
    ) -> Result<()> {
        self.with_queue(queue, |queue| {
            match consumer_tag {
                Some(consumer_tag) => {
                    if let Some(consumer) = queue.get_consumer(&consumer_tag) {
                        consumer.receive_delivery_content(payload);
                        if remaining_size == payload_size {
                            consumer.new_delivery_complete()?;
                        }
                    }
                }
                None => {
                    queue.receive_delivery_content(payload);
                    if remaining_size == payload_size {
                        queue.new_delivery_complete();
                    }
                }
            }
            Ok(())
        })
    }
}