1use std::collections::HashMap;
2use std::sync::Arc;
3
4use itertools::Itertools;
5use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey};
6use object_store::ObjectStoreScheme;
7use pyo3::prelude::*;
8use pyo3::pybacked::PyBackedStr;
9use pyo3::types::{PyDict, PyString, PyTuple, PyType};
10use pyo3::{intern, IntoPyObjectExt};
11use url::Url;
12
13use crate::aws::credentials::PyAWSCredentialProvider;
14use crate::client::PyClientOptions;
15use crate::config::PyConfigValue;
16use crate::error::{GenericError, ParseUrlError, PyObjectStoreError, PyObjectStoreResult};
17use crate::path::PyPath;
18use crate::prefix::MaybePrefixedStore;
19use crate::retry::PyRetryConfig;
20use crate::PyUrl;
21
/// Constructor inputs retained by `PyS3Store`.
///
/// Kept so that a store can be compared (`__eq__`), pickled (`__getnewargs_ex__`) and
/// introspected through its Python getters without querying the built `AmazonS3` client.
#[derive(Debug, Clone, PartialEq)]
struct S3Config {
    /// Optional path prefix handed to `MaybePrefixedStore` when the store is built.
    prefix: Option<PyPath>,
    /// Merged S3 configuration (`bucket`, `config`, `**kwargs` and any credential-provider
    /// config) that was applied to the builder.
    config: PyAmazonS3Config,
    /// HTTP client options passed to the builder, if any.
    client_options: Option<PyClientOptions>,
    /// Retry configuration passed to the builder, if any.
    retry_config: Option<PyRetryConfig>,
    /// Custom credential provider passed to the builder, if any.
    credential_provider: Option<PyAWSCredentialProvider>,
}
30
31impl S3Config {
32 fn bucket(&self) -> &str {
33 self.config
34 .0
35 .get(&PyAmazonS3ConfigKey(AmazonS3ConfigKey::Bucket))
36 .expect("bucket should always exist in the config")
37 .as_ref()
38 }
39
40 fn __getnewargs_ex__<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
41 let args = PyTuple::empty(py).into_bound_py_any(py)?;
42 let kwargs = PyDict::new(py);
43
44 if let Some(prefix) = &self.prefix {
45 kwargs.set_item(intern!(py, "prefix"), prefix.as_ref().as_ref())?;
46 }
47 kwargs.set_item(intern!(py, "config"), &self.config)?;
48 if let Some(client_options) = &self.client_options {
49 kwargs.set_item(intern!(py, "client_options"), client_options)?;
50 }
51 if let Some(retry_config) = &self.retry_config {
52 kwargs.set_item(intern!(py, "retry_config"), retry_config)?;
53 }
54 if let Some(credential_provider) = &self.credential_provider {
55 kwargs.set_item("credential_provider", credential_provider)?;
56 }
57
58 PyTuple::new(py, [args, kwargs.into_bound_py_any(py)?])
59 }
60}
61
// Python-facing `S3Store`: an S3-backed object store together with the inputs used to build it.
// (Plain `//` comments are used here so the Python-visible class `__doc__` is unaffected.)
#[derive(Debug, Clone)]
#[pyclass(name = "S3Store", frozen, subclass, from_py_object)]
pub struct PyS3Store {
    // The constructed store, wrapped with the optional prefix; shared via `Arc` so clones of
    // this wrapper reuse the same client.
    store: Arc<MaybePrefixedStore<AmazonS3>>,
    // Constructor inputs kept for equality, pickling and the Python getters.
    config: S3Config,
}
70
71impl AsRef<Arc<MaybePrefixedStore<AmazonS3>>> for PyS3Store {
72 fn as_ref(&self) -> &Arc<MaybePrefixedStore<AmazonS3>> {
73 &self.store
74 }
75}
76
77impl PyS3Store {
78 pub fn into_inner(self) -> Arc<MaybePrefixedStore<AmazonS3>> {
80 self.store
81 }
82}
83
84#[pymethods]
85impl PyS3Store {
86 #[new]
88 #[pyo3(signature = (bucket=None, *, prefix=None, config=None, client_options=None, retry_config=None, credential_provider=None, **kwargs))]
89 fn new(
90 bucket: Option<String>,
91 prefix: Option<PyPath>,
92 config: Option<PyAmazonS3Config>,
93 client_options: Option<PyClientOptions>,
94 retry_config: Option<PyRetryConfig>,
95 credential_provider: Option<PyAWSCredentialProvider>,
96 kwargs: Option<PyAmazonS3Config>,
97 ) -> PyObjectStoreResult<Self> {
98 let mut builder = AmazonS3Builder::from_env();
99 let mut config = config.unwrap_or_default();
100
101 if let Some(bucket) = bucket {
102 config.insert_raising_if_exists(AmazonS3ConfigKey::Bucket, bucket)?;
105 }
106
107 let mut combined_config = combine_config_kwargs(config, kwargs)?;
108
109 if let Some(client_options) = client_options.clone() {
110 builder = builder.with_client_options(client_options.into())
111 }
112 if let Some(retry_config) = retry_config.clone() {
113 builder = builder.with_retry(retry_config.into())
114 }
115
116 if let Some(credential_provider) = credential_provider.clone() {
117 if let Some(credential_config) = credential_provider.config() {
119 for (key, val) in credential_config.0.iter() {
120 combined_config.insert_if_not_exists(key.clone(), val.clone());
122 }
123 }
124 builder = builder.with_credentials(Arc::new(credential_provider));
125 }
126
127 builder = combined_config.clone().apply_config(builder);
128
129 Ok(Self {
130 store: Arc::new(MaybePrefixedStore::new(builder.build()?, prefix.clone())),
131 config: S3Config {
132 prefix,
133 config: combined_config,
134 client_options,
135 retry_config,
136 credential_provider,
137 },
138 })
139 }
140
141 #[classmethod]
142 #[pyo3(signature = (url, *, config=None, client_options=None, retry_config=None, credential_provider=None, **kwargs))]
143 pub(crate) fn from_url<'py>(
144 cls: &Bound<'py, PyType>,
145 url: PyUrl,
146 config: Option<PyAmazonS3Config>,
147 client_options: Option<PyClientOptions>,
148 retry_config: Option<PyRetryConfig>,
149 credential_provider: Option<PyAWSCredentialProvider>,
150 kwargs: Option<PyAmazonS3Config>,
151 ) -> PyObjectStoreResult<Bound<'py, PyAny>> {
152 let (_, prefix) =
155 ObjectStoreScheme::parse(url.as_ref()).map_err(object_store::Error::from)?;
156 let prefix: Option<String> = if prefix.parts().count() != 0 {
157 Some(prefix.into())
158 } else {
159 None
160 };
161 let config = parse_url(config, url.as_ref())?;
162
163 let kwargs = kwargs.unwrap_or_default().into_pyobject(cls.py())?;
166 kwargs.set_item("prefix", prefix)?;
167 kwargs.set_item("config", config)?;
168 kwargs.set_item("client_options", client_options)?;
169 kwargs.set_item("retry_config", retry_config)?;
170 kwargs.set_item("credential_provider", credential_provider)?;
171 Ok(cls.call((), Some(&kwargs))?)
172 }
173
174 fn __eq__(&self, other: &Bound<PyAny>) -> bool {
175 other
177 .cast::<PyS3Store>()
178 .map(|other| self.config == other.get().config)
179 .unwrap_or(false)
180 }
181
182 fn __getnewargs_ex__<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
183 self.config.__getnewargs_ex__(py)
184 }
185
186 fn __repr__(&self) -> String {
187 let bucket = self.config.bucket();
188 if let Some(prefix) = &self.config.prefix {
189 format!(
190 "S3Store(bucket=\"{}\", prefix=\"{}\")",
191 bucket,
192 prefix.as_ref()
193 )
194 } else {
195 format!("S3Store(bucket=\"{bucket}\")")
196 }
197 }
198
199 #[getter]
200 fn prefix(&self) -> Option<&PyPath> {
201 self.config.prefix.as_ref()
202 }
203
204 #[getter]
205 fn config(&self) -> &PyAmazonS3Config {
206 &self.config.config
207 }
208
209 #[getter]
210 fn client_options(&self) -> Option<&PyClientOptions> {
211 self.config.client_options.as_ref()
212 }
213
214 #[getter]
215 fn credential_provider(&self) -> Option<&PyAWSCredentialProvider> {
216 self.config.credential_provider.as_ref()
217 }
218
219 #[getter]
220 fn retry_config(&self) -> Option<&PyRetryConfig> {
221 self.config.retry_config.as_ref()
222 }
223}
224
/// Hashable newtype over `AmazonS3ConfigKey`, used as the key type of [`PyAmazonS3Config`].
///
/// Parsed from Python strings after lower-casing them, and converted back to Python strings
/// with the `aws_` prefix removed.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct PyAmazonS3ConfigKey(AmazonS3ConfigKey);
227
228impl<'py> FromPyObject<'_, 'py> for PyAmazonS3ConfigKey {
229 type Error = PyErr;
230
231 fn extract(obj: Borrowed<'_, 'py, pyo3::PyAny>) -> PyResult<Self> {
232 let s = obj.extract::<PyBackedStr>()?.to_lowercase();
233 let key = s.parse().map_err(PyObjectStoreError::ObjectStoreError)?;
234 Ok(Self(key))
235 }
236}
237
238impl AsRef<str> for PyAmazonS3ConfigKey {
239 fn as_ref(&self) -> &str {
240 self.0.as_ref()
241 }
242}
243
244impl<'py> IntoPyObject<'py> for &PyAmazonS3ConfigKey {
245 type Target = PyString;
246 type Output = Bound<'py, PyString>;
247 type Error = PyErr;
248
249 fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
250 let s = self
251 .0
252 .as_ref()
253 .strip_prefix("aws_")
254 .expect("Expected config prefix to start with aws_");
255 Ok(PyString::new(py, s))
256 }
257}
258
259impl<'py> IntoPyObject<'py> for PyAmazonS3ConfigKey {
260 type Target = PyString;
261 type Output = Bound<'py, PyString>;
262 type Error = PyErr;
263
264 fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
265 (&self).into_pyobject(py)
266 }
267}
268
269impl From<AmazonS3ConfigKey> for PyAmazonS3ConfigKey {
270 fn from(value: AmazonS3ConfigKey) -> Self {
271 Self(value)
272 }
273}
274
275impl From<PyAmazonS3ConfigKey> for AmazonS3ConfigKey {
276 fn from(value: PyAmazonS3ConfigKey) -> Self {
277 value.0
278 }
279}
280
/// S3 configuration exchanged with Python: a map from config key to its value.
///
/// Extracted from a Python `dict`. Two entries that normalize to the same key (e.g. `bucket`
/// and `aws_bucket`) raise an error. The derived `IntoPyObject` converts it back to a `dict`.
#[derive(Clone, Debug, Default, PartialEq, Eq, IntoPyObject, IntoPyObjectRef)]
pub struct PyAmazonS3Config(HashMap<PyAmazonS3ConfigKey, PyConfigValue>);
283
284impl<'py> FromPyObject<'_, 'py> for PyAmazonS3Config {
289 type Error = PyErr;
290
291 fn extract(obj: Borrowed<'_, 'py, pyo3::PyAny>) -> PyResult<Self> {
292 let mut slf = Self::new();
293 for (key, val) in obj.extract::<Bound<'py, PyDict>>()?.iter() {
294 slf.insert_raising_if_exists(
295 key.extract::<PyAmazonS3ConfigKey>()?,
296 val.extract::<PyConfigValue>()?,
297 )?;
298 }
299 Ok(slf)
300 }
301}
302
303impl PyAmazonS3Config {
304 fn new() -> Self {
305 Self(HashMap::new())
306 }
307
308 fn apply_config(self, mut builder: AmazonS3Builder) -> AmazonS3Builder {
309 for (key, value) in self.0.into_iter() {
310 builder = builder.with_config(key.0, value.0);
311 }
312 builder
313 }
314
315 fn merge(mut self, other: PyAmazonS3Config) -> PyObjectStoreResult<PyAmazonS3Config> {
316 for (key, val) in other.0.into_iter() {
317 self.insert_raising_if_exists(key, val)?;
318 }
319
320 Ok(self)
321 }
322
323 fn insert_raising_if_exists(
324 &mut self,
325 key: impl Into<PyAmazonS3ConfigKey>,
326 val: impl Into<String>,
327 ) -> PyObjectStoreResult<()> {
328 let key = key.into();
329 let old_value = self.0.insert(key.clone(), PyConfigValue::new(val.into()));
330 if old_value.is_some() {
331 return Err(GenericError::new_err(format!(
332 "Duplicate key {} provided",
333 key.0.as_ref()
334 ))
335 .into());
336 }
337
338 Ok(())
339 }
340
341 fn insert_if_not_exists(
346 &mut self,
347 key: impl Into<PyAmazonS3ConfigKey>,
348 val: impl Into<String>,
349 ) {
350 self.0.entry(key.into()).or_insert(PyConfigValue::new(val));
351 }
352}
353
354fn combine_config_kwargs(
355 config: PyAmazonS3Config,
356 kwargs: Option<PyAmazonS3Config>,
357) -> PyObjectStoreResult<PyAmazonS3Config> {
358 if let Some(kwargs) = kwargs {
359 config.merge(kwargs)
360 } else {
361 Ok(config)
362 }
363}
364
365fn parse_url(
375 config: Option<PyAmazonS3Config>,
376 parsed: &Url,
377) -> object_store::Result<PyAmazonS3Config> {
378 let host = parsed
379 .host_str()
380 .ok_or_else(|| ParseUrlError::UrlNotRecognised {
381 url: parsed.as_str().to_string(),
382 })?;
383 let mut config = config.unwrap_or_default();
384
385 match parsed.scheme() {
386 "s3" | "s3a" => {
387 config.insert_if_not_exists(AmazonS3ConfigKey::Bucket, host);
388 }
389 "https" => match host.splitn(4, '.').collect_tuple() {
390 Some(("s3", region, "amazonaws", "com")) => {
391 config.insert_if_not_exists(AmazonS3ConfigKey::Region, region);
392 let bucket = parsed.path_segments().into_iter().flatten().next();
393 if let Some(bucket) = bucket {
394 config.insert_if_not_exists(AmazonS3ConfigKey::Bucket, bucket);
395 }
396 }
397 Some((bucket, "s3", "amazonaws", "com")) => {
398 config.insert_if_not_exists(AmazonS3ConfigKey::Bucket, bucket);
399 config.insert_if_not_exists(AmazonS3ConfigKey::VirtualHostedStyleRequest, "true");
400 }
401 Some((bucket, "s3", region, "amazonaws.com")) => {
402 config.insert_if_not_exists(AmazonS3ConfigKey::Bucket, bucket);
403 config.insert_if_not_exists(AmazonS3ConfigKey::Region, region);
404 config.insert_if_not_exists(AmazonS3ConfigKey::VirtualHostedStyleRequest, "true");
405 }
406 Some((account, "r2", "cloudflarestorage", "com")) => {
407 config.insert_if_not_exists(AmazonS3ConfigKey::Region, "auto");
408 let endpoint = format!("https://{account}.r2.cloudflarestorage.com");
409 config.insert_if_not_exists(AmazonS3ConfigKey::Endpoint, endpoint);
410
411 let bucket = parsed.path_segments().into_iter().flatten().next();
412 if let Some(bucket) = bucket {
413 config.insert_if_not_exists(AmazonS3ConfigKey::Bucket, bucket);
414 }
415 }
416 _ => {
417 return Err(ParseUrlError::UrlNotRecognised {
418 url: parsed.as_str().to_string(),
419 }
420 .into())
421 }
422 },
423 scheme => {
424 let scheme = scheme.into();
425 return Err(ParseUrlError::UnknownUrlScheme { scheme }.into());
426 }
427 };
428
429 Ok(config)
430}