polars_python/map/
lazy.rs1use std::mem::{ManuallyDrop, MaybeUninit};
2
3use polars::prelude::*;
4use polars_ffi::version_0::SeriesExport;
5use pyo3::prelude::*;
6use pyo3::types::{PyDict, PyList};
7
8use crate::py_modules::{pl_series, polars, polars_rs};
9use crate::series::PySeries;
10use crate::{PyExpr, Wrap};
11
12pub(crate) trait ToSeries {
13 fn to_series(
14 &self,
15 py: Python<'_>,
16 py_polars_module: &Py<PyModule>,
17 name: &str,
18 ) -> PolarsResult<Series>;
19}
20
21impl ToSeries for PyObject {
22 fn to_series(
23 &self,
24 py: Python<'_>,
25 py_polars_module: &Py<PyModule>,
26 name: &str,
27 ) -> PolarsResult<Series> {
28 let py_pyseries = match self.getattr(py, "_s") {
29 Ok(s) => s,
30 _ => {
32 let res = py_polars_module
33 .getattr(py, "Series")
34 .unwrap()
35 .call1(py, (name, PyList::new(py, [self]).unwrap()));
36
37 match res {
38 Ok(python_s) => python_s.getattr(py, "_s").unwrap(),
39 Err(_) => {
40 polars_bail!(ComputeError:
41 "expected a something that could convert to a `Series` but got: {}",
42 self.bind(py).get_type()
43 )
44 },
45 }
46 },
47 };
48 let s = match py_pyseries.extract::<PySeries>(py) {
49 Ok(pyseries) => pyseries.series,
50 Err(_) => {
53 let mut export: MaybeUninit<SeriesExport> = MaybeUninit::uninit();
54 py_pyseries
55 .call_method1(py, "_export", (&raw mut export as usize,))
56 .unwrap();
57 unsafe {
58 let export = export.assume_init();
59 polars_ffi::version_0::import_series(export)?
60 }
61 },
62 };
63 Ok(s)
64 }
65}
66
67pub(crate) fn call_lambda_with_series(
68 py: Python<'_>,
69 s: &Series,
70 output_dtype: Option<Option<DataType>>,
71 lambda: &PyObject,
72) -> PyResult<PyObject> {
73 let pypolars = polars(py).bind(py);
74
75 let mut dict = None;
77 if let Some(output_dtype) = output_dtype {
78 let d = PyDict::new(py);
79 let output_dtype = match output_dtype {
80 None => None,
81 Some(dt) => Some(Wrap(dt).into_pyobject(py)?),
82 };
83 d.set_item("return_dtype", output_dtype)?;
84 dict = Some(d);
85 }
86
87 let pyseries = PySeries::new(s.clone());
89 let mut python_series_wrapper = pypolars
91 .getattr("wrap_s")
92 .unwrap()
93 .call1((pyseries,))
94 .unwrap();
95
96 if !python_series_wrapper
97 .getattr("_s")
98 .unwrap()
99 .is_instance(polars_rs(py).getattr(py, "PySeries").unwrap().bind(py))
100 .unwrap()
101 {
102 let mut export = ManuallyDrop::new(polars_ffi::version_0::export_series(s));
103 let plseries = pl_series(py).bind(py);
104
105 let s_location = &raw mut export;
106 python_series_wrapper = plseries
107 .getattr("_import")
108 .unwrap()
109 .call1((s_location as usize,))
110 .unwrap()
111 }
112
113 lambda.call(py, (python_series_wrapper,), dict.as_ref())
114}
115
116pub fn map_single(
117 pyexpr: &PyExpr,
118 lambda: PyObject,
119 output_type: Option<DataTypeExpr>,
120 is_elementwise: bool,
121 returns_scalar: bool,
122) -> PyExpr {
123 let func =
124 python_dsl::PythonUdfExpression::new(lambda, output_type, is_elementwise, returns_scalar);
125 pyexpr.inner.clone().map_python(func).into()
126}
127
128pub(crate) fn call_lambda_with_columns_slice(
129 py: Python<'_>,
130 s: &[Column],
131 lambda: &PyObject,
132 pypolars: &Py<PyModule>,
133) -> PyObject {
134 let pypolars = pypolars.bind(py);
135
136 let iter = s.iter().map(|s| {
138 let ps = PySeries::new(s.as_materialized_series().clone());
139
140 pypolars.getattr("wrap_s").unwrap().call1((ps,)).unwrap()
142 });
143 let wrapped_s = PyList::new(py, iter).unwrap();
144
145 match lambda.call1(py, (wrapped_s,)) {
147 Ok(pyobj) => pyobj,
148 Err(e) => panic!("python function failed: {}", e.value(py)),
149 }
150}
151
152pub fn map_mul(
153 pyexpr: &[PyExpr],
154 py: Python<'_>,
155 lambda: PyObject,
156 output_type: Option<Wrap<DataType>>,
157 map_groups: bool,
158 returns_scalar: bool,
159) -> PyExpr {
160 let pypolars = polars(py).clone_ref(py);
163
164 let function = move |s: &mut [Column]| {
165 Python::with_gil(|py| {
166 let out = call_lambda_with_columns_slice(py, s, &lambda, &pypolars);
168
169 if map_groups && out.is_none(py) {
171 return Ok(None);
172 }
173
174 Ok(Some(out.to_series(py, &pypolars, "")?.into_column()))
175 })
176 };
177
178 let exprs = pyexpr.iter().map(|pe| pe.clone().inner).collect::<Vec<_>>();
179
180 let output_map = GetOutput::map_field(move |fld| {
181 Ok(match output_type {
182 Some(ref dt) => Field::new(fld.name().clone(), dt.0.clone()),
183 None => fld.clone(),
184 })
185 });
186 if map_groups {
187 polars::lazy::dsl::apply_multiple(function, exprs, output_map, returns_scalar).into()
188 } else {
189 polars::lazy::dsl::map_multiple(function, exprs, output_map).into()
190 }
191}