tokio_file/
file.rs

1// vim: tw=80
2// This lint isn't very helpful.  See
3// https://github.com/rust-lang/rust-clippy/discussions/14256
4#![allow(clippy::doc_overindented_list_items)]
5
6use std::{
7    io::{self, IoSlice, IoSliceMut},
8    os::unix::io::{AsFd, RawFd},
9    pin::Pin,
10};
11
12use futures::{
13    task::{Context, Poll},
14    Future,
15};
16use mio_aio::AioFsyncMode;
17use tokio::io::bsd::{Aio, AioSource};
18
19nix::ioctl_read! {
20    /// Get the size of the entire device in bytes.  This should be a multiple
21    /// of the sector size.
22    diocgmediasize, 'd', 129, nix::libc::off_t
23}
24
25nix::ioctl_read! {
26    diocgsectorsize, 'd', 128, nix::libc::c_uint
27}
28
29nix::ioctl_read! {
30    diocgstripesize, 'd', 139, nix::libc::off_t
31}
32
33#[derive(Debug)]
34struct TokioSource<T>(T);
35
36impl<T: mio_aio::SourceApi> AioSource for TokioSource<T> {
37    fn register(&mut self, kq: RawFd, token: usize) {
38        self.0.register_raw(kq, token)
39    }
40
41    fn deregister(&mut self) {
42        self.0.deregister_raw()
43    }
44}
45
46#[must_use = "futures do nothing unless polled"]
47#[derive(Debug)]
48/// Future type used by all methods of [`File`].
49pub struct TokioFileFut<T: mio_aio::SourceApi>(Aio<TokioSource<T>>);
50
51impl<T: mio_aio::SourceApi> Future for TokioFileFut<T> {
52    type Output = io::Result<T::Output>;
53
54    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
55        let poll_result = self.0.poll_ready(cx);
56        match poll_result {
57            Poll::Pending => {
58                if !self.0 .0.in_progress() {
59                    let p = unsafe { self.map_unchecked_mut(|s| &mut s.0 .0) };
60                    match p.submit() {
61                        Ok(()) => (),
62                        Err(e) => {
63                            return Poll::Ready(Err(
64                                io::Error::from_raw_os_error(e as i32),
65                            ))
66                        }
67                    }
68                }
69                Poll::Pending
70            }
71            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
72            Poll::Ready(Ok(_ev)) => {
73                // At this point, we could clear readiness.  But there's no
74                // point, since we're about to drop the Aio.
75                let p = unsafe { self.map_unchecked_mut(|s| &mut s.0 .0) };
76                let result = p.aio_return();
77                match result {
78                    Ok(r) => Poll::Ready(Ok(r)),
79                    Err(e) => {
80                        Poll::Ready(Err(io::Error::from_raw_os_error(e as i32)))
81                    }
82                }
83            }
84        }
85    }
86}
87
88/// Return type of [`AioFileExt::read_at`].  Implements `Future`.
89pub type ReadAt<'a> = TokioFileFut<mio_aio::ReadAt<'a>>;
90
91/// Return type of [`AioFileExt::readv_at`].  Implements `Future`.
92pub type ReadvAt<'a> = TokioFileFut<mio_aio::ReadvAt<'a>>;
93
94/// Return type of [`AioFileExt::sync_all`].  Implements `Future`.
95pub type SyncAll<'a> = TokioFileFut<mio_aio::Fsync<'a>>;
96
97/// Return type of [`AioFileExt::write_at`].  Implements `Future`.
98pub type WriteAt<'a> = TokioFileFut<mio_aio::WriteAt<'a>>;
99
100/// Return type of [`AioFileExt::writev_at`].  Implements `Future`.
101pub type WritevAt<'a> = TokioFileFut<mio_aio::WritevAt<'a>>;
102
103/// Adds POSIX AIO-based asynchronous methods to files.
104pub trait AioFileExt: AsFd {
105    /// Asynchronous equivalent of `std::fs::File::read_at`
106    ///
107    /// # Examples
108    ///
109    /// ```
110    /// use std::fs;
111    /// use std::io::Write;
112    /// use tempfile::TempDir;
113    /// use tokio_file::AioFileExt;
114    ///
115    /// # #[tokio::main]
116    /// # async fn main() {
117    /// const WBUF: &[u8] = b"abcdef";
118    /// const EXPECT: &[u8] = b"cdef";
119    /// let mut rbuf = vec![0; 4];
120    /// let dir = TempDir::new().unwrap();
121    /// let path = dir.path().join("foo");
122    /// let mut f = fs::File::create(&path).unwrap();
123    /// f.write(WBUF).unwrap();
124    ///
125    /// let file = fs::OpenOptions::new()
126    ///     .read(true)
127    ///     .open(path)
128    ///     .unwrap();
129    /// let r = file.read_at(&mut rbuf[..], 2).unwrap().await.unwrap();
130    /// assert_eq!(&rbuf[..], &EXPECT[..]);
131    /// # }
132    /// ```
133    fn read_at<'a>(
134        &'a self,
135        buf: &'a mut [u8],
136        offset: u64,
137    ) -> io::Result<ReadAt<'a>> {
138        let fd = self.as_fd();
139        let source = TokioSource(mio_aio::ReadAt::read_at(fd, offset, buf, 0));
140        Ok(TokioFileFut(Aio::new_for_aio(source)?))
141    }
142
143    /// Asynchronous equivalent of `preadv`.
144    ///
145    /// Similar to
146    /// [preadv(2)](https://www.freebsd.org/cgi/man.cgi?query=read&sektion=2)
147    /// but asynchronous.  Reads a contiguous portion of a file into a
148    /// scatter-gather list of buffers.
149    ///
150    /// # Parameters
151    ///
152    /// - `bufs`:   The destination for the read.  A scatter-gather list of
153    ///             buffers.
154    /// - `offset`: Offset within the file at which to begin the read
155    ///
156    /// # Returns
157    ///
158    /// - `Ok(x)`:  The operation was successfully created.  The future may be
159    ///             polled and will eventually return the final status of the
160    ///             operation.
161    /// - `Err(x)`: An error occurred before issueing the operation.  The result
162    ///             may be `drop`ped.
163    ///
164    /// # Examples
165    ///
166    /// ```
167    /// use std::borrow::BorrowMut;
168    /// use std::fs;
169    /// use std::io::{IoSliceMut, Write};
170    /// use tempfile::TempDir;
171    /// use tokio_file::AioFileExt;
172    ///
173    /// # #[tokio::main]
174    /// # async fn main() {
175    /// const WBUF: &[u8] = b"abcdefghijklmnopqrwtuvwxyz";
176    /// const EXPECT0: &[u8] = b"cdef";
177    /// const EXPECT1: &[u8] = b"ghijklmn";
178    /// let l0 = 4;
179    /// let l1 = 8;
180    /// let mut rbuf0 = vec![0; l0];
181    /// let mut rbuf1 = vec![0; l1];
182    /// let mut rbufs = [IoSliceMut::new(&mut rbuf0), IoSliceMut::new(&mut rbuf1)];
183    ///
184    /// let dir = TempDir::new().unwrap();
185    /// let path = dir.path().join("foo");
186    /// let mut f = fs::File::create(&path).unwrap();
187    /// f.write(WBUF).unwrap();
188    ///
189    /// let file = fs::OpenOptions::new()
190    ///     .read(true)
191    ///     .open(path)
192    ///     .unwrap();
193    /// let mut r = file.readv_at(&mut rbufs[..], 2).unwrap().await.unwrap();
194    ///
195    /// assert_eq!(l0 + l1, r);
196    /// assert_eq!(&rbuf0[..], &EXPECT0[..]);
197    /// assert_eq!(&rbuf1[..], &EXPECT1[..]);
198    /// # }
199    /// ```
200    fn readv_at<'a>(
201        &'a self,
202        bufs: &mut [IoSliceMut<'a>],
203        offset: u64,
204    ) -> io::Result<ReadvAt<'a>> {
205        let fd = self.as_fd();
206        let source =
207            TokioSource(mio_aio::ReadvAt::readv_at(fd, offset, bufs, 0));
208        Ok(TokioFileFut(Aio::new_for_aio(source)?))
209    }
210
211    /// Asynchronous equivalent of `std::fs::File::sync_all`
212    ///
213    /// # Examples
214    ///
215    /// ```
216    /// use std::borrow::BorrowMut;
217    /// use std::fs;
218    /// use std::io::Write;
219    /// use tempfile::TempDir;
220    /// use tokio_file::AioFileExt;
221    ///
222    /// # #[tokio::main]
223    /// # async fn main() {
224    /// let dir = TempDir::new().unwrap();
225    /// let path = dir.path().join("foo");
226    ///
227    /// let file = fs::OpenOptions::new()
228    ///     .write(true)
229    ///     .create(true)
230    ///     .open(path)
231    ///     .unwrap();
232    /// let r = AioFileExt::sync_all(&file).unwrap().await.unwrap();
233    /// # }
234    /// ```
235    // TODO: add sync_all_data, for supported operating systems
236    fn sync_all(&self) -> io::Result<SyncAll> {
237        let mode = AioFsyncMode::O_SYNC;
238        let fd = self.as_fd();
239        let source = TokioSource(mio_aio::Fsync::fsync(fd, mode, 0));
240        Ok(TokioFileFut(Aio::new_for_aio(source)?))
241    }
242
243    /// Asynchronous equivalent of `std::fs::File::write_at`.
244    ///
245    /// # Examples
246    ///
247    /// ```
248    /// use std::fs;
249    /// use std::io::Read;
250    /// use tempfile::TempDir;
251    /// use tokio_file::AioFileExt;
252    ///
253    /// # #[tokio::main]
254    /// # async fn main() {
255    /// let contents = b"abcdef";
256    /// let mut rbuf = Vec::new();
257    ///
258    /// let dir = TempDir::new().unwrap();
259    /// let path = dir.path().join("foo");
260    /// let file = fs::OpenOptions::new()
261    ///     .create(true)
262    ///     .write(true)
263    ///     .open(&path)
264    ///     .unwrap();
265    /// let r = file.write_at(contents, 0).unwrap().await.unwrap();
266    /// assert_eq!(r, contents.len());
267    /// drop(file);
268    ///
269    /// let mut file = fs::File::open(path).unwrap();
270    /// assert_eq!(file.read_to_end(&mut rbuf).unwrap(), contents.len());
271    /// assert_eq!(&contents[..], &rbuf[..]);
272    /// # }
273    /// ```
274    fn write_at<'a>(
275        &'a self,
276        buf: &'a [u8],
277        offset: u64,
278    ) -> io::Result<WriteAt<'a>> {
279        let fd = self.as_fd();
280        let source =
281            TokioSource(mio_aio::WriteAt::write_at(fd, offset, buf, 0));
282        Ok(TokioFileFut(Aio::new_for_aio(source)?))
283    }
284
285    /// Asynchronous equivalent of `pwritev`
286    ///
287    /// Similar to
288    /// [pwritev(2)](https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2)
289    /// but asynchronous.  Writes a scatter-gather list of buffers into a
290    /// contiguous portion of a file.
291    ///
292    /// # Parameters
293    ///
294    /// - `bufs`:   The data to write.  A scatter-gather list of buffers.
295    /// - `offset`: Offset within the file at which to begin the write
296    ///
297    /// # Returns
298    ///
299    /// - `Ok(x)`:  The operation was successfully created.  The future may be
300    ///             polled and will eventually return the final status of the
301    ///             operation.
302    /// - `Err(x)`: An error occurred before issueing the operation.  The result
303    ///             may be `drop`ped.
304    ///
305    /// # Examples
306    ///
307    /// ```
308    /// use std::fs;
309    /// use std::io::{IoSlice, Read};
310    /// use tempfile::TempDir;
311    /// use tokio_file::AioFileExt;
312    ///
313    /// # #[tokio::main]
314    /// # async fn main() {
315    /// const EXPECT: &[u8] = b"abcdefghij";
316    /// let wbuf0 = b"abcdef";
317    /// let wbuf1 = b"ghij";
318    /// let wbufs = vec![IoSlice::new(wbuf0), IoSlice::new(wbuf1)];
319    /// let mut rbuf = Vec::new();
320    ///
321    /// let dir = TempDir::new().unwrap();
322    /// let path = dir.path().join("foo");
323    /// let file = fs::OpenOptions::new()
324    ///     .create(true)
325    ///     .write(true)
326    ///     .open(&path)
327    ///     .unwrap();
328    /// let r = file.writev_at(&wbufs[..], 0).unwrap().await.unwrap();
329    ///
330    /// assert_eq!(r, 10);
331    ///
332    /// let mut f = fs::File::open(path).unwrap();
333    /// let len = f.read_to_end(&mut rbuf).unwrap();
334    /// assert_eq!(len, EXPECT.len());
335    /// assert_eq!(rbuf, EXPECT);
336    /// # }
337    fn writev_at<'a>(
338        &'a self,
339        bufs: &[IoSlice<'a>],
340        offset: u64,
341    ) -> io::Result<WritevAt<'a>> {
342        let fd = self.as_fd();
343        let source =
344            TokioSource(mio_aio::WritevAt::writev_at(fd, offset, bufs, 0));
345        Ok(TokioFileFut(Aio::new_for_aio(source)?))
346    }
347}
348
349impl<T: AsFd> AioFileExt for T {}