1#![allow(clippy::needless_lifetimes)]
2
3use async_trait::async_trait;
4use futures::{future, future::LocalBoxFuture, stream, FutureExt, StreamExt, TryStreamExt};
5use std::{
6 ffi::{OsStr, OsString}, fs, future::Future, io, path::{Path, PathBuf}, sync::Arc
7};
8use walkdir::WalkDir;
9
10#[cfg(unix)]
11use std::os::unix::fs::FileExt;
12#[cfg(windows)]
13use std::os::windows::fs::FileExt;
14#[cfg(target_arch = "wasm32")]
15use {
16 futures::lock::Mutex, js_sys::{ArrayBuffer, Uint8Array}, std::convert::TryFrom, wasm_bindgen::{JsCast, JsValue}, wasm_bindgen_futures::JsFuture, web_sys::{Blob, Response}
17};
18#[cfg(not(target_arch = "wasm32"))]
19use {
20 std::io::{Seek, SeekFrom}, tokio::task::spawn_blocking
21};
22
23use super::{Directory, File, Page, Partition};
24#[cfg(target_arch = "wasm32")]
25use crate::util::{f64_to_u64, u64_to_f64};
26use crate::util::{IoError, ResultExpand};
27
28#[async_trait(?Send)]
29impl<F> File for Vec<F>
30where
31 F: File,
32{
33 type Partition = F::Partition;
34 type Error = F::Error;
35
36 async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
37 stream::iter(self.into_iter())
38 .flat_map(|file| {
39 async { stream::iter(ResultExpand(file.partitions().await)) }.flatten_stream()
40 })
41 .try_collect()
42 .await
43 }
44}
45#[async_trait(?Send)]
46impl<F> File for &[F]
47where
48 F: File + Clone,
49{
50 type Partition = F::Partition;
51 type Error = F::Error;
52
53 async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
54 stream::iter(self.iter().cloned())
55 .flat_map(|file| {
56 async { stream::iter(ResultExpand(file.partitions().await)) }.flatten_stream()
57 })
58 .try_collect()
59 .await
60 }
61}
62#[async_trait(?Send)]
63impl File for PathBuf {
64 type Partition = Self;
65 type Error = IoError;
66
67 async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
68 Ok(vec![self])
69 }
70}
71#[async_trait(?Send)]
72impl Directory for PathBuf {
73 async fn partitions_filter<F>(
74 self, f: F,
75 ) -> Result<Vec<<Self as File>::Partition>, <Self as File>::Error>
76 where
77 F: FnMut(&super::PathBuf) -> bool,
78 {
79 (*self).partitions_filter(f).await
80 }
81}
82#[async_trait(?Send)]
83impl Partition for PathBuf {
84 type Page = LocalFile;
85 type Error = IoError;
86
87 async fn pages(self) -> Result<Vec<Self::Page>, Self::Error> {
88 Ok(vec![LocalFile::open(self)?])
89 }
90}
91#[async_trait(?Send)]
92impl Directory for &Path {
93 async fn partitions_filter<F>(
94 self, mut f: F,
95 ) -> Result<Vec<<Self as File>::Partition>, <Self as File>::Error>
96 where
97 F: FnMut(&super::PathBuf) -> bool,
98 {
99 WalkDir::new(self)
100 .follow_links(true)
101 .sort_by(|a, b| a.file_name().cmp(b.file_name()))
102 .into_iter()
103 .filter_entry(|e| {
104 let is_dir = e.file_type().is_dir();
105 let path = e.path();
106 if path == self {
107 return true;
108 }
109 let mut path = path.strip_prefix(self).unwrap();
110 let mut path_buf = super::PathBuf::new();
111 let mut file_name = None;
112 if !is_dir {
113 file_name = Some(path.file_name().unwrap());
114 path = path.parent().unwrap();
115 }
116 for component in path {
117 path_buf.push(component);
118 }
119 path_buf.set_file_name(file_name);
120 f(&path_buf)
121 })
122 .filter_map(|e| match e {
123 Ok(ref e) if e.file_type().is_dir() => None,
124 Ok(e) => Some(Ok(e.into_path())),
125 Err(e) => Some(Err(if e.io_error().is_some() {
126 e.into_io_error().unwrap()
127 } else {
128 io::Error::new(io::ErrorKind::Other, e)
129 }
130 .into())),
131 })
132 .collect()
133 }
134}
135#[async_trait(?Send)]
136impl File for &Path {
137 type Partition = PathBuf;
138 type Error = IoError;
139
140 async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
141 PathBuf::partitions(self.into()).await
142 }
143}
144#[async_trait(?Send)]
145impl File for String {
146 type Partition = PathBuf;
147 type Error = IoError;
148
149 async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
150 PathBuf::partitions(self.into()).await
151 }
152}
153#[async_trait(?Send)]
154impl File for &str {
155 type Partition = PathBuf;
156 type Error = IoError;
157
158 async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
159 PathBuf::partitions(self.into()).await
160 }
161}
162#[async_trait(?Send)]
163impl File for OsString {
164 type Partition = PathBuf;
165 type Error = IoError;
166
167 async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
168 PathBuf::partitions(self.into()).await
169 }
170}
171#[async_trait(?Send)]
172impl File for &OsStr {
173 type Partition = PathBuf;
174 type Error = IoError;
175
176 async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
177 PathBuf::partitions(self.into()).await
178 }
179}
/// Holds either a future or the value that future produced.
#[cfg(target_arch = "wasm32")]
enum FutureOrOutput<Fut: Future> {
    /// The future itself; its output is not stored yet.
    Future(Fut),
    /// The stored output of the future.
    Output(Fut::Output),
}
#[cfg(target_arch = "wasm32")]
impl<Fut: Future> FutureOrOutput<Fut> {
    /// Returns a mutable reference to the stored output, or `None` while this is still the
    /// `Future` variant.
    fn output(&mut self) -> Option<&mut Fut::Output> {
        match self {
            FutureOrOutput::Output(value) => Some(value),
            FutureOrOutput::Future(_) => None,
        }
    }
}
215
/// Shared state behind a `LocalFile` on non-wasm targets: the open file handle.
#[cfg(not(target_arch = "wasm32"))]
struct LocalFileInner {
    // The underlying OS file.
    file: fs::File,
}
/// Shared state behind a `LocalFile` on wasm32.
#[cfg(target_arch = "wasm32")]
struct LocalFileInner {
    // Either the boxed future that yields the file's `Blob`, or the `Blob` once that future
    // has been awaited; the async mutex serialises access to it.
    file: Mutex<FutureOrOutput<LocalBoxFuture<'static, Blob>>>,
}
/// A handle to a file whose platform-specific state lives behind a reference-counted pointer.
pub struct LocalFile {
    // Platform-specific state, shared between handles via `Arc`.
    inner: Arc<LocalFileInner>,
}
227impl LocalFile {
228 pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
231 #[cfg(not(target_arch = "wasm32"))]
232 {
233 Self::from_file(fs::File::open(path)?)
234 }
235 #[cfg(target_arch = "wasm32")]
236 {
237 let path = path.as_ref().to_string_lossy().into_owned();
238 let file = Mutex::new(FutureOrOutput::Future(
239 async move {
240 let window = web_sys::window().unwrap();
241 let resp_value = JsFuture::from(window.fetch_with_str(&path)).await.unwrap();
242 let resp: Response = resp_value.dyn_into().unwrap();
243 let blob: JsValue = JsFuture::from(resp.blob().unwrap()).await.unwrap();
244 let blob: Blob = blob.dyn_into().unwrap();
245 blob
246 }
247 .boxed_local(),
248 ));
249 let inner = Arc::new(LocalFileInner { file });
250 Ok(Self { inner })
251 }
252 }
253
254 fn clone(&self) -> Self {
255 Self {
256 inner: self.inner.clone(),
257 }
258 }
259
260 #[cfg(not(target_arch = "wasm32"))]
261 fn len(&self) -> LocalBoxFuture<'static, Result<u64, <Self as Page>::Error>> {
262 let self_ = self.inner.clone();
263 future::lazy(move |_| (&self_.file).seek(SeekFrom::End(0)).map_err(Into::into))
264 .boxed_local()
265 }
266 #[cfg(target_arch = "wasm32")]
267 fn len(&self) -> LocalBoxFuture<'static, Result<u64, <Self as Page>::Error>> {
268 let self_ = self.inner.clone();
269 async move {
270 let mut file = self_.file.lock().await;
271 if let FutureOrOutput::Future(fut) = &mut *file {
272 *file = FutureOrOutput::Output(fut.await);
273 }
274 let blob = file.output().unwrap();
275 let size = blob.size();
276 Ok(f64_to_u64(size))
277 }
278 .boxed_local()
279 }
280
281 #[cfg(not(target_arch = "wasm32"))]
282 fn from_file(file: fs::File) -> io::Result<Self> {
283 let inner = Arc::new(LocalFileInner { file });
284 Ok(Self { inner })
285 }
286 #[cfg(target_arch = "wasm32")]
287 fn from_file(_file: fs::File) -> io::Result<Self> {
288 unimplemented!()
289 }
290}
291
292impl From<fs::File> for LocalFile {
293 fn from(file: fs::File) -> Self {
294 Self::from_file(file).unwrap()
295 }
296}
297
298#[cfg(unix)]
301impl LocalFile {
302 #[inline]
303 fn read_at<'a>(
304 &self, pos: u64, buf: &'a mut [u8],
305 ) -> impl Future<Output = io::Result<usize>> + 'a {
306 let self_ = self.inner.clone();
307 let len = buf.len();
308 spawn_blocking(move || {
309 let mut buf = vec![0; len];
310 FileExt::read_at(&self_.file, &mut buf, pos).map(|len| {
311 buf.truncate(len);
312 buf
313 })
314 })
315 .map(move |vec| {
316 vec.unwrap().map(move |vec| {
317 buf[..vec.len()].copy_from_slice(&vec);
318 vec.len()
319 })
320 })
321 }
322 #[inline]
323 fn write_at<'a>(
324 &self, pos: u64, buf: &'a [u8],
325 ) -> impl Future<Output = io::Result<usize>> + 'a {
326 let self_ = self.inner.clone();
327 let buf = buf.to_owned();
328 spawn_blocking(move || FileExt::write_at(&self_.file, &buf, pos)).map(Result::unwrap)
329 }
330}
331
#[cfg(windows)]
impl LocalFile {
    /// Reads up to `buf.len()` bytes at byte offset `pos` into `buf`, resolving to the number
    /// of bytes read.
    ///
    /// The read runs on a blocking task via `spawn_blocking` (using `seek_read`) into a
    /// temporary buffer, which is copied into `buf` once the task finishes.
    ///
    /// # Panics
    /// Panics if the blocking task could not be joined.
    #[inline]
    fn read_at<'a>(
        &self, pos: u64, buf: &'a mut [u8],
    ) -> impl Future<Output = io::Result<usize>> + 'a {
        let inner = Arc::clone(&self.inner);
        let wanted = buf.len();
        // The blocking closure must be `'static`, so it cannot borrow `buf` directly.
        let task = spawn_blocking(move || -> io::Result<Vec<u8>> {
            let mut scratch = vec![0; wanted];
            let read = FileExt::seek_read(&inner.file, &mut scratch, pos)?;
            scratch.truncate(read);
            Ok(scratch)
        });
        task.map(move |joined| -> io::Result<usize> {
            let scratch = joined.unwrap()?;
            let read = scratch.len();
            buf[..read].copy_from_slice(&scratch);
            Ok(read)
        })
    }
    /// Writes bytes from `buf` at byte offset `pos` (using `seek_write`), resolving to the
    /// number of bytes written.
    ///
    /// `buf` is copied up front so the write can run on a blocking task via `spawn_blocking`.
    ///
    /// # Panics
    /// Panics if the blocking task could not be joined.
    #[inline]
    fn write_at<'a>(
        &self, pos: u64, buf: &'a [u8],
    ) -> impl Future<Output = io::Result<usize>> + 'a {
        let inner = Arc::clone(&self.inner);
        let owned = buf.to_vec();
        let task = spawn_blocking(move || FileExt::seek_write(&inner.file, &owned, pos));
        task.map(|joined| joined.unwrap())
    }
}
363
#[cfg(target_arch = "wasm32")]
impl LocalFile {
    /// Reads up to `buf.len()` bytes starting at byte offset `pos` of the fetched `Blob` into
    /// `buf`, resolving to the number of bytes copied.
    ///
    /// The fetch is awaited first if it has not completed yet. The requested range is sliced
    /// out of the blob, wrapped in a `Response` to obtain its `ArrayBuffer`, and copied out
    /// through a `Uint8Array` view.
    ///
    /// # Panics
    /// Panics if any of the underlying JavaScript calls fails.
    fn read_at<'a>(
        &self, pos: u64, buf: &'a mut [u8],
    ) -> impl Future<Output = io::Result<usize>> + 'a {
        let self_ = self.inner.clone();
        async move {
            let mut file = self_.file.lock().await;
            if let FutureOrOutput::Future(fut) = &mut *file {
                // First access: drive the fetch and store its result for later calls.
                *file = FutureOrOutput::Output(fut.await);
            }
            let blob = file.output().unwrap();
            let end = pos + u64::try_from(buf.len()).unwrap();
            let slice: Blob = blob
                .slice_with_f64_and_f64(u64_to_f64(pos), u64_to_f64(end))
                .unwrap();
            // Release the lock before awaiting the buffer so other readers are not blocked.
            drop(file);
            // The slice is read through a `Response`; the previous `if false` branch calling
            // `Blob::array_buffer` directly was unreachable and has been removed.
            let array_buffer = Response::new_with_opt_blob(Some(&slice))
                .unwrap()
                .array_buffer()
                .unwrap();
            drop(slice);
            let array_buffer: JsValue = JsFuture::from(array_buffer).await.unwrap();
            let array_buffer: ArrayBuffer = array_buffer.dyn_into().unwrap();
            let bytes: Uint8Array = Uint8Array::new(&array_buffer);
            drop(array_buffer);
            let len = usize::try_from(bytes.length()).unwrap();
            // `copy_to` needs a destination of exactly the view's length.
            bytes.copy_to(&mut buf[..len]);
            Ok(len)
        }
        .boxed_local()
    }
    /// Writing is not supported on wasm32; the returned future panics when polled.
    #[inline]
    fn write_at<'a>(
        &self, _pos: u64, _buf: &'a [u8],
    ) -> impl Future<Output = io::Result<usize>> + 'a {
        let _self = self;
        future::lazy(|_| unimplemented!()).boxed_local()
    }
}
410
411impl Page for LocalFile {
412 type Error = IoError;
413
414 fn len(&self) -> LocalBoxFuture<'static, Result<u64, Self::Error>> {
415 self.len()
416 }
417 fn read(
418 &self, mut offset: u64, len: usize,
419 ) -> LocalBoxFuture<'static, Result<Box<[u8]>, Self::Error>> {
420 let self_ = self.clone();
421 Box::pin(async move {
422 let mut buf_ = vec![0; len];
423 let mut buf = &mut *buf_;
424 while !buf.is_empty() {
425 match self_.read_at(offset, buf).await {
426 Ok(0) => break,
427 Ok(n) => {
428 let tmp = buf;
429 buf = &mut tmp[n..];
430 offset += n as u64;
431 }
432 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
433 Err(e) => return Err(e.into()),
434 }
435 }
436 let len = len - buf.len();
437 buf_.truncate(len);
438 Ok(buf_.into_boxed_slice())
439 })
440 }
441 fn write(
442 &self, mut offset: u64, buf: Box<[u8]>,
443 ) -> LocalBoxFuture<'static, Result<(), Self::Error>> {
444 let self_ = self.clone();
445 Box::pin(async move {
446 let mut buf = &*buf;
447 while !buf.is_empty() {
448 match self_.write_at(offset, buf).await {
449 Ok(0) => {
450 return Err(io::Error::new(
451 io::ErrorKind::WriteZero,
452 "failed to write whole buffer",
453 )
454 .into())
455 }
456 Ok(n) => {
457 buf = &buf[n..];
458 offset += n as u64
459 }
460 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
461 Err(e) => return Err(e.into()),
462 }
463 }
464 Ok(())
465 })
466 }
467}