Skip to main content

polars_python/io/
sink_options.rs

1use std::sync::Arc;
2
3use polars::prelude::sync_on_close::SyncOnCloseType;
4use polars::prelude::{CloudScheme, UnifiedSinkArgs};
5use pyo3::prelude::*;
6
7use crate::io::cloud_options::OptPyCloudOptions;
8use crate::prelude::Wrap;
9
10/// Interface to `class SinkOptions` on the Python side
11pub struct PySinkOptions<'py>(Bound<'py, PyAny>);
12
13impl<'a, 'py> FromPyObject<'a, 'py> for PySinkOptions<'py> {
14    type Error = PyErr;
15
16    fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
17        Ok(Self(ob.to_owned()))
18    }
19}
20
21impl PySinkOptions<'_> {
22    pub fn extract_unified_sink_args(
23        &self,
24        cloud_scheme: Option<CloudScheme>,
25    ) -> PyResult<UnifiedSinkArgs> {
26        #[derive(FromPyObject)]
27        struct Extract<'a> {
28            mkdir: bool,
29            maintain_order: bool,
30            sync_on_close: Option<Wrap<SyncOnCloseType>>,
31            storage_options: OptPyCloudOptions<'a>,
32            credential_provider: Option<Py<PyAny>>,
33        }
34
35        let Extract {
36            mkdir,
37            maintain_order,
38            sync_on_close,
39            storage_options,
40            credential_provider,
41        } = self.0.extract()?;
42
43        let cloud_options =
44            storage_options.extract_opt_cloud_options(cloud_scheme, credential_provider)?;
45
46        let sync_on_close = sync_on_close.map_or(SyncOnCloseType::default(), |x| x.0);
47
48        let unified_sink_args = UnifiedSinkArgs {
49            mkdir,
50            maintain_order,
51            sync_on_close,
52            cloud_options: cloud_options.map(Arc::new),
53        };
54
55        Ok(unified_sink_args)
56    }
57}