use futures::{Sink, Future, Poll, StartSend, Async, AsyncSink};
use amqpr_codec::Frame;
use basic::publish::{publish, Published, PublishItem};
use common::Should;
pub fn publish_sink<S>(channel: u16, socket: S) -> BroadcastSink<S>
where
S: Sink<SinkItem = Frame>,
{
BroadcastSink {
channel: channel,
state: PublishState::Waiting(Should::new(socket)),
}
}
pub struct BroadcastSink<S>
where
S: Sink<SinkItem = Frame>,
{
channel: u16,
state: PublishState<S>,
}
enum PublishState<S>
where
S: Sink<SinkItem = Frame>,
{
Processing(Published<S>),
Waiting(Should<S>),
}
impl<S> Sink for BroadcastSink<S>
where
S: Sink<SinkItem = Frame>,
{
type SinkItem = PublishItem;
type SinkError = S::SinkError;
fn start_send(&mut self, item: PublishItem) -> StartSend<PublishItem, Self::SinkError> {
if let Async::NotReady = self.poll_complete()? {
return Ok(AsyncSink::NotReady(item));
}
use self::PublishState::*;
self.state = match &mut self.state {
&mut Processing(ref mut _published) => unreachable!(),
&mut Waiting(ref mut sink) => {
let sink = sink.take();
let published = publish(self.channel, sink, item);
Processing(published)
}
};
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
use self::PublishState::*;
self.state = match &mut self.state {
&mut Processing(ref mut processing) => {
let sink = try_ready!(processing.poll());
Waiting(Should::new(sink))
}
&mut Waiting(_) => return Ok(Async::Ready(())),
};
self.poll_complete()
}
}