use crate::column::Column;
use crate::error::{CudfError, Result};
use crate::table::Table;
use crate::types::checked_i32;
#[cfg(feature = "arrow-interop")]
use arrow_array::Array as _;
impl Column {
    /// Serializes this column into a byte buffer via the
    /// `column_to_arrow_ipc` FFI routine.
    ///
    /// # Errors
    /// Any exception reported by the FFI call is converted with
    /// [`CudfError::from_cxx`].
    pub fn to_arrow_ipc(&self) -> Result<Vec<u8>> {
        let encoded = cudf_cxx::interop::ffi::column_to_arrow_ipc(&self.inner);
        encoded.map_err(CudfError::from_cxx)
    }

    /// Builds a column from bytes by calling the `column_from_arrow_ipc`
    /// FFI routine.
    ///
    /// # Errors
    /// Any exception reported by the FFI call is converted with
    /// [`CudfError::from_cxx`].
    pub fn from_arrow_ipc(data: &[u8]) -> Result<Self> {
        cudf_cxx::interop::ffi::column_from_arrow_ipc(data)
            .map(|inner| Self { inner })
            .map_err(CudfError::from_cxx)
    }
}
impl Table {
    /// Serializes this table into a byte buffer via the
    /// `table_to_arrow_ipc` FFI routine.
    ///
    /// # Errors
    /// Any exception reported by the FFI call is converted with
    /// [`CudfError::from_cxx`].
    pub fn to_arrow_ipc(&self) -> Result<Vec<u8>> {
        let encoded = cudf_cxx::interop::ffi::table_to_arrow_ipc(&self.inner);
        encoded.map_err(CudfError::from_cxx)
    }

    /// Builds a table from bytes by calling the `table_from_arrow_ipc`
    /// FFI routine.
    ///
    /// # Errors
    /// Any exception reported by the FFI call is converted with
    /// [`CudfError::from_cxx`].
    pub fn from_arrow_ipc(data: &[u8]) -> Result<Self> {
        cudf_cxx::interop::ffi::table_from_arrow_ipc(data)
            .map(|inner| Self { inner })
            .map_err(CudfError::from_cxx)
    }
}
#[cfg(feature = "arrow-interop")]
impl Column {
    /// Converts this column into an `arrow_array::ArrayRef` through the
    /// Arrow C Data Interface.
    ///
    /// The FFI layer produces a schema/array pair; both structs are moved out
    /// of that pair and imported with `arrow::ffi::from_ffi`.
    ///
    /// # Errors
    /// * [`CudfError::from_cxx`] conversion of any exception raised while
    ///   building the pair.
    /// * `CudfError::Arrow` if `arrow::ffi::from_ffi` rejects the structs.
    pub fn to_arrow_array(&self) -> Result<arrow_array::ArrayRef> {
        let mut pair = cudf_cxx::interop::ffi::column_to_arrow_pair(&self.inner)
            .map_err(CudfError::from_cxx)?;
        let schema_ptr = cudf_cxx::interop::ffi::arrow_pair_schema(pair.pin_mut());
        let array_ptr = cudf_cxx::interop::ffi::arrow_pair_array(pair.pin_mut());

        // SAFETY: `pair` is still alive here, so the addresses it handed out
        // are expected to reference live C Data Interface structs.
        // NOTE(review): this relies on `arrow_pair_schema` / `arrow_pair_array`
        // returning the addresses of an `ArrowSchema` / `ArrowArray` owned by
        // `pair` — confirm against the C++ bridge.
        let data = unsafe {
            let ffi_schema = arrow::ffi::FFI_ArrowSchema::from_raw(
                schema_ptr as *mut arrow::ffi::FFI_ArrowSchema,
            );
            let ffi_array =
                arrow::ffi::FFI_ArrowArray::from_raw(array_ptr as *mut arrow::ffi::FFI_ArrowArray);
            arrow::ffi::from_ffi(ffi_array, &ffi_schema)
        }
        .map_err(CudfError::Arrow)?;

        Ok(arrow_array::make_array(data))
    }

    /// Builds a column from an Arrow array through the Arrow C Data Interface.
    ///
    /// The array is exported with `arrow::ffi::to_ffi` and the addresses of the
    /// resulting structs are handed to `column_from_arrow_cdata`.
    ///
    /// The exported structs remain owned by this function for the whole call
    /// and are dropped on every exit path. Previously they were boxed and only
    /// reclaimed when the FFI call failed, which leaked both allocations on
    /// every successful conversion. Dropping an `FFI_ArrowSchema` /
    /// `FFI_ArrowArray` invokes its release callback only if the consumer has
    /// not already released or moved the struct, so cleanup is correct in both
    /// the success and the failure case.
    ///
    /// # Errors
    /// * `CudfError::Arrow` if the array cannot be exported.
    /// * [`CudfError::from_cxx`] conversion of any exception raised by the
    ///   FFI call.
    pub fn from_arrow_array(array: &dyn arrow_array::Array) -> Result<Self> {
        let data = array.to_data();
        let (mut ffi_array, mut ffi_schema) =
            arrow::ffi::to_ffi(&data).map_err(CudfError::Arrow)?;

        // The bridge takes plain integer addresses; the structs themselves
        // stay on this stack frame and outlive the call.
        let schema_ptr = std::ptr::addr_of_mut!(ffi_schema) as u64;
        let array_ptr = std::ptr::addr_of_mut!(ffi_array) as u64;

        let raw = cudf_cxx::interop::ffi::column_from_arrow_cdata(schema_ptr, array_ptr)
            .map_err(CudfError::from_cxx)?;
        Ok(Self { inner: raw })
    }
}
#[cfg(feature = "arrow-interop")]
impl Table {
    /// Converts this table into an Arrow `RecordBatch` through the Arrow
    /// C Data Interface.
    ///
    /// The FFI layer produces a schema/array pair; the imported data is
    /// wrapped in a `StructArray` and then turned into a `RecordBatch`.
    ///
    /// # Errors
    /// * [`CudfError::from_cxx`] conversion of any exception raised while
    ///   building the pair.
    /// * `CudfError::Arrow` if `arrow::ffi::from_ffi` rejects the structs.
    pub fn to_arrow_batch(&self) -> Result<arrow::record_batch::RecordBatch> {
        let mut pair = cudf_cxx::interop::ffi::table_to_arrow_pair(&self.inner)
            .map_err(CudfError::from_cxx)?;
        let schema_ptr = cudf_cxx::interop::ffi::arrow_pair_schema(pair.pin_mut());
        let array_ptr = cudf_cxx::interop::ffi::arrow_pair_array(pair.pin_mut());

        // SAFETY: `pair` is still alive here, so the addresses it handed out
        // are expected to reference live C Data Interface structs.
        // NOTE(review): this relies on `arrow_pair_schema` / `arrow_pair_array`
        // returning the addresses of an `ArrowSchema` / `ArrowArray` owned by
        // `pair` — confirm against the C++ bridge.
        let data = unsafe {
            let ffi_schema = arrow::ffi::FFI_ArrowSchema::from_raw(
                schema_ptr as *mut arrow::ffi::FFI_ArrowSchema,
            );
            let ffi_array =
                arrow::ffi::FFI_ArrowArray::from_raw(array_ptr as *mut arrow::ffi::FFI_ArrowArray);
            arrow::ffi::from_ffi(ffi_array, &ffi_schema)
        }
        .map_err(CudfError::Arrow)?;

        let struct_array = arrow_array::StructArray::from(data);
        Ok(arrow::record_batch::RecordBatch::from(struct_array))
    }

    /// Builds a table from an Arrow `RecordBatch` through the Arrow C Data
    /// Interface.
    ///
    /// The batch's fields and columns are assembled into a `StructArray`
    /// (without a null buffer), exported with `arrow::ffi::to_ffi`, and the
    /// addresses of the exported structs are handed to
    /// `table_from_arrow_cdata`.
    ///
    /// Two fixes relative to the earlier implementation:
    /// * The struct array is built with `StructArray::try_new`, so a
    ///   validation failure is returned as `CudfError::Arrow` instead of
    ///   panicking.
    /// * The exported structs remain owned by this function and are dropped
    ///   on every exit path. Previously they were boxed and only reclaimed
    ///   when the FFI call failed, leaking both allocations on success.
    ///   Dropping an `FFI_ArrowSchema` / `FFI_ArrowArray` invokes its release
    ///   callback only if the consumer has not already released or moved it.
    ///
    /// # Errors
    /// * `CudfError::Arrow` if the struct array cannot be built or exported.
    /// * [`CudfError::from_cxx`] conversion of any exception raised by the
    ///   FFI call.
    pub fn from_arrow_batch(batch: &arrow::record_batch::RecordBatch) -> Result<Self> {
        let struct_array = arrow_array::StructArray::try_new(
            batch.schema().fields().clone(),
            batch.columns().to_vec(),
            None,
        )
        .map_err(CudfError::Arrow)?;
        let data = struct_array.to_data();
        let (mut ffi_array, mut ffi_schema) =
            arrow::ffi::to_ffi(&data).map_err(CudfError::Arrow)?;

        // The bridge takes plain integer addresses; the structs themselves
        // stay on this stack frame and outlive the call.
        let schema_ptr = std::ptr::addr_of_mut!(ffi_schema) as u64;
        let array_ptr = std::ptr::addr_of_mut!(ffi_array) as u64;

        let raw = cudf_cxx::interop::ffi::table_from_arrow_cdata(schema_ptr, array_ptr)
            .map_err(CudfError::from_cxx)?;
        Ok(Self { inner: raw })
    }
}
#[cfg(feature = "arrow-interop")]
mod arrow_conv {
    use arrow::ipc;
    use arrow::record_batch::RecordBatch;

    use crate::error::{CudfError, Result};
    use crate::table::Table;

    impl Table {
        /// Converts this table to a `RecordBatch` by round-tripping through
        /// the bytes produced by [`Table::to_arrow_ipc`].
        ///
        /// Only the first batch yielded by the IPC file reader is returned.
        ///
        /// # Errors
        /// * Whatever [`Table::to_arrow_ipc`] returns on failure.
        /// * `CudfError::Arrow` if the reader cannot be created or the first
        ///   batch fails to decode.
        /// * `CudfError::InvalidArgument` if the reader yields no batch.
        pub fn to_record_batch(&self) -> Result<RecordBatch> {
            let bytes = self.to_arrow_ipc()?;
            let mut batches =
                ipc::reader::FileReader::try_new(std::io::Cursor::new(bytes), None)
                    .map_err(CudfError::Arrow)?;
            match batches.next() {
                Some(first) => first.map_err(CudfError::Arrow),
                None => Err(CudfError::InvalidArgument("empty IPC stream".into())),
            }
        }

        /// Builds a table from a `RecordBatch` by writing it with the IPC
        /// file writer and passing the bytes to [`Table::from_arrow_ipc`].
        ///
        /// # Errors
        /// * `CudfError::Arrow` if creating the writer, writing the batch, or
        ///   finishing the file fails.
        /// * Whatever [`Table::from_arrow_ipc`] returns on failure.
        pub fn from_record_batch(batch: &RecordBatch) -> Result<Self> {
            let mut encoded = Vec::new();
            let mut sink = ipc::writer::FileWriter::try_new(&mut encoded, batch.schema_ref())
                .map_err(CudfError::Arrow)?;
            sink.write(batch).map_err(CudfError::Arrow)?;
            sink.finish().map_err(CudfError::Arrow)?;
            // End the writer's mutable borrow of `encoded` before reading it.
            drop(sink);
            Self::from_arrow_ipc(&encoded)
        }
    }
}
/// Owning wrapper around an opaque DLPack handle obtained from the FFI layer.
///
/// The handle is passed to `free_dlpack` when the wrapper is dropped.
pub struct DLPackTensor {
/// Opaque handle value as returned by `table_to_dlpack`.
/// NOTE(review): presumably the address of a DLPack managed tensor — confirm
/// against the C++ bridge.
ptr: u64,
}
impl DLPackTensor {
    /// Exports `table` through the `table_to_dlpack` FFI routine and wraps
    /// the returned handle.
    ///
    /// # Errors
    /// Any exception reported by the FFI call is converted with
    /// [`CudfError::from_cxx`].
    pub fn from_table(table: &Table) -> Result<Self> {
        cudf_cxx::interop::ffi::table_to_dlpack(&table.inner)
            .map(|ptr| Self { ptr })
            .map_err(CudfError::from_cxx)
    }

    /// Consumes the tensor and converts it into a [`Table`] via
    /// `table_from_dlpack`.
    ///
    /// The wrapper's destructor is suppressed: on success the handle is not
    /// freed from Rust, and on failure it is freed here exactly once before
    /// the error is returned.
    ///
    /// # Errors
    /// Any exception reported by the FFI call is converted with
    /// [`CudfError::from_cxx`].
    pub fn to_table(self) -> Result<Table> {
        let handle = self.ptr;
        // Skip `Drop` so the handle is not freed a second time below or
        // after a successful conversion.
        std::mem::forget(self);

        cudf_cxx::interop::ffi::table_from_dlpack(handle)
            .map(|inner| Table { inner })
            .map_err(|err| {
                cudf_cxx::interop::ffi::free_dlpack(handle);
                CudfError::from_cxx(err)
            })
    }

    /// Returns the stored handle as a `usize` without giving up ownership.
    pub fn as_raw_ptr(&self) -> usize {
        let handle = self.ptr;
        handle as usize
    }

    /// Wraps a raw handle value without any validation.
    ///
    /// # Safety
    /// The returned value passes `ptr` to `free_dlpack` when dropped, so the
    /// caller must supply a handle that is valid for that call and is not
    /// freed elsewhere.
    /// NOTE(review): the exact validity requirements depend on the C++
    /// bridge — confirm.
    pub unsafe fn from_raw_ptr(ptr: usize) -> Self {
        let handle = ptr as u64;
        Self { ptr: handle }
    }
}
impl Drop for DLPackTensor {
    /// Hands the stored handle to `free_dlpack`.
    fn drop(&mut self) {
        let handle = self.ptr;
        cudf_cxx::interop::ffi::free_dlpack(handle);
    }
}
/// Wrapper around the FFI `OwnedPackedColumns` object produced by
/// `pack_table` or by extracting a partition from a `SplitResult`.
pub struct PackedTable {
/// Uniquely-owned handle to the C++ packed-columns object.
inner: cxx::UniquePtr<cudf_cxx::interop::ffi::OwnedPackedColumns>,
}
impl PackedTable {
    /// Packs `table` through the `pack_table` FFI routine.
    ///
    /// # Errors
    /// Any exception reported by the FFI call is converted with
    /// [`CudfError::from_cxx`].
    pub fn pack(table: &Table) -> Result<Self> {
        cudf_cxx::interop::ffi::pack_table(&table.inner)
            .map(|inner| Self { inner })
            .map_err(CudfError::from_cxx)
    }

    /// Returns the bytes reported by the `packed_metadata` FFI routine.
    ///
    /// # Errors
    /// Any exception reported by the FFI call is converted with
    /// [`CudfError::from_cxx`].
    pub fn metadata(&self) -> Result<Vec<u8>> {
        let bytes = cudf_cxx::interop::ffi::packed_metadata(&self.inner);
        bytes.map_err(CudfError::from_cxx)
    }

    /// Returns the value reported by the `packed_gpu_data_size` FFI routine.
    ///
    /// # Errors
    /// Any exception reported by the FFI call is converted with
    /// [`CudfError::from_cxx`].
    pub fn gpu_data_size(&self) -> Result<i64> {
        let size = cudf_cxx::interop::ffi::packed_gpu_data_size(&self.inner);
        size.map_err(CudfError::from_cxx)
    }

    /// Produces a [`Table`] from this packed representation via the
    /// `unpack_table` FFI routine.
    ///
    /// # Errors
    /// Any exception reported by the FFI call is converted with
    /// [`CudfError::from_cxx`].
    pub fn unpack(&self) -> Result<Table> {
        cudf_cxx::interop::ffi::unpack_table(&self.inner)
            .map(|inner| Table { inner })
            .map_err(CudfError::from_cxx)
    }
}
/// Result of a contiguous split, tracked through an opaque FFI handle.
///
/// The handle is passed to `contiguous_split_free` when this value is dropped.
pub struct SplitResult {
/// Opaque handle: the first element returned by `contiguous_split_table`.
handle: u64,
/// Number of partitions: the second element returned by
/// `contiguous_split_table`.
num_parts: usize,
/// One flag per partition, set once that partition has been handed out by
/// `get` so it cannot be requested twice.
extracted: Vec<bool>,
}
impl SplitResult {
    /// Splits `table` at the given `splits` via the
    /// `contiguous_split_table` FFI routine.
    ///
    /// The FFI call returns a list whose first two elements are read as the
    /// result handle and the partition count.
    ///
    /// # Errors
    /// * [`CudfError::from_cxx`] conversion of any exception raised by the
    ///   FFI call.
    /// * `CudfError::InvalidArgument` if fewer than two values come back.
    pub fn split(table: &Table, splits: &[i32]) -> Result<Self> {
        let info = cudf_cxx::interop::ffi::contiguous_split_table(&table.inner, splits)
            .map_err(CudfError::from_cxx)?;

        match (info.first(), info.get(1)) {
            (Some(&handle), Some(&count)) => {
                let num_parts = count as usize;
                Ok(Self {
                    handle,
                    num_parts,
                    extracted: vec![false; num_parts],
                })
            }
            _ => Err(CudfError::InvalidArgument(
                "unexpected contiguous_split return".into(),
            )),
        }
    }

    /// Number of partitions recorded for this split.
    pub fn num_parts(&self) -> usize {
        self.num_parts
    }

    /// Extracts partition `index` as a [`PackedTable`].
    ///
    /// Each partition can be extracted only once; the flag for `index` is
    /// set only after the FFI call succeeds.
    ///
    /// # Errors
    /// * `CudfError::IndexOutOfBounds` if `index >= num_parts()`.
    /// * `CudfError::InvalidArgument` if the partition was already extracted.
    /// * Whatever `checked_i32` returns if `index` cannot be converted.
    /// * [`CudfError::from_cxx`] conversion of any exception raised by the
    ///   FFI call.
    pub fn get(&mut self, index: usize) -> Result<PackedTable> {
        let size = self.num_parts;
        if index >= size {
            return Err(CudfError::IndexOutOfBounds { index, size });
        }

        let already_taken = self.extracted[index];
        if already_taken {
            return Err(CudfError::InvalidArgument(format!(
                "partition {index} has already been extracted from this SplitResult; \
                 each partition can only be retrieved once (the C++ side moves ownership)"
            )));
        }

        let part = checked_i32(index)?;
        let inner = cudf_cxx::interop::ffi::contiguous_split_get(self.handle, part)
            .map_err(CudfError::from_cxx)?;
        self.extracted[index] = true;
        Ok(PackedTable { inner })
    }
}
impl Drop for SplitResult {
    /// Hands the stored handle to `contiguous_split_free`.
    fn drop(&mut self) {
        let handle = self.handle;
        cudf_cxx::interop::ffi::contiguous_split_free(handle);
    }
}