#![deny(clippy::all)]
#![warn(clippy::pedantic)]
#![warn(clippy::nursery)]
#![warn(clippy::cargo)]
#![allow(clippy::use_self)]
use std::{
pin::Pin,
task::{Context, Poll},
future::Future,
ops::{Deref, DerefMut},
};
use tokio::sync::watch;
use tokio_stream::Stream;
/// Marker value produced when cancellation has been observed.
///
/// It is the output of [`Canceler::cancel`] and the required output type of the
/// future stored in [`Cancelable`]. The single `()` field is private, so the
/// token cannot be constructed outside the module that defines it.
#[derive(Debug)]
pub struct CancelToken(());
pin_project_lite::pin_project! {
/// A stream paired with a cancellation future.
///
/// When used as a `Stream`, items are forwarded from `stream` until `fut`
/// completes, after which the stream reports its end. Both fields are
/// structurally pinned so they can be polled through a pinned `Cancelable`.
#[must_use]
pub struct Cancelable<S, F> {
// The wrapped stream whose items are forwarded.
#[pin]
pub stream: S,
// The future whose completion ends the stream.
#[pin]
pub fut: F,
}
}
impl<S, F> Deref for Cancelable<S, F> {
    type Target = S;

    /// Gives shared access to the wrapped stream.
    fn deref(&self) -> &S {
        let Self { stream, .. } = self;
        stream
    }
}
impl<S, F> DerefMut for Cancelable<S, F> {
    /// Gives exclusive access to the wrapped stream.
    fn deref_mut(&mut self) -> &mut S {
        let Self { stream, .. } = self;
        stream
    }
}
impl<S, F> Stream for Cancelable<S, F>
where
    S: Stream,
    F: Future<Output = CancelToken>,
{
    type Item = S::Item;

    /// Polls the cancellation future first, then the wrapped stream.
    ///
    /// Returns `Poll::Ready(None)` as soon as the cancellation future yields a
    /// [`CancelToken`]; otherwise the result of polling the inner stream is
    /// returned unchanged.
    ///
    /// The cancellation future is polled on every call, including calls made
    /// after it has already completed, so callers should stop polling once
    /// `None` has been returned.
    #[inline]
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        // Cancellation takes priority: a ready item in the inner stream is not
        // delivered once the cancel future has resolved.
        match this.fut.poll(cx) {
            Poll::Ready(CancelToken(())) => Poll::Ready(None),
            Poll::Pending => this.stream.poll_next(cx),
        }
    }

    /// Forwards the inner stream's upper bound.
    ///
    /// The lower bound is always `0` because cancellation may end the stream
    /// before any further item is produced.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        let (_, upper) = self.stream.size_hint();
        (0, upper)
    }
}
/// Cloneable handle used to wait for a cancellation signal.
///
/// Wraps the receiving half of a `watch` channel carrying a `bool` flag;
/// [`Canceler::cancel`] completes once that flag is `true` or the sending
/// half is gone. Handles are created by [`Canceler::spawn`].
#[derive(Clone)]
pub struct Canceler(watch::Receiver<bool>);
impl Canceler {
    /// Runs `f` with a fresh [`Canceler`] and returns a one-shot trigger.
    ///
    /// A `watch` channel initialised to `false` is created, `f` is called
    /// immediately with a `Canceler` wrapping the receiver, and its return
    /// value is stored. Calling the returned closure sets the flag to `true`
    /// and hands back the stored value.
    ///
    /// Note that this function does not start a task by itself; whatever
    /// spawning happens is up to `f`.
    #[inline]
    pub fn spawn<F, T>(f: F) -> impl FnOnce() -> T
    where
        F: FnOnce(Self) -> T,
    {
        let (trigger, flag) = watch::channel(false);
        let result = f(Canceler(flag));
        move || {
            // Best effort: a send failure is deliberately ignored, the stored
            // value is returned either way.
            let _ = trigger.send(true);
            result
        }
    }

    /// Waits until cancellation is signalled and returns a [`CancelToken`].
    ///
    /// Completes immediately when the flag is already `true`. It also
    /// completes when waiting for a change fails (presumably because the
    /// sending half was dropped), so dropping the trigger without calling it
    /// is treated like a cancellation.
    #[inline]
    pub async fn cancel(&mut self) -> CancelToken {
        loop {
            // Copy the flag out so the borrow guard is released before awaiting.
            let cancelled = *self.0.borrow();
            if cancelled || self.0.changed().await.is_err() {
                break CancelToken(());
            }
        }
    }
}
/// Rebinds a stream variable to a cancelable version of itself.
///
/// `cancelable!(stream, canceler)` expands to statements that clone
/// `canceler`, create and pin its `cancel()` future in the current scope, and
/// then shadow `stream` with a mutable `Cancelable` wrapping the original
/// stream together with that pinned future.
///
/// Arguments:
/// * `$stream` - identifier of an existing stream binding; it is moved into
///   the wrapper and shadowed.
/// * `$canceler` - expression that yields something with a `clone()` whose
///   result provides `cancel()`.
///
/// The expansion calls `tokio::pin!`, so `tokio` must be resolvable at the
/// call site.
#[macro_export]
macro_rules! cancelable {
    ($stream:ident, $canceler:expr) => {
        // Work on a clone: `cancel` takes `&mut self`, and the caller's handle
        // stays usable afterwards.
        let mut _canceler = $canceler.clone();
        let _fut = _canceler.cancel();
        tokio::pin!(_fut);
        // `$crate` keeps the path valid inside the defining crate and when the
        // dependency is renamed by the user.
        let mut $stream = $crate::Cancelable {
            stream: $stream,
            fut: _fut,
        };
    };
}