polars_python/io/
sink_options.rs1use std::sync::Arc;
2
3use polars::prelude::sync_on_close::SyncOnCloseType;
4use polars::prelude::{CloudScheme, UnifiedSinkArgs};
5use pyo3::prelude::*;
6
7use crate::io::cloud_options::OptPyCloudOptions;
8use crate::prelude::Wrap;
9
10pub struct PySinkOptions<'py>(Bound<'py, PyAny>);
12
13impl<'a, 'py> FromPyObject<'a, 'py> for PySinkOptions<'py> {
14 type Error = PyErr;
15
16 fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
17 Ok(Self(ob.to_owned()))
18 }
19}
20
21impl PySinkOptions<'_> {
22 pub fn extract_unified_sink_args(
23 &self,
24 cloud_scheme: Option<CloudScheme>,
25 ) -> PyResult<UnifiedSinkArgs> {
26 #[derive(FromPyObject)]
27 struct Extract<'a> {
28 mkdir: bool,
29 maintain_order: bool,
30 sync_on_close: Option<Wrap<SyncOnCloseType>>,
31 storage_options: OptPyCloudOptions<'a>,
32 credential_provider: Option<Py<PyAny>>,
33 }
34
35 let Extract {
36 mkdir,
37 maintain_order,
38 sync_on_close,
39 storage_options,
40 credential_provider,
41 } = self.0.extract()?;
42
43 let cloud_options =
44 storage_options.extract_opt_cloud_options(cloud_scheme, credential_provider)?;
45
46 let sync_on_close = sync_on_close.map_or(SyncOnCloseType::default(), |x| x.0);
47
48 let unified_sink_args = UnifiedSinkArgs {
49 mkdir,
50 maintain_order,
51 sync_on_close,
52 cloud_options: cloud_options.map(Arc::new),
53 };
54
55 Ok(unified_sink_args)
56 }
57}