tokio_safe_block_on/
lib.rs

1#![deny(warnings)]
2#![deny(missing_docs)]
3#![allow(clippy::needless_doctest_main)]
4//! Provides the ability to execute async code from a sync context,
5//! without blocking a tokio core thread or busy looping the cpu.
6//!
7//! # Example
8//!
9//! ```
10//! #[tokio::main(threaded_scheduler)]
11//! async fn main() {
12//!     // we need to ensure we are in the context of a tokio task
13//!     tokio::task::spawn(async move {
14//!         // some library api may take a sync callback
15//!         // but we want to be able to execute async code
16//!         (|| {
17//!             let r = tokio_safe_block_on::tokio_safe_block_on(
18//!                 // async code to poll synchronously
19//!                 async move {
20//!                     // simulate some async work
21//!                     tokio::time::delay_for(
22//!                         std::time::Duration::from_millis(2)
23//!                     ).await;
24//!
25//!                     // return our result
26//!                     "test"
27//!                 },
28//!
29//!                 // timeout to allow async execution
30//!                 std::time::Duration::from_millis(10),
31//!             ).unwrap();
32//!
33//!             // note we get the result inline with no `await`
34//!             assert_eq!("test", r);
35//!         })()
36//!     })
37//!     .await
38//!     .unwrap();
39//! }
40//! ```
41
42/// Error Type
43#[derive(Debug)]
44pub enum BlockOnError {
45    /// The future did not complete within the time alloted.
46    Timeout,
47
48    /// The spawned tokio task returned a JoinError
49    TaskJoinError(tokio::task::JoinError),
50}
51
52impl std::fmt::Display for BlockOnError {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        write!(f, "{:?}", self)
55    }
56}
57
58impl From<tokio::task::JoinError> for BlockOnError {
59    fn from(e: tokio::task::JoinError) -> Self {
60        Self::TaskJoinError(e)
61    }
62}
63
64impl std::error::Error for BlockOnError {}
65
66/// Provides the ability to execute async code from a sync context,
67/// without blocking a tokio core thread or busy looping the cpu.
68/// You must ensure you are within the context of a tokio::task,
69/// This allows `tokio::task::block_in_place` to move to a blocking thread.
70/// This version will never time out - you may end up binding a
71/// tokio background thread forever.
72pub fn tokio_safe_block_forever_on<F>(f: F) -> Result<F::Output, BlockOnError>
73where
74    F: 'static + std::future::Future + Send,
75    <F as std::future::Future>::Output: Send,
76{
77    // first, we need to make sure to move this thread to the background
78    tokio::task::block_in_place(move || {
79        // poll until we get a result
80        futures::executor::block_on(async move {
81            // if we don't spawn, any recursive calls to block_in_place
82            // will fail
83            Ok(tokio::task::spawn(f).await?)
84        })
85    })
86}
87
88/// Provides the ability to execute async code from a sync context,
89/// without blocking a tokio core thread or busy looping the cpu.
90/// You must ensure you are within the context of a tokio::task,
91/// This allows `tokio::task::block_in_place` to move to a blocking thread.
92pub fn tokio_safe_block_on<F>(
93    f: F,
94    timeout: std::time::Duration,
95) -> Result<F::Output, BlockOnError>
96where
97    F: 'static + std::future::Future + Send,
98    <F as std::future::Future>::Output: Send,
99{
100    // work around pin requirements with a Box
101    let f = Box::pin(f);
102
103    // apply the timeout
104    let f = async move {
105        match futures::future::select(f, tokio::time::delay_for(timeout)).await
106        {
107            futures::future::Either::Left((res, _)) => Ok(res),
108            futures::future::Either::Right(_) => Err(BlockOnError::Timeout),
109        }
110    };
111
112    // execute the future
113    tokio_safe_block_forever_on(f)?
114}
115
116#[cfg(test)]
117mod tests {
118    use super::*;
119
120    #[tokio::test(threaded_scheduler)]
121    async fn it_should_execute_async_from_sync_context_forever() {
122        tokio::task::spawn(async move {
123            (|| {
124                let result =
125                    tokio_safe_block_forever_on(async move { "test0" });
126                assert_eq!("test0", result.unwrap());
127            })()
128        })
129        .await
130        .unwrap();
131    }
132
133    #[tokio::test(threaded_scheduler)]
134    async fn it_should_execute_async_from_sync_context() {
135        tokio::task::spawn(async move {
136            (|| {
137                let result = tokio_safe_block_on(
138                    async move { "test1" },
139                    std::time::Duration::from_millis(10),
140                );
141                assert_eq!("test1", result.unwrap());
142            })()
143        })
144        .await
145        .unwrap();
146    }
147
148    #[tokio::test(threaded_scheduler)]
149    async fn it_should_execute_timed_async_from_sync_context() {
150        tokio::task::spawn(async move {
151            (|| {
152                let result = tokio_safe_block_on(
153                    async move {
154                        tokio::time::delay_for(
155                            std::time::Duration::from_millis(2),
156                        )
157                        .await;
158                        "test2"
159                    },
160                    std::time::Duration::from_millis(10),
161                );
162                assert_eq!("test2", result.unwrap());
163            })()
164        })
165        .await
166        .unwrap();
167    }
168
169    #[tokio::test(threaded_scheduler)]
170    async fn it_should_timeout_timed_async_from_sync_context() {
171        tokio::task::spawn(async move {
172            (|| {
173                let result = tokio_safe_block_on(
174                    async move {
175                        tokio::time::delay_for(
176                            std::time::Duration::from_millis(10),
177                        )
178                        .await;
179                        "test3"
180                    },
181                    std::time::Duration::from_millis(2),
182                );
183                assert_matches::assert_matches!(
184                    result,
185                    Err(BlockOnError::Timeout)
186                );
187            })()
188        })
189        .await
190        .unwrap();
191    }
192
193    #[tokio::test(threaded_scheduler)]
194    async fn recursive_blocks_test() {
195        async fn rec_async(depth: u8) -> u8 {
196            if depth >= 10 {
197                return depth;
198            }
199            rec_sync(depth + 1)
200        }
201
202        fn rec_sync(depth: u8) -> u8 {
203            tokio_safe_block_forever_on(
204                async move { rec_async(depth + 1).await },
205            )
206            .unwrap()
207        }
208
209        tokio::task::spawn(async move {
210            assert_eq!(10, rec_async(0).await);
211        })
212        .await
213        .unwrap();
214    }
215}