blobd_client_rs/
lib.rs

1use blobd_token::AuthToken;
2use blobd_token::AuthTokenAction;
3use blobd_token::BlobdTokens;
4use bytes::Bytes;
5use futures::channel::mpsc::UnboundedSender;
6use futures::stream::iter;
7use futures::Stream;
8use futures::StreamExt;
9use futures::TryStreamExt;
10use itertools::Itertools;
11use off64::create_u16_be;
12use off64::create_u40_be;
13use percent_encoding::utf8_percent_encode;
14use percent_encoding::CONTROLS;
15use reqwest::header::CONTENT_LENGTH;
16use reqwest::header::RANGE;
17use reqwest::Body;
18use serde::Deserialize;
19use serde::Serialize;
20use std::error::Error;
21use std::fmt::Display;
22use std::time::SystemTime;
23use std::time::UNIX_EPOCH;
24use url::Url;
25
/// HTTP client for a blobd server.
///
/// This client has internal state that is not very cheap to clone, nor memory efficient when cloned lots. Therefore, we don't derive Clone for it; wrap it in an Arc for cheap sharing.
pub struct BlobdClient {
  // Underlying HTTP client used to send every request.
  client: reqwest::Client,
  // Base URL of the server, without a trailing slash; object keys are appended to it as path segments.
  endpoint: String,
  // Token state created from the 32-byte token secret; used to generate the auth token sent with each request.
  tokens: BlobdTokens,
}
32
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before 1970-01-01T00:00:00Z.
fn now() -> u64 {
  let since_epoch = SystemTime::now()
    .duration_since(UNIX_EPOCH)
    .expect("system clock is before 1970");
  since_epoch.as_secs()
}
39
/// Boxed error that can be sent and shared across threads; this is the error type yielded by object data streams.
type BoxErr = Box<dyn Error + Send + Sync>;
41
/// One object to upload via `BlobdClient::batch_create_objects`.
pub struct BatchCreateObjectEntry<DS: Stream<Item = Result<Bytes, BoxErr>>> {
  /// Declared size of the object; it is encoded with `create_u40_be` and sent ahead of the object's data.
  pub size: u64,
  /// Fallible stream of the object's data chunks, sent after the key and size.
  pub data_stream: DS,
  /// Raw object key. Its length is sent as a 16-bit big-endian prefix, so it must be at most 65535 bytes.
  pub key: Vec<u8>,
}
47
/// Outcome of `BlobdClient::batch_create_objects`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct BatchCreatedObjects {
  /// Number of objects the server reports as created, read from the `x-blobd-objects-created-count` response header.
  pub successful_count: u64,
}
51
/// Handle returned by `BlobdClient::create_object`; it is passed to `write_object` and `commit_object` to continue the upload.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
pub struct CreatedObject {
  /// Object ID, read from the `x-blobd-object-id` response header.
  pub object_id: u64,
  /// Upload token, read from the `x-blobd-upload-token` response header and sent back as the `upload_token` query parameter.
  pub upload_token: String,
}
57
/// Result of `BlobdClient::write_object`.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
pub struct WrittenObjectPart {
  /// Receipt read from the `x-blobd-write-receipt` response header; receipts are joined with commas and sent by `commit_object`.
  pub write_receipt: String,
}
62
63#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
64pub struct InspectedObject {
65  pub object_id: u64,
66  pub content_length: u64,
67}
68
69impl BlobdClient {
70  /// The `endpoint` should be something like `https://127.0.0.1:8080` or `https://my.blobd.io`. Make sure to omit the trailing slash (i.e. empty path).
71  pub fn new(endpoint: String, token_secret: [u8; 32]) -> BlobdClient {
72    BlobdClient {
73      client: reqwest::Client::new(),
74      endpoint,
75      tokens: BlobdTokens::new(token_secret),
76    }
77  }
78
79  fn build_url(&self, key: &str) -> Url {
80    let mut url = self.endpoint.clone();
81    for p in key.split('/') {
82      url.push('/');
83      url.extend(utf8_percent_encode(p, CONTROLS));
84    }
85    Url::parse(&url).unwrap()
86  }
87
88  /// NOTE: This does not encode the parameter value, as it's expected this would be passed into a URL builder. However, currently the token doesn't contain any character that would require encoding anyway, so it's safe either way.
89  pub fn generate_token_query_param(
90    &self,
91    action: AuthTokenAction,
92    expires_in_seconds: u64,
93  ) -> (&'static str, String) {
94    let t = AuthToken::new(&self.tokens, action, now() + expires_in_seconds);
95    ("t", t)
96  }
97
98  pub fn generate_presigned_url(
99    &self,
100    key: &str,
101    action: AuthTokenAction,
102    expires_in_seconds: u64,
103  ) -> Url {
104    let mut url = self.build_url(key);
105    let (k, v) = self.generate_token_query_param(action, expires_in_seconds);
106    url.query_pairs_mut().append_pair(k, &v);
107    url
108  }
109
110  /// Parts to this method:
111  /// - The list of objects. It's an infallible stream to allow for scenarios where the list is not known ahead of time or is buffered asynchronously.
112  /// - Each object's data. It's a fallable stream of chunks of bytes, using the bytes::Bytes type as that's what reqwest requires.
113  /// Approaches to the list of objects:
114  /// - If it's all in memory, use `futures::stream::iter(my_list_of_objects)`.
115  /// - If it can be derived from an existing stream, use the `StreamExt` methods to transform items into `BatchCreateObjectEntry`.
116  /// - If it is dynamically built from another thread/async function, use `futures::channel::mpsc::unbounded`; the receiver already implements `Stream` and can be provided as the list.
117  /// Approaches to object data stream:
118  /// - If it's all in memory, use `futures::stream::once(Ok(bytes::Bytes::from(my_object_data)))`.
119  /// - If it's a synchronous Read, wrap in a Stream that reads chunks into a `Bytes`, preferably on a different thread (e.g. `spawn_blocking`).
120  /// - If it's an AsyncRead, read in chunks and wrap each chunk with `Bytes::from`.
121  /// Approaches to error handling:
122  /// - The blobd server will never return a bad status, but will bail as soon as an error occurs.
123  /// - However, reqwest will bail if the request body fails, which can occur if a `data_stream` of a `BatchCreateObjectEntry` yields an `Err` chunk.
124  /// - If reqwest bails, the response will be unavailable and the amount of successfully committed objects will not be returned.
125  /// - Therefore, there are some optional approaches worth considering:
126  ///   - Filter out `BatchCreateObjectEntry` items that have a `data_stream` that will definitely fail, as once an entry starts being transferred, there's no way to skip over it midway.
127  ///   - When a `data_stream` chunk is about to yield `Err`, return an `Ok` instead with empty data and stop the object list stream (as the current data transferred is midway in an object and there's no way to skip over it). The server will notice that the object was not completely uploaded, decide to bail, and return the successful count.
128  /// Provide an optional async channel sender for `transfer_byte_counter` to get data stream chunk sizes as they're about to be uploaded, which can be useful for calculating transfer progress or rate.
129  pub async fn batch_create_objects<DS, Objects>(
130    &self,
131    objects: Objects,
132    transfer_byte_counter: Option<UnboundedSender<usize>>,
133  ) -> reqwest::Result<BatchCreatedObjects>
134  where
135    DS: 'static + Stream<Item = Result<Bytes, BoxErr>> + Send + Sync,
136    Objects: 'static + Stream<Item = BatchCreateObjectEntry<DS>> + Send + Sync,
137  {
138    let body_stream = objects.flat_map(move |e| {
139      let transfer_byte_counter = transfer_byte_counter.clone();
140      iter(vec![
141        Ok(Bytes::from(
142          create_u16_be(e.key.len().try_into().unwrap()).to_vec(),
143        )),
144        Ok(Bytes::from(e.key)),
145        Ok(Bytes::from(create_u40_be(e.size).to_vec())),
146      ])
147      .chain(e.data_stream.inspect_ok(move |chunk| {
148        if let Some(c) = &transfer_byte_counter {
149          c.unbounded_send(chunk.len()).unwrap();
150        }
151      }))
152    });
153    let body = Body::wrap_stream(body_stream);
154    let res = self
155      .client
156      .post(self.endpoint.clone())
157      .query(&[self.generate_token_query_param(AuthTokenAction::BatchCreateObjects {}, 300)])
158      .body(body)
159      .send()
160      .await?
161      .error_for_status()?;
162    Ok(BatchCreatedObjects {
163      successful_count: res
164        .headers()
165        .get("x-blobd-objects-created-count")
166        .unwrap()
167        .to_str()
168        .unwrap()
169        .parse()
170        .unwrap(),
171    })
172  }
173
174  pub async fn create_object(&self, key: &str, size: u64) -> reqwest::Result<CreatedObject> {
175    let res = self
176      .client
177      .post(self.build_url(key))
178      .query(&[
179        ("size", size.to_string()),
180        self.generate_token_query_param(
181          AuthTokenAction::CreateObject {
182            key: key.as_bytes().to_vec(),
183            size,
184          },
185          300,
186        ),
187      ])
188      .send()
189      .await?
190      .error_for_status()?;
191    Ok(CreatedObject {
192      object_id: res
193        .headers()
194        .get("x-blobd-object-id")
195        .unwrap()
196        .to_str()
197        .unwrap()
198        .parse()
199        .unwrap(),
200      upload_token: res
201        .headers()
202        .get("x-blobd-upload-token")
203        .unwrap()
204        .to_str()
205        .unwrap()
206        .parse()
207        .unwrap(),
208    })
209  }
210
211  pub async fn commit_object(
212    &self,
213    key: &str,
214    creation: CreatedObject,
215    write_receipts: impl IntoIterator<Item = impl Display>,
216  ) -> reqwest::Result<()> {
217    self
218      .client
219      .put(self.build_url(key))
220      .query(&[
221        ("object_id", creation.object_id.to_string()),
222        ("upload_token", creation.upload_token.to_string()),
223        ("write_receipts", write_receipts.into_iter().join(",")),
224        self.generate_token_query_param(
225          AuthTokenAction::CommitObject {
226            object_id: creation.object_id,
227          },
228          300,
229        ),
230      ])
231      .send()
232      .await?
233      .error_for_status()?;
234    Ok(())
235  }
236
237  pub async fn delete_object(&self, key: &str) -> reqwest::Result<()> {
238    self
239      .client
240      .delete(self.build_url(key))
241      .query(&[self.generate_token_query_param(
242        AuthTokenAction::DeleteObject {
243          key: key.as_bytes().to_vec(),
244        },
245        300,
246      )])
247      .send()
248      .await?
249      .error_for_status()?;
250    Ok(())
251  }
252
253  pub async fn inspect_object(&self, key: &str) -> reqwest::Result<InspectedObject> {
254    let res = self
255      .client
256      .head(self.build_url(key))
257      .query(&[self.generate_token_query_param(
258        AuthTokenAction::InspectObject {
259          key: key.as_bytes().to_vec(),
260        },
261        300,
262      )])
263      .send()
264      .await?
265      .error_for_status()?;
266    Ok(InspectedObject {
267      object_id: res
268        .headers()
269        .get("x-blobd-object-id")
270        .unwrap()
271        .to_str()
272        .unwrap()
273        .parse()
274        .unwrap(),
275      content_length: res
276        .headers()
277        .get(CONTENT_LENGTH)
278        .unwrap()
279        .to_str()
280        .unwrap()
281        .parse()
282        .unwrap(),
283    })
284  }
285
286  pub async fn read_object(
287    &self,
288    key: &str,
289    start: u64,
290    end: Option<u64>,
291  ) -> reqwest::Result<impl Stream<Item = reqwest::Result<Bytes>>> {
292    let res = self
293      .client
294      .get(self.build_url(key))
295      .query(&[self.generate_token_query_param(
296        AuthTokenAction::ReadObject {
297          key: key.as_bytes().to_vec(),
298        },
299        300,
300      )])
301      .header(
302        RANGE,
303        format!(
304          "bytes={}-{}",
305          start,
306          end.map(|e| e.to_string()).unwrap_or_default()
307        ),
308      )
309      .send()
310      .await?
311      .error_for_status()?;
312    Ok(res.bytes_stream())
313  }
314
315  pub async fn write_object(
316    &self,
317    key: &str,
318    creation: CreatedObject,
319    offset: u64,
320    data: impl Into<Body>,
321  ) -> reqwest::Result<WrittenObjectPart> {
322    let res = self
323      .client
324      .patch(self.build_url(key))
325      .query(&[
326        ("offset", offset.to_string()),
327        ("object_id", creation.object_id.to_string()),
328        ("upload_token", creation.upload_token.to_string()),
329        self.generate_token_query_param(
330          AuthTokenAction::WriteObject {
331            object_id: creation.object_id,
332          },
333          300,
334        ),
335      ])
336      .body(data)
337      .send()
338      .await?
339      .error_for_status()?;
340    Ok(WrittenObjectPart {
341      write_receipt: res
342        .headers()
343        .get("x-blobd-write-receipt")
344        .unwrap()
345        .to_str()
346        .unwrap()
347        .parse()
348        .unwrap(),
349    })
350  }
351}