triggered/lib.rs
1//! Triggers for one time events between tasks and threads.
2//!
3//! The mechanism consists of two types, the [`Trigger`] and the [`Listener`]. They come together
4//! as a pair. Much like the sender/receiver pair of a channel. The trigger half has a
5//! [`Trigger::trigger`] method that will make all tasks/threads waiting on
6//! a listener continue executing.
7//! The listener both has a sync [`Listener::wait`] method, and it also implements
8//! `Future<Output = ()>` for async support.
9//!
10//! Both the [`Trigger`] and [`Listener`] can be cloned. So any number of trigger instances can
11//! trigger any number of waiting listeners. When any one trigger instance belonging to the pair is
12//! triggered, all the waiting listeners will be unblocked. Waiting on a listener whose
13//! trigger already went off will return instantly. So each trigger/listener pair can only be fired
14//! once.
15//!
16//! This crate does not use any `unsafe` code.
17//!
18//! # Examples
19//!
20//! A trivial example showing the basic usage:
21//!
22//! ```
23//! #[tokio::main]
24//! async fn main() {
25//! let (trigger, listener) = triggered::trigger();
26//!
27//! let task = tokio::spawn(async {
28//! // Blocks until `trigger.trigger()` below
29//! listener.await;
30//!
31//! println!("Triggered async task");
32//! });
33//!
34//! // This will make any thread blocked in `Listener::wait()` or async task awaiting the
35//! // listener continue execution again.
36//! trigger.trigger();
37//!
38//! let _ = task.await;
39//! }
40//! ```
41//!
42//! An example showing a trigger/listener pair being used to gracefully shut down some async
43//! server instances on a Ctrl-C event, where only an immutable `Fn` closure is accepted:
44//!
45//! ```
46//! # use std::future::Future;
47//! # type Error = Box<dyn std::error::Error>;
48//! # struct SomeServer;
49//! # impl SomeServer {
50//! # fn new() -> Self { SomeServer }
51//! # async fn serve_with_shutdown_signal(self, s: impl Future<Output = ()>) -> Result<(), Error> {Ok(())}
52//! # async fn serve(self) -> Result<(), Error> {Ok(())}
53//! # }
54//!
55//! #[tokio::main]
56//! async fn main() -> Result<(), Error> {
57//! let (shutdown_trigger, shutdown_signal1) = triggered::trigger();
58//!
59//! // A sync `Fn` closure will trigger the trigger when the user hits Ctrl-C
60//! ctrlc::set_handler(move || {
61//! shutdown_trigger.trigger();
62//! }).expect("Error setting Ctrl-C handler");
63//!
64//! // If the server library has support for something like a shutdown signal:
65//! let shutdown_signal2 = shutdown_signal1.clone();
66//! let server1_task = tokio::spawn(async move {
67//! SomeServer::new().serve_with_shutdown_signal(shutdown_signal1).await;
68//! });
69//!
70//! // Or just select between the long running future and the signal to abort it
71//! tokio::select! {
72//! server_result = SomeServer::new().serve() => {
73//! eprintln!("Server error: {:?}", server_result);
74//! }
75//! _ = shutdown_signal2 => {}
76//! }
77//!
78//! let _ = server1_task.await;
79//! Ok(())
80//! }
81//! ```
82//!
83//! # Rust Compatibility
84//!
85//! Will work with at least the two latest stable Rust releases. This gives users at least six
86//! weeks to upgrade their Rust toolchain after a new stable is released.
87//!
88//! The current MSRV can be seen in `travis.yml`. Any change to the MSRV will be considered a
89//! breaking change and listed in the changelog.
90//!
91//! # Comparison with similar primitives
92//!
93//! ## Channels
94//!
95//! The event triggering primitives in this library is somewhat similar to channels. The main
96//! difference and why I developed this library is that
97//!
98//! The listener is somewhat similar to a `futures::channel::oneshot::Receiver<()>`. But it:
99//! * Is not fallible - Implements `Future<Output = ()>` instead of
100//! `Future<Output = Result<T, Canceled>>`
101//! * Implements `Clone` - Any number of listeners can wait for the same event
102//! * Has a sync [`Listener::wait`] - Both synchronous threads, and asynchronous tasks can wait
103//! at the same time.
104//!
105//! The trigger, when compared to a `futures::channel::oneshot::Sender<()>` has the differences
106//! that it:
107//! * Is not fallible - The trigger does not care if there are any listeners left
108//! * Does not consume itself on send, instead takes `&self` - So can be used
109//! in situations where it is not owned or not mutable. For example in `Drop` implementations
110//! or callback closures that are limited to `Fn` or `FnMut`.
111//!
112//! ## `futures::future::Abortable`
113//!
114//! One use case of these triggers is to abort futures when some event happens. See examples above.
115//! The differences include:
116//! * A single handle can abort any number of futures
117//! * Some futures are not properly cleaned up when just dropped the way `Abortable` does it.
118//! These libraries sometimes allows creating their futures with a shutdown signal that triggers
119//! a clean abort. Something like `serve_with_shutdown(signal: impl Future<Output = ()>)`.
120
121#![deny(unsafe_code)]
122#![deny(rust_2018_idioms)]
123
124use std::collections::HashMap;
125use std::mem;
126use std::pin::Pin;
127use std::sync::{
128 atomic::{AtomicBool, AtomicUsize, Ordering},
129 Arc, Condvar, Mutex,
130};
131use std::task::{Context, Poll, Waker};
132
133/// Returns a [`Trigger`] and [`Listener`] pair bound to each other.
134///
135/// The [`Listener`] is used to wait for the trigger to fire. It can be waited on both sync
136/// and async.
137pub fn trigger() -> (Trigger, Listener) {
138 let inner = Arc::new(Inner {
139 complete: AtomicBool::new(false),
140 tasks: Mutex::new(HashMap::new()),
141 condvar: Condvar::new(),
142 next_listener_id: AtomicUsize::new(1),
143 });
144 let trigger = Trigger {
145 inner: inner.clone(),
146 };
147 let listener = Listener { inner, id: 0 };
148 (trigger, listener)
149}
150
151/// A struct used to trigger [`Listener`]s it is paired with.
152///
153/// Can be cloned to create multiple instances that all trigger the same listeners.
154#[derive(Clone, Debug)]
155pub struct Trigger {
156 inner: Arc<Inner>,
157}
158
159/// A struct used to wait for a trigger event from a [`Trigger`].
160///
161/// Can be waited on synchronously via [`Listener::wait`] or asynchronously thanks to the struct
162/// implementing `Future`.
163///
164/// The listener can be cloned and any amount of threads and tasks can wait for the same trigger
165/// at the same time.
166#[derive(Debug)]
167pub struct Listener {
168 inner: Arc<Inner>,
169 id: usize,
170}
171
172impl Clone for Listener {
173 fn clone(&self) -> Self {
174 Listener {
175 inner: self.inner.clone(),
176 id: self.inner.next_listener_id.fetch_add(1, Ordering::SeqCst),
177 }
178 }
179}
180
181impl Drop for Listener {
182 fn drop(&mut self) {
183 self.inner
184 .tasks
185 .lock()
186 .expect("Some Trigger/Listener has panicked")
187 .remove(&self.id);
188 }
189}
190
191#[derive(Debug)]
192struct Inner {
193 complete: AtomicBool,
194 tasks: Mutex<HashMap<usize, Waker>>,
195 condvar: Condvar,
196 next_listener_id: AtomicUsize,
197}
198
199impl Unpin for Trigger {}
200impl Unpin for Listener {}
201
202impl Trigger {
203 /// Trigger all [`Listener`]s paired with this trigger.
204 ///
205 /// Makes all listeners currently blocked in [`Listener::wait`] return,
206 /// and all that is being `await`ed finish.
207 ///
208 /// Calling this method only does anything the first time. Any subsequent trigger call to
209 /// the same instance or a clone thereof does nothing, it has already been triggered.
210 /// Any listener waiting on the trigger after it has been triggered will just return
211 /// instantly.
212 ///
213 /// This method is safe to call from both async and sync code. It's not an async function,
214 /// but it always finishes very fast.
215 pub fn trigger(&self) {
216 if self.inner.complete.swap(true, Ordering::SeqCst) {
217 return;
218 }
219 // This code will only be executed once per trigger instance. No matter the amount of
220 // `Trigger` clones or calls to `trigger()`, thanks to the atomic swap above.
221 let mut tasks_guard = self
222 .inner
223 .tasks
224 .lock()
225 .expect("Some Trigger/Listener has panicked");
226 let tasks = mem::take(&mut *tasks_guard);
227 mem::drop(tasks_guard);
228 for (_listener_id, task) in tasks {
229 task.wake();
230 }
231 self.inner.condvar.notify_all();
232 }
233
234 /// Returns true if this trigger has been triggered.
235 pub fn is_triggered(&self) -> bool {
236 self.inner.complete.load(Ordering::SeqCst)
237 }
238}
239
240impl std::future::Future for Listener {
241 type Output = ();
242
243 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
244 if self.inner.complete.load(Ordering::SeqCst) {
245 return Poll::Ready(());
246 }
247
248 let mut task_guard = self
249 .inner
250 .tasks
251 .lock()
252 .expect("Some Trigger/Listener has panicked");
253
254 // If the trigger completed while we waited for the lock, skip adding our waker to the list
255 // of tasks.
256 if self.inner.complete.load(Ordering::SeqCst) {
257 Poll::Ready(())
258 } else {
259 task_guard.insert(self.id, cx.waker().clone());
260 Poll::Pending
261 }
262 }
263}
264
265impl Listener {
266 /// Wait for this trigger synchronously.
267 ///
268 /// Blocks the current thread until the corresponding [`Trigger`] is triggered.
269 /// If the trigger has already been triggered at least once, this returns immediately.
270 pub fn wait(&self) {
271 if self.inner.complete.load(Ordering::SeqCst) {
272 return;
273 }
274
275 let mut task_guard = self
276 .inner
277 .tasks
278 .lock()
279 .expect("Some Trigger/Listener has panicked");
280
281 while !self.inner.complete.load(Ordering::SeqCst) {
282 task_guard = self
283 .inner
284 .condvar
285 .wait(task_guard)
286 .expect("Some Trigger/Listener has panicked");
287 }
288 }
289
290 /// Returns true if this trigger has been triggered.
291 pub fn is_triggered(&self) -> bool {
292 self.inner.complete.load(Ordering::SeqCst)
293 }
294}
295
296#[allow(unsafe_code)]
297#[cfg(test)]
298mod tests {
299 use super::*;
300 use std::future::Future;
301 use std::sync::atomic::AtomicU8;
302 use std::task::{RawWaker, RawWakerVTable};
303
304 #[test]
305 fn polling_listener_keeps_only_last_waker() {
306 let (_trigger, mut listener) = trigger();
307
308 let (waker1, waker_handle1) = create_waker();
309 {
310 let mut context = Context::from_waker(&waker1);
311 let listener = Pin::new(&mut listener);
312 assert_eq!(listener.poll(&mut context), Poll::Pending);
313 }
314 assert!(waker_handle1.data.load(Ordering::SeqCst) & CLONED != 0);
315 assert!(waker_handle1.data.load(Ordering::SeqCst) & DROPPED == 0);
316
317 let (waker2, waker_handle2) = create_waker();
318 {
319 let mut context = Context::from_waker(&waker2);
320 let listener = Pin::new(&mut listener);
321 assert_eq!(listener.poll(&mut context), Poll::Pending);
322 }
323 assert!(waker_handle2.data.load(Ordering::SeqCst) & CLONED != 0);
324 assert!(waker_handle2.data.load(Ordering::SeqCst) & DROPPED == 0);
325 assert!(waker_handle1.data.load(Ordering::SeqCst) & DROPPED != 0);
326 }
327
328 const CLONED: u8 = 0b0001;
329 const WOKE: u8 = 0b0010;
330 const DROPPED: u8 = 0b0100;
331
332 fn create_waker() -> (Waker, Arc<WakerHandle>) {
333 let waker_handle = Arc::new(WakerHandle {
334 data: AtomicU8::new(0),
335 });
336 let data = Arc::into_raw(waker_handle.clone()) as *const _;
337 let raw_waker = RawWaker::new(data, &VTABLE);
338 (unsafe { Waker::from_raw(raw_waker) }, waker_handle)
339 }
340
341 struct WakerHandle {
342 data: AtomicU8,
343 }
344
345 impl Drop for WakerHandle {
346 fn drop(&mut self) {
347 println!("WakerHandle dropped");
348 }
349 }
350
351 const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
352
353 unsafe fn clone(data: *const ()) -> RawWaker {
354 let waker_handle = &*(data as *const WakerHandle);
355 waker_handle.data.fetch_or(CLONED, Ordering::SeqCst);
356 Arc::increment_strong_count(waker_handle);
357 RawWaker::new(data, &VTABLE)
358 }
359
360 unsafe fn wake(data: *const ()) {
361 let waker_handle = &*(data as *const WakerHandle);
362 waker_handle.data.fetch_or(WOKE, Ordering::SeqCst);
363 }
364
365 unsafe fn wake_by_ref(_data: *const ()) {
366 todo!();
367 }
368
369 unsafe fn drop(data: *const ()) {
370 let waker_handle = &*(data as *const WakerHandle);
371 waker_handle.data.fetch_or(DROPPED, Ordering::SeqCst);
372 Arc::decrement_strong_count(waker_handle);
373 }
374}