lance_io/object_store/providers/
gcp.rs1use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
5
6use object_store::ObjectStore as OSObjectStore;
7use object_store_opendal::OpendalStore;
8use opendal::{Operator, services::Gcs};
9
10use object_store::{
11 RetryConfig, StaticCredentialProvider,
12 gcp::{GcpCredential, GoogleCloudStorageBuilder, GoogleConfigKey},
13};
14use url::Url;
15
16use crate::object_store::{
17 DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, ObjectStore,
18 ObjectStoreParams, ObjectStoreProvider, StorageOptions, StorageOptionsAccessor,
19 dynamic_credentials::build_dynamic_credential_provider,
20 throttle::{AimdThrottleConfig, AimdThrottledStore},
21};
22use lance_core::error::{Error, Result};
23
/// Object store provider for Google Cloud Storage.
///
/// The provider carries no state of its own; everything it needs is taken
/// from the URL and parameters passed to its methods.
#[derive(Debug, Default)]
pub struct GcsStoreProvider;
26
27impl GcsStoreProvider {
28 async fn build_opendal_gcs_store(
29 &self,
30 base_path: &Url,
31 storage_options: &StorageOptions,
32 ) -> Result<Arc<dyn OSObjectStore>> {
33 let bucket = base_path
34 .host_str()
35 .ok_or_else(|| Error::invalid_input("GCS URL must contain bucket name"))?
36 .to_string();
37
38 let prefix = base_path.path().trim_start_matches('/').to_string();
39
40 let mut config_map: HashMap<String, String> = storage_options.0.clone();
43
44 config_map.insert("bucket".to_string(), bucket);
46
47 if !prefix.is_empty() {
48 config_map.insert("root".to_string(), format!("/{}", prefix));
49 }
50
51 let operator = Operator::from_iter::<Gcs>(config_map)
52 .map_err(|e| Error::invalid_input(format!("Failed to create GCS operator: {:?}", e)))?
53 .finish();
54
55 Ok(Arc::new(OpendalStore::new(operator)) as Arc<dyn OSObjectStore>)
56 }
57
58 async fn build_google_cloud_store(
59 &self,
60 base_path: &Url,
61 storage_options: &StorageOptions,
62 accessor: Option<Arc<StorageOptionsAccessor>>,
63 ) -> Result<Arc<dyn OSObjectStore>> {
64 let retry_config = RetryConfig {
67 backoff: Default::default(),
68 max_retries: storage_options.client_max_retries(),
69 retry_timeout: Duration::from_secs(storage_options.client_retry_timeout()),
70 };
71
72 let mut builder = GoogleCloudStorageBuilder::new()
73 .with_url(base_path.as_ref())
74 .with_retry(retry_config)
75 .with_client_options(storage_options.client_options()?);
76 for (key, value) in storage_options.as_gcs_options() {
77 builder = builder.with_config(key, value);
78 }
79
80 if let Some(credentials) =
81 build_dynamic_credential_provider::<GcpCredential>(accessor).await?
82 {
83 builder = builder.with_credentials(credentials);
84 } else if let Some(storage_token) = storage_options.get("google_storage_token") {
85 let credential = GcpCredential {
86 bearer: storage_token.clone(),
87 };
88 let credential_provider = Arc::new(StaticCredentialProvider::new(credential)) as _;
89 builder = builder.with_credentials(credential_provider);
90 }
91
92 Ok(Arc::new(builder.build()?) as Arc<dyn OSObjectStore>)
93 }
94}
95
96#[async_trait::async_trait]
97impl ObjectStoreProvider for GcsStoreProvider {
98 async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
99 let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE);
100 let mut storage_options =
101 StorageOptions::new(params.storage_options().cloned().unwrap_or_default());
102 storage_options.with_env_gcs();
103 let download_retry_count = storage_options.download_retry_count();
104
105 let use_opendal = storage_options
106 .0
107 .get("use_opendal")
108 .map(|v| v.as_str() == "true")
109 .unwrap_or(false);
110
111 let accessor = params.get_accessor();
112
113 let inner = if use_opendal {
114 self.build_opendal_gcs_store(&base_path, &storage_options)
117 .await?
118 } else {
119 self.build_google_cloud_store(&base_path, &storage_options, accessor)
120 .await?
121 };
122 let throttle_config = AimdThrottleConfig::from_storage_options(params.storage_options())?;
123 let inner = if throttle_config.is_disabled() {
124 inner
125 } else {
126 Arc::new(AimdThrottledStore::new(inner, throttle_config)?) as Arc<dyn OSObjectStore>
127 };
128
129 Ok(ObjectStore {
130 inner,
131 scheme: String::from("gs"),
132 block_size,
133 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
134 use_constant_size_upload_parts: false,
135 list_is_lexically_ordered: true,
136 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
137 download_retry_count,
138 io_tracker: Default::default(),
139 store_prefix: self
140 .calculate_object_store_prefix(&base_path, params.storage_options())?,
141 })
142 }
143}
144
145impl StorageOptions {
146 pub fn with_env_gcs(&mut self) {
148 for (os_key, os_value) in std::env::vars_os() {
149 if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
150 let lowercase_key = key.to_ascii_lowercase();
151 let token_key = "google_storage_token";
152
153 if let Ok(config_key) = GoogleConfigKey::from_str(&lowercase_key) {
154 if !self.0.contains_key(config_key.as_ref()) {
155 self.0
156 .insert(config_key.as_ref().to_string(), value.to_string());
157 }
158 }
159 else if lowercase_key == token_key && !self.0.contains_key(token_key) {
161 self.0.insert(token_key.to_string(), value.to_string());
162 }
163 }
164 }
165 }
166
167 pub fn as_gcs_options(&self) -> HashMap<GoogleConfigKey, String> {
169 self.0
170 .iter()
171 .filter_map(|(key, value)| {
172 let gcs_key = GoogleConfigKey::from_str(&key.to_ascii_lowercase()).ok()?;
173 Some((gcs_key, value.clone()))
174 })
175 .collect()
176 }
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use crate::object_store::test_utils::StaticMockStorageOptionsProvider;
    use crate::object_store::{ObjectStoreParams, StorageOptionsAccessor};
    use std::collections::HashMap;

    /// `extract_path` should drop the bucket and return only the object path.
    #[test]
    fn test_gcs_store_path() {
        let provider = GcsStoreProvider;

        let url = Url::parse("gs://bucket/path/to/file").unwrap();
        let path = provider.extract_path(&url).unwrap();
        let expected_path = object_store::path::Path::from("path/to/file");
        assert_eq!(path, expected_path);
    }

    /// With `use_opendal` set to `"true"`, `new_store` should succeed and
    /// report the `gs` scheme.
    #[tokio::test]
    async fn test_use_opendal_flag() {
        let provider = GcsStoreProvider;
        let url = Url::parse("gs://test-bucket/path").unwrap();
        let params_with_flag = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([
                    ("use_opendal".to_string(), "true".to_string()),
                    (
                        "service_account".to_string(),
                        "test@example.iam.gserviceaccount.com".to_string(),
                    ),
                ]),
            ))),
            ..Default::default()
        };

        let store = provider
            .new_store(url, &params_with_flag)
            .await
            .unwrap();
        assert_eq!(store.scheme, "gs");
    }

    /// A storage-options provider that supplies `google_storage_token` should
    /// yield a dynamic credential provider whose bearer is that token.
    #[tokio::test]
    async fn test_dynamic_gcp_credentials_provider() {
        let accessor = Arc::new(StorageOptionsAccessor::with_provider(Arc::new(
            StaticMockStorageOptionsProvider {
                options: HashMap::from([(
                    "google_storage_token".to_string(),
                    "gcp-token".to_string(),
                )]),
            },
        )));

        let credentials = build_dynamic_credential_provider::<GcpCredential>(Some(accessor))
            .await
            .expect("dynamic gcp credentials should build")
            .expect("expected credential provider")
            .get_credential()
            .await
            .expect("expected gcp credential");

        assert_eq!(credentials.bearer, "gcp-token");
    }
}