Skip to main content

polars_python/io/
sink_options.rs

1use std::sync::Arc;
2
3use polars::prelude::sink::SinkedPathsCallback;
4use polars::prelude::sync_on_close::SyncOnCloseType;
5use polars::prelude::{CloudScheme, PlanCallback, SpecialEq, UnifiedSinkArgs};
6use polars_utils::python_function::PythonObject;
7use pyo3::prelude::*;
8
9use crate::io::cloud_options::OptPyCloudOptions;
10use crate::prelude::Wrap;
11
12/// Interface to `class SinkOptions` on the Python side
13pub struct PySinkOptions<'py>(Bound<'py, PyAny>);
14
15impl<'a, 'py> FromPyObject<'a, 'py> for PySinkOptions<'py> {
16    type Error = PyErr;
17
18    fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
19        Ok(Self(ob.to_owned()))
20    }
21}
22
23impl PySinkOptions<'_> {
24    pub fn extract_unified_sink_args(
25        &self,
26        cloud_scheme: Option<CloudScheme>,
27    ) -> PyResult<UnifiedSinkArgs> {
28        #[derive(FromPyObject)]
29        struct Extract<'a> {
30            mkdir: bool,
31            maintain_order: bool,
32            sync_on_close: Option<Wrap<SyncOnCloseType>>,
33            storage_options: OptPyCloudOptions<'a>,
34            credential_provider: Option<Py<PyAny>>,
35            sinked_paths_callback: Option<Py<PyAny>>,
36        }
37
38        let Extract {
39            mkdir,
40            maintain_order,
41            sync_on_close,
42            storage_options,
43            credential_provider,
44            sinked_paths_callback,
45        } = self.0.extract()?;
46
47        let cloud_options =
48            storage_options.extract_opt_cloud_options(cloud_scheme, credential_provider)?;
49
50        let sync_on_close = sync_on_close.map_or(SyncOnCloseType::default(), |x| x.0);
51
52        let unified_sink_args = UnifiedSinkArgs {
53            mkdir,
54            maintain_order,
55            sync_on_close,
56            cloud_options: cloud_options.map(Arc::new),
57            sinked_paths_callback: sinked_paths_callback.map(|x| {
58                SinkedPathsCallback::Callback(PlanCallback::Python(SpecialEq::new(Arc::new(
59                    PythonObject(x),
60                ))))
61            }),
62        };
63
64        Ok(unified_sink_args)
65    }
66}