1use blobd_token::AuthToken;
2use blobd_token::AuthTokenAction;
3use blobd_token::BlobdTokens;
4use bytes::Bytes;
5use futures::channel::mpsc::UnboundedSender;
6use futures::stream::iter;
7use futures::Stream;
8use futures::StreamExt;
9use futures::TryStreamExt;
10use itertools::Itertools;
11use off64::int::create_u16_be;
12use off64::int::create_u40_be;
13use percent_encoding::utf8_percent_encode;
14use percent_encoding::CONTROLS;
15use reqwest::header::CONTENT_LENGTH;
16use reqwest::header::RANGE;
17use reqwest::Body;
18use serde::Deserialize;
19use serde::Serialize;
20use std::error::Error;
21use std::fmt::Display;
22use std::time::SystemTime;
23use std::time::UNIX_EPOCH;
24use url::Url;
25
/// Asynchronous HTTP client for a blobd server.
///
/// Every request carries an auth token in the `t` query parameter, generated
/// from `tokens` for the specific action being performed.
pub struct BlobdClient {
    /// HTTP client used to send every request.
    client: reqwest::Client,
    /// Base URL of the server. Object keys are appended to it as `/`-prefixed
    /// path segments, and batch creation posts to it directly.
    endpoint: String,
    /// Token state built from the secret given to `BlobdClient::new`.
    tokens: BlobdTokens,
}
32
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the Unix epoch.
fn now() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before 1970");
    since_epoch.as_secs()
}
39
/// Boxed error that is `Send + Sync`; the error type of caller-supplied data streams.
type BoxErr = Box<dyn Error + Send + Sync>;
41
/// One object to upload through `BlobdClient::batch_create_objects`.
pub struct BatchCreateObjectEntry<DS: Stream<Item = Result<Bytes, BoxErr>>> {
    /// Object size written into the entry's header (encoded with `create_u40_be`).
    /// NOTE(review): presumably must equal the total number of bytes yielded by
    /// `data_stream` — confirm against the server.
    pub size: u64,
    /// Stream of chunks that make up the object's contents.
    pub data_stream: DS,
    /// Object key as raw bytes. Its length is sent as a `u16`, so it must fit in one.
    pub key: Vec<u8>,
}
47
/// Outcome of a batch creation request.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct BatchCreatedObjects {
    /// Value of the `x-blobd-objects-created-count` response header.
    pub successful_count: u64,
}
51
/// Result of `BlobdClient::create_object`.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
pub struct CreatedObject {
    /// Value of the `x-blobd-upload-token` response header; sent back as the
    /// `upload_token` query parameter when writing to and committing the object.
    pub upload_token: String,
}
56
/// Result of `BlobdClient::write_object`.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
pub struct WrittenObjectPart {
    /// Value of the `x-blobd-write-receipt` response header.
    /// NOTE(review): presumably these are the receipts expected by
    /// `BlobdClient::commit_object` — confirm against the server.
    pub write_receipt: String,
}
61
/// Result of `BlobdClient::inspect_object`.
#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
pub struct InspectedObject {
    /// Parsed value of the `x-blobd-object-id` response header.
    pub object_id: u128,
    /// Parsed value of the `Content-Length` response header.
    pub content_length: u64,
}
67
68impl BlobdClient {
69 pub fn new(endpoint: String, token_secret: [u8; 32]) -> BlobdClient {
71 BlobdClient {
72 client: reqwest::Client::new(),
73 endpoint,
74 tokens: BlobdTokens::new(token_secret),
75 }
76 }
77
78 fn build_url(&self, key: &str) -> Url {
79 let mut url = self.endpoint.clone();
80 for p in key.split('/') {
81 url.push('/');
82 url.extend(utf8_percent_encode(p, CONTROLS));
83 }
84 Url::parse(&url).unwrap()
85 }
86
87 pub fn generate_token_query_param(
89 &self,
90 action: AuthTokenAction,
91 expires_in_seconds: u64,
92 ) -> (&'static str, String) {
93 let t = AuthToken::new(&self.tokens, action, now() + expires_in_seconds);
94 ("t", t)
95 }
96
97 pub fn generate_presigned_url(
98 &self,
99 key: &str,
100 action: AuthTokenAction,
101 expires_in_seconds: u64,
102 ) -> Url {
103 let mut url = self.build_url(key);
104 let (k, v) = self.generate_token_query_param(action, expires_in_seconds);
105 url.query_pairs_mut().append_pair(k, &v);
106 url
107 }
108
109 pub async fn batch_create_objects<DS, Objects>(
129 &self,
130 objects: Objects,
131 transfer_byte_counter: Option<UnboundedSender<usize>>,
132 ) -> reqwest::Result<BatchCreatedObjects>
133 where
134 DS: 'static + Stream<Item = Result<Bytes, BoxErr>> + Send + Sync,
135 Objects: 'static + Stream<Item = BatchCreateObjectEntry<DS>> + Send + Sync,
136 {
137 let body_stream = objects.flat_map(move |e| {
138 let transfer_byte_counter = transfer_byte_counter.clone();
139 iter(vec![
140 Ok(Bytes::from(
141 create_u16_be(e.key.len().try_into().unwrap()).to_vec(),
142 )),
143 Ok(Bytes::from(e.key)),
144 Ok(Bytes::from(create_u40_be(e.size).to_vec())),
145 ])
146 .chain(e.data_stream.inspect_ok(move |chunk| {
147 if let Some(c) = &transfer_byte_counter {
148 c.unbounded_send(chunk.len()).unwrap();
149 }
150 }))
151 });
152 let body = Body::wrap_stream(body_stream);
153 let res = self
154 .client
155 .post(self.endpoint.clone())
156 .query(&[self.generate_token_query_param(AuthTokenAction::BatchCreateObjects {}, 300)])
157 .body(body)
158 .send()
159 .await?
160 .error_for_status()?;
161 Ok(BatchCreatedObjects {
162 successful_count: res
163 .headers()
164 .get("x-blobd-objects-created-count")
165 .unwrap()
166 .to_str()
167 .unwrap()
168 .parse()
169 .unwrap(),
170 })
171 }
172
173 pub async fn create_object(&self, key: &str, size: u64) -> reqwest::Result<CreatedObject> {
174 let res = self
175 .client
176 .post(self.build_url(key))
177 .query(&[
178 ("size", size.to_string()),
179 self.generate_token_query_param(
180 AuthTokenAction::CreateObject {
181 key: key.as_bytes().to_vec(),
182 size,
183 },
184 300,
185 ),
186 ])
187 .send()
188 .await?
189 .error_for_status()?;
190 Ok(CreatedObject {
191 upload_token: res
192 .headers()
193 .get("x-blobd-upload-token")
194 .unwrap()
195 .to_str()
196 .unwrap()
197 .parse()
198 .unwrap(),
199 })
200 }
201
202 pub async fn commit_object(
203 &self,
204 key: &str,
205 creation: CreatedObject,
206 write_receipts: impl IntoIterator<Item = impl Display>,
207 ) -> reqwest::Result<()> {
208 self
209 .client
210 .put(self.build_url(key))
211 .query(&[
212 ("upload_token", creation.upload_token.to_string()),
213 ("write_receipts", write_receipts.into_iter().join(",")),
214 self.generate_token_query_param(
215 AuthTokenAction::CommitObject {
216 key: key.as_bytes().to_vec(),
217 },
218 300,
219 ),
220 ])
221 .send()
222 .await?
223 .error_for_status()?;
224 Ok(())
225 }
226
227 pub async fn delete_object(&self, key: &str) -> reqwest::Result<()> {
228 self
229 .client
230 .delete(self.build_url(key))
231 .query(&[self.generate_token_query_param(
232 AuthTokenAction::DeleteObject {
233 key: key.as_bytes().to_vec(),
234 },
235 300,
236 )])
237 .send()
238 .await?
239 .error_for_status()?;
240 Ok(())
241 }
242
243 pub async fn inspect_object(&self, key: &str) -> reqwest::Result<InspectedObject> {
244 let res = self
245 .client
246 .head(self.build_url(key))
247 .query(&[self.generate_token_query_param(
248 AuthTokenAction::InspectObject {
249 key: key.as_bytes().to_vec(),
250 },
251 300,
252 )])
253 .send()
254 .await?
255 .error_for_status()?;
256 Ok(InspectedObject {
257 object_id: res
258 .headers()
259 .get("x-blobd-object-id")
260 .unwrap()
261 .to_str()
262 .unwrap()
263 .parse()
264 .unwrap(),
265 content_length: res
266 .headers()
267 .get(CONTENT_LENGTH)
268 .unwrap()
269 .to_str()
270 .unwrap()
271 .parse()
272 .unwrap(),
273 })
274 }
275
276 pub async fn read_object(
277 &self,
278 key: &str,
279 start: Option<u64>,
280 end: Option<u64>,
281 ) -> reqwest::Result<impl Stream<Item = reqwest::Result<Bytes>>> {
282 let mut req = self
283 .client
284 .get(self.build_url(key))
285 .query(&[self.generate_token_query_param(
286 AuthTokenAction::ReadObject {
287 key: key.as_bytes().to_vec(),
288 },
289 300,
290 )]);
291
292 if let Some(start) = start {
293 req = req.header(
294 RANGE,
295 format!(
296 "bytes={}-{}",
297 start,
298 end.map(|e| e.to_string()).unwrap_or_default()
299 ),
300 );
301 }
302
303 let res = req.send().await?.error_for_status()?;
304 Ok(res.bytes_stream())
305 }
306
307 pub async fn write_object(
308 &self,
309 key: &str,
310 creation: CreatedObject,
311 offset: u64,
312 data: impl Into<Body>,
313 ) -> reqwest::Result<WrittenObjectPart> {
314 let res = self
315 .client
316 .patch(self.build_url(key))
317 .query(&[
318 ("offset", offset.to_string()),
319 ("upload_token", creation.upload_token.to_string()),
320 self.generate_token_query_param(
321 AuthTokenAction::WriteObject {
322 key: key.as_bytes().to_vec(),
323 },
324 300,
325 ),
326 ])
327 .body(data)
328 .send()
329 .await?
330 .error_for_status()?;
331 Ok(WrittenObjectPart {
332 write_receipt: res
333 .headers()
334 .get("x-blobd-write-receipt")
335 .unwrap()
336 .to_str()
337 .unwrap()
338 .parse()
339 .unwrap(),
340 })
341 }
342}