blobd_client_rs/
lib.rs

1use blobd_token::AuthToken;
2use blobd_token::AuthTokenAction;
3use blobd_token::BlobdTokens;
4use bytes::Bytes;
5use futures::channel::mpsc::UnboundedSender;
6use futures::stream::iter;
7use futures::Stream;
8use futures::StreamExt;
9use futures::TryStreamExt;
10use itertools::Itertools;
11use off64::int::create_u16_be;
12use off64::int::create_u40_be;
13use percent_encoding::utf8_percent_encode;
14use percent_encoding::CONTROLS;
15use reqwest::header::CONTENT_LENGTH;
16use reqwest::header::RANGE;
17use reqwest::Body;
18use serde::Deserialize;
19use serde::Serialize;
20use std::error::Error;
21use std::fmt::Display;
22use std::time::SystemTime;
23use std::time::UNIX_EPOCH;
24use url::Url;
25
/// HTTP client for a blobd server.
///
/// This client has internal state that is not very cheap to clone, nor memory efficient when cloned lots. Therefore, we don't derive Clone for it; wrap it in an Arc for cheap sharing.
pub struct BlobdClient {
  // HTTP client through which every request is issued.
  client: reqwest::Client,
  // Base URL of the server; object keys are appended to it as path segments.
  endpoint: String,
  // Token state created from the 32-byte secret; used to generate the auth token sent with each request.
  tokens: BlobdTokens,
}
32
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before 1970-01-01.
fn now() -> u64 {
  match SystemTime::now().duration_since(UNIX_EPOCH) {
    Ok(since_epoch) => since_epoch.as_secs(),
    // Same message layout that `Result::expect` would produce.
    Err(err) => panic!("system clock is before 1970: {:?}", err),
  }
}
39
/// Boxed, thread-safe error used as the failure type of object data streams.
type BoxErr = Box<dyn Error + Send + Sync>;
41
/// One object to upload as part of `BlobdClient::batch_create_objects`.
pub struct BatchCreateObjectEntry<DS: Stream<Item = Result<Bytes, BoxErr>>> {
  /// Size of the object; sent ahead of the data as a 40-bit big-endian integer.
  /// Presumably this is the total number of bytes `data_stream` will yield — confirm against the server.
  pub size: u64,
  /// Chunks of the object's data, forwarded in order as part of the request body.
  pub data_stream: DS,
  /// Object key as raw bytes. Its length is sent as a 16-bit big-endian integer, so it must fit in a `u16`.
  pub key: Vec<u8>,
}
47
/// Outcome of `BlobdClient::batch_create_objects`.
///
/// Derives the same standard traits as the other result types in this file so it can be logged, compared and copied around by callers.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct BatchCreatedObjects {
  /// Number of objects the server reported as created (taken from the `x-blobd-objects-created-count` response header).
  pub successful_count: u64,
}
51
/// Result of `BlobdClient::create_object`.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
pub struct CreatedObject {
  /// Value of the `x-blobd-upload-token` response header. It is sent back as the `upload_token` query parameter when writing to or committing the object.
  pub upload_token: String,
}
56
/// Result of `BlobdClient::write_object`.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
pub struct WrittenObjectPart {
  /// Value of the `x-blobd-write-receipt` response header.
  /// Presumably this is what should be passed in `write_receipts` to `commit_object` — confirm against the server docs.
  pub write_receipt: String,
}
61
/// Result of `BlobdClient::inspect_object`.
#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
pub struct InspectedObject {
  /// Parsed value of the `x-blobd-object-id` response header.
  pub object_id: u128,
  /// Parsed value of the `Content-Length` response header.
  pub content_length: u64,
}
67
68impl BlobdClient {
69  /// The `endpoint` should be something like `https://127.0.0.1:8080` or `https://my.blobd.io`. Make sure to omit the trailing slash (i.e. empty path).
70  pub fn new(endpoint: String, token_secret: [u8; 32]) -> BlobdClient {
71    BlobdClient {
72      client: reqwest::Client::new(),
73      endpoint,
74      tokens: BlobdTokens::new(token_secret),
75    }
76  }
77
78  fn build_url(&self, key: &str) -> Url {
79    let mut url = self.endpoint.clone();
80    for p in key.split('/') {
81      url.push('/');
82      url.extend(utf8_percent_encode(p, CONTROLS));
83    }
84    Url::parse(&url).unwrap()
85  }
86
87  /// NOTE: This does not encode the parameter value, as it's expected this would be passed into a URL builder. However, currently the token doesn't contain any character that would require encoding anyway, so it's safe either way.
88  pub fn generate_token_query_param(
89    &self,
90    action: AuthTokenAction,
91    expires_in_seconds: u64,
92  ) -> (&'static str, String) {
93    let t = AuthToken::new(&self.tokens, action, now() + expires_in_seconds);
94    ("t", t)
95  }
96
97  pub fn generate_presigned_url(
98    &self,
99    key: &str,
100    action: AuthTokenAction,
101    expires_in_seconds: u64,
102  ) -> Url {
103    let mut url = self.build_url(key);
104    let (k, v) = self.generate_token_query_param(action, expires_in_seconds);
105    url.query_pairs_mut().append_pair(k, &v);
106    url
107  }
108
109  /// Parts to this method:
110  /// - The list of objects. It's an infallible stream to allow for scenarios where the list is not known ahead of time or is buffered asynchronously.
111  /// - Each object's data. It's a fallable stream of chunks of bytes, using the bytes::Bytes type as that's what reqwest requires.
112  /// Approaches to the list of objects:
113  /// - If it's all in memory, use `futures::stream::iter(my_list_of_objects)`.
114  /// - If it can be derived from an existing stream, use the `StreamExt` methods to transform items into `BatchCreateObjectEntry`.
115  /// - If it is dynamically built from another thread/async function, use `futures::channel::mpsc::unbounded`; the receiver already implements `Stream` and can be provided as the list.
116  /// Approaches to object data stream:
117  /// - If it's all in memory, use `futures::stream::once(Ok(bytes::Bytes::from(my_object_data)))`.
118  /// - If it's a synchronous Read, wrap in a Stream that reads chunks into a `Bytes`, preferably on a different thread (e.g. `spawn_blocking`).
119  /// - If it's an AsyncRead, read in chunks and wrap each chunk with `Bytes::from`.
120  /// Approaches to error handling:
121  /// - The blobd server will never return a bad status, but will bail as soon as an error occurs.
122  /// - However, reqwest will bail if the request body fails, which can occur if a `data_stream` of a `BatchCreateObjectEntry` yields an `Err` chunk.
123  /// - If reqwest bails, the response will be unavailable and the amount of successfully committed objects will not be returned.
124  /// - Therefore, there are some optional approaches worth considering:
125  ///   - Filter out `BatchCreateObjectEntry` items that have a `data_stream` that will definitely fail, as once an entry starts being transferred, there's no way to skip over it midway.
126  ///   - When a `data_stream` chunk is about to yield `Err`, return an `Ok` instead with empty data and stop the object list stream (as the current data transferred is midway in an object and there's no way to skip over it). The server will notice that the object was not completely uploaded, decide to bail, and return the successful count.
127  /// Provide an optional async channel sender for `transfer_byte_counter` to get data stream chunk sizes as they're about to be uploaded, which can be useful for calculating transfer progress or rate.
128  pub async fn batch_create_objects<DS, Objects>(
129    &self,
130    objects: Objects,
131    transfer_byte_counter: Option<UnboundedSender<usize>>,
132  ) -> reqwest::Result<BatchCreatedObjects>
133  where
134    DS: 'static + Stream<Item = Result<Bytes, BoxErr>> + Send + Sync,
135    Objects: 'static + Stream<Item = BatchCreateObjectEntry<DS>> + Send + Sync,
136  {
137    let body_stream = objects.flat_map(move |e| {
138      let transfer_byte_counter = transfer_byte_counter.clone();
139      iter(vec![
140        Ok(Bytes::from(
141          create_u16_be(e.key.len().try_into().unwrap()).to_vec(),
142        )),
143        Ok(Bytes::from(e.key)),
144        Ok(Bytes::from(create_u40_be(e.size).to_vec())),
145      ])
146      .chain(e.data_stream.inspect_ok(move |chunk| {
147        if let Some(c) = &transfer_byte_counter {
148          c.unbounded_send(chunk.len()).unwrap();
149        }
150      }))
151    });
152    let body = Body::wrap_stream(body_stream);
153    let res = self
154      .client
155      .post(self.endpoint.clone())
156      .query(&[self.generate_token_query_param(AuthTokenAction::BatchCreateObjects {}, 300)])
157      .body(body)
158      .send()
159      .await?
160      .error_for_status()?;
161    Ok(BatchCreatedObjects {
162      successful_count: res
163        .headers()
164        .get("x-blobd-objects-created-count")
165        .unwrap()
166        .to_str()
167        .unwrap()
168        .parse()
169        .unwrap(),
170    })
171  }
172
173  pub async fn create_object(&self, key: &str, size: u64) -> reqwest::Result<CreatedObject> {
174    let res = self
175      .client
176      .post(self.build_url(key))
177      .query(&[
178        ("size", size.to_string()),
179        self.generate_token_query_param(
180          AuthTokenAction::CreateObject {
181            key: key.as_bytes().to_vec(),
182            size,
183          },
184          300,
185        ),
186      ])
187      .send()
188      .await?
189      .error_for_status()?;
190    Ok(CreatedObject {
191      upload_token: res
192        .headers()
193        .get("x-blobd-upload-token")
194        .unwrap()
195        .to_str()
196        .unwrap()
197        .parse()
198        .unwrap(),
199    })
200  }
201
202  pub async fn commit_object(
203    &self,
204    key: &str,
205    creation: CreatedObject,
206    write_receipts: impl IntoIterator<Item = impl Display>,
207  ) -> reqwest::Result<()> {
208    self
209      .client
210      .put(self.build_url(key))
211      .query(&[
212        ("upload_token", creation.upload_token.to_string()),
213        ("write_receipts", write_receipts.into_iter().join(",")),
214        self.generate_token_query_param(
215          AuthTokenAction::CommitObject {
216            key: key.as_bytes().to_vec(),
217          },
218          300,
219        ),
220      ])
221      .send()
222      .await?
223      .error_for_status()?;
224    Ok(())
225  }
226
227  pub async fn delete_object(&self, key: &str) -> reqwest::Result<()> {
228    self
229      .client
230      .delete(self.build_url(key))
231      .query(&[self.generate_token_query_param(
232        AuthTokenAction::DeleteObject {
233          key: key.as_bytes().to_vec(),
234        },
235        300,
236      )])
237      .send()
238      .await?
239      .error_for_status()?;
240    Ok(())
241  }
242
243  pub async fn inspect_object(&self, key: &str) -> reqwest::Result<InspectedObject> {
244    let res = self
245      .client
246      .head(self.build_url(key))
247      .query(&[self.generate_token_query_param(
248        AuthTokenAction::InspectObject {
249          key: key.as_bytes().to_vec(),
250        },
251        300,
252      )])
253      .send()
254      .await?
255      .error_for_status()?;
256    Ok(InspectedObject {
257      object_id: res
258        .headers()
259        .get("x-blobd-object-id")
260        .unwrap()
261        .to_str()
262        .unwrap()
263        .parse()
264        .unwrap(),
265      content_length: res
266        .headers()
267        .get(CONTENT_LENGTH)
268        .unwrap()
269        .to_str()
270        .unwrap()
271        .parse()
272        .unwrap(),
273    })
274  }
275
276  pub async fn read_object(
277    &self,
278    key: &str,
279    start: Option<u64>,
280    end: Option<u64>,
281  ) -> reqwest::Result<impl Stream<Item = reqwest::Result<Bytes>>> {
282    let mut req = self
283      .client
284      .get(self.build_url(key))
285      .query(&[self.generate_token_query_param(
286        AuthTokenAction::ReadObject {
287          key: key.as_bytes().to_vec(),
288        },
289        300,
290      )]);
291    
292    if let Some(start) = start {
293      req = req.header(
294        RANGE,
295        format!(
296          "bytes={}-{}",
297          start,
298          end.map(|e| e.to_string()).unwrap_or_default()
299        ),
300      );
301    }
302    
303    let res = req.send().await?.error_for_status()?;
304    Ok(res.bytes_stream())
305  }
306
307  pub async fn write_object(
308    &self,
309    key: &str,
310    creation: CreatedObject,
311    offset: u64,
312    data: impl Into<Body>,
313  ) -> reqwest::Result<WrittenObjectPart> {
314    let res = self
315      .client
316      .patch(self.build_url(key))
317      .query(&[
318        ("offset", offset.to_string()),
319        ("upload_token", creation.upload_token.to_string()),
320        self.generate_token_query_param(
321          AuthTokenAction::WriteObject {
322            key: key.as_bytes().to_vec(),
323          },
324          300,
325        ),
326      ])
327      .body(data)
328      .send()
329      .await?
330      .error_for_status()?;
331    Ok(WrittenObjectPart {
332      write_receipt: res
333        .headers()
334        .get("x-blobd-write-receipt")
335        .unwrap()
336        .to_str()
337        .unwrap()
338        .parse()
339        .unwrap(),
340    })
341  }
342}