polars_python/functions/
io.rs

1use std::io::BufReader;
2
3#[cfg(any(feature = "ipc", feature = "parquet"))]
4use polars::prelude::ArrowSchema;
5use polars::prelude::PlPathRef;
6use polars_io::cloud::CloudOptions;
7use pyo3::prelude::*;
8use pyo3::types::PyDict;
9
10use crate::conversion::Wrap;
11use crate::error::PyPolarsErr;
12use crate::file::{EitherRustPythonFile, get_either_file};
13
14#[cfg(feature = "ipc")]
15#[pyfunction]
16pub fn read_ipc_schema(py: Python<'_>, py_f: Py<PyAny>) -> PyResult<Bound<'_, PyDict>> {
17    use arrow::io::ipc::read::read_file_metadata;
18    let metadata = match get_either_file(py_f, false)? {
19        EitherRustPythonFile::Rust(r) => {
20            read_file_metadata(&mut BufReader::new(r)).map_err(PyPolarsErr::from)?
21        },
22        EitherRustPythonFile::Py(mut r) => read_file_metadata(&mut r).map_err(PyPolarsErr::from)?,
23    };
24
25    let dict = PyDict::new(py);
26    fields_to_pydict(&metadata.schema, &dict)?;
27    Ok(dict)
28}
29
30#[cfg(feature = "parquet")]
31#[pyfunction]
32pub fn read_parquet_metadata(
33    py: Python,
34    py_f: Py<PyAny>,
35    storage_options: Option<Vec<(String, String)>>,
36    credential_provider: Option<Py<PyAny>>,
37    retries: usize,
38) -> PyResult<Bound<PyDict>> {
39    use std::io::Cursor;
40
41    use polars_error::feature_gated;
42    use polars_io::pl_async::get_runtime;
43    use polars_parquet::read::read_metadata;
44    use polars_parquet::read::schema::read_custom_key_value_metadata;
45    use polars_utils::plpath::PlPath;
46
47    use crate::file::{PythonScanSourceInput, get_python_scan_source_input};
48
49    let metadata = match get_python_scan_source_input(py_f, false)? {
50        PythonScanSourceInput::Buffer(buf) => {
51            read_metadata(&mut Cursor::new(buf)).map_err(PyPolarsErr::from)?
52        },
53        PythonScanSourceInput::Path(p) => {
54            let cloud_options = parse_cloud_options(
55                Some(p.as_ref()),
56                storage_options,
57                credential_provider,
58                retries,
59            )?;
60            match p {
61                PlPath::Local(local) => {
62                    let file = polars_utils::open_file(&local).map_err(PyPolarsErr::from)?;
63                    read_metadata(&mut BufReader::new(file)).map_err(PyPolarsErr::from)?
64                },
65                PlPath::Cloud(_) => {
66                    use polars::prelude::ParquetObjectStore;
67                    use polars_error::PolarsResult;
68
69                    feature_gated!("cloud", {
70                        py.detach(|| {
71                            get_runtime().block_on(async {
72                                let mut reader = ParquetObjectStore::from_uri(
73                                    p.as_ref(),
74                                    cloud_options.as_ref(),
75                                    None,
76                                )
77                                .await?;
78                                let result = reader.get_metadata().await?;
79                                PolarsResult::Ok((**result).clone())
80                            })
81                        })
82                    })
83                    .map_err(PyPolarsErr::from)?
84                },
85            }
86        },
87        PythonScanSourceInput::File(f) => {
88            read_metadata(&mut BufReader::new(f)).map_err(PyPolarsErr::from)?
89        },
90    };
91
92    let key_value_metadata = read_custom_key_value_metadata(metadata.key_value_metadata());
93    let dict = PyDict::new(py);
94    for (key, value) in key_value_metadata.into_iter() {
95        dict.set_item(key.as_str(), value.as_str())?;
96    }
97    Ok(dict)
98}
99
100#[cfg(any(feature = "ipc", feature = "parquet"))]
101fn fields_to_pydict(schema: &ArrowSchema, dict: &Bound<'_, PyDict>) -> PyResult<()> {
102    for field in schema.iter_values() {
103        let dt = Wrap(polars::prelude::DataType::from_arrow_field(field));
104        dict.set_item(field.name.as_str(), &dt)?;
105    }
106    Ok(())
107}
108
109#[cfg(feature = "clipboard")]
110#[pyfunction]
111pub fn read_clipboard_string() -> PyResult<String> {
112    use arboard;
113    let mut clipboard =
114        arboard::Clipboard::new().map_err(|e| PyPolarsErr::Other(format!("{e}")))?;
115    let result = clipboard
116        .get_text()
117        .map_err(|e| PyPolarsErr::Other(format!("{e}")))?;
118    Ok(result)
119}
120
121#[cfg(feature = "clipboard")]
122#[pyfunction]
123pub fn write_clipboard_string(s: &str) -> PyResult<()> {
124    use arboard;
125    let mut clipboard =
126        arboard::Clipboard::new().map_err(|e| PyPolarsErr::Other(format!("{e}")))?;
127    clipboard
128        .set_text(s)
129        .map_err(|e| PyPolarsErr::Other(format!("{e}")))?;
130    Ok(())
131}
132
133pub fn parse_cloud_options<'a>(
134    first_path: Option<PlPathRef<'a>>,
135    storage_options: Option<Vec<(String, String)>>,
136    credential_provider: Option<Py<PyAny>>,
137    retries: usize,
138) -> PyResult<Option<CloudOptions>> {
139    let result = if let Some(first_path) = first_path {
140        #[cfg(feature = "cloud")]
141        {
142            use polars_io::cloud::credential_provider::PlCredentialProvider;
143
144            use crate::prelude::parse_cloud_options;
145
146            let first_path_url = first_path.to_str();
147            let cloud_options =
148                parse_cloud_options(first_path_url, storage_options.unwrap_or_default())?;
149
150            Some(
151                cloud_options
152                    .with_max_retries(retries)
153                    .with_credential_provider(
154                        credential_provider.map(PlCredentialProvider::from_python_builder),
155                    ),
156            )
157        }
158
159        #[cfg(not(feature = "cloud"))]
160        {
161            None
162        }
163    } else {
164        None
165    };
166    Ok(result)
167}