1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
use futures::{Async, Poll, Stream, task}; use lapin_async::consumer::ConsumerSubscriber; use log::trace; use parking_lot::Mutex; use tokio_io::{AsyncRead, AsyncWrite}; use std::collections::VecDeque; use std::sync::Arc; use crate::error::Error; use crate::message::Delivery; use crate::transport::*; #[derive(Clone,Debug)] pub struct ConsumerSub { inner: Arc<Mutex<ConsumerInner>>, } impl ConsumerSubscriber for ConsumerSub { fn new_delivery(&mut self, delivery: Delivery) { trace!("new_delivery;"); let mut inner = self.inner.lock(); inner.deliveries.push_back(delivery); if let Some(task) = inner.task.as_ref() { task.notify(); } } fn drop_prefetched_messages(&mut self) { trace!("drop_prefetched_messages;"); let mut inner = self.inner.lock(); inner.deliveries.clear(); } fn cancel(&mut self) { trace!("cancel;"); let mut inner = self.inner.lock(); inner.deliveries.clear(); inner.canceled = true; } } #[derive(Clone)] pub struct Consumer<T> { transport: Arc<Mutex<AMQPTransport<T>>>, inner: Arc<Mutex<ConsumerInner>>, channel_id: u16, queue: String, consumer_tag: String, } #[derive(Debug)] struct ConsumerInner { deliveries: VecDeque<Delivery>, task: Option<task::Task>, canceled: bool, } impl Default for ConsumerInner { fn default() -> Self { Self { deliveries: VecDeque::new(), task: None, canceled: false, } } } impl<T: AsyncRead+AsyncWrite+Sync+Send+'static> Consumer<T> { pub fn new(transport: Arc<Mutex<AMQPTransport<T>>>, channel_id: u16, queue: String, consumer_tag: String) -> Consumer<T> { Consumer { transport, inner: Arc::new(Mutex::new(ConsumerInner::default())), channel_id, queue, consumer_tag, } } pub fn update_consumer_tag(&mut self, consumer_tag: String) { self.consumer_tag = consumer_tag; } pub fn subscriber(&self) -> ConsumerSub { ConsumerSub { inner: self.inner.clone(), } } } impl<T: AsyncRead+AsyncWrite+Sync+Send+'static> Stream for Consumer<T> { type Item = Delivery; type Error = Error; fn poll(&mut self) -> Poll<Option<Delivery>, Error> { trace!("consumer poll; consumer_tag={:?} polling transport", self.consumer_tag); let mut transport = self.transport.lock(); transport.poll()?; let mut inner = self.inner.lock(); trace!("consumer poll; consumer_tag={:?} acquired inner lock", self.consumer_tag); if inner.task.is_none() { let task = task::current(); task.notify(); inner.task = Some(task); } if let Some(delivery) = inner.deliveries.pop_front() { trace!("delivery; consumer_tag={:?} delivery_tag={:?}", self.consumer_tag, delivery.delivery_tag); Ok(Async::Ready(Some(delivery))) } else if inner.canceled { trace!("consumer canceled; consumer_tag={:?}", self.consumer_tag); Ok(Async::Ready(None)) } else { trace!("delivery; consumer_tag={:?} status=NotReady", self.consumer_tag); Ok(Async::NotReady) } } }