deltalake_gcp/
lib.rs

1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use deltalake_core::logstore::object_store::gcp::{GoogleCloudStorageBuilder, GoogleConfigKey};
6use deltalake_core::logstore::object_store::ObjectStoreScheme;
7use deltalake_core::logstore::{default_logstore, logstore_factories, LogStore, LogStoreFactory};
8use deltalake_core::logstore::{
9    object_store_factories, ObjectStoreFactory, ObjectStoreRef, StorageConfig,
10};
11use deltalake_core::{DeltaResult, DeltaTableError, Path};
12use object_store::client::SpawnedReqwestConnector;
13use url::Url;
14
15mod config;
16pub mod error;
17mod storage;
18
19trait GcpOptions {
20    fn as_gcp_options(&self) -> HashMap<GoogleConfigKey, String>;
21}
22
23impl GcpOptions for HashMap<String, String> {
24    fn as_gcp_options(&self) -> HashMap<GoogleConfigKey, String> {
25        self.iter()
26            .filter_map(|(key, value)| {
27                Some((
28                    GoogleConfigKey::from_str(&key.to_ascii_lowercase()).ok()?,
29                    value.clone(),
30                ))
31            })
32            .collect()
33    }
34}
35
/// Stateless factory implementing this crate's [`ObjectStoreFactory`] and
/// [`LogStoreFactory`] for Google Cloud Storage locations.
#[derive(Debug, Default, Clone)]
pub struct GcpFactory {}
38
39impl ObjectStoreFactory for GcpFactory {
40    fn parse_url_opts(
41        &self,
42        url: &Url,
43        config: &StorageConfig,
44    ) -> DeltaResult<(ObjectStoreRef, Path)> {
45        let mut builder = GoogleCloudStorageBuilder::new().with_url(url.to_string());
46        builder = builder.with_retry(config.retry.clone());
47
48        if let Some(runtime) = &config.runtime {
49            builder =
50                builder.with_http_connector(SpawnedReqwestConnector::new(runtime.get_handle()));
51        }
52        let config = config::GcpConfigHelper::try_new(config.raw.as_gcp_options())?.build()?;
53
54        let (_, path) =
55            ObjectStoreScheme::parse(url).map_err(|e| DeltaTableError::GenericError {
56                source: Box::new(e),
57            })?;
58        let prefix = Path::parse(path)?;
59
60        for (key, value) in config.iter() {
61            builder = builder.with_config(*key, value.clone());
62        }
63
64        let store = crate::storage::GcsStorageBackend::try_new(Arc::new(builder.build()?))?;
65
66        Ok((Arc::new(store), prefix))
67    }
68}
69
70impl LogStoreFactory for GcpFactory {
71    fn with_options(
72        &self,
73        prefixed_store: ObjectStoreRef,
74        root_store: ObjectStoreRef,
75        location: &Url,
76        options: &StorageConfig,
77    ) -> DeltaResult<Arc<dyn LogStore>> {
78        Ok(default_logstore(
79            prefixed_store,
80            root_store,
81            location,
82            options,
83        ))
84    }
85}
86
87/// Register an [ObjectStoreFactory] for common Google Cloud [Url] schemes
88pub fn register_handlers(_additional_prefixes: Option<Url>) {
89    let factory = Arc::new(GcpFactory {});
90    let scheme = &"gs";
91    let url = Url::parse(&format!("{scheme}://")).unwrap();
92    object_store_factories().insert(url.clone(), factory.clone());
93    logstore_factories().insert(url.clone(), factory.clone());
94}