1use std::mem::{ManuallyDrop, MaybeUninit};
2
3use polars::prelude::*;
4use polars_ffi::version_0::SeriesExport;
5use pyo3::prelude::*;
6use pyo3::types::{PyDict, PyList};
7
8use crate::py_modules::{pl_series, polars, polars_rs};
9use crate::series::PySeries;
10use crate::{PyExpr, Wrap};
11
12pub(crate) trait ToSeries {
13 fn to_series(
14 &self,
15 py: Python<'_>,
16 py_polars_module: &Py<PyModule>,
17 name: &str,
18 ) -> PolarsResult<Series>;
19}
20
21impl ToSeries for PyObject {
22 fn to_series(
23 &self,
24 py: Python<'_>,
25 py_polars_module: &Py<PyModule>,
26 name: &str,
27 ) -> PolarsResult<Series> {
28 let py_pyseries = match self.getattr(py, "_s") {
29 Ok(s) => s,
30 _ => {
32 let res = py_polars_module
33 .getattr(py, "Series")
34 .unwrap()
35 .call1(py, (name, PyList::new(py, [self]).unwrap()));
36
37 match res {
38 Ok(python_s) => python_s.getattr(py, "_s").unwrap(),
39 Err(_) => {
40 polars_bail!(ComputeError:
41 "expected a something that could convert to a `Series` but got: {}",
42 self.bind(py).get_type()
43 )
44 },
45 }
46 },
47 };
48 let s = match py_pyseries.extract::<PySeries>(py) {
49 Ok(pyseries) => pyseries.series,
50 Err(_) => {
53 let mut export: MaybeUninit<SeriesExport> = MaybeUninit::uninit();
54 py_pyseries
55 .call_method1(py, "_export", (&raw mut export as usize,))
56 .unwrap();
57 unsafe {
58 let export = export.assume_init();
59 polars_ffi::version_0::import_series(export)?
60 }
61 },
62 };
63 Ok(s)
64 }
65}
66
67pub(crate) fn call_lambda_with_series(
68 py: Python<'_>,
69 s: &Series,
70 output_dtype: Option<Option<DataType>>,
71 lambda: &PyObject,
72) -> PyResult<PyObject> {
73 let pypolars = polars(py).bind(py);
74
75 let pyseries = PySeries::new(s.clone());
77 let mut python_series_wrapper = pypolars
79 .getattr("wrap_s")
80 .unwrap()
81 .call1((pyseries,))
82 .unwrap();
83
84 if !python_series_wrapper
85 .getattr("_s")
86 .unwrap()
87 .is_instance(polars_rs(py).getattr(py, "PySeries").unwrap().bind(py))
88 .unwrap()
89 {
90 let mut export = ManuallyDrop::new(polars_ffi::version_0::export_series(s));
91 let plseries = pl_series(py).bind(py);
92
93 let s_location = &raw mut export;
94 python_series_wrapper = plseries
95 .getattr("_import")
96 .unwrap()
97 .call1((s_location as usize,))
98 .unwrap()
99 }
100
101 let mut dict = None;
102 if let Some(output_dtype) = output_dtype {
103 let d = PyDict::new(py);
104 let output_dtype = match output_dtype {
105 None => None,
106 Some(dt) => Some(Wrap(dt).into_pyobject(py)?),
107 };
108 d.set_item("return_dtype", output_dtype)?;
109 dict = Some(d);
110 }
111
112 lambda.call(py, (python_series_wrapper,), dict.as_ref())
113}
114
115pub(crate) fn binary_lambda(
117 lambda: &PyObject,
118 a: Series,
119 b: Series,
120) -> PolarsResult<Option<Series>> {
121 Python::with_gil(|py| {
122 let pypolars = polars(py).bind(py);
124 let pyseries_a = PySeries::new(a);
126 let pyseries_b = PySeries::new(b);
127
128 let python_series_wrapper_a = pypolars
130 .getattr("wrap_s")
131 .unwrap()
132 .call1((pyseries_a,))
133 .unwrap();
134 let python_series_wrapper_b = pypolars
135 .getattr("wrap_s")
136 .unwrap()
137 .call1((pyseries_b,))
138 .unwrap();
139
140 let result_series_wrapper =
142 match lambda.call1(py, (python_series_wrapper_a, python_series_wrapper_b)) {
143 Ok(pyobj) => pyobj,
144 Err(e) => polars_bail!(
145 ComputeError: "custom python function failed: {}", e.value(py),
146 ),
147 };
148 let pyseries = if let Ok(expr) = result_series_wrapper.getattr(py, "_pyexpr") {
149 let pyexpr = expr.extract::<PyExpr>(py).unwrap();
150 let expr = pyexpr.inner;
151 let df = DataFrame::empty();
152 let out = df
153 .lazy()
154 .select([expr])
155 .with_predicate_pushdown(false)
156 .with_projection_pushdown(false)
157 .collect()?;
158
159 let s = out.select_at_idx(0).unwrap().clone();
160 PySeries::new(s.take_materialized_series())
161 } else {
162 return Some(result_series_wrapper.to_series(py, pypolars.as_unbound(), ""))
163 .transpose();
164 };
165
166 Ok(Some(pyseries.series))
168 })
169}
170
171pub fn map_single(
172 pyexpr: &PyExpr,
173 lambda: PyObject,
174 output_type: Option<DataTypeExpr>,
175 agg_list: bool,
176 is_elementwise: bool,
177 returns_scalar: bool,
178) -> PyExpr {
179 let func =
180 python_dsl::PythonUdfExpression::new(lambda, output_type, is_elementwise, returns_scalar);
181 pyexpr.inner.clone().map_python(func, agg_list).into()
182}
183
184pub(crate) fn call_lambda_with_columns_slice(
185 py: Python<'_>,
186 s: &[Column],
187 lambda: &PyObject,
188 pypolars: &Py<PyModule>,
189) -> PyObject {
190 let pypolars = pypolars.bind(py);
191
192 let iter = s.iter().map(|s| {
194 let ps = PySeries::new(s.as_materialized_series().clone());
195
196 pypolars.getattr("wrap_s").unwrap().call1((ps,)).unwrap()
198 });
199 let wrapped_s = PyList::new(py, iter).unwrap();
200
201 match lambda.call1(py, (wrapped_s,)) {
203 Ok(pyobj) => pyobj,
204 Err(e) => panic!("python function failed: {}", e.value(py)),
205 }
206}
207
208pub fn map_mul(
209 pyexpr: &[PyExpr],
210 py: Python<'_>,
211 lambda: PyObject,
212 output_type: Option<Wrap<DataType>>,
213 map_groups: bool,
214 returns_scalar: bool,
215) -> PyExpr {
216 let pypolars = polars(py).clone_ref(py);
219
220 let function = move |s: &mut [Column]| {
221 Python::with_gil(|py| {
222 let out = call_lambda_with_columns_slice(py, s, &lambda, &pypolars);
224
225 if map_groups && out.is_none(py) {
227 return Ok(None);
228 }
229
230 Ok(Some(out.to_series(py, &pypolars, "")?.into_column()))
231 })
232 };
233
234 let exprs = pyexpr.iter().map(|pe| pe.clone().inner).collect::<Vec<_>>();
235
236 let output_map = GetOutput::map_field(move |fld| {
237 Ok(match output_type {
238 Some(ref dt) => Field::new(fld.name().clone(), dt.0.clone()),
239 None => fld.clone(),
240 })
241 });
242 if map_groups {
243 polars::lazy::dsl::apply_multiple(function, exprs, output_map, returns_scalar).into()
244 } else {
245 polars::lazy::dsl::map_multiple(function, exprs, output_map).into()
246 }
247}