1use std::collections::HashMap;
2use std::sync::Arc;
3
4use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder, GoogleConfigKey};
5use object_store::ObjectStoreScheme;
6use pyo3::prelude::*;
7use pyo3::pybacked::PyBackedStr;
8use pyo3::types::{PyDict, PyString, PyTuple, PyType};
9use pyo3::{intern, IntoPyObjectExt};
10use url::Url;
11
12use crate::client::PyClientOptions;
13use crate::config::PyConfigValue;
14use crate::error::{GenericError, ParseUrlError, PyObjectStoreError, PyObjectStoreResult};
15use crate::gcp::credentials::PyGcpCredentialProvider;
16use crate::path::PyPath;
17use crate::retry::PyRetryConfig;
18use crate::{MaybePrefixedStore, PyUrl};
19
20#[derive(Debug, Clone, PartialEq)]
21struct GCSConfig {
22 prefix: Option<PyPath>,
23 config: PyGoogleConfig,
24 client_options: Option<PyClientOptions>,
25 retry_config: Option<PyRetryConfig>,
26 credential_provider: Option<PyGcpCredentialProvider>,
27}
28
29impl GCSConfig {
30 fn bucket(&self) -> &str {
31 self.config
32 .0
33 .get(&PyGoogleConfigKey(GoogleConfigKey::Bucket))
34 .expect("Bucket should always exist in the config")
35 .as_ref()
36 }
37
38 fn __getnewargs_ex__<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
39 let args = PyTuple::empty(py).into_bound_py_any(py)?;
40 let kwargs = PyDict::new(py);
41
42 if let Some(prefix) = &self.prefix {
43 kwargs.set_item(intern!(py, "prefix"), prefix.as_ref().as_ref())?;
44 }
45 kwargs.set_item(intern!(py, "config"), &self.config)?;
46 if let Some(client_options) = &self.client_options {
47 kwargs.set_item(intern!(py, "client_options"), client_options)?;
48 }
49 if let Some(retry_config) = &self.retry_config {
50 kwargs.set_item(intern!(py, "retry_config"), retry_config)?;
51 }
52 if let Some(credential_provider) = &self.credential_provider {
53 kwargs.set_item("credential_provider", credential_provider)?;
54 }
55
56 PyTuple::new(py, [args, kwargs.into_bound_py_any(py)?])
57 }
58}
59
60#[derive(Debug, Clone)]
62#[pyclass(name = "GCSStore", frozen, subclass, from_py_object)]
63pub struct PyGCSStore {
64 store: Arc<MaybePrefixedStore<GoogleCloudStorage>>,
65 config: GCSConfig,
67}
68
69impl AsRef<Arc<MaybePrefixedStore<GoogleCloudStorage>>> for PyGCSStore {
70 fn as_ref(&self) -> &Arc<MaybePrefixedStore<GoogleCloudStorage>> {
71 &self.store
72 }
73}
74
75impl PyGCSStore {
76 pub fn into_inner(self) -> Arc<MaybePrefixedStore<GoogleCloudStorage>> {
78 self.store
79 }
80}
81
82#[pymethods]
83impl PyGCSStore {
84 #[new]
86 #[pyo3(signature = (bucket=None, *, prefix=None, config=None, client_options=None, retry_config=None, credential_provider=None, **kwargs))]
87 fn new(
88 bucket: Option<String>,
89 prefix: Option<PyPath>,
90 config: Option<PyGoogleConfig>,
91 client_options: Option<PyClientOptions>,
92 retry_config: Option<PyRetryConfig>,
93 credential_provider: Option<PyGcpCredentialProvider>,
94 kwargs: Option<PyGoogleConfig>,
95 ) -> PyObjectStoreResult<Self> {
96 let mut builder = GoogleCloudStorageBuilder::from_env();
97 let mut config = config.unwrap_or_default();
98 if let Some(bucket) = bucket.clone() {
99 config.insert_raising_if_exists(GoogleConfigKey::Bucket, bucket)?;
102 }
103 let combined_config = combine_config_kwargs(Some(config), kwargs)?;
104 builder = combined_config.clone().apply_config(builder);
105 if let Some(client_options) = client_options.clone() {
106 builder = builder.with_client_options(client_options.into())
107 }
108 if let Some(retry_config) = retry_config.clone() {
109 builder = builder.with_retry(retry_config.into())
110 }
111 if let Some(credential_provider) = credential_provider.clone() {
112 builder = builder.with_credentials(Arc::new(credential_provider));
113 }
114 Ok(Self {
115 store: Arc::new(MaybePrefixedStore::new(builder.build()?, prefix.clone())),
116 config: GCSConfig {
117 prefix,
118 config: combined_config,
119 client_options,
120 retry_config,
121 credential_provider,
122 },
123 })
124 }
125
126 #[classmethod]
127 #[pyo3(signature = (url, *, config=None, client_options=None, retry_config=None, credential_provider=None, **kwargs))]
128 pub(crate) fn from_url<'py>(
129 cls: &Bound<'py, PyType>,
130 url: PyUrl,
131 config: Option<PyGoogleConfig>,
132 client_options: Option<PyClientOptions>,
133 retry_config: Option<PyRetryConfig>,
134 credential_provider: Option<PyGcpCredentialProvider>,
135 kwargs: Option<PyGoogleConfig>,
136 ) -> PyObjectStoreResult<Bound<'py, PyAny>> {
137 let (_, prefix) =
140 ObjectStoreScheme::parse(url.as_ref()).map_err(object_store::Error::from)?;
141 let prefix: Option<String> = if prefix.parts().count() != 0 {
142 Some(prefix.into())
143 } else {
144 None
145 };
146 let config = parse_url(config, url.as_ref())?;
147
148 let kwargs = kwargs.unwrap_or_default().into_pyobject(cls.py())?;
151 kwargs.set_item("prefix", prefix)?;
152 kwargs.set_item("config", config)?;
153 kwargs.set_item("client_options", client_options)?;
154 kwargs.set_item("retry_config", retry_config)?;
155 kwargs.set_item("credential_provider", credential_provider)?;
156 Ok(cls.call((), Some(&kwargs))?)
157 }
158
159 fn __eq__(&self, other: &Bound<PyAny>) -> bool {
160 other
163 .cast::<PyGCSStore>()
164 .map(|other| self.config == other.get().config)
165 .unwrap_or(false)
166 }
167
168 fn __getnewargs_ex__<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
169 self.config.__getnewargs_ex__(py)
170 }
171
172 fn __repr__(&self) -> String {
173 let bucket = self.config.bucket();
174 if let Some(prefix) = &self.config.prefix {
175 format!(
176 "GCSStore(bucket=\"{}\", prefix=\"{}\")",
177 bucket,
178 prefix.as_ref()
179 )
180 } else {
181 format!("GCSStore(bucket=\"{bucket}\")")
182 }
183 }
184
185 #[getter]
186 fn prefix(&self) -> Option<&PyPath> {
187 self.config.prefix.as_ref()
188 }
189
190 #[getter]
191 fn config(&self) -> &PyGoogleConfig {
192 &self.config.config
193 }
194
195 #[getter]
196 fn client_options(&self) -> Option<&PyClientOptions> {
197 self.config.client_options.as_ref()
198 }
199
200 #[getter]
201 fn credential_provider(&self) -> Option<&PyGcpCredentialProvider> {
202 self.config.credential_provider.as_ref()
203 }
204
205 #[getter]
206 fn retry_config(&self) -> Option<&PyRetryConfig> {
207 self.config.retry_config.as_ref()
208 }
209}
210
211#[derive(Clone, Debug, PartialEq, Eq, Hash)]
212pub struct PyGoogleConfigKey(GoogleConfigKey);
213
214impl<'py> FromPyObject<'_, 'py> for PyGoogleConfigKey {
215 type Error = PyErr;
216
217 fn extract(obj: Borrowed<'_, 'py, pyo3::PyAny>) -> PyResult<Self> {
218 let s = obj.extract::<PyBackedStr>()?.to_lowercase();
219 if s == "application_credentials" {
221 return Ok(Self(GoogleConfigKey::ApplicationCredentials));
222 }
223
224 let key = s.parse().map_err(PyObjectStoreError::ObjectStoreError)?;
225 Ok(Self(key))
226 }
227}
228
229impl AsRef<str> for PyGoogleConfigKey {
230 fn as_ref(&self) -> &str {
231 self.0.as_ref()
232 }
233}
234
235impl<'py> IntoPyObject<'py> for PyGoogleConfigKey {
236 type Target = PyString;
237 type Output = Bound<'py, PyString>;
238 type Error = PyErr;
239
240 fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
241 (&self).into_pyobject(py)
242 }
243}
244
245impl<'py> IntoPyObject<'py> for &PyGoogleConfigKey {
246 type Target = PyString;
247 type Output = Bound<'py, PyString>;
248 type Error = PyErr;
249
250 fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
251 let s = self
252 .0
253 .as_ref()
254 .strip_prefix("google_")
255 .expect("Expected config prefix to start with google_");
256 Ok(PyString::new(py, s))
257 }
258}
259
260impl From<GoogleConfigKey> for PyGoogleConfigKey {
261 fn from(value: GoogleConfigKey) -> Self {
262 Self(value)
263 }
264}
265
266impl From<PyGoogleConfigKey> for GoogleConfigKey {
267 fn from(value: PyGoogleConfigKey) -> Self {
268 value.0
269 }
270}
271
272#[derive(Clone, Debug, Default, PartialEq, Eq, IntoPyObject, IntoPyObjectRef)]
273pub struct PyGoogleConfig(HashMap<PyGoogleConfigKey, PyConfigValue>);
274
275impl<'py> FromPyObject<'_, 'py> for PyGoogleConfig {
280 type Error = PyErr;
281
282 fn extract(obj: Borrowed<'_, 'py, pyo3::PyAny>) -> PyResult<Self> {
283 let mut slf = Self::new();
284 for (key, val) in obj.extract::<Bound<'py, PyDict>>()?.iter() {
285 slf.insert_raising_if_exists(
286 key.extract::<PyGoogleConfigKey>()?,
287 val.extract::<PyConfigValue>()?,
288 )?;
289 }
290 Ok(slf)
291 }
292}
293
294impl PyGoogleConfig {
295 fn new() -> Self {
296 Self(HashMap::new())
297 }
298
299 fn apply_config(self, mut builder: GoogleCloudStorageBuilder) -> GoogleCloudStorageBuilder {
300 for (key, value) in self.0.into_iter() {
301 builder = builder.with_config(key.0, value.0);
302 }
303 builder
304 }
305
306 fn merge(mut self, other: PyGoogleConfig) -> PyObjectStoreResult<PyGoogleConfig> {
307 for (key, val) in other.0.into_iter() {
308 self.insert_raising_if_exists(key, val)?;
309 }
310
311 Ok(self)
312 }
313
314 fn insert_raising_if_exists(
315 &mut self,
316 key: impl Into<PyGoogleConfigKey>,
317 val: impl Into<String>,
318 ) -> PyObjectStoreResult<()> {
319 let key = key.into();
320 let old_value = self.0.insert(key.clone(), PyConfigValue::new(val.into()));
321 if old_value.is_some() {
322 return Err(GenericError::new_err(format!(
323 "Duplicate key {} provided",
324 key.0.as_ref()
325 ))
326 .into());
327 }
328
329 Ok(())
330 }
331
332 fn insert_if_not_exists(&mut self, key: impl Into<PyGoogleConfigKey>, val: impl Into<String>) {
337 self.0.entry(key.into()).or_insert(PyConfigValue::new(val));
338 }
339}
340
341fn combine_config_kwargs(
342 config: Option<PyGoogleConfig>,
343 kwargs: Option<PyGoogleConfig>,
344) -> PyObjectStoreResult<PyGoogleConfig> {
345 match (config, kwargs) {
346 (None, None) => Ok(Default::default()),
347 (Some(x), None) | (None, Some(x)) => Ok(x),
348 (Some(config), Some(kwargs)) => Ok(config.merge(kwargs)?),
349 }
350}
351
352fn parse_url(config: Option<PyGoogleConfig>, parsed: &Url) -> object_store::Result<PyGoogleConfig> {
362 let host = parsed
363 .host_str()
364 .ok_or_else(|| ParseUrlError::UrlNotRecognised {
365 url: parsed.as_str().to_string(),
366 })?;
367 let mut config = config.unwrap_or_default();
368
369 match parsed.scheme() {
370 "gs" => {
371 config.insert_if_not_exists(GoogleConfigKey::Bucket, host);
372 }
373 scheme => {
374 let scheme = scheme.to_string();
375 return Err(ParseUrlError::UnknownUrlScheme { scheme }.into());
376 }
377 }
378
379 Ok(config)
380}