datafusion_python/
store.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use object_store::aws::{AmazonS3, AmazonS3Builder};
21use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
22use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
23use object_store::http::{HttpBuilder, HttpStore};
24use object_store::local::LocalFileSystem;
25use pyo3::exceptions::PyValueError;
26use pyo3::prelude::*;
27use url::Url;
28
/// Union of all storage contexts accepted from Python.
///
/// With PyO3's derived `FromPyObject`, a Python object is matched against
/// each variant's wrapped class in declaration order, so any of the
/// context classes in this module can be passed where a storage context
/// is expected.
#[derive(FromPyObject)]
pub enum StorageContexts {
    AmazonS3(PyAmazonS3Context),
    GoogleCloudStorage(PyGoogleCloudContext),
    MicrosoftAzure(PyMicrosoftAzureContext),
    LocalFileSystem(PyLocalFileSystemContext),
    HTTP(PyHttpContext),
}
37
/// Python wrapper for a local-filesystem object store, exposed to Python
/// as `datafusion.store.LocalFileSystem`.
#[pyclass(
    frozen,
    name = "LocalFileSystem",
    module = "datafusion.store",
    subclass
)]
#[derive(Debug, Clone)]
pub struct PyLocalFileSystemContext {
    // Shared handle to the underlying store; `Arc` lets clones of this
    // context reuse the same store instance.
    pub inner: Arc<LocalFileSystem>,
}
48
49#[pymethods]
50impl PyLocalFileSystemContext {
51    #[pyo3(signature = (prefix=None))]
52    #[new]
53    fn new(prefix: Option<String>) -> Self {
54        if let Some(prefix) = prefix {
55            Self {
56                inner: Arc::new(
57                    LocalFileSystem::new_with_prefix(prefix)
58                        .expect("Could not create local LocalFileSystem"),
59                ),
60            }
61        } else {
62            Self {
63                inner: Arc::new(LocalFileSystem::new()),
64            }
65        }
66    }
67}
68
/// Python wrapper for an Azure Blob Storage object store, exposed to
/// Python as `datafusion.store.MicrosoftAzure`.
#[pyclass(frozen, name = "MicrosoftAzure", module = "datafusion.store", subclass)]
#[derive(Debug, Clone)]
pub struct PyMicrosoftAzureContext {
    // Shared handle to the underlying Azure store.
    pub inner: Arc<MicrosoftAzure>,
    // Container this context was built for; kept so callers can recover it.
    pub container_name: String,
}
75
76#[pymethods]
77impl PyMicrosoftAzureContext {
78    #[allow(clippy::too_many_arguments)]
79    #[pyo3(signature = (container_name, account=None, access_key=None, bearer_token=None, client_id=None, client_secret=None, tenant_id=None, sas_query_pairs=None, use_emulator=None, allow_http=None))]
80    #[new]
81    fn new(
82        container_name: String,
83        account: Option<String>,
84        access_key: Option<String>,
85        bearer_token: Option<String>,
86        client_id: Option<String>,
87        client_secret: Option<String>,
88        tenant_id: Option<String>,
89        sas_query_pairs: Option<Vec<(String, String)>>,
90        use_emulator: Option<bool>,
91        allow_http: Option<bool>,
92    ) -> Self {
93        let mut builder = MicrosoftAzureBuilder::from_env().with_container_name(&container_name);
94
95        if let Some(account) = account {
96            builder = builder.with_account(account);
97        }
98
99        if let Some(access_key) = access_key {
100            builder = builder.with_access_key(access_key);
101        }
102
103        if let Some(bearer_token) = bearer_token {
104            builder = builder.with_bearer_token_authorization(bearer_token);
105        }
106
107        match (client_id, client_secret, tenant_id) {
108            (Some(client_id), Some(client_secret), Some(tenant_id)) => {
109                builder =
110                    builder.with_client_secret_authorization(client_id, client_secret, tenant_id);
111            }
112            (None, None, None) => {}
113            _ => {
114                panic!("client_id, client_secret, tenat_id must be all set or all None");
115            }
116        }
117
118        if let Some(sas_query_pairs) = sas_query_pairs {
119            builder = builder.with_sas_authorization(sas_query_pairs);
120        }
121
122        if let Some(use_emulator) = use_emulator {
123            builder = builder.with_use_emulator(use_emulator);
124        }
125
126        if let Some(allow_http) = allow_http {
127            builder = builder.with_allow_http(allow_http);
128        }
129
130        Self {
131            inner: Arc::new(
132                builder
133                    .build()
134                    .expect("Could not create Azure Storage context"), //TODO: change these to PyErr
135            ),
136            container_name,
137        }
138    }
139}
140
/// Python wrapper for a Google Cloud Storage object store, exposed to
/// Python as `datafusion.store.GoogleCloud`.
#[pyclass(frozen, name = "GoogleCloud", module = "datafusion.store", subclass)]
#[derive(Debug, Clone)]
pub struct PyGoogleCloudContext {
    // Shared handle to the underlying GCS store.
    pub inner: Arc<GoogleCloudStorage>,
    // Bucket this context was built for; kept so callers can recover it.
    pub bucket_name: String,
}
147
148#[pymethods]
149impl PyGoogleCloudContext {
150    #[allow(clippy::too_many_arguments)]
151    #[pyo3(signature = (bucket_name, service_account_path=None))]
152    #[new]
153    fn new(bucket_name: String, service_account_path: Option<String>) -> Self {
154        let mut builder = GoogleCloudStorageBuilder::new().with_bucket_name(&bucket_name);
155
156        if let Some(credential_path) = service_account_path {
157            builder = builder.with_service_account_path(credential_path);
158        }
159
160        Self {
161            inner: Arc::new(
162                builder
163                    .build()
164                    .expect("Could not create Google Cloud Storage"),
165            ),
166            bucket_name,
167        }
168    }
169}
170
/// Python wrapper for an Amazon S3 object store, exposed to Python as
/// `datafusion.store.AmazonS3`.
#[pyclass(frozen, name = "AmazonS3", module = "datafusion.store", subclass)]
#[derive(Debug, Clone)]
pub struct PyAmazonS3Context {
    // Shared handle to the underlying S3 store.
    pub inner: Arc<AmazonS3>,
    // Bucket this context was built for; kept so callers can recover it.
    pub bucket_name: String,
}
177
178#[pymethods]
179impl PyAmazonS3Context {
180    #[allow(clippy::too_many_arguments)]
181    #[pyo3(signature = (bucket_name, region=None, access_key_id=None, secret_access_key=None, session_token=None, endpoint=None, allow_http=false, imdsv1_fallback=false))]
182    #[new]
183    fn new(
184        bucket_name: String,
185        region: Option<String>,
186        access_key_id: Option<String>,
187        secret_access_key: Option<String>,
188        session_token: Option<String>,
189        endpoint: Option<String>,
190        //retry_config: RetryConfig,
191        allow_http: bool,
192        imdsv1_fallback: bool,
193    ) -> Self {
194        // start w/ the options that come directly from the environment
195        let mut builder = AmazonS3Builder::from_env();
196
197        if let Some(region) = region {
198            builder = builder.with_region(region);
199        }
200
201        if let Some(access_key_id) = access_key_id {
202            builder = builder.with_access_key_id(access_key_id);
203        };
204
205        if let Some(secret_access_key) = secret_access_key {
206            builder = builder.with_secret_access_key(secret_access_key);
207        };
208
209        if let Some(session_token) = session_token {
210            builder = builder.with_token(session_token);
211        }
212
213        if let Some(endpoint) = endpoint {
214            builder = builder.with_endpoint(endpoint);
215        };
216
217        if imdsv1_fallback {
218            builder = builder.with_imdsv1_fallback();
219        };
220
221        let store = builder
222            .with_bucket_name(bucket_name.clone())
223            //.with_retry_config(retry_config) #TODO: add later
224            .with_allow_http(allow_http)
225            .build()
226            .expect("failed to build AmazonS3");
227
228        Self {
229            inner: Arc::new(store),
230            bucket_name,
231        }
232    }
233}
234
/// Python wrapper for an HTTP(S)-backed object store, exposed to Python
/// as `datafusion.store.Http`.
#[pyclass(frozen, name = "Http", module = "datafusion.store", subclass)]
#[derive(Debug, Clone)]
pub struct PyHttpContext {
    // Original URL string as supplied by the caller (full URL, not just
    // the origin the store is configured with).
    pub url: String,
    // Shared handle to the underlying HTTP store.
    pub store: Arc<HttpStore>,
}
241
242#[pymethods]
243impl PyHttpContext {
244    #[new]
245    fn new(url: String) -> PyResult<Self> {
246        let store = match Url::parse(url.as_str()) {
247            Ok(url) => HttpBuilder::new()
248                .with_url(url.origin().ascii_serialization())
249                .build(),
250            Err(_) => HttpBuilder::new().build(),
251        }
252        .map_err(|e| PyValueError::new_err(format!("Error: {:?}", e.to_string())))?;
253
254        Ok(Self {
255            url,
256            store: Arc::new(store),
257        })
258    }
259}
260
/// Register every storage-context class with the Python
/// `datafusion.store` submodule.
///
/// Called once during extension initialization; propagates any PyO3
/// registration error to the caller.
pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_class::<PyAmazonS3Context>()?;
    m.add_class::<PyMicrosoftAzureContext>()?;
    m.add_class::<PyGoogleCloudContext>()?;
    m.add_class::<PyLocalFileSystemContext>()?;
    m.add_class::<PyHttpContext>()?;
    Ok(())
}