1use std::sync::Arc;
2
3use futures_util::{Stream, TryStream, TryStreamExt};
4use reqwest::header::{HeaderValue, CONTENT_LENGTH, LOCATION};
5use reqwest::{Body, Request};
6use reqwest_middleware::RequestBuilder;
7
8use token_source::TokenSource;
9
10use crate::http::bucket_access_controls::delete::DeleteBucketAccessControlRequest;
11use crate::http::bucket_access_controls::get::GetBucketAccessControlRequest;
12use crate::http::bucket_access_controls::insert::InsertBucketAccessControlRequest;
13use crate::http::bucket_access_controls::list::{ListBucketAccessControlsRequest, ListBucketAccessControlsResponse};
14use crate::http::bucket_access_controls::patch::PatchBucketAccessControlRequest;
15use crate::http::bucket_access_controls::BucketAccessControl;
16use crate::http::buckets::delete::DeleteBucketRequest;
17use crate::http::buckets::get::GetBucketRequest;
18use crate::http::buckets::get_iam_policy::GetIamPolicyRequest;
19use crate::http::buckets::insert::InsertBucketRequest;
20use crate::http::buckets::list::{ListBucketsRequest, ListBucketsResponse};
21use crate::http::buckets::patch::PatchBucketRequest;
22use crate::http::buckets::set_iam_policy::SetIamPolicyRequest;
23use crate::http::buckets::test_iam_permissions::{TestIamPermissionsRequest, TestIamPermissionsResponse};
24use crate::http::buckets::{Bucket, Policy};
25use crate::http::default_object_access_controls::delete::DeleteDefaultObjectAccessControlRequest;
26use crate::http::default_object_access_controls::get::GetDefaultObjectAccessControlRequest;
27use crate::http::default_object_access_controls::insert::InsertDefaultObjectAccessControlRequest;
28use crate::http::default_object_access_controls::list::{
29 ListDefaultObjectAccessControlsRequest, ListDefaultObjectAccessControlsResponse,
30};
31use crate::http::default_object_access_controls::patch::PatchDefaultObjectAccessControlRequest;
32use crate::http::hmac_keys::create::{CreateHmacKeyRequest, CreateHmacKeyResponse};
33use crate::http::hmac_keys::delete::DeleteHmacKeyRequest;
34use crate::http::hmac_keys::get::GetHmacKeyRequest;
35use crate::http::hmac_keys::list::{ListHmacKeysRequest, ListHmacKeysResponse};
36use crate::http::hmac_keys::update::UpdateHmacKeyRequest;
37use crate::http::hmac_keys::HmacKeyMetadata;
38use crate::http::notifications::delete::DeleteNotificationRequest;
39use crate::http::notifications::get::GetNotificationRequest;
40use crate::http::notifications::insert::InsertNotificationRequest;
41use crate::http::notifications::list::{ListNotificationsRequest, ListNotificationsResponse};
42use crate::http::notifications::Notification;
43use crate::http::object_access_controls::delete::DeleteObjectAccessControlRequest;
44use crate::http::object_access_controls::get::GetObjectAccessControlRequest;
45use crate::http::object_access_controls::insert::InsertObjectAccessControlRequest;
46use crate::http::object_access_controls::list::ListObjectAccessControlsRequest;
47use crate::http::object_access_controls::patch::PatchObjectAccessControlRequest;
48use crate::http::object_access_controls::ObjectAccessControl;
49use crate::http::objects::compose::ComposeObjectRequest;
50use crate::http::objects::copy::CopyObjectRequest;
51use crate::http::objects::delete::DeleteObjectRequest;
52use crate::http::objects::download::Range;
53use crate::http::objects::get::GetObjectRequest;
54use crate::http::objects::list::{ListObjectsRequest, ListObjectsResponse};
55use crate::http::objects::patch::PatchObjectRequest;
56use crate::http::objects::r#move::MoveObjectRequest;
57use crate::http::objects::rewrite::{RewriteObjectRequest, RewriteObjectResponse};
58use crate::http::objects::upload::{UploadObjectRequest, UploadType};
59use crate::http::objects::Object;
60use crate::http::resumable_upload_client::ResumableUploadClient;
61use crate::http::{
62 bucket_access_controls, buckets, check_response_status, default_object_access_controls, hmac_keys, notifications,
63 object_access_controls, objects, Error,
64};
65
/// Audience string for the Cloud Storage API.
pub const AUDIENCE: &str = "https://storage.googleapis.com/";

/// OAuth scopes requested when building a token source for this client.
pub const SCOPES: [&str; 2] = [
    "https://www.googleapis.com/auth/cloud-platform",
    "https://www.googleapis.com/auth/devstorage.full_control",
];
71
72#[derive(Clone)]
73pub struct StorageClient {
74 ts: Option<Arc<dyn TokenSource>>,
75 v1_endpoint: String,
76 v1_upload_endpoint: String,
77 http: reqwest_middleware::ClientWithMiddleware,
78}
79
80impl StorageClient {
81 pub(crate) fn new(
82 ts: Option<Arc<dyn TokenSource>>,
83 endpoint: &str,
84 http: reqwest_middleware::ClientWithMiddleware,
85 ) -> Self {
86 Self {
87 ts,
88 v1_endpoint: format!("{endpoint}/storage/v1"),
89 v1_upload_endpoint: format!("{endpoint}/upload/storage/v1"),
90 http,
91 }
92 }
93
94 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
109 pub async fn delete_bucket(&self, req: &DeleteBucketRequest) -> Result<(), Error> {
110 let builder = buckets::delete::build(self.v1_endpoint.as_str(), &self.http, req);
111 self.send_get_empty(builder).await
112 }
113
114 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
133 pub async fn insert_bucket(&self, req: &InsertBucketRequest) -> Result<Bucket, Error> {
134 let builder = buckets::insert::build(self.v1_endpoint.as_str(), &self.http, req);
135 self.send(builder).await
136 }
137
138 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
153 pub async fn get_bucket(&self, req: &GetBucketRequest) -> Result<Bucket, Error> {
154 let builder = buckets::get::build(self.v1_endpoint.as_str(), &self.http, req);
155 self.send(builder).await
156 }
157
158 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
179 pub async fn patch_bucket(&self, req: &PatchBucketRequest) -> Result<Bucket, Error> {
180 let builder = buckets::patch::build(self.v1_endpoint.as_str(), &self.http, req);
181 self.send(builder).await
182 }
183
184 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
199 pub async fn list_buckets(&self, req: &ListBucketsRequest) -> Result<ListBucketsResponse, Error> {
200 let builder = buckets::list::build(self.v1_endpoint.as_str(), &self.http, req);
201 self.send(builder).await
202 }
203
204 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
228 pub async fn set_iam_policy(&self, req: &SetIamPolicyRequest) -> Result<Policy, Error> {
229 let builder = buckets::set_iam_policy::build(self.v1_endpoint.as_str(), &self.http, req);
230 self.send(builder).await
231 }
232
233 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
249 pub async fn get_iam_policy(&self, req: &GetIamPolicyRequest) -> Result<Policy, Error> {
250 let builder = buckets::get_iam_policy::build(self.v1_endpoint.as_str(), &self.http, req);
251 self.send(builder).await
252 }
253
254 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
269 pub async fn test_iam_permissions(
270 &self,
271 req: &TestIamPermissionsRequest,
272 ) -> Result<TestIamPermissionsResponse, Error> {
273 let builder = buckets::test_iam_permissions::build(self.v1_endpoint.as_str(), &self.http, req);
274 self.send(builder).await
275 }
276
277 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
293 pub async fn list_default_object_access_controls(
294 &self,
295 req: &ListDefaultObjectAccessControlsRequest,
296 ) -> Result<ListDefaultObjectAccessControlsResponse, Error> {
297 let builder = default_object_access_controls::list::build(self.v1_endpoint.as_str(), &self.http, req);
298 self.send(builder).await
299 }
300
301 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
316 pub async fn get_default_object_access_control(
317 &self,
318 req: &GetDefaultObjectAccessControlRequest,
319 ) -> Result<ObjectAccessControl, Error> {
320 let builder = default_object_access_controls::get::build(self.v1_endpoint.as_str(), &self.http, req);
321 self.send(builder).await
322 }
323
324 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
344 pub async fn insert_default_object_access_control(
345 &self,
346 req: &InsertDefaultObjectAccessControlRequest,
347 ) -> Result<ObjectAccessControl, Error> {
348 let builder = default_object_access_controls::insert::build(self.v1_endpoint.as_str(), &self.http, req);
349 self.send(builder).await
350 }
351
352 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
374 pub async fn patch_default_object_access_control(
375 &self,
376 req: &PatchDefaultObjectAccessControlRequest,
377 ) -> Result<ObjectAccessControl, Error> {
378 let builder = default_object_access_controls::patch::build(self.v1_endpoint.as_str(), &self.http, req);
379 self.send(builder).await
380 }
381
382 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
397 pub async fn delete_default_object_access_control(
398 &self,
399 req: &DeleteDefaultObjectAccessControlRequest,
400 ) -> Result<(), Error> {
401 let builder = default_object_access_controls::delete::build(self.v1_endpoint.as_str(), &self.http, req);
402 self.send_get_empty(builder).await
403 }
404
405 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
419 pub async fn list_bucket_access_controls(
420 &self,
421 req: &ListBucketAccessControlsRequest,
422 ) -> Result<ListBucketAccessControlsResponse, Error> {
423 let builder = bucket_access_controls::list::build(self.v1_endpoint.as_str(), &self.http, req);
424 self.send(builder).await
425 }
426
427 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
442 pub async fn get_bucket_access_control(
443 &self,
444 req: &GetBucketAccessControlRequest,
445 ) -> Result<BucketAccessControl, Error> {
446 let builder = bucket_access_controls::get::build(self.v1_endpoint.as_str(), &self.http, req);
447 self.send(builder).await
448 }
449
450 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
469 pub async fn insert_bucket_access_control(
470 &self,
471 req: &InsertBucketAccessControlRequest,
472 ) -> Result<BucketAccessControl, Error> {
473 let builder = bucket_access_controls::insert::build(self.v1_endpoint.as_str(), &self.http, req);
474 self.send(builder).await
475 }
476
477 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
498 pub async fn patch_bucket_access_control(
499 &self,
500 req: &PatchBucketAccessControlRequest,
501 ) -> Result<BucketAccessControl, Error> {
502 let builder = bucket_access_controls::patch::build(self.v1_endpoint.as_str(), &self.http, req);
503 self.send(builder).await
504 }
505
506 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
520 pub async fn delete_bucket_access_control(&self, req: &DeleteBucketAccessControlRequest) -> Result<(), Error> {
521 let builder = bucket_access_controls::delete::build(self.v1_endpoint.as_str(), &self.http, req);
522 self.send_get_empty(builder).await
523 }
524
525 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
541 pub async fn list_object_access_controls(
542 &self,
543 req: &ListObjectAccessControlsRequest,
544 ) -> Result<ListBucketAccessControlsResponse, Error> {
545 let builder = object_access_controls::list::build(self.v1_endpoint.as_str(), &self.http, req);
546 self.send(builder).await
547 }
548
549 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
568 pub async fn get_object_access_control(
569 &self,
570 req: &GetObjectAccessControlRequest,
571 ) -> Result<ObjectAccessControl, Error> {
572 let builder = object_access_controls::get::build(self.v1_endpoint.as_str(), &self.http, req);
573 self.send(builder).await
574 }
575
576 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
599 pub async fn insert_object_access_control(
600 &self,
601 req: &InsertObjectAccessControlRequest,
602 ) -> Result<ObjectAccessControl, Error> {
603 let builder = object_access_controls::insert::build(self.v1_endpoint.as_str(), &self.http, req);
604 self.send(builder).await
605 }
606
607 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
631 pub async fn patch_object_access_control(
632 &self,
633 req: &PatchObjectAccessControlRequest,
634 ) -> Result<ObjectAccessControl, Error> {
635 let builder = object_access_controls::patch::build(self.v1_endpoint.as_str(), &self.http, req);
636 self.send(builder).await
637 }
638
639 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
657 pub async fn delete_object_access_control(&self, req: &DeleteObjectAccessControlRequest) -> Result<(), Error> {
658 let builder = object_access_controls::delete::build(self.v1_endpoint.as_str(), &self.http, req);
659 self.send_get_empty(builder).await
660 }
661
662 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
677 pub async fn list_notifications(&self, req: &ListNotificationsRequest) -> Result<ListNotificationsResponse, Error> {
678 let builder = notifications::list::build(self.v1_endpoint.as_str(), &self.http, req);
679 self.send(builder).await
680 }
681
682 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
699 pub async fn get_notification(&self, req: &GetNotificationRequest) -> Result<Notification, Error> {
700 let builder = notifications::get::build(self.v1_endpoint.as_str(), &self.http, req);
701 self.send(builder).await
702 }
703
704 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
726 pub async fn insert_notification(&self, req: &InsertNotificationRequest) -> Result<Notification, Error> {
727 let builder = notifications::insert::build(self.v1_endpoint.as_str(), &self.http, req);
728 self.send(builder).await
729 }
730
731 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
748 pub async fn delete_notification(&self, req: &DeleteNotificationRequest) -> Result<(), Error> {
749 let builder = notifications::delete::build(self.v1_endpoint.as_str(), &self.http, req);
750 self.send_get_empty(builder).await
751 }
752
753 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
770 pub async fn list_hmac_keys(&self, req: &ListHmacKeysRequest) -> Result<ListHmacKeysResponse, Error> {
771 let builder = hmac_keys::list::build(self.v1_endpoint.as_str(), &self.http, req);
772 self.send(builder).await
773 }
774
775 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
792 pub async fn get_hmac_key(&self, req: &GetHmacKeyRequest) -> Result<HmacKeyMetadata, Error> {
793 let builder = hmac_keys::get::build(self.v1_endpoint.as_str(), &self.http, req);
794 self.send(builder).await
795 }
796
797 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
814 pub async fn create_hmac_key(&self, req: &CreateHmacKeyRequest) -> Result<CreateHmacKeyResponse, Error> {
815 let builder = hmac_keys::create::build(self.v1_endpoint.as_str(), &self.http, req);
816 self.send(builder).await
817 }
818
819 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
841 pub async fn update_hmac_key(&self, req: &UpdateHmacKeyRequest) -> Result<HmacKeyMetadata, Error> {
842 let builder = hmac_keys::update::build(self.v1_endpoint.as_str(), &self.http, req);
843 self.send(builder).await
844 }
845
846 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
863 pub async fn delete_hmac_key(&self, req: &DeleteHmacKeyRequest) -> Result<(), Error> {
864 let builder = hmac_keys::delete::build(self.v1_endpoint.as_str(), &self.http, req);
865 self.send_get_empty(builder).await
866 }
867
868 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
885 pub async fn list_objects(&self, req: &ListObjectsRequest) -> Result<ListObjectsResponse, Error> {
886 let builder = objects::list::build(self.v1_endpoint.as_str(), &self.http, req);
887 self.send(builder).await
888 }
889
890 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
906 pub async fn get_object(&self, req: &GetObjectRequest) -> Result<Object, Error> {
907 let builder = objects::get::build(self.v1_endpoint.as_str(), &self.http, req);
908 self.send(builder).await
909 }
910
911 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
930 pub async fn copy_object(&self, req: &CopyObjectRequest) -> Result<Object, Error> {
931 let builder = objects::copy::build(self.v1_endpoint.as_str(), &self.http, req);
932 self.send(builder).await
933 }
934
935 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
955 pub async fn move_object(&self, req: &MoveObjectRequest) -> Result<Object, Error> {
956 let copy_req: CopyObjectRequest = req.clone().into();
957 let delete_req: DeleteObjectRequest = req.clone().into();
958 let copy_result = self.copy_object(©_req).await?;
960 self.delete_object(&delete_req).await?;
961 Ok(copy_result)
962 }
963
964 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
984 pub async fn download_object(&self, req: &GetObjectRequest, range: &Range) -> Result<Vec<u8>, Error> {
985 let builder = objects::download::build(self.v1_endpoint.as_str(), &self.http, req, range);
986 let request = self.with_headers(builder).await?;
987 let response = request.send().await?;
988 let response = check_response_status(response).await?;
989 Ok(response.bytes().await?.to_vec())
990 }
991
992 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
1014 pub async fn download_streamed_object(
1015 &self,
1016 req: &GetObjectRequest,
1017 range: &Range,
1018 ) -> Result<impl Stream<Item = Result<bytes::Bytes, Error>>, Error> {
1019 let builder = objects::download::build(self.v1_endpoint.as_str(), &self.http, req, range);
1020 let request = self.with_headers(builder).await?;
1021 let response = request.send().await?;
1022 let response = check_response_status(response).await?;
1023 Ok(response.bytes_stream().map_err(Error::from))
1024 }
1025
1026 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
1059 pub async fn upload_object<T: Into<Body>>(
1060 &self,
1061 req: &UploadObjectRequest,
1062 data: T,
1063 upload_type: &UploadType,
1064 ) -> Result<Object, Error> {
1065 match upload_type {
1066 UploadType::Multipart(meta) => {
1067 let builder =
1068 objects::upload::build_multipart(self.v1_upload_endpoint.as_str(), &self.http, req, meta, data)?;
1069 self.send(builder).await
1070 }
1071 UploadType::Simple(media) => {
1072 let builder = objects::upload::build(self.v1_upload_endpoint.as_str(), &self.http, req, media, data);
1073 let builder = self.with_headers(builder).await?;
1074 let mut request = builder.build()?;
1075 if !request.headers().contains_key(CONTENT_LENGTH) {
1077 if let Some(Some(is_empty)) = request.body().map(|b| b.as_bytes().map(|b| b.is_empty())) {
1078 if is_empty {
1079 request
1080 .headers_mut()
1081 .insert(CONTENT_LENGTH, HeaderValue::from_static("0"));
1082 }
1083 }
1084 }
1085 self.send_request(request).await
1086 }
1087 }
1088 }
1089
1090 pub fn get_resumable_upload(&self, url: String) -> ResumableUploadClient {
1094 ResumableUploadClient::new(url, self.http.clone())
1095 }
1096
1097 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
1156 pub async fn prepare_resumable_upload(
1157 &self,
1158 req: &UploadObjectRequest,
1159 upload_type: &UploadType,
1160 ) -> Result<ResumableUploadClient, Error> {
1161 let request = match upload_type {
1162 UploadType::Multipart(meta) => objects::upload::build_resumable_session_metadata(
1163 self.v1_upload_endpoint.as_str(),
1164 &self.http,
1165 req,
1166 meta,
1167 ),
1168 UploadType::Simple(media) => objects::upload::build_resumable_session_simple(
1169 self.v1_upload_endpoint.as_str(),
1170 &self.http,
1171 req,
1172 media,
1173 ),
1174 };
1175 self.send_get_url(request)
1176 .await
1177 .map(|url| ResumableUploadClient::new(url, self.http.clone()))
1178 }
1179
1180 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
1202 pub async fn upload_streamed_object<S>(
1203 &self,
1204 req: &UploadObjectRequest,
1205 data: S,
1206 upload_type: &UploadType,
1207 ) -> Result<Object, Error>
1208 where
1209 S: TryStream + Send + 'static,
1210 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
1211 bytes::Bytes: From<S::Ok>,
1212 {
1213 self.upload_object(req, Body::wrap_stream(data), upload_type).await
1215 }
1216
1217 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
1235 pub async fn patch_object(&self, req: &PatchObjectRequest) -> Result<Object, Error> {
1236 let builder = objects::patch::build(self.v1_endpoint.as_str(), &self.http, req);
1237 self.send(builder).await
1238 }
1239
1240 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
1258 pub async fn delete_object(&self, req: &DeleteObjectRequest) -> Result<(), Error> {
1259 let builder = objects::delete::build(self.v1_endpoint.as_str(), &self.http, req);
1260 self.send_get_empty(builder).await
1261 }
1262
1263 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
1291 pub async fn rewrite_object(&self, req: &RewriteObjectRequest) -> Result<RewriteObjectResponse, Error> {
1292 let builder = objects::rewrite::build(self.v1_endpoint.as_str(), &self.http, req);
1293 self.send(builder).await
1294 }
1295
1296 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
1321 pub async fn compose_object(&self, req: &ComposeObjectRequest) -> Result<Object, Error> {
1322 let builder = objects::compose::build(self.v1_endpoint.as_str(), &self.http, req);
1323 self.send(builder).await
1324 }
1325
1326 async fn with_headers(&self, builder: RequestBuilder) -> Result<RequestBuilder, Error> {
1327 let builder = builder
1328 .header("X-Goog-Api-Client", "rust")
1329 .header(reqwest::header::USER_AGENT, "google-cloud-storage");
1330 let builder = match &self.ts {
1331 Some(ts) => {
1332 let token = ts.token().await.map_err(Error::TokenSource)?;
1333 builder.header(reqwest::header::AUTHORIZATION, token)
1334 }
1335 None => builder,
1336 };
1337 Ok(builder)
1338 }
1339
1340 async fn send_request<T>(&self, request: Request) -> Result<T, Error>
1341 where
1342 T: serde::de::DeserializeOwned,
1343 {
1344 let response = self.http.execute(request).await?;
1345 let response = check_response_status(response).await?;
1346 Ok(response.json().await?)
1347 }
1348
1349 async fn send<T>(&self, builder: RequestBuilder) -> Result<T, Error>
1350 where
1351 T: serde::de::DeserializeOwned,
1352 {
1353 let builder = self.with_headers(builder).await?;
1354 let response = builder.send().await?;
1355 let response = check_response_status(response).await?;
1356 Ok(response.json().await?)
1357 }
1358
1359 async fn send_get_empty(&self, builder: RequestBuilder) -> Result<(), Error> {
1360 let builder = self.with_headers(builder).await?;
1361 let response = builder.send().await?;
1362 check_response_status(response).await?;
1363 Ok(())
1364 }
1365
1366 async fn send_get_url(&self, builder: RequestBuilder) -> Result<String, Error> {
1367 let builder = self.with_headers(builder).await?;
1368 let response = builder.send().await?;
1369 let response = check_response_status(response).await?;
1370 Ok(String::from_utf8_lossy(response.headers()[LOCATION].as_bytes()).into_owned())
1371 }
1372}
1373
1374#[cfg(test)]
1375pub(crate) mod test {
1376 use std::collections::HashMap;
1377
1378 use bytes::Buf;
1379 use futures_util::StreamExt;
1380 use serial_test::serial;
1381
1382 use google_cloud_auth::project::Config;
1383 use google_cloud_auth::token::DefaultTokenSourceProvider;
1384 use token_source::TokenSourceProvider;
1385
1386 use crate::http::bucket_access_controls::delete::DeleteBucketAccessControlRequest;
1387 use crate::http::bucket_access_controls::get::GetBucketAccessControlRequest;
1388 use crate::http::bucket_access_controls::insert::{
1389 BucketAccessControlCreationConfig, InsertBucketAccessControlRequest,
1390 };
1391 use crate::http::bucket_access_controls::list::ListBucketAccessControlsRequest;
1392 use crate::http::bucket_access_controls::BucketACLRole;
1393 use crate::http::buckets::delete::DeleteBucketRequest;
1394 use crate::http::buckets::get::GetBucketRequest;
1395 use crate::http::buckets::get_iam_policy::GetIamPolicyRequest;
1396 use crate::http::buckets::iam_configuration::{PublicAccessPrevention, UniformBucketLevelAccess};
1397 use crate::http::buckets::insert::{
1398 BucketCreationConfig, InsertBucketParam, InsertBucketRequest, RetentionPolicyCreationConfig,
1399 };
1400 use crate::http::buckets::list::ListBucketsRequest;
1401 use crate::http::buckets::patch::{BucketPatchConfig, PatchBucketRequest};
1402 use crate::http::buckets::set_iam_policy::SetIamPolicyRequest;
1403 use crate::http::buckets::test_iam_permissions::TestIamPermissionsRequest;
1404 use crate::http::buckets::{lifecycle, Billing, Binding, Cors, IamConfiguration, Lifecycle, Website};
1405 use crate::http::default_object_access_controls::delete::DeleteDefaultObjectAccessControlRequest;
1406 use crate::http::default_object_access_controls::get::GetDefaultObjectAccessControlRequest;
1407 use crate::http::default_object_access_controls::insert::InsertDefaultObjectAccessControlRequest;
1408 use crate::http::default_object_access_controls::list::ListDefaultObjectAccessControlsRequest;
1409 use crate::http::hmac_keys::create::CreateHmacKeyRequest;
1410 use crate::http::hmac_keys::delete::DeleteHmacKeyRequest;
1411 use crate::http::hmac_keys::get::GetHmacKeyRequest;
1412 use crate::http::hmac_keys::list::ListHmacKeysRequest;
1413 use crate::http::hmac_keys::update::UpdateHmacKeyRequest;
1414 use crate::http::hmac_keys::HmacKeyMetadata;
1415 use crate::http::notifications::delete::DeleteNotificationRequest;
1416 use crate::http::notifications::get::GetNotificationRequest;
1417 use crate::http::notifications::insert::{InsertNotificationRequest, NotificationCreationConfig};
1418 use crate::http::notifications::list::ListNotificationsRequest;
1419 use crate::http::notifications::EventType;
1420 use crate::http::object_access_controls::delete::DeleteObjectAccessControlRequest;
1421 use crate::http::object_access_controls::get::GetObjectAccessControlRequest;
1422 use crate::http::object_access_controls::insert::{
1423 InsertObjectAccessControlRequest, ObjectAccessControlCreationConfig,
1424 };
1425 use crate::http::object_access_controls::list::ListObjectAccessControlsRequest;
1426 use crate::http::object_access_controls::ObjectACLRole;
1427 use crate::http::objects::compose::{ComposeObjectRequest, ComposingTargets};
1428 use crate::http::objects::copy::CopyObjectRequest;
1429 use crate::http::objects::delete::DeleteObjectRequest;
1430 use crate::http::objects::download::Range;
1431 use crate::http::objects::get::GetObjectRequest;
1432 use crate::http::objects::list::ListObjectsRequest;
1433 use crate::http::objects::rewrite::RewriteObjectRequest;
1434 use crate::http::objects::upload::{Media, UploadObjectRequest, UploadType};
1435 use crate::http::objects::{Object, SourceObjects};
1436 use crate::http::resumable_upload_client::{ChunkSize, UploadStatus, UploadedRange};
1437 use crate::http::storage_client::{StorageClient, SCOPES};
1438
1439 #[ctor::ctor]
1440 fn init() {
1441 let filter = tracing_subscriber::filter::EnvFilter::from_default_env()
1442 .add_directive("google_cloud_storage=trace".parse().unwrap());
1443 let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
1444 }
1445
1446 pub fn bucket_name(project: &str, name: &str) -> String {
1447 format!("{project}_gcrgcs_{name}")
1448 }
1449
1450 async fn client() -> (StorageClient, String, String) {
1451 let tsp = DefaultTokenSourceProvider::new(Config::default().with_scopes(&SCOPES))
1452 .await
1453 .unwrap();
1454 let cred = tsp.source_credentials.clone();
1455 let ts = tsp.token_source();
1456 let client = StorageClient::new(
1457 Some(ts),
1458 "https://storage.googleapis.com",
1459 reqwest_middleware::ClientBuilder::new(reqwest::Client::default()).build(),
1460 );
1461 let cred = cred.unwrap();
1462 (client, cred.project_id.unwrap(), cred.client_email.unwrap())
1463 }
1464
1465 #[tokio::test]
1466 #[serial]
1467 pub async fn list_buckets() {
1468 let (client, project, _) = client().await;
1469 let buckets = client
1470 .list_buckets(&ListBucketsRequest {
1471 project: project.clone(),
1472 max_results: None,
1473 page_token: None,
1474 prefix: Some(bucket_name(&project, "object")),
1475 projection: None,
1476 match_glob: None,
1477 })
1478 .await
1479 .unwrap();
1480 assert_eq!(2, buckets.items.len());
1481 }
1482
1483 #[tokio::test]
1484 #[serial]
1485 pub async fn crud_bucket() {
1486 let (client, project, email) = client().await;
1487 let name = bucket_name(
1488 &project,
1489 &format!("crud_bucket-{}", time::OffsetDateTime::now_utc().unix_timestamp()),
1490 );
1491 let mut labels = HashMap::new();
1492 labels.insert("labelkey".to_string(), "labelvalue".to_string());
1493
1494 let bucket = client
1495 .insert_bucket(&InsertBucketRequest {
1496 name,
1497 param: InsertBucketParam {
1498 project,
1499 ..Default::default()
1500 },
1501 bucket: BucketCreationConfig {
1502 location: "ASIA-NORTHEAST1".to_string(),
1503 storage_class: Some("STANDARD".to_string()),
1504 default_event_based_hold: true,
1505 labels: Some(labels),
1506 website: Some(Website {
1507 main_page_suffix: "_suffix".to_string(),
1508 not_found_page: "notfound.html".to_string(),
1509 }),
1510 iam_configuration: Some(IamConfiguration {
1511 uniform_bucket_level_access: Some(UniformBucketLevelAccess {
1512 enabled: false,
1513 locked_time: None,
1514 }),
1515 public_access_prevention: Some(PublicAccessPrevention::Enforced),
1516 }),
1517 billing: Some(Billing { requester_pays: false }),
1518 retention_policy: Some(RetentionPolicyCreationConfig {
1519 retention_period: 10000,
1520 }),
1521 cors: Some(vec![Cors {
1522 origin: vec!["*".to_string()],
1523 method: vec!["GET".to_string(), "HEAD".to_string()],
1524 response_header: vec!["200".to_string()],
1525 max_age_seconds: 100,
1526 }]),
1527 lifecycle: Some(Lifecycle {
1528 rule: vec![lifecycle::Rule {
1529 action: Some(lifecycle::rule::Action {
1530 r#type: lifecycle::rule::ActionType::Delete,
1531 storage_class: None,
1532 }),
1533 condition: Some(lifecycle::rule::Condition {
1534 age: Some(365),
1535 is_live: Some(true),
1536 ..Default::default()
1537 }),
1538 }],
1539 }),
1540 rpo: None,
1541 ..Default::default()
1542 },
1543 })
1544 .await
1545 .unwrap();
1546
1547 let found = client
1548 .get_bucket(&GetBucketRequest {
1549 bucket: bucket.name.to_string(),
1550 ..Default::default()
1551 })
1552 .await
1553 .unwrap();
1554
1555 assert_eq!(found.location.as_str(), "ASIA-NORTHEAST1");
1556
1557 let entity = format!("user-{email}");
1558 let patched = client
1559 .patch_bucket(&PatchBucketRequest {
1560 bucket: bucket.name.to_string(),
1561 metadata: Some(BucketPatchConfig {
1562 default_object_acl: Some(vec![ObjectAccessControlCreationConfig {
1563 entity: entity.to_string(),
1564 role: ObjectACLRole::READER,
1565 }]),
1566 ..Default::default()
1567 }),
1568 ..Default::default()
1569 })
1570 .await
1571 .unwrap();
1572
1573 let default_object_acl = patched.default_object_acl.unwrap();
1574 assert_eq!(default_object_acl.len(), 1);
1575 assert_eq!(default_object_acl[0].entity.as_str(), entity);
1576 assert_eq!(default_object_acl[0].role, ObjectACLRole::READER);
1577 assert_eq!(found.storage_class.as_str(), patched.storage_class.as_str());
1578 assert_eq!(found.location.as_str(), patched.location.as_str());
1579
1580 client
1581 .delete_bucket(&DeleteBucketRequest {
1582 bucket: bucket.name,
1583 param: Default::default(),
1584 })
1585 .await
1586 .unwrap();
1587 }
1588
1589 #[tokio::test]
1590 #[serial]
1591 async fn set_get_test_iam() {
1592 let (client, project, email) = client().await;
1593 let bucket_name = bucket_name(&project, "test_iam");
1594 let mut policy = client
1595 .get_iam_policy(&GetIamPolicyRequest {
1596 resource: bucket_name.to_string(),
1597 options_requested_policy_version: None,
1598 })
1599 .await
1600 .unwrap();
1601 policy.bindings.push(Binding {
1602 role: "roles/storage.objectViewer".to_string(),
1603 members: vec![format!("serviceAccount:{}", email)],
1604 condition: None,
1605 });
1606
1607 let mut result = client
1608 .set_iam_policy(&SetIamPolicyRequest {
1609 resource: bucket_name.to_string(),
1610 policy,
1611 })
1612 .await
1613 .unwrap();
1614 assert_eq!(result.bindings.len(), 5);
1615 assert_eq!(result.bindings.pop().unwrap().role, "roles/storage.objectViewer");
1616
1617 let permissions = client
1618 .test_iam_permissions(&TestIamPermissionsRequest {
1619 resource: bucket_name.to_string(),
1620 permissions: vec!["storage.buckets.get".to_string()],
1621 })
1622 .await
1623 .unwrap();
1624 assert_eq!(permissions.permissions[0], "storage.buckets.get");
1625 }
1626
1627 #[tokio::test]
1628 #[serial]
1629 pub async fn crud_default_object_controls() {
1630 let (client, project, email) = client().await;
1631 let bucket_name = bucket_name(&project, "default_object_acl");
1632 let entity = format!("user-{email}");
1633
1634 client
1635 .insert_default_object_access_control(&InsertDefaultObjectAccessControlRequest {
1636 bucket: bucket_name.to_string(),
1637 object_access_control: ObjectAccessControlCreationConfig {
1638 entity: entity.to_string(),
1639 role: ObjectACLRole::READER,
1640 },
1641 })
1642 .await
1643 .unwrap();
1644
1645 let found = client
1646 .get_default_object_access_control(&GetDefaultObjectAccessControlRequest {
1647 bucket: bucket_name.to_string(),
1648 entity: entity.to_string(),
1649 })
1650 .await
1651 .unwrap();
1652 assert_eq!(found.entity, entity);
1653 assert_eq!(found.role, ObjectACLRole::READER);
1654
1655 let acls = client
1656 .list_default_object_access_controls(&ListDefaultObjectAccessControlsRequest {
1657 bucket: bucket_name.to_string(),
1658 ..Default::default()
1659 })
1660 .await
1661 .unwrap();
1662 assert!(acls.items.is_some());
1663 assert_eq!(4, acls.items.unwrap().len());
1664
1665 client
1666 .delete_default_object_access_control(&DeleteDefaultObjectAccessControlRequest {
1667 bucket: bucket_name.to_string(),
1668 entity: entity.to_string(),
1669 })
1670 .await
1671 .unwrap();
1672 }
1673
1674 #[tokio::test]
1675 #[serial]
1676 pub async fn crud_bucket_access_controls() {
1677 let (client, project, email) = client().await;
1678 let bucket_name = bucket_name(&project, "bucket_acl");
1679
1680 let entity = format!("user-{email}");
1681 client
1682 .insert_bucket_access_control(&InsertBucketAccessControlRequest {
1683 bucket: bucket_name.to_string(),
1684 acl: BucketAccessControlCreationConfig {
1685 entity: entity.to_string(),
1686 role: BucketACLRole::READER,
1687 },
1688 })
1689 .await
1690 .unwrap();
1691
1692 let found = client
1693 .get_bucket_access_control(&GetBucketAccessControlRequest {
1694 bucket: bucket_name.to_string(),
1695 entity: entity.to_string(),
1696 })
1697 .await
1698 .unwrap();
1699 assert_eq!(found.entity, entity);
1700 assert_eq!(found.role, BucketACLRole::READER);
1701
1702 let acls = client
1703 .list_bucket_access_controls(&ListBucketAccessControlsRequest {
1704 bucket: bucket_name.to_string(),
1705 })
1706 .await
1707 .unwrap();
1708 assert_eq!(4, acls.items.len());
1709
1710 client
1711 .delete_bucket_access_control(&DeleteBucketAccessControlRequest {
1712 bucket: bucket_name.to_string(),
1713 entity: entity.to_string(),
1714 })
1715 .await
1716 .unwrap();
1717 }
1718
1719 #[tokio::test]
1720 #[serial]
1721 pub async fn crud_object_access_controls() {
1722 let (client, project, email) = client().await;
1723 let bucket_name = bucket_name(&project, "object_acl");
1724 let object_name = "test.txt";
1725
1726 let entity = format!("user-{email}");
1727
1728 client
1729 .insert_object_access_control(&InsertObjectAccessControlRequest {
1730 bucket: bucket_name.to_string(),
1731 object: object_name.to_string(),
1732 generation: None,
1733 acl: ObjectAccessControlCreationConfig {
1734 entity: entity.to_string(),
1735 role: ObjectACLRole::READER,
1736 },
1737 })
1738 .await
1739 .unwrap();
1740
1741 let found = client
1742 .get_object_access_control(&GetObjectAccessControlRequest {
1743 bucket: bucket_name.to_string(),
1744 entity: entity.to_string(),
1745 object: object_name.to_string(),
1746 generation: None,
1747 })
1748 .await
1749 .unwrap();
1750 assert_eq!(found.entity, entity);
1751 assert_eq!(found.role, ObjectACLRole::READER);
1752
1753 let acls = client
1754 .list_object_access_controls(&ListObjectAccessControlsRequest {
1755 bucket: bucket_name.to_string(),
1756 object: object_name.to_string(),
1757 generation: None,
1758 })
1759 .await
1760 .unwrap();
1761 assert_eq!(5, acls.items.len());
1762
1763 client
1764 .delete_object_access_control(&DeleteObjectAccessControlRequest {
1765 bucket: bucket_name.to_string(),
1766 object: object_name.to_string(),
1767 entity: entity.to_string(),
1768 generation: None,
1769 })
1770 .await
1771 .unwrap();
1772 }
1773
1774 #[tokio::test]
1775 #[serial]
1776 pub async fn crud_notification() {
1777 let (client, project, _) = client().await;
1778 let bucket_name = bucket_name(&project, "notification");
1779
1780 let notifications = client
1781 .list_notifications(&ListNotificationsRequest {
1782 bucket: bucket_name.to_string(),
1783 })
1784 .await
1785 .unwrap();
1786
1787 for n in notifications.items.unwrap_or_default() {
1788 client
1789 .delete_notification(&DeleteNotificationRequest {
1790 bucket: bucket_name.to_string(),
1791 notification: n.id.to_string(),
1792 })
1793 .await
1794 .unwrap();
1795 }
1796
1797 let post = client
1798 .insert_notification(&InsertNotificationRequest {
1799 bucket: bucket_name.to_string(),
1800 notification: NotificationCreationConfig {
1801 topic: format!("projects/{project}/topics/{bucket_name}"),
1802 event_types: Some(vec![EventType::ObjectMetadataUpdate, EventType::ObjectDelete]),
1803 object_name_prefix: Some("notification-test".to_string()),
1804 ..Default::default()
1805 },
1806 })
1807 .await
1808 .unwrap();
1809
1810 let found = client
1811 .get_notification(&GetNotificationRequest {
1812 bucket: bucket_name.to_string(),
1813 notification: post.id.to_string(),
1814 })
1815 .await
1816 .unwrap();
1817 assert_eq!(found.id, post.id);
1818 assert_eq!(found.event_types.unwrap().len(), 2);
1819 }
1820
1821 #[tokio::test]
1822 #[serial]
1823 pub async fn crud_hmac_key() {
1824 let (client, project_id, email) = client().await;
1825
1826 let post = client
1827 .create_hmac_key(&CreateHmacKeyRequest {
1828 project_id: project_id.clone(),
1829 service_account_email: email,
1830 })
1831 .await
1832 .unwrap();
1833
1834 let found = client
1835 .get_hmac_key(&GetHmacKeyRequest {
1836 access_id: post.metadata.access_id.to_string(),
1837 project_id: project_id.clone(),
1838 })
1839 .await
1840 .unwrap();
1841 assert_eq!(found.id, post.metadata.id);
1842 assert_eq!(found.state, "ACTIVE");
1843
1844 let keys = client
1845 .list_hmac_keys(&ListHmacKeysRequest {
1846 project_id: project_id.clone(),
1847 ..Default::default()
1848 })
1849 .await
1850 .unwrap();
1851
1852 for n in keys.items.unwrap_or_default() {
1853 let result = client
1854 .update_hmac_key(&UpdateHmacKeyRequest {
1855 access_id: n.access_id.to_string(),
1856 project_id: n.project_id.to_string(),
1857 metadata: HmacKeyMetadata {
1858 state: "INACTIVE".to_string(),
1859 ..n.clone()
1860 },
1861 })
1862 .await
1863 .unwrap();
1864 assert_eq!(result.state, "INACTIVE");
1865
1866 client
1867 .delete_hmac_key(&DeleteHmacKeyRequest {
1868 access_id: n.access_id.to_string(),
1869 project_id: n.project_id.to_string(),
1870 })
1871 .await
1872 .unwrap();
1873 }
1874 }
1875
1876 #[tokio::test]
1877 #[serial]
1878 pub async fn crud_object_with_metadata() {
1879 let (client, project, _) = client().await;
1880 let bucket_name = bucket_name(&project, "object");
1881 let mut metadata = HashMap::<String, String>::new();
1882 metadata.insert("key1".to_string(), "value1".to_string());
1883 let uploaded = client
1884 .upload_object(
1885 &UploadObjectRequest {
1886 bucket: bucket_name.to_string(),
1887 ..Default::default()
1888 },
1889 vec![1, 2, 3, 4, 5, 6, 7],
1890 &UploadType::Multipart(Box::new(Object {
1891 name: "test1_meta".to_string(),
1892 content_type: Some("text/plain".to_string()),
1893 content_language: Some("ja".to_string()),
1894 metadata: Some(metadata),
1895 ..Default::default()
1896 })),
1897 )
1898 .await
1899 .unwrap();
1900 assert_eq!(uploaded.content_type.unwrap(), "text/plain".to_string());
1901 assert_eq!(uploaded.content_language.unwrap(), "ja".to_string());
1902 assert_eq!(uploaded.metadata.unwrap().get("key1").unwrap().clone(), "value1".to_string());
1903
1904 let download = |range: Range| {
1905 let client = client.clone();
1906 let bucket_name = uploaded.bucket.clone();
1907 let object_name = uploaded.name.clone();
1908 async move {
1909 client
1910 .download_object(
1911 &GetObjectRequest {
1912 bucket: bucket_name,
1913 object: object_name,
1914 ..Default::default()
1915 },
1916 &range,
1917 )
1918 .await
1919 .unwrap()
1920 }
1921 };
1922
1923 let object = client
1924 .get_object(&GetObjectRequest {
1925 bucket: uploaded.bucket.clone(),
1926 object: uploaded.name.clone(),
1927 ..Default::default()
1928 })
1929 .await
1930 .unwrap();
1931
1932 assert_eq!(object.content_type.unwrap(), "text/plain".to_string());
1933 assert_eq!(object.content_language.unwrap(), "ja".to_string());
1934 assert_eq!(object.metadata.unwrap().get("key1").unwrap().clone(), "value1".to_string());
1935
1936 let downloaded = download(Range::default()).await;
1937 assert_eq!(downloaded, vec![1, 2, 3, 4, 5, 6, 7]);
1938 }
1939
1940 #[tokio::test]
1941 #[serial]
1942 pub async fn crud_object() {
1943 let (client, project, _) = client().await;
1944 let bucket_name = bucket_name(&project, "object");
1945
1946 let objects = client
1947 .list_objects(&ListObjectsRequest {
1948 bucket: bucket_name.to_string(),
1949 ..Default::default()
1950 })
1951 .await
1952 .unwrap()
1953 .items
1954 .unwrap_or_default();
1955 for o in objects {
1956 client
1957 .delete_object(&DeleteObjectRequest {
1958 bucket: o.bucket.to_string(),
1959 object: o.name.to_string(),
1960 ..Default::default()
1961 })
1962 .await
1963 .unwrap();
1964 }
1965
1966 let mut media = Media::new("test1");
1967 media.content_type = "text/plain".into();
1968 let uploaded = client
1969 .upload_object(
1970 &UploadObjectRequest {
1971 bucket: bucket_name.to_string(),
1972 ..Default::default()
1973 },
1974 vec![1, 2, 3, 4, 5, 6],
1975 &UploadType::Simple(media.clone()),
1976 )
1977 .await
1978 .unwrap();
1979
1980 assert_eq!(uploaded.content_type.unwrap(), "text/plain".to_string());
1981
1982 let media = Media::new("test1_zero");
1983 let uploaded_empty = client
1984 .upload_object(
1985 &UploadObjectRequest {
1986 bucket: bucket_name.to_string(),
1987 ..Default::default()
1988 },
1989 vec![],
1990 &UploadType::Simple(media),
1991 )
1992 .await
1993 .unwrap();
1994
1995 let download = |name: &str, range: Range| {
1996 let client = client.clone();
1997 let bucket_name = uploaded.bucket.clone();
1998 let object_name = name.to_string();
1999 async move {
2000 client
2001 .download_object(
2002 &GetObjectRequest {
2003 bucket: bucket_name,
2004 object: object_name,
2005 ..Default::default()
2006 },
2007 &range,
2008 )
2009 .await
2010 .unwrap()
2011 }
2012 };
2013
2014 let downloaded = download(&uploaded.name, Range::default()).await;
2015 assert_eq!(downloaded, vec![1, 2, 3, 4, 5, 6]);
2016 let downloaded = download(&uploaded.name, Range(Some(1), None)).await;
2017 assert_eq!(downloaded, vec![2, 3, 4, 5, 6]);
2018 let downloaded = download(&uploaded.name, Range(Some(1), Some(2))).await;
2019 assert_eq!(downloaded, vec![2, 3]);
2020 let downloaded = download(&uploaded.name, Range(None, Some(2))).await;
2021 assert_eq!(downloaded, vec![5, 6]);
2022
2023 let downloaded = download(&uploaded_empty.name, Range::default()).await;
2024 assert!(downloaded.is_empty());
2025
2026 let _copied = client
2027 .copy_object(&CopyObjectRequest {
2028 destination_bucket: bucket_name.to_string(),
2029 destination_object: format!("{}_copy", uploaded.name),
2030 source_bucket: bucket_name.to_string(),
2031 source_object: uploaded.name.to_string(),
2032 ..Default::default()
2033 })
2034 .await
2035 .unwrap();
2036
2037 let _rewrited = client
2038 .rewrite_object(&RewriteObjectRequest {
2039 destination_bucket: bucket_name.to_string(),
2040 destination_object: format!("{}_rewrite", uploaded.name),
2041 source_bucket: bucket_name.to_string(),
2042 source_object: uploaded.name.to_string(),
2043 ..Default::default()
2044 })
2045 .await
2046 .unwrap();
2047
2048 let _composed = client
2049 .compose_object(&ComposeObjectRequest {
2050 bucket: bucket_name.to_string(),
2051 destination_object: format!("{}_composed", uploaded.name),
2052 destination_predefined_acl: None,
2053 composing_targets: ComposingTargets {
2054 destination: Some(Object {
2055 content_type: Some("image/jpeg".to_string()),
2056 ..Default::default()
2057 }),
2058 source_objects: vec![SourceObjects {
2059 name: format!("{}_rewrite", uploaded.name),
2060 ..Default::default()
2061 }],
2062 },
2063 ..Default::default()
2064 })
2065 .await
2066 .unwrap();
2067 }
2068
2069 #[tokio::test]
2070 #[serial]
2071 pub async fn streamed_object() {
2072 let (client, project, _) = client().await;
2073 let bucket_name = bucket_name(&project, "object");
2074 let file_name = format!("stream_{}", time::OffsetDateTime::now_utc().unix_timestamp());
2075
2076 let source = vec!["hello", " ", "world"];
2078 let chunks: Vec<Result<_, ::std::io::Error>> = source.clone().into_iter().map(Ok).collect();
2079 let stream = futures_util::stream::iter(chunks);
2080 let media = Media::new(file_name);
2081 let upload_type = UploadType::Simple(media);
2082 let uploaded = client
2083 .upload_streamed_object(
2084 &UploadObjectRequest {
2085 bucket: bucket_name.to_string(),
2086 predefined_acl: None,
2087 ..Default::default()
2088 },
2089 stream,
2090 &upload_type,
2091 )
2092 .await
2093 .unwrap();
2094
2095 let file_name = format!("stream_empty_{}", time::OffsetDateTime::now_utc().unix_timestamp());
2096 let source: Vec<&str> = vec![];
2097 let chunks: Vec<Result<_, ::std::io::Error>> = source.clone().into_iter().map(Ok).collect();
2098 let stream = futures_util::stream::iter(chunks);
2099 let media = Media::new(file_name);
2100 let upload_type = UploadType::Simple(media);
2101 let uploaded_empty = client
2102 .upload_streamed_object(
2103 &UploadObjectRequest {
2104 bucket: bucket_name.to_string(),
2105 predefined_acl: None,
2106 ..Default::default()
2107 },
2108 stream,
2109 &upload_type,
2110 )
2111 .await
2112 .unwrap();
2113
2114 let download = |name: &str, range: Range| {
2115 let client = client.clone();
2116 let bucket_name = uploaded.bucket.clone();
2117 let object_name = name.to_string();
2118 async move {
2119 let mut downloaded = client
2120 .download_streamed_object(
2121 &GetObjectRequest {
2122 bucket: bucket_name,
2123 object: object_name,
2124 ..Default::default()
2125 },
2126 &range,
2127 )
2128 .await
2129 .unwrap();
2130 let mut data = Vec::with_capacity(10);
2131 while let Some(v) = downloaded.next().await {
2132 let d: bytes::Bytes = v.unwrap();
2133 data.extend_from_slice(d.chunk());
2134 }
2135 data
2136 }
2137 };
2138 let downloaded = download(&uploaded.name, Range::default()).await;
2139 assert_eq!("hello world", String::from_utf8_lossy(downloaded.as_slice()));
2140 let downloaded = download(&uploaded.name, Range(Some(1), None)).await;
2141 assert_eq!("ello world", String::from_utf8_lossy(downloaded.as_slice()));
2142 let downloaded = download(&uploaded.name, Range(Some(1), Some(2))).await;
2143 assert_eq!("el", String::from_utf8_lossy(downloaded.as_slice()));
2144 let downloaded = download(&uploaded.name, Range(None, Some(2))).await;
2145 assert_eq!("ld", String::from_utf8_lossy(downloaded.as_slice()));
2146
2147 let downloaded = download(&uploaded_empty.name, Range::default()).await;
2148 assert!(downloaded.is_empty());
2149 }
2150
2151 #[tokio::test]
2152 #[serial]
2153 pub async fn resumable_simple_upload() {
2154 let (client, project, _) = client().await;
2155 let bucket_name = bucket_name(&project, "object");
2156 let file_name = format!("resumable_{}", time::OffsetDateTime::now_utc().unix_timestamp());
2157
2158 let mut media = Media::new(file_name.clone());
2159 media.content_type = "text/plain".into();
2160 let upload_type = UploadType::Simple(media);
2161 let uploader = client
2162 .prepare_resumable_upload(
2163 &UploadObjectRequest {
2164 bucket: bucket_name.to_string(),
2165 ..Default::default()
2166 },
2167 &upload_type,
2168 )
2169 .await
2170 .unwrap();
2171 let data = vec![1, 2, 3, 4, 5];
2172 uploader.upload_single_chunk(data.clone(), 5).await.unwrap();
2173
2174 let get_request = &GetObjectRequest {
2175 bucket: bucket_name.to_string(),
2176 object: file_name.to_string(),
2177 ..Default::default()
2178 };
2179 let download = client.download_object(get_request, &Range::default()).await.unwrap();
2180 assert_eq!(data, download);
2181
2182 let object = client.get_object(get_request).await.unwrap();
2183 assert_eq!(object.content_type.unwrap(), "text/plain");
2184 }
2185
2186 #[tokio::test]
2187 #[serial]
2188 pub async fn resumable_multiple_chunk_upload() {
2189 let (client, project, _) = client().await;
2190 let bucket_name = bucket_name(&project, "object");
2191 let file_name = format!("resumable_multiple_chunk{}", time::OffsetDateTime::now_utc().unix_timestamp());
2192
2193 let metadata = Object {
2194 name: file_name.to_string(),
2195 content_type: Some("video/mp4".to_string()),
2196 ..Default::default()
2197 };
2198 let upload_type = UploadType::Multipart(Box::new(metadata));
2199 let uploader = client
2200 .prepare_resumable_upload(
2201 &UploadObjectRequest {
2202 bucket: bucket_name.to_string(),
2203 ..Default::default()
2204 },
2205 &upload_type,
2206 )
2207 .await
2208 .unwrap();
2209 let mut chunk1_data: Vec<u8> = (0..256 * 1024).map(|i| (i % 256) as u8).collect();
2210 let chunk2_data: Vec<u8> = (1..256 * 1024 + 50).map(|i| (i % 256) as u8).collect();
2211 let total_size = Some(chunk1_data.len() as u64 + chunk2_data.len() as u64);
2212
2213 tracing::info!("start upload chunk {}", uploader.url());
2214 let chunk1 = ChunkSize::new(0, chunk1_data.len() as u64 - 1, total_size);
2215 tracing::info!("upload chunk1 {:?}", chunk1);
2216 let status1 = uploader
2217 .upload_multiple_chunk(chunk1_data.clone(), &chunk1)
2218 .await
2219 .unwrap();
2220
2221 assert_eq!(
2222 status1,
2223 UploadStatus::ResumeIncomplete(UploadedRange {
2224 first_byte: 0,
2225 last_byte: chunk1_data.len() as u64 - 1,
2226 })
2227 );
2228
2229 tracing::info!("check status chunk1");
2230 let status_check = uploader.status(total_size).await.unwrap();
2231 assert_eq!(
2232 status_check,
2233 UploadStatus::ResumeIncomplete(UploadedRange {
2234 first_byte: 0,
2235 last_byte: chunk1_data.len() as u64 - 1,
2236 })
2237 );
2238
2239 let chunk2 = ChunkSize::new(
2240 chunk1_data.len() as u64,
2241 chunk1_data.len() as u64 + chunk2_data.len() as u64 - 1,
2242 total_size,
2243 );
2244 tracing::info!("upload chunk2 {:?}", chunk2);
2245 let status2 = uploader
2246 .upload_multiple_chunk(chunk2_data.clone(), &chunk2)
2247 .await
2248 .unwrap();
2249 assert!(matches!(status2, UploadStatus::Ok(_)));
2250
2251 tracing::info!("check status chunk2");
2252 let status_check2 = uploader.status(total_size).await.unwrap();
2253 assert!(matches!(status_check2, UploadStatus::Ok(_)));
2254
2255 let get_request = &GetObjectRequest {
2256 bucket: bucket_name.to_string(),
2257 object: file_name.to_string(),
2258 ..Default::default()
2259 };
2260
2261 let object = client.get_object(get_request).await.unwrap();
2262 assert_eq!(object.content_type.unwrap(), "video/mp4");
2263
2264 let download = client.download_object(get_request, &Range::default()).await.unwrap();
2265 chunk1_data.extend(chunk2_data);
2266 assert_eq!(chunk1_data, download);
2267 }
2268
2269 #[tokio::test]
2270 #[serial]
2271 pub async fn resumable_upload_cancel() {
2272 let (client, project, _) = client().await;
2273 let bucket_name = bucket_name(&project, "object");
2274 let file_name = format!("resumable_cancel{}", time::OffsetDateTime::now_utc().unix_timestamp());
2275
2276 let metadata = Object {
2277 name: file_name.to_string(),
2278 content_type: Some("video/mp4".to_string()),
2279 ..Default::default()
2280 };
2281 let upload_type = UploadType::Multipart(Box::new(metadata));
2282 let uploader = client
2283 .prepare_resumable_upload(
2284 &UploadObjectRequest {
2285 bucket: bucket_name.to_string(),
2286 ..Default::default()
2287 },
2288 &upload_type,
2289 )
2290 .await
2291 .unwrap();
2292 let cloned = uploader.clone();
2293 uploader.cancel().await.unwrap();
2294
2295 let result = cloned.upload_single_chunk(vec![1], 1).await;
2296 assert!(result.is_err());
2297 }
2298
2299 #[tokio::test]
2300 #[serial]
2301 pub async fn resumable_multiple_chunk_upload_unknown() {
2302 let (client, project, _) = client().await;
2303 let bucket_name = bucket_name(&project, "object");
2304 let file_name = format!(
2305 "resumable_multiple_chunk_unknown{}",
2306 time::OffsetDateTime::now_utc().unix_timestamp()
2307 );
2308
2309 let metadata = Object {
2310 name: file_name.to_string(),
2311 content_type: Some("video/mp4".to_string()),
2312 ..Default::default()
2313 };
2314 let upload_type = UploadType::Multipart(Box::new(metadata));
2315 let uploader = client
2316 .prepare_resumable_upload(
2317 &UploadObjectRequest {
2318 bucket: bucket_name.to_string(),
2319 ..Default::default()
2320 },
2321 &upload_type,
2322 )
2323 .await
2324 .unwrap();
2325 let mut chunk1_data: Vec<u8> = (0..256 * 1024).map(|i| (i % 256) as u8).collect();
2326 let chunk2_data: Vec<u8> = vec![10, 20, 30];
2327 let total_size = None;
2328
2329 tracing::info!("start upload chunk {}", uploader.url());
2330 let chunk1 = ChunkSize::new(0, chunk1_data.len() as u64 - 1, total_size);
2331 tracing::info!("upload chunk1 {:?}", chunk1);
2332 let status1 = uploader
2333 .upload_multiple_chunk(chunk1_data.clone(), &chunk1)
2334 .await
2335 .unwrap();
2336
2337 assert_eq!(
2338 status1,
2339 UploadStatus::ResumeIncomplete(UploadedRange {
2340 first_byte: 0,
2341 last_byte: chunk1_data.len() as u64 - 1,
2342 })
2343 );
2344
2345 tracing::info!("upload chunk1 resume {:?}", chunk1);
2346 let status1 = uploader
2347 .upload_multiple_chunk(chunk1_data.clone(), &chunk1)
2348 .await
2349 .unwrap();
2350
2351 assert_eq!(
2352 status1,
2353 UploadStatus::ResumeIncomplete(UploadedRange {
2354 first_byte: 0,
2355 last_byte: chunk1_data.len() as u64 - 1,
2356 })
2357 );
2358
2359 let remaining = chunk1_data.len() as u64 + chunk2_data.len() as u64;
2361 let chunk2 = ChunkSize::new(chunk1_data.len() as u64, remaining - 1, Some(remaining));
2362 tracing::info!("upload chunk2 {:?}", chunk2);
2363 let status2 = uploader
2364 .upload_multiple_chunk(chunk2_data.clone(), &chunk2)
2365 .await
2366 .unwrap();
2367 assert!(matches!(status2, UploadStatus::Ok(_)));
2368
2369 let get_request = &GetObjectRequest {
2370 bucket: bucket_name.to_string(),
2371 object: file_name.to_string(),
2372 ..Default::default()
2373 };
2374
2375 let object = client.get_object(get_request).await.unwrap();
2376 assert_eq!(object.content_type.unwrap(), "video/mp4");
2377
2378 let download = client.download_object(get_request, &Range::default()).await.unwrap();
2379 chunk1_data.extend(chunk2_data);
2380 assert_eq!(chunk1_data, download);
2381 }
2382}