lapin 0.36.2

AMQP client library
Documentation
use crate::{
    executor::Executor,
    message::{Delivery, DeliveryResult},
    types::ShortString,
    BasicProperties, Error, Result,
};
use crossbeam_channel::{Receiver, Sender};
use futures_core::stream::Stream;
use log::trace;
use parking_lot::{Mutex, MutexGuard};
use std::{
    fmt,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll, Waker},
};

pub trait ConsumerDelegate: Send + Sync {
    fn on_new_delivery(&self, delivery: DeliveryResult);
    fn drop_prefetched_messages(&self) {}
}

impl<DeliveryHandler: Fn(DeliveryResult) + Send + Sync> ConsumerDelegate for DeliveryHandler {
    fn on_new_delivery(&self, delivery: DeliveryResult) {
        self(delivery);
    }
}

#[derive(Clone)]
pub struct Consumer {
    inner: Arc<Mutex<ConsumerInner>>,
}

impl Consumer {
    pub(crate) fn new(consumer_tag: ShortString, executor: Arc<dyn Executor>) -> Consumer {
        Consumer {
            inner: Arc::new(Mutex::new(ConsumerInner::new(consumer_tag, executor))),
        }
    }

    pub fn inner(&self) -> MutexGuard<'_, ConsumerInner> {
        self.inner.lock()
    }

    pub fn set_delegate(&self, delegate: Box<dyn ConsumerDelegate>) {
        let mut inner = self.inner();
        while let Some(delivery) = inner.next_delivery() {
            delegate.on_new_delivery(delivery);
        }
        inner.delegate = Some(Arc::new(delegate));
    }

    pub(crate) fn start_new_delivery(&mut self, delivery: Delivery) {
        self.inner().current_message = Some(delivery)
    }

    pub(crate) fn set_delivery_properties(&mut self, properties: BasicProperties) {
        if let Some(delivery) = self.inner().current_message.as_mut() {
            delivery.properties = properties;
        }
    }

    pub(crate) fn receive_delivery_content(&mut self, payload: Vec<u8>) {
        if let Some(delivery) = self.inner().current_message.as_mut() {
            delivery.receive_content(payload);
        }
    }

    pub(crate) fn new_delivery_complete(&mut self) -> Result<()> {
        let mut inner = self.inner();
        if let Some(delivery) = inner.current_message.take() {
            inner.new_delivery(delivery)?;
        }
        Ok(())
    }

    pub(crate) fn drop_prefetched_messages(&self) -> Result<()> {
        self.inner().drop_prefetched_messages()
    }

    pub(crate) fn cancel(&self) -> Result<()> {
        self.inner().cancel()
    }

    pub(crate) fn set_error(&self, error: Error) -> Result<()> {
        self.inner().set_error(error)
    }
}

pub struct ConsumerInner {
    current_message: Option<Delivery>,
    deliveries_in: Sender<DeliveryResult>,
    deliveries_out: Receiver<DeliveryResult>,
    task: Option<Waker>,
    tag: ShortString,
    delegate: Option<Arc<Box<dyn ConsumerDelegate>>>,
    executor: Arc<dyn Executor>,
}

pub struct ConsumerIterator {
    receiver: Receiver<DeliveryResult>,
}

impl Iterator for ConsumerIterator {
    type Item = Result<Delivery>;

    fn next(&mut self) -> Option<Self::Item> {
        self.receiver.recv().ok().and_then(Result::transpose)
    }
}

impl IntoIterator for Consumer {
    type Item = Result<Delivery>;
    type IntoIter = ConsumerIterator;

    fn into_iter(self) -> Self::IntoIter {
        ConsumerIterator {
            receiver: self.inner().deliveries_out.clone(),
        }
    }
}

impl fmt::Debug for ConsumerInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ConsumerInnder({})", self.tag)
    }
}

impl ConsumerInner {
    fn new(consumer_tag: ShortString, executor: Arc<dyn Executor>) -> Self {
        let (sender, receiver) = crossbeam_channel::unbounded();
        Self {
            current_message: None,
            deliveries_in: sender,
            deliveries_out: receiver,
            task: None,
            tag: consumer_tag,
            delegate: None,
            executor,
        }
    }

    pub fn next_delivery(&mut self) -> Option<DeliveryResult> {
        self.deliveries_out.try_recv().ok()
    }

    pub fn tag(&self) -> &ShortString {
        &self.tag
    }

    fn new_delivery(&mut self, delivery: Delivery) -> Result<()> {
        trace!("new_delivery; consumer_tag={}", self.tag);
        if let Some(delegate) = self.delegate.as_ref() {
            let delegate = delegate.clone();
            self.executor.execute(Box::new(move || {
                delegate.on_new_delivery(Ok(Some(delivery)))
            }))?;
        } else {
            self.deliveries_in
                .send(Ok(Some(delivery)))
                .expect("failed to send delivery to consumer");
        }
        if let Some(task) = self.task.as_ref() {
            task.wake_by_ref();
        }
        Ok(())
    }

    fn drop_prefetched_messages(&mut self) -> Result<()> {
        trace!("drop_prefetched_messages; consumer_tag={}", self.tag);
        if let Some(delegate) = self.delegate.as_ref() {
            let delegate = delegate.clone();
            self.executor
                .execute(Box::new(move || delegate.drop_prefetched_messages()))?;
        }
        while let Some(_) = self.next_delivery() {}
        Ok(())
    }

    fn cancel(&mut self) -> Result<()> {
        trace!("cancel; consumer_tag={}", self.tag);
        if let Some(delegate) = self.delegate.as_ref() {
            let delegate = delegate.clone();
            self.executor
                .execute(Box::new(move || delegate.on_new_delivery(Ok(None))))?;
        } else {
            self.deliveries_in
                .send(Ok(None))
                .expect("failed to send cancel to consumer");
        }
        if let Some(task) = self.task.take() {
            task.wake();
        }
        Ok(())
    }

    pub fn set_error(&mut self, error: Error) -> Result<()> {
        trace!("set_error; consumer_tag={}", self.tag);
        if let Some(delegate) = self.delegate.as_ref() {
            let delegate = delegate.clone();
            self.executor
                .execute(Box::new(move || delegate.on_new_delivery(Err(error))))?;
        } else {
            self.deliveries_in
                .send(Err(error))
                .expect("failed to send error to consumer");
        }
        self.cancel()
    }
}

impl fmt::Debug for Consumer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Consumer({})", self.inner().tag())
    }
}

impl Stream for Consumer {
    type Item = Result<Delivery>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        trace!("consumer poll; polling transport");
        let mut inner = self.inner();
        trace!(
            "consumer poll; acquired inner lock, consumer_tag={}",
            inner.tag()
        );
        inner.task = Some(cx.waker().clone());
        if let Some(delivery) = inner.next_delivery() {
            match delivery {
                Ok(Some(delivery)) => {
                    trace!(
                        "delivery; consumer_tag={}, delivery_tag={:?}",
                        inner.tag(),
                        delivery.delivery_tag
                    );
                    Poll::Ready(Some(Ok(delivery)))
                }
                Ok(None) => {
                    trace!("consumer canceled; consumer_tag={}", inner.tag());
                    Poll::Ready(None)
                }
                Err(error) => Poll::Ready(Some(Err(error))),
            }
        } else {
            trace!("delivery; status=NotReady, consumer_tag={}", inner.tag());
            Poll::Pending
        }
    }
}

#[cfg(test)]
mod futures_tests {
    use super::*;
    use crate::executor::DefaultExecutor;

    use std::task::{Context, Poll};

    use futures_test::task::new_count_waker;
    use futures_util::stream::StreamExt;

    #[test]
    fn stream_on_cancel() {
        let (waker, awoken_count) = new_count_waker();
        let mut cx = Context::from_waker(&waker);

        let mut consumer = Consumer::new(
            ShortString::from("test-consumer"),
            DefaultExecutor::default(),
        );

        assert_eq!(awoken_count.get(), 0);
        assert_eq!(consumer.poll_next_unpin(&mut cx), Poll::Pending);

        consumer.cancel().unwrap();

        assert_eq!(awoken_count.get(), 1);
        assert_eq!(consumer.poll_next_unpin(&mut cx), Poll::Ready(None));
    }

    #[test]
    fn stream_on_error() {
        let (waker, awoken_count) = new_count_waker();
        let mut cx = Context::from_waker(&waker);

        let mut consumer = Consumer::new(
            ShortString::from("test-consumer"),
            DefaultExecutor::default(),
        );

        assert_eq!(awoken_count.get(), 0);
        assert_eq!(consumer.poll_next_unpin(&mut cx), Poll::Pending);

        consumer.set_error(Error::InvalidFrameReceived).unwrap();

        assert_eq!(awoken_count.get(), 1);
        assert_eq!(
            consumer.poll_next_unpin(&mut cx),
            Poll::Ready(Some(Err(Error::InvalidFrameReceived)))
        );
    }
}