// tokio_util/io/sync_bridge.rs

1use std::io::{BufRead, Read, Seek, Write};
2use tokio::io::{
3    AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite,
4    AsyncWriteExt,
5};
6
7/// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
8/// a [`tokio::io::AsyncWrite`] synchronously as a [`std::io::Write`].
9///
10/// # Alternatives
11///
12/// In many cases, there are better alternatives to using `SyncIoBridge`, especially
13/// if you want to avoid blocking the async runtime. Consider the following scenarios:
14///
15/// When hashing data, using `SyncIoBridge` can lead to suboptimal performance and
16/// might not fully leverage the async capabilities of the system.
17///
18/// ### Why It Matters:
19///
/// `SyncIoBridge` allows you to use asynchronous I/O operations in a synchronous
/// context by blocking the current thread. However, this can be inefficient because:
22/// - **Inefficient Resource Usage**: `SyncIoBridge` takes up an entire OS thread,
23///   which is inefficient compared to asynchronous code that can multiplex many
24///   tasks on a single thread.
25/// - **Thread Pool Saturation**: Excessive use of `SyncIoBridge` can exhaust the
26///   async runtime's thread pool, reducing the number of threads available for
27///   other tasks and impacting overall performance.
28/// - **Missed Concurrency Benefits**: By using synchronous operations with
29///   `SyncIoBridge`, you lose the ability to interleave tasks efficiently,
30///   which is a key advantage of asynchronous programming.
31///
32/// ## Example 1: Hashing Data
33///
34/// The use of `SyncIoBridge` is unnecessary when hashing data. Instead, you can
35/// process the data asynchronously by reading it into memory, which avoids blocking
36/// the async runtime.
37///
38/// There are two strategies for avoiding `SyncIoBridge` when hashing data. When
39/// the data fits into memory, the easiest is to read the data into a `Vec<u8>`
40/// and hash it:
41///
42/// Explanation: This example demonstrates how to asynchronously read data from a
43/// reader into memory and hash it using a synchronous hashing function. The
44/// `SyncIoBridge` is avoided, ensuring that the async runtime is not blocked.
45/// ```rust
46/// use tokio::io::AsyncReadExt;
47/// use tokio::io::AsyncRead;
48/// use std::io::Cursor;
49/// # mod blake3 { pub fn hash(_: &[u8]) {} }
50///
51/// async fn hash_contents(mut reader: impl AsyncRead + Unpin) -> Result<(), std::io::Error> {
52///    // Read all data from the reader into a Vec<u8>.
53///    let mut data = Vec::new();
54///    reader.read_to_end(&mut data).await?;
55///
56///    // Hash the data using the blake3 hashing function.
57///    let hash = blake3::hash(&data);
58///
59///    Ok(hash)
60/// }
61///
62/// # #[tokio::main(flavor = "current_thread")]
63/// # async fn main() -> Result<(), std::io::Error> {
64/// // Example: In-memory data.
65/// let data = b"Hello, world!"; // A byte slice.
66/// let reader = Cursor::new(data); // Create an in-memory AsyncRead.
67/// hash_contents(reader).await
68/// # }
69/// ```
70///
/// When the data doesn't fit into memory, the hashing library will usually
/// provide a `hasher` that you can repeatedly call `update` on to hash the data
/// one chunk at a time.
74///
75/// Explanation: This example demonstrates how to asynchronously stream data in
76/// chunks for hashing. Each chunk is read asynchronously, and the hash is updated
77/// incrementally. This avoids blocking and improves performance over using
78/// `SyncIoBridge`.
79///
80/// ```rust
81/// use tokio::io::AsyncReadExt;
82/// use tokio::io::AsyncRead;
83/// use std::io::Cursor;
84/// # struct Hasher;
85/// # impl Hasher { pub fn update(&mut self, _: &[u8]) {} pub fn finalize(&self) {} }
86///
87/// /// Asynchronously streams data from an async reader, processes it in chunks,
88/// /// and hashes the data incrementally.
89/// async fn hash_stream(mut reader: impl AsyncRead + Unpin, mut hasher: Hasher) -> Result<(), std::io::Error> {
90///    // Create a buffer to read data into, sized for performance.
91///    let mut data = vec![0; 16 * 1024];
92///    loop {
93///        // Read data from the reader into the buffer.
94///        let len = reader.read(&mut data).await?;
95///        if len == 0 { break; } // Exit loop if no more data.
96///
97///        // Update the hash with the data read.
98///        hasher.update(&data[..len]);
99///    }
100///
101///    // Finalize the hash after all data has been processed.
102///    let hash = hasher.finalize();
103///
104///    Ok(hash)
105/// }
106///
107/// # #[tokio::main(flavor = "current_thread")]
108/// # async fn main() -> Result<(), std::io::Error> {
109/// // Example: In-memory data.
110/// let data = b"Hello, world!"; // A byte slice.
111/// let reader = Cursor::new(data); // Create an in-memory AsyncRead.
112/// let hasher = Hasher;
113/// hash_stream(reader, hasher).await
114/// # }
115/// ```
116///
117///
118/// ## Example 2: Compressing Data
119///
120/// When compressing data, the use of `SyncIoBridge` is unnecessary as it introduces
121/// blocking and inefficient code. Instead, you can utilize an async compression library
122/// such as the [`async-compression`](https://docs.rs/async-compression/latest/async_compression/)
123/// crate, which is built to handle asynchronous data streams efficiently.
124///
125/// Explanation: This example shows how to asynchronously compress data using an
126/// async compression library. By reading and writing asynchronously, it avoids
127/// blocking and is more efficient than using `SyncIoBridge` with a non-async
128/// compression library.
129///
130/// ```ignore
131/// use async_compression::tokio::write::GzipEncoder;
132/// use std::io::Cursor;
133/// use tokio::io::AsyncRead;
134///
135/// /// Asynchronously compresses data from an async reader using Gzip and an async encoder.
136/// async fn compress_data(mut reader: impl AsyncRead + Unpin) -> Result<(), std::io::Error> {
137///    let writer = tokio::io::sink();
138///
139///    // Create a Gzip encoder that wraps the writer.
140///    let mut encoder = GzipEncoder::new(writer);
141///
142///    // Copy data from the reader to the encoder, compressing it.
143///    tokio::io::copy(&mut reader, &mut encoder).await?;
144///
145///    Ok(())
/// }
147///
148/// #[tokio::main]
149/// async fn main() -> Result<(), std::io::Error> {
150///     // Example: In-memory data.
151///     let data = b"Hello, world!"; // A byte slice.
152///     let reader = Cursor::new(data); // Create an in-memory AsyncRead.
153///     compress_data(reader).await?;
154///
155///   Ok(())
156/// }
157/// ```
158///
159///
160/// ## Example 3: Parsing Data Formats
161///
162///
163/// `SyncIoBridge` is not ideal when parsing data formats such as `JSON`, as it
164/// blocks async operations. A more efficient approach is to read data asynchronously
165/// into memory and then `deserialize` it, avoiding unnecessary synchronization overhead.
166///
167/// Explanation: This example shows how to asynchronously read data into memory
168/// and then parse it as `JSON`. By avoiding `SyncIoBridge`, the asynchronous runtime
169/// remains unblocked, leading to better performance when working with asynchronous
170/// I/O streams.
171///
172/// ```rust,no_run
173/// use tokio::io::AsyncRead;
174/// use tokio::io::AsyncReadExt;
175/// use std::io::Cursor;
176/// # mod serde {
177/// #     pub trait DeserializeOwned: 'static {}
178/// #     impl<T: 'static> DeserializeOwned for T {}
179/// # }
180/// # mod serde_json {
181/// #     use super::serde::DeserializeOwned;
182/// #     pub fn from_slice<T: DeserializeOwned>(_: &[u8]) -> Result<T, std::io::Error> {
183/// #         unimplemented!()
184/// #     }
185/// # }
186/// # #[derive(Debug)] struct MyStruct;
187///
188///
189/// async fn parse_json(mut reader: impl AsyncRead + Unpin) -> Result<MyStruct, std::io::Error> {
190///    // Read all data from the reader into a Vec<u8>.
191///    let mut data = Vec::new();
192///    reader.read_to_end(&mut data).await?;
193///
194///    // Deserialize the data from the Vec<u8> into a MyStruct instance.
195///    let value: MyStruct = serde_json::from_slice(&data)?;
196///
197///    Ok(value)
/// }
199///
200/// #[tokio::main]
201/// async fn main() -> Result<(), std::io::Error> {
202///     // Example: In-memory data.
203///     let data = b"Hello, world!"; // A byte slice.
204///     let reader = Cursor::new(data); // Create an in-memory AsyncRead.
205///     parse_json(reader).await?;
206///     Ok(())
207/// }
208/// ```
209///
210/// ## Correct Usage of `SyncIoBridge` inside `spawn_blocking`
211///
212/// `SyncIoBridge` is mainly useful when you need to interface with synchronous
213/// libraries from an asynchronous context.
214///
215/// Explanation: This example shows how to use `SyncIoBridge` inside a `spawn_blocking`
216/// task to safely perform synchronous I/O without blocking the async runtime. The
217/// `spawn_blocking` ensures that the synchronous code is offloaded to a dedicated
218/// thread pool, preventing it from interfering with the async tasks.
219///
220/// ```rust
221/// # #[cfg(not(target_family = "wasm"))]
222/// # {
223/// use tokio::task::spawn_blocking;
224/// use tokio_util::io::SyncIoBridge;
225/// use tokio::io::AsyncRead;
226/// use std::marker::Unpin;
227/// use std::io::Cursor;
228///
229/// /// Wraps an async reader with `SyncIoBridge` and performs synchronous I/O operations in a blocking task.
230/// async fn process_sync_io(reader: impl AsyncRead + Unpin + Send + 'static) -> Result<Vec<u8>, std::io::Error> {
231///    // Wrap the async reader with `SyncIoBridge` to allow synchronous reading.
232///    let mut sync_reader = SyncIoBridge::new(reader);
233///
234///    // Spawn a blocking task to perform synchronous I/O operations.
235///    let result = spawn_blocking(move || {
236///        // Create an in-memory buffer to hold the copied data.
237///        let mut buffer = Vec::new();
238///        // Copy data from the sync_reader to the buffer.
239///        std::io::copy(&mut sync_reader, &mut buffer)?;
240///        // Return the buffer containing the copied data.
241///        Ok::<_, std::io::Error>(buffer)
242///    })
243///    .await??;
244///
245///    // Return the result from the blocking task.
246///    Ok(result)
/// }
248///
249/// #[tokio::main]
250/// async fn main() -> Result<(), std::io::Error> {
251///     // Example: In-memory data.
252///     let data = b"Hello, world!"; // A byte slice.
253///     let reader = Cursor::new(data); // Create an in-memory AsyncRead.
254///     let result = process_sync_io(reader).await?;
255///
256///     // You can use `result` here as needed.
257///
258///     Ok(())
259/// }
260/// # }
261/// ```
262///
263#[derive(Debug)]
264pub struct SyncIoBridge<T> {
265    src: T,
266    rt: tokio::runtime::Handle,
267}
268
269impl<T: AsyncBufRead + Unpin> BufRead for SyncIoBridge<T> {
270    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
271        let src = &mut self.src;
272        self.rt.block_on(AsyncBufReadExt::fill_buf(src))
273    }
274
275    fn consume(&mut self, amt: usize) {
276        let src = &mut self.src;
277        AsyncBufReadExt::consume(src, amt)
278    }
279
280    fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> std::io::Result<usize> {
281        let src = &mut self.src;
282        self.rt
283            .block_on(AsyncBufReadExt::read_until(src, byte, buf))
284    }
285    fn read_line(&mut self, buf: &mut String) -> std::io::Result<usize> {
286        let src = &mut self.src;
287        self.rt.block_on(AsyncBufReadExt::read_line(src, buf))
288    }
289}
290
291impl<T: AsyncRead + Unpin> Read for SyncIoBridge<T> {
292    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
293        let src = &mut self.src;
294        self.rt.block_on(AsyncReadExt::read(src, buf))
295    }
296
297    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> std::io::Result<usize> {
298        let src = &mut self.src;
299        self.rt.block_on(src.read_to_end(buf))
300    }
301
302    fn read_to_string(&mut self, buf: &mut String) -> std::io::Result<usize> {
303        let src = &mut self.src;
304        self.rt.block_on(src.read_to_string(buf))
305    }
306
307    fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
308        let src = &mut self.src;
309        // The AsyncRead trait returns the count, synchronous doesn't.
310        let _n = self.rt.block_on(src.read_exact(buf))?;
311        Ok(())
312    }
313}
314
315impl<T: AsyncWrite + Unpin> Write for SyncIoBridge<T> {
316    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
317        let src = &mut self.src;
318        self.rt.block_on(src.write(buf))
319    }
320
321    fn flush(&mut self) -> std::io::Result<()> {
322        let src = &mut self.src;
323        self.rt.block_on(src.flush())
324    }
325
326    fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
327        let src = &mut self.src;
328        self.rt.block_on(src.write_all(buf))
329    }
330
331    fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
332        let src = &mut self.src;
333        self.rt.block_on(src.write_vectored(bufs))
334    }
335}
336
337impl<T: AsyncSeek + Unpin> Seek for SyncIoBridge<T> {
338    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
339        let src = &mut self.src;
340        self.rt.block_on(AsyncSeekExt::seek(src, pos))
341    }
342}
343
344// Because https://doc.rust-lang.org/std/io/trait.Write.html#method.is_write_vectored is at the time
345// of this writing still unstable, we expose this as part of a standalone method.
346impl<T: AsyncWrite> SyncIoBridge<T> {
347    /// Determines if the underlying [`tokio::io::AsyncWrite`] target supports efficient vectored writes.
348    ///
349    /// See [`tokio::io::AsyncWrite::is_write_vectored`].
350    pub fn is_write_vectored(&self) -> bool {
351        self.src.is_write_vectored()
352    }
353}
354
355impl<T: AsyncWrite + Unpin> SyncIoBridge<T> {
356    /// Shutdown this writer. This method provides a way to call the [`AsyncWriteExt::shutdown`]
357    /// function of the inner [`tokio::io::AsyncWrite`] instance.
358    ///
359    /// # Errors
360    ///
361    /// This method returns the same errors as [`AsyncWriteExt::shutdown`].
362    ///
363    /// [`AsyncWriteExt::shutdown`]: tokio::io::AsyncWriteExt::shutdown
364    pub fn shutdown(&mut self) -> std::io::Result<()> {
365        let src = &mut self.src;
366        self.rt.block_on(src.shutdown())
367    }
368}
369
370impl<T: Unpin> SyncIoBridge<T> {
371    /// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
372    /// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`].
373    ///
374    /// When this struct is created, it captures a handle to the current thread's runtime with [`tokio::runtime::Handle::current`].
375    /// It is hence OK to move this struct into a separate thread outside the runtime, as created
376    /// by e.g. [`tokio::task::spawn_blocking`].
377    ///
378    /// Stated even more strongly: to make use of this bridge, you *must* move
379    /// it into a separate thread outside the runtime.  The synchronous I/O will use the
380    /// underlying handle to block on the backing asynchronous source, via
381    /// [`tokio::runtime::Handle::block_on`].  As noted in the documentation for that
382    /// function, an attempt to `block_on` from an asynchronous execution context
383    /// will panic.
384    ///
385    /// # Wrapping `!Unpin` types
386    ///
387    /// Use e.g. `SyncIoBridge::new(Box::pin(src))`.
388    ///
389    /// # Panics
390    ///
391    /// This will panic if called outside the context of a Tokio runtime.
392    #[track_caller]
393    pub fn new(src: T) -> Self {
394        Self::new_with_handle(src, tokio::runtime::Handle::current())
395    }
396
397    /// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
398    /// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`].
399    ///
400    /// This is the same as [`SyncIoBridge::new`], but allows passing an arbitrary handle and hence may
401    /// be initially invoked outside of an asynchronous context.
402    pub fn new_with_handle(src: T, rt: tokio::runtime::Handle) -> Self {
403        Self { src, rt }
404    }
405
406    /// Consume this bridge, returning the underlying stream.
407    pub fn into_inner(self) -> T {
408        self.src
409    }
410}
411
412impl<T> AsMut<T> for SyncIoBridge<T> {
413    fn as_mut(&mut self) -> &mut T {
414        &mut self.src
415    }
416}
417
418impl<T> AsRef<T> for SyncIoBridge<T> {
419    fn as_ref(&self) -> &T {
420        &self.src
421    }
422}