lance_io/object_store/providers/
gcp.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
5
6use object_store::ObjectStore as OSObjectStore;
7use object_store_opendal::OpendalStore;
8use opendal::{services::Gcs, Operator};
9use snafu::location;
10
11use object_store::{
12    gcp::{GcpCredential, GoogleCloudStorageBuilder, GoogleConfigKey},
13    RetryConfig, StaticCredentialProvider,
14};
15use url::Url;
16
17use crate::object_store::{
18    ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, DEFAULT_CLOUD_BLOCK_SIZE,
19    DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE,
20};
21use lance_core::error::{Error, Result};
22
/// Object store provider for Google Cloud Storage.
///
/// The provider carries no state of its own; stores created through it are
/// tagged with the `gs` scheme.
#[derive(Debug, Default)]
pub struct GcsStoreProvider;
25
26impl GcsStoreProvider {
27    async fn build_opendal_gcs_store(
28        &self,
29        base_path: &Url,
30        storage_options: &StorageOptions,
31    ) -> Result<Arc<dyn OSObjectStore>> {
32        let bucket = base_path
33            .host_str()
34            .ok_or_else(|| Error::invalid_input("GCS URL must contain bucket name", location!()))?
35            .to_string();
36
37        let prefix = base_path.path().trim_start_matches('/').to_string();
38
39        // Start with all storage options as the config map
40        // OpenDAL will handle environment variables through its default credentials chain
41        let mut config_map: HashMap<String, String> = storage_options.0.clone();
42
43        // Set required OpenDAL configuration
44        config_map.insert("bucket".to_string(), bucket);
45
46        if !prefix.is_empty() {
47            config_map.insert("root".to_string(), format!("/{}", prefix));
48        }
49
50        let operator = Operator::from_iter::<Gcs>(config_map)
51            .map_err(|e| {
52                Error::invalid_input(
53                    format!("Failed to create GCS operator: {:?}", e),
54                    location!(),
55                )
56            })?
57            .finish();
58
59        Ok(Arc::new(OpendalStore::new(operator)) as Arc<dyn OSObjectStore>)
60    }
61
62    async fn build_google_cloud_store(
63        &self,
64        base_path: &Url,
65        storage_options: &StorageOptions,
66    ) -> Result<Arc<dyn OSObjectStore>> {
67        let max_retries = storage_options.client_max_retries();
68        let retry_timeout = storage_options.client_retry_timeout();
69        let retry_config = RetryConfig {
70            backoff: Default::default(),
71            max_retries,
72            retry_timeout: Duration::from_secs(retry_timeout),
73        };
74
75        let mut builder = GoogleCloudStorageBuilder::new()
76            .with_url(base_path.as_ref())
77            .with_retry(retry_config);
78        for (key, value) in storage_options.as_gcs_options() {
79            builder = builder.with_config(key, value);
80        }
81        let token_key = "google_storage_token";
82        if let Some(storage_token) = storage_options.get(token_key) {
83            let credential = GcpCredential {
84                bearer: storage_token.clone(),
85            };
86            let credential_provider = Arc::new(StaticCredentialProvider::new(credential)) as _;
87            builder = builder.with_credentials(credential_provider);
88        }
89
90        Ok(Arc::new(builder.build()?) as Arc<dyn OSObjectStore>)
91    }
92}
93
94#[async_trait::async_trait]
95impl ObjectStoreProvider for GcsStoreProvider {
96    async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
97        let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE);
98        let mut storage_options =
99            StorageOptions(params.storage_options.clone().unwrap_or_default());
100        storage_options.with_env_gcs();
101        let download_retry_count = storage_options.download_retry_count();
102
103        let use_opendal = storage_options
104            .0
105            .get("use_opendal")
106            .map(|v| v.as_str() == "true")
107            .unwrap_or(false);
108
109        let inner = if use_opendal {
110            self.build_opendal_gcs_store(&base_path, &storage_options)
111                .await?
112        } else {
113            self.build_google_cloud_store(&base_path, &storage_options)
114                .await?
115        };
116
117        Ok(ObjectStore {
118            inner,
119            scheme: String::from("gs"),
120            block_size,
121            max_iop_size: *DEFAULT_MAX_IOP_SIZE,
122            use_constant_size_upload_parts: false,
123            list_is_lexically_ordered: true,
124            io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
125            download_retry_count,
126            io_tracker: Default::default(),
127        })
128    }
129}
130
131impl StorageOptions {
132    /// Add values from the environment to storage options
133    pub fn with_env_gcs(&mut self) {
134        for (os_key, os_value) in std::env::vars_os() {
135            if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
136                let lowercase_key = key.to_ascii_lowercase();
137                let token_key = "google_storage_token";
138
139                if let Ok(config_key) = GoogleConfigKey::from_str(&lowercase_key) {
140                    if !self.0.contains_key(config_key.as_ref()) {
141                        self.0
142                            .insert(config_key.as_ref().to_string(), value.to_string());
143                    }
144                }
145                // Check for GOOGLE_STORAGE_TOKEN until GoogleConfigKey supports storage token
146                else if lowercase_key == token_key && !self.0.contains_key(token_key) {
147                    self.0.insert(token_key.to_string(), value.to_string());
148                }
149            }
150        }
151    }
152
153    /// Subset of options relevant for gcs storage
154    pub fn as_gcs_options(&self) -> HashMap<GoogleConfigKey, String> {
155        self.0
156            .iter()
157            .filter_map(|(key, value)| {
158                let gcs_key = GoogleConfigKey::from_str(&key.to_ascii_lowercase()).ok()?;
159                Some((gcs_key, value.clone()))
160            })
161            .collect()
162    }
163}
164
#[cfg(test)]
mod tests {
    use super::*;
    use crate::object_store::ObjectStoreParams;
    use std::collections::HashMap;

    /// The extracted object path is the URL path without the bucket.
    #[test]
    fn test_gcs_store_path() {
        let gcs_url = Url::parse("gs://bucket/path/to/file").unwrap();

        let extracted = GcsStoreProvider.extract_path(&gcs_url).unwrap();

        assert_eq!(extracted, object_store::path::Path::from("path/to/file"));
    }

    /// Setting `use_opendal` to `"true"` still yields a store with the `gs` scheme.
    #[tokio::test]
    async fn test_use_opendal_flag() {
        let mut options = HashMap::new();
        options.insert("use_opendal".to_string(), "true".to_string());
        options.insert(
            "service_account".to_string(),
            "test@example.iam.gserviceaccount.com".to_string(),
        );
        let params_with_flag = ObjectStoreParams {
            storage_options: Some(options),
            ..Default::default()
        };
        let gcs_url = Url::parse("gs://test-bucket/path").unwrap();

        let store = GcsStoreProvider
            .new_store(gcs_url, &params_with_flag)
            .await
            .unwrap();

        assert_eq!(store.scheme, "gs");
    }
}