asyncified/lib.rs
1//! # Asyncified
2//!
3//! This small, zero-dependency library provides a wrapper
4//! to hide synchronous, blocking types behind an async
5//! interface that is runtime agnostic.
6//!
7//! # Example
8//!
9//! ```rust
10//! # #[tokio::main]
11//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
12//! # struct SlowThing;
13//! # impl SlowThing {
14//! # fn new() -> SlowThing {
15//! # std::thread::sleep(std::time::Duration::from_secs(1));
16//! # SlowThing
17//! # }
18//! # }
19//! use asyncified::Asyncified;
20//!
21//! // Construct a thing (that could take time) inside
22//! // the Asyncified container, awaiting it to be ready.
//! // Prefer `build()` (or `Asyncified::new()`) if constructing the value
//! // can fail and you want the error returned to you.
24//! let s = Asyncified::builder().build_ok(SlowThing::new).await;
25//!
26//! // Perform some potentially slow operation on the thing
27//! // inside the container, awaiting the result.
28//! let n = s.call(|slow_thing| {
29//! std::thread::sleep(std::time::Duration::from_secs(1));
30//! 123usize
31//! }).await;
32//! # assert_eq!(n, 123);
33//! # Ok(())
34//! # }
35//! ```
36#![forbid(unsafe_code)]
37
38mod channel;
39mod oneshot;
40
/// A boxed, run-once job that is given mutable access to the wrapped value.
/// Jobs of this type are sent over the channel to the worker thread, and the
/// optional `on_close` hook is stored in the same form.
type Func<T> = Box<dyn FnOnce(&mut T) + Send + 'static>;
42
/// Builder used to configure and construct an [`Asyncified`] container.
///
/// Create one with [`AsyncifiedBuilder::new()`] or [`Asyncified::builder()`],
/// adjust it with the chained setter methods, and finish with
/// [`AsyncifiedBuilder::build()`] or [`AsyncifiedBuilder::build_ok()`].
pub struct AsyncifiedBuilder<T> {
    // Capacity passed to `channel::new` when the container is built.
    channel_size: usize,
    // Optional hook run on the worker thread after its receive loop ends.
    on_close: Option<Func<T>>,
    // Optional custom builder for the worker thread; when `None`, `build`
    // falls back to a builder that only sets a thread name.
    thread_builder: Option<std::thread::Builder>,
}
48
49impl<T: 'static> Default for AsyncifiedBuilder<T> {
50 fn default() -> Self {
51 Self::new()
52 }
53}
54
55impl<T: 'static> AsyncifiedBuilder<T> {
56 /// Construct a new builder.
57 pub fn new() -> Self {
58 Self {
59 channel_size: 16,
60 on_close: None,
61 thread_builder: None,
62 }
63 }
64
65 /// How many items can be queued at a time to be executed on
66 /// the Asyncified container.
67 pub fn channel_size(mut self, size: usize) -> Self {
68 self.channel_size = size;
69 self
70 }
71
72 /// Configure the thread that the asyncified item will live on.
73 pub fn thread_builder(mut self, builder: std::thread::Builder) -> Self {
74 self.thread_builder = Some(builder);
75 self
76 }
77
78 /// Configure a single function to run when the [`Asyncified`]
79 /// container is dropped.
80 pub fn on_close<F>(mut self, f: F) -> Self
81 where
82 F: FnOnce(&mut T) + Send + 'static,
83 {
84 self.on_close = Some(Box::new(f));
85 self
86 }
87
88 /// This is a shorthand for [`AsyncifiedBuilder::build()`] for when the thing
89 /// you're constructing can't fail.
90 pub async fn build_ok<F>(self, val_fn: F) -> Asyncified<T>
91 where
92 F: Send + 'static + FnOnce() -> T,
93 {
94 self.build(move || Ok::<_, ()>(val_fn()))
95 .await
96 .expect("function can't fail")
97 }
98
99 /// This is passed a constructor function, and constructs the resulting
100 /// value on a new thread, returning any error in doing so. Use
101 /// [`Asyncified::call()`] to perform operations on the constructed value.
102 ///
103 /// It's also passed a thread builder, giving you control over the thread
104 /// that the value is spawned into.
105 pub async fn build<E, F>(self, val_fn: F) -> Result<Asyncified<T>, E>
106 where
107 E: Send + 'static,
108 F: Send + 'static + FnOnce() -> Result<T, E>,
109 {
110 let thread_builder = self
111 .thread_builder
112 .unwrap_or_else(|| std::thread::Builder::new().name("Asyncified thread".to_string()));
113
114 // The default size of the bounded channel, 16, is somewhat
115 // arbitrarily chosen just to help reduce locking on
116 // reads from the channel when there are many writes
117 // (ie it can lock once per 16 reads). `call` will
118 // always wait until a result anyway.
119 let channel_size = self.channel_size;
120
121 let (tx, mut rx) = channel::new::<Func<T>>(channel_size);
122 let (res_tx, res_rx) = oneshot::new::<Result<(), E>>();
123
124 // This runs when the task is going to end (eg no more
125 // open channels).
126 let on_close = self.on_close;
127
128 // As long as there are senders, we'll
129 // receive and run functions in the separate thread.
130 thread_builder
131 .spawn(move || {
132 let mut val = match val_fn() {
133 Ok(val) => {
134 res_tx.send(Ok(()));
135 val
136 }
137 Err(e) => {
138 res_tx.send(Err(e));
139 return;
140 }
141 };
142 while let Some(f) = rx.recv() {
143 f(&mut val)
144 }
145 if let Some(on_close) = on_close {
146 on_close(&mut val)
147 }
148 })
149 .expect("should be able to spawn new thread for Asyncified instance");
150
151 res_rx.recv().await?;
152 Ok(Asyncified { tx })
153 }
154}
155
/// A handle to a value of type `T` that lives on its own thread.
///
/// Use [`Asyncified::call()`] to run a function against the value on that
/// thread and await its result. Cloning the handle gives another handle
/// to the same value.
pub struct Asyncified<T> {
    // Sending half of the channel used to pass jobs to the worker thread.
    tx: channel::Sender<Func<T>>,
}
160
161impl<T> Clone for Asyncified<T> {
162 fn clone(&self) -> Self {
163 Self {
164 tx: self.tx.clone(),
165 }
166 }
167}
168
169impl<T> std::fmt::Debug for Asyncified<T> {
170 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171 f.debug_struct("Asyncified")
172 .field("tx", &"<channel::Sender>")
173 .finish()
174 }
175}
176
177impl<T: 'static> Asyncified<T> {
178 /// Construct a new [`Asyncified`] container with default values.
179 /// Use [`Asyncified::builder()`] for more options.
180 pub async fn new<E, F>(val_fn: F) -> Result<Asyncified<T>, E>
181 where
182 E: Send + 'static,
183 F: Send + 'static + FnOnce() -> Result<T, E>,
184 {
185 Self::builder().build(val_fn).await
186 }
187
188 /// Configure a new [`Asyncified`] instance.
189 pub fn builder() -> AsyncifiedBuilder<T> {
190 AsyncifiedBuilder::new()
191 }
192
193 /// Execute the provided function on the thread that the asyncified
194 /// value was moved onto, handing back the result when done. The async
195 /// task that this is called from will be yielded while we're waiting
196 /// for the call to finish.
197 pub async fn call<R: Send + 'static, F: (FnOnce(&mut T) -> R) + Send + 'static>(
198 &self,
199 f: F,
200 ) -> R {
201 let (tx, rx) = oneshot::new::<R>();
202
203 // Ignore any error, since we expect the thread to last until this
204 // struct is dropped, and so sending should never fail.
205 let _ = self
206 .tx
207 .send(Box::new(move |item| {
208 let res = f(item);
209 // Send res back via sync->async oneshot.
210 tx.send(res);
211 }))
212 .await;
213
214 rx.recv().await
215 }
216}
217
#[cfg(test)]
mod test {
    use super::*;
    use std::time::{Duration, Instant};

    /// Creating the construction future must return immediately, however
    /// slow the constructor itself is.
    #[test]
    fn new_doesnt_block() {
        let start = Instant::now();

        // don't block if constructing the value takes a while (we can `await` to return only
        // when the value has been built).
        let _fut = Asyncified::new(|| {
            std::thread::sleep(Duration::from_secs(10));
            Ok::<_, ()>(())
        });

        // Should take <100ms to get here, not 10s.
        assert!(start.elapsed().as_millis() < 100);
    }

    /// Creating the future for a slow call must not block the caller.
    #[tokio::test]
    async fn call_doesnt_block() {
        let a = Asyncified::new(|| Ok::<_, ()>(())).await.unwrap();

        let start = Instant::now();

        // The function takes 10s to complete:
        let _fut = a.call(|_| {
            std::thread::sleep(Duration::from_secs(10));
        });

        // But we just get a future back which doesn't block:
        assert!(start.elapsed().as_millis() < 100);
    }

    /// Sequential calls observe each other's updates in order.
    #[tokio::test]
    async fn basic_updating_works() {
        let a = Asyncified::new(|| Ok::<_, ()>(0u64)).await.unwrap();

        for i in 1..100_000 {
            assert_eq!(
                a.call(|n| {
                    *n += 1;
                    *n
                })
                .await,
                i
            );
        }
    }

    /// Calls made concurrently through cloned handles are all applied.
    #[tokio::test]
    async fn parallel_updating_works() {
        let a = Asyncified::new(|| Ok::<_, ()>(0u64)).await.unwrap();

        // spawn 10 tasks which all increment the number
        let handles: Vec<_> = (0..10)
            .map(|_| {
                let a = a.clone();
                tokio::spawn(async move {
                    for _ in 0..10_000 {
                        a.call(|n| {
                            *n += 1;
                            *n
                        })
                        .await;
                    }
                })
            })
            .collect();

        // wait for them all to finish, surfacing any task panic here
        // rather than as a confusing count mismatch below.
        for handle in handles {
            handle.await.expect("incrementing task should not panic");
        }

        // the number should have been incremented 100k times.
        assert_eq!(a.call(|n| *n).await, 100_000);
    }

    /// The `on_close` hook runs with the final value once the last handle
    /// has been dropped.
    #[tokio::test]
    async fn on_close_is_called() {
        let (tx, rx) = tokio::sync::oneshot::channel();

        let a = Asyncified::builder()
            .on_close(move |val| {
                let _ = tx.send(*val);
            })
            .build_ok(|| 0u64)
            .await;

        a.call(|v| *v = 100).await;

        drop(a);

        // The on_close hook should have sent us the final value:
        assert_eq!(rx.await.unwrap(), 100);
    }
}
321}