cloud_storage_lite/client/
bucket.rs1use bytes::Bytes;
2use futures::{stream::BoxStream, TryStream, TryStreamExt};
3use reqwest::{Method, StatusCode};
4
5use crate::{
6 api::{self, percent_encode, DecodeResponse, ListObjectOptions, Object, Page},
7 errors::{Error, NotFoundError},
8};
9
/// Base URL of the GCS upload API; a bucket's object path is joined onto it to build the upload URL.
10const GCS_UPLOAD_API_URL: &str = "https://www.googleapis.com/upload/storage/v1/";
11
12#[async_trait::async_trait]
14pub trait BucketClient {
15 fn bucket_name(&self) -> &str;
17
18 async fn ping(&self) -> Result<(), Error>;
20
21 async fn list_objects<'a>(&self, options: ListObjectOptions<'a>)
23 -> Result<Page<Object>, Error>;
24
25 async fn create_object<S>(&self, key: &str, value: S) -> Result<Object, Error>
27 where
28 S: TryStream + Send + Sync + 'static,
29 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
30 Bytes: From<S::Ok>;
31
32 async fn get_object(&self, key: &str) -> Result<Object, Error>;
34
35 async fn download_object(
37 &self,
38 key: &str,
39 ) -> Result<BoxStream<'static, Result<Bytes, Error>>, Error>;
40
41 async fn delete_object(&self, key: &str) -> Result<(), Error>;
43}
44
45#[derive(Clone)]
48pub struct GcsBucketClient {
49 client: crate::Client,
50 bucket_name: String,
51 object_path: String,
52 upload_url: reqwest::Url,
53}
54
55impl GcsBucketClient {
56 pub(super) fn new(client: crate::Client, bucket_name: String) -> Self {
57 let encoded_bucket = percent_encode(&bucket_name);
58 let object_path = format!("b/{}/o", encoded_bucket);
59 Self {
60 client,
61 bucket_name,
62 upload_url: reqwest::Url::parse(GCS_UPLOAD_API_URL)
63 .and_then(|u| u.join(&object_path))
64 .expect("malformed url"),
65 object_path,
66 }
67 }
68
69 fn convert_api_error(&self, api_err: api::Error, requested_key: Option<&str>) -> Error {
70 match api_err {
71 api::Error::Http(e) => Error::Http(e),
72 api::Error::Google(e) => {
73 if e.status == StatusCode::NOT_FOUND {
74 if e.message.is_empty() || e.message.starts_with("No such object") {
75 NotFoundError::Object {
76 bucket: self.bucket_name.clone(),
77 key: requested_key.unwrap_or_default().into(),
78 }
79 .into()
80 } else {
81 NotFoundError::Bucket {
82 bucket: self.bucket_name.clone(),
83 }
84 .into()
85 }
86 } else if e.status == StatusCode::FORBIDDEN {
87 Error::PermissionDenied(e.message)
88 } else {
89 Error::OtherGoogle(e)
90 }
91 }
92 }
93 }
94}
95
96#[async_trait::async_trait]
97impl BucketClient for GcsBucketClient {
98 fn bucket_name(&self) -> &str {
99 &self.bucket_name
100 }
101
102 async fn ping(&self) -> Result<(), Error> {
103 self.list_objects(ListObjectOptions {
104 max_results: Some(0),
105 ..Default::default()
106 })
107 .await?;
108 Ok(())
109 }
110
111 async fn list_objects<'a>(
112 &self,
113 options: ListObjectOptions<'a>,
114 ) -> Result<Page<Object>, Error> {
115 self.client
116 .make_request(&Method::GET, &self.object_path, |builder| {
117 builder.query(&options).send()
118 })
119 .await?
120 .decode_response()
121 .await
122 .map_err(|e| self.convert_api_error(e, None ))
123 }
124
125 async fn create_object<S>(&self, key: &str, value: S) -> Result<Object, Error>
126 where
127 S: TryStream + Send + Sync + 'static,
128 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
129 Bytes: From<S::Ok>,
130 {
131 self.client
132 .make_request_to_url(&Method::POST, &self.upload_url.clone(), |builder| {
133 builder
134 .query(&[("name", key)])
135 .body(reqwest::Body::wrap_stream(value))
136 .send()
137 })
138 .await?
139 .decode_response()
140 .await
141 .map_err(|e| self.convert_api_error(e, Some(key)))
142 }
143
144 async fn get_object(&self, key: &str) -> Result<Object, Error> {
145 if key.trim().is_empty() {
146 return Err(Error::NotFound(NotFoundError::Object {
147 bucket: self.bucket_name.clone(),
148 key: key.into(),
149 }));
150 }
151 self.client
152 .make_request(
153 &Method::GET,
154 &format!("{}/{}", self.object_path, percent_encode(key)),
155 |builder| builder.send(),
156 )
157 .await?
158 .decode_response()
159 .await
160 .map_err(|e| self.convert_api_error(e, Some(key)))
161 }
162
163 async fn download_object(
164 &self,
165 key: &str,
166 ) -> Result<BoxStream<'static, Result<Bytes, Error>>, Error> {
167 let res = self
168 .client
169 .make_request(
170 &Method::GET,
171 &format!("{}/{}?alt=media", self.object_path, percent_encode(key)),
172 |builder| builder.send(),
173 )
174 .await?;
175 if res.status().is_success() {
176 Ok(Box::pin(res.bytes_stream().map_err(Error::from)))
177 } else if res.status() == StatusCode::NOT_FOUND {
178 Err(Error::NotFound(NotFoundError::Object {
179 bucket: self.bucket_name.clone(),
180 key: key.into(),
181 }))
182 } else {
183 Err(Error::OtherGoogle(api::GoogleError {
184 status: res.status(),
185 message: res.text().await?,
186 }))
187 }
188 }
189
190 async fn delete_object(&self, key: &str) -> Result<(), Error> {
191 self.client
192 .make_request(
193 &Method::DELETE,
194 &format!("{}/{}", self.object_path, percent_encode(key)),
195 |builder| builder.send(),
196 )
197 .await?
198 .decode_response::<()>()
199 .await
200 .or_else(|e| match e {
201 api::Error::Google(api::GoogleError {
202 status: StatusCode::NOT_FOUND,
203 ..
204 }) => Ok(()),
205 _ => Err(e),
206 })
207 .map_err(|e| self.convert_api_error(e, Some(key)))
208 }
209}
210
#[cfg(test)]
mod tests {
    // These tests talk to a real bucket. The bucket name is read from the
    // `CLOUD_STORAGE_LITE_TEST_BUCKET` environment variable and the credentials from the
    // service account's canonical environment location.

    use super::*;

    use crate::{
        token_provider::oauth::{OAuthTokenProvider, ServiceAccount, SCOPE_STORAGE_FULL_CONTROL},
        Client,
    };

    /// Payload uploaded by every test that creates an object.
    static TEST_DATA: &str = "test";

    /// Name of the bucket the tests run against.
    fn test_bucket() -> String {
        std::env::var("CLOUD_STORAGE_LITE_TEST_BUCKET").unwrap()
    }

    /// Produces eight random alphanumeric characters, used for unique keys and prefixes.
    fn random_string() -> String {
        let mut rng = rand::thread_rng();
        (0..8)
            .map(|_| rand::Rng::sample(&mut rng, rand::distributions::Alphanumeric))
            .map(char::from)
            .collect()
    }

    /// Builds a client authenticated with full-control storage scope.
    fn get_client() -> Client {
        let service_account = ServiceAccount::read_from_canonical_env().unwrap();
        let token_provider =
            OAuthTokenProvider::new(service_account, SCOPE_STORAGE_FULL_CONTROL).unwrap();
        Client::new(token_provider)
    }

    /// Builds a bucket client pointed at the test bucket.
    fn get_bucket_client() -> impl BucketClient {
        get_client().into_bucket_client(test_bucket())
    }

    /// A one-chunk stream that yields `TEST_DATA` and never fails.
    fn make_data_stream() -> impl futures::Stream<Item = Result<Bytes, std::convert::Infallible>> {
        let only_chunk = futures::future::ok::<_, std::convert::Infallible>(Bytes::from(TEST_DATA));
        futures::stream::once(only_chunk)
    }

    #[tokio::test]
    async fn ping() {
        get_bucket_client().ping().await.unwrap();
    }

    #[tokio::test]
    async fn ping_notfound() {
        let missing_bucket = format!("{}qqq", test_bucket());
        let bucket_client = get_client().into_bucket_client(missing_bucket);
        let result = bucket_client.ping().await;
        assert!(matches!(result, Err(Error::NotFound(_))), "{:?}", result);
    }

    #[tokio::test]
    async fn ping_forbidden() {
        let bucket_client = get_client().into_bucket_client(String::from("admin"));
        let result = bucket_client.ping().await;
        assert!(
            matches!(result, Err(Error::PermissionDenied(_))),
            "{:?}",
            result
        );
    }

    #[tokio::test]
    async fn create_object() {
        let bucket_client = get_bucket_client();
        let key = random_string();
        bucket_client
            .create_object(&key, make_data_stream())
            .await
            .unwrap();

        let obj = bucket_client.get_object(&key).await.unwrap();
        assert_eq!(obj.name, key);
        assert_eq!(obj.size, TEST_DATA.len() as u64);
        let expected_id_prefix = format!("{}/{}", test_bucket(), key);
        assert!(obj.id.starts_with(&expected_id_prefix));
    }

    #[tokio::test]
    async fn get_object_notfound() {
        let bucket_client = get_bucket_client();
        // A key that does not exist, then the empty key.
        for key in ["thiskeydoesnotexist", ""].iter() {
            assert!(matches!(
                bucket_client.get_object(key).await,
                Err(Error::NotFound(NotFoundError::Object { .. }))
            ));
        }
    }

    #[tokio::test]
    async fn list_objects() {
        let bucket_client = get_bucket_client();
        let prefix = random_string();
        let key1 = format!("{}key1", prefix);
        let key2 = format!("{}key2", prefix);

        futures::try_join!(
            bucket_client.create_object(&key1, make_data_stream()),
            bucket_client.create_object(&key2, make_data_stream())
        )
        .unwrap();

        // The shared prefix matches both objects.
        let both = bucket_client
            .list_objects(ListObjectOptions {
                prefix: Some(&prefix),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(both.items.len(), 2);

        // The full first key matches only itself.
        let only_first = bucket_client
            .list_objects(ListObjectOptions {
                prefix: Some(&key1),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(only_first.items.len(), 1);
    }

    #[tokio::test]
    async fn download_object() {
        let bucket_client = get_bucket_client();
        let key = random_string();
        bucket_client
            .create_object(&key, make_data_stream())
            .await
            .unwrap();

        let mut body = bucket_client.download_object(&key).await.unwrap();
        let mut downloaded_data = Vec::new();
        while let Some(chunk) = body.try_next().await.unwrap() {
            downloaded_data.extend_from_slice(&chunk);
        }
        assert_eq!(downloaded_data, TEST_DATA.as_bytes());
    }

    #[tokio::test]
    async fn download_notfound() {
        let bucket_client = get_bucket_client();
        let err_res = bucket_client.download_object("thiskeydoesnotexist").await;
        assert!(matches!(
            err_res,
            Err(Error::NotFound(NotFoundError::Object { .. }))
        ));
    }

    #[tokio::test]
    async fn delete_object() {
        let bucket_client = get_bucket_client();
        let key = random_string();
        bucket_client
            .create_object(&key, make_data_stream())
            .await
            .unwrap();
        bucket_client.delete_object(&key).await.unwrap();

        let lookup_err = bucket_client.get_object(&key).await.unwrap_err();
        assert!(matches!(
            lookup_err,
            Error::NotFound(NotFoundError::Object { .. })
        ));
    }

    #[tokio::test]
    async fn delete_nonexistent() {
        get_bucket_client()
            .delete_object("thiskeydoesnotexist")
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn object_lifecycle() {
        let bucket_client = get_bucket_client();
        // A key containing a slash exercises percent-encoding of the object path.
        let key = format!("{}/{}", random_string(), random_string());
        bucket_client
            .create_object(&key, make_data_stream())
            .await
            .unwrap();
        bucket_client.get_object(&key).await.unwrap();
        bucket_client.download_object(&key).await.unwrap();
        bucket_client.delete_object(&key).await.unwrap();
        bucket_client.get_object(&key).await.unwrap_err();
    }
}