futures_cputask/
lib.rs

1#![doc(html_root_url = "https://docs.rs/futures-cputask/0.3.0")]
2//! futures-cputask allows you to turn long-running CPU tasks into a async function that runs
3//! on a secondary threadpool.
4//!
5//! While futures-preview allows execution of tasks, it runs it on the main threadpool, which clogs up
6//! one of the threads while the future is run. Running it on a secondary threadpool allows for potentially
7//! higher throughput.
8//!
9//! This crate supports multiple threadpool implementations, namely the one found in `threadpool` and the one found in `uvth`.
10//! You can add support for other threadpool implementations by using the `SyncThreadPool` trait found
11//! in this crate
12use std::future::Future;
13use std::pin::Pin;
14use parking_lot::Mutex;
15use lazy_static::lazy_static;
16mod impls;
17
18/// ThreadPool trait
19///
20/// `SyncThreadPool` has to be implemented for any kind of underlying threadpool you want to use
21/// with this crate. It defines the minimum interface required to be useful.
22///
23/// This crate implements this trait on the `threadpool` and `uvth` crates, if their corresponding
24/// features are enabled.
25pub trait SyncThreadPool {
26    /// Create a new default thread pool
27    ///
28    /// The implementor is suggested to dynamically check the amount of processing units available
29    /// and spawn that many threads for threadpool usage.
30    fn new() -> Self;
31    /// Creates a new thread pool with a maximum of `num_threads` threads
32    ///
33    /// The implementor *shall not* execute more than `num_threads` jobs at the same time, unless
34    /// modified. Internal threads that manage the job threads are not counted towards that limit.
35    fn with_thread_count(num_threads: usize) -> Self;
36    /// Schedules a task on the threadpool
37    ///
38    /// As soon as the task is scheduled, this function shall return.
39    fn execute<F>(&self, fun: F)
40    where
41        F: FnOnce() + Send + 'static;
42}
43
44/// Asynchronous threadpools
45///
46/// This trait specifies an interface for interacting with asynchronous threadpools, which allow you to
47/// await jobs
48pub trait AsyncThreadPool {
49    /// Create a new default thread pool
50    ///
51    /// The implementor is suggested to dynamically check the amount of processing units available
52    /// and spawn that many threads for threadpool usage.
53    fn new() -> Self;
54    /// Creates a new thread pool with a maximum of `num_threads` threads
55    ///
56    /// The implementor *shall not* execute more than `num_threads` jobs at the same time, unless
57    /// modified. Internal threads that manage the job threads are not counted towards that limit.
58    fn with_thread_count(num_threads: usize) -> Self;
59    /// Schedules a task on the threadpool
60    ///
61    /// As soon as the task is scheduled, this function shall return a future.
62    fn execute<F, T>(&self, fun: F) -> Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>>
63    where
64        F: FnOnce() -> T + Send + 'static,
65        T: Sized + Send + Sync + 'static;
66}
67
68/// Asynchronous threadpool
69///
70/// Wrapper around a Synchronous threadpool to provide futures support.
71pub struct ThreadPool<P>
72where P: SyncThreadPool + Sized
73{
74    inner: Mutex<P>
75}
76
77impl<P> ThreadPool<P>
78where P: SyncThreadPool + Sized
79{
80    /// Wraps around an existing threadpool instance
81    pub fn from_threadpool(pool: P) -> Self {
82        Self {
83            inner: Mutex::new(pool)
84        }
85    }
86    /// Moves the underlying threadpool out
87    pub fn into_inner(self) -> P {
88        self.inner.into_inner()
89    }
90}
91
92impl<P> AsyncThreadPool for ThreadPool<P>
93where P: SyncThreadPool + Sized
94{
95    fn new() -> Self {
96        Self::from_threadpool(P::new())
97    }
98    fn with_thread_count(num_threads: usize) -> Self {
99        Self::from_threadpool(P::with_thread_count(num_threads))
100    }
101    fn execute<F, T>(&self, fun: F) -> Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>>
102    where
103        F: FnOnce() -> T + Send + 'static,
104        T: Sized + Send + Sync + 'static
105    {
106        use futures::prelude::*;
107        use futures::channel::oneshot::channel;
108
109        let (tx, rx) = channel();
110        self.inner.lock().execute(move || {
111            let retval = fun();
112            tx.send(retval).ok();
113        });
114        Box::pin(rx.map(|v| v.unwrap()))
115    }
116}
117#[cfg(feature = "uvth")]
118lazy_static! {
119    /// Global threadpool
120    pub static ref GLOBAL_THREADPOOL: ThreadPool<uvth::ThreadPool> = ThreadPool::new();
121}
122
123#[cfg(all(feature = "threadpool", not(feature = "uvth")))]
124lazy_static! {
125    /// Global threadpool
126    pub static ref GLOBAL_THREADPOOL: ThreadPool<threadpool::ThreadPool> = ThreadPool::new();
127}
128
129/// Runs a task in an asynchronous threadpool
130pub fn run_task_in_pool<R, F, T>(fun: F, thread_pool: &T) -> Pin<Box<dyn Future<Output = R> + Send + Sync + 'static>>
131where
132    R: Sized + Send + Sync + 'static,
133    F: FnOnce() -> R + Send + 'static,
134    T: AsyncThreadPool
135{
136    thread_pool.execute(fun)
137}
138
139/// Runs a task in the global threadpool
140#[cfg(any(feature = "uvth", feature = "threadpool"))]
141pub fn run_task<R, F>(fun: F) -> Pin<Box<dyn Future<Output = R> + Send + Sync + 'static>>
142where
143    R: Sized + Send + Sync + 'static,
144    F: FnOnce() -> R + Send + 'static,
145{
146    run_task_in_pool(fun, &*GLOBAL_THREADPOOL)
147}
148
149#[cfg(feature = "derive")]
150pub use futures_cputask_derive::async_task;