1use std::sync::Arc;
2
3use futures_util::{Stream, TryStream, TryStreamExt};
4use reqwest::header::{HeaderValue, CONTENT_LENGTH, LOCATION};
5use reqwest::{Body, Request};
6use reqwest_middleware::RequestBuilder;
7
8use google_cloud_token::TokenSource;
9
10use crate::http::bucket_access_controls::delete::DeleteBucketAccessControlRequest;
11use crate::http::bucket_access_controls::get::GetBucketAccessControlRequest;
12use crate::http::bucket_access_controls::insert::InsertBucketAccessControlRequest;
13use crate::http::bucket_access_controls::list::{ListBucketAccessControlsRequest, ListBucketAccessControlsResponse};
14use crate::http::bucket_access_controls::patch::PatchBucketAccessControlRequest;
15use crate::http::bucket_access_controls::BucketAccessControl;
16use crate::http::buckets::delete::DeleteBucketRequest;
17use crate::http::buckets::get::GetBucketRequest;
18use crate::http::buckets::get_iam_policy::GetIamPolicyRequest;
19use crate::http::buckets::insert::InsertBucketRequest;
20use crate::http::buckets::list::{ListBucketsRequest, ListBucketsResponse};
21use crate::http::buckets::patch::PatchBucketRequest;
22use crate::http::buckets::set_iam_policy::SetIamPolicyRequest;
23use crate::http::buckets::test_iam_permissions::{TestIamPermissionsRequest, TestIamPermissionsResponse};
24use crate::http::buckets::{Bucket, Policy};
25use crate::http::default_object_access_controls::delete::DeleteDefaultObjectAccessControlRequest;
26use crate::http::default_object_access_controls::get::GetDefaultObjectAccessControlRequest;
27use crate::http::default_object_access_controls::insert::InsertDefaultObjectAccessControlRequest;
28use crate::http::default_object_access_controls::list::{
29 ListDefaultObjectAccessControlsRequest, ListDefaultObjectAccessControlsResponse,
30};
31use crate::http::default_object_access_controls::patch::PatchDefaultObjectAccessControlRequest;
32use crate::http::hmac_keys::create::{CreateHmacKeyRequest, CreateHmacKeyResponse};
33use crate::http::hmac_keys::delete::DeleteHmacKeyRequest;
34use crate::http::hmac_keys::get::GetHmacKeyRequest;
35use crate::http::hmac_keys::list::{ListHmacKeysRequest, ListHmacKeysResponse};
36use crate::http::hmac_keys::update::UpdateHmacKeyRequest;
37use crate::http::hmac_keys::HmacKeyMetadata;
38use crate::http::notifications::delete::DeleteNotificationRequest;
39use crate::http::notifications::get::GetNotificationRequest;
40use crate::http::notifications::insert::InsertNotificationRequest;
41use crate::http::notifications::list::{ListNotificationsRequest, ListNotificationsResponse};
42use crate::http::notifications::Notification;
43use crate::http::object_access_controls::delete::DeleteObjectAccessControlRequest;
44use crate::http::object_access_controls::get::GetObjectAccessControlRequest;
45use crate::http::object_access_controls::insert::InsertObjectAccessControlRequest;
46use crate::http::object_access_controls::list::ListObjectAccessControlsRequest;
47use crate::http::object_access_controls::patch::PatchObjectAccessControlRequest;
48use crate::http::object_access_controls::ObjectAccessControl;
49use crate::http::objects::compose::ComposeObjectRequest;
50use crate::http::objects::copy::CopyObjectRequest;
51use crate::http::objects::delete::DeleteObjectRequest;
52use crate::http::objects::download::Range;
53use crate::http::objects::get::GetObjectRequest;
54use crate::http::objects::list::{ListObjectsRequest, ListObjectsResponse};
55use crate::http::objects::patch::PatchObjectRequest;
56use crate::http::objects::r#move::MoveObjectRequest;
57use crate::http::objects::rewrite::{RewriteObjectRequest, RewriteObjectResponse};
58use crate::http::objects::upload::{UploadObjectRequest, UploadType};
59use crate::http::objects::Object;
60use crate::http::resumable_upload_client::ResumableUploadClient;
61use crate::http::{
62 bucket_access_controls, buckets, check_response_status, default_object_access_controls, hmac_keys, notifications,
63 object_access_controls, objects, Error,
64};
65
66pub const SCOPES: [&str; 2] = [
67 "https://www.googleapis.com/auth/cloud-platform",
68 "https://www.googleapis.com/auth/devstorage.full_control",
69];
70
71#[derive(Clone)]
72pub struct StorageClient {
73 ts: Option<Arc<dyn TokenSource>>,
74 v1_endpoint: String,
75 v1_upload_endpoint: String,
76 http: reqwest_middleware::ClientWithMiddleware,
77}
78
79impl StorageClient {
80 pub(crate) fn new(
81 ts: Option<Arc<dyn TokenSource>>,
82 endpoint: &str,
83 http: reqwest_middleware::ClientWithMiddleware,
84 ) -> Self {
85 Self {
86 ts,
87 v1_endpoint: format!("{endpoint}/storage/v1"),
88 v1_upload_endpoint: format!("{endpoint}/upload/storage/v1"),
89 http,
90 }
91 }
92
93 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
108 pub async fn delete_bucket(&self, req: &DeleteBucketRequest) -> Result<(), Error> {
109 let builder = buckets::delete::build(self.v1_endpoint.as_str(), &self.http, req);
110 self.send_get_empty(builder).await
111 }
112
113 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
132 pub async fn insert_bucket(&self, req: &InsertBucketRequest) -> Result<Bucket, Error> {
133 let builder = buckets::insert::build(self.v1_endpoint.as_str(), &self.http, req);
134 self.send(builder).await
135 }
136
137 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
152 pub async fn get_bucket(&self, req: &GetBucketRequest) -> Result<Bucket, Error> {
153 let builder = buckets::get::build(self.v1_endpoint.as_str(), &self.http, req);
154 self.send(builder).await
155 }
156
157 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
178 pub async fn patch_bucket(&self, req: &PatchBucketRequest) -> Result<Bucket, Error> {
179 let builder = buckets::patch::build(self.v1_endpoint.as_str(), &self.http, req);
180 self.send(builder).await
181 }
182
183 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
198 pub async fn list_buckets(&self, req: &ListBucketsRequest) -> Result<ListBucketsResponse, Error> {
199 let builder = buckets::list::build(self.v1_endpoint.as_str(), &self.http, req);
200 self.send(builder).await
201 }
202
203 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
227 pub async fn set_iam_policy(&self, req: &SetIamPolicyRequest) -> Result<Policy, Error> {
228 let builder = buckets::set_iam_policy::build(self.v1_endpoint.as_str(), &self.http, req);
229 self.send(builder).await
230 }
231
232 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
248 pub async fn get_iam_policy(&self, req: &GetIamPolicyRequest) -> Result<Policy, Error> {
249 let builder = buckets::get_iam_policy::build(self.v1_endpoint.as_str(), &self.http, req);
250 self.send(builder).await
251 }
252
253 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
268 pub async fn test_iam_permissions(
269 &self,
270 req: &TestIamPermissionsRequest,
271 ) -> Result<TestIamPermissionsResponse, Error> {
272 let builder = buckets::test_iam_permissions::build(self.v1_endpoint.as_str(), &self.http, req);
273 self.send(builder).await
274 }
275
276 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
292 pub async fn list_default_object_access_controls(
293 &self,
294 req: &ListDefaultObjectAccessControlsRequest,
295 ) -> Result<ListDefaultObjectAccessControlsResponse, Error> {
296 let builder = default_object_access_controls::list::build(self.v1_endpoint.as_str(), &self.http, req);
297 self.send(builder).await
298 }
299
300 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
315 pub async fn get_default_object_access_control(
316 &self,
317 req: &GetDefaultObjectAccessControlRequest,
318 ) -> Result<ObjectAccessControl, Error> {
319 let builder = default_object_access_controls::get::build(self.v1_endpoint.as_str(), &self.http, req);
320 self.send(builder).await
321 }
322
323 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
343 pub async fn insert_default_object_access_control(
344 &self,
345 req: &InsertDefaultObjectAccessControlRequest,
346 ) -> Result<ObjectAccessControl, Error> {
347 let builder = default_object_access_controls::insert::build(self.v1_endpoint.as_str(), &self.http, req);
348 self.send(builder).await
349 }
350
351 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
373 pub async fn patch_default_object_access_control(
374 &self,
375 req: &PatchDefaultObjectAccessControlRequest,
376 ) -> Result<ObjectAccessControl, Error> {
377 let builder = default_object_access_controls::patch::build(self.v1_endpoint.as_str(), &self.http, req);
378 self.send(builder).await
379 }
380
381 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
396 pub async fn delete_default_object_access_control(
397 &self,
398 req: &DeleteDefaultObjectAccessControlRequest,
399 ) -> Result<(), Error> {
400 let builder = default_object_access_controls::delete::build(self.v1_endpoint.as_str(), &self.http, req);
401 self.send_get_empty(builder).await
402 }
403
404 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
418 pub async fn list_bucket_access_controls(
419 &self,
420 req: &ListBucketAccessControlsRequest,
421 ) -> Result<ListBucketAccessControlsResponse, Error> {
422 let builder = bucket_access_controls::list::build(self.v1_endpoint.as_str(), &self.http, req);
423 self.send(builder).await
424 }
425
426 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
441 pub async fn get_bucket_access_control(
442 &self,
443 req: &GetBucketAccessControlRequest,
444 ) -> Result<BucketAccessControl, Error> {
445 let builder = bucket_access_controls::get::build(self.v1_endpoint.as_str(), &self.http, req);
446 self.send(builder).await
447 }
448
449 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
468 pub async fn insert_bucket_access_control(
469 &self,
470 req: &InsertBucketAccessControlRequest,
471 ) -> Result<BucketAccessControl, Error> {
472 let builder = bucket_access_controls::insert::build(self.v1_endpoint.as_str(), &self.http, req);
473 self.send(builder).await
474 }
475
476 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
497 pub async fn patch_bucket_access_control(
498 &self,
499 req: &PatchBucketAccessControlRequest,
500 ) -> Result<BucketAccessControl, Error> {
501 let builder = bucket_access_controls::patch::build(self.v1_endpoint.as_str(), &self.http, req);
502 self.send(builder).await
503 }
504
505 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
519 pub async fn delete_bucket_access_control(&self, req: &DeleteBucketAccessControlRequest) -> Result<(), Error> {
520 let builder = bucket_access_controls::delete::build(self.v1_endpoint.as_str(), &self.http, req);
521 self.send_get_empty(builder).await
522 }
523
524 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
540 pub async fn list_object_access_controls(
541 &self,
542 req: &ListObjectAccessControlsRequest,
543 ) -> Result<ListBucketAccessControlsResponse, Error> {
544 let builder = object_access_controls::list::build(self.v1_endpoint.as_str(), &self.http, req);
545 self.send(builder).await
546 }
547
548 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
567 pub async fn get_object_access_control(
568 &self,
569 req: &GetObjectAccessControlRequest,
570 ) -> Result<ObjectAccessControl, Error> {
571 let builder = object_access_controls::get::build(self.v1_endpoint.as_str(), &self.http, req);
572 self.send(builder).await
573 }
574
575 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
598 pub async fn insert_object_access_control(
599 &self,
600 req: &InsertObjectAccessControlRequest,
601 ) -> Result<ObjectAccessControl, Error> {
602 let builder = object_access_controls::insert::build(self.v1_endpoint.as_str(), &self.http, req);
603 self.send(builder).await
604 }
605
606 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
630 pub async fn patch_object_access_control(
631 &self,
632 req: &PatchObjectAccessControlRequest,
633 ) -> Result<ObjectAccessControl, Error> {
634 let builder = object_access_controls::patch::build(self.v1_endpoint.as_str(), &self.http, req);
635 self.send(builder).await
636 }
637
638 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
656 pub async fn delete_object_access_control(&self, req: &DeleteObjectAccessControlRequest) -> Result<(), Error> {
657 let builder = object_access_controls::delete::build(self.v1_endpoint.as_str(), &self.http, req);
658 self.send_get_empty(builder).await
659 }
660
661 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
676 pub async fn list_notifications(&self, req: &ListNotificationsRequest) -> Result<ListNotificationsResponse, Error> {
677 let builder = notifications::list::build(self.v1_endpoint.as_str(), &self.http, req);
678 self.send(builder).await
679 }
680
681 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
698 pub async fn get_notification(&self, req: &GetNotificationRequest) -> Result<Notification, Error> {
699 let builder = notifications::get::build(self.v1_endpoint.as_str(), &self.http, req);
700 self.send(builder).await
701 }
702
703 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
725 pub async fn insert_notification(&self, req: &InsertNotificationRequest) -> Result<Notification, Error> {
726 let builder = notifications::insert::build(self.v1_endpoint.as_str(), &self.http, req);
727 self.send(builder).await
728 }
729
730 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
747 pub async fn delete_notification(&self, req: &DeleteNotificationRequest) -> Result<(), Error> {
748 let builder = notifications::delete::build(self.v1_endpoint.as_str(), &self.http, req);
749 self.send_get_empty(builder).await
750 }
751
752 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
769 pub async fn list_hmac_keys(&self, req: &ListHmacKeysRequest) -> Result<ListHmacKeysResponse, Error> {
770 let builder = hmac_keys::list::build(self.v1_endpoint.as_str(), &self.http, req);
771 self.send(builder).await
772 }
773
774 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
791 pub async fn get_hmac_key(&self, req: &GetHmacKeyRequest) -> Result<HmacKeyMetadata, Error> {
792 let builder = hmac_keys::get::build(self.v1_endpoint.as_str(), &self.http, req);
793 self.send(builder).await
794 }
795
796 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
813 pub async fn create_hmac_key(&self, req: &CreateHmacKeyRequest) -> Result<CreateHmacKeyResponse, Error> {
814 let builder = hmac_keys::create::build(self.v1_endpoint.as_str(), &self.http, req);
815 self.send(builder).await
816 }
817
818 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
840 pub async fn update_hmac_key(&self, req: &UpdateHmacKeyRequest) -> Result<HmacKeyMetadata, Error> {
841 let builder = hmac_keys::update::build(self.v1_endpoint.as_str(), &self.http, req);
842 self.send(builder).await
843 }
844
845 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
862 pub async fn delete_hmac_key(&self, req: &DeleteHmacKeyRequest) -> Result<(), Error> {
863 let builder = hmac_keys::delete::build(self.v1_endpoint.as_str(), &self.http, req);
864 self.send_get_empty(builder).await
865 }
866
867 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
884 pub async fn list_objects(&self, req: &ListObjectsRequest) -> Result<ListObjectsResponse, Error> {
885 let builder = objects::list::build(self.v1_endpoint.as_str(), &self.http, req);
886 self.send(builder).await
887 }
888
889 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
905 pub async fn get_object(&self, req: &GetObjectRequest) -> Result<Object, Error> {
906 let builder = objects::get::build(self.v1_endpoint.as_str(), &self.http, req);
907 self.send(builder).await
908 }
909
910 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
929 pub async fn copy_object(&self, req: &CopyObjectRequest) -> Result<Object, Error> {
930 let builder = objects::copy::build(self.v1_endpoint.as_str(), &self.http, req);
931 self.send(builder).await
932 }
933
934 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
954 pub async fn move_object(&self, req: &MoveObjectRequest) -> Result<Object, Error> {
955 let copy_req: CopyObjectRequest = req.clone().into();
956 let delete_req: DeleteObjectRequest = req.clone().into();
957 let copy_result = self.copy_object(©_req).await?;
959 self.delete_object(&delete_req).await?;
960 Ok(copy_result)
961 }
962
963 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
983 pub async fn download_object(&self, req: &GetObjectRequest, range: &Range) -> Result<Vec<u8>, Error> {
984 let builder = objects::download::build(self.v1_endpoint.as_str(), &self.http, req, range);
985 let request = self.with_headers(builder).await?;
986 let response = request.send().await?;
987 let response = check_response_status(response).await?;
988 Ok(response.bytes().await?.to_vec())
989 }
990
991 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
1013 pub async fn download_streamed_object(
1014 &self,
1015 req: &GetObjectRequest,
1016 range: &Range,
1017 ) -> Result<impl Stream<Item = Result<bytes::Bytes, Error>>, Error> {
1018 let builder = objects::download::build(self.v1_endpoint.as_str(), &self.http, req, range);
1019 let request = self.with_headers(builder).await?;
1020 let response = request.send().await?;
1021 let response = check_response_status(response).await?;
1022 Ok(response.bytes_stream().map_err(Error::from))
1023 }
1024
1025 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
1058 pub async fn upload_object<T: Into<Body>>(
1059 &self,
1060 req: &UploadObjectRequest,
1061 data: T,
1062 upload_type: &UploadType,
1063 ) -> Result<Object, Error> {
1064 match upload_type {
1065 UploadType::Multipart(meta) => {
1066 let builder =
1067 objects::upload::build_multipart(self.v1_upload_endpoint.as_str(), &self.http, req, meta, data)?;
1068 self.send(builder).await
1069 }
1070 UploadType::Simple(media) => {
1071 let builder = objects::upload::build(self.v1_upload_endpoint.as_str(), &self.http, req, media, data);
1072 let builder = self.with_headers(builder).await?;
1073 let mut request = builder.build()?;
1074 if !request.headers().contains_key(CONTENT_LENGTH) {
1076 if let Some(Some(is_empty)) = request.body().map(|b| b.as_bytes().map(|b| b.is_empty())) {
1077 if is_empty {
1078 request
1079 .headers_mut()
1080 .insert(CONTENT_LENGTH, HeaderValue::from_static("0"));
1081 }
1082 }
1083 }
1084 self.send_request(request).await
1085 }
1086 }
1087 }
1088
1089 pub fn get_resumable_upload(&self, url: String) -> ResumableUploadClient {
1093 ResumableUploadClient::new(url, self.http.clone())
1094 }
1095
1096 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
1155 pub async fn prepare_resumable_upload(
1156 &self,
1157 req: &UploadObjectRequest,
1158 upload_type: &UploadType,
1159 ) -> Result<ResumableUploadClient, Error> {
1160 let request = match upload_type {
1161 UploadType::Multipart(meta) => objects::upload::build_resumable_session_metadata(
1162 self.v1_upload_endpoint.as_str(),
1163 &self.http,
1164 req,
1165 meta,
1166 ),
1167 UploadType::Simple(media) => objects::upload::build_resumable_session_simple(
1168 self.v1_upload_endpoint.as_str(),
1169 &self.http,
1170 req,
1171 media,
1172 ),
1173 };
1174 self.send_get_url(request)
1175 .await
1176 .map(|url| ResumableUploadClient::new(url, self.http.clone()))
1177 }
1178
1179 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
1201 pub async fn upload_streamed_object<S>(
1202 &self,
1203 req: &UploadObjectRequest,
1204 data: S,
1205 upload_type: &UploadType,
1206 ) -> Result<Object, Error>
1207 where
1208 S: TryStream + Send + Sync + 'static,
1209 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
1210 bytes::Bytes: From<S::Ok>,
1211 {
1212 self.upload_object(req, Body::wrap_stream(data), upload_type).await
1214 }
1215
1216 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
1234 pub async fn patch_object(&self, req: &PatchObjectRequest) -> Result<Object, Error> {
1235 let builder = objects::patch::build(self.v1_endpoint.as_str(), &self.http, req);
1236 self.send(builder).await
1237 }
1238
1239 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
1257 pub async fn delete_object(&self, req: &DeleteObjectRequest) -> Result<(), Error> {
1258 let builder = objects::delete::build(self.v1_endpoint.as_str(), &self.http, req);
1259 self.send_get_empty(builder).await
1260 }
1261
1262 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
1290 pub async fn rewrite_object(&self, req: &RewriteObjectRequest) -> Result<RewriteObjectResponse, Error> {
1291 let builder = objects::rewrite::build(self.v1_endpoint.as_str(), &self.http, req);
1292 self.send(builder).await
1293 }
1294
1295 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
1320 pub async fn compose_object(&self, req: &ComposeObjectRequest) -> Result<Object, Error> {
1321 let builder = objects::compose::build(self.v1_endpoint.as_str(), &self.http, req);
1322 self.send(builder).await
1323 }
1324
1325 async fn with_headers(&self, builder: RequestBuilder) -> Result<RequestBuilder, Error> {
1326 let builder = builder
1327 .header("X-Goog-Api-Client", "rust")
1328 .header(reqwest::header::USER_AGENT, "google-cloud-storage");
1329 let builder = match &self.ts {
1330 Some(ts) => {
1331 let token = ts.token().await.map_err(Error::TokenSource)?;
1332 builder.header(reqwest::header::AUTHORIZATION, token)
1333 }
1334 None => builder,
1335 };
1336 Ok(builder)
1337 }
1338
1339 async fn send_request<T>(&self, request: Request) -> Result<T, Error>
1340 where
1341 T: serde::de::DeserializeOwned,
1342 {
1343 let response = self.http.execute(request).await?;
1344 let response = check_response_status(response).await?;
1345 Ok(response.json().await?)
1346 }
1347
1348 async fn send<T>(&self, builder: RequestBuilder) -> Result<T, Error>
1349 where
1350 T: serde::de::DeserializeOwned,
1351 {
1352 let builder = self.with_headers(builder).await?;
1353 let response = builder.send().await?;
1354 let response = check_response_status(response).await?;
1355 Ok(response.json().await?)
1356 }
1357
1358 async fn send_get_empty(&self, builder: RequestBuilder) -> Result<(), Error> {
1359 let builder = self.with_headers(builder).await?;
1360 let response = builder.send().await?;
1361 check_response_status(response).await?;
1362 Ok(())
1363 }
1364
1365 async fn send_get_url(&self, builder: RequestBuilder) -> Result<String, Error> {
1366 let builder = self.with_headers(builder).await?;
1367 let response = builder.send().await?;
1368 let response = check_response_status(response).await?;
1369 Ok(String::from_utf8_lossy(response.headers()[LOCATION].as_bytes()).into_owned())
1370 }
1371}
1372
1373#[cfg(test)]
1374pub(crate) mod test {
1375 use std::collections::HashMap;
1376
1377 use bytes::Buf;
1378 use futures_util::StreamExt;
1379 use serial_test::serial;
1380
1381 use google_cloud_auth::project::Config;
1382 use google_cloud_auth::token::DefaultTokenSourceProvider;
1383 use google_cloud_token::TokenSourceProvider;
1384
1385 use crate::http::bucket_access_controls::delete::DeleteBucketAccessControlRequest;
1386 use crate::http::bucket_access_controls::get::GetBucketAccessControlRequest;
1387 use crate::http::bucket_access_controls::insert::{
1388 BucketAccessControlCreationConfig, InsertBucketAccessControlRequest,
1389 };
1390 use crate::http::bucket_access_controls::list::ListBucketAccessControlsRequest;
1391 use crate::http::bucket_access_controls::BucketACLRole;
1392 use crate::http::buckets::delete::DeleteBucketRequest;
1393 use crate::http::buckets::get::GetBucketRequest;
1394 use crate::http::buckets::get_iam_policy::GetIamPolicyRequest;
1395 use crate::http::buckets::iam_configuration::{PublicAccessPrevention, UniformBucketLevelAccess};
1396 use crate::http::buckets::insert::{
1397 BucketCreationConfig, InsertBucketParam, InsertBucketRequest, RetentionPolicyCreationConfig,
1398 };
1399 use crate::http::buckets::list::ListBucketsRequest;
1400 use crate::http::buckets::patch::{BucketPatchConfig, PatchBucketRequest};
1401 use crate::http::buckets::set_iam_policy::SetIamPolicyRequest;
1402 use crate::http::buckets::test_iam_permissions::TestIamPermissionsRequest;
1403 use crate::http::buckets::{lifecycle, Billing, Binding, Cors, IamConfiguration, Lifecycle, Website};
1404 use crate::http::default_object_access_controls::delete::DeleteDefaultObjectAccessControlRequest;
1405 use crate::http::default_object_access_controls::get::GetDefaultObjectAccessControlRequest;
1406 use crate::http::default_object_access_controls::insert::InsertDefaultObjectAccessControlRequest;
1407 use crate::http::default_object_access_controls::list::ListDefaultObjectAccessControlsRequest;
1408 use crate::http::hmac_keys::create::CreateHmacKeyRequest;
1409 use crate::http::hmac_keys::delete::DeleteHmacKeyRequest;
1410 use crate::http::hmac_keys::get::GetHmacKeyRequest;
1411 use crate::http::hmac_keys::list::ListHmacKeysRequest;
1412 use crate::http::hmac_keys::update::UpdateHmacKeyRequest;
1413 use crate::http::hmac_keys::HmacKeyMetadata;
1414 use crate::http::notifications::delete::DeleteNotificationRequest;
1415 use crate::http::notifications::get::GetNotificationRequest;
1416 use crate::http::notifications::insert::{InsertNotificationRequest, NotificationCreationConfig};
1417 use crate::http::notifications::list::ListNotificationsRequest;
1418 use crate::http::notifications::EventType;
1419 use crate::http::object_access_controls::delete::DeleteObjectAccessControlRequest;
1420 use crate::http::object_access_controls::get::GetObjectAccessControlRequest;
1421 use crate::http::object_access_controls::insert::{
1422 InsertObjectAccessControlRequest, ObjectAccessControlCreationConfig,
1423 };
1424 use crate::http::object_access_controls::list::ListObjectAccessControlsRequest;
1425 use crate::http::object_access_controls::ObjectACLRole;
1426 use crate::http::objects::compose::{ComposeObjectRequest, ComposingTargets};
1427 use crate::http::objects::copy::CopyObjectRequest;
1428 use crate::http::objects::delete::DeleteObjectRequest;
1429 use crate::http::objects::download::Range;
1430 use crate::http::objects::get::GetObjectRequest;
1431 use crate::http::objects::list::ListObjectsRequest;
1432 use crate::http::objects::rewrite::RewriteObjectRequest;
1433 use crate::http::objects::upload::{Media, UploadObjectRequest, UploadType};
1434 use crate::http::objects::{Object, SourceObjects};
1435 use crate::http::resumable_upload_client::{ChunkSize, UploadStatus, UploadedRange};
1436 use crate::http::storage_client::{StorageClient, SCOPES};
1437
1438 #[ctor::ctor]
1439 fn init() {
1440 let filter = tracing_subscriber::filter::EnvFilter::from_default_env()
1441 .add_directive("google_cloud_storage=trace".parse().unwrap());
1442 let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
1443 }
1444
1445 pub fn bucket_name(project: &str, name: &str) -> String {
1446 format!("{}_gcrgcs_{}", project, name)
1447 }
1448
1449 async fn client() -> (StorageClient, String, String) {
1450 let tsp = DefaultTokenSourceProvider::new(Config::default().with_scopes(&SCOPES))
1451 .await
1452 .unwrap();
1453 let cred = tsp.source_credentials.clone();
1454 let ts = tsp.token_source();
1455 let client = StorageClient::new(
1456 Some(ts),
1457 "https://storage.googleapis.com",
1458 reqwest_middleware::ClientBuilder::new(reqwest::Client::default()).build(),
1459 );
1460 let cred = cred.unwrap();
1461 (client, cred.project_id.unwrap(), cred.client_email.unwrap())
1462 }
1463
1464 #[tokio::test]
1465 #[serial]
1466 pub async fn list_buckets() {
1467 let (client, project, _) = client().await;
1468 let buckets = client
1469 .list_buckets(&ListBucketsRequest {
1470 project: project.clone(),
1471 max_results: None,
1472 page_token: None,
1473 prefix: Some(bucket_name(&project, "object")),
1474 projection: None,
1475 match_glob: None,
1476 })
1477 .await
1478 .unwrap();
1479 assert_eq!(2, buckets.items.len());
1480 }
1481
1482 #[tokio::test]
1483 #[serial]
1484 pub async fn crud_bucket() {
1485 let (client, project, email) = client().await;
1486 let name = bucket_name(
1487 &project,
1488 &format!("crud_bucket-{}", time::OffsetDateTime::now_utc().unix_timestamp()),
1489 );
1490 let mut labels = HashMap::new();
1491 labels.insert("labelkey".to_string(), "labelvalue".to_string());
1492
1493 let bucket = client
1494 .insert_bucket(&InsertBucketRequest {
1495 name,
1496 param: InsertBucketParam {
1497 project,
1498 ..Default::default()
1499 },
1500 bucket: BucketCreationConfig {
1501 location: "ASIA-NORTHEAST1".to_string(),
1502 storage_class: Some("STANDARD".to_string()),
1503 default_event_based_hold: true,
1504 labels: Some(labels),
1505 website: Some(Website {
1506 main_page_suffix: "_suffix".to_string(),
1507 not_found_page: "notfound.html".to_string(),
1508 }),
1509 iam_configuration: Some(IamConfiguration {
1510 uniform_bucket_level_access: Some(UniformBucketLevelAccess {
1511 enabled: false,
1512 locked_time: None,
1513 }),
1514 public_access_prevention: Some(PublicAccessPrevention::Enforced),
1515 }),
1516 billing: Some(Billing { requester_pays: false }),
1517 retention_policy: Some(RetentionPolicyCreationConfig {
1518 retention_period: 10000,
1519 }),
1520 cors: Some(vec![Cors {
1521 origin: vec!["*".to_string()],
1522 method: vec!["GET".to_string(), "HEAD".to_string()],
1523 response_header: vec!["200".to_string()],
1524 max_age_seconds: 100,
1525 }]),
1526 lifecycle: Some(Lifecycle {
1527 rule: vec![lifecycle::Rule {
1528 action: Some(lifecycle::rule::Action {
1529 r#type: lifecycle::rule::ActionType::Delete,
1530 storage_class: None,
1531 }),
1532 condition: Some(lifecycle::rule::Condition {
1533 age: Some(365),
1534 is_live: Some(true),
1535 ..Default::default()
1536 }),
1537 }],
1538 }),
1539 rpo: None,
1540 ..Default::default()
1541 },
1542 })
1543 .await
1544 .unwrap();
1545
1546 let found = client
1547 .get_bucket(&GetBucketRequest {
1548 bucket: bucket.name.to_string(),
1549 ..Default::default()
1550 })
1551 .await
1552 .unwrap();
1553
1554 assert_eq!(found.location.as_str(), "ASIA-NORTHEAST1");
1555
1556 let entity = format!("user-{}", email);
1557 let patched = client
1558 .patch_bucket(&PatchBucketRequest {
1559 bucket: bucket.name.to_string(),
1560 metadata: Some(BucketPatchConfig {
1561 default_object_acl: Some(vec![ObjectAccessControlCreationConfig {
1562 entity: entity.to_string(),
1563 role: ObjectACLRole::READER,
1564 }]),
1565 ..Default::default()
1566 }),
1567 ..Default::default()
1568 })
1569 .await
1570 .unwrap();
1571
1572 let default_object_acl = patched.default_object_acl.unwrap();
1573 assert_eq!(default_object_acl.len(), 1);
1574 assert_eq!(default_object_acl[0].entity.as_str(), entity);
1575 assert_eq!(default_object_acl[0].role, ObjectACLRole::READER);
1576 assert_eq!(found.storage_class.as_str(), patched.storage_class.as_str());
1577 assert_eq!(found.location.as_str(), patched.location.as_str());
1578
1579 client
1580 .delete_bucket(&DeleteBucketRequest {
1581 bucket: bucket.name,
1582 param: Default::default(),
1583 })
1584 .await
1585 .unwrap();
1586 }
1587
1588 #[tokio::test]
1589 #[serial]
1590 async fn set_get_test_iam() {
1591 let (client, project, email) = client().await;
1592 let bucket_name = bucket_name(&project, "test_iam");
1593 let mut policy = client
1594 .get_iam_policy(&GetIamPolicyRequest {
1595 resource: bucket_name.to_string(),
1596 options_requested_policy_version: None,
1597 })
1598 .await
1599 .unwrap();
1600 policy.bindings.push(Binding {
1601 role: "roles/storage.objectViewer".to_string(),
1602 members: vec![format!("serviceAccount:{}", email)],
1603 condition: None,
1604 });
1605
1606 let mut result = client
1607 .set_iam_policy(&SetIamPolicyRequest {
1608 resource: bucket_name.to_string(),
1609 policy,
1610 })
1611 .await
1612 .unwrap();
1613 assert_eq!(result.bindings.len(), 5);
1614 assert_eq!(result.bindings.pop().unwrap().role, "roles/storage.objectViewer");
1615
1616 let permissions = client
1617 .test_iam_permissions(&TestIamPermissionsRequest {
1618 resource: bucket_name.to_string(),
1619 permissions: vec!["storage.buckets.get".to_string()],
1620 })
1621 .await
1622 .unwrap();
1623 assert_eq!(permissions.permissions[0], "storage.buckets.get");
1624 }
1625
1626 #[tokio::test]
1627 #[serial]
1628 pub async fn crud_default_object_controls() {
1629 let (client, project, email) = client().await;
1630 let bucket_name = bucket_name(&project, "default_object_acl");
1631 let entity = format!("user-{}", email);
1632
1633 client
1634 .insert_default_object_access_control(&InsertDefaultObjectAccessControlRequest {
1635 bucket: bucket_name.to_string(),
1636 object_access_control: ObjectAccessControlCreationConfig {
1637 entity: entity.to_string(),
1638 role: ObjectACLRole::READER,
1639 },
1640 })
1641 .await
1642 .unwrap();
1643
1644 let found = client
1645 .get_default_object_access_control(&GetDefaultObjectAccessControlRequest {
1646 bucket: bucket_name.to_string(),
1647 entity: entity.to_string(),
1648 })
1649 .await
1650 .unwrap();
1651 assert_eq!(found.entity, entity);
1652 assert_eq!(found.role, ObjectACLRole::READER);
1653
1654 let acls = client
1655 .list_default_object_access_controls(&ListDefaultObjectAccessControlsRequest {
1656 bucket: bucket_name.to_string(),
1657 ..Default::default()
1658 })
1659 .await
1660 .unwrap();
1661 assert!(acls.items.is_some());
1662 assert_eq!(4, acls.items.unwrap().len());
1663
1664 client
1665 .delete_default_object_access_control(&DeleteDefaultObjectAccessControlRequest {
1666 bucket: bucket_name.to_string(),
1667 entity: entity.to_string(),
1668 })
1669 .await
1670 .unwrap();
1671 }
1672
1673 #[tokio::test]
1674 #[serial]
1675 pub async fn crud_bucket_access_controls() {
1676 let (client, project, email) = client().await;
1677 let bucket_name = bucket_name(&project, "bucket_acl");
1678
1679 let entity = format!("user-{}", email);
1680 client
1681 .insert_bucket_access_control(&InsertBucketAccessControlRequest {
1682 bucket: bucket_name.to_string(),
1683 acl: BucketAccessControlCreationConfig {
1684 entity: entity.to_string(),
1685 role: BucketACLRole::READER,
1686 },
1687 })
1688 .await
1689 .unwrap();
1690
1691 let found = client
1692 .get_bucket_access_control(&GetBucketAccessControlRequest {
1693 bucket: bucket_name.to_string(),
1694 entity: entity.to_string(),
1695 })
1696 .await
1697 .unwrap();
1698 assert_eq!(found.entity, entity);
1699 assert_eq!(found.role, BucketACLRole::READER);
1700
1701 let acls = client
1702 .list_bucket_access_controls(&ListBucketAccessControlsRequest {
1703 bucket: bucket_name.to_string(),
1704 })
1705 .await
1706 .unwrap();
1707 assert_eq!(4, acls.items.len());
1708
1709 client
1710 .delete_bucket_access_control(&DeleteBucketAccessControlRequest {
1711 bucket: bucket_name.to_string(),
1712 entity: entity.to_string(),
1713 })
1714 .await
1715 .unwrap();
1716 }
1717
1718 #[tokio::test]
1719 #[serial]
1720 pub async fn crud_object_access_controls() {
1721 let (client, project, email) = client().await;
1722 let bucket_name = bucket_name(&project, "object_acl");
1723 let object_name = "test.txt";
1724
1725 let entity = format!("user-{}", email);
1726
1727 client
1728 .insert_object_access_control(&InsertObjectAccessControlRequest {
1729 bucket: bucket_name.to_string(),
1730 object: object_name.to_string(),
1731 generation: None,
1732 acl: ObjectAccessControlCreationConfig {
1733 entity: entity.to_string(),
1734 role: ObjectACLRole::READER,
1735 },
1736 })
1737 .await
1738 .unwrap();
1739
1740 let found = client
1741 .get_object_access_control(&GetObjectAccessControlRequest {
1742 bucket: bucket_name.to_string(),
1743 entity: entity.to_string(),
1744 object: object_name.to_string(),
1745 generation: None,
1746 })
1747 .await
1748 .unwrap();
1749 assert_eq!(found.entity, entity);
1750 assert_eq!(found.role, ObjectACLRole::READER);
1751
1752 let acls = client
1753 .list_object_access_controls(&ListObjectAccessControlsRequest {
1754 bucket: bucket_name.to_string(),
1755 object: object_name.to_string(),
1756 generation: None,
1757 })
1758 .await
1759 .unwrap();
1760 assert_eq!(5, acls.items.len());
1761
1762 client
1763 .delete_object_access_control(&DeleteObjectAccessControlRequest {
1764 bucket: bucket_name.to_string(),
1765 object: object_name.to_string(),
1766 entity: entity.to_string(),
1767 generation: None,
1768 })
1769 .await
1770 .unwrap();
1771 }
1772
1773 #[tokio::test]
1774 #[serial]
1775 pub async fn crud_notification() {
1776 let (client, project, _) = client().await;
1777 let bucket_name = bucket_name(&project, "notification");
1778
1779 let notifications = client
1780 .list_notifications(&ListNotificationsRequest {
1781 bucket: bucket_name.to_string(),
1782 })
1783 .await
1784 .unwrap();
1785
1786 for n in notifications.items.unwrap_or_default() {
1787 client
1788 .delete_notification(&DeleteNotificationRequest {
1789 bucket: bucket_name.to_string(),
1790 notification: n.id.to_string(),
1791 })
1792 .await
1793 .unwrap();
1794 }
1795
1796 let post = client
1797 .insert_notification(&InsertNotificationRequest {
1798 bucket: bucket_name.to_string(),
1799 notification: NotificationCreationConfig {
1800 topic: format!("projects/{project}/topics/{bucket_name}"),
1801 event_types: Some(vec![EventType::ObjectMetadataUpdate, EventType::ObjectDelete]),
1802 object_name_prefix: Some("notification-test".to_string()),
1803 ..Default::default()
1804 },
1805 })
1806 .await
1807 .unwrap();
1808
1809 let found = client
1810 .get_notification(&GetNotificationRequest {
1811 bucket: bucket_name.to_string(),
1812 notification: post.id.to_string(),
1813 })
1814 .await
1815 .unwrap();
1816 assert_eq!(found.id, post.id);
1817 assert_eq!(found.event_types.unwrap().len(), 2);
1818 }
1819
1820 #[tokio::test]
1821 #[serial]
1822 pub async fn crud_hmac_key() {
1823 let (client, project_id, email) = client().await;
1824
1825 let post = client
1826 .create_hmac_key(&CreateHmacKeyRequest {
1827 project_id: project_id.clone(),
1828 service_account_email: email,
1829 })
1830 .await
1831 .unwrap();
1832
1833 let found = client
1834 .get_hmac_key(&GetHmacKeyRequest {
1835 access_id: post.metadata.access_id.to_string(),
1836 project_id: project_id.clone(),
1837 })
1838 .await
1839 .unwrap();
1840 assert_eq!(found.id, post.metadata.id);
1841 assert_eq!(found.state, "ACTIVE");
1842
1843 let keys = client
1844 .list_hmac_keys(&ListHmacKeysRequest {
1845 project_id: project_id.clone(),
1846 ..Default::default()
1847 })
1848 .await
1849 .unwrap();
1850
1851 for n in keys.items.unwrap_or_default() {
1852 let result = client
1853 .update_hmac_key(&UpdateHmacKeyRequest {
1854 access_id: n.access_id.to_string(),
1855 project_id: n.project_id.to_string(),
1856 metadata: HmacKeyMetadata {
1857 state: "INACTIVE".to_string(),
1858 ..n.clone()
1859 },
1860 })
1861 .await
1862 .unwrap();
1863 assert_eq!(result.state, "INACTIVE");
1864
1865 client
1866 .delete_hmac_key(&DeleteHmacKeyRequest {
1867 access_id: n.access_id.to_string(),
1868 project_id: n.project_id.to_string(),
1869 })
1870 .await
1871 .unwrap();
1872 }
1873 }
1874
1875 #[tokio::test]
1876 #[serial]
1877 pub async fn crud_object_with_metadata() {
1878 let (client, project, _) = client().await;
1879 let bucket_name = bucket_name(&project, "object");
1880 let mut metadata = HashMap::<String, String>::new();
1881 metadata.insert("key1".to_string(), "value1".to_string());
1882 let uploaded = client
1883 .upload_object(
1884 &UploadObjectRequest {
1885 bucket: bucket_name.to_string(),
1886 ..Default::default()
1887 },
1888 vec![1, 2, 3, 4, 5, 6, 7],
1889 &UploadType::Multipart(Box::new(Object {
1890 name: "test1_meta".to_string(),
1891 content_type: Some("text/plain".to_string()),
1892 content_language: Some("ja".to_string()),
1893 metadata: Some(metadata),
1894 ..Default::default()
1895 })),
1896 )
1897 .await
1898 .unwrap();
1899 assert_eq!(uploaded.content_type.unwrap(), "text/plain".to_string());
1900 assert_eq!(uploaded.content_language.unwrap(), "ja".to_string());
1901 assert_eq!(uploaded.metadata.unwrap().get("key1").unwrap().clone(), "value1".to_string());
1902
1903 let download = |range: Range| {
1904 let client = client.clone();
1905 let bucket_name = uploaded.bucket.clone();
1906 let object_name = uploaded.name.clone();
1907 async move {
1908 client
1909 .download_object(
1910 &GetObjectRequest {
1911 bucket: bucket_name,
1912 object: object_name,
1913 ..Default::default()
1914 },
1915 &range,
1916 )
1917 .await
1918 .unwrap()
1919 }
1920 };
1921
1922 let object = client
1923 .get_object(&GetObjectRequest {
1924 bucket: uploaded.bucket.clone(),
1925 object: uploaded.name.clone(),
1926 ..Default::default()
1927 })
1928 .await
1929 .unwrap();
1930
1931 assert_eq!(object.content_type.unwrap(), "text/plain".to_string());
1932 assert_eq!(object.content_language.unwrap(), "ja".to_string());
1933 assert_eq!(object.metadata.unwrap().get("key1").unwrap().clone(), "value1".to_string());
1934
1935 let downloaded = download(Range::default()).await;
1936 assert_eq!(downloaded, vec![1, 2, 3, 4, 5, 6, 7]);
1937 }
1938
1939 #[tokio::test]
1940 #[serial]
1941 pub async fn crud_object() {
1942 let (client, project, _) = client().await;
1943 let bucket_name = bucket_name(&project, "object");
1944
1945 let objects = client
1946 .list_objects(&ListObjectsRequest {
1947 bucket: bucket_name.to_string(),
1948 ..Default::default()
1949 })
1950 .await
1951 .unwrap()
1952 .items
1953 .unwrap_or_default();
1954 for o in objects {
1955 client
1956 .delete_object(&DeleteObjectRequest {
1957 bucket: o.bucket.to_string(),
1958 object: o.name.to_string(),
1959 ..Default::default()
1960 })
1961 .await
1962 .unwrap();
1963 }
1964
1965 let mut media = Media::new("test1");
1966 media.content_type = "text/plain".into();
1967 let uploaded = client
1968 .upload_object(
1969 &UploadObjectRequest {
1970 bucket: bucket_name.to_string(),
1971 ..Default::default()
1972 },
1973 vec![1, 2, 3, 4, 5, 6],
1974 &UploadType::Simple(media.clone()),
1975 )
1976 .await
1977 .unwrap();
1978
1979 assert_eq!(uploaded.content_type.unwrap(), "text/plain".to_string());
1980
1981 let media = Media::new("test1_zero");
1982 let uploaded_empty = client
1983 .upload_object(
1984 &UploadObjectRequest {
1985 bucket: bucket_name.to_string(),
1986 ..Default::default()
1987 },
1988 vec![],
1989 &UploadType::Simple(media),
1990 )
1991 .await
1992 .unwrap();
1993
1994 let download = |name: &str, range: Range| {
1995 let client = client.clone();
1996 let bucket_name = uploaded.bucket.clone();
1997 let object_name = name.to_string();
1998 async move {
1999 client
2000 .download_object(
2001 &GetObjectRequest {
2002 bucket: bucket_name,
2003 object: object_name,
2004 ..Default::default()
2005 },
2006 &range,
2007 )
2008 .await
2009 .unwrap()
2010 }
2011 };
2012
2013 let downloaded = download(&uploaded.name, Range::default()).await;
2014 assert_eq!(downloaded, vec![1, 2, 3, 4, 5, 6]);
2015 let downloaded = download(&uploaded.name, Range(Some(1), None)).await;
2016 assert_eq!(downloaded, vec![2, 3, 4, 5, 6]);
2017 let downloaded = download(&uploaded.name, Range(Some(1), Some(2))).await;
2018 assert_eq!(downloaded, vec![2, 3]);
2019 let downloaded = download(&uploaded.name, Range(None, Some(2))).await;
2020 assert_eq!(downloaded, vec![5, 6]);
2021
2022 let downloaded = download(&uploaded_empty.name, Range::default()).await;
2023 assert!(downloaded.is_empty());
2024
2025 let _copied = client
2026 .copy_object(&CopyObjectRequest {
2027 destination_bucket: bucket_name.to_string(),
2028 destination_object: format!("{}_copy", uploaded.name),
2029 source_bucket: bucket_name.to_string(),
2030 source_object: uploaded.name.to_string(),
2031 ..Default::default()
2032 })
2033 .await
2034 .unwrap();
2035
2036 let _rewrited = client
2037 .rewrite_object(&RewriteObjectRequest {
2038 destination_bucket: bucket_name.to_string(),
2039 destination_object: format!("{}_rewrite", uploaded.name),
2040 source_bucket: bucket_name.to_string(),
2041 source_object: uploaded.name.to_string(),
2042 ..Default::default()
2043 })
2044 .await
2045 .unwrap();
2046
2047 let _composed = client
2048 .compose_object(&ComposeObjectRequest {
2049 bucket: bucket_name.to_string(),
2050 destination_object: format!("{}_composed", uploaded.name),
2051 destination_predefined_acl: None,
2052 composing_targets: ComposingTargets {
2053 destination: Some(Object {
2054 content_type: Some("image/jpeg".to_string()),
2055 ..Default::default()
2056 }),
2057 source_objects: vec![SourceObjects {
2058 name: format!("{}_rewrite", uploaded.name),
2059 ..Default::default()
2060 }],
2061 },
2062 ..Default::default()
2063 })
2064 .await
2065 .unwrap();
2066 }
2067
2068 #[tokio::test]
2069 #[serial]
2070 pub async fn streamed_object() {
2071 let (client, project, _) = client().await;
2072 let bucket_name = bucket_name(&project, "object");
2073 let file_name = format!("stream_{}", time::OffsetDateTime::now_utc().unix_timestamp());
2074
2075 let source = vec!["hello", " ", "world"];
2077 let chunks: Vec<Result<_, ::std::io::Error>> = source.clone().into_iter().map(Ok).collect();
2078 let stream = futures_util::stream::iter(chunks);
2079 let media = Media::new(file_name);
2080 let upload_type = UploadType::Simple(media);
2081 let uploaded = client
2082 .upload_streamed_object(
2083 &UploadObjectRequest {
2084 bucket: bucket_name.to_string(),
2085 predefined_acl: None,
2086 ..Default::default()
2087 },
2088 stream,
2089 &upload_type,
2090 )
2091 .await
2092 .unwrap();
2093
2094 let file_name = format!("stream_empty_{}", time::OffsetDateTime::now_utc().unix_timestamp());
2095 let source: Vec<&str> = vec![];
2096 let chunks: Vec<Result<_, ::std::io::Error>> = source.clone().into_iter().map(Ok).collect();
2097 let stream = futures_util::stream::iter(chunks);
2098 let media = Media::new(file_name);
2099 let upload_type = UploadType::Simple(media);
2100 let uploaded_empty = client
2101 .upload_streamed_object(
2102 &UploadObjectRequest {
2103 bucket: bucket_name.to_string(),
2104 predefined_acl: None,
2105 ..Default::default()
2106 },
2107 stream,
2108 &upload_type,
2109 )
2110 .await
2111 .unwrap();
2112
2113 let download = |name: &str, range: Range| {
2114 let client = client.clone();
2115 let bucket_name = uploaded.bucket.clone();
2116 let object_name = name.to_string();
2117 async move {
2118 let mut downloaded = client
2119 .download_streamed_object(
2120 &GetObjectRequest {
2121 bucket: bucket_name,
2122 object: object_name,
2123 ..Default::default()
2124 },
2125 &range,
2126 )
2127 .await
2128 .unwrap();
2129 let mut data = Vec::with_capacity(10);
2130 while let Some(v) = downloaded.next().await {
2131 let d: bytes::Bytes = v.unwrap();
2132 data.extend_from_slice(d.chunk());
2133 }
2134 data
2135 }
2136 };
2137 let downloaded = download(&uploaded.name, Range::default()).await;
2138 assert_eq!("hello world", String::from_utf8_lossy(downloaded.as_slice()));
2139 let downloaded = download(&uploaded.name, Range(Some(1), None)).await;
2140 assert_eq!("ello world", String::from_utf8_lossy(downloaded.as_slice()));
2141 let downloaded = download(&uploaded.name, Range(Some(1), Some(2))).await;
2142 assert_eq!("el", String::from_utf8_lossy(downloaded.as_slice()));
2143 let downloaded = download(&uploaded.name, Range(None, Some(2))).await;
2144 assert_eq!("ld", String::from_utf8_lossy(downloaded.as_slice()));
2145
2146 let downloaded = download(&uploaded_empty.name, Range::default()).await;
2147 assert!(downloaded.is_empty());
2148 }
2149
2150 #[tokio::test]
2151 #[serial]
2152 pub async fn resumable_simple_upload() {
2153 let (client, project, _) = client().await;
2154 let bucket_name = bucket_name(&project, "object");
2155 let file_name = format!("resumable_{}", time::OffsetDateTime::now_utc().unix_timestamp());
2156
2157 let mut media = Media::new(file_name.clone());
2158 media.content_type = "text/plain".into();
2159 let upload_type = UploadType::Simple(media);
2160 let uploader = client
2161 .prepare_resumable_upload(
2162 &UploadObjectRequest {
2163 bucket: bucket_name.to_string(),
2164 ..Default::default()
2165 },
2166 &upload_type,
2167 )
2168 .await
2169 .unwrap();
2170 let data = vec![1, 2, 3, 4, 5];
2171 uploader.upload_single_chunk(data.clone(), 5).await.unwrap();
2172
2173 let get_request = &GetObjectRequest {
2174 bucket: bucket_name.to_string(),
2175 object: file_name.to_string(),
2176 ..Default::default()
2177 };
2178 let download = client.download_object(get_request, &Range::default()).await.unwrap();
2179 assert_eq!(data, download);
2180
2181 let object = client.get_object(get_request).await.unwrap();
2182 assert_eq!(object.content_type.unwrap(), "text/plain");
2183 }
2184
2185 #[tokio::test]
2186 #[serial]
2187 pub async fn resumable_multiple_chunk_upload() {
2188 let (client, project, _) = client().await;
2189 let bucket_name = bucket_name(&project, "object");
2190 let file_name = format!("resumable_multiple_chunk{}", time::OffsetDateTime::now_utc().unix_timestamp());
2191
2192 let metadata = Object {
2193 name: file_name.to_string(),
2194 content_type: Some("video/mp4".to_string()),
2195 ..Default::default()
2196 };
2197 let upload_type = UploadType::Multipart(Box::new(metadata));
2198 let uploader = client
2199 .prepare_resumable_upload(
2200 &UploadObjectRequest {
2201 bucket: bucket_name.to_string(),
2202 ..Default::default()
2203 },
2204 &upload_type,
2205 )
2206 .await
2207 .unwrap();
2208 let mut chunk1_data: Vec<u8> = (0..256 * 1024).map(|i| (i % 256) as u8).collect();
2209 let chunk2_data: Vec<u8> = (1..256 * 1024 + 50).map(|i| (i % 256) as u8).collect();
2210 let total_size = Some(chunk1_data.len() as u64 + chunk2_data.len() as u64);
2211
2212 tracing::info!("start upload chunk {}", uploader.url());
2213 let chunk1 = ChunkSize::new(0, chunk1_data.len() as u64 - 1, total_size);
2214 tracing::info!("upload chunk1 {:?}", chunk1);
2215 let status1 = uploader
2216 .upload_multiple_chunk(chunk1_data.clone(), &chunk1)
2217 .await
2218 .unwrap();
2219
2220 assert_eq!(
2221 status1,
2222 UploadStatus::ResumeIncomplete(UploadedRange {
2223 first_byte: 0,
2224 last_byte: chunk1_data.len() as u64 - 1,
2225 })
2226 );
2227
2228 tracing::info!("check status chunk1");
2229 let status_check = uploader.status(total_size).await.unwrap();
2230 assert_eq!(
2231 status_check,
2232 UploadStatus::ResumeIncomplete(UploadedRange {
2233 first_byte: 0,
2234 last_byte: chunk1_data.len() as u64 - 1,
2235 })
2236 );
2237
2238 let chunk2 = ChunkSize::new(
2239 chunk1_data.len() as u64,
2240 chunk1_data.len() as u64 + chunk2_data.len() as u64 - 1,
2241 total_size,
2242 );
2243 tracing::info!("upload chunk2 {:?}", chunk2);
2244 let status2 = uploader
2245 .upload_multiple_chunk(chunk2_data.clone(), &chunk2)
2246 .await
2247 .unwrap();
2248 assert!(matches!(status2, UploadStatus::Ok(_)));
2249
2250 tracing::info!("check status chunk2");
2251 let status_check2 = uploader.status(total_size).await.unwrap();
2252 assert!(matches!(status_check2, UploadStatus::Ok(_)));
2253
2254 let get_request = &GetObjectRequest {
2255 bucket: bucket_name.to_string(),
2256 object: file_name.to_string(),
2257 ..Default::default()
2258 };
2259
2260 let object = client.get_object(get_request).await.unwrap();
2261 assert_eq!(object.content_type.unwrap(), "video/mp4");
2262
2263 let download = client.download_object(get_request, &Range::default()).await.unwrap();
2264 chunk1_data.extend(chunk2_data);
2265 assert_eq!(chunk1_data, download);
2266 }
2267
2268 #[tokio::test]
2269 #[serial]
2270 pub async fn resumable_upload_cancel() {
2271 let (client, project, _) = client().await;
2272 let bucket_name = bucket_name(&project, "object");
2273 let file_name = format!("resumable_cancel{}", time::OffsetDateTime::now_utc().unix_timestamp());
2274
2275 let metadata = Object {
2276 name: file_name.to_string(),
2277 content_type: Some("video/mp4".to_string()),
2278 ..Default::default()
2279 };
2280 let upload_type = UploadType::Multipart(Box::new(metadata));
2281 let uploader = client
2282 .prepare_resumable_upload(
2283 &UploadObjectRequest {
2284 bucket: bucket_name.to_string(),
2285 ..Default::default()
2286 },
2287 &upload_type,
2288 )
2289 .await
2290 .unwrap();
2291 let cloned = uploader.clone();
2292 uploader.cancel().await.unwrap();
2293
2294 let result = cloned.upload_single_chunk(vec![1], 1).await;
2295 assert!(result.is_err());
2296 }
2297
2298 #[tokio::test]
2299 #[serial]
2300 pub async fn resumable_multiple_chunk_upload_unknown() {
2301 let (client, project, _) = client().await;
2302 let bucket_name = bucket_name(&project, "object");
2303 let file_name = format!(
2304 "resumable_multiple_chunk_unknown{}",
2305 time::OffsetDateTime::now_utc().unix_timestamp()
2306 );
2307
2308 let metadata = Object {
2309 name: file_name.to_string(),
2310 content_type: Some("video/mp4".to_string()),
2311 ..Default::default()
2312 };
2313 let upload_type = UploadType::Multipart(Box::new(metadata));
2314 let uploader = client
2315 .prepare_resumable_upload(
2316 &UploadObjectRequest {
2317 bucket: bucket_name.to_string(),
2318 ..Default::default()
2319 },
2320 &upload_type,
2321 )
2322 .await
2323 .unwrap();
2324 let mut chunk1_data: Vec<u8> = (0..256 * 1024).map(|i| (i % 256) as u8).collect();
2325 let chunk2_data: Vec<u8> = vec![10, 20, 30];
2326 let total_size = None;
2327
2328 tracing::info!("start upload chunk {}", uploader.url());
2329 let chunk1 = ChunkSize::new(0, chunk1_data.len() as u64 - 1, total_size);
2330 tracing::info!("upload chunk1 {:?}", chunk1);
2331 let status1 = uploader
2332 .upload_multiple_chunk(chunk1_data.clone(), &chunk1)
2333 .await
2334 .unwrap();
2335
2336 assert_eq!(
2337 status1,
2338 UploadStatus::ResumeIncomplete(UploadedRange {
2339 first_byte: 0,
2340 last_byte: chunk1_data.len() as u64 - 1,
2341 })
2342 );
2343
2344 tracing::info!("upload chunk1 resume {:?}", chunk1);
2345 let status1 = uploader
2346 .upload_multiple_chunk(chunk1_data.clone(), &chunk1)
2347 .await
2348 .unwrap();
2349
2350 assert_eq!(
2351 status1,
2352 UploadStatus::ResumeIncomplete(UploadedRange {
2353 first_byte: 0,
2354 last_byte: chunk1_data.len() as u64 - 1,
2355 })
2356 );
2357
2358 let remaining = chunk1_data.len() as u64 + chunk2_data.len() as u64;
2360 let chunk2 = ChunkSize::new(chunk1_data.len() as u64, remaining - 1, Some(remaining));
2361 tracing::info!("upload chunk2 {:?}", chunk2);
2362 let status2 = uploader
2363 .upload_multiple_chunk(chunk2_data.clone(), &chunk2)
2364 .await
2365 .unwrap();
2366 assert!(matches!(status2, UploadStatus::Ok(_)));
2367
2368 let get_request = &GetObjectRequest {
2369 bucket: bucket_name.to_string(),
2370 object: file_name.to_string(),
2371 ..Default::default()
2372 };
2373
2374 let object = client.get_object(get_request).await.unwrap();
2375 assert_eq!(object.content_type.unwrap(), "video/mp4");
2376
2377 let download = client.download_object(get_request, &Range::default()).await.unwrap();
2378 chunk1_data.extend(chunk2_data);
2379 assert_eq!(chunk1_data, download);
2380 }
2381}