use futures_util::{stream::FusedStream, Stream};
use pin_project_lite::pin_project;
use std::{
fmt,
pin::Pin,
task::{Context, Poll},
};
pin_project! {
pub(crate) struct PeekableFused<St: FusedStream> {
#[pin]
stream: St,
peeked: Option<St::Item>,
}
}
impl<St> fmt::Debug for PeekableFused<St>
where
St: FusedStream + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PeekableFused")
.field("stream", &self.stream)
.field("peeked", &self.peeked.as_ref().map(|_| "..."))
.finish()
}
}
#[allow(dead_code)]
impl<St: FusedStream> PeekableFused<St> {
pub(super) fn new(stream: St) -> Self {
Self {
stream,
peeked: None,
}
}
pub fn get_ref(&self) -> &St {
&self.stream
}
pub fn get_mut(&mut self) -> &mut St {
&mut self.stream
}
pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
self.project().stream
}
pub fn into_inner(self) -> St {
self.stream
}
pub fn is_done(&self) -> bool {
self.stream.is_terminated()
}
pub fn poll_peek(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<&St::Item>> {
let mut this = self.project();
Poll::Ready(loop {
if this.peeked.is_some() {
break this.peeked.as_ref();
} else if let Some(item) = futures_util::ready!(this.stream.as_mut().poll_next(cx)) {
*this.peeked = Some(item);
} else {
break None;
}
})
}
}
impl<St: FusedStream> Stream for PeekableFused<St> {
type Item = St::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
if let Some(item) = this.peeked.take() {
return Poll::Ready(Some(item));
}
this.stream.poll_next(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
let peek_len = usize::from(self.peeked.is_some());
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(peek_len);
let upper = match upper {
Some(x) => x.checked_add(peek_len),
None => None,
};
(lower, upper)
}
}
impl<St: FusedStream> FusedStream for PeekableFused<St> {
fn is_terminated(&self) -> bool {
self.peeked.is_none() && self.stream.is_terminated()
}
}