use crate::{
maybe_spawn_blocking,
path::{absolute_path_to_url, Path},
GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta,
ObjectStore, Result,
};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::future::BoxFuture;
use futures::ready;
use futures::{stream::BoxStream, StreamExt};
use futures::{FutureExt, TryStreamExt};
use snafu::{ensure, ResultExt, Snafu};
use std::fs::{metadata, symlink_metadata, File, Metadata, OpenOptions};
use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::{collections::BTreeSet, convert::TryFrom, io};
use std::{collections::VecDeque, path::PathBuf};
use tokio::io::AsyncWrite;
use url::Url;
use walkdir::{DirEntry, WalkDir};
// Errors produced by the local filesystem store. They are converted into the
// crate-level error by the `From<Error> for super::Error` impl in this file.
//
// NOTE(review): plain `//` comments are used inside this enum on purpose.
// snafu may use `///` doc comments as the Display text of variants that have
// no explicit `display` attribute (`NotFound`, `AlreadyExists`) — confirm
// before converting these to doc comments.
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub(crate) enum Error {
// The file length (a u64) could not be represented as a usize.
#[snafu(display("File size for {} did not fit in a usize: {}", path, source))]
FileSizeOverflowedUsize {
source: std::num::TryFromIntError,
path: String,
},
// A directory walk failed with something other than "not found".
#[snafu(display("Unable to walk dir: {}", source))]
UnableToWalkDir {
source: walkdir::Error,
},
// Reading filesystem metadata for an object failed.
#[snafu(display("Unable to access metadata for {}: {}", path, source))]
Metadata {
source: Box<dyn std::error::Error + Send + Sync + 'static>,
path: String,
},
// Writing the payload into the staging file failed.
#[snafu(display("Unable to copy data to file: {}", source))]
UnableToCopyDataToFile {
source: io::Error,
},
// Renaming the staging file onto its destination failed.
#[snafu(display("Unable to rename file: {}", source))]
UnableToRenameFile {
source: io::Error,
},
// Creating the parent directories of a destination failed.
#[snafu(display("Unable to create dir {}: {}", path.display(), source))]
UnableToCreateDir {
source: io::Error,
path: PathBuf,
},
// The path has no parent directory in which a file could be created.
#[snafu(display("Unable to create file {}: {}", path.display(), source))]
UnableToCreateFile {
source: io::Error,
path: PathBuf,
},
// Removing a file failed for a reason other than it being absent.
#[snafu(display("Unable to delete file {}: {}", path.display(), source))]
UnableToDeleteFile {
source: io::Error,
path: PathBuf,
},
// Opening a file failed for a reason other than it being absent.
#[snafu(display("Unable to open file {}: {}", path.display(), source))]
UnableToOpenFile {
source: io::Error,
path: PathBuf,
},
// A read from an opened file failed.
#[snafu(display("Unable to read data from file {}: {}", path.display(), source))]
UnableToReadBytes {
source: io::Error,
path: PathBuf,
},
// Fewer bytes were available than the requested range length.
#[snafu(display("Out of range of file {}, expected: {}, actual: {}", path.display(), expected, actual))]
OutOfRange {
path: PathBuf,
expected: usize,
actual: usize,
},
// Hard-linking or renaming between two paths failed (also used by `rename`).
#[snafu(display("Unable to copy file from {} to {}: {}", from.display(), to.display(), source))]
UnableToCopyFile {
from: PathBuf,
to: PathBuf,
source: io::Error,
},
// The object does not exist (or is a directory); mapped to
// `super::Error::NotFound` by the `From` impl.
NotFound {
path: PathBuf,
source: io::Error,
},
// Seeking to the start of a requested range failed.
#[snafu(display("Error seeking file {}: {}", path.display(), source))]
Seek {
source: io::Error,
path: PathBuf,
},
// The URL built for a location has no filesystem path representation.
#[snafu(display("Unable to convert URL \"{}\" to filesystem path", url))]
InvalidUrl {
url: Url,
},
// The destination already exists; mapped to `super::Error::AlreadyExists`
// by the `From` impl.
AlreadyExists {
path: String,
source: io::Error,
},
// The prefix handed to `new_with_prefix` could not be canonicalized.
#[snafu(display("Unable to canonicalize filesystem root: {}", path.display()))]
UnableToCanonicalize {
path: PathBuf,
source: io::Error,
},
}
impl From<Error> for super::Error {
fn from(source: Error) -> Self {
match source {
Error::NotFound { path, source } => Self::NotFound {
path: path.to_string_lossy().to_string(),
source: source.into(),
},
Error::AlreadyExists { path, source } => Self::AlreadyExists {
path,
source: source.into(),
},
_ => Self::Generic {
store: "LocalFileSystem",
source: Box::new(source),
},
}
}
}
/// Local filesystem storage providing an [`ObjectStore`] interface to files
/// on local disk.
///
/// Object locations are resolved against the root URL stored in the shared
/// [`Config`]: `file:///` for [`LocalFileSystem::new`], or the canonicalized
/// prefix for [`LocalFileSystem::new_with_prefix`].
#[derive(Debug)]
pub struct LocalFileSystem {
// Reference-counted so it can be cloned into closures (see `list`).
config: Arc<Config>,
}
/// Settings shared by a [`LocalFileSystem`] and the closures it creates.
#[derive(Debug)]
struct Config {
// `file://` URL that every object location is resolved against.
root: Url,
}
impl std::fmt::Display for LocalFileSystem {
    /// Renders the store as `LocalFileSystem(<root url>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let root = &self.config.root;
        f.write_fmt(format_args!("LocalFileSystem({})", root))
    }
}
impl Default for LocalFileSystem {
fn default() -> Self {
Self::new()
}
}
impl LocalFileSystem {
pub fn new() -> Self {
Self {
config: Arc::new(Config {
root: Url::parse("file:///").unwrap(),
}),
}
}
pub fn new_with_prefix(prefix: impl AsRef<std::path::Path>) -> Result<Self> {
let path = std::fs::canonicalize(&prefix).context(UnableToCanonicalizeSnafu {
path: prefix.as_ref(),
})?;
Ok(Self {
config: Arc::new(Config {
root: absolute_path_to_url(path)?,
}),
})
}
}
impl Config {
fn path_to_filesystem(&self, location: &Path) -> Result<PathBuf> {
let mut url = self.root.clone();
url.path_segments_mut()
.expect("url path")
.pop_if_empty()
.extend(location.parts());
url.to_file_path()
.map_err(|_| Error::InvalidUrl { url }.into())
}
fn filesystem_to_path(&self, location: &std::path::Path) -> Result<Path> {
Ok(Path::from_absolute_path_with_base(
location,
Some(&self.root),
)?)
}
}
#[async_trait]
impl ObjectStore for LocalFileSystem {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
let path = self.config.path_to_filesystem(location)?;
maybe_spawn_blocking(move || {
let (mut file, suffix) = new_staged_upload(&path)?;
let staging_path = staged_upload_path(&path, &suffix);
file.write_all(&bytes)
.context(UnableToCopyDataToFileSnafu)
.and_then(|_| {
std::fs::rename(&staging_path, &path).context(UnableToRenameFileSnafu)
})
.map_err(|e| {
let _ = std::fs::remove_file(&staging_path); e.into()
})
})
.await
}
/// Starts a multipart upload to `location`.
///
/// Creates a staging file for the destination and returns its suffix as the
/// multipart id, together with a writer backed by that staging file.
async fn put_multipart(
    &self,
    location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
    let dest = self.config.path_to_filesystem(location)?;
    let (file, suffix) = new_staged_upload(&dest)?;

    let multipart_id = suffix.clone();
    let upload = LocalUpload::new(dest, suffix, Arc::new(file));
    Ok((multipart_id, Box::new(upload)))
}
async fn abort_multipart(
&self,
location: &Path,
multipart_id: &MultipartId,
) -> Result<()> {
let dest = self.config.path_to_filesystem(location)?;
let path: PathBuf = staged_upload_path(&dest, multipart_id);
maybe_spawn_blocking(move || match std::fs::remove_file(&path) {
Ok(_) => Ok(()),
Err(source) => match source.kind() {
ErrorKind::NotFound => Ok(()), _ => Err(Error::UnableToDeleteFile { path, source }.into()),
},
})
.await
}
async fn append(
&self,
location: &Path,
) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
let path = self.config.path_to_filesystem(location)?;
loop {
let mut options = tokio::fs::OpenOptions::new();
match options
.truncate(false)
.append(true)
.create(true)
.open(&path)
.await
{
Ok(file) => return Ok(Box::new(file)),
Err(source) if source.kind() == ErrorKind::NotFound => {
let parent =
path.parent().ok_or_else(|| Error::UnableToCreateFile {
path: path.to_path_buf(),
source,
})?;
tokio::fs::create_dir_all(parent)
.await
.context(UnableToCreateDirSnafu { path: parent })?;
continue;
}
Err(source) => {
return Err(Error::UnableToOpenFile { source, path }.into())
}
}
}
}
/// Opens the object at `location` according to `options`.
///
/// ETag preconditions (`if_match` / `if_none_match`) are rejected as
/// `NotSupported`. Modification-time preconditions are checked against the
/// file's modification time. The result carries the open file handle, and
/// the range defaults to the whole file when none was requested.
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
    let wants_etag = options.if_match.is_some() || options.if_none_match.is_some();
    if wants_etag {
        return Err(super::Error::NotSupported {
            source: "ETags not supported by LocalFileSystem".to_string().into(),
        });
    }

    let location = location.clone();
    let path = self.config.path_to_filesystem(&location)?;

    maybe_spawn_blocking(move || {
        let (file, metadata) = open_file(&path)?;

        let has_time_condition =
            options.if_unmodified_since.is_some() || options.if_modified_since.is_some();
        if has_time_condition {
            options.check_modified(&location, last_modified(&metadata))?;
        }

        let meta = convert_metadata(metadata, location)?;
        let range = match options.range {
            Some(requested) => requested,
            None => 0..meta.size,
        };

        Ok(GetResult {
            payload: GetResultPayload::File(file, path),
            range,
            meta,
        })
    })
    .await
}
/// Reads exactly the bytes in `range` from the object at `location`.
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
    let fs_path = self.config.path_to_filesystem(location)?;

    maybe_spawn_blocking(move || {
        // The metadata returned alongside the handle is not needed here.
        let (mut handle, _metadata) = open_file(&fs_path)?;
        read_range(&mut handle, &fs_path, range)
    })
    .await
}
/// Reads several byte ranges from the object at `location`, reusing a single
/// open file handle. Stops at, and returns, the first failing range.
async fn get_ranges(
    &self,
    location: &Path,
    ranges: &[Range<usize>],
) -> Result<Vec<Bytes>> {
    let fs_path = self.config.path_to_filesystem(location)?;
    let requested = ranges.to_vec();

    maybe_spawn_blocking(move || {
        let (mut handle, _metadata) = open_file(&fs_path)?;

        let mut chunks = Vec::with_capacity(requested.len());
        for range in requested {
            chunks.push(read_range(&mut handle, &fs_path, range)?);
        }
        Ok(chunks)
    })
    .await
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let path = self.config.path_to_filesystem(location)?;
let location = location.clone();
maybe_spawn_blocking(move || {
let metadata = match metadata(&path) {
Err(e) => Err(match e.kind() {
ErrorKind::NotFound => Error::NotFound {
path: path.clone(),
source: e,
},
_ => Error::Metadata {
source: e.into(),
path: location.to_string(),
},
}),
Ok(m) => match !m.is_dir() {
true => Ok(m),
false => Err(Error::NotFound {
path,
source: io::Error::new(ErrorKind::NotFound, "is directory"),
}),
},
}?;
convert_metadata(metadata, location)
})
.await
}
async fn delete(&self, location: &Path) -> Result<()> {
let path = self.config.path_to_filesystem(location)?;
maybe_spawn_blocking(move || match std::fs::remove_file(&path) {
Ok(_) => Ok(()),
Err(e) => Err(match e.kind() {
ErrorKind::NotFound => Error::NotFound { path, source: e }.into(),
_ => Error::UnableToDeleteFile { path, source: e }.into(),
}),
})
.await
}
async fn list(
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
let config = Arc::clone(&self.config);
let root_path = match prefix {
Some(prefix) => config.path_to_filesystem(prefix)?,
None => self.config.root.to_file_path().unwrap(),
};
let walkdir = WalkDir::new(root_path)
.min_depth(1)
.follow_links(true);
let s = walkdir.into_iter().flat_map(move |result_dir_entry| {
match convert_walkdir_result(result_dir_entry) {
Err(e) => Some(Err(e)),
Ok(None) => None,
Ok(entry @ Some(_)) => entry
.filter(|dir_entry| {
dir_entry.file_type().is_file()
&& !dir_entry.file_name().to_string_lossy().contains('#')
})
.map(|entry| {
let location = config.filesystem_to_path(entry.path())?;
convert_entry(entry, location)
}),
}
});
if tokio::runtime::Handle::try_current().is_err() {
return Ok(futures::stream::iter(s).boxed());
}
const CHUNK_SIZE: usize = 1024;
let buffer = VecDeque::with_capacity(CHUNK_SIZE);
let stream =
futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move {
if buffer.is_empty() {
(s, buffer) = tokio::task::spawn_blocking(move || {
for _ in 0..CHUNK_SIZE {
match s.next() {
Some(r) => buffer.push_back(r),
None => break,
}
}
(s, buffer)
})
.await?;
}
match buffer.pop_front() {
Some(Err(e)) => Err(e),
Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))),
None => Ok(None),
}
});
Ok(stream.boxed())
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
let config = Arc::clone(&self.config);
let prefix = prefix.cloned().unwrap_or_default();
let resolved_prefix = config.path_to_filesystem(&prefix)?;
maybe_spawn_blocking(move || {
let walkdir = WalkDir::new(&resolved_prefix)
.min_depth(1)
.max_depth(1)
.follow_links(true);
let mut common_prefixes = BTreeSet::new();
let mut objects = Vec::new();
for entry_res in walkdir.into_iter().map(convert_walkdir_result) {
if let Some(entry) = entry_res? {
if entry.file_type().is_file()
&& entry.file_name().to_string_lossy().contains('#')
{
continue;
}
let is_directory = entry.file_type().is_dir();
let entry_location = config.filesystem_to_path(entry.path())?;
let mut parts = match entry_location.prefix_match(&prefix) {
Some(parts) => parts,
None => continue,
};
let common_prefix = match parts.next() {
Some(p) => p,
None => continue,
};
drop(parts);
if is_directory {
common_prefixes.insert(prefix.child(common_prefix));
} else {
objects.push(convert_entry(entry, entry_location)?);
}
}
}
Ok(ListResult {
common_prefixes: common_prefixes.into_iter().collect(),
objects,
})
})
.await
}
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
let from = self.config.path_to_filesystem(from)?;
let to = self.config.path_to_filesystem(to)?;
let mut id = 0;
maybe_spawn_blocking(move || loop {
let staged = staged_upload_path(&to, &id.to_string());
match std::fs::hard_link(&from, &staged) {
Ok(_) => {
return std::fs::rename(&staged, &to).map_err(|source| {
let _ = std::fs::remove_file(&staged); Error::UnableToCopyFile { from, to, source }.into()
});
}
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => id += 1,
ErrorKind::NotFound => create_parent_dirs(&to, source)?,
_ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
},
}
})
.await
}
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
let from = self.config.path_to_filesystem(from)?;
let to = self.config.path_to_filesystem(to)?;
maybe_spawn_blocking(move || loop {
match std::fs::rename(&from, &to) {
Ok(_) => return Ok(()),
Err(source) => match source.kind() {
ErrorKind::NotFound => create_parent_dirs(&to, source)?,
_ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
},
}
})
.await
}
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
let from = self.config.path_to_filesystem(from)?;
let to = self.config.path_to_filesystem(to)?;
maybe_spawn_blocking(move || loop {
match std::fs::hard_link(&from, &to) {
Ok(_) => return Ok(()),
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => {
return Err(Error::AlreadyExists {
path: to.to_str().unwrap().to_string(),
source,
}
.into())
}
ErrorKind::NotFound => create_parent_dirs(&to, source)?,
_ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
},
}
})
.await
}
}
fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> {
let parent = path.parent().ok_or_else(|| Error::UnableToCreateFile {
path: path.to_path_buf(),
source,
})?;
std::fs::create_dir_all(parent).context(UnableToCreateDirSnafu { path: parent })?;
Ok(())
}
fn new_staged_upload(base: &std::path::Path) -> Result<(File, String)> {
let mut multipart_id = 1;
loop {
let suffix = multipart_id.to_string();
let path = staged_upload_path(base, &suffix);
let mut options = OpenOptions::new();
match options.read(true).write(true).create_new(true).open(&path) {
Ok(f) => return Ok((f, suffix)),
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => multipart_id += 1,
ErrorKind::NotFound => create_parent_dirs(&path, source)?,
_ => return Err(Error::UnableToOpenFile { source, path }.into()),
},
}
}
}
/// Returns the staging path for `dest`: the destination path with `#` and
/// `suffix` appended to its final component.
fn staged_upload_path(dest: &std::path::Path, suffix: &str) -> PathBuf {
    // Append on the raw OS string so nothing is treated as a path separator
    // or a file extension.
    let mut staged = dest.as_os_str().to_os_string();
    for piece in ["#", suffix] {
        staged.push(piece);
    }
    PathBuf::from(staged)
}
/// State machine driving a [`LocalUpload`].
enum LocalUploadState {
/// No operation in flight; holds the open staging file.
Idle(Arc<File>),
/// A write is in flight; the future resolves to the number of bytes written.
Writing(Arc<File>, BoxFuture<'static, Result<usize, io::Error>>),
/// The staging file is being synced (`sync_all`) before the commit.
ShuttingDown(BoxFuture<'static, Result<(), io::Error>>),
/// The staging file is being renamed onto the destination.
Committing(BoxFuture<'static, Result<(), io::Error>>),
/// The upload finished; `Drop` treats this state as "nothing to clean up".
Complete,
}
/// [`AsyncWrite`] implementation that writes to a staging file and renames
/// it onto `dest` on shutdown. Dropping it before completion removes the
/// staging file.
struct LocalUpload {
// Current position in the upload state machine.
inner_state: LocalUploadState,
// Final path of the uploaded object.
dest: PathBuf,
// Suffix identifying the staging file (see `staged_upload_path`).
multipart_id: MultipartId,
}
impl LocalUpload {
    /// Creates an upload in the `Idle` state. It writes to `file` and will
    /// commit to `dest`; `multipart_id` is the suffix of the staging file.
    pub fn new(dest: PathBuf, multipart_id: MultipartId, file: Arc<File>) -> Self {
        let inner_state = LocalUploadState::Idle(file);
        Self {
            inner_state,
            dest,
            multipart_id,
        }
    }
}
impl AsyncWrite for LocalUpload {
/// Writes `buf` to the staging file.
///
/// With a tokio runtime available the write runs on the blocking pool: an
/// `Idle` upload moves to `Writing`, and once the spawned write completes it
/// returns to `Idle` and reports the number of bytes written. Without a
/// runtime the write is performed synchronously.
///
/// Writing while the upload is shutting down, committing or complete fails
/// with `ErrorKind::InvalidInput`.
fn poll_write(
    mut self: Pin<&mut Self>,
    cx: &mut std::task::Context<'_>,
    buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
    let invalid_state = |condition: &str| -> Poll<Result<usize, io::Error>> {
        Poll::Ready(Err(io::Error::new(
            ErrorKind::InvalidInput,
            format!("Tried to write to file {condition}."),
        )))
    };

    if let Ok(runtime) = tokio::runtime::Handle::try_current() {
        loop {
            match &mut self.inner_state {
                LocalUploadState::Idle(file) => {
                    let file = Arc::clone(file);
                    let file2 = Arc::clone(&file);
                    // Copy the caller's buffer only when a write is actually
                    // started, not on every re-poll of one already in flight.
                    let data: Vec<u8> = buf.to_vec();
                    let data_len = data.len();

                    self.inner_state = LocalUploadState::Writing(
                        file,
                        Box::pin(
                            runtime
                                .spawn_blocking(move || (&*file2).write_all(&data))
                                .map(move |res| match res {
                                    // The blocking task itself failed.
                                    Err(err) => {
                                        Err(io::Error::new(ErrorKind::Other, err))
                                    }
                                    Ok(res) => res.map(move |_| data_len),
                                }),
                        ),
                    );
                }
                LocalUploadState::Writing(file, inner_write) => {
                    let res = ready!(inner_write.poll_unpin(cx));
                    self.inner_state = LocalUploadState::Idle(Arc::clone(file));
                    return Poll::Ready(res);
                }
                LocalUploadState::ShuttingDown(_) => {
                    return invalid_state("when writer is shutting down");
                }
                LocalUploadState::Committing(_) => {
                    return invalid_state("when writer is committing data");
                }
                LocalUploadState::Complete => {
                    return invalid_state("when writer is complete");
                }
            }
        }
    } else if let LocalUploadState::Idle(file) = &self.inner_state {
        let file = Arc::clone(file);
        (&*file).write_all(buf)?;
        Poll::Ready(Ok(buf.len()))
    } else {
        // `invalid_state` appends the trailing period itself.
        invalid_state("when writer is already complete")
    }
}
/// Always reports success immediately; no flushing is performed here.
/// The file is synced with `sync_all` in `poll_shutdown`.
fn poll_flush(
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
/// Finishes the upload: syncs the staging file and renames it onto `dest`.
///
/// With a tokio runtime the work is done on the blocking pool in two steps,
/// `Idle -> ShuttingDown` (`sync_all`) then `ShuttingDown -> Committing`
/// (rename), ending in `Complete`. Without a runtime both steps run
/// synchronously.
///
/// Returns an error if a write is still in progress or the upload was
/// already completed.
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), io::Error>> {
if let Ok(runtime) = tokio::runtime::Handle::try_current() {
// Loop so that each state transition is followed by polling the new state.
loop {
match &mut self.inner_state {
LocalUploadState::Idle(file) => {
// Start syncing file contents and metadata to disk.
let file = Arc::clone(file);
self.inner_state = LocalUploadState::ShuttingDown(Box::pin(
runtime.spawn_blocking(move || (*file).sync_all()).map(
move |res| match res {
// The blocking task itself failed.
Err(err) => {
Err(io::Error::new(io::ErrorKind::Other, err))
}
Ok(res) => res,
},
),
));
}
LocalUploadState::ShuttingDown(fut) => match fut.poll_unpin(cx) {
Poll::Ready(res) => {
// A failed sync aborts the shutdown before any rename.
res?;
let staging_path =
staged_upload_path(&self.dest, &self.multipart_id);
let dest = self.dest.clone();
// Start the rename that commits the staged data.
self.inner_state = LocalUploadState::Committing(Box::pin(
runtime
.spawn_blocking(move || {
std::fs::rename(&staging_path, &dest)
})
.map(move |res| match res {
Err(err) => {
Err(io::Error::new(io::ErrorKind::Other, err))
}
Ok(res) => res,
}),
));
}
Poll::Pending => {
return Poll::Pending;
}
},
LocalUploadState::Writing(_, _) => {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Tried to commit a file where a write is in progress.",
)));
}
LocalUploadState::Committing(fut) => {
// The state becomes Complete whether or not the rename succeeded.
let res = ready!(fut.poll_unpin(cx));
self.inner_state = LocalUploadState::Complete;
return Poll::Ready(res);
}
LocalUploadState::Complete => {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
"Already complete",
)))
}
}
}
} else {
// No runtime: sync and rename on the calling thread.
let staging_path = staged_upload_path(&self.dest, &self.multipart_id);
match &mut self.inner_state {
LocalUploadState::Idle(file) => {
let file = Arc::clone(file);
// Marked Complete up front, so a failure below is not followed by
// staging-file removal in `Drop`.
self.inner_state = LocalUploadState::Complete;
file.sync_all()?;
drop(file);
std::fs::rename(staging_path, &self.dest)?;
Poll::Ready(Ok(()))
}
_ => {
Poll::Ready(Err(io::Error::new(ErrorKind::Other, "Already complete")))
}
}
}
}
}
impl Drop for LocalUpload {
    /// Removes the staging file of an upload that never reached `Complete`.
    ///
    /// The removal is best effort: it runs on the blocking pool when a tokio
    /// runtime is available and inline otherwise, and its result is ignored.
    fn drop(&mut self) {
        if matches!(self.inner_state, LocalUploadState::Complete) {
            return;
        }

        self.inner_state = LocalUploadState::Complete;
        let staged = staged_upload_path(&self.dest, &self.multipart_id);

        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            drop(handle.spawn_blocking(move || std::fs::remove_file(staged)));
        } else {
            drop(std::fs::remove_file(staged));
        }
    }
}
pub(crate) fn chunked_stream(
mut file: File,
path: PathBuf,
range: Range<usize>,
chunk_size: usize,
) -> BoxStream<'static, Result<Bytes, super::Error>> {
futures::stream::once(async move {
let (file, path) = maybe_spawn_blocking(move || {
file.seek(SeekFrom::Start(range.start as _))
.map_err(|source| Error::Seek {
source,
path: path.clone(),
})?;
Ok((file, path))
})
.await?;
let stream = futures::stream::try_unfold(
(file, path, range.end - range.start),
move |(mut file, path, remaining)| {
maybe_spawn_blocking(move || {
if remaining == 0 {
return Ok(None);
}
let to_read = remaining.min(chunk_size);
let mut buffer = Vec::with_capacity(to_read);
let read = (&mut file)
.take(to_read as u64)
.read_to_end(&mut buffer)
.map_err(|e| Error::UnableToReadBytes {
source: e,
path: path.clone(),
})?;
Ok(Some((buffer.into(), (file, path, remaining - read))))
})
},
);
Ok::<_, super::Error>(stream)
})
.try_flatten()
.boxed()
}
pub(crate) fn read_range(
file: &mut File,
path: &PathBuf,
range: Range<usize>,
) -> Result<Bytes> {
let to_read = range.end - range.start;
file.seek(SeekFrom::Start(range.start as u64))
.context(SeekSnafu { path })?;
let mut buf = Vec::with_capacity(to_read);
let read = file
.take(to_read as u64)
.read_to_end(&mut buf)
.context(UnableToReadBytesSnafu { path })?;
ensure!(
read == to_read,
OutOfRangeSnafu {
path,
expected: to_read,
actual: read
}
);
Ok(buf.into())
}
fn open_file(path: &PathBuf) -> Result<(File, Metadata)> {
let ret = match File::open(path).and_then(|f| Ok((f.metadata()?, f))) {
Err(e) => Err(match e.kind() {
ErrorKind::NotFound => Error::NotFound {
path: path.clone(),
source: e,
},
_ => Error::UnableToOpenFile {
path: path.clone(),
source: e,
},
}),
Ok((metadata, file)) => match !metadata.is_dir() {
true => Ok((file, metadata)),
false => Err(Error::NotFound {
path: path.clone(),
source: io::Error::new(ErrorKind::NotFound, "is directory"),
}),
},
}?;
Ok(ret)
}
fn convert_entry(entry: DirEntry, location: Path) -> Result<ObjectMeta> {
let metadata = entry.metadata().map_err(|e| Error::Metadata {
source: e.into(),
path: location.to_string(),
})?;
convert_metadata(metadata, location)
}
fn last_modified(metadata: &std::fs::Metadata) -> DateTime<Utc> {
metadata
.modified()
.expect("Modified file time should be supported on this platform")
.into()
}
fn convert_metadata(metadata: Metadata, location: Path) -> Result<ObjectMeta> {
let last_modified = last_modified(&metadata);
let size = usize::try_from(metadata.len()).context(FileSizeOverflowedUsizeSnafu {
path: location.as_ref(),
})?;
Ok(ObjectMeta {
location,
last_modified,
size,
e_tag: None,
})
}
fn convert_walkdir_result(
res: std::result::Result<DirEntry, walkdir::Error>,
) -> Result<Option<DirEntry>> {
match res {
Ok(entry) => {
match symlink_metadata(entry.path()) {
Ok(attr) => {
if attr.is_symlink() {
let target_metadata = metadata(entry.path());
match target_metadata {
Ok(_) => {
Ok(Some(entry))
}
Err(_) => {
Ok(None)
}
}
} else {
Ok(Some(entry))
}
}
Err(_) => Ok(None),
}
}
Err(walkdir_err) => match walkdir_err.io_error() {
Some(io_err) => match io_err.kind() {
ErrorKind::NotFound => Ok(None),
_ => Err(Error::UnableToWalkDir {
source: walkdir_err,
}
.into()),
},
None => Err(Error::UnableToWalkDir {
source: walkdir_err,
}
.into()),
},
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_util::flatten_list_stream;
use crate::tests::*;
use futures::TryStreamExt;
use tempfile::{NamedTempFile, TempDir};
use tokio::io::AsyncWriteExt;
/// Runs the shared object-store test helpers from `crate::tests` against a
/// store rooted in a temporary directory, inside a tokio runtime.
#[tokio::test]
async fn file_test() {
let root = TempDir::new().unwrap();
let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
put_get_delete_list(&integration).await;
get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
copy_if_not_exists(&integration).await;
stream_get(&integration).await;
}
/// Runs a subset of the shared helpers on the `futures` executor, i.e.
/// without a tokio runtime, to exercise the non-runtime code paths.
#[test]
fn test_non_tokio() {
let root = TempDir::new().unwrap();
let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
futures::executor::block_on(async move {
put_get_delete_list(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
stream_get(&integration).await;
});
}
/// `put` to a nested location whose directories do not exist yet must
/// succeed, and the data must read back unchanged.
#[tokio::test]
async fn creates_dir_if_not_present() {
let root = TempDir::new().unwrap();
let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
let location = Path::from("nested/file/test_file");
let data = Bytes::from("arbitrary data");
let expected_data = data.clone();
integration.put(&location, data).await.unwrap();
let read_data = integration
.get(&location)
.await
.unwrap()
.bytes()
.await
.unwrap();
assert_eq!(&*read_data, expected_data);
}
/// Round-trips a small payload through `put` and `get` at a top-level
/// location.
#[tokio::test]
async fn unknown_length() {
let root = TempDir::new().unwrap();
let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
let location = Path::from("some_file");
let data = Bytes::from("arbitrary data");
let expected_data = data.clone();
integration.put(&location, data).await.unwrap();
let read_data = integration
.get(&location)
.await
.unwrap()
.bytes()
.await
.unwrap();
assert_eq!(&*read_data, expected_data);
}
/// Listing a root directory whose permission bits are all cleared must
/// surface an error rather than an empty listing. Unix only, and ignored by
/// default.
#[tokio::test]
#[cfg(target_family = "unix")]
#[ignore]
async fn bubble_up_io_errors() {
use std::{fs::set_permissions, os::unix::prelude::PermissionsExt};
let root = TempDir::new().unwrap();
// Clear all permission bits on the root directory.
let metadata = root.path().metadata().unwrap();
let mut permissions = metadata.permissions();
permissions.set_mode(0o000);
set_permissions(root.path(), permissions).unwrap();
let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
// `list` may fail up front or yield the error as a stream item; both are accepted.
match store.list(None).await {
Err(_) => {
}
Ok(mut stream) => {
let mut any_err = false;
while let Some(res) = stream.next().await {
if res.is_err() {
any_err = true;
}
}
assert!(any_err);
}
}
assert!(store.list_with_delimiter(None).await.is_err());
}
// Object name used by `get_nonexistent_location`.
const NON_EXISTENT_NAME: &str = "nonexistentname";
/// The error for a missing object must be `crate::Error::NotFound`, wrapping
/// an `io::Error` and carrying a path that ends with the requested name.
#[tokio::test]
async fn get_nonexistent_location() {
let root = TempDir::new().unwrap();
let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
let location = Path::from(NON_EXISTENT_NAME);
let err = get_nonexistent_object(&integration, Some(location))
.await
.unwrap_err();
if let crate::Error::NotFound { path, source } = err {
let source_variant = source.downcast_ref::<std::io::Error>();
assert!(
matches!(source_variant, Some(std::io::Error { .. }),),
"got: {source_variant:?}"
);
assert!(path.ends_with(NON_EXISTENT_NAME), "{}", path);
} else {
panic!("unexpected error type: {err:?}");
}
}
/// With an unprefixed store, the canonical path of `Cargo.toml` must
/// round-trip through URL -> object path -> filesystem path, and `head` on
/// it must succeed.
#[tokio::test]
async fn root() {
let integration = LocalFileSystem::new();
let canonical = std::path::Path::new("Cargo.toml").canonicalize().unwrap();
let url = Url::from_directory_path(&canonical).unwrap();
let path = Path::parse(url.path()).unwrap();
let roundtrip = integration.config.path_to_filesystem(&path).unwrap();
// Canonicalize the result too so both sides compare in the same form.
let roundtrip = roundtrip.canonicalize().unwrap();
assert_eq!(roundtrip, canonical);
integration.head(&path).await.unwrap();
}
/// On Windows, listing the root of an unprefixed store must fail because
/// `file:///` cannot be converted to a filesystem path.
#[tokio::test]
#[cfg(target_family = "windows")]
async fn test_list_root() {
let fs = LocalFileSystem::new();
let r = fs.list_with_delimiter(None).await.unwrap_err().to_string();
assert!(
r.contains("Unable to convert URL \"file:///\" to filesystem path"),
"{}",
r
);
}
/// On Linux, listing the root of an unprefixed store must succeed.
#[tokio::test]
#[cfg(target_os = "linux")]
async fn test_list_root() {
let fs = LocalFileSystem::new();
fs.list_with_delimiter(None).await.unwrap();
}
/// Asserts that `list(prefix)` yields exactly the locations in `expected`.
/// The listed locations are sorted before comparing, so `expected` must be
/// given in sorted order.
async fn check_list(
integration: &LocalFileSystem,
prefix: Option<&Path>,
expected: &[&str],
) {
let result: Vec<_> = integration
.list(prefix)
.await
.unwrap()
.try_collect()
.await
.unwrap();
let mut strings: Vec<_> = result.iter().map(|x| x.location.as_ref()).collect();
strings.sort_unstable();
assert_eq!(&strings, expected)
}
/// Exercises listing, `head`, `delete` and `put` in the presence of
/// symlinks: a link to a file, a link to a directory, and a broken link.
#[tokio::test]
#[cfg(target_family = "unix")]
async fn test_symlink() {
let root = TempDir::new().unwrap();
let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
// A regular file in a sub-directory.
let subdir = root.path().join("a");
std::fs::create_dir(&subdir).unwrap();
let file = subdir.join("file.parquet");
std::fs::write(file, "test").unwrap();
check_list(&integration, None, &["a/file.parquet"]).await;
integration
.head(&Path::from("a/file.parquet"))
.await
.unwrap();
// A symlink to a file created by `NamedTempFile` (outside `root`) is listed as an object.
let other = NamedTempFile::new().unwrap();
std::os::unix::fs::symlink(other.path(), root.path().join("test.parquet"))
.unwrap();
check_list(&integration, None, &["a/file.parquet", "test.parquet"]).await;
integration.head(&Path::from("test.parquet")).await.unwrap();
// A symlink to a directory is followed.
std::os::unix::fs::symlink(&subdir, root.path().join("b")).unwrap();
check_list(
&integration,
None,
&["a/file.parquet", "b/file.parquet", "test.parquet"],
)
.await;
check_list(&integration, Some(&Path::from("b")), &["b/file.parquet"]).await;
integration
.head(&Path::from("b/file.parquet"))
.await
.unwrap();
// A broken symlink (its target does not exist) is skipped by listings.
std::os::unix::fs::symlink(
root.path().join("foo.parquet"),
root.path().join("c"),
)
.unwrap();
check_list(
&integration,
None,
&["a/file.parquet", "b/file.parquet", "test.parquet"],
)
.await;
let mut r = integration.list_with_delimiter(None).await.unwrap();
r.common_prefixes.sort_unstable();
assert_eq!(r.common_prefixes.len(), 2);
assert_eq!(r.common_prefixes[0].as_ref(), "a");
assert_eq!(r.common_prefixes[1].as_ref(), "b");
assert_eq!(r.objects.len(), 1);
assert_eq!(r.objects[0].location.as_ref(), "test.parquet");
let r = integration
.list_with_delimiter(Some(&Path::from("a")))
.await
.unwrap();
assert_eq!(r.common_prefixes.len(), 0);
assert_eq!(r.objects.len(), 1);
assert_eq!(r.objects[0].location.as_ref(), "a/file.parquet");
// Deleting a symlinked object removes the link, not its target.
integration
.delete(&Path::from("test.parquet"))
.await
.unwrap();
assert!(other.path().exists());
check_list(&integration, None, &["a/file.parquet", "b/file.parquet"]).await;
// Deleting through the directory symlink removes the underlying file.
integration
.delete(&Path::from("b/file.parquet"))
.await
.unwrap();
check_list(&integration, None, &[]).await;
// Writing through the directory symlink makes the file visible via both paths.
integration
.put(&Path::from("b/file.parquet"), Bytes::from(vec![0, 1, 2]))
.await
.unwrap();
check_list(&integration, None, &["a/file.parquet", "b/file.parquet"]).await;
}
/// A root directory with a non-ASCII name must work, while a file with such
/// a name placed inside the store makes an unprefixed listing fail with an
/// "illegal character sequence" error.
#[tokio::test]
async fn invalid_path() {
let root = TempDir::new().unwrap();
let root = root.path().join("🙀");
std::fs::create_dir(root.clone()).unwrap();
let integration = LocalFileSystem::new_with_prefix(root.clone()).unwrap();
let directory = Path::from("directory");
let object = directory.child("child.txt");
let data = Bytes::from("arbitrary");
integration.put(&object, data.clone()).await.unwrap();
integration.head(&object).await.unwrap();
let result = integration.get(&object).await.unwrap();
assert_eq!(result.bytes().await.unwrap(), data);
flatten_list_stream(&integration, None).await.unwrap();
flatten_list_stream(&integration, Some(&directory))
.await
.unwrap();
let result = integration
.list_with_delimiter(Some(&directory))
.await
.unwrap();
assert_eq!(result.objects.len(), 1);
assert!(result.common_prefixes.is_empty());
assert_eq!(result.objects[0].location, object);
// Create a file directly on disk whose name the listing below rejects.
let illegal = root.join("💀");
std::fs::write(illegal, "foo").unwrap();
// Listing a prefix that does not contain the file still succeeds.
flatten_list_stream(&integration, Some(&directory))
.await
.unwrap();
// Listing everything encounters the file and fails.
let err = flatten_list_stream(&integration, None)
.await
.unwrap_err()
.to_string();
assert!(
err.contains("Encountered illegal character sequence \"💀\" whilst parsing path segment \"💀\""),
"{}",
err
);
}
/// Two in-progress multipart uploads to the same location must get distinct
/// ids, and their staging files must not show up in either kind of listing.
#[tokio::test]
async fn list_hides_incomplete_uploads() {
let root = TempDir::new().unwrap();
let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
let location = Path::from("some_file");
let data = Bytes::from("arbitrary data");
let (multipart_id, mut writer) =
integration.put_multipart(&location).await.unwrap();
writer.write_all(&data).await.unwrap();
let (multipart_id_2, mut writer_2) =
integration.put_multipart(&location).await.unwrap();
assert_ne!(multipart_id, multipart_id_2);
writer_2.write_all(&data).await.unwrap();
// Neither writer has been shut down, so only staging files exist.
let list = flatten_list_stream(&integration, None).await.unwrap();
assert_eq!(list.len(), 0);
assert_eq!(
integration
.list_with_delimiter(None)
.await
.unwrap()
.objects
.len(),
0
);
}
/// A file name containing a literal percent sequence (`%3A`) must be listed
/// verbatim by both `list` and `list_with_delimiter`.
#[tokio::test]
async fn filesystem_filename_with_percent() {
let temp_dir = TempDir::new().unwrap();
let integration = LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap();
let filename = "L%3ABC.parquet";
std::fs::write(temp_dir.path().join(filename), "foo").unwrap();
let list_stream = integration.list(None).await.unwrap();
let res: Vec<_> = list_stream.try_collect().await.unwrap();
assert_eq!(res.len(), 1);
assert_eq!(res[0].location.as_ref(), filename);
let res = integration.list_with_delimiter(None).await.unwrap();
assert_eq!(res.objects.len(), 1);
assert_eq!(res.objects[0].location.as_ref(), filename);
}
/// Relative prefixes must be accepted by `new_with_prefix`, and a location
/// built from a relative filesystem path must be listable on an unprefixed
/// store.
#[tokio::test]
async fn relative_paths() {
LocalFileSystem::new_with_prefix(".").unwrap();
LocalFileSystem::new_with_prefix("..").unwrap();
LocalFileSystem::new_with_prefix("../..").unwrap();
let integration = LocalFileSystem::new();
let path = Path::from_filesystem_path(".").unwrap();
integration.list_with_delimiter(Some(&path)).await.unwrap();
}
}
// Tests for `append` and for cleanup of staged multipart uploads; compiled
// on every target except wasm32.
#[cfg(not(target_arch = "wasm32"))]
#[cfg(test)]
mod not_wasm_tests {
use crate::local::LocalFileSystem;
use crate::{ObjectStore, Path};
use bytes::Bytes;
use std::time::Duration;
use tempfile::TempDir;
use tokio::io::AsyncWriteExt;
/// `append` to a nested location must create the missing directories.
#[tokio::test]
async fn creates_dir_if_not_present_append() {
let root = TempDir::new().unwrap();
let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
let location = Path::from("nested/file/test_file");
let data = Bytes::from("arbitrary data");
let expected_data = data.clone();
let mut writer = integration.append(&location).await.unwrap();
writer.write_all(data.as_ref()).await.unwrap();
let read_data = integration
.get(&location)
.await
.unwrap()
.bytes()
.await
.unwrap();
assert_eq!(&*read_data, expected_data);
}
/// Data appended and flushed must read back unchanged.
#[tokio::test]
async fn unknown_length_append() {
let root = TempDir::new().unwrap();
let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
let location = Path::from("some_file");
let data = Bytes::from("arbitrary data");
let expected_data = data.clone();
let mut writer = integration.append(&location).await.unwrap();
writer.write_all(data.as_ref()).await.unwrap();
writer.flush().await.unwrap();
let read_data = integration
.get(&location)
.await
.unwrap()
.bytes()
.await
.unwrap();
assert_eq!(&*read_data, expected_data);
}
/// A second `append` writer must add to, not replace, what the first wrote.
#[tokio::test]
async fn multiple_append() {
let root = TempDir::new().unwrap();
let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
let location = Path::from("some_file");
let data = vec![
Bytes::from("arbitrary"),
Bytes::from("data"),
Bytes::from("gnz"),
];
let mut writer = integration.append(&location).await.unwrap();
for d in &data {
writer.write_all(d).await.unwrap();
}
let mut writer = integration.append(&location).await.unwrap();
for d in &data {
writer.write_all(d).await.unwrap();
}
let read_data = integration
.get(&location)
.await
.unwrap()
.bytes()
.await
.unwrap();
let expected_data = Bytes::from("arbitrarydatagnzarbitrarydatagnz");
assert_eq!(&*read_data, expected_data);
}
/// Dropping an unfinished multipart writer must remove its staging file.
#[tokio::test]
async fn test_cleanup_intermediate_files() {
let root = TempDir::new().unwrap();
let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
let location = Path::from("some_file");
let (_, mut writer) = integration.put_multipart(&location).await.unwrap();
writer.write_all(b"hello").await.unwrap();
// Only the staging file exists at this point.
let file_count = std::fs::read_dir(root.path()).unwrap().count();
assert_eq!(file_count, 1);
drop(writer);
// The removal runs on the blocking pool, so give it a moment.
// NOTE(review): a fixed 1 ms sleep may be flaky on a loaded machine.
tokio::time::sleep(Duration::from_millis(1)).await;
let file_count = std::fs::read_dir(root.path()).unwrap().count();
assert_eq!(file_count, 0);
}
}
// Unix-only test covering a FIFO (named pipe) in place of a regular file.
#[cfg(target_family = "unix")]
#[cfg(test)]
mod unix_test {
use crate::local::LocalFileSystem;
use crate::{ObjectStore, Path};
use nix::sys::stat;
use nix::unistd;
use std::fs::OpenOptions;
use tempfile::TempDir;
/// `head` and `get` must both succeed on a FIFO.
#[tokio::test]
async fn test_fifo() {
let filename = "some_file";
let root = TempDir::new().unwrap();
let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
let path = root.path().join(filename);
unistd::mkfifo(&path, stat::Mode::S_IRWXU).unwrap();
let location = Path::from(filename);
integration.head(&location).await.unwrap();
// Open the write end on a blocking thread while `get` opens the read end.
// NOTE(review): presumably needed because opening a FIFO blocks until the
// other end is opened — confirm.
let spawned = tokio::task::spawn_blocking(|| {
OpenOptions::new().write(true).open(path).unwrap();
});
integration.get(&location).await.unwrap();
spawned.await.unwrap();
}
}