Skip to main content

polars_python/io/
sink_output.rs

1use polars::prelude::file_provider::{FileProviderFunction, FileProviderType, IcebergPathProvider};
2use polars::prelude::{PartitionStrategy, PlRefPath, PlSmallStr, SinkDestination, SpecialEq};
3use polars_utils::IdxSize;
4use polars_utils::python_function::PythonObject;
5use pyo3::exceptions::PyValueError;
6use pyo3::intern;
7use pyo3::prelude::*;
8use pyo3::pybacked::PyBackedStr;
9
10use crate::PyExpr;
11use crate::prelude::Wrap;
12
13pub struct PyFileSinkDestination<'py>(Bound<'py, PyAny>);
14
15impl<'a, 'py> FromPyObject<'a, 'py> for PyFileSinkDestination<'py> {
16    type Error = PyErr;
17
18    fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
19        Ok(Self(ob.to_owned()))
20    }
21}
22
23impl PyFileSinkDestination<'_> {
24    pub fn extract_file_sink_destination(&self) -> PyResult<SinkDestination> {
25        let py = self.0.py();
26
27        if let Ok(partition_by_dataclass) = self.0.getattr(intern!(py, "_pl_partition_by")) {
28            return self.extract_from_py_partition_by(partition_by_dataclass);
29        };
30
31        let v: Wrap<polars_plan::dsl::SinkTarget> = self.0.extract()?;
32
33        Ok(SinkDestination::File { target: v.0 })
34    }
35
36    fn extract_from_py_partition_by(
37        &self,
38        partition_by_dataclass: Bound<'_, PyAny>,
39    ) -> PyResult<SinkDestination> {
40        /// Extract from `PartitionByInner` dataclass.
41        #[derive(FromPyObject)]
42        struct Extract {
43            base_path: Wrap<PlRefPath>,
44            file_path_provider: Option<Py<PyAny>>,
45            key: Option<Vec<PyExpr>>,
46            include_key: Option<bool>,
47            max_rows_per_file: Option<IdxSize>,
48            approximate_bytes_per_file: u64,
49        }
50
51        let Extract {
52            base_path,
53            file_path_provider,
54            key,
55            include_key,
56            max_rows_per_file,
57            approximate_bytes_per_file,
58        } = partition_by_dataclass.extract()?;
59
60        let partition_strategy: PartitionStrategy = if let Some(partition_by) = key {
61            PartitionStrategy::Keyed {
62                keys: partition_by.into_iter().map(|x| x.inner).collect(),
63                include_keys: include_key.unwrap_or(true),
64                keys_pre_grouped: false,
65            }
66        } else {
67            // Should be validated on Python side
68            assert!(include_key.is_none());
69
70            PartitionStrategy::FileSize
71        };
72
73        let file_path_provider = if let Some(file_path_provider) = file_path_provider {
74            let py = self.0.py();
75
76            Some(
77                match file_path_provider.getattr(py, intern!(py, "pl_path_provider_id")) {
78                    Ok(v) => match &*v.extract::<PyBackedStr>(py)? {
79                        "iceberg" => {
80                            let extension: PyBackedStr = file_path_provider
81                                .getattr(py, intern!(py, "extension"))?
82                                .extract(py)?;
83
84                            FileProviderType::Iceberg(IcebergPathProvider {
85                                extension: PlSmallStr::from_str(&extension),
86                                file_part_prefix: String::new(),
87                            })
88                        },
89                        id => {
90                            return Err(PyValueError::new_err(format!(
91                                "unknown pl_path_provider_id: '{id}'"
92                            )));
93                        },
94                    },
95                    Err(_) => FileProviderType::Function(FileProviderFunction::Python(
96                        SpecialEq::new(PythonObject(file_path_provider).into()),
97                    )),
98                },
99            )
100        } else {
101            None
102        };
103
104        Ok(SinkDestination::Partitioned {
105            base_path: base_path.0,
106            file_path_provider,
107            partition_strategy,
108            max_rows_per_file: max_rows_per_file.unwrap_or(IdxSize::MAX),
109            approximate_bytes_per_file,
110        })
111    }
112}