Skip to main content

polars_python/delta/
dv_provider_funcs.rs

1use arrow::array::{MutableBinaryViewArray, Utf8ViewArray};
2use polars::prelude::{ArrowDataType, IntoColumn, PlRefPath, ScanSourceRef};
3use polars::series::Series;
4use polars_buffer::Buffer;
5use polars_core::frame::DataFrame;
6use polars_error::{PolarsError, PolarsResult};
7use polars_utils::python_function::PythonObject;
8use pyo3::types::{PyAnyMethods, PyModule};
9use pyo3::{PyErr, Python, intern};
10
11use crate::dataframe::PyDataFrame;
12
13pub fn call(callback: &PythonObject, paths: Buffer<PlRefPath>) -> PolarsResult<Option<DataFrame>> {
14    let df = {
15        let mut builder = MutableBinaryViewArray::with_capacity(
16            paths.len().wrapping_mul(
17                paths
18                    .first()
19                    .map_or(0, |x| ScanSourceRef::Path(x).to_include_path_name().len()),
20            ),
21        );
22
23        for path in paths.iter() {
24            builder.push_value_ignore_validity(ScanSourceRef::Path(path).to_include_path_name());
25        }
26
27        let array: Utf8ViewArray = builder.freeze_with_dtype(ArrowDataType::Utf8View);
28        let c = Series::from_arrow("path".into(), Box::new(array))
29            .unwrap()
30            .into_column();
31
32        DataFrame::new(paths.len(), vec![c]).unwrap()
33    };
34
35    Python::attach(|py| {
36        // Wrap to Python
37        let pl = PyModule::import(py, "polars")?;
38        let py_df_wrapped = pl
39            .getattr(intern!(py, "DataFrame"))?
40            .getattr(intern!(py, "_from_pydf"))?
41            .call1((PyDataFrame::new(df),))?;
42
43        let result_wrapped = callback
44            .getattr(py, intern!(py, "__call__"))?
45            .call1(py, (py_df_wrapped,))?;
46
47        if result_wrapped.is_none(py) {
48            return Ok(None);
49        }
50
51        // Unwrap to Rust
52        let py_pydf = result_wrapped.getattr(py, "_df").map_err(|_| {
53            let pytype = result_wrapped.bind(py).get_type();
54            PolarsError::ComputeError(
55                format!("expected the deletion vector callback to return a 'DataFrame', got a '{pytype}'",)
56                    .into(),
57            )
58        })?;
59
60        let pydf = py_pydf.extract::<PyDataFrame>(py).map_err(PyErr::from)?;
61        Ok(Some(pydf.df.into_inner()))
62    })
63}