1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use futures::{Async, Poll, Stream, task};
use lapin_async::consumer::ConsumerSubscriber;
use log::trace;
use parking_lot::Mutex;
use tokio_io::{AsyncRead, AsyncWrite};

use std::collections::VecDeque;
use std::sync::Arc;

use crate::error::Error;
use crate::message::Delivery;
use crate::transport::*;

/// Subscriber-side handle onto a consumer's shared state.
///
/// It wraps the same `Arc<Mutex<ConsumerInner>>` that the originating
/// `Consumer` holds (see `Consumer::subscriber`), so every clone of this
/// handle operates on one delivery buffer.
#[derive(Clone, Debug)]
pub struct ConsumerSub {
  /// Buffer, task handle and cancellation flag shared with the `Consumer`.
  inner: Arc<Mutex<ConsumerInner>>,
}

impl ConsumerSubscriber for ConsumerSub {
  /// Appends `delivery` to the shared buffer and, if a task handle has been
  /// registered, notifies it so the consumer stream gets polled again.
  fn new_delivery(&mut self, delivery: Delivery) {
    trace!("new_delivery;");
    let mut inner = self.inner.lock();
    inner.deliveries.push_back(delivery);
    if let Some(task) = inner.task.as_ref() {
      task.notify();
    }
  }

  /// Discards every delivery that is buffered but has not been yielded yet.
  ///
  /// The canceled flag and the registered task are left untouched.
  fn drop_prefetched_messages(&mut self) {
    trace!("drop_prefetched_messages;");
    let mut inner = self.inner.lock();
    inner.deliveries.clear();
  }

  /// Marks the consumer as canceled and drops any buffered deliveries.
  ///
  /// The registered task (if any) is notified: a stream that last returned
  /// `NotReady` has to be polled again to see the `canceled` flag and end
  /// with `Ready(None)`. Without the wake-up it would depend on some
  /// unrelated event to be re-polled.
  fn cancel(&mut self) {
    trace!("cancel;");
    let mut inner = self.inner.lock();
    inner.deliveries.clear();
    inner.canceled = true;
    if let Some(task) = inner.task.as_ref() {
      task.notify();
    }
  }
}

/// Stream-side handle of a consumer bound to one queue on one channel.
///
/// `transport` and `inner` are behind `Arc`s, so a clone shares both the
/// transport and the delivery buffer with the value it was cloned from.
/// Because `Clone` is derived, it is only available when `T: Clone`.
#[derive(Clone)]
pub struct Consumer<T> {
  /// Shared transport; polled from the `Stream` implementation.
  transport: Arc<Mutex<AMQPTransport<T>>>,
  /// State shared with the `ConsumerSub` handles created by `subscriber()`.
  inner: Arc<Mutex<ConsumerInner>>,
  /// Channel id supplied at construction.
  channel_id: u16,
  /// Queue name supplied at construction.
  queue: String,
  /// Consumer tag; replaceable through `update_consumer_tag` and used in
  /// trace output.
  consumer_tag: String,
}

/// Mutable state shared between a `Consumer` and its `ConsumerSub` handles.
///
/// `Default` yields an empty buffer, no registered task and `canceled == false`;
/// it is derived because those are exactly the field-wise defaults.
#[derive(Debug, Default)]
struct ConsumerInner {
  /// Deliveries pushed by the subscriber and not yet yielded by the stream,
  /// oldest at the front.
  deliveries: VecDeque<Delivery>,
  /// Handle of the task to notify from the subscriber side; `None` until the
  /// stream has been polled.
  task:       Option<task::Task>,
  /// Set by the subscriber's `cancel`; makes the stream end once observed.
  canceled:   bool,
}

impl<T: AsyncRead+AsyncWrite+Sync+Send+'static> Consumer<T> {
  /// Builds a consumer over `transport` for `queue` on `channel_id`.
  ///
  /// The shared state starts out as `ConsumerInner::default()`.
  pub fn new(transport: Arc<Mutex<AMQPTransport<T>>>, channel_id: u16, queue: String, consumer_tag: String) -> Consumer<T> {
    let inner = Arc::new(Mutex::new(ConsumerInner::default()));
    Self { transport, inner, channel_id, queue, consumer_tag }
  }

  /// Replaces the stored consumer tag with `consumer_tag`.
  pub fn update_consumer_tag(&mut self, consumer_tag: String) {
    self.consumer_tag = consumer_tag;
  }

  /// Returns a `ConsumerSub` pointing at this consumer's shared state.
  ///
  /// Only the `Arc` is cloned, so the subscriber and this consumer see the
  /// same buffer.
  pub fn subscriber(&self) -> ConsumerSub {
    let inner = Arc::clone(&self.inner);
    ConsumerSub { inner }
  }
}

impl<T: AsyncRead+AsyncWrite+Sync+Send+'static> Stream for Consumer<T> {
  type Item = Delivery;
  type Error = Error;

  /// Polls the transport, then yields the next buffered delivery.
  ///
  /// Returns:
  /// * `Ready(Some(delivery))` when a delivery is buffered,
  /// * `Ready(None)` when the buffer is empty and the consumer was canceled,
  /// * `NotReady` otherwise.
  ///
  /// An error from polling the transport is propagated unchanged.
  fn poll(&mut self) -> Poll<Option<Delivery>, Error> {
    trace!("consumer poll; consumer_tag={:?} polling transport", self.consumer_tag);
    // The transport lock is taken before the inner lock and held until return.
    let mut transport = self.transport.lock();
    transport.poll()?;
    let mut inner = self.inner.lock();
    trace!("consumer poll; consumer_tag={:?} acquired inner lock", self.consumer_tag);
    // Register the task that is polling right now. The handle is refreshed on
    // every poll so that a stream polled from a different task than the one
    // that polled it first is still woken by the subscriber; keeping only the
    // first handle would send notifications to a stale task.
    let current = task::current();
    if inner.task.is_none() {
      // First registration only: schedule one extra poll of this task.
      current.notify();
    }
    inner.task = Some(current);
    if let Some(delivery) = inner.deliveries.pop_front() {
      trace!("delivery; consumer_tag={:?} delivery_tag={:?}", self.consumer_tag, delivery.delivery_tag);
      Ok(Async::Ready(Some(delivery)))
    } else if inner.canceled {
      trace!("consumer canceled; consumer_tag={:?}", self.consumer_tag);
      Ok(Async::Ready(None))
    } else {
      trace!("delivery; consumer_tag={:?} status=NotReady", self.consumer_tag);
      Ok(Async::NotReady)
    }
  }
}