tame_gcs/v1/objects/insert/
resumable.rs1use super::*;
2use crate::objects::{Metadata, Object};
3
4#[derive(Clone)]
5pub struct ResumableSession(pub http::Uri);
6
7impl From<ResumableSession> for http::Uri {
8 fn from(rs: ResumableSession) -> Self {
9 rs.0
10 }
11}
12
13pub struct InitResumableInsertResponse {
16 pub resumable_session: ResumableSession,
17}
18
19impl ApiResponse<&[u8]> for InitResumableInsertResponse {}
20impl ApiResponse<bytes::Bytes> for InitResumableInsertResponse {}
21
22impl<B> TryFrom<http::Response<B>> for InitResumableInsertResponse
23where
24 B: AsRef<[u8]>,
25{
26 type Error = Error;
27
28 fn try_from(response: http::Response<B>) -> Result<Self, Self::Error> {
29 let (parts, _body) = response.into_parts();
30 match parts.headers.get(http::header::LOCATION) {
31 Some(session_uri) => match session_uri.to_str() {
32 Ok(session_uri) => Ok(Self {
33 resumable_session: ResumableSession(session_uri.parse()?),
34 }),
35 Err(_err) => Err(Error::OpaqueHeaderValue(session_uri.clone())),
36 },
37 None => Err(Error::UnknownHeader(http::header::LOCATION)),
38 }
39 }
40}
41
42pub enum ResumableInsertResponseMetadata {
43 PartialSize(u64),
44 Complete(Box<Metadata>),
45}
46
47pub struct ResumableInsertResponse {
53 pub metadata: ResumableInsertResponseMetadata,
54}
55
56impl ResumableInsertResponse {
57 fn try_from_response<B: AsRef<[u8]>>(
58 response: http::response::Response<B>,
59 ) -> Result<Self, Error> {
60 let status = response.status();
61 if status.eq(&http::StatusCode::PERMANENT_REDIRECT)
62 || status.eq(&http::StatusCode::OK)
65 || status.eq(&http::StatusCode::CREATED)
66 {
67 Self::try_from(response)
68 } else {
69 if let Some(ct) = response
72 .headers()
73 .get(http::header::CONTENT_TYPE)
74 .and_then(|ct| ct.to_str().ok())
75 {
76 if ct.starts_with("text/plain") && !response.body().as_ref().is_empty() {
77 if let Ok(message) = std::str::from_utf8(response.body().as_ref()) {
78 let api_err = error::ApiError {
79 code: status.into(),
80 message: message.to_owned(),
81 errors: vec![],
82 };
83 return Err(Error::Api(api_err));
84 }
85 }
86 }
87 Err(Error::from(response.status()))
88 }
89 }
90}
91
92impl ApiResponse<&[u8]> for ResumableInsertResponse {
93 fn try_from_parts(response: http::response::Response<&[u8]>) -> Result<Self, Error> {
94 Self::try_from_response(response)
95 }
96}
97
98impl ApiResponse<bytes::Bytes> for ResumableInsertResponse {
99 fn try_from_parts(response: http::response::Response<bytes::Bytes>) -> Result<Self, Error> {
100 Self::try_from_response(response)
101 }
102}
103
104impl<B> TryFrom<http::Response<B>> for ResumableInsertResponse
105where
106 B: AsRef<[u8]>,
107{
108 type Error = Error;
109
110 fn try_from(response: http::Response<B>) -> Result<Self, Self::Error> {
111 if response.status().eq(&http::StatusCode::PERMANENT_REDIRECT) {
112 let (parts, _body) = response.into_parts();
113 let end_pos = match parts.headers.get(http::header::RANGE) {
114 Some(range_val) => match range_val.to_str() {
115 Ok(range) => match range.split('-').next_back() {
116 Some(pos) => {
117 let pos = pos.parse::<u64>();
118 match pos {
119 Ok(pos) => Ok(pos),
120 Err(_err) => Err(Error::OpaqueHeaderValue(range_val.clone())),
121 }
122 }
123 None => Err(Error::UnknownHeader(http::header::RANGE)),
124 },
125 Err(_err) => Err(Error::OpaqueHeaderValue(range_val.clone())),
126 },
127 None => Err(Error::UnknownHeader(http::header::RANGE)),
128 }?;
129 Ok(Self {
130 metadata: ResumableInsertResponseMetadata::PartialSize(end_pos + 1),
131 })
132 } else {
133 let (_parts, body) = response.into_parts();
134 let metadata = Box::new(serde_json::from_slice(body.as_ref())?);
135 Ok(Self {
136 metadata: ResumableInsertResponseMetadata::Complete(metadata),
137 })
138 }
139 }
140}
141
142pub struct CancelResumableInsertResponse;
143
144impl CancelResumableInsertResponse {
145 fn try_from_response<B: AsRef<[u8]>>(
146 response: http::response::Response<B>,
147 ) -> Result<Self, Error> {
148 if response.status().as_u16() == 499 {
149 Self::try_from(response)
151 } else {
152 Err(Error::from(response.status()))
153 }
154 }
155}
156
157impl ApiResponse<&[u8]> for CancelResumableInsertResponse {
158 fn try_from_parts(response: http::response::Response<&[u8]>) -> Result<Self, Error> {
159 Self::try_from_response(response)
160 }
161}
162
163impl ApiResponse<bytes::Bytes> for CancelResumableInsertResponse {
164 fn try_from_parts(response: http::response::Response<bytes::Bytes>) -> Result<Self, Error> {
165 Self::try_from_response(response)
166 }
167}
168
169impl<B> TryFrom<http::Response<B>> for CancelResumableInsertResponse
170where
171 B: AsRef<[u8]>,
172{
173 type Error = Error;
174
175 fn try_from(_response: http::Response<B>) -> Result<Self, Self::Error> {
176 Ok(Self)
177 }
178}
179
180impl Object {
181 pub fn resumable_insert_init<'a, OID>(
198 &self,
199 id: &OID,
200 content_type: Option<&str>,
201 ) -> Result<http::Request<()>, Error>
202 where
203 OID: ObjectIdentifier<'a> + ?Sized,
204 {
205 let uri = format!(
206 "https://{}/upload/storage/v1/b/{}/o?uploadType=resumable&name={}",
207 self.authority.as_str(),
208 percent_encoding::percent_encode(id.bucket().as_ref(), crate::util::PATH_ENCODE_SET,),
209 percent_encoding::percent_encode(id.object().as_ref(), crate::util::QUERY_ENCODE_SET,),
210 );
211
212 let req_builder = http::Request::builder()
213 .header(http::header::CONTENT_LENGTH, 0u64)
214 .header(
215 http::header::HeaderName::from_static("x-upload-content-type"),
216 http::header::HeaderValue::from_str(
217 content_type.unwrap_or("application/octet-stream"),
218 )
219 .map_err(http::Error::from)?,
220 );
221
222 Ok(req_builder.method("POST").uri(uri).body(())?)
223 }
224
225 pub fn resumable_cancel(session: ResumableSession) -> Result<http::Request<()>, Error> {
230 let req_builder = http::Request::builder().header(http::header::CONTENT_LENGTH, 0u64);
231
232 Ok(req_builder.method("DELETE").uri(session).body(())?)
233 }
234
235 pub fn resumable_append<B>(
257 session: ResumableSession,
258 content: B,
259 length: u64,
260 ) -> Result<http::Request<B>, Error> {
261 let req_builder = http::Request::builder().header(http::header::CONTENT_LENGTH, length);
262
263 Ok(req_builder.method("PUT").uri(session).body(content)?)
264 }
265}