use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::path::Path;
use object_store::{
path, Error, GetResult, ObjectMeta, ObjectStore, PutMode, PutOptions, PutPayload, PutResult,
};
#[async_trait]
pub(crate) trait TransactionalObjectStore: Send + Sync {
async fn put_if_not_exists(&self, path: &Path, data: Bytes) -> Result<PutResult, Error>;
async fn get(&self, path: &Path) -> Result<GetResult, Error>;
async fn delete(&self, path: &Path) -> Result<(), Error>;
fn list(&self, path: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta, Error>>;
}
pub(crate) struct DelegatingTransactionalObjectStore {
root_path: Path,
object_store: Arc<dyn ObjectStore>,
}
impl DelegatingTransactionalObjectStore {
pub(crate) fn new(root_path: Path, object_store: Arc<dyn ObjectStore>) -> Self {
Self {
root_path,
object_store,
}
}
fn path(&self, path: &Path) -> Path {
Path::from(format!("{}/{}", self.root_path, path))
}
fn strip_root(&self, path: &Path) -> Result<Path, Error> {
let root_raw = self.root_path.to_string();
let path_raw = path.to_string();
if let Some(stripped) = path_raw.strip_prefix(root_raw.as_str()) {
return Ok(Path::from(stripped));
}
Err(Error::InvalidPath {
source: path::Error::PrefixMismatch {
path: path.to_string(),
prefix: self.root_path.to_string(),
},
})
}
}
#[async_trait]
impl TransactionalObjectStore for DelegatingTransactionalObjectStore {
async fn put_if_not_exists(&self, path: &Path, data: Bytes) -> Result<PutResult, Error> {
let path = self.path(path);
self.object_store
.put_opts(
&path,
PutPayload::from_bytes(data),
PutOptions::from(PutMode::Create),
)
.await
}
async fn get(&self, path: &Path) -> Result<GetResult, Error> {
let path = self.path(path);
self.object_store.get(&path).await
}
async fn delete(&self, path: &Path) -> Result<(), Error> {
let path = self.path(path);
self.object_store.delete(&path).await
}
fn list(&self, path: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta, Error>> {
let path = path.map_or(self.root_path.clone(), |p| self.path(p));
self.object_store
.list(Some(&path))
.map(|r| match r {
Ok(om) => Ok(ObjectMeta {
location: self.strip_root(&om.location)?,
last_modified: om.last_modified,
size: om.size,
e_tag: om.e_tag,
version: om.version,
}),
Err(err) => Err(err),
})
.boxed()
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use bytes::Bytes;
use futures::StreamExt;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::{ObjectStore, PutPayload};
use crate::transactional_object_store::{
DelegatingTransactionalObjectStore, TransactionalObjectStore,
};
const ROOT_PATH: &str = "/root/path";
#[tokio::test]
async fn test_delegating_should_fail_put_if_exists() {
let os = Arc::new(InMemory::new());
let txnl_os = DelegatingTransactionalObjectStore::new(Path::from(ROOT_PATH), os.clone());
txnl_os
.put_if_not_exists(
&Path::from("obj"),
Bytes::copy_from_slice("data1".as_bytes()),
)
.await
.unwrap();
let result = txnl_os
.put_if_not_exists(
&Path::from("obj"),
Bytes::copy_from_slice("data2".as_bytes()),
)
.await;
assert!(result.is_err());
assert!(matches!(
result.err().unwrap(),
object_store::Error::AlreadyExists { path: _, source: _ }
));
let result = txnl_os.get(&Path::from("obj")).await.unwrap();
assert_eq!(
result.bytes().await.unwrap(),
Bytes::copy_from_slice("data1".as_bytes())
);
}
#[tokio::test]
async fn test_delegating_should_get_put() {
let os = Arc::new(InMemory::new());
let txnl_os = DelegatingTransactionalObjectStore::new(Path::from(ROOT_PATH), os.clone());
txnl_os
.put_if_not_exists(
&Path::from("obj"),
Bytes::copy_from_slice("data1".as_bytes()),
)
.await
.unwrap();
let result = txnl_os.get(&Path::from("obj")).await.unwrap();
assert_eq!(
result.bytes().await.unwrap(),
Bytes::copy_from_slice("data1".as_bytes())
);
}
#[tokio::test]
async fn test_delegating_should_list() {
let os = Arc::new(InMemory::new());
let txnl_os = DelegatingTransactionalObjectStore::new(Path::from(ROOT_PATH), os.clone());
txnl_os
.put_if_not_exists(
&Path::from("obj"),
Bytes::copy_from_slice("data1".as_bytes()),
)
.await
.unwrap();
txnl_os
.put_if_not_exists(
&Path::from("foo/bar"),
Bytes::copy_from_slice("data1".as_bytes()),
)
.await
.unwrap();
os.put(&Path::from("biz/baz"), PutPayload::from("data1".as_bytes()))
.await
.unwrap();
let mut listing = txnl_os.list(None);
let item = listing.next().await.unwrap().unwrap();
assert_eq!(item.location, Path::from("foo/bar"));
let item = listing.next().await.unwrap().unwrap();
assert_eq!(item.location, Path::from("obj"));
assert!(listing.next().await.is_none());
}
#[tokio::test]
async fn test_delegating_should_put_with_prefix() {
let os = Arc::new(InMemory::new());
let txnl_os = DelegatingTransactionalObjectStore::new(Path::from(ROOT_PATH), os.clone());
txnl_os
.put_if_not_exists(
&Path::from("obj"),
Bytes::copy_from_slice("data1".as_bytes()),
)
.await
.unwrap();
let result = os.get(&Path::from("root/path/obj")).await.unwrap();
assert_eq!(
result.bytes().await.unwrap(),
Bytes::copy_from_slice("data1".as_bytes())
);
}
#[tokio::test]
async fn test_delegating_object_store_delete() {
let os = Arc::new(InMemory::new());
let txnl_os = DelegatingTransactionalObjectStore::new(Path::from(ROOT_PATH), os.clone());
txnl_os
.put_if_not_exists(
&Path::from("obj"),
Bytes::copy_from_slice("data1".as_bytes()),
)
.await
.unwrap();
let result = os.get(&Path::from("root/path/obj")).await.unwrap();
assert_eq!(
result.bytes().await.unwrap(),
Bytes::copy_from_slice("data1".as_bytes())
);
txnl_os.delete(&Path::from("obj")).await.unwrap();
let result = os.get(&Path::from("root/path/obj")).await;
assert!(result.is_err());
assert!(matches!(
result.err().unwrap(),
object_store::Error::NotFound { path: _, source: _ }
));
}
}