futures_util/stream/
split.rs1use std::any::Any;
2use std::error::Error;
3use std::fmt;
4
5use futures_core::{Stream, Poll, Async};
6use futures_core::task;
7use futures_sink::{ Sink};
8
9use lock::BiLock;
10
11#[must_use = "streams do nothing unless polled"]
13#[derive(Debug)]
14pub struct SplitStream<S>(BiLock<S>);
15
16impl<S: Sink> SplitStream<S> {
17    pub fn reunite(self, other: SplitSink<S>) -> Result<S, ReuniteError<S>> {
21        other.reunite(self)
22    }
23}
24
25impl<S: Stream> Stream for SplitStream<S> {
26    type Item = S::Item;
27    type Error = S::Error;
28
29    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S::Item>, S::Error> {
30        match self.0.poll_lock(cx) {
31            Async::Ready(mut inner) => inner.poll_next(cx),
32            Async::Pending => Ok(Async::Pending),
33        }
34    }
35}
36
37#[allow(bad_style)]
38fn SplitSink<S: Sink>(lock: BiLock<S>) -> SplitSink<S> {
39    SplitSink {
40        lock,
41        slot: None,
42    }
43}
44
45#[derive(Debug)]
47pub struct SplitSink<S: Sink> {
48    lock: BiLock<S>,
49    slot: Option<S::SinkItem>,
50}
51
52impl<S: Sink> SplitSink<S> {
53    pub fn reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S>> {
57        self.lock.reunite(other.0).map_err(|err| {
58            ReuniteError(SplitSink(err.0), SplitStream(err.1))
59        })
60    }
61}
62
63impl<S: Sink> Sink for SplitSink<S> {
64    type SinkItem = S::SinkItem;
65    type SinkError = S::SinkError;
66
67    fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> {
68        loop {
69            if self.slot.is_none() {
70                return Ok(Async::Ready(()));
71            }
72            try_ready!(self.poll_flush(cx));
73        }
74    }
75
76    fn start_send(&mut self, item: S::SinkItem) -> Result<(), S::SinkError> {
77        self.slot = Some(item);
78        Ok(())
79    }
80
81    fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> {
82        match self.lock.poll_lock(cx) {
83            Async::Ready(mut inner) => {
84                if self.slot.is_some() {
85                    try_ready!(inner.poll_ready(cx));
86                    inner.start_send(self.slot.take().unwrap())?;
87                }
88                inner.poll_flush(cx)
89            }
90            Async::Pending => Ok(Async::Pending),
91        }
92    }
93
94    fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> {
95        match self.lock.poll_lock(cx) {
96            Async::Ready(mut inner) => {
97                if self.slot.is_some() {
98                    try_ready!(inner.poll_ready(cx));
99                    inner.start_send(self.slot.take().unwrap())?;
100                }
101                inner.poll_close(cx)
102            }
103            Async::Pending => Ok(Async::Pending),
104        }
105    }
106}
107
108pub fn split<S: Stream + Sink>(s: S) -> (SplitSink<S>, SplitStream<S>) {
109    let (a, b) = BiLock::new(s);
110    let read = SplitStream(a);
111    let write = SplitSink(b);
112    (write, read)
113}
114
115pub struct ReuniteError<T: Sink>(pub SplitSink<T>, pub SplitStream<T>);
118
119impl<T: Sink> fmt::Debug for ReuniteError<T> {
120    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
121        fmt.debug_tuple("ReuniteError")
122            .field(&"...")
123            .finish()
124    }
125}
126
127impl<T: Sink> fmt::Display for ReuniteError<T> {
128    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
129        write!(fmt, "tried to reunite a SplitStream and SplitSink that don't form a pair")
130    }
131}
132
133impl<T: Any + Sink> Error for ReuniteError<T> {
134    fn description(&self) -> &str {
135        "tried to reunite a SplitStream and SplitSink that don't form a pair"
136    }
137}