use std::pin::Pin;
use std::task::{Context, Poll};
use async_std::prelude::*;
use async_std::channel::{self, Receiver, Sender};
use pin_project_lite::pin_project;
/// Uninhabited type: it has no variants, so no value of it can ever be
/// constructed. It is used as the item type of the stop channel, which
/// therefore can never carry a message.
enum Never {}
/// Owner side of a stop signal.
///
/// Holds the sending half of a `Never` channel together with a
/// [`StopToken`] wrapping the receiving half; tokens handed out by
/// `stop_token()` are clones of that stored token.
#[derive(Debug)]
pub struct StopSource {
    // Never read in this file; it is stored only so that the sending half
    // stays alive for as long as this `StopSource` does.
    _chan: Sender<Never>,
    // Prototype token that `stop_token()` clones.
    stop_token: StopToken,
}
/// Cloneable handle wrapping the receiving half of the stop channel.
///
/// The token is itself a future with output `()` (see its `Future` impl),
/// and can wrap other streams and futures via `stop_stream` / `stop_future`.
#[derive(Debug, Clone)]
pub struct StopToken {
    // Receiving half of the `Never` channel; no item can ever arrive on it.
    chan: Receiver<Never>,
}
impl Default for StopSource {
    /// Builds a fresh `StopSource` backed by a newly created channel.
    ///
    /// The sending half is stored in the source and the receiving half is
    /// wrapped in the `StopToken` kept alongside it.
    fn default() -> StopSource {
        // `Never` has no values, so nothing is ever sent; the capacity of 1
        // is only there because a bounded channel needs some capacity.
        let (_chan, chan) = channel::bounded::<Never>(1);
        let stop_token = StopToken { chan };
        StopSource { _chan, stop_token }
    }
}
impl StopSource {
pub fn new() -> StopSource {
StopSource::default()
}
pub fn stop_token(&self) -> StopToken {
self.stop_token.clone()
}
}
impl Future for StopToken {
    type Output = ();

    /// Polls the underlying receiver as a stream.
    ///
    /// Stays `Pending` while the receiver is pending and completes with `()`
    /// once the receiver reports end-of-stream (`None`). Presumably that
    /// happens when the sending half held by the `StopSource` is dropped —
    /// confirm against the channel's documentation.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let receiver = Pin::new(&mut self.chan);
        Stream::poll_next(receiver, cx).map(|item| match item {
            // `Never` is uninhabited, so this arm is statically unreachable;
            // the empty match proves that to the compiler.
            Some(never) => match never {},
            None => (),
        })
    }
}
impl StopToken {
    /// Wraps `stream` in a `StopStream` that carries a clone of this token.
    ///
    /// The returned stream ends (yields `None`) once the token is ready;
    /// until then it forwards whatever `stream` produces.
    pub fn stop_stream<S: Stream>(&self, stream: S) -> StopStream<S> {
        let stop_token = self.clone();
        StopStream { stop_token, stream }
    }

    /// Wraps `future` in a `StopFuture` that carries a clone of this token.
    ///
    /// The returned future resolves to `None` once the token is ready, or to
    /// `Some(output)` if `future` completes first.
    pub fn stop_future<F: Future>(&self, future: F) -> StopFuture<F> {
        let stop_token = self.clone();
        StopFuture { stop_token, future }
    }
}
pin_project! {
    /// Stream adapter pairing an inner stream with a `StopToken`.
    ///
    /// Its `Stream` impl polls the token before the inner stream and ends
    /// the stream as soon as the token is ready.
    #[derive(Debug)]
    pub struct StopStream<S> {
        // Token checked first on every poll.
        #[pin]
        stop_token: StopToken,
        // Inner stream whose items are forwarded while the token is pending.
        #[pin]
        stream: S,
    }
}
impl<S: Stream> Stream for StopStream<S> {
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
if let Poll::Ready(()) = this.stop_token.poll(cx) {
return Poll::Ready(None);
}
this.stream.poll_next(cx)
}
}
pin_project! {
    /// Future adapter pairing an inner future with a `StopToken`.
    ///
    /// Its `Future` impl polls the token before the inner future and
    /// resolves to `None` as soon as the token is ready.
    #[derive(Debug)]
    pub struct StopFuture<F> {
        // Token checked first on every poll.
        #[pin]
        stop_token: StopToken,
        // Inner future whose output is wrapped in `Some` on completion.
        #[pin]
        future: F,
    }
}
impl<F: Future> Future for StopFuture<F> {
type Output = Option<F::Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<F::Output>> {
let this = self.project();
if let Poll::Ready(()) = this.stop_token.poll(cx) {
return Poll::Ready(None);
}
match this.future.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(it) => Poll::Ready(Some(it)),
}
}
}