1use std::fs as std_fs;
4use std::io;
5use std::io::SeekFrom;
6use std::path::Path;
7use std::path::PathBuf;
8use std::pin::Pin;
9use std::task::Context;
10use std::task::Poll;
11
12use futures::AsyncBufRead as FuturesBufRead;
13use futures::Stream;
14use futures::io::AsyncRead as FuturesAsyncRead;
15use futures::io::AsyncSeek as FuturesAsyncSeek;
16use futures::io::AsyncWrite as FuturesAsyncWrite;
17use pin_project::pin_project;
18use tokio::fs;
19use tokio::io::AsyncBufRead;
20use tokio::io::AsyncRead;
21use tokio::io::AsyncSeek;
22use tokio::io::AsyncWrite;
23use tokio::io::BufReader;
24use tokio::io::BufWriter;
25use tokio::io::ReadBuf;
26
27use crate::traits::async_vfs::CreateParentDirDefaultFuture;
28use crate::traits::async_vfs::IoErrorWrapperFuture;
29use crate::traits::async_vfs::VfsAsync;
30use crate::traits::async_vfs::WriteSupportingVfsAsync;
31use crate::traits::vfs::PathType;
32use crate::traits::vfs::VfsCore;
33
/// Virtual file system that operates on the real file system through
/// `tokio::fs`.
///
/// The type carries no state, so it derives the inexpensive standard traits:
/// it can be printed for diagnostics, copied freely and created through
/// `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct TokioFsVfs;
36
/// Adapter that exposes a `tokio` I/O object through the `futures` I/O traits.
///
/// Field `0` is the wrapped value; it is structurally pinned so the trait
/// implementations can poll it in place.
///
/// Field `1` remembers the target of a seek that has been started but whose
/// completion has not been observed yet. `poll_seek` uses it to avoid starting
/// the same seek again when it is polled repeatedly, and every other
/// operation of the adapter resets it to `None`.
#[pin_project]
pub struct TokioAsyncAdapter<T>(#[pin] T, Option<SeekFrom>);
41
42impl<R: AsyncRead> FuturesAsyncRead for TokioAsyncAdapter<R> {
43 fn poll_read(
44 self: Pin<&mut Self>,
45 cx: &mut Context<'_>,
46 buf: &mut [u8],
47 ) -> Poll<io::Result<usize>> {
48 let mut rbuf = ReadBuf::new(buf);
49 let this = self.project();
50 *this.1 = None;
51 match this.0.poll_read(cx, &mut rbuf) {
52 Poll::Ready(Ok(())) => Poll::Ready(Ok(rbuf.filled().len())),
53 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
54 Poll::Pending => Poll::Pending,
55 }
56 }
57}
58
59impl<R: AsyncBufRead> FuturesBufRead for TokioAsyncAdapter<R> {
60 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
61 let this = self.project();
62 *this.1 = None;
63 this.0.poll_fill_buf(cx)
64 }
65
66 fn consume(self: Pin<&mut Self>, amt: usize) {
67 let this = self.project();
68 *this.1 = None;
69 this.0.consume(amt)
70 }
71}
72
73impl<T: AsyncSeek> FuturesAsyncSeek for TokioAsyncAdapter<T> {
74 fn poll_seek(
75 self: Pin<&mut Self>,
76 cx: &mut Context<'_>,
77 pos: io::SeekFrom,
78 ) -> Poll<io::Result<u64>> {
79 let mut this = self.project();
80 if *this.1 != Some(pos) {
81 *this.1 = Some(pos);
82 match this.0.as_mut().start_seek(pos) {
83 Ok(()) => {}
84 Err(e) => {
85 *this.1 = None;
86 return Poll::Ready(Err(e));
87 }
88 }
89 }
90
91 match this.0.poll_complete(cx) {
92 Poll::Ready(Ok(v)) => {
93 *this.1 = None;
94 Poll::Ready(Ok(v))
95 }
96 Poll::Ready(Err(e)) => {
97 *this.1 = None;
98 Poll::Ready(Err(e))
99 }
100 Poll::Pending => Poll::Pending,
101 }
102 }
103}
104
105impl<T: AsyncWrite> FuturesAsyncWrite for TokioAsyncAdapter<T> {
106 fn poll_write(
107 self: Pin<&mut Self>,
108 cx: &mut Context<'_>,
109 buf: &[u8],
110 ) -> Poll<io::Result<usize>> {
111 let this = self.project();
112 *this.1 = None;
113 this.0.poll_write(cx, buf)
114 }
115
116 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
117 let this = self.project();
118 *this.1 = None;
119 this.0.poll_flush(cx)
120 }
121
122 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
123 let this = self.project();
124 *this.1 = None;
125 this.0.poll_shutdown(cx)
126 }
127}
128
/// Core VFS definition for [`TokioFsVfs`].
impl VfsCore for TokioFsVfs {
    // Locations are addressed with standard library paths. The asynchronous
    // trait implementations in this file take the owned form as `PathBuf`.
    type Path = Path;
}
132
133impl VfsAsync for TokioFsVfs {
134 type RFile = TokioAsyncAdapter<BufReader<fs::File>>;
135 type OpenReadFuture = IoErrorWrapperFuture<
136 Self::RFile,
137 Pin<Box<dyn Future<Output = io::Result<Self::RFile>> + Send>>,
138 <<Self as VfsCore>::Path as PathType>::OwnedPath,
139 >;
140
141 fn open_read(self: Pin<&Self>, path: PathBuf) -> Self::OpenReadFuture {
142 IoErrorWrapperFuture::new(
143 path.clone(),
144 Box::pin(async move {
145 fs::File::open(path)
146 .await
147 .map(|f| TokioAsyncAdapter(BufReader::new(f), None))
148 }),
149 )
150 }
151
152 type ReadFuture<'a>
153 = IoErrorWrapperFuture<
154 Vec<u8>,
155 Pin<Box<dyn Future<Output = io::Result<Vec<u8>>> + Send + 'a>>,
156 <<Self as VfsCore>::Path as PathType>::OwnedPath,
157 >
158 where
159 Self: 'a;
160
161 fn read<'a>(self: Pin<&'a Self>, path: PathBuf) -> Self::ReadFuture<'a> {
162 IoErrorWrapperFuture::new(path.clone(), Box::pin(fs::read(path)))
163 }
164
165 type ReadStringFuture<'a>
166 = IoErrorWrapperFuture<
167 String,
168 Pin<Box<dyn Future<Output = io::Result<String>> + Send + 'a>>,
169 <<Self as VfsCore>::Path as PathType>::OwnedPath,
170 >
171 where
172 Self: 'a;
173
174 fn read_string<'a>(
175 self: Pin<&'a Self>,
176 path: <<Self as VfsCore>::Path as PathType>::OwnedPath,
177 ) -> Self::ReadStringFuture<'a> {
178 IoErrorWrapperFuture::new(path.clone(), Box::pin(fs::read_to_string(path)))
179 }
180
181 type ExistsFuture<'a>
182 = IoErrorWrapperFuture<
183 bool,
184 Pin<Box<dyn Future<Output = io::Result<bool>> + Send + 'a>>,
185 <<Self as VfsCore>::Path as PathType>::OwnedPath,
186 >
187 where
188 Self: 'a;
189
190 fn exists<'a>(
191 self: Pin<&'a Self>,
192 path: <<Self as VfsCore>::Path as PathType>::OwnedPath,
193 ) -> Self::ExistsFuture<'a> {
194 IoErrorWrapperFuture::new(path.clone(), Box::pin(fs::try_exists(path)))
195 }
196
197 type IsDirFuture<'a>
198 = IoErrorWrapperFuture<
199 bool,
200 Pin<Box<dyn Future<Output = io::Result<bool>> + Send + 'a>>,
201 <<Self as VfsCore>::Path as PathType>::OwnedPath,
202 >
203 where
204 Self: 'a;
205
206 fn is_dir<'a>(
207 self: Pin<&'a Self>,
208 path: <<Self as VfsCore>::Path as PathType>::OwnedPath,
209 ) -> Self::IsDirFuture<'a> {
210 IoErrorWrapperFuture::new(
211 path.clone(),
212 Box::pin(async move { fs::metadata(path).await.map(|m| m.is_dir()) }),
213 )
214 }
215
216 type DirWalk<'a>
217 = imp::DirWalker
218 where
219 Self: 'a;
220
221 type DirWalkFuture<'a>
222 = IoErrorWrapperFuture<
223 Self::DirWalk<'a>,
224 Pin<Box<dyn Future<Output = io::Result<Self::DirWalk<'a>>> + Send + 'a>>,
225 <<Self as VfsCore>::Path as PathType>::OwnedPath,
226 >
227 where
228 Self: 'a;
229
230 fn walk_dir<'a>(self: Pin<&'a Self>, path: PathBuf) -> Self::DirWalkFuture<'a> {
231 IoErrorWrapperFuture::new(
232 path.clone(),
233 Box::pin(async move {
234 fs::read_dir(path.clone())
235 .await
236 .map(|inner| imp::DirWalker {
237 inner,
238 path,
239 current_kind_future: None,
240 })
241 }),
242 )
243 }
244}
245
246impl WriteSupportingVfsAsync for TokioFsVfs {
247 type WFile = TokioAsyncAdapter<BufWriter<fs::File>>;
248 type OpenWriteFuture = IoErrorWrapperFuture<
249 Self::WFile,
250 Pin<Box<dyn Future<Output = io::Result<Self::WFile>> + Send>>,
251 <<Self as VfsCore>::Path as PathType>::OwnedPath,
252 >;
253 fn open_write(
254 self: Pin<&Self>,
255 path: <<Self as VfsCore>::Path as PathType>::OwnedPath,
256 ) -> Self::OpenWriteFuture {
257 IoErrorWrapperFuture::new(
258 path.clone(),
259 Box::pin(async move {
260 fs::File::create(path)
261 .await
262 .map(|f| TokioAsyncAdapter(BufWriter::new(f), None))
263 }),
264 )
265 }
266
267 type WriteFuture<'a>
268 = IoErrorWrapperFuture<
269 (),
270 Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>,
271 <<Self as VfsCore>::Path as PathType>::OwnedPath,
272 >
273 where
274 Self: 'a;
275
276 fn write<'d, 'a: 'd>(
277 self: Pin<&'a Self>,
278 path: <<Self as VfsCore>::Path as PathType>::OwnedPath,
279 data: &'d [u8],
280 ) -> Self::WriteFuture<'d> {
281 IoErrorWrapperFuture::new(path.clone(), Box::pin(fs::write(path, data)))
282 }
283
284 type RemoveDirAllFuture<'a>
285 = IoErrorWrapperFuture<
286 (),
287 Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>,
288 <<Self as VfsCore>::Path as PathType>::OwnedPath,
289 >
290 where
291 Self: 'a;
292
293 fn remove_dir_all<'a>(self: Pin<&'a Self>, path: PathBuf) -> Self::RemoveDirAllFuture<'a> {
294 IoErrorWrapperFuture::new(path.clone(), Box::pin(fs::remove_dir_all(path)))
295 }
296
297 type CreateDirFuture<'a>
298 = IoErrorWrapperFuture<
299 (),
300 Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>,
301 <<Self as VfsCore>::Path as PathType>::OwnedPath,
302 >
303 where
304 Self: 'a;
305
306 fn create_dir<'a>(self: Pin<&'a Self>, path: PathBuf) -> Self::CreateDirFuture<'a> {
307 IoErrorWrapperFuture::new(path.clone(), Box::pin(fs::create_dir(path)))
308 }
309
310 type CreateDirAllFuture<'a>
311 = IoErrorWrapperFuture<
312 (),
313 Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>,
314 <<Self as VfsCore>::Path as PathType>::OwnedPath,
315 >
316 where
317 Self: 'a;
318
319 fn create_dir_all<'a>(self: Pin<&'a Self>, path: PathBuf) -> Self::CreateDirAllFuture<'a> {
320 IoErrorWrapperFuture::new(path.clone(), Box::pin(fs::create_dir_all(path)))
321 }
322
323 type CreateParentDirFuture<'a>
324 = CreateParentDirDefaultFuture<'a, Self>
325 where
326 Self: 'a;
327
328 fn create_parent_dir<'a>(
329 self: Pin<&'a Self>,
330 path: PathBuf,
331 ) -> Self::CreateParentDirFuture<'a> {
332 let parent = path
333 .parent()
334 .map_or_else(|| path.join(".."), |p| p.to_path_buf());
335 CreateParentDirDefaultFuture::Start {
336 vfs: self,
337 path: parent,
338 }
339 }
340}
341
342mod imp {
343 use std::ffi::OsString;
344 use std::path::Path;
345 use std::task::Context;
346
347 use futures::FutureExt;
348 #[cfg(feature = "image")]
349 use tokio::task;
350
351 use super::*;
352 use crate::error::Error;
353 use crate::error::Result;
354 #[cfg(feature = "image")]
355 use crate::image::ImgFormat;
356 #[cfg(feature = "image")]
357 use crate::prelude::*;
358 #[cfg(feature = "image")]
359 use crate::traits::async_vfs::ReadImageFromAsync;
360 #[cfg(feature = "image")]
361 use crate::traits::async_vfs::WriteImageToAsync;
362 #[cfg(feature = "image")]
363 use crate::traits::async_vfs::WriteImageToAsyncRef;
364 use crate::traits::vfs::DirEntryInfo;
365 use crate::traits::vfs::DirEntryKind;
366 #[cfg(feature = "image")]
367 use crate::traits::vfs::WriteSupportingVfs as _;
368 #[cfg(feature = "image")]
369 use crate::vfs::fs_vfs::FsVfs;
370
371 pub struct DirWalker {
373 pub(super) inner: fs::ReadDir,
374 pub(super) path: PathBuf,
375
376 pub(super) current_kind_future: Option<(
377 OsString,
378 PathBuf,
379 Pin<Box<dyn Future<Output = io::Result<std_fs::FileType>> + Send>>,
380 )>,
381 }
382
383 impl Stream for DirWalker {
384 type Item = Result<DirEntryInfo<Path>, PathBuf>;
385
386 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
387 if let Some((name, path, fut)) = self.current_kind_future.as_mut() {
388 match fut.poll_unpin(cx) {
389 Poll::Ready(Ok(kind)) => {
390 let name = name.clone();
391 let path = path.clone();
392 self.current_kind_future = None;
393 let kind = if kind.is_dir() {
394 DirEntryKind::Directory
395 } else {
396 DirEntryKind::File
397 };
398 return Poll::Ready(Some(Ok(DirEntryInfo { name, path, kind })));
399 }
400 Poll::Ready(Err(e)) => {
401 let path = self.path.clone();
402 self.current_kind_future = None;
403 return Poll::Ready(Some(Err(Error::Io(path.clone(), e))));
404 }
405 Poll::Pending => return Poll::Pending,
406 }
407 }
408 match self.inner.poll_next_entry(cx) {
409 Poll::Ready(Ok(Some(v))) => {
410 let name = v.file_name();
411 let path = v.path();
412 let fut = Box::pin(async move { v.file_type().await });
413 self.current_kind_future = Some((name, path, fut));
414 cx.waker().wake_by_ref();
415 Poll::Pending
416 }
417 Poll::Ready(Ok(None)) => Poll::Ready(None),
418 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(Error::Io(self.path.clone(), e)))),
419 Poll::Pending => Poll::Pending,
420 }
421 }
422 }
423
/// Reads and decodes an image on `tokio`'s blocking thread pool.
#[cfg(feature = "image")]
#[cfg_attr(docsrs, doc(cfg(feature = "image")))]
impl<'vfs, T: ImgFormat> ReadImageFromAsync<T> for TokioFsVfs
where
    T: Send + 'static,
{
    type ReadImageFuture<'a>
        = Pin<Box<dyn Future<Output = Result<T, PathBuf>> + Send + 'a>>
    where
        Self: 'a;

    /// Opens `path` through the synchronous [`FsVfs`], decodes it as
    /// `T::FORMAT` and converts the result with `T::from_image`.
    ///
    /// Decoding failures, as well as a failure to join the blocking task, are
    /// reported as `Error::Parse` for `path`. Debug builds assert that
    /// reading is enabled for `T::FORMAT`.
    fn read_image_async<'a>(
        self: Pin<&'a Self>,
        path: <Self::Path as PathType>::OwnedPath,
    ) -> Self::ReadImageFuture<'a> {
        debug_assert!(
            T::FORMAT.reading_enabled(),
            "Image format {:?} does not support reading; enable the corresponding feature",
            T::FORMAT
        );
        let std_vfs = Pin::new(&FsVfs);
        Box::pin(async move {
            // The blocking closure needs its own copy; `path` is kept for the
            // join-failure error below.
            let blocking_path = path.clone();
            let joined = task::spawn_blocking(move || {
                let file = std_vfs.open_read(&blocking_path)?;
                let mut reader = image::ImageReader::new(io::BufReader::new(file));
                reader.set_format(T::FORMAT);
                let decoded = reader
                    .decode()
                    .map_err(|e| Error::Parse(blocking_path.clone(), Box::new(e)))?;
                Ok(T::from_image(decoded))
            })
            .await;

            match joined {
                Ok(outcome) => outcome,
                Err(join_error) => Err(Error::Parse(path, Box::new(join_error))),
            }
        })
    }
}
464
/// Encodes and writes an owned image on `tokio`'s blocking thread pool.
#[cfg(feature = "image")]
#[cfg_attr(docsrs, doc(cfg(feature = "image")))]
impl<'vfs> WriteImageToAsync<'vfs> for TokioFsVfs {
    type WriteImageFuture = Pin<Box<dyn Future<Output = Result<(), PathBuf>> + Send + 'vfs>>;

    /// Creates the parent directory of `path`, then opens `path` through the
    /// synchronous [`FsVfs`] and writes `image` to it in `format`.
    ///
    /// Encoding failures, as well as a failure to join the blocking task, are
    /// reported as `Error::Write` for `path`.
    fn write_image_async(
        self: Pin<&'vfs Self>,
        path: <Self::Path as PathType>::OwnedPath,
        image: image::DynamicImage,
        format: image::ImageFormat,
    ) -> Self::WriteImageFuture {
        Box::pin(async move {
            use crate::traits::async_vfs::WriteSupportingVfsAsync;

            self.create_parent_dir(path.clone()).await?;

            let std_vfs = Pin::new(&FsVfs);
            // The blocking closure needs its own copy; `path` is kept for the
            // join-failure error below.
            let blocking_path = path.clone();
            let joined = task::spawn_blocking(move || {
                let mut file = std_vfs.open_write(&blocking_path)?;
                let written = image.write_to(&mut file, format);
                written.map_err(|e| Error::Write(blocking_path, Box::new(e)))
            })
            .await;

            match joined {
                Ok(outcome) => outcome,
                Err(join_error) => Err(Error::Write(path, Box::new(join_error))),
            }
        })
    }
}
498
/// Writes a borrowed image by cloning it and delegating to the owned-image
/// writer.
#[cfg(feature = "image")]
#[cfg_attr(docsrs, doc(cfg(feature = "image")))]
impl<'vfs> WriteImageToAsyncRef<'vfs> for TokioFsVfs
where
    'vfs: 'vfs,
{
    type WriteImageRefFuture = Pin<Box<dyn Future<Output = Result<(), PathBuf>> + Send + 'vfs>>;

    /// Clones `image` and writes the `(image, format)` pair to `path` through
    /// `write_to_async`.
    fn write_image_async_ref(
        self: Pin<&'vfs Self>,
        path: <Self::Path as PathType>::OwnedPath,
        image: &'vfs image::DynamicImage,
        format: image::ImageFormat,
    ) -> Self::WriteImageRefFuture {
        let owned = (image.clone(), format);
        owned.write_to_async(path, self)
    }
}
517}
518
519#[cfg(feature = "tools-atomic-dir")]
520mod atomic_dir_imp {
521 use tokio::fs;
524
525 use super::*;
526 use crate::atomic_dir::TempDirApiAsync;
527 use crate::atomic_dir::VfsSupportsTemporaryDirectoriesAsync;
528 use crate::error::Error;
529 use crate::error::VfsResult;
530 use crate::vfs::fs_vfs as std_fs_vfs;
531
/// Handle to a temporary directory, identified by its path.
///
/// Derives `Debug` so the handle can appear in diagnostics. It is
/// deliberately not `Clone`: `persist_at` and `delete` consume the handle.
#[derive(Debug)]
pub struct TempDir(PathBuf);
534
535 impl<'vfs> TempDirApiAsync<'vfs> for TempDir {
536 type Vfs = TokioFsVfs;
537
538 fn path(&self) -> &Path {
539 &self.0
540 }
541
542 type FuturePersistAt =
543 Pin<Box<dyn Future<Output = VfsResult<(), Self::Vfs>> + Send + 'vfs>>;
544
545 type FutureDelete = <TokioFsVfs as WriteSupportingVfsAsync>::RemoveDirAllFuture<'vfs>;
546
547 fn persist_at(
548 self,
549 vfs: Pin<&'vfs Self::Vfs>,
550 path: <<Self::Vfs as VfsCore>::Path as PathType>::OwnedPath,
551 ) -> Self::FuturePersistAt {
552 Box::pin(async move {
553 vfs.create_parent_dir(path.clone()).await?;
554 fs::rename(&self.0, &path)
555 .await
556 .map_err(|e| Error::Io(path.clone(), e))?;
557 Ok(())
558 })
559 }
560
561 fn delete(self, vfs: Pin<&'vfs Self::Vfs>) -> Self::FutureDelete {
562 vfs.remove_dir_all(self.0.clone())
563 }
564 }
565
566 impl<'vfs> VfsSupportsTemporaryDirectoriesAsync<'vfs> for TokioFsVfs {
567 type TemporaryDirectory = TempDir;
568
569 type TemporaryDirectoryFuture =
570 Pin<Box<dyn Future<Output = VfsResult<TempDir, Self>> + Send + 'vfs>>;
571
572 fn create_temporary_directory(self: Pin<&'vfs Self>) -> Self::TemporaryDirectoryFuture {
573 let temp_dir = std_fs_vfs::atomic_dir_imp::make_new_temp_dir_path();
574 let path = temp_dir.clone();
575 Box::pin(async move {
576 if self.exists(path.clone()).await? {
577 self.remove_dir_all(path.clone()).await?;
578 }
579 self.create_dir(path.clone()).await?;
580 Ok(TempDir(temp_dir))
581 })
582 }
583 }
584}