tokio_file/file.rs
1// vim: tw=80
2// This lint isn't very helpful. See
3// https://github.com/rust-lang/rust-clippy/discussions/14256
4#![allow(clippy::doc_overindented_list_items)]
5
6use std::{
7 io::{self, IoSlice, IoSliceMut},
8 os::unix::io::{AsFd, RawFd},
9 pin::Pin,
10};
11
12use futures::{
13 task::{Context, Poll},
14 Future,
15};
16use mio_aio::AioFsyncMode;
17use tokio::io::bsd::{Aio, AioSource};
18
19nix::ioctl_read! {
20 /// Get the size of the entire device in bytes. This should be a multiple
21 /// of the sector size.
22 diocgmediasize, 'd', 129, nix::libc::off_t
23}
24
25nix::ioctl_read! {
26 diocgsectorsize, 'd', 128, nix::libc::c_uint
27}
28
29nix::ioctl_read! {
30 diocgstripesize, 'd', 139, nix::libc::off_t
31}
32
33#[derive(Debug)]
34struct TokioSource<T>(T);
35
36impl<T: mio_aio::SourceApi> AioSource for TokioSource<T> {
37 fn register(&mut self, kq: RawFd, token: usize) {
38 self.0.register_raw(kq, token)
39 }
40
41 fn deregister(&mut self) {
42 self.0.deregister_raw()
43 }
44}
45
46#[must_use = "futures do nothing unless polled"]
47#[derive(Debug)]
48/// Future type used by all methods of [`File`].
49pub struct TokioFileFut<T: mio_aio::SourceApi>(Aio<TokioSource<T>>);
50
51impl<T: mio_aio::SourceApi> Future for TokioFileFut<T> {
52 type Output = io::Result<T::Output>;
53
54 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
55 let poll_result = self.0.poll_ready(cx);
56 match poll_result {
57 Poll::Pending => {
58 if !self.0 .0.in_progress() {
59 let p = unsafe { self.map_unchecked_mut(|s| &mut s.0 .0) };
60 match p.submit() {
61 Ok(()) => (),
62 Err(e) => {
63 return Poll::Ready(Err(
64 io::Error::from_raw_os_error(e as i32),
65 ))
66 }
67 }
68 }
69 Poll::Pending
70 }
71 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
72 Poll::Ready(Ok(_ev)) => {
73 // At this point, we could clear readiness. But there's no
74 // point, since we're about to drop the Aio.
75 let p = unsafe { self.map_unchecked_mut(|s| &mut s.0 .0) };
76 let result = p.aio_return();
77 match result {
78 Ok(r) => Poll::Ready(Ok(r)),
79 Err(e) => {
80 Poll::Ready(Err(io::Error::from_raw_os_error(e as i32)))
81 }
82 }
83 }
84 }
85 }
86}
87
88/// Return type of [`AioFileExt::read_at`]. Implements `Future`.
89pub type ReadAt<'a> = TokioFileFut<mio_aio::ReadAt<'a>>;
90
91/// Return type of [`AioFileExt::readv_at`]. Implements `Future`.
92pub type ReadvAt<'a> = TokioFileFut<mio_aio::ReadvAt<'a>>;
93
94/// Return type of [`AioFileExt::sync_all`]. Implements `Future`.
95pub type SyncAll<'a> = TokioFileFut<mio_aio::Fsync<'a>>;
96
97/// Return type of [`AioFileExt::write_at`]. Implements `Future`.
98pub type WriteAt<'a> = TokioFileFut<mio_aio::WriteAt<'a>>;
99
100/// Return type of [`AioFileExt::writev_at`]. Implements `Future`.
101pub type WritevAt<'a> = TokioFileFut<mio_aio::WritevAt<'a>>;
102
103/// Adds POSIX AIO-based asynchronous methods to files.
104pub trait AioFileExt: AsFd {
105 /// Asynchronous equivalent of `std::fs::File::read_at`
106 ///
107 /// # Examples
108 ///
109 /// ```
110 /// use std::fs;
111 /// use std::io::Write;
112 /// use tempfile::TempDir;
113 /// use tokio_file::AioFileExt;
114 ///
115 /// # #[tokio::main]
116 /// # async fn main() {
117 /// const WBUF: &[u8] = b"abcdef";
118 /// const EXPECT: &[u8] = b"cdef";
119 /// let mut rbuf = vec![0; 4];
120 /// let dir = TempDir::new().unwrap();
121 /// let path = dir.path().join("foo");
122 /// let mut f = fs::File::create(&path).unwrap();
123 /// f.write(WBUF).unwrap();
124 ///
125 /// let file = fs::OpenOptions::new()
126 /// .read(true)
127 /// .open(path)
128 /// .unwrap();
129 /// let r = file.read_at(&mut rbuf[..], 2).unwrap().await.unwrap();
130 /// assert_eq!(&rbuf[..], &EXPECT[..]);
131 /// # }
132 /// ```
133 fn read_at<'a>(
134 &'a self,
135 buf: &'a mut [u8],
136 offset: u64,
137 ) -> io::Result<ReadAt<'a>> {
138 let fd = self.as_fd();
139 let source = TokioSource(mio_aio::ReadAt::read_at(fd, offset, buf, 0));
140 Ok(TokioFileFut(Aio::new_for_aio(source)?))
141 }
142
143 /// Asynchronous equivalent of `preadv`.
144 ///
145 /// Similar to
146 /// [preadv(2)](https://www.freebsd.org/cgi/man.cgi?query=read&sektion=2)
147 /// but asynchronous. Reads a contiguous portion of a file into a
148 /// scatter-gather list of buffers.
149 ///
150 /// # Parameters
151 ///
152 /// - `bufs`: The destination for the read. A scatter-gather list of
153 /// buffers.
154 /// - `offset`: Offset within the file at which to begin the read
155 ///
156 /// # Returns
157 ///
158 /// - `Ok(x)`: The operation was successfully created. The future may be
159 /// polled and will eventually return the final status of the
160 /// operation.
161 /// - `Err(x)`: An error occurred before issueing the operation. The result
162 /// may be `drop`ped.
163 ///
164 /// # Examples
165 ///
166 /// ```
167 /// use std::borrow::BorrowMut;
168 /// use std::fs;
169 /// use std::io::{IoSliceMut, Write};
170 /// use tempfile::TempDir;
171 /// use tokio_file::AioFileExt;
172 ///
173 /// # #[tokio::main]
174 /// # async fn main() {
175 /// const WBUF: &[u8] = b"abcdefghijklmnopqrwtuvwxyz";
176 /// const EXPECT0: &[u8] = b"cdef";
177 /// const EXPECT1: &[u8] = b"ghijklmn";
178 /// let l0 = 4;
179 /// let l1 = 8;
180 /// let mut rbuf0 = vec![0; l0];
181 /// let mut rbuf1 = vec![0; l1];
182 /// let mut rbufs = [IoSliceMut::new(&mut rbuf0), IoSliceMut::new(&mut rbuf1)];
183 ///
184 /// let dir = TempDir::new().unwrap();
185 /// let path = dir.path().join("foo");
186 /// let mut f = fs::File::create(&path).unwrap();
187 /// f.write(WBUF).unwrap();
188 ///
189 /// let file = fs::OpenOptions::new()
190 /// .read(true)
191 /// .open(path)
192 /// .unwrap();
193 /// let mut r = file.readv_at(&mut rbufs[..], 2).unwrap().await.unwrap();
194 ///
195 /// assert_eq!(l0 + l1, r);
196 /// assert_eq!(&rbuf0[..], &EXPECT0[..]);
197 /// assert_eq!(&rbuf1[..], &EXPECT1[..]);
198 /// # }
199 /// ```
200 fn readv_at<'a>(
201 &'a self,
202 bufs: &mut [IoSliceMut<'a>],
203 offset: u64,
204 ) -> io::Result<ReadvAt<'a>> {
205 let fd = self.as_fd();
206 let source =
207 TokioSource(mio_aio::ReadvAt::readv_at(fd, offset, bufs, 0));
208 Ok(TokioFileFut(Aio::new_for_aio(source)?))
209 }
210
211 /// Asynchronous equivalent of `std::fs::File::sync_all`
212 ///
213 /// # Examples
214 ///
215 /// ```
216 /// use std::borrow::BorrowMut;
217 /// use std::fs;
218 /// use std::io::Write;
219 /// use tempfile::TempDir;
220 /// use tokio_file::AioFileExt;
221 ///
222 /// # #[tokio::main]
223 /// # async fn main() {
224 /// let dir = TempDir::new().unwrap();
225 /// let path = dir.path().join("foo");
226 ///
227 /// let file = fs::OpenOptions::new()
228 /// .write(true)
229 /// .create(true)
230 /// .open(path)
231 /// .unwrap();
232 /// let r = AioFileExt::sync_all(&file).unwrap().await.unwrap();
233 /// # }
234 /// ```
235 // TODO: add sync_all_data, for supported operating systems
236 fn sync_all(&self) -> io::Result<SyncAll> {
237 let mode = AioFsyncMode::O_SYNC;
238 let fd = self.as_fd();
239 let source = TokioSource(mio_aio::Fsync::fsync(fd, mode, 0));
240 Ok(TokioFileFut(Aio::new_for_aio(source)?))
241 }
242
243 /// Asynchronous equivalent of `std::fs::File::write_at`.
244 ///
245 /// # Examples
246 ///
247 /// ```
248 /// use std::fs;
249 /// use std::io::Read;
250 /// use tempfile::TempDir;
251 /// use tokio_file::AioFileExt;
252 ///
253 /// # #[tokio::main]
254 /// # async fn main() {
255 /// let contents = b"abcdef";
256 /// let mut rbuf = Vec::new();
257 ///
258 /// let dir = TempDir::new().unwrap();
259 /// let path = dir.path().join("foo");
260 /// let file = fs::OpenOptions::new()
261 /// .create(true)
262 /// .write(true)
263 /// .open(&path)
264 /// .unwrap();
265 /// let r = file.write_at(contents, 0).unwrap().await.unwrap();
266 /// assert_eq!(r, contents.len());
267 /// drop(file);
268 ///
269 /// let mut file = fs::File::open(path).unwrap();
270 /// assert_eq!(file.read_to_end(&mut rbuf).unwrap(), contents.len());
271 /// assert_eq!(&contents[..], &rbuf[..]);
272 /// # }
273 /// ```
274 fn write_at<'a>(
275 &'a self,
276 buf: &'a [u8],
277 offset: u64,
278 ) -> io::Result<WriteAt<'a>> {
279 let fd = self.as_fd();
280 let source =
281 TokioSource(mio_aio::WriteAt::write_at(fd, offset, buf, 0));
282 Ok(TokioFileFut(Aio::new_for_aio(source)?))
283 }
284
285 /// Asynchronous equivalent of `pwritev`
286 ///
287 /// Similar to
288 /// [pwritev(2)](https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2)
289 /// but asynchronous. Writes a scatter-gather list of buffers into a
290 /// contiguous portion of a file.
291 ///
292 /// # Parameters
293 ///
294 /// - `bufs`: The data to write. A scatter-gather list of buffers.
295 /// - `offset`: Offset within the file at which to begin the write
296 ///
297 /// # Returns
298 ///
299 /// - `Ok(x)`: The operation was successfully created. The future may be
300 /// polled and will eventually return the final status of the
301 /// operation.
302 /// - `Err(x)`: An error occurred before issueing the operation. The result
303 /// may be `drop`ped.
304 ///
305 /// # Examples
306 ///
307 /// ```
308 /// use std::fs;
309 /// use std::io::{IoSlice, Read};
310 /// use tempfile::TempDir;
311 /// use tokio_file::AioFileExt;
312 ///
313 /// # #[tokio::main]
314 /// # async fn main() {
315 /// const EXPECT: &[u8] = b"abcdefghij";
316 /// let wbuf0 = b"abcdef";
317 /// let wbuf1 = b"ghij";
318 /// let wbufs = vec![IoSlice::new(wbuf0), IoSlice::new(wbuf1)];
319 /// let mut rbuf = Vec::new();
320 ///
321 /// let dir = TempDir::new().unwrap();
322 /// let path = dir.path().join("foo");
323 /// let file = fs::OpenOptions::new()
324 /// .create(true)
325 /// .write(true)
326 /// .open(&path)
327 /// .unwrap();
328 /// let r = file.writev_at(&wbufs[..], 0).unwrap().await.unwrap();
329 ///
330 /// assert_eq!(r, 10);
331 ///
332 /// let mut f = fs::File::open(path).unwrap();
333 /// let len = f.read_to_end(&mut rbuf).unwrap();
334 /// assert_eq!(len, EXPECT.len());
335 /// assert_eq!(rbuf, EXPECT);
336 /// # }
337 fn writev_at<'a>(
338 &'a self,
339 bufs: &[IoSlice<'a>],
340 offset: u64,
341 ) -> io::Result<WritevAt<'a>> {
342 let fd = self.as_fd();
343 let source =
344 TokioSource(mio_aio::WritevAt::writev_at(fd, offset, bufs, 0));
345 Ok(TokioFileFut(Aio::new_for_aio(source)?))
346 }
347}
348
349impl<T: AsFd> AioFileExt for T {}