futures_cputask/lib.rs
1#![doc(html_root_url = "https://docs.rs/futures-cputask/0.3.0")]
2//! futures-cputask allows you to turn long-running CPU tasks into a async function that runs
3//! on a secondary threadpool.
4//!
5//! While futures-preview allows execution of tasks, it runs it on the main threadpool, which clogs up
6//! one of the threads while the future is run. Running it on a secondary threadpool allows for potentially
7//! higher throughput.
8//!
9//! This crate supports multiple threadpool implementations, namely the one found in `threadpool` and the one found in `uvth`.
10//! You can add support for other threadpool implementations by using the `SyncThreadPool` trait found
11//! in this crate
12use std::future::Future;
13use std::pin::Pin;
14use parking_lot::Mutex;
15use lazy_static::lazy_static;
16mod impls;
17
18/// ThreadPool trait
19///
20/// `SyncThreadPool` has to be implemented for any kind of underlying threadpool you want to use
21/// with this crate. It defines the minimum interface required to be useful.
22///
23/// This crate implements this trait on the `threadpool` and `uvth` crates, if their corresponding
24/// features are enabled.
25pub trait SyncThreadPool {
26 /// Create a new default thread pool
27 ///
28 /// The implementor is suggested to dynamically check the amount of processing units available
29 /// and spawn that many threads for threadpool usage.
30 fn new() -> Self;
31 /// Creates a new thread pool with a maximum of `num_threads` threads
32 ///
33 /// The implementor *shall not* execute more than `num_threads` jobs at the same time, unless
34 /// modified. Internal threads that manage the job threads are not counted towards that limit.
35 fn with_thread_count(num_threads: usize) -> Self;
36 /// Schedules a task on the threadpool
37 ///
38 /// As soon as the task is scheduled, this function shall return.
39 fn execute<F>(&self, fun: F)
40 where
41 F: FnOnce() + Send + 'static;
42}
43
44/// Asynchronous threadpools
45///
46/// This trait specifies an interface for interacting with asynchronous threadpools, which allow you to
47/// await jobs
48pub trait AsyncThreadPool {
49 /// Create a new default thread pool
50 ///
51 /// The implementor is suggested to dynamically check the amount of processing units available
52 /// and spawn that many threads for threadpool usage.
53 fn new() -> Self;
54 /// Creates a new thread pool with a maximum of `num_threads` threads
55 ///
56 /// The implementor *shall not* execute more than `num_threads` jobs at the same time, unless
57 /// modified. Internal threads that manage the job threads are not counted towards that limit.
58 fn with_thread_count(num_threads: usize) -> Self;
59 /// Schedules a task on the threadpool
60 ///
61 /// As soon as the task is scheduled, this function shall return a future.
62 fn execute<F, T>(&self, fun: F) -> Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>>
63 where
64 F: FnOnce() -> T + Send + 'static,
65 T: Sized + Send + Sync + 'static;
66}
67
68/// Asynchronous threadpool
69///
70/// Wrapper around a Synchronous threadpool to provide futures support.
71pub struct ThreadPool<P>
72where P: SyncThreadPool + Sized
73{
74 inner: Mutex<P>
75}
76
77impl<P> ThreadPool<P>
78where P: SyncThreadPool + Sized
79{
80 /// Wraps around an existing threadpool instance
81 pub fn from_threadpool(pool: P) -> Self {
82 Self {
83 inner: Mutex::new(pool)
84 }
85 }
86 /// Moves the underlying threadpool out
87 pub fn into_inner(self) -> P {
88 self.inner.into_inner()
89 }
90}
91
92impl<P> AsyncThreadPool for ThreadPool<P>
93where P: SyncThreadPool + Sized
94{
95 fn new() -> Self {
96 Self::from_threadpool(P::new())
97 }
98 fn with_thread_count(num_threads: usize) -> Self {
99 Self::from_threadpool(P::with_thread_count(num_threads))
100 }
101 fn execute<F, T>(&self, fun: F) -> Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>>
102 where
103 F: FnOnce() -> T + Send + 'static,
104 T: Sized + Send + Sync + 'static
105 {
106 use futures::prelude::*;
107 use futures::channel::oneshot::channel;
108
109 let (tx, rx) = channel();
110 self.inner.lock().execute(move || {
111 let retval = fun();
112 tx.send(retval).ok();
113 });
114 Box::pin(rx.map(|v| v.unwrap()))
115 }
116}
117#[cfg(feature = "uvth")]
118lazy_static! {
119 /// Global threadpool
120 pub static ref GLOBAL_THREADPOOL: ThreadPool<uvth::ThreadPool> = ThreadPool::new();
121}
122
123#[cfg(all(feature = "threadpool", not(feature = "uvth")))]
124lazy_static! {
125 /// Global threadpool
126 pub static ref GLOBAL_THREADPOOL: ThreadPool<threadpool::ThreadPool> = ThreadPool::new();
127}
128
129/// Runs a task in an asynchronous threadpool
130pub fn run_task_in_pool<R, F, T>(fun: F, thread_pool: &T) -> Pin<Box<dyn Future<Output = R> + Send + Sync + 'static>>
131where
132 R: Sized + Send + Sync + 'static,
133 F: FnOnce() -> R + Send + 'static,
134 T: AsyncThreadPool
135{
136 thread_pool.execute(fun)
137}
138
139/// Runs a task in the global threadpool
140#[cfg(any(feature = "uvth", feature = "threadpool"))]
141pub fn run_task<R, F>(fun: F) -> Pin<Box<dyn Future<Output = R> + Send + Sync + 'static>>
142where
143 R: Sized + Send + Sync + 'static,
144 F: FnOnce() -> R + Send + 'static,
145{
146 run_task_in_pool(fun, &*GLOBAL_THREADPOOL)
147}
148
149#[cfg(feature = "derive")]
150pub use futures_cputask_derive::async_task;