aliyun_oss_rs/object/
get_object.rs

1use crate::common::body_to_bytes;
2use crate::{
3    Error,
4    common::format_gmt,
5    error::normal_error,
6    request::{Oss, OssRequest},
7};
8use bytes::Bytes;
9use futures_util::{Stream, StreamExt};
10use http::Method;
11use http_body_util::BodyExt;
12use std::pin::Pin;
13use time::OffsetDateTime;
14use tokio::{
15    fs::{OpenOptions, create_dir_all},
16    io::{AsyncWriteExt, BufWriter},
17};
18
19/// Retrieve the object's content
20///
21/// See the [Alibaba Cloud documentation](https://help.aliyun.com/document_detail/31980.html) for details
22pub struct GetObject {
23    req: OssRequest,
24}
25impl GetObject {
26    pub(super) fn new(oss: Oss) -> Self {
27        GetObject {
28            req: OssRequest::new(oss, Method::GET),
29        }
30    }
31    /// Set the response range
32    ///
33    /// `end` must be greater than or equal to `start` and both must be within the valid range; invalid values result in downloading the entire file
34    ///
35    /// Byte indexing starts at 0; for a 500-byte file, the range is 0-499
36    pub fn set_range(mut self, start: usize, end: Option<usize>) -> Self {
37        self.req.insert_header(
38            "Range",
39            format!(
40                "bytes={}-{}",
41                start,
42                end.map(|v| v.to_string()).unwrap_or_else(|| String::new())
43            ),
44        );
45        self
46    }
47    /// If the specified time is earlier than the actual modification time, the request succeeds
48    ///
49    pub fn set_if_modified_since(mut self, if_modified_since: OffsetDateTime) -> Self {
50        self.req
51            .insert_header("If-Modified-Since", format_gmt(if_modified_since));
52        self
53    }
54    /// If the specified time is equal to or later than the actual modification time, the request succeeds
55    ///
56    pub fn set_if_unmodified_since(mut self, if_unmodified_since: OffsetDateTime) -> Self {
57        self.req
58            .insert_header("If-Unmodified-Since", format_gmt(if_unmodified_since));
59        self
60    }
61    /// If the provided ETag matches the object's ETag, the request succeeds
62    ///
63    /// The ETag verifies whether the data has changed and can be used to check data integrity
64    pub fn set_if_match(mut self, if_match: impl ToString) -> Self {
65        self.req.insert_header("If-Match", if_match);
66        self
67    }
68    /// If the provided ETag differs from the object's ETag, the request succeeds
69    ///
70    pub fn set_if_none_match(mut self, if_none_match: impl ToString) -> Self {
71        self.req.insert_header("If-None-Match", if_none_match);
72        self
73    }
74    /// Download the object to disk
75    ///
76    /// Network paths are not supported. For SMB/NFS and similar storage, mount locally and use the local path
77    pub async fn download_to_file(self, save_path: &str) -> Result<(), Error> {
78        // Validate path
79        if save_path.contains("://") {
80            return Err(Error::PathNotSupported);
81        }
82        // Send request
83        let response = self.req.send_to_oss()?.await?;
84        // Parse the response
85        let status_code = response.status();
86        match status_code {
87            code if code.is_success() => {
88                // Create directory
89                let parent_dir = std::path::Path::new(save_path).parent();
90                if let Some(dir) = parent_dir {
91                    create_dir_all(dir).await?;
92                }
93                // Create file
94                let file = OpenOptions::new()
95                    .write(true)
96                    .create_new(true)
97                    .open(save_path)
98                    .await?;
99                // Create write buffer
100                let mut writer = BufWriter::with_capacity(131072, file);
101                // Read byte stream
102                let mut response_bytes = response.into_body().into_data_stream();
103                while let Some(chunk) = response_bytes.next().await {
104                    match chunk {
105                        Ok(data) => writer.write_all(data.as_ref()).await?,
106                        Err(e) => return Err(Error::HyperError(e)),
107                    }
108                }
109                writer.flush().await?;
110                writer.shutdown().await?;
111                Ok(())
112            }
113            _ => Err(normal_error(response).await),
114        }
115    }
116    /// Download the object and return the content
117    ///
118    /// If the object is large, this method may use too much memory; use with caution
119    pub async fn download(self) -> Result<Bytes, Error> {
120        // Send request
121        let response = self.req.send_to_oss()?.await?;
122        // Parse the response
123        let status_code = response.status();
124        match status_code {
125            code if code.is_success() => Ok(body_to_bytes(response.into_body()).await?),
126            _ => Err(normal_error(response).await),
127        }
128    }
129    /// Download the object and return a data stream
130    ///
131    /// Use this if the object is large and you do not want to save directly to a file; process the stream yourself
132    ///
133    /// ```ignore
134    /// use futures_util::StreamExt;
135    ///
136    /// let mut stream = object.get_object().download_to_stream().await.unwrap();
137    /// while let Some(item) = stream.next().await {
138    ///     match item {
139    ///         Ok(bytes) => {
140    ///             // Do something with bytes...
141    ///         }
142    ///         Err(e) => eprintln!("Error: {}", e),
143    ///     }
144    /// }
145    /// ```
146    pub async fn download_to_stream(
147        self,
148    ) -> Result<Pin<Box<dyn Stream<Item = Result<bytes::Bytes, Error>> + Send>>, Error> {
149        // Send request
150        let response = self.req.send_to_oss()?.await?;
151        // Parse the response
152        let status_code = response.status();
153        match status_code {
154            code if code.is_success() => {
155                let stream = response
156                    .into_body()
157                    .into_data_stream()
158                    .map(|item| match item {
159                        Ok(bytes) => Ok(bytes),
160                        Err(e) => Err(e.into()),
161                    });
162                Ok(Box::pin(stream))
163            }
164            _ => Err(normal_error(response).await),
165        }
166    }
167}