lance_io/object_store/providers/
gcp.rs1use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
5
6use object_store::{
7 gcp::{GcpCredential, GoogleCloudStorageBuilder, GoogleConfigKey},
8 RetryConfig, StaticCredentialProvider,
9};
10use url::Url;
11
12use crate::object_store::{
13 ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, DEFAULT_CLOUD_BLOCK_SIZE,
14 DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE,
15};
16use lance_core::error::Result;
17
/// Object store provider for Google Cloud Storage (`gs://`) URLs.
///
/// A stateless unit struct: all configuration comes from the
/// `ObjectStoreParams` handed to `new_store`.
#[derive(Debug, Default)]
pub struct GcsStoreProvider;
20
21#[async_trait::async_trait]
22impl ObjectStoreProvider for GcsStoreProvider {
23 async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
24 let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE);
25 let mut storage_options =
26 StorageOptions(params.storage_options.clone().unwrap_or_default());
27 let download_retry_count = storage_options.download_retry_count();
28
29 let max_retries = storage_options.client_max_retries();
30 let retry_timeout = storage_options.client_retry_timeout();
31 let retry_config = RetryConfig {
32 backoff: Default::default(),
33 max_retries,
34 retry_timeout: Duration::from_secs(retry_timeout),
35 };
36
37 storage_options.with_env_gcs();
38 let mut builder = GoogleCloudStorageBuilder::new()
39 .with_url(base_path.as_ref())
40 .with_retry(retry_config);
41 for (key, value) in storage_options.as_gcs_options() {
42 builder = builder.with_config(key, value);
43 }
44 let token_key = "google_storage_token";
45 if let Some(storage_token) = storage_options.get(token_key) {
46 let credential = GcpCredential {
47 bearer: storage_token.to_string(),
48 };
49 let credential_provider = Arc::new(StaticCredentialProvider::new(credential)) as _;
50 builder = builder.with_credentials(credential_provider);
51 }
52 let inner = Arc::new(builder.build()?);
53
54 Ok(ObjectStore {
55 inner,
56 scheme: String::from("gs"),
57 block_size,
58 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
59 use_constant_size_upload_parts: false,
60 list_is_lexically_ordered: true,
61 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
62 download_retry_count,
63 })
64 }
65}
66
67impl StorageOptions {
68 pub fn with_env_gcs(&mut self) {
70 for (os_key, os_value) in std::env::vars_os() {
71 if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
72 let lowercase_key = key.to_ascii_lowercase();
73 let token_key = "google_storage_token";
74
75 if let Ok(config_key) = GoogleConfigKey::from_str(&lowercase_key) {
76 if !self.0.contains_key(config_key.as_ref()) {
77 self.0
78 .insert(config_key.as_ref().to_string(), value.to_string());
79 }
80 }
81 else if lowercase_key == token_key && !self.0.contains_key(token_key) {
83 self.0.insert(token_key.to_string(), value.to_string());
84 }
85 }
86 }
87 }
88
89 pub fn as_gcs_options(&self) -> HashMap<GoogleConfigKey, String> {
91 self.0
92 .iter()
93 .filter_map(|(key, value)| {
94 let gcs_key = GoogleConfigKey::from_str(&key.to_ascii_lowercase()).ok()?;
95 Some((gcs_key, value.clone()))
96 })
97 .collect()
98 }
99}
100
#[cfg(test)]
mod tests {
    use super::*;

    /// The object path extracted from a `gs://` URL excludes the bucket name.
    #[test]
    fn test_gcs_store_path() {
        let url = Url::parse("gs://bucket/path/to/file").unwrap();
        let expected = object_store::path::Path::from("path/to/file");
        assert_eq!(GcsStoreProvider.extract_path(&url), expected);
    }
}