use super::ThrottlePool;
use futures::task::{Context, Poll};
use futures::{ready, Future, FutureExt, Stream};
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::pin::Pin;
/// Extension trait that adds [`throttle`](ThrottledStream::throttle) to streams.
///
/// A blanket implementation in this file covers every type implementing
/// [`Stream`], so the method is available wherever this trait is in scope.
pub trait ThrottledStream {
    /// Wraps `self` in a [`Throttled`] adapter governed by `pool`.
    ///
    /// The returned stream yields the same items as `self`; before each poll
    /// of `self` for a new item it first waits on a future obtained from
    /// `pool.queue()`.
    ///
    /// Nothing is queued on the pool until the returned stream is polled.
    fn throttle(self, pool: ThrottlePool) -> Throttled<Self>
    where
        Self: Stream + Sized,
    {
        let stream_pinned = self;
        Throttled {
            pool,
            stream_pinned,
            // No slot has been requested yet; the first poll will queue one.
            slot_pinned: None,
            state_unpinned: State::None,
        }
    }
}
/// Blanket implementation: every [`Stream`] gets the default `throttle` method.
impl<T> ThrottledStream for T where T: Stream {}
/// Stream adapter that waits on a [`ThrottlePool`] slot before each poll of
/// the wrapped stream for a new item.
///
/// Constructed by `ThrottledStream::throttle`. Field order is significant
/// only insofar as it determines drop order.
#[must_use = "streams do nothing unless polled"]
pub struct Throttled<S: Stream + 'static> {
    /// The wrapped stream; accessed through a pin projection in `poll_next`.
    stream_pinned: S,
    /// Pool that hands out the slot futures (via `queue()`).
    pool: ThrottlePool,
    /// Current phase of the queue-slot / wait / poll-stream cycle.
    state_unpinned: State,
    /// The in-flight slot future. `poll_next` stores `Some` here when it
    /// queues a slot and resets it to `None` once that future completes.
    slot_pinned: Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
}
// Field accessors generated by `pin_utils`; each macro defines a method named
// after the field that takes `self: Pin<&mut Self>`. They are what
// `poll_next` calls as `self.as_mut().<field>()`.
//
// NOTE(review): the macros are `unsafe_*` because their soundness depends on
// the pin-projection rules described in the `pin_utils` / `std::pin` docs
// (e.g. no `Drop` impl that moves a pinned field, no `#[repr(packed)]`).
// Nothing visible in this file violates them — confirm if the struct changes.
impl<S> Throttled<S>
where
S: Stream + 'static,
{
// Pinned access to the wrapped stream, needed to call `poll_next` on it.
unsafe_pinned!(stream_pinned: S);
// Plain mutable access to the state tag; used as `*self.as_mut().state_unpinned() = ...`.
unsafe_unpinned!(state_unpinned: State);
// Pinned access to the optional slot future; used with `Pin::set` and `Option::as_pin_mut`.
unsafe_pinned!(slot_pinned: Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>);
}
/// Yields the wrapped stream's items, waiting on a queued pool slot before
/// each poll of the wrapped stream for a new item.
impl<S> Stream for Throttled<S>
where
    S: Stream,
{
    type Item = S::Item;

    /// Drives the queue-slot → wait-for-slot → poll-stream cycle.
    ///
    /// Returns `Poll::Pending` while either the slot future or the wrapped
    /// stream is pending (the state is left untouched so the next call
    /// resumes at the same step). Once the wrapped stream yields `None`,
    /// this and every later call return `Poll::Ready(None)` without polling
    /// the wrapped stream or the pool again.
    ///
    /// # Panics
    ///
    /// Panics if the slot future is missing while in `State::Slot`, which
    /// would indicate a bug in this state machine.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        loop {
            match self.state_unpinned {
                // Terminal state: the wrapped stream has already finished.
                State::Done => return Poll::Ready(None),

                // Start of a cycle: request a slot from the pool and store
                // the (boxed) future so it survives across polls.
                State::None => {
                    let queued = self.pool.queue().boxed();
                    self.as_mut().slot_pinned().set(Some(queued));
                    *self.as_mut().state_unpinned() = State::Slot;
                }

                // Wait for the stored slot future; bail out with `Pending`
                // if it has not completed yet.
                State::Slot => {
                    let pending_slot = self
                        .as_mut()
                        .slot_pinned()
                        .as_pin_mut()
                        .expect("impossible: slot future was None, during State::Slot");
                    ready!(pending_slot.poll(cx));

                    // The slot future is finished; drop it before moving on.
                    self.as_mut().slot_pinned().set(None);
                    *self.as_mut().state_unpinned() = State::Stream;
                }

                // Slot granted: poll the wrapped stream exactly once.
                State::Stream => {
                    let polled = ready!(self.as_mut().stream_pinned().poll_next(cx));

                    // An item restarts the cycle; end-of-stream is terminal.
                    let following = if polled.is_some() {
                        State::None
                    } else {
                        State::Done
                    };
                    *self.as_mut().state_unpinned() = following;
                    return Poll::Ready(polled);
                }
            }
        }
    }
}
/// Phase of the per-item cycle that `Throttled::poll_next` steps through.
enum State {
/// No slot has been requested for the next item yet; `poll_next` queues one
/// on the pool and moves to `Slot`. This is also the initial state.
None,
/// A slot future is stored and being waited on; once it completes,
/// `poll_next` moves to `Stream`.
Slot,
/// The slot future has completed; the wrapped stream is polled next.
/// An item moves back to `None`, end-of-stream moves to `Done`.
Stream,
/// The wrapped stream returned `None`; `poll_next` returns
/// `Poll::Ready(None)` from here on.
Done,
}