async_rayon/
global.rs

1use futures::channel::oneshot;
2
3use crate::AsyncRayonHandle;
4use std::panic::{catch_unwind, AssertUnwindSafe};
5
6/// Asynchronous wrapper around Rayon's [`spawn`](rayon::spawn).
7///
8/// Runs a function on the global Rayon thread pool with LIFO priority,
9/// produciing a future that resolves with the function's return value.
10///
11/// # Panics
12/// If the task function panics, the panic will be propagated through the
13/// returned future. This will NOT trigger the Rayon thread pool's panic
14/// handler.
15///
16/// If the returned handle is dropped, and the return value of `func` panics
17/// when dropped, that panic WILL trigger the thread pool's panic
18/// handler.
19pub fn spawn<F, R>(func: F) -> AsyncRayonHandle<R>
20where
21    F: FnOnce() -> R + Send + 'static,
22    R: Send + 'static,
23{
24    let (tx, rx) = oneshot::channel();
25
26    rayon::spawn(move || {
27        let _result = tx.send(catch_unwind(AssertUnwindSafe(func)));
28    });
29
30    AsyncRayonHandle { rx }
31}
32
33/// Asynchronous wrapper around Rayon's [`spawn_fifo`](rayon::spawn_fifo).
34///
35/// Runs a function on the global Rayon thread pool with FIFO priority,
36/// produciing a future that resolves with the function's return value.
37///
38/// # Panics
39/// If the task function panics, the panic will be propagated through the
40/// returned future. This will NOT trigger the Rayon thread pool's panic
41/// handler.
42///
43/// If the returned handle is dropped, and the return value of `func` panics
44/// when dropped, then that panic WILL trigger the thread pool's panic
45/// handler.
46pub fn spawn_fifo<F, R>(func: F) -> AsyncRayonHandle<R>
47where
48    F: FnOnce() -> R + Send + 'static,
49    R: Send + 'static,
50{
51    let (tx, rx) = oneshot::channel();
52
53    rayon::spawn_fifo(move || {
54        let _result = tx.send(catch_unwind(AssertUnwindSafe(func)));
55    });
56
57    AsyncRayonHandle { rx }
58}
59
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::init;

    // NOTE(review): the `Some(0)` assertions below presumably rely on
    // `init()` configuring the global pool with a single worker thread;
    // confirm against `crate::test::init`.

    /// A task spawned with `spawn` runs on a Rayon worker and its return
    /// value comes back through the awaited handle.
    #[tokio::test]
    async fn test_spawn_async_works() {
        init();

        let handle = spawn(|| {
            // Inside the task we must be on a pool worker thread.
            assert_eq!(rayon::current_thread_index(), Some(0));
            1337_usize
        });
        let value = handle.await;
        assert_eq!(value, 1337);

        // Back in the async test body we are not on a Rayon thread.
        assert_eq!(rayon::current_thread_index(), None);
    }

    /// A task spawned with `spawn_fifo` runs on a Rayon worker and its
    /// return value comes back through the awaited handle.
    #[tokio::test]
    async fn test_spawn_fifo_async_works() {
        init();

        let handle = spawn_fifo(|| {
            // Inside the task we must be on a pool worker thread.
            assert_eq!(rayon::current_thread_index(), Some(0));
            1337_usize
        });
        let value = handle.await;
        assert_eq!(value, 1337);

        // Back in the async test body we are not on a Rayon thread.
        assert_eq!(rayon::current_thread_index(), None);
    }

    /// Awaiting the handle of a panicking `spawn` task re-raises the panic
    /// in the awaiting test.
    #[tokio::test]
    #[should_panic(expected = "Task failed successfully")]
    async fn test_spawn_propagates_panic() {
        init();

        let doomed = spawn(|| {
            panic!("Task failed successfully");
        });
        doomed.await;
    }

    /// Awaiting the handle of a panicking `spawn_fifo` task re-raises the
    /// panic in the awaiting test.
    #[tokio::test]
    #[should_panic(expected = "Task failed successfully")]
    async fn test_spawn_fifo_propagates_panic() {
        init();

        let doomed = spawn_fifo(|| {
            panic!("Task failed successfully");
        });
        doomed.await;
    }
}