polars_python/map/
lazy.rs

1use polars::prelude::*;
2use pyo3::ffi::Py_uintptr_t;
3use pyo3::prelude::*;
4use pyo3::types::{PyDict, PyList};
5
6use crate::py_modules::polars;
7use crate::series::PySeries;
8use crate::{PyExpr, Wrap};
9
10pub(crate) trait ToSeries {
11    fn to_series(
12        &self,
13        py: Python,
14        py_polars_module: &Py<PyModule>,
15        name: &str,
16    ) -> PolarsResult<Series>;
17}
18
19impl ToSeries for PyObject {
20    fn to_series(
21        &self,
22        py: Python,
23        py_polars_module: &Py<PyModule>,
24        name: &str,
25    ) -> PolarsResult<Series> {
26        let py_pyseries = match self.getattr(py, "_s") {
27            Ok(s) => s,
28            // the lambda did not return a series, we try to create a new python Series
29            _ => {
30                let res = py_polars_module
31                    .getattr(py, "Series")
32                    .unwrap()
33                    .call1(py, (name, PyList::new(py, [self]).unwrap()));
34
35                match res {
36                    Ok(python_s) => python_s.getattr(py, "_s").unwrap(),
37                    Err(_) => {
38                        polars_bail!(ComputeError:
39                            "expected a something that could convert to a `Series` but got: {}",
40                            self.bind(py).get_type()
41                        )
42                    },
43                }
44            },
45        };
46        let s = match py_pyseries.extract::<PySeries>(py) {
47            Ok(pyseries) => pyseries.series,
48            // This happens if the executed Polars is not from this source.
49            // Currently only happens in PC-workers
50            // For now use arrow to convert
51            // Eventually we must use Polars' Series Export as that can deal with
52            // multiple chunks
53            Err(_) => {
54                use arrow::ffi;
55                let kwargs = PyDict::new(py);
56                kwargs.set_item("in_place", true).unwrap();
57                py_pyseries
58                    .call_method(py, "rechunk", (), Some(&kwargs))
59                    .map_err(|e| polars_err!(ComputeError: "could not rechunk: {e}"))?;
60
61                // Prepare a pointer to receive the Array struct.
62                let array = Box::new(ffi::ArrowArray::empty());
63                let schema = Box::new(ffi::ArrowSchema::empty());
64
65                let array_ptr = &*array as *const ffi::ArrowArray;
66                let schema_ptr = &*schema as *const ffi::ArrowSchema;
67                // SAFETY:
68                // this is unsafe as it write to the pointers we just prepared
69                py_pyseries
70                    .call_method1(
71                        py,
72                        "_export_arrow_to_c",
73                        (array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t),
74                    )
75                    .map_err(|e| polars_err!(ComputeError: "{e}"))?;
76
77                unsafe {
78                    let field = ffi::import_field_from_c(schema.as_ref())?;
79                    let array = ffi::import_array_from_c(*array, field.dtype)?;
80                    Series::from_arrow(field.name, array)?
81                }
82            },
83        };
84        Ok(s)
85    }
86}
87
88pub(crate) fn call_lambda_with_series(
89    py: Python,
90    s: Series,
91    lambda: &PyObject,
92) -> PyResult<PyObject> {
93    let pypolars = polars(py).bind(py);
94
95    // create a PySeries struct/object for Python
96    let pyseries = PySeries::new(s);
97    // Wrap this PySeries object in the python side Series wrapper
98    let python_series_wrapper = pypolars
99        .getattr("wrap_s")
100        .unwrap()
101        .call1((pyseries,))
102        .unwrap();
103    // call the lambda and get a python side Series wrapper
104    lambda.call1(py, (python_series_wrapper,))
105}
106
107/// A python lambda taking two Series
108pub(crate) fn binary_lambda(
109    lambda: &PyObject,
110    a: Series,
111    b: Series,
112) -> PolarsResult<Option<Series>> {
113    Python::with_gil(|py| {
114        // get the pypolars module
115        let pypolars = polars(py).bind(py);
116        // create a PySeries struct/object for Python
117        let pyseries_a = PySeries::new(a);
118        let pyseries_b = PySeries::new(b);
119
120        // Wrap this PySeries object in the python side Series wrapper
121        let python_series_wrapper_a = pypolars
122            .getattr("wrap_s")
123            .unwrap()
124            .call1((pyseries_a,))
125            .unwrap();
126        let python_series_wrapper_b = pypolars
127            .getattr("wrap_s")
128            .unwrap()
129            .call1((pyseries_b,))
130            .unwrap();
131
132        // call the lambda and get a python side Series wrapper
133        let result_series_wrapper =
134            match lambda.call1(py, (python_series_wrapper_a, python_series_wrapper_b)) {
135                Ok(pyobj) => pyobj,
136                Err(e) => polars_bail!(
137                    ComputeError: "custom python function failed: {}", e.value(py),
138                ),
139            };
140        let pyseries = if let Ok(expr) = result_series_wrapper.getattr(py, "_pyexpr") {
141            let pyexpr = expr.extract::<PyExpr>(py).unwrap();
142            let expr = pyexpr.inner;
143            let df = DataFrame::empty();
144            let out = df
145                .lazy()
146                .select([expr])
147                .with_predicate_pushdown(false)
148                .with_projection_pushdown(false)
149                .collect()?;
150
151            let s = out.select_at_idx(0).unwrap().clone();
152            PySeries::new(s.take_materialized_series())
153        } else {
154            return Some(result_series_wrapper.to_series(py, pypolars.as_unbound(), ""))
155                .transpose();
156        };
157
158        // Finally get the actual Series
159        Ok(Some(pyseries.series))
160    })
161}
162
163pub fn map_single(
164    pyexpr: &PyExpr,
165    lambda: PyObject,
166    output_type: Option<Wrap<DataType>>,
167    agg_list: bool,
168    is_elementwise: bool,
169    returns_scalar: bool,
170) -> PyExpr {
171    let output_type = output_type.map(|wrap| wrap.0);
172
173    let func =
174        python_udf::PythonUdfExpression::new(lambda, output_type, is_elementwise, returns_scalar);
175    pyexpr.inner.clone().map_python(func, agg_list).into()
176}
177
178pub(crate) fn call_lambda_with_columns_slice(
179    py: Python,
180    s: &[Column],
181    lambda: &PyObject,
182    pypolars: &Py<PyModule>,
183) -> PyObject {
184    let pypolars = pypolars.bind(py);
185
186    // create a PySeries struct/object for Python
187    let iter = s.iter().map(|s| {
188        let ps = PySeries::new(s.as_materialized_series().clone());
189
190        // Wrap this PySeries object in the python side Series wrapper
191        let python_series_wrapper = pypolars.getattr("wrap_s").unwrap().call1((ps,)).unwrap();
192
193        python_series_wrapper
194    });
195    let wrapped_s = PyList::new(py, iter).unwrap();
196
197    // call the lambda and get a python side Series wrapper
198    match lambda.call1(py, (wrapped_s,)) {
199        Ok(pyobj) => pyobj,
200        Err(e) => panic!("python function failed: {}", e.value(py)),
201    }
202}
203
204pub fn map_mul(
205    pyexpr: &[PyExpr],
206    py: Python,
207    lambda: PyObject,
208    output_type: Option<Wrap<DataType>>,
209    map_groups: bool,
210    returns_scalar: bool,
211) -> PyExpr {
212    // get the pypolars module
213    // do the import outside of the function to prevent import side effects in a hot loop.
214    let pypolars = polars(py).clone_ref(py);
215
216    let function = move |s: &mut [Column]| {
217        Python::with_gil(|py| {
218            // this is a python Series
219            let out = call_lambda_with_columns_slice(py, s, &lambda, &pypolars);
220
221            // we return an error, because that will become a null value polars lazy apply list
222            if map_groups && out.is_none(py) {
223                return Ok(None);
224            }
225
226            Ok(Some(out.to_series(py, &pypolars, "")?.into_column()))
227        })
228    };
229
230    let exprs = pyexpr.iter().map(|pe| pe.clone().inner).collect::<Vec<_>>();
231
232    let output_map = GetOutput::map_field(move |fld| {
233        Ok(match output_type {
234            Some(ref dt) => Field::new(fld.name().clone(), dt.0.clone()),
235            None => fld.clone(),
236        })
237    });
238    if map_groups {
239        polars::lazy::dsl::apply_multiple(function, exprs, output_map, returns_scalar).into()
240    } else {
241        polars::lazy::dsl::map_multiple(function, exprs, output_map).into()
242    }
243}