use std::{
cmp::min,
collections::HashMap,
fs::OpenOptions,
future::Future,
io::{Cursor, Error, ErrorKind},
mem::MaybeUninit,
path::{Component, Path, PathBuf},
pin::Pin,
task::{Context, Poll},
time::SystemTime,
};
use futures_util::future::{ready, Ready};
use hyper::body::Bytes;
use tokio::{
fs::{self, File},
io::{AsyncRead, AsyncSeek, ReadBuf},
task::{spawn_blocking, JoinHandle},
};
#[cfg(windows)]
use std::os::windows::fs::OpenOptionsExt;
#[cfg(windows)]
use winapi::um::winbase::FILE_FLAG_BACKUP_SEMANTICS;
/// Size in bytes (8 KiB) of the read buffer owned by each `TokioFileAccess`;
/// a single `poll_read` on that type never returns more than this many bytes.
const TOKIO_READ_BUF_SIZE: usize = 8 * 1024;
/// An opened file handle bundled with the metadata captured when it was opened.
///
/// `F` is the handle type; it defaults to tokio's `File`.
#[derive(Debug)]
pub struct FileWithMetadata<F = File> {
/// Handle to the opened file.
pub handle: F,
/// Size of the file in bytes.
pub size: u64,
/// Last modification time, or `None` when it is not available.
pub modified: Option<SystemTime>,
/// Whether the opened path is a directory.
pub is_dir: bool,
}
/// A source of files that can be opened by path.
///
/// How `path` is interpreted is up to the implementation; this file provides
/// `TokioFileOpener` (real filesystem) and `MemoryFs` (in-memory map).
pub trait FileOpener: Send + Sync + 'static {
/// Handle type produced by `open`; must be convertible into a `FileAccess`.
type File: IntoFileAccess;
/// Future returned by `open`, resolving to the handle with its metadata,
/// or to an I/O error.
type Future: Future<Output = Result<FileWithMetadata<Self::File>, Error>> + Send;
/// Starts opening the file at `path`.
fn open(&self, path: &Path) -> Self::Future;
}
/// Conversion from an opened file handle into a type that implements `FileAccess`.
pub trait IntoFileAccess: Send + Unpin + 'static {
/// The `FileAccess` implementation produced by the conversion.
type Output: FileAccess;
/// Consumes the handle and returns the value used to read from it.
fn into_file_access(self) -> Self::Output;
}
/// Seekable, poll-based read access to a file, yielding data as `Bytes` chunks.
pub trait FileAccess: AsyncSeek + Send + Unpin + 'static {
/// Attempts to read at most `len` bytes.
///
/// The implementations in this file may return fewer bytes than requested,
/// and return an empty `Bytes` when no data was read.
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
len: usize,
) -> Poll<Result<Bytes, Error>>;
}
/// A tokio `File` is read through a `TokioFileAccess` wrapper.
impl IntoFileAccess for File {
type Output = TokioFileAccess;
/// Wraps the file in a `TokioFileAccess`, which allocates its read buffer.
fn into_file_access(self) -> Self::Output {
TokioFileAccess::new(self)
}
}
/// `FileAccess` implementation backed by a tokio `File`.
pub struct TokioFileAccess {
// The file being read and seeked.
file: File,
// Heap-allocated scratch buffer that reads land in before being copied
// out into `Bytes`; kept uninitialized until a read fills part of it.
read_buf: Box<[MaybeUninit<u8>; TOKIO_READ_BUF_SIZE]>,
}
impl TokioFileAccess {
/// Wraps `file`, allocating an uninitialized read buffer of
/// `TOKIO_READ_BUF_SIZE` bytes on the heap.
pub fn new(file: File) -> Self {
TokioFileAccess {
file,
read_buf: Box::new([MaybeUninit::uninit(); TOKIO_READ_BUF_SIZE]),
}
}
}
/// Seeking is forwarded unchanged to the wrapped tokio `File`.
impl AsyncSeek for TokioFileAccess {
/// Begins a seek to `position` on the wrapped file.
fn start_seek(mut self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
Pin::new(&mut self.file).start_seek(position)
}
/// Polls the wrapped file for completion of the seek.
fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
Pin::new(&mut self.file).poll_complete(cx)
}
}
impl FileAccess for TokioFileAccess {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
len: usize,
) -> Poll<Result<Bytes, Error>> {
let Self {
ref mut file,
ref mut read_buf,
} = *self;
let len = min(len, read_buf.len()) as usize;
let mut read_buf = ReadBuf::uninit(&mut read_buf[..len]);
match Pin::new(file).poll_read(cx, &mut read_buf) {
Poll::Ready(Ok(())) => {
let filled = read_buf.filled();
if filled.is_empty() {
Poll::Ready(Ok(Bytes::new()))
} else {
Poll::Ready(Ok(Bytes::copy_from_slice(filled)))
}
}
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
}
}
/// `FileOpener` that opens files from the real filesystem.
pub struct TokioFileOpener {
/// Base directory that the path passed to `open` is appended to.
pub root: PathBuf,
}
impl TokioFileOpener {
/// Creates an opener whose paths are resolved under `root`.
pub fn new(root: impl Into<PathBuf>) -> Self {
Self { root: root.into() }
}
}
impl FileOpener for TokioFileOpener {
type File = File;
type Future = TokioFileFuture;
fn open(&self, path: &Path) -> Self::Future {
let mut full_path = self.root.clone();
full_path.extend(path);
let inner = spawn_blocking(move || {
let mut opts = OpenOptions::new();
opts.read(true);
#[cfg(windows)]
opts.custom_flags(FILE_FLAG_BACKUP_SEMANTICS);
let handle = opts.open(full_path)?;
let metadata = handle.metadata()?;
Ok(FileWithMetadata {
handle: File::from_std(handle),
size: metadata.len(),
modified: metadata.modified().ok(),
is_dir: metadata.is_dir(),
})
});
TokioFileFuture { inner }
}
}
/// Future returned by `TokioFileOpener::open`.
pub struct TokioFileFuture {
// Join handle of the blocking task that opens the file and reads its metadata.
inner: JoinHandle<Result<FileWithMetadata<File>, Error>>,
}
impl Future for TokioFileFuture {
    type Output = Result<FileWithMetadata<File>, Error>;

    /// Polls the blocking open task.
    ///
    /// The task's own result is passed through unchanged. If the task could
    /// not be joined, a generic `ErrorKind::Other` error is returned instead.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.inner).poll(cx).map(|joined| {
            joined.unwrap_or_else(|_| {
                Err(Error::new(ErrorKind::Other, "background task failed"))
            })
        })
    }
}
/// Storage used by `MemoryFs`: maps a path to the file contents and metadata
/// kept for it.
type MemoryFileMap = HashMap<PathBuf, FileWithMetadata<Bytes>>;
/// A `Cursor<Bytes>` is used directly as its own `FileAccess`.
impl IntoFileAccess for Cursor<Bytes> {
type Output = Self;
/// Returns the cursor unchanged.
fn into_file_access(self) -> Self::Output {
self
}
}
impl FileAccess for Cursor<Bytes> {
    /// Reads at most `len` bytes starting at the cursor's current position and
    /// advances the position past the bytes returned.
    ///
    /// The returned `Bytes` is a slice of the underlying buffer (no copy).
    /// An empty `Bytes` is returned once the position is at or beyond the end
    /// of the data. This never returns `Poll::Pending` or an error.
    fn poll_read(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        len: usize,
    ) -> Poll<Result<Bytes, Error>> {
        let pos = self.position();
        let data = (*self).get_ref();

        // A seek may have moved the position past the end; report end of data
        // instead of panicking on an out-of-range slice.
        if pos > data.len() as u64 {
            return Poll::Ready(Ok(Bytes::new()));
        }

        // `pos <= data.len()` was just checked, so the cast cannot truncate
        // and the arithmetic below cannot overflow.
        let start = pos as usize;
        let amt = min(data.len() - start, len);
        let end = start + amt;
        let chunk = data.slice(start..end);

        // Consume what was handed out so that the next read continues after
        // it, rather than returning the same bytes again.
        self.set_position(end as u64);

        Poll::Ready(Ok(chunk))
    }
}
/// `FileOpener` that serves files from an in-memory map.
pub struct MemoryFs {
// All known entries (files and directories), keyed by path.
files: MemoryFileMap,
}
impl Default for MemoryFs {
fn default() -> Self {
let mut files = MemoryFileMap::new();
files.insert(
PathBuf::new(),
FileWithMetadata {
handle: Bytes::new(),
size: 0,
modified: None,
is_dir: true,
},
);
Self { files }
}
}
impl MemoryFs {
    /// Builds a `MemoryFs` by reading every regular file below `path` into
    /// memory.
    ///
    /// Stored paths are relative to `path`. Directory entries are only created
    /// as parents of the files that get added. Entries whose metadata reports
    /// neither a directory nor a regular file are skipped.
    ///
    /// # Errors
    ///
    /// Returns the first I/O error hit while listing directories, reading
    /// metadata, or reading file contents.
    pub async fn from_dir(path: impl AsRef<Path>) -> Result<Self, Error> {
        let mut memory_fs = Self::default();

        // Work stack of (directory on disk, matching path inside `memory_fs`).
        let mut pending = vec![(path.as_ref().to_path_buf(), PathBuf::new())];

        while let Some((disk_dir, virtual_dir)) = pending.pop() {
            let mut entries = fs::read_dir(disk_dir).await?;
            loop {
                let entry = match entries.next_entry().await? {
                    Some(entry) => entry,
                    None => break,
                };
                let meta = entry.metadata().await?;
                let virtual_path = virtual_dir.join(entry.file_name());

                if meta.is_dir() {
                    pending.push((entry.path(), virtual_path));
                    continue;
                }
                if meta.is_file() {
                    let contents = fs::read(entry.path()).await?;
                    memory_fs.add(virtual_path, contents.into(), meta.modified().ok());
                }
            }
        }

        Ok(memory_fs)
    }

    /// Stores `data` as a file at `path`, replacing any entry already there.
    ///
    /// A directory entry is inserted for each leading run of the path's
    /// `Normal` components (all components except the last one), overwriting
    /// whatever was stored under those paths. Returns `self` so calls can be
    /// chained.
    pub fn add(
        &mut self,
        path: impl Into<PathBuf>,
        data: Bytes,
        modified: Option<SystemTime>,
    ) -> &mut Self {
        let path = path.into();

        // Every component except the final one names a parent directory.
        let mut ancestors = path.components();
        ancestors.next_back();

        let mut current_dir = PathBuf::new();
        for component in ancestors {
            let name = match component {
                Component::Normal(name) => name,
                // Prefixes, the root, `.` and `..` do not create directories.
                _ => continue,
            };
            current_dir.push(name);
            let dir_entry = FileWithMetadata {
                handle: Bytes::new(),
                size: 0,
                modified: None,
                is_dir: true,
            };
            self.files.insert(current_dir.clone(), dir_entry);
        }

        let byte_len = data.len() as u64;
        let file_entry = FileWithMetadata {
            handle: data,
            size: byte_len,
            modified,
            is_dir: false,
        };
        self.files.insert(path, file_entry);

        self
    }
}
impl FileOpener for MemoryFs {
    type File = Cursor<Bytes>;
    type Future = Ready<Result<FileWithMetadata<Self::File>, Error>>;

    /// Looks `path` up in the in-memory map and resolves immediately.
    ///
    /// On a hit, the handle is a new cursor positioned at the start of a clone
    /// of the stored `Bytes`. An exact key match is required; otherwise the
    /// result is an `ErrorKind::NotFound` error.
    fn open(&self, path: &Path) -> Self::Future {
        let outcome = match self.files.get(path) {
            Some(entry) => Ok(FileWithMetadata {
                handle: Cursor::new(entry.handle.clone()),
                size: entry.size,
                modified: entry.modified,
                is_dir: entry.is_dir,
            }),
            None => Err(Error::new(ErrorKind::NotFound, "Not found")),
        };
        ready(outcome)
    }
}