1use core::future::Future;
4
5use std::{
6 io::{self, SeekFrom},
7 path::PathBuf,
8 time::SystemTime,
9};
10
11use bytes::BytesMut;
12
13pub trait AsyncFs {
15 type File: ChunkRead + Meta;
16 type OpenFuture: Future<Output = io::Result<Self::File>>;
17
18 fn open(&self, path: PathBuf) -> Self::OpenFuture;
20}
21
22pub trait Meta {
24 fn modified(&mut self) -> Option<SystemTime>;
26
27 fn len(&self) -> u64;
29
30 #[cold]
31 #[inline(never)]
32 fn is_empty(&self) -> bool {
33 self.len() == 0
34 }
35}
36
37pub trait ChunkRead: Sized {
39 type SeekFuture<'f>: Future<Output = io::Result<()>> + 'f
40 where
41 Self: 'f;
42
43 type Future: Future<Output = io::Result<Option<(Self, BytesMut, usize)>>>;
44
45 fn seek(&mut self, pos: SeekFrom) -> Self::SeekFuture<'_>;
47
48 fn next(self, buf: BytesMut) -> Self::Future;
54}
55
56#[cfg(feature = "tokio")]
57pub(crate) use tokio_impl::TokioFs;
58
59#[cfg(feature = "tokio")]
60mod tokio_impl {
61 use core::pin::Pin;
62
63 use tokio::{
64 fs::File,
65 io::{AsyncReadExt, AsyncSeekExt},
66 };
67
68 use super::*;
69
70 type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
71
72 #[derive(Clone)]
73 pub struct TokioFs;
74
75 impl AsyncFs for TokioFs {
76 type File = TokioFile;
77 type OpenFuture = BoxFuture<'static, io::Result<Self::File>>;
78
79 fn open(&self, path: PathBuf) -> Self::OpenFuture {
80 Box::pin(async {
81 let file = tokio::fs::File::open(path).await?;
82 let meta = file.metadata().await?;
83
84 let modified_time = meta.modified().ok();
85 let len = meta.len();
86
87 Ok(TokioFile {
88 file,
89 modified_time,
90 len,
91 })
92 })
93 }
94 }
95
96 pub struct TokioFile {
97 file: File,
98 modified_time: Option<SystemTime>,
99 len: u64,
100 }
101
102 impl Meta for TokioFile {
103 fn modified(&mut self) -> Option<SystemTime> {
104 self.modified_time
105 }
106
107 fn len(&self) -> u64 {
108 self.len
109 }
110 }
111
112 impl ChunkRead for TokioFile {
113 type SeekFuture<'f>
114 = BoxFuture<'f, io::Result<()>>
115 where
116 Self: 'f;
117
118 type Future = BoxFuture<'static, io::Result<Option<(Self, BytesMut, usize)>>>;
119
120 fn seek(&mut self, pos: SeekFrom) -> Self::SeekFuture<'_> {
121 Box::pin(async move { self.file.seek(pos).await.map(|_| ()) })
122 }
123
124 fn next(mut self, mut buf: BytesMut) -> Self::Future {
125 Box::pin(async {
126 let n = self.file.read_buf(&mut buf).await?;
127 if n == 0 {
128 Ok(None)
129 } else {
130 Ok(Some((self, buf, n)))
131 }
132 })
133 }
134 }
135}
136
137#[cfg(feature = "tokio-uring")]
138pub(crate) use tokio_uring_impl::TokioUringFs;
139
140#[cfg(feature = "tokio-uring")]
141mod tokio_uring_impl {
142 use core::{
143 future::{ready, Ready},
144 pin::Pin,
145 time::Duration,
146 };
147
148 use tokio_uring::fs::File;
149
150 use super::*;
151
152 type BoxFuture<'f, T> = Pin<Box<dyn Future<Output = T> + 'f>>;
153
154 #[derive(Clone)]
155 pub struct TokioUringFs;
156
157 impl AsyncFs for TokioUringFs {
158 type File = TokioUringFile;
159 type OpenFuture = BoxFuture<'static, io::Result<Self::File>>;
160
161 fn open(&self, path: PathBuf) -> Self::OpenFuture {
162 Box::pin(async {
163 let file = File::open(path).await?;
164 let statx = file.statx().await?;
165
166 let mtime = statx.stx_mtime;
167 let len = statx.stx_size;
168
169 let modified_time = SystemTime::UNIX_EPOCH
170 .checked_add(Duration::from_secs(mtime.tv_sec as _))
171 .and_then(|time| time.checked_add(Duration::from_nanos(mtime.tv_nsec as _)));
172
173 Ok(TokioUringFile {
174 file,
175 pos: 0,
176 modified_time,
177 len,
178 })
179 })
180 }
181 }
182
183 pub struct TokioUringFile {
184 file: File,
185 pos: u64,
186 modified_time: Option<SystemTime>,
187 len: u64,
188 }
189
190 impl Meta for TokioUringFile {
191 fn modified(&mut self) -> Option<SystemTime> {
192 self.modified_time
193 }
194
195 fn len(&self) -> u64 {
196 self.len
197 }
198 }
199
200 impl ChunkRead for TokioUringFile {
201 type SeekFuture<'f>
202 = Ready<io::Result<()>>
203 where
204 Self: 'f;
205
206 type Future = BoxFuture<'static, io::Result<Option<(Self, BytesMut, usize)>>>;
207
208 fn seek(&mut self, pos: SeekFrom) -> Self::SeekFuture<'_> {
209 let SeekFrom::Start(pos) = pos else {
210 unreachable!("ChunkRead::seek only accept pos as SeekFrom::Start variant")
211 };
212 self.pos += pos;
213 ready(Ok(()))
214 }
215
216 fn next(mut self, buf: BytesMut) -> Self::Future {
217 Box::pin(async {
218 let (res, buf) = self.file.read_at(buf, self.pos).await;
219 let n = res?;
220 if n == 0 {
221 Ok(None)
222 } else {
223 self.pos += n as u64;
224 Ok(Some((self, buf, n)))
225 }
226 })
227 }
228 }
229}
230
231#[cfg(feature = "tokio-uring")]
232#[cfg(test)]
233mod test {
234 use super::*;
235
236 #[test]
237 fn meta() {
238 tokio_uring::start(async {
239 let mut file = TokioUringFs::open(&TokioUringFs, "./sample/test.txt".into())
240 .await
241 .unwrap();
242
243 let mut file2 = TokioFs::open(&TokioFs, "./sample/test.txt".into()).await.unwrap();
244
245 assert_eq!(file.modified(), file2.modified());
246 })
247 }
248}