use std::fmt::{self, Debug, Display, Formatter};
use std::future::IntoFuture;
use std::io;
use std::sync::Arc;
use crate::utils::*;
use crate::{datetime_to_timestamp, timestamp_to_datetime};
use async_trait::async_trait;
use futures::FutureExt;
use futures::StreamExt;
use futures::TryStreamExt;
use futures::stream::BoxStream;
use object_store::ListResult;
use object_store::MultipartUpload;
use object_store::ObjectMeta;
use object_store::ObjectStore;
use object_store::PutMultipartOptions;
use object_store::PutOptions;
use object_store::PutPayload;
use object_store::PutResult;
use object_store::path::Path;
use object_store::{GetOptions, UploadPart};
use object_store::{GetRange, GetResultPayload};
use object_store::{GetResult, PutMode};
use opendal::Buffer;
use opendal::Writer;
use opendal::options::CopyOptions;
use opendal::raw::percent_decode_path;
use opendal::{Operator, OperatorInfo};
use std::collections::HashMap;
use tokio::sync::{Mutex, Notify};
#[derive(Clone)]
pub struct OpendalStore {
info: Arc<OperatorInfo>,
inner: Operator,
}
impl OpendalStore {
pub fn new(op: Operator) -> Self {
Self {
info: op.info().into(),
inner: op,
}
}
pub fn info(&self) -> &OperatorInfo {
self.info.as_ref()
}
async fn copy_request(
&self,
from: &Path,
to: &Path,
if_not_exists: bool,
) -> object_store::Result<()> {
let mut copy_options = CopyOptions::default();
if if_not_exists {
copy_options.if_not_exists = true;
}
self.inner
.copy_options(
&percent_decode_path(from.as_ref()),
&percent_decode_path(to.as_ref()),
copy_options,
)
.into_send()
.await
.map_err(|err| {
if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists {
object_store::Error::AlreadyExists {
path: to.to_string(),
source: Box::new(err),
}
} else {
format_object_store_error(err, from.as_ref())
}
})?;
Ok(())
}
}
impl Debug for OpendalStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OpendalStore")
.field("scheme", &self.info.scheme())
.field("name", &self.info.name())
.field("root", &self.info.root())
.field("capability", &self.info.full_capability())
.finish()
}
}
impl Display for OpendalStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let info = self.inner.info();
write!(
f,
"Opendal({}, bucket={}, root={})",
info.scheme(),
info.name(),
info.root()
)
}
}
impl From<Operator> for OpendalStore {
fn from(value: Operator) -> Self {
Self::new(value)
}
}
#[async_trait]
impl ObjectStore for OpendalStore {
async fn put_opts(
&self,
location: &Path,
bytes: PutPayload,
opts: PutOptions,
) -> object_store::Result<PutResult> {
let decoded_location = percent_decode_path(location.as_ref());
let mut future_write = self
.inner
.write_with(&decoded_location, Buffer::from_iter(bytes));
let opts_mode = opts.mode.clone();
match opts.mode {
PutMode::Overwrite => {}
PutMode::Create => {
future_write = future_write.if_not_exists(true);
}
PutMode::Update(update_version) => {
let Some(etag) = update_version.e_tag else {
Err(object_store::Error::NotSupported {
source: Box::new(opendal::Error::new(
opendal::ErrorKind::Unsupported,
"etag is required for conditional put",
)),
})?
};
future_write = future_write.if_match(etag.as_str());
}
}
let rp = future_write.into_send().await.map_err(|err| {
match format_object_store_error(err, location.as_ref()) {
object_store::Error::Precondition { path, source }
if opts_mode == PutMode::Create =>
{
object_store::Error::AlreadyExists { path, source }
}
e => e,
}
})?;
let e_tag = rp.etag().map(|s| s.to_string());
let version = rp.version().map(|s| s.to_string());
Ok(PutResult { e_tag, version })
}
async fn put_multipart(
&self,
location: &Path,
) -> object_store::Result<Box<dyn MultipartUpload>> {
let decoded_location = percent_decode_path(location.as_ref());
let writer = self
.inner
.writer_with(&decoded_location)
.concurrent(8)
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
let upload = OpendalMultipartUpload::new(writer, location.clone());
Ok(Box::new(upload))
}
async fn put_multipart_opts(
&self,
location: &Path,
opts: PutMultipartOptions,
) -> object_store::Result<Box<dyn MultipartUpload>> {
const DEFAULT_CONCURRENT: usize = 8;
let mut options = opendal::options::WriteOptions {
concurrent: DEFAULT_CONCURRENT,
..Default::default()
};
let mut user_metadata = HashMap::new();
for (key, value) in opts.attributes.iter() {
match key {
object_store::Attribute::CacheControl => {
options.cache_control = Some(value.to_string());
}
object_store::Attribute::ContentDisposition => {
options.content_disposition = Some(value.to_string());
}
object_store::Attribute::ContentEncoding => {
options.content_encoding = Some(value.to_string());
}
object_store::Attribute::ContentLanguage => {
continue;
}
object_store::Attribute::ContentType => {
options.content_type = Some(value.to_string());
}
object_store::Attribute::Metadata(k) => {
user_metadata.insert(k.to_string(), value.to_string());
}
_ => {}
}
}
if !user_metadata.is_empty() {
options.user_metadata = Some(user_metadata);
}
let decoded_location = percent_decode_path(location.as_ref());
let writer = self
.inner
.writer_options(&decoded_location, options)
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
let upload = OpendalMultipartUpload::new(writer, location.clone());
Ok(Box::new(upload))
}
async fn get_opts(
&self,
location: &Path,
options: GetOptions,
) -> object_store::Result<GetResult> {
let raw_location = percent_decode_path(location.as_ref());
let meta = {
let mut s = self.inner.stat_with(&raw_location);
if let Some(version) = &options.version {
s = s.version(version.as_str())
}
if let Some(if_match) = &options.if_match {
s = s.if_match(if_match.as_str());
}
if let Some(if_none_match) = &options.if_none_match {
s = s.if_none_match(if_none_match.as_str());
}
if let Some(if_modified_since) =
options.if_modified_since.and_then(datetime_to_timestamp)
{
s = s.if_modified_since(if_modified_since);
}
if let Some(if_unmodified_since) =
options.if_unmodified_since.and_then(datetime_to_timestamp)
{
s = s.if_unmodified_since(if_unmodified_since);
}
s.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?
};
let mut attributes = object_store::Attributes::new();
if let Some(user_meta) = meta.user_metadata() {
for (key, value) in user_meta {
attributes.insert(
object_store::Attribute::Metadata(key.clone().into()),
value.clone().into(),
);
}
}
let meta = ObjectMeta {
location: location.clone(),
last_modified: meta
.last_modified()
.and_then(timestamp_to_datetime)
.unwrap_or_default(),
size: meta.content_length(),
e_tag: meta.etag().map(|x| x.to_string()),
version: meta.version().map(|x| x.to_string()),
};
if options.head {
return Ok(GetResult {
payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
range: 0..0,
meta,
attributes,
});
}
let reader = {
let mut r = self.inner.reader_with(raw_location.as_ref());
if let Some(version) = options.version {
r = r.version(version.as_str());
}
if let Some(if_match) = options.if_match {
r = r.if_match(if_match.as_str());
}
if let Some(if_none_match) = options.if_none_match {
r = r.if_none_match(if_none_match.as_str());
}
if let Some(if_modified_since) =
options.if_modified_since.and_then(datetime_to_timestamp)
{
r = r.if_modified_since(if_modified_since);
}
if let Some(if_unmodified_since) =
options.if_unmodified_since.and_then(datetime_to_timestamp)
{
r = r.if_unmodified_since(if_unmodified_since);
}
r.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?
};
let read_range = match options.range {
Some(GetRange::Bounded(r)) => {
if r.start >= r.end || r.start >= meta.size {
0..0
} else {
let end = r.end.min(meta.size);
r.start..end
}
}
Some(GetRange::Offset(r)) => {
if r < meta.size {
r..meta.size
} else {
0..0
}
}
Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size,
_ => 0..meta.size,
};
let stream = reader
.into_bytes_stream(read_range.start..read_range.end)
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?
.into_send()
.map_err(|err: io::Error| object_store::Error::Generic {
store: "IoError",
source: Box::new(err),
});
Ok(GetResult {
payload: GetResultPayload::Stream(Box::pin(stream)),
range: read_range.start..read_range.end,
meta,
attributes,
})
}
async fn delete(&self, location: &Path) -> object_store::Result<()> {
let decoded_location = percent_decode_path(location.as_ref());
self.inner
.delete(&decoded_location)
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
Ok(())
}
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
let path = prefix.map_or("".into(), |x| {
format!("{}/", percent_decode_path(x.as_ref()))
});
let this = self.clone();
let fut = async move {
let stream = this
.inner
.lister_with(&path)
.recursive(true)
.await
.map_err(|err| format_object_store_error(err, &path))?;
let stream = stream.then(|res| async {
let entry = res.map_err(|err| format_object_store_error(err, ""))?;
let meta = entry.metadata();
Ok(format_object_meta(entry.path(), meta))
});
Ok::<_, object_store::Error>(stream)
};
fut.into_stream().try_flatten().into_send().boxed()
}
fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
let path = prefix.map_or("".into(), |x| {
format!("{}/", percent_decode_path(x.as_ref()))
});
let offset = offset.clone();
let this = self.clone();
let fut = async move {
let list_with_start_after = this.inner.info().full_capability().list_with_start_after;
let mut fut = this.inner.lister_with(&path).recursive(true);
if list_with_start_after {
fut = fut.start_after(offset.as_ref());
}
let lister = fut
.await
.map_err(|err| format_object_store_error(err, &path))?
.then(move |entry| {
let path = path.clone();
let this = this.clone();
async move {
let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
let (path, metadata) = entry.into_parts();
if metadata.is_dir() || metadata.last_modified().is_some() {
let object_meta = format_object_meta(&path, &metadata);
return Ok(object_meta);
}
let metadata = this
.inner
.stat(&path)
.await
.map_err(|err| format_object_store_error(err, &path))?;
let object_meta = format_object_meta(&path, &metadata);
Ok::<_, object_store::Error>(object_meta)
}
})
.into_send()
.boxed();
let stream = if list_with_start_after {
lister
} else {
lister
.try_filter(move |entry| futures::future::ready(entry.location > offset))
.into_send()
.boxed()
};
Ok::<_, object_store::Error>(stream)
};
fut.into_stream().into_send().try_flatten().boxed()
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
let path = prefix.map_or("".into(), |x| {
format!("{}/", percent_decode_path(x.as_ref()))
});
let mut stream = self
.inner
.lister_with(&path)
.into_future()
.into_send()
.await
.map_err(|err| format_object_store_error(err, &path))?
.into_send();
let mut common_prefixes = Vec::new();
let mut objects = Vec::new();
while let Some(res) = stream.next().into_send().await {
let entry = res.map_err(|err| format_object_store_error(err, ""))?;
let meta = entry.metadata();
if meta.is_dir() {
common_prefixes.push(entry.path().into());
} else if meta.last_modified().is_some() {
objects.push(format_object_meta(entry.path(), meta));
} else {
let meta = self
.inner
.stat(entry.path())
.into_send()
.await
.map_err(|err| format_object_store_error(err, entry.path()))?;
objects.push(format_object_meta(entry.path(), &meta));
}
}
Ok(ListResult {
common_prefixes,
objects,
})
}
async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.copy_request(from, to, false).await
}
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.copy_request(from, to, true).await
}
async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner
.rename(
&percent_decode_path(from.as_ref()),
&percent_decode_path(to.as_ref()),
)
.into_send()
.await
.map_err(|err| format_object_store_error(err, from.as_ref()))?;
Ok(())
}
}
struct OpendalMultipartUpload {
writer: Arc<Mutex<Writer>>,
location: Path,
next_notify: Option<Arc<Notify>>,
}
impl OpendalMultipartUpload {
fn new(writer: Writer, location: Path) -> Self {
Self {
writer: Arc::new(Mutex::new(writer)),
location,
next_notify: None,
}
}
}
#[async_trait]
impl MultipartUpload for OpendalMultipartUpload {
fn put_part(&mut self, data: PutPayload) -> UploadPart {
let writer = self.writer.clone();
let location = self.location.clone();
let next_notify = Arc::new(Notify::new());
let current_notify = self.next_notify.replace(next_notify.clone());
async move {
if let Some(notify) = current_notify {
notify.notified().await;
}
let mut writer = writer.lock().await;
let result = writer
.write(Buffer::from_iter(data.into_iter()))
.await
.map_err(|err| format_object_store_error(err, location.as_ref()));
next_notify.notify_one();
result
}
.into_send()
.boxed()
}
async fn complete(&mut self) -> object_store::Result<PutResult> {
let mut writer = self.writer.lock().await;
let metadata = writer
.close()
.into_send()
.await
.map_err(|err| format_object_store_error(err, self.location.as_ref()))?;
let e_tag = metadata.etag().map(|s| s.to_string());
let version = metadata.version().map(|s| s.to_string());
Ok(PutResult { e_tag, version })
}
async fn abort(&mut self) -> object_store::Result<()> {
let mut writer = self.writer.lock().await;
writer
.abort()
.into_send()
.await
.map_err(|err| format_object_store_error(err, self.location.as_ref()))
}
}
impl Debug for OpendalMultipartUpload {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("OpendalMultipartUpload")
.field("location", &self.location)
.finish()
}
}
#[cfg(test)]
mod tests {
use bytes::Bytes;
use object_store::path::Path;
use object_store::{ObjectStore, WriteMultipart};
use opendal::services;
use rand::prelude::*;
use std::sync::Arc;
use super::*;
async fn create_test_object_store() -> Arc<dyn ObjectStore> {
let op = Operator::new(services::Memory::default()).unwrap().finish();
let object_store = Arc::new(OpendalStore::new(op));
let path: Path = "data/test.txt".into();
let bytes = Bytes::from_static(b"hello, world!");
object_store.put(&path, bytes.into()).await.unwrap();
let path: Path = "data/nested/test.txt".into();
let bytes = Bytes::from_static(b"hello, world! I am nested.");
object_store.put(&path, bytes.into()).await.unwrap();
object_store
}
#[tokio::test]
async fn test_basic() {
let op = Operator::new(services::Memory::default()).unwrap().finish();
let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
let path: Path = "data/test.txt".into();
let bytes = Bytes::from_static(b"hello, world!");
object_store.put(&path, bytes.clone().into()).await.unwrap();
let meta = object_store.head(&path).await.unwrap();
assert_eq!(meta.size, 13);
assert_eq!(
object_store
.get(&path)
.await
.unwrap()
.bytes()
.await
.unwrap(),
bytes
);
}
#[tokio::test]
async fn test_put_multipart() {
let op = Operator::new(services::Memory::default()).unwrap().finish();
let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
let mut rng = thread_rng();
let path: Path = "data/test_complete.txt".into();
let upload = object_store.put_multipart(&path).await.unwrap();
let mut write = WriteMultipart::new(upload);
let mut all_bytes = vec![];
let round = rng.gen_range(1..=1024);
for _ in 0..round {
let size = rng.gen_range(1..=1024);
let mut bytes = vec![0; size];
rng.fill_bytes(&mut bytes);
all_bytes.extend_from_slice(&bytes);
write.put(bytes.into());
}
let _ = write.finish().await.unwrap();
let meta = object_store.head(&path).await.unwrap();
assert_eq!(meta.size, all_bytes.len() as u64);
assert_eq!(
object_store
.get(&path)
.await
.unwrap()
.bytes()
.await
.unwrap(),
Bytes::from(all_bytes)
);
let path: Path = "data/test_abort.txt".into();
let mut upload = object_store.put_multipart(&path).await.unwrap();
upload.put_part(vec![1; 1024].into()).await.unwrap();
upload.abort().await.unwrap();
let res = object_store.head(&path).await;
let err = res.unwrap_err();
assert!(matches!(err, object_store::Error::NotFound { .. }))
}
#[tokio::test]
async fn test_list() {
let object_store = create_test_object_store().await;
let path: Path = "data/".into();
let results = object_store.list(Some(&path)).collect::<Vec<_>>().await;
assert_eq!(results.len(), 2);
let mut locations = results
.iter()
.map(|x| x.as_ref().unwrap().location.as_ref())
.collect::<Vec<_>>();
let expected_files = vec![
(
"data/nested/test.txt",
Bytes::from_static(b"hello, world! I am nested."),
),
("data/test.txt", Bytes::from_static(b"hello, world!")),
];
let expected_locations = expected_files.iter().map(|x| x.0).collect::<Vec<&str>>();
locations.sort();
assert_eq!(locations, expected_locations);
for (location, bytes) in expected_files {
let path: Path = location.into();
assert_eq!(
object_store
.get(&path)
.await
.unwrap()
.bytes()
.await
.unwrap(),
bytes
);
}
}
#[tokio::test]
async fn test_list_with_delimiter() {
let object_store = create_test_object_store().await;
let path: Path = "data/".into();
let result = object_store.list_with_delimiter(Some(&path)).await.unwrap();
assert_eq!(result.objects.len(), 1);
assert_eq!(result.common_prefixes.len(), 1);
assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
}
#[tokio::test]
async fn test_list_with_offset() {
let object_store = create_test_object_store().await;
let path: Path = "data/".into();
let offset: Path = "data/nested/test.txt".into();
let result = object_store
.list_with_offset(Some(&path), &offset)
.collect::<Vec<_>>()
.await;
assert_eq!(result.len(), 1);
assert_eq!(
result[0].as_ref().unwrap().location.as_ref(),
"data/test.txt"
);
}
}