1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
use futures::stream::Stream; use futures::Poll; use futures::Async; #[allow(dead_code)] pub enum StreamWithEofMessage<T> { Item(T), Eof, } #[allow(dead_code)] pub fn stream_with_eof<S>(s: S) -> StreamWithEof<S> { StreamWithEof { stream: s, seen_eof: false, } } #[allow(dead_code)] pub struct StreamWithEof<S> { stream: S, seen_eof: bool, } impl<T, E, S> Stream for StreamWithEof<S> where S : Stream<Item=StreamWithEofMessage<T>, Error=E> { type Item = T; type Error = E; fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { loop { if self.seen_eof { match try_ready!(self.stream.poll()) { None => return Ok(Async::Ready(None)), Some(_) => panic!("item after eof"), } } else { match try_ready!(self.stream.poll()) { None => panic!("expecting explicit eof"), Some(StreamWithEofMessage::Eof) => { self.seen_eof = true; continue; } Some(StreamWithEofMessage::Item(item)) => return Ok(Async::Ready(Some(item))), } } } } }