//! aliyun-oss-object-store 0.0.2
//!
//! Object store implementation for aliyun-oss-client.
use std::{fmt::Display, io::Cursor, sync::Arc};

use aliyun_oss_client::{Bucket, Client, EndPoint, Error as OssError, Key, Object, Secret};
use async_trait::async_trait;
use bytes::Bytes;
use futures_util::{stream::BoxStream, StreamExt as _};
use object_store::{
    path::Path, Attribute, Attributes, CopyMode, CopyOptions, Error, GetOptions, GetResult,
    GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions,
    PutOptions, PutPayload, PutResult, Result,
};

mod list;
mod multipart;
mod put_payload;
use put_payload::BuiltinPutPayload;

/// [`ObjectStore`] implementation that operates on a single Aliyun OSS [`Bucket`].
#[derive(Debug)]
pub struct AliyunOssObjectStore {
    /// Bucket handle that every operation of this store is issued against.
    bucket: Bucket,
}

/// Renders the store as its fixed type name; no per-instance detail is included.
impl Display for AliyunOssObjectStore {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str("AliyunOssObjectStore")
    }
}

/// Store name placed in the `store` field of the [`Error::Generic`] values built by
/// `map_oss_error_at`.
const STORE: &str = "AliyunOssObjectStore";

/// Converts an [`OssError`] into an [`Error`] for situations that do not yet involve an
/// object path, such as `Client::from_env` or `bucket`.
///
/// Delegates to `map_oss_error_at` with no path, so the result is never path-specific.
pub fn map_oss_error(err: OssError) -> Error {
    map_oss_error_at(err, None)
}

fn map_oss_error_at(err: OssError, path: Option<&Path>) -> Error {
    match (err.service_code(), path) {
        (Some("NoSuchKey"), Some(path)) => Error::NotFound {
            path: path.to_string(),
            source: Box::new(err),
        },
        _ => Error::Generic {
            store: STORE,
            source: Box::new(err),
        },
    }
}

/// Converts an [`OssError`] raised while operating on `path` into an [`Error`].
///
/// Shorthand for `map_oss_error_at(err, Some(path))`, which lets a `NoSuchKey` service
/// code surface as [`Error::NotFound`] for that path.
fn to_object_store_error(err: OssError, path: &Path) -> Error {
    map_oss_error_at(err, Some(path))
}

impl AliyunOssObjectStore {
    pub fn new(bucket: Bucket) -> Self {
        Self { bucket }
    }

    /// 使用 AccessKey、Secret 与 Endpoint 打开指定 Bucket。
    pub fn try_new<K, S, E>(
        key: K,
        secret: S,
        endpoint: E,
        bucket: impl AsRef<str>,
    ) -> Result<Self, Error>
    where
        K: Into<Key>,
        S: Into<Secret>,
        E: TryInto<EndPoint>,
        OssError: From<E::Error>,
    {
        let client = Client::new(key, secret, endpoint).map_err(map_oss_error)?;
        Self::try_from_client(client, bucket)
    }

    /// 从环境变量(`ALIYUN_KEY_ID`、`ALIYUN_KEY_SECRET`、`ALIYUN_ENDPOINT`)读取凭证并打开 Bucket。
    pub fn try_from_env(bucket: impl AsRef<str>) -> Result<Self, Error> {
        let client = Client::from_env().map_err(|e| map_oss_error(e.into()))?;
        Self::try_from_client(client, bucket)
    }

    /// 使用已有的 [`Client`] 打开指定 Bucket(适用于 STS 等自定义构造方式)。
    pub fn try_from_client(client: Client, bucket: impl AsRef<str>) -> Result<Self, Error> {
        let bucket = client.bucket(bucket.as_ref()).map_err(map_oss_error)?;
        Ok(Self::new(bucket))
    }

    pub fn object(&self, path: &Path) -> Object {
        Object::new(path.to_string(), Arc::new(self.bucket.clone()))
    }
}

#[async_trait]
impl ObjectStore for AliyunOssObjectStore {
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: PutOptions,
    ) -> Result<PutResult> {
        let mut object = self.object(location);

        if let Some(content_type) = opts.attributes.get(&Attribute::ContentType) {
            object = object.content_type(content_type.as_ref());
        }

        let etag = object
            .upload_with_etag(BuiltinPutPayload::new(payload))
            .await
            .map_err(|e| to_object_store_error(e, location))?;

        Ok(PutResult {
            e_tag: Some(etag),
            version: None,
        })
    }

    /// Starts a multipart upload targeting `location`.
    ///
    /// The supplied options are currently not consulted. Errors from creating the
    /// `OssMultipartUpload` are propagated with `?`.
    async fn put_multipart_opts(
        &self,
        location: &Path,
        _opts: PutMultipartOptions,
    ) -> Result<Box<dyn MultipartUpload>> {
        let target = location.clone();
        let bucket = Arc::new(self.bucket.clone());
        let session = multipart::OssMultipartUpload::new(target, bucket).await?;
        Ok(Box::new(session))
    }

    async fn get_opts(&self, location: &Path, opts: GetOptions) -> Result<GetResult> {
        let object = self.object(location);

        let info = object
            .get_info()
            .await
            .map_err(|e| to_object_store_error(e, location))?;

        let meta = ObjectMeta {
            location: location.clone(),
            last_modified: *info.last_modified(),
            size: info.size(),
            e_tag: Some(info.etag().to_string()),
            version: None,
        };

        opts.check_preconditions(&meta)?;

        if opts.version.is_some() {
            return Err(Error::NotImplemented {
                operation: "get with version".into(),
                implementer: "AliyunOssObjectStore".into(),
            });
        }

        let range = match &opts.range {
            Some(r) => r.as_range(meta.size).map_err(|source| Error::Generic {
                store: "AliyunOssObjectStore",
                source: Box::new(source),
            })?,
            None => 0..meta.size,
        };

        if opts.head {
            return Ok(GetResult {
                payload: GetResultPayload::Stream(futures_util::stream::empty().boxed()),
                meta,
                range,
                attributes: Attributes::new(),
            });
        }

        let stream = if range == (0..meta.size) {
            object
                .download_stream()
                .await
                .map_err(|e| to_object_store_error(e, location))?
                .map(|chunk| {
                    chunk.map_err(|e| Error::Generic {
                        store: "AliyunOssObjectStore",
                        source: Box::new(e),
                    })
                })
                .boxed()
        } else {
            let mut buf = Cursor::new(Vec::with_capacity(meta.size as usize));
            object
                .download(&mut buf)
                .await
                .map_err(|e| to_object_store_error(e, location))?;

            let bytes = Bytes::from(buf.into_inner());
            let start = range.start as usize;
            let end = range.end as usize;
            let data = bytes.slice(start..end.min(bytes.len()));
            futures_util::stream::once(futures_util::future::ready(Ok(data))).boxed()
        };

        Ok(GetResult {
            payload: GetResultPayload::Stream(stream),
            meta,
            range,
            attributes: Attributes::new(),
        })
    }

    /// Deletes every path produced by `locations`, yielding each path once its delete
    /// has completed.
    ///
    /// Up to a fixed number of deletes run concurrently; `buffered` yields results in
    /// the order of the input stream. An `Err` item from the input is passed through
    /// unchanged, and a failed delete is mapped through `to_object_store_error` for its
    /// path.
    fn delete_stream(
        &self,
        locations: BoxStream<'static, Result<Path, Error>>,
    ) -> BoxStream<'static, Result<Path, Error>> {
        // Upper bound on delete requests in flight at once.
        const MAX_CONCURRENT_DELETES: usize = 10;

        // Share one bucket handle across all deletes: each item only bumps a reference
        // count instead of cloning the whole `Bucket` and allocating a new `Arc`.
        let bucket = Arc::new(self.bucket.clone());
        locations
            .map(move |location| {
                let bucket = Arc::clone(&bucket);
                async move {
                    let location = location?;
                    Object::new(location.to_string(), bucket)
                        .delete()
                        .await
                        .map_err(|e| to_object_store_error(e, &location))?;
                    Ok(location)
                }
            })
            .buffered(MAX_CONCURRENT_DELETES)
            .boxed()
    }

    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
        use async_stream::try_stream;
        use list::{should_include, to_meta, ListedObject};

        let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default();
        let prefix_filter = prefix.cloned();

        let mut bucket = self.bucket.clone();
        if let Some(ref p) = prefix_filter {
            bucket = bucket.prefix(p.as_ref());
        }

        try_stream! {
            let mut objects = std::pin::pin!(bucket.objects_as_impl::<ListedObject>());
            while let Some(item) = objects.next().await {
                let obj = item.map_err(|e| Error::Generic {
                    store: "AliyunOssObjectStore",
                    source: Box::new(e),
                })?;

                let Some(meta) = to_meta(obj)? else {
                    continue;
                };

                if should_include(&meta.location, prefix_filter.as_ref(), prefix_len) {
                    yield meta;
                }
            }
        }
        .boxed()
    }

    /// Lists the entries under `prefix` using a delimiter.
    ///
    /// Delegates to `list::fetch_list_with_delimiter`, passing a clone of this store's
    /// bucket and the prefix unchanged.
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
        list::fetch_list_with_delimiter(self.bucket.clone(), prefix).await
    }

    /// Copies the object at `from` to `to` within this store's bucket.
    ///
    /// With [`CopyMode::Create`], the destination is probed first and
    /// [`Error::AlreadyExists`] is returned if the probe succeeds. Copy failures are
    /// mapped through `to_object_store_error` using the source path `from`.
    ///
    /// NOTE(review): the existence probe and the copy are two separate requests, so the
    /// create-only check is not atomic. Any probe error (not only "not found") is
    /// treated as "destination absent".
    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
        let create_only = matches!(options.mode, CopyMode::Create);

        // `&&` short-circuits, so the destination is only probed in create-only mode.
        if create_only && self.object(to).get_info().await.is_ok() {
            return Err(Error::AlreadyExists {
                path: to.to_string(),
                source: "destination object already exists".into(),
            });
        }

        // The copy source is addressed as `/<bucket>/<key>`.
        let source_key = format!("/{}/{}", self.bucket.name(), from.as_ref());

        self.object(to)
            .copy_source(source_key)
            .copy()
            .await
            .map_err(|e| to_object_store_error(e, from))?;

        Ok(())
    }
}