use minarrow::Consolidate;
use minarrow::ffi::arrow_c_ffi::{
ArrowArray, ArrowArrayStream, ArrowSchema, import_from_c_owned,
import_record_batch_stream_with_metadata,
};
use minarrow::{Field, FieldArray, SuperArray, SuperTable};
use pyo3::ffi::Py_uintptr_t;
use pyo3::prelude::*;
use pyo3::types::PyTuple;
use std::sync::Arc;
use crate::error::{PyMinarrowError, PyMinarrowResult};
use crate::ffi::to_py::TABLE_NAME_KEY;
/// Reads the minarrow table-name entry out of a PyArrow schema's metadata.
///
/// Returns the stored name, or an empty string when the schema has no
/// metadata, the key is absent, or the value is not valid UTF-8.
fn extract_table_name_from_pyarrow_schema(schema: &Bound<PyAny>) -> String {
    let Ok(meta) = schema.getattr("metadata") else {
        return String::new();
    };
    if meta.is_none() {
        return String::new();
    }
    // PyArrow schema metadata maps bytes keys to bytes values.
    let Ok(value) = meta.call_method1("get", (TABLE_NAME_KEY.as_bytes(),)) else {
        return String::new();
    };
    if value.is_none() {
        return String::new();
    }
    value
        .extract::<Vec<u8>>()
        .ok()
        .and_then(|raw| String::from_utf8(raw).ok())
        .unwrap_or_default()
}
/// Splits stream-level metadata into the table name (stored under
/// `TABLE_NAME_KEY`) and the remaining user metadata entries.
///
/// The name key is removed from the map so it does not leak into the
/// table's user metadata.
#[cfg(feature = "table_metadata")]
fn split_stream_metadata(
    metadata: Option<std::collections::BTreeMap<String, String>>,
) -> (String, std::collections::BTreeMap<String, String>) {
    let Some(mut remaining) = metadata else {
        return (String::new(), std::collections::BTreeMap::new());
    };
    let name = remaining.remove(TABLE_NAME_KEY).unwrap_or_default();
    (name, remaining)
}
/// Looks up the table name in stream metadata without consuming the map.
/// Falls back to an empty string when the map or the key is absent.
#[cfg(not(feature = "table_metadata"))]
fn extract_table_name(
    metadata: &Option<std::collections::BTreeMap<String, String>>,
) -> String {
    match metadata {
        Some(map) => map.get(TABLE_NAME_KEY).cloned().unwrap_or_default(),
        None => String::new(),
    }
}
/// Attempts the Arrow PyCapsule array protocol (`__arrow_c_array__`).
///
/// Returns `None` when the object does not implement the protocol (or the
/// `hasattr` probe itself fails), so the caller can fall back to the legacy
/// `_export_to_c` path.
pub fn try_capsule_array(obj: &Bound<PyAny>) -> Option<PyMinarrowResult<FieldArray>> {
    match obj.hasattr("__arrow_c_array__") {
        Ok(true) => Some(import_capsule_array(obj)),
        _ => None,
    }
}
/// Imports a `FieldArray` via the Arrow PyCapsule array protocol.
///
/// Calls `obj.__arrow_c_array__(None)`, which returns a `(schema_capsule,
/// array_capsule)` tuple, then moves the C structs out of both capsules,
/// leaving empty structs behind so the capsule destructors do not release
/// the data a second time.
///
/// # Errors
/// Returns `PyMinarrowError::PyArrow` when the protocol call fails, the
/// return value is not a tuple, or either capsule is invalid/consumed.
fn import_capsule_array(obj: &Bound<PyAny>) -> PyMinarrowResult<FieldArray> {
    let py = obj.py();
    // `None` for the requested schema: accept whatever the producer exports.
    let result = obj
        .call_method1("__arrow_c_array__", (py.None(),))
        .map_err(|e| {
            PyMinarrowError::PyArrow(format!("Failed to call __arrow_c_array__: {}", e))
        })?;
    let tuple: &Bound<PyTuple> = result.downcast().map_err(|e| {
        PyMinarrowError::PyArrow(format!(
            "__arrow_c_array__ did not return a tuple: {}",
            e
        ))
    })?;
    let schema_capsule = tuple.get_item(0).map_err(|e| {
        PyMinarrowError::PyArrow(format!("Failed to get schema capsule: {}", e))
    })?;
    let array_capsule = tuple.get_item(1).map_err(|e| {
        PyMinarrowError::PyArrow(format!("Failed to get array capsule: {}", e))
    })?;
    let schema_ptr = capsule_to_ptr(&schema_capsule, c"arrow_schema")?;
    let array_ptr = capsule_to_ptr(&array_capsule, c"arrow_array")?;
    // SAFETY: `schema_ptr` points at a live ArrowSchema owned by the capsule.
    // We move the struct out and write an empty schema back so the capsule's
    // destructor has nothing to release (avoids a double-free).
    let schema_box = unsafe {
        let moved = Box::new(std::ptr::read(schema_ptr as *const ArrowSchema));
        std::ptr::write(schema_ptr as *mut ArrowSchema, ArrowSchema::empty());
        moved
    };
    // SAFETY: same move-and-neutralize pattern for the ArrowArray struct.
    let array_box = unsafe {
        let moved = Box::new(std::ptr::read(array_ptr as *const ArrowArray));
        std::ptr::write(array_ptr as *mut ArrowArray, ArrowArray::empty());
        moved
    };
    // SAFETY: both boxes are uniquely owned; the import routine takes
    // ownership and is responsible for invoking the C release callbacks.
    let (array, field) = unsafe { import_from_c_owned(array_box, schema_box) };
    Ok(FieldArray::new(field, (*array).clone()))
}
/// Per-batch column data (array + field per column) plus optional
/// stream-level metadata, as produced by
/// `import_record_batch_stream_with_metadata`.
type StreamImportResult = (
    Vec<Vec<(Arc<minarrow::Array>, Field)>>,
    Option<std::collections::BTreeMap<String, String>>,
);
/// Attempts the Arrow PyCapsule stream protocol (`__arrow_c_stream__`) for
/// record-batch producers.
///
/// Returns `None` when the protocol is unavailable (or the `hasattr` probe
/// fails), so the caller can use a legacy import path instead.
pub fn try_capsule_record_batch_stream(
    obj: &Bound<PyAny>,
) -> Option<PyMinarrowResult<StreamImportResult>> {
    match obj.hasattr("__arrow_c_stream__") {
        Ok(true) => Some(import_capsule_record_batch_stream(obj)),
        _ => None,
    }
}
/// Imports all record batches (plus stream metadata) via `__arrow_c_stream__`.
///
/// Ownership of the C stream is moved out of the capsule: the struct is read
/// out and replaced with an empty one so the capsule destructor does not
/// release the stream a second time.
///
/// # Errors
/// Returns `PyMinarrowError::PyArrow` when the protocol call fails or the
/// capsule is invalid/consumed.
fn import_capsule_record_batch_stream(
    obj: &Bound<PyAny>,
) -> PyMinarrowResult<StreamImportResult> {
    let py = obj.py();
    let capsule = obj
        .call_method1("__arrow_c_stream__", (py.None(),))
        .map_err(|e| {
            PyMinarrowError::PyArrow(format!("Failed to call __arrow_c_stream__: {}", e))
        })?;
    let stream_ptr = capsule_to_ptr(&capsule, c"arrow_array_stream")? as *mut ArrowArrayStream;
    // SAFETY: `stream_ptr` points at a live ArrowArrayStream owned by the
    // capsule. Move the struct out and leave an empty stream behind so the
    // capsule's destructor has nothing to release.
    let moved_stream = unsafe {
        let s = std::ptr::read(stream_ptr);
        std::ptr::write(stream_ptr, ArrowArrayStream::empty());
        s
    };
    let stream_box = Box::new(moved_stream);
    let raw_ptr = Box::into_raw(stream_box);
    // SAFETY: `raw_ptr` is a valid, uniquely-owned stream; the import routine
    // takes ownership and is responsible for releasing it.
    let (batches, metadata) = unsafe { import_record_batch_stream_with_metadata(raw_ptr) };
    Ok((batches, metadata))
}
/// Attempts the Arrow PyCapsule stream protocol for chunked-array producers.
///
/// Returns `None` when the object does not expose `__arrow_c_stream__` (or
/// the `hasattr` probe fails).
fn try_capsule_array_stream(
    obj: &Bound<PyAny>,
) -> Option<PyMinarrowResult<(Vec<Arc<minarrow::Array>>, Field)>> {
    match obj.hasattr("__arrow_c_stream__") {
        Ok(true) => Some(import_capsule_array_stream(obj)),
        _ => None,
    }
}
/// Imports a sequence of array chunks (and their shared field) via
/// `__arrow_c_stream__`.
///
/// Same ownership pattern as the record-batch variant: the C stream struct is
/// moved out of the capsule and replaced with an empty one so the capsule
/// destructor cannot release it a second time.
///
/// # Errors
/// Returns `PyMinarrowError::PyArrow` when the protocol call fails or the
/// capsule is invalid/consumed.
fn import_capsule_array_stream(
    obj: &Bound<PyAny>,
) -> PyMinarrowResult<(Vec<Arc<minarrow::Array>>, Field)> {
    let py = obj.py();
    let capsule = obj
        .call_method1("__arrow_c_stream__", (py.None(),))
        .map_err(|e| {
            PyMinarrowError::PyArrow(format!("Failed to call __arrow_c_stream__: {}", e))
        })?;
    let stream_ptr = capsule_to_ptr(&capsule, c"arrow_array_stream")? as *mut ArrowArrayStream;
    // SAFETY: move the stream out of the capsule and neutralize the original
    // so the capsule destructor sees an empty (releasable-as-no-op) stream.
    let moved_stream = unsafe {
        let s = std::ptr::read(stream_ptr);
        std::ptr::write(stream_ptr, ArrowArrayStream::empty());
        s
    };
    let stream_box = Box::new(moved_stream);
    let raw_ptr = Box::into_raw(stream_box);
    // SAFETY: `raw_ptr` is uniquely owned; `import_array_stream` takes
    // ownership and is responsible for releasing it.
    let result = unsafe { minarrow::ffi::arrow_c_ffi::import_array_stream(raw_ptr) };
    Ok(result)
}
/// Extracts the raw pointer stored in a PyCapsule with the expected `name`.
///
/// Returns the pointer as `usize` so callers can cast it to the concrete
/// Arrow C struct type.
///
/// # Errors
/// Returns `PyMinarrowError::PyArrow` when the capsule name mismatches, the
/// capsule is invalid, or the stored pointer is null (e.g. the capsule was
/// already consumed by a previous import).
fn capsule_to_ptr(capsule: &Bound<PyAny>, name: &std::ffi::CStr) -> PyMinarrowResult<usize> {
    // SAFETY: PyCapsule_GetPointer only reads the capsule object; on failure
    // it returns null and raises a Python exception, which we pick up below.
    let ptr = unsafe {
        pyo3::ffi::PyCapsule_GetPointer(capsule.as_ptr(), name.as_ptr())
    };
    if ptr.is_null() {
        let py = capsule.py();
        // Prefer the Python-level error message when an exception was raised;
        // PyErr::take also clears it so it cannot leak into later calls.
        if let Some(err) = PyErr::take(py) {
            return Err(PyMinarrowError::PyArrow(format!(
                "PyCapsule_GetPointer failed: {}",
                err,
            )));
        }
        // Null pointer without a raised exception: the capsule itself stored
        // null, which happens when it was consumed by an earlier import.
        return Err(PyMinarrowError::PyArrow(
            "PyCapsule pointer is null (capsule may have been consumed already)".to_string(),
        ));
    }
    Ok(ptr as usize)
}
pub fn array_to_rust(obj: &Bound<PyAny>) -> PyMinarrowResult<FieldArray> {
if let Some(result) = try_capsule_array(obj) {
return result;
}
if let Some(result) = try_capsule_array_stream(obj) {
let (arrays, field) = result?;
if arrays.is_empty() {
return Ok(FieldArray::new(field, minarrow::Array::Null));
}
if arrays.len() == 1 {
let array = Arc::try_unwrap(arrays.into_iter().next().unwrap())
.unwrap_or_else(|arc| (*arc).clone());
return Ok(FieldArray::new(field, array));
}
use minarrow::Concatenate;
let mut iter = arrays.into_iter();
let first = Arc::try_unwrap(iter.next().unwrap())
.unwrap_or_else(|arc| (*arc).clone());
let combined = iter.fold(first, |acc, chunk| {
let arr = Arc::try_unwrap(chunk).unwrap_or_else(|arc| (*arc).clone());
acc.concat(arr).expect("Failed to concatenate array chunks")
});
return Ok(FieldArray::new(field, combined));
}
array_to_rust_c(obj)
}
/// Legacy import path using PyArrow's `_export_to_c(array_ptr, schema_ptr)`.
///
/// Allocates empty Arrow C structs, asks Python to fill them in place, then
/// takes ownership back for conversion.
///
/// The raw boxes are reclaimed on *every* path: previously a failed
/// `_export_to_c` call returned early via `?` before `Box::from_raw`,
/// leaking both allocations.
///
/// # Errors
/// Returns `PyMinarrowError::PyArrow` when the Python-side export fails.
fn array_to_rust_c(obj: &Bound<PyAny>) -> PyMinarrowResult<FieldArray> {
    let array_ptr = Box::into_raw(Box::new(ArrowArray::empty()));
    let schema_ptr = Box::into_raw(Box::new(ArrowSchema::empty()));
    let export = obj.call_method1(
        "_export_to_c",
        (array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t),
    );
    // SAFETY: both pointers came from Box::into_raw above and are reclaimed
    // exactly once here, whether or not the Python call succeeded. On failure
    // the boxes still hold the empty structs we allocated.
    let array_box = unsafe { Box::from_raw(array_ptr) };
    let schema_box = unsafe { Box::from_raw(schema_ptr) };
    export.map_err(|e| {
        PyMinarrowError::PyArrow(format!("Failed to export PyArrow array: {}", e))
    })?;
    // SAFETY: on success the structs were populated by PyArrow; the import
    // routine takes ownership and invokes the C release callbacks.
    let (array, field) = unsafe { import_from_c_owned(array_box, schema_box) };
    Ok(FieldArray::new(field, (*array).clone()))
}
/// Converts a Python record-batch-like object into a single minarrow `Table`.
///
/// Prefers the PyCapsule stream protocol; when the stream yields multiple
/// batches they are consolidated into one table. Falls back to the legacy
/// attribute-based import when the protocol is unavailable.
///
/// # Errors
/// Returns `PyMinarrowError::PyArrow` when the Python-side export fails.
pub fn record_batch_to_rust(obj: &Bound<PyAny>) -> PyMinarrowResult<minarrow::Table> {
    if let Some(result) = try_capsule_record_batch_stream(obj) {
        let (batches, metadata) = result?;
        // Stream-level metadata carries the table name; with the
        // `table_metadata` feature the remaining entries are preserved as
        // user metadata on each table.
        #[cfg(feature = "table_metadata")]
        let (table_name, remaining_meta) = split_stream_metadata(metadata);
        #[cfg(not(feature = "table_metadata"))]
        let table_name = extract_table_name(&metadata);
        // An empty stream still produces a (named, columnless) table.
        if batches.is_empty() {
            return Ok(minarrow::Table::new(table_name, None));
        }
        let tables: Vec<minarrow::Table> = batches
            .into_iter()
            .map(|columns| {
                let cols: Vec<FieldArray> = columns
                    .into_iter()
                    .map(|(array, field)| FieldArray::new(field, (*array).clone()))
                    .collect();
                #[cfg(feature = "table_metadata")]
                let table = if remaining_meta.is_empty() {
                    minarrow::Table::new(table_name.clone(), Some(cols))
                } else {
                    minarrow::Table::new_with_metadata(
                        table_name.clone(),
                        Some(cols),
                        remaining_meta.clone(),
                    )
                };
                #[cfg(not(feature = "table_metadata"))]
                let table = minarrow::Table::new(table_name.clone(), Some(cols));
                table
            })
            .collect();
        // Multiple stream batches are merged into one contiguous table.
        return Ok(tables.consolidate());
    }
    record_batch_to_rust_legacy(obj)
}
/// Legacy record-batch import: walks the PyArrow object attribute by
/// attribute (`num_columns`, `schema`, `column(i)`, `field(i)`) and converts
/// each column via `array_to_rust`.
///
/// Each column's field is rebuilt from the schema's field *name* combined
/// with the dtype/nullability already inferred during array conversion.
/// NOTE(review): the fourth `Field::new` argument is passed as `None` —
/// presumably per-field metadata; the original PyArrow field metadata is not
/// carried over on this path. Confirm whether that is intentional.
///
/// # Errors
/// Returns `PyMinarrowError::PyArrow` when any attribute access, column
/// fetch, or array conversion fails.
fn record_batch_to_rust_legacy(obj: &Bound<PyAny>) -> PyMinarrowResult<minarrow::Table> {
    let num_columns: usize = obj
        .getattr("num_columns")
        .map_err(|e| PyMinarrowError::PyArrow(format!("Failed to get num_columns: {}", e)))?
        .extract()
        .map_err(|e| PyMinarrowError::PyArrow(format!("Failed to extract num_columns: {}", e)))?;
    let schema = obj
        .getattr("schema")
        .map_err(|e| PyMinarrowError::PyArrow(format!("Failed to get schema: {}", e)))?;
    // Table name lives in the schema metadata (best-effort; empty if absent).
    let table_name = extract_table_name_from_pyarrow_schema(&schema);
    let mut cols = Vec::with_capacity(num_columns);
    for i in 0..num_columns {
        let column = obj
            .call_method1("column", (i,))
            .map_err(|e| {
                PyMinarrowError::PyArrow(format!("Failed to get column {}: {}", i, e))
            })?;
        let field = schema
            .call_method1("field", (i,))
            .map_err(|e| {
                PyMinarrowError::PyArrow(format!("Failed to get field {}: {}", i, e))
            })?;
        let name: String = field
            .getattr("name")
            .map_err(|e| PyMinarrowError::PyArrow(format!("Failed to get field name: {}", e)))?
            .extract()
            .map_err(|e| {
                PyMinarrowError::PyArrow(format!("Failed to extract field name: {}", e))
            })?;
        let mut field_array = array_to_rust(&column)?;
        // Re-tag the converted array with the schema's field name, keeping
        // the dtype/nullability the conversion produced.
        field_array.field = Arc::new(Field::new(
            name,
            field_array.field.dtype.clone(),
            field_array.field.nullable,
            None,
        ));
        cols.push(field_array);
    }
    let table = minarrow::Table::new(table_name, Some(cols));
    Ok(table)
}
/// Converts a Python Arrow table-like object into a minarrow `SuperTable`.
///
/// Prefers the PyCapsule stream protocol; unlike `record_batch_to_rust`,
/// each stream batch is kept as a separate chunk of the `SuperTable` rather
/// than consolidated. Falls back to the legacy `to_batches()` path when the
/// protocol is unavailable.
///
/// # Errors
/// Returns `PyMinarrowError::PyArrow` when the Python-side export fails.
pub fn table_to_rust(obj: &Bound<PyAny>) -> PyMinarrowResult<SuperTable> {
    if let Some(result) = try_capsule_record_batch_stream(obj) {
        let (batches, metadata) = result?;
        // Stream-level metadata carries the table name; with the
        // `table_metadata` feature the remaining entries are preserved as
        // user metadata on each batch table.
        #[cfg(feature = "table_metadata")]
        let (table_name, remaining_meta) = split_stream_metadata(metadata);
        #[cfg(not(feature = "table_metadata"))]
        let table_name = extract_table_name(&metadata);
        // An empty stream still produces a (named, batchless) SuperTable.
        if batches.is_empty() {
            return Ok(SuperTable::new(table_name));
        }
        let mut tables = Vec::with_capacity(batches.len());
        for columns in batches {
            let cols: Vec<FieldArray> = columns
                .into_iter()
                .map(|(array, field)| FieldArray::new(field, (*array).clone()))
                .collect();
            #[cfg(feature = "table_metadata")]
            let table = if remaining_meta.is_empty() {
                minarrow::Table::new(table_name.clone(), Some(cols))
            } else {
                minarrow::Table::new_with_metadata(
                    table_name.clone(),
                    Some(cols),
                    remaining_meta.clone(),
                )
            };
            #[cfg(not(feature = "table_metadata"))]
            let table = minarrow::Table::new(table_name.clone(), Some(cols));
            tables.push(Arc::new(table));
        }
        return Ok(SuperTable::from_batches(tables, None));
    }
    table_to_rust_legacy(obj)
}
/// Legacy `pyarrow.Table` import: calls `to_batches()`, converts each record
/// batch separately, and assembles the chunks into a `SuperTable`.
///
/// # Errors
/// Returns `PyMinarrowError::PyArrow` when batch retrieval, extraction, or
/// any per-batch conversion fails.
fn table_to_rust_legacy(obj: &Bound<PyAny>) -> PyMinarrowResult<SuperTable> {
    // Table name comes from the schema metadata; best-effort, empty if absent.
    let table_name = match obj.getattr("schema") {
        Ok(schema) => extract_table_name_from_pyarrow_schema(&schema),
        Err(_) => String::new(),
    };
    let batches_obj = obj.call_method0("to_batches").map_err(|e| {
        PyMinarrowError::PyArrow(format!("Failed to get batches from Table: {}", e))
    })?;
    let batches: Vec<Bound<PyAny>> = batches_obj.extract().map_err(|e| {
        PyMinarrowError::PyArrow(format!("Failed to extract batches list: {}", e))
    })?;
    if batches.is_empty() {
        return Ok(SuperTable::new(table_name));
    }
    // Convert each batch, short-circuiting on the first failure.
    let tables = batches
        .iter()
        .map(|batch| record_batch_to_rust(batch).map(Arc::new))
        .collect::<PyMinarrowResult<Vec<_>>>()?;
    Ok(SuperTable::from_batches(tables, None))
}
/// Converts a Python chunked-array object into a minarrow `SuperArray`.
///
/// Prefers the PyCapsule stream protocol, keeping each stream chunk as a
/// separate `FieldArray`; falls back to the legacy `chunks`-attribute path
/// when the protocol is unavailable.
///
/// # Errors
/// Returns `PyMinarrowError::PyArrow` when the Python-side export fails.
pub fn chunked_array_to_rust(obj: &Bound<PyAny>) -> PyMinarrowResult<SuperArray> {
    let Some(result) = try_capsule_array_stream(obj) else {
        return chunked_array_to_rust_legacy(obj);
    };
    let (arrays, field) = result?;
    if arrays.is_empty() {
        return Ok(SuperArray::new());
    }
    // Every chunk of the stream shares one schema, so the same field is
    // attached to each converted chunk.
    let chunks: Vec<FieldArray> = arrays
        .into_iter()
        .map(|arr| FieldArray::new(field.clone(), (*arr).clone()))
        .collect();
    Ok(SuperArray::from_field_array_chunks(chunks))
}
/// Legacy chunked-array import via the `chunks` attribute: each chunk is
/// converted independently, then later chunks are re-tagged with the first
/// chunk's field so the resulting `SuperArray` has one consistent schema.
///
/// # Errors
/// Returns `PyMinarrowError::PyArrow` when attribute access, list
/// extraction, or any per-chunk conversion fails.
fn chunked_array_to_rust_legacy(obj: &Bound<PyAny>) -> PyMinarrowResult<SuperArray> {
    let chunks_obj = obj
        .getattr("chunks")
        .map_err(|e| PyMinarrowError::PyArrow(format!("Failed to get chunks: {}", e)))?;
    let chunks: Vec<Bound<PyAny>> = chunks_obj.extract().map_err(|e| {
        PyMinarrowError::PyArrow(format!("Failed to extract chunks list: {}", e))
    })?;
    let Some((first, rest)) = chunks.split_first() else {
        return Ok(SuperArray::new());
    };
    let first_fa = array_to_rust(first)?;
    // The first chunk's field becomes the canonical field for all chunks.
    let field = first_fa.field.clone();
    let mut field_arrays = Vec::with_capacity(chunks.len());
    field_arrays.push(first_fa);
    for chunk in rest {
        let converted = array_to_rust(chunk)?;
        field_arrays.push(FieldArray::new((*field).clone(), converted.array));
    }
    Ok(SuperArray::from_field_array_chunks(field_arrays))
}