polars_python/io/
sink_options.rs1use std::sync::Arc;
2
3use polars::prelude::sink::SinkedPathsCallback;
4use polars::prelude::sync_on_close::SyncOnCloseType;
5use polars::prelude::{CloudScheme, PlanCallback, SpecialEq, UnifiedSinkArgs};
6use polars_utils::python_function::PythonObject;
7use pyo3::prelude::*;
8
9use crate::io::cloud_options::OptPyCloudOptions;
10use crate::prelude::Wrap;
11
12pub struct PySinkOptions<'py>(Bound<'py, PyAny>);
14
15impl<'a, 'py> FromPyObject<'a, 'py> for PySinkOptions<'py> {
16 type Error = PyErr;
17
18 fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
19 Ok(Self(ob.to_owned()))
20 }
21}
22
23impl PySinkOptions<'_> {
24 pub fn extract_unified_sink_args(
25 &self,
26 cloud_scheme: Option<CloudScheme>,
27 ) -> PyResult<UnifiedSinkArgs> {
28 #[derive(FromPyObject)]
29 struct Extract<'a> {
30 mkdir: bool,
31 maintain_order: bool,
32 sync_on_close: Option<Wrap<SyncOnCloseType>>,
33 storage_options: OptPyCloudOptions<'a>,
34 credential_provider: Option<Py<PyAny>>,
35 sinked_paths_callback: Option<Py<PyAny>>,
36 }
37
38 let Extract {
39 mkdir,
40 maintain_order,
41 sync_on_close,
42 storage_options,
43 credential_provider,
44 sinked_paths_callback,
45 } = self.0.extract()?;
46
47 let cloud_options =
48 storage_options.extract_opt_cloud_options(cloud_scheme, credential_provider)?;
49
50 let sync_on_close = sync_on_close.map_or(SyncOnCloseType::default(), |x| x.0);
51
52 let unified_sink_args = UnifiedSinkArgs {
53 mkdir,
54 maintain_order,
55 sync_on_close,
56 cloud_options: cloud_options.map(Arc::new),
57 sinked_paths_callback: sinked_paths_callback.map(|x| {
58 SinkedPathsCallback::Callback(PlanCallback::Python(SpecialEq::new(Arc::new(
59 PythonObject(x),
60 ))))
61 }),
62 };
63
64 Ok(unified_sink_args)
65 }
66}