polars_python/lazyframe/
sink.rs1use std::path::{Path, PathBuf};
2use std::sync::{Arc, Mutex};
3
4use polars::prelude::sync_on_close::SyncOnCloseType;
5use polars::prelude::{PartitionVariant, SinkOptions, SpecialEq};
6use polars_utils::IdxSize;
7use polars_utils::python_function::{PythonFunction, PythonObject};
8use pyo3::exceptions::PyValueError;
9use pyo3::pybacked::PyBackedStr;
10use pyo3::types::{PyAnyMethods, PyDict, PyDictMethods};
11use pyo3::{Bound, FromPyObject, PyAny, PyObject, PyResult, Python, pyclass, pymethods};
12
13use crate::expr::PyExpr;
14use crate::prelude::Wrap;
15
16#[derive(Clone)]
17pub enum SinkTarget {
18 File(polars_plan::dsl::SinkTarget),
19 Partition(PyPartitioning),
20}
21
22#[pyclass]
23#[derive(Clone)]
24pub struct PyPartitioning {
25 #[pyo3(get)]
26 pub base_path: PathBuf,
27 pub file_path_cb: Option<PythonFunction>,
28 pub variant: PartitionVariant,
29}
30
// Static constructors exposed to Python, one per `PartitionVariant`.
//
// NOTE: plain `//` comments are used inside this `#[pymethods]` block so the
// docstrings visible from Python stay unchanged.
#[cfg(feature = "pymethods")]
#[pymethods]
impl PyPartitioning {
    // Builds a partitioning that uses `PartitionVariant::MaxSize(max_size)`.
    #[staticmethod]
    #[pyo3(signature = (base_path, file_path_cb, max_size))]
    pub fn new_max_size(
        base_path: PathBuf,
        file_path_cb: Option<PyObject>,
        max_size: IdxSize,
    ) -> PyPartitioning {
        let variant = PartitionVariant::MaxSize(max_size);
        Self {
            base_path,
            // Wrap the Python callable (if any) in the Rust-side holder type.
            file_path_cb: file_path_cb.map(|callback| PythonObject(callback.into_any())),
            variant,
        }
    }

    // Builds a partitioning that uses `PartitionVariant::ByKey`, with the key
    // expressions taken from `by`.
    #[staticmethod]
    #[pyo3(signature = (base_path, file_path_cb, by, include_key))]
    pub fn new_by_key(
        base_path: PathBuf,
        file_path_cb: Option<PyObject>,
        by: Vec<PyExpr>,
        include_key: bool,
    ) -> PyPartitioning {
        // Unwrap each Python expression wrapper into its inner expression.
        let key_exprs = by.into_iter().map(|expr| expr.inner).collect();
        Self {
            base_path,
            file_path_cb: file_path_cb.map(|callback| PythonObject(callback.into_any())),
            variant: PartitionVariant::ByKey {
                key_exprs,
                include_key,
            },
        }
    }

    // Builds a partitioning that uses `PartitionVariant::Parted`, with the key
    // expressions taken from `by`.
    #[staticmethod]
    #[pyo3(signature = (base_path, file_path_cb, by, include_key))]
    pub fn new_parted(
        base_path: PathBuf,
        file_path_cb: Option<PyObject>,
        by: Vec<PyExpr>,
        include_key: bool,
    ) -> PyPartitioning {
        // Unwrap each Python expression wrapper into its inner expression.
        let key_exprs = by.into_iter().map(|expr| expr.inner).collect();
        Self {
            base_path,
            file_path_cb: file_path_cb.map(|callback| PythonObject(callback.into_any())),
            variant: PartitionVariant::Parted {
                key_exprs,
                include_key,
            },
        }
    }
}
87
88impl<'py> FromPyObject<'py> for Wrap<polars_plan::dsl::SinkTarget> {
89 fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
90 if let Ok(v) = ob.extract::<PathBuf>() {
91 Ok(Wrap(polars::prelude::SinkTarget::Path(Arc::new(v))))
92 } else {
93 let writer = Python::with_gil(|py| {
94 let py_f = ob.clone();
95 PyResult::Ok(
96 crate::file::try_get_pyfile(py, py_f, true)?
97 .0
98 .into_writeable(),
99 )
100 })?;
101
102 Ok(Wrap(polars_plan::prelude::SinkTarget::Dyn(SpecialEq::new(
103 Arc::new(Mutex::new(Some(writer))),
104 ))))
105 }
106 }
107}
108
109impl<'py> FromPyObject<'py> for SinkTarget {
110 fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
111 if let Ok(v) = ob.extract::<PyPartitioning>() {
112 Ok(Self::Partition(v))
113 } else {
114 Ok(Self::File(
115 <Wrap<polars_plan::dsl::SinkTarget>>::extract_bound(ob)?.0,
116 ))
117 }
118 }
119}
120
121impl SinkTarget {
122 pub fn base_path(&self) -> Option<&Path> {
123 match self {
124 Self::File(t) => match t {
125 polars::prelude::SinkTarget::Path(p) => Some(p.as_path()),
126 polars::prelude::SinkTarget::Dyn(_) => None,
127 },
128 Self::Partition(p) => Some(&p.base_path),
129 }
130 }
131}
132
133impl<'py> FromPyObject<'py> for Wrap<SyncOnCloseType> {
134 fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
135 let parsed = match &*ob.extract::<PyBackedStr>()? {
136 "none" => SyncOnCloseType::None,
137 "data" => SyncOnCloseType::Data,
138 "all" => SyncOnCloseType::All,
139 v => {
140 return Err(PyValueError::new_err(format!(
141 "`sync_on_close` must be one of {{'none', 'data', 'all'}}, got {v}",
142 )));
143 },
144 };
145 Ok(Wrap(parsed))
146 }
147}
148
149impl<'py> FromPyObject<'py> for Wrap<SinkOptions> {
150 fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
151 let parsed = ob.extract::<pyo3::Bound<'_, PyDict>>()?;
152
153 if parsed.len() != 3 {
154 return Err(PyValueError::new_err(
155 "`sink_options` must be a dictionary with the exactly 3 field.",
156 ));
157 }
158
159 let sync_on_close =
160 PyDictMethods::get_item(&parsed, "sync_on_close")?.ok_or_else(|| {
161 PyValueError::new_err("`sink_options` must contain `sync_on_close` field")
162 })?;
163 let sync_on_close = sync_on_close.extract::<Wrap<SyncOnCloseType>>()?.0;
164
165 let maintain_order =
166 PyDictMethods::get_item(&parsed, "maintain_order")?.ok_or_else(|| {
167 PyValueError::new_err("`sink_options` must contain `maintain_order` field")
168 })?;
169 let maintain_order = maintain_order.extract::<bool>()?;
170
171 let mkdir = PyDictMethods::get_item(&parsed, "mkdir")?
172 .ok_or_else(|| PyValueError::new_err("`sink_options` must contain `mkdir` field"))?;
173 let mkdir = mkdir.extract::<bool>()?;
174
175 Ok(Wrap(SinkOptions {
176 sync_on_close,
177 maintain_order,
178 mkdir,
179 }))
180 }
181}