use bytes::Bytes;
use futures_util::{StreamExt, TryStreamExt, stream::BoxStream};
use std::ops::Range;
use crate::multipart::{MultipartStore, PartId};
use crate::path::Path;
use crate::{
CopyOptions, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta,
ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions, Result,
};
#[derive(Debug, Clone)]
pub struct PrefixStore<T> {
prefix: Path,
inner: T,
}
impl<T> std::fmt::Display for PrefixStore<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "PrefixObjectStore({})", self.prefix.as_ref())
}
}
impl<T> PrefixStore<T> {
pub fn new(store: T, prefix: impl Into<Path>) -> Self {
Self {
prefix: prefix.into(),
inner: store,
}
}
fn full_path(&self, location: &Path) -> Path {
full_path(&self.prefix, location)
}
fn strip_prefix(&self, path: Path) -> Path {
strip_prefix(&self.prefix, path)
}
fn strip_meta(&self, meta: ObjectMeta) -> ObjectMeta {
strip_meta(&self.prefix, meta)
}
}
fn full_path(prefix: &Path, path: &Path) -> Path {
prefix.parts().chain(path.parts()).collect()
}
fn strip_prefix(prefix: &Path, path: Path) -> Path {
if let Some(suffix) = path.prefix_match(prefix) {
return suffix.collect();
}
path
}
fn strip_meta(prefix: &Path, meta: ObjectMeta) -> ObjectMeta {
ObjectMeta {
last_modified: meta.last_modified,
size: meta.size,
location: strip_prefix(prefix, meta.location),
e_tag: meta.e_tag,
version: None,
}
}
#[async_trait::async_trait]
#[deny(clippy::missing_trait_methods)]
impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
async fn put_opts(
&self,
location: &Path,
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
let full_path = self.full_path(location);
self.inner.put_opts(&full_path, payload, opts).await
}
async fn put_multipart_opts(
&self,
location: &Path,
opts: PutMultipartOptions,
) -> Result<Box<dyn MultipartUpload>> {
let full_path = self.full_path(location);
self.inner.put_multipart_opts(&full_path, opts).await
}
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let full_path = self.full_path(location);
self.inner.get_opts(&full_path, options).await
}
async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
let full_path = self.full_path(location);
self.inner.get_ranges(&full_path, ranges).await
}
fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
) -> BoxStream<'static, Result<Path>> {
let prefix = self.prefix.clone();
let locations = locations
.map(move |location| location.map(|loc| full_path(&prefix, &loc)))
.boxed();
let prefix = self.prefix.clone();
self.inner
.delete_stream(locations)
.map(move |location| location.map(|loc| strip_prefix(&prefix, loc)))
.boxed()
}
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
let s = self.inner.list(Some(&prefix));
let slf_prefix = self.prefix.clone();
s.map_ok(move |meta| strip_meta(&slf_prefix, meta)).boxed()
}
fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'static, Result<ObjectMeta>> {
let offset = self.full_path(offset);
let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
let s = self.inner.list_with_offset(Some(&prefix), &offset);
let slf_prefix = self.prefix.clone();
s.map_ok(move |meta| strip_meta(&slf_prefix, meta)).boxed()
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
self.inner
.list_with_delimiter(Some(&prefix))
.await
.map(|lst| ListResult {
common_prefixes: lst
.common_prefixes
.into_iter()
.map(|p| self.strip_prefix(p))
.collect(),
objects: lst
.objects
.into_iter()
.map(|meta| self.strip_meta(meta))
.collect(),
})
}
async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
let full_from = self.full_path(from);
let full_to = self.full_path(to);
self.inner.copy_opts(&full_from, &full_to, options).await
}
async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
let full_from = self.full_path(from);
let full_to = self.full_path(to);
self.inner.rename_opts(&full_from, &full_to, options).await
}
}
#[async_trait::async_trait]
impl<T: MultipartStore> MultipartStore for PrefixStore<T> {
async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
let full_path = self.full_path(path);
self.inner.create_multipart(&full_path).await
}
async fn put_part(
&self,
path: &Path,
id: &MultipartId,
part_idx: usize,
data: PutPayload,
) -> Result<PartId> {
let full_path = self.full_path(path);
self.inner.put_part(&full_path, id, part_idx, data).await
}
async fn complete_multipart(
&self,
path: &Path,
id: &MultipartId,
parts: Vec<PartId>,
) -> Result<PutResult> {
let full_path = self.full_path(path);
self.inner.complete_multipart(&full_path, id, parts).await
}
async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
let full_path = self.full_path(path);
self.inner.abort_multipart(&full_path, id).await
}
}
#[cfg(not(target_arch = "wasm32"))]
#[cfg(test)]
mod tests {
use std::slice;
use super::*;
use crate::local::LocalFileSystem;
use crate::memory::InMemory;
use crate::{ObjectStoreExt, integration::*};
use tempfile::TempDir;
#[tokio::test]
async fn prefix_test() {
let root = TempDir::new().unwrap();
let inner = LocalFileSystem::new_with_prefix(root.path()).unwrap();
let integration = PrefixStore::new(inner, "prefix");
put_get_delete_list(&integration).await;
get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
copy_if_not_exists(&integration).await;
stream_get(&integration).await;
}
#[tokio::test]
async fn prefix_test_applies_prefix() {
let tmpdir = TempDir::new().unwrap();
let local = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap();
let location = Path::from("prefix/test_file.json");
let data = Bytes::from("arbitrary data");
local.put(&location, data.clone().into()).await.unwrap();
let prefix = PrefixStore::new(local, "prefix");
let location_prefix = Path::from("test_file.json");
let content_list = flatten_list_stream(&prefix, None).await.unwrap();
assert_eq!(content_list, slice::from_ref(&location_prefix));
let root = Path::from("/");
let content_list = flatten_list_stream(&prefix, Some(&root)).await.unwrap();
assert_eq!(content_list, slice::from_ref(&location_prefix));
let read_data = prefix
.get(&location_prefix)
.await
.unwrap()
.bytes()
.await
.unwrap();
assert_eq!(&*read_data, data);
let target_prefix = Path::from("/test_written.json");
prefix
.put(&target_prefix, data.clone().into())
.await
.unwrap();
prefix.delete(&location_prefix).await.unwrap();
let local = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap();
let err = local.get(&location).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
let location = Path::from("prefix/test_written.json");
let read_data = local.get(&location).await.unwrap().bytes().await.unwrap();
assert_eq!(&*read_data, data)
}
#[tokio::test]
async fn prefix_multipart() {
let store = PrefixStore::new(InMemory::new(), "prefix");
multipart(&store, &store).await;
multipart_out_of_order(&store).await;
multipart_race_condition(&store, true).await;
}
}