1use std::{
2 cmp::min,
3 collections::HashMap,
4 fs::OpenOptions,
5 future::Future,
6 io::{Cursor, Error, ErrorKind},
7 mem::MaybeUninit,
8 path::{Component, Path, PathBuf},
9 pin::Pin,
10 task::{Context, Poll},
11 time::SystemTime,
12};
13
14use futures_util::future::{ready, Ready};
15use hyper::body::Bytes;
16use tokio::{
17 fs::{self, File},
18 io::{AsyncRead, AsyncSeek, ReadBuf},
19 task::{spawn_blocking, JoinHandle},
20};
21
22#[cfg(windows)]
23use std::os::windows::fs::OpenOptionsExt;
24#[cfg(windows)]
25use winapi::um::winbase::FILE_FLAG_BACKUP_SEMANTICS;
26
/// Size in bytes (8 KiB) of the scratch buffer that each `TokioFileAccess`
/// allocates; it is also the upper bound on a single read through that type.
const TOKIO_READ_BUF_SIZE: usize = 8 * 1024;
28
/// An open file handle bundled with the metadata gathered for it.
///
/// `F` is the handle type; it defaults to tokio's `File`.
#[derive(Debug)]
pub struct FileWithMetadata<F = File> {
    /// The open handle.
    pub handle: F,
    /// Size of the file in bytes.
    pub size: u64,
    /// Last modification time, or `None` when it is not available.
    pub modified: Option<SystemTime>,
    /// Whether this entry is a directory rather than a regular file.
    pub is_dir: bool,
}
46
/// A source of files that can be opened asynchronously by path.
pub trait FileOpener: Send + Sync + 'static {
    /// Handle type yielded by a successful open; it must be convertible into
    /// a [`FileAccess`] via [`IntoFileAccess`].
    type File: IntoFileAccess;

    /// Future returned by [`open`](Self::open); resolves to the handle plus
    /// its metadata, or to an I/O error.
    type Future: Future<Output = Result<FileWithMetadata<Self::File>, Error>> + Send;

    /// Starts opening the file at `path`.
    ///
    /// NOTE(review): the implementations in this file use `path` exactly as
    /// given (appended to a root directory, or used as a map key), so callers
    /// are presumably expected to pass an already-sanitized relative path;
    /// confirm against the call sites.
    fn open(&self, path: &Path) -> Self::Future;
}
63
/// Conversion from an opened file handle into a type implementing
/// [`FileAccess`].
pub trait IntoFileAccess: Send + Unpin + 'static {
    /// The [`FileAccess`] implementation produced by the conversion.
    type Output: FileAccess;

    /// Consumes the handle and returns the value used to read from it.
    fn into_file_access(self) -> Self::Output;
}
75
/// Seekable, poll-based read access to the contents of an open file.
pub trait FileAccess: AsyncSeek + Send + Unpin + 'static {
    /// Attempts to read a chunk of the file.
    ///
    /// `len` is the maximum number of bytes the caller wants; the returned
    /// chunk may be shorter. The implementations in this file return an
    /// empty `Bytes` when there is nothing left to read.
    ///
    /// Returns `Poll::Pending` when the read cannot complete yet, and
    /// `Poll::Ready(Err(_))` when the underlying read fails.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        len: usize,
    ) -> Poll<Result<Bytes, Error>>;
}
95
96impl IntoFileAccess for File {
101 type Output = TokioFileAccess;
102
103 fn into_file_access(self) -> Self::Output {
104 TokioFileAccess::new(self)
105 }
106}
107
/// [`FileAccess`] implementation backed by a tokio `File`.
pub struct TokioFileAccess {
    /// The underlying file being read and seeked.
    file: File,
    /// Heap-allocated scratch space that reads land in before being copied
    /// out into a `Bytes` value. Kept as `MaybeUninit` so it never has to be
    /// zero-filled.
    read_buf: Box<[MaybeUninit<u8>; TOKIO_READ_BUF_SIZE]>,
}
113
114impl TokioFileAccess {
115 pub fn new(file: File) -> Self {
117 TokioFileAccess {
118 file,
119 read_buf: Box::new([MaybeUninit::uninit(); TOKIO_READ_BUF_SIZE]),
120 }
121 }
122}
123
124impl AsyncSeek for TokioFileAccess {
125 fn start_seek(mut self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
126 Pin::new(&mut self.file).start_seek(position)
127 }
128
129 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
130 Pin::new(&mut self.file).poll_complete(cx)
131 }
132}
133
134impl FileAccess for TokioFileAccess {
135 fn poll_read(
136 mut self: Pin<&mut Self>,
137 cx: &mut Context<'_>,
138 len: usize,
139 ) -> Poll<Result<Bytes, Error>> {
140 let Self {
141 ref mut file,
142 ref mut read_buf,
143 } = *self;
144
145 let len = min(len, read_buf.len());
146 let mut read_buf = ReadBuf::uninit(&mut read_buf[..len]);
147 match Pin::new(file).poll_read(cx, &mut read_buf) {
148 Poll::Ready(Ok(())) => {
149 let filled = read_buf.filled();
150 if filled.is_empty() {
151 Poll::Ready(Ok(Bytes::new()))
152 } else {
153 Poll::Ready(Ok(Bytes::copy_from_slice(filled)))
154 }
155 }
156 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
157 Poll::Pending => Poll::Pending,
158 }
159 }
160}
161
/// [`FileOpener`] that opens files from the real filesystem, relative to a
/// root directory.
pub struct TokioFileOpener {
    /// Directory that every opened path is appended to.
    pub root: PathBuf,
}
169
170impl TokioFileOpener {
171 pub fn new(root: impl Into<PathBuf>) -> Self {
175 Self { root: root.into() }
176 }
177}
178
179impl FileOpener for TokioFileOpener {
180 type File = File;
181 type Future = TokioFileFuture;
182
183 fn open(&self, path: &Path) -> Self::Future {
184 let mut full_path = self.root.clone();
185 full_path.extend(path);
186
187 let inner = spawn_blocking(move || {
190 let mut opts = OpenOptions::new();
191 opts.read(true);
192
193 #[cfg(windows)]
195 opts.custom_flags(FILE_FLAG_BACKUP_SEMANTICS);
196
197 let handle = opts.open(full_path)?;
198 let metadata = handle.metadata()?;
199 Ok(FileWithMetadata {
200 handle: File::from_std(handle),
201 size: metadata.len(),
202 modified: metadata.modified().ok(),
203 is_dir: metadata.is_dir(),
204 })
205 });
206
207 TokioFileFuture { inner }
208 }
209}
210
/// Future returned by `TokioFileOpener::open`.
///
/// Wraps the handle of the blocking task that performs the actual open.
pub struct TokioFileFuture {
    /// Join handle of the spawned blocking task.
    inner: JoinHandle<Result<FileWithMetadata<File>, Error>>,
}
217
218impl Future for TokioFileFuture {
219 type Output = Result<FileWithMetadata<File>, Error>;
220
221 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
222 match Pin::new(&mut self.inner).poll(cx) {
227 Poll::Ready(Ok(res)) => Poll::Ready(res),
228 Poll::Ready(Err(_)) => {
229 Poll::Ready(Err(Error::new(ErrorKind::Other, "background task failed")))
230 }
231 Poll::Pending => Poll::Pending,
232 }
233 }
234}
235
/// Storage for an in-memory filesystem: maps each path to its contents and
/// metadata. Directories are stored as entries with empty contents.
type MemoryFileMap = HashMap<PathBuf, FileWithMetadata<Bytes>>;
241
/// `Cursor<Bytes>` implements [`FileAccess`] itself, so the conversion is the
/// identity.
impl IntoFileAccess for Cursor<Bytes> {
    type Output = Self;

    /// Returns the cursor unchanged.
    fn into_file_access(self) -> Self::Output {
        self
    }
}
250
251impl FileAccess for Cursor<Bytes> {
252 fn poll_read(
253 self: Pin<&mut Self>,
254 _cx: &mut Context<'_>,
255 len: usize,
256 ) -> Poll<Result<Bytes, Error>> {
257 let pos = self.position();
258 let slice = (*self).get_ref();
259
260 if pos > slice.len() as u64 {
262 return Poll::Ready(Ok(Bytes::new()));
263 }
264
265 let start = pos as usize;
266 let amt = min(slice.len() - start, len);
267 let end = start + amt;
269 Poll::Ready(Ok(slice.slice(start..end)))
270 }
271}
272
/// An in-memory [`FileOpener`]: files are served from `Bytes` values held in
/// a map instead of from disk.
pub struct MemoryFs {
    /// Every known path (files and directories) with contents and metadata.
    files: MemoryFileMap,
}
279
280impl Default for MemoryFs {
281 fn default() -> Self {
282 let mut files = MemoryFileMap::new();
283
284 files.insert(
286 PathBuf::new(),
287 FileWithMetadata {
288 handle: Bytes::new(),
289 size: 0,
290 modified: None,
291 is_dir: true,
292 },
293 );
294
295 Self { files }
296 }
297}
298
299impl MemoryFs {
300 pub async fn from_dir(path: impl AsRef<Path>) -> Result<Self, Error> {
304 let mut fs = Self::default();
305
306 let mut dirs = vec![(path.as_ref().to_path_buf(), PathBuf::new())];
308 while let Some((dir, base)) = dirs.pop() {
309 let mut iter = fs::read_dir(dir).await?;
310 while let Some(entry) = iter.next_entry().await? {
311 let metadata = entry.metadata().await?;
312
313 let mut out_path = base.to_path_buf();
315 out_path.push(entry.file_name());
316
317 if metadata.is_dir() {
318 dirs.push((entry.path(), out_path));
320 } else if metadata.is_file() {
321 let data = fs::read(entry.path()).await?;
323 fs.add(out_path, data.into(), metadata.modified().ok());
324 }
325 }
326 }
327
328 Ok(fs)
329 }
330
331 pub fn add(
336 &mut self,
337 path: impl Into<PathBuf>,
338 data: Bytes,
339 modified: Option<SystemTime>,
340 ) -> &mut Self {
341 let path = path.into();
342
343 let mut components: Vec<_> = path.components().collect();
345 components.pop();
346 let mut dir_path = PathBuf::new();
347 for component in components {
348 if let Component::Normal(x) = component {
349 dir_path.push(x);
350 self.files.insert(
351 dir_path.clone(),
352 FileWithMetadata {
353 handle: Bytes::new(),
354 size: 0,
355 modified: None,
356 is_dir: true,
357 },
358 );
359 }
360 }
361
362 let size = data.len() as u64;
364 self.files.insert(
365 path,
366 FileWithMetadata {
367 handle: data,
368 size,
369 modified,
370 is_dir: false,
371 },
372 );
373
374 self
375 }
376}
377
378impl FileOpener for MemoryFs {
379 type File = Cursor<Bytes>;
380 type Future = Ready<Result<FileWithMetadata<Self::File>, Error>>;
381
382 fn open(&self, path: &Path) -> Self::Future {
383 ready(
384 self.files
385 .get(path)
386 .map(|file| FileWithMetadata {
387 handle: Cursor::new(file.handle.clone()),
388 size: file.size,
389 modified: file.modified,
390 is_dir: file.is_dir,
391 })
392 .ok_or_else(|| Error::new(ErrorKind::NotFound, "Not found")),
393 )
394 }
395}