1use std::{fmt::Display, io::Cursor, sync::Arc};
2
3use aliyun_oss_client::{Bucket, Client, EndPoint, Error as OssError, Key, Object, Secret};
4use async_trait::async_trait;
5use bytes::Bytes;
6use futures_util::{stream::BoxStream, StreamExt as _};
7use object_store::{
8 path::Path, Attribute, Attributes, CopyMode, CopyOptions, Error, GetOptions, GetResult,
9 GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions,
10 PutOptions, PutPayload, PutResult, Result,
11};
12
13mod list;
14mod multipart;
15mod put_payload;
16use put_payload::BuiltinPutPayload;
17
18#[derive(Debug)]
19pub struct AliyunOssObjectStore {
20 bucket: Bucket,
21}
22
23impl Display for AliyunOssObjectStore {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 write!(f, "AliyunOssObjectStore")
26 }
27}
28
29const STORE: &str = "AliyunOssObjectStore";
30
31pub fn map_oss_error(err: OssError) -> Error {
33 map_oss_error_at(err, None)
34}
35
36fn map_oss_error_at(err: OssError, path: Option<&Path>) -> Error {
37 match (err.service_code(), path) {
38 (Some("NoSuchKey"), Some(path)) => Error::NotFound {
39 path: path.to_string(),
40 source: Box::new(err),
41 },
42 _ => Error::Generic {
43 store: STORE,
44 source: Box::new(err),
45 },
46 }
47}
48
49fn to_object_store_error(err: OssError, path: &Path) -> Error {
50 map_oss_error_at(err, Some(path))
51}
52
53impl AliyunOssObjectStore {
54 pub fn new(bucket: Bucket) -> Self {
55 Self { bucket }
56 }
57
58 pub fn try_new<K, S, E>(
60 key: K,
61 secret: S,
62 endpoint: E,
63 bucket: impl AsRef<str>,
64 ) -> Result<Self, Error>
65 where
66 K: Into<Key>,
67 S: Into<Secret>,
68 E: TryInto<EndPoint>,
69 OssError: From<E::Error>,
70 {
71 let client = Client::new(key, secret, endpoint).map_err(map_oss_error)?;
72 Self::try_from_client(client, bucket)
73 }
74
75 pub fn try_from_env(bucket: impl AsRef<str>) -> Result<Self, Error> {
77 let client = Client::from_env().map_err(|e| map_oss_error(e.into()))?;
78 Self::try_from_client(client, bucket)
79 }
80
81 pub fn try_from_client(client: Client, bucket: impl AsRef<str>) -> Result<Self, Error> {
83 let bucket = client.bucket(bucket.as_ref()).map_err(map_oss_error)?;
84 Ok(Self::new(bucket))
85 }
86
87 pub fn object(&self, path: &Path) -> Object {
88 Object::new(path.to_string(), Arc::new(self.bucket.clone()))
89 }
90}
91
92#[async_trait]
93impl ObjectStore for AliyunOssObjectStore {
94 async fn put_opts(
95 &self,
96 location: &Path,
97 payload: PutPayload,
98 opts: PutOptions,
99 ) -> Result<PutResult> {
100 let mut object = self.object(location);
101
102 if let Some(content_type) = opts.attributes.get(&Attribute::ContentType) {
103 object = object.content_type(content_type.as_ref());
104 }
105
106 let etag = object
107 .upload_with_etag(BuiltinPutPayload::new(payload))
108 .await
109 .map_err(|e| to_object_store_error(e, location))?;
110
111 Ok(PutResult {
112 e_tag: Some(etag),
113 version: None,
114 })
115 }
116
117 async fn put_multipart_opts(
118 &self,
119 location: &Path,
120 _opts: PutMultipartOptions,
121 ) -> Result<Box<dyn MultipartUpload>> {
122 let upload =
123 multipart::OssMultipartUpload::new(location.clone(), Arc::new(self.bucket.clone()))
124 .await?;
125 Ok(Box::new(upload))
126 }
127
128 async fn get_opts(&self, location: &Path, opts: GetOptions) -> Result<GetResult> {
129 let object = self.object(location);
130
131 let info = object
132 .get_info()
133 .await
134 .map_err(|e| to_object_store_error(e, location))?;
135
136 let meta = ObjectMeta {
137 location: location.clone(),
138 last_modified: *info.last_modified(),
139 size: info.size(),
140 e_tag: Some(info.etag().to_string()),
141 version: None,
142 };
143
144 opts.check_preconditions(&meta)?;
145
146 if opts.version.is_some() {
147 return Err(Error::NotImplemented {
148 operation: "get with version".into(),
149 implementer: "AliyunOssObjectStore".into(),
150 });
151 }
152
153 let range = match &opts.range {
154 Some(r) => r.as_range(meta.size).map_err(|source| Error::Generic {
155 store: "AliyunOssObjectStore",
156 source: Box::new(source),
157 })?,
158 None => 0..meta.size,
159 };
160
161 if opts.head {
162 return Ok(GetResult {
163 payload: GetResultPayload::Stream(futures_util::stream::empty().boxed()),
164 meta,
165 range,
166 attributes: Attributes::new(),
167 });
168 }
169
170 let stream = if range == (0..meta.size) {
171 object
172 .download_stream()
173 .await
174 .map_err(|e| to_object_store_error(e, location))?
175 .map(|chunk| {
176 chunk.map_err(|e| Error::Generic {
177 store: "AliyunOssObjectStore",
178 source: Box::new(e),
179 })
180 })
181 .boxed()
182 } else {
183 let mut buf = Cursor::new(Vec::with_capacity(meta.size as usize));
184 object
185 .download(&mut buf)
186 .await
187 .map_err(|e| to_object_store_error(e, location))?;
188
189 let bytes = Bytes::from(buf.into_inner());
190 let start = range.start as usize;
191 let end = range.end as usize;
192 let data = bytes.slice(start..end.min(bytes.len()));
193 futures_util::stream::once(futures_util::future::ready(Ok(data))).boxed()
194 };
195
196 Ok(GetResult {
197 payload: GetResultPayload::Stream(stream),
198 meta,
199 range,
200 attributes: Attributes::new(),
201 })
202 }
203
204 fn delete_stream(
205 &self,
206 locations: BoxStream<'static, Result<Path, Error>>,
207 ) -> BoxStream<'static, Result<Path, Error>> {
208 let bucket = self.bucket.clone();
209 locations
210 .map(move |location| {
211 let bucket = bucket.clone();
212 async move {
213 let location = location?;
214 Object::new(location.to_string(), Arc::new(bucket))
215 .delete()
216 .await
217 .map_err(|e| to_object_store_error(e, &location))?;
218 Ok(location)
219 }
220 })
221 .buffered(10)
222 .boxed()
223 }
224
225 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
226 use async_stream::try_stream;
227 use list::{should_include, to_meta, ListedObject};
228
229 let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default();
230 let prefix_filter = prefix.cloned();
231
232 let mut bucket = self.bucket.clone();
233 if let Some(ref p) = prefix_filter {
234 bucket = bucket.prefix(p.as_ref());
235 }
236
237 try_stream! {
238 let mut objects = std::pin::pin!(bucket.objects_as_impl::<ListedObject>());
239 while let Some(item) = objects.next().await {
240 let obj = item.map_err(|e| Error::Generic {
241 store: "AliyunOssObjectStore",
242 source: Box::new(e),
243 })?;
244
245 let Some(meta) = to_meta(obj)? else {
246 continue;
247 };
248
249 if should_include(&meta.location, prefix_filter.as_ref(), prefix_len) {
250 yield meta;
251 }
252 }
253 }
254 .boxed()
255 }
256
257 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
258 list::fetch_list_with_delimiter(self.bucket.clone(), prefix).await
259 }
260
261 async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
262 if matches!(options.mode, CopyMode::Create) {
263 if self.object(to).get_info().await.is_ok() {
264 return Err(Error::AlreadyExists {
265 path: to.to_string(),
266 source: "destination object already exists".into(),
267 });
268 }
269 }
270
271 let copy_source = format!("/{}/{}", self.bucket.name(), from.as_ref());
272
273 self.object(to)
274 .copy_source(copy_source)
275 .copy()
276 .await
277 .map_err(|e| to_object_store_error(e, from))?;
278
279 Ok(())
280 }
281}