1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use futures::{Future, Stream, Sink, Poll, Async};
use amqpr_codec::Frame;
use basic::consume::{start_consume, ConsumeStarted};
use basic::deliver::{get_delivered, Delivered, DeliveredItem};
use errors::Error;
pub use basic::consume::StartConsumeOption;
pub fn subscribe_stream<S, E>(
ch_id: u16,
socket: S,
option: StartConsumeOption,
) -> SubscribeStream<S, E>
where
S: Stream<Item = Frame, Error = E> + Sink<SinkItem = Frame, SinkError = E>,
E: From<Error>,
{
let consume_started = start_consume(ch_id, socket, option);
let sub_stream = SubscribeStream::SendingConsumeMethod(consume_started);
sub_stream
}
pub enum SubscribeStream<S, E>
where
S: Stream<Item = Frame, Error = E> + Sink<SinkItem = Frame, SinkError = E>,
{
SendingConsumeMethod(ConsumeStarted<S>),
ReceivingDeliverd(Delivered<S>),
}
impl<S, E> Stream for SubscribeStream<S, E>
where
S: Stream<Item = Frame, Error = E>
+ Sink<SinkItem = Frame, SinkError = E>,
E: From<Error>,
{
type Item = DeliveredItem;
type Error = E;
fn poll(&mut self) -> Poll<Option<DeliveredItem>, Self::Error> {
use self::SubscribeStream::*;
let (item, socket) = match self {
&mut SendingConsumeMethod(ref mut fut) => {
let socket = try_ready!(fut.poll());
(None, socket)
}
&mut ReceivingDeliverd(ref mut del) => {
let (item, socket) = try_ready!(del.poll());
(Some(item), socket)
}
};
let delivered = get_delivered(socket);
*self = ReceivingDeliverd(delivered);
match item {
Some(bytes) => Ok(Async::Ready(Some(bytes))),
None => self.poll(),
}
}
}