1use polars::prelude::*;
2use pyo3::ffi::Py_uintptr_t;
3use pyo3::prelude::*;
4use pyo3::types::{PyDict, PyList};
5
6use crate::py_modules::polars;
7use crate::series::PySeries;
8use crate::{PyExpr, Wrap};
9
10pub(crate) trait ToSeries {
11 fn to_series(
12 &self,
13 py: Python,
14 py_polars_module: &Py<PyModule>,
15 name: &str,
16 ) -> PolarsResult<Series>;
17}
18
19impl ToSeries for PyObject {
20 fn to_series(
21 &self,
22 py: Python,
23 py_polars_module: &Py<PyModule>,
24 name: &str,
25 ) -> PolarsResult<Series> {
26 let py_pyseries = match self.getattr(py, "_s") {
27 Ok(s) => s,
28 _ => {
30 let res = py_polars_module
31 .getattr(py, "Series")
32 .unwrap()
33 .call1(py, (name, PyList::new(py, [self]).unwrap()));
34
35 match res {
36 Ok(python_s) => python_s.getattr(py, "_s").unwrap(),
37 Err(_) => {
38 polars_bail!(ComputeError:
39 "expected a something that could convert to a `Series` but got: {}",
40 self.bind(py).get_type()
41 )
42 },
43 }
44 },
45 };
46 let s = match py_pyseries.extract::<PySeries>(py) {
47 Ok(pyseries) => pyseries.series,
48 Err(_) => {
54 use arrow::ffi;
55 let kwargs = PyDict::new(py);
56 kwargs.set_item("in_place", true).unwrap();
57 py_pyseries
58 .call_method(py, "rechunk", (), Some(&kwargs))
59 .map_err(|e| polars_err!(ComputeError: "could not rechunk: {e}"))?;
60
61 let array = Box::new(ffi::ArrowArray::empty());
63 let schema = Box::new(ffi::ArrowSchema::empty());
64
65 let array_ptr = &*array as *const ffi::ArrowArray;
66 let schema_ptr = &*schema as *const ffi::ArrowSchema;
67 py_pyseries
70 .call_method1(
71 py,
72 "_export_arrow_to_c",
73 (array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t),
74 )
75 .map_err(|e| polars_err!(ComputeError: "{e}"))?;
76
77 unsafe {
78 let field = ffi::import_field_from_c(schema.as_ref())?;
79 let array = ffi::import_array_from_c(*array, field.dtype)?;
80 Series::from_arrow(field.name, array)?
81 }
82 },
83 };
84 Ok(s)
85 }
86}
87
88pub(crate) fn call_lambda_with_series(
89 py: Python,
90 s: Series,
91 lambda: &PyObject,
92) -> PyResult<PyObject> {
93 let pypolars = polars(py).bind(py);
94
95 let pyseries = PySeries::new(s);
97 let python_series_wrapper = pypolars
99 .getattr("wrap_s")
100 .unwrap()
101 .call1((pyseries,))
102 .unwrap();
103 lambda.call1(py, (python_series_wrapper,))
105}
106
107pub(crate) fn binary_lambda(
109 lambda: &PyObject,
110 a: Series,
111 b: Series,
112) -> PolarsResult<Option<Series>> {
113 Python::with_gil(|py| {
114 let pypolars = polars(py).bind(py);
116 let pyseries_a = PySeries::new(a);
118 let pyseries_b = PySeries::new(b);
119
120 let python_series_wrapper_a = pypolars
122 .getattr("wrap_s")
123 .unwrap()
124 .call1((pyseries_a,))
125 .unwrap();
126 let python_series_wrapper_b = pypolars
127 .getattr("wrap_s")
128 .unwrap()
129 .call1((pyseries_b,))
130 .unwrap();
131
132 let result_series_wrapper =
134 match lambda.call1(py, (python_series_wrapper_a, python_series_wrapper_b)) {
135 Ok(pyobj) => pyobj,
136 Err(e) => polars_bail!(
137 ComputeError: "custom python function failed: {}", e.value(py),
138 ),
139 };
140 let pyseries = if let Ok(expr) = result_series_wrapper.getattr(py, "_pyexpr") {
141 let pyexpr = expr.extract::<PyExpr>(py).unwrap();
142 let expr = pyexpr.inner;
143 let df = DataFrame::empty();
144 let out = df
145 .lazy()
146 .select([expr])
147 .with_predicate_pushdown(false)
148 .with_projection_pushdown(false)
149 .collect()?;
150
151 let s = out.select_at_idx(0).unwrap().clone();
152 PySeries::new(s.take_materialized_series())
153 } else {
154 return Some(result_series_wrapper.to_series(py, pypolars.as_unbound(), ""))
155 .transpose();
156 };
157
158 Ok(Some(pyseries.series))
160 })
161}
162
163pub fn map_single(
164 pyexpr: &PyExpr,
165 lambda: PyObject,
166 output_type: Option<Wrap<DataType>>,
167 agg_list: bool,
168 is_elementwise: bool,
169 returns_scalar: bool,
170) -> PyExpr {
171 let output_type = output_type.map(|wrap| wrap.0);
172
173 let func =
174 python_udf::PythonUdfExpression::new(lambda, output_type, is_elementwise, returns_scalar);
175 pyexpr.inner.clone().map_python(func, agg_list).into()
176}
177
178pub(crate) fn call_lambda_with_columns_slice(
179 py: Python,
180 s: &[Column],
181 lambda: &PyObject,
182 pypolars: &Py<PyModule>,
183) -> PyObject {
184 let pypolars = pypolars.bind(py);
185
186 let iter = s.iter().map(|s| {
188 let ps = PySeries::new(s.as_materialized_series().clone());
189
190 let python_series_wrapper = pypolars.getattr("wrap_s").unwrap().call1((ps,)).unwrap();
192
193 python_series_wrapper
194 });
195 let wrapped_s = PyList::new(py, iter).unwrap();
196
197 match lambda.call1(py, (wrapped_s,)) {
199 Ok(pyobj) => pyobj,
200 Err(e) => panic!("python function failed: {}", e.value(py)),
201 }
202}
203
204pub fn map_mul(
205 pyexpr: &[PyExpr],
206 py: Python,
207 lambda: PyObject,
208 output_type: Option<Wrap<DataType>>,
209 map_groups: bool,
210 returns_scalar: bool,
211) -> PyExpr {
212 let pypolars = polars(py).clone_ref(py);
215
216 let function = move |s: &mut [Column]| {
217 Python::with_gil(|py| {
218 let out = call_lambda_with_columns_slice(py, s, &lambda, &pypolars);
220
221 if map_groups && out.is_none(py) {
223 return Ok(None);
224 }
225
226 Ok(Some(out.to_series(py, &pypolars, "")?.into_column()))
227 })
228 };
229
230 let exprs = pyexpr.iter().map(|pe| pe.clone().inner).collect::<Vec<_>>();
231
232 let output_map = GetOutput::map_field(move |fld| {
233 Ok(match output_type {
234 Some(ref dt) => Field::new(fld.name().clone(), dt.0.clone()),
235 None => fld.clone(),
236 })
237 });
238 if map_groups {
239 polars::lazy::dsl::apply_multiple(function, exprs, output_map, returns_scalar).into()
240 } else {
241 polars::lazy::dsl::map_multiple(function, exprs, output_map).into()
242 }
243}