aliyun_oss_rs/object/
get_object.rs1use crate::common::body_to_bytes;
2use crate::{
3 Error,
4 common::format_gmt,
5 error::normal_error,
6 request::{Oss, OssRequest},
7};
8use bytes::Bytes;
9use futures_util::{Stream, StreamExt};
10use http::Method;
11use http_body_util::BodyExt;
12use std::pin::Pin;
13use time::OffsetDateTime;
14use tokio::{
15 fs::{OpenOptions, create_dir_all},
16 io::{AsyncWriteExt, BufWriter},
17};
18
19pub struct GetObject {
23 req: OssRequest,
24}
25impl GetObject {
26 pub(super) fn new(oss: Oss) -> Self {
27 GetObject {
28 req: OssRequest::new(oss, Method::GET),
29 }
30 }
31 pub fn set_range(mut self, start: usize, end: Option<usize>) -> Self {
37 self.req.insert_header(
38 "Range",
39 format!(
40 "bytes={}-{}",
41 start,
42 end.map(|v| v.to_string()).unwrap_or_else(|| String::new())
43 ),
44 );
45 self
46 }
47 pub fn set_if_modified_since(mut self, if_modified_since: OffsetDateTime) -> Self {
50 self.req
51 .insert_header("If-Modified-Since", format_gmt(if_modified_since));
52 self
53 }
54 pub fn set_if_unmodified_since(mut self, if_unmodified_since: OffsetDateTime) -> Self {
57 self.req
58 .insert_header("If-Unmodified-Since", format_gmt(if_unmodified_since));
59 self
60 }
61 pub fn set_if_match(mut self, if_match: impl ToString) -> Self {
65 self.req.insert_header("If-Match", if_match);
66 self
67 }
68 pub fn set_if_none_match(mut self, if_none_match: impl ToString) -> Self {
71 self.req.insert_header("If-None-Match", if_none_match);
72 self
73 }
74 pub async fn download_to_file(self, save_path: &str) -> Result<(), Error> {
78 if save_path.contains("://") {
80 return Err(Error::PathNotSupported);
81 }
82 let response = self.req.send_to_oss()?.await?;
84 let status_code = response.status();
86 match status_code {
87 code if code.is_success() => {
88 let parent_dir = std::path::Path::new(save_path).parent();
90 if let Some(dir) = parent_dir {
91 create_dir_all(dir).await?;
92 }
93 let file = OpenOptions::new()
95 .write(true)
96 .create_new(true)
97 .open(save_path)
98 .await?;
99 let mut writer = BufWriter::with_capacity(131072, file);
101 let mut response_bytes = response.into_body().into_data_stream();
103 while let Some(chunk) = response_bytes.next().await {
104 match chunk {
105 Ok(data) => writer.write_all(data.as_ref()).await?,
106 Err(e) => return Err(Error::HyperError(e)),
107 }
108 }
109 writer.flush().await?;
110 writer.shutdown().await?;
111 Ok(())
112 }
113 _ => Err(normal_error(response).await),
114 }
115 }
116 pub async fn download(self) -> Result<Bytes, Error> {
120 let response = self.req.send_to_oss()?.await?;
122 let status_code = response.status();
124 match status_code {
125 code if code.is_success() => Ok(body_to_bytes(response.into_body()).await?),
126 _ => Err(normal_error(response).await),
127 }
128 }
129 pub async fn download_to_stream(
147 self,
148 ) -> Result<Pin<Box<dyn Stream<Item = Result<bytes::Bytes, Error>> + Send>>, Error> {
149 let response = self.req.send_to_oss()?.await?;
151 let status_code = response.status();
153 match status_code {
154 code if code.is_success() => {
155 let stream = response
156 .into_body()
157 .into_data_stream()
158 .map(|item| match item {
159 Ok(bytes) => Ok(bytes),
160 Err(e) => Err(e.into()),
161 });
162 Ok(Box::pin(stream))
163 }
164 _ => Err(normal_error(response).await),
165 }
166 }
167}