polars_python/io/
sink_output.rs1use polars::prelude::file_provider::{FileProviderFunction, FileProviderType};
2use polars::prelude::{PartitionStrategy, PlRefPath, SinkDestination, SpecialEq};
3use polars_utils::IdxSize;
4use polars_utils::python_function::PythonObject;
5use pyo3::intern;
6use pyo3::prelude::*;
7
8use crate::PyExpr;
9use crate::prelude::Wrap;
10
/// A Python object describing where a sink should write, kept as an untyped `PyAny`.
///
/// Extracting this wrapper from Python never fails. The object is only interpreted
/// (as a partitioned destination or a single-file target) when
/// [`PyFileSinkDestination::extract_file_sink_destination`] is called.
pub struct PyFileSinkDestination<'py>(Bound<'py, PyAny>);
12
13impl<'a, 'py> FromPyObject<'a, 'py> for PyFileSinkDestination<'py> {
14 type Error = PyErr;
15
16 fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
17 Ok(Self(ob.to_owned()))
18 }
19}
20
21impl PyFileSinkDestination<'_> {
22 pub fn extract_file_sink_destination(&self) -> PyResult<SinkDestination> {
23 let py = self.0.py();
24
25 if let Ok(partition_by_dataclass) = self.0.getattr(intern!(py, "_pl_partition_by")) {
26 return self.extract_from_py_partition_by(partition_by_dataclass);
27 };
28
29 let v: Wrap<polars_plan::dsl::SinkTarget> = self.0.extract()?;
30
31 Ok(SinkDestination::File { target: v.0 })
32 }
33
34 fn extract_from_py_partition_by(
35 &self,
36 partition_by_dataclass: Bound<'_, PyAny>,
37 ) -> PyResult<SinkDestination> {
38 #[derive(FromPyObject)]
40 struct Extract {
41 base_path: Wrap<PlRefPath>,
42 file_path_provider: Option<Py<PyAny>>,
43 key: Option<Vec<PyExpr>>,
44 include_key: Option<bool>,
45 max_rows_per_file: Option<IdxSize>,
46 approximate_bytes_per_file: u64,
47 }
48
49 let Extract {
50 base_path,
51 file_path_provider,
52 key,
53 include_key,
54 max_rows_per_file,
55 approximate_bytes_per_file,
56 } = partition_by_dataclass.extract()?;
57
58 let partition_strategy: PartitionStrategy = if let Some(partition_by) = key {
59 PartitionStrategy::Keyed {
60 keys: partition_by.into_iter().map(|x| x.inner).collect(),
61 include_keys: include_key.unwrap_or(true),
62 keys_pre_grouped: false,
63 }
64 } else {
65 assert!(include_key.is_none());
67
68 PartitionStrategy::FileSize
69 };
70
71 Ok(SinkDestination::Partitioned {
72 base_path: base_path.0,
73 file_path_provider: file_path_provider.map(|x| {
74 FileProviderType::Function(FileProviderFunction::Python(SpecialEq::new(
75 PythonObject(x).into(),
76 )))
77 }),
78 partition_strategy,
79 max_rows_per_file: max_rows_per_file.unwrap_or(IdxSize::MAX),
80 approximate_bytes_per_file,
81 })
82 }
83}