// cloud_storage_lite/client/bucket.rs

1use bytes::Bytes;
2use futures::{stream::BoxStream, TryStream, TryStreamExt};
3use reqwest::{Method, StatusCode};
4
5use crate::{
6    api::{self, percent_encode, DecodeResponse, ListObjectOptions, Object, Page},
7    errors::{Error, NotFoundError},
8};
9
/// Base URL of the GCS upload endpoint. `GcsBucketClient::new` joins the relative
/// `b/{bucket}/o` path onto this URL to build the per-bucket upload URL, so the
/// trailing slash is significant for `Url::join`.
const GCS_UPLOAD_API_URL: &str = "https://www.googleapis.com/upload/storage/v1/";
11
12/// A GCS client that targets a specific bucket.
13#[async_trait::async_trait]
14pub trait BucketClient {
15    /// Returns the name of the bucket to which this `BucketClient` is connected.
16    fn bucket_name(&self) -> &str;
17
18    /// Checks connectivity to the bucket.
19    async fn ping(&self) -> Result<(), Error>;
20
21    /// Returns a page of objects.
22    async fn list_objects<'a>(&self, options: ListObjectOptions<'a>)
23        -> Result<Page<Object>, Error>;
24
25    /// Creates a new object.
26    async fn create_object<S>(&self, key: &str, value: S) -> Result<Object, Error>
27    where
28        S: TryStream + Send + Sync + 'static,
29        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
30        Bytes: From<S::Ok>;
31
32    /// Gets an object's metadata.
33    async fn get_object(&self, key: &str) -> Result<Object, Error>;
34
35    /// Downloads an object.
36    async fn download_object(
37        &self,
38        key: &str,
39    ) -> Result<BoxStream<'static, Result<Bytes, Error>>, Error>;
40
41    /// Deletes an object. This library does not consider deleting a non-existent object an error.
42    async fn delete_object(&self, key: &str) -> Result<(), Error>;
43}
44
45/// A client whose operations target a specific bucket.
46/// See [`BucketClient`] for operations on this type.
47#[derive(Clone)]
48pub struct GcsBucketClient {
49    client: crate::Client,
50    bucket_name: String,
51    object_path: String,
52    upload_url: reqwest::Url,
53}
54
55impl GcsBucketClient {
56    pub(super) fn new(client: crate::Client, bucket_name: String) -> Self {
57        let encoded_bucket = percent_encode(&bucket_name);
58        let object_path = format!("b/{}/o", encoded_bucket);
59        Self {
60            client,
61            bucket_name,
62            upload_url: reqwest::Url::parse(GCS_UPLOAD_API_URL)
63                .and_then(|u| u.join(&object_path))
64                .expect("malformed url"),
65            object_path,
66        }
67    }
68
69    fn convert_api_error(&self, api_err: api::Error, requested_key: Option<&str>) -> Error {
70        match api_err {
71            api::Error::Http(e) => Error::Http(e),
72            api::Error::Google(e) => {
73                if e.status == StatusCode::NOT_FOUND {
74                    if e.message.is_empty() || e.message.starts_with("No such object") {
75                        NotFoundError::Object {
76                            bucket: self.bucket_name.clone(),
77                            key: requested_key.unwrap_or_default().into(),
78                        }
79                        .into()
80                    } else {
81                        NotFoundError::Bucket {
82                            bucket: self.bucket_name.clone(),
83                        }
84                        .into()
85                    }
86                } else if e.status == StatusCode::FORBIDDEN {
87                    Error::PermissionDenied(e.message)
88                } else {
89                    Error::OtherGoogle(e)
90                }
91            }
92        }
93    }
94}
95
96#[async_trait::async_trait]
97impl BucketClient for GcsBucketClient {
98    fn bucket_name(&self) -> &str {
99        &self.bucket_name
100    }
101
102    async fn ping(&self) -> Result<(), Error> {
103        self.list_objects(ListObjectOptions {
104            max_results: Some(0),
105            ..Default::default()
106        })
107        .await?;
108        Ok(())
109    }
110
111    async fn list_objects<'a>(
112        &self,
113        options: ListObjectOptions<'a>,
114    ) -> Result<Page<Object>, Error> {
115        self.client
116            .make_request(&Method::GET, &self.object_path, |builder| {
117                builder.query(&options).send()
118            })
119            .await?
120            .decode_response()
121            .await
122            .map_err(|e| self.convert_api_error(e, None /* no object */))
123    }
124
125    async fn create_object<S>(&self, key: &str, value: S) -> Result<Object, Error>
126    where
127        S: TryStream + Send + Sync + 'static,
128        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
129        Bytes: From<S::Ok>,
130    {
131        self.client
132            .make_request_to_url(&Method::POST, &self.upload_url.clone(), |builder| {
133                builder
134                    .query(&[("name", key)])
135                    .body(reqwest::Body::wrap_stream(value))
136                    .send()
137            })
138            .await?
139            .decode_response()
140            .await
141            .map_err(|e| self.convert_api_error(e, Some(key)))
142    }
143
144    async fn get_object(&self, key: &str) -> Result<Object, Error> {
145        if key.trim().is_empty() {
146            return Err(Error::NotFound(NotFoundError::Object {
147                bucket: self.bucket_name.clone(),
148                key: key.into(),
149            }));
150        }
151        self.client
152            .make_request(
153                &Method::GET,
154                &format!("{}/{}", self.object_path, percent_encode(key)),
155                |builder| builder.send(),
156            )
157            .await?
158            .decode_response()
159            .await
160            .map_err(|e| self.convert_api_error(e, Some(key)))
161    }
162
163    async fn download_object(
164        &self,
165        key: &str,
166    ) -> Result<BoxStream<'static, Result<Bytes, Error>>, Error> {
167        let res = self
168            .client
169            .make_request(
170                &Method::GET,
171                &format!("{}/{}?alt=media", self.object_path, percent_encode(key)),
172                |builder| builder.send(),
173            )
174            .await?;
175        if res.status().is_success() {
176            Ok(Box::pin(res.bytes_stream().map_err(Error::from)))
177        } else if res.status() == StatusCode::NOT_FOUND {
178            Err(Error::NotFound(NotFoundError::Object {
179                bucket: self.bucket_name.clone(),
180                key: key.into(),
181            }))
182        } else {
183            Err(Error::OtherGoogle(api::GoogleError {
184                status: res.status(),
185                message: res.text().await?,
186            }))
187        }
188    }
189
190    async fn delete_object(&self, key: &str) -> Result<(), Error> {
191        self.client
192            .make_request(
193                &Method::DELETE,
194                &format!("{}/{}", self.object_path, percent_encode(key)),
195                |builder| builder.send(),
196            )
197            .await?
198            .decode_response::<()>()
199            .await
200            .or_else(|e| match e {
201                api::Error::Google(api::GoogleError {
202                    status: StatusCode::NOT_FOUND,
203                    ..
204                }) => Ok(()),
205                _ => Err(e),
206            })
207            .map_err(|e| self.convert_api_error(e, Some(key)))
208    }
209}
210
/// Integration tests. They call `ServiceAccount::read_from_canonical_env()` for
/// credentials and read the bucket name from `CLOUD_STORAGE_LITE_TEST_BUCKET`.
#[cfg(test)]
mod tests {
    use super::*;

    use crate::{
        token_provider::oauth::{OAuthTokenProvider, ServiceAccount, SCOPE_STORAGE_FULL_CONTROL},
        Client,
    };

    /// Content uploaded by every test that creates an object.
    static TEST_DATA: &str = "test";

    /// Name of the bucket the tests run against; panics if the variable is unset.
    fn test_bucket() -> String {
        std::env::var("CLOUD_STORAGE_LITE_TEST_BUCKET").unwrap()
    }

    /// Produces an 8-character random alphanumeric string, used for object keys.
    fn random_string() -> String {
        let mut rng = rand::thread_rng();
        (0..8)
            .map(|_| rand::Rng::sample(&mut rng, rand::distributions::Alphanumeric))
            .map(char::from)
            .collect()
    }

    /// Builds a `Client` using the service account read from the environment.
    fn get_client() -> Client {
        let service_account = ServiceAccount::read_from_canonical_env().unwrap();
        let token_provider =
            OAuthTokenProvider::new(service_account, SCOPE_STORAGE_FULL_CONTROL).unwrap();
        Client::new(token_provider)
    }

    /// Builds a bucket client for the test bucket.
    fn get_bucket_client() -> impl BucketClient {
        get_client().into_bucket_client(test_bucket())
    }

    /// A single-chunk stream yielding `TEST_DATA`.
    fn make_data_stream() -> impl futures::Stream<Item = Result<Bytes, std::convert::Infallible>> {
        let chunk = Bytes::from(TEST_DATA);
        futures::stream::once(futures::future::ok::<_, std::convert::Infallible>(chunk))
    }

    // Pinging the test bucket succeeds.
    #[tokio::test]
    async fn ping() {
        get_bucket_client().ping().await.unwrap();
    }

    // Pinging the test bucket name with "qqq" appended yields `NotFound`.
    #[tokio::test]
    async fn ping_notfound() {
        let missing_bucket = test_bucket() + "qqq";
        let result = get_client()
            .into_bucket_client(missing_bucket)
            .ping()
            .await;
        assert!(matches!(result, Err(Error::NotFound(_))), "{:?}", result);
    }

    // Pinging the bucket named "admin" yields `PermissionDenied`.
    #[tokio::test]
    async fn ping_forbidden() {
        let result = get_client()
            .into_bucket_client("admin".into())
            .ping()
            .await;
        assert!(
            matches!(result, Err(Error::PermissionDenied(_))),
            "{:?}",
            result
        );
    }

    // A created object can be fetched and reports the expected name, size and id prefix.
    #[tokio::test]
    async fn create_object() {
        let client = get_bucket_client();
        let key = random_string();
        client
            .create_object(&key, make_data_stream())
            .await
            .unwrap();

        let fetched = client.get_object(&key).await.unwrap();
        assert_eq!(fetched.name, key);
        assert_eq!(fetched.size, TEST_DATA.len() as u64);
        let expected_id_prefix = format!("{}/{}", test_bucket(), key);
        assert!(fetched.id.starts_with(expected_id_prefix.as_str()));
    }

    // Both an unknown key and the empty key are reported as a missing object.
    #[tokio::test]
    async fn get_object_notfound() {
        let client = get_bucket_client();

        let unknown = client.get_object("thiskeydoesnotexist").await;
        assert!(matches!(
            unknown,
            Err(Error::NotFound(NotFoundError::Object { .. }))
        ));

        let empty = client.get_object("").await;
        assert!(matches!(
            empty,
            Err(Error::NotFound(NotFoundError::Object { .. }))
        ));
    }

    // Listing by prefix returns exactly the objects created under that prefix.
    #[tokio::test]
    async fn list_objects() {
        let client = get_bucket_client();
        let prefix = random_string();
        let key1 = format!("{}key1", prefix);
        let key2 = format!("{}key2", prefix);

        futures::try_join!(
            client.create_object(&key1, make_data_stream()),
            client.create_object(&key2, make_data_stream()),
        )
        .unwrap();

        let by_prefix = client
            .list_objects(ListObjectOptions {
                prefix: Some(&prefix),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(by_prefix.items.len(), 2);

        let by_full_key = client
            .list_objects(ListObjectOptions {
                prefix: Some(&key1),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(by_full_key.items.len(), 1);
    }

    // Downloading a freshly created object yields exactly the uploaded bytes.
    #[tokio::test]
    async fn download_object() {
        let client = get_bucket_client();
        let key = random_string();
        client
            .create_object(&key, make_data_stream())
            .await
            .unwrap();

        let stream = client.download_object(&key).await.unwrap();
        let downloaded_data = stream
            .try_fold(Vec::new(), |mut collected, chunk| async move {
                collected.extend_from_slice(&chunk);
                Ok(collected)
            })
            .await
            .unwrap();
        assert_eq!(downloaded_data, TEST_DATA.as_bytes());
    }

    // Downloading an unknown key is reported as a missing object.
    #[tokio::test]
    async fn download_notfound() {
        let err_res = get_bucket_client()
            .download_object("thiskeydoesnotexist")
            .await;
        assert!(matches!(
            err_res,
            Err(Error::NotFound(NotFoundError::Object { .. }))
        ));
    }

    // After deletion, fetching the object reports it as missing.
    #[tokio::test]
    async fn delete_object() {
        let client = get_bucket_client();
        let key = random_string();
        client
            .create_object(&key, make_data_stream())
            .await
            .unwrap();
        client.delete_object(&key).await.unwrap();

        let after_delete = client.get_object(&key).await.unwrap_err();
        assert!(matches!(
            after_delete,
            Error::NotFound(NotFoundError::Object { .. })
        ));
    }

    // Deleting a key that does not exist is not an error.
    #[tokio::test]
    async fn delete_nonexistent() {
        get_bucket_client()
            .delete_object("thiskeydoesnotexist")
            .await
            .unwrap();
    }

    // Create, get, download and delete all work for a key containing a slash.
    #[tokio::test]
    async fn object_lifecycle() {
        let client = get_bucket_client();
        let key = format!("{}/{}", random_string(), random_string());
        client
            .create_object(&key, make_data_stream())
            .await
            .unwrap();
        client.get_object(&key).await.unwrap();
        client.download_object(&key).await.unwrap();
        client.delete_object(&key).await.unwrap();
        client.get_object(&key).await.unwrap_err();
    }
}