use arrow::array::{Array, BooleanArray, PrimitiveArray, Utf8Array};
use arrow::bitmap::Bitmap;
use arrow::offset::OffsetsBuffer;
use arrow::types::NativeType;
use polars::prelude::*;
use polars_buffer::Buffer;
use polars_core::{with_match_physical_numeric_polars_type, with_match_physical_numeric_type};
use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
use pyo3::types::PyTuple;
use super::{PySeries, ToSeries};
use crate::conversion::Wrap;
use crate::error::PyPolarsErr;
use crate::raise_err;
use crate::utils::EnterPolarsExt;
/// Location of a single contiguous buffer, exchanged with Python as a
/// `(pointer, offset, length)` tuple.
struct BufferInfo {
/// Address of the start of the buffer, stored as an integer.
pointer: usize,
/// Offset into the buffer: counted in elements for numeric buffers and in
/// bits for boolean (bitmap) buffers.
offset: usize,
/// Number of values in the buffer (elements for numeric buffers, bits for
/// boolean buffers).
length: usize,
}
/// Converts a [`BufferInfo`] into the Python tuple `(pointer, offset, length)`.
impl<'py> IntoPyObject<'py> for BufferInfo {
    type Target = PyTuple;
    type Output = Bound<'py, Self::Target>;
    type Error = PyErr;

    fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
        let Self {
            pointer,
            offset,
            length,
        } = self;
        let fields = (pointer, offset, length);
        fields.into_pyobject(py)
    }
}
/// Builds a [`BufferInfo`] from a Python object that extracts as a
/// three-element tuple `(pointer, offset, length)`.
impl<'a, 'py> FromPyObject<'a, 'py> for BufferInfo {
    type Error = PyErr;

    fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
        let fields: (usize, usize, usize) = ob.extract()?;
        Ok(Self {
            pointer: fields.0,
            offset: fields.1,
            length: fields.2,
        })
    }
}
#[pymethods]
impl PySeries {
    /// Return pointer, offset, and length information about the underlying buffer.
    ///
    /// The Series is first converted to its physical representation. An error
    /// is raised when the Series has more than one chunk, and a `TypeError`
    /// when the physical dtype is neither Boolean nor a primitive numeric type.
    fn _get_buffer_info(&self) -> PyResult<BufferInfo> {
        let guard = self.series.read();
        let physical = guard.to_physical_repr();

        if physical.chunks().len() != 1 {
            let msg = "cannot get buffer info for Series consisting of multiple chunks";
            raise_err!(msg, ComputeError);
        }

        let info = match physical.dtype() {
            DataType::Boolean => {
                // The dtype was just matched as Boolean, so the downcast cannot fail.
                let ca = physical.bool().unwrap();
                let arr = ca.downcast_iter().next().unwrap();
                let (bytes, offset, length) = arr.values().as_slice();
                BufferInfo {
                    pointer: bytes.as_ptr() as usize,
                    offset,
                    length,
                }
            },
            dt if dt.is_primitive_numeric() => {
                with_match_physical_numeric_polars_type!(dt, |$T| {
                    let ca: &ChunkedArray<$T> = physical.as_ref().as_ref().as_ref();
                    BufferInfo { pointer: get_pointer(ca), offset: 0, length: ca.len() }
                })
            },
            dt => {
                let msg = format!(
                    "`_get_buffer_info` not implemented for non-physical type {dt}; try to select a buffer first"
                );
                return Err(PyTypeError::new_err(msg));
            },
        };
        Ok(info)
    }

    /// Return the underlying values, validity, and offsets buffers as Series.
    ///
    /// Dispatches on the physical dtype: numeric and Boolean Series go through
    /// `get_buffers_from_primitive`, String Series through
    /// `get_buffers_from_string`; any other dtype yields a `TypeError`.
    fn _get_buffers(&self, py: Python) -> PyResult<(Self, Option<Self>, Option<Self>)> {
        let s = &self.series.read();
        py.enter_polars(|| {
            let physical = s.dtype().to_physical();
            match physical {
                DataType::String => get_buffers_from_string(s),
                DataType::Boolean => get_buffers_from_primitive(s),
                dt if dt.is_primitive_numeric() => get_buffers_from_primitive(s),
                dt => {
                    let msg = format!("`_get_buffers` not implemented for `dtype` {dt}");
                    Err(PyTypeError::new_err(msg))
                },
            }
        })
    }
}
/// Return the address of the values buffer of the first chunk of `ca`.
///
/// Panics if `ca` yields no chunk.
fn get_pointer<T: PolarsNumericType>(ca: &ChunkedArray<T>) -> usize {
    let first_chunk = ca.downcast_iter().next().unwrap();
    let values = first_chunk.values();
    values.as_ptr() as usize
}
/// Split a Series into `(values, validity, offsets)` buffers.
///
/// The values Series keeps the original name and chunks but with the validity
/// removed from every chunk; `offsets` is always `None`.
fn get_buffers_from_primitive(
    s: &Series,
) -> PyResult<(PySeries, Option<PySeries>, Option<PySeries>)> {
    let stripped_chunks: Vec<_> = s
        .chunks()
        .iter()
        .map(|chunk| chunk.with_validity(None))
        .collect();

    let values_series =
        Series::try_from((s.name().clone(), stripped_chunks)).map_err(PyPolarsErr::from)?;

    Ok((values_series.into(), get_bitmap(s), None))
}
/// Split a String Series into `(values, validity, offsets)` buffers.
///
/// The Series is rechunked and its single view-based array is converted to an
/// offsets-based `Utf8Array<i64>`, from which the byte values and the offsets
/// are extracted.
fn get_buffers_from_string(s: &Series) -> PyResult<(PySeries, Option<PySeries>, Option<PySeries>)> {
    let rechunked = s.rechunk();

    let str_ca = rechunked.str().map_err(PyPolarsErr::from)?;
    let view_arr = str_ca.downcast_iter().next().unwrap();
    let utf8_arr = polars_compute::cast::utf8view_to_utf8(view_arr);

    let values = get_string_bytes(&utf8_arr)?;
    let validity = get_bitmap(&rechunked);
    let offsets = Some(get_string_offsets(&utf8_arr)?);

    Ok((values, validity, offsets))
}
/// Return the validity of `s` as a Boolean Series (`true` where the value is
/// not null), or `None` when `s` contains no nulls.
fn get_bitmap(s: &Series) -> Option<PySeries> {
    if s.null_count() == 0 {
        return None;
    }
    let not_null = s.is_not_null().into_series();
    Some(not_null.into())
}
/// Wrap the raw byte buffer of a `Utf8Array` in an unnamed `UInt8` Series
/// without validity.
fn get_string_bytes(arr: &Utf8Array<i64>) -> PyResult<PySeries> {
    let bytes = arr.values().clone();
    let bytes_arr = PrimitiveArray::<u8>::try_new(ArrowDataType::UInt8, bytes, None)
        .map_err(PyPolarsErr::from)?;
    let series = Series::from_arrow(PlSmallStr::EMPTY, bytes_arr.to_boxed())
        .map_err(PyPolarsErr::from)?;
    Ok(series.into())
}
/// Wrap the offsets buffer of a `Utf8Array` in an unnamed `Int64` Series
/// without validity.
fn get_string_offsets(arr: &Utf8Array<i64>) -> PyResult<PySeries> {
    let raw_offsets = arr.offsets().buffer().clone();
    let offsets_arr = PrimitiveArray::<i64>::try_new(ArrowDataType::Int64, raw_offsets, None)
        .map_err(PyPolarsErr::from)?;
    let series = Series::from_arrow(PlSmallStr::EMPTY, offsets_arr.to_boxed())
        .map_err(PyPolarsErr::from)?;
    Ok(series.into())
}
#[pymethods]
impl PySeries {
#[staticmethod]
unsafe fn _from_buffer(
dtype: Wrap<DataType>,
buffer_info: BufferInfo,
owner: &Bound<'_, PyAny>,
) -> PyResult<Self> {
let dtype = dtype.0;
let BufferInfo {
pointer,
offset,
length,
} = buffer_info;
let owner = owner.to_owned().unbind();
let arr_boxed = match dtype {
dt if dt.is_primitive_numeric() => {
with_match_physical_numeric_type!(dt, |$T| unsafe {
from_buffer_impl::<$T>(pointer, offset, length, owner)
})
},
DataType::Boolean => {
unsafe { from_buffer_boolean_impl(pointer, offset, length, owner) }?
},
dt => {
let msg = format!(
"`_from_buffer` requires a physical type as input for `dtype`, got {dt}"
);
return Err(PyTypeError::new_err(msg));
},
};
let s = Series::from_arrow(PlSmallStr::EMPTY, arr_boxed)
.unwrap()
.into();
Ok(s)
}
}
/// Build a boxed primitive array over foreign memory.
///
/// # Safety
///
/// `pointer` must be the address of a buffer of `T` that is properly aligned
/// and valid for reads of `offset + length` elements, and the memory must stay
/// valid while `owner` is alive.
unsafe fn from_buffer_impl<T: NativeType>(
    pointer: usize,
    offset: usize,
    length: usize,
    owner: Py<PyAny>,
) -> Box<dyn Array> {
    // SAFETY: the caller guarantees that `pointer + offset` stays within the
    // buffer and that `length` elements are readable from there.
    let values: &[T] = unsafe {
        let start = (pointer as *const T).add(offset);
        std::slice::from_raw_parts(start, length)
    };
    // SAFETY: the caller guarantees that `owner` keeps the memory behind
    // `values` alive.
    let arr = unsafe { arrow::ffi::mmap::slice_and_owner(values, owner) };
    arr.to_boxed()
}
/// Build a boxed boolean array over a foreign bitmap.
///
/// `offset` and `length` are counted in bits; the number of bytes read is
/// derived from their sum.
///
/// # Safety
///
/// `pointer` must be the address of a byte buffer that is valid for reads of
/// enough bytes to hold `offset + length` bits, and the memory must stay valid
/// while `owner` is alive.
unsafe fn from_buffer_boolean_impl(
    pointer: usize,
    offset: usize,
    length: usize,
    owner: Py<PyAny>,
) -> PyResult<Box<dyn Array>> {
    let n_bytes = get_boolean_buffer_length_in_bytes(length, offset);
    // SAFETY: the caller guarantees that `n_bytes` bytes are readable at `pointer`.
    let bytes = unsafe { std::slice::from_raw_parts(pointer as *const u8, n_bytes) };
    // SAFETY: the caller guarantees that `owner` keeps the memory behind
    // `bytes` alive.
    let arr = unsafe { arrow::ffi::mmap::bitmap_and_owner(bytes, offset, length, owner) }
        .map_err(PyPolarsErr::from)?;
    Ok(arr.to_boxed())
}
/// Number of bytes needed to hold `offset + length` bits, rounded up to a
/// whole byte.
fn get_boolean_buffer_length_in_bytes(length: usize, offset: usize) -> usize {
    let total_bits = offset + length;
    total_bits.div_ceil(8)
}
#[pymethods]
impl PySeries {
    /// Construct a PySeries from information about its underlying buffers.
    ///
    /// `data` holds the values buffer, followed by the offsets buffer for
    /// String columns. `validity`, when given, must be a Boolean Series. The
    /// assembled physical Series is strictly cast to `dtype` before it is
    /// returned.
    ///
    /// # Errors
    ///
    /// Raises `TypeError` when `data` is empty, when a buffer has an
    /// unexpected dtype, when a String column is requested without an offsets
    /// buffer, or when `dtype` is not supported.
    ///
    /// # Safety
    ///
    /// The offsets buffer is turned into an `OffsetsBuffer` without
    /// validation, so for String columns the caller must supply well-formed
    /// offsets.
    #[staticmethod]
    #[pyo3(signature = (dtype, data, validity))]
    unsafe fn _from_buffers(
        py: Python<'_>,
        dtype: Wrap<DataType>,
        data: Vec<PySeries>,
        validity: Option<PySeries>,
    ) -> PyResult<Self> {
        let dtype = dtype.0;
        let mut data = data.to_series();

        match data.len() {
            0 => {
                let msg = "`data` input to `_from_buffers` must contain at least one buffer";
                return Err(PyTypeError::new_err(msg));
            },
            // A single buffer without validity only needs a cast.
            1 if validity.is_none() => {
                let values = data.pop().unwrap();
                let s = values.strict_cast(&dtype).map_err(PyPolarsErr::from)?;
                return Ok(s.into());
            },
            _ => (),
        }

        let validity = match validity {
            Some(ps) => {
                let s = ps.series.into_inner();
                let dtype = s.dtype();
                if !dtype.is_bool() {
                    let msg = format!("validity buffer must have data type Boolean, got {dtype:?}");
                    return Err(PyTypeError::new_err(msg));
                }
                // Propagate any failure as a Python error instead of panicking.
                Some(series_to_bitmap(s)?)
            },
            None => None,
        };

        let s = match dtype.to_physical() {
            dt if dt.is_primitive_numeric() => {
                // `data` is non-empty here: the empty case returned above.
                let values = data.into_iter().next().unwrap();
                // `series_to_buffer` panics on a dtype mismatch, so check it
                // here and report a proper error.
                if values.dtype() != &dt {
                    let msg = format!(
                        "values buffer must have data type {dt:?}, got {:?}",
                        values.dtype()
                    );
                    return Err(PyTypeError::new_err(msg));
                }
                with_match_physical_numeric_polars_type!(dt, |$T| {
                    let values_buffer = series_to_buffer::<$T>(values);
                    from_buffers_num_impl::<<$T as PolarsNumericType>::Native>(values_buffer, validity)?
                })
            },
            DataType::Boolean => {
                let values = data.into_iter().next().unwrap();
                let values_buffer = series_to_bitmap(values)?;
                from_buffers_bool_impl(values_buffer, validity)?
            },
            DataType::String => {
                let mut data_iter = data.into_iter();
                let values = data_iter.next().unwrap();
                // `series_to_buffer::<UInt8Type>` panics on any other dtype.
                if !matches!(values.dtype(), DataType::UInt8) {
                    return Err(PyTypeError::new_err(format!(
                        "values buffer must have data type UInt8, got {:?}",
                        values.dtype()
                    )));
                }
                let offsets = match data_iter.next() {
                    Some(s) => {
                        let dtype = s.dtype();
                        if !matches!(dtype, DataType::Int64) {
                            return Err(PyTypeError::new_err(format!(
                                "offsets buffer must have data type Int64, got {dtype:?}"
                            )));
                        }
                        series_to_offsets(s)
                    },
                    None => {
                        return Err(PyTypeError::new_err(
                            "`_from_buffers` cannot create a String column without an offsets buffer",
                        ));
                    },
                };
                let values = series_to_buffer::<UInt8Type>(values);
                py.enter_polars(|| from_buffers_string_impl(values, validity, offsets))?
            },
            dt => {
                let msg = format!("`_from_buffers` not implemented for `dtype` {dt}");
                return Err(PyTypeError::new_err(msg));
            },
        };

        let out = s.strict_cast(&dtype).map_err(PyPolarsErr::from)?;
        Ok(out.into())
    }
}
/// Extract the values buffer of a numeric Series as a single contiguous buffer.
///
/// The Series is viewed as a `ChunkedArray<T>` and rechunked; the validity is
/// not part of the result.
fn series_to_buffer<T>(s: Series) -> Buffer<T::Native>
where
    T: PolarsNumericType,
{
    let typed: &ChunkedArray<T> = s.as_ref().as_ref();
    let contiguous = typed.rechunk();
    let arr = contiguous.downcast_as_array();
    arr.values().clone()
}
/// Extract the values of a Boolean Series as a single `Bitmap`.
///
/// Returns an error when `s` cannot be viewed as a Boolean chunked array.
fn series_to_bitmap(s: Series) -> PyResult<Bitmap> {
    let bool_ca = s.bool().map_err(PyPolarsErr::from)?;
    let contiguous = bool_ca.rechunk();
    let bitmap = contiguous.downcast_as_array().values().clone();
    Ok(bitmap)
}
/// Reinterpret an `Int64` Series as an offsets buffer without validating it.
fn series_to_offsets(s: Series) -> OffsetsBuffer<i64> {
    let raw_offsets = series_to_buffer::<Int64Type>(s);
    // SAFETY: no validation happens here; the caller is responsible for
    // providing well-formed offsets.
    // NOTE(review): presumably this is covered by the `unsafe` contract of
    // `_from_buffers` — confirm.
    unsafe { OffsetsBuffer::new_unchecked(raw_offsets) }
}
/// Assemble an unnamed numeric Series from a values buffer and an optional
/// validity bitmap.
///
/// # Errors
///
/// Returns an error when the array cannot be constructed from the given
/// buffers (for example a validity bitmap that does not match the values), or
/// when the conversion to a Series fails.
fn from_buffers_num_impl<T: NativeType>(
    data: Buffer<T>,
    validity: Option<Bitmap>,
) -> PyResult<Series> {
    // Use the fallible constructor so that inconsistent buffers coming from
    // Python raise an exception instead of panicking.
    let arr = PrimitiveArray::try_new(T::PRIMITIVE.into(), data, validity)
        .map_err(PyPolarsErr::from)?;
    let s = Series::from_arrow(PlSmallStr::EMPTY, arr.to_boxed()).map_err(PyPolarsErr::from)?;
    Ok(s)
}
/// Assemble an unnamed Boolean Series from a values bitmap and an optional
/// validity bitmap.
///
/// # Errors
///
/// Returns an error when the array cannot be constructed from the given
/// bitmaps (for example a validity bitmap that does not match the values), or
/// when the conversion to a Series fails.
fn from_buffers_bool_impl(data: Bitmap, validity: Option<Bitmap>) -> PyResult<Series> {
    // Use the fallible constructor so that inconsistent buffers coming from
    // Python raise an exception instead of panicking.
    let arr = BooleanArray::try_new(ArrowDataType::Boolean, data, validity)
        .map_err(PyPolarsErr::from)?;
    let s = Series::from_arrow(PlSmallStr::EMPTY, arr.to_boxed()).map_err(PyPolarsErr::from)?;
    Ok(s)
}
/// Assemble an unnamed String Series from a byte buffer, offsets, and an
/// optional validity bitmap.
///
/// # Errors
///
/// Returns an error when the buffers do not form a valid `LargeUtf8` array or
/// when the conversion to a Series fails.
fn from_buffers_string_impl(
    data: Buffer<u8>,
    validity: Option<Bitmap>,
    offsets: OffsetsBuffer<i64>,
) -> PyResult<Series> {
    // Use the fallible constructor so that invalid buffers coming from Python
    // raise an exception instead of panicking.
    let arr = Utf8Array::try_new(ArrowDataType::LargeUtf8, offsets, data, validity)
        .map_err(PyPolarsErr::from)?;
    let s = Series::from_arrow(PlSmallStr::EMPTY, arr.to_boxed()).map_err(PyPolarsErr::from)?;
    Ok(s)
}