// object_store/gcp/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! An object store implementation for Google Cloud Storage
19//!
20//! ## Multipart uploads
21//!
//! [Multipart uploads](https://cloud.google.com/storage/docs/multipart-uploads)
//! can be initiated with the [ObjectStore::put_multipart] method. If neither
//! [`MultipartUpload::complete`] nor [`MultipartUpload::abort`] is invoked, parts
//! uploaded to GCS may be left unused, and you will still be charged for storing them.
//! It is recommended you configure a [lifecycle rule] to abort incomplete multipart
//! uploads after a certain period of time to avoid being charged for storing partial uploads.
28//!
29//! ## Using HTTP/2
30//!
31//! Google Cloud Storage supports both HTTP/2 and HTTP/1. HTTP/1 is used by default
32//! because it allows much higher throughput in our benchmarks (see
33//! [#5194](https://github.com/apache/arrow-rs/issues/5194)). HTTP/2 can be
34//! enabled by setting [crate::ClientConfigKey::Http1Only] to false.
35//!
36//! [lifecycle rule]: https://cloud.google.com/storage/docs/lifecycle#abort-mpu
37use std::sync::Arc;
38use std::time::Duration;
39
40use crate::client::CredentialProvider;
41use crate::gcp::credential::GCSAuthorizer;
42use crate::signer::Signer;
43use crate::{
44    multipart::PartId, path::Path, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload,
45    ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
46    UploadPart,
47};
48use async_trait::async_trait;
49use client::GoogleCloudStorageClient;
50use futures::stream::BoxStream;
51use http::Method;
52use url::Url;
53
54use crate::client::get::GetClientExt;
55use crate::client::list::{ListClient, ListClientExt};
56use crate::client::parts::Parts;
57use crate::list::{PaginatedListOptions, PaginatedListResult, PaginatedListStore};
58use crate::multipart::MultipartStore;
59pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey};
60pub use credential::{GcpCredential, GcpSigningCredential, ServiceAccountKey};
61
62mod builder;
63mod client;
64mod credential;
65
66const STORE: &str = "GCS";
67
68/// [`CredentialProvider`] for [`GoogleCloudStorage`]
69pub type GcpCredentialProvider = Arc<dyn CredentialProvider<Credential = GcpCredential>>;
70
71/// [`GcpSigningCredential`] for [`GoogleCloudStorage`]
72pub type GcpSigningCredentialProvider =
73    Arc<dyn CredentialProvider<Credential = GcpSigningCredential>>;
74
75/// Interface for [Google Cloud Storage](https://cloud.google.com/storage/).
76#[derive(Debug, Clone)]
77pub struct GoogleCloudStorage {
78    client: Arc<GoogleCloudStorageClient>,
79}
80
81impl std::fmt::Display for GoogleCloudStorage {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        write!(
84            f,
85            "GoogleCloudStorage({})",
86            self.client.config().bucket_name
87        )
88    }
89}
90
91impl GoogleCloudStorage {
92    /// Returns the [`GcpCredentialProvider`] used by [`GoogleCloudStorage`]
93    pub fn credentials(&self) -> &GcpCredentialProvider {
94        &self.client.config().credentials
95    }
96
97    /// Returns the [`GcpSigningCredentialProvider`] used by [`GoogleCloudStorage`]
98    pub fn signing_credentials(&self) -> &GcpSigningCredentialProvider {
99        &self.client.config().signing_credentials
100    }
101}
102
103#[derive(Debug)]
104struct GCSMultipartUpload {
105    state: Arc<UploadState>,
106    part_idx: usize,
107}
108
109#[derive(Debug)]
110struct UploadState {
111    client: Arc<GoogleCloudStorageClient>,
112    path: Path,
113    multipart_id: MultipartId,
114    parts: Parts,
115}
116
117#[async_trait]
118impl MultipartUpload for GCSMultipartUpload {
119    fn put_part(&mut self, payload: PutPayload) -> UploadPart {
120        let idx = self.part_idx;
121        self.part_idx += 1;
122        let state = Arc::clone(&self.state);
123        Box::pin(async move {
124            let part = state
125                .client
126                .put_part(&state.path, &state.multipart_id, idx, payload)
127                .await?;
128            state.parts.put(idx, part);
129            Ok(())
130        })
131    }
132
133    async fn complete(&mut self) -> Result<PutResult> {
134        let parts = self.state.parts.finish(self.part_idx)?;
135
136        self.state
137            .client
138            .multipart_complete(&self.state.path, &self.state.multipart_id, parts)
139            .await
140    }
141
142    async fn abort(&mut self) -> Result<()> {
143        self.state
144            .client
145            .multipart_cleanup(&self.state.path, &self.state.multipart_id)
146            .await
147    }
148}
149
150#[async_trait]
151impl ObjectStore for GoogleCloudStorage {
152    async fn put_opts(
153        &self,
154        location: &Path,
155        payload: PutPayload,
156        opts: PutOptions,
157    ) -> Result<PutResult> {
158        self.client.put(location, payload, opts).await
159    }
160
161    async fn put_multipart_opts(
162        &self,
163        location: &Path,
164        opts: PutMultipartOptions,
165    ) -> Result<Box<dyn MultipartUpload>> {
166        let upload_id = self.client.multipart_initiate(location, opts).await?;
167
168        Ok(Box::new(GCSMultipartUpload {
169            part_idx: 0,
170            state: Arc::new(UploadState {
171                client: Arc::clone(&self.client),
172                path: location.clone(),
173                multipart_id: upload_id.clone(),
174                parts: Default::default(),
175            }),
176        }))
177    }
178
179    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
180        self.client.get_opts(location, options).await
181    }
182
183    async fn delete(&self, location: &Path) -> Result<()> {
184        self.client.delete_request(location).await
185    }
186
187    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
188        self.client.list(prefix)
189    }
190
191    fn list_with_offset(
192        &self,
193        prefix: Option<&Path>,
194        offset: &Path,
195    ) -> BoxStream<'static, Result<ObjectMeta>> {
196        self.client.list_with_offset(prefix, offset)
197    }
198
199    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
200        self.client.list_with_delimiter(prefix).await
201    }
202
203    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
204        self.client.copy_request(from, to, false).await
205    }
206
207    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
208        self.client.copy_request(from, to, true).await
209    }
210}
211
212#[async_trait]
213impl MultipartStore for GoogleCloudStorage {
214    async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
215        self.client
216            .multipart_initiate(path, PutMultipartOptions::default())
217            .await
218    }
219
220    async fn put_part(
221        &self,
222        path: &Path,
223        id: &MultipartId,
224        part_idx: usize,
225        payload: PutPayload,
226    ) -> Result<PartId> {
227        self.client.put_part(path, id, part_idx, payload).await
228    }
229
230    async fn complete_multipart(
231        &self,
232        path: &Path,
233        id: &MultipartId,
234        parts: Vec<PartId>,
235    ) -> Result<PutResult> {
236        self.client.multipart_complete(path, id, parts).await
237    }
238
239    async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
240        self.client.multipart_cleanup(path, id).await
241    }
242}
243
244#[async_trait]
245impl Signer for GoogleCloudStorage {
246    async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> {
247        if expires_in.as_secs() > 604800 {
248            return Err(crate::Error::Generic {
249                store: STORE,
250                source: "Expiration Time can't be longer than 604800 seconds (7 days).".into(),
251            });
252        }
253
254        let config = self.client.config();
255        let path_url = config.path_url(path);
256        let mut url = Url::parse(&path_url).map_err(|e| crate::Error::Generic {
257            store: STORE,
258            source: format!("Unable to parse url {path_url}: {e}").into(),
259        })?;
260
261        let signing_credentials = self.signing_credentials().get_credential().await?;
262        let authorizer = GCSAuthorizer::new(signing_credentials);
263
264        authorizer
265            .sign(method, &mut url, expires_in, &self.client)
266            .await?;
267
268        Ok(url)
269    }
270}
271
272#[async_trait]
273impl PaginatedListStore for GoogleCloudStorage {
274    async fn list_paginated(
275        &self,
276        prefix: Option<&str>,
277        opts: PaginatedListOptions,
278    ) -> Result<PaginatedListResult> {
279        self.client.list_request(prefix, opts).await
280    }
281}
282
#[cfg(test)]
mod test {

    use credential::DEFAULT_GCS_BASE_URL;

    use crate::integration::*;
    use crate::tests::*;

    use super::*;

    const NON_EXISTENT_NAME: &str = "nonexistentname";

    /// Path of an object that these tests never create.
    fn nonexistent_location() -> Path {
        Path::from_iter([NON_EXISTENT_NAME])
    }

    /// Store configured from the environment, but pointed at a bucket that does not exist.
    fn nonexistent_bucket_store() -> GoogleCloudStorage {
        GoogleCloudStorageBuilder::from_env()
            .with_bucket_name(NON_EXISTENT_NAME)
            .build()
            .unwrap()
    }

    /// Runs the shared integration suite. Checks the fake GCS server cannot pass
    /// run only when the store targets [`DEFAULT_GCS_BASE_URL`].
    #[tokio::test]
    async fn gcs_test() {
        maybe_skip_integration!();
        let store = GoogleCloudStorageBuilder::from_env().build().unwrap();

        put_get_delete_list(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;

        if store.client.config().base_url != DEFAULT_GCS_BASE_URL {
            return;
        }

        // Fake GCS server doesn't currently honor ifGenerationMatch
        // https://github.com/fsouza/fake-gcs-server/issues/994
        copy_if_not_exists(&store).await;
        // Fake GCS server does not yet implement XML Multipart uploads
        // https://github.com/fsouza/fake-gcs-server/issues/852
        stream_get(&store).await;
        multipart(&store, &store).await;
        multipart_race_condition(&store, true).await;
        multipart_out_of_order(&store).await;
        list_paginated(&store, &store).await;
        // Fake GCS server doesn't currently honor preconditions
        get_opts(&store).await;
        put_opts(&store, true).await;
        // Fake GCS server doesn't currently support attributes
        put_get_attributes(&store).await;
    }

    /// Round-trips an object through signed PUT and GET URLs.
    #[tokio::test]
    #[ignore]
    async fn gcs_test_sign() {
        maybe_skip_integration!();
        let store = GoogleCloudStorageBuilder::from_env().build().unwrap();
        let http = reqwest::Client::new();
        let path = Path::from("test_sign");
        let ttl = Duration::from_secs(3600);

        let put_url = store.signed_url(Method::PUT, &path, ttl).await.unwrap();
        println!("PUT {put_url}");
        http.put(put_url)
            .body("data")
            .send()
            .await
            .unwrap()
            .error_for_status()
            .unwrap();

        let get_url = store.signed_url(Method::GET, &path, ttl).await.unwrap();
        println!("GET {get_url}");
        let body = http
            .get(get_url)
            .send()
            .await
            .unwrap()
            .error_for_status()
            .unwrap()
            .bytes()
            .await
            .unwrap();
        assert_eq!(body.as_ref(), b"data");
    }

    #[tokio::test]
    async fn gcs_test_get_nonexistent_location() {
        maybe_skip_integration!();
        let store = GoogleCloudStorageBuilder::from_env().build().unwrap();

        let err = store.get(&nonexistent_location()).await.unwrap_err();

        assert!(
            matches!(err, crate::Error::NotFound { .. }),
            "unexpected error type: {err}"
        );
    }

    #[tokio::test]
    async fn gcs_test_get_nonexistent_bucket() {
        maybe_skip_integration!();
        let store = nonexistent_bucket_store();

        let err = get_nonexistent_object(&store, Some(nonexistent_location()))
            .await
            .unwrap_err();

        assert!(
            matches!(err, crate::Error::NotFound { .. }),
            "unexpected error type: {err}"
        );
    }

    #[tokio::test]
    async fn gcs_test_delete_nonexistent_location() {
        maybe_skip_integration!();
        let store = GoogleCloudStorageBuilder::from_env().build().unwrap();

        let err = store.delete(&nonexistent_location()).await.unwrap_err();
        assert!(
            matches!(err, crate::Error::NotFound { .. }),
            "unexpected error type: {err}"
        );
    }

    #[tokio::test]
    async fn gcs_test_delete_nonexistent_bucket() {
        maybe_skip_integration!();
        let store = nonexistent_bucket_store();

        let err = store.delete(&nonexistent_location()).await.unwrap_err();
        assert!(
            matches!(err, crate::Error::NotFound { .. }),
            "unexpected error type: {err}"
        );
    }

    #[tokio::test]
    async fn gcs_test_put_nonexistent_bucket() {
        maybe_skip_integration!();
        let store = nonexistent_bucket_store();
        let payload = PutPayload::from("arbitrary data");

        let err = store
            .put(&nonexistent_location(), payload)
            .await
            .unwrap_err()
            .to_string();
        assert!(
            err.contains("Server returned non-2xx status code: 404 Not Found"),
            "{}",
            err
        )
    }
}