1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use futures::{Async,Poll,Stream,task};
use lapin_async::consumer::ConsumerSubscriber;
use tokio_io::{AsyncRead,AsyncWrite};
use std::collections::VecDeque;
use std::io;
use std::sync::{Arc,Mutex};
use message::Delivery;
use transport::*;
#[derive(Clone,Debug)]
pub struct ConsumerSub {
inner: Arc<Mutex<ConsumerInner>>,
}
impl ConsumerSubscriber for ConsumerSub {
fn new_delivery(&mut self, delivery: Delivery) {
trace!("new_delivery;");
if let Ok(mut inner) = self.inner.lock() {
inner.deliveries.push_back(delivery);
if let Some(task) = inner.task.as_ref() {
task.notify();
}
} else {
error!("new_delivery; mutex error");
}
}
}
#[derive(Clone)]
pub struct Consumer<T> {
transport: Arc<Mutex<AMQPTransport<T>>>,
inner: Arc<Mutex<ConsumerInner>>,
channel_id: u16,
queue: String,
consumer_tag: String,
}
#[derive(Debug)]
struct ConsumerInner {
deliveries: VecDeque<Delivery>,
task: Option<task::Task>,
}
impl Default for ConsumerInner {
fn default() -> Self {
Self {
deliveries: VecDeque::new(),
task: None,
}
}
}
impl<T: AsyncRead+AsyncWrite+Sync+Send+'static> Consumer<T> {
pub fn new(transport: Arc<Mutex<AMQPTransport<T>>>, channel_id: u16, queue: String, consumer_tag: String) -> Consumer<T> {
Consumer {
transport,
inner: Arc::new(Mutex::new(ConsumerInner::default())),
channel_id,
queue,
consumer_tag,
}
}
pub fn update_consumer_tag(&mut self, consumer_tag: String) {
self.consumer_tag = consumer_tag;
}
pub fn subscriber(&self) -> ConsumerSub {
ConsumerSub {
inner: self.inner.clone(),
}
}
}
impl<T: AsyncRead+AsyncWrite+Sync+Send+'static> Stream for Consumer<T> {
type Item = Delivery;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Delivery>, io::Error> {
trace!("consumer poll; consumer_tag={:?} polling transport", self.consumer_tag);
let mut transport = lock_transport!(self.transport);
transport.poll()?;
let mut inner = match self.inner.lock() {
Ok(inner) => inner,
Err(_) => if self.inner.is_poisoned() {
return Err(io::Error::new(io::ErrorKind::Other, "Consumer mutex is poisoned"))
} else {
task::current().notify();
return Ok(Async::NotReady)
},
};
trace!("consumer poll; consumer_tag={:?} acquired inner lock", self.consumer_tag);
if inner.task.is_none() {
let task = task::current();
task.notify();
inner.task = Some(task);
}
if let Some(delivery) = inner.deliveries.pop_front() {
trace!("delivery; consumer_tag={:?} delivery_tag={:?}", self.consumer_tag, delivery.delivery_tag);
Ok(Async::Ready(Some(delivery)))
} else {
trace!("delivery; consumer_tag={:?} status=NotReady", self.consumer_tag);
Ok(Async::NotReady)
}
}
}