Skip to main content

http_file/
runtime.rs

1//! runtime module contains traits for introducing custom async file system impl.
2
3use core::future::Future;
4
5use std::{
6    io::{self, SeekFrom},
7    path::PathBuf,
8    time::SystemTime,
9};
10
11use bytes::BytesMut;
12
13/// trait for generic over async file systems.
14pub trait AsyncFs {
15    type File: ChunkRead + Meta;
16    type OpenFuture: Future<Output = io::Result<Self::File>>;
17
18    /// open a file from given path.
19    fn open(&self, path: PathBuf) -> Self::OpenFuture;
20}
21
22/// trait for generic over file metadata.
23pub trait Meta {
24    /// the last time when file is modified. optional
25    fn modified(&mut self) -> Option<SystemTime>;
26
27    /// the length hint of file.
28    fn len(&self) -> u64;
29
30    #[cold]
31    #[inline(never)]
32    fn is_empty(&self) -> bool {
33        self.len() == 0
34    }
35}
36
37/// trait for async chunk read from file.
38pub trait ChunkRead: Sized {
39    type SeekFuture<'f>: Future<Output = io::Result<()>> + 'f
40    where
41        Self: 'f;
42
43    type Future: Future<Output = io::Result<Option<(Self, BytesMut, usize)>>>;
44
45    /// seek file to skip n bytes with given offset.
46    fn seek(&mut self, pos: SeekFrom) -> Self::SeekFuture<'_>;
47
48    /// async read of Self and write into given [BytesMut].
49    /// return Ok(Some(Self, BytesMut, usize)) after successful read where usize is the byte count
50    /// of data written into buffer.
51    /// return Ok(None) when self has reached EOF and can not do more read anymore.
52    /// return Err(io::Error) when read error occur.
53    fn next(self, buf: BytesMut) -> Self::Future;
54}
55
56#[cfg(feature = "tokio")]
57pub(crate) use tokio_impl::TokioFs;
58
59#[cfg(feature = "tokio")]
60mod tokio_impl {
61    use core::pin::Pin;
62
63    use tokio::{
64        fs::File,
65        io::{AsyncReadExt, AsyncSeekExt},
66    };
67
68    use super::*;
69
70    type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
71
72    #[derive(Clone)]
73    pub struct TokioFs;
74
75    impl AsyncFs for TokioFs {
76        type File = TokioFile;
77        type OpenFuture = BoxFuture<'static, io::Result<Self::File>>;
78
79        fn open(&self, path: PathBuf) -> Self::OpenFuture {
80            Box::pin(async {
81                let file = File::open(path).await?;
82                let meta = file.metadata().await?;
83
84                let modified_time = meta.modified().ok();
85                let len = meta.len();
86
87                Ok(TokioFile {
88                    file,
89                    modified_time,
90                    len,
91                })
92            })
93        }
94    }
95
96    pub struct TokioFile {
97        file: File,
98        modified_time: Option<SystemTime>,
99        len: u64,
100    }
101
102    impl Meta for TokioFile {
103        fn modified(&mut self) -> Option<SystemTime> {
104            self.modified_time
105        }
106
107        fn len(&self) -> u64 {
108            self.len
109        }
110    }
111
112    impl ChunkRead for TokioFile {
113        type SeekFuture<'f>
114            = BoxFuture<'f, io::Result<()>>
115        where
116            Self: 'f;
117
118        type Future = BoxFuture<'static, io::Result<Option<(Self, BytesMut, usize)>>>;
119
120        fn seek(&mut self, pos: SeekFrom) -> Self::SeekFuture<'_> {
121            Box::pin(async move { self.file.seek(pos).await.map(|_| ()) })
122        }
123
124        fn next(mut self, mut buf: BytesMut) -> Self::Future {
125            Box::pin(async {
126                let n = self.file.read_buf(&mut buf).await?;
127                if n == 0 { Ok(None) } else { Ok(Some((self, buf, n))) }
128            })
129        }
130    }
131}
132
133#[cfg(feature = "tokio-uring-xitca")]
134pub(crate) use tokio_uring_impl::TokioUringFs;
135
136#[cfg(feature = "tokio-uring-xitca")]
137mod tokio_uring_impl {
138    use core::{
139        future::{Ready, ready},
140        pin::Pin,
141        time::Duration,
142    };
143
144    use tokio_uring_xitca::fs::File;
145
146    use super::*;
147
148    type BoxFuture<'f, T> = Pin<Box<dyn Future<Output = T> + 'f>>;
149
150    #[derive(Clone)]
151    pub struct TokioUringFs;
152
153    impl AsyncFs for TokioUringFs {
154        type File = TokioUringFile;
155        type OpenFuture = BoxFuture<'static, io::Result<Self::File>>;
156
157        fn open(&self, path: PathBuf) -> Self::OpenFuture {
158            Box::pin(async {
159                let file = File::open(path).await?;
160                let statx = file.statx().await?;
161
162                let mtime = statx.stx_mtime;
163                let len = statx.stx_size;
164
165                let modified_time = SystemTime::UNIX_EPOCH
166                    .checked_add(Duration::from_secs(mtime.tv_sec as _))
167                    .and_then(|time| time.checked_add(Duration::from_nanos(mtime.tv_nsec as _)));
168
169                Ok(TokioUringFile {
170                    file,
171                    pos: 0,
172                    modified_time,
173                    len,
174                })
175            })
176        }
177    }
178
179    pub struct TokioUringFile {
180        file: File,
181        pos: u64,
182        modified_time: Option<SystemTime>,
183        len: u64,
184    }
185
186    impl Meta for TokioUringFile {
187        fn modified(&mut self) -> Option<SystemTime> {
188            self.modified_time
189        }
190
191        fn len(&self) -> u64 {
192            self.len
193        }
194    }
195
196    impl ChunkRead for TokioUringFile {
197        type SeekFuture<'f>
198            = Ready<io::Result<()>>
199        where
200            Self: 'f;
201
202        type Future = BoxFuture<'static, io::Result<Option<(Self, BytesMut, usize)>>>;
203
204        fn seek(&mut self, pos: SeekFrom) -> Self::SeekFuture<'_> {
205            let SeekFrom::Start(pos) = pos else {
206                unreachable!("ChunkRead::seek only accept pos as SeekFrom::Start variant")
207            };
208            self.pos += pos;
209            ready(Ok(()))
210        }
211
212        fn next(mut self, buf: BytesMut) -> Self::Future {
213            Box::pin(async {
214                let (res, buf) = self.file.read_at(buf, self.pos).await;
215                let n = res?;
216                if n == 0 {
217                    Ok(None)
218                } else {
219                    self.pos += n as u64;
220                    Ok(Some((self, buf, n)))
221                }
222            })
223        }
224    }
225}
226
227#[cfg(feature = "tokio-uring-xitca")]
228#[cfg(test)]
229mod test {
230    use super::*;
231
232    #[test]
233    fn meta() {
234        tokio_uring_xitca::start(async {
235            let mut file = TokioUringFs::open(&TokioUringFs, "./sample/test.txt".into())
236                .await
237                .unwrap();
238
239            let mut file2 = TokioFs::open(&TokioFs, "./sample/test.txt".into()).await.unwrap();
240
241            assert_eq!(file.modified(), file2.modified());
242        })
243    }
244}