Skip to main content

tame_gcs/v1/objects/insert/
resumable.rs

1use super::*;
2use crate::objects::{Metadata, Object};
3
4#[derive(Clone)]
5pub struct ResumableSession(pub http::Uri);
6
7impl From<ResumableSession> for http::Uri {
8    fn from(rs: ResumableSession) -> Self {
9        rs.0
10    }
11}
12
13/// The response from an [`Object::init_resumable_insert`] request is the
14/// `session_uri`.
15pub struct InitResumableInsertResponse {
16    pub resumable_session: ResumableSession,
17}
18
19impl ApiResponse<&[u8]> for InitResumableInsertResponse {}
20impl ApiResponse<bytes::Bytes> for InitResumableInsertResponse {}
21
22impl<B> TryFrom<http::Response<B>> for InitResumableInsertResponse
23where
24    B: AsRef<[u8]>,
25{
26    type Error = Error;
27
28    fn try_from(response: http::Response<B>) -> Result<Self, Self::Error> {
29        let (parts, _body) = response.into_parts();
30        match parts.headers.get(http::header::LOCATION) {
31            Some(session_uri) => match session_uri.to_str() {
32                Ok(session_uri) => Ok(Self {
33                    resumable_session: ResumableSession(session_uri.parse()?),
34                }),
35                Err(_err) => Err(Error::OpaqueHeaderValue(session_uri.clone())),
36            },
37            None => Err(Error::UnknownHeader(http::header::LOCATION)),
38        }
39    }
40}
41
42pub enum ResumableInsertResponseMetadata {
43    PartialSize(u64),
44    Complete(Box<Metadata>),
45}
46
47/// The response from an [`resumable_upload`](#method.resumable_upload) request
48/// is the enum [`ResumableInsertResponseMetadata`], which would be the size of
49/// the object uploaded so far, unless it's the request with last chunk that
50/// completes the upload wherein it would be the object
51/// [metadata](https://cloud.google.com/storage/docs/json_api/v1/objects#resource).
52pub struct ResumableInsertResponse {
53    pub metadata: ResumableInsertResponseMetadata,
54}
55
56impl ResumableInsertResponse {
57    fn try_from_response<B: AsRef<[u8]>>(
58        response: http::response::Response<B>,
59    ) -> Result<Self, Error> {
60        let status = response.status();
61        if status.eq(&http::StatusCode::PERMANENT_REDIRECT)
62        // Cloud Storage uses 308 (PERMANENT_REDIRECT) in a non-standard way though.
63        // See https://cloud.google.com/storage/docs/json_api/v1/status-codes#308_Resume_Incomplete
64            || status.eq(&http::StatusCode::OK)
65            || status.eq(&http::StatusCode::CREATED)
66        {
67            Self::try_from(response)
68        } else {
69            // If we get an error, but with a plain text payload, attempt to deserialize
70            // an ApiError from it, otherwise fallback to the simple HttpStatus
71            if let Some(ct) = response
72                .headers()
73                .get(http::header::CONTENT_TYPE)
74                .and_then(|ct| ct.to_str().ok())
75            {
76                if ct.starts_with("text/plain") && !response.body().as_ref().is_empty() {
77                    if let Ok(message) = std::str::from_utf8(response.body().as_ref()) {
78                        let api_err = error::ApiError {
79                            code: status.into(),
80                            message: message.to_owned(),
81                            errors: vec![],
82                        };
83                        return Err(Error::Api(api_err));
84                    }
85                }
86            }
87            Err(Error::from(response.status()))
88        }
89    }
90}
91
92impl ApiResponse<&[u8]> for ResumableInsertResponse {
93    fn try_from_parts(response: http::response::Response<&[u8]>) -> Result<Self, Error> {
94        Self::try_from_response(response)
95    }
96}
97
98impl ApiResponse<bytes::Bytes> for ResumableInsertResponse {
99    fn try_from_parts(response: http::response::Response<bytes::Bytes>) -> Result<Self, Error> {
100        Self::try_from_response(response)
101    }
102}
103
104impl<B> TryFrom<http::Response<B>> for ResumableInsertResponse
105where
106    B: AsRef<[u8]>,
107{
108    type Error = Error;
109
110    fn try_from(response: http::Response<B>) -> Result<Self, Self::Error> {
111        if response.status().eq(&http::StatusCode::PERMANENT_REDIRECT) {
112            let (parts, _body) = response.into_parts();
113            let end_pos = match parts.headers.get(http::header::RANGE) {
114                Some(range_val) => match range_val.to_str() {
115                    Ok(range) => match range.split('-').next_back() {
116                        Some(pos) => {
117                            let pos = pos.parse::<u64>();
118                            match pos {
119                                Ok(pos) => Ok(pos),
120                                Err(_err) => Err(Error::OpaqueHeaderValue(range_val.clone())),
121                            }
122                        }
123                        None => Err(Error::UnknownHeader(http::header::RANGE)),
124                    },
125                    Err(_err) => Err(Error::OpaqueHeaderValue(range_val.clone())),
126                },
127                None => Err(Error::UnknownHeader(http::header::RANGE)),
128            }?;
129            Ok(Self {
130                metadata: ResumableInsertResponseMetadata::PartialSize(end_pos + 1),
131            })
132        } else {
133            let (_parts, body) = response.into_parts();
134            let metadata = Box::new(serde_json::from_slice(body.as_ref())?);
135            Ok(Self {
136                metadata: ResumableInsertResponseMetadata::Complete(metadata),
137            })
138        }
139    }
140}
141
142pub struct CancelResumableInsertResponse;
143
144impl CancelResumableInsertResponse {
145    fn try_from_response<B: AsRef<[u8]>>(
146        response: http::response::Response<B>,
147    ) -> Result<Self, Error> {
148        if response.status().as_u16() == 499 {
149            // See https://cloud.google.com/storage/docs/json_api/v1/status-codes#499_Client_Closed_Request
150            Self::try_from(response)
151        } else {
152            Err(Error::from(response.status()))
153        }
154    }
155}
156
157impl ApiResponse<&[u8]> for CancelResumableInsertResponse {
158    fn try_from_parts(response: http::response::Response<&[u8]>) -> Result<Self, Error> {
159        Self::try_from_response(response)
160    }
161}
162
163impl ApiResponse<bytes::Bytes> for CancelResumableInsertResponse {
164    fn try_from_parts(response: http::response::Response<bytes::Bytes>) -> Result<Self, Error> {
165        Self::try_from_response(response)
166    }
167}
168
169impl<B> TryFrom<http::Response<B>> for CancelResumableInsertResponse
170where
171    B: AsRef<[u8]>,
172{
173    type Error = Error;
174
175    fn try_from(_response: http::Response<B>) -> Result<Self, Self::Error> {
176        Ok(Self)
177    }
178}
179
180impl Object {
181    /// Initiates a resumable upload session.
182    ///
183    /// * Accepted Media MIME types: `*/*`
184    ///
185    /// Note: A resumable upload must be completed within a week of being initiated.
186    ///
187    /// **CAUTION**: Be careful when sharing the resumable session URI, because
188    /// it can be used by anyone to upload data to the target bucket without any
189    /// further authentication.
190    ///  
191    /// Required IAM Permissions: `storage.objects.create`, `storage.objects.delete`
192    ///
193    /// Note: `storage.objects.delete` is only needed if an object with the same
194    /// name already exists.
195    ///
196    /// [Complete API Documentation](https://cloud.google.com/storage/docs/performing-resumable-uploads#initiate-session)
197    pub fn resumable_insert_init<'a, OID>(
198        &self,
199        id: &OID,
200        content_type: Option<&str>,
201    ) -> Result<http::Request<()>, Error>
202    where
203        OID: ObjectIdentifier<'a> + ?Sized,
204    {
205        let uri = format!(
206            "https://{}/upload/storage/v1/b/{}/o?uploadType=resumable&name={}",
207            self.authority.as_str(),
208            percent_encoding::percent_encode(id.bucket().as_ref(), crate::util::PATH_ENCODE_SET,),
209            percent_encoding::percent_encode(id.object().as_ref(), crate::util::QUERY_ENCODE_SET,),
210        );
211
212        let req_builder = http::Request::builder()
213            .header(http::header::CONTENT_LENGTH, 0u64)
214            .header(
215                http::header::HeaderName::from_static("x-upload-content-type"),
216                http::header::HeaderValue::from_str(
217                    content_type.unwrap_or("application/octet-stream"),
218                )
219                .map_err(http::Error::from)?,
220            );
221
222        Ok(req_builder.method("POST").uri(uri).body(())?)
223    }
224
225    /// Cancels an incomplete resumable upload and prevent any further action for
226    /// `session_uri`, which should have been obtained using [`Object::init_resumable_insert`]
227    ///
228    /// [Complete API Documentation](https://cloud.google.com/storage/docs/performing-resumable-uploads#cancel-upload)
229    pub fn resumable_cancel(session: ResumableSession) -> Result<http::Request<()>, Error> {
230        let req_builder = http::Request::builder().header(http::header::CONTENT_LENGTH, 0u64);
231
232        Ok(req_builder.method("DELETE").uri(session).body(())?)
233    }
234
235    /// Performs resumable upload to the specified `session_uri`, which should
236    /// have been obtained using [`Object::init_resumable_insert`]
237    ///
238    /// * Maximum total object size: `5TB`
239    ///
240    /// There are two ways to upload the object's data:
241    /// * For single chunk upload, set `length` to the total size of the object.
242    /// * For multiple chunks upload, set `length` to the size of current chunk
243    ///   that is being uploaded and `Content-Range` header as
244    ///   `bytes CHUNK_FIRST_BYTE-CHUNK_LAST_BYTE/TOTAL_OBJECT_SIZE` where:
245    ///    * `CHUNK_FIRST_BYTE` is the starting byte in the overall object that
246    ///      the chunk you're uploading contains.
247    ///    * `CHUNK_LAST_BYTE` is the ending byte in the overall object that the
248    ///      chunk you're uploading contains.
249    ///    * `TOTAL_OBJECT_SIZE` is the total size of the object you are uploading.
250    ///
251    /// **NOTE**: `length` should be a multiple of 256KiB, unless it's the last
252    /// chunk. If not, the server will not accept all bytes sent in the request.
253    /// Also, it is recommended to use at least 8MiB.
254    ///
255    /// [Complete API Documentation](https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload)
256    pub fn resumable_append<B>(
257        session: ResumableSession,
258        content: B,
259        length: u64,
260    ) -> Result<http::Request<B>, Error> {
261        let req_builder = http::Request::builder().header(http::header::CONTENT_LENGTH, length);
262
263        Ok(req_builder.method("PUT").uri(session).body(content)?)
264    }
265}