asyncified/
lib.rs

1//! # Asyncified
2//!
3//! This small, zero-dependency library provides a wrapper
4//! to hide synchronous, blocking types behind an async
5//! interface that is runtime agnostic.
6//!
7//! # Example
8//!
9//! ```rust
10//! # #[tokio::main]
11//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
12//! # struct SlowThing;
13//! # impl SlowThing {
14//! #     fn new() -> SlowThing {
15//! #         std::thread::sleep(std::time::Duration::from_secs(1));
16//! #         SlowThing
17//! #     }
18//! # }
19//! use asyncified::Asyncified;
20//!
21//! // Construct a thing (that could take time) inside
22//! // the Asyncified container, awaiting it to be ready.
23//! // prefer `new()` if you want to be able to return an error.
24//! let s = Asyncified::builder().build_ok(SlowThing::new).await;
25//!
26//! // Perform some potentially slow operation on the thing
27//! // inside the container, awaiting the result.
28//! let n = s.call(|slow_thing| {
29//!     std::thread::sleep(std::time::Duration::from_secs(1));
30//!     123usize
31//! }).await;
32//! # assert_eq!(n, 123);
33//! # Ok(())
34//! # }
35//! ```
36#![forbid(unsafe_code)]
37
38mod channel;
39mod oneshot;
40
/// A boxed, one-shot job that is handed exclusive access to the wrapped
/// value. It must be `Send + 'static` because it is created on the calling
/// side and executed on the dedicated worker thread.
type Func<T> = Box<dyn for<'a> FnOnce(&'a mut T) + Send + 'static>;
42
43pub struct AsyncifiedBuilder<T> {
44    channel_size: usize,
45    on_close: Option<Func<T>>,
46    thread_builder: Option<std::thread::Builder>,
47}
48
49impl<T: 'static> Default for AsyncifiedBuilder<T> {
50    fn default() -> Self {
51        Self::new()
52    }
53}
54
55impl<T: 'static> AsyncifiedBuilder<T> {
56    /// Construct a new builder.
57    pub fn new() -> Self {
58        Self {
59            channel_size: 16,
60            on_close: None,
61            thread_builder: None,
62        }
63    }
64
65    /// How many items can be queued at a time to be executed on
66    /// the Asyncified container.
67    pub fn channel_size(mut self, size: usize) -> Self {
68        self.channel_size = size;
69        self
70    }
71
72    /// Configure the thread that the asyncified item will live on.
73    pub fn thread_builder(mut self, builder: std::thread::Builder) -> Self {
74        self.thread_builder = Some(builder);
75        self
76    }
77
78    /// Configure a single function to run when the [`Asyncified`]
79    /// container is dropped.
80    pub fn on_close<F>(mut self, f: F) -> Self
81    where
82        F: FnOnce(&mut T) + Send + 'static,
83    {
84        self.on_close = Some(Box::new(f));
85        self
86    }
87
88    /// This is a shorthand for [`AsyncifiedBuilder::build()`] for when the thing
89    /// you're constructing can't fail.
90    pub async fn build_ok<F>(self, val_fn: F) -> Asyncified<T>
91    where
92        F: Send + 'static + FnOnce() -> T,
93    {
94        self.build(move || Ok::<_, ()>(val_fn()))
95            .await
96            .expect("function can't fail")
97    }
98
99    /// This is passed a constructor function, and constructs the resulting
100    /// value on a new thread, returning any error in doing so. Use
101    /// [`Asyncified::call()`] to perform operations on the constructed value.
102    ///
103    /// It's also passed a thread builder, giving you control over the thread
104    /// that the value is spawned into.
105    pub async fn build<E, F>(self, val_fn: F) -> Result<Asyncified<T>, E>
106    where
107        E: Send + 'static,
108        F: Send + 'static + FnOnce() -> Result<T, E>,
109    {
110        let thread_builder = self
111            .thread_builder
112            .unwrap_or_else(|| std::thread::Builder::new().name("Asyncified thread".to_string()));
113
114        // The default size of the bounded channel, 16, is somewhat
115        // arbitrarily chosen just to help reduce locking on
116        // reads from the channel when there are many writes
117        // (ie it can lock once per 16 reads). `call` will
118        // always wait until a result anyway.
119        let channel_size = self.channel_size;
120
121        let (tx, mut rx) = channel::new::<Func<T>>(channel_size);
122        let (res_tx, res_rx) = oneshot::new::<Result<(), E>>();
123
124        // This runs when the task is going to end (eg no more
125        // open channels).
126        let on_close = self.on_close;
127
128        // As long as there are senders, we'll
129        // receive and run functions in the separate thread.
130        thread_builder
131            .spawn(move || {
132                let mut val = match val_fn() {
133                    Ok(val) => {
134                        res_tx.send(Ok(()));
135                        val
136                    }
137                    Err(e) => {
138                        res_tx.send(Err(e));
139                        return;
140                    }
141                };
142                while let Some(f) = rx.recv() {
143                    f(&mut val)
144                }
145                if let Some(on_close) = on_close {
146                    on_close(&mut val)
147                }
148            })
149            .expect("should be able to spawn new thread for Asyncified instance");
150
151        res_rx.recv().await?;
152        Ok(Asyncified { tx })
153    }
154}
155
156/// The whole point.
157pub struct Asyncified<T> {
158    tx: channel::Sender<Func<T>>,
159}
160
161impl<T> Clone for Asyncified<T> {
162    fn clone(&self) -> Self {
163        Self {
164            tx: self.tx.clone(),
165        }
166    }
167}
168
169impl<T> std::fmt::Debug for Asyncified<T> {
170    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171        f.debug_struct("Asyncified")
172            .field("tx", &"<channel::Sender>")
173            .finish()
174    }
175}
176
177impl<T: 'static> Asyncified<T> {
178    /// Construct a new [`Asyncified`] container with default values.
179    /// Use [`Asyncified::builder()`] for more options.
180    pub async fn new<E, F>(val_fn: F) -> Result<Asyncified<T>, E>
181    where
182        E: Send + 'static,
183        F: Send + 'static + FnOnce() -> Result<T, E>,
184    {
185        Self::builder().build(val_fn).await
186    }
187
188    /// Configure a new [`Asyncified`] instance.
189    pub fn builder() -> AsyncifiedBuilder<T> {
190        AsyncifiedBuilder::new()
191    }
192
193    /// Execute the provided function on the thread that the asyncified
194    /// value was moved onto, handing back the result when done. The async
195    /// task that this is called from will be yielded while we're waiting
196    /// for the call to finish.
197    pub async fn call<R: Send + 'static, F: (FnOnce(&mut T) -> R) + Send + 'static>(
198        &self,
199        f: F,
200    ) -> R {
201        let (tx, rx) = oneshot::new::<R>();
202
203        // Ignore any error, since we expect the thread to last until this
204        // struct is dropped, and so sending should never fail.
205        let _ = self
206            .tx
207            .send(Box::new(move |item| {
208                let res = f(item);
209                // Send res back via sync->async oneshot.
210                tx.send(res);
211            }))
212            .await;
213
214        rx.recv().await
215    }
216}
217
#[cfg(test)]
mod test {
    use super::*;
    use std::time::{Duration, Instant};

    /// Creating the `new` future must return immediately, even when the
    /// constructor itself is slow.
    #[test]
    fn new_doesnt_block() {
        let start = Instant::now();

        // don't block if constructing the value takes a while (we can `await` to return only
        // when the value has been built).
        let _fut = Asyncified::new(|| {
            std::thread::sleep(Duration::from_secs(10));
            Ok::<_, ()>(())
        });

        // Should take <100ms to get here, not 10s.
        assert!(start.elapsed().as_millis() < 100);
    }

    /// Creating the `call` future must return immediately, even when the
    /// function handed to it is slow.
    #[tokio::test]
    async fn call_doesnt_block() {
        let a = Asyncified::new(|| Ok::<_, ()>(())).await.unwrap();

        let start = Instant::now();

        // The function takes 10s to complete:
        let _fut = a.call(|_| {
            std::thread::sleep(Duration::from_secs(10));
        });

        // But we just get a future back which doesn't block:
        assert!(start.elapsed().as_millis() < 100);
    }

    /// Sequential calls observe each other's updates in order.
    #[tokio::test]
    async fn basic_updating_works() {
        let a = Asyncified::new(|| Ok::<_, ()>(0u64)).await.unwrap();

        for i in 1..100_000 {
            assert_eq!(
                a.call(|n| {
                    *n += 1;
                    *n
                })
                .await,
                i
            );
        }
    }

    /// Updates issued concurrently from several tasks are all applied.
    #[tokio::test]
    async fn parallel_updating_works() {
        let a = Asyncified::new(|| Ok::<_, ()>(0u64)).await.unwrap();

        // spawn 10 tasks which all increment the number
        let handles: Vec<_> = (0..10)
            .map(|_| {
                let a = a.clone();
                tokio::spawn(async move {
                    for _ in 0..10_000 {
                        a.call(|n| {
                            *n += 1;
                            *n
                        })
                        .await;
                    }
                })
            })
            .collect();

        // wait for them all to finish, surfacing any task failure rather
        // than silently discarding it
        for handle in handles {
            handle.await.expect("incrementing task should not panic");
        }

        // the number should have been incremented 100k times.
        assert_eq!(a.call(|n| *n).await, 100_000);
    }

    /// The `on_close` function runs after the last handle is dropped and
    /// sees the final state of the value.
    #[tokio::test]
    async fn on_close_is_called() {
        let (tx, rx) = tokio::sync::oneshot::channel();

        let a = Asyncified::builder()
            .on_close(move |val| {
                let _ = tx.send(*val);
            })
            .build_ok(|| 0u64)
            .await;

        a.call(|v| *v = 100).await;

        drop(a);

        // on_close should have sent the final value through the channel:
        assert_eq!(rx.await.unwrap(), 100);
    }
}