lance_io/object_store/providers/
gcp.rs1use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
5
6use object_store::ObjectStore as OSObjectStore;
7use object_store_opendal::OpendalStore;
8use opendal::{services::Gcs, Operator};
9use snafu::location;
10
11use object_store::{
12 gcp::{GcpCredential, GoogleCloudStorageBuilder, GoogleConfigKey},
13 RetryConfig, StaticCredentialProvider,
14};
15use url::Url;
16
17use crate::object_store::{
18 ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, DEFAULT_CLOUD_BLOCK_SIZE,
19 DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE,
20};
21use lance_core::error::{Error, Result};
22
/// Object-store provider for Google Cloud Storage (`gs://` URLs).
///
/// The provider itself carries no state; everything it needs is read from the
/// `ObjectStoreParams` handed to `new_store`.
#[derive(Debug, Default)]
pub struct GcsStoreProvider;
25
26impl GcsStoreProvider {
27 async fn build_opendal_gcs_store(
28 &self,
29 base_path: &Url,
30 storage_options: &StorageOptions,
31 ) -> Result<Arc<dyn OSObjectStore>> {
32 let bucket = base_path
33 .host_str()
34 .ok_or_else(|| Error::invalid_input("GCS URL must contain bucket name", location!()))?
35 .to_string();
36
37 let prefix = base_path.path().trim_start_matches('/').to_string();
38
39 let mut config_map: HashMap<String, String> = storage_options.0.clone();
42
43 config_map.insert("bucket".to_string(), bucket);
45
46 if !prefix.is_empty() {
47 config_map.insert("root".to_string(), format!("/{}", prefix));
48 }
49
50 let operator = Operator::from_iter::<Gcs>(config_map)
51 .map_err(|e| {
52 Error::invalid_input(
53 format!("Failed to create GCS operator: {:?}", e),
54 location!(),
55 )
56 })?
57 .finish();
58
59 Ok(Arc::new(OpendalStore::new(operator)) as Arc<dyn OSObjectStore>)
60 }
61
62 async fn build_google_cloud_store(
63 &self,
64 base_path: &Url,
65 storage_options: &StorageOptions,
66 ) -> Result<Arc<dyn OSObjectStore>> {
67 let max_retries = storage_options.client_max_retries();
68 let retry_timeout = storage_options.client_retry_timeout();
69 let retry_config = RetryConfig {
70 backoff: Default::default(),
71 max_retries,
72 retry_timeout: Duration::from_secs(retry_timeout),
73 };
74
75 let mut builder = GoogleCloudStorageBuilder::new()
76 .with_url(base_path.as_ref())
77 .with_retry(retry_config);
78 for (key, value) in storage_options.as_gcs_options() {
79 builder = builder.with_config(key, value);
80 }
81 let token_key = "google_storage_token";
82 if let Some(storage_token) = storage_options.get(token_key) {
83 let credential = GcpCredential {
84 bearer: storage_token.clone(),
85 };
86 let credential_provider = Arc::new(StaticCredentialProvider::new(credential)) as _;
87 builder = builder.with_credentials(credential_provider);
88 }
89
90 Ok(Arc::new(builder.build()?) as Arc<dyn OSObjectStore>)
91 }
92}
93
94#[async_trait::async_trait]
95impl ObjectStoreProvider for GcsStoreProvider {
96 async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
97 let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE);
98 let mut storage_options =
99 StorageOptions(params.storage_options.clone().unwrap_or_default());
100 storage_options.with_env_gcs();
101 let download_retry_count = storage_options.download_retry_count();
102
103 let use_opendal = storage_options
104 .0
105 .get("use_opendal")
106 .map(|v| v.as_str() == "true")
107 .unwrap_or(false);
108
109 let inner = if use_opendal {
110 self.build_opendal_gcs_store(&base_path, &storage_options)
111 .await?
112 } else {
113 self.build_google_cloud_store(&base_path, &storage_options)
114 .await?
115 };
116
117 Ok(ObjectStore {
118 inner,
119 scheme: String::from("gs"),
120 block_size,
121 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
122 use_constant_size_upload_parts: false,
123 list_is_lexically_ordered: true,
124 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
125 download_retry_count,
126 })
127 }
128}
129
130impl StorageOptions {
131 pub fn with_env_gcs(&mut self) {
133 for (os_key, os_value) in std::env::vars_os() {
134 if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
135 let lowercase_key = key.to_ascii_lowercase();
136 let token_key = "google_storage_token";
137
138 if let Ok(config_key) = GoogleConfigKey::from_str(&lowercase_key) {
139 if !self.0.contains_key(config_key.as_ref()) {
140 self.0
141 .insert(config_key.as_ref().to_string(), value.to_string());
142 }
143 }
144 else if lowercase_key == token_key && !self.0.contains_key(token_key) {
146 self.0.insert(token_key.to_string(), value.to_string());
147 }
148 }
149 }
150 }
151
152 pub fn as_gcs_options(&self) -> HashMap<GoogleConfigKey, String> {
154 self.0
155 .iter()
156 .filter_map(|(key, value)| {
157 let gcs_key = GoogleConfigKey::from_str(&key.to_ascii_lowercase()).ok()?;
158 Some((gcs_key, value.clone()))
159 })
160 .collect()
161 }
162}
163
#[cfg(test)]
mod tests {
    use super::*;
    use crate::object_store::ObjectStoreParams;
    use std::collections::HashMap;

    /// The object path is the URL path without the bucket (host) component.
    #[test]
    fn test_gcs_store_path() {
        let provider = GcsStoreProvider;

        let url = Url::parse("gs://bucket/path/to/file").unwrap();
        let path = provider.extract_path(&url).unwrap();
        let expected_path = object_store::path::Path::from("path/to/file");
        assert_eq!(path, expected_path);
    }

    /// `as_gcs_options` matches option names case-insensitively and drops names
    /// that are not GCS settings (such as `use_opendal`).
    #[test]
    fn test_as_gcs_options_normalizes_keys() {
        let options = StorageOptions(HashMap::from([
            (
                "SERVICE_ACCOUNT".to_string(),
                "sa@example.iam.gserviceaccount.com".to_string(),
            ),
            ("use_opendal".to_string(), "true".to_string()),
        ]));

        let gcs_options = options.as_gcs_options();
        assert_eq!(gcs_options.len(), 1);
        assert_eq!(
            gcs_options
                .get(&GoogleConfigKey::ServiceAccount)
                .map(String::as_str),
            Some("sa@example.iam.gserviceaccount.com")
        );
    }

    /// With `use_opendal = "true"` the provider builds a store (via OpenDAL)
    /// without touching the network.
    #[tokio::test]
    async fn test_use_opendal_flag() {
        let provider = GcsStoreProvider;
        let url = Url::parse("gs://test-bucket/path").unwrap();
        let params_with_flag = ObjectStoreParams {
            storage_options: Some(HashMap::from([
                ("use_opendal".to_string(), "true".to_string()),
                (
                    "service_account".to_string(),
                    "test@example.iam.gserviceaccount.com".to_string(),
                ),
            ])),
            ..Default::default()
        };

        let store = provider.new_store(url, &params_with_flag).await.unwrap();
        assert_eq!(store.scheme, "gs");
    }
}