1#![deny(clippy::all)]
5#![warn(clippy::pedantic)]
6#![warn(clippy::nursery)]
8#![warn(clippy::cargo)]
9#![allow(clippy::use_self)]
11
12use std::{
13 pin::Pin,
14 task::{Context, Poll},
15 future::Future,
16 ops::{Deref, DerefMut},
17};
18
19use tokio::sync::watch;
20use tokio_stream::Stream;
21
22pub struct CancelToken(());
23
24pin_project_lite::pin_project! {
25 #[must_use]
27 pub struct Cancelable<S, F> {
28 #[pin]
29 pub stream: S,
30 #[pin]
31 pub fut: F,
32 }
33}
34
35impl<S, F> Deref for Cancelable<S, F> {
36 type Target = S;
37
38 fn deref(&self) -> &Self::Target {
39 &self.stream
40 }
41}
42
43impl<S, F> DerefMut for Cancelable<S, F> {
44 fn deref_mut(&mut self) -> &mut Self::Target {
45 &mut self.stream
46 }
47}
48
49impl<S, F> Stream for Cancelable<S, F>
50where
51 S: Stream,
52 F: Future<Output = CancelToken>,
53{
54 type Item = S::Item;
55
56 #[inline]
57 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
58 let this = self.project();
59
60 if let Poll::Ready(CancelToken(())) = this.fut.poll(cx) {
61 Poll::Ready(None)
62 } else {
63 this.stream.poll_next(cx)
64 }
65 }
66}
67
68#[derive(Clone)]
69pub struct Canceler(watch::Receiver<bool>);
70
71impl Canceler {
72 #[inline]
75 pub fn spawn<F, T>(f: F) -> impl FnOnce() -> T
76 where
77 F: FnOnce(Self) -> T,
78 {
79 let (tx, rx) = watch::channel(false);
80 let output = f(Canceler(rx));
81
82 move || {
83 if tx.send(true).is_err() {
84 }
86 output
87 }
88 }
89
90 #[inline]
92 pub async fn cancel(&mut self) -> CancelToken {
93 while !*self.0.borrow() {
94 if self.0.changed().await.is_err() {
96 break;
98 }
99 }
100 CancelToken(())
101 }
102}
103
104#[macro_export]
106macro_rules! cancelable {
107 ($stream:ident, $canceler:expr) => {
108 let mut _canceler = $canceler.clone();
110 let _fut = _canceler.cancel();
111 tokio::pin!(_fut);
112 let mut $stream = vru_cancel::Cancelable {
113 stream: $stream,
114 fut: _fut,
115 };
116 };
117}