Skip to main content

aliyun_oss_object_store/
lib.rs

1use std::{fmt::Display, io::Cursor, sync::Arc};
2
3use aliyun_oss_client::{Bucket, Client, EndPoint, Error as OssError, Key, Object, Secret};
4use async_trait::async_trait;
5use bytes::Bytes;
6use futures_util::{stream::BoxStream, StreamExt as _};
7use object_store::{
8    path::Path, Attribute, Attributes, CopyMode, CopyOptions, Error, GetOptions, GetResult,
9    GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions,
10    PutOptions, PutPayload, PutResult, Result,
11};
12
13mod list;
14mod multipart;
15mod put_payload;
16use put_payload::BuiltinPutPayload;
17
18#[derive(Debug)]
19pub struct AliyunOssObjectStore {
20    bucket: Bucket,
21}
22
23impl Display for AliyunOssObjectStore {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        write!(f, "AliyunOssObjectStore")
26    }
27}
28
29const STORE: &str = "AliyunOssObjectStore";
30
31/// 将 [`OssError`] 转为 [`Error`],用于 `Client::from_env`、`bucket` 等尚未涉及对象路径的场景。
32pub fn map_oss_error(err: OssError) -> Error {
33    map_oss_error_at(err, None)
34}
35
36fn map_oss_error_at(err: OssError, path: Option<&Path>) -> Error {
37    match (err.service_code(), path) {
38        (Some("NoSuchKey"), Some(path)) => Error::NotFound {
39            path: path.to_string(),
40            source: Box::new(err),
41        },
42        _ => Error::Generic {
43            store: STORE,
44            source: Box::new(err),
45        },
46    }
47}
48
49fn to_object_store_error(err: OssError, path: &Path) -> Error {
50    map_oss_error_at(err, Some(path))
51}
52
53impl AliyunOssObjectStore {
54    pub fn new(bucket: Bucket) -> Self {
55        Self { bucket }
56    }
57
58    /// 使用 AccessKey、Secret 与 Endpoint 打开指定 Bucket。
59    pub fn try_new<K, S, E>(
60        key: K,
61        secret: S,
62        endpoint: E,
63        bucket: impl AsRef<str>,
64    ) -> Result<Self, Error>
65    where
66        K: Into<Key>,
67        S: Into<Secret>,
68        E: TryInto<EndPoint>,
69        OssError: From<E::Error>,
70    {
71        let client = Client::new(key, secret, endpoint).map_err(map_oss_error)?;
72        Self::try_from_client(client, bucket)
73    }
74
75    /// 从环境变量(`ALIYUN_KEY_ID`、`ALIYUN_KEY_SECRET`、`ALIYUN_ENDPOINT`)读取凭证并打开 Bucket。
76    pub fn try_from_env(bucket: impl AsRef<str>) -> Result<Self, Error> {
77        let client = Client::from_env().map_err(|e| map_oss_error(e.into()))?;
78        Self::try_from_client(client, bucket)
79    }
80
81    /// 使用已有的 [`Client`] 打开指定 Bucket(适用于 STS 等自定义构造方式)。
82    pub fn try_from_client(client: Client, bucket: impl AsRef<str>) -> Result<Self, Error> {
83        let bucket = client.bucket(bucket.as_ref()).map_err(map_oss_error)?;
84        Ok(Self::new(bucket))
85    }
86
87    pub fn object(&self, path: &Path) -> Object {
88        Object::new(path.to_string(), Arc::new(self.bucket.clone()))
89    }
90}
91
92#[async_trait]
93impl ObjectStore for AliyunOssObjectStore {
94    async fn put_opts(
95        &self,
96        location: &Path,
97        payload: PutPayload,
98        opts: PutOptions,
99    ) -> Result<PutResult> {
100        let mut object = self.object(location);
101
102        if let Some(content_type) = opts.attributes.get(&Attribute::ContentType) {
103            object = object.content_type(content_type.as_ref());
104        }
105
106        let etag = object
107            .upload_with_etag(BuiltinPutPayload::new(payload))
108            .await
109            .map_err(|e| to_object_store_error(e, location))?;
110
111        Ok(PutResult {
112            e_tag: Some(etag),
113            version: None,
114        })
115    }
116
117    async fn put_multipart_opts(
118        &self,
119        location: &Path,
120        _opts: PutMultipartOptions,
121    ) -> Result<Box<dyn MultipartUpload>> {
122        let upload =
123            multipart::OssMultipartUpload::new(location.clone(), Arc::new(self.bucket.clone()))
124                .await?;
125        Ok(Box::new(upload))
126    }
127
128    async fn get_opts(&self, location: &Path, opts: GetOptions) -> Result<GetResult> {
129        let object = self.object(location);
130
131        let info = object
132            .get_info()
133            .await
134            .map_err(|e| to_object_store_error(e, location))?;
135
136        let meta = ObjectMeta {
137            location: location.clone(),
138            last_modified: *info.last_modified(),
139            size: info.size(),
140            e_tag: Some(info.etag().to_string()),
141            version: None,
142        };
143
144        opts.check_preconditions(&meta)?;
145
146        if opts.version.is_some() {
147            return Err(Error::NotImplemented {
148                operation: "get with version".into(),
149                implementer: "AliyunOssObjectStore".into(),
150            });
151        }
152
153        let range = match &opts.range {
154            Some(r) => r.as_range(meta.size).map_err(|source| Error::Generic {
155                store: "AliyunOssObjectStore",
156                source: Box::new(source),
157            })?,
158            None => 0..meta.size,
159        };
160
161        if opts.head {
162            return Ok(GetResult {
163                payload: GetResultPayload::Stream(futures_util::stream::empty().boxed()),
164                meta,
165                range,
166                attributes: Attributes::new(),
167            });
168        }
169
170        let stream = if range == (0..meta.size) {
171            object
172                .download_stream()
173                .await
174                .map_err(|e| to_object_store_error(e, location))?
175                .map(|chunk| {
176                    chunk.map_err(|e| Error::Generic {
177                        store: "AliyunOssObjectStore",
178                        source: Box::new(e),
179                    })
180                })
181                .boxed()
182        } else {
183            let mut buf = Cursor::new(Vec::with_capacity(meta.size as usize));
184            object
185                .download(&mut buf)
186                .await
187                .map_err(|e| to_object_store_error(e, location))?;
188
189            let bytes = Bytes::from(buf.into_inner());
190            let start = range.start as usize;
191            let end = range.end as usize;
192            let data = bytes.slice(start..end.min(bytes.len()));
193            futures_util::stream::once(futures_util::future::ready(Ok(data))).boxed()
194        };
195
196        Ok(GetResult {
197            payload: GetResultPayload::Stream(stream),
198            meta,
199            range,
200            attributes: Attributes::new(),
201        })
202    }
203
204    fn delete_stream(
205        &self,
206        locations: BoxStream<'static, Result<Path, Error>>,
207    ) -> BoxStream<'static, Result<Path, Error>> {
208        let bucket = self.bucket.clone();
209        locations
210            .map(move |location| {
211                let bucket = bucket.clone();
212                async move {
213                    let location = location?;
214                    Object::new(location.to_string(), Arc::new(bucket))
215                        .delete()
216                        .await
217                        .map_err(|e| to_object_store_error(e, &location))?;
218                    Ok(location)
219                }
220            })
221            .buffered(10)
222            .boxed()
223    }
224
225    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
226        use async_stream::try_stream;
227        use list::{should_include, to_meta, ListedObject};
228
229        let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default();
230        let prefix_filter = prefix.cloned();
231
232        let mut bucket = self.bucket.clone();
233        if let Some(ref p) = prefix_filter {
234            bucket = bucket.prefix(p.as_ref());
235        }
236
237        try_stream! {
238            let mut objects = std::pin::pin!(bucket.objects_as_impl::<ListedObject>());
239            while let Some(item) = objects.next().await {
240                let obj = item.map_err(|e| Error::Generic {
241                    store: "AliyunOssObjectStore",
242                    source: Box::new(e),
243                })?;
244
245                let Some(meta) = to_meta(obj)? else {
246                    continue;
247                };
248
249                if should_include(&meta.location, prefix_filter.as_ref(), prefix_len) {
250                    yield meta;
251                }
252            }
253        }
254        .boxed()
255    }
256
257    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
258        list::fetch_list_with_delimiter(self.bucket.clone(), prefix).await
259    }
260
261    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
262        if matches!(options.mode, CopyMode::Create) {
263            if self.object(to).get_info().await.is_ok() {
264                return Err(Error::AlreadyExists {
265                    path: to.to_string(),
266                    source: "destination object already exists".into(),
267                });
268            }
269        }
270
271        let copy_source = format!("/{}/{}", self.bucket.name(), from.as_ref());
272
273        self.object(to)
274            .copy_source(copy_source)
275            .copy()
276            .await
277            .map_err(|e| to_object_store_error(e, from))?;
278
279        Ok(())
280    }
281}