polars_python/map/
lazy.rs

1use std::mem::{ManuallyDrop, MaybeUninit};
2
3use polars::prelude::*;
4use polars_ffi::version_0::SeriesExport;
5use pyo3::prelude::*;
6use pyo3::types::{PyDict, PyList};
7
8use crate::py_modules::{pl_series, polars, polars_rs};
9use crate::series::PySeries;
10use crate::{PyExpr, Wrap};
11
12pub(crate) trait ToSeries {
13    fn to_series(
14        &self,
15        py: Python<'_>,
16        py_polars_module: &Py<PyModule>,
17        name: &str,
18    ) -> PolarsResult<Series>;
19}
20
21impl ToSeries for PyObject {
22    fn to_series(
23        &self,
24        py: Python<'_>,
25        py_polars_module: &Py<PyModule>,
26        name: &str,
27    ) -> PolarsResult<Series> {
28        let py_pyseries = match self.getattr(py, "_s") {
29            Ok(s) => s,
30            // the lambda did not return a series, we try to create a new python Series
31            _ => {
32                let res = py_polars_module
33                    .getattr(py, "Series")
34                    .unwrap()
35                    .call1(py, (name, PyList::new(py, [self]).unwrap()));
36
37                match res {
38                    Ok(python_s) => python_s.getattr(py, "_s").unwrap(),
39                    Err(_) => {
40                        polars_bail!(ComputeError:
41                            "expected a something that could convert to a `Series` but got: {}",
42                            self.bind(py).get_type()
43                        )
44                    },
45                }
46            },
47        };
48        let s = match py_pyseries.extract::<PySeries>(py) {
49            Ok(pyseries) => pyseries.series,
50            // This happens if the executed Polars is not from this source.
51            // Currently only happens in PC-workers
52            Err(_) => {
53                let mut export: MaybeUninit<SeriesExport> = MaybeUninit::uninit();
54                py_pyseries
55                    .call_method1(py, "_export", (&raw mut export as usize,))
56                    .unwrap();
57                unsafe {
58                    let export = export.assume_init();
59                    polars_ffi::version_0::import_series(export)?
60                }
61            },
62        };
63        Ok(s)
64    }
65}
66
67pub(crate) fn call_lambda_with_series(
68    py: Python<'_>,
69    s: &Series,
70    output_dtype: Option<Option<DataType>>,
71    lambda: &PyObject,
72) -> PyResult<PyObject> {
73    let pypolars = polars(py).bind(py);
74
75    // create a PySeries struct/object for Python
76    let pyseries = PySeries::new(s.clone());
77    // Wrap this PySeries object in the python side Series wrapper
78    let mut python_series_wrapper = pypolars
79        .getattr("wrap_s")
80        .unwrap()
81        .call1((pyseries,))
82        .unwrap();
83
84    if !python_series_wrapper
85        .getattr("_s")
86        .unwrap()
87        .is_instance(polars_rs(py).getattr(py, "PySeries").unwrap().bind(py))
88        .unwrap()
89    {
90        let mut export = ManuallyDrop::new(polars_ffi::version_0::export_series(s));
91        let plseries = pl_series(py).bind(py);
92
93        let s_location = &raw mut export;
94        python_series_wrapper = plseries
95            .getattr("_import")
96            .unwrap()
97            .call1((s_location as usize,))
98            .unwrap()
99    }
100
101    let mut dict = None;
102    if let Some(output_dtype) = output_dtype {
103        let d = PyDict::new(py);
104        let output_dtype = match output_dtype {
105            None => None,
106            Some(dt) => Some(Wrap(dt).into_pyobject(py)?),
107        };
108        d.set_item("return_dtype", output_dtype)?;
109        dict = Some(d);
110    }
111
112    lambda.call(py, (python_series_wrapper,), dict.as_ref())
113}
114
115/// A python lambda taking two Series
116pub(crate) fn binary_lambda(
117    lambda: &PyObject,
118    a: Series,
119    b: Series,
120) -> PolarsResult<Option<Series>> {
121    Python::with_gil(|py| {
122        // get the pypolars module
123        let pypolars = polars(py).bind(py);
124        // create a PySeries struct/object for Python
125        let pyseries_a = PySeries::new(a);
126        let pyseries_b = PySeries::new(b);
127
128        // Wrap this PySeries object in the python side Series wrapper
129        let python_series_wrapper_a = pypolars
130            .getattr("wrap_s")
131            .unwrap()
132            .call1((pyseries_a,))
133            .unwrap();
134        let python_series_wrapper_b = pypolars
135            .getattr("wrap_s")
136            .unwrap()
137            .call1((pyseries_b,))
138            .unwrap();
139
140        // call the lambda and get a python side Series wrapper
141        let result_series_wrapper =
142            match lambda.call1(py, (python_series_wrapper_a, python_series_wrapper_b)) {
143                Ok(pyobj) => pyobj,
144                Err(e) => polars_bail!(
145                    ComputeError: "custom python function failed: {}", e.value(py),
146                ),
147            };
148        let pyseries = if let Ok(expr) = result_series_wrapper.getattr(py, "_pyexpr") {
149            let pyexpr = expr.extract::<PyExpr>(py).unwrap();
150            let expr = pyexpr.inner;
151            let df = DataFrame::empty();
152            let out = df
153                .lazy()
154                .select([expr])
155                .with_predicate_pushdown(false)
156                .with_projection_pushdown(false)
157                .collect()?;
158
159            let s = out.select_at_idx(0).unwrap().clone();
160            PySeries::new(s.take_materialized_series())
161        } else {
162            return Some(result_series_wrapper.to_series(py, pypolars.as_unbound(), ""))
163                .transpose();
164        };
165
166        // Finally get the actual Series
167        Ok(Some(pyseries.series))
168    })
169}
170
171pub fn map_single(
172    pyexpr: &PyExpr,
173    lambda: PyObject,
174    output_type: Option<DataTypeExpr>,
175    agg_list: bool,
176    is_elementwise: bool,
177    returns_scalar: bool,
178) -> PyExpr {
179    let func =
180        python_dsl::PythonUdfExpression::new(lambda, output_type, is_elementwise, returns_scalar);
181    pyexpr.inner.clone().map_python(func, agg_list).into()
182}
183
184pub(crate) fn call_lambda_with_columns_slice(
185    py: Python<'_>,
186    s: &[Column],
187    lambda: &PyObject,
188    pypolars: &Py<PyModule>,
189) -> PyObject {
190    let pypolars = pypolars.bind(py);
191
192    // create a PySeries struct/object for Python
193    let iter = s.iter().map(|s| {
194        let ps = PySeries::new(s.as_materialized_series().clone());
195
196        // Wrap this PySeries object in the python side Series wrapper
197        pypolars.getattr("wrap_s").unwrap().call1((ps,)).unwrap()
198    });
199    let wrapped_s = PyList::new(py, iter).unwrap();
200
201    // call the lambda and get a python side Series wrapper
202    match lambda.call1(py, (wrapped_s,)) {
203        Ok(pyobj) => pyobj,
204        Err(e) => panic!("python function failed: {}", e.value(py)),
205    }
206}
207
208pub fn map_mul(
209    pyexpr: &[PyExpr],
210    py: Python<'_>,
211    lambda: PyObject,
212    output_type: Option<Wrap<DataType>>,
213    map_groups: bool,
214    returns_scalar: bool,
215) -> PyExpr {
216    // get the pypolars module
217    // do the import outside of the function to prevent import side effects in a hot loop.
218    let pypolars = polars(py).clone_ref(py);
219
220    let function = move |s: &mut [Column]| {
221        Python::with_gil(|py| {
222            // this is a python Series
223            let out = call_lambda_with_columns_slice(py, s, &lambda, &pypolars);
224
225            // we return an error, because that will become a null value polars lazy apply list
226            if map_groups && out.is_none(py) {
227                return Ok(None);
228            }
229
230            Ok(Some(out.to_series(py, &pypolars, "")?.into_column()))
231        })
232    };
233
234    let exprs = pyexpr.iter().map(|pe| pe.clone().inner).collect::<Vec<_>>();
235
236    let output_map = GetOutput::map_field(move |fld| {
237        Ok(match output_type {
238            Some(ref dt) => Field::new(fld.name().clone(), dt.0.clone()),
239            None => fld.clone(),
240        })
241    });
242    if map_groups {
243        polars::lazy::dsl::apply_multiple(function, exprs, output_map, returns_scalar).into()
244    } else {
245        polars::lazy::dsl::map_multiple(function, exprs, output_map).into()
246    }
247}