1mod utils;
195mod algorithm;
196
197use algorithm::HmacSha1;
198use futures::stream::{self, StreamExt};
199use std::{fmt, fs, io};
200use std::path::Path;
201use serde::{Serialize, Deserialize};
202use regex::Regex;
203use std::vec::Vec;
204use url::{Url, form_urlencoded};
205use chrono::Local;
206use reqwest::StatusCode;
207use urlencoding::encode;
208
/// Connection settings for one OBS bucket.
///
/// The methods on this type send requests to `https://{bucket}.{endpoint}/...`
/// and sign them with the `ak` / `sk` pair.
pub struct ObsClient {
    /// Service host that the bucket name is prepended to when request URLs are built.
    pub endpoint: String,
    /// Access key ID, sent in the `Authorization` header and in signed URLs.
    pub ak: String,
    /// Secret key used to compute request signatures.
    pub sk: String,
    /// Name of the bucket every operation acts on.
    pub bucket: String,
}
217
/// Error type returned by the fallible operations in this file.
pub enum ObsError {
    /// Any underlying failure, stored as a boxed, thread-safe error trait object.
    Common(Box<dyn std::error::Error + Send + Sync + 'static>),
}
221
222impl <E> From<E> for ObsError
224where
225 E: core::error::Error + Send + Sync + 'static
226{
227 fn from(err: E) -> ObsError {
228 ObsError::Common(Box::new(err))
229 }
230}
231
232impl fmt::Debug for ObsError {
233 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
234 let mut builder = f.debug_struct("obs-sdk::Error");
235 builder.finish()
236 }
237}
238
239impl fmt::Display for ObsError {
240 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
241 match self {
242 ObsError::Common(e) => f.write_str(&format!("obs sdk error: {:?}", e))?,
243 };
244 Ok(())
245 }
246}
247
248
249
250impl ObsClient {
251
252 pub async fn list(&self, prefix: &str, is_all: bool) -> Result<Vec<ObjectMeta>, ObsError> {
266 let mut all_objects = Vec::new();
267 let mut marker: Option<String> = None;
268
269 loop {
270 let url = match &marker {
271 Some(m) => format!("https://{}.{}/?prefix={}&marker={}", self.bucket, self.endpoint, prefix, m),
272 None => format!("https://{}.{}/?prefix={}", self.bucket, self.endpoint, prefix),
273 };
274
275 let date = utils::now_str_gmt();
276 let hmacsha1 = HmacSha1();
277 let string_to_sign = hmacsha1.header_string_to_sign("GET", "", "", &date, "", &format!("/{}/", self.bucket));
278 let signature = hmacsha1.sign_to_base64string(&string_to_sign, &self.sk);
279 let authorization = format!("OBS {}:{}", self.ak, signature);
280
281 let client = reqwest::Client::new();
282 let res = client.get(&url)
283 .header("Date", &date)
284 .header("Authorization", &authorization)
285 .send()
286 .await?;
287
288 if !res.status().is_success() {
289 return Err(ObsError::Common(Box::new(io::Error::new(
290 io::ErrorKind::Other,
291 format!("请求失败,状态码={}", res.status())
292 ))));
293 }
294
295 let xml_content_string = res.text().await?;
296 let result = XmlParser::new(&xml_content_string).parse();
297
298 all_objects.extend(result.objects);
299
300 if !is_all || !result.is_truncated {
301 break;
302 }
303
304 marker = result.next_marker;
305 }
306
307 Ok(all_objects)
308 }
309
310 pub async fn upload_object(&self, obj_key: &str, data: Vec<u8>) -> Result<(), ObsError> {
327 let obj_key = &Self::urlencode(obj_key);
328 let url = format!("https://{}.{}/{}", self.bucket, self.endpoint, obj_key);
330
331 let md5_string = utils::base64_md5_str(&data);
332
333 let date = utils::now_str_gmt();
335
336 let hmacsha1 = HmacSha1();
338
339 let file_type = &utils::get_mime_type_from_extension(obj_key)
340 .unwrap_or(String::from("application/octet-stream"));
341
342 let string_to_sign = hmacsha1.header_string_to_sign("PUT", &md5_string, file_type, &date, "", &format!("/{}/{}", self.bucket, obj_key));
344
345 let signature = hmacsha1.sign_to_base64string(&string_to_sign, &self.sk);
347
348 let authorization = format!("OBS {}:{}", self.ak, signature);
350
351 let client = reqwest::Client::new();
353 let res = client.put(url)
354 .header("Content-MD5", &md5_string)
355 .header("Date", &date)
356 .header("Content-Type", file_type)
357 .header("Content-Length", data.len())
358 .header("Authorization", authorization)
359 .body(data)
360 .send()
361 .await?;
362 let _status: StatusCode = res.status();
363 Ok(())
364
365 }
366
367 pub async fn upload_file(&self, obj_key: &str, file_path: &str) -> Result<(), ObsError> {
369 let data = fs::read(file_path)?;
370 self.upload_object(obj_key, data).await
371 }
372
373 pub async fn upload_files(&self, files: Vec<(&str, &str)>) -> Vec<Result<(), ObsError>> {
386 const MAX_CONCURRENT: usize = 32;
387
388 stream::iter(files)
389 .map(|(obj_key, file_path)| async move {
390 self.upload_file(obj_key, file_path).await
391 })
392 .buffer_unordered(MAX_CONCURRENT)
393 .collect()
394 .await
395 }
396
397 pub async fn download_object(&self, obj_key: &str) -> Result<Vec<u8>, ObsError> {
405 let obj_key = &Self::urlencode(obj_key);
406 let url = format!("https://{}.{}/{}", self.bucket, self.endpoint, obj_key);
408
409 let date = utils::now_str_gmt();
411
412 let hmacsha1 = HmacSha1();
414
415 let string_to_sign = hmacsha1.header_string_to_sign("GET", "", "", &date, "", &format!("/{}/{}", self.bucket, obj_key));
417
418 let signature = hmacsha1.sign_to_base64string(&string_to_sign, &self.sk);
420
421 let authorization = format!("OBS {}:{}", self.ak, signature);
423
424 let client = reqwest::Client::new();
426 let res = client.get(url)
427 .header("Authorization", &authorization)
428 .header("Date", &date)
429 .send()
430 .await?;
431
432 if res.status().is_success() {
434 return Ok(res.bytes().await?.to_vec());
435 }
436
437 Err(ObsError::Common(Box::new(io::Error::new(io::ErrorKind::Other, format!("请求失败,状态码={}", res.status())))))
438 }
439
440 pub async fn delete_object(&self, obj_key: &str) -> Result<(), ObsError> {
442 let obj_key = &Self::urlencode(obj_key);
443 let url = format!("https://{}.{}/{}", self.bucket, self.endpoint, obj_key);
445
446 let md5_string = "";
447
448 let date = utils::now_str_gmt();
450
451 let hmacsha1 = HmacSha1();
453
454 let string_to_sign = hmacsha1.header_string_to_sign("DELETE", &md5_string, "", &date, "", &format!("/{}/{}", self.bucket, obj_key));
456
457 let signature = hmacsha1.sign_to_base64string(&string_to_sign, &self.sk);
459
460 let authorization = format!("OBS {}:{}", self.ak, signature);
462
463 let client = reqwest::Client::new();
465 let res = client.delete(url)
466 .header("Date", &date)
467 .header("Authorization", authorization)
468 .send()
469 .await;
470
471 let res = match res {
472 Ok(response) => response,
473 Err(e) => {
474 return Err(ObsError::Common(Box::new(io::Error::new(io::ErrorKind::Other, e))));
475 },
476 };
477 let status = res.status();
478 println!("status = {}, {}", status, res.text_with_charset("utf-8").await?);
479
480 Ok(())
481 }
482
483 pub async fn download_file(&self, obj_key: &str, file_path: &str, overwrite: bool) -> Result<(), ObsError> {
490 let file_path = Path::new(file_path);
491
492 if file_path.exists() && !overwrite {
494 return Err(ObsError::Common(Box::new(io::Error::new(io::ErrorKind::AlreadyExists, "文件已存在,请删除文件或设置覆盖参数"))));
495 }
496
497 let parent = file_path.parent().unwrap();
499 if !parent.exists() {
500 fs::create_dir_all(&parent)?;
501 }
502
503 let data = self.download_object(obj_key).await?;
505
506 fs::write(file_path, data)?;
508 Ok(())
509 }
510
511 pub async fn download_files(&self, files: Vec<(&str, &str, bool)>) -> Vec<Result<(), ObsError>> {
524 const MAX_CONCURRENT: usize = 32;
525
526 stream::iter(files)
527 .map(|(obj_key, file_path, overwrite)| async move {
528 self.download_file(obj_key, file_path, overwrite).await
529 })
530 .buffer_unordered(MAX_CONCURRENT)
531 .collect()
532 .await
533 }
534
535 pub fn url_sign(&self, url_str: &str) -> Result<String, ObsError> {
536 let obs_object_url = Url::parse(url_str)?;
537 let resource_part = obs_object_url.path();
538 let host = obs_object_url.host().unwrap();
539 let domain = match host {
540 url::Host::Domain(domain) => domain.to_string(),
541 _ => format!("{}.{}", self.bucket, self.endpoint)
542 };
543 let parts: Vec<&str> = domain.split(".").collect();
544 let bucket_name = parts[0];
545
546 let timestamp = utils::timestamp(Local::now(), 3600*2);
547
548 let expires = format!("{}", timestamp);
550
551 let hmacsha1 = HmacSha1();
553
554 let string_to_sign = hmacsha1.url_string_to_sign("GET", "", "", &expires, "", &format!("/{}{}", bucket_name, resource_part));
556
557 let signature = hmacsha1.sign_to_base64string(&string_to_sign, &self.sk);
559 let signature = form_urlencoded::byte_serialize(signature.as_bytes()).collect::<String>();
560
561 let sign_url = format!("{}?AccessKeyId={}&Expires={}&Signature={}", url_str, self.ak, expires, signature);
563 Ok(sign_url)
564 }
565
566 fn urlencode(s: &str) -> String {
567 let tokens: Vec<String> = s.split("/").map(|token| {
568 encode(token).to_string()
569 }).collect();
570 tokens.join("/")
571 }
572
573}
574
575
576#[derive(Serialize, Deserialize, Debug)]
580pub struct ObjectMeta {
581
582 pub key: String,
586
587 pub last_modified: String,
591
592 pub etag: String,
596
597 pub size: u64,
601
602 pub storage_class: String,
606}
607
608#[derive(Debug)]
610struct ListObjectsResult {
611 objects: Vec<ObjectMeta>,
612 is_truncated: bool,
613 next_marker: Option<String>,
614}
615
/// Minimal extractor for the fields this file needs from a listing response.
struct XmlParser {
    /// Owned copy of the XML document to scan.
    xml: String,
}
622
623
624impl XmlParser {
625 fn new(xml: &str) -> Self {
626 XmlParser { xml: xml.to_string() }
627 }
628
629 fn parse(&self) -> ListObjectsResult {
633 let xml = &self.xml;
634
635 let contents_re = Regex::new(r#"<Contents>(.*?)</Contents>"#).unwrap();
637 let key_regex = Regex::new(r#"<Key>(.*?)</Key>"#).unwrap();
638 let last_modified_regex = Regex::new(r#"<LastModified>(.*?)</LastModified>"#).unwrap();
639 let etag_regex = Regex::new(r#"<ETag>(.*?)</ETag>"#).unwrap();
640 let size_regex = Regex::new(r#"<Size>(.*?)</Size>"#).unwrap();
641 let storage_class_regex = Regex::new(r#"<StorageClass>(.*?)</StorageClass>"#).unwrap();
642 let is_truncated_regex = Regex::new(r#"<IsTruncated>(.*?)</IsTruncated>"#).unwrap();
643 let next_marker_regex = Regex::new(r#"<NextMarker>(.*?)</NextMarker>"#).unwrap();
644
645
646 let mut contents_vec = Vec::new();
648 for captures in contents_re.captures_iter(xml) {
649 let inner_content = &captures[1];
650
651 let key = key_regex.captures(inner_content).map(|cap| cap[1].to_string()).unwrap_or_default();
652 let last_modified = last_modified_regex.captures(inner_content).map(|cap| cap[1].to_string()).unwrap_or_default();
653 let etag = etag_regex.captures(inner_content).map(|cap| cap[1].to_string()).unwrap_or_default();
654 let size = size_regex.captures(inner_content).and_then(|cap| cap[1].parse().ok()).unwrap_or(0);
655 let storage_class = storage_class_regex.captures(inner_content).map(|cap| cap[1].to_string()).unwrap_or_default();
656 let content = ObjectMeta {
657 key,
658 last_modified,
659 etag,
660 size,
661 storage_class,
662 };
663 contents_vec.push(content);
664 }
665
666 let is_truncated = is_truncated_regex.captures(xml)
668 .and_then(|cap| cap[1].to_string().parse::<bool>().ok())
669 .unwrap_or(false);
670
671 let next_marker = next_marker_regex.captures(xml)
673 .map(|cap| cap[1].to_string());
674
675 ListObjectsResult {
676 objects: contents_vec,
677 is_truncated,
678 next_marker,
679 }
680 }
681}
682
683
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{Duration, Local};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// A complete, non-truncated listing yields every object with its fields.
    #[test]
    fn test_parse_xml() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?><ListBucketResult xmlns="http://obs.myhwclouds.com/doc/2015-06-30/"><Name>obs-products</Name><Prefix>tmp</Prefix><Marker></Marker><MaxKeys>1000</MaxKeys><IsTruncated>false</IsTruncated><Contents><Key>tmp/</Key><LastModified>2024-12-03T12:01:48.020Z</LastModified><ETag>"d41d8cd98f00b204e9800998ecf8427e"</ETag><Size>0</Size><Owner><ID>74df55bf376f41d48959d2aa9deaaf38</ID></Owner><StorageClass>STANDARD</StorageClass></Contents><Contents><Key>tmp/index001.png</Key><LastModified>2025-08-20T07:42:59.813Z</LastModified><ETag>"de317c0b7b6e02b42ef2b9e29bb5906a"</ETag><Size>12082</Size><Owner><ID>74df55bf376f41d48959d2aa9deaaf38</ID></Owner><StorageClass>STANDARD</StorageClass></Contents><Contents><Key>tmp/index002.png</Key><LastModified>2025-08-20T07:52:10.204Z</LastModified><ETag>"de317c0b7b6e02b42ef2b9e29bb5906a"</ETag><Size>12082</Size><Owner><ID>74df55bf376f41d48959d2aa9deaaf38</ID></Owner><StorageClass>STANDARD</StorageClass></Contents></ListBucketResult>"#;
        let parser = XmlParser::new(xml);
        let result = parser.parse();

        let keys: Vec<&str> = result.objects.iter().map(|object| object.key.as_str()).collect();
        assert_eq!(keys, ["tmp/", "tmp/index001.png", "tmp/index002.png"]);

        let first = &result.objects[0];
        assert_eq!(first.size, 0);
        assert_eq!(first.last_modified, "2024-12-03T12:01:48.020Z");
        assert_eq!(first.etag, "\"d41d8cd98f00b204e9800998ecf8427e\"");
        assert_eq!(first.storage_class, "STANDARD");

        let second = &result.objects[1];
        assert_eq!(second.size, 12082);
        assert_eq!(second.etag, "\"de317c0b7b6e02b42ef2b9e29bb5906a\"");

        assert!(!result.is_truncated);
        assert_eq!(result.next_marker, None);

        // The metadata must also be serializable.
        let json_data = serde_json::to_string_pretty(&result.objects).unwrap();
        assert!(json_data.contains("tmp/index001.png"));
        println!("{}", json_data);
    }

    /// A truncated listing exposes the marker to continue from.
    #[test]
    fn test_parse_xml_with_next_marker() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?><ListBucketResult xmlns="http://obs.myhwclouds.com/doc/2015-06-30/"><Name>obs-products</Name><Prefix>tmp</Prefix><Marker></Marker><MaxKeys>1000</MaxKeys><IsTruncated>true</IsTruncated><NextMarker>tmp/index999.png</NextMarker><Contents><Key>tmp/index001.png</Key><LastModified>2025-08-20T07:42:59.813Z</LastModified><ETag>"de317c0b7b6e02b42ef2b9e29bb5906a"</ETag><Size>12082</Size><Owner><ID>74df55bf376f41d48959d2aa9deaaf38</ID></Owner><StorageClass>STANDARD</StorageClass></Contents></ListBucketResult>"#;
        let parser = XmlParser::new(xml);
        let result = parser.parse();

        assert_eq!(result.objects.len(), 1);
        assert_eq!(result.objects[0].key, "tmp/index001.png");
        assert!(result.is_truncated);
        assert_eq!(result.next_marker, Some("tmp/index999.png".to_string()));
    }

    /// Converting "now + 2h" to Unix seconds lands exactly 7200 seconds after "now".
    #[test]
    fn test_timestamp() {
        let now = Local::now();
        let future_time = now + Duration::hours(2);

        let system_time: SystemTime = future_time.into();
        let timestamp = system_time.duration_since(UNIX_EPOCH).unwrap().as_secs();

        let now_secs = u64::try_from(now.timestamp()).unwrap();
        assert_eq!(timestamp, now_secs + 2 * 3600);
    }
}