use futures::{Future, Stream, Sink, Poll, Async};
use amqpr_codec::Frame;
use basic::consume::{start_consume, ConsumeStarted};
use basic::deliver::{get_delivered, Delivered, DeliveredItem};
use errors::Error;
pub use basic::consume::StartConsumeOption;
pub fn subscribe_stream<S, E>(
ch_id: u16,
socket: S,
option: StartConsumeOption,
) -> SubscribeStream<S, E>
where
S: Stream<Item = Frame, Error = E> + Sink<SinkItem = Frame, SinkError = E>,
E: From<Error>,
{
let consume_started = start_consume(ch_id, socket, option);
let sub_stream = SubscribeStream::SendingConsumeMethod(consume_started);
sub_stream
}
pub enum SubscribeStream<S, E>
where
S: Stream<Item = Frame, Error = E> + Sink<SinkItem = Frame, SinkError = E>,
{
SendingConsumeMethod(ConsumeStarted<S>),
ReceivingDeliverd(Delivered<S>),
}
impl<S, E> Stream for SubscribeStream<S, E>
where
S: Stream<Item = Frame, Error = E>
+ Sink<SinkItem = Frame, SinkError = E>,
E: From<Error>,
{
type Item = DeliveredItem;
type Error = E;
fn poll(&mut self) -> Poll<Option<DeliveredItem>, Self::Error> {
use self::SubscribeStream::*;
let (item, socket) = match self {
&mut SendingConsumeMethod(ref mut fut) => {
let socket = try_ready!(fut.poll());
(None, socket)
}
&mut ReceivingDeliverd(ref mut del) => {
let (item, socket) = try_ready!(del.poll());
(Some(item), socket)
}
};
let delivered = get_delivered(socket);
*self = ReceivingDeliverd(delivered);
match item {
Some(bytes) => Ok(Async::Ready(Some(bytes))),
None => self.poll(),
}
}
}