polars_python/lazyframe/
sink.rs1use std::sync::{Arc, Mutex};
2
3use polars::prelude::file_provider::FileProviderReturn;
4use polars::prelude::sync_on_close::SyncOnCloseType;
5use polars::prelude::{PlRefPath, SpecialEq};
6use polars_error::polars_err;
7use pyo3::exceptions::PyValueError;
8use pyo3::prelude::*;
9use pyo3::pybacked::PyBackedStr;
10
11use crate::prelude::Wrap;
12use crate::utils::to_py_err;
13
14impl<'a, 'py> FromPyObject<'a, 'py> for Wrap<polars_plan::dsl::SinkTarget> {
15 type Error = PyErr;
16
17 fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
18 if let Ok(v) = ob.extract::<PyBackedStr>() {
19 Ok(Wrap(polars::prelude::SinkTarget::Path(PlRefPath::new(&*v))))
20 } else {
21 let writer = Python::attach(|py| {
22 let py_f = ob.to_owned();
23 PyResult::Ok(
24 crate::file::try_get_pyfile(py, py_f, true)?
25 .0
26 .into_writeable(),
27 )
28 })?;
29
30 Ok(Wrap(polars_plan::prelude::SinkTarget::Dyn(SpecialEq::new(
31 Arc::new(Mutex::new(Some(writer))),
32 ))))
33 }
34 }
35}
36
37impl<'a, 'py> FromPyObject<'a, 'py> for Wrap<FileProviderReturn> {
38 type Error = PyErr;
39
40 fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
41 if let Ok(v) = ob.extract::<PyBackedStr>() {
42 Ok(Wrap(FileProviderReturn::Path(v.to_string())))
43 } else if let Ok(v) = ob.extract::<std::path::PathBuf>() {
44 Ok(Wrap(FileProviderReturn::Path(
45 v.to_str()
46 .ok_or_else(|| to_py_err(polars_err!(non_utf8_path)))?
47 .to_string(),
48 )))
49 } else {
50 let py = ob.py();
51
52 let writeable = crate::file::try_get_pyfile(py, ob.to_owned(), true)?
53 .0
54 .into_writeable();
55
56 Ok(Wrap(FileProviderReturn::Writeable(writeable)))
57 }
58 }
59}
60
61impl<'a, 'py> FromPyObject<'a, 'py> for Wrap<SyncOnCloseType> {
62 type Error = PyErr;
63
64 fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
65 let parsed = match &*ob.extract::<PyBackedStr>()? {
66 "none" => SyncOnCloseType::None,
67 "data" => SyncOnCloseType::Data,
68 "all" => SyncOnCloseType::All,
69 v => {
70 return Err(PyValueError::new_err(format!(
71 "`sync_on_close` must be one of {{'none', 'data', 'all'}}, got {v}",
72 )));
73 },
74 };
75 Ok(Wrap(parsed))
76 }
77}