1use core::future::Future;
4
5use std::{
6 io::{self, SeekFrom},
7 path::PathBuf,
8 time::SystemTime,
9};
10
11use bytes::BytesMut;
12
13pub trait AsyncFs {
15 type File: ChunkRead + Meta;
16 type OpenFuture: Future<Output = io::Result<Self::File>>;
17
18 fn open(&self, path: PathBuf) -> Self::OpenFuture;
20}
21
/// Metadata accessors for an opened file.
pub trait Meta {
    /// Last modification time of the file, or `None` when it is not available.
    fn modified(&mut self) -> Option<SystemTime>;

    /// Length of the file in bytes, as reported by the implementation.
    fn len(&self) -> u64;

    /// Returns `true` when [`Meta::len`] reports zero bytes.
    #[cold]
    #[inline(never)]
    fn is_empty(&self) -> bool {
        matches!(self.len(), 0)
    }
}
36
37pub trait ChunkRead: Sized {
39 type SeekFuture<'f>: Future<Output = io::Result<()>> + 'f
40 where
41 Self: 'f;
42
43 type Future: Future<Output = io::Result<Option<(Self, BytesMut, usize)>>>;
44
45 fn seek(&mut self, pos: SeekFrom) -> Self::SeekFuture<'_>;
47
48 fn next(self, buf: BytesMut) -> Self::Future;
54}
55
56#[cfg(feature = "tokio")]
57pub(crate) use tokio_impl::TokioFs;
58
#[cfg(feature = "tokio")]
mod tokio_impl {
    use core::pin::Pin;

    use tokio::{
        fs::File,
        io::{AsyncReadExt, AsyncSeekExt},
    };

    use super::*;

    /// Boxed, pinned, type-erased future that is also `Send`.
    type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

    /// [`AsyncFs`] implementation that opens files through `tokio::fs::File`.
    #[derive(Clone)]
    pub struct TokioFs;

    impl AsyncFs for TokioFs {
        type File = TokioFile;
        type OpenFuture = BoxFuture<'static, io::Result<Self::File>>;

        /// Opens `path` and reads its metadata once, so that later [`Meta`]
        /// queries are answered from the cached values.
        fn open(&self, path: PathBuf) -> Self::OpenFuture {
            Box::pin(async move {
                let file = File::open(path).await?;
                let metadata = file.metadata().await?;

                Ok(TokioFile {
                    // A failure to obtain the modification time is not fatal;
                    // it is simply reported as "unknown".
                    modified_time: metadata.modified().ok(),
                    len: metadata.len(),
                    file,
                })
            })
        }
    }

    /// File opened by [`TokioFs`] together with metadata captured at open time.
    pub struct TokioFile {
        /// Underlying tokio file handle.
        file: File,
        /// Modification time captured when the file was opened, if available.
        modified_time: Option<SystemTime>,
        /// File length in bytes captured when the file was opened.
        len: u64,
    }

    impl Meta for TokioFile {
        /// Returns the modification time cached at open time.
        fn modified(&mut self) -> Option<SystemTime> {
            self.modified_time
        }

        /// Returns the length cached at open time.
        fn len(&self) -> u64 {
            self.len
        }
    }

    impl ChunkRead for TokioFile {
        type SeekFuture<'f>
            = BoxFuture<'f, io::Result<()>>
        where
            Self: 'f;

        type Future = BoxFuture<'static, io::Result<Option<(Self, BytesMut, usize)>>>;

        /// Seeks the underlying file to `pos`; the resulting offset reported
        /// by tokio is discarded.
        fn seek(&mut self, pos: SeekFrom) -> Self::SeekFuture<'_> {
            Box::pin(async move {
                self.file.seek(pos).await?;
                Ok(())
            })
        }

        /// Reads from the file into `buf` with `read_buf`, resolving to
        /// `Ok(None)` when the read produced zero bytes.
        fn next(mut self, mut buf: BytesMut) -> Self::Future {
            Box::pin(async move {
                let read = self.file.read_buf(&mut buf).await?;
                if read > 0 {
                    return Ok(Some((self, buf, read)));
                }
                Ok(None)
            })
        }
    }
}
132
133#[cfg(feature = "tokio-uring-xitca")]
134pub(crate) use tokio_uring_impl::TokioUringFs;
135
#[cfg(feature = "tokio-uring-xitca")]
mod tokio_uring_impl {
    use core::{
        future::{Ready, ready},
        pin::Pin,
        time::Duration,
    };

    use tokio_uring_xitca::fs::File;

    use super::*;

    /// Boxed, pinned, type-erased future. Unlike the tokio backend's alias
    /// this one carries no `Send` bound.
    type BoxFuture<'f, T> = Pin<Box<dyn Future<Output = T> + 'f>>;

    /// Converts a `statx` timestamp (`secs` whole seconds relative to the Unix
    /// epoch plus `nanos` additional nanoseconds) into a [`SystemTime`].
    ///
    /// Negative `secs` (timestamps before the epoch) are handled by
    /// subtracting from the epoch instead of being cast to an unsigned value.
    /// Returns `None` when the result is not representable.
    fn system_time_from_statx(secs: i64, nanos: u32) -> Option<SystemTime> {
        let whole = Duration::from_secs(secs.unsigned_abs());
        let base = if secs < 0 {
            SystemTime::UNIX_EPOCH.checked_sub(whole)
        } else {
            SystemTime::UNIX_EPOCH.checked_add(whole)
        };
        // The nanosecond part always counts forward from the whole-second value.
        base?.checked_add(Duration::from_nanos(u64::from(nanos)))
    }

    /// [`AsyncFs`] implementation that opens files through
    /// `tokio_uring_xitca::fs::File`.
    #[derive(Clone)]
    pub struct TokioUringFs;

    impl AsyncFs for TokioUringFs {
        type File = TokioUringFile;
        type OpenFuture = BoxFuture<'static, io::Result<Self::File>>;

        /// Opens `path` and issues a single `statx` call whose size and
        /// modification time are cached on the returned file.
        fn open(&self, path: PathBuf) -> Self::OpenFuture {
            Box::pin(async {
                let file = File::open(path).await?;
                let statx = file.statx().await?;

                let mtime = statx.stx_mtime;
                let len = statx.stx_size;

                let modified_time = system_time_from_statx(mtime.tv_sec, mtime.tv_nsec);

                Ok(TokioUringFile {
                    file,
                    pos: 0,
                    modified_time,
                    len,
                })
            })
        }
    }

    /// File opened by [`TokioUringFs`] together with metadata captured at
    /// open time and an explicitly tracked read offset.
    pub struct TokioUringFile {
        /// Underlying io-uring file handle.
        file: File,
        /// Absolute byte offset passed to the next `read_at` call.
        pos: u64,
        /// Modification time captured when the file was opened, if representable.
        modified_time: Option<SystemTime>,
        /// File size in bytes captured when the file was opened.
        len: u64,
    }

    impl Meta for TokioUringFile {
        /// Returns the modification time cached at open time.
        fn modified(&mut self) -> Option<SystemTime> {
            self.modified_time
        }

        /// Returns the size cached at open time.
        fn len(&self) -> u64 {
            self.len
        }
    }

    impl ChunkRead for TokioUringFile {
        type SeekFuture<'f>
            = Ready<io::Result<()>>
        where
            Self: 'f;

        type Future = BoxFuture<'static, io::Result<Option<(Self, BytesMut, usize)>>>;

        /// Sets the offset used by the next read to the absolute position
        /// carried by `SeekFrom::Start`. No I/O is performed, so the returned
        /// future is immediately ready.
        ///
        /// # Panics
        ///
        /// Panics when `pos` is any variant other than [`SeekFrom::Start`].
        fn seek(&mut self, pos: SeekFrom) -> Self::SeekFuture<'_> {
            let SeekFrom::Start(pos) = pos else {
                unreachable!("ChunkRead::seek only accept pos as SeekFrom::Start variant")
            };
            // `SeekFrom::Start` is an absolute offset: assign it rather than
            // accumulate, so repeated seeks and seeks after a read land where
            // the caller asked.
            self.pos = pos;
            ready(Ok(()))
        }

        /// Reads from the file at the tracked offset into `buf`, advancing
        /// the offset by the number of bytes read. Resolves to `Ok(None)` when
        /// the read produced zero bytes.
        fn next(mut self, buf: BytesMut) -> Self::Future {
            Box::pin(async {
                let (res, buf) = self.file.read_at(buf, self.pos).await;
                let n = res?;
                if n == 0 {
                    Ok(None)
                } else {
                    self.pos += n as u64;
                    Ok(Some((self, buf, n)))
                }
            })
        }
    }
}
226
// The test compares both backends, so it needs both features: `TokioFs` only
// exists under `tokio` and `TokioUringFs` only under `tokio-uring-xitca`.
#[cfg(all(test, feature = "tokio", feature = "tokio-uring-xitca"))]
mod test {
    use super::*;

    /// The io-uring backend must report the same modification time as the
    /// tokio backend for the same file.
    #[test]
    fn meta() {
        tokio_uring_xitca::start(async {
            let mut file = TokioUringFs::open(&TokioUringFs, "./sample/test.txt".into())
                .await
                .unwrap();

            let mut file2 = TokioFs::open(&TokioFs, "./sample/test.txt".into())
                .await
                .unwrap();

            assert_eq!(file.modified(), file2.modified());
        })
    }
}