1use blobd_token::AuthToken;
2use blobd_token::AuthTokenAction;
3use blobd_token::BlobdTokens;
4use bytes::Bytes;
5use futures::channel::mpsc::UnboundedSender;
6use futures::stream::iter;
7use futures::Stream;
8use futures::StreamExt;
9use futures::TryStreamExt;
10use itertools::Itertools;
11use off64::create_u16_be;
12use off64::create_u40_be;
13use percent_encoding::utf8_percent_encode;
14use percent_encoding::CONTROLS;
15use reqwest::header::CONTENT_LENGTH;
16use reqwest::header::RANGE;
17use reqwest::Body;
18use serde::Deserialize;
19use serde::Serialize;
20use std::error::Error;
21use std::fmt::Display;
22use std::time::SystemTime;
23use std::time::UNIX_EPOCH;
24use url::Url;
25
26pub struct BlobdClient {
28 client: reqwest::Client,
29 endpoint: String,
30 tokens: BlobdTokens,
31}
32
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn now() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    since_epoch
        .expect("system clock is before 1970")
        .as_secs()
}
39
40type BoxErr = Box<dyn Error + Send + Sync>;
41
42pub struct BatchCreateObjectEntry<DS: Stream<Item = Result<Bytes, BoxErr>>> {
43 pub size: u64,
44 pub data_stream: DS,
45 pub key: Vec<u8>,
46}
47
/// Outcome of [`BlobdClient::batch_create_objects`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub struct BatchCreatedObjects {
    /// Count parsed from the `x-blobd-objects-created-count` response header.
    pub successful_count: u64,
}
51
52#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
53pub struct CreatedObject {
54 pub object_id: u64,
55 pub upload_token: String,
56}
57
58#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
59pub struct WrittenObjectPart {
60 pub write_receipt: String,
61}
62
63#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
64pub struct InspectedObject {
65 pub object_id: u64,
66 pub content_length: u64,
67}
68
69impl BlobdClient {
70 pub fn new(endpoint: String, token_secret: [u8; 32]) -> BlobdClient {
72 BlobdClient {
73 client: reqwest::Client::new(),
74 endpoint,
75 tokens: BlobdTokens::new(token_secret),
76 }
77 }
78
79 fn build_url(&self, key: &str) -> Url {
80 let mut url = self.endpoint.clone();
81 for p in key.split('/') {
82 url.push('/');
83 url.extend(utf8_percent_encode(p, CONTROLS));
84 }
85 Url::parse(&url).unwrap()
86 }
87
88 pub fn generate_token_query_param(
90 &self,
91 action: AuthTokenAction,
92 expires_in_seconds: u64,
93 ) -> (&'static str, String) {
94 let t = AuthToken::new(&self.tokens, action, now() + expires_in_seconds);
95 ("t", t)
96 }
97
98 pub fn generate_presigned_url(
99 &self,
100 key: &str,
101 action: AuthTokenAction,
102 expires_in_seconds: u64,
103 ) -> Url {
104 let mut url = self.build_url(key);
105 let (k, v) = self.generate_token_query_param(action, expires_in_seconds);
106 url.query_pairs_mut().append_pair(k, &v);
107 url
108 }
109
110 pub async fn batch_create_objects<DS, Objects>(
130 &self,
131 objects: Objects,
132 transfer_byte_counter: Option<UnboundedSender<usize>>,
133 ) -> reqwest::Result<BatchCreatedObjects>
134 where
135 DS: 'static + Stream<Item = Result<Bytes, BoxErr>> + Send + Sync,
136 Objects: 'static + Stream<Item = BatchCreateObjectEntry<DS>> + Send + Sync,
137 {
138 let body_stream = objects.flat_map(move |e| {
139 let transfer_byte_counter = transfer_byte_counter.clone();
140 iter(vec![
141 Ok(Bytes::from(
142 create_u16_be(e.key.len().try_into().unwrap()).to_vec(),
143 )),
144 Ok(Bytes::from(e.key)),
145 Ok(Bytes::from(create_u40_be(e.size).to_vec())),
146 ])
147 .chain(e.data_stream.inspect_ok(move |chunk| {
148 if let Some(c) = &transfer_byte_counter {
149 c.unbounded_send(chunk.len()).unwrap();
150 }
151 }))
152 });
153 let body = Body::wrap_stream(body_stream);
154 let res = self
155 .client
156 .post(self.endpoint.clone())
157 .query(&[self.generate_token_query_param(AuthTokenAction::BatchCreateObjects {}, 300)])
158 .body(body)
159 .send()
160 .await?
161 .error_for_status()?;
162 Ok(BatchCreatedObjects {
163 successful_count: res
164 .headers()
165 .get("x-blobd-objects-created-count")
166 .unwrap()
167 .to_str()
168 .unwrap()
169 .parse()
170 .unwrap(),
171 })
172 }
173
174 pub async fn create_object(&self, key: &str, size: u64) -> reqwest::Result<CreatedObject> {
175 let res = self
176 .client
177 .post(self.build_url(key))
178 .query(&[
179 ("size", size.to_string()),
180 self.generate_token_query_param(
181 AuthTokenAction::CreateObject {
182 key: key.as_bytes().to_vec(),
183 size,
184 },
185 300,
186 ),
187 ])
188 .send()
189 .await?
190 .error_for_status()?;
191 Ok(CreatedObject {
192 object_id: res
193 .headers()
194 .get("x-blobd-object-id")
195 .unwrap()
196 .to_str()
197 .unwrap()
198 .parse()
199 .unwrap(),
200 upload_token: res
201 .headers()
202 .get("x-blobd-upload-token")
203 .unwrap()
204 .to_str()
205 .unwrap()
206 .parse()
207 .unwrap(),
208 })
209 }
210
211 pub async fn commit_object(
212 &self,
213 key: &str,
214 creation: CreatedObject,
215 write_receipts: impl IntoIterator<Item = impl Display>,
216 ) -> reqwest::Result<()> {
217 self
218 .client
219 .put(self.build_url(key))
220 .query(&[
221 ("object_id", creation.object_id.to_string()),
222 ("upload_token", creation.upload_token.to_string()),
223 ("write_receipts", write_receipts.into_iter().join(",")),
224 self.generate_token_query_param(
225 AuthTokenAction::CommitObject {
226 object_id: creation.object_id,
227 },
228 300,
229 ),
230 ])
231 .send()
232 .await?
233 .error_for_status()?;
234 Ok(())
235 }
236
237 pub async fn delete_object(&self, key: &str) -> reqwest::Result<()> {
238 self
239 .client
240 .delete(self.build_url(key))
241 .query(&[self.generate_token_query_param(
242 AuthTokenAction::DeleteObject {
243 key: key.as_bytes().to_vec(),
244 },
245 300,
246 )])
247 .send()
248 .await?
249 .error_for_status()?;
250 Ok(())
251 }
252
253 pub async fn inspect_object(&self, key: &str) -> reqwest::Result<InspectedObject> {
254 let res = self
255 .client
256 .head(self.build_url(key))
257 .query(&[self.generate_token_query_param(
258 AuthTokenAction::InspectObject {
259 key: key.as_bytes().to_vec(),
260 },
261 300,
262 )])
263 .send()
264 .await?
265 .error_for_status()?;
266 Ok(InspectedObject {
267 object_id: res
268 .headers()
269 .get("x-blobd-object-id")
270 .unwrap()
271 .to_str()
272 .unwrap()
273 .parse()
274 .unwrap(),
275 content_length: res
276 .headers()
277 .get(CONTENT_LENGTH)
278 .unwrap()
279 .to_str()
280 .unwrap()
281 .parse()
282 .unwrap(),
283 })
284 }
285
286 pub async fn read_object(
287 &self,
288 key: &str,
289 start: u64,
290 end: Option<u64>,
291 ) -> reqwest::Result<impl Stream<Item = reqwest::Result<Bytes>>> {
292 let res = self
293 .client
294 .get(self.build_url(key))
295 .query(&[self.generate_token_query_param(
296 AuthTokenAction::ReadObject {
297 key: key.as_bytes().to_vec(),
298 },
299 300,
300 )])
301 .header(
302 RANGE,
303 format!(
304 "bytes={}-{}",
305 start,
306 end.map(|e| e.to_string()).unwrap_or_default()
307 ),
308 )
309 .send()
310 .await?
311 .error_for_status()?;
312 Ok(res.bytes_stream())
313 }
314
315 pub async fn write_object(
316 &self,
317 key: &str,
318 creation: CreatedObject,
319 offset: u64,
320 data: impl Into<Body>,
321 ) -> reqwest::Result<WrittenObjectPart> {
322 let res = self
323 .client
324 .patch(self.build_url(key))
325 .query(&[
326 ("offset", offset.to_string()),
327 ("object_id", creation.object_id.to_string()),
328 ("upload_token", creation.upload_token.to_string()),
329 self.generate_token_query_param(
330 AuthTokenAction::WriteObject {
331 object_id: creation.object_id,
332 },
333 300,
334 ),
335 ])
336 .body(data)
337 .send()
338 .await?
339 .error_for_status()?;
340 Ok(WrittenObjectPart {
341 write_receipt: res
342 .headers()
343 .get("x-blobd-write-receipt")
344 .unwrap()
345 .to_str()
346 .unwrap()
347 .parse()
348 .unwrap(),
349 })
350 }
351}