Skip to main content

lance_io/object_store/providers/
gcp.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
5
6use object_store::ObjectStore as OSObjectStore;
7use object_store_opendal::OpendalStore;
8use opendal::{Operator, services::Gcs};
9
10use object_store::{
11    RetryConfig, StaticCredentialProvider,
12    gcp::{GcpCredential, GoogleCloudStorageBuilder, GoogleConfigKey},
13};
14use url::Url;
15
16use crate::object_store::{
17    DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, ObjectStore,
18    ObjectStoreParams, ObjectStoreProvider, StorageOptions, StorageOptionsAccessor,
19    dynamic_credentials::build_dynamic_credential_provider,
20    throttle::{AimdThrottleConfig, AimdThrottledStore},
21};
22use lance_core::error::{Error, Result};
23
/// Object store provider for Google Cloud Storage (`gs`) locations.
///
/// The provider is a stateless unit type: it carries no fields, and all
/// configuration is supplied through the parameters of its methods.
#[derive(Debug, Default)]
pub struct GcsStoreProvider;
26
27impl GcsStoreProvider {
28    async fn build_opendal_gcs_store(
29        &self,
30        base_path: &Url,
31        storage_options: &StorageOptions,
32    ) -> Result<Arc<dyn OSObjectStore>> {
33        let bucket = base_path
34            .host_str()
35            .ok_or_else(|| Error::invalid_input("GCS URL must contain bucket name"))?
36            .to_string();
37
38        let prefix = base_path.path().trim_start_matches('/').to_string();
39
40        // Start with all storage options as the config map
41        // OpenDAL will handle environment variables through its default credentials chain
42        let mut config_map: HashMap<String, String> = storage_options.0.clone();
43
44        // Set required OpenDAL configuration
45        config_map.insert("bucket".to_string(), bucket);
46
47        if !prefix.is_empty() {
48            config_map.insert("root".to_string(), format!("/{}", prefix));
49        }
50
51        let operator = Operator::from_iter::<Gcs>(config_map)
52            .map_err(|e| Error::invalid_input(format!("Failed to create GCS operator: {:?}", e)))?
53            .finish();
54
55        Ok(Arc::new(OpendalStore::new(operator)) as Arc<dyn OSObjectStore>)
56    }
57
58    async fn build_google_cloud_store(
59        &self,
60        base_path: &Url,
61        storage_options: &StorageOptions,
62        accessor: Option<Arc<StorageOptionsAccessor>>,
63    ) -> Result<Arc<dyn OSObjectStore>> {
64        // Use a low retry count since the AIMD throttle layer handles
65        // throttle recovery with its own retry loop.
66        let retry_config = RetryConfig {
67            backoff: Default::default(),
68            max_retries: storage_options.client_max_retries(),
69            retry_timeout: Duration::from_secs(storage_options.client_retry_timeout()),
70        };
71
72        let mut builder = GoogleCloudStorageBuilder::new()
73            .with_url(base_path.as_ref())
74            .with_retry(retry_config)
75            .with_client_options(storage_options.client_options()?);
76        for (key, value) in storage_options.as_gcs_options() {
77            builder = builder.with_config(key, value);
78        }
79
80        if let Some(credentials) =
81            build_dynamic_credential_provider::<GcpCredential>(accessor).await?
82        {
83            builder = builder.with_credentials(credentials);
84        } else if let Some(storage_token) = storage_options.get("google_storage_token") {
85            let credential = GcpCredential {
86                bearer: storage_token.clone(),
87            };
88            let credential_provider = Arc::new(StaticCredentialProvider::new(credential)) as _;
89            builder = builder.with_credentials(credential_provider);
90        }
91
92        Ok(Arc::new(builder.build()?) as Arc<dyn OSObjectStore>)
93    }
94}
95
96#[async_trait::async_trait]
97impl ObjectStoreProvider for GcsStoreProvider {
98    async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
99        let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE);
100        let mut storage_options =
101            StorageOptions::new(params.storage_options().cloned().unwrap_or_default());
102        storage_options.with_env_gcs();
103        let download_retry_count = storage_options.download_retry_count();
104
105        let use_opendal = storage_options
106            .0
107            .get("use_opendal")
108            .map(|v| v.as_str() == "true")
109            .unwrap_or(false);
110
111        let accessor = params.get_accessor();
112
113        let inner = if use_opendal {
114            // OpenDAL GCS intentionally uses static/environment-backed configuration only.
115            // Namespace-vended dynamic credentials are supported on the native object_store path.
116            self.build_opendal_gcs_store(&base_path, &storage_options)
117                .await?
118        } else {
119            self.build_google_cloud_store(&base_path, &storage_options, accessor)
120                .await?
121        };
122        let throttle_config = AimdThrottleConfig::from_storage_options(params.storage_options())?;
123        let inner = if throttle_config.is_disabled() {
124            inner
125        } else if storage_options.client_max_retries() == 0 {
126            log::warn!(
127                "AIMD throttle disabled: the current implementation relies on the object store \
128                 client surfacing retry errors, which requires client_max_retries > 0. \
129                 No throttle or retry layer will be applied."
130            );
131            inner
132        } else {
133            Arc::new(AimdThrottledStore::new(inner, throttle_config)?) as Arc<dyn OSObjectStore>
134        };
135
136        Ok(ObjectStore {
137            inner,
138            scheme: String::from("gs"),
139            block_size,
140            max_iop_size: *DEFAULT_MAX_IOP_SIZE,
141            use_constant_size_upload_parts: false,
142            list_is_lexically_ordered: true,
143            io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
144            download_retry_count,
145            io_tracker: Default::default(),
146            store_prefix: self
147                .calculate_object_store_prefix(&base_path, params.storage_options())?,
148        })
149    }
150}
151
152impl StorageOptions {
153    /// Add values from the environment to storage options
154    pub fn with_env_gcs(&mut self) {
155        for (os_key, os_value) in std::env::vars_os() {
156            if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
157                let lowercase_key = key.to_ascii_lowercase();
158                let token_key = "google_storage_token";
159
160                if let Ok(config_key) = GoogleConfigKey::from_str(&lowercase_key) {
161                    if !self.0.contains_key(config_key.as_ref()) {
162                        self.0
163                            .insert(config_key.as_ref().to_string(), value.to_string());
164                    }
165                }
166                // Check for GOOGLE_STORAGE_TOKEN until GoogleConfigKey supports storage token
167                else if lowercase_key == token_key && !self.0.contains_key(token_key) {
168                    self.0.insert(token_key.to_string(), value.to_string());
169                }
170            }
171        }
172    }
173
174    /// Subset of options relevant for gcs storage
175    pub fn as_gcs_options(&self) -> HashMap<GoogleConfigKey, String> {
176        self.0
177            .iter()
178            .filter_map(|(key, value)| {
179                let gcs_key = GoogleConfigKey::from_str(&key.to_ascii_lowercase()).ok()?;
180                Some((gcs_key, value.clone()))
181            })
182            .collect()
183    }
184}
185
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::sync::Arc;

    use crate::object_store::test_utils::StaticMockStorageOptionsProvider;
    use crate::object_store::{ObjectStoreParams, StorageOptionsAccessor};

    /// The extracted object path excludes the bucket name.
    #[test]
    fn test_gcs_store_path() {
        let url = Url::parse("gs://bucket/path/to/file").unwrap();

        let extracted = GcsStoreProvider.extract_path(&url).unwrap();

        assert_eq!(extracted, object_store::path::Path::from("path/to/file"));
    }

    /// A store can be created when `use_opendal` is set to `"true"`.
    #[tokio::test]
    async fn test_use_opendal_flag() {
        let options = HashMap::from([
            ("use_opendal".to_string(), "true".to_string()),
            (
                "service_account".to_string(),
                "test@example.iam.gserviceaccount.com".to_string(),
            ),
        ]);
        let params_with_flag = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                options,
            ))),
            ..Default::default()
        };
        let url = Url::parse("gs://test-bucket/path").unwrap();

        let store = GcsStoreProvider
            .new_store(url, &params_with_flag)
            .await
            .unwrap();

        assert_eq!(store.scheme, "gs");
    }

    /// A provider-backed accessor yields a credential carrying the provided token.
    #[tokio::test]
    async fn test_dynamic_gcp_credentials_provider() {
        let mock_provider = StaticMockStorageOptionsProvider {
            options: HashMap::from([(
                "google_storage_token".to_string(),
                "gcp-token".to_string(),
            )]),
        };
        let accessor = Arc::new(StorageOptionsAccessor::with_provider(Arc::new(
            mock_provider,
        )));

        let credential_provider =
            build_dynamic_credential_provider::<GcpCredential>(Some(accessor))
                .await
                .expect("dynamic gcp credentials should build")
                .expect("expected credential provider");
        let credential = credential_provider
            .get_credential()
            .await
            .expect("expected gcp credential");

        assert_eq!(credential.bearer, "gcp-token");
    }
}