go_spawn/
lib.rs

//! `go-spawn` is a library that provides macros to spawn and join threads with minimal boilerplate.
2
3pub mod error;
4#[cfg(test)]
5mod test;
6
7use state::LocalStorage;
8use std::{cell::RefCell, collections::VecDeque, thread::JoinHandle};
9
10#[doc(hidden)]
11pub static __THREAD_HANDLE_INITIALIZED: LocalStorage<RefCell<VecDeque<JoinHandle<()>>>> =
12    LocalStorage::new();
13
/// Test-only helper that discards every handle currently queued in
/// `__THREAD_HANDLE_INITIALIZED` without joining the corresponding threads.
#[cfg(test)]
fn drop_all_handles() {
    match __THREAD_HANDLE_INITIALIZED.try_get() {
        // Dropping a `JoinHandle` detaches its thread instead of waiting for it.
        Some(pending) => pending.borrow_mut().clear(),
        // The storage has not been initialized, so there is nothing to drop.
        None => (),
    }
}
20
/// Spawns a thread to execute the given code. Any captured variables will be moved to the spawned
/// thread.
///
/// The macro accepts either a single expression (`go!(work())`) or a sequence of statements
/// (`go! { ... }`). The code is wrapped in a `move` closure and its value is discarded.
///
/// The thread's handle will be stored in thread-local storage until joined by `join!`
/// or `join_all!`.
///
/// # Examples
/// ```
/// # fn expensive_background_work() {}
/// # fn do_work() {
/// use go_spawn::go;
/// use std::sync::{
///     atomic::{AtomicI64, Ordering},
///     Arc,
/// };
///
/// go!(expensive_background_work());
///
/// let counter = Arc::new(AtomicI64::new(0));
/// let copy_of_counter = counter.clone();
///
/// go! {
///     for _ in 0..1_000_000 {
///         copy_of_counter.fetch_add(1, Ordering::SeqCst);
///     }
/// }
/// # }
/// ```
#[macro_export]
macro_rules! go {
    ( $( $tokens:tt )+ ) => {
        // `$crate` (instead of the literal crate name) keeps the expansion valid when a
        // dependent renames the crate or when the macro is used inside the crate itself.
        $crate::__go_helper!(@move $( $tokens )+ )
    };
}
55
/// Spawns a thread to execute the given code. Any captured variables will be borrowed by the
/// spawned thread.
///
/// The thread is created with `std::thread::spawn`, so everything it borrows must live for
/// `'static` (for example a `static` item). Use `go!` to hand local data over to the thread.
///
/// The thread's handle will be stored in thread-local storage until joined by
/// `join!` or `join_all!`.
///
/// # Examples
/// ```
/// # fn expensive_background_work() {}
/// # fn do_work() {
/// use go_spawn::go_ref;
/// use std::sync::{
///     atomic::{AtomicI64, Ordering},
/// };
///
/// go_ref!(expensive_background_work());
///
/// static COUNTER: AtomicI64 = AtomicI64::new(0);
///
/// go_ref! {
///     for _ in 0..1_000_000 {
///         COUNTER.fetch_add(1, Ordering::SeqCst);
///     }
/// }
/// # }
/// ```
#[macro_export]
macro_rules! go_ref {
    ( $( $tokens:tt )+ ) => {
        // `$crate` (instead of the literal crate name) keeps the expansion valid when a
        // dependent renames the crate or when the macro is used inside the crate itself.
        $crate::__go_helper!(@ref $( $tokens )+ )
    };
}
88
89/// Joins the most recent not-yet-joined thread spawned from the current thread by either `go!` or
90/// `go_ref!`.
91///
92/// This will always yield a `go_spawn::error::Result`, which indicates whether a thread
93/// was successfully joined.
94///
95/// # Examples
96/// ```
97/// # fn expensive_background_work() {}
98/// # fn do_work() {
99/// use go_spawn::{
100///     error::{JoinError, Result},
101///     go, go_ref, join,
102/// };
103/// use std::sync::{
104///     atomic::{AtomicI64, Ordering},
105/// };
106///
107/// go_ref!(expensive_background_work());
108///
109/// static COUNTER: AtomicI64 = AtomicI64::new(0);
110///
111/// go_ref! {
112///     for _ in 0..1_000_000 {
113///         COUNTER.fetch_add(1, Ordering::SeqCst);
114///     }
115/// }
116///
117/// assert!(join!().is_ok());
118/// assert_eq!(COUNTER.load(Ordering::SeqCst), 1_000_0000);
119///
120/// go!(panic!("{}", 4));
121///
122/// let result = join!().unwrap_err();
123/// let expected = Some("4".to_string());
124/// assert!(
125///     matches!(
126///         result,
127///         JoinError::ThreadPanic(value) if value.downcast_ref::<String>() == expected.as_ref(),
128///     )
129/// );
130///
131/// go!(std::panic::panic_any(4_i32));
132///
133/// let result = join!().unwrap_err();
134/// let expected = Some(4_i32);
135/// assert!(
136///     matches!(
137///         result,
138///         JoinError::ThreadPanic(value) if value.downcast_ref::<i32>() == expected.as_ref(),
139///     )
140/// );
141/// # }
142/// ```
143#[macro_export]
144macro_rules! join {
145    () => {
146        go_spawn::__THREAD_HANDLE_INITIALIZED
147            .try_get()
148            .and_then(|handles| handles.borrow_mut().pop_back())
149            .map(|handle| {
150                handle
151                    .join()
152                    .map_err(go_spawn::error::JoinError::ThreadPanic)
153            })
154            .unwrap_or_else(|| Err(go_spawn::error::JoinError::NoHandleFound))
155    };
156}
157
/// Joins all pending threads spawned from the current thread by either `go!` or `go_ref!`.
///
/// Threads will be joined starting with the most recent and working backwards until the earliest
/// thread spawned.
///
/// # Handling panics
///
/// `join_all!` optionally takes a callback to invoke on the error values returned when joining
/// threads that panicked. The callback will be invoked on each error value returned when joining
/// threads before proceeding to join the next thread, so it's recommended to avoid doing heavy
/// work in these error handlers (e.g. by using a channel to pass the error somewhere else to be
/// processed). Without a callback the error values are simply dropped.
///
/// All pending handles are removed from thread-local storage before the first thread is joined,
/// so the callback may itself use `go!`, `go_ref!`, `join!` or `join_all!`. Threads spawned by
/// the callback are not joined by the invocation that is running it.
///
/// See the documentation for [`JoinHandle::join`](https://doc.rust-lang.org/std/thread/struct.JoinHandle.html#method.join) for more details.
///
/// # Examples
/// ```
/// # fn expensive_background_work() {}
/// # fn do_work() {
/// use go_spawn::{go, go_ref, join_all};
/// use std::sync::{
///     atomic::{AtomicI64, Ordering},
/// };
///
/// static COUNTER: AtomicI64 = AtomicI64::new(0);
///
/// for _ in 0..4 {
///     go_ref! {
///         COUNTER.fetch_add(1, Ordering::SeqCst);
///     }
/// }
///
/// join_all!();
/// assert_eq!(COUNTER.load(Ordering::SeqCst), 4);
///
/// for i in 0_i64..4 {
///     go! {
///         std::panic::panic_any(i);
///     }
/// }
///
/// static ERR_COUNTER: AtomicI64 = AtomicI64::new(0);
///
/// // Threads are joined most-recent-first, so the payloads arrive as 3, 2, 1, 0.
/// join_all!(|error| {
///     let value: &i64 = error.downcast_ref().unwrap();
///     assert_eq!(3 - ERR_COUNTER.fetch_add(1, Ordering::SeqCst), *value);
/// });
/// # }
/// ```
#[macro_export]
macro_rules! join_all {
    () => {
        // No callback supplied: discard the panic payloads.
        $crate::join_all!(::std::mem::drop)
    };

    ($error_handler:expr) => {
        // `$crate` keeps this path valid when the crate is renamed by a dependent or when the
        // macro is used inside the crate itself.
        if let Some(handles) = $crate::__THREAD_HANDLE_INITIALIZED.try_get() {
            // Move the whole queue out so the `RefCell` borrow ends on this line. Holding it
            // while joining would make any handler that uses `go!`/`join!` panic on re-borrow.
            let pending = ::std::mem::take(&mut *handles.borrow_mut());

            pending
                .into_iter()
                // Most recently spawned thread first.
                .rev()
                // Only threads that panicked produce a value for the handler.
                .filter_map(|handle| handle.join().err())
                .for_each($error_handler);
        }
    };
}
225
// Implementation detail shared by `go!` and `go_ref!`; exported only so that those macros can
// reach it from the caller's crate.
//
// Rules:
// * `@move`  - wraps the tokens in a `move` closure and forwards it to `@spawn`.
// * `@ref`   - wraps the tokens in a non-`move` closure and forwards it to `@spawn`.
// * `@spawn` - spawns the closure on a new thread and pushes the resulting handle onto the back
//   of the handle queue.
//
// All paths go through `$crate` / `::std` so the expansion does not depend on what the caller
// named this crate or on what `std` resolves to at the call site.
#[doc(hidden)]
#[macro_export]
macro_rules! __go_helper {
    (@move $( $tokens:tt )+ ) => {
        // `let _ = { ... };` discards the value of the user's code so the closure returns `()`,
        // matching the `JoinHandle<()>` elements of the queue.
        $crate::__go_helper!(@spawn move || { let _ = { $( $tokens )+ }; } )
    };

    (@ref $( $tokens:tt )+ ) => {
        // Same wrapping as `@move`, but captures by reference.
        $crate::__go_helper!(@spawn || { let _ = { $( $tokens )+ }; } )
    };

    (@spawn $expression:expr) => {{
        // Supply the initializer for the queue on every spawn; the result of `set` is
        // deliberately ignored (presumably it is a no-op once an initializer exists - confirm
        // against the `state` crate docs).
        $crate::__THREAD_HANDLE_INITIALIZED.set(|| {
            ::std::cell::RefCell::new(::std::collections::VecDeque::new())
        });
        $crate::__THREAD_HANDLE_INITIALIZED
            .get()
            .borrow_mut()
            .push_back(::std::thread::spawn($expression));
    }};
}
246}