polars_python/delta/
dv_provider_funcs.rs1use arrow::array::{MutableBinaryViewArray, Utf8ViewArray};
2use polars::prelude::{ArrowDataType, IntoColumn, PlRefPath, ScanSourceRef};
3use polars::series::Series;
4use polars_buffer::Buffer;
5use polars_core::frame::DataFrame;
6use polars_error::{PolarsError, PolarsResult};
7use polars_utils::python_function::PythonObject;
8use pyo3::types::{PyAnyMethods, PyModule};
9use pyo3::{PyErr, Python, intern};
10
11use crate::dataframe::PyDataFrame;
12
13pub fn call(callback: &PythonObject, paths: Buffer<PlRefPath>) -> PolarsResult<Option<DataFrame>> {
14 let df = {
15 let mut builder = MutableBinaryViewArray::with_capacity(
16 paths.len().wrapping_mul(
17 paths
18 .first()
19 .map_or(0, |x| ScanSourceRef::Path(x).to_include_path_name().len()),
20 ),
21 );
22
23 for path in paths.iter() {
24 builder.push_value_ignore_validity(ScanSourceRef::Path(path).to_include_path_name());
25 }
26
27 let array: Utf8ViewArray = builder.freeze_with_dtype(ArrowDataType::Utf8View);
28 let c = Series::from_arrow("path".into(), Box::new(array))
29 .unwrap()
30 .into_column();
31
32 DataFrame::new(paths.len(), vec![c]).unwrap()
33 };
34
35 Python::attach(|py| {
36 let pl = PyModule::import(py, "polars")?;
38 let py_df_wrapped = pl
39 .getattr(intern!(py, "DataFrame"))?
40 .getattr(intern!(py, "_from_pydf"))?
41 .call1((PyDataFrame::new(df),))?;
42
43 let result_wrapped = callback
44 .getattr(py, intern!(py, "__call__"))?
45 .call1(py, (py_df_wrapped,))?;
46
47 if result_wrapped.is_none(py) {
48 return Ok(None);
49 }
50
51 let py_pydf = result_wrapped.getattr(py, "_df").map_err(|_| {
53 let pytype = result_wrapped.bind(py).get_type();
54 PolarsError::ComputeError(
55 format!("expected the deletion vector callback to return a 'DataFrame', got a '{pytype}'",)
56 .into(),
57 )
58 })?;
59
60 let pydf = py_pydf.extract::<PyDataFrame>(py).map_err(PyErr::from)?;
61 Ok(Some(pydf.df.into_inner()))
62 })
63}