Skip to main content

polars_python/io/
cloud_options.rs

1use std::time::Duration;
2
3use polars::prelude::CloudScheme;
4use polars_core::config::verbose_print_sensitive;
5use polars_io::cloud::{CloudOptions, CloudRetryConfig};
6use polars_utils::total_ord::TotalOrdWrap;
7use pyo3::exceptions::PyValueError;
8use pyo3::intern;
9use pyo3::prelude::*;
10use pyo3::pybacked::PyBackedStr;
11use pyo3::types::PyDict;
12
13use crate::utils::to_py_err;
14
/// Interface to `StorageOptionsDict | None` on the Python side.
///
/// This is a thin wrapper around the Python object exactly as it was received:
/// the `FromPyObject` impl stores it without inspecting its type. The object is
/// only interpreted (as `None` or a `dict`) when `extract_opt_cloud_options` is
/// called.
pub struct OptPyCloudOptions<'py>(Bound<'py, PyAny>);
17
18impl<'a, 'py> FromPyObject<'a, 'py> for OptPyCloudOptions<'py> {
19    type Error = PyErr;
20
21    fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
22        Ok(Self(ob.to_owned()))
23    }
24}
25
26impl OptPyCloudOptions<'_> {
27    pub fn extract_opt_cloud_options(
28        &self,
29        cloud_scheme: Option<CloudScheme>,
30        credential_provider: Option<Py<PyAny>>,
31    ) -> PyResult<Option<CloudOptions>> {
32        if self.0.is_none() && credential_provider.is_none() {
33            return Ok(None);
34        }
35
36        let py = self.0.py();
37
38        let mut storage_options: Vec<(PyBackedStr, String)> = vec![];
39        let mut file_cache_ttl: u64 = 2;
40        let mut retry_config = CloudRetryConfig::default();
41
42        let storage_options_dict: Option<Bound<'_, PyDict>> = self.0.extract()?;
43
44        if let Some(storage_options_dict) = storage_options_dict {
45            storage_options.reserve(
46                storage_options_dict
47                    .call_method0(intern!(py, "__len__"))?
48                    .extract()?,
49            );
50
51            for v in storage_options_dict
52                .call_method0(intern!(py, "items"))?
53                .try_iter()?
54            {
55                let (key, value): (PyBackedStr, Bound<'_, PyAny>) = v?.extract()?;
56
57                macro_rules! expected_type {
58                    ($key_name:expr, $type_name:expr) => {{
59                        |_| {
60                            let key_name = $key_name;
61                            let type_name = $type_name;
62                            PyValueError::new_err(format!(
63                                "invalid value for '{key_name}': '{value}' (expected {type_name})"
64                            ))
65                        }
66                    }};
67                }
68
69                match &*key {
70                    "file_cache_ttl" => {
71                        file_cache_ttl = value
72                            .extract()
73                            .map_err(expected_type!("file_cache_ttl", "int"))?;
74                    },
75                    "max_retries" => {
76                        retry_config.max_retries = value
77                            .extract()
78                            .map_err(expected_type!("max_retries", "int"))?;
79                    },
80                    "retry_timeout_ms" => {
81                        retry_config.retry_timeout = Some(Duration::from_millis(
82                            value
83                                .extract()
84                                .map_err(expected_type!("retry_timeout", "int"))?,
85                        ));
86                    },
87                    "retry_init_backoff_ms" => {
88                        retry_config.retry_init_backoff = Some(Duration::from_millis(
89                            value
90                                .extract()
91                                .map_err(expected_type!("retry_init_backoff", "int"))?,
92                        ));
93                    },
94                    "retry_max_backoff_ms" => {
95                        retry_config.retry_max_backoff = Some(Duration::from_millis(
96                            value
97                                .extract()
98                                .map_err(expected_type!("retry_max_backoff", "int"))?,
99                        ));
100                    },
101                    "retry_base_multiplier" => {
102                        retry_config.retry_base_multiplier = Some(TotalOrdWrap(
103                            value
104                                .extract()
105                                .map_err(expected_type!("retry_base_multiplier", "float"))?,
106                        ));
107                    },
108                    _ => {
109                        let value: String = value.extract().map_err(expected_type!(&key, "str"))?;
110                        storage_options.push((key, value))
111                    },
112                }
113            }
114        }
115
116        let cloud_options = CloudOptions::from_untyped_config(cloud_scheme, storage_options)
117            .map_err(to_py_err)?
118            .with_retry_config(retry_config);
119
120        #[cfg(feature = "cloud")]
121        let mut cloud_options =
122            cloud_options.with_credential_provider(credential_provider.map(
123                polars_io::cloud::credential_provider::PlCredentialProvider::from_python_builder,
124            ));
125
126        #[cfg(feature = "cloud")]
127        if file_cache_ttl > 0 {
128            cloud_options.file_cache_ttl = file_cache_ttl;
129        }
130        verbose_print_sensitive(|| format!("extracted cloud_options: {:?}", &cloud_options));
131
132        Ok(Some(cloud_options))
133    }
134}