lance_io/object_store/providers/
gcp.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
5
6use object_store::{
7    gcp::{GcpCredential, GoogleCloudStorageBuilder, GoogleConfigKey},
8    RetryConfig, StaticCredentialProvider,
9};
10use url::Url;
11
12use crate::object_store::{
13    ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, DEFAULT_CLOUD_BLOCK_SIZE,
14    DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE,
15};
16use lance_core::error::Result;
17
/// Object store provider for Google Cloud Storage (`gs` scheme) locations.
///
/// The type carries no state; every setting is taken from the
/// `ObjectStoreParams` passed to `new_store` and from the process environment.
#[derive(Debug, Default)]
pub struct GcsStoreProvider;
20
21#[async_trait::async_trait]
22impl ObjectStoreProvider for GcsStoreProvider {
23    async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
24        let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE);
25        let mut storage_options =
26            StorageOptions(params.storage_options.clone().unwrap_or_default());
27        let download_retry_count = storage_options.download_retry_count();
28
29        let max_retries = storage_options.client_max_retries();
30        let retry_timeout = storage_options.client_retry_timeout();
31        let retry_config = RetryConfig {
32            backoff: Default::default(),
33            max_retries,
34            retry_timeout: Duration::from_secs(retry_timeout),
35        };
36
37        storage_options.with_env_gcs();
38        let mut builder = GoogleCloudStorageBuilder::new()
39            .with_url(base_path.as_ref())
40            .with_retry(retry_config);
41        for (key, value) in storage_options.as_gcs_options() {
42            builder = builder.with_config(key, value);
43        }
44        let token_key = "google_storage_token";
45        if let Some(storage_token) = storage_options.get(token_key) {
46            let credential = GcpCredential {
47                bearer: storage_token.to_string(),
48            };
49            let credential_provider = Arc::new(StaticCredentialProvider::new(credential)) as _;
50            builder = builder.with_credentials(credential_provider);
51        }
52        let inner = Arc::new(builder.build()?);
53
54        Ok(ObjectStore {
55            inner,
56            scheme: String::from("gs"),
57            block_size,
58            max_iop_size: *DEFAULT_MAX_IOP_SIZE,
59            use_constant_size_upload_parts: false,
60            list_is_lexically_ordered: true,
61            io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
62            download_retry_count,
63        })
64    }
65}
66
67impl StorageOptions {
68    /// Add values from the environment to storage options
69    pub fn with_env_gcs(&mut self) {
70        for (os_key, os_value) in std::env::vars_os() {
71            if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
72                let lowercase_key = key.to_ascii_lowercase();
73                let token_key = "google_storage_token";
74
75                if let Ok(config_key) = GoogleConfigKey::from_str(&lowercase_key) {
76                    if !self.0.contains_key(config_key.as_ref()) {
77                        self.0
78                            .insert(config_key.as_ref().to_string(), value.to_string());
79                    }
80                }
81                // Check for GOOGLE_STORAGE_TOKEN until GoogleConfigKey supports storage token
82                else if lowercase_key == token_key && !self.0.contains_key(token_key) {
83                    self.0.insert(token_key.to_string(), value.to_string());
84                }
85            }
86        }
87    }
88
89    /// Subset of options relevant for gcs storage
90    pub fn as_gcs_options(&self) -> HashMap<GoogleConfigKey, String> {
91        self.0
92            .iter()
93            .filter_map(|(key, value)| {
94                let gcs_key = GoogleConfigKey::from_str(&key.to_ascii_lowercase()).ok()?;
95                Some((gcs_key, value.clone()))
96            })
97            .collect()
98    }
99}
100
#[cfg(test)]
mod tests {
    use super::*;

    /// A `gs://bucket/...` URL must yield the object path without the bucket.
    #[test]
    fn test_gcs_store_path() {
        let location = Url::parse("gs://bucket/path/to/file").unwrap();

        let actual = GcsStoreProvider.extract_path(&location);

        assert_eq!(actual, object_store::path::Path::from("path/to/file"));
    }
}