Skip to main content

polars_python/io/
sink_output.rs

1use polars::prelude::file_provider::{FileProviderFunction, FileProviderType};
2use polars::prelude::{PartitionStrategy, PlRefPath, SinkDestination, SpecialEq};
3use polars_utils::IdxSize;
4use polars_utils::python_function::PythonObject;
5use pyo3::intern;
6use pyo3::prelude::*;
7
8use crate::PyExpr;
9use crate::prelude::Wrap;
10
11pub struct PyFileSinkDestination<'py>(Bound<'py, PyAny>);
12
13impl<'a, 'py> FromPyObject<'a, 'py> for PyFileSinkDestination<'py> {
14    type Error = PyErr;
15
16    fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
17        Ok(Self(ob.to_owned()))
18    }
19}
20
21impl PyFileSinkDestination<'_> {
22    pub fn extract_file_sink_destination(&self) -> PyResult<SinkDestination> {
23        let py = self.0.py();
24
25        if let Ok(partition_by_dataclass) = self.0.getattr(intern!(py, "_pl_partition_by")) {
26            return self.extract_from_py_partition_by(partition_by_dataclass);
27        };
28
29        let v: Wrap<polars_plan::dsl::SinkTarget> = self.0.extract()?;
30
31        Ok(SinkDestination::File { target: v.0 })
32    }
33
34    fn extract_from_py_partition_by(
35        &self,
36        partition_by_dataclass: Bound<'_, PyAny>,
37    ) -> PyResult<SinkDestination> {
38        /// Extract from `PartitionByInner` dataclass.
39        #[derive(FromPyObject)]
40        struct Extract {
41            base_path: Wrap<PlRefPath>,
42            file_path_provider: Option<Py<PyAny>>,
43            key: Option<Vec<PyExpr>>,
44            include_key: Option<bool>,
45            max_rows_per_file: Option<IdxSize>,
46            approximate_bytes_per_file: u64,
47        }
48
49        let Extract {
50            base_path,
51            file_path_provider,
52            key,
53            include_key,
54            max_rows_per_file,
55            approximate_bytes_per_file,
56        } = partition_by_dataclass.extract()?;
57
58        let partition_strategy: PartitionStrategy = if let Some(partition_by) = key {
59            PartitionStrategy::Keyed {
60                keys: partition_by.into_iter().map(|x| x.inner).collect(),
61                include_keys: include_key.unwrap_or(true),
62                keys_pre_grouped: false,
63            }
64        } else {
65            // Should be validated on Python side
66            assert!(include_key.is_none());
67
68            PartitionStrategy::FileSize
69        };
70
71        Ok(SinkDestination::Partitioned {
72            base_path: base_path.0,
73            file_path_provider: file_path_provider.map(|x| {
74                FileProviderType::Function(FileProviderFunction::Python(SpecialEq::new(
75                    PythonObject(x).into(),
76                )))
77            }),
78            partition_strategy,
79            max_rows_per_file: max_rows_per_file.unwrap_or(IdxSize::MAX),
80            approximate_bytes_per_file,
81        })
82    }
83}