use std::ops::Range;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use async_trait::async_trait;
use bytes::Bytes;
use chrono::DateTime;
use chrono::NaiveDateTime;
use chrono::Utc;
use futures::stream::BoxStream;
use futures::Stream;
use object_store::path::Path;
use object_store::GetResult;
use object_store::ListResult;
use object_store::MultipartId;
use object_store::ObjectMeta;
use object_store::ObjectStore;
use object_store::Result;
use opendal::Operator;
use opendal::Reader;
use tokio::io::AsyncWrite;
#[derive(Debug)]
pub struct OpendalStore {
inner: Operator,
}
impl OpendalStore {
pub fn new(op: Operator) -> Self {
Self { inner: op }
}
}
impl std::fmt::Display for OpendalStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "OpenDAL({:?})", self.inner)
}
}
#[async_trait]
impl ObjectStore for OpendalStore {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
Ok(self
.inner
.write(location.as_ref(), bytes)
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?)
}
async fn put_multipart(
&self,
_location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
Err(object_store::Error::NotSupported {
source: Box::new(opendal::Error::new(
opendal::ErrorKind::Unsupported,
"put_multipart is not implemented so far",
)),
})
}
async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> {
Err(object_store::Error::NotSupported {
source: Box::new(opendal::Error::new(
opendal::ErrorKind::Unsupported,
"abort_multipart is not implemented so far",
)),
})
}
async fn get(&self, location: &Path) -> Result<GetResult> {
let r = self
.inner
.reader(location.as_ref())
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
Ok(GetResult::Stream(Box::pin(OpendalReader { inner: r })))
}
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
let bs = self
.inner
.range_read(location.as_ref(), range.start as u64..range.end as u64)
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
Ok(Bytes::from(bs))
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let meta = self
.inner
.stat(location.as_ref())
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
let (secs, nsecs) = meta
.last_modified()
.map(|v| (v.unix_timestamp(), v.nanosecond()))
.unwrap_or((0, 0));
Ok(ObjectMeta {
location: location.clone(),
last_modified: DateTime::from_utc(
NaiveDateTime::from_timestamp_opt(secs, nsecs)
.expect("returning timestamp must be valid"),
Utc,
),
size: meta.content_length() as usize,
})
}
async fn delete(&self, location: &Path) -> Result<()> {
self.inner
.delete(location.as_ref())
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
Ok(())
}
async fn list(&self, _prefix: Option<&Path>) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
todo!()
}
async fn list_with_delimiter(&self, _prefix: Option<&Path>) -> Result<ListResult> {
todo!()
}
async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
Err(object_store::Error::NotSupported {
source: Box::new(opendal::Error::new(
opendal::ErrorKind::Unsupported,
"copy is not implemented so far",
)),
})
}
async fn rename(&self, _from: &Path, _to: &Path) -> Result<()> {
Err(object_store::Error::NotSupported {
source: Box::new(opendal::Error::new(
opendal::ErrorKind::Unsupported,
"rename is not implemented so far",
)),
})
}
async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
Err(object_store::Error::NotSupported {
source: Box::new(opendal::Error::new(
opendal::ErrorKind::Unsupported,
"copy_if_not_exists is not implemented so far",
)),
})
}
}
fn format_object_store_error(err: opendal::Error, path: &str) -> object_store::Error {
use opendal::ErrorKind;
match err.kind() {
ErrorKind::NotFound => object_store::Error::NotFound {
path: path.to_string(),
source: Box::new(err),
},
ErrorKind::Unsupported => object_store::Error::NotSupported {
source: Box::new(err),
},
ErrorKind::AlreadyExists => object_store::Error::AlreadyExists {
path: path.to_string(),
source: Box::new(err),
},
kind => object_store::Error::Generic {
store: kind.into_static(),
source: Box::new(err),
},
}
}
struct OpendalReader {
inner: Reader,
}
impl Stream for OpendalReader {
type Item = Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use opendal::raw::oio::Read;
self.inner
.poll_next(cx)
.map_err(|err| object_store::Error::Generic {
store: "IoError",
source: Box::new(err),
})
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use object_store::path::Path;
use object_store::ObjectStore;
use opendal::services;
use super::*;
#[tokio::test]
async fn test_basic() {
let op = Operator::new(services::Memory::default()).unwrap().finish();
let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
let path: Path = "data/test.txt".try_into().unwrap();
let bytes = Bytes::from_static(b"hello, world!");
object_store.put(&path, bytes).await.unwrap();
let meta = object_store.head(&path).await.unwrap();
assert_eq!(meta.size, 13)
}
}