fuse_backend_rs/common/
async_file.rs

1// Copyright (C) 2022 Alibaba Cloud. All rights reserved.
2//
3// SPDX-License-Identifier: Apache-2.0
4
//! An asynchronous `File` adapter wrapping either `tokio::fs::File` or `tokio_uring::fs::File`.
6
7use std::fmt::{Debug, Formatter};
8use std::io::{ErrorKind, IoSlice, IoSliceMut};
9use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
10use std::path::Path;
11
12use crate::async_runtime::{RuntimeType, RUNTIME_TYPE};
13use crate::file_buf::FileVolatileBuf;
14use crate::{off64_t, preadv64, pwritev64};
15
/// An adapter enum to support both tokio and tokio-uring asynchronous `File`.
///
/// Each variant wraps the file object of one backend. `File::async_open` chooses the variant
/// according to the global `RUNTIME_TYPE`.
pub enum File {
    /// Tokio asynchronous `File`.
    Tokio(tokio::fs::File),
    #[cfg(target_os = "linux")]
    /// Tokio-uring asynchronous `File`. This variant is only compiled on Linux.
    Uring(tokio_uring::fs::File),
}
24
25impl File {
26    /// Asynchronously open a file.
27    pub async fn async_open<P: AsRef<Path>>(
28        path: P,
29        write: bool,
30        create: bool,
31    ) -> std::io::Result<Self> {
32        match *RUNTIME_TYPE {
33            RuntimeType::Tokio => tokio::fs::OpenOptions::new()
34                .read(true)
35                .write(write)
36                .create(create)
37                .open(path)
38                .await
39                .map(File::Tokio),
40            #[cfg(target_os = "linux")]
41            RuntimeType::Uring => tokio_uring::fs::OpenOptions::new()
42                .read(true)
43                .write(write)
44                .create(create)
45                .open(path)
46                .await
47                .map(File::Uring),
48        }
49    }
50
51    /// Asynchronously read data at `offset` into the buffer.
52    pub async fn async_read_at(
53        &self,
54        buf: FileVolatileBuf,
55        offset: u64,
56    ) -> (std::io::Result<usize>, FileVolatileBuf) {
57        match self {
58            File::Tokio(f) => {
59                // tokio::fs:File doesn't support read_at() yet.
60                //f.read_at(buf, offset).await,
61                let mut bufs = [buf];
62                let res = preadv(f.as_raw_fd(), &mut bufs, offset);
63                (res, bufs[0])
64            }
65            #[cfg(target_os = "linux")]
66            File::Uring(f) => f.read_at(buf, offset).await,
67        }
68    }
69
70    /// Asynchronously read data at `offset` into buffers.
71    pub async fn async_readv_at(
72        &self,
73        mut bufs: Vec<FileVolatileBuf>,
74        offset: u64,
75    ) -> (std::io::Result<usize>, Vec<FileVolatileBuf>) {
76        match self {
77            File::Tokio(f) => {
78                // tokio::fs:File doesn't support read_at() yet.
79                //f.read_at(buf, offset).await,
80                let res = preadv(f.as_raw_fd(), &mut bufs, offset);
81                (res, bufs)
82            }
83            #[cfg(target_os = "linux")]
84            File::Uring(f) => f.readv_at(bufs, offset).await,
85        }
86    }
87
88    /// Asynchronously write data at `offset` from the buffer.
89    pub async fn async_write_at(
90        &self,
91        buf: FileVolatileBuf,
92        offset: u64,
93    ) -> (std::io::Result<usize>, FileVolatileBuf) {
94        match self {
95            File::Tokio(f) => {
96                // tokio::fs:File doesn't support read_at() yet.
97                //f.read_at(buf, offset).await,
98                let bufs = [buf];
99                let res = pwritev(f.as_raw_fd(), &bufs, offset);
100                (res, bufs[0])
101            }
102            #[cfg(target_os = "linux")]
103            File::Uring(f) => f.write_at(buf, offset).await,
104        }
105    }
106
107    /// Asynchronously write data at `offset` from buffers.
108    pub async fn async_writev_at(
109        &self,
110        bufs: Vec<FileVolatileBuf>,
111        offset: u64,
112    ) -> (std::io::Result<usize>, Vec<FileVolatileBuf>) {
113        match self {
114            File::Tokio(f) => {
115                // tokio::fs:File doesn't support read_at() yet.
116                //f.read_at(buf, offset).await,
117                let res = pwritev(f.as_raw_fd(), &bufs, offset);
118                (res, bufs)
119            }
120            #[cfg(target_os = "linux")]
121            File::Uring(f) => f.writev_at(bufs, offset).await,
122        }
123    }
124
125    /// Get metadata about the file.
126    pub fn metadata(&self) -> std::io::Result<std::fs::Metadata> {
127        // Safe because we have manually forget() the `file` object below.
128        let file = unsafe { std::fs::File::from_raw_fd(self.as_raw_fd()) };
129        let res = file.metadata();
130        std::mem::forget(file);
131        res
132    }
133
134    /// Try to clone the file object.
135    pub async fn async_try_clone(&self) -> std::io::Result<Self> {
136        match self {
137            File::Tokio(f) => f.try_clone().await.map(File::Tokio),
138            #[cfg(target_os = "linux")]
139            File::Uring(f) => {
140                // Safe because file.as_raw_fd() is valid RawFd and we have checked the result.
141                let fd = unsafe { libc::dup(f.as_raw_fd()) };
142                if fd < 0 {
143                    Err(std::io::Error::last_os_error())
144                } else {
145                    // Safe because we dup a new raw fd.
146                    Ok(File::Uring(unsafe {
147                        tokio_uring::fs::File::from_raw_fd(fd)
148                    }))
149                }
150            }
151        }
152    }
153}
154
155impl AsRawFd for File {
156    fn as_raw_fd(&self) -> RawFd {
157        match self {
158            File::Tokio(f) => f.as_raw_fd(),
159            #[cfg(target_os = "linux")]
160            File::Uring(f) => f.as_raw_fd(),
161        }
162    }
163}
164
165impl Debug for File {
166    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
167        let fd = self.as_raw_fd();
168        write!(f, "Async File {}", fd)
169    }
170}
171
172/// A simple wrapper over posix `preadv` to deal with `FileVolatileBuf`.
173pub fn preadv(fd: RawFd, bufs: &mut [FileVolatileBuf], offset: u64) -> std::io::Result<usize> {
174    let iov: Vec<IoSliceMut> = bufs.iter().map(|v| v.io_slice_mut()).collect();
175
176    loop {
177        // SAFETY: it is ABI compatible, a pointer cast here is valid
178        let res = unsafe {
179            preadv64(
180                fd,
181                iov.as_ptr() as *const libc::iovec,
182                iov.len() as libc::c_int,
183                offset as off64_t,
184            )
185        };
186
187        if res >= 0 {
188            let mut count = res as usize;
189            for buf in bufs.iter_mut() {
190                let cnt = std::cmp::min(count, buf.cap() - buf.len());
191                unsafe { buf.set_size(buf.len() + cnt) };
192                count -= cnt;
193                if count == 0 {
194                    break;
195                }
196            }
197            assert_eq!(count, 0);
198            return Ok(res as usize);
199        } else {
200            let e = std::io::Error::last_os_error();
201            // Retry if the IO is interrupted by signal.
202            if e.kind() != ErrorKind::Interrupted {
203                return Err(e);
204            }
205        }
206    }
207}
208
209/// A simple wrapper over posix `pwritev` to deal with `FileVolatileBuf`.
210pub fn pwritev(fd: RawFd, bufs: &[FileVolatileBuf], offset: u64) -> std::io::Result<usize> {
211    let iov: Vec<IoSlice> = bufs.iter().map(|v| v.io_slice()).collect();
212
213    loop {
214        // SAFETY: it is ABI compatible, a pointer cast here is valid
215        let res = unsafe {
216            pwritev64(
217                fd,
218                iov.as_ptr() as *const libc::iovec,
219                iov.len() as libc::c_int,
220                offset as off64_t,
221            )
222        };
223
224        if res >= 0 {
225            return Ok(res as usize);
226        } else {
227            let e = std::io::Error::last_os_error();
228            // Retry if the IO is interrupted by signal.
229            if e.kind() != ErrorKind::Interrupted {
230                return Err(e);
231            }
232        }
233    }
234}
235
#[cfg(test)]
mod tests {
    use super::*;
    use crate::async_runtime::block_on;
    use vmm_sys_util::tempdir::TempDir;

    /// Wrap static bytes as a fully populated `FileVolatileBuf` to be used as a write source.
    fn filled_buf(data: &'static [u8]) -> FileVolatileBuf {
        // SAFETY: `data` is static, so the pointer stays valid; the write paths under test only
        // read through it even though the API takes a `*mut u8`.
        unsafe { FileVolatileBuf::from_raw_ptr(data.as_ptr() as *mut u8, data.len(), data.len()) }
    }

    /// Opening an existing file read-only yields a valid fd.
    #[test]
    fn test_new_async_file() {
        let tmp = TempDir::new().unwrap();
        let target = tmp.as_path().join("test.txt");
        std::fs::write(&target, b"test").unwrap();

        let opened = block_on(async { File::async_open(&target, false, false).await.unwrap() });
        assert!(opened.as_raw_fd() >= 0);
        drop(opened);
    }

    /// `metadata()` can be queried repeatedly on the same file object.
    #[test]
    fn test_async_file_metadata() {
        let tmp = TempDir::new().unwrap();
        let target = tmp.as_path().join("test.txt");
        std::fs::write(&target, b"test").unwrap();
        let opened = block_on(async { File::async_open(&target, false, false).await.unwrap() });

        // Query twice: the second call only succeeds if the first one left the fd open.
        for _ in 0..2 {
            let info = opened.metadata().unwrap();
            assert!(info.is_file());
        }

        drop(opened);
    }

    /// Positional reads fill the buffer and stop at end of file.
    #[test]
    fn test_async_read_at() {
        let tmp = TempDir::new().unwrap();
        let target = tmp.as_path().join("test.txt");
        std::fs::write(&target, b"test").unwrap();

        block_on(async {
            let opened = File::async_open(&target, false, false).await.unwrap();
            let mut storage = [0u8; 3];

            // A 3-byte buffer at offset 0 of a 4-byte file is filled completely.
            let whole = unsafe { FileVolatileBuf::new(&mut storage) };
            let (outcome, whole) = opened.async_read_at(whole, 0).await;
            assert_eq!(outcome.unwrap(), 3);
            assert_eq!(whole.len(), 3);

            // At offset 2 only two bytes remain before end of file.
            let tail = unsafe { FileVolatileBuf::new(&mut storage) };
            let (outcome, tail) = opened.async_read_at(tail, 2).await;
            assert_eq!(outcome.unwrap(), 2);
            assert_eq!(tail.len(), 2);
        });
    }

    /// Vectored reads fill the buffers in order.
    #[test]
    fn test_async_readv_at() {
        let tmp = TempDir::new().unwrap();
        let target = tmp.as_path().join("test.txt");
        std::fs::write(&target, b"test").unwrap();

        block_on(async {
            let opened = File::async_open(&target, false, false).await.unwrap();

            let mut first = [0u8; 3];
            let mut second = [0u8; 3];
            let request = unsafe {
                vec![
                    FileVolatileBuf::new(&mut first),
                    FileVolatileBuf::new(&mut second),
                ]
            };
            let (outcome, filled) = opened.async_readv_at(request, 0).await;

            assert_eq!(outcome.unwrap(), 4);
            assert_eq!(filled[0].len(), 3);
            assert_eq!(filled[1].len(), 1);
        });
    }

    /// A positional write creates the file content.
    #[test]
    fn test_async_write_at() {
        let tmp = TempDir::new().unwrap();
        let target = tmp.as_path().join("test.txt");

        block_on(async {
            let opened = File::async_open(&target, true, true).await.unwrap();

            let (outcome, source) = opened.async_write_at(filled_buf(b"test"), 0).await;
            assert_eq!(outcome.unwrap(), 4);
            assert_eq!(source.len(), 4);

            let content = std::fs::read_to_string(&target).unwrap();
            assert_eq!(content.as_str(), "test");
        });
    }

    /// A vectored write stores the buffers back to back.
    #[test]
    fn test_async_writev_at() {
        let tmp = TempDir::new().unwrap();
        let target = tmp.as_path().join("test.txt");

        block_on(async {
            let opened = File::async_open(&target, true, true).await.unwrap();

            let request = vec![filled_buf(b"tes"), filled_buf(b"t")];
            let (outcome, sources) = opened.async_writev_at(request, 0).await;

            assert_eq!(outcome.unwrap(), 4);
            assert_eq!(sources[0].len(), 3);
            assert_eq!(sources[1].len(), 1);

            let content = std::fs::read_to_string(&target).unwrap();
            assert_eq!(content.as_str(), "test");
        });
    }

    /// A clone stays usable after the original file object is dropped.
    #[test]
    fn test_async_try_clone() {
        let tmp = TempDir::new().unwrap();
        let target = tmp.as_path().join("test.txt");

        block_on(async {
            let original = File::async_open(&target, true, true).await.unwrap();
            let duplicate = original.async_try_clone().await.unwrap();
            drop(original);

            let (outcome, source) = duplicate.async_write_at(filled_buf(b"test"), 0).await;
            assert_eq!(outcome.unwrap(), 4);
            assert_eq!(source.len(), 4);
        });
    }
}