lance_io/object_store/providers/
gcp.rs1use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
5
6use object_store::ObjectStore as OSObjectStore;
7use object_store_opendal::OpendalStore;
8use opendal::{Operator, services::Gcs};
9
10use object_store::{
11 RetryConfig, StaticCredentialProvider,
12 gcp::{GcpCredential, GoogleCloudStorageBuilder, GoogleConfigKey},
13};
14use url::Url;
15
16use crate::object_store::{
17 DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, ObjectStore,
18 ObjectStoreParams, ObjectStoreProvider, StorageOptions,
19};
20use lance_core::error::{Error, Result};
21
/// Object store provider for Google Cloud Storage (`gs://`) locations.
///
/// The type carries no state; all configuration is taken from the URL and
/// the storage options handed to each call.
#[derive(Debug, Default)]
pub struct GcsStoreProvider;
24
25impl GcsStoreProvider {
26 async fn build_opendal_gcs_store(
27 &self,
28 base_path: &Url,
29 storage_options: &StorageOptions,
30 ) -> Result<Arc<dyn OSObjectStore>> {
31 let bucket = base_path
32 .host_str()
33 .ok_or_else(|| Error::invalid_input("GCS URL must contain bucket name"))?
34 .to_string();
35
36 let prefix = base_path.path().trim_start_matches('/').to_string();
37
38 let mut config_map: HashMap<String, String> = storage_options.0.clone();
41
42 config_map.insert("bucket".to_string(), bucket);
44
45 if !prefix.is_empty() {
46 config_map.insert("root".to_string(), format!("/{}", prefix));
47 }
48
49 let operator = Operator::from_iter::<Gcs>(config_map)
50 .map_err(|e| Error::invalid_input(format!("Failed to create GCS operator: {:?}", e)))?
51 .finish();
52
53 Ok(Arc::new(OpendalStore::new(operator)) as Arc<dyn OSObjectStore>)
54 }
55
56 async fn build_google_cloud_store(
57 &self,
58 base_path: &Url,
59 storage_options: &StorageOptions,
60 ) -> Result<Arc<dyn OSObjectStore>> {
61 let max_retries = storage_options.client_max_retries();
62 let retry_timeout = storage_options.client_retry_timeout();
63 let retry_config = RetryConfig {
64 backoff: Default::default(),
65 max_retries,
66 retry_timeout: Duration::from_secs(retry_timeout),
67 };
68
69 let mut builder = GoogleCloudStorageBuilder::new()
70 .with_url(base_path.as_ref())
71 .with_retry(retry_config)
72 .with_client_options(storage_options.client_options()?);
73 for (key, value) in storage_options.as_gcs_options() {
74 builder = builder.with_config(key, value);
75 }
76 let token_key = "google_storage_token";
77 if let Some(storage_token) = storage_options.get(token_key) {
78 let credential = GcpCredential {
79 bearer: storage_token.clone(),
80 };
81 let credential_provider = Arc::new(StaticCredentialProvider::new(credential)) as _;
82 builder = builder.with_credentials(credential_provider);
83 }
84
85 Ok(Arc::new(builder.build()?) as Arc<dyn OSObjectStore>)
86 }
87}
88
89#[async_trait::async_trait]
90impl ObjectStoreProvider for GcsStoreProvider {
91 async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
92 let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE);
93 let mut storage_options =
94 StorageOptions(params.storage_options().cloned().unwrap_or_default());
95 storage_options.with_env_gcs();
96 let download_retry_count = storage_options.download_retry_count();
97
98 let use_opendal = storage_options
99 .0
100 .get("use_opendal")
101 .map(|v| v.as_str() == "true")
102 .unwrap_or(false);
103
104 let inner = if use_opendal {
105 self.build_opendal_gcs_store(&base_path, &storage_options)
106 .await?
107 } else {
108 self.build_google_cloud_store(&base_path, &storage_options)
109 .await?
110 };
111
112 Ok(ObjectStore {
113 inner,
114 scheme: String::from("gs"),
115 block_size,
116 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
117 use_constant_size_upload_parts: false,
118 list_is_lexically_ordered: true,
119 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
120 download_retry_count,
121 io_tracker: Default::default(),
122 store_prefix: self
123 .calculate_object_store_prefix(&base_path, params.storage_options())?,
124 })
125 }
126}
127
128impl StorageOptions {
129 pub fn with_env_gcs(&mut self) {
131 for (os_key, os_value) in std::env::vars_os() {
132 if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
133 let lowercase_key = key.to_ascii_lowercase();
134 let token_key = "google_storage_token";
135
136 if let Ok(config_key) = GoogleConfigKey::from_str(&lowercase_key) {
137 if !self.0.contains_key(config_key.as_ref()) {
138 self.0
139 .insert(config_key.as_ref().to_string(), value.to_string());
140 }
141 }
142 else if lowercase_key == token_key && !self.0.contains_key(token_key) {
144 self.0.insert(token_key.to_string(), value.to_string());
145 }
146 }
147 }
148 }
149
150 pub fn as_gcs_options(&self) -> HashMap<GoogleConfigKey, String> {
152 self.0
153 .iter()
154 .filter_map(|(key, value)| {
155 let gcs_key = GoogleConfigKey::from_str(&key.to_ascii_lowercase()).ok()?;
156 Some((gcs_key, value.clone()))
157 })
158 .collect()
159 }
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use crate::object_store::ObjectStoreParams;
    use std::collections::HashMap;

    /// The object path is the URL path without the bucket or leading slash.
    #[test]
    fn test_gcs_store_path() {
        let provider = GcsStoreProvider;

        let url = Url::parse("gs://bucket/path/to/file").unwrap();
        let path = provider.extract_path(&url).unwrap();
        let expected_path = object_store::path::Path::from("path/to/file");
        assert_eq!(path, expected_path);
    }

    /// Setting `use_opendal = "true"` still yields a store with the `gs` scheme.
    #[tokio::test]
    async fn test_use_opendal_flag() {
        use crate::object_store::StorageOptionsAccessor;
        let provider = GcsStoreProvider;
        let url = Url::parse("gs://test-bucket/path").unwrap();
        let params_with_flag = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([
                    ("use_opendal".to_string(), "true".to_string()),
                    (
                        "service_account".to_string(),
                        "test@example.iam.gserviceaccount.com".to_string(),
                    ),
                ]),
            ))),
            ..Default::default()
        };

        // The params are passed by reference, as `new_store` takes
        // `&ObjectStoreParams`.
        let store = provider
            .new_store(url.clone(), &params_with_flag)
            .await
            .unwrap();
        assert_eq!(store.scheme, "gs");
    }
}