polars_python/io/
sink_output.rs1use polars::prelude::file_provider::{FileProviderFunction, FileProviderType, IcebergPathProvider};
2use polars::prelude::{PartitionStrategy, PlRefPath, PlSmallStr, SinkDestination, SpecialEq};
3use polars_utils::IdxSize;
4use polars_utils::python_function::PythonObject;
5use pyo3::exceptions::PyValueError;
6use pyo3::intern;
7use pyo3::prelude::*;
8use pyo3::pybacked::PyBackedStr;
9
10use crate::PyExpr;
11use crate::prelude::Wrap;
12
/// Holds a Python object that describes where a sink should write.
///
/// The object is stored as-is; it is only interpreted when
/// `extract_file_sink_destination` is called.
pub struct PyFileSinkDestination<'py>(Bound<'py, PyAny>);
14
15impl<'a, 'py> FromPyObject<'a, 'py> for PyFileSinkDestination<'py> {
16 type Error = PyErr;
17
18 fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
19 Ok(Self(ob.to_owned()))
20 }
21}
22
23impl PyFileSinkDestination<'_> {
24 pub fn extract_file_sink_destination(&self) -> PyResult<SinkDestination> {
25 let py = self.0.py();
26
27 if let Ok(partition_by_dataclass) = self.0.getattr(intern!(py, "_pl_partition_by")) {
28 return self.extract_from_py_partition_by(partition_by_dataclass);
29 };
30
31 let v: Wrap<polars_plan::dsl::SinkTarget> = self.0.extract()?;
32
33 Ok(SinkDestination::File { target: v.0 })
34 }
35
36 fn extract_from_py_partition_by(
37 &self,
38 partition_by_dataclass: Bound<'_, PyAny>,
39 ) -> PyResult<SinkDestination> {
40 #[derive(FromPyObject)]
42 struct Extract {
43 base_path: Wrap<PlRefPath>,
44 file_path_provider: Option<Py<PyAny>>,
45 key: Option<Vec<PyExpr>>,
46 include_key: Option<bool>,
47 max_rows_per_file: Option<IdxSize>,
48 approximate_bytes_per_file: u64,
49 }
50
51 let Extract {
52 base_path,
53 file_path_provider,
54 key,
55 include_key,
56 max_rows_per_file,
57 approximate_bytes_per_file,
58 } = partition_by_dataclass.extract()?;
59
60 let partition_strategy: PartitionStrategy = if let Some(partition_by) = key {
61 PartitionStrategy::Keyed {
62 keys: partition_by.into_iter().map(|x| x.inner).collect(),
63 include_keys: include_key.unwrap_or(true),
64 keys_pre_grouped: false,
65 }
66 } else {
67 assert!(include_key.is_none());
69
70 PartitionStrategy::FileSize
71 };
72
73 let file_path_provider = if let Some(file_path_provider) = file_path_provider {
74 let py = self.0.py();
75
76 Some(
77 match file_path_provider.getattr(py, intern!(py, "pl_path_provider_id")) {
78 Ok(v) => match &*v.extract::<PyBackedStr>(py)? {
79 "iceberg" => {
80 let extension: PyBackedStr = file_path_provider
81 .getattr(py, intern!(py, "extension"))?
82 .extract(py)?;
83
84 FileProviderType::Iceberg(IcebergPathProvider {
85 extension: PlSmallStr::from_str(&extension),
86 file_part_prefix: String::new(),
87 })
88 },
89 id => {
90 return Err(PyValueError::new_err(format!(
91 "unknown pl_path_provider_id: '{id}'"
92 )));
93 },
94 },
95 Err(_) => FileProviderType::Function(FileProviderFunction::Python(
96 SpecialEq::new(PythonObject(file_path_provider).into()),
97 )),
98 },
99 )
100 } else {
101 None
102 };
103
104 Ok(SinkDestination::Partitioned {
105 base_path: base_path.0,
106 file_path_provider,
107 partition_strategy,
108 max_rows_per_file: max_rows_per_file.unwrap_or(IdxSize::MAX),
109 approximate_bytes_per_file,
110 })
111 }
112}