amadeus_core/file/
local.rs

1#![allow(clippy::needless_lifetimes)]
2
3use async_trait::async_trait;
4use futures::{future, future::LocalBoxFuture, stream, FutureExt, StreamExt, TryStreamExt};
5use std::{
6	ffi::{OsStr, OsString}, fs, future::Future, io, path::{Path, PathBuf}, sync::Arc
7};
8use walkdir::WalkDir;
9
10#[cfg(unix)]
11use std::os::unix::fs::FileExt;
12#[cfg(windows)]
13use std::os::windows::fs::FileExt;
14#[cfg(target_arch = "wasm32")]
15use {
16	futures::lock::Mutex, js_sys::{ArrayBuffer, Uint8Array}, std::convert::TryFrom, wasm_bindgen::{JsCast, JsValue}, wasm_bindgen_futures::JsFuture, web_sys::{Blob, Response}
17};
18#[cfg(not(target_arch = "wasm32"))]
19use {
20	std::io::{Seek, SeekFrom}, tokio::task::spawn_blocking
21};
22
23use super::{Directory, File, Page, Partition};
24#[cfg(target_arch = "wasm32")]
25use crate::util::{f64_to_u64, u64_to_f64};
26use crate::util::{IoError, ResultExpand};
27
28#[async_trait(?Send)]
29impl<F> File for Vec<F>
30where
31	F: File,
32{
33	type Partition = F::Partition;
34	type Error = F::Error;
35
36	async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
37		stream::iter(self.into_iter())
38			.flat_map(|file| {
39				async { stream::iter(ResultExpand(file.partitions().await)) }.flatten_stream()
40			})
41			.try_collect()
42			.await
43	}
44}
45#[async_trait(?Send)]
46impl<F> File for &[F]
47where
48	F: File + Clone,
49{
50	type Partition = F::Partition;
51	type Error = F::Error;
52
53	async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
54		stream::iter(self.iter().cloned())
55			.flat_map(|file| {
56				async { stream::iter(ResultExpand(file.partitions().await)) }.flatten_stream()
57			})
58			.try_collect()
59			.await
60	}
61}
62#[async_trait(?Send)]
63impl File for PathBuf {
64	type Partition = Self;
65	type Error = IoError;
66
67	async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
68		Ok(vec![self])
69	}
70}
71#[async_trait(?Send)]
72impl Directory for PathBuf {
73	async fn partitions_filter<F>(
74		self, f: F,
75	) -> Result<Vec<<Self as File>::Partition>, <Self as File>::Error>
76	where
77		F: FnMut(&super::PathBuf) -> bool,
78	{
79		(*self).partitions_filter(f).await
80	}
81}
82#[async_trait(?Send)]
83impl Partition for PathBuf {
84	type Page = LocalFile;
85	type Error = IoError;
86
87	async fn pages(self) -> Result<Vec<Self::Page>, Self::Error> {
88		Ok(vec![LocalFile::open(self)?])
89	}
90}
91#[async_trait(?Send)]
92impl Directory for &Path {
93	async fn partitions_filter<F>(
94		self, mut f: F,
95	) -> Result<Vec<<Self as File>::Partition>, <Self as File>::Error>
96	where
97		F: FnMut(&super::PathBuf) -> bool,
98	{
99		WalkDir::new(self)
100			.follow_links(true)
101			.sort_by(|a, b| a.file_name().cmp(b.file_name()))
102			.into_iter()
103			.filter_entry(|e| {
104				let is_dir = e.file_type().is_dir();
105				let path = e.path();
106				if path == self {
107					return true;
108				}
109				let mut path = path.strip_prefix(self).unwrap();
110				let mut path_buf = super::PathBuf::new();
111				let mut file_name = None;
112				if !is_dir {
113					file_name = Some(path.file_name().unwrap());
114					path = path.parent().unwrap();
115				}
116				for component in path {
117					path_buf.push(component);
118				}
119				path_buf.set_file_name(file_name);
120				f(&path_buf)
121			})
122			.filter_map(|e| match e {
123				Ok(ref e) if e.file_type().is_dir() => None,
124				Ok(e) => Some(Ok(e.into_path())),
125				Err(e) => Some(Err(if e.io_error().is_some() {
126					e.into_io_error().unwrap()
127				} else {
128					io::Error::new(io::ErrorKind::Other, e)
129				}
130				.into())),
131			})
132			.collect()
133	}
134}
135#[async_trait(?Send)]
136impl File for &Path {
137	type Partition = PathBuf;
138	type Error = IoError;
139
140	async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
141		PathBuf::partitions(self.into()).await
142	}
143}
144#[async_trait(?Send)]
145impl File for String {
146	type Partition = PathBuf;
147	type Error = IoError;
148
149	async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
150		PathBuf::partitions(self.into()).await
151	}
152}
153#[async_trait(?Send)]
154impl File for &str {
155	type Partition = PathBuf;
156	type Error = IoError;
157
158	async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
159		PathBuf::partitions(self.into()).await
160	}
161}
162#[async_trait(?Send)]
163impl File for OsString {
164	type Partition = PathBuf;
165	type Error = IoError;
166
167	async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
168		PathBuf::partitions(self.into()).await
169	}
170}
171#[async_trait(?Send)]
172impl File for &OsStr {
173	type Partition = PathBuf;
174	type Error = IoError;
175
176	async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
177		PathBuf::partitions(self.into()).await
178	}
179}
180// impl File for fs::File {
181// 	type Partition = Self;
182// 	type Error = IoError;
183
184// 	fn partitions(self) -> Result<Vec<Self::Partition>,Self::Error> {
185// 		Ok(vec![self])
186// 	}
187// }
188// impl Partition for fs::File {
189// 	type Page = LocalFile;
190// 	type Error = IoError;
191
192// 	fn pages(self) -> Result<Vec<Self::Page>,Self::Error> {
193// 		Ok(vec![LocalFile::from_file(self)?])
194// 	}
195// }
196
197// To support converting back to fs::File:
198// https://github.com/vasi/positioned-io/blob/a03c792f5b6f99cb4f72e146befdfc8b1f6e1d28/src/raf.rs
199
200#[cfg(target_arch = "wasm32")]
201enum FutureOrOutput<Fut: Future> {
202	Future(Fut),
203	Output(Fut::Output),
204}
205#[cfg(target_arch = "wasm32")]
206impl<Fut: Future> FutureOrOutput<Fut> {
207	fn output(&mut self) -> Option<&mut Fut::Output> {
208		if let FutureOrOutput::Output(output) = self {
209			Some(output)
210		} else {
211			None
212		}
213	}
214}
215
216#[cfg(not(target_arch = "wasm32"))]
217struct LocalFileInner {
218	file: fs::File,
219}
220#[cfg(target_arch = "wasm32")]
221struct LocalFileInner {
222	file: Mutex<FutureOrOutput<LocalBoxFuture<'static, Blob>>>,
223}
224pub struct LocalFile {
225	inner: Arc<LocalFileInner>,
226}
227impl LocalFile {
228	/// [Opens](https://doc.rust-lang.org/std/fs/struct.File.html#method.open)
229	/// a file for random access.
230	pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
231		#[cfg(not(target_arch = "wasm32"))]
232		{
233			Self::from_file(fs::File::open(path)?)
234		}
235		#[cfg(target_arch = "wasm32")]
236		{
237			let path = path.as_ref().to_string_lossy().into_owned();
238			let file = Mutex::new(FutureOrOutput::Future(
239				async move {
240					let window = web_sys::window().unwrap();
241					let resp_value = JsFuture::from(window.fetch_with_str(&path)).await.unwrap();
242					let resp: Response = resp_value.dyn_into().unwrap();
243					let blob: JsValue = JsFuture::from(resp.blob().unwrap()).await.unwrap();
244					let blob: Blob = blob.dyn_into().unwrap();
245					blob
246				}
247				.boxed_local(),
248			));
249			let inner = Arc::new(LocalFileInner { file });
250			Ok(Self { inner })
251		}
252	}
253
254	fn clone(&self) -> Self {
255		Self {
256			inner: self.inner.clone(),
257		}
258	}
259
260	#[cfg(not(target_arch = "wasm32"))]
261	fn len(&self) -> LocalBoxFuture<'static, Result<u64, <Self as Page>::Error>> {
262		let self_ = self.inner.clone();
263		future::lazy(move |_| (&self_.file).seek(SeekFrom::End(0)).map_err(Into::into))
264			.boxed_local()
265	}
266	#[cfg(target_arch = "wasm32")]
267	fn len(&self) -> LocalBoxFuture<'static, Result<u64, <Self as Page>::Error>> {
268		let self_ = self.inner.clone();
269		async move {
270			let mut file = self_.file.lock().await;
271			if let FutureOrOutput::Future(fut) = &mut *file {
272				*file = FutureOrOutput::Output(fut.await);
273			}
274			let blob = file.output().unwrap();
275			let size = blob.size();
276			Ok(f64_to_u64(size))
277		}
278		.boxed_local()
279	}
280
281	#[cfg(not(target_arch = "wasm32"))]
282	fn from_file(file: fs::File) -> io::Result<Self> {
283		let inner = Arc::new(LocalFileInner { file });
284		Ok(Self { inner })
285	}
286	#[cfg(target_arch = "wasm32")]
287	fn from_file(_file: fs::File) -> io::Result<Self> {
288		unimplemented!()
289	}
290}
291
292impl From<fs::File> for LocalFile {
293	fn from(file: fs::File) -> Self {
294		Self::from_file(file).unwrap()
295	}
296}
297
298// read_at/write_at for tokio::File https://github.com/tokio-rs/tokio/issues/1529
299
300#[cfg(unix)]
301impl LocalFile {
302	#[inline]
303	fn read_at<'a>(
304		&self, pos: u64, buf: &'a mut [u8],
305	) -> impl Future<Output = io::Result<usize>> + 'a {
306		let self_ = self.inner.clone();
307		let len = buf.len();
308		spawn_blocking(move || {
309			let mut buf = vec![0; len];
310			FileExt::read_at(&self_.file, &mut buf, pos).map(|len| {
311				buf.truncate(len);
312				buf
313			})
314		})
315		.map(move |vec| {
316			vec.unwrap().map(move |vec| {
317				buf[..vec.len()].copy_from_slice(&vec);
318				vec.len()
319			})
320		})
321	}
322	#[inline]
323	fn write_at<'a>(
324		&self, pos: u64, buf: &'a [u8],
325	) -> impl Future<Output = io::Result<usize>> + 'a {
326		let self_ = self.inner.clone();
327		let buf = buf.to_owned();
328		spawn_blocking(move || FileExt::write_at(&self_.file, &buf, pos)).map(Result::unwrap)
329	}
330}
331
332#[cfg(windows)]
333impl LocalFile {
334	#[inline]
335	fn read_at<'a>(
336		&self, pos: u64, buf: &'a mut [u8],
337	) -> impl Future<Output = io::Result<usize>> + 'a {
338		let self_ = self.inner.clone();
339		let len = buf.len();
340		spawn_blocking(move || {
341			let mut buf = vec![0; len];
342			FileExt::seek_read(&self_.file, &mut buf, pos).map(|len| {
343				buf.truncate(len);
344				buf
345			})
346		})
347		.map(move |vec| {
348			vec.unwrap().map(move |vec| {
349				buf[..vec.len()].copy_from_slice(&vec);
350				vec.len()
351			})
352		})
353	}
354	#[inline]
355	fn write_at<'a>(
356		&self, pos: u64, buf: &'a [u8],
357	) -> impl Future<Output = io::Result<usize>> + 'a {
358		let self_ = self.inner.clone();
359		let buf = buf.to_owned();
360		spawn_blocking(move || FileExt::seek_write(&self_.file, &buf, pos)).map(Result::unwrap)
361	}
362}
363
364#[cfg(target_arch = "wasm32")]
365impl LocalFile {
366	fn read_at<'a>(
367		&self, pos: u64, buf: &'a mut [u8],
368	) -> impl Future<Output = io::Result<usize>> + 'a {
369		let self_ = self.inner.clone();
370		async move {
371			let mut file = self_.file.lock().await;
372			if let FutureOrOutput::Future(fut) = &mut *file {
373				*file = FutureOrOutput::Output(fut.await);
374			}
375			let blob = file.output().unwrap();
376			let end = pos + u64::try_from(buf.len()).unwrap();
377			let slice: Blob = blob
378				.slice_with_f64_and_f64(u64_to_f64(pos), u64_to_f64(end))
379				.unwrap();
380			drop(file);
381			// TODO: only do workaround when necessary
382			let array_buffer = if false {
383				slice.array_buffer()
384			} else {
385				// workaround for lack of Blob::array_buffer
386				Response::new_with_opt_blob(Some(&slice))
387					.unwrap()
388					.array_buffer()
389					.unwrap()
390			};
391			drop(slice);
392			let array_buffer: JsValue = JsFuture::from(array_buffer).await.unwrap();
393			let array_buffer: ArrayBuffer = array_buffer.dyn_into().unwrap();
394			let buf_: Uint8Array = Uint8Array::new(&array_buffer);
395			drop(array_buffer);
396			let len = usize::try_from(buf_.length()).unwrap();
397			buf_.copy_to(&mut buf[..len]);
398			Ok(len)
399		}
400		.boxed_local()
401	}
402	#[inline]
403	fn write_at<'a>(
404		&self, _pos: u64, _buf: &'a [u8],
405	) -> impl Future<Output = io::Result<usize>> + 'a {
406		let _self = self;
407		future::lazy(|_| unimplemented!()).boxed_local()
408	}
409}
410
411impl Page for LocalFile {
412	type Error = IoError;
413
414	fn len(&self) -> LocalBoxFuture<'static, Result<u64, Self::Error>> {
415		self.len()
416	}
417	fn read(
418		&self, mut offset: u64, len: usize,
419	) -> LocalBoxFuture<'static, Result<Box<[u8]>, Self::Error>> {
420		let self_ = self.clone();
421		Box::pin(async move {
422			let mut buf_ = vec![0; len];
423			let mut buf = &mut *buf_;
424			while !buf.is_empty() {
425				match self_.read_at(offset, buf).await {
426					Ok(0) => break,
427					Ok(n) => {
428						let tmp = buf;
429						buf = &mut tmp[n..];
430						offset += n as u64;
431					}
432					Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
433					Err(e) => return Err(e.into()),
434				}
435			}
436			let len = len - buf.len();
437			buf_.truncate(len);
438			Ok(buf_.into_boxed_slice())
439		})
440	}
441	fn write(
442		&self, mut offset: u64, buf: Box<[u8]>,
443	) -> LocalBoxFuture<'static, Result<(), Self::Error>> {
444		let self_ = self.clone();
445		Box::pin(async move {
446			let mut buf = &*buf;
447			while !buf.is_empty() {
448				match self_.write_at(offset, buf).await {
449					Ok(0) => {
450						return Err(io::Error::new(
451							io::ErrorKind::WriteZero,
452							"failed to write whole buffer",
453						)
454						.into())
455					}
456					Ok(n) => {
457						buf = &buf[n..];
458						offset += n as u64
459					}
460					Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
461					Err(e) => return Err(e.into()),
462				}
463			}
464			Ok(())
465		})
466	}
467}