tokio_safe_block_on/lib.rs
1#![deny(warnings)]
2#![deny(missing_docs)]
3#![allow(clippy::needless_doctest_main)]
4//! Provides the ability to execute async code from a sync context,
5//! without blocking a tokio core thread or busy looping the cpu.
6//!
7//! # Example
8//!
9//! ```
10//! #[tokio::main(threaded_scheduler)]
11//! async fn main() {
12//! // we need to ensure we are in the context of a tokio task
13//! tokio::task::spawn(async move {
14//! // some library api may take a sync callback
15//! // but we want to be able to execute async code
16//! (|| {
17//! let r = tokio_safe_block_on::tokio_safe_block_on(
18//! // async code to poll synchronously
19//! async move {
20//! // simulate some async work
21//! tokio::time::delay_for(
22//! std::time::Duration::from_millis(2)
23//! ).await;
24//!
25//! // return our result
26//! "test"
27//! },
28//!
29//! // timeout to allow async execution
30//! std::time::Duration::from_millis(10),
31//! ).unwrap();
32//!
33//! // note we get the result inline with no `await`
34//! assert_eq!("test", r);
35//! })()
36//! })
37//! .await
38//! .unwrap();
39//! }
40//! ```
41
/// Error type returned by the blocking helpers in this crate.
#[derive(Debug)]
pub enum BlockOnError {
    /// The future did not complete within the time allotted.
    Timeout,

    /// The spawned tokio task returned a `JoinError`.
    TaskJoinError(tokio::task::JoinError),
}
51
52impl std::fmt::Display for BlockOnError {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 write!(f, "{:?}", self)
55 }
56}
57
58impl From<tokio::task::JoinError> for BlockOnError {
59 fn from(e: tokio::task::JoinError) -> Self {
60 Self::TaskJoinError(e)
61 }
62}
63
64impl std::error::Error for BlockOnError {}
65
66/// Provides the ability to execute async code from a sync context,
67/// without blocking a tokio core thread or busy looping the cpu.
68/// You must ensure you are within the context of a tokio::task,
69/// This allows `tokio::task::block_in_place` to move to a blocking thread.
70/// This version will never time out - you may end up binding a
71/// tokio background thread forever.
72pub fn tokio_safe_block_forever_on<F>(f: F) -> Result<F::Output, BlockOnError>
73where
74 F: 'static + std::future::Future + Send,
75 <F as std::future::Future>::Output: Send,
76{
77 // first, we need to make sure to move this thread to the background
78 tokio::task::block_in_place(move || {
79 // poll until we get a result
80 futures::executor::block_on(async move {
81 // if we don't spawn, any recursive calls to block_in_place
82 // will fail
83 Ok(tokio::task::spawn(f).await?)
84 })
85 })
86}
87
88/// Provides the ability to execute async code from a sync context,
89/// without blocking a tokio core thread or busy looping the cpu.
90/// You must ensure you are within the context of a tokio::task,
91/// This allows `tokio::task::block_in_place` to move to a blocking thread.
92pub fn tokio_safe_block_on<F>(
93 f: F,
94 timeout: std::time::Duration,
95) -> Result<F::Output, BlockOnError>
96where
97 F: 'static + std::future::Future + Send,
98 <F as std::future::Future>::Output: Send,
99{
100 // work around pin requirements with a Box
101 let f = Box::pin(f);
102
103 // apply the timeout
104 let f = async move {
105 match futures::future::select(f, tokio::time::delay_for(timeout)).await
106 {
107 futures::future::Either::Left((res, _)) => Ok(res),
108 futures::future::Either::Right(_) => Err(BlockOnError::Timeout),
109 }
110 };
111
112 // execute the future
113 tokio_safe_block_forever_on(f)?
114}
115
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a `Duration` of `n` milliseconds.
    fn ms(n: u64) -> std::time::Duration {
        std::time::Duration::from_millis(n)
    }

    /// The untimed variant resolves an immediately-ready future.
    #[tokio::test(threaded_scheduler)]
    async fn it_should_execute_async_from_sync_context_forever() {
        let task = tokio::task::spawn(async move {
            // a plain closure stands in for a sync callback
            let sync_callback = || {
                let out = tokio_safe_block_forever_on(async move { "test0" });
                assert_eq!("test0", out.unwrap());
            };
            sync_callback()
        });
        task.await.unwrap();
    }

    /// The timed variant resolves an immediately-ready future.
    #[tokio::test(threaded_scheduler)]
    async fn it_should_execute_async_from_sync_context() {
        let task = tokio::task::spawn(async move {
            let sync_callback = || {
                let out = tokio_safe_block_on(async move { "test1" }, ms(10));
                assert_eq!("test1", out.unwrap());
            };
            sync_callback()
        });
        task.await.unwrap();
    }

    /// A future that finishes before the timeout yields its value.
    #[tokio::test(threaded_scheduler)]
    async fn it_should_execute_timed_async_from_sync_context() {
        let task = tokio::task::spawn(async move {
            let sync_callback = || {
                let out = tokio_safe_block_on(
                    async move {
                        tokio::time::delay_for(ms(2)).await;
                        "test2"
                    },
                    ms(10),
                );
                assert_eq!("test2", out.unwrap());
            };
            sync_callback()
        });
        task.await.unwrap();
    }

    /// A future that outlives the timeout yields `BlockOnError::Timeout`.
    #[tokio::test(threaded_scheduler)]
    async fn it_should_timeout_timed_async_from_sync_context() {
        let task = tokio::task::spawn(async move {
            let sync_callback = || {
                let out = tokio_safe_block_on(
                    async move {
                        tokio::time::delay_for(ms(10)).await;
                        "test3"
                    },
                    ms(2),
                );
                assert_matches::assert_matches!(
                    out,
                    Err(BlockOnError::Timeout)
                );
            };
            sync_callback()
        });
        task.await.unwrap();
    }

    /// Alternating async -> sync -> async calls can nest repeatedly.
    #[tokio::test(threaded_scheduler)]
    async fn recursive_blocks_test() {
        // async half: stops at depth 10, otherwise drops into sync code
        async fn rec_async(depth: u8) -> u8 {
            if depth < 10 {
                rec_sync(depth + 1)
            } else {
                depth
            }
        }

        // sync half: blocks on the async half one level deeper
        fn rec_sync(depth: u8) -> u8 {
            let next = async move { rec_async(depth + 1).await };
            tokio_safe_block_forever_on(next).unwrap()
        }

        let task = tokio::task::spawn(async move {
            assert_eq!(10, rec_async(0).await);
        });
        task.await.unwrap();
    }
}