1use crate::types::OssConfig;
2use crate::SignatureAble;
3use base64::prelude::*;
4
5#[derive(Debug)]
6pub struct Client {
7 oss_config: OssConfig,
8 bucket: crate::Bucket,
9}
10
11impl Client {
12 pub fn from_env() -> anyhow::Result<Self> {
13 let oss_config = OssConfig::from_env()?;
14 let bucket = crate::Bucket::new(oss_config.bucket_name.as_str(), oss_config.bucket_location.as_str(), "", None);
15 Ok(Self { oss_config, bucket })
16 }
17 pub fn new<T: ToString>(access_key_id: T, access_key_secret: T, bucket_name: T, bucket_location: T, path: T, is_internal: bool) -> Self {
18 let oss_config = OssConfig::new(
19 access_key_id.to_string(),
20 access_key_secret.to_string(),
21 bucket_name.to_string().clone(),
22 bucket_location.to_string().clone(),
23 path.to_string().clone(),
24 is_internal,
25 );
26 let bucket = crate::Bucket::new(bucket_name, bucket_location, "", None);
27 Self { oss_config, bucket }
28 }
29}
30
31impl Client {
32 pub async fn list_buckets(&self) -> anyhow::Result<Vec<crate::Bucket>> {
34 let mut request = self.oss_config.get_endpoint_request(reqwest::Method::GET)?;
35 self.oss_config.sign_header_request(&mut request)?;
36
37 let response = self.oss_config.get_request_builder(request)?.send().await?;
38 if !response.status().is_success() {
39 return Err(anyhow::anyhow!(response.text().await?));
40 }
41 let xml_data = response.text().await?;
42 let doc: roxmltree::Document = roxmltree::Document::parse(&xml_data)?;
43 let mut buckets = Vec::new();
44 let buckets_node = doc.descendants().find(|n| n.has_tag_name("Buckets")).ok_or_else(|| anyhow::anyhow!("Buckets node not found"))?;
45 for bucket_node in buckets_node.descendants().filter(|n| n.has_tag_name("Bucket")) {
46 buckets.push(crate::Bucket::new_from_xml_node(bucket_node)?);
47 }
48 Ok(buckets)
49 }
50
51 pub async fn put_bucket(&self) -> anyhow::Result<crate::Bucket> {
53 let mut request = self.oss_config.get_bucket_request(reqwest::Method::PUT, None)?;
54 self.oss_config.sign_header_request(&mut request)?;
55
56 let response = self.oss_config.get_request_builder(request)?.send().await?;
57 if !response.status().is_success() {
58 return Err(anyhow::anyhow!(response.text().await?));
59 }
60 let creation_date = {
61 let date = response.headers().get("date");
62 if let Some(date) = date {
63 Some(chrono::DateTime::parse_from_rfc2822(date.to_str()?)?.into())
64 } else {
65 None
66 }
67 };
68 *self.bucket.creation_date.lock().unwrap() = creation_date.clone();
69 Ok(crate::Bucket::new(self.bucket.name.as_str(), self.bucket.location.as_str(), "", creation_date))
70 }
71
72 pub async fn get_bucket_info(&self) -> anyhow::Result<Option<crate::Bucket>> {
74 static BUCKET_INFO: &str = "bucketInfo";
75 let mut request = self.oss_config.get_bucket_request(reqwest::Method::GET, None)?;
76 request.url_mut().set_query(Some(BUCKET_INFO));
77 self.oss_config.sign_header_request(&mut request)?;
78
79 let response = self.oss_config.get_request_builder(request)?.send().await?;
80 if !response.status().is_success() {
81 return Err(anyhow::anyhow!(response.text().await?));
82 }
83 let xml_data = response.text().await?;
84 let doc: roxmltree::Document = roxmltree::Document::parse(&xml_data)?;
85 if let Some(bucket_node) = doc.descendants().find(|n| n.has_tag_name("Bucket")) {
86 let bucket = crate::Bucket::new_from_xml_node(bucket_node)?;
87 *self.bucket.creation_date.lock().unwrap() = bucket.creation_date.lock().unwrap().clone();
88 return Ok(Some(bucket));
89 }
90 Ok(None)
91 }
92
93 pub async fn get_bucket_location(&self) -> anyhow::Result<crate::types::BucketLocation> {
95 static BUCKET_LOCATION: &str = "location";
96 let mut request = self.oss_config.get_bucket_request(reqwest::Method::GET, None)?;
97 request.url_mut().set_query(Some(BUCKET_LOCATION));
98 self.oss_config.sign_header_request(&mut request)?;
99
100 let response = self.oss_config.get_request_builder(request)?.send().await?;
101 if !response.status().is_success() {
102 return Err(anyhow::anyhow!(response.text().await?));
103 }
104 let xml_data = response.text().await?;
105 let doc: roxmltree::Document = roxmltree::Document::parse(&xml_data)?;
106 crate::types::BucketLocation::new_from_xml_node(doc.root())
107 }
108
109 pub async fn get_bucket_stat(&self) -> anyhow::Result<crate::types::BucketStat> {
111 static BUCKET_STAT: &str = "stat";
112 let mut request = self.oss_config.get_bucket_request(reqwest::Method::GET, None)?;
113 request.url_mut().set_query(Some(BUCKET_STAT));
114 self.oss_config.sign_header_request(&mut request)?;
115
116 let response = self.oss_config.get_request_builder(request)?.send().await?;
117 if !response.status().is_success() {
118 return Err(anyhow::anyhow!(response.text().await?));
119 }
120 let xml_data = response.text().await?;
121 let doc: roxmltree::Document = roxmltree::Document::parse(&xml_data)?;
122 crate::types::BucketStat::new_from_xml_node(doc.root())
123 }
124
125 pub async fn delete_bucket(&self) -> anyhow::Result<()> {
127 let mut request = self.oss_config.get_bucket_request(reqwest::Method::DELETE, None)?;
128 self.oss_config.sign_header_request(&mut request)?;
129
130 let response = self.oss_config.get_request_builder(request)?.send().await?;
131 if !response.status().is_success() {
132 return Err(anyhow::anyhow!(response.text().await?));
133 }
134 *self.bucket.creation_date.lock().unwrap() = None;
135 Ok(())
136 }
137}
138
139impl Client {
140 pub async fn list_objects(&self, prefix: Option<&str>, delimiter: Option<&str>) -> anyhow::Result<(Vec<crate::Folder>, Vec<crate::File>)> {
142 let mut request = self.oss_config.get_bucket_request(reqwest::Method::GET, None)?;
143 request.url_mut().query_pairs_mut().append_pair("list-type", "2");
144 if let Some(prefix) = prefix {
145 let prefix = self.oss_config.get_object_name(prefix);
146 request.url_mut().query_pairs_mut().append_pair("prefix", prefix.as_ref());
147 }
148 if let Some(delimiter) = delimiter {
149 request.url_mut().query_pairs_mut().append_pair("delimiter", delimiter);
150 }
151 self.oss_config.sign_header_request(&mut request)?;
152
153 let response = self.oss_config.get_request_builder(request)?.send().await?;
154 if !response.status().is_success() {
155 return Err(anyhow::anyhow!(response.text().await?));
156 }
157 let xml_data = response.text().await?;
158 let doc: roxmltree::Document = roxmltree::Document::parse(&xml_data)?;
159 let mut folders = Vec::new();
160 for folder_node in doc.descendants().filter(|n| n.has_tag_name("CommonPrefixes")) {
161 folders.push(crate::Folder::new_from_xml_node(folder_node)?);
162 }
163 let mut files = Vec::new();
164 for file_node in doc.descendants().filter(|n| n.has_tag_name("Contents")) {
165 files.push(crate::File::new_from_xml_node(file_node)?);
166 }
167 Ok((folders, files))
168 }
169 pub async fn list_folders(&self, prefix: Option<&str>) -> anyhow::Result<Vec<crate::Folder>> {
170 let (folders, _files) = self.list_objects(prefix, Some("/")).await?;
171 Ok(folders)
172 }
173 pub async fn list_files(&self, prefix: Option<&str>) -> anyhow::Result<Vec<crate::File>> {
174 let (_folders, files) = self.list_objects(prefix, Some("/")).await?;
175 Ok(files)
176 }
177
178 pub async fn put_object<T: Into<bytes::Bytes>>(&self, object_name: &str, bytes: T) -> anyhow::Result<reqwest::header::HeaderMap> {
180 let object_name = self.oss_config.get_object_name(object_name);
181 let mut request = self.oss_config.get_bucket_request(reqwest::Method::PUT, Some(bytes.into()))?;
182 request.url_mut().set_path(object_name.as_ref());
183 self.oss_config.sign_header_request(&mut request)?;
184
185 let response = self.oss_config.get_request_builder(request)?.send().await?;
186 if !response.status().is_success() {
187 return Err(anyhow::anyhow!(response.text().await?));
188 }
189 Ok(response.headers().clone())
190 }
191 pub async fn put_object_stream<S>(&self, object_name: &str, stream: S) -> anyhow::Result<reqwest::header::HeaderMap>
192 where
193 S: futures::stream::Stream<Item = reqwest::Result<bytes::Bytes>> + Send + Sync + 'static,
194 {
195 let object_name = self.oss_config.get_object_name(object_name);
196 let mut request = self.oss_config.get_bucket_request(reqwest::Method::PUT, None)?;
197 request.url_mut().set_path(object_name.as_ref());
198 *request.body_mut() = Some(reqwest::Body::wrap_stream(stream));
199 self.oss_config.sign_header_request(&mut request)?;
200
201 let response = self.oss_config.get_request_builder(request)?.send().await?;
202 if !response.status().is_success() {
203 return Err(anyhow::anyhow!(response.text().await?));
204 }
205 Ok(response.headers().clone())
206 }
207
208 pub async fn get_object(&self, object_name: &str) -> anyhow::Result<(bytes::Bytes, reqwest::header::HeaderMap)> {
210 let object_name = self.oss_config.get_object_name(object_name);
211 let mut request = self.oss_config.get_bucket_request(reqwest::Method::GET, None)?;
212 request.url_mut().set_path(object_name.as_ref());
213 self.oss_config.sign_header_request(&mut request)?;
214
215 let response = self.oss_config.get_request_builder(request)?.send().await?;
216 if !response.status().is_success() {
217 return Err(anyhow::anyhow!(response.text().await?));
218 }
219 let headers = response.headers().clone();
220 Ok((response.bytes().await?, headers))
221 }
222
223 pub async fn delete_object(&self, object_name: &str) -> anyhow::Result<()> {
225 let object_name = self.oss_config.get_object_name(object_name);
226 let mut request = self.oss_config.get_bucket_request(reqwest::Method::DELETE, None)?;
227 request.url_mut().set_path(object_name.as_ref());
228 self.oss_config.sign_header_request(&mut request)?;
229
230 let response = self.oss_config.get_request_builder(request)?.send().await?;
231 if !response.status().is_success() {
232 return Err(anyhow::anyhow!(response.text().await?));
233 }
234 Ok(())
235 }
236 pub async fn delete_multiple_objects(&self, object_names: Vec<&str>) -> anyhow::Result<()> {
238 let object_names = object_names.into_iter().map(|object_name| self.oss_config.get_object_name(object_name));
239 let xml_body = {
240 let xml_objects = object_names
241 .into_iter()
242 .map(|object_name| format!("<Object><Key>{}</Key></Object>", self.oss_config.get_object_name(&object_name)))
243 .collect::<Vec<String>>()
244 .join("");
245 format!(
246 r#"<?xml version="1.0" encoding="UTF-8"?>
247 <Delete>
248 <Quiet>true</Quiet>
249 {}
250 </Delete>"#,
251 xml_objects
252 )
253 };
254
255 static OBJECT_META: &str = "delete";
256 let mut request = self.oss_config.get_bucket_request(reqwest::Method::POST, Some(xml_body.clone().into()))?;
257 request.url_mut().set_query(Some(OBJECT_META));
258 request.headers_mut().insert("Content-Type", "application/xml".try_into()?);
259 request.body_mut().replace(reqwest::Body::from(xml_body));
260 self.oss_config.sign_header_request(&mut request)?;
261
262 let response = self.oss_config.get_request_builder(request)?.send().await?;
263 if !response.status().is_success() {
264 return Err(anyhow::anyhow!(response.text().await?));
265 }
266 Ok(())
267 }
268 pub async fn copy_object(&self, dest_object_name: &str, source_object_name: &str) -> anyhow::Result<reqwest::header::HeaderMap> {
270 let dest_object_name = self.oss_config.get_object_name(dest_object_name);
271 let source_object_name = self.oss_config.get_object_name(source_object_name);
272 let mut request = self.oss_config.get_bucket_request(reqwest::Method::PUT, None)?;
273 request.url_mut().set_path(dest_object_name.as_ref());
274 request.headers_mut().insert("x-oss-copy-source", format!("/{}/{}", self.oss_config.bucket_name, source_object_name).try_into()?);
275 self.oss_config.sign_header_request(&mut request)?;
276
277 let response = self.oss_config.get_request_builder(request)?.send().await?;
278 if !response.status().is_success() {
279 return Err(anyhow::anyhow!(response.text().await?));
280 }
281 Ok(response.headers().clone())
282 }
283
284 pub async fn append_object<T: Into<bytes::Bytes>>(&self, object_name: &str, bytes: T, position: usize) -> anyhow::Result<reqwest::header::HeaderMap> {
286 let object_name = self.oss_config.get_object_name(object_name);
287 static APPEND: &str = "append";
288 let mut request = self.oss_config.get_bucket_request(reqwest::Method::POST, Some(bytes.into()))?;
289 request.url_mut().set_path(object_name.as_ref());
290 request.headers_mut().insert("position", position.try_into()?);
291 request.url_mut().set_query(Some(APPEND));
292 request.url_mut().query_pairs_mut().append_pair("position", position.to_string().as_str());
293 self.oss_config.sign_header_request(&mut request)?;
294
295 let response = self.oss_config.get_request_builder(request)?.send().await?;
296 if !response.status().is_success() {
297 return Err(anyhow::anyhow!(response.text().await?));
298 }
299 Ok(response.headers().clone())
300 }
301
302 pub async fn head_object(&self, object_name: &str) -> anyhow::Result<reqwest::header::HeaderMap> {
304 let object_name = self.oss_config.get_object_name(object_name);
305 let mut request = self.oss_config.get_bucket_request(reqwest::Method::HEAD, None)?;
306 request.url_mut().set_path(object_name.as_ref());
307 self.oss_config.sign_header_request(&mut request)?;
308
309 let response = self.oss_config.get_request_builder(request)?.send().await?;
310 if !response.status().is_success() {
311 let encoded_error_message = response.headers().get("x-oss-err").ok_or(anyhow::anyhow!("x-oss-err header not found"))?.to_str()?.to_string();
312 let err_message = String::from_utf8(BASE64_STANDARD.decode(encoded_error_message.as_bytes())?)?;
313 return Err(anyhow::anyhow!(err_message));
314 }
315 Ok(response.headers().clone())
316 }
317
318 pub async fn get_object_meta(&self, object_name: &str) -> anyhow::Result<reqwest::header::HeaderMap> {
320 let object_name = self.oss_config.get_object_name(object_name);
321 static OBJECT_META: &str = "objectMeta";
322 let mut request = self.oss_config.get_bucket_request(reqwest::Method::HEAD, None)?;
323 request.url_mut().set_path(object_name.as_ref());
324 request.url_mut().set_query(Some(OBJECT_META));
325 self.oss_config.sign_header_request(&mut request)?;
326
327 let response = self.oss_config.get_request_builder(request)?.send().await?;
328 if !response.status().is_success() {
329 let encoded_error_message = response.headers().get("x-oss-err").ok_or(anyhow::anyhow!("x-oss-err header not found"))?.to_str()?.to_string();
330 let err_message = String::from_utf8(BASE64_STANDARD.decode(encoded_error_message.as_bytes())?)?;
331 return Err(anyhow::anyhow!(err_message));
332 }
333 Ok(response.headers().clone())
334 }
335 pub async fn is_object_exist(&self, object_name: &str) -> anyhow::Result<bool> {
337 match self.get_object_meta(object_name).await {
338 Ok(_) => Ok(true),
339 Err(e) if e.to_string().contains("NoSuchKey") => Ok(false),
340 Err(e) => Err(e),
341 }
342 }
343
344 pub async fn sign_object(&self, object_name: &str, expires_duration: std::time::Duration) -> anyhow::Result<String> {
346 let object_name = self.oss_config.get_decoded_object_name(object_name);
347 let expires_time = {
348 let datetime: chrono::DateTime<chrono::Utc> = std::time::SystemTime::now().into();
349 let expires_time = datetime + chrono::Duration::from_std(expires_duration)?;
350 expires_time
351 };
352 let mut object_url = {
353 let host = format!("{}.{}.aliyuncs.com", self.oss_config.bucket_name, self.oss_config.bucket_location.as_str());
354 let object_link = format!("https://{}/{}", host, object_name);
355 reqwest::Url::parse(&object_link)?
356 };
357 let signature_string = crate::types::ParamSignature::new(
358 reqwest::Method::GET,
359 None,
360 None,
361 expires_time.clone(),
362 crate::types::CanonicalizedHeaders::new(None),
363 crate::types::CanonicalizedResource::new(format!("/{}/{}", self.oss_config.bucket_name, object_name)),
364 )
365 .get_signature_string(&self.oss_config);
366 object_url
367 .query_pairs_mut()
368 .append_pair("OSSAccessKeyId", &self.oss_config.access_key_id)
369 .append_pair("Expires", expires_time.timestamp().to_string().as_str())
370 .append_pair("Signature", &signature_string);
371
372 Ok(object_url.to_string())
373 }
374}
375
376impl Client {
377 pub async fn put_symlink(&self, symlink_object_name: &str, target_object_name: &str) -> anyhow::Result<()> {
379 static SYMLINK: &str = "symlink";
380 let symlink_object_name = self.oss_config.get_object_name(symlink_object_name);
381 let target_object_name = self.oss_config.get_encoded_object_name(target_object_name);
382 let mut request = self.oss_config.get_bucket_request(reqwest::Method::PUT, None)?;
383 request.url_mut().set_path(symlink_object_name.as_ref());
384 request.headers_mut().insert("x-oss-symlink-target", target_object_name.as_ref().try_into()?);
385 request.url_mut().set_query(Some(SYMLINK));
386 self.oss_config.sign_header_request(&mut request)?;
387
388 let response = self.oss_config.get_request_builder(request)?.send().await?;
389 if !response.status().is_success() {
390 return Err(anyhow::anyhow!(response.text().await?));
391 }
392 Ok(())
393 }
394
395 pub async fn get_symlink(&self, object_name: &str) -> anyhow::Result<String> {
397 let object_name = self.oss_config.get_object_name(object_name);
398 static SYMLINK: &str = "symlink";
399 let mut request = self.oss_config.get_bucket_request(reqwest::Method::GET, None)?;
400 request.url_mut().set_path(object_name.as_ref());
401 request.url_mut().set_query(Some(SYMLINK));
402 self.oss_config.sign_header_request(&mut request)?;
403
404 let response = self.oss_config.get_request_builder(request)?.send().await?;
405 if !response.status().is_success() {
406 return Err(anyhow::anyhow!(response.text().await?));
407 }
408 Ok(response.headers().get("x-oss-symlink-target").ok_or(anyhow::anyhow!("no symlink target"))?.to_str()?.to_owned())
409 }
410}