1use std::sync::Arc;
19
20use object_store::aws::{AmazonS3, AmazonS3Builder};
21use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
22use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
23use object_store::http::{HttpBuilder, HttpStore};
24use object_store::local::LocalFileSystem;
25use pyo3::exceptions::PyValueError;
26use pyo3::prelude::*;
27use url::Url;
28
29#[derive(FromPyObject)]
30pub enum StorageContexts {
31 AmazonS3(PyAmazonS3Context),
32 GoogleCloudStorage(PyGoogleCloudContext),
33 MicrosoftAzure(PyMicrosoftAzureContext),
34 LocalFileSystem(PyLocalFileSystemContext),
35 HTTP(PyHttpContext),
36}
37
38#[pyclass(
39 frozen,
40 name = "LocalFileSystem",
41 module = "datafusion.store",
42 subclass
43)]
44#[derive(Debug, Clone)]
45pub struct PyLocalFileSystemContext {
46 pub inner: Arc<LocalFileSystem>,
47}
48
49#[pymethods]
50impl PyLocalFileSystemContext {
51 #[pyo3(signature = (prefix=None))]
52 #[new]
53 fn new(prefix: Option<String>) -> Self {
54 if let Some(prefix) = prefix {
55 Self {
56 inner: Arc::new(
57 LocalFileSystem::new_with_prefix(prefix)
58 .expect("Could not create local LocalFileSystem"),
59 ),
60 }
61 } else {
62 Self {
63 inner: Arc::new(LocalFileSystem::new()),
64 }
65 }
66 }
67}
68
69#[pyclass(frozen, name = "MicrosoftAzure", module = "datafusion.store", subclass)]
70#[derive(Debug, Clone)]
71pub struct PyMicrosoftAzureContext {
72 pub inner: Arc<MicrosoftAzure>,
73 pub container_name: String,
74}
75
76#[pymethods]
77impl PyMicrosoftAzureContext {
78 #[allow(clippy::too_many_arguments)]
79 #[pyo3(signature = (container_name, account=None, access_key=None, bearer_token=None, client_id=None, client_secret=None, tenant_id=None, sas_query_pairs=None, use_emulator=None, allow_http=None))]
80 #[new]
81 fn new(
82 container_name: String,
83 account: Option<String>,
84 access_key: Option<String>,
85 bearer_token: Option<String>,
86 client_id: Option<String>,
87 client_secret: Option<String>,
88 tenant_id: Option<String>,
89 sas_query_pairs: Option<Vec<(String, String)>>,
90 use_emulator: Option<bool>,
91 allow_http: Option<bool>,
92 ) -> Self {
93 let mut builder = MicrosoftAzureBuilder::from_env().with_container_name(&container_name);
94
95 if let Some(account) = account {
96 builder = builder.with_account(account);
97 }
98
99 if let Some(access_key) = access_key {
100 builder = builder.with_access_key(access_key);
101 }
102
103 if let Some(bearer_token) = bearer_token {
104 builder = builder.with_bearer_token_authorization(bearer_token);
105 }
106
107 match (client_id, client_secret, tenant_id) {
108 (Some(client_id), Some(client_secret), Some(tenant_id)) => {
109 builder =
110 builder.with_client_secret_authorization(client_id, client_secret, tenant_id);
111 }
112 (None, None, None) => {}
113 _ => {
114 panic!("client_id, client_secret, tenat_id must be all set or all None");
115 }
116 }
117
118 if let Some(sas_query_pairs) = sas_query_pairs {
119 builder = builder.with_sas_authorization(sas_query_pairs);
120 }
121
122 if let Some(use_emulator) = use_emulator {
123 builder = builder.with_use_emulator(use_emulator);
124 }
125
126 if let Some(allow_http) = allow_http {
127 builder = builder.with_allow_http(allow_http);
128 }
129
130 Self {
131 inner: Arc::new(
132 builder
133 .build()
134 .expect("Could not create Azure Storage context"), ),
136 container_name,
137 }
138 }
139}
140
141#[pyclass(frozen, name = "GoogleCloud", module = "datafusion.store", subclass)]
142#[derive(Debug, Clone)]
143pub struct PyGoogleCloudContext {
144 pub inner: Arc<GoogleCloudStorage>,
145 pub bucket_name: String,
146}
147
148#[pymethods]
149impl PyGoogleCloudContext {
150 #[allow(clippy::too_many_arguments)]
151 #[pyo3(signature = (bucket_name, service_account_path=None))]
152 #[new]
153 fn new(bucket_name: String, service_account_path: Option<String>) -> Self {
154 let mut builder = GoogleCloudStorageBuilder::new().with_bucket_name(&bucket_name);
155
156 if let Some(credential_path) = service_account_path {
157 builder = builder.with_service_account_path(credential_path);
158 }
159
160 Self {
161 inner: Arc::new(
162 builder
163 .build()
164 .expect("Could not create Google Cloud Storage"),
165 ),
166 bucket_name,
167 }
168 }
169}
170
171#[pyclass(frozen, name = "AmazonS3", module = "datafusion.store", subclass)]
172#[derive(Debug, Clone)]
173pub struct PyAmazonS3Context {
174 pub inner: Arc<AmazonS3>,
175 pub bucket_name: String,
176}
177
178#[pymethods]
179impl PyAmazonS3Context {
180 #[allow(clippy::too_many_arguments)]
181 #[pyo3(signature = (bucket_name, region=None, access_key_id=None, secret_access_key=None, session_token=None, endpoint=None, allow_http=false, imdsv1_fallback=false))]
182 #[new]
183 fn new(
184 bucket_name: String,
185 region: Option<String>,
186 access_key_id: Option<String>,
187 secret_access_key: Option<String>,
188 session_token: Option<String>,
189 endpoint: Option<String>,
190 allow_http: bool,
192 imdsv1_fallback: bool,
193 ) -> Self {
194 let mut builder = AmazonS3Builder::from_env();
196
197 if let Some(region) = region {
198 builder = builder.with_region(region);
199 }
200
201 if let Some(access_key_id) = access_key_id {
202 builder = builder.with_access_key_id(access_key_id);
203 };
204
205 if let Some(secret_access_key) = secret_access_key {
206 builder = builder.with_secret_access_key(secret_access_key);
207 };
208
209 if let Some(session_token) = session_token {
210 builder = builder.with_token(session_token);
211 }
212
213 if let Some(endpoint) = endpoint {
214 builder = builder.with_endpoint(endpoint);
215 };
216
217 if imdsv1_fallback {
218 builder = builder.with_imdsv1_fallback();
219 };
220
221 let store = builder
222 .with_bucket_name(bucket_name.clone())
223 .with_allow_http(allow_http)
225 .build()
226 .expect("failed to build AmazonS3");
227
228 Self {
229 inner: Arc::new(store),
230 bucket_name,
231 }
232 }
233}
234
235#[pyclass(frozen, name = "Http", module = "datafusion.store", subclass)]
236#[derive(Debug, Clone)]
237pub struct PyHttpContext {
238 pub url: String,
239 pub store: Arc<HttpStore>,
240}
241
242#[pymethods]
243impl PyHttpContext {
244 #[new]
245 fn new(url: String) -> PyResult<Self> {
246 let store = match Url::parse(url.as_str()) {
247 Ok(url) => HttpBuilder::new()
248 .with_url(url.origin().ascii_serialization())
249 .build(),
250 Err(_) => HttpBuilder::new().build(),
251 }
252 .map_err(|e| PyValueError::new_err(format!("Error: {:?}", e.to_string())))?;
253
254 Ok(Self {
255 url,
256 store: Arc::new(store),
257 })
258 }
259}
260
261pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
262 m.add_class::<PyAmazonS3Context>()?;
263 m.add_class::<PyMicrosoftAzureContext>()?;
264 m.add_class::<PyGoogleCloudContext>()?;
265 m.add_class::<PyLocalFileSystemContext>()?;
266 m.add_class::<PyHttpContext>()?;
267 Ok(())
268}