use core::fmt::{Debug, Formatter, Result as FmtResult};
use core::mem::replace;
use {Async, AsyncSink, Poll, Sink, StartSend};
/// Sink that clones incoming items and forwards them to two sinks at once.
///
/// A new item is only accepted when *both* downstream sinks are ready, so
/// this sink can never run ahead of the slower of the two.
pub struct Fanout<A, B>
    where A: Sink,
          B: Sink
{
    /// First downstream sink, together with its pending item (if any).
    left: Downstream<A>,
    /// Second downstream sink, together with its pending item (if any).
    right: Downstream<B>,
}
impl<A, B> Fanout<A, B>
    where A: Sink,
          B: Sink
{
    /// Consumes this combinator, returning the two underlying sinks as
    /// `(left, right)`.
    ///
    /// Only the sinks themselves are returned: any item that is still held
    /// in a pending slot of this combinator is dropped.
    pub fn into_inner(self) -> (A, B) {
        let left_sink = self.left.sink;
        let right_sink = self.right.sink;
        (left_sink, right_sink)
    }
}
/// Formats the combinator as a `Fanout` struct exposing both downstream
/// slots; requires the sinks and their item types to be `Debug`.
impl<A, B> Debug for Fanout<A, B>
    where A: Sink + Debug,
          B: Sink + Debug,
          A::SinkItem: Debug,
          B::SinkItem: Debug
{
    fn fmt(&self, f: &mut Formatter) -> FmtResult {
        let mut builder = f.debug_struct("Fanout");
        builder.field("left", &self.left);
        builder.field("right", &self.right);
        builder.finish()
    }
}
/// Builds a `Fanout` over the two given sinks.
///
/// Both sinks start out with an empty pending slot, i.e. ready to accept
/// an item.
pub fn new<A, B>(left: A, right: B) -> Fanout<A, B>
    where A: Sink,
          B: Sink
{
    let left = Downstream::new(left);
    let right = Downstream::new(right);
    Fanout { left: left, right: right }
}
/// `Fanout` is itself a sink: each accepted item is cloned once so that
/// both downstream sinks receive their own copy.
impl<A, B> Sink for Fanout<A, B>
    where A: Sink,
          A::SinkItem: Clone,
          B: Sink<SinkItem=A::SinkItem, SinkError=A::SinkError>
{
    type SinkItem = A::SinkItem;
    type SinkError = A::SinkError;

    /// Offers `item` to both downstream sinks.
    ///
    /// Pending items are retried first. If either side is still occupied
    /// afterwards, the item is handed back as `AsyncSink::NotReady(item)`;
    /// otherwise a clone goes to the left sink and the original to the
    /// right one, and whatever they hand back is parked in the matching
    /// pending slot.
    fn start_send(
        &mut self,
        item: Self::SinkItem
    ) -> StartSend<Self::SinkItem, Self::SinkError> {
        // Give previously rejected items another chance before taking more.
        self.left.keep_flushing()?;
        self.right.keep_flushing()?;

        let both_idle = self.left.is_ready() && self.right.is_ready();
        if !both_idle {
            return Ok(AsyncSink::NotReady(item));
        }

        let left_copy = item.clone();
        self.left.state = self.left.sink.start_send(left_copy)?;
        self.right.state = self.right.sink.start_send(item)?;
        Ok(AsyncSink::Ready)
    }

    /// Drives both downstream sinks and reports `Ready` only once both of
    /// them are done. The right side is polled even when the left side is
    /// not ready yet.
    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        let left_status = self.left.poll_complete()?;
        let right_status = self.right.poll_complete()?;
        match (left_status, right_status) {
            (Async::Ready(()), Async::Ready(())) => Ok(Async::Ready(())),
            _ => Ok(Async::NotReady),
        }
    }

    /// Closes both downstream sinks and reports `Ready` only once both of
    /// them have finished closing. The right side is driven even when the
    /// left side is not ready yet.
    fn close(&mut self) -> Poll<(), Self::SinkError> {
        let left_status = self.left.close()?;
        let right_status = self.right.close()?;
        match (left_status, right_status) {
            (Async::Ready(()), Async::Ready(())) => Ok(Async::Ready(())),
            _ => Ok(Async::NotReady),
        }
    }
}
/// One downstream sink paired with the outcome of the most recent
/// `start_send` issued to it.
#[derive(Debug)]
struct Downstream<S>
    where S: Sink
{
    /// The wrapped sink.
    sink: S,
    /// `AsyncSink::Ready` when nothing is pending, or
    /// `AsyncSink::NotReady(item)` holding the item the sink handed back.
    state: AsyncSink<S::SinkItem>,
}
impl<S: Sink> Downstream<S> {
    /// Wraps `sink` with an empty pending slot.
    fn new(sink: S) -> Self {
        Downstream { sink: sink, state: AsyncSink::Ready }
    }

    /// Returns `true` when no item is waiting in the pending slot.
    fn is_ready(&self) -> bool {
        self.state.is_ready()
    }

    /// Retries the pending item, if there is one.
    ///
    /// The slot is emptied first and then refilled with whatever the sink's
    /// `start_send` returns, so the item stays parked when it is handed back
    /// again. If `start_send` fails, the error is propagated and the slot is
    /// left empty.
    fn keep_flushing(&mut self) -> Result<(), S::SinkError> {
        if let AsyncSink::NotReady(item) = replace(&mut self.state, AsyncSink::Ready) {
            self.state = self.sink.start_send(item)?;
        }
        Ok(())
    }

    /// Retries the pending item and then polls the sink for completion.
    ///
    /// Reports `Ready` only when the pending slot is empty *and* the sink
    /// itself reports `Ready`. The sink is polled even if an item is still
    /// pending.
    fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
        self.keep_flushing()?;
        // Deliberately not named `async`: that identifier is a reserved
        // keyword starting with the 2018 edition.
        let flushed = self.sink.poll_complete()?;
        if self.state.is_ready() && flushed.is_ready() {
            Ok(Async::Ready(()))
        } else {
            Ok(Async::NotReady)
        }
    }

    /// Retries the pending item and, once the slot is empty, closes the sink.
    ///
    /// While an item is still pending the sink's `close` is not called and
    /// `NotReady` is returned instead.
    fn close(&mut self) -> Poll<(), S::SinkError> {
        self.keep_flushing()?;
        if self.state.is_ready() {
            self.sink.close()
        } else {
            Ok(Async::NotReady)
        }
    }
}