huaweicloud_sdk_rust_obs/
object.rs

1use async_trait::async_trait;
2use base64::{engine::general_purpose, Engine};
3use md5::{Digest, Md5};
4use reqwest::{
5    header::{HeaderMap, HeaderValue},
6    Method,
7};
8use std::collections::HashMap;
9use urlencoding::encode;
10
11use crate::{
12    client::Client,
13    error::{status_to_response, ObsError},
14    model::{
15        bucket::copy_object::CopyObjectResult,
16        delete_object::{Boolean, Delete, Item, Object, ResponseMode},
17        object::{NextPosition, ObjectMeta},
18    },
19};
20
21#[async_trait]
22pub trait ObjectTrait {
23    /// PUT上传
24    async fn put_object<S: AsRef<str> + Send>(
25        &self,
26        bucket: S,
27        key: S,
28        object: &[u8],
29    ) -> Result<(), ObsError>;
30
31    /// 复制对象
32    async fn copy_object<S1, S2, S3>(
33        &self,
34        bucket: S1,
35        src: S2,
36        dest: S3,
37    ) -> Result<CopyObjectResult, ObsError>
38    where
39        S1: AsRef<str> + Send,
40        S2: AsRef<str> + Send,
41        S3: AsRef<str> + Send;
42
43    /// 删除对象
44    async fn delete_object<S: AsRef<str> + Send>(&self, bucket: S, key: S) -> Result<(), ObsError>;
45
46    /// 批量删除对象
47    async fn delete_objects<S: AsRef<str> + Send, K: IntoIterator<Item = S> + Send>(
48        &self,
49        bucket: S,
50        keys: K,
51        response_mode: ResponseMode,
52    ) -> Result<(), ObsError>;
53
54    /// 获取对象内容
55    async fn get_object<S: AsRef<str> + Send>(
56        &self,
57        bucket: S,
58        key: S,
59    ) -> Result<bytes::Bytes, ObsError>;
60
61    /// 获取对象元数据
62    async fn get_object_metadata<S: AsRef<str> + Send>(
63        &self,
64        bucket: S,
65        key: S,
66    ) -> Result<ObjectMeta, ObsError>;
67
68    /// 追加写对象
69    async fn append_object<S: AsRef<str> + Send>(
70        &self,
71        bucket: S,
72        key: S,
73        appended: &[u8],
74        position: u64,
75    ) -> Result<NextPosition, ObsError>;
76}
77
78#[async_trait]
79impl ObjectTrait for Client {
80    /// PUT上传
81    async fn put_object<S: AsRef<str> + Send>(
82        &self,
83        bucket: S,
84        key: S,
85        object: &[u8],
86    ) -> Result<(), ObsError> {
87        let mut with_headers = HeaderMap::new();
88        with_headers.insert(
89            "Content-Length",
90            HeaderValue::from_str(format!("{}", object.len()).as_str()).unwrap(),
91        );
92        let resp = self
93            .do_action(
94                Method::PUT,
95                bucket,
96                key,
97                Some(with_headers),
98                None,
99                Some(object.to_owned()),
100            )
101            .await?;
102        let status = resp.status();
103        let text = resp.text().await?;
104
105        if status.is_success() {
106            Ok(())
107        } else {
108            // 尝试解析错误响应
109            match serde_xml_rs::from_str::<crate::model::ErrorResponse>(&text) {
110                Ok(error_response) => Err(crate::error::ObsError::Response {
111                    status,
112                    message: error_response.message,
113                }),
114                Err(_) => {
115                    // 如果无法解析错误响应,则返回原始文本
116                    Err(crate::error::ObsError::Response {
117                        status,
118                        message: text,
119                    })
120                }
121            }
122        }
123    }
124
125    async fn append_object<S: AsRef<str> + Send>(
126        &self,
127        bucket: S,
128        key: S,
129        appended: &[u8],
130        position: u64,
131    ) -> Result<NextPosition, ObsError> {
132        let mut params = HashMap::new();
133        params.insert("append".to_string(), "".into());
134        params.insert("position".into(), position.to_string());
135        let mut with_headers = HeaderMap::new();
136        with_headers.insert(
137            "Content-Length",
138            HeaderValue::from_str(format!("{}", appended.len()).as_str()).unwrap(),
139        );
140        let resp = self
141            .do_action(
142                Method::POST,
143                bucket,
144                key,
145                Some(with_headers),
146                Some(params),
147                Some(appended.to_owned()),
148            )
149            .await?;
150        let status = resp.status();
151        let headers = resp.headers().clone();
152        if status.is_success() {
153            let next_position = if let Some(next) = headers.get("x-obs-next-append-position") {
154                let next = String::from_utf8_lossy(next.as_bytes()).to_string();
155                next.parse::<u64>().ok()
156            } else {
157                None
158            };
159            Ok(next_position as NextPosition)
160        } else {
161            Err(ObsError::Response {
162                status,
163                message: "response error".into(),
164            })
165        }
166    }
167
168    /// 复制对象
169    async fn copy_object<S1, S2, S3>(
170        &self,
171        bucket: S1,
172        src: S2,
173        dest: S3,
174    ) -> Result<CopyObjectResult, ObsError>
175    where
176        S1: AsRef<str> + Send,
177        S2: AsRef<str> + Send,
178        S3: AsRef<str> + Send,
179    {
180        let mut with_headers = HeaderMap::new();
181        let dest = dest.as_ref().trim_start_matches('/');
182        let src = src.as_ref().trim_start_matches('/');
183        let src = encode(src);
184        let copy_source = format!("/{}/{}", bucket.as_ref(), src);
185        with_headers.insert(
186            "x-obs-copy-source",
187            HeaderValue::from_str(&copy_source).unwrap(),
188        );
189
190        let resp = self
191            .do_action(
192                Method::PUT,
193                bucket,
194                dest,
195                Some(with_headers),
196                None,
197                None::<String>,
198            )
199            .await?;
200        let status = resp.status();
201        let text = resp.text().await?;
202        status_to_response::<CopyObjectResult>(status, text)
203        // match status {
204        //     StatusCode::OK => {
205        //         let r: CopyObjectResponse = serde_xml_rs::from_str(&text)?;
206        //         Ok(r)
207        //     }
208        //     StatusCode::FORBIDDEN => {
209        //         let er: ErrorResponse = serde_xml_rs::from_str(&text)?;
210        //         Err(ObsError::Response {
211        //             status: StatusCode::FORBIDDEN,
212        //             message: er.message,
213        //         })
214        //     }
215        //     _ => Err(ObsError::Unknown),
216        // }
217    }
218
219    /// 删除对象
220    async fn delete_object<S: AsRef<str> + Send>(&self, bucket: S, key: S) -> Result<(), ObsError> {
221        let _resp = self
222            .do_action(Method::DELETE, bucket, key, None, None, None::<String>)
223            .await?;
224        Ok(())
225    }
226    /// 批量删除对象
227    async fn delete_objects<S: AsRef<str> + Send, K: IntoIterator<Item = S> + Send>(
228        &self,
229        bucket: S,
230        keys: K,
231        response_mode: ResponseMode,
232    ) -> Result<(), ObsError> {
233        let mut with_headers = HeaderMap::new();
234        let mut params = HashMap::new();
235        params.insert("delete".to_string(), "".to_string());
236
237        let body = Delete {
238            quiet: response_mode.to_bool(),
239            item: keys
240                .into_iter()
241                .map(|key| {
242                    Item::Object(Object {
243                        key_name: key.as_ref().to_owned(),
244                        version_id: None,
245                    })
246                })
247                .collect::<Vec<Item>>(),
248        };
249        let body = serde_xml_rs::to_string(&body)?;
250        let mut hasher = Md5::new();
251        hasher.update(body.as_bytes());
252        let result = hasher.finalize();
253
254        let val = general_purpose::STANDARD.encode(result);
255
256        with_headers.insert("Content-MD5", HeaderValue::from_str(val.as_str()).unwrap());
257
258        with_headers.insert(
259            "Content-Length",
260            HeaderValue::from_str(format!("{}", body.len()).as_str()).unwrap(),
261        );
262        let _resp = self
263            .do_action(
264                Method::POST,
265                bucket,
266                "",
267                Some(with_headers),
268                Some(params),
269                Some(body),
270            )
271            .await?;
272        // dbg!(_resp.text().await?);
273        Ok(())
274    }
275
276    ///获取对象内容
277    async fn get_object<S: AsRef<str> + Send>(
278        &self,
279        bucket: S,
280        key: S,
281    ) -> Result<bytes::Bytes, ObsError> {
282        let resp = self
283            .do_action(Method::GET, bucket, key, None, None, None::<String>)
284            .await?
285            .bytes()
286            .await?;
287
288        Ok(resp)
289    }
290
291    /// 获取对象元数据
292    async fn get_object_metadata<S: AsRef<str> + Send>(
293        &self,
294        bucket: S,
295        key: S,
296    ) -> Result<ObjectMeta, ObsError> {
297        let resp = self
298            .do_action(Method::HEAD, bucket, key, None, None, None::<String>)
299            .await?;
300        let headers = resp.headers();
301        let mut data = HashMap::with_capacity(headers.len());
302        for (key, val) in headers {
303            data.insert(key.as_str(), val.to_str().unwrap());
304        }
305
306        let header_str = serde_json::to_string(&data).map_err(|_e| ObsError::ParseOrConvert)?;
307
308        let data: ObjectMeta =
309            serde_json::from_str(&header_str).map_err(|_e| ObsError::ParseOrConvert)?;
310
311        Ok(data)
312    }
313}