futures_util/stream/
peek.rs1use futures_core::{Async, Poll, Stream};
2use futures_core::task;
3use futures_sink::{Sink};
4
5use stream::{StreamExt, Fuse};
6
7#[derive(Debug)]
13#[must_use = "streams do nothing unless polled"]
14pub struct Peekable<S: Stream> {
15 stream: Fuse<S>,
16 peeked: Option<S::Item>,
17}
18
19
20pub fn new<S: Stream>(stream: S) -> Peekable<S> {
21 Peekable {
22 stream: stream.fuse(),
23 peeked: None
24 }
25}
26
27impl<S> Sink for Peekable<S>
29 where S: Sink + Stream
30{
31 type SinkItem = S::SinkItem;
32 type SinkError = S::SinkError;
33
34 delegate_sink!(stream);
35}
36
37impl<S: Stream> Stream for Peekable<S> {
38 type Item = S::Item;
39 type Error = S::Error;
40
41 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
42 if let Some(item) = self.peeked.take() {
43 return Ok(Async::Ready(Some(item)))
44 }
45 self.stream.poll_next(cx)
46 }
47}
48
49
50impl<S: Stream> Peekable<S> {
51 pub fn peek(&mut self, cx: &mut task::Context) -> Poll<Option<&S::Item>, S::Error> {
56 if self.peeked.is_some() {
57 return Ok(Async::Ready(self.peeked.as_ref()))
58 }
59 match try_ready!(self.poll_next(cx)) {
60 None => Ok(Async::Ready(None)),
61 Some(item) => {
62 self.peeked = Some(item);
63 Ok(Async::Ready(self.peeked.as_ref()))
64 }
65 }
66 }
67}