amadeus_core/
pool.rs

1use futures::future::BoxFuture;
2use serde::{Deserialize, Serialize};
3use serde_closure::traits;
4use std::{
5	error::Error, future::Future, panic::{RefUnwindSafe, UnwindSafe}
6};
7
8pub trait ProcessSend: Send + Serialize + for<'de> Deserialize<'de> {}
9impl<T: ?Sized> ProcessSend for T where T: Send + Serialize + for<'de> Deserialize<'de> {}
10
11type Result<T> = std::result::Result<T, Box<dyn Error + Send>>;
12
13#[cfg_attr(not(nightly), serde_closure::desugar)]
14pub trait ProcessPool: Clone + Send + Sync + RefUnwindSafe + UnwindSafe + Unpin {
15	type ThreadPool: ThreadPool + 'static;
16
17	fn processes(&self) -> usize;
18
19	fn spawn<F, Fut, T>(&self, work: F) -> BoxFuture<'static, Result<T>>
20	where
21		F: traits::FnOnce(&Self::ThreadPool) -> Fut + ProcessSend + 'static,
22		Fut: Future<Output = T> + 'static,
23		T: ProcessSend + 'static,
24	{
25		#[allow(unsafe_code)]
26		unsafe {
27			self.spawn_unchecked(work)
28		}
29	}
30
31	/// # Safety
32	///
33	/// Must be polled to completion before dropping. Unsound to forget it without having polled to completion.
34	#[allow(unsafe_code)]
35	unsafe fn spawn_unchecked<'a, F, Fut, T>(&self, work: F) -> BoxFuture<'a, Result<T>>
36	where
37		F: traits::FnOnce(&Self::ThreadPool) -> Fut + ProcessSend + 'a,
38		Fut: Future<Output = T> + 'a,
39		T: ProcessSend + 'a;
40}
41
42pub trait ThreadPool: Clone + Send + Sync + RefUnwindSafe + UnwindSafe + Unpin {
43	fn threads(&self) -> usize;
44
45	fn spawn<F, Fut, T>(&self, work: F) -> BoxFuture<'static, Result<T>>
46	where
47		F: FnOnce() -> Fut + Send + 'static,
48		Fut: Future<Output = T> + 'static,
49		T: Send + 'static,
50	{
51		#[allow(unsafe_code)]
52		unsafe {
53			self.spawn_unchecked(work)
54		}
55	}
56
57	/// # Safety
58	///
59	/// Must be polled to completion before dropping. Unsound to forget it without having polled to completion.
60	#[allow(unsafe_code)]
61	unsafe fn spawn_unchecked<'a, F, Fut, T>(&self, work: F) -> BoxFuture<'a, Result<T>>
62	where
63		F: FnOnce() -> Fut + Send + 'a,
64		Fut: Future<Output = T> + 'a,
65		T: Send + 'a;
66}
67
68#[cfg_attr(not(nightly), serde_closure::desugar)]
69impl<P: ?Sized> ProcessPool for &P
70where
71	P: ProcessPool,
72{
73	type ThreadPool = P::ThreadPool;
74
75	fn processes(&self) -> usize {
76		(*self).processes()
77	}
78	fn spawn<F, Fut, T>(&self, work: F) -> BoxFuture<'static, Result<T>>
79	where
80		F: traits::FnOnce(&Self::ThreadPool) -> Fut + ProcessSend + 'static,
81		Fut: Future<Output = T> + 'static,
82		T: ProcessSend + 'static,
83	{
84		(*self).spawn(work)
85	}
86	#[allow(unsafe_code)]
87	unsafe fn spawn_unchecked<'a, F, Fut, T>(&self, work: F) -> BoxFuture<'a, Result<T>>
88	where
89		F: traits::FnOnce(&Self::ThreadPool) -> Fut + ProcessSend + 'a,
90		Fut: Future<Output = T> + 'a,
91		T: ProcessSend + 'a,
92	{
93		(*self).spawn_unchecked(work)
94	}
95}
96
97impl<P: ?Sized> ThreadPool for &P
98where
99	P: ThreadPool,
100{
101	fn threads(&self) -> usize {
102		(*self).threads()
103	}
104	fn spawn<F, Fut, T>(&self, work: F) -> BoxFuture<'static, Result<T>>
105	where
106		F: FnOnce() -> Fut + Send + 'static,
107		Fut: Future<Output = T> + 'static,
108		T: Send + 'static,
109	{
110		(*self).spawn(work)
111	}
112	#[allow(unsafe_code)]
113	unsafe fn spawn_unchecked<'a, F, Fut, T>(&self, work: F) -> BoxFuture<'a, Result<T>>
114	where
115		F: FnOnce() -> Fut + Send + 'a,
116		Fut: Future<Output = T> + 'a,
117		T: Send + 'a,
118	{
119		(*self).spawn_unchecked(work)
120	}
121}