1use std::sync::Arc;
38use std::time::Duration;
39
40use crate::client::CredentialProvider;
41use crate::gcp::credential::GCSAuthorizer;
42use crate::signer::Signer;
43use crate::{
44 multipart::PartId, path::Path, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload,
45 ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
46 UploadPart,
47};
48use async_trait::async_trait;
49use client::GoogleCloudStorageClient;
50use futures::stream::BoxStream;
51use http::Method;
52use url::Url;
53
54use crate::client::get::GetClientExt;
55use crate::client::list::{ListClient, ListClientExt};
56use crate::client::parts::Parts;
57use crate::list::{PaginatedListOptions, PaginatedListResult, PaginatedListStore};
58use crate::multipart::MultipartStore;
59pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey};
60pub use credential::{GcpCredential, GcpSigningCredential, ServiceAccountKey};
61
62mod builder;
63mod client;
64mod credential;
65
66const STORE: &str = "GCS";
67
68pub type GcpCredentialProvider = Arc<dyn CredentialProvider<Credential = GcpCredential>>;
70
71pub type GcpSigningCredentialProvider =
73 Arc<dyn CredentialProvider<Credential = GcpSigningCredential>>;
74
75#[derive(Debug, Clone)]
77pub struct GoogleCloudStorage {
78 client: Arc<GoogleCloudStorageClient>,
79}
80
81impl std::fmt::Display for GoogleCloudStorage {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 write!(
84 f,
85 "GoogleCloudStorage({})",
86 self.client.config().bucket_name
87 )
88 }
89}
90
91impl GoogleCloudStorage {
92 pub fn credentials(&self) -> &GcpCredentialProvider {
94 &self.client.config().credentials
95 }
96
97 pub fn signing_credentials(&self) -> &GcpSigningCredentialProvider {
99 &self.client.config().signing_credentials
100 }
101}
102
103#[derive(Debug)]
104struct GCSMultipartUpload {
105 state: Arc<UploadState>,
106 part_idx: usize,
107}
108
109#[derive(Debug)]
110struct UploadState {
111 client: Arc<GoogleCloudStorageClient>,
112 path: Path,
113 multipart_id: MultipartId,
114 parts: Parts,
115}
116
117#[async_trait]
118impl MultipartUpload for GCSMultipartUpload {
119 fn put_part(&mut self, payload: PutPayload) -> UploadPart {
120 let idx = self.part_idx;
121 self.part_idx += 1;
122 let state = Arc::clone(&self.state);
123 Box::pin(async move {
124 let part = state
125 .client
126 .put_part(&state.path, &state.multipart_id, idx, payload)
127 .await?;
128 state.parts.put(idx, part);
129 Ok(())
130 })
131 }
132
133 async fn complete(&mut self) -> Result<PutResult> {
134 let parts = self.state.parts.finish(self.part_idx)?;
135
136 self.state
137 .client
138 .multipart_complete(&self.state.path, &self.state.multipart_id, parts)
139 .await
140 }
141
142 async fn abort(&mut self) -> Result<()> {
143 self.state
144 .client
145 .multipart_cleanup(&self.state.path, &self.state.multipart_id)
146 .await
147 }
148}
149
150#[async_trait]
151impl ObjectStore for GoogleCloudStorage {
152 async fn put_opts(
153 &self,
154 location: &Path,
155 payload: PutPayload,
156 opts: PutOptions,
157 ) -> Result<PutResult> {
158 self.client.put(location, payload, opts).await
159 }
160
161 async fn put_multipart_opts(
162 &self,
163 location: &Path,
164 opts: PutMultipartOptions,
165 ) -> Result<Box<dyn MultipartUpload>> {
166 let upload_id = self.client.multipart_initiate(location, opts).await?;
167
168 Ok(Box::new(GCSMultipartUpload {
169 part_idx: 0,
170 state: Arc::new(UploadState {
171 client: Arc::clone(&self.client),
172 path: location.clone(),
173 multipart_id: upload_id.clone(),
174 parts: Default::default(),
175 }),
176 }))
177 }
178
179 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
180 self.client.get_opts(location, options).await
181 }
182
183 async fn delete(&self, location: &Path) -> Result<()> {
184 self.client.delete_request(location).await
185 }
186
187 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
188 self.client.list(prefix)
189 }
190
191 fn list_with_offset(
192 &self,
193 prefix: Option<&Path>,
194 offset: &Path,
195 ) -> BoxStream<'static, Result<ObjectMeta>> {
196 self.client.list_with_offset(prefix, offset)
197 }
198
199 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
200 self.client.list_with_delimiter(prefix).await
201 }
202
203 async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
204 self.client.copy_request(from, to, false).await
205 }
206
207 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
208 self.client.copy_request(from, to, true).await
209 }
210}
211
212#[async_trait]
213impl MultipartStore for GoogleCloudStorage {
214 async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
215 self.client
216 .multipart_initiate(path, PutMultipartOptions::default())
217 .await
218 }
219
220 async fn put_part(
221 &self,
222 path: &Path,
223 id: &MultipartId,
224 part_idx: usize,
225 payload: PutPayload,
226 ) -> Result<PartId> {
227 self.client.put_part(path, id, part_idx, payload).await
228 }
229
230 async fn complete_multipart(
231 &self,
232 path: &Path,
233 id: &MultipartId,
234 parts: Vec<PartId>,
235 ) -> Result<PutResult> {
236 self.client.multipart_complete(path, id, parts).await
237 }
238
239 async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
240 self.client.multipart_cleanup(path, id).await
241 }
242}
243
244#[async_trait]
245impl Signer for GoogleCloudStorage {
246 async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> {
247 if expires_in.as_secs() > 604800 {
248 return Err(crate::Error::Generic {
249 store: STORE,
250 source: "Expiration Time can't be longer than 604800 seconds (7 days).".into(),
251 });
252 }
253
254 let config = self.client.config();
255 let path_url = config.path_url(path);
256 let mut url = Url::parse(&path_url).map_err(|e| crate::Error::Generic {
257 store: STORE,
258 source: format!("Unable to parse url {path_url}: {e}").into(),
259 })?;
260
261 let signing_credentials = self.signing_credentials().get_credential().await?;
262 let authorizer = GCSAuthorizer::new(signing_credentials);
263
264 authorizer
265 .sign(method, &mut url, expires_in, &self.client)
266 .await?;
267
268 Ok(url)
269 }
270}
271
272#[async_trait]
273impl PaginatedListStore for GoogleCloudStorage {
274 async fn list_paginated(
275 &self,
276 prefix: Option<&str>,
277 opts: PaginatedListOptions,
278 ) -> Result<PaginatedListResult> {
279 self.client.list_request(prefix, opts).await
280 }
281}
282
283#[cfg(test)]
284mod test {
285
286 use credential::DEFAULT_GCS_BASE_URL;
287
288 use crate::integration::*;
289 use crate::tests::*;
290
291 use super::*;
292
293 const NON_EXISTENT_NAME: &str = "nonexistentname";
294
295 #[tokio::test]
296 async fn gcs_test() {
297 maybe_skip_integration!();
298 let integration = GoogleCloudStorageBuilder::from_env().build().unwrap();
299
300 put_get_delete_list(&integration).await;
301 list_uses_directories_correctly(&integration).await;
302 list_with_delimiter(&integration).await;
303 rename_and_copy(&integration).await;
304 if integration.client.config().base_url == DEFAULT_GCS_BASE_URL {
305 copy_if_not_exists(&integration).await;
308 stream_get(&integration).await;
311 multipart(&integration, &integration).await;
312 multipart_race_condition(&integration, true).await;
313 multipart_out_of_order(&integration).await;
314 list_paginated(&integration, &integration).await;
315 get_opts(&integration).await;
317 put_opts(&integration, true).await;
318 put_get_attributes(&integration).await;
320 }
321 }
322
323 #[tokio::test]
324 #[ignore]
325 async fn gcs_test_sign() {
326 maybe_skip_integration!();
327 let integration = GoogleCloudStorageBuilder::from_env().build().unwrap();
328
329 let client = reqwest::Client::new();
330
331 let path = Path::from("test_sign");
332 let url = integration
333 .signed_url(Method::PUT, &path, Duration::from_secs(3600))
334 .await
335 .unwrap();
336 println!("PUT {url}");
337
338 let resp = client.put(url).body("data").send().await.unwrap();
339 resp.error_for_status().unwrap();
340
341 let url = integration
342 .signed_url(Method::GET, &path, Duration::from_secs(3600))
343 .await
344 .unwrap();
345 println!("GET {url}");
346
347 let resp = client.get(url).send().await.unwrap();
348 let resp = resp.error_for_status().unwrap();
349 let data = resp.bytes().await.unwrap();
350 assert_eq!(data.as_ref(), b"data");
351 }
352
353 #[tokio::test]
354 async fn gcs_test_get_nonexistent_location() {
355 maybe_skip_integration!();
356 let integration = GoogleCloudStorageBuilder::from_env().build().unwrap();
357
358 let location = Path::from_iter([NON_EXISTENT_NAME]);
359
360 let err = integration.get(&location).await.unwrap_err();
361
362 assert!(
363 matches!(err, crate::Error::NotFound { .. }),
364 "unexpected error type: {err}"
365 );
366 }
367
368 #[tokio::test]
369 async fn gcs_test_get_nonexistent_bucket() {
370 maybe_skip_integration!();
371 let config = GoogleCloudStorageBuilder::from_env();
372 let integration = config.with_bucket_name(NON_EXISTENT_NAME).build().unwrap();
373
374 let location = Path::from_iter([NON_EXISTENT_NAME]);
375
376 let err = get_nonexistent_object(&integration, Some(location))
377 .await
378 .unwrap_err();
379
380 assert!(
381 matches!(err, crate::Error::NotFound { .. }),
382 "unexpected error type: {err}"
383 );
384 }
385
386 #[tokio::test]
387 async fn gcs_test_delete_nonexistent_location() {
388 maybe_skip_integration!();
389 let integration = GoogleCloudStorageBuilder::from_env().build().unwrap();
390
391 let location = Path::from_iter([NON_EXISTENT_NAME]);
392
393 let err = integration.delete(&location).await.unwrap_err();
394 assert!(
395 matches!(err, crate::Error::NotFound { .. }),
396 "unexpected error type: {err}"
397 );
398 }
399
400 #[tokio::test]
401 async fn gcs_test_delete_nonexistent_bucket() {
402 maybe_skip_integration!();
403 let config = GoogleCloudStorageBuilder::from_env();
404 let integration = config.with_bucket_name(NON_EXISTENT_NAME).build().unwrap();
405
406 let location = Path::from_iter([NON_EXISTENT_NAME]);
407
408 let err = integration.delete(&location).await.unwrap_err();
409 assert!(
410 matches!(err, crate::Error::NotFound { .. }),
411 "unexpected error type: {err}"
412 );
413 }
414
415 #[tokio::test]
416 async fn gcs_test_put_nonexistent_bucket() {
417 maybe_skip_integration!();
418 let config = GoogleCloudStorageBuilder::from_env();
419 let integration = config.with_bucket_name(NON_EXISTENT_NAME).build().unwrap();
420
421 let location = Path::from_iter([NON_EXISTENT_NAME]);
422 let data = PutPayload::from("arbitrary data");
423
424 let err = integration
425 .put(&location, data)
426 .await
427 .unwrap_err()
428 .to_string();
429 assert!(
430 err.contains("Server returned non-2xx status code: 404 Not Found"),
431 "{}",
432 err
433 )
434 }
435}