amqpr_api/
subscribe_stream.rs1use futures::{Future, Stream, Sink, Poll, Async};
4
5use amqpr_codec::Frame;
6
7use basic::consume::{start_consume, ConsumeStarted};
8use basic::deliver::{get_delivered, Delivered, DeliveredItem};
9use errors::Error;
10
11pub use basic::consume::StartConsumeOption;
12
13
14
15pub fn subscribe_stream<S, E>(
31 ch_id: u16,
32 socket: S,
33 option: StartConsumeOption,
34) -> SubscribeStream<S, E>
35where
36 S: Stream<Item = Frame, Error = E> + Sink<SinkItem = Frame, SinkError = E>,
37 E: From<Error>,
38{
39 let consume_started = start_consume(ch_id, socket, option);
40 let sub_stream = SubscribeStream::SendingConsumeMethod(consume_started);
41
42 sub_stream
43}
44
45
46
47
48
49
50pub enum SubscribeStream<S, E>
56where
57 S: Stream<Item = Frame, Error = E> + Sink<SinkItem = Frame, SinkError = E>,
58{
59 SendingConsumeMethod(ConsumeStarted<S>),
60 ReceivingDeliverd(Delivered<S>),
61}
62
63
64impl<S, E> Stream for SubscribeStream<S, E>
65where
66 S: Stream<Item = Frame, Error = E>
67 + Sink<SinkItem = Frame, SinkError = E>,
68 E: From<Error>,
69{
70 type Item = DeliveredItem;
71 type Error = E;
72
73 fn poll(&mut self) -> Poll<Option<DeliveredItem>, Self::Error> {
74 use self::SubscribeStream::*;
75
76 let (item, socket) = match self {
77 &mut SendingConsumeMethod(ref mut fut) => {
78 let socket = try_ready!(fut.poll());
79 (None, socket)
80 }
81 &mut ReceivingDeliverd(ref mut del) => {
82 let (item, socket) = try_ready!(del.poll());
83 (Some(item), socket)
84 }
85 };
86
87 let delivered = get_delivered(socket);
89 *self = ReceivingDeliverd(delivered);
90
91 match item {
92 Some(bytes) => Ok(Async::Ready(Some(bytes))),
93 None => self.poll(),
94 }
95 }
96}