async_deadman/
lib.rs

1use parking_lot::Mutex;
2use std::{
3    future::Future,
4    ops::DerefMut,
5    pin::Pin,
6    sync::{
7        atomic::{self, AtomicBool},
8        Arc,
9    },
10    task::{Poll, Waker},
11};
12
13/// A deadman switch that can be used to detect when it is dropped.
14///
15/// # Example
16///
17/// ```rust
18/// use async_deadman::Deadman;
19///
20/// #[tokio::main] // does not have to be tokio
21/// async fn main() {
22///     let (deadman, receiver) = Deadman::new();
23///
24///     tokio::spawn(async move {
25///         receiver.await;
26///         println!("Deadman was dropped");
27///     });
28///
29///     drop(deadman);
30/// }
31/// ```
32#[must_use]
33pub struct Deadman {
34    state: Arc<DeadmanState>,
35}
36
37impl Deadman {
38    pub fn new() -> (Self, DeadmanReceiver) {
39        let state = Arc::new(DeadmanState {
40            wakers: Mutex::new(Vec::new()),
41            is_dead: AtomicBool::new(false),
42        });
43
44        (
45            Self {
46                state: state.clone(),
47            },
48            DeadmanReceiver { state },
49        )
50    }
51
52    pub fn release(self) {
53        drop(self);
54    }
55}
56
57impl Drop for Deadman {
58    fn drop(&mut self) {
59        self.state.is_dead.store(true, atomic::Ordering::Relaxed);
60        let wakers = std::mem::take(self.state.wakers.lock().deref_mut());
61
62        for waker in wakers {
63            waker.wake();
64        }
65    }
66}
67
68struct DeadmanState {
69    wakers: Mutex<Vec<Waker>>,
70    is_dead: AtomicBool,
71}
72
73#[derive(Clone)]
74pub struct DeadmanReceiver {
75    state: Arc<DeadmanState>,
76}
77
78impl Future for DeadmanReceiver {
79    type Output = ();
80
81    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
82        let inner = Pin::into_inner(self);
83
84        if inner.state.is_dead.load(atomic::Ordering::Relaxed) {
85            Poll::Ready(())
86        } else {
87            inner.state.wakers.lock().push(cx.waker().clone());
88            Poll::Pending
89        }
90    }
91}
92
#[cfg(test)]
mod tests {
    use tokio::{select, sync::oneshot};

    use super::Deadman;

    /// Dropping the switch must complete the receiver.
    #[tokio::test]
    async fn test_deadman() {
        let (deadman, receiver) = Deadman::new();
        let (other_sender, other_receiver) = oneshot::channel();

        // `biased;` polls the branches in order, so the receiver always wins
        // when both futures are ready. The task reports which branch fired
        // instead of panicking, because a panic inside a detached task would
        // not fail the test.
        let watcher = tokio::spawn(async move {
            select! {
                biased;
                _ = receiver => true,
                _ = other_receiver => false,
            }
        });

        drop(deadman);

        let _ = other_sender.send(());

        let released = watcher.await.expect("watcher task panicked");
        assert!(released, "Deadman was not released");
    }

    /// A leaked switch never runs its destructor, so the receiver must stay
    /// pending.
    #[tokio::test]
    async fn test_deadman_not_released() {
        let (deadman, receiver) = Deadman::new();
        let (other_sender, other_receiver) = oneshot::channel();

        // The receiver is polled first on purpose: it must report `Pending`
        // so that the fallback branch is the one that completes.
        let watcher = tokio::spawn(async move {
            select! {
                biased;
                _ = receiver => false,
                _ = other_receiver => true,
            }
        });

        std::mem::forget(deadman);

        let _ = other_sender.send(());

        let still_pending = watcher.await.expect("watcher task panicked");
        assert!(still_pending, "Deadman was released");
    }

    /// Every clone of the receiver completes when the switch is released.
    #[tokio::test]
    async fn test_cloned_receivers_all_complete() {
        let (deadman, receiver) = Deadman::new();
        let cloned = receiver.clone();

        let first = tokio::spawn(receiver);
        let second = tokio::spawn(cloned);

        deadman.release();

        first.await.expect("first receiver task panicked");
        second.await.expect("second receiver task panicked");
    }
}