iroh_io/
tokio_io.rs

1//! Blocking io for [std::fs::File], using the tokio blocking task pool.
2use std::{
3    future::Future,
4    io::{self, Read, Seek, SeekFrom},
5    path::PathBuf,
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10use bytes::Bytes;
11use pin_project::pin_project;
12use tokio::{
13    io::{AsyncReadExt, AsyncWrite},
14    task::{spawn_blocking, JoinHandle},
15};
16
17use super::{make_io_error, AsyncSliceReader, AsyncSliceWriter, AsyncStreamWriter};
18use crate::AsyncStreamReader;
19
20const MAX_PREALLOC: usize = 1024 * 16;
21
22/// A wrapper around a [std::fs::File] that implements [AsyncSliceReader] and [AsyncSliceWriter]
23#[derive(Debug)]
24pub struct File(Option<FileAdapterFsm>);
25
26impl File {
27    /// Create a new [File] from a function that creates a [std::fs::File]
28    pub async fn create(
29        create_file: impl FnOnce() -> io::Result<std::fs::File> + Send + 'static,
30    ) -> io::Result<Self> {
31        let inner = spawn_blocking(create_file).await.map_err(make_io_error)??;
32        Ok(Self::from_std(inner))
33    }
34
35    /// Create a new [File] from a [std::fs::File]
36    ///
37    /// This is fine if you already have a [std::fs::File] and want to use it with [File],
38    /// but opening a file is a blocking op that you probably don't want to do in an async context.
39    pub fn from_std(file: std::fs::File) -> Self {
40        Self(Some(FileAdapterFsm(file)))
41    }
42
43    /// Open a [File] from a path
44    pub async fn open(path: PathBuf) -> io::Result<Self> {
45        Self::create(move || std::fs::File::open(path)).await
46    }
47
48    /// Test helper to read the contents of the file
49    #[cfg(test)]
50    pub fn read_contents(&self) -> Vec<u8> {
51        let mut std_file = &self.0.as_ref().unwrap().0;
52        let mut t = Vec::new();
53        // this is not needed since at least for POSIX IO "read your own writes"
54        // is guaranteed.
55        // std_file.sync_all().unwrap();
56        std_file.rewind().unwrap();
57        std_file.read_to_end(&mut t).unwrap();
58        t
59    }
60}
61
62/// Support for the [File]
63pub mod file {
64    use super::*;
65
66    impl AsyncSliceReader for File {
67        async fn read_at(&mut self, offset: u64, len: usize) -> io::Result<Bytes> {
68            Asyncify::from(self.0.take().map(|t| (t.read_at(offset, len), &mut self.0))).await
69        }
70
71        async fn size(&mut self) -> io::Result<u64> {
72            Asyncify::from(self.0.take().map(|t| (t.len(), &mut self.0))).await
73        }
74    }
75
76    impl AsyncSliceWriter for File {
77        async fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> io::Result<()> {
78            Asyncify::from(
79                self.0
80                    .take()
81                    .map(|t| (t.write_bytes_at(offset, data), &mut self.0)),
82            )
83            .await
84        }
85
86        async fn write_at(&mut self, offset: u64, data: &[u8]) -> io::Result<()> {
87            Asyncify::from(
88                self.0
89                    .take()
90                    .map(|t| (t.write_at(offset, data), &mut self.0)),
91            )
92            .await
93        }
94
95        async fn sync(&mut self) -> io::Result<()> {
96            Asyncify::from(self.0.take().map(|t| (t.sync(), &mut self.0))).await
97        }
98
99        async fn set_len(&mut self, len: u64) -> io::Result<()> {
100            Asyncify::from(self.0.take().map(|t| (t.set_len(len), &mut self.0))).await
101        }
102    }
103}
104
105/// A future wrapper to unpack the result of a sync computation and store the
106/// state on completion, making the io object available again.
107#[derive(Debug)]
108#[pin_project(project = AsyncifyProj)]
109enum Asyncify<'a, R, T> {
110    /// we got a future and a handle where we can store the state on completion
111    Ok(
112        #[pin] tokio::task::JoinHandle<(T, io::Result<R>)>,
113        &'a mut Option<T>,
114    ),
115    /// the handle was busy
116    BusyErr,
117}
118
119impl<'a, R, T> From<Option<(JoinHandle<(T, io::Result<R>)>, &'a mut Option<T>)>>
120    for Asyncify<'a, R, T>
121{
122    fn from(value: Option<(JoinHandle<(T, io::Result<R>)>, &'a mut Option<T>)>) -> Self {
123        match value {
124            Some((f, h)) => Self::Ok(f, h),
125            None => Self::BusyErr,
126        }
127    }
128}
129
130impl<'a, T: 'a, R> Future for Asyncify<'a, R, T> {
131    type Output = io::Result<R>;
132    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
133        match self.project() {
134            AsyncifyProj::Ok(f, h) => f.poll(cx).map(|x| {
135                match x {
136                    Ok((state, r)) => {
137                        // we got a result, so we can store the state
138                        **h = Some(state);
139                        r
140                    }
141                    Err(e) => Err(io::Error::other(e)),
142                }
143            }),
144            AsyncifyProj::BusyErr => Poll::Ready(io::Result::Err(io::Error::other(
145                "previous io op not polled to completion",
146            ))),
147        }
148    }
149}
150
151/// A wrapper around a [std::fs::File] that defines IO operations that spawn blocking tasks.
152///
153/// This implements all operations of [AsyncSliceReader] and [AsyncSliceWriter] in state
154/// passing style.
155#[derive(Debug)]
156struct FileAdapterFsm(std::fs::File);
157
158impl FileAdapterFsm {
159    fn read_at(mut self, offset: u64, len: usize) -> JoinHandle<(Self, io::Result<Bytes>)> {
160        fn inner<R: std::io::Read + std::io::Seek>(
161            this: &mut R,
162            offset: u64,
163            len: usize,
164            buf: &mut Vec<u8>,
165        ) -> io::Result<()> {
166            this.seek(SeekFrom::Start(offset))?;
167            this.take(len as u64).read_to_end(buf)?;
168            Ok(())
169        }
170        spawn_blocking(move || {
171            // len is just the expected len, so if it is too big, we should not allocate
172            // the entire size.
173            let mut buf = Vec::with_capacity(len.min(MAX_PREALLOC));
174            let res = inner(&mut self.0, offset, len, &mut buf);
175            (self, res.map(|_| buf.into()))
176        })
177    }
178
179    fn len(mut self) -> JoinHandle<(Self, io::Result<u64>)> {
180        spawn_blocking(move || {
181            let res = self.0.seek(SeekFrom::End(0));
182            (self, res)
183        })
184    }
185}
186
187impl FileAdapterFsm {
188    fn write_bytes_at(mut self, offset: u64, data: Bytes) -> JoinHandle<(Self, io::Result<()>)> {
189        fn inner<W: std::io::Write + std::io::Seek>(
190            this: &mut W,
191            offset: u64,
192            buf: &[u8],
193        ) -> io::Result<()> {
194            this.seek(SeekFrom::Start(offset))?;
195            this.write_all(buf)?;
196            Ok(())
197        }
198        spawn_blocking(move || {
199            let res = inner(&mut self.0, offset, &data);
200            (self, res)
201        })
202    }
203
204    fn write_at(mut self, offset: u64, bytes: &[u8]) -> JoinHandle<(Self, io::Result<()>)> {
205        fn inner<W: std::io::Write + std::io::Seek>(
206            this: &mut W,
207            offset: u64,
208            buf: smallvec::SmallVec<[u8; 16]>,
209        ) -> io::Result<()> {
210            this.seek(SeekFrom::Start(offset))?;
211            this.write_all(&buf)?;
212            Ok(())
213        }
214        let t: smallvec::SmallVec<[u8; 16]> = bytes.into();
215        spawn_blocking(move || {
216            let res = inner(&mut self.0, offset, t);
217            (self, res)
218        })
219    }
220
221    fn set_len(self, len: u64) -> JoinHandle<(Self, io::Result<()>)> {
222        spawn_blocking(move || {
223            let res = self.0.set_len(len);
224            (self, res)
225        })
226    }
227
228    fn sync(self) -> JoinHandle<(Self, io::Result<()>)> {
229        spawn_blocking(move || {
230            let res = self.0.sync_all();
231            (self, res)
232        })
233    }
234}
235
236/// Utility to convert an [AsyncWrite] into an [AsyncSliceWriter] by just ignoring the offsets
237#[derive(Debug)]
238pub struct ConcatenateSliceWriter<W>(W);
239
240impl<W> ConcatenateSliceWriter<W> {
241    /// Create a new `ConcatenateSliceWriter` from an inner writer
242    pub fn new(inner: W) -> Self {
243        Self(inner)
244    }
245
246    /// Return the inner writer
247    pub fn into_inner(self) -> W {
248        self.0
249    }
250}
251
252impl<W: AsyncWrite + Unpin + 'static> AsyncSliceWriter for ConcatenateSliceWriter<W> {
253    async fn write_bytes_at(&mut self, _offset: u64, data: Bytes) -> io::Result<()> {
254        self.0.write_all(&data).await
255    }
256
257    async fn write_at(&mut self, _offset: u64, bytes: &[u8]) -> io::Result<()> {
258        self.0.write_all(bytes).await
259    }
260
261    async fn sync(&mut self) -> io::Result<()> {
262        self.0.flush().await
263    }
264
265    async fn set_len(&mut self, _len: u64) -> io::Result<()> {
266        io::Result::Ok(())
267    }
268}
269
270/// Utility to convert a [tokio::io::AsyncWrite] into an [AsyncStreamWriter].
271#[derive(Debug, Clone)]
272pub struct TokioStreamWriter<T>(pub T);
273
274use tokio::io::AsyncWriteExt;
275
276impl<T: tokio::io::AsyncWrite + Unpin> AsyncStreamWriter for TokioStreamWriter<T> {
277    async fn write(&mut self, data: &[u8]) -> io::Result<()> {
278        self.0.write_all(data).await
279    }
280
281    async fn write_bytes(&mut self, data: Bytes) -> io::Result<()> {
282        self.0.write_all(&data).await
283    }
284
285    async fn sync(&mut self) -> io::Result<()> {
286        self.0.flush().await
287    }
288}
289
290/// Utility to convert a [tokio::io::AsyncRead] into an [AsyncStreamReader].
291#[derive(Debug, Clone)]
292pub struct TokioStreamReader<T>(pub T);
293
294impl<T> TokioStreamReader<T> {
295    /// Create a new `TokioStreamReader` from an inner reader
296    pub fn new(inner: T) -> Self {
297        Self(inner)
298    }
299
300    /// Return the inner reader
301    pub fn into_inner(self) -> T {
302        self.0
303    }
304}
305
306impl<T: tokio::io::AsyncRead + Unpin> AsyncStreamReader for TokioStreamReader<T> {
307    async fn read_bytes(&mut self, len: usize) -> io::Result<Bytes> {
308        let mut buf = Vec::with_capacity(len.min(MAX_PREALLOC));
309        (&mut self.0).take(len as u64).read_to_end(&mut buf).await?;
310        Ok(buf.into())
311    }
312
313    async fn read<const L: usize>(&mut self) -> io::Result<[u8; L]> {
314        let mut buf = [0; L];
315        self.0.read_exact(&mut buf).await?;
316        Ok(buf)
317    }
318}