dir_structure/vfs/
tokio_fs_vfs.rs

1//! Tokio file system virtual file system implementation.
2
3use std::fs as std_fs;
4use std::io;
5use std::io::SeekFrom;
6use std::path::Path;
7use std::path::PathBuf;
8use std::pin::Pin;
9use std::task::Context;
10use std::task::Poll;
11
12use futures::AsyncBufRead as FuturesBufRead;
13use futures::Stream;
14use futures::io::AsyncRead as FuturesAsyncRead;
15use futures::io::AsyncSeek as FuturesAsyncSeek;
16use futures::io::AsyncWrite as FuturesAsyncWrite;
17use pin_project::pin_project;
18use tokio::fs;
19use tokio::io::AsyncBufRead;
20use tokio::io::AsyncRead;
21use tokio::io::AsyncSeek;
22use tokio::io::AsyncWrite;
23use tokio::io::BufReader;
24use tokio::io::BufWriter;
25use tokio::io::ReadBuf;
26
27use crate::traits::async_vfs::CreateParentDirDefaultFuture;
28use crate::traits::async_vfs::IoErrorWrapperFuture;
29use crate::traits::async_vfs::VfsAsync;
30use crate::traits::async_vfs::WriteSupportingVfsAsync;
31use crate::traits::vfs::PathType;
32use crate::traits::vfs::VfsCore;
33
34/// A [`VfsAsync`] and [`WriteSupportingVfsAsync`] implementation using [`tokio::fs`].
35pub struct TokioFsVfs;
36
37/// Adapter to convert a type implementing [`tokio::io`] traits to one implementing
38/// [`futures::io`] traits.
39#[pin_project]
40pub struct TokioAsyncAdapter<T>(#[pin] T, Option<SeekFrom>);
41
42impl<R: AsyncRead> FuturesAsyncRead for TokioAsyncAdapter<R> {
43    fn poll_read(
44        self: Pin<&mut Self>,
45        cx: &mut Context<'_>,
46        buf: &mut [u8],
47    ) -> Poll<io::Result<usize>> {
48        let mut rbuf = ReadBuf::new(buf);
49        let this = self.project();
50        *this.1 = None;
51        match this.0.poll_read(cx, &mut rbuf) {
52            Poll::Ready(Ok(())) => Poll::Ready(Ok(rbuf.filled().len())),
53            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
54            Poll::Pending => Poll::Pending,
55        }
56    }
57}
58
59impl<R: AsyncBufRead> FuturesBufRead for TokioAsyncAdapter<R> {
60    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
61        let this = self.project();
62        *this.1 = None;
63        this.0.poll_fill_buf(cx)
64    }
65
66    fn consume(self: Pin<&mut Self>, amt: usize) {
67        let this = self.project();
68        *this.1 = None;
69        this.0.consume(amt)
70    }
71}
72
73impl<T: AsyncSeek> FuturesAsyncSeek for TokioAsyncAdapter<T> {
74    fn poll_seek(
75        self: Pin<&mut Self>,
76        cx: &mut Context<'_>,
77        pos: io::SeekFrom,
78    ) -> Poll<io::Result<u64>> {
79        let mut this = self.project();
80        if *this.1 != Some(pos) {
81            *this.1 = Some(pos);
82            match this.0.as_mut().start_seek(pos) {
83                Ok(()) => {}
84                Err(e) => {
85                    *this.1 = None;
86                    return Poll::Ready(Err(e));
87                }
88            }
89        }
90
91        match this.0.poll_complete(cx) {
92            Poll::Ready(Ok(v)) => {
93                *this.1 = None;
94                Poll::Ready(Ok(v))
95            }
96            Poll::Ready(Err(e)) => {
97                *this.1 = None;
98                Poll::Ready(Err(e))
99            }
100            Poll::Pending => Poll::Pending,
101        }
102    }
103}
104
105impl<T: AsyncWrite> FuturesAsyncWrite for TokioAsyncAdapter<T> {
106    fn poll_write(
107        self: Pin<&mut Self>,
108        cx: &mut Context<'_>,
109        buf: &[u8],
110    ) -> Poll<io::Result<usize>> {
111        let this = self.project();
112        *this.1 = None;
113        this.0.poll_write(cx, buf)
114    }
115
116    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
117        let this = self.project();
118        *this.1 = None;
119        this.0.poll_flush(cx)
120    }
121
122    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
123        let this = self.project();
124        *this.1 = None;
125        this.0.poll_shutdown(cx)
126    }
127}
128
129impl VfsCore for TokioFsVfs {
130    type Path = Path;
131}
132
133impl VfsAsync for TokioFsVfs {
134    type RFile = TokioAsyncAdapter<BufReader<fs::File>>;
135    type OpenReadFuture = IoErrorWrapperFuture<
136        Self::RFile,
137        Pin<Box<dyn Future<Output = io::Result<Self::RFile>> + Send>>,
138        <<Self as VfsCore>::Path as PathType>::OwnedPath,
139    >;
140
141    fn open_read(self: Pin<&Self>, path: PathBuf) -> Self::OpenReadFuture {
142        IoErrorWrapperFuture::new(
143            path.clone(),
144            Box::pin(async move {
145                fs::File::open(path)
146                    .await
147                    .map(|f| TokioAsyncAdapter(BufReader::new(f), None))
148            }),
149        )
150    }
151
152    type ReadFuture<'a>
153        = IoErrorWrapperFuture<
154        Vec<u8>,
155        Pin<Box<dyn Future<Output = io::Result<Vec<u8>>> + Send + 'a>>,
156        <<Self as VfsCore>::Path as PathType>::OwnedPath,
157    >
158    where
159        Self: 'a;
160
161    fn read<'a>(self: Pin<&'a Self>, path: PathBuf) -> Self::ReadFuture<'a> {
162        IoErrorWrapperFuture::new(path.clone(), Box::pin(fs::read(path)))
163    }
164
165    type ReadStringFuture<'a>
166        = IoErrorWrapperFuture<
167        String,
168        Pin<Box<dyn Future<Output = io::Result<String>> + Send + 'a>>,
169        <<Self as VfsCore>::Path as PathType>::OwnedPath,
170    >
171    where
172        Self: 'a;
173
174    fn read_string<'a>(
175        self: Pin<&'a Self>,
176        path: <<Self as VfsCore>::Path as PathType>::OwnedPath,
177    ) -> Self::ReadStringFuture<'a> {
178        IoErrorWrapperFuture::new(path.clone(), Box::pin(fs::read_to_string(path)))
179    }
180
181    type ExistsFuture<'a>
182        = IoErrorWrapperFuture<
183        bool,
184        Pin<Box<dyn Future<Output = io::Result<bool>> + Send + 'a>>,
185        <<Self as VfsCore>::Path as PathType>::OwnedPath,
186    >
187    where
188        Self: 'a;
189
190    fn exists<'a>(
191        self: Pin<&'a Self>,
192        path: <<Self as VfsCore>::Path as PathType>::OwnedPath,
193    ) -> Self::ExistsFuture<'a> {
194        IoErrorWrapperFuture::new(path.clone(), Box::pin(fs::try_exists(path)))
195    }
196
197    type IsDirFuture<'a>
198        = IoErrorWrapperFuture<
199        bool,
200        Pin<Box<dyn Future<Output = io::Result<bool>> + Send + 'a>>,
201        <<Self as VfsCore>::Path as PathType>::OwnedPath,
202    >
203    where
204        Self: 'a;
205
206    fn is_dir<'a>(
207        self: Pin<&'a Self>,
208        path: <<Self as VfsCore>::Path as PathType>::OwnedPath,
209    ) -> Self::IsDirFuture<'a> {
210        IoErrorWrapperFuture::new(
211            path.clone(),
212            Box::pin(async move { fs::metadata(path).await.map(|m| m.is_dir()) }),
213        )
214    }
215
216    type DirWalk<'a>
217        = imp::DirWalker
218    where
219        Self: 'a;
220
221    type DirWalkFuture<'a>
222        = IoErrorWrapperFuture<
223        Self::DirWalk<'a>,
224        Pin<Box<dyn Future<Output = io::Result<Self::DirWalk<'a>>> + Send + 'a>>,
225        <<Self as VfsCore>::Path as PathType>::OwnedPath,
226    >
227    where
228        Self: 'a;
229
230    fn walk_dir<'a>(self: Pin<&'a Self>, path: PathBuf) -> Self::DirWalkFuture<'a> {
231        IoErrorWrapperFuture::new(
232            path.clone(),
233            Box::pin(async move {
234                fs::read_dir(path.clone())
235                    .await
236                    .map(|inner| imp::DirWalker {
237                        inner,
238                        path,
239                        current_kind_future: None,
240                    })
241            }),
242        )
243    }
244}
245
246impl WriteSupportingVfsAsync for TokioFsVfs {
247    type WFile = TokioAsyncAdapter<BufWriter<fs::File>>;
248    type OpenWriteFuture = IoErrorWrapperFuture<
249        Self::WFile,
250        Pin<Box<dyn Future<Output = io::Result<Self::WFile>> + Send>>,
251        <<Self as VfsCore>::Path as PathType>::OwnedPath,
252    >;
253    fn open_write(
254        self: Pin<&Self>,
255        path: <<Self as VfsCore>::Path as PathType>::OwnedPath,
256    ) -> Self::OpenWriteFuture {
257        IoErrorWrapperFuture::new(
258            path.clone(),
259            Box::pin(async move {
260                fs::File::create(path)
261                    .await
262                    .map(|f| TokioAsyncAdapter(BufWriter::new(f), None))
263            }),
264        )
265    }
266
267    type WriteFuture<'a>
268        = IoErrorWrapperFuture<
269        (),
270        Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>,
271        <<Self as VfsCore>::Path as PathType>::OwnedPath,
272    >
273    where
274        Self: 'a;
275
276    fn write<'d, 'a: 'd>(
277        self: Pin<&'a Self>,
278        path: <<Self as VfsCore>::Path as PathType>::OwnedPath,
279        data: &'d [u8],
280    ) -> Self::WriteFuture<'d> {
281        IoErrorWrapperFuture::new(path.clone(), Box::pin(fs::write(path, data)))
282    }
283
284    type RemoveDirAllFuture<'a>
285        = IoErrorWrapperFuture<
286        (),
287        Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>,
288        <<Self as VfsCore>::Path as PathType>::OwnedPath,
289    >
290    where
291        Self: 'a;
292
293    fn remove_dir_all<'a>(self: Pin<&'a Self>, path: PathBuf) -> Self::RemoveDirAllFuture<'a> {
294        IoErrorWrapperFuture::new(path.clone(), Box::pin(fs::remove_dir_all(path)))
295    }
296
297    type CreateDirFuture<'a>
298        = IoErrorWrapperFuture<
299        (),
300        Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>,
301        <<Self as VfsCore>::Path as PathType>::OwnedPath,
302    >
303    where
304        Self: 'a;
305
306    fn create_dir<'a>(self: Pin<&'a Self>, path: PathBuf) -> Self::CreateDirFuture<'a> {
307        IoErrorWrapperFuture::new(path.clone(), Box::pin(fs::create_dir(path)))
308    }
309
310    type CreateDirAllFuture<'a>
311        = IoErrorWrapperFuture<
312        (),
313        Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>,
314        <<Self as VfsCore>::Path as PathType>::OwnedPath,
315    >
316    where
317        Self: 'a;
318
319    fn create_dir_all<'a>(self: Pin<&'a Self>, path: PathBuf) -> Self::CreateDirAllFuture<'a> {
320        IoErrorWrapperFuture::new(path.clone(), Box::pin(fs::create_dir_all(path)))
321    }
322
323    type CreateParentDirFuture<'a>
324        = CreateParentDirDefaultFuture<'a, Self>
325    where
326        Self: 'a;
327
328    fn create_parent_dir<'a>(
329        self: Pin<&'a Self>,
330        path: PathBuf,
331    ) -> Self::CreateParentDirFuture<'a> {
332        let parent = path
333            .parent()
334            .map_or_else(|| path.join(".."), |p| p.to_path_buf());
335        CreateParentDirDefaultFuture::Start {
336            vfs: self,
337            path: parent,
338        }
339    }
340}
341
342mod imp {
343    use std::ffi::OsString;
344    use std::path::Path;
345    use std::task::Context;
346
347    use futures::FutureExt;
348    #[cfg(feature = "image")]
349    use tokio::task;
350
351    use super::*;
352    use crate::error::Error;
353    use crate::error::Result;
354    #[cfg(feature = "image")]
355    use crate::image::ImgFormat;
356    #[cfg(feature = "image")]
357    use crate::prelude::*;
358    #[cfg(feature = "image")]
359    use crate::traits::async_vfs::ReadImageFromAsync;
360    #[cfg(feature = "image")]
361    use crate::traits::async_vfs::WriteImageToAsync;
362    #[cfg(feature = "image")]
363    use crate::traits::async_vfs::WriteImageToAsyncRef;
364    use crate::traits::vfs::DirEntryInfo;
365    use crate::traits::vfs::DirEntryKind;
366    #[cfg(feature = "image")]
367    use crate::traits::vfs::WriteSupportingVfs as _;
368    #[cfg(feature = "image")]
369    use crate::vfs::fs_vfs::FsVfs;
370
371    /// Directory walker for asynchronous file system operations on [`tokio::fs`].
372    pub struct DirWalker {
373        pub(super) inner: fs::ReadDir,
374        pub(super) path: PathBuf,
375
376        pub(super) current_kind_future: Option<(
377            OsString,
378            PathBuf,
379            Pin<Box<dyn Future<Output = io::Result<std_fs::FileType>> + Send>>,
380        )>,
381    }
382
383    impl Stream for DirWalker {
384        type Item = Result<DirEntryInfo<Path>, PathBuf>;
385
386        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
387            if let Some((name, path, fut)) = self.current_kind_future.as_mut() {
388                match fut.poll_unpin(cx) {
389                    Poll::Ready(Ok(kind)) => {
390                        let name = name.clone();
391                        let path = path.clone();
392                        self.current_kind_future = None;
393                        let kind = if kind.is_dir() {
394                            DirEntryKind::Directory
395                        } else {
396                            DirEntryKind::File
397                        };
398                        return Poll::Ready(Some(Ok(DirEntryInfo { name, path, kind })));
399                    }
400                    Poll::Ready(Err(e)) => {
401                        let path = self.path.clone();
402                        self.current_kind_future = None;
403                        return Poll::Ready(Some(Err(Error::Io(path.clone(), e))));
404                    }
405                    Poll::Pending => return Poll::Pending,
406                }
407            }
408            match self.inner.poll_next_entry(cx) {
409                Poll::Ready(Ok(Some(v))) => {
410                    let name = v.file_name();
411                    let path = v.path();
412                    let fut = Box::pin(async move { v.file_type().await });
413                    self.current_kind_future = Some((name, path, fut));
414                    cx.waker().wake_by_ref();
415                    Poll::Pending
416                }
417                Poll::Ready(Ok(None)) => Poll::Ready(None),
418                Poll::Ready(Err(e)) => Poll::Ready(Some(Err(Error::Io(self.path.clone(), e)))),
419                Poll::Pending => Poll::Pending,
420            }
421        }
422    }
423
424    #[cfg(feature = "image")]
425    #[cfg_attr(docsrs, doc(cfg(feature = "image")))]
426    impl<'vfs, T: ImgFormat> ReadImageFromAsync<T> for TokioFsVfs
427    where
428        T: Send + 'static,
429    {
430        type ReadImageFuture<'a>
431            = Pin<Box<dyn Future<Output = Result<T, PathBuf>> + Send + 'a>>
432        where
433            Self: 'a;
434
435        fn read_image_async<'a>(
436            self: Pin<&'a Self>,
437            path: <Self::Path as PathType>::OwnedPath,
438        ) -> Self::ReadImageFuture<'a> {
439            debug_assert!(
440                T::FORMAT.reading_enabled(),
441                "Image format {:?} does not support reading; enable the corresponding feature",
442                T::FORMAT
443            );
444            let std_vfs = Pin::new(&FsVfs);
445            Box::pin(async move {
446                let p_clone = path.clone();
447                match task::spawn_blocking(move || {
448                    let mut img_reader =
449                        image::ImageReader::new(io::BufReader::new(std_vfs.open_read(&p_clone)?));
450                    img_reader.set_format(T::FORMAT);
451                    let img = img_reader
452                        .decode()
453                        .map_err(|e| Error::Parse(p_clone.clone(), Box::new(e)))?;
454                    Ok(T::from_image(img))
455                })
456                .await
457                {
458                    Ok(res) => res,
459                    Err(e) => Err(Error::Parse(path, Box::new(e))),
460                }
461            })
462        }
463    }
464
465    #[cfg(feature = "image")]
466    #[cfg_attr(docsrs, doc(cfg(feature = "image")))]
467    impl<'vfs> WriteImageToAsync<'vfs> for TokioFsVfs {
468        type WriteImageFuture = Pin<Box<dyn Future<Output = Result<(), PathBuf>> + Send + 'vfs>>;
469
470        fn write_image_async(
471            self: Pin<&'vfs Self>,
472            path: <Self::Path as PathType>::OwnedPath,
473            image: image::DynamicImage,
474            format: image::ImageFormat,
475        ) -> Self::WriteImageFuture {
476            Box::pin(async move {
477                use crate::traits::async_vfs::WriteSupportingVfsAsync;
478
479                self.create_parent_dir(path.clone()).await?;
480
481                let p_clone = path.clone();
482
483                let std_vfs = Pin::new(&FsVfs);
484                match task::spawn_blocking(move || {
485                    let mut f = std_vfs.open_write(&p_clone)?;
486                    image
487                        .write_to(&mut f, format)
488                        .map_err(|e| Error::Write(p_clone, Box::new(e)))
489                })
490                .await
491                {
492                    Ok(res) => res,
493                    Err(e) => Err(Error::Write(path, Box::new(e))),
494                }
495            })
496        }
497    }
498
499    #[cfg(feature = "image")]
500    #[cfg_attr(docsrs, doc(cfg(feature = "image")))]
501    impl<'vfs> WriteImageToAsyncRef<'vfs> for TokioFsVfs
502    where
503        'vfs: 'vfs,
504    {
505        type WriteImageRefFuture = Pin<Box<dyn Future<Output = Result<(), PathBuf>> + Send + 'vfs>>;
506
507        fn write_image_async_ref(
508            self: Pin<&'vfs Self>,
509            path: <Self::Path as PathType>::OwnedPath,
510            image: &'vfs image::DynamicImage,
511            format: image::ImageFormat,
512        ) -> Self::WriteImageRefFuture {
513            let img = image.clone();
514            (img, format).write_to_async(path, self)
515        }
516    }
517}
518
519#[cfg(feature = "tools-atomic-dir")]
520mod atomic_dir_imp {
521    //! The [`VfsSupportsTemporaryDirectoriesAsync`] implementation for the [`TokioFsVfs`] file system.
522
523    use tokio::fs;
524
525    use super::*;
526    use crate::atomic_dir::TempDirApiAsync;
527    use crate::atomic_dir::VfsSupportsTemporaryDirectoriesAsync;
528    use crate::error::Error;
529    use crate::error::VfsResult;
530    use crate::vfs::fs_vfs as std_fs_vfs;
531
532    /// A temporary directory in the real file system.
533    pub struct TempDir(PathBuf);
534
535    impl<'vfs> TempDirApiAsync<'vfs> for TempDir {
536        type Vfs = TokioFsVfs;
537
538        fn path(&self) -> &Path {
539            &self.0
540        }
541
542        type FuturePersistAt =
543            Pin<Box<dyn Future<Output = VfsResult<(), Self::Vfs>> + Send + 'vfs>>;
544
545        type FutureDelete = <TokioFsVfs as WriteSupportingVfsAsync>::RemoveDirAllFuture<'vfs>;
546
547        fn persist_at(
548            self,
549            vfs: Pin<&'vfs Self::Vfs>,
550            path: <<Self::Vfs as VfsCore>::Path as PathType>::OwnedPath,
551        ) -> Self::FuturePersistAt {
552            Box::pin(async move {
553                vfs.create_parent_dir(path.clone()).await?;
554                fs::rename(&self.0, &path)
555                    .await
556                    .map_err(|e| Error::Io(path.clone(), e))?;
557                Ok(())
558            })
559        }
560
561        fn delete(self, vfs: Pin<&'vfs Self::Vfs>) -> Self::FutureDelete {
562            vfs.remove_dir_all(self.0.clone())
563        }
564    }
565
566    impl<'vfs> VfsSupportsTemporaryDirectoriesAsync<'vfs> for TokioFsVfs {
567        type TemporaryDirectory = TempDir;
568
569        type TemporaryDirectoryFuture =
570            Pin<Box<dyn Future<Output = VfsResult<TempDir, Self>> + Send + 'vfs>>;
571
572        fn create_temporary_directory(self: Pin<&'vfs Self>) -> Self::TemporaryDirectoryFuture {
573            let temp_dir = std_fs_vfs::atomic_dir_imp::make_new_temp_dir_path();
574            let path = temp_dir.clone();
575            Box::pin(async move {
576                if self.exists(path.clone()).await? {
577                    self.remove_dir_all(path.clone()).await?;
578                }
579                self.create_dir(path.clone()).await?;
580                Ok(TempDir(temp_dir))
581            })
582        }
583    }
584}