polars_python/io/
cloud_options.rs1use std::time::Duration;
2
3use polars::prelude::CloudScheme;
4use polars_core::config::verbose_print_sensitive;
5use polars_io::cloud::{CloudOptions, CloudRetryConfig};
6use polars_utils::total_ord::TotalOrdWrap;
7use pyo3::exceptions::PyValueError;
8use pyo3::intern;
9use pyo3::prelude::*;
10use pyo3::pybacked::PyBackedStr;
11use pyo3::types::PyDict;
12
13use crate::utils::to_py_err;
14
15pub struct OptPyCloudOptions<'py>(Bound<'py, PyAny>);
17
18impl<'a, 'py> FromPyObject<'a, 'py> for OptPyCloudOptions<'py> {
19 type Error = PyErr;
20
21 fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
22 Ok(Self(ob.to_owned()))
23 }
24}
25
26impl OptPyCloudOptions<'_> {
27 pub fn extract_opt_cloud_options(
28 &self,
29 cloud_scheme: Option<CloudScheme>,
30 credential_provider: Option<Py<PyAny>>,
31 ) -> PyResult<Option<CloudOptions>> {
32 if self.0.is_none() && credential_provider.is_none() {
33 return Ok(None);
34 }
35
36 let py = self.0.py();
37
38 let mut storage_options: Vec<(PyBackedStr, String)> = vec![];
39 let mut file_cache_ttl: u64 = 2;
40 let mut retry_config = CloudRetryConfig::default();
41
42 let storage_options_dict: Option<Bound<'_, PyDict>> = self.0.extract()?;
43
44 if let Some(storage_options_dict) = storage_options_dict {
45 storage_options.reserve(
46 storage_options_dict
47 .call_method0(intern!(py, "__len__"))?
48 .extract()?,
49 );
50
51 for v in storage_options_dict
52 .call_method0(intern!(py, "items"))?
53 .try_iter()?
54 {
55 let (key, value): (PyBackedStr, Bound<'_, PyAny>) = v?.extract()?;
56
57 macro_rules! expected_type {
58 ($key_name:expr, $type_name:expr) => {{
59 |_| {
60 let key_name = $key_name;
61 let type_name = $type_name;
62 PyValueError::new_err(format!(
63 "invalid value for '{key_name}': '{value}' (expected {type_name})"
64 ))
65 }
66 }};
67 }
68
69 match &*key {
70 "file_cache_ttl" => {
71 file_cache_ttl = value
72 .extract()
73 .map_err(expected_type!("file_cache_ttl", "int"))?;
74 },
75 "max_retries" => {
76 retry_config.max_retries = value
77 .extract()
78 .map_err(expected_type!("max_retries", "int"))?;
79 },
80 "retry_timeout_ms" => {
81 retry_config.retry_timeout = Some(Duration::from_millis(
82 value
83 .extract()
84 .map_err(expected_type!("retry_timeout", "int"))?,
85 ));
86 },
87 "retry_init_backoff_ms" => {
88 retry_config.retry_init_backoff = Some(Duration::from_millis(
89 value
90 .extract()
91 .map_err(expected_type!("retry_init_backoff", "int"))?,
92 ));
93 },
94 "retry_max_backoff_ms" => {
95 retry_config.retry_max_backoff = Some(Duration::from_millis(
96 value
97 .extract()
98 .map_err(expected_type!("retry_max_backoff", "int"))?,
99 ));
100 },
101 "retry_base_multiplier" => {
102 retry_config.retry_base_multiplier = Some(TotalOrdWrap(
103 value
104 .extract()
105 .map_err(expected_type!("retry_base_multiplier", "float"))?,
106 ));
107 },
108 _ => {
109 let value: String = value.extract().map_err(expected_type!(&key, "str"))?;
110 storage_options.push((key, value))
111 },
112 }
113 }
114 }
115
116 let cloud_options = CloudOptions::from_untyped_config(cloud_scheme, storage_options)
117 .map_err(to_py_err)?
118 .with_retry_config(retry_config);
119
120 #[cfg(feature = "cloud")]
121 let mut cloud_options =
122 cloud_options.with_credential_provider(credential_provider.map(
123 polars_io::cloud::credential_provider::PlCredentialProvider::from_python_builder,
124 ));
125
126 #[cfg(feature = "cloud")]
127 if file_cache_ttl > 0 {
128 cloud_options.file_cache_ttl = file_cache_ttl;
129 }
130 verbose_print_sensitive(|| format!("extracted cloud_options: {:?}", &cloud_options));
131
132 Ok(Some(cloud_options))
133 }
134}