#![allow(clippy::needless_lifetimes)]
use async_trait::async_trait;
use futures::{future, future::LocalBoxFuture, stream, FutureExt, StreamExt, TryStreamExt};
use std::{
ffi::{OsStr, OsString}, fs, future::Future, io, path::{Path, PathBuf}, sync::Arc
};
use walkdir::WalkDir;
#[cfg(unix)]
use std::os::unix::fs::FileExt;
#[cfg(windows)]
use std::os::windows::fs::FileExt;
#[cfg(target_arch = "wasm32")]
use {
futures::lock::Mutex, js_sys::{ArrayBuffer, Uint8Array}, std::convert::TryFrom, wasm_bindgen::{JsCast, JsValue}, wasm_bindgen_futures::JsFuture, web_sys::{Blob, Response}
};
#[cfg(not(target_arch = "wasm32"))]
use {
std::io::{Seek, SeekFrom}, tokio::task::spawn_blocking
};
use super::{Directory, File, Page, Partition};
#[cfg(target_arch = "wasm32")]
use crate::util::{f64_to_u64, u64_to_f64};
use crate::util::{IoError, ResultExpand};
#[async_trait(?Send)]
impl<F> File for Vec<F>
where
F: File,
{
type Partition = F::Partition;
type Error = F::Error;
async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
stream::iter(self.into_iter())
.flat_map(|file| {
async { stream::iter(ResultExpand(file.partitions().await)) }.flatten_stream()
})
.try_collect()
.await
}
}
#[async_trait(?Send)]
impl<F> File for &[F]
where
F: File + Clone,
{
type Partition = F::Partition;
type Error = F::Error;
async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
stream::iter(self.iter().cloned())
.flat_map(|file| {
async { stream::iter(ResultExpand(file.partitions().await)) }.flatten_stream()
})
.try_collect()
.await
}
}
#[async_trait(?Send)]
impl File for PathBuf {
type Partition = Self;
type Error = IoError;
async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
Ok(vec![self])
}
}
#[async_trait(?Send)]
impl Directory for PathBuf {
async fn partitions_filter<F>(
self, f: F,
) -> Result<Vec<<Self as File>::Partition>, <Self as File>::Error>
where
F: FnMut(&super::PathBuf) -> bool,
{
(*self).partitions_filter(f).await
}
}
#[async_trait(?Send)]
impl Partition for PathBuf {
type Page = LocalFile;
type Error = IoError;
async fn pages(self) -> Result<Vec<Self::Page>, Self::Error> {
Ok(vec![LocalFile::open(self)?])
}
}
#[async_trait(?Send)]
impl Directory for &Path {
async fn partitions_filter<F>(
self, mut f: F,
) -> Result<Vec<<Self as File>::Partition>, <Self as File>::Error>
where
F: FnMut(&super::PathBuf) -> bool,
{
WalkDir::new(self)
.follow_links(true)
.sort_by(|a, b| a.file_name().cmp(b.file_name()))
.into_iter()
.filter_entry(|e| {
let is_dir = e.file_type().is_dir();
let path = e.path();
if path == self {
return true;
}
let mut path = path.strip_prefix(self).unwrap();
let mut path_buf = super::PathBuf::new();
let mut file_name = None;
if !is_dir {
file_name = Some(path.file_name().unwrap());
path = path.parent().unwrap();
}
for component in path {
path_buf.push(component);
}
path_buf.set_file_name(file_name);
f(&path_buf)
})
.filter_map(|e| match e {
Ok(ref e) if e.file_type().is_dir() => None,
Ok(e) => Some(Ok(e.into_path())),
Err(e) => Some(Err(if e.io_error().is_some() {
e.into_io_error().unwrap()
} else {
io::Error::new(io::ErrorKind::Other, e)
}
.into())),
})
.collect()
}
}
#[async_trait(?Send)]
impl File for &Path {
type Partition = PathBuf;
type Error = IoError;
async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
PathBuf::partitions(self.into()).await
}
}
#[async_trait(?Send)]
impl File for String {
type Partition = PathBuf;
type Error = IoError;
async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
PathBuf::partitions(self.into()).await
}
}
#[async_trait(?Send)]
impl File for &str {
type Partition = PathBuf;
type Error = IoError;
async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
PathBuf::partitions(self.into()).await
}
}
#[async_trait(?Send)]
impl File for OsString {
type Partition = PathBuf;
type Error = IoError;
async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
PathBuf::partitions(self.into()).await
}
}
#[async_trait(?Send)]
impl File for &OsStr {
type Partition = PathBuf;
type Error = IoError;
async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
PathBuf::partitions(self.into()).await
}
}
#[cfg(target_arch = "wasm32")]
enum FutureOrOutput<Fut: Future> {
Future(Fut),
Output(Fut::Output),
}
#[cfg(target_arch = "wasm32")]
impl<Fut: Future> FutureOrOutput<Fut> {
fn output(&mut self) -> Option<&mut Fut::Output> {
if let FutureOrOutput::Output(output) = self {
Some(output)
} else {
None
}
}
}
#[cfg(not(target_arch = "wasm32"))]
struct LocalFileInner {
file: fs::File,
}
#[cfg(target_arch = "wasm32")]
struct LocalFileInner {
file: Mutex<FutureOrOutput<LocalBoxFuture<'static, Blob>>>,
}
pub struct LocalFile {
inner: Arc<LocalFileInner>,
}
impl LocalFile {
pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
#[cfg(not(target_arch = "wasm32"))]
{
Self::from_file(fs::File::open(path)?)
}
#[cfg(target_arch = "wasm32")]
{
let path = path.as_ref().to_string_lossy().into_owned();
let file = Mutex::new(FutureOrOutput::Future(
async move {
let window = web_sys::window().unwrap();
let resp_value = JsFuture::from(window.fetch_with_str(&path)).await.unwrap();
let resp: Response = resp_value.dyn_into().unwrap();
let blob: JsValue = JsFuture::from(resp.blob().unwrap()).await.unwrap();
let blob: Blob = blob.dyn_into().unwrap();
blob
}
.boxed_local(),
));
let inner = Arc::new(LocalFileInner { file });
Ok(Self { inner })
}
}
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
#[cfg(not(target_arch = "wasm32"))]
fn len(&self) -> LocalBoxFuture<'static, Result<u64, <Self as Page>::Error>> {
let self_ = self.inner.clone();
future::lazy(move |_| (&self_.file).seek(SeekFrom::End(0)).map_err(Into::into))
.boxed_local()
}
#[cfg(target_arch = "wasm32")]
fn len(&self) -> LocalBoxFuture<'static, Result<u64, <Self as Page>::Error>> {
let self_ = self.inner.clone();
async move {
let mut file = self_.file.lock().await;
if let FutureOrOutput::Future(fut) = &mut *file {
*file = FutureOrOutput::Output(fut.await);
}
let blob = file.output().unwrap();
let size = blob.size();
Ok(f64_to_u64(size))
}
.boxed_local()
}
#[cfg(not(target_arch = "wasm32"))]
fn from_file(file: fs::File) -> io::Result<Self> {
let inner = Arc::new(LocalFileInner { file });
Ok(Self { inner })
}
#[cfg(target_arch = "wasm32")]
fn from_file(_file: fs::File) -> io::Result<Self> {
unimplemented!()
}
}
impl From<fs::File> for LocalFile {
fn from(file: fs::File) -> Self {
Self::from_file(file).unwrap()
}
}
#[cfg(unix)]
impl LocalFile {
#[inline]
fn read_at<'a>(
&self, pos: u64, buf: &'a mut [u8],
) -> impl Future<Output = io::Result<usize>> + 'a {
let self_ = self.inner.clone();
let len = buf.len();
spawn_blocking(move || {
let mut buf = vec![0; len];
FileExt::read_at(&self_.file, &mut buf, pos).map(|len| {
buf.truncate(len);
buf
})
})
.map(move |vec| {
vec.unwrap().map(move |vec| {
buf[..vec.len()].copy_from_slice(&vec);
vec.len()
})
})
}
#[inline]
fn write_at<'a>(
&self, pos: u64, buf: &'a [u8],
) -> impl Future<Output = io::Result<usize>> + 'a {
let self_ = self.inner.clone();
let buf = buf.to_owned();
spawn_blocking(move || FileExt::write_at(&self_.file, &buf, pos)).map(Result::unwrap)
}
}
#[cfg(windows)]
impl LocalFile {
#[inline]
fn read_at<'a>(
&self, pos: u64, buf: &'a mut [u8],
) -> impl Future<Output = io::Result<usize>> + 'a {
let self_ = self.inner.clone();
let len = buf.len();
spawn_blocking(move || {
let mut buf = vec![0; len];
FileExt::seek_read(&self_.file, &mut buf, pos).map(|len| {
buf.truncate(len);
buf
})
})
.map(move |vec| {
vec.unwrap().map(move |vec| {
buf[..vec.len()].copy_from_slice(&vec);
vec.len()
})
})
}
#[inline]
fn write_at<'a>(
&self, pos: u64, buf: &'a [u8],
) -> impl Future<Output = io::Result<usize>> + 'a {
let self_ = self.inner.clone();
let buf = buf.to_owned();
spawn_blocking(move || FileExt::seek_write(&self_.file, &buf, pos)).map(Result::unwrap)
}
}
#[cfg(target_arch = "wasm32")]
impl LocalFile {
fn read_at<'a>(
&self, pos: u64, buf: &'a mut [u8],
) -> impl Future<Output = io::Result<usize>> + 'a {
let self_ = self.inner.clone();
async move {
let mut file = self_.file.lock().await;
if let FutureOrOutput::Future(fut) = &mut *file {
*file = FutureOrOutput::Output(fut.await);
}
let blob = file.output().unwrap();
let end = pos + u64::try_from(buf.len()).unwrap();
let slice: Blob = blob
.slice_with_f64_and_f64(u64_to_f64(pos), u64_to_f64(end))
.unwrap();
drop(file);
let array_buffer = if false {
slice.array_buffer()
} else {
Response::new_with_opt_blob(Some(&slice))
.unwrap()
.array_buffer()
.unwrap()
};
drop(slice);
let array_buffer: JsValue = JsFuture::from(array_buffer).await.unwrap();
let array_buffer: ArrayBuffer = array_buffer.dyn_into().unwrap();
let buf_: Uint8Array = Uint8Array::new(&array_buffer);
drop(array_buffer);
let len = usize::try_from(buf_.length()).unwrap();
buf_.copy_to(&mut buf[..len]);
Ok(len)
}
.boxed_local()
}
#[inline]
fn write_at<'a>(
&self, _pos: u64, _buf: &'a [u8],
) -> impl Future<Output = io::Result<usize>> + 'a {
let _self = self;
future::lazy(|_| unimplemented!()).boxed_local()
}
}
impl Page for LocalFile {
type Error = IoError;
fn len(&self) -> LocalBoxFuture<'static, Result<u64, Self::Error>> {
self.len()
}
fn read(
&self, mut offset: u64, len: usize,
) -> LocalBoxFuture<'static, Result<Box<[u8]>, Self::Error>> {
let self_ = self.clone();
Box::pin(async move {
let mut buf_ = vec![0; len];
let mut buf = &mut *buf_;
while !buf.is_empty() {
match self_.read_at(offset, buf).await {
Ok(0) => break,
Ok(n) => {
let tmp = buf;
buf = &mut tmp[n..];
offset += n as u64;
}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e.into()),
}
}
let len = len - buf.len();
buf_.truncate(len);
Ok(buf_.into_boxed_slice())
})
}
fn write(
&self, mut offset: u64, buf: Box<[u8]>,
) -> LocalBoxFuture<'static, Result<(), Self::Error>> {
let self_ = self.clone();
Box::pin(async move {
let mut buf = &*buf;
while !buf.is_empty() {
match self_.write_at(offset, buf).await {
Ok(0) => {
return Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write whole buffer",
)
.into())
}
Ok(n) => {
buf = &buf[n..];
offset += n as u64
}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e.into()),
}
}
Ok(())
})
}
}