polars_python/lazyframe/sink.rs

1use std::path::{Path, PathBuf};
2use std::sync::{Arc, Mutex};
3
4use polars::prelude::sync_on_close::SyncOnCloseType;
5use polars::prelude::{PartitionVariant, SinkOptions, SpecialEq};
6use polars_utils::IdxSize;
7use polars_utils::python_function::{PythonFunction, PythonObject};
8use pyo3::exceptions::PyValueError;
9use pyo3::pybacked::PyBackedStr;
10use pyo3::types::{PyAnyMethods, PyDict, PyDictMethods};
11use pyo3::{Bound, FromPyObject, PyAny, PyObject, PyResult, Python, pyclass, pymethods};
12
13use crate::expr::PyExpr;
14use crate::prelude::Wrap;
15
/// Destination of a sink operation as extracted from a Python argument.
#[derive(Clone)]
pub enum SinkTarget {
    /// A single output: either a filesystem path or a dynamic writer built from a
    /// Python file object (see the `FromPyObject` impl for `Wrap<polars_plan::dsl::SinkTarget>`).
    File(polars_plan::dsl::SinkTarget),
    /// A partitioned output described by a `PyPartitioning` object.
    Partition(PyPartitioning),
}
21
/// Python-visible description of a partitioned sink.
///
/// Instances are created through the `new_*` static constructors; only `base_path`
/// is exposed as a readable attribute on the Python side.
#[pyclass]
#[derive(Clone)]
pub struct PyPartitioning {
    /// Root path of the partitioned output (also what `SinkTarget::base_path` reports).
    #[pyo3(get)]
    pub base_path: PathBuf,
    /// Optional Python callable, presumably used to customize per-partition file
    /// paths — NOTE(review): semantics are defined by the consumer, confirm there.
    pub file_path_cb: Option<PythonFunction>,
    /// Partitioning strategy: `MaxSize`, `ByKey` or `Parted`.
    pub variant: PartitionVariant,
}
30
#[cfg(feature = "pymethods")]
impl PyPartitioning {
    /// Assemble a `PyPartitioning`, wrapping the optional Python callback as a
    /// `PythonObject` so that every constructor below shares the same conversion.
    fn from_parts(
        base_path: PathBuf,
        file_path_cb: Option<PyObject>,
        variant: PartitionVariant,
    ) -> Self {
        Self {
            base_path,
            file_path_cb: file_path_cb.map(|callback| PythonObject(callback.into_any())),
            variant,
        }
    }
}

#[cfg(feature = "pymethods")]
#[pymethods]
impl PyPartitioning {
    /// Build a partitioning using the `PartitionVariant::MaxSize` strategy.
    #[staticmethod]
    #[pyo3(signature = (base_path, file_path_cb, max_size))]
    pub fn new_max_size(
        base_path: PathBuf,
        file_path_cb: Option<PyObject>,
        max_size: IdxSize,
    ) -> PyPartitioning {
        Self::from_parts(base_path, file_path_cb, PartitionVariant::MaxSize(max_size))
    }

    /// Build a partitioning using the `PartitionVariant::ByKey` strategy over the
    /// expressions in `by`.
    #[staticmethod]
    #[pyo3(signature = (base_path, file_path_cb, by, include_key))]
    pub fn new_by_key(
        base_path: PathBuf,
        file_path_cb: Option<PyObject>,
        by: Vec<PyExpr>,
        include_key: bool,
    ) -> PyPartitioning {
        let variant = PartitionVariant::ByKey {
            key_exprs: by.into_iter().map(|expr| expr.inner).collect(),
            include_key,
        };
        Self::from_parts(base_path, file_path_cb, variant)
    }

    /// Build a partitioning using the `PartitionVariant::Parted` strategy over the
    /// expressions in `by`.
    #[staticmethod]
    #[pyo3(signature = (base_path, file_path_cb, by, include_key))]
    pub fn new_parted(
        base_path: PathBuf,
        file_path_cb: Option<PyObject>,
        by: Vec<PyExpr>,
        include_key: bool,
    ) -> PyPartitioning {
        let variant = PartitionVariant::Parted {
            key_exprs: by.into_iter().map(|expr| expr.inner).collect(),
            include_key,
        };
        Self::from_parts(base_path, file_path_cb, variant)
    }
}
87
88impl<'py> FromPyObject<'py> for Wrap<polars_plan::dsl::SinkTarget> {
89    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
90        if let Ok(v) = ob.extract::<PathBuf>() {
91            Ok(Wrap(polars::prelude::SinkTarget::Path(Arc::new(v))))
92        } else {
93            let writer = Python::with_gil(|py| {
94                let py_f = ob.clone();
95                PyResult::Ok(
96                    crate::file::try_get_pyfile(py, py_f, true)?
97                        .0
98                        .into_writeable(),
99                )
100            })?;
101
102            Ok(Wrap(polars_plan::prelude::SinkTarget::Dyn(SpecialEq::new(
103                Arc::new(Mutex::new(Some(writer))),
104            ))))
105        }
106    }
107}
108
109impl<'py> FromPyObject<'py> for SinkTarget {
110    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
111        if let Ok(v) = ob.extract::<PyPartitioning>() {
112            Ok(Self::Partition(v))
113        } else {
114            Ok(Self::File(
115                <Wrap<polars_plan::dsl::SinkTarget>>::extract_bound(ob)?.0,
116            ))
117        }
118    }
119}
120
121impl SinkTarget {
122    pub fn base_path(&self) -> Option<&Path> {
123        match self {
124            Self::File(t) => match t {
125                polars::prelude::SinkTarget::Path(p) => Some(p.as_path()),
126                polars::prelude::SinkTarget::Dyn(_) => None,
127            },
128            Self::Partition(p) => Some(&p.base_path),
129        }
130    }
131}
132
133impl<'py> FromPyObject<'py> for Wrap<SyncOnCloseType> {
134    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
135        let parsed = match &*ob.extract::<PyBackedStr>()? {
136            "none" => SyncOnCloseType::None,
137            "data" => SyncOnCloseType::Data,
138            "all" => SyncOnCloseType::All,
139            v => {
140                return Err(PyValueError::new_err(format!(
141                    "`sync_on_close` must be one of {{'none', 'data', 'all'}}, got {v}",
142                )));
143            },
144        };
145        Ok(Wrap(parsed))
146    }
147}
148
149impl<'py> FromPyObject<'py> for Wrap<SinkOptions> {
150    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
151        let parsed = ob.extract::<pyo3::Bound<'_, PyDict>>()?;
152
153        if parsed.len() != 3 {
154            return Err(PyValueError::new_err(
155                "`sink_options` must be a dictionary with the exactly 3 field.",
156            ));
157        }
158
159        let sync_on_close =
160            PyDictMethods::get_item(&parsed, "sync_on_close")?.ok_or_else(|| {
161                PyValueError::new_err("`sink_options` must contain `sync_on_close` field")
162            })?;
163        let sync_on_close = sync_on_close.extract::<Wrap<SyncOnCloseType>>()?.0;
164
165        let maintain_order =
166            PyDictMethods::get_item(&parsed, "maintain_order")?.ok_or_else(|| {
167                PyValueError::new_err("`sink_options` must contain `maintain_order` field")
168            })?;
169        let maintain_order = maintain_order.extract::<bool>()?;
170
171        let mkdir = PyDictMethods::get_item(&parsed, "mkdir")?
172            .ok_or_else(|| PyValueError::new_err("`sink_options` must contain `mkdir` field"))?;
173        let mkdir = mkdir.extract::<bool>()?;
174
175        Ok(Wrap(SinkOptions {
176            sync_on_close,
177            maintain_order,
178            mkdir,
179        }))
180    }
181}