use std::{ptr::NonNull, sync::Arc};
use crate::{
array::*,
bitmap::{utils::bytes_for, Bitmap},
buffer::{
bytes::{Bytes, Deallocation},
Buffer,
},
datatypes::{DataType, PhysicalType},
error::{ArrowError, Result},
ffi::schema::get_child,
types::NativeType,
};
use super::ArrowArray;
pub unsafe fn try_from<A: ArrowArrayRef>(array: A) -> Result<Box<dyn Array>> {
use PhysicalType::*;
Ok(match array.data_type().to_physical_type() {
Null => Box::new(NullArray::try_from_ffi(array)?),
Boolean => Box::new(BooleanArray::try_from_ffi(array)?),
Primitive(primitive) => with_match_primitive_type!(primitive, |$T| {
Box::new(PrimitiveArray::<$T>::try_from_ffi(array)?)
}),
Utf8 => Box::new(Utf8Array::<i32>::try_from_ffi(array)?),
LargeUtf8 => Box::new(Utf8Array::<i64>::try_from_ffi(array)?),
Binary => Box::new(BinaryArray::<i32>::try_from_ffi(array)?),
LargeBinary => Box::new(BinaryArray::<i64>::try_from_ffi(array)?),
FixedSizeBinary => Box::new(FixedSizeBinaryArray::try_from_ffi(array)?),
List => Box::new(ListArray::<i32>::try_from_ffi(array)?),
LargeList => Box::new(ListArray::<i64>::try_from_ffi(array)?),
FixedSizeList => Box::new(FixedSizeListArray::try_from_ffi(array)?),
Struct => Box::new(StructArray::try_from_ffi(array)?),
Dictionary(key_type) => {
match_integer_type!(key_type, |$T| {
Box::new(DictionaryArray::<$T>::try_from_ffi(array)?)
})
}
Union => Box::new(UnionArray::try_from_ffi(array)?),
Map => Box::new(MapArray::try_from_ffi(array)?),
})
}
// `ArrowArray` holds raw pointers, so it is neither `Send` nor `Sync` automatically;
// these impls opt in manually.
// NOTE(review): nothing in this file proves that the pointed-to data may be accessed from
// several threads — confirm this holds for every producer that can fill an `ArrowArray`.
unsafe impl Send for ArrowArray {}
unsafe impl Sync for ArrowArray {}
impl Drop for ArrowArray {
    /// Invokes the `release` callback when one is set; an array without a callback
    /// (e.g. one built by [`ArrowArray::empty`]) is dropped without further action.
    fn drop(&mut self) {
        if let Some(release) = self.release {
            unsafe { release(self) };
        }
    }
}
/// Release callback installed by [`ArrowArray::new`].
///
/// Does nothing when `array` is null. Otherwise it reclaims the boxed [`PrivateData`]
/// stored in `private_data`, frees every child and the optional dictionary that were
/// leaked with `Box::into_raw` at construction, and finally clears `release`.
///
/// # Safety
/// `array` must be null or point to an `ArrowArray` whose `private_data` was produced by
/// `ArrowArray::new` and has not been released before.
unsafe extern "C" fn c_release_array(array: *mut ArrowArray) {
    let array = match array.as_mut() {
        Some(array) => array,
        None => return,
    };

    // Regain ownership of the private data; it is dropped when this function returns.
    let private = Box::from_raw(array.private_data as *mut PrivateData);

    // Each child was leaked individually, so each one is re-boxed and dropped here.
    for &child in private.children_ptr.iter() {
        drop(Box::from_raw(child));
    }

    if let Some(dictionary) = private.dictionary_ptr {
        drop(Box::from_raw(dictionary));
    }

    // Clearing the callback keeps `Drop` from calling it again on this array.
    array.release = None;
}
/// Producer-side state stored behind `ArrowArray::private_data` by `ArrowArray::new`
/// and reclaimed by `c_release_array`.
#[allow(dead_code)]
struct PrivateData {
    // The exported array.
    // NOTE(review): presumably held so the memory referenced by `buffers_ptr` stays alive
    // until release — confirm.
    array: Arc<dyn Array>,
    // Backing storage of the `buffers` pointer exposed on the `ArrowArray`.
    buffers_ptr: Box<[*const std::os::raw::c_void]>,
    // Backing storage of the `children` pointer; every entry was created with
    // `Box::into_raw` and is freed in `c_release_array`.
    children_ptr: Box<[*mut ArrowArray]>,
    // The exported dictionary, if any; created with `Box::into_raw` and freed in
    // `c_release_array`.
    dictionary_ptr: Option<*mut ArrowArray>,
}
impl ArrowArray {
    /// Exports `array` as an `ArrowArray`.
    ///
    /// Children and the dictionary (when present) are exported recursively into
    /// heap-allocated `ArrowArray`s. Their raw pointers, the buffer pointers and `array`
    /// itself are moved into a boxed [`PrivateData`] that `c_release_array` frees.
    pub(crate) fn new(array: Arc<dyn Array>) -> Self {
        let (offset, buffers, children, dictionary) =
            offset_buffers_children_dictionary(array.as_ref());

        let n_buffers = buffers.len() as i64;
        let buffers_ptr = buffers
            .iter()
            .map(|maybe_buffer| {
                maybe_buffer
                    .as_ref()
                    .map_or(std::ptr::null(), |b| b.as_ptr() as *const std::os::raw::c_void)
            })
            .collect::<Box<[_]>>();

        let children_ptr = children
            .into_iter()
            .map(|child| Box::into_raw(Box::new(Self::new(child))))
            .collect::<Box<_>>();
        let n_children = children_ptr.len() as i64;

        let dictionary_ptr = dictionary.map(|values| Box::into_raw(Box::new(Self::new(values))));

        // Read these before `array` is moved into the private data.
        let length = array.len() as i64;
        let null_count = array.null_count() as i64;

        let mut private_data = Box::new(PrivateData {
            array,
            buffers_ptr,
            children_ptr,
            dictionary_ptr,
        });

        // The raw pointers must be taken before the box is turned into a raw pointer.
        let raw_buffers = private_data.buffers_ptr.as_mut_ptr();
        let raw_children = private_data.children_ptr.as_mut_ptr();
        let raw_dictionary = private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut());
        let raw_private = Box::into_raw(private_data) as *mut ::std::os::raw::c_void;

        Self {
            length,
            null_count,
            offset: offset as i64,
            n_buffers,
            n_children,
            buffers: raw_buffers,
            children: raw_children,
            dictionary: raw_dictionary,
            release: Some(c_release_array),
            private_data: raw_private,
        }
    }

    /// Returns an `ArrowArray` with zeroed counts, null pointers and no `release`
    /// callback, so dropping it performs no release.
    pub fn empty() -> Self {
        let no_release = None;
        Self {
            length: 0,
            null_count: 0,
            offset: 0,
            n_buffers: 0,
            n_children: 0,
            buffers: std::ptr::null_mut(),
            children: std::ptr::null_mut(),
            dictionary: std::ptr::null_mut(),
            release: no_release,
            private_data: std::ptr::null_mut(),
        }
    }

    /// The `length` field cast to `usize` (no check for negative values).
    pub(crate) fn len(&self) -> usize {
        let length = self.length;
        length as usize
    }

    /// The `offset` field cast to `usize` (no check for negative values).
    pub(crate) fn offset(&self) -> usize {
        let offset = self.offset;
        offset as usize
    }

    /// The `null_count` field cast to `usize` (no check for negative values).
    pub(crate) fn null_count(&self) -> usize {
        let null_count = self.null_count;
        null_count as usize
    }
}
/// Interprets the buffer at position `index` of `array` as a `Buffer<T>`.
///
/// The resulting buffer is sliced so that it starts at the offset returned by
/// `buffer_offset` and spans the remainder of the length returned by `buffer_len`.
/// `deallocation` is handed to `Bytes::from_ffi` together with the foreign pointer.
///
/// # Errors
/// Returns [`ArrowError::OutOfSpec`] when:
/// * the `buffers` pointer of `array` is null,
/// * `index` is not smaller than `array.n_buffers`,
/// * the buffer pointer at `index` is null,
/// * the computed offset is larger than the computed length.
///
/// # Safety
/// `array.buffers` must point to at least `array.n_buffers` pointers, and the buffer at
/// `index` must be valid for the length computed by `buffer_len`, interpreted as `T`.
unsafe fn create_buffer<T: NativeType>(
    array: &ArrowArray,
    data_type: &DataType,
    deallocation: Deallocation,
    index: usize,
) -> Result<Buffer<T>> {
    if array.buffers.is_null() {
        return Err(ArrowError::OutOfSpec(
            "The array buffers are null".to_string(),
        ));
    }
    // The buffer count comes from the producer: report a violation instead of panicking.
    // A negative count would otherwise wrap to a huge `usize` and defeat the bound check.
    if array.n_buffers < 0 || index >= array.n_buffers as usize {
        return Err(ArrowError::OutOfSpec(format!(
            "The buffer at position {} was requested but the array declares {} buffers",
            index, array.n_buffers
        )));
    }

    let buffers = array.buffers as *mut *const u8;
    let ptr = NonNull::new((*buffers.add(index)) as *mut T).ok_or_else(|| {
        ArrowError::OutOfSpec(format!("The buffer at position {} is null", index))
    })?;

    let len = buffer_len(array, data_type, index)?;
    let offset = buffer_offset(array, data_type, index);
    // Guard the subtraction: an inconsistent offset/length must not underflow.
    let sliced_len = len.checked_sub(offset).ok_or_else(|| {
        ArrowError::OutOfSpec(format!(
            "The buffer at position {} has length {} which is smaller than its offset {}",
            index, len, offset
        ))
    })?;

    let bytes = Bytes::from_ffi(ptr, len, deallocation);
    Ok(Buffer::from_bytes(bytes).slice(offset, sliced_len))
}
/// Interprets the buffer at position `index` of `array` as a [`Bitmap`].
///
/// The bitmap is built over `offset + length` bits and then sliced to
/// `[offset, offset + length)`. `deallocation` is handed to `Bytes::from_ffi` together
/// with the foreign pointer.
///
/// # Errors
/// Returns [`ArrowError::OutOfSpec`] when:
/// * the `buffers` pointer of `array` is null,
/// * `index` is not smaller than `array.n_buffers`,
/// * the buffer pointer at `index` is null.
///
/// # Safety
/// `array.buffers` must point to at least `array.n_buffers` pointers, and the buffer at
/// `index` must be valid for `bytes_for(offset + length)` bytes.
unsafe fn create_bitmap(
    array: &ArrowArray,
    deallocation: Deallocation,
    index: usize,
) -> Result<Bitmap> {
    if array.buffers.is_null() {
        return Err(ArrowError::OutOfSpec(
            "The array buffers are null".to_string(),
        ));
    }
    // The buffer count comes from the producer: report a violation instead of panicking.
    // A negative count would otherwise wrap to a huge `usize` and defeat the bound check.
    if array.n_buffers < 0 || index >= array.n_buffers as usize {
        return Err(ArrowError::OutOfSpec(format!(
            "The buffer at position {} was requested but the array declares {} buffers",
            index, array.n_buffers
        )));
    }

    let len = array.length as usize;
    let offset = array.offset as usize;

    let buffers = array.buffers as *mut *const u8;
    let ptr = NonNull::new((*buffers.add(index)) as *mut u8).ok_or_else(|| {
        ArrowError::OutOfSpec(format!(
            "The buffer {} is a null pointer and cannot be interpreted as a bitmap",
            index
        ))
    })?;

    let bytes_len = bytes_for(offset + len);
    let bytes = Bytes::from_ffi(ptr, bytes_len, deallocation);
    Ok(Bitmap::from_bytes(bytes, offset + len).slice(offset, len))
}
/// Returns the offset at which the data of buffer `i` starts, in the unit in which
/// that buffer's length is measured by `buffer_len`.
///
/// * The values buffer (index 2) of (large) utf8/binary arrays always starts at 0.
/// * The values buffer (index 1) of fixed-size binary arrays is measured in bytes, so the
///   array offset (in elements) is scaled by the element size, mirroring how
///   `buffer_len` scales the length.
/// * Every other buffer starts at `array.offset`.
fn buffer_offset(array: &ArrowArray, data_type: &DataType, i: usize) -> usize {
    use PhysicalType::*;
    match (data_type.to_physical_type(), i) {
        (LargeUtf8, 2) | (LargeBinary, 2) | (Utf8, 2) | (Binary, 2) => 0,
        (FixedSizeBinary, 1) => {
            if let DataType::FixedSizeBinary(size) = data_type.to_logical_type() {
                // `buffer_len` reports `size * (offset + length)` bytes for this buffer;
                // an unscaled element offset would slice at the wrong byte.
                (array.offset as usize) * *size
            } else {
                unreachable!()
            }
        }
        _ => array.offset as usize,
    }
}
fn buffer_len(array: &ArrowArray, data_type: &DataType, i: usize) -> Result<usize> {
Ok(match (data_type.to_physical_type(), i) {
(PhysicalType::FixedSizeBinary, 1) => {
if let DataType::FixedSizeBinary(size) = data_type.to_logical_type() {
*size * (array.offset as usize + array.length as usize)
} else {
unreachable!()
}
}
(PhysicalType::FixedSizeList, 1) => {
if let DataType::FixedSizeList(_, size) = data_type.to_logical_type() {
*size * (array.offset as usize + array.length as usize)
} else {
unreachable!()
}
}
(PhysicalType::Utf8, 1)
| (PhysicalType::LargeUtf8, 1)
| (PhysicalType::Binary, 1)
| (PhysicalType::LargeBinary, 1)
| (PhysicalType::List, 1)
| (PhysicalType::LargeList, 1)
| (PhysicalType::Map, 1) => {
array.offset as usize + array.length as usize + 1
}
(PhysicalType::Utf8, 2) | (PhysicalType::Binary, 2) => {
let len = buffer_len(array, data_type, 1)?;
let offset_buffer = unsafe { *(array.buffers as *mut *const u8).add(1) };
let offset_buffer = offset_buffer as *const i32;
(unsafe { *offset_buffer.add(len - 1) }) as usize
}
(PhysicalType::LargeUtf8, 2) | (PhysicalType::LargeBinary, 2) => {
let len = buffer_len(array, data_type, 1)?;
let offset_buffer = unsafe { *(array.buffers as *mut *const u8).add(1) };
let offset_buffer = offset_buffer as *const i64;
(unsafe { *offset_buffer.add(len - 1) }) as usize
}
_ => array.offset as usize + array.length as usize,
})
}
fn create_child(
array: &ArrowArray,
field: &DataType,
parent: Arc<InternalArrowArray>,
index: usize,
) -> Result<ArrowArrayChild<'static>> {
let data_type = get_child(field, index)?;
assert!(index < array.n_children as usize);
assert!(!array.children.is_null());
unsafe {
let arr_ptr = *array.children.add(index);
assert!(!arr_ptr.is_null());
let arr_ptr = &*arr_ptr;
Ok(ArrowArrayChild::from_raw(arr_ptr, data_type, parent))
}
}
fn create_dictionary(
array: &ArrowArray,
data_type: &DataType,
parent: Arc<InternalArrowArray>,
) -> Result<Option<ArrowArrayChild<'static>>> {
if let DataType::Dictionary(_, values, _) = data_type {
let data_type = values.as_ref().clone();
assert!(!array.dictionary.is_null());
let array = unsafe { &*array.dictionary };
Ok(Some(ArrowArrayChild::from_raw(array, data_type, parent)))
} else {
Ok(None)
}
}
/// Read access to a foreign `ArrowArray` together with its data type and the parent
/// that the imported buffers are tied to.
pub trait ArrowArrayRef: std::fmt::Debug {
    /// Number of buffers declared by the underlying array.
    fn n_buffers(&self) -> usize;

    /// The parent handed to every buffer, bitmap, child and dictionary created from
    /// this reference.
    fn parent(&self) -> &Arc<InternalArrowArray>;

    /// The underlying `ArrowArray`.
    fn array(&self) -> &ArrowArray;

    /// The data type describing the underlying array.
    fn data_type(&self) -> &DataType;

    /// A `Deallocation::Foreign` holding a clone of the parent handle.
    fn deallocation(&self) -> Deallocation {
        let parent = self.parent().clone();
        Deallocation::Foreign(parent)
    }

    /// The validity bitmap read from buffer 0, or `None` when the null count is zero.
    ///
    /// # Safety
    /// Same requirements as `create_bitmap` for buffer 0 of the underlying array.
    unsafe fn validity(&self) -> Result<Option<Bitmap>> {
        match self.array().null_count() {
            0 => Ok(None),
            _ => create_bitmap(self.array(), self.deallocation(), 0).map(Some),
        }
    }

    /// The buffer at position `index`, interpreted as values of type `T`.
    ///
    /// # Safety
    /// Same requirements as `create_buffer` for the requested buffer.
    unsafe fn buffer<T: NativeType>(&self, index: usize) -> Result<Buffer<T>> {
        let deallocation = self.deallocation();
        create_buffer::<T>(self.array(), self.data_type(), deallocation, index)
    }

    /// The buffer at position `index`, interpreted as a bitmap.
    ///
    /// # Safety
    /// Same requirements as `create_bitmap` for the requested buffer.
    unsafe fn bitmap(&self, index: usize) -> Result<Bitmap> {
        let deallocation = self.deallocation();
        create_bitmap(self.array(), deallocation, index)
    }

    /// A view over the child at position `index`.
    ///
    /// # Safety
    /// NOTE(review): presumably the child pointers of the underlying array must be valid
    /// — confirm against `create_child`.
    unsafe fn child(&self, index: usize) -> Result<ArrowArrayChild> {
        let parent = Arc::clone(self.parent());
        create_child(self.array(), self.data_type(), parent, index)
    }

    /// A view over the dictionary, or `None` when the data type is not a dictionary.
    fn dictionary(&self) -> Result<Option<ArrowArrayChild>> {
        let parent = Arc::clone(self.parent());
        create_dictionary(self.array(), self.data_type(), parent)
    }
}
/// An owned, boxed `ArrowArray` paired with the `DataType` that describes it.
///
/// Used behind an `Arc` as the parent of imported buffers, children and dictionaries
/// (see `ArrowArrayRef::parent`).
#[derive(Debug)]
pub struct InternalArrowArray {
    // The owned foreign array.
    array: Box<ArrowArray>,
    // The data type of `array`.
    data_type: DataType,
}
impl InternalArrowArray {
    /// Pairs `array` with the `data_type` that describes it; no validation is performed.
    pub fn new(array: Box<ArrowArray>, data_type: DataType) -> Self {
        Self { array, data_type }
    }
}
/// The top-level array is its own parent: every accessor reads directly from the
/// shared `InternalArrowArray`.
impl ArrowArrayRef for Arc<InternalArrowArray> {
    /// Number of buffers declared by the owned array.
    fn n_buffers(&self) -> usize {
        let declared = self.array.n_buffers;
        declared as usize
    }

    /// The handle itself acts as the parent.
    fn parent(&self) -> &Arc<InternalArrowArray> {
        self
    }

    /// The owned foreign array.
    fn array(&self) -> &ArrowArray {
        &*self.array
    }

    /// The data type stored next to the owned array.
    fn data_type(&self) -> &DataType {
        let data_type = &self.data_type;
        data_type
    }
}
/// A borrowed view over a nested `ArrowArray` (a child or a dictionary), together with
/// its data type and a handle to the parent it was obtained from.
#[derive(Debug)]
pub struct ArrowArrayChild<'a> {
    // The nested foreign array.
    array: &'a ArrowArray,
    // The data type of `array`.
    data_type: DataType,
    // Handle to the top-level array.
    // NOTE(review): presumably this keeps the memory behind `array` alive — confirm.
    parent: Arc<InternalArrowArray>,
}
/// Accessors for a nested array: data comes from the borrowed `ArrowArray`, while the
/// parent is the handle stored at construction.
impl<'a> ArrowArrayRef for ArrowArrayChild<'a> {
    /// The data type of the nested array.
    fn data_type(&self) -> &DataType {
        &self.data_type
    }
    /// The parent handle stored in this view.
    fn parent(&self) -> &Arc<InternalArrowArray> {
        &self.parent
    }
    /// The borrowed nested array.
    fn array(&self) -> &ArrowArray {
        self.array
    }
    /// Number of buffers declared by the nested array.
    fn n_buffers(&self) -> usize {
        self.array.n_buffers as usize
    }
}
impl<'a> ArrowArrayChild<'a> {
    /// Bundles a borrowed nested `array` with its `data_type` and the `parent` handle;
    /// no validation is performed.
    fn from_raw(
        array: &'a ArrowArray,
        data_type: DataType,
        parent: Arc<InternalArrowArray>,
    ) -> Self {
        Self {
            array,
            data_type,
            parent,
        }
    }
}