triggered/lib.rs
1//! Triggers for one time events between tasks and threads.
2//!
3//! The mechanism consists of two types, the [`Trigger`] and the [`Listener`]. They come together
4//! as a pair. Much like the sender/receiver pair of a channel. The trigger half has a
5//! [`Trigger::trigger`] method that will make all tasks/threads waiting on
6//! a listener continue executing.
//! The listener has both a sync [`Listener::wait`] method and an implementation of
//! `Future<Output = ()>` for async support.
9//!
10//! Both the [`Trigger`] and [`Listener`] can be cloned. So any number of trigger instances can
11//! trigger any number of waiting listeners. When any one trigger instance belonging to the pair is
12//! triggered, all the waiting listeners will be unblocked. Waiting on a listener whose
13//! trigger already went off will return instantly. So each trigger/listener pair can only be fired
14//! once.
15//!
16//! This crate does not use any `unsafe` code.
17//!
18//! # Examples
19//!
20//! A trivial example showing the basic usage:
21//!
22//! ```
23//! #[tokio::main]
24//! async fn main() {
25//! let (trigger, listener) = triggered::trigger();
26//!
27//! let task = tokio::spawn(async {
28//! // Blocks until `trigger.trigger()` below
29//! listener.await;
30//!
31//! println!("Triggered async task");
32//! });
33//!
34//! // This will make any thread blocked in `Listener::wait()` or async task awaiting the
35//! // listener continue execution again.
36//! trigger.trigger();
37//!
38//! let _ = task.await;
39//! }
40//! ```
41//!
42//! An example showing a trigger/listener pair being used to gracefully shut down some async
43//! server instances on a Ctrl-C event, where only an immutable `Fn` closure is accepted:
44//!
45//! ```
46//! # use std::future::Future;
47//! # type Error = Box<dyn std::error::Error>;
48//! # struct SomeServer;
49//! # impl SomeServer {
50//! # fn new() -> Self { SomeServer }
51//! # async fn serve_with_shutdown_signal(self, s: impl Future<Output = ()>) -> Result<(), Error> {Ok(())}
52//! # async fn serve(self) -> Result<(), Error> {Ok(())}
53//! # }
54//!
55//! #[tokio::main]
56//! async fn main() -> Result<(), Error> {
57//! let (shutdown_trigger, shutdown_signal1) = triggered::trigger();
58//!
59//! // A sync `Fn` closure will trigger the trigger when the user hits Ctrl-C
60//! ctrlc::set_handler(move || {
61//! shutdown_trigger.trigger();
62//! }).expect("Error setting Ctrl-C handler");
63//!
64//! // If the server library has support for something like a shutdown signal:
65//! let shutdown_signal2 = shutdown_signal1.clone();
66//! let server1_task = tokio::spawn(async move {
67//! SomeServer::new().serve_with_shutdown_signal(shutdown_signal1).await;
68//! });
69//!
70//! // Or just select between the long running future and the signal to abort it
71//! tokio::select! {
72//! server_result = SomeServer::new().serve() => {
73//! eprintln!("Server error: {:?}", server_result);
74//! }
75//! _ = shutdown_signal2 => {}
76//! }
77//!
78//! let _ = server1_task.await;
79//! Ok(())
80//! }
81//! ```
82//!
83//! # Rust Compatibility
84//!
85//! Will work with at least the two latest stable Rust releases. This gives users at least six
86//! weeks to upgrade their Rust toolchain after a new stable is released.
87//!
88//! The current MSRV can be seen in `travis.yml`. Any change to the MSRV will be considered a
89//! breaking change and listed in the changelog.
90//!
91//! # Comparison with similar primitives
92//!
93//! ## Channels
94//!
//! The event triggering primitives in this library are somewhat similar to channels. The main
//! differences, and the reasons I developed this library, are listed below.
97//!
98//! The listener is somewhat similar to a `futures::channel::oneshot::Receiver<()>`. But it:
99//! * Is not fallible - Implements `Future<Output = ()>` instead of
100//! `Future<Output = Result<T, Canceled>>`
101//! * Implements `Clone` - Any number of listeners can wait for the same event
102//! * Has a sync [`Listener::wait`] - Both synchronous threads, and asynchronous tasks can wait
103//! at the same time.
104//!
105//! The trigger, when compared to a `futures::channel::oneshot::Sender<()>` has the differences
106//! that it:
107//! * Is not fallible - The trigger does not care if there are any listeners left
108//! * Does not consume itself on send, instead takes `&self` - So can be used
109//! in situations where it is not owned or not mutable. For example in `Drop` implementations
110//! or callback closures that are limited to `Fn` or `FnMut`.
111//!
112//! ## `futures::future::Abortable`
113//!
114//! One use case of these triggers is to abort futures when some event happens. See examples above.
115//! The differences include:
116//! * A single handle can abort any number of futures
//! * Some futures are not properly cleaned up when they are simply dropped, which is how
//!   `Abortable` aborts them. Libraries providing such futures sometimes allow creating them with
//!   a shutdown signal that triggers a clean abort, for example
//!   `serve_with_shutdown(signal: impl Future<Output = ()>)`.
120
121#![deny(unsafe_code)]
122#![deny(rust_2018_idioms)]
123
124use std::collections::HashMap;
125use std::mem;
126use std::pin::Pin;
127use std::sync::{
128 atomic::{AtomicBool, AtomicUsize, Ordering},
129 Arc, Condvar, Mutex,
130};
131use std::task::{Context, Poll, Waker};
132use std::time::Duration;
133
134/// Returns a [`Trigger`] and [`Listener`] pair bound to each other.
135///
136/// The [`Listener`] is used to wait for the trigger to fire. It can be waited on both sync
137/// and async.
138pub fn trigger() -> (Trigger, Listener) {
139 let inner = Arc::new(Inner {
140 complete: AtomicBool::new(false),
141 tasks: Mutex::new(HashMap::new()),
142 condvar: Condvar::new(),
143 next_listener_id: AtomicUsize::new(1),
144 });
145 let trigger = Trigger {
146 inner: inner.clone(),
147 };
148 let listener = Listener { inner, id: 0 };
149 (trigger, listener)
150}
151
152/// A struct used to trigger [`Listener`]s it is paired with.
153///
154/// Can be cloned to create multiple instances that all trigger the same listeners.
155#[derive(Clone, Debug)]
156pub struct Trigger {
157 inner: Arc<Inner>,
158}
159
160/// A struct used to wait for a trigger event from a [`Trigger`].
161///
162/// Can be waited on synchronously via [`Listener::wait`] or asynchronously thanks to the struct
163/// implementing `Future`.
164///
165/// The listener can be cloned and any amount of threads and tasks can wait for the same trigger
166/// at the same time.
167#[derive(Debug)]
168pub struct Listener {
169 inner: Arc<Inner>,
170 id: usize,
171}
172
173impl Clone for Listener {
174 fn clone(&self) -> Self {
175 Listener {
176 inner: self.inner.clone(),
177 id: self.inner.next_listener_id.fetch_add(1, Ordering::SeqCst),
178 }
179 }
180}
181
182impl Drop for Listener {
183 fn drop(&mut self) {
184 self.inner
185 .tasks
186 .lock()
187 .expect("Some Trigger/Listener has panicked")
188 .remove(&self.id);
189 }
190}
191
/// State shared by all [`Trigger`]s and [`Listener`]s belonging to one pair.
#[derive(Debug)]
struct Inner {
    /// Becomes `true` when the trigger fires.
    complete: AtomicBool,
    /// Wakers of the listeners that returned `Poll::Pending`, keyed by listener id. This mutex
    /// is also the one paired with `condvar`.
    tasks: Mutex<HashMap<usize, Waker>>,
    /// Parks threads blocked in the synchronous wait methods until the trigger fires.
    condvar: Condvar,
    /// Id that the next cloned listener will receive.
    next_listener_id: AtomicUsize,
}
199
200impl Unpin for Trigger {}
201impl Unpin for Listener {}
202
203impl Trigger {
204 /// Trigger all [`Listener`]s paired with this trigger.
205 ///
206 /// Makes all listeners currently blocked in [`Listener::wait`] return,
207 /// and all that is being `await`ed finish.
208 ///
209 /// Calling this method only does anything the first time. Any subsequent trigger call to
210 /// the same instance or a clone thereof does nothing, it has already been triggered.
211 /// Any listener waiting on the trigger after it has been triggered will just return
212 /// instantly.
213 ///
214 /// This method is safe to call from both async and sync code. It's not an async function,
215 /// but it always finishes very fast.
216 pub fn trigger(&self) {
217 if self.inner.complete.swap(true, Ordering::SeqCst) {
218 return;
219 }
220 // This code will only be executed once per trigger instance. No matter the amount of
221 // `Trigger` clones or calls to `trigger()`, thanks to the atomic swap above.
222 let mut tasks_guard = self
223 .inner
224 .tasks
225 .lock()
226 .expect("Some Trigger/Listener has panicked");
227 let tasks = mem::take(&mut *tasks_guard);
228 mem::drop(tasks_guard);
229 for (_listener_id, task) in tasks {
230 task.wake();
231 }
232 self.inner.condvar.notify_all();
233 }
234
235 /// Returns true if this trigger has been triggered.
236 pub fn is_triggered(&self) -> bool {
237 self.inner.complete.load(Ordering::SeqCst)
238 }
239}
240
241impl std::future::Future for Listener {
242 type Output = ();
243
244 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
245 if self.inner.complete.load(Ordering::SeqCst) {
246 return Poll::Ready(());
247 }
248
249 let mut task_guard = self
250 .inner
251 .tasks
252 .lock()
253 .expect("Some Trigger/Listener has panicked");
254
255 // If the trigger completed while we waited for the lock, skip adding our waker to the list
256 // of tasks.
257 if self.inner.complete.load(Ordering::SeqCst) {
258 Poll::Ready(())
259 } else {
260 task_guard.insert(self.id, cx.waker().clone());
261 Poll::Pending
262 }
263 }
264}
265
266impl Listener {
267 /// Wait for this trigger synchronously.
268 ///
269 /// Blocks the current thread until the corresponding [`Trigger`] is triggered.
270 /// If the trigger has already been triggered at least once, this returns immediately.
271 pub fn wait(&self) {
272 if self.inner.complete.load(Ordering::SeqCst) {
273 return;
274 }
275
276 let task_guard = self
277 .inner
278 .tasks
279 .lock()
280 .expect("Some Trigger/Listener has panicked");
281
282 let _guard = self
283 .inner
284 .condvar
285 .wait_while(task_guard, |_| !self.inner.complete.load(Ordering::SeqCst))
286 .expect("Some Trigger/Listener has panicked");
287 }
288
289 /// Wait for this trigger synchronously, timing out after a specified duration.
290 ///
291 /// The semantics of this function are equivalent to [`Listener::wait`] except that the
292 /// thread will be blocked for roughly no longer than `duration`.
293 ///
294 /// Returns `true` if this method returned because the trigger was triggered. Returns
295 /// `false` if it returned due to the timeout.
296 ///
297 /// In an async program the same can be achieved by wrapping the `Listener` in one of the
298 /// many `Timeout` implementations that exists.
299 pub fn wait_timeout(&self, duration: Duration) -> bool {
300 if self.inner.complete.load(Ordering::SeqCst) {
301 return true;
302 }
303
304 let task_guard = self
305 .inner
306 .tasks
307 .lock()
308 .expect("Some Trigger/Listener has panicked");
309
310 let _ = self
311 .inner
312 .condvar
313 .wait_timeout_while(task_guard, duration, |_| {
314 !self.inner.complete.load(Ordering::SeqCst)
315 })
316 .expect("Some Trigger/Listener has panicked");
317
318 self.inner.complete.load(Ordering::SeqCst)
319 }
320
321 /// Returns true if this trigger has been triggered.
322 pub fn is_triggered(&self) -> bool {
323 self.inner.complete.load(Ordering::SeqCst)
324 }
325}
326
// `unsafe` is allowed here only because the tests hand-build a `RawWaker`, which is the way to
// observe exactly when a waker is cloned, woken and dropped.
#[allow(unsafe_code)]
#[cfg(test)]
mod tests {
    use super::*;
    use std::future::Future;
    use std::sync::atomic::AtomicU8;
    use std::task::{RawWaker, RawWakerVTable};

    /// Polling the same listener with a new waker must replace, and thereby drop, the waker
    /// stored by the previous poll.
    #[test]
    fn polling_listener_keeps_only_last_waker() {
        let (_trigger, mut listener) = trigger();

        let (waker1, waker_handle1) = create_waker();
        assert_eq!(poll_once(&mut listener, &waker1), Poll::Pending);
        assert!(flags(&waker_handle1) & CLONED != 0);
        assert!(flags(&waker_handle1) & DROPPED == 0);

        let (waker2, waker_handle2) = create_waker();
        assert_eq!(poll_once(&mut listener, &waker2), Poll::Pending);
        assert!(flags(&waker_handle2) & CLONED != 0);
        assert!(flags(&waker_handle2) & DROPPED == 0);
        assert!(flags(&waker_handle1) & DROPPED != 0);
    }

    /// Firing the trigger must wake the registered waker, and afterwards no reference to the
    /// waker's data may be left behind.
    #[test]
    fn trigger_wakes_registered_waker_and_releases_it() {
        let (trigger, mut listener) = trigger();

        let (waker, waker_handle) = create_waker();
        assert_eq!(poll_once(&mut listener, &waker), Poll::Pending);
        assert!(flags(&waker_handle) & WOKE == 0);

        trigger.trigger();
        assert!(flags(&waker_handle) & WOKE != 0);
        assert_eq!(poll_once(&mut listener, &waker), Poll::Ready(()));

        // The stored clone was consumed by the wake; dropping ours leaves only the handle.
        drop(waker);
        assert_eq!(Arc::strong_count(&waker_handle), 1);
    }

    // Bit flags recorded in `WakerHandle::data` by the vtable functions below.
    const CLONED: u8 = 0b0001;
    const WOKE: u8 = 0b0010;
    const DROPPED: u8 = 0b0100;

    /// Polls `listener` once, using `waker` as the task waker.
    fn poll_once(listener: &mut Listener, waker: &Waker) -> Poll<()> {
        let mut context = Context::from_waker(waker);
        Pin::new(listener).poll(&mut context)
    }

    /// Reads the flags recorded so far for the waker(s) sharing `handle`.
    fn flags(handle: &WakerHandle) -> u8 {
        handle.data.load(Ordering::SeqCst)
    }

    /// Creates a waker together with a handle through which the test can observe what has
    /// happened to the waker and to its clones.
    fn create_waker() -> (Waker, Arc<WakerHandle>) {
        let waker_handle = Arc::new(WakerHandle {
            data: AtomicU8::new(0),
        });
        let data = Arc::into_raw(waker_handle.clone()) as *const ();
        let raw_waker = RawWaker::new(data, &VTABLE);
        // SAFETY: `data` comes from `Arc::into_raw` and owns one strong reference. The vtable
        // functions keep the strong count balanced (clone increments it; wake and drop each
        // release one reference), and `WakerHandle` is `Send + Sync`.
        (unsafe { Waker::from_raw(raw_waker) }, waker_handle)
    }

    /// Flags shared between a test waker (and all its clones) and the test body.
    struct WakerHandle {
        data: AtomicU8,
    }

    impl Drop for WakerHandle {
        fn drop(&mut self) {
            println!("WakerHandle dropped");
        }
    }

    const VTABLE: RawWakerVTable =
        RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw);

    // The functions below are only ever invoked through `VTABLE`, with the `data` pointer that
    // `create_waker` produced via `Arc::into_raw`. Each live waker owns one strong reference,
    // so the pointee is alive whenever one of them runs. The refcount calls are given the
    // original pointer (not one re-derived from a `&WakerHandle`), as `Arc` requires.

    /// Records `CLONED` and hands out a new raw waker owning one more strong reference.
    unsafe fn clone_raw(data: *const ()) -> RawWaker {
        let handle = data as *const WakerHandle;
        // SAFETY: see the comment above `clone_raw`.
        (*handle).data.fetch_or(CLONED, Ordering::SeqCst);
        Arc::increment_strong_count(handle);
        RawWaker::new(data, &VTABLE)
    }

    /// Records `WOKE` and releases the strong reference owned by the consumed waker.
    unsafe fn wake_raw(data: *const ()) {
        let handle = data as *const WakerHandle;
        // SAFETY: see the comment above `clone_raw`. Waking by value consumes the waker, so
        // its reference has to be given up here or it would leak.
        (*handle).data.fetch_or(WOKE, Ordering::SeqCst);
        Arc::decrement_strong_count(handle);
    }

    /// Records `WOKE`; the waker stays alive, so the strong count is left untouched.
    unsafe fn wake_by_ref_raw(data: *const ()) {
        let handle = data as *const WakerHandle;
        // SAFETY: see the comment above `clone_raw`.
        (*handle).data.fetch_or(WOKE, Ordering::SeqCst);
    }

    /// Records `DROPPED` and releases the strong reference owned by the dropped waker.
    unsafe fn drop_raw(data: *const ()) {
        let handle = data as *const WakerHandle;
        // SAFETY: see the comment above `clone_raw`.
        (*handle).data.fetch_or(DROPPED, Ordering::SeqCst);
        Arc::decrement_strong_count(handle);
    }
}
405}