http_file/
runtime.rs

1//! runtime module contains traits for introducing custom async file system impl.
2
3use core::future::Future;
4
5use std::{
6    io::{self, SeekFrom},
7    path::PathBuf,
8    time::SystemTime,
9};
10
11use bytes::BytesMut;
12
13/// trait for generic over async file systems.
14pub trait AsyncFs {
15    type File: ChunkRead + Meta;
16    type OpenFuture: Future<Output = io::Result<Self::File>>;
17
18    /// open a file from given path.
19    fn open(&self, path: PathBuf) -> Self::OpenFuture;
20}
21
22/// trait for generic over file metadata.
23pub trait Meta {
24    /// the last time when file is modified. optional
25    fn modified(&mut self) -> Option<SystemTime>;
26
27    /// the length hint of file.
28    fn len(&self) -> u64;
29
30    #[cold]
31    #[inline(never)]
32    fn is_empty(&self) -> bool {
33        self.len() == 0
34    }
35}
36
37/// trait for async chunk read from file.
38pub trait ChunkRead: Sized {
39    type SeekFuture<'f>: Future<Output = io::Result<()>> + 'f
40    where
41        Self: 'f;
42
43    type Future: Future<Output = io::Result<Option<(Self, BytesMut, usize)>>>;
44
45    /// seek file to skip n bytes with given offset.
46    fn seek(&mut self, pos: SeekFrom) -> Self::SeekFuture<'_>;
47
48    /// async read of Self and write into given [BytesMut].
49    /// return Ok(Some(Self, BytesMut, usize)) after successful read where usize is the byte count
50    /// of data written into buffer.
51    /// return Ok(None) when self has reached EOF and can not do more read anymore.
52    /// return Err(io::Error) when read error occur.
53    fn next(self, buf: BytesMut) -> Self::Future;
54}
55
56#[cfg(feature = "tokio")]
57pub(crate) use tokio_impl::TokioFs;
58
59#[cfg(feature = "tokio")]
60mod tokio_impl {
61    use core::{
62        pin::Pin,
63        task::{Context, Poll},
64    };
65
66    use tokio::{
67        fs::File,
68        io::{AsyncReadExt, AsyncSeekExt},
69    };
70
71    use super::*;
72
73    type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
74
75    pub struct OpenFuture<F> {
76        handle: tokio::task::JoinHandle<F>,
77    }
78
79    impl<F> Future for OpenFuture<F> {
80        type Output = F;
81
82        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
83            Pin::new(&mut self.get_mut().handle).poll(cx).map(|res| res.unwrap())
84        }
85    }
86
87    #[derive(Clone)]
88    pub struct TokioFs;
89
90    impl AsyncFs for TokioFs {
91        type File = TokioFile;
92        type OpenFuture = OpenFuture<io::Result<Self::File>>;
93
94        fn open(&self, path: PathBuf) -> Self::OpenFuture {
95            OpenFuture {
96                handle: tokio::task::spawn_blocking(move || {
97                    let file = std::fs::File::open(path)?;
98                    let meta = file.metadata()?;
99                    let modified_time = meta.modified().ok();
100                    let len = meta.len();
101                    Ok(TokioFile {
102                        file: file.into(),
103                        modified_time,
104                        len,
105                    })
106                }),
107            }
108        }
109    }
110
111    pub struct TokioFile {
112        file: File,
113        modified_time: Option<SystemTime>,
114        len: u64,
115    }
116
117    impl Meta for TokioFile {
118        fn modified(&mut self) -> Option<SystemTime> {
119            self.modified_time
120        }
121
122        fn len(&self) -> u64 {
123            self.len
124        }
125    }
126
127    impl ChunkRead for TokioFile {
128        type SeekFuture<'f>
129            = BoxFuture<'f, io::Result<()>>
130        where
131            Self: 'f;
132
133        type Future = BoxFuture<'static, io::Result<Option<(Self, BytesMut, usize)>>>;
134
135        fn seek(&mut self, pos: SeekFrom) -> Self::SeekFuture<'_> {
136            Box::pin(async move { self.file.seek(pos).await.map(|_| ()) })
137        }
138
139        fn next(mut self, mut buf: BytesMut) -> Self::Future {
140            Box::pin(async {
141                let n = self.file.read_buf(&mut buf).await?;
142                if n == 0 {
143                    Ok(None)
144                } else {
145                    Ok(Some((self, buf, n)))
146                }
147            })
148        }
149    }
150}
151
152#[cfg(feature = "tokio-uring")]
153pub(crate) use tokio_uring_impl::TokioUringFs;
154
155#[cfg(feature = "tokio-uring")]
156mod tokio_uring_impl {
157    use core::{
158        future::{ready, Ready},
159        pin::Pin,
160    };
161
162    use tokio_uring::fs::File;
163
164    use super::*;
165
166    type BoxFuture<'f, T> = Pin<Box<dyn Future<Output = T> + 'f>>;
167
168    #[derive(Clone)]
169    pub struct TokioUringFs;
170
171    impl AsyncFs for TokioUringFs {
172        type File = TokioUringFile;
173        type OpenFuture = BoxFuture<'static, io::Result<Self::File>>;
174
175        fn open(&self, path: PathBuf) -> Self::OpenFuture {
176            Box::pin(async {
177                let file = File::open(path).await?;
178
179                // SAFETY: fd is borrowed and lives longer than the unsafe block
180                let meta = unsafe {
181                    use std::os::fd::{AsRawFd, FromRawFd};
182
183                    let file = std::fs::File::from_raw_fd(file.as_raw_fd());
184                    let md = file.metadata();
185                    // SAFETY: forget the fd before exiting block in success or error case but don't
186                    // run destructor (that would close file handle)
187                    core::mem::forget(file);
188                    md?
189                };
190
191                let modified_time = meta.modified().ok();
192                let len = meta.len();
193
194                Ok(TokioUringFile {
195                    file,
196                    pos: 0,
197                    modified_time,
198                    len,
199                })
200            })
201        }
202    }
203
204    pub struct TokioUringFile {
205        file: File,
206        pos: u64,
207        modified_time: Option<SystemTime>,
208        len: u64,
209    }
210
211    impl Meta for TokioUringFile {
212        fn modified(&mut self) -> Option<SystemTime> {
213            self.modified_time
214        }
215
216        fn len(&self) -> u64 {
217            self.len
218        }
219    }
220
221    impl ChunkRead for TokioUringFile {
222        type SeekFuture<'f>
223            = Ready<io::Result<()>>
224        where
225            Self: 'f;
226
227        type Future = BoxFuture<'static, io::Result<Option<(Self, BytesMut, usize)>>>;
228
229        fn seek(&mut self, pos: SeekFrom) -> Self::SeekFuture<'_> {
230            let SeekFrom::Start(pos) = pos else {
231                unreachable!("ChunkRead::seek only accept pos as SeekFrom::Start variant")
232            };
233            self.pos += pos;
234            ready(Ok(()))
235        }
236
237        fn next(mut self, buf: BytesMut) -> Self::Future {
238            Box::pin(async {
239                let (res, buf) = self.file.read_at(buf, self.pos).await;
240                let n = res?;
241                if n == 0 {
242                    Ok(None)
243                } else {
244                    self.pos += n as u64;
245                    Ok(Some((self, buf, n)))
246                }
247            })
248        }
249    }
250}