use crate::op_prelude::*;
/// Panic message used when a fold future is polled but its accumulator has
/// already been taken (i.e. the future has already produced its output).
// `const` items are implicitly `'static`, so the explicit lifetime is dropped.
const POLL_AFTER_COMPLETED_MSG: &str = "call to poll after completed!";
pin_project! {
/// Future that folds the `Ok` items of a fallible stream into an accumulator
/// of type `T`.
///
/// For every item the handler is called with `&mut T` and the item, and the
/// future it returns is driven to completion before the next item is pulled.
/// Resolves to `Ok(T)` once the stream ends, or to the first error produced by
/// either the stream or a handler future.
#[must_use = "futures do nothing unless polled"]
pub struct TryFoldMut<S, T, F, Fut> {
// Source stream; structurally pinned so it can be polled in place.
#[pin]
upstream: S,
// Handler future for the item currently being processed, if any.
#[pin]
pending_future: Option<Fut>,
// Accumulator; `None` once it has been handed out or dropped after an error.
state: Option<T>,
// Callback invoked as `handler(&mut state, item)` for each `Ok` item.
handler: F,
}
}
impl<S, T, F, Fut> TryFoldMut<S, T, F, Fut>
where
S: TryStream + FusedStream,
F: FnMut(&mut T, S::Ok) -> Fut,
Fut: TryFuture<Ok=(), Error=S::Error>,
{
pub(crate) fn new(upstream: S, initial: T, handler: F) -> Self {
Self {
upstream,
pending_future: None,
state: Some(initial),
handler,
}
}
}
impl<S, T, F, Fut> Future for TryFoldMut<S, T, F, Fut>
where
S: TryStream + FusedStream,
F: FnMut(&mut T, S::Ok) -> Fut,
Fut: TryFuture<Ok=(), Error=S::Error>,
{
type Output = Result<T, S::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
Poll::Ready(loop {
if let Some(future) = this.pending_future.as_mut().as_pin_mut() {
let out = ready!(future.try_poll(cx));
this.pending_future.set(None);
if let Err(err) = out {
this.state.take();
break Err(err);
}
}
match ready!(this.upstream.as_mut().try_poll_next(cx)) {
Some(Ok(next)) => {
let state = this.state.as_mut().expect(POLL_AFTER_COMPLETED_MSG);
let future = (this.handler)(state, next);
this.pending_future.set(Some(future));
}
Some(Err(err)) => {
this.state.take();
break Err(err);
},
None => {
break Ok(this.state.take().expect(POLL_AFTER_COMPLETED_MSG));
}
}
})
}
}
/// `Sink` passthrough for [`TryFoldMut`], delegating to the `upstream` field
/// via `delegate_sink!` (macro defined elsewhere; presumably it forwards the
/// `Sink` methods unchanged).
///
/// The bounds on `F` and `Fut` match the `Future` impl for `TryFoldMut`.
/// The previous bounds (`F: FnMut(&mut T, S::Item) -> Fut`,
/// `Fut: Future<Output = ()>`) were those of `FoldMut` and cannot hold for a
/// value built by `TryFoldMut::new`, whose handler future must be a
/// `TryFuture` and therefore outputs a `Result`.
#[cfg(feature = "sink")]
impl<S, T, F, Fut, Item, E> Sink<Item> for TryFoldMut<S, T, F, Fut>
where
    S: TryStream + Sink<Item, Error = E> + FusedStream,
    F: FnMut(&mut T, S::Ok) -> Fut,
    // `S::Error` would be ambiguous here (both `TryStream` and `Sink` define
    // an `Error` associated type), so the stream's error is named explicitly.
    Fut: TryFuture<Ok = (), Error = <S as TryStream>::Error>,
{
    delegate_sink!(upstream, E, Item);
}
pin_project! {
/// Future that folds the items of a stream into an accumulator of type `T`.
///
/// For every item the handler is called with `&mut T` and the item, and the
/// future it returns is driven to completion before the next item is pulled.
/// Resolves to the accumulator once the stream ends.
#[must_use = "futures do nothing unless polled"]
pub struct FoldMut<S, T, F, Fut> {
// Source stream; structurally pinned so it can be polled in place.
#[pin]
upstream: S,
// Handler future for the item currently being processed, if any.
#[pin]
pending_future: Option<Fut>,
// Accumulator; `None` once it has been handed out as the output.
state: Option<T>,
// Callback invoked as `handler(&mut state, item)` for each item.
handler: F,
}
}
impl<S, T, F, Fut> FoldMut<S, T, F, Fut>
where
    S: Stream + FusedStream,
    F: FnMut(&mut T, S::Item) -> Fut,
    Fut: Future<Output = ()>,
{
    /// Builds a fold over `upstream` that starts from `initial` and applies
    /// `handler` to each item.
    ///
    /// No handler future is in flight until the first item arrives.
    pub(crate) fn new(upstream: S, initial: T, handler: F) -> Self {
        let state = Some(initial);
        let pending_future = None;
        Self {
            upstream,
            pending_future,
            state,
            handler,
        }
    }
}
impl<S, T, F, Fut> Future for FoldMut<S, T, F, Fut>
where
    S: Stream + FusedStream,
    F: FnMut(&mut T, S::Item) -> Fut,
    Fut: Future<Output = ()>,
{
    type Output = T;

    /// Drives the fold: finishes the in-flight handler future (if any), then
    /// pulls the next item and starts a handler future for it, repeating until
    /// something is pending or the stream ends.
    ///
    /// # Panics
    ///
    /// May panic if polled again after the accumulator has been returned.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut me = self.project();
        loop {
            // Finish the handler call for the previous item before asking the
            // stream for another one.
            if let Some(in_flight) = me.pending_future.as_mut().as_pin_mut() {
                ready!(in_flight.poll(cx));
                me.pending_future.set(None);
            }

            let item = match ready!(me.upstream.as_mut().poll_next(cx)) {
                Some(item) => item,
                None => {
                    let finished = me.state.take().expect(POLL_AFTER_COMPLETED_MSG);
                    return Poll::Ready(finished);
                }
            };

            // Start processing the new item; the next loop turn polls it.
            let acc = me.state.as_mut().expect(POLL_AFTER_COMPLETED_MSG);
            let step = (me.handler)(acc, item);
            me.pending_future.set(Some(step));
        }
    }
}
/// `Sink` passthrough for [`FoldMut`], delegating to the `upstream` field via
/// `delegate_sink!` (macro defined elsewhere; presumably it forwards the
/// `Sink` methods unchanged). The sink's error type is reused as-is.
#[cfg(feature = "sink")]
impl<S, T, F, Fut, Item, E> Sink<Item> for FoldMut<S, T, F, Fut>
where
    S: Sink<Item, Error = E> + Stream + FusedStream,
    F: FnMut(&mut T, S::Item) -> Fut,
    Fut: Future<Output = ()>,
{
    delegate_sink!(upstream, E, Item);
}