use std::ffi::{CString, c_void};
use std::sync::Arc;
use std::{ptr, slice};
use crate::ffi::arrow_dtype::ArrowType;
use crate::ffi::arrow_dtype::CategoricalIndexType;
use crate::ffi::schema::Schema;
use crate::structs::buffer::Buffer;
use crate::structs::shared_buffer::SharedBuffer;
use crate::{
Array, Bitmask, BooleanArray, CategoricalArray, Field, Float, FloatArray, Integer,
IntegerArray, MaskedArray, StringArray, TextArray, Vec64, vec64,
};
#[cfg(feature = "datetime")]
use crate::{DatetimeArray, IntervalUnit, TemporalArray, TimeUnit};
/// C-ABI mirror of the Arrow C data interface `ArrowArray` struct.
///
/// Field order and types must match the Arrow specification exactly;
/// instances are passed by pointer across the FFI boundary.
#[repr(C)]
pub struct ArrowArray {
    /// Number of logical elements in the array.
    pub length: i64,
    /// Number of null elements, or -1 if not computed.
    pub null_count: i64,
    /// Logical offset (in elements) into the buffers.
    pub offset: i64,
    /// Number of entries pointed to by `buffers`.
    pub n_buffers: i64,
    /// Number of entries pointed to by `children`.
    pub n_children: i64,
    /// Array of `n_buffers` buffer pointers; individual entries may be null
    /// (e.g. a missing validity bitmap).
    pub buffers: *mut *const u8,
    /// Child arrays for nested types, or null.
    pub children: *mut *mut ArrowArray,
    /// Dictionary values for dictionary-encoded arrays, else null.
    pub dictionary: *mut ArrowArray,
    /// Producer-installed release callback; `None` marks a released struct.
    pub release: Option<unsafe extern "C" fn(*mut ArrowArray)>,
    /// Opaque producer state, reclaimed by `release`.
    pub private_data: *mut c_void,
}
impl ArrowArray {
pub fn empty() -> Self {
Self {
length: 0,
null_count: 0,
offset: 0,
n_buffers: 0,
n_children: 0,
buffers: ptr::null_mut(),
children: ptr::null_mut(),
dictionary: ptr::null_mut(),
release: None,
private_data: ptr::null_mut(),
}
}
}
/// C-ABI mirror of the Arrow C data interface `ArrowSchema` struct.
///
/// Field order and types must match the Arrow specification exactly.
#[repr(C)]
#[derive(Clone)]
pub struct ArrowSchema {
    /// NUL-terminated Arrow format string (e.g. "i" for Int32).
    pub format: *const i8,
    /// NUL-terminated field name, or null.
    pub name: *const i8,
    /// Binary-encoded key/value metadata, or null.
    pub metadata: *const i8,
    /// Bit flags (e.g. nullable).
    pub flags: i64,
    /// Number of entries pointed to by `children`.
    pub n_children: i64,
    /// Child schemas for nested types, or null.
    pub children: *mut *mut ArrowSchema,
    /// Schema of the dictionary values for dictionary-encoded fields, else null.
    pub dictionary: *mut ArrowSchema,
    /// Producer-installed release callback; `None` marks a released struct.
    pub release: Option<unsafe extern "C" fn(*mut ArrowSchema)>,
    /// Opaque producer state, reclaimed by `release`.
    pub private_data: *mut c_void,
}
impl ArrowSchema {
pub fn empty() -> Self {
Self {
format: ptr::null(),
name: ptr::null(),
metadata: ptr::null(),
flags: 0,
n_children: 0,
children: ptr::null_mut(),
dictionary: ptr::null_mut(),
release: None,
private_data: ptr::null_mut(),
}
}
}
/// C-ABI mirror of the Arrow C stream interface `ArrowArrayStream` struct.
///
/// All operations go through the producer-supplied callbacks; each returns
/// 0 on success or an errno-style code on failure.
#[repr(C)]
pub struct ArrowArrayStream {
    /// Writes the stream's schema into `out`.
    pub get_schema:
        Option<unsafe extern "C" fn(stream: *mut ArrowArrayStream, out: *mut ArrowSchema) -> i32>,
    /// Writes the next chunk into `out`; an `out` with no release callback
    /// signals end-of-stream per the Arrow spec.
    pub get_next:
        Option<unsafe extern "C" fn(stream: *mut ArrowArrayStream, out: *mut ArrowArray) -> i32>,
    /// Returns a NUL-terminated description of the last error, or null.
    pub get_last_error: Option<unsafe extern "C" fn(stream: *mut ArrowArrayStream) -> *const i8>,
    /// Producer-installed release callback; `None` marks a released struct.
    pub release: Option<unsafe extern "C" fn(stream: *mut ArrowArrayStream)>,
    /// Opaque producer state, reclaimed by `release`.
    pub private_data: *mut c_void,
}
impl ArrowArrayStream {
pub fn empty() -> Self {
Self {
get_schema: None,
get_next: None,
get_last_error: None,
release: None,
private_data: ptr::null_mut(),
}
}
}
// SAFETY: ArrowArrayStream is a plain C struct of function pointers plus an
// opaque `private_data` pointer. NOTE(review): these impls assume the
// producer's callbacks and private data are safe to use from other threads —
// confirm against the stream producers this crate is used with.
unsafe impl Send for ArrowArrayStream {}
unsafe impl Sync for ArrowArrayStream {}
/// Keeps every allocation an exported `ArrowArray`/`ArrowSchema` points into
/// alive until the consumer invokes the release callback, at which point the
/// whole `Holder` (stored in `ArrowArray::private_data`) is dropped.
#[allow(dead_code)]
struct Holder {
    // The exported array data itself.
    array: Arc<Array>,
    // Boxed copy of the schema handed to the consumer.
    _schema: Box<ArrowSchema>,
    // Backing storage for `ArrowArray::buffers`.
    buf_ptrs: Vec64<*const u8>,
    // CString backing the schema's `name` pointer.
    name_cstr: CString,
    // CString backing the schema's `format` pointer.
    format_cstr: CString,
    // Encoded metadata backing the schema's `metadata` pointer, if any.
    metadata_bytes: Option<Vec<u8>>,
}
/// A byte buffer borrowed from a foreign (consumer-released) `ArrowArray`.
///
/// Holding the boxed `ArrowArray` keeps the foreign allocation alive; its
/// `release` callback is invoked when this wrapper is dropped.
struct ForeignBuffer {
    // Start of the foreign data.
    ptr: *const u8,
    // Length of the foreign data in bytes.
    len: usize,
    // Owned Arrow struct whose release callback frees `ptr`.
    array: Option<Box<ArrowArray>>,
}
impl AsRef<[u8]> for ForeignBuffer {
    /// Borrows the foreign allocation as a byte slice.
    ///
    /// A null pointer or zero length yields the empty slice rather than
    /// constructing an invalid one.
    fn as_ref(&self) -> &[u8] {
        match (self.ptr.is_null(), self.len) {
            (true, _) | (_, 0) => &[],
            // SAFETY: ptr is non-null and, by construction of ForeignBuffer,
            // points to `len` bytes kept alive by the owned ArrowArray.
            _ => unsafe { slice::from_raw_parts(self.ptr, self.len) },
        }
    }
}
impl Drop for ForeignBuffer {
    /// Hands the wrapped `ArrowArray` back to its producer by invoking the
    /// `release` callback (if one is installed) when the buffer goes away.
    fn drop(&mut self) {
        let Some(mut owned) = self.array.take() else {
            return;
        };
        if let Some(release) = owned.release {
            // SAFETY: `owned` is a live, exclusively-owned ArrowArray and
            // `release` is the callback its producer installed for it.
            unsafe { release(owned.as_mut() as *mut ArrowArray) };
        }
    }
}
// SAFETY: ForeignBuffer only ever reads the foreign bytes and calls `release`
// exactly once on drop. NOTE(review): this assumes the producer's buffer and
// release callback are safe to use from any thread — confirm against the
// Arrow producers this crate interoperates with.
unsafe impl Send for ForeignBuffer {}
unsafe impl Sync for ForeignBuffer {}
/// Release callback installed on every `ArrowArray` this module exports.
///
/// Reclaims the `Holder` stashed in `private_data` (dropping the buffers,
/// CStrings and the `Arc<Array>`), then zeroes the struct — which clears
/// `release` and thereby marks it released per the Arrow C data interface.
///
/// Idempotent: a second call observes `release == None` and returns early.
unsafe extern "C" fn release_arrow_array(arr: *mut ArrowArray) {
    if arr.is_null() || (unsafe { &*arr }).release.is_none() {
        return;
    }
    // Guard against a null private_data: Box::from_raw(null) is UB, and a
    // defensive check costs nothing in an FFI-facing callback.
    let private = unsafe { (*arr).private_data } as *mut Holder;
    if !private.is_null() {
        let _: Box<Holder> = unsafe { Box::from_raw(private) };
    }
    unsafe { ptr::write_bytes(arr, 0, 1) };
}
unsafe extern "C" fn release_arrow_schema(s: *mut ArrowSchema) {
if s.is_null() || (unsafe { &*s }).release.is_none() {
return;
}
unsafe { ptr::write_bytes(s, 0, 1) };
}
/// Maps an [`ArrowType`] to its Arrow C data interface "format" string,
/// returned as a `CString` suitable for `ArrowSchema::format`.
///
/// # Panics
/// Panics for unit combinations that have no Arrow C encoding (e.g.
/// `Timestamp(Days)`, `Time32` with sub-millisecond units) and if the
/// produced bytes would contain an interior NUL.
pub fn fmt_c(dtype: ArrowType) -> CString {
    // Timestamps are the only dynamic case: "ts<u>:" with an optional
    // timezone appended after the colon, so they are handled up front.
    #[cfg(feature = "datetime")]
    if let ArrowType::Timestamp(u, tz) = &dtype {
        let unit_str = match u {
            TimeUnit::Seconds => "tss:",
            TimeUnit::Milliseconds => "tsm:",
            TimeUnit::Microseconds => "tsu:",
            TimeUnit::Nanoseconds => "tsn:",
            TimeUnit::Days => panic!("Timestamp(Days) is invalid in Arrow C format"),
        };
        let tz_str = tz.as_deref().unwrap_or("");
        let format_str = format!("{}{}", unit_str, tz_str);
        return CString::new(format_str).expect("CString formatting failed: invalid bytes");
    }
    // Every remaining type maps to a static format string.
    let bytes: &'static [u8] = match dtype {
        ArrowType::Null => b"n",
        ArrowType::Boolean => b"b",
        #[cfg(feature = "extended_numeric_types")]
        ArrowType::Int8 => b"c",
        #[cfg(feature = "extended_numeric_types")]
        ArrowType::UInt8 => b"C",
        #[cfg(feature = "extended_numeric_types")]
        ArrowType::Int16 => b"s",
        #[cfg(feature = "extended_numeric_types")]
        ArrowType::UInt16 => b"S",
        ArrowType::Int32 => b"i",
        ArrowType::UInt32 => b"I",
        ArrowType::Int64 => b"l",
        ArrowType::UInt64 => b"L",
        ArrowType::Float32 => b"f",
        ArrowType::Float64 => b"g",
        ArrowType::String => b"u",
        #[cfg(feature = "large_string")]
        ArrowType::LargeString => b"U",
        ArrowType::Utf8View => b"vu",
        #[cfg(feature = "datetime")]
        ArrowType::Date32 => b"tdD",
        #[cfg(feature = "datetime")]
        ArrowType::Date64 => b"tdm",
        #[cfg(feature = "datetime")]
        ArrowType::Time32(u) => match u {
            TimeUnit::Seconds => b"tts",
            TimeUnit::Milliseconds => b"ttm",
            _ => panic!("Time32 supports Seconds or Milliseconds only"),
        },
        #[cfg(feature = "datetime")]
        ArrowType::Time64(u) => match u {
            TimeUnit::Microseconds => b"ttu",
            TimeUnit::Nanoseconds => b"ttn",
            _ => panic!("Time64 supports Microseconds or Nanoseconds only"),
        },
        #[cfg(feature = "datetime")]
        ArrowType::Duration32(u) => match u {
            TimeUnit::Seconds => b"tDs",
            TimeUnit::Milliseconds => b"tDm",
            _ => panic!("Duration32 supports Seconds or Milliseconds only"),
        },
        #[cfg(feature = "datetime")]
        ArrowType::Duration64(u) => match u {
            TimeUnit::Microseconds => b"tDu",
            TimeUnit::Nanoseconds => b"tDn",
            _ => panic!("Duration64 supports Microseconds or Nanoseconds only"),
        },
        #[cfg(feature = "datetime")]
        ArrowType::Timestamp(_, _) => {
            unreachable!("Timestamp case handled above")
        }
        #[cfg(feature = "datetime")]
        ArrowType::Interval(u) => match u {
            IntervalUnit::YearMonth => b"tiM",
            IntervalUnit::DaysTime => b"tiD",
            IntervalUnit::MonthDaysNs => b"tin",
        },
        // Dictionaries are encoded by their index type; the value type lives
        // in the separate dictionary schema.
        ArrowType::Dictionary(idx) => match idx {
            #[cfg(feature = "default_categorical_8")]
            CategoricalIndexType::UInt8 => b"C",
            #[cfg(feature = "extended_categorical")]
            CategoricalIndexType::UInt16 => b"S",
            #[cfg(any(not(feature = "default_categorical_8"), feature = "extended_categorical"))]
            CategoricalIndexType::UInt32 => b"I",
            #[cfg(feature = "extended_categorical")]
            CategoricalIndexType::UInt64 => b"L",
        },
    };
    CString::new(bytes).expect("CString formatting failed: invalid bytes")
}
#[cfg(feature = "datetime")]
/// Asserts that a temporal array's stored `time_unit` is consistent with the
/// Arrow dtype declared in the field before export; non-temporal pairs pass
/// through unchecked.
///
/// # Panics
/// Panics with a descriptive message on any unit mismatch.
fn validate_temporal_field(array: &Array, dtype: &ArrowType) {
    use crate::{TemporalArray, TimeUnit};
    match (array, dtype) {
        // Date32 is defined as days since the epoch.
        (Array::TemporalArray(TemporalArray::Datetime32(arr)), ArrowType::Date32) => {
            assert!(
                arr.time_unit == TimeUnit::Days,
                "FFI export: Field=Date32 requires Datetime32(Days); got {:?}",
                arr.time_unit
            );
        }
        // Date64 is defined as milliseconds since the epoch.
        (Array::TemporalArray(TemporalArray::Datetime64(arr)), ArrowType::Date64) => {
            assert!(
                arr.time_unit == TimeUnit::Milliseconds,
                "FFI export: Field=Date64 requires Datetime64(Milliseconds); got {:?}",
                arr.time_unit
            );
        }
        // Time/Timestamp/Duration: the array's unit must equal the field's.
        (Array::TemporalArray(TemporalArray::Datetime32(arr)), ArrowType::Time32(u)) => {
            assert!(
                arr.time_unit == *u,
                "FFI export: Field=Time32({u:?}) requires Datetime32({u:?}); got {:?}",
                arr.time_unit
            );
        }
        (Array::TemporalArray(TemporalArray::Datetime64(arr)), ArrowType::Time64(u)) => {
            assert!(
                arr.time_unit == *u,
                "FFI export: Field=Time64({u:?}) requires Datetime64({u:?}); got {:?}",
                arr.time_unit
            );
        }
        (Array::TemporalArray(TemporalArray::Datetime64(arr)), ArrowType::Timestamp(u, _tz)) => {
            assert!(
                arr.time_unit == *u,
                "FFI export: Field=Timestamp({u:?}) requires Datetime64({u:?}); got {:?}",
                arr.time_unit
            );
        }
        (Array::TemporalArray(TemporalArray::Datetime32(arr)), ArrowType::Duration32(u)) => {
            assert!(
                arr.time_unit == *u,
                "FFI export: Field=Duration32({u:?}) requires Datetime32({u:?}); got {:?}",
                arr.time_unit
            );
        }
        (Array::TemporalArray(TemporalArray::Datetime64(arr)), ArrowType::Duration64(u)) => {
            assert!(
                arr.time_unit == *u,
                "FFI export: Field=Duration64({u:?}) requires Datetime64({u:?}); got {:?}",
                arr.time_unit
            );
        }
        _ => {}
    }
}
/// Exports an array plus its single-field schema as a pair of heap-allocated
/// Arrow C structs.
///
/// The returned pointers are owned by the consumer, which must invoke each
/// struct's release callback exactly once. String arrays get the 3-buffer
/// layout (validity, offsets, values), categoricals additionally export a
/// dictionary array, everything else gets the 2-buffer layout.
///
/// # Panics
/// Panics if a temporal array's unit contradicts the field dtype (datetime
/// feature) or if the field name contains an interior NUL.
pub fn export_to_c(array: Arc<Array>, schema: Schema) -> (*mut ArrowArray, *mut ArrowSchema) {
    #[cfg(feature = "datetime")]
    {
        let field_ty = &schema.fields[0].dtype;
        validate_temporal_field(&*array, field_ty);
    }
    match &*array {
        Array::TextArray(TextArray::String32(s)) => {
            export_string_array_to_c(&array, schema, s.len() as i64)
        }
        #[cfg(feature = "large_string")]
        Array::TextArray(TextArray::String64(s)) => {
            export_string_array_to_c(&array, schema, s.len() as i64)
        }
        // Categorical variants: length is the number of codes; the unique
        // values become the exported dictionary.
        #[cfg(any(not(feature = "default_categorical_8"), feature = "extended_categorical"))]
        Array::TextArray(TextArray::Categorical32(cat)) => export_categorical_array_to_c(
            &array,
            schema,
            cat.data.len() as i64,
            &cat.unique_values,
            32,
        ),
        #[cfg(feature = "default_categorical_8")]
        Array::TextArray(TextArray::Categorical8(cat)) => export_categorical_array_to_c(
            &array,
            schema,
            cat.data.len() as i64,
            &cat.unique_values,
            8,
        ),
        #[cfg(feature = "extended_categorical")]
        Array::TextArray(TextArray::Categorical16(cat)) => export_categorical_array_to_c(
            &array,
            schema,
            cat.data.len() as i64,
            &cat.unique_values,
            16,
        ),
        #[cfg(feature = "extended_categorical")]
        Array::TextArray(TextArray::Categorical64(cat)) => export_categorical_array_to_c(
            &array,
            schema,
            cat.data.len() as i64,
            &cat.unique_values,
            64,
        ),
        // Fixed-width types: [validity, data] buffer pair.
        _ => {
            let (data_ptr, len, _) = array.data_ptr_and_byte_len();
            let (mask_ptr, _) = array
                .null_mask_ptr_and_byte_len()
                .unwrap_or((ptr::null(), 0));
            let mut buf_ptrs = vec64![mask_ptr, data_ptr];
            let name_cstr = CString::new(schema.fields[0].name.clone()).unwrap();
            check_alignment(&mut buf_ptrs, len as i64);
            create_arrow_export(array, schema, buf_ptrs, 2, len as i64, name_cstr)
        }
    }
}
/// Exports a UTF-8 (String32/String64) array with the Arrow 3-buffer layout:
/// [validity bitmap, offsets, values].
///
/// A missing null mask and an empty values buffer are both exported as null
/// pointers, as the Arrow C interface allows.
fn export_string_array_to_c(
    array: &Arc<Array>,
    schema: Schema,
    len: i64,
) -> (*mut ArrowArray, *mut ArrowSchema) {
    let (offsets_ptr, _) = array.offsets_ptr_and_len().unwrap();
    let (values_ptr, values_len, _) = array.data_ptr_and_byte_len();
    let (null_ptr, _) = array
        .null_mask_ptr_and_byte_len()
        .unwrap_or((ptr::null(), 0));
    // Export a null values pointer rather than a dangling one when empty.
    let values_buf_ptr = if values_len > 0 {
        values_ptr
    } else {
        ptr::null()
    };
    let mut buf_ptrs = vec64![null_ptr, offsets_ptr, values_buf_ptr];
    let name_cstr = CString::new(schema.fields[0].name.clone()).unwrap();
    check_alignment(&mut buf_ptrs, len);
    create_arrow_export(array.clone(), schema, buf_ptrs, 3, len, name_cstr)
}
/// Exports a categorical array as an Arrow dictionary-encoded array.
///
/// The codes are exported as the top-level [validity, codes] buffers; the
/// unique values are materialized into a fresh String32 array and exported
/// recursively as the `dictionary` of both the array and the schema. A
/// `Holder` stashed in `private_data` keeps all exported allocations alive
/// until the consumer calls the release callback.
///
/// # Panics
/// Panics if `index_bits` does not match a compiled-in index type, if the
/// dictionary bytes overflow u32 offsets, or on interior NULs in the name.
fn export_categorical_array_to_c(
    array: &Arc<Array>,
    schema: Schema,
    codes_len: i64,
    unique_values: &Vec64<String>,
    index_bits: usize,
) -> (*mut ArrowArray, *mut ArrowSchema) {
    let codes_ptr = array.data_ptr_and_byte_len().0;
    let null_ptr = array
        .null_mask_ptr_and_byte_len()
        .map_or(ptr::null(), |(p, _)| p);
    let mut buf_ptrs = vec64![null_ptr, codes_ptr];
    check_alignment(&mut buf_ptrs, codes_len);
    // Build cumulative u32 offsets over the dictionary strings, with
    // overflow checked.
    let dict_offsets: Vec64<u32> = {
        let mut offsets = Vec64::with_capacity(unique_values.len() + 1);
        let mut total = 0u32;
        offsets.push(0);
        for s in unique_values {
            total = total
                .checked_add(s.len() as u32)
                .expect("String data too large for u32 offset");
            offsets.push(total);
        }
        offsets
    };
    // Concatenate all dictionary strings into one contiguous byte buffer.
    let dict_data: Vec64<u8> = unique_values
        .iter()
        .flat_map(|s| s.as_bytes())
        .copied()
        .collect();
    let dict_array = StringArray {
        offsets: dict_offsets.into(),
        data: dict_data.into(),
        null_mask: None,
    };
    let dict_schema = Schema::from(vec![Field::new(
        "dictionary",
        ArrowType::String,
        false,
        None,
    )]);
    // The dictionary is exported through the normal string path; its release
    // callbacks own its allocations independently of the parent's Holder.
    let dict_array_arc = Arc::new(Array::TextArray(TextArray::String32(Arc::new(dict_array))));
    let (dict_arr_ptr, dict_schema_ptr) = export_to_c(dict_array_arc, dict_schema);
    let name_cstr = CString::new(schema.fields[0].name.clone()).unwrap();
    // Rewrite the field dtype so fmt_c emits the index-type format string.
    let mut field = schema.fields[0].clone();
    field.dtype = match index_bits {
        #[cfg(feature = "default_categorical_8")]
        8 => ArrowType::Dictionary(crate::ffi::arrow_dtype::CategoricalIndexType::UInt8),
        #[cfg(feature = "extended_categorical")]
        16 => ArrowType::Dictionary(crate::ffi::arrow_dtype::CategoricalIndexType::UInt16),
        #[cfg(any(not(feature = "default_categorical_8"), feature = "extended_categorical"))]
        32 => ArrowType::Dictionary(crate::ffi::arrow_dtype::CategoricalIndexType::UInt32),
        #[cfg(feature = "extended_categorical")]
        64 => ArrowType::Dictionary(crate::ffi::arrow_dtype::CategoricalIndexType::UInt64),
        _ => panic!("Invalid index bits for categorical array"),
    };
    let format_cstr = fmt_c(field.dtype.clone());
    let format_ptr = format_cstr.as_ptr();
    let metadata_bytes = if field.metadata.is_empty() {
        None
    } else {
        Some(encode_arrow_metadata(&field.metadata))
    };
    let metadata_ptr = metadata_bytes
        .as_ref()
        .map(|b| b.as_ptr() as *const i8)
        .unwrap_or(ptr::null());
    let arr = Box::new(ArrowArray {
        length: codes_len,
        // -1 means "nulls not counted" when a validity bitmap is present.
        null_count: if buf_ptrs[0].is_null() { 0 } else { -1 },
        offset: 0,
        n_buffers: 2,
        n_children: 0,
        buffers: buf_ptrs.as_mut_ptr(),
        children: ptr::null_mut(),
        dictionary: dict_arr_ptr,
        release: Some(release_arrow_array),
        private_data: ptr::null_mut(),
    });
    let flags = if field.nullable { 1 } else { 0 };
    let schema_box = Box::new(ArrowSchema {
        format: format_ptr,
        name: name_cstr.as_ptr(),
        metadata: metadata_ptr,
        flags,
        n_children: 0,
        children: ptr::null_mut(),
        dictionary: dict_schema_ptr,
        release: Some(release_arrow_schema),
        private_data: ptr::null_mut(),
    });
    // Holder owns everything the raw pointers above reference (buf_ptrs'
    // heap storage stays put when the Vec64 moves into the Holder).
    let holder = Box::new(Holder {
        array: array.clone(),
        _schema: schema_box.clone(),
        buf_ptrs,
        name_cstr,
        format_cstr,
        metadata_bytes,
    });
    let arr_ptr = Box::into_raw(arr);
    // private_data is wired up before the pointer is handed to the consumer.
    unsafe {
        (*arr_ptr).private_data = Box::into_raw(holder) as *mut c_void;
    }
    (arr_ptr, Box::into_raw(schema_box))
}
/// Imports a borrowed Arrow C array/schema pair into an owned [`Array`].
///
/// This path copies buffer contents (the `None` ownership argument to the
/// per-type importers); the caller retains responsibility for releasing the
/// C structs. Dictionary-encoded inputs are routed to the categorical
/// importers.
///
/// # Safety
/// `arr_ptr` and `sch_ptr` must point to live, initialized Arrow C structs
/// whose buffers remain valid for the duration of the call.
///
/// # Panics
/// Panics on null pointers, unknown format strings, and unsupported types
/// (Null, Interval, unsupported dictionary index widths).
pub unsafe fn import_from_c(arr_ptr: *const ArrowArray, sch_ptr: *const ArrowSchema) -> Arc<Array> {
    if arr_ptr.is_null() || sch_ptr.is_null() {
        panic!("FFI import_from_c: null pointer");
    }
    let arr = unsafe { &*arr_ptr };
    let sch = unsafe { &*sch_ptr };
    let fmt = unsafe { std::ffi::CStr::from_ptr(sch.format).to_bytes() };
    // Either side carrying a dictionary pointer marks dictionary encoding.
    let is_dict = !arr.dictionary.is_null() || !sch.dictionary.is_null();
    // Decode the Arrow C format string into an ArrowType.
    let dtype = match fmt {
        b"n" => ArrowType::Null,
        b"b" => ArrowType::Boolean,
        #[cfg(feature = "extended_numeric_types")]
        b"c" => ArrowType::Int8,
        #[cfg(feature = "extended_numeric_types")]
        b"C" => ArrowType::UInt8,
        #[cfg(feature = "extended_numeric_types")]
        b"s" => ArrowType::Int16,
        #[cfg(feature = "extended_numeric_types")]
        b"S" => ArrowType::UInt16,
        b"i" => ArrowType::Int32,
        b"I" => ArrowType::UInt32,
        b"l" => ArrowType::Int64,
        b"L" => ArrowType::UInt64,
        b"f" => ArrowType::Float32,
        b"g" => ArrowType::Float64,
        b"u" => ArrowType::String,
        #[cfg(feature = "large_string")]
        b"U" => ArrowType::LargeString,
        b"vu" => ArrowType::Utf8View,
        #[cfg(feature = "datetime")]
        b"tdD" => ArrowType::Date32,
        #[cfg(feature = "datetime")]
        b"tdm" => ArrowType::Date64,
        #[cfg(feature = "datetime")]
        b"tts" => ArrowType::Time32(crate::TimeUnit::Seconds),
        #[cfg(feature = "datetime")]
        b"ttm" => ArrowType::Time32(crate::TimeUnit::Milliseconds),
        #[cfg(feature = "datetime")]
        b"ttu" => ArrowType::Time64(crate::TimeUnit::Microseconds),
        #[cfg(feature = "datetime")]
        b"ttn" => ArrowType::Time64(crate::TimeUnit::Nanoseconds),
        #[cfg(feature = "datetime")]
        b"tDs" => ArrowType::Duration32(crate::TimeUnit::Seconds),
        #[cfg(feature = "datetime")]
        b"tDm" => ArrowType::Duration32(crate::TimeUnit::Milliseconds),
        #[cfg(feature = "datetime")]
        b"tDu" => ArrowType::Duration64(crate::TimeUnit::Microseconds),
        #[cfg(feature = "datetime")]
        b"tDn" => ArrowType::Duration64(crate::TimeUnit::Nanoseconds),
        #[cfg(feature = "datetime")]
        b"tiM" => ArrowType::Interval(IntervalUnit::YearMonth),
        #[cfg(feature = "datetime")]
        b"tiD" => ArrowType::Interval(IntervalUnit::DaysTime),
        #[cfg(feature = "datetime")]
        b"tin" => ArrowType::Interval(IntervalUnit::MonthDaysNs),
        // Timestamps: "ts<u>:" optionally followed by a timezone. Bytes 0..3
        // are the unit tag, byte 3 is ':', any timezone starts at byte 4.
        #[cfg(feature = "datetime")]
        _ if fmt.starts_with(b"tss")
            || fmt.starts_with(b"tsm")
            || fmt.starts_with(b"tsu")
            || fmt.starts_with(b"tsn") =>
        {
            let unit = match &fmt[..3] {
                b"tss" => crate::TimeUnit::Seconds,
                b"tsm" => crate::TimeUnit::Milliseconds,
                b"tsu" => crate::TimeUnit::Microseconds,
                b"tsn" => crate::TimeUnit::Nanoseconds,
                _ => unreachable!(),
            };
            let tz = if fmt.len() > 4 {
                let tz_bytes = &fmt[4..];
                let tz_str = String::from_utf8_lossy(tz_bytes).into_owned();
                if tz_str.is_empty() {
                    None
                } else {
                    Some(tz_str)
                }
            } else {
                None
            };
            ArrowType::Timestamp(unit, tz)
        }
        o => panic!("unsupported format {:?}", o),
    };
    // With only 8-bit categoricals compiled in, 32-bit dictionary indices
    // are narrowed down to u8 codes.
    #[cfg(all(feature = "default_categorical_8", not(feature = "extended_categorical")))]
    if is_dict && matches!(dtype, ArrowType::Int32 | ArrowType::UInt32) {
        return unsafe { import_categorical_narrow_to_u8(arr, sch, None) };
    }
    // Map a dictionary's raw integer code type to the categorical index type.
    #[allow(unreachable_code)]
    let maybe_cat_index = if is_dict {
        Some(match dtype {
            #[cfg(feature = "extended_numeric_types")]
            #[cfg(feature = "default_categorical_8")]
            ArrowType::Int8 | ArrowType::UInt8 => CategoricalIndexType::UInt8,
            #[cfg(feature = "extended_numeric_types")]
            #[cfg(feature = "extended_categorical")]
            ArrowType::Int16 | ArrowType::UInt16 => CategoricalIndexType::UInt16,
            #[cfg(any(not(feature = "default_categorical_8"), feature = "extended_categorical"))]
            ArrowType::Int32 | ArrowType::UInt32 => CategoricalIndexType::UInt32,
            #[cfg(feature = "extended_numeric_types")]
            #[cfg(feature = "extended_categorical")]
            ArrowType::Int64 | ArrowType::UInt64 => CategoricalIndexType::UInt64,
            _ => panic!(
                "FFI import_from_c: unsupported dictionary index type {:?}",
                dtype
            ),
        })
    } else {
        None
    };
    if let Some(idx_ty) = maybe_cat_index {
        return unsafe { import_categorical(arr, sch, idx_ty, None) };
    }
    if is_dict {
        // Reached when the format string itself was a Dictionary type.
        unsafe {
            import_categorical(
                arr,
                sch,
                match dtype {
                    ArrowType::Dictionary(i) => i,
                    _ => panic!("Expected Dictionary type"),
                },
                None,
            )
        }
    } else {
        // Plain (non-dictionary) arrays: dispatch on the decoded dtype.
        match dtype {
            ArrowType::Boolean => unsafe { import_boolean(arr, None) },
            #[cfg(feature = "extended_numeric_types")]
            ArrowType::Int8 => unsafe { import_integer::<i8>(arr, None, Array::from_int8) },
            #[cfg(feature = "extended_numeric_types")]
            ArrowType::UInt8 => unsafe { import_integer::<u8>(arr, None, Array::from_uint8) },
            #[cfg(feature = "extended_numeric_types")]
            ArrowType::Int16 => unsafe { import_integer::<i16>(arr, None, Array::from_int16) },
            #[cfg(feature = "extended_numeric_types")]
            ArrowType::UInt16 => unsafe { import_integer::<u16>(arr, None, Array::from_uint16) },
            ArrowType::Int32 => unsafe { import_integer::<i32>(arr, None, Array::from_int32) },
            ArrowType::UInt32 => unsafe { import_integer::<u32>(arr, None, Array::from_uint32) },
            ArrowType::Int64 => unsafe { import_integer::<i64>(arr, None, Array::from_int64) },
            ArrowType::UInt64 => unsafe { import_integer::<u64>(arr, None, Array::from_uint64) },
            ArrowType::Float32 => unsafe { import_float::<f32>(arr, None, Array::from_float32) },
            ArrowType::Float64 => unsafe { import_float::<f64>(arr, None, Array::from_float64) },
            ArrowType::String => unsafe { import_utf8::<u32>(arr, None) },
            #[cfg(feature = "large_string")]
            ArrowType::LargeString => unsafe { import_utf8::<u64>(arr, None) },
            ArrowType::Utf8View => unsafe { import_utf8_view(arr, None) },
            #[cfg(feature = "datetime")]
            ArrowType::Date32 => unsafe {
                import_datetime::<i32>(arr, None, crate::TimeUnit::Days)
            },
            #[cfg(feature = "datetime")]
            ArrowType::Date64 => unsafe {
                import_datetime::<i64>(arr, None, crate::TimeUnit::Milliseconds)
            },
            #[cfg(feature = "datetime")]
            ArrowType::Time32(u) => unsafe { import_datetime::<i32>(arr, None, u) },
            #[cfg(feature = "datetime")]
            ArrowType::Time64(u) => unsafe { import_datetime::<i64>(arr, None, u) },
            #[cfg(feature = "datetime")]
            ArrowType::Timestamp(u, _tz) => unsafe { import_datetime::<i64>(arr, None, u) },
            #[cfg(feature = "datetime")]
            ArrowType::Duration32(u) => unsafe { import_datetime::<i32>(arr, None, u) },
            #[cfg(feature = "datetime")]
            ArrowType::Duration64(u) => unsafe { import_datetime::<i64>(arr, None, u) },
            #[cfg(feature = "datetime")]
            ArrowType::Interval(_u) => {
                panic!("FFI import_from_c: Arrow Interval types are not yet supported");
            }
            ArrowType::Null => {
                panic!("FFI import_from_c: Arrow Null arrays types are not yet supported")
            }
            ArrowType::Dictionary(idx) => {
                if arr.dictionary.is_null() || sch.dictionary.is_null() {
                    panic!(
                        "FFI import_from_c: dictionary pointers missing for dictionary-encoded array"
                    );
                }
                unsafe { import_categorical(arr, sch, idx, None) }
            }
        }
    }
}
/// Imports an Arrow array/schema pair, taking ownership of both C structs.
///
/// Unlike [`import_from_c`], the returned `Array` keeps the boxed
/// `ArrowArray` alive (zero-copy via `ForeignBuffer`) and its `release`
/// callback fires when the data is finally dropped. The schema box is only
/// read during import and dropped before returning.
///
/// # Safety
/// Both boxes must contain fully initialized Arrow C structs whose buffer
/// pointers remain valid until the producer's release callback is invoked.
///
/// # Panics
/// Panics when the array has no release callback, or when the schema
/// describes an unsupported type or dictionary index width.
pub unsafe fn import_from_c_owned(
    arr_box: Box<ArrowArray>,
    sch_box: Box<ArrowSchema>,
) -> (Arc<Array>, crate::Field) {
    let arr_ptr = &*arr_box as *const ArrowArray;
    let sch_ptr = &*sch_box as *const ArrowSchema;
    let arr = unsafe { &*arr_ptr };
    let sch = unsafe { &*sch_ptr };
    if arr.release.is_none() {
        panic!("FFI import_from_c_owned: ArrowArray has no release callback");
    }
    let mut field = unsafe { field_from_c_schema(sch) };
    let dtype = field.dtype.clone();
    let is_dict = !arr.dictionary.is_null() || !sch.dictionary.is_null();
    if is_dict {
        // Resolve the dictionary index width from either an explicit
        // Dictionary dtype or the raw integer type of the codes.
        let idx_ty = match &dtype {
            ArrowType::Dictionary(i) => i.clone(),
            #[cfg(feature = "extended_numeric_types")]
            ArrowType::Int8 | ArrowType::UInt8 => {
                #[cfg(feature = "default_categorical_8")]
                {
                    CategoricalIndexType::UInt8
                }
                #[cfg(not(feature = "default_categorical_8"))]
                panic!("default_categorical_8 not enabled")
            }
            #[cfg(feature = "extended_numeric_types")]
            ArrowType::Int16 | ArrowType::UInt16 => {
                #[cfg(feature = "extended_categorical")]
                {
                    CategoricalIndexType::UInt16
                }
                #[cfg(not(feature = "extended_categorical"))]
                panic!("Extended categorical not enabled")
            }
            #[cfg(any(not(feature = "default_categorical_8"), feature = "extended_categorical"))]
            ArrowType::Int32 | ArrowType::UInt32 => CategoricalIndexType::UInt32,
            #[cfg(feature = "extended_numeric_types")]
            ArrowType::Int64 | ArrowType::UInt64 => {
                #[cfg(feature = "extended_categorical")]
                {
                    CategoricalIndexType::UInt64
                }
                #[cfg(not(feature = "extended_categorical"))]
                panic!("Extended categorical not enabled")
            }
            _ => panic!("FFI: unsupported dictionary index type {:?}", dtype),
        };
        let result = unsafe { import_categorical(arr, sch, idx_ty, Some(arr_box)) };
        // BUGFIX: `sch` borrows from `sch_box`, so the box must stay alive
        // until import_categorical has finished reading the schema. The
        // previous code dropped it before the call, leaving `sch` dangling.
        drop(sch_box);
        return (result, field);
    }
    let array = unsafe {
        match dtype {
            ArrowType::Boolean => import_boolean(arr, Some(arr_box)),
            #[cfg(feature = "extended_numeric_types")]
            ArrowType::Int8 => import_integer::<i8>(arr, Some(arr_box), Array::from_int8),
            #[cfg(feature = "extended_numeric_types")]
            ArrowType::UInt8 => import_integer::<u8>(arr, Some(arr_box), Array::from_uint8),
            #[cfg(feature = "extended_numeric_types")]
            ArrowType::Int16 => import_integer::<i16>(arr, Some(arr_box), Array::from_int16),
            #[cfg(feature = "extended_numeric_types")]
            ArrowType::UInt16 => import_integer::<u16>(arr, Some(arr_box), Array::from_uint16),
            ArrowType::Int32 => import_integer::<i32>(arr, Some(arr_box), Array::from_int32),
            ArrowType::UInt32 => import_integer::<u32>(arr, Some(arr_box), Array::from_uint32),
            ArrowType::Int64 => import_integer::<i64>(arr, Some(arr_box), Array::from_int64),
            ArrowType::UInt64 => import_integer::<u64>(arr, Some(arr_box), Array::from_uint64),
            ArrowType::Float32 => import_float::<f32>(arr, Some(arr_box), Array::from_float32),
            ArrowType::Float64 => import_float::<f64>(arr, Some(arr_box), Array::from_float64),
            ArrowType::String => import_utf8::<u32>(arr, Some(arr_box)),
            #[cfg(feature = "large_string")]
            ArrowType::LargeString => import_utf8::<u64>(arr, Some(arr_box)),
            ArrowType::Utf8View => import_utf8_view(arr, Some(arr_box)),
            #[cfg(feature = "datetime")]
            ArrowType::Date32 => import_datetime::<i32>(arr, Some(arr_box), crate::TimeUnit::Days),
            #[cfg(feature = "datetime")]
            ArrowType::Date64 => {
                import_datetime::<i64>(arr, Some(arr_box), crate::TimeUnit::Milliseconds)
            }
            #[cfg(feature = "datetime")]
            ArrowType::Time32(u) => import_datetime::<i32>(arr, Some(arr_box), u),
            #[cfg(feature = "datetime")]
            ArrowType::Time64(u) => import_datetime::<i64>(arr, Some(arr_box), u),
            #[cfg(feature = "datetime")]
            ArrowType::Timestamp(u, ref _tz) => import_datetime::<i64>(arr, Some(arr_box), u),
            #[cfg(feature = "datetime")]
            ArrowType::Duration32(u) => import_datetime::<i32>(arr, Some(arr_box), u),
            #[cfg(feature = "datetime")]
            ArrowType::Duration64(u) => import_datetime::<i64>(arr, Some(arr_box), u),
            #[cfg(feature = "datetime")]
            ArrowType::Interval(_u) => {
                panic!("FFI import_from_c_owned: Arrow Interval types are not yet supported");
            }
            ArrowType::Null => {
                panic!("FFI import_from_c_owned: Arrow Null arrays types are not yet supported")
            }
            ArrowType::Dictionary(_) => unreachable!("Dictionary handled above"),
        }
    };
    // Schema box is no longer referenced once the array has been imported.
    drop(sch_box);
    // Utf8View inputs are materialized as plain String arrays on import.
    if field.dtype == ArrowType::Utf8View {
        field.dtype = ArrowType::String;
    }
    (array, field)
}
/// Zero-copy import: takes ownership of the boxed `ArrowArray` (its release
/// callback fires when the resulting buffers are dropped) and dispatches on
/// an already-decoded `dtype`.
///
/// `sch_ptr` is only dereferenced on the dictionary path.
// NOTE(review): the schema behind `sch_ptr` must outlive the
// import_categorical call — the caller is responsible for that.
unsafe fn import_array_zero_copy(
    arr_box: Box<ArrowArray>,
    dtype: ArrowType,
    sch_ptr: *const ArrowSchema,
) -> Arc<Array> {
    let arr = unsafe { &*(&*arr_box as *const ArrowArray) };
    if !arr.dictionary.is_null() {
        let sch = unsafe { &*sch_ptr };
        // With only 8-bit categoricals compiled in, 32-bit (or explicit
        // Dictionary) indices are narrowed down to u8 codes.
        #[cfg(all(feature = "default_categorical_8", not(feature = "extended_categorical")))]
        if matches!(dtype, ArrowType::Int32 | ArrowType::UInt32 | ArrowType::Dictionary(_)) {
            return unsafe { import_categorical_narrow_to_u8(arr, sch, Some(arr_box)) };
        }
        // Resolve the index width from the dtype (explicit Dictionary or
        // raw integer code type).
        let idx_type = match dtype.clone() {
            ArrowType::Dictionary(idx) => idx,
            _ => {
                #[allow(unused_imports)]
                use crate::ffi::arrow_dtype::CategoricalIndexType;
                match dtype {
                    #[cfg(all(
                        feature = "extended_numeric_types",
                        feature = "default_categorical_8"
                    ))]
                    ArrowType::Int8 | ArrowType::UInt8 => CategoricalIndexType::UInt8,
                    #[cfg(all(
                        feature = "extended_numeric_types",
                        feature = "extended_categorical"
                    ))]
                    ArrowType::Int16 | ArrowType::UInt16 => CategoricalIndexType::UInt16,
                    #[cfg(any(not(feature = "default_categorical_8"), feature = "extended_categorical"))]
                    ArrowType::Int32 | ArrowType::UInt32 => CategoricalIndexType::UInt32,
                    #[cfg(all(
                        feature = "extended_numeric_types",
                        feature = "extended_categorical"
                    ))]
                    ArrowType::Int64 | ArrowType::UInt64 => CategoricalIndexType::UInt64,
                    _ => panic!(
                        "import_array_zero_copy: unsupported dictionary index type {:?}",
                        dtype
                    ),
                }
            }
        };
        return unsafe { import_categorical(arr, sch, idx_type, Some(arr_box)) };
    }
    // Non-dictionary arrays: hand ownership of the box to the per-type
    // importer so the foreign buffers stay alive with the result.
    unsafe {
        match dtype {
            ArrowType::Boolean => import_boolean(arr, Some(arr_box)),
            #[cfg(feature = "extended_numeric_types")]
            ArrowType::Int8 => import_integer::<i8>(arr, Some(arr_box), Array::from_int8),
            #[cfg(feature = "extended_numeric_types")]
            ArrowType::UInt8 => import_integer::<u8>(arr, Some(arr_box), Array::from_uint8),
            #[cfg(feature = "extended_numeric_types")]
            ArrowType::Int16 => import_integer::<i16>(arr, Some(arr_box), Array::from_int16),
            #[cfg(feature = "extended_numeric_types")]
            ArrowType::UInt16 => import_integer::<u16>(arr, Some(arr_box), Array::from_uint16),
            ArrowType::Int32 => import_integer::<i32>(arr, Some(arr_box), Array::from_int32),
            ArrowType::UInt32 => import_integer::<u32>(arr, Some(arr_box), Array::from_uint32),
            ArrowType::Int64 => import_integer::<i64>(arr, Some(arr_box), Array::from_int64),
            ArrowType::UInt64 => import_integer::<u64>(arr, Some(arr_box), Array::from_uint64),
            ArrowType::Float32 => import_float::<f32>(arr, Some(arr_box), Array::from_float32),
            ArrowType::Float64 => import_float::<f64>(arr, Some(arr_box), Array::from_float64),
            ArrowType::String => import_utf8::<u32>(arr, Some(arr_box)),
            #[cfg(feature = "large_string")]
            ArrowType::LargeString => import_utf8::<u64>(arr, Some(arr_box)),
            ArrowType::Utf8View => import_utf8_view(arr, Some(arr_box)),
            #[cfg(feature = "datetime")]
            ArrowType::Date32 => import_datetime::<i32>(arr, Some(arr_box), crate::TimeUnit::Days),
            #[cfg(feature = "datetime")]
            ArrowType::Date64 => {
                import_datetime::<i64>(arr, Some(arr_box), crate::TimeUnit::Milliseconds)
            }
            #[cfg(feature = "datetime")]
            ArrowType::Time32(u) => import_datetime::<i32>(arr, Some(arr_box), u),
            #[cfg(feature = "datetime")]
            ArrowType::Time64(u) => import_datetime::<i64>(arr, Some(arr_box), u),
            #[cfg(feature = "datetime")]
            ArrowType::Timestamp(u, ref _tz) => import_datetime::<i64>(arr, Some(arr_box), u),
            #[cfg(feature = "datetime")]
            ArrowType::Duration32(u) => import_datetime::<i32>(arr, Some(arr_box), u),
            #[cfg(feature = "datetime")]
            ArrowType::Duration64(u) => import_datetime::<i64>(arr, Some(arr_box), u),
            #[cfg(feature = "datetime")]
            ArrowType::Interval(_u) => {
                panic!("import_array_zero_copy: Interval types are not yet supported");
            }
            ArrowType::Null => {
                panic!("import_array_zero_copy: Null array types are not yet supported");
            }
            ArrowType::Dictionary(_) => unreachable!("Dictionary handled above"),
        }
    }
}
/// Imports a fixed-width integer array from [validity, data] buffers.
///
/// With `ownership`, the data buffer is wrapped zero-copy in a
/// `ForeignBuffer` that keeps the boxed `ArrowArray` alive; without it the
/// data is copied. The validity bitmap is always copied via
/// `Bitmask::from_raw_slice`. `tag` wraps the typed array in the right
/// `Array` variant.
unsafe fn import_integer<T: Integer>(
    arr: &ArrowArray,
    ownership: Option<Box<ArrowArray>>,
    tag: fn(IntegerArray<T>) -> Array,
) -> Arc<Array> {
    let len = arr.length as usize;
    let buffers = unsafe { slice::from_raw_parts(arr.buffers, 2) };
    let data_ptr = buffers[1] as *const T;
    let data_len_bytes = len * std::mem::size_of::<T>();
    let null_mask = if !buffers[0].is_null() {
        Some(unsafe { Bitmask::from_raw_slice(buffers[0], len) })
    } else {
        None
    };
    let buffer: Buffer<T> = if len == 0 {
        Buffer::default()
    } else if let Some(arr_box) = ownership {
        // Zero-copy: the ForeignBuffer releases the ArrowArray on drop.
        let foreign = ForeignBuffer {
            ptr: data_ptr as *const u8,
            len: data_len_bytes,
            array: Some(arr_box),
        };
        let shared = SharedBuffer::from_owner(foreign);
        Buffer::from_shared(shared)
    } else {
        // Borrowed import: copy the foreign data into owned storage.
        let data = unsafe { slice::from_raw_parts(data_ptr, len) };
        Vec64::from(data).into()
    };
    let int_arr = IntegerArray::<T>::new(buffer, null_mask);
    Arc::new(tag(int_arr))
}
/// Imports a fixed-width float array from [validity, data] buffers.
///
/// Mirrors [`import_integer`]: zero-copy (ForeignBuffer keeps the boxed
/// `ArrowArray` alive) when `ownership` is given, copying otherwise; the
/// validity bitmap is always copied.
unsafe fn import_float<T>(
    arr: &ArrowArray,
    ownership: Option<Box<ArrowArray>>,
    tag: fn(FloatArray<T>) -> Array,
) -> Arc<Array>
where
    T: Float,
    FloatArray<T>: 'static,
{
    let len = arr.length as usize;
    let buffers = unsafe { slice::from_raw_parts(arr.buffers, 2) };
    let data_ptr = buffers[1] as *const T;
    let data_len_bytes = len * std::mem::size_of::<T>();
    let null_mask = if !buffers[0].is_null() {
        Some(unsafe { Bitmask::from_raw_slice(buffers[0], len) })
    } else {
        None
    };
    let buffer: Buffer<T> = if len == 0 {
        Buffer::default()
    } else if let Some(arr_box) = ownership {
        // Zero-copy: the ForeignBuffer releases the ArrowArray on drop.
        let foreign = ForeignBuffer {
            ptr: data_ptr as *const u8,
            len: data_len_bytes,
            array: Some(arr_box),
        };
        let shared = SharedBuffer::from_owner(foreign);
        Buffer::from_shared(shared)
    } else {
        // Borrowed import: copy the foreign data into owned storage.
        let data = unsafe { slice::from_raw_parts(data_ptr, len) };
        Vec64::from(data).into()
    };
    let float_arr = FloatArray::<T>::new(buffer, null_mask);
    Arc::new(tag(float_arr))
}
/// Imports a boolean array whose values are a bit-packed [validity, bits]
/// buffer pair (one bit per element, hence (len + 7) / 8 data bytes).
///
/// Zero-copy when `ownership` is given (ForeignBuffer keeps the boxed
/// `ArrowArray` alive), copying otherwise; the validity bitmap is always
/// copied.
unsafe fn import_boolean(arr: &ArrowArray, ownership: Option<Box<ArrowArray>>) -> Arc<Array> {
    let len = arr.length as usize;
    let buffers = unsafe { slice::from_raw_parts(arr.buffers, 2) };
    let data_ptr = buffers[1];
    // Number of bytes needed for `len` packed bits.
    let data_len = (len + 7) / 8;
    let null_mask = if !buffers[0].is_null() {
        Some(unsafe { Bitmask::from_raw_slice(buffers[0], len) })
    } else {
        None
    };
    let buffer: Buffer<u8> = if len == 0 {
        Buffer::default()
    } else if let Some(arr_box) = ownership {
        // Zero-copy: the ForeignBuffer releases the ArrowArray on drop.
        let foreign = ForeignBuffer {
            ptr: data_ptr as *const u8,
            len: data_len,
            array: Some(arr_box),
        };
        let shared = SharedBuffer::from_owner(foreign);
        Buffer::from_shared(shared)
    } else {
        // Borrowed import: copy the packed bits into owned storage.
        let data = unsafe { slice::from_raw_parts(data_ptr, data_len) };
        Vec64::from(data).into()
    };
    let bool_mask = Bitmask::new(buffer, len);
    let bool_arr = BooleanArray::new(bool_mask, null_mask);
    Arc::new(Array::BooleanArray(bool_arr.into()))
}
/// Imports a UTF-8 array ([validity, offsets, values] buffers) with `T`
/// selecting u32 (String) or u64 (LargeString) offsets.
///
/// Offsets are validated (length len+1, first offset 0, monotonically
/// non-decreasing) and always copied; the values buffer is zero-copy when
/// `ownership` is given, copied otherwise.
///
/// # Panics
/// Panics on offset validation failure or an unsupported offset width.
unsafe fn import_utf8<T: Integer>(
    arr: &ArrowArray,
    ownership: Option<Box<ArrowArray>>,
) -> Arc<Array> {
    let len = arr.length as usize;
    if len == 0 {
        // Empty arrays need no buffer access at all.
        return Arc::new(if std::mem::size_of::<T>() == 4 {
            Array::from_string32(StringArray::<u32>::default())
        } else {
            #[cfg(feature = "large_string")]
            {
                Array::from_string64(StringArray::<u64>::default())
            }
            #[cfg(not(feature = "large_string"))]
            {
                panic!("LargeUtf8 (u64 offsets) requires the 'large_string' feature")
            }
        });
    }
    let buffers = unsafe { std::slice::from_raw_parts(arr.buffers, 3) };
    let null_ptr = buffers[0];
    let offsets_ptr = buffers[1];
    let values_ptr = buffers[2];
    let offsets_slice = unsafe { std::slice::from_raw_parts(offsets_ptr as *const T, len + 1) };
    assert_eq!(
        offsets_slice.len(),
        len + 1,
        "UTF8: offsets length must be len+1"
    );
    assert_eq!(
        offsets_slice[0].to_usize(),
        0,
        "UTF8: first offset must be 0"
    );
    // Reject non-monotonic offsets, which would produce invalid slices.
    let mut prev = 0usize;
    for (i, off) in offsets_slice.iter().enumerate().take(len + 1) {
        let cur = off.to_usize().expect("Error: could not unwrap usize");
        assert!(
            cur >= prev,
            "UTF8: offsets not monotonically non-decreasing at {i}: {cur} < {prev}"
        );
        prev = cur;
    }
    // Last offset is the total byte length of the values buffer.
    let data_len = offsets_slice[len].to_usize();
    let null_mask = if !null_ptr.is_null() {
        Some(unsafe { Bitmask::from_raw_slice(null_ptr, len) })
    } else {
        None
    };
    let offsets = Vec64::from(offsets_slice);
    let values_buffer: Buffer<u8> = if data_len == 0 {
        Buffer::default()
    } else if let Some(arr_box) = ownership {
        // Zero-copy: the ForeignBuffer releases the ArrowArray on drop.
        let foreign = ForeignBuffer {
            ptr: values_ptr as *const u8,
            len: data_len,
            array: Some(arr_box),
        };
        let shared = SharedBuffer::from_owner(foreign);
        Buffer::from_shared(shared)
    } else {
        let data = unsafe { std::slice::from_raw_parts(values_ptr, data_len) };
        Vec64::from(data).into()
    };
    let str_arr = StringArray::<T>::new(values_buffer, null_mask, offsets);
    // The transmutes below are identity casts: the TypeId check guarantees
    // T is exactly u64 / u32 in the respective branch.
    #[cfg(feature = "large_string")]
    if std::any::TypeId::of::<T>() == std::any::TypeId::of::<u64>() {
        return Arc::new(Array::TextArray(TextArray::String64(Arc::new(unsafe {
            std::mem::transmute::<StringArray<T>, StringArray<u64>>(str_arr)
        }))));
    }
    if std::any::TypeId::of::<T>() == std::any::TypeId::of::<u32>() {
        Arc::new(Array::TextArray(TextArray::String32(Arc::new(unsafe {
            std::mem::transmute::<StringArray<T>, StringArray<u32>>(str_arr)
        }))))
    } else {
        panic!("Unsupported offset type for StringArray (expected u32 or u64)");
    }
}
/// Imports an Arrow C `Utf8View` (`vu` format) array by materializing it into
/// a plain offset-based [`StringArray<u32>`].
///
/// Each element is a 16-byte "view": an `i32` byte length at offset 0; for
/// lengths <= 12 the string bytes are inline at offset 4, otherwise an `i32`
/// variadic-buffer index at offset 8 and an `i32` byte offset at offset 12
/// locate the data in one of the trailing variadic buffers. Because every
/// string is copied into freshly built offsets + data buffers, the foreign
/// array is released before returning.
///
/// # Safety
/// `arr.buffers` must contain `arr.n_buffers` valid pointers: validity,
/// views, then the variadic data buffers.
unsafe fn import_utf8_view(arr: &ArrowArray, ownership: Option<Box<ArrowArray>>) -> Arc<Array> {
    let len = arr.length as usize;
    if len == 0 {
        // Nothing to copy; still honor the release contract on the owned box.
        if let Some(mut arr_box) = ownership {
            if let Some(release) = arr_box.release {
                unsafe { release(&mut *arr_box as *mut ArrowArray) };
            }
        }
        return Arc::new(Array::from_string32(StringArray::<u32>::default()));
    }
    let n_buffers = arr.n_buffers as usize;
    let buffers = unsafe { slice::from_raw_parts(arr.buffers, n_buffers) };
    let null_ptr = buffers[0];
    let views_ptr = buffers[1] as *const u8;
    // Buffers beyond validity + views are the variadic data buffers.
    let n_variadic = if n_buffers > 3 { n_buffers - 3 } else { 0 };
    let null_bitmap: Option<&[u8]> = if !null_ptr.is_null() {
        let bitmap_bytes = (len + 7) / 8;
        Some(unsafe { slice::from_raw_parts(null_ptr, bitmap_bytes) })
    } else {
        None
    };
    let mut offsets = Vec64::<u32>::with_capacity(len + 1);
    let mut data = Vec64::<u8>::new();
    offsets.push(0u32);
    for i in 0..len {
        if let Some(bitmap) = null_bitmap {
            let byte_idx = i / 8;
            let bit_idx = i % 8;
            // Null slot (LSB-first bit order): emit an empty string span.
            if (bitmap[byte_idx] & (1 << bit_idx)) == 0 {
                offsets.push(data.len() as u32);
                continue;
            }
        }
        // 16 bytes per view entry.
        let view = unsafe { views_ptr.add(i * 16) };
        let str_len = unsafe { *(view as *const i32) } as usize;
        if str_len <= 12 {
            // Short string: bytes are stored inline after the length field.
            let inline_data = unsafe { slice::from_raw_parts(view.add(4), str_len) };
            data.extend_from_slice(inline_data);
        } else {
            // Long string: (buffer index, byte offset) into a variadic buffer.
            let buf_index = unsafe { *(view.add(8) as *const i32) } as usize;
            let buf_offset = unsafe { *(view.add(12) as *const i32) } as usize;
            assert!(
                buf_index < n_variadic,
                "Utf8View: buf_index {} out of range (have {} variadic buffers)",
                buf_index,
                n_variadic
            );
            let data_buf = buffers[2 + buf_index] as *const u8;
            let str_data = unsafe { slice::from_raw_parts(data_buf.add(buf_offset), str_len) };
            data.extend_from_slice(str_data);
        }
        offsets.push(data.len() as u32);
    }
    let null_mask = if !null_ptr.is_null() {
        Some(unsafe { Bitmask::from_raw_slice(null_ptr, len) })
    } else {
        None
    };
    let str_arr = StringArray::<u32>::new(data, null_mask, offsets);
    // All data was copied above, so the foreign array can be released now.
    if let Some(mut arr_box) = ownership {
        if let Some(release) = arr_box.release {
            unsafe { release(&mut *arr_box as *mut ArrowArray) };
        }
    }
    Arc::new(Array::TextArray(TextArray::String32(Arc::new(str_arr))))
}
/// Imports an Arrow C dictionary-encoded array into a [`CategoricalArray`]
/// with the code width selected by `index_type`.
///
/// The dictionary values are imported recursively via `import_from_c` (a
/// synthetic `u` / Utf8 schema is supplied if the caller's schema lacks a
/// dictionary node) and flattened into owned `String`s. The code buffer is
/// wrapped zero-copy when `ownership` is provided, otherwise copied.
///
/// # Safety
/// `arr.buffers` must point to two valid buffer pointers (validity, codes)
/// and `arr.dictionary` must be a valid dictionary array matching the schema.
///
/// # Panics
/// Panics if the imported dictionary is not a string array.
unsafe fn import_categorical(
    arr: &ArrowArray,
    sch: &ArrowSchema,
    index_type: CategoricalIndexType,
    ownership: Option<Box<ArrowArray>>,
) -> Arc<Array> {
    let len = arr.length as usize;
    let buffers = unsafe { slice::from_raw_parts(arr.buffers, 2) };
    let null_ptr = buffers[0];
    let codes_ptr = buffers[1];
    // Keeps the fallback schema alive for the duration of the raw borrow.
    let synthetic_schema;
    let dict_sch_ptr: *const ArrowSchema = if !sch.dictionary.is_null() {
        sch.dictionary as *const _
    } else {
        // No dictionary schema provided: assume Utf8 ("u") string values.
        synthetic_schema = ArrowSchema {
            format: b"u\0".as_ptr() as *const i8,
            name: b"\0".as_ptr() as *const i8,
            metadata: ptr::null(),
            flags: 0,
            n_children: 0,
            children: ptr::null_mut(),
            dictionary: ptr::null_mut(),
            release: None,
            private_data: ptr::null_mut(),
        };
        &synthetic_schema as *const _
    };
    let dict = unsafe { import_from_c(arr.dictionary as *const _, dict_sch_ptr) };
    // Flatten dictionary values to owned strings; nulls become "".
    let dict_strings = match dict.as_ref() {
        Array::TextArray(TextArray::String32(s)) => (0..s.len())
            .map(|i| s.get(i).unwrap_or_default().to_string())
            .collect(),
        #[cfg(feature = "large_string")]
        Array::TextArray(TextArray::String64(s)) => (0..s.len())
            .map(|i| s.get(i).unwrap_or_default().to_string())
            .collect(),
        _ => panic!("Expected String32 dictionary"),
    };
    let null_mask = if !null_ptr.is_null() {
        Some(unsafe { Bitmask::from_raw_slice(null_ptr, len) })
    } else {
        None
    };
    // Builds the codes buffer for a given code width: zero-copy (keeping the
    // foreign array alive) when ownership was transferred, copied otherwise.
    unsafe fn build_codes<T: Integer>(
        codes_ptr: *const u8,
        len: usize,
        ownership: Option<Box<ArrowArray>>,
    ) -> Buffer<T> {
        if let Some(arr_box) = ownership {
            let data_len_bytes = len * std::mem::size_of::<T>();
            let foreign = ForeignBuffer {
                ptr: codes_ptr,
                len: data_len_bytes,
                array: Some(arr_box),
            };
            let shared = SharedBuffer::from_owner(foreign);
            Buffer::from_shared(shared)
        } else {
            let data = unsafe { slice::from_raw_parts(codes_ptr as *const T, len) };
            Vec64::from(data).into()
        }
    }
    // Dispatch on the (feature-gated) index width requested by the schema.
    match index_type {
        #[cfg(feature = "default_categorical_8")]
        CategoricalIndexType::UInt8 => {
            let codes_buf = unsafe { build_codes::<u8>(codes_ptr, len, ownership) };
            let arr = CategoricalArray::<u8>::new(codes_buf, dict_strings, null_mask);
            Arc::new(Array::TextArray(TextArray::Categorical8(Arc::new(arr))))
        }
        #[cfg(feature = "extended_categorical")]
        CategoricalIndexType::UInt16 => {
            let codes_buf = unsafe { build_codes::<u16>(codes_ptr, len, ownership) };
            let arr = CategoricalArray::<u16>::new(codes_buf, dict_strings, null_mask);
            Arc::new(Array::TextArray(TextArray::Categorical16(Arc::new(arr))))
        }
        #[cfg(any(not(feature = "default_categorical_8"), feature = "extended_categorical"))]
        CategoricalIndexType::UInt32 => {
            let codes_buf = unsafe { build_codes::<u32>(codes_ptr, len, ownership) };
            let arr = CategoricalArray::<u32>::new(codes_buf, dict_strings, null_mask);
            Arc::new(Array::TextArray(TextArray::Categorical32(Arc::new(arr))))
        }
        #[cfg(feature = "extended_categorical")]
        CategoricalIndexType::UInt64 => {
            let codes_buf = unsafe { build_codes::<u64>(codes_ptr, len, ownership) };
            let arr = CategoricalArray::<u64>::new(codes_buf, dict_strings, null_mask);
            Arc::new(Array::TextArray(TextArray::Categorical64(Arc::new(arr))))
        }
    }
}
#[cfg(all(feature = "default_categorical_8", not(feature = "extended_categorical")))]
/// Imports an `i32`-indexed Arrow dictionary array, narrowing the codes to
/// `u8` (the only categorical width available in this feature configuration).
///
/// Unlike `import_categorical`, the codes are always copied (the narrowing
/// conversion requires it), so the foreign array is released before the
/// result is built.
///
/// # Safety
/// `arr.buffers` must point to two valid buffer pointers (validity, `i32`
/// codes) and `arr.dictionary` must be a valid string dictionary.
///
/// # Panics
/// Panics if any code falls outside `0..=255`, or if the imported dictionary
/// is not a string array.
unsafe fn import_categorical_narrow_to_u8(
    arr: &ArrowArray,
    sch: &ArrowSchema,
    ownership: Option<Box<ArrowArray>>,
) -> Arc<Array> {
    let len = arr.length as usize;
    let buffers = unsafe { slice::from_raw_parts(arr.buffers, 2) };
    let null_ptr = buffers[0];
    let codes_ptr = buffers[1];
    // Keeps the fallback schema alive for the duration of the raw borrow.
    let synthetic_schema;
    let dict_sch_ptr: *const ArrowSchema = if !sch.dictionary.is_null() {
        sch.dictionary as *const _
    } else {
        // No dictionary schema provided: assume Utf8 ("u") string values.
        synthetic_schema = ArrowSchema {
            format: b"u\0".as_ptr() as *const i8,
            name: b"\0".as_ptr() as *const i8,
            metadata: ptr::null(),
            flags: 0,
            n_children: 0,
            children: ptr::null_mut(),
            dictionary: ptr::null_mut(),
            release: None,
            private_data: ptr::null_mut(),
        };
        &synthetic_schema as *const _
    };
    let dict = unsafe { import_from_c(arr.dictionary as *const _, dict_sch_ptr) };
    // Flatten dictionary values to owned strings; nulls become "".
    let dict_strings = match dict.as_ref() {
        Array::TextArray(TextArray::String32(s)) => (0..s.len())
            .map(|i| s.get(i).unwrap_or_default().to_string())
            .collect(),
        #[cfg(feature = "large_string")]
        Array::TextArray(TextArray::String64(s)) => (0..s.len())
            .map(|i| s.get(i).unwrap_or_default().to_string())
            .collect(),
        _ => panic!("Expected String32 dictionary"),
    };
    let null_mask = if !null_ptr.is_null() {
        Some(unsafe { Bitmask::from_raw_slice(null_ptr, len) })
    } else {
        None
    };
    // Narrow each i32 code to u8, rejecting anything out of range.
    let i32_codes = unsafe { slice::from_raw_parts(codes_ptr as *const i32, len) };
    let mut u8_codes = Vec64::<u8>::with_capacity(len);
    for &code in i32_codes {
        assert!(
            code >= 0 && code <= u8::MAX as i32,
            "Arrow dictionary index {} exceeds u8 range (0..255)",
            code
        );
        u8_codes.push(code as u8);
    }
    // Everything has been copied; release the foreign array now.
    if let Some(mut arr_box) = ownership {
        if let Some(release) = arr_box.release {
            unsafe { release(&mut *arr_box as *mut ArrowArray) };
        }
    }
    let codes_buf: Buffer<u8> = u8_codes.into();
    let cat = CategoricalArray::<u8>::new(codes_buf, dict_strings, null_mask);
    Arc::new(Array::TextArray(TextArray::Categorical8(Arc::new(cat))))
}
#[cfg(feature = "datetime")]
/// Imports an Arrow C temporal array into a [`DatetimeArray`] with the given
/// time unit.
///
/// `T` is the physical storage type (`i32` or `i64`). The value buffer is
/// wrapped zero-copy when `ownership` is provided, otherwise copied. The
/// transmutes at the end are identity conversions guarded by `TypeId` checks.
///
/// # Safety
/// `arr.buffers` must point to two valid buffer pointers (validity, values)
/// consistent with `arr.length`.
///
/// # Panics
/// Panics if `T` is neither `i32` nor `i64`.
unsafe fn import_datetime<T: Integer>(
    arr: &ArrowArray,
    ownership: Option<Box<ArrowArray>>,
    unit: crate::TimeUnit,
) -> Arc<Array> {
    let len = arr.length as usize;
    let buffers = unsafe { std::slice::from_raw_parts(arr.buffers, 2) };
    let data_ptr = buffers[1] as *const T;
    let data_len_bytes = len * std::mem::size_of::<T>();
    let null_mask = if !buffers[0].is_null() {
        Some(unsafe { Bitmask::from_raw_slice(buffers[0], len) })
    } else {
        None
    };
    let buffer: Buffer<T> = if let Some(arr_box) = ownership {
        // Zero-copy: the ForeignBuffer keeps the foreign ArrowArray alive.
        let foreign = ForeignBuffer {
            ptr: data_ptr as *const u8,
            len: data_len_bytes,
            array: Some(arr_box),
        };
        let shared = SharedBuffer::from_owner(foreign);
        Buffer::from_shared(shared)
    } else {
        let data = unsafe { std::slice::from_raw_parts(data_ptr, len) };
        Vec64::from(data).into()
    };
    let dt_arr = DatetimeArray::<T> {
        data: buffer,
        null_mask,
        time_unit: unit,
    };
    if std::any::TypeId::of::<T>() == std::any::TypeId::of::<i64>() {
        Arc::new(Array::TemporalArray(TemporalArray::Datetime64(Arc::new(
            unsafe { std::mem::transmute::<DatetimeArray<T>, DatetimeArray<i64>>(dt_arr) },
        ))))
    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<i32>() {
        Arc::new(Array::TemporalArray(TemporalArray::Datetime32(Arc::new(
            unsafe { std::mem::transmute::<DatetimeArray<T>, DatetimeArray<i32>>(dt_arr) },
        ))))
    } else {
        panic!("Unsupported DatetimeArray type (expected i32 or i64)");
    }
}
/// Asserts that the first three buffer pointers are 64-byte aligned.
///
/// The check is skipped for empty arrays (`length == 0`), which may carry
/// dangling pointers, and null pointers (e.g. an absent validity bitmap) are
/// ignored.
///
/// The parameter is a shared borrow: the function never mutates the vector,
/// and existing `&mut` call sites coerce to `&` automatically.
///
/// # Panics
/// Panics if any non-null pointer among the first three buffers is not
/// 64-byte aligned.
fn check_alignment(buf_ptrs: &Vec64<*const u8>, length: i64) {
    if length == 0 {
        return;
    }
    for &p in buf_ptrs.iter().take(3) {
        if !p.is_null() {
            assert_eq!(
                (p as usize) % 64,
                0,
                "FFI: Array buffer pointer {:p} is not 64-byte aligned",
                p
            );
        }
    }
}
/// Builds the exported `ArrowArray`/`ArrowSchema` pair for a single column.
///
/// `buf_ptrs` holds the raw buffer pointers (validity bitmap first). The
/// backing `array`, the buffer-pointer table, and the C strings are parked in
/// a `Holder` stored in `ArrowArray.private_data`, keeping every raw pointer
/// handed to the consumer alive until the release callback runs.
///
/// Returns heap pointers whose ownership passes to the consumer per the Arrow
/// C data interface (consumer calls the `release` callbacks).
fn create_arrow_export(
    array: Arc<Array>,
    schema: Schema,
    mut buf_ptrs: Vec64<*const u8>,
    n_buffers: i64,
    length: i64,
    name_cstr: CString,
) -> (*mut ArrowArray, *mut ArrowSchema) {
    // No validity buffer => zero nulls; otherwise report "unknown" (-1) and
    // let the consumer count from the bitmap.
    let null_count = if buf_ptrs[0].is_null() { 0 } else { -1 };
    let field = &schema.fields[0];
    let format_cstr = fmt_c(field.dtype.clone());
    let format_ptr = format_cstr.as_ptr();
    let metadata_bytes = if field.metadata.is_empty() {
        None
    } else {
        Some(encode_arrow_metadata(&field.metadata))
    };
    let metadata_ptr = metadata_bytes
        .as_ref()
        .map(|b| b.as_ptr() as *const i8)
        .unwrap_or(ptr::null());
    let arr = Box::new(ArrowArray {
        length,
        null_count,
        offset: 0,
        n_buffers,
        n_children: 0,
        buffers: buf_ptrs.as_mut_ptr(),
        children: ptr::null_mut(),
        dictionary: ptr::null_mut(),
        release: Some(release_arrow_array),
        private_data: ptr::null_mut(),
    });
    // ARROW_FLAG_NULLABLE = 2 per the Arrow C data interface; this matches
    // the stream exporters and the importer's `flags & 2` check. (The
    // previous value 1 is ARROW_FLAG_DICTIONARY_ORDERED and was incorrect.)
    let flags = if field.nullable { 2 } else { 0 };
    let schema_box = Box::new(ArrowSchema {
        format: format_ptr,
        name: name_cstr.as_ptr(),
        metadata: metadata_ptr,
        flags,
        n_children: 0,
        children: ptr::null_mut(),
        dictionary: ptr::null_mut(),
        release: Some(release_arrow_schema),
        private_data: ptr::null_mut(),
    });
    // Moving the Vec64 / CString / Vec values into the holder moves only
    // their handles, not their heap allocations, so the raw pointers captured
    // above remain valid for the consumer.
    let holder = Box::new(Holder {
        array,
        _schema: schema_box.clone(),
        buf_ptrs,
        name_cstr,
        format_cstr,
        metadata_bytes,
    });
    let arr_ptr = Box::into_raw(arr);
    unsafe {
        (*arr_ptr).private_data = Box::into_raw(holder) as *mut c_void;
    }
    (arr_ptr, Box::into_raw(schema_box))
}
/// Private state carried by a record-batch `ArrowArrayStream` through its
/// `private_data` pointer.
struct RecordBatchStreamHolder {
    // Column fields describing the struct schema emitted by get_schema.
    fields: Vec<crate::Field>,
    // Remaining batches; each batch is one (array, schema) pair per column.
    batches: Vec<Vec<(Arc<Array>, Schema)>>,
    // Index of the next batch to hand out from get_next.
    cursor: usize,
    // Last error message surfaced via get_last_error (never set here yet).
    last_error: Option<CString>,
    // Pre-encoded top-level schema metadata block, if any.
    schema_metadata: Option<Vec<u8>>,
}
/// Private state carried by a single-column `ArrowArrayStream` through its
/// `private_data` pointer.
struct ArrayStreamHolder {
    // Field describing the streamed column.
    field: crate::Field,
    // Remaining chunks to emit from get_next.
    chunks: Vec<Arc<Array>>,
    // Index of the next chunk to hand out.
    cursor: usize,
    // Last error message surfaced via get_last_error (never set here yet).
    last_error: Option<CString>,
}
/// Exports a set of columns as a single Arrow struct array/schema pair.
///
/// Thin public wrapper over [`export_struct_to_c_inner`]; `metadata`, when
/// present, is encoded into the top-level schema's metadata block.
pub fn export_struct_to_c(
    columns: Vec<(Arc<Array>, Schema)>,
    metadata: Option<std::collections::BTreeMap<String, String>>,
) -> (*mut ArrowArray, *mut ArrowSchema) {
    export_struct_to_c_inner(columns, metadata)
}
/// Builds the `+s` (struct) ArrowArray/ArrowSchema pair wrapping one exported
/// child per column.
///
/// Each column is exported independently via `export_to_c`; the resulting raw
/// child pointers are owned by the returned struct array/schema and are freed
/// by `release_struct_array` / `release_struct_schema`. Ownership of both
/// returned pointers passes to the consumer.
fn export_struct_to_c_inner(
    columns: Vec<(Arc<Array>, Schema)>,
    metadata: Option<std::collections::BTreeMap<String, String>>,
) -> (*mut ArrowArray, *mut ArrowSchema) {
    let n_cols = columns.len();
    // Row count is taken from the first column; assumes all columns agree.
    let n_rows = if n_cols > 0 {
        columns[0].0.len() as i64
    } else {
        0
    };
    let mut child_array_ptrs: Vec<*mut ArrowArray> = Vec::with_capacity(n_cols);
    let mut child_schema_ptrs: Vec<*mut ArrowSchema> = Vec::with_capacity(n_cols);
    for (array, schema) in &columns {
        let (arr_ptr, sch_ptr) = export_to_c(array.clone(), schema.clone());
        child_array_ptrs.push(arr_ptr);
        child_schema_ptrs.push(sch_ptr);
    }
    // Leak the pointer tables; release_struct_array reclaims them.
    let children_arr_box = child_array_ptrs.into_boxed_slice();
    let children_arr_ptr = Box::into_raw(children_arr_box) as *mut *mut ArrowArray;
    let struct_arr = Box::new(ArrowArray {
        length: n_rows,
        null_count: 0,
        offset: 0,
        // Struct layout has exactly one (validity) buffer; exported here as a
        // single null pointer (no nulls at the struct level).
        n_buffers: 1, n_children: n_cols as i64,
        buffers: Box::into_raw(Box::new(ptr::null::<u8>())) as *mut *const u8,
        children: children_arr_ptr,
        dictionary: ptr::null_mut(),
        release: Some(release_struct_array),
        private_data: ptr::null_mut(),
    });
    let children_sch_box = child_schema_ptrs.into_boxed_slice();
    let children_sch_ptr = Box::into_raw(children_sch_box) as *mut *mut ArrowSchema;
    let format_cstr = CString::new("+s").unwrap();
    let name_cstr = CString::new("").unwrap();
    let metadata_bytes = metadata.map(|m| encode_arrow_metadata(&m));
    let metadata_ptr = metadata_bytes
        .as_ref()
        .map(|b| b.as_ptr() as *const i8)
        .unwrap_or(ptr::null());
    // The holder owns the C strings/metadata referenced by the raw schema and
    // is freed by release_struct_schema via private_data.
    let struct_holder = Box::new(StructSchemaHolder {
        format_cstr,
        name_cstr,
        metadata_bytes,
    });
    let struct_sch = Box::new(ArrowSchema {
        format: struct_holder.format_cstr.as_ptr(),
        name: struct_holder.name_cstr.as_ptr(),
        metadata: metadata_ptr,
        flags: 0,
        n_children: n_cols as i64,
        children: children_sch_ptr,
        dictionary: ptr::null_mut(),
        release: Some(release_struct_schema),
        private_data: Box::into_raw(struct_holder) as *mut c_void,
    });
    (Box::into_raw(struct_arr), Box::into_raw(struct_sch))
}
#[allow(dead_code)]
/// Keeps the heap allocations referenced by an exported `ArrowSchema`
/// (format/name C strings and encoded metadata) alive until its release
/// callback frees this holder via `private_data`.
struct StructSchemaHolder {
    // Backing storage for ArrowSchema.format.
    format_cstr: CString,
    // Backing storage for ArrowSchema.name.
    name_cstr: CString,
    // Backing storage for ArrowSchema.metadata, if any.
    metadata_bytes: Option<Vec<u8>>,
}
/// Serializes key/value metadata in the Arrow C data interface layout:
/// a little-endian `i32` pair count, then for each pair an `i32`-length-
/// prefixed key followed by an `i32`-length-prefixed value.
fn encode_arrow_metadata(pairs: &std::collections::BTreeMap<String, String>) -> Vec<u8> {
    // Appends one length-prefixed string to the output buffer.
    fn put_str(out: &mut Vec<u8>, s: &str) {
        out.extend_from_slice(&(s.len() as i32).to_le_bytes());
        out.extend_from_slice(s.as_bytes());
    }
    let mut out = Vec::new();
    out.extend_from_slice(&(pairs.len() as i32).to_le_bytes());
    for (key, value) in pairs {
        put_str(&mut out, key);
        put_str(&mut out, value);
    }
    out
}
/// Decodes the length-prefixed metadata layout produced by
/// `encode_arrow_metadata`: a little-endian `i32` pair count followed by
/// `i32`-length-prefixed key/value byte strings (lossily decoded as UTF-8).
///
/// Returns `None` for a null pointer.
///
/// # Safety
/// `ptr` must either be null or point to a complete, well-formed metadata
/// block; the decoder performs no bounds checking.
pub unsafe fn decode_arrow_metadata(
    ptr: *const i8,
) -> Option<std::collections::BTreeMap<String, String>> {
    if ptr.is_null() {
        return None;
    }
    // Reads a little-endian i32 and advances the cursor past it.
    unsafe fn take_i32(cursor: &mut *const u8) -> i32 {
        let bytes = unsafe { slice::from_raw_parts(*cursor, 4) };
        let value = i32::from_le_bytes(bytes.try_into().unwrap());
        *cursor = unsafe { (*cursor).add(4) };
        value
    }
    // Reads `n` bytes as a lossy UTF-8 string and advances the cursor.
    unsafe fn take_string(cursor: &mut *const u8, n: usize) -> String {
        let bytes = unsafe { slice::from_raw_parts(*cursor, n) };
        let text = String::from_utf8_lossy(bytes).into_owned();
        *cursor = unsafe { (*cursor).add(n) };
        text
    }
    let mut cursor = ptr as *const u8;
    let pair_count = unsafe { take_i32(&mut cursor) };
    let mut map = std::collections::BTreeMap::new();
    for _ in 0..pair_count {
        let key_len = unsafe { take_i32(&mut cursor) } as usize;
        let key = unsafe { take_string(&mut cursor, key_len) };
        let val_len = unsafe { take_i32(&mut cursor) } as usize;
        let value = unsafe { take_string(&mut cursor, val_len) };
        map.insert(key, value);
    }
    Some(map)
}
/// Release callback for struct arrays produced by `export_struct_to_c_inner`.
///
/// Releases and frees every child array, then the child-pointer table and the
/// single-entry buffers table, and finally zeroes the struct — which sets
/// `release` to `None`, marking the array as released per the Arrow C data
/// interface. Idempotent: a second call sees `release == None` and returns.
unsafe extern "C" fn release_struct_array(arr: *mut ArrowArray) {
    if arr.is_null() || (unsafe { &*arr }).release.is_none() {
        return;
    }
    let a = unsafe { &*arr };
    let n_children = a.n_children as usize;
    if !a.children.is_null() {
        let children = unsafe { slice::from_raw_parts_mut(a.children, n_children) };
        for child_ptr in children.iter_mut() {
            if !child_ptr.is_null() {
                // Let the child release its own resources, then free the
                // Box allocation that carried the child struct itself.
                let child = unsafe { &mut **child_ptr };
                if let Some(release) = child.release {
                    unsafe { release(*child_ptr) };
                }
                let _ = unsafe { Box::from_raw(*child_ptr) };
            }
        }
        // Reclaim the boxed slice of child pointers.
        let _ = unsafe {
            Box::from_raw(
                slice::from_raw_parts_mut(a.children, n_children) as *mut [*mut ArrowArray]
            )
        };
    }
    if !a.buffers.is_null() {
        // Matches the single Box::new(ptr::null::<u8>()) allocation made at
        // export time.
        let _ = unsafe { Box::from_raw(a.buffers as *mut *const u8) };
    }
    // Zeroing clears `release`, signalling "already released".
    unsafe { ptr::write_bytes(arr, 0, 1) };
}
/// Release callback for schemas produced by the struct/stream exporters.
///
/// Releases and frees every child schema, then the child-pointer table, then
/// the `StructSchemaHolder` stashed in `private_data`, and finally zeroes the
/// struct (setting `release` to `None`). Idempotent via the release check.
unsafe extern "C" fn release_struct_schema(sch: *mut ArrowSchema) {
    if sch.is_null() || (unsafe { &*sch }).release.is_none() {
        return;
    }
    let s = unsafe { &*sch };
    let n_children = s.n_children as usize;
    if !s.children.is_null() {
        let children = unsafe { slice::from_raw_parts_mut(s.children, n_children) };
        for child_ptr in children.iter_mut() {
            if !child_ptr.is_null() {
                // Release the child's own resources, then free its Box.
                let child = unsafe { &mut **child_ptr };
                if let Some(release) = child.release {
                    unsafe { release(*child_ptr) };
                }
                let _ = unsafe { Box::from_raw(*child_ptr) };
            }
        }
        // Reclaim the boxed slice of child pointers.
        let _ = unsafe {
            Box::from_raw(
                slice::from_raw_parts_mut(s.children, n_children) as *mut [*mut ArrowSchema]
            )
        };
    }
    if !s.private_data.is_null() {
        // Frees the holder that kept the format/name/metadata storage alive.
        let _ = unsafe { Box::from_raw(s.private_data as *mut StructSchemaHolder) };
    }
    // Zeroing clears `release`, signalling "already released".
    unsafe { ptr::write_bytes(sch, 0, 1) };
}
/// Exports a sequence of record batches as an `ArrowArrayStream` without
/// top-level schema metadata.
///
/// Convenience wrapper over
/// [`export_record_batch_stream_with_metadata`] with `metadata = None`.
pub fn export_record_batch_stream(
    batches: Vec<Vec<(Arc<Array>, Schema)>>,
    fields: Vec<crate::Field>,
) -> Box<ArrowArrayStream> {
    export_record_batch_stream_with_metadata(batches, fields, None)
}
pub fn export_record_batch_stream_with_metadata(
batches: Vec<Vec<(Arc<Array>, Schema)>>,
fields: Vec<crate::Field>,
metadata: Option<std::collections::BTreeMap<String, String>>,
) -> Box<ArrowArrayStream> {
let schema_metadata = metadata.map(|m| encode_arrow_metadata(&m));
let holder = Box::new(RecordBatchStreamHolder {
fields,
batches,
cursor: 0,
last_error: None,
schema_metadata,
});
let stream = Box::new(ArrowArrayStream {
get_schema: Some(rb_stream_get_schema),
get_next: Some(rb_stream_get_next),
get_last_error: Some(stream_get_last_error::<RecordBatchStreamHolder>),
release: Some(rb_stream_release),
private_data: Box::into_raw(holder) as *mut c_void,
});
stream
}
/// `get_schema` callback for record-batch streams.
///
/// Writes a `+s` (struct) schema into `out`, with one child schema per field
/// in the holder. Each child (and the top-level schema) owns its C-string
/// storage through a `StructSchemaHolder` in `private_data`, reclaimed by
/// `release_struct_schema`. Always returns 0 (success).
unsafe extern "C" fn rb_stream_get_schema(
    stream: *mut ArrowArrayStream,
    out: *mut ArrowSchema,
) -> i32 {
    let holder = unsafe { &*((*stream).private_data as *const RecordBatchStreamHolder) };
    let n_fields = holder.fields.len();
    let mut child_schemas: Vec<*mut ArrowSchema> = Vec::with_capacity(n_fields);
    for field in &holder.fields {
        let format_cstr = fmt_c(field.dtype.clone());
        let name_cstr = CString::new(field.name.clone()).unwrap_or_default();
        // ARROW_FLAG_NULLABLE = 2.
        let flags = if field.nullable { 2 } else { 0 };
        let metadata_bytes = if field.metadata.is_empty() {
            None
        } else {
            Some(encode_arrow_metadata(&field.metadata))
        };
        let metadata_ptr = metadata_bytes
            .as_ref()
            .map(|b| b.as_ptr() as *const i8)
            .unwrap_or(ptr::null());
        // The child holder keeps the C strings / metadata alive until the
        // child schema's release callback frees it.
        let child_holder = Box::new(StructSchemaHolder {
            format_cstr,
            name_cstr,
            metadata_bytes,
        });
        let child = Box::new(ArrowSchema {
            format: child_holder.format_cstr.as_ptr(),
            name: child_holder.name_cstr.as_ptr(),
            metadata: metadata_ptr,
            flags,
            n_children: 0,
            children: ptr::null_mut(),
            dictionary: ptr::null_mut(),
            release: Some(release_struct_schema),
            private_data: Box::into_raw(child_holder) as *mut c_void,
        });
        child_schemas.push(Box::into_raw(child));
    }
    // Leak the child-pointer table; release_struct_schema reclaims it.
    let children_box = child_schemas.into_boxed_slice();
    let children_ptr = Box::into_raw(children_box) as *mut *mut ArrowSchema;
    let format_cstr = CString::new("+s").unwrap();
    let name_cstr = CString::new("").unwrap();
    let metadata_bytes = holder.schema_metadata.clone();
    let metadata_ptr = metadata_bytes
        .as_ref()
        .map(|b| b.as_ptr() as *const i8)
        .unwrap_or(ptr::null());
    let schema_holder = Box::new(StructSchemaHolder {
        format_cstr,
        name_cstr,
        metadata_bytes,
    });
    let struct_schema = ArrowSchema {
        format: schema_holder.format_cstr.as_ptr(),
        name: schema_holder.name_cstr.as_ptr(),
        metadata: metadata_ptr,
        flags: 0,
        n_children: n_fields as i64,
        children: children_ptr,
        dictionary: ptr::null_mut(),
        release: Some(release_struct_schema),
        private_data: Box::into_raw(schema_holder) as *mut c_void,
    };
    // Move the fully-built schema into caller-provided storage.
    unsafe { ptr::write(out, struct_schema) };
    0
}
/// `get_next` callback for record-batch streams.
///
/// Writes the next batch into `out` as an exported struct array, or writes an
/// empty (released, `release == None`) array to signal end-of-stream. Always
/// returns 0 (success).
unsafe extern "C" fn rb_stream_get_next(
    stream: *mut ArrowArrayStream,
    out: *mut ArrowArray,
) -> i32 {
    let holder = unsafe { &mut *((*stream).private_data as *mut RecordBatchStreamHolder) };
    if holder.cursor >= holder.batches.len() {
        // End of stream: an empty ArrowArray with release == None.
        unsafe { ptr::write(out, ArrowArray::empty()) };
        return 0;
    }
    let batch = holder.batches[holder.cursor].clone();
    holder.cursor += 1;
    let (arr_ptr, sch_ptr) = export_struct_to_c_inner(batch, None);
    unsafe {
        // Move the exported array into the caller's storage and free the
        // Box allocation that carried it. Reclaiming via Box::from_raw keeps
        // allocation and deallocation on the same allocator path (it was
        // created with Box::into_raw), instead of a manual alloc::dealloc.
        let arr_box = Box::from_raw(arr_ptr);
        ptr::write(out, *arr_box);
    }
    unsafe {
        // The stream contract only hands out the array half; release and
        // free the paired schema here so it does not leak.
        if let Some(release) = (*sch_ptr).release {
            release(sch_ptr);
        }
        let _ = Box::from_raw(sch_ptr);
    }
    0
}
/// `release` callback for record-batch streams: frees the holder stashed in
/// `private_data`, then zeroes the stream struct (clearing `release` so a
/// repeated call is a no-op).
unsafe extern "C" fn rb_stream_release(stream: *mut ArrowArrayStream) {
    if stream.is_null() || (unsafe { &*stream }).release.is_none() {
        return;
    }
    let _ = unsafe { Box::from_raw((*stream).private_data as *mut RecordBatchStreamHolder) };
    unsafe { ptr::write_bytes(stream, 0, 1) };
}
/// Exports a chunked single-column array as an `ArrowArrayStream`.
///
/// The field and chunks are bundled into an [`ArrayStreamHolder`] carried in
/// the stream's `private_data`; the stream's `release` callback reclaims it.
pub fn export_array_stream(chunks: Vec<Arc<Array>>, field: crate::Field) -> Box<ArrowArrayStream> {
    let holder = ArrayStreamHolder {
        field,
        chunks,
        cursor: 0,
        last_error: None,
    };
    Box::new(ArrowArrayStream {
        get_schema: Some(arr_stream_get_schema),
        get_next: Some(arr_stream_get_next),
        get_last_error: Some(stream_get_last_error::<ArrayStreamHolder>),
        release: Some(arr_stream_release),
        private_data: Box::into_raw(Box::new(holder)) as *mut c_void,
    })
}
/// `get_schema` callback for single-column array streams.
///
/// Writes a flat (non-struct) schema for the holder's field into `out`. The
/// C-string storage lives in a `StructSchemaHolder` freed by
/// `release_struct_schema`. Always returns 0 (success).
unsafe extern "C" fn arr_stream_get_schema(
    stream: *mut ArrowArrayStream,
    out: *mut ArrowSchema,
) -> i32 {
    let holder = unsafe { &*((*stream).private_data as *const ArrayStreamHolder) };
    let field = &holder.field;
    let format_cstr = fmt_c(field.dtype.clone());
    let name_cstr = CString::new(field.name.clone()).unwrap_or_default();
    // ARROW_FLAG_NULLABLE = 2.
    let flags = if field.nullable { 2 } else { 0 };
    let schema_holder = Box::new(StructSchemaHolder {
        format_cstr,
        name_cstr,
        metadata_bytes: None,
    });
    let schema = ArrowSchema {
        format: schema_holder.format_cstr.as_ptr(),
        name: schema_holder.name_cstr.as_ptr(),
        metadata: ptr::null(),
        flags,
        n_children: 0,
        children: ptr::null_mut(),
        dictionary: ptr::null_mut(),
        release: Some(release_struct_schema),
        private_data: Box::into_raw(schema_holder) as *mut c_void,
    };
    unsafe { ptr::write(out, schema) };
    0
}
/// `get_next` callback for single-column array streams.
///
/// Writes the next chunk into `out` as an exported array, or an empty
/// (released) array to signal end-of-stream. Always returns 0 (success).
unsafe extern "C" fn arr_stream_get_next(
    stream: *mut ArrowArrayStream,
    out: *mut ArrowArray,
) -> i32 {
    let holder = unsafe { &mut *((*stream).private_data as *mut ArrayStreamHolder) };
    if holder.cursor >= holder.chunks.len() {
        // End of stream: an empty ArrowArray with release == None.
        unsafe { ptr::write(out, ArrowArray::empty()) };
        return 0;
    }
    let array = holder.chunks[holder.cursor].clone();
    holder.cursor += 1;
    let schema = Schema::from(vec![holder.field.clone()]);
    let (arr_ptr, sch_ptr) = export_to_c(array, schema);
    unsafe {
        // Move the exported array into the caller's storage and free the
        // Box allocation that carried it. Reclaiming via Box::from_raw keeps
        // allocation and deallocation on the same allocator path (it was
        // created with Box::into_raw), instead of a manual alloc::dealloc.
        let arr_box = Box::from_raw(arr_ptr);
        ptr::write(out, *arr_box);
    }
    unsafe {
        // Only the array half is handed out; release and free the paired
        // schema here so it does not leak.
        if let Some(release) = (*sch_ptr).release {
            release(sch_ptr);
        }
        let _ = Box::from_raw(sch_ptr);
    }
    0
}
/// `release` callback for single-column array streams: frees the holder in
/// `private_data`, then zeroes the stream struct (clearing `release` so a
/// repeated call is a no-op).
unsafe extern "C" fn arr_stream_release(stream: *mut ArrowArrayStream) {
    if stream.is_null() || (unsafe { &*stream }).release.is_none() {
        return;
    }
    let _ = unsafe { Box::from_raw((*stream).private_data as *mut ArrayStreamHolder) };
    unsafe { ptr::write_bytes(stream, 0, 1) };
}
/// Abstracts over stream holders that record a last error message, letting
/// `stream_get_last_error` be shared between both stream kinds.
trait HasLastError {
    /// Returns the most recent error message, if one was recorded.
    fn last_error(&self) -> Option<&CString>;
}
/// Record-batch stream holder: surfaces its `last_error` field.
impl HasLastError for RecordBatchStreamHolder {
    fn last_error(&self) -> Option<&CString> {
        self.last_error.as_ref()
    }
}
/// Single-column stream holder: surfaces its `last_error` field.
impl HasLastError for ArrayStreamHolder {
    fn last_error(&self) -> Option<&CString> {
        self.last_error.as_ref()
    }
}
/// Shared `get_last_error` callback for both stream kinds.
///
/// Returns the holder's last error message as a C string, or null when the
/// stream pointer, its private data, or the recorded error is absent.
unsafe extern "C" fn stream_get_last_error<T: HasLastError>(
    stream: *mut ArrowArrayStream,
) -> *const i8 {
    if stream.is_null() {
        return ptr::null();
    }
    unsafe {
        let data = (*stream).private_data;
        if data.is_null() {
            return ptr::null();
        }
        let holder = &*(data as *const T);
        holder.last_error().map_or(ptr::null(), |err| err.as_ptr())
    }
}
/// Drains an `ArrowArrayStream` of record batches, discarding any top-level
/// schema metadata.
///
/// Convenience wrapper over [`import_record_batch_stream_with_metadata`].
///
/// # Safety
/// Same contract as `import_record_batch_stream_with_metadata`: `stream` must
/// be a valid, unreleased `ArrowArrayStream`.
pub unsafe fn import_record_batch_stream(
    stream: *mut ArrowArrayStream,
) -> Vec<Vec<(Arc<Array>, crate::Field)>> {
    let (batches, _metadata) = unsafe { import_record_batch_stream_with_metadata(stream) };
    batches
}
/// Drains an `ArrowArrayStream` of struct-typed record batches, returning the
/// imported columns per batch plus any decoded top-level schema metadata.
///
/// Consumes the stream: after draining, the schema and the stream itself are
/// released via their callbacks.
///
/// # Safety
/// `stream` must point to a valid, unreleased `ArrowArrayStream` whose
/// callbacks follow the Arrow C stream interface.
///
/// # Panics
/// Panics if a callback returns a non-zero code or a batch's child count
/// disagrees with the schema.
pub unsafe fn import_record_batch_stream_with_metadata(
    stream: *mut ArrowArrayStream,
) -> (
    Vec<Vec<(Arc<Array>, crate::Field)>>,
    Option<std::collections::BTreeMap<String, String>>,
) {
    unsafe {
        let mut schema = ArrowSchema::empty();
        let get_schema = ((*stream).get_schema).expect("stream has no get_schema callback");
        let rc = get_schema(stream, &mut schema);
        assert_eq!(
            rc, 0,
            "ArrowArrayStream get_schema returned error code {}",
            rc
        );
        let metadata = decode_arrow_metadata(schema.metadata);
        let n_fields = schema.n_children as usize;
        // Borrow the child schemas for the whole drain loop; `schema` is only
        // released after the loop, so these stay valid.
        let child_schemas: Vec<&ArrowSchema> = if n_fields > 0 && !schema.children.is_null() {
            (0..n_fields).map(|i| &**schema.children.add(i)).collect()
        } else {
            Vec::new()
        };
        let mut batches = Vec::new();
        let get_next = ((*stream).get_next).expect("stream has no get_next callback");
        loop {
            let mut arr = ArrowArray::empty();
            let rc = get_next(stream, &mut arr);
            assert_eq!(
                rc, 0,
                "ArrowArrayStream get_next returned error code {}",
                rc
            );
            // End-of-stream is signalled by release == None.
            if arr.release.is_none() {
                break;
            }
            let n_children = arr.n_children as usize;
            assert_eq!(
                n_children, n_fields,
                "Struct array child count ({}) does not match schema ({})",
                n_children, n_fields
            );
            let mut columns = Vec::with_capacity(n_children);
            for i in 0..n_children {
                let child_sch = child_schemas[i];
                let mut field = field_from_c_schema(child_sch);
                let dtype = field.dtype.clone();
                let child_raw: *mut ArrowArray = *arr.children.add(i);
                // Move the child's contents out and leave an empty (released)
                // shell behind, so the parent's release callback below does
                // not double-free resources now owned by `child_box`.
                let child_content = ptr::read(child_raw);
                ptr::write(child_raw, ArrowArray::empty());
                let child_box = Box::new(child_content);
                let imported =
                    import_array_zero_copy(child_box, dtype, child_sch as *const ArrowSchema);
                // Utf8View is materialized to a plain String on import, so
                // report the logical type accordingly.
                if field.dtype == ArrowType::Utf8View {
                    field.dtype = ArrowType::String;
                }
                columns.push((imported, field));
            }
            // Release the (now hollowed-out) parent batch.
            if let Some(release) = arr.release {
                release(&mut arr as *mut ArrowArray);
            }
            batches.push(columns);
        }
        if let Some(release) = schema.release {
            release(&mut schema as *mut ArrowSchema);
        }
        if let Some(release) = (*stream).release {
            release(stream);
        }
        (batches, metadata)
    }
}
/// Drains a single-column `ArrowArrayStream`, returning the imported chunks
/// and the stream's field description.
///
/// Consumes the stream: the schema and the stream itself are released after
/// draining. Each chunk's ownership is transferred into a boxed `ArrowArray`
/// handed to `import_array_zero_copy`.
///
/// # Safety
/// `stream` must point to a valid, unreleased `ArrowArrayStream` whose
/// callbacks follow the Arrow C stream interface.
///
/// # Panics
/// Panics if a stream callback returns a non-zero code.
pub unsafe fn import_array_stream(
    stream: *mut ArrowArrayStream,
) -> (Vec<Arc<Array>>, crate::Field) {
    unsafe {
        let mut schema_c = ArrowSchema::empty();
        let get_schema = ((*stream).get_schema).expect("stream has no get_schema callback");
        let rc = get_schema(stream, &mut schema_c);
        assert_eq!(
            rc, 0,
            "ArrowArrayStream get_schema returned error code {}",
            rc
        );
        let mut field = field_from_c_schema(&schema_c);
        let mut arrays = Vec::new();
        let get_next = ((*stream).get_next).expect("stream has no get_next callback");
        loop {
            let mut arr = ArrowArray::empty();
            let rc = get_next(stream, &mut arr);
            assert_eq!(
                rc, 0,
                "ArrowArrayStream get_next returned error code {}",
                rc
            );
            // End-of-stream is signalled by release == None.
            if arr.release.is_none() {
                break;
            }
            // The box takes ownership of the chunk; its release callback is
            // invoked by the zero-copy import machinery when appropriate.
            let arr_box = Box::new(arr);
            let imported = import_array_zero_copy(
                arr_box,
                field.dtype.clone(),
                &schema_c as *const ArrowSchema,
            );
            arrays.push(imported);
        }
        // Utf8View is materialized to a plain String on import, so report the
        // logical type accordingly.
        if field.dtype == ArrowType::Utf8View {
            field.dtype = ArrowType::String;
        }
        if let Some(release) = schema_c.release {
            release(&mut schema_c as *mut ArrowSchema);
        }
        if let Some(release) = (*stream).release {
            release(stream);
        }
        (arrays, field)
    }
}
/// Builds a [`crate::Field`] from an imported `ArrowSchema`: name, nullable
/// flag (`flags & ARROW_FLAG_NULLABLE`), dtype parsed from the format string,
/// and decoded metadata.
///
/// A non-null `dictionary` pointer makes this a dictionary field; the format
/// string then describes the index type (feature-gated widths).
///
/// # Safety
/// `schema.format` must be a valid NUL-terminated string, and `schema.name` /
/// `schema.metadata` must be valid or null.
///
/// # Panics
/// Panics on an unsupported dictionary index format.
unsafe fn field_from_c_schema(schema: &ArrowSchema) -> crate::Field {
    let name = if schema.name.is_null() {
        String::new()
    } else {
        unsafe { std::ffi::CStr::from_ptr(schema.name) }
            .to_string_lossy()
            .into_owned()
    };
    // ARROW_FLAG_NULLABLE = 2.
    let nullable = (schema.flags & 2) != 0;
    let fmt = unsafe { std::ffi::CStr::from_ptr(schema.format).to_bytes() };
    let dtype = if !schema.dictionary.is_null() {
        use crate::ffi::arrow_dtype::CategoricalIndexType;
        // For dictionary fields the format string encodes the index type;
        // accepted widths depend on the enabled categorical features.
        let index_type = match fmt {
            #[cfg(feature = "default_categorical_8")]
            b"c" | b"C" => CategoricalIndexType::UInt8,
            #[cfg(all(feature = "extended_numeric_types", feature = "extended_categorical"))]
            b"s" | b"S" => CategoricalIndexType::UInt16,
            #[cfg(any(not(feature = "default_categorical_8"), feature = "extended_categorical"))]
            b"i" | b"I" => CategoricalIndexType::UInt32,
            #[cfg(all(feature = "default_categorical_8", not(feature = "extended_categorical")))]
            b"i" | b"I" => CategoricalIndexType::UInt8,
            #[cfg(all(feature = "extended_numeric_types", feature = "extended_categorical"))]
            b"l" | b"L" => CategoricalIndexType::UInt64,
            _ => panic!(
                "Unsupported dictionary index format: {:?}",
                std::str::from_utf8(fmt).unwrap_or("??")
            ),
        };
        ArrowType::Dictionary(index_type)
    } else {
        parse_arrow_format(fmt)
    };
    let metadata = unsafe { decode_arrow_metadata(schema.metadata) };
    crate::Field::new(name, dtype, nullable, metadata)
}
/// Maps an Arrow C data interface format string to an [`ArrowType`].
///
/// Covers the primitive, string, temporal (feature-gated), and struct (`+s`,
/// mapped to `Null` here) formats this crate supports; timestamp formats
/// (`tss:`/`tsm:`/`tsu:`/`tsn:` plus optional timezone suffix) are handled by
/// the trailing guard arm.
///
/// # Panics
/// Panics on any unrecognized format string.
fn parse_arrow_format(fmt: &[u8]) -> ArrowType {
    match fmt {
        b"n" => ArrowType::Null,
        b"b" => ArrowType::Boolean,
        #[cfg(feature = "extended_numeric_types")]
        b"c" => ArrowType::Int8,
        #[cfg(feature = "extended_numeric_types")]
        b"C" => ArrowType::UInt8,
        #[cfg(feature = "extended_numeric_types")]
        b"s" => ArrowType::Int16,
        #[cfg(feature = "extended_numeric_types")]
        b"S" => ArrowType::UInt16,
        b"i" => ArrowType::Int32,
        b"I" => ArrowType::UInt32,
        b"l" => ArrowType::Int64,
        b"L" => ArrowType::UInt64,
        b"f" => ArrowType::Float32,
        b"g" => ArrowType::Float64,
        b"u" => ArrowType::String,
        #[cfg(feature = "large_string")]
        b"U" => ArrowType::LargeString,
        b"vu" => ArrowType::Utf8View,
        #[cfg(feature = "datetime")]
        b"tdD" => ArrowType::Date32,
        #[cfg(feature = "datetime")]
        b"tdm" => ArrowType::Date64,
        #[cfg(feature = "datetime")]
        b"tts" => ArrowType::Time32(crate::TimeUnit::Seconds),
        #[cfg(feature = "datetime")]
        b"ttm" => ArrowType::Time32(crate::TimeUnit::Milliseconds),
        #[cfg(feature = "datetime")]
        b"ttu" => ArrowType::Time64(crate::TimeUnit::Microseconds),
        #[cfg(feature = "datetime")]
        b"ttn" => ArrowType::Time64(crate::TimeUnit::Nanoseconds),
        #[cfg(feature = "datetime")]
        b"tDs" => ArrowType::Duration32(crate::TimeUnit::Seconds),
        #[cfg(feature = "datetime")]
        b"tDm" => ArrowType::Duration32(crate::TimeUnit::Milliseconds),
        #[cfg(feature = "datetime")]
        b"tDu" => ArrowType::Duration64(crate::TimeUnit::Microseconds),
        #[cfg(feature = "datetime")]
        b"tDn" => ArrowType::Duration64(crate::TimeUnit::Nanoseconds),
        #[cfg(feature = "datetime")]
        b"tiM" => ArrowType::Interval(crate::IntervalUnit::YearMonth),
        #[cfg(feature = "datetime")]
        b"tiD" => ArrowType::Interval(crate::IntervalUnit::DaysTime),
        #[cfg(feature = "datetime")]
        b"tin" => ArrowType::Interval(crate::IntervalUnit::MonthDaysNs),
        // Struct format has no dedicated ArrowType; callers handle children.
        b"+s" => ArrowType::Null, #[cfg(feature = "datetime")]
        // Timestamps: 3-byte unit prefix, then ':' and an optional timezone.
        _ if fmt.starts_with(b"tss")
            || fmt.starts_with(b"tsm")
            || fmt.starts_with(b"tsu")
            || fmt.starts_with(b"tsn") =>
        {
            let unit = match &fmt[..3] {
                b"tss" => crate::TimeUnit::Seconds,
                b"tsm" => crate::TimeUnit::Milliseconds,
                b"tsu" => crate::TimeUnit::Microseconds,
                b"tsn" => crate::TimeUnit::Nanoseconds,
                _ => unreachable!(),
            };
            // Anything after "tss:" (index 4 onward) is the timezone string.
            let tz = if fmt.len() > 4 {
                let tz_bytes = &fmt[4..];
                let tz_str = String::from_utf8_lossy(tz_bytes).into_owned();
                if tz_str.is_empty() {
                    None
                } else {
                    Some(tz_str)
                }
            } else {
                None
            };
            ArrowType::Timestamp(unit, tz)
        }
        o => panic!(
            "unsupported Arrow format {:?}",
            std::str::from_utf8(o).unwrap_or("??")
        ),
    }
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
#[cfg(feature = "datetime")]
use crate::DatetimeArray;
use crate::ffi::arrow_c_ffi::export_to_c;
#[cfg(feature = "datetime")]
use crate::ffi::arrow_c_ffi::import_from_c;
use crate::ffi::arrow_dtype::ArrowType;
use crate::ffi::schema::Schema;
use crate::{Array, BooleanArray, Field, FloatArray, IntegerArray, MaskedArray, StringArray};
// Builds a single-field Schema for the export round-trip tests below.
fn schema_for(name: &str, ty: ArrowType, nullable: bool) -> Schema {
    Schema {
        fields: vec![Field::new(name, ty, nullable, None)],
        metadata: Default::default(),
    }
}
#[test]
// Exports an i32 array over the C interface and checks length and value
// buffer contents, then invokes both release callbacks to free the export.
fn test_arrow_c_export_int32() {
    let mut arr = IntegerArray::<i32>::default();
    arr.push(1);
    arr.push(2);
    arr.push(3);
    let array = Arc::new(Array::from_int32(arr));
    let schema = schema_for("ints", ArrowType::Int32, false);
    let (arr_ptr, sch_ptr) = export_to_c(array, schema);
    unsafe {
        assert_eq!((*arr_ptr).length, 3);
        // buffers[0] = validity, buffers[1] = values.
        let bufs = std::slice::from_raw_parts((*arr_ptr).buffers, 2);
        let vals = std::slice::from_raw_parts(bufs[1] as *const i32, 3);
        assert_eq!(vals, &[1, 2, 3]);
        ((*arr_ptr).release.unwrap())(arr_ptr);
        ((*sch_ptr).release.unwrap())(sch_ptr);
    }
}
#[test]
// Same round-trip as the i32 test, exercising the i64 value buffer.
fn test_arrow_c_export_int64() {
    let mut arr = IntegerArray::<i64>::default();
    arr.push(-42);
    arr.push(99);
    arr.push(1001);
    let array = Arc::new(Array::from_int64(arr));
    let schema = schema_for("big", ArrowType::Int64, false);
    let (arr_ptr, sch_ptr) = export_to_c(array, schema);
    unsafe {
        assert_eq!((*arr_ptr).length, 3);
        // buffers[0] = validity, buffers[1] = values.
        let bufs = std::slice::from_raw_parts((*arr_ptr).buffers, 2);
        let vals = std::slice::from_raw_parts(bufs[1] as *const i64, 3);
        assert_eq!(vals, &[-42, 99, 1001]);
        ((*arr_ptr).release.unwrap())(arr_ptr);
        ((*sch_ptr).release.unwrap())(sch_ptr);
    }
}
#[test]
fn test_arrow_c_export_u32() {
    // Unsigned 32-bit export: values land in buffer index 1 unchanged.
    let mut uints = IntegerArray::<u32>::default();
    for v in [100, 200, 300] {
        uints.push(v);
    }
    let array = Arc::new(Array::from_uint32(uints));
    let (arr_ptr, sch_ptr) = export_to_c(array, schema_for("uints", ArrowType::UInt32, false));
    unsafe {
        assert_eq!((*arr_ptr).length, 3);
        let buffers = std::slice::from_raw_parts((*arr_ptr).buffers, 2);
        let values = std::slice::from_raw_parts(buffers[1] as *const u32, 3);
        assert_eq!(values, &[100, 200, 300]);
        ((*arr_ptr).release.unwrap())(arr_ptr);
        ((*sch_ptr).release.unwrap())(sch_ptr);
    }
}
#[test]
fn test_arrow_c_export_f32() {
    // f32 export; all three literals are exactly representable, so the
    // equality assertion below is safe despite being floating point.
    let mut floats = FloatArray::<f32>::default();
    for v in [1.5f32, -2.0, 3.25] {
        floats.push(v);
    }
    let array = Arc::new(Array::from_float32(floats));
    let (arr_ptr, sch_ptr) = export_to_c(array, schema_for("floats", ArrowType::Float32, false));
    unsafe {
        assert_eq!((*arr_ptr).length, 3);
        let buffers = std::slice::from_raw_parts((*arr_ptr).buffers, 2);
        let values = std::slice::from_raw_parts(buffers[1] as *const f32, 3);
        assert_eq!(values, &[1.5, -2.0, 3.25]);
        ((*arr_ptr).release.unwrap())(arr_ptr);
        ((*sch_ptr).release.unwrap())(sch_ptr);
    }
}
#[test]
fn test_arrow_c_export_f64() {
    // f64 export; the bytes written must read back bit-identical, so exact
    // comparison against the pushed literals is valid.
    let mut doubles = FloatArray::<f64>::default();
    for v in [0.1, 0.2, 0.3] {
        doubles.push(v);
    }
    let array = Arc::new(Array::from_float64(doubles));
    let (arr_ptr, sch_ptr) = export_to_c(array, schema_for("doubles", ArrowType::Float64, false));
    unsafe {
        assert_eq!((*arr_ptr).length, 3);
        let buffers = std::slice::from_raw_parts((*arr_ptr).buffers, 2);
        let values = std::slice::from_raw_parts(buffers[1] as *const f64, 3);
        assert_eq!(values, &[0.1, 0.2, 0.3]);
        ((*arr_ptr).release.unwrap())(arr_ptr);
        ((*sch_ptr).release.unwrap())(sch_ptr);
    }
}
#[test]
fn test_arrow_c_export_bool() {
    // Boolean values are bit-packed on export, so this test only checks
    // that the values buffer pointer is populated, not its contents.
    let mut bits = BooleanArray::default();
    for b in [true, false, true] {
        bits.push(b);
    }
    let array = Arc::new(Array::BooleanArray(bits.into()));
    let (arr_ptr, sch_ptr) = export_to_c(array, schema_for("b", ArrowType::Boolean, false));
    unsafe {
        assert_eq!((*arr_ptr).length, 3);
        let buffers = std::slice::from_raw_parts((*arr_ptr).buffers, 2);
        assert!(!buffers[1].is_null());
        ((*arr_ptr).release.unwrap())(arr_ptr);
        ((*sch_ptr).release.unwrap())(sch_ptr);
    }
}
#[test]
fn test_arrow_c_export_str() {
    // UTF8 arrays export three buffers: [validity, offsets, values].
    let mut text = StringArray::default();
    for s in ["foo", "bar"] {
        text.push_str(s);
    }
    let array = Arc::new(Array::from_string32(text));
    let (arr_ptr, sch_ptr) = export_to_c(array, schema_for("txt", ArrowType::String, false));
    unsafe {
        assert_eq!((*arr_ptr).length, 2);
        let buffers = std::slice::from_raw_parts((*arr_ptr).buffers, 3);
        assert!(!buffers[1].is_null(), "offsets buffer must be non-null");
        assert!(!buffers[2].is_null(), "values buffer must be non-null");
        ((*arr_ptr).release.unwrap())(arr_ptr);
        ((*sch_ptr).release.unwrap())(sch_ptr);
    }
}
#[test]
fn test_arrow_c_export_str_offsets() {
    // Three 3-byte strings produce len+1 = 4 offsets: 0, 3, 6, 9.
    let mut text = StringArray::default();
    for s in ["foo", "bar", "baz"] {
        text.push_str(s);
    }
    let array = Arc::new(Array::from_string32(text));
    let (arr_ptr, sch_ptr) = export_to_c(array, schema_for("txt", ArrowType::String, false));
    unsafe {
        assert_eq!((*arr_ptr).length, 3);
        let buffers = std::slice::from_raw_parts((*arr_ptr).buffers, 3);
        // Buffer 1 is the offsets buffer for String32 data.
        let offsets = std::slice::from_raw_parts(buffers[1] as *const u32, 4);
        assert_eq!(
            offsets,
            &[0, 3, 6, 9],
            "UTF8 offsets must be monotonically increasing starting at 0"
        );
        ((*arr_ptr).release.unwrap())(arr_ptr);
        ((*sch_ptr).release.unwrap())(sch_ptr);
    }
}
#[test]
fn test_arrow_c_export_with_null_mask() {
    // One null in the middle of three slots; the exported validity bitmap
    // should read (LSB-first) as valid, null, valid.
    let mut arr = IntegerArray::<i32>::default();
    arr.push(42);
    arr.push_null();
    arr.push(88);
    let array = Arc::new(Array::from_int32(arr));
    // Field is nullable so the exporter must emit a validity buffer.
    let schema = schema_for("ints", ArrowType::Int32, true);
    let (arr_ptr, sch_ptr) = export_to_c(array, schema);
    unsafe {
        assert_eq!((*arr_ptr).length, 3);
        let bufs = std::slice::from_raw_parts((*arr_ptr).buffers, 2);
        // Buffer 0 is the validity bitmap; it must be present here.
        assert!(!bufs[0].is_null());
        let bitmap = std::slice::from_raw_parts(bufs[0], 1);
        // Bits LSB-first: slot0=1 (42), slot1=0 (null), slot2=1 (88) -> 0b101.
        // Mask with 0b111 to ignore padding bits beyond the 3 slots.
        assert_eq!(bitmap[0] & 0b111, 0b101);
        // Free the exported structures through their release callbacks.
        ((*arr_ptr).release.unwrap())(arr_ptr);
        ((*sch_ptr).release.unwrap())(sch_ptr);
    }
}
#[cfg(feature = "datetime")]
#[test]
fn test_arrow_c_export_datetime() {
    use crate::TimeUnit;
    // Export a millisecond-resolution datetime column and check that the
    // values buffer pointer comes through populated.
    let mut stamps = DatetimeArray::<i64>::default();
    stamps.push(1);
    stamps.push(2);
    stamps.time_unit = TimeUnit::Milliseconds;
    let array = Arc::new(Array::from_datetime_i64(stamps));
    let (arr_ptr, sch_ptr) = export_to_c(array, schema_for("dt", ArrowType::Date64, false));
    unsafe {
        assert_eq!((*arr_ptr).length, 2);
        let buffers = std::slice::from_raw_parts((*arr_ptr).buffers, 2);
        assert!(!buffers[1].is_null());
        ((*arr_ptr).release.unwrap())(arr_ptr);
        ((*sch_ptr).release.unwrap())(sch_ptr);
    }
}
#[cfg(feature = "datetime")]
#[test]
fn test_arrow_c_timezone_roundtrip_utc() {
    use crate::TimeUnit;
    // Two second-resolution timestamps (2021-01-01 and 2022-01-01, UTC),
    // exported with an explicit "UTC" timezone in the schema.
    let mut dt = DatetimeArray::<i64>::default();
    dt.push(1609459200);
    dt.push(1640995200);
    dt.time_unit = TimeUnit::Seconds;
    let array = Arc::new(Array::from_datetime_i64(dt));
    let schema = schema_for(
        "ts",
        ArrowType::Timestamp(TimeUnit::Seconds, Some("UTC".to_string())),
        false,
    );
    let (arr_ptr, sch_ptr) = export_to_c(array.clone(), schema);
    unsafe {
        // Arrow C format for second-resolution timestamps is "tss:<tz>".
        let fmt = std::ffi::CStr::from_ptr((*sch_ptr).format).to_bytes();
        assert_eq!(fmt, b"tss:UTC", "Format string should include timezone");
        // Round-trip back through the C interface and verify the payload.
        let imported = import_from_c(arr_ptr as *const _, sch_ptr as *const _);
        if let Array::TemporalArray(crate::TemporalArray::Datetime64(imported_dt)) =
            imported.as_ref()
        {
            assert_eq!(imported_dt.len(), 2);
            assert_eq!(imported_dt.time_unit, TimeUnit::Seconds);
        } else {
            panic!("Expected Datetime64 array");
        }
        // NOTE(review): release is invoked after import_from_c — this assumes
        // the import copies rather than taking ownership of the C structs;
        // confirm against import_from_c to rule out a double-free.
        ((*arr_ptr).release.unwrap())(arr_ptr);
        ((*sch_ptr).release.unwrap())(sch_ptr);
    }
}
#[cfg(feature = "datetime")]
#[test]
fn test_arrow_c_timezone_roundtrip_iana() {
    use crate::TimeUnit;
    // Millisecond timestamps tagged with an IANA zone name; the zone must
    // survive the export -> format string -> import round trip.
    let mut dt = DatetimeArray::<i64>::default();
    dt.push(1609459200000);
    dt.push(1640995200000);
    dt.time_unit = TimeUnit::Milliseconds;
    let array = Arc::new(Array::from_datetime_i64(dt));
    let tz = "America/New_York".to_string();
    let schema = schema_for(
        "ts",
        ArrowType::Timestamp(TimeUnit::Milliseconds, Some(tz.clone())),
        false,
    );
    let (arr_ptr, sch_ptr) = export_to_c(array.clone(), schema);
    unsafe {
        // Millisecond timestamps use the "tsm:<tz>" format prefix.
        let fmt = std::ffi::CStr::from_ptr((*sch_ptr).format).to_bytes();
        assert_eq!(
            fmt, b"tsm:America/New_York",
            "Format string should include IANA timezone"
        );
        let imported = import_from_c(arr_ptr as *const _, sch_ptr as *const _);
        if let Array::TemporalArray(crate::TemporalArray::Datetime64(imported_dt)) =
            imported.as_ref()
        {
            assert_eq!(imported_dt.len(), 2);
            assert_eq!(imported_dt.time_unit, TimeUnit::Milliseconds);
        } else {
            panic!("Expected Datetime64 array");
        }
        // NOTE(review): release after import_from_c assumes the import copies
        // the C structs rather than consuming them — verify.
        ((*arr_ptr).release.unwrap())(arr_ptr);
        ((*sch_ptr).release.unwrap())(sch_ptr);
    }
}
#[cfg(feature = "datetime")]
#[test]
fn test_arrow_c_timezone_roundtrip_offset() {
    use crate::TimeUnit;
    // A single microsecond timestamp tagged with a fixed-offset timezone.
    let mut stamps = DatetimeArray::<i64>::default();
    stamps.push(1609459200000000);
    stamps.time_unit = TimeUnit::Microseconds;
    let array = Arc::new(Array::from_datetime_i64(stamps));
    let tz = "+05:30".to_string();
    let schema = schema_for(
        "ts",
        ArrowType::Timestamp(TimeUnit::Microseconds, Some(tz.clone())),
        false,
    );
    let (arr_ptr, sch_ptr) = export_to_c(array.clone(), schema);
    unsafe {
        let fmt = std::ffi::CStr::from_ptr((*sch_ptr).format).to_bytes();
        assert_eq!(
            fmt, b"tsu:+05:30",
            "Format string should include offset timezone"
        );
        let imported = import_from_c(arr_ptr as *const _, sch_ptr as *const _);
        match imported.as_ref() {
            Array::TemporalArray(crate::TemporalArray::Datetime64(roundtripped)) => {
                assert_eq!(roundtripped.len(), 1);
                assert_eq!(roundtripped.time_unit, TimeUnit::Microseconds);
            }
            _ => panic!("Expected Datetime64 array"),
        }
        ((*arr_ptr).release.unwrap())(arr_ptr);
        ((*sch_ptr).release.unwrap())(sch_ptr);
    }
}
#[cfg(feature = "datetime")]
#[test]
fn test_arrow_c_timezone_none() {
    use crate::TimeUnit;
    // With no timezone, the exported format string ends in a bare colon.
    let mut stamps = DatetimeArray::<i64>::default();
    stamps.push(1609459200);
    stamps.time_unit = TimeUnit::Seconds;
    let array = Arc::new(Array::from_datetime_i64(stamps));
    let schema = schema_for("ts", ArrowType::Timestamp(TimeUnit::Seconds, None), false);
    let (arr_ptr, sch_ptr) = export_to_c(array.clone(), schema);
    unsafe {
        let fmt = std::ffi::CStr::from_ptr((*sch_ptr).format).to_bytes();
        assert_eq!(
            fmt, b"tss:",
            "Format string should have colon but no timezone"
        );
        ((*arr_ptr).release.unwrap())(arr_ptr);
        ((*sch_ptr).release.unwrap())(sch_ptr);
    }
}
#[test]
fn test_field_metadata_round_trip_via_export_import() {
    use super::import_from_c_owned;
    use std::collections::BTreeMap;
    // Attach two metadata entries to the field and verify they survive an
    // export/import cycle through the owned C-struct path.
    let mut meta = BTreeMap::new();
    meta.insert("source".to_string(), "test_db".to_string());
    meta.insert("units".to_string(), "kg".to_string());
    let mut arr = IntegerArray::<i32>::default();
    arr.push(1);
    arr.push(2);
    let array = Arc::new(Array::from_int32(arr));
    let schema = Schema {
        fields: vec![Field::new(
            "col",
            ArrowType::Int32,
            false,
            Some(meta.clone()),
        )],
        metadata: Default::default(),
    };
    let (arr_ptr, sch_ptr) = export_to_c(array, schema);
    unsafe {
        // Reclaim ownership of the heap-allocated C structs; from here on
        // import_from_c_owned is responsible for their cleanup, so no
        // release callbacks are invoked in this test.
        let arr_box = Box::from_raw(arr_ptr);
        let sch_box = Box::from_raw(sch_ptr);
        let (_, field) = import_from_c_owned(arr_box, sch_box);
        assert_eq!(
            field.metadata, meta,
            "Field metadata should survive round-trip"
        );
    }
}
#[test]
fn test_field_metadata_round_trip_record_batch_stream() {
    use super::{
        export_record_batch_stream_with_metadata, import_record_batch_stream_with_metadata,
    };
    use std::collections::BTreeMap;
    // Both per-field metadata and schema-level (table) metadata must survive
    // a round trip through the Arrow C stream interface.
    let mut field_meta = BTreeMap::new();
    field_meta.insert("origin".to_string(), "sensor_1".to_string());
    let field = Field::new("vals", ArrowType::Int32, false, Some(field_meta.clone()));
    let mut arr = IntegerArray::<i32>::default();
    arr.push(10);
    arr.push(20);
    let array = Arc::new(Array::from_int32(arr));
    let col_schema = Schema {
        fields: vec![field.clone()],
        metadata: Default::default(),
    };
    // A single batch containing a single (array, schema) column.
    let batches = vec![vec![(array, col_schema)]];
    let fields = vec![field];
    let mut table_meta = BTreeMap::new();
    table_meta.insert("table_name".to_string(), "test_tbl".to_string());
    let stream =
        export_record_batch_stream_with_metadata(batches, fields, Some(table_meta.clone()));
    // Hand the stream to the importer as a raw pointer; the importer is
    // expected to drive the stream's release machinery.
    let stream_ptr = Box::into_raw(stream);
    unsafe {
        let (columns_batches, schema_meta) =
            import_record_batch_stream_with_metadata(stream_ptr);
        assert_eq!(
            schema_meta,
            Some(table_meta),
            "Schema metadata should survive round-trip"
        );
        // One batch with one column came back.
        assert_eq!(columns_batches.len(), 1);
        let batch = &columns_batches[0];
        assert_eq!(batch.len(), 1);
        let (_, imported_field) = &batch[0];
        assert_eq!(
            imported_field.metadata, field_meta,
            "Field metadata should survive record batch stream round-trip"
        );
    }
}
#[test]
#[cfg(feature = "table_metadata")]
fn test_table_metadata_round_trip() {
    use super::{
        export_record_batch_stream_with_metadata, import_record_batch_stream_with_metadata,
    };
    use std::collections::BTreeMap;
    let mut arr = IntegerArray::<i32>::default();
    arr.push(1);
    arr.push(2);
    arr.push(3);
    let array = Arc::new(Array::from_int32(arr));
    let field = Field::new("col1", ArrowType::Int32, false, None);
    let col_schema = Schema {
        fields: vec![field.clone()],
        metadata: Default::default(),
    };
    // Table-level metadata carrying a pandas-style JSON blob, as a producer
    // like pyarrow would attach it.
    let mut table_meta = BTreeMap::new();
    table_meta.insert(
        "pandas".to_string(),
        r#"{"columns":[{"name":"col1","pandas_type":"categorical","metadata":{"ordered":true}}]}"#.to_string(),
    );
    let stream = export_record_batch_stream_with_metadata(
        vec![vec![(array, col_schema)]],
        vec![field],
        Some(table_meta.clone()),
    );
    // Raw pointer handoff; the importer owns the stream from here.
    let stream_ptr = Box::into_raw(stream);
    let (batches, schema_meta) =
        unsafe { import_record_batch_stream_with_metadata(stream_ptr) };
    assert_eq!(schema_meta, Some(table_meta.clone()));
    // Rebuild FieldArrays from the imported columns and construct a Table
    // carrying the same metadata.
    let imported_cols: Vec<_> = batches[0]
        .iter()
        .map(|(arr, f)| crate::FieldArray::from_arr(f.name.as_str(), (**arr).clone()))
        .collect();
    let table = crate::Table::new_with_metadata(
        "test".to_string(),
        Some(imported_cols),
        table_meta.clone(),
    );
    assert_eq!(table.metadata(), &table_meta);
    assert_eq!(table.n_rows(), 3);
}
}