huaweicloud_sdk_rust_obs/
object.rs

1use std::collections::HashMap;
2use base64::{engine::general_purpose, Engine};
3use md5::{Md5, Digest};
4use async_trait::async_trait;
5use reqwest::{ header::{ HeaderMap, HeaderValue }, Method };
6use urlencoding::encode;
7
8use crate::{
9    client::Client,
10    error::{ status_to_response, ObsError },
11    model::{
12        bucket::copy_object::CopyObjectResult,
13        object::{ NextPosition, ObjectMeta },
14        delete_object::{ Delete, Object, Item, ResponseMode, Boolean },
15    },
16};
17
18#[async_trait]
19pub trait ObjectTrait {
20    /// PUT上传
21    async fn put_object<S: AsRef<str> + Send>(
22        &self,
23        bucket: S,
24        key: S,
25        object: &[u8]
26    ) -> Result<(), ObsError>;
27
28    /// 复制对象
29    async fn copy_object<S1, S2, S3>(
30        &self,
31        bucket: S1,
32        src: S2,
33        dest: S3
34    )
35        -> Result<CopyObjectResult, ObsError>
36        where S1: AsRef<str> + Send, S2: AsRef<str> + Send, S3: AsRef<str> + Send;
37
38    /// 删除对象
39    async fn delete_object<S: AsRef<str> + Send>(&self, bucket: S, key: S) -> Result<(), ObsError>;
40
41    /// 批量删除对象
42    async fn delete_objects<S: AsRef<str> + Send, K: IntoIterator<Item = S> + Send>(
43        &self,
44        bucket: S,
45        keys: K,
46        response_mode: ResponseMode,
47    ) -> Result<(), ObsError>;
48
49    /// 获取对象内容
50    async fn get_object<S: AsRef<str> + Send>(
51        &self,
52        bucket: S,
53        key: S
54    ) -> Result<bytes::Bytes, ObsError>;
55
56    /// 获取对象元数据
57    async fn get_object_metadata<S: AsRef<str> + Send>(
58        &self,
59        bucket: S,
60        key: S
61    ) -> Result<ObjectMeta, ObsError>;
62
63    /// 追加写对象
64    async fn append_object<S: AsRef<str> + Send>(
65        &self,
66        bucket: S,
67        key: S,
68        appended: &[u8],
69        position: u64
70    ) -> Result<NextPosition, ObsError>;
71}
72
73#[async_trait]
74impl ObjectTrait for Client {
75    /// PUT上传
76    async fn put_object<S: AsRef<str> + Send>(
77        &self,
78        bucket: S,
79        key: S,
80        object: &[u8]
81    ) -> Result<(), ObsError> {
82        let mut with_headers = HeaderMap::new();
83        with_headers.insert(
84            "Content-Length",
85            HeaderValue::from_str(format!("{}", object.len()).as_str()).unwrap()
86        );
87        let resp = self.do_action(
88            Method::PUT,
89            bucket,
90            key,
91            Some(with_headers),
92            None,
93            Some(object.to_owned())
94        ).await?;
95        let _ = resp.text().await?;
96
97        Ok(())
98    }
99
100    async fn append_object<S: AsRef<str> + Send>(
101        &self,
102        bucket: S,
103        key: S,
104        appended: &[u8],
105        position: u64
106    ) -> Result<NextPosition, ObsError> {
107        let mut params = HashMap::new();
108        params.insert("append".to_string(), "".into());
109        params.insert("position".into(), position.to_string());
110        let mut with_headers = HeaderMap::new();
111        with_headers.insert(
112            "Content-Length",
113            HeaderValue::from_str(format!("{}", appended.len()).as_str()).unwrap()
114        );
115        let resp = self.do_action(
116            Method::POST,
117            bucket,
118            key,
119            Some(with_headers),
120            Some(params),
121            Some(appended.to_owned())
122        ).await?;
123        let status = resp.status();
124        let headers = resp.headers().clone();
125        if status.is_success() {
126            let next_position = if let Some(next) = headers.get("x-obs-next-append-position") {
127                let next = String::from_utf8_lossy(next.as_bytes()).to_string();
128                match next.parse::<u64>() {
129                    Ok(u) => Some(u),
130                    Err(_) => None,
131                }
132            } else {
133                None
134            };
135            Ok(next_position as NextPosition)
136        } else {
137            Err(ObsError::Response {
138                status,
139                message: "response error".into(),
140            })
141        }
142    }
143
144    /// 复制对象
145    async fn copy_object<S1, S2, S3>(
146        &self,
147        bucket: S1,
148        src: S2,
149        dest: S3
150    )
151        -> Result<CopyObjectResult, ObsError>
152        where S1: AsRef<str> + Send, S2: AsRef<str> + Send, S3: AsRef<str> + Send
153    {
154        let mut with_headers = HeaderMap::new();
155        let dest = dest.as_ref().trim_start_matches('/');
156        let src = src.as_ref().trim_start_matches('/');
157        let src = encode(src);
158        let copy_source = format!("/{}/{}", bucket.as_ref(), src);
159        with_headers.insert("x-obs-copy-source", HeaderValue::from_str(&copy_source).unwrap());
160
161        let resp = self.do_action(
162            Method::PUT,
163            bucket,
164            dest,
165            Some(with_headers),
166            None,
167            None::<String>
168        ).await?;
169        let status = resp.status();
170        let text = resp.text().await?;
171        status_to_response::<CopyObjectResult>(status, text)
172        // match status {
173        //     StatusCode::OK => {
174        //         let r: CopyObjectResponse = serde_xml_rs::from_str(&text)?;
175        //         Ok(r)
176        //     }
177        //     StatusCode::FORBIDDEN => {
178        //         let er: ErrorResponse = serde_xml_rs::from_str(&text)?;
179        //         Err(ObsError::Response {
180        //             status: StatusCode::FORBIDDEN,
181        //             message: er.message,
182        //         })
183        //     }
184        //     _ => Err(ObsError::Unknown),
185        // }
186    }
187
188    /// 删除对象
189    async fn delete_object<S: AsRef<str> + Send>(&self, bucket: S, key: S) -> Result<(), ObsError> {
190        let _resp = self.do_action(Method::DELETE, bucket, key, None, None, None::<String>).await?;
191        Ok(())
192    }
193    /// 批量删除对象
194    async fn delete_objects<S: AsRef<str> + Send, K: IntoIterator<Item = S> + Send>(
195        &self,
196        bucket: S,
197        keys: K,
198        response_mode: ResponseMode,
199    ) -> Result<(), ObsError> {
200        let mut with_headers = HeaderMap::new();
201        let mut params = HashMap::new();
202        params.insert("delete".to_string(), "".to_string());
203
204        let body = Delete {
205            quiet: response_mode.to_bool(),
206            item: keys
207                .into_iter()
208                .map(|key|
209                    Item::Object(Object { key_name: key.as_ref().to_owned(), version_id: None })
210                )
211                .collect::<Vec<Item>>(),
212        };
213        let body = serde_xml_rs::to_string(&body)?;
214        let mut hasher = Md5::new();
215        hasher.update(body.as_bytes());
216        let result = hasher.finalize();
217
218        let val = general_purpose::STANDARD.encode(result);
219
220        with_headers.insert("Content-MD5", HeaderValue::from_str(val.as_str()).unwrap());
221
222        with_headers.insert(
223            "Content-Length",
224            HeaderValue::from_str(format!("{}", body.as_bytes().len()).as_str()).unwrap()
225        );
226        let _resp = self.do_action(
227            Method::POST,
228            bucket,
229            "",
230            Some(with_headers),
231            Some(params),
232            Some(body)
233        ).await?;
234        // dbg!(_resp.text().await?);
235        Ok(())
236    }
237
238    ///获取对象内容
239    async fn get_object<S: AsRef<str> + Send>(
240        &self,
241        bucket: S,
242        key: S
243    ) -> Result<bytes::Bytes, ObsError> {
244        let resp = self
245            .do_action(Method::GET, bucket, key, None, None, None::<String>).await?
246            .bytes().await?;
247
248        Ok(resp)
249    }
250
251    /// 获取对象元数据
252    async fn get_object_metadata<S: AsRef<str> + Send>(
253        &self,
254        bucket: S,
255        key: S
256    ) -> Result<ObjectMeta, ObsError> {
257        let resp = self.do_action(Method::HEAD, bucket, key, None, None, None::<String>).await?;
258        let headers = resp.headers();
259        let mut data = HashMap::with_capacity(headers.len());
260        for (key, val) in headers {
261            data.insert(key.as_str(), val.to_str().unwrap());
262        }
263
264        let header_str = serde_json::to_string(&data).map_err(|_e| ObsError::ParseOrConvert)?;
265
266        let data: ObjectMeta = serde_json
267            ::from_str(&header_str)
268            .map_err(|_e| ObsError::ParseOrConvert)?;
269
270        Ok(data)
271    }
272}