1use std::fs as std_fs;
4use std::io;
5use std::io::SeekFrom;
6use std::path::Path;
7use std::path::PathBuf;
8use std::pin::Pin;
9use std::task::Context;
10use std::task::Poll;
11
12use futures::AsyncBufRead as FuturesBufRead;
13use futures::Stream;
14use futures::io::AsyncRead as FuturesAsyncRead;
15use futures::io::AsyncSeek as FuturesAsyncSeek;
16use futures::io::AsyncWrite as FuturesAsyncWrite;
17use pin_project::pin_project;
18use tokio::fs;
19use tokio::io::AsyncBufRead;
20use tokio::io::AsyncRead;
21use tokio::io::AsyncSeek;
22use tokio::io::AsyncWrite;
23use tokio::io::BufReader;
24use tokio::io::BufWriter;
25use tokio::io::ReadBuf;
26
27use crate::traits::async_vfs::CreateParentDirDefaultFuture;
28use crate::traits::async_vfs::IoErrorWrapperFuture;
29use crate::traits::async_vfs::VfsAsync;
30use crate::traits::async_vfs::WriteSupportingVfsAsync;
31use crate::traits::vfs::PathType;
32use crate::traits::vfs::VfsCore;
33
/// Marker type for a virtual file system backed by [`tokio::fs`].
///
/// The type carries no state; this file implements `VfsCore`, `VfsAsync` and
/// `WriteSupportingVfsAsync` for it by delegating to the `tokio::fs` functions.
/// Being a unit struct, it is trivially copyable and default-constructible.
#[derive(Debug, Clone, Copy, Default)]
pub struct TokioFsVfs;
36
/// Wrapper that exposes a tokio I/O object through the `futures` I/O traits.
///
/// * Field `0` is the wrapped value; it is structurally pinned so the trait
///   impls in this file can forward polls to it.
/// * Field `1` holds the [`SeekFrom`] of a seek that `poll_seek` has started on
///   the inner value but that has not completed yet. It is `None` when no seek
///   is in flight; the read, buffered-read and write poll methods of this
///   wrapper all reset it to `None`.
#[pin_project]
pub struct TokioAsyncAdapter<T>(#[pin] T, Option<SeekFrom>);
41
42impl<R: AsyncRead> FuturesAsyncRead for TokioAsyncAdapter<R> {
43 fn poll_read(
44 self: Pin<&mut Self>,
45 cx: &mut Context<'_>,
46 buf: &mut [u8],
47 ) -> Poll<io::Result<usize>> {
48 let mut rbuf = ReadBuf::new(buf);
49 let this = self.project();
50 *this.1 = None;
51 match this.0.poll_read(cx, &mut rbuf) {
52 Poll::Ready(Ok(())) => Poll::Ready(Ok(rbuf.filled().len())),
53 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
54 Poll::Pending => Poll::Pending,
55 }
56 }
57}
58
59impl<R: AsyncBufRead> FuturesBufRead for TokioAsyncAdapter<R> {
60 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
61 let this = self.project();
62 *this.1 = None;
63 this.0.poll_fill_buf(cx)
64 }
65
66 fn consume(self: Pin<&mut Self>, amt: usize) {
67 let this = self.project();
68 *this.1 = None;
69 this.0.consume(amt)
70 }
71}
72
73impl<T: AsyncSeek> FuturesAsyncSeek for TokioAsyncAdapter<T> {
74 fn poll_seek(
75 self: Pin<&mut Self>,
76 cx: &mut Context<'_>,
77 pos: io::SeekFrom,
78 ) -> Poll<io::Result<u64>> {
79 let mut this = self.project();
80 if *this.1 != Some(pos) {
81 *this.1 = Some(pos);
82 match this.0.as_mut().start_seek(pos) {
83 Ok(()) => {}
84 Err(e) => {
85 *this.1 = None;
86 return Poll::Ready(Err(e));
87 }
88 }
89 }
90
91 match this.0.poll_complete(cx) {
92 Poll::Ready(Ok(v)) => {
93 *this.1 = None;
94 Poll::Ready(Ok(v))
95 }
96 Poll::Ready(Err(e)) => {
97 *this.1 = None;
98 Poll::Ready(Err(e))
99 }
100 Poll::Pending => Poll::Pending,
101 }
102 }
103}
104
105impl<T: AsyncWrite> FuturesAsyncWrite for TokioAsyncAdapter<T> {
106 fn poll_write(
107 self: Pin<&mut Self>,
108 cx: &mut Context<'_>,
109 buf: &[u8],
110 ) -> Poll<io::Result<usize>> {
111 let this = self.project();
112 *this.1 = None;
113 this.0.poll_write(cx, buf)
114 }
115
116 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
117 let this = self.project();
118 *this.1 = None;
119 this.0.poll_flush(cx)
120 }
121
122 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
123 let this = self.project();
124 *this.1 = None;
125 this.0.poll_shutdown(cx)
126 }
127}
128
/// Paths of this VFS are ordinary [`std::path::Path`]s.
impl VfsCore for TokioFsVfs {
    // The methods of the other VFS impls in this file take `PathBuf` for the
    // corresponding owned path.
    type Path = Path;
}
132
133impl VfsAsync for TokioFsVfs {
134 type RFile = TokioAsyncAdapter<BufReader<fs::File>>;
135 type OpenReadFuture = IoErrorWrapperFuture<
136 Self::RFile,
137 Pin<Box<dyn Future<Output = io::Result<Self::RFile>> + Send>>,
138 <<Self as VfsCore>::Path as PathType>::OwnedPath,
139 >;
140
141 fn open_read(self: Pin<&Self>, path: PathBuf) -> Self::OpenReadFuture {
142 IoErrorWrapperFuture::new(
143 path.clone(),
144 Box::pin(async move {
145 fs::File::open(path)
146 .await
147 .map(|f| TokioAsyncAdapter(BufReader::new(f), None))
148 }),
149 )
150 }
151
152 type ReadFuture<'a>
153 = IoErrorWrapperFuture<
154 Vec<u8>,
155 Pin<Box<dyn Future<Output = io::Result<Vec<u8>>> + Send + 'a>>,
156 <<Self as VfsCore>::Path as PathType>::OwnedPath,
157 >
158 where
159 Self: 'a;
160
161 fn read<'a>(self: Pin<&'a Self>, path: PathBuf) -> Self::ReadFuture<'a> {
162 IoErrorWrapperFuture::new(path.clone(), Box::pin(fs::read(path)))
163 }
164
165 type ReadStringFuture<'a>
166 = IoErrorWrapperFuture<
167 String,
168 Pin<Box<dyn Future<Output = io::Result<String>> + Send + 'a>>,
169 <<Self as VfsCore>::Path as PathType>::OwnedPath,
170 >
171 where
172 Self: 'a;
173
174 fn read_string<'a>(
175 self: Pin<&'a Self>,
176 path: <<Self as VfsCore>::Path as PathType>::OwnedPath,
177 ) -> Self::ReadStringFuture<'a> {
178 IoErrorWrapperFuture::new(path.clone(), Box::pin(fs::read_to_string(path)))
179 }
180
181 type ExistsFuture<'a>
182 = IoErrorWrapperFuture<
183 bool,
184 Pin<Box<dyn Future<Output = io::Result<bool>> + Send + 'a>>,
185 <<Self as VfsCore>::Path as PathType>::OwnedPath,
186 >
187 where
188 Self: 'a;
189
190 fn exists<'a>(
191 self: Pin<&'a Self>,
192 path: <<Self as VfsCore>::Path as PathType>::OwnedPath,
193 ) -> Self::ExistsFuture<'a> {
194 IoErrorWrapperFuture::new(path.clone(), Box::pin(fs::try_exists(path)))
195 }
196
197 type IsDirFuture<'a>
198 = IoErrorWrapperFuture<
199 bool,
200 Pin<Box<dyn Future<Output = io::Result<bool>> + Send + 'a>>,
201 <<Self as VfsCore>::Path as PathType>::OwnedPath,
202 >
203 where
204 Self: 'a;
205
206 fn is_dir<'a>(
207 self: Pin<&'a Self>,
208 path: <<Self as VfsCore>::Path as PathType>::OwnedPath,
209 ) -> Self::IsDirFuture<'a> {
210 IoErrorWrapperFuture::new(
211 path.clone(),
212 Box::pin(async move { fs::metadata(path).await.map(|m| m.is_dir()) }),
213 )
214 }
215
216 type DirWalk<'a>
217 = imp::DirWalker
218 where
219 Self: 'a;
220
221 type DirWalkFuture<'a>
222 = IoErrorWrapperFuture<
223 Self::DirWalk<'a>,
224 Pin<Box<dyn Future<Output = io::Result<Self::DirWalk<'a>>> + Send + 'a>>,
225 <<Self as VfsCore>::Path as PathType>::OwnedPath,
226 >
227 where
228 Self: 'a;
229
230 fn walk_dir<'a>(self: Pin<&'a Self>, path: PathBuf) -> Self::DirWalkFuture<'a> {
231 IoErrorWrapperFuture::new(
232 path.clone(),
233 Box::pin(async move {
234 fs::read_dir(path.clone())
235 .await
236 .map(|inner| imp::DirWalker {
237 inner,
238 path,
239 current_kind_future: None,
240 })
241 }),
242 )
243 }
244}
245
246impl WriteSupportingVfsAsync for TokioFsVfs {
247 type WFile = TokioAsyncAdapter<BufWriter<fs::File>>;
248 type OpenWriteFuture = IoErrorWrapperFuture<
249 Self::WFile,
250 Pin<Box<dyn Future<Output = io::Result<Self::WFile>> + Send>>,
251 <<Self as VfsCore>::Path as PathType>::OwnedPath,
252 >;
253 fn open_write(
254 self: Pin<&Self>,
255 path: <<Self as VfsCore>::Path as PathType>::OwnedPath,
256 ) -> Self::OpenWriteFuture {
257 IoErrorWrapperFuture::new(
258 path.clone(),
259 Box::pin(async move {
260 fs::File::create(path)
261 .await
262 .map(|f| TokioAsyncAdapter(BufWriter::new(f), None))
263 }),
264 )
265 }
266
267 type WriteFuture<'a>
268 = IoErrorWrapperFuture<
269 (),
270 Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>,
271 <<Self as VfsCore>::Path as PathType>::OwnedPath,
272 >
273 where
274 Self: 'a;
275
276 fn write<'d, 'a: 'd>(
277 self: Pin<&'a Self>,
278 path: <<Self as VfsCore>::Path as PathType>::OwnedPath,
279 data: &'d [u8],
280 ) -> Self::WriteFuture<'d> {
281 IoErrorWrapperFuture::new(path.clone(), Box::pin(fs::write(path, data)))
282 }
283
284 type RemoveDirAllFuture<'a>
285 = IoErrorWrapperFuture<
286 (),
287 Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>,
288 <<Self as VfsCore>::Path as PathType>::OwnedPath,
289 >
290 where
291 Self: 'a;
292
293 fn remove_dir_all<'a>(self: Pin<&'a Self>, path: PathBuf) -> Self::RemoveDirAllFuture<'a> {
294 IoErrorWrapperFuture::new(path.clone(), Box::pin(fs::remove_dir_all(path)))
295 }
296
297 type CreateDirFuture<'a>
298 = IoErrorWrapperFuture<
299 (),
300 Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>,
301 <<Self as VfsCore>::Path as PathType>::OwnedPath,
302 >
303 where
304 Self: 'a;
305
306 fn create_dir<'a>(self: Pin<&'a Self>, path: PathBuf) -> Self::CreateDirFuture<'a> {
307 IoErrorWrapperFuture::new(path.clone(), Box::pin(fs::create_dir(path)))
308 }
309
310 type CreateDirAllFuture<'a>
311 = IoErrorWrapperFuture<
312 (),
313 Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>,
314 <<Self as VfsCore>::Path as PathType>::OwnedPath,
315 >
316 where
317 Self: 'a;
318
319 fn create_dir_all<'a>(self: Pin<&'a Self>, path: PathBuf) -> Self::CreateDirAllFuture<'a> {
320 IoErrorWrapperFuture::new(path.clone(), Box::pin(fs::create_dir_all(path)))
321 }
322
323 type CreateParentDirFuture<'a>
324 = CreateParentDirDefaultFuture<'a, Self>
325 where
326 Self: 'a;
327
328 fn create_parent_dir<'a>(
329 self: Pin<&'a Self>,
330 path: PathBuf,
331 ) -> Self::CreateParentDirFuture<'a> {
332 let parent = path
333 .parent()
334 .map_or_else(|| path.join(".."), |p| p.to_path_buf());
335 CreateParentDirDefaultFuture::Start {
336 vfs: self,
337 path: parent,
338 }
339 }
340}
341
342mod imp {
343 use std::ffi::OsString;
344 use std::path::Path;
345 use std::task::Context;
346
347 use futures::FutureExt;
348
349 use super::*;
350 use crate::error::Error;
351 use crate::error::Result;
352 use crate::traits::vfs::DirEntryInfo;
353 use crate::traits::vfs::DirEntryKind;
354
355 pub struct DirWalker {
357 pub(super) inner: fs::ReadDir,
358 pub(super) path: PathBuf,
359
360 pub(super) current_kind_future: Option<(
361 OsString,
362 PathBuf,
363 Pin<Box<dyn Future<Output = io::Result<std_fs::FileType>> + Send>>,
364 )>,
365 }
366
367 impl Stream for DirWalker {
368 type Item = Result<DirEntryInfo<Path>, PathBuf>;
369
370 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
371 if let Some((name, path, fut)) = self.current_kind_future.as_mut() {
372 match fut.poll_unpin(cx) {
373 Poll::Ready(Ok(kind)) => {
374 let name = name.clone();
375 let path = path.clone();
376 self.current_kind_future = None;
377 let kind = if kind.is_dir() {
378 DirEntryKind::Directory
379 } else {
380 DirEntryKind::File
381 };
382 return Poll::Ready(Some(Ok(DirEntryInfo { name, path, kind })));
383 }
384 Poll::Ready(Err(e)) => {
385 let path = self.path.clone();
386 self.current_kind_future = None;
387 return Poll::Ready(Some(Err(Error::Io(path.clone(), e))));
388 }
389 Poll::Pending => return Poll::Pending,
390 }
391 }
392 match self.inner.poll_next_entry(cx) {
393 Poll::Ready(Ok(Some(v))) => {
394 let name = v.file_name();
395 let path = v.path();
396 let fut = Box::pin(async move { v.file_type().await });
397 self.current_kind_future = Some((name, path, fut));
398 cx.waker().wake_by_ref();
399 Poll::Pending
400 }
401 Poll::Ready(Ok(None)) => Poll::Ready(None),
402 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(Error::Io(self.path.clone(), e)))),
403 Poll::Pending => Poll::Pending,
404 }
405 }
406 }
407}