triggered/
lib.rs

1//! Triggers for one time events between tasks and threads.
2//!
3//! The mechanism consists of two types, the [`Trigger`] and the [`Listener`]. They come together
4//! as a pair. Much like the sender/receiver pair of a channel. The trigger half has a
5//! [`Trigger::trigger`] method that will make all tasks/threads waiting on
6//! a listener continue executing.
//! The listener has both a sync [`Listener::wait`] method and an implementation of
//! `Future<Output = ()>` for async support.
9//!
10//! Both the [`Trigger`] and [`Listener`] can be cloned. So any number of trigger instances can
11//! trigger any number of waiting listeners. When any one trigger instance belonging to the pair is
12//! triggered, all the waiting listeners will be unblocked. Waiting on a listener whose
13//! trigger already went off will return instantly. So each trigger/listener pair can only be fired
14//! once.
15//!
16//! This crate does not use any `unsafe` code.
17//!
18//! # Examples
19//!
20//! A trivial example showing the basic usage:
21//!
22//! ```
23//! #[tokio::main]
24//! async fn main() {
25//!     let (trigger, listener) = triggered::trigger();
26//!
27//!     let task = tokio::spawn(async {
28//!         // Blocks until `trigger.trigger()` below
29//!         listener.await;
30//!
31//!         println!("Triggered async task");
32//!     });
33//!
34//!     // This will make any thread blocked in `Listener::wait()` or async task awaiting the
35//!     // listener continue execution again.
36//!     trigger.trigger();
37//!
38//!     let _ = task.await;
39//! }
40//! ```
41//!
42//! An example showing a trigger/listener pair being used to gracefully shut down some async
43//! server instances on a Ctrl-C event, where only an immutable `Fn` closure is accepted:
44//!
45//! ```
46//! # use std::future::Future;
47//! # type Error = Box<dyn std::error::Error>;
48//! # struct SomeServer;
49//! # impl SomeServer {
50//! #    fn new() -> Self { SomeServer }
51//! #    async fn serve_with_shutdown_signal(self, s: impl Future<Output = ()>) -> Result<(), Error> {Ok(())}
52//! #    async fn serve(self) -> Result<(), Error> {Ok(())}
53//! # }
54//!
55//! #[tokio::main]
56//! async fn main() -> Result<(), Error> {
57//!     let (shutdown_trigger, shutdown_signal1) = triggered::trigger();
58//!
59//!     // A sync `Fn` closure will trigger the trigger when the user hits Ctrl-C
60//!     ctrlc::set_handler(move || {
61//!         shutdown_trigger.trigger();
62//!     }).expect("Error setting Ctrl-C handler");
63//!
64//!     // If the server library has support for something like a shutdown signal:
65//!     let shutdown_signal2 = shutdown_signal1.clone();
66//!     let server1_task = tokio::spawn(async move {
67//!         SomeServer::new().serve_with_shutdown_signal(shutdown_signal1).await;
68//!     });
69//!
70//!     // Or just select between the long running future and the signal to abort it
71//!     tokio::select! {
72//!         server_result = SomeServer::new().serve() => {
73//!             eprintln!("Server error: {:?}", server_result);
74//!         }
75//!         _ = shutdown_signal2 => {}
76//!     }
77//!
78//!     let _ = server1_task.await;
79//!     Ok(())
80//! }
81//! ```
82//!
83//! # Rust Compatibility
84//!
85//! Will work with at least the two latest stable Rust releases. This gives users at least six
86//! weeks to upgrade their Rust toolchain after a new stable is released.
87//!
88//! The current MSRV can be seen in `travis.yml`. Any change to the MSRV will be considered a
89//! breaking change and listed in the changelog.
90//!
91//! # Comparison with similar primitives
92//!
93//! ## Channels
94//!
//! The event triggering primitives in this library are somewhat similar to channels. The main
//! differences, and the reason this library was developed, are listed below.
97//!
98//! The listener is somewhat similar to a `futures::channel::oneshot::Receiver<()>`. But it:
99//!  * Is not fallible - Implements `Future<Output = ()>` instead of
100//!    `Future<Output = Result<T, Canceled>>`
101//!  * Implements `Clone` - Any number of listeners can wait for the same event
102//!  * Has a sync [`Listener::wait`] - Both synchronous threads, and asynchronous tasks can wait
103//!    at the same time.
104//!
105//! The trigger, when compared to a `futures::channel::oneshot::Sender<()>` has the differences
106//! that it:
107//!  * Is not fallible - The trigger does not care if there are any listeners left
108//!  * Does not consume itself on send, instead takes `&self` - So can be used
109//!    in situations where it is not owned or not mutable. For example in `Drop` implementations
110//!    or callback closures that are limited to `Fn` or `FnMut`.
111//!
112//! ## `futures::future::Abortable`
113//!
114//! One use case of these triggers is to abort futures when some event happens. See examples above.
115//! The differences include:
116//!  * A single handle can abort any number of futures
//!  * Some futures are not properly cleaned up when just dropped the way `Abortable` does it.
//!    Such libraries sometimes allow creating their futures with a shutdown signal that triggers
//!    a clean abort. Something like `serve_with_shutdown(signal: impl Future<Output = ()>)`.
120
121#![deny(unsafe_code)]
122#![deny(rust_2018_idioms)]
123
124use std::collections::HashMap;
125use std::mem;
126use std::pin::Pin;
127use std::sync::{
128    atomic::{AtomicBool, AtomicUsize, Ordering},
129    Arc, Condvar, Mutex,
130};
131use std::task::{Context, Poll, Waker};
132
133/// Returns a [`Trigger`] and [`Listener`] pair bound to each other.
134///
135/// The [`Listener`] is used to wait for the trigger to fire. It can be waited on both sync
136/// and async.
137pub fn trigger() -> (Trigger, Listener) {
138    let inner = Arc::new(Inner {
139        complete: AtomicBool::new(false),
140        tasks: Mutex::new(HashMap::new()),
141        condvar: Condvar::new(),
142        next_listener_id: AtomicUsize::new(1),
143    });
144    let trigger = Trigger {
145        inner: inner.clone(),
146    };
147    let listener = Listener { inner, id: 0 };
148    (trigger, listener)
149}
150
/// A struct used to trigger [`Listener`]s it is paired with.
///
/// Obtained together with its [`Listener`] from [`trigger()`].
///
/// Can be cloned to create multiple instances that all trigger the same listeners.
/// Every clone shares the same underlying state, so firing any one of them fires them all.
#[derive(Clone, Debug)]
pub struct Trigger {
    /// State shared with every clone of this trigger and with all paired listeners.
    inner: Arc<Inner>,
}
158
/// A struct used to wait for a trigger event from a [`Trigger`].
///
/// Can be waited on synchronously via [`Listener::wait`] or asynchronously thanks to the struct
/// implementing `Future`.
///
/// The listener can be cloned and any amount of threads and tasks can wait for the same trigger
/// at the same time. Each clone gets its own id, so every clone registers its own waker.
#[derive(Debug)]
pub struct Listener {
    /// State shared with the paired triggers and with all other clones of this listener.
    inner: Arc<Inner>,
    /// Key under which this listener's waker is stored in the shared `tasks` map.
    /// The listener returned by [`trigger()`] has id 0; clones draw theirs from
    /// `Inner::next_listener_id`.
    id: usize,
}
171
172impl Clone for Listener {
173    fn clone(&self) -> Self {
174        Listener {
175            inner: self.inner.clone(),
176            id: self.inner.next_listener_id.fetch_add(1, Ordering::SeqCst),
177        }
178    }
179}
180
181impl Drop for Listener {
182    fn drop(&mut self) {
183        self.inner
184            .tasks
185            .lock()
186            .expect("Some Trigger/Listener has panicked")
187            .remove(&self.id);
188    }
189}
190
/// State shared between all [`Trigger`] and [`Listener`] instances of one pair.
#[derive(Debug)]
struct Inner {
    /// Whether the trigger has fired. Set to `true` by [`Trigger::trigger`]; nothing in this
    /// file sets it back to `false`.
    complete: AtomicBool,
    /// Wakers of listeners that were polled before the trigger fired, keyed by listener id.
    /// The mutex is also the one paired with `condvar` in [`Listener::wait`].
    tasks: Mutex<HashMap<usize, Waker>>,
    /// Notified by [`Trigger::trigger`] to release threads blocked in [`Listener::wait`].
    condvar: Condvar,
    /// Id handed to the next cloned [`Listener`]. Starts at 1, since the initial listener
    /// uses id 0.
    next_listener_id: AtomicUsize,
}
198
// Neither type is self-referential: each only holds an `Arc<Inner>` handle (plus a `usize` id
// for `Listener`). Stating `Unpin` explicitly documents that a listener can be pinned with
// `Pin::new(&mut listener)` and polled without any unsafe code.
impl Unpin for Trigger {}
impl Unpin for Listener {}
201
202impl Trigger {
203    /// Trigger all [`Listener`]s paired with this trigger.
204    ///
205    /// Makes all listeners currently blocked in [`Listener::wait`] return,
206    /// and all that is being `await`ed finish.
207    ///
208    /// Calling this method only does anything the first time. Any subsequent trigger call to
209    /// the same instance or a clone thereof does nothing, it has already been triggered.
210    /// Any listener waiting on the trigger after it has been triggered will just return
211    /// instantly.
212    ///
213    /// This method is safe to call from both async and sync code. It's not an async function,
214    /// but it always finishes very fast.
215    pub fn trigger(&self) {
216        if self.inner.complete.swap(true, Ordering::SeqCst) {
217            return;
218        }
219        // This code will only be executed once per trigger instance. No matter the amount of
220        // `Trigger` clones or calls to `trigger()`, thanks to the atomic swap above.
221        let mut tasks_guard = self
222            .inner
223            .tasks
224            .lock()
225            .expect("Some Trigger/Listener has panicked");
226        let tasks = mem::take(&mut *tasks_guard);
227        mem::drop(tasks_guard);
228        for (_listener_id, task) in tasks {
229            task.wake();
230        }
231        self.inner.condvar.notify_all();
232    }
233
234    /// Returns true if this trigger has been triggered.
235    pub fn is_triggered(&self) -> bool {
236        self.inner.complete.load(Ordering::SeqCst)
237    }
238}
239
240impl std::future::Future for Listener {
241    type Output = ();
242
243    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
244        if self.inner.complete.load(Ordering::SeqCst) {
245            return Poll::Ready(());
246        }
247
248        let mut task_guard = self
249            .inner
250            .tasks
251            .lock()
252            .expect("Some Trigger/Listener has panicked");
253
254        // If the trigger completed while we waited for the lock, skip adding our waker to the list
255        // of tasks.
256        if self.inner.complete.load(Ordering::SeqCst) {
257            Poll::Ready(())
258        } else {
259            task_guard.insert(self.id, cx.waker().clone());
260            Poll::Pending
261        }
262    }
263}
264
265impl Listener {
266    /// Wait for this trigger synchronously.
267    ///
268    /// Blocks the current thread until the corresponding [`Trigger`] is triggered.
269    /// If the trigger has already been triggered at least once, this returns immediately.
270    pub fn wait(&self) {
271        if self.inner.complete.load(Ordering::SeqCst) {
272            return;
273        }
274
275        let mut task_guard = self
276            .inner
277            .tasks
278            .lock()
279            .expect("Some Trigger/Listener has panicked");
280
281        while !self.inner.complete.load(Ordering::SeqCst) {
282            task_guard = self
283                .inner
284                .condvar
285                .wait(task_guard)
286                .expect("Some Trigger/Listener has panicked");
287        }
288    }
289
290    /// Returns true if this trigger has been triggered.
291    pub fn is_triggered(&self) -> bool {
292        self.inner.complete.load(Ordering::SeqCst)
293    }
294}
295
// `unsafe` is needed only here, to hand-build a `Waker` whose clone/wake/drop calls can be
// observed by the tests. The library code itself stays free of `unsafe`.
#[allow(unsafe_code)]
#[cfg(test)]
mod tests {
    use super::*;
    use std::future::Future;
    use std::sync::atomic::AtomicU8;
    use std::task::{RawWaker, RawWakerVTable};

    /// Polling the same listener twice with different wakers must clone both wakers, keep the
    /// second one registered and drop the first one.
    #[test]
    fn polling_listener_keeps_only_last_waker() {
        let (_trigger, mut listener) = trigger();

        let (waker1, waker_handle1) = create_waker();
        {
            let mut context = Context::from_waker(&waker1);
            let listener = Pin::new(&mut listener);
            assert_eq!(listener.poll(&mut context), Poll::Pending);
        }
        assert!(waker_handle1.data.load(Ordering::SeqCst) & CLONED != 0);
        assert!(waker_handle1.data.load(Ordering::SeqCst) & DROPPED == 0);

        let (waker2, waker_handle2) = create_waker();
        {
            let mut context = Context::from_waker(&waker2);
            let listener = Pin::new(&mut listener);
            assert_eq!(listener.poll(&mut context), Poll::Pending);
        }
        assert!(waker_handle2.data.load(Ordering::SeqCst) & CLONED != 0);
        assert!(waker_handle2.data.load(Ordering::SeqCst) & DROPPED == 0);
        assert!(waker_handle1.data.load(Ordering::SeqCst) & DROPPED != 0);
    }

    /// Firing the trigger must wake the waker registered by a pending listener, after which
    /// the listener resolves.
    #[test]
    fn trigger_wakes_registered_waker() {
        let (trigger_half, mut listener) = trigger();

        let (waker, waker_handle) = create_waker();
        {
            let mut context = Context::from_waker(&waker);
            let listener = Pin::new(&mut listener);
            assert_eq!(listener.poll(&mut context), Poll::Pending);
        }
        assert!(waker_handle.data.load(Ordering::SeqCst) & WOKE == 0);

        trigger_half.trigger();
        assert!(waker_handle.data.load(Ordering::SeqCst) & WOKE != 0);

        {
            let mut context = Context::from_waker(&waker);
            let listener = Pin::new(&mut listener);
            assert_eq!(listener.poll(&mut context), Poll::Ready(()));
        }
    }

    // Bit flags recorded in `WakerHandle::data` by the vtable functions below.
    const CLONED: u8 = 0b0001;
    const WOKE: u8 = 0b0010;
    const DROPPED: u8 = 0b0100;

    /// Creates a waker together with a handle through which the test can observe which
    /// vtable functions have been called on the waker or any of its clones.
    fn create_waker() -> (Waker, Arc<WakerHandle>) {
        let waker_handle = Arc::new(WakerHandle {
            data: AtomicU8::new(0),
        });
        // The waker owns one strong reference, handed over as a raw pointer.
        let data = Arc::into_raw(waker_handle.clone()) as *const ();
        let raw_waker = RawWaker::new(data, &VTABLE);
        // SAFETY: `data` comes from `Arc::into_raw` on an `Arc<WakerHandle>`, which is what
        // every function in `VTABLE` expects, and each of them keeps the strong count balanced.
        (unsafe { Waker::from_raw(raw_waker) }, waker_handle)
    }

    /// Records what happened to a test waker as a combination of the bit flags above.
    struct WakerHandle {
        data: AtomicU8,
    }

    impl Drop for WakerHandle {
        fn drop(&mut self) {
            println!("WakerHandle dropped");
        }
    }

    const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);

    // For all functions below, `data` must be a pointer obtained from `Arc::into_raw` on an
    // `Arc<WakerHandle>` that is still alive. The strong-count functions are given that
    // original pointer rather than one derived from a `&WakerHandle`.

    /// Clones the waker: records `CLONED` and takes one more strong reference.
    unsafe fn clone(data: *const ()) -> RawWaker {
        let handle_ptr = data as *const WakerHandle;
        let waker_handle = &*handle_ptr;
        waker_handle.data.fetch_or(CLONED, Ordering::SeqCst);
        Arc::increment_strong_count(handle_ptr);
        RawWaker::new(data, &VTABLE)
    }

    /// Wakes by value: records `WOKE` and releases the reference the consumed waker owned.
    unsafe fn wake(data: *const ()) {
        let handle_ptr = data as *const WakerHandle;
        let waker_handle = &*handle_ptr;
        waker_handle.data.fetch_or(WOKE, Ordering::SeqCst);
        // `Waker::wake` consumes the waker without calling `drop`, so the strong reference
        // must be released here or the handle leaks.
        Arc::decrement_strong_count(handle_ptr);
    }

    /// Wakes by reference: records `WOKE`; the waker stays alive, so the count is untouched.
    unsafe fn wake_by_ref(data: *const ()) {
        let waker_handle = &*(data as *const WakerHandle);
        waker_handle.data.fetch_or(WOKE, Ordering::SeqCst);
    }

    /// Drops the waker: records `DROPPED` and releases its strong reference.
    unsafe fn drop(data: *const ()) {
        let handle_ptr = data as *const WakerHandle;
        let waker_handle = &*handle_ptr;
        waker_handle.data.fetch_or(DROPPED, Ordering::SeqCst);
        Arc::decrement_strong_count(handle_ptr);
    }
}