vru_cancel/
lib.rs

1// Copyright 2021 Vladislav Melnik
2// SPDX-License-Identifier: MIT
3
4#![deny(clippy::all)]
5#![warn(clippy::pedantic)]
6// #![warn(clippy::restriction)]
7#![warn(clippy::nursery)]
8#![warn(clippy::cargo)]
9// nursery
10#![allow(clippy::use_self)]
11
12use std::{
13    pin::Pin,
14    task::{Context, Poll},
15    future::Future,
16    ops::{Deref, DerefMut},
17};
18
19use tokio::sync::watch;
20use tokio_stream::Stream;
21
/// Marker value produced when cancellation has been requested.
///
/// The inner field is private, so code outside this module cannot construct
/// a token itself; one is obtained by awaiting [`Canceler::cancel`].
#[derive(Debug)]
pub struct CancelToken(());
23
pin_project_lite::pin_project! {
    /// A stream that ends when `fut` is ready.
    ///
    /// Wraps the inner `stream` and forwards its items while `fut` is still
    /// pending. Once `fut` resolves, `poll_next` returns `None`.
    #[must_use]
    pub struct Cancelable<S, F> {
        // The wrapped stream whose items are forwarded to the consumer.
        #[pin]
        pub stream: S,
        // The cancellation future; it is polled before the stream on every
        // `poll_next` call.
        #[pin]
        pub fut: F,
    }
}
34
35impl<S, F> Deref for Cancelable<S, F> {
36    type Target = S;
37
38    fn deref(&self) -> &Self::Target {
39        &self.stream
40    }
41}
42
43impl<S, F> DerefMut for Cancelable<S, F> {
44    fn deref_mut(&mut self) -> &mut Self::Target {
45        &mut self.stream
46    }
47}
48
49impl<S, F> Stream for Cancelable<S, F>
50where
51    S: Stream,
52    F: Future<Output = CancelToken>,
53{
54    type Item = S::Item;
55
56    #[inline]
57    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
58        let this = self.project();
59
60        if let Poll::Ready(CancelToken(())) = this.fut.poll(cx) {
61            Poll::Ready(None)
62        } else {
63            this.stream.poll_next(cx)
64        }
65    }
66}
67
/// Handle that observes a cancellation request.
///
/// Wraps the receiving half of a `tokio::sync::watch` channel carrying a
/// `bool` flag. The flag starts as `false` and is set to `true` by the
/// closure returned from [`Canceler::spawn`].
#[derive(Clone)]
pub struct Canceler(watch::Receiver<bool>);
70
71impl Canceler {
72    /// Execute a closure immediately with `Canceler`.
73    /// Return a closure which cancels all streams.
74    #[inline]
75    pub fn spawn<F, T>(f: F) -> impl FnOnce() -> T
76    where
77        F: FnOnce(Self) -> T,
78    {
79        let (tx, rx) = watch::channel(false);
80        let output = f(Canceler(rx));
81
82        move || {
83            if tx.send(true).is_err() {
84                // canceler already dropped, nothing to cancel
85            }
86            output
87        }
88    }
89
90    /// Create a cancel token. Use `cancelable` macros instead.
91    #[inline]
92    pub async fn cancel(&mut self) -> CancelToken {
93        while !*self.0.borrow() {
94            // value is currently false; wait for it to change
95            if self.0.changed().await.is_err() {
96                // channel was closed
97                break;
98            }
99        }
100        CancelToken(())
101    }
102}
103
/// Make the stream cancelable.
///
/// `cancelable!(stream, canceler)` must be used in statement position. It
/// takes the name of a local variable holding a stream and an expression
/// that evaluates to a `Canceler`, and shadows the variable with a mutable
/// [`Cancelable`] wrapping the original stream. The wrapper ends once the
/// canceler's `cancel()` future is ready.
///
/// The canceler is cloned, so the expression passed in stays usable. The
/// cancel future is pinned with `tokio::pin!`, so the calling crate must
/// have `tokio` available under that name.
#[macro_export]
macro_rules! cancelable {
    ($stream:ident, $canceler:expr) => {
        // `_canceler` and `_fut` are introduced by this macro, so
        // `macro_rules!` hygiene keeps them from shadowing or clashing with
        // bindings of the same name at the call site.
        let mut _canceler = $canceler.clone();
        let _fut = _canceler.cancel();
        tokio::pin!(_fut);
        // `$crate` resolves to this crate even when the dependency is
        // renamed or the macro is invoked from inside the crate itself.
        let mut $stream = $crate::Cancelable {
            stream: $stream,
            fut: _fut,
        };
    };
}