1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use deltalake_core::logstore::object_store::gcp::{GoogleCloudStorageBuilder, GoogleConfigKey};
6use deltalake_core::logstore::object_store::ObjectStoreScheme;
7use deltalake_core::logstore::{default_logstore, logstore_factories, LogStore, LogStoreFactory};
8use deltalake_core::logstore::{
9 object_store_factories, ObjectStoreFactory, ObjectStoreRef, StorageConfig,
10};
11use deltalake_core::{DeltaResult, DeltaTableError, Path};
12use object_store::client::SpawnedReqwestConnector;
13use url::Url;
14
15mod config;
16pub mod error;
17mod storage;
18
19trait GcpOptions {
20 fn as_gcp_options(&self) -> HashMap<GoogleConfigKey, String>;
21}
22
23impl GcpOptions for HashMap<String, String> {
24 fn as_gcp_options(&self) -> HashMap<GoogleConfigKey, String> {
25 self.iter()
26 .filter_map(|(key, value)| {
27 Some((
28 GoogleConfigKey::from_str(&key.to_ascii_lowercase()).ok()?,
29 value.clone(),
30 ))
31 })
32 .collect()
33 }
34}
35
36#[derive(Clone, Default, Debug)]
37pub struct GcpFactory {}
38
39impl ObjectStoreFactory for GcpFactory {
40 fn parse_url_opts(
41 &self,
42 url: &Url,
43 config: &StorageConfig,
44 ) -> DeltaResult<(ObjectStoreRef, Path)> {
45 let mut builder = GoogleCloudStorageBuilder::new().with_url(url.to_string());
46 builder = builder.with_retry(config.retry.clone());
47
48 if let Some(runtime) = &config.runtime {
49 builder =
50 builder.with_http_connector(SpawnedReqwestConnector::new(runtime.get_handle()));
51 }
52 let config = config::GcpConfigHelper::try_new(config.raw.as_gcp_options())?.build()?;
53
54 let (_, path) =
55 ObjectStoreScheme::parse(url).map_err(|e| DeltaTableError::GenericError {
56 source: Box::new(e),
57 })?;
58 let prefix = Path::parse(path)?;
59
60 for (key, value) in config.iter() {
61 builder = builder.with_config(*key, value.clone());
62 }
63
64 let store = crate::storage::GcsStorageBackend::try_new(Arc::new(builder.build()?))?;
65
66 Ok((Arc::new(store), prefix))
67 }
68}
69
70impl LogStoreFactory for GcpFactory {
71 fn with_options(
72 &self,
73 prefixed_store: ObjectStoreRef,
74 root_store: ObjectStoreRef,
75 location: &Url,
76 options: &StorageConfig,
77 ) -> DeltaResult<Arc<dyn LogStore>> {
78 Ok(default_logstore(
79 prefixed_store,
80 root_store,
81 location,
82 options,
83 ))
84 }
85}
86
87pub fn register_handlers(_additional_prefixes: Option<Url>) {
89 let factory = Arc::new(GcpFactory {});
90 let scheme = &"gs";
91 let url = Url::parse(&format!("{scheme}://")).unwrap();
92 object_store_factories().insert(url.clone(), factory.clone());
93 logstore_factories().insert(url.clone(), factory.clone());
94}