Skip to main content

polars_python/lazyframe/
sink.rs

1use std::sync::{Arc, Mutex};
2
3use polars::prelude::file_provider::FileProviderReturn;
4use polars::prelude::sync_on_close::SyncOnCloseType;
5use polars::prelude::{PlRefPath, SpecialEq};
6use polars_error::polars_err;
7use pyo3::exceptions::PyValueError;
8use pyo3::prelude::*;
9use pyo3::pybacked::PyBackedStr;
10
11use crate::prelude::Wrap;
12use crate::utils::to_py_err;
13
14impl<'a, 'py> FromPyObject<'a, 'py> for Wrap<polars_plan::dsl::SinkTarget> {
15    type Error = PyErr;
16
17    fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
18        if let Ok(v) = ob.extract::<PyBackedStr>() {
19            Ok(Wrap(polars::prelude::SinkTarget::Path(PlRefPath::new(&*v))))
20        } else {
21            let writer = Python::attach(|py| {
22                let py_f = ob.to_owned();
23                PyResult::Ok(
24                    crate::file::try_get_pyfile(py, py_f, true)?
25                        .0
26                        .into_writeable(),
27                )
28            })?;
29
30            Ok(Wrap(polars_plan::prelude::SinkTarget::Dyn(SpecialEq::new(
31                Arc::new(Mutex::new(Some(writer))),
32            ))))
33        }
34    }
35}
36
37impl<'a, 'py> FromPyObject<'a, 'py> for Wrap<FileProviderReturn> {
38    type Error = PyErr;
39
40    fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
41        if let Ok(v) = ob.extract::<PyBackedStr>() {
42            Ok(Wrap(FileProviderReturn::Path(v.to_string())))
43        } else if let Ok(v) = ob.extract::<std::path::PathBuf>() {
44            Ok(Wrap(FileProviderReturn::Path(
45                v.to_str()
46                    .ok_or_else(|| to_py_err(polars_err!(non_utf8_path)))?
47                    .to_string(),
48            )))
49        } else {
50            let py = ob.py();
51
52            let writeable = crate::file::try_get_pyfile(py, ob.to_owned(), true)?
53                .0
54                .into_writeable();
55
56            Ok(Wrap(FileProviderReturn::Writeable(writeable)))
57        }
58    }
59}
60
61impl<'a, 'py> FromPyObject<'a, 'py> for Wrap<SyncOnCloseType> {
62    type Error = PyErr;
63
64    fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
65        let parsed = match &*ob.extract::<PyBackedStr>()? {
66            "none" => SyncOnCloseType::None,
67            "data" => SyncOnCloseType::Data,
68            "all" => SyncOnCloseType::All,
69            v => {
70                return Err(PyValueError::new_err(format!(
71                    "`sync_on_close` must be one of {{'none', 'data', 'all'}}, got {v}",
72                )));
73            },
74        };
75        Ok(Wrap(parsed))
76    }
77}