tame_gcs/v1/objects/insert/
multipart.rs1use super::InsertObjectOptional;
2use crate::{
3 Error,
4 objects::{Metadata, Object},
5 types::{BucketName, ObjectName},
6};
7use std::io;
8
9#[cfg(feature = "async-multipart")]
10mod async_mp;
11
12const MULTI_PART_SEPARATOR: &[u8] = b"--tame_gcs\n";
13const MULTI_PART_SUFFIX: &[u8] = b"\n--tame_gcs--";
14const MULTI_PART_CT: &[u8] = b"content-type: application/json; charset=utf-8\n\n";
15
16enum MultipartPart {
17 Prefix,
18 Body,
19 Suffix,
20 End,
21}
22
23impl MultipartPart {
24 fn next(&mut self) {
25 match self {
26 MultipartPart::Prefix => *self = MultipartPart::Body,
27 MultipartPart::Body => *self = MultipartPart::Suffix,
28 MultipartPart::Suffix => *self = MultipartPart::End,
29 MultipartPart::End => unreachable!(),
30 }
31 }
32}
33
34struct MultipartCursor {
35 position: usize,
36 part: MultipartPart,
37}
38
39pub struct Multipart<B> {
42 body: B,
43 prefix: bytes::Bytes,
44 body_len: u64,
45 total_len: u64,
46 cursor: MultipartCursor,
47}
48
49impl<B> Multipart<B> {
50 #[cfg(feature = "async-multipart")]
51 pin_utils::unsafe_pinned!(body: B);
52
53 pub fn wrap(body: B, body_length: u64, metadata: &Metadata) -> Result<Self, Error> {
57 use bytes::BufMut;
58
59 const CT_HN: &[u8] = b"content-type: ";
60
61 let serialized_metadata = serde_json::to_vec(metadata)?;
63 let content_type = metadata
64 .content_type
65 .as_deref()
66 .unwrap_or("application/octet-stream")
67 .as_bytes();
68
69 let metadata = &serialized_metadata[..];
70
71 let prefix_len = MULTI_PART_SEPARATOR.len()
90 + MULTI_PART_CT.len()
91 + metadata.len()
92 + 1
93 + MULTI_PART_SEPARATOR.len()
94 + CT_HN.len()
95 + content_type.len()
96 + 2;
97
98 let prefix = {
99 let mut prefix = bytes::BytesMut::with_capacity(prefix_len);
100 prefix.put_slice(MULTI_PART_SEPARATOR);
101 prefix.put_slice(MULTI_PART_CT);
102 prefix.put_slice(metadata);
103 prefix.put_slice(b"\n");
104 prefix.put_slice(MULTI_PART_SEPARATOR);
105 prefix.put_slice(CT_HN);
106 prefix.put_slice(content_type);
107 prefix.put_slice(b"\n\n");
108
109 prefix.freeze()
110 };
111
112 let total_len = prefix_len as u64 + body_length + MULTI_PART_SUFFIX.len() as u64;
113
114 Ok(Self {
115 body,
116 prefix,
117 body_len: body_length,
118 total_len,
119 cursor: MultipartCursor {
120 position: 0,
121 part: MultipartPart::Prefix,
122 },
123 })
124 }
125
126 pub fn total_len(&self) -> u64 {
128 self.total_len
129 }
130}
131
132impl<B> io::Read for Multipart<B>
133where
134 B: io::Read,
135{
136 fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
137 use std::cmp::min;
138 let mut total_copied = 0;
139
140 while total_copied < buffer.len() {
141 let buf = &mut buffer[total_copied..];
142
143 let (copied, len) = match self.cursor.part {
144 MultipartPart::Prefix => {
145 let to_copy = min(buf.len(), self.prefix.len() - self.cursor.position);
146
147 buf[..to_copy].copy_from_slice(
148 &self.prefix[self.cursor.position..self.cursor.position + to_copy],
149 );
150
151 (to_copy, self.prefix.len())
152 }
153 MultipartPart::Body => {
154 let copied = self.body.read(buf)?;
155 (copied, self.body_len as usize)
156 }
157 MultipartPart::Suffix => {
158 let to_copy = min(buf.len(), MULTI_PART_SUFFIX.len() - self.cursor.position);
159
160 buf[..to_copy].copy_from_slice(
161 &MULTI_PART_SUFFIX[self.cursor.position..self.cursor.position + to_copy],
162 );
163
164 (to_copy, MULTI_PART_SUFFIX.len())
165 }
166 MultipartPart::End => return Ok(total_copied),
167 };
168
169 self.cursor.position += copied;
170 total_copied += copied;
171
172 if self.cursor.position == len {
173 self.cursor.part.next();
174 self.cursor.position = 0;
175 }
176 }
177
178 Ok(total_copied)
179 }
180}
181
182impl Object {
183 pub fn insert_multipart<B>(
204 &self,
205 bucket: &BucketName<'_>,
206 content: B,
207 length: u64,
208 metadata: &Metadata,
209 optional: Option<InsertObjectOptional<'_>>,
210 ) -> Result<http::Request<Multipart<B>>, Error> {
211 match metadata.name {
214 Some(ref name) => ObjectName::try_from(name.as_ref())?,
215 None => {
216 return Err(Error::InvalidLength {
217 len: 0,
218 min: 1,
219 max: 1024,
220 });
221 }
222 };
223
224 let mut uri = format!(
225 "https://{}/upload/storage/v1/b/{}/o?uploadType=multipart",
226 self.authority.as_str(),
227 percent_encoding::percent_encode(bucket.as_ref(), crate::util::PATH_ENCODE_SET,),
228 );
229
230 let query = optional.unwrap_or_default();
231
232 let multipart = Multipart::wrap(content, length, metadata)?;
233
234 let req_builder = http::Request::builder()
235 .header(
236 http::header::CONTENT_TYPE,
237 http::header::HeaderValue::from_static("multipart/related; boundary=tame_gcs"),
238 )
239 .header(http::header::CONTENT_LENGTH, multipart.total_len());
240
241 let query_params = serde_urlencoded::to_string(query)?;
242 if !query_params.is_empty() {
243 uri.push('&');
244 uri.push_str(&query_params);
245 }
246
247 Ok(req_builder.method("POST").uri(uri).body(multipart)?)
248 }
249}