1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use futures::{Async,Poll,Stream,task};
use lapin_async::consumer::ConsumerSubscriber;
use tokio_io::{AsyncRead,AsyncWrite};
use std::collections::VecDeque;
use std::io;
use std::sync::{Arc,Mutex};

use message::Delivery;
use transport::*;

#[derive(Clone,Debug)]
pub struct ConsumerSub {
  inner: Arc<Mutex<ConsumerInner>>,
}

impl ConsumerSubscriber for ConsumerSub {
  fn new_delivery(&mut self, delivery: Delivery) {
    trace!("new_delivery;");
    if let Ok(mut inner) = self.inner.lock() {
      inner.deliveries.push_back(delivery);
      if let Some(task) = inner.task.as_ref() {
        task.notify();
      }
    } else {
      // FIXME: what do we do here?
      error!("new_delivery; mutex error");
    }
  }
}

#[derive(Clone)]
pub struct Consumer<T> {
  transport:    Arc<Mutex<AMQPTransport<T>>>,
  inner:        Arc<Mutex<ConsumerInner>>,
  channel_id:   u16,
  queue:        String,
  consumer_tag: String,
}

#[derive(Debug)]
struct ConsumerInner {
  deliveries: VecDeque<Delivery>,
  task:       Option<task::Task>,
}

impl Default for ConsumerInner {
  fn default() -> Self {
    Self {
      deliveries: VecDeque::new(),
      task:       None,
    }
  }
}

impl<T: AsyncRead+AsyncWrite+Sync+Send+'static> Consumer<T> {
  pub fn new(transport: Arc<Mutex<AMQPTransport<T>>>, channel_id: u16, queue: String, consumer_tag: String) -> Consumer<T> {
    Consumer {
      transport,
      inner: Arc::new(Mutex::new(ConsumerInner::default())),
      channel_id,
      queue,
      consumer_tag,
    }
  }

  pub fn update_consumer_tag(&mut self, consumer_tag: String) {
    self.consumer_tag = consumer_tag;
  }

  pub fn subscriber(&self) -> ConsumerSub {
    ConsumerSub {
      inner: self.inner.clone(),
    }
  }
}

impl<T: AsyncRead+AsyncWrite+Sync+Send+'static> Stream for Consumer<T> {
  type Item = Delivery;
  type Error = io::Error;

  fn poll(&mut self) -> Poll<Option<Delivery>, io::Error> {
    trace!("consumer poll; consumer_tag={:?} polling transport", self.consumer_tag);
    let mut transport = lock_transport!(self.transport);
    transport.poll()?;
    let mut inner = match self.inner.lock() {
      Ok(inner) => inner,
      Err(_)    => if self.inner.is_poisoned() {
        return Err(io::Error::new(io::ErrorKind::Other, "Consumer mutex is poisoned"))
      } else {
        task::current().notify();
        return Ok(Async::NotReady)
      },
    };
    trace!("consumer poll; consumer_tag={:?} acquired inner lock", self.consumer_tag);
    if inner.task.is_none() {
      let task = task::current();
      task.notify();
      inner.task = Some(task);
    }
    if let Some(delivery) = inner.deliveries.pop_front() {
      trace!("delivery; consumer_tag={:?} delivery_tag={:?}", self.consumer_tag, delivery.delivery_tag);
      Ok(Async::Ready(Some(delivery)))
    } else {
      trace!("delivery; consumer_tag={:?} status=NotReady", self.consumer_tag);
      Ok(Async::NotReady)
    }
  }
}