use std::fmt::Debug;
use std::sync::Arc;
use object_store::ObjectStore;
use object_store::path::Path as ObjectStorePath;
use opendal::Error;
use opendal::ErrorKind;
use opendal::raw::oio::BatchDeleter;
use opendal::raw::oio::MultipartWriter;
use opendal::raw::*;
use opendal::*;
mod core;
mod deleter;
mod error;
mod lister;
mod reader;
mod writer;
use deleter::ObjectStoreDeleter;
use error::parse_error;
use lister::ObjectStoreLister;
use reader::ObjectStoreReader;
use writer::ObjectStoreWriter;
use crate::service::core::format_metadata as parse_metadata;
use crate::service::core::parse_op_stat;
pub const OBJECT_STORE_SCHEME: &str = "object_store";
#[derive(Default)]
pub struct ObjectStoreBuilder {
store: Option<Arc<dyn ObjectStore + 'static>>,
}
impl Debug for ObjectStoreBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut d = f.debug_struct("ObjectStoreBuilder");
d.finish_non_exhaustive()
}
}
impl ObjectStoreBuilder {
pub fn new(store: Arc<dyn ObjectStore + 'static>) -> Self {
Self { store: Some(store) }
}
}
impl Builder for ObjectStoreBuilder {
type Config = ();
fn build(self) -> Result<impl Access> {
let store = self.store.ok_or_else(|| {
Error::new(ErrorKind::ConfigInvalid, "object store is required")
.with_context("service", OBJECT_STORE_SCHEME)
})?;
Ok(ObjectStoreService { store })
}
}
pub struct ObjectStoreService {
store: Arc<dyn ObjectStore + 'static>,
}
impl Debug for ObjectStoreService {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut d = f.debug_struct("ObjectStoreBackend");
d.finish_non_exhaustive()
}
}
impl Access for ObjectStoreService {
type Reader = ObjectStoreReader;
type Writer = MultipartWriter<ObjectStoreWriter>;
type Lister = ObjectStoreLister;
type Deleter = BatchDeleter<ObjectStoreDeleter>;
fn info(&self) -> Arc<AccessorInfo> {
let info = AccessorInfo::default();
info.set_scheme(OBJECT_STORE_SCHEME)
.set_root("/")
.set_name("object_store")
.set_native_capability(Capability {
stat: true,
stat_with_if_match: true,
stat_with_if_unmodified_since: true,
read: true,
write: true,
delete: true,
list: true,
list_with_limit: true,
list_with_start_after: true,
delete_with_version: false,
..Default::default()
});
Arc::new(info)
}
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let path = ObjectStorePath::from(path);
let opts = parse_op_stat(&args)?;
let result = self
.store
.get_opts(&path, opts)
.await
.map_err(parse_error)?;
let metadata = parse_metadata(&result.meta);
Ok(RpStat::new(metadata))
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let reader = ObjectStoreReader::new(self.store.clone(), path, args).await?;
Ok((reader.rp(), reader))
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let writer = ObjectStoreWriter::new(self.store.clone(), path, args);
Ok((
RpWrite::default(),
MultipartWriter::new(self.info(), writer, 10),
))
}
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
let deleter = BatchDeleter::new(ObjectStoreDeleter::new(self.store.clone()));
Ok((RpDelete::default(), deleter))
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let lister = ObjectStoreLister::new(self.store.clone(), path, args).await?;
Ok((RpList::default(), lister))
}
}
#[cfg(test)]
mod tests {
use super::*;
use object_store::memory::InMemory;
use opendal::Buffer;
use opendal::raw::oio::{Delete, List, Read, Write};
#[tokio::test]
async fn test_object_store_backend_builder() {
let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let builder = ObjectStoreBuilder::new(store);
let backend = builder.build().expect("build should succeed");
assert!(backend.info().scheme() == OBJECT_STORE_SCHEME);
}
#[tokio::test]
async fn test_object_store_backend_info() {
let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let backend = ObjectStoreBuilder::new(store)
.build()
.expect("build should succeed");
let info = backend.info();
assert_eq!(info.scheme(), "object_store");
assert_eq!(info.name(), "object_store".into());
assert_eq!(info.root(), "/".into());
let cap = info.native_capability();
assert!(cap.stat);
assert!(cap.read);
assert!(cap.write);
assert!(cap.delete);
assert!(cap.list);
}
#[tokio::test]
async fn test_object_store_backend_basic_operations() {
let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let backend = ObjectStoreBuilder::new(store.clone())
.build()
.expect("build should succeed");
let path = "test_file.txt";
let content = b"Hello, world!";
let (_, mut writer) = backend
.write(path, OpWrite::default())
.await
.expect("write should succeed");
writer
.write(Buffer::from(&content[..]))
.await
.expect("write content should succeed");
writer.close().await.expect("close should succeed");
let stat_result = backend
.stat(path, OpStat::default())
.await
.expect("stat should succeed");
assert_eq!(
stat_result.into_metadata().content_length(),
content.len() as u64
);
let (_, mut reader) = backend
.read(path, OpRead::default())
.await
.expect("read should succeed");
let buf = reader.read().await.expect("read should succeed");
assert_eq!(buf.to_vec(), content);
}
#[tokio::test]
async fn test_object_store_backend_multipart_upload() {
let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let backend = ObjectStoreBuilder::new(store.clone())
.build()
.expect("build should succeed");
let path = "test_file.txt";
let content =
b"Hello, multipart upload! This is a test content for multipart upload functionality.";
let content_len = content.len();
let (_, mut writer) = backend
.write(path, OpWrite::default())
.await
.expect("write should succeed");
let chunk_size = 20;
for chunk in content.chunks(chunk_size) {
writer
.write(Buffer::from(chunk))
.await
.expect("write chunk should succeed");
}
writer.close().await.expect("close should succeed");
let stat_result = backend
.stat(path, OpStat::default())
.await
.expect("stat should succeed");
assert_eq!(
stat_result.into_metadata().content_length(),
content_len as u64
);
let (_, mut reader) = backend
.read(path, OpRead::default())
.await
.expect("read should succeed");
let buf = reader.read().await.expect("read should succeed");
assert_eq!(buf.to_vec(), content);
}
#[tokio::test]
async fn test_object_store_backend_list() {
let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let backend = ObjectStoreBuilder::new(store.clone())
.build()
.expect("build should succeed");
let files = vec![
("dir1/file1.txt", b"content1"),
("dir1/file2.txt", b"content2"),
("dir2/file3.txt", b"content3"),
];
for (path, content) in &files {
let (_, mut writer) = backend
.write(path, OpWrite::default())
.await
.expect("write should succeed");
writer
.write(Buffer::from(&content[..]))
.await
.expect("write content should succeed");
writer.close().await.expect("close should succeed");
}
let (_, mut lister) = backend
.list("dir1/", OpList::default())
.await
.expect("list should succeed");
let mut entries = Vec::new();
while let Some(entry) = lister.next().await.expect("next should succeed") {
entries.push(entry);
}
assert_eq!(entries.len(), 2);
assert!(entries.iter().any(|e| e.path() == "dir1/file1.txt"));
assert!(entries.iter().any(|e| e.path() == "dir1/file2.txt"));
}
#[tokio::test]
async fn test_object_store_backend_delete() {
let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let backend = ObjectStoreBuilder::new(store)
.build()
.expect("build should succeed");
let path = "test_delete.txt";
let content = b"To be deleted";
let (_, mut writer) = backend
.write(path, OpWrite::default())
.await
.expect("write should succeed");
writer
.write(Buffer::from(&content[..]))
.await
.expect("write content should succeed");
writer.close().await.expect("close should succeed");
backend
.stat(path, OpStat::default())
.await
.expect("file should exist");
let (_, mut deleter) = backend.delete().await.expect("delete should succeed");
deleter
.delete(path, OpDelete::default())
.expect("delete should succeed");
deleter.flush().await.expect("flush should succeed");
let result = backend.stat(path, OpStat::default()).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_object_store_backend_error_handling() {
let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let backend = ObjectStoreBuilder::new(store)
.build()
.expect("build should succeed");
let result = backend.stat("non_existent.txt", OpStat::default()).await;
assert!(result.is_err());
let result = backend.read("non_existent.txt", OpRead::default()).await;
assert!(result.is_err());
let result = backend.list("non_existent_dir/", OpList::default()).await;
if let Ok((_, mut lister)) = result {
let entry = lister.next().await.expect("next should succeed");
assert!(entry.is_none());
}
}
}