use bytes::Bytes;
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use std::ops::Range;
use tokio::io::AsyncWrite;
use crate::path::Path;
use crate::{
GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
Result as ObjectStoreResult,
};
#[derive(Debug, Clone)]
pub struct PrefixObjectStore<T: ObjectStore> {
prefix: Path,
inner: T,
}
impl<T: ObjectStore> std::fmt::Display for PrefixObjectStore<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "PrefixObjectStore({})", self.prefix.as_ref())
}
}
impl<T: ObjectStore> PrefixObjectStore<T> {
pub fn new(store: T, prefix: impl Into<Path>) -> Self {
Self {
prefix: prefix.into(),
inner: store,
}
}
fn full_path(&self, location: &Path) -> Path {
self.prefix.parts().chain(location.parts()).collect()
}
fn strip_prefix(&self, path: &Path) -> Option<Path> {
Some(path.prefix_match(&self.prefix)?.collect())
}
}
#[async_trait::async_trait]
impl<T: ObjectStore> ObjectStore for PrefixObjectStore<T> {
async fn put(&self, location: &Path, bytes: Bytes) -> ObjectStoreResult<()> {
let full_path = self.full_path(location);
self.inner.put(&full_path, bytes).await
}
async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
let full_path = self.full_path(location);
self.inner.get(&full_path).await
}
async fn get_range(
&self,
location: &Path,
range: Range<usize>,
) -> ObjectStoreResult<Bytes> {
let full_path = self.full_path(location);
self.inner.get_range(&full_path, range).await
}
async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
let full_path = self.full_path(location);
self.inner.head(&full_path).await.map(|meta| ObjectMeta {
last_modified: meta.last_modified,
size: meta.size,
location: self.strip_prefix(&meta.location).unwrap_or(meta.location),
})
}
async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
let full_path = self.full_path(location);
self.inner.delete(&full_path).await
}
async fn list(
&self,
prefix: Option<&Path>,
) -> ObjectStoreResult<BoxStream<'_, ObjectStoreResult<ObjectMeta>>> {
Ok(self
.inner
.list(Some(&self.full_path(prefix.unwrap_or(&Path::from("/")))))
.await?
.map_ok(|meta| ObjectMeta {
last_modified: meta.last_modified,
size: meta.size,
location: self.strip_prefix(&meta.location).unwrap_or(meta.location),
})
.boxed())
}
async fn list_with_delimiter(
&self,
prefix: Option<&Path>,
) -> ObjectStoreResult<ListResult> {
self.inner
.list_with_delimiter(Some(
&self.full_path(prefix.unwrap_or(&Path::from("/"))),
))
.await
.map(|lst| ListResult {
common_prefixes: lst
.common_prefixes
.iter()
.filter_map(|p| self.strip_prefix(p))
.collect(),
objects: lst
.objects
.iter()
.filter_map(|meta| {
Some(ObjectMeta {
last_modified: meta.last_modified,
size: meta.size,
location: self.strip_prefix(&meta.location)?,
})
})
.collect(),
})
}
async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
let full_from = self.full_path(from);
let full_to = self.full_path(to);
self.inner.copy(&full_from, &full_to).await
}
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
let full_from = self.full_path(from);
let full_to = self.full_path(to);
self.inner.copy_if_not_exists(&full_from, &full_to).await
}
async fn rename_if_not_exists(
&self,
from: &Path,
to: &Path,
) -> ObjectStoreResult<()> {
let full_from = self.full_path(from);
let full_to = self.full_path(to);
self.inner.rename_if_not_exists(&full_from, &full_to).await
}
async fn put_multipart(
&self,
location: &Path,
) -> ObjectStoreResult<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
let full_path = self.full_path(location);
self.inner.put_multipart(&full_path).await
}
async fn abort_multipart(
&self,
location: &Path,
multipart_id: &MultipartId,
) -> ObjectStoreResult<()> {
let full_path = self.full_path(location);
self.inner.abort_multipart(&full_path, multipart_id).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::local::LocalFileSystem;
use crate::test_util::flatten_list_stream;
use crate::tests::{
copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter,
put_get_delete_list, rename_and_copy, stream_get,
};
use tempfile::TempDir;
#[tokio::test]
async fn prefix_test() {
let root = TempDir::new().unwrap();
let inner = LocalFileSystem::new_with_prefix(root.path()).unwrap();
let integration = PrefixObjectStore::new(inner, "prefix");
put_get_delete_list(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
copy_if_not_exists(&integration).await;
stream_get(&integration).await;
}
#[tokio::test]
async fn prefix_test_applies_prefix() {
let tmpdir = TempDir::new().unwrap();
let local = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap();
let location = Path::from("prefix/test_file.json");
let data = Bytes::from("arbitrary data");
let expected_data = data.clone();
local.put(&location, data).await.unwrap();
let prefix = PrefixObjectStore::new(local, "prefix");
let location_prefix = Path::from("test_file.json");
let content_list = flatten_list_stream(&prefix, None).await.unwrap();
assert_eq!(content_list, &[location_prefix.clone()]);
let root = Path::from("/");
let content_list = flatten_list_stream(&prefix, Some(&root)).await.unwrap();
assert_eq!(content_list, &[location_prefix.clone()]);
let read_data = prefix
.get(&location_prefix)
.await
.unwrap()
.bytes()
.await
.unwrap();
assert_eq!(&*read_data, expected_data);
let target_prefix = Path::from("/test_written.json");
prefix
.put(&target_prefix, expected_data.clone())
.await
.unwrap();
prefix.delete(&location_prefix).await.unwrap();
let local = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap();
let err = local.get(&location).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
let location = Path::from("prefix/test_written.json");
let read_data = local.get(&location).await.unwrap().bytes().await.unwrap();
assert_eq!(&*read_data, expected_data)
}
}