#![allow(clippy::needless_lifetimes)]
use async_trait::async_trait;
use futures::{future::BoxFuture, stream, FutureExt, StreamExt, TryStreamExt};
use std::{
convert::TryFrom, ffi::{OsStr, OsString}, fs, io::{self, Seek, SeekFrom}, path::{Path, PathBuf}, sync::{
atomic::{AtomicU64, Ordering}, Arc
}
};
use walkdir::WalkDir;
#[cfg(unix)]
use std::os::unix::fs::FileExt;
#[cfg(windows)]
use std::os::windows::fs::FileExt;
use super::{Directory, File, Page, Partition};
use crate::util::{IoError, ResultExpand};
/// A vector of files is itself a `File`: its partitions are the partitions of every element.
#[async_trait(?Send)]
impl<F> File for Vec<F>
where
    F: File,
{
    type Partition = F::Partition;
    type Error = F::Error;

    /// Collects the partitions of every contained file into a single vector.
    ///
    /// # Errors
    ///
    /// Collection goes through `try_collect`, so the first `Err` item encountered is returned.
    async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
        let files = stream::iter(self.into_iter());
        // Maps one file to a stream over whatever `ResultExpand` yields for its partition
        // listing (presumably one item per partition, or the listing error -- confirm against
        // `crate::util::ResultExpand`).
        let expand = |file: F| {
            let listing = async { stream::iter(ResultExpand(file.partitions().await)) };
            listing.flatten_stream()
        };
        files.flat_map(expand).try_collect().await
    }
}
/// A borrowed slice of files is a `File` whose partitions are the partitions of every element.
///
/// Elements are cloned because `File::partitions` consumes its receiver.
#[async_trait(?Send)]
impl<F> File for &[F]
where
    F: File + Clone,
{
    type Partition = F::Partition;
    type Error = F::Error;

    /// Collects the partitions of a clone of every file in the slice into a single vector.
    ///
    /// # Errors
    ///
    /// Collection goes through `try_collect`, so the first `Err` item encountered is returned.
    async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
        let files = stream::iter(self.iter().cloned());
        // Maps one file to a stream over whatever `ResultExpand` yields for its partition
        // listing (presumably one item per partition, or the listing error -- confirm against
        // `crate::util::ResultExpand`).
        let expand = |file: F| {
            let listing = async { stream::iter(ResultExpand(file.partitions().await)) };
            listing.flatten_stream()
        };
        files.flat_map(expand).try_collect().await
    }
}
/// A single local path is a `File` consisting of exactly one partition: the path itself.
#[async_trait(?Send)]
impl File for PathBuf {
    type Partition = Self;
    type Error = IoError;

    /// Returns a one-element vector holding this path; always `Ok`, and no I/O is performed here.
    async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
        let mut only = Vec::with_capacity(1);
        only.push(self);
        Ok(only)
    }
}
#[async_trait(?Send)]
impl Directory for PathBuf {
async fn partitions_filter<F>(
self, f: F,
) -> Result<Vec<<Self as File>::Partition>, <Self as File>::Error>
where
F: FnMut(&super::PathBuf) -> bool,
{
(*self).partitions_filter(f).await
}
}
#[async_trait(?Send)]
impl Partition for PathBuf {
type Page = LocalFile;
type Error = IoError;
async fn pages(self) -> Result<Vec<Self::Page>, Self::Error> {
Ok(vec![LocalFile::open(self)?])
}
}
/// Treats a local path as the root of a directory tree of partitions.
#[async_trait(?Send)]
impl Directory for &Path {
    /// Walks the tree rooted at `self` and returns every non-directory entry that survives `f`.
    ///
    /// The walk follows symbolic links and orders entries by file name. The root is always
    /// kept. For every other entry, `f` is called with the entry's path relative to the root,
    /// rebuilt as a `super::PathBuf`: the directory components are pushed one by one and the
    /// file name (`None` for directories) is handed to `set_file_name`. Entries for which `f`
    /// returns `false` are dropped by `filter_entry`.
    ///
    /// # Errors
    ///
    /// A walk error is returned as its underlying `io::Error` when it has one, otherwise it is
    /// wrapped in an `io::Error` of kind `Other`.
    async fn partitions_filter<F>(
        self, mut f: F,
    ) -> Result<Vec<<Self as File>::Partition>, <Self as File>::Error>
    where
        F: FnMut(&super::PathBuf) -> bool,
    {
        let walker = WalkDir::new(self)
            .follow_links(true)
            .sort_by(|lhs, rhs| lhs.file_name().cmp(rhs.file_name()))
            .into_iter();

        let retained = walker.filter_entry(|entry| {
            let absolute = entry.path();
            // The root itself is never offered to the user filter.
            if absolute == self {
                return true;
            }
            let is_dir = entry.file_type().is_dir();
            let relative = absolute.strip_prefix(self).unwrap();
            // Split a file entry into (containing directories, file name); a directory entry
            // has no file name and contributes all of its components.
            let (dirs, file_name) = if is_dir {
                (relative, None)
            } else {
                let name = relative.file_name().unwrap();
                (relative.parent().unwrap(), Some(name))
            };
            let mut path_buf = super::PathBuf::new();
            for component in dirs {
                path_buf.push(component);
            }
            path_buf.set_file_name(file_name);
            f(&path_buf)
        });

        retained
            .filter_map(|entry| match entry {
                // Directories only steer the walk; they are not partitions themselves.
                Ok(ref found) if found.file_type().is_dir() => None,
                Ok(found) => Some(Ok(found.into_path())),
                Err(err) => {
                    // `into_io_error` consumes the error, so probe with `io_error` first to
                    // be able to wrap the original error when there is no I/O cause.
                    let cause = if err.io_error().is_some() {
                        err.into_io_error().unwrap()
                    } else {
                        io::Error::new(io::ErrorKind::Other, err)
                    };
                    Some(Err(cause.into()))
                }
            })
            .collect()
    }
}
/// A borrowed path is a `File` by way of its owned `PathBuf` form.
#[async_trait(?Send)]
impl File for &Path {
    type Partition = PathBuf;
    type Error = IoError;

    /// Converts the path into a `PathBuf` and returns that value's partitions.
    async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
        let owned = PathBuf::from(self);
        <PathBuf as File>::partitions(owned).await
    }
}
/// An owned string is interpreted as a local path and is a `File` via `PathBuf`.
#[async_trait(?Send)]
impl File for String {
    type Partition = PathBuf;
    type Error = IoError;

    /// Converts the string into a `PathBuf` and returns that value's partitions.
    async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
        let owned = PathBuf::from(self);
        <PathBuf as File>::partitions(owned).await
    }
}
/// A string slice is interpreted as a local path and is a `File` via `PathBuf`.
#[async_trait(?Send)]
impl File for &str {
    type Partition = PathBuf;
    type Error = IoError;

    /// Converts the string slice into a `PathBuf` and returns that value's partitions.
    async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
        let owned = PathBuf::from(self);
        <PathBuf as File>::partitions(owned).await
    }
}
/// An owned OS string is interpreted as a local path and is a `File` via `PathBuf`.
#[async_trait(?Send)]
impl File for OsString {
    type Partition = PathBuf;
    type Error = IoError;

    /// Converts the OS string into a `PathBuf` and returns that value's partitions.
    async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
        let owned = PathBuf::from(self);
        <PathBuf as File>::partitions(owned).await
    }
}
/// A borrowed OS string is interpreted as a local path and is a `File` via `PathBuf`.
#[async_trait(?Send)]
impl File for &OsStr {
    type Partition = PathBuf;
    type Error = IoError;

    /// Converts the OS string slice into a `PathBuf` and returns that value's partitions.
    async fn partitions(self) -> Result<Vec<Self::Partition>, Self::Error> {
        let owned = PathBuf::from(self);
        <PathBuf as File>::partitions(owned).await
    }
}
/// State behind a `LocalFile` handle: the open file plus a cached length.
///
/// `LocalFile` keeps this in an `Arc`, and the futures returned by `read` and `write` hold
/// their own clone of that `Arc`.
struct LocalFileInner {
/// The open file that positional reads and writes are issued against.
file: fs::File,
/// Cached length in bytes. Initialised in `from_file` from a seek to the end of the file,
/// overwritten by `set_len`, and raised with `fetch_max` after each successful write.
len: AtomicU64,
}
/// A handle to a file on the local filesystem, used as the `Page` type of a `PathBuf`
/// partition.
///
/// Create one with `LocalFile::open` or by converting an already-open `fs::File`.
pub struct LocalFile {
/// The open file and its cached length, reference-counted so `read` and `write` can hand a
/// clone to the futures they return.
inner: Arc<LocalFileInner>,
}
impl LocalFile {
    /// Opens the file at `path` with `fs::File::open` and wraps it in a `LocalFile`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error if the file cannot be opened or its length cannot be determined.
    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
        fs::File::open(path).and_then(Self::from_file)
    }

    /// Wraps an already-open file, recording its current length.
    ///
    /// The length is obtained by seeking to the end of the file, which leaves the file's
    /// cursor there.
    fn from_file(mut file: fs::File) -> io::Result<Self> {
        let end = file.seek(SeekFrom::End(0))?;
        let shared = LocalFileInner {
            file,
            len: AtomicU64::new(end),
        };
        Ok(Self {
            inner: Arc::new(shared),
        })
    }
}
impl From<fs::File> for LocalFile {
fn from(file: fs::File) -> Self {
Self::from_file(file).unwrap()
}
}
#[cfg(unix)]
impl LocalFile {
#[inline]
fn read_at(&self, pos: u64, buf: &mut [u8]) -> io::Result<usize> {
FileExt::read_at(&self.inner.file, buf, pos)
}
#[inline]
fn write_at(&self, pos: u64, buf: &[u8]) -> io::Result<usize> {
FileExt::write_at(&self.inner.file, buf, pos)
}
}
// Windows flavour of the positional I/O primitives, built on `std::os::windows::fs::FileExt`.
#[cfg(windows)]
impl LocalFile {
    /// Reads into `buf` starting at byte offset `pos`; returns the number of bytes read.
    #[inline]
    fn read_at(&self, pos: u64, buf: &mut [u8]) -> io::Result<usize> {
        let file = &self.inner.file;
        <fs::File as FileExt>::seek_read(file, buf, pos)
    }

    /// Writes `buf` starting at byte offset `pos`; returns the number of bytes written.
    #[inline]
    fn write_at(&self, pos: u64, buf: &[u8]) -> io::Result<usize> {
        let file = &self.inner.file;
        <fs::File as FileExt>::seek_write(file, buf, pos)
    }
}
#[async_trait]
impl Page for LocalFile {
type Error = IoError;
fn len(&self) -> u64 {
self.inner.len.load(Ordering::Relaxed)
}
fn set_len(&self, len: u64) -> Result<(), Self::Error> {
self.inner.file.set_len(len)?;
self.inner.len.store(len, Ordering::Relaxed);
Ok(())
}
fn read(
&self, mut offset: u64, len: usize,
) -> BoxFuture<'static, Result<Box<[u8]>, Self::Error>> {
let self_ = LocalFile {
inner: self.inner.clone(),
};
Box::pin(async move {
let mut buf_ = vec![0; len].into_boxed_slice();
let mut buf = &mut *buf_;
while !buf.is_empty() {
match self_.read_at(offset, buf) {
Ok(0) => break,
Ok(n) => {
let tmp = buf;
buf = &mut tmp[n..];
offset += n as u64;
}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e.into()),
}
}
if !buf.is_empty() {
Err(
io::Error::new(io::ErrorKind::UnexpectedEof, "failed to fill whole buffer")
.into(),
)
} else {
Ok(buf_)
}
})
}
fn write(
&self, mut offset: u64, buf: Box<[u8]>,
) -> BoxFuture<'static, Result<(), Self::Error>> {
let self_ = LocalFile {
inner: self.inner.clone(),
};
Box::pin(async move {
let mut buf = &*buf;
while !buf.is_empty() {
match self_.write_at(offset, buf) {
Ok(0) => {
return Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write whole buffer",
)
.into())
}
Ok(n) => {
let _ = self_
.inner
.len
.fetch_max(offset + u64::try_from(n).unwrap(), Ordering::Relaxed);
buf = &buf[n..];
offset += n as u64
}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e.into()),
}
}
Ok(())
})
}
}