use minarrow::ffi::arrow_c_ffi::{
export_array_stream, export_record_batch_stream_with_metadata, export_to_c, ArrowArray,
ArrowArrayStream, ArrowSchema,
};
use minarrow::ffi::arrow_dtype::{ArrowType, CategoricalIndexType};
#[cfg(feature = "datetime")]
use minarrow::enums::time_units::TimeUnit;
use minarrow::ffi::schema::Schema;
use minarrow::{Array, Field, SuperArray, SuperTable, Table};
use pyo3::ffi::Py_uintptr_t;
use pyo3::prelude::*;
use pyo3::types::{IntoPyDict, PyList};
use std::sync::Arc;
use crate::error::PyMinarrowError;
// Schema-metadata key used to round-trip a table's name through Arrow custom
// metadata (the Arrow schema itself has no dedicated "table name" slot).
pub(crate) const TABLE_NAME_KEY: &str = "minarrow:table_name";
/// Maps a minarrow `TimeUnit` onto the unit string accepted by pyarrow's
/// `time32` / `time64` / `duration` / `timestamp` factories.
///
/// `Days` has no pyarrow time-unit equivalent, so it falls back to seconds
/// (the coarsest unit pyarrow offers).
#[cfg(feature = "datetime")]
fn time_unit_to_str(unit: &TimeUnit) -> &'static str {
    match unit {
        // pyarrow has no "days" unit; degrade to seconds.
        TimeUnit::Seconds | TimeUnit::Days => "s",
        TimeUnit::Milliseconds => "ms",
        TimeUnit::Microseconds => "us",
        TimeUnit::Nanoseconds => "ns",
    }
}
/// Converts a minarrow `ArrowType` into the corresponding `pyarrow` DataType
/// object by calling the matching pyarrow type-factory function.
///
/// Known lossy mappings (kept from the original implementation):
/// * `Utf8View` maps to plain `utf8` — NOTE(review): newer pyarrow exposes
///   `string_view`; confirm whether that should be preferred.
/// * `Interval(_)` maps to `null`, so interval values are not representable
///   through this path.
/// * Dictionary-encoded values are always assumed to be `utf8`.
///
/// # Errors
/// Propagates any `PyErr` raised while importing pyarrow or calling its
/// type constructors.
fn arrow_type_to_pyarrow<'py>(
    dtype: &ArrowType,
    py: Python<'py>,
) -> PyResult<Bound<'py, PyAny>> {
    let pa = py.import("pyarrow")?;
    match dtype {
        ArrowType::Null => pa.call_method0("null"),
        ArrowType::Boolean => pa.call_method0("bool_"),
        #[cfg(feature = "extended_numeric_types")]
        ArrowType::Int8 => pa.call_method0("int8"),
        #[cfg(feature = "extended_numeric_types")]
        ArrowType::Int16 => pa.call_method0("int16"),
        ArrowType::Int32 => pa.call_method0("int32"),
        ArrowType::Int64 => pa.call_method0("int64"),
        #[cfg(feature = "extended_numeric_types")]
        ArrowType::UInt8 => pa.call_method0("uint8"),
        #[cfg(feature = "extended_numeric_types")]
        ArrowType::UInt16 => pa.call_method0("uint16"),
        ArrowType::UInt32 => pa.call_method0("uint32"),
        ArrowType::UInt64 => pa.call_method0("uint64"),
        ArrowType::Float32 => pa.call_method0("float32"),
        ArrowType::Float64 => pa.call_method0("float64"),
        ArrowType::String => pa.call_method0("utf8"),
        ArrowType::LargeString => pa.call_method0("large_utf8"),
        // Lossy: exported as plain utf8 (see doc comment above).
        ArrowType::Utf8View => pa.call_method0("utf8"),
        #[cfg(feature = "datetime")]
        ArrowType::Date32 => pa.call_method0("date32"),
        #[cfg(feature = "datetime")]
        ArrowType::Date64 => pa.call_method0("date64"),
        #[cfg(feature = "datetime")]
        ArrowType::Time32(unit) => {
            let unit_str = time_unit_to_str(unit);
            pa.call_method1("time32", (unit_str,))
        }
        #[cfg(feature = "datetime")]
        ArrowType::Time64(unit) => {
            let unit_str = time_unit_to_str(unit);
            pa.call_method1("time64", (unit_str,))
        }
        #[cfg(feature = "datetime")]
        ArrowType::Duration32(unit) => {
            let unit_str = time_unit_to_str(unit);
            pa.call_method1("duration", (unit_str,))
        }
        #[cfg(feature = "datetime")]
        ArrowType::Duration64(unit) => {
            let unit_str = time_unit_to_str(unit);
            pa.call_method1("duration", (unit_str,))
        }
        #[cfg(feature = "datetime")]
        ArrowType::Timestamp(unit, tz) => {
            let unit_str = time_unit_to_str(unit);
            // BUG FIX: a missing timezone was previously forwarded as "",
            // yielding `timestamp[unit, tz=""]` in pyarrow rather than the
            // timezone-naive `timestamp[unit]`. Pass `tz` only when present.
            match tz.as_deref() {
                Some(tz_str) => pa.call_method1("timestamp", (unit_str, tz_str)),
                None => pa.call_method1("timestamp", (unit_str,)),
            }
        }
        #[cfg(feature = "datetime")]
        ArrowType::Interval(_) => {
            // Lossy: interval columns degrade to the null type here.
            pa.call_method0("null")
        }
        ArrowType::Dictionary(key_type) => {
            let index_ty = match key_type {
                #[cfg(all(feature = "extended_categorical", feature = "extended_numeric_types"))]
                CategoricalIndexType::UInt8 => pa.call_method0("uint8")?,
                #[cfg(all(feature = "extended_categorical", feature = "extended_numeric_types"))]
                CategoricalIndexType::UInt16 => pa.call_method0("uint16")?,
                CategoricalIndexType::UInt32 => pa.call_method0("uint32")?,
                #[cfg(feature = "extended_categorical")]
                CategoricalIndexType::UInt64 => pa.call_method0("uint64")?,
            };
            // Dictionary values are always exported as utf8 strings.
            let value_ty = pa.call_method0("utf8")?;
            pa.call_method1("dictionary", (index_ty, value_ty))
        }
    }
}
/// Assembles the key/value metadata to attach to an exported Arrow stream.
///
/// Starts from the table's own metadata (when the `table_metadata` feature is
/// enabled, otherwise from an empty map) and records the table name under
/// [`TABLE_NAME_KEY`] if it is non-empty. Returns `None` when there is
/// nothing to attach.
fn build_stream_metadata(table: &Table) -> Option<std::collections::BTreeMap<String, String>> {
    #[cfg(feature = "table_metadata")]
    let mut meta = table.metadata.clone();
    #[cfg(not(feature = "table_metadata"))]
    let mut meta = std::collections::BTreeMap::new();
    match table.name.as_str() {
        "" => {}
        name => {
            meta.insert(TABLE_NAME_KEY.to_string(), name.to_string());
        }
    }
    (!meta.is_empty()).then_some(meta)
}
/// Exports a single minarrow `Array` (with its `Field`) to Python as a
/// `pyarrow.Array` via the Arrow C data interface
/// (`pyarrow.Array._import_from_c`).
///
/// # Errors
/// Fails if `pyarrow` cannot be imported or if `_import_from_c` rejects the
/// exported data.
pub fn array_to_py<'py>(
array: Arc<Array>,
field: &Field,
py: Python<'py>,
) -> PyResult<Bound<'py, PyAny>> {
let pyarrow = py.import("pyarrow")?;
// Wrap the single field in a one-column schema for the C export.
let schema = Schema::from(vec![field.clone()]);
// export_to_c hands back raw pointers; this function is responsible for
// freeing the containing allocations below (presumably Box-allocated by
// export_to_c, since they are reclaimed with Box::from_raw — TODO confirm).
let (array_ptr, schema_ptr) = export_to_c(array, schema);
let result = pyarrow
.getattr("Array")?
.call_method1(
"_import_from_c",
(array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t),
)
.map_err(|e| {
PyMinarrowError::PyArrow(format!("Failed to import array into PyArrow: {}", e))
});
// SAFETY: per Arrow C data interface move semantics, a successful import
// clears the structs' `release` callbacks, so the guarded calls below are
// no-ops; if the import failed, `release` is still set and must be invoked
// to free the exported buffers. Either way, the heap allocations holding
// the structs themselves are reclaimed here exactly once.
unsafe {
if let Some(release) = (*schema_ptr).release {
release(schema_ptr);
}
let _ = Box::from_raw(schema_ptr);
if let Some(release) = (*array_ptr).release {
release(array_ptr);
}
let _ = Box::from_raw(array_ptr);
}
result.map_err(|e| e.into())
}
/// Exports a `Table` to Python as a `pyarrow.RecordBatch`.
///
/// Each column is exported individually through the C data interface; the
/// pyarrow schema is then rebuilt from the imported arrays' actual types, and
/// the table name (under [`TABLE_NAME_KEY`]) plus any table metadata are
/// attached as pyarrow schema metadata.
///
/// # Errors
/// Fails if any column export fails or if pyarrow rejects the schema or
/// record-batch construction.
pub fn table_to_py<'py>(table: &Table, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let pyarrow = py.import("pyarrow")?;
let mut py_fields = Vec::with_capacity(table.n_cols());
let mut py_arrays = Vec::with_capacity(table.n_cols());
for fa in &table.cols {
let array = Arc::new(fa.array.clone());
let py_array = array_to_py(array, &fa.field, py)?;
// Build each pyarrow field from the imported array's reported type so
// field types always match the arrays exactly.
let py_field = pyarrow.call_method1(
"field",
(fa.field.name.clone(), py_array.getattr("type")?),
)?;
py_fields.push(py_field);
py_arrays.push(py_array);
}
let py_fields_list = PyList::new(py, &py_fields)?;
let mut schema = pyarrow.call_method1("schema", (py_fields_list,))?;
// With `table_metadata`: merge table metadata and the table name into the
// schema metadata (only if there is anything to attach).
#[cfg(feature = "table_metadata")]
{
let mut meta_entries: Vec<(String, String)> = table
.metadata
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
if !table.name.is_empty() {
meta_entries.push((TABLE_NAME_KEY.to_string(), table.name.clone()));
}
if !meta_entries.is_empty() {
let metadata = meta_entries.into_py_dict(py)?;
schema = schema.call_method1("with_metadata", (metadata,))?;
}
}
// Without `table_metadata`: only the table name is attached.
#[cfg(not(feature = "table_metadata"))]
if !table.name.is_empty() {
let metadata = [(TABLE_NAME_KEY, &table.name)].into_py_dict(py)?;
schema = schema.call_method1("with_metadata", (metadata,))?;
}
let py_arrays_list = PyList::new(py, py_arrays)?;
let kwargs = [("schema", schema)].into_py_dict(py)?;
pyarrow
.getattr("RecordBatch")?
.call_method("from_arrays", (py_arrays_list,), Some(&kwargs))
.map_err(|e| {
PyMinarrowError::PyArrow(format!("Failed to create PyArrow RecordBatch: {}", e)).into()
})
}
/// Exports a `SuperTable` to Python as a `pyarrow.Table`.
///
/// With no batches, an empty pyarrow Table is built from the SuperTable's
/// schema alone; otherwise each batch is exported as a RecordBatch and
/// combined via `pyarrow.Table.from_batches`. In both cases the SuperTable's
/// name (under [`TABLE_NAME_KEY`]) and, with the `table_metadata` feature,
/// its metadata are attached as schema metadata.
///
/// # Errors
/// Fails if any batch export fails or if pyarrow rejects the schema, table
/// construction, or metadata replacement.
pub fn super_table_to_py<'py>(
super_table: &SuperTable,
py: Python<'py>,
) -> PyResult<Bound<'py, PyAny>> {
let pyarrow = py.import("pyarrow")?;
// Empty-table path: no batches to import, so the pyarrow schema must be
// reconstructed from the minarrow dtypes directly.
if super_table.batches.is_empty() {
let mut py_fields = Vec::with_capacity(super_table.schema.len());
for f in &super_table.schema {
let pa_type = arrow_type_to_pyarrow(&f.dtype, py)?;
let pa_field = pyarrow.call_method1("field", (&f.name, pa_type))?;
py_fields.push(pa_field);
}
let py_fields_list = PyList::new(py, &py_fields)?;
let mut schema = pyarrow.call_method1("schema", (py_fields_list,))?;
#[cfg(feature = "table_metadata")]
{
let mut meta_entries: Vec<(String, String)> = super_table
.metadata()
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
if !super_table.name.is_empty() {
meta_entries.push((TABLE_NAME_KEY.to_string(), super_table.name.clone()));
}
if !meta_entries.is_empty() {
let metadata = meta_entries.into_py_dict(py)?;
schema = schema.call_method1("with_metadata", (metadata,))?;
}
}
#[cfg(not(feature = "table_metadata"))]
if !super_table.name.is_empty() {
let metadata = [(TABLE_NAME_KEY, &super_table.name)].into_py_dict(py)?;
schema = schema.call_method1("with_metadata", (metadata,))?;
}
let empty_list = PyList::empty(py);
let kwargs = [("schema", schema)].into_py_dict(py)?;
return pyarrow
.getattr("Table")?
.call_method("from_batches", (empty_list,), Some(&kwargs))
.map_err(|e| {
PyMinarrowError::PyArrow(format!(
"Failed to create empty PyArrow Table: {}",
e
))
.into()
});
}
// Non-empty path: export every batch, then let pyarrow assemble the table
// (it derives the schema from the batches).
let mut py_batches = Vec::with_capacity(super_table.batches.len());
for batch in &super_table.batches {
let py_batch = table_to_py(batch, py)?;
py_batches.push(py_batch);
}
let py_batches_list = PyList::new(py, py_batches)?;
let py_table = pyarrow
.getattr("Table")?
.call_method1("from_batches", (py_batches_list,))
.map_err(|e| {
PyMinarrowError::PyArrow(format!("Failed to create PyArrow Table: {}", e))
})?;
// Replace (not merge) the schema metadata with the SuperTable-level
// metadata/name; per-batch metadata set in table_to_py is superseded here.
#[cfg(feature = "table_metadata")]
{
let mut meta_entries: Vec<(String, String)> = super_table
.metadata()
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
if !super_table.name.is_empty() {
meta_entries.push((TABLE_NAME_KEY.to_string(), super_table.name.clone()));
}
if !meta_entries.is_empty() {
let metadata = meta_entries.into_py_dict(py)?;
return py_table
.call_method1("replace_schema_metadata", (metadata,))
.map_err(|e| {
PyMinarrowError::PyArrow(format!("Failed to set schema metadata: {}", e))
.into()
});
}
}
#[cfg(not(feature = "table_metadata"))]
if !super_table.name.is_empty() {
let metadata = [(TABLE_NAME_KEY, &super_table.name)]
.into_py_dict(py)?;
return py_table
.call_method1("replace_schema_metadata", (metadata,))
.map_err(|e| {
PyMinarrowError::PyArrow(format!("Failed to set schema metadata: {}", e)).into()
});
}
Ok(py_table)
}
/// Exports a minarrow `SuperArray` (chunked array) to Python as a
/// `pyarrow.ChunkedArray`.
///
/// A `SuperArray` with no chunks becomes an empty `ChunkedArray` whose type
/// is taken from the stored field when available, falling back to `null`.
///
/// # Errors
/// Fails if any chunk export fails or if pyarrow rejects the
/// `chunked_array` construction.
pub fn super_array_to_py<'py>(
    super_array: &SuperArray,
    py: Python<'py>,
) -> PyResult<Bound<'py, PyAny>> {
    let pyarrow = py.import("pyarrow")?;
    let chunks = super_array.chunks();
    if chunks.is_empty() {
        // No chunks: build an empty ChunkedArray with an explicit type.
        let pa_type = match super_array.field() {
            Some(field) => arrow_type_to_pyarrow(&field.dtype, py)?,
            None => pyarrow.call_method0("null")?,
        };
        let kwargs = [("type", pa_type)].into_py_dict(py)?;
        return pyarrow
            .call_method("chunked_array", (PyList::empty(py),), Some(&kwargs))
            .map_err(|e| {
                PyMinarrowError::PyArrow(format!(
                    "Failed to create empty PyArrow ChunkedArray: {}",
                    e
                ))
                .into()
            });
    }
    // Export every chunk through the C data interface, then stitch the
    // imported pyarrow arrays into a single ChunkedArray.
    let field = super_array.field_ref();
    let mut py_arrays = Vec::with_capacity(chunks.len());
    for chunk in chunks {
        py_arrays.push(array_to_py(Arc::new(chunk.clone()), field, py)?);
    }
    pyarrow
        .call_method1("chunked_array", (PyList::new(py, py_arrays)?,))
        .map_err(|e| {
            PyMinarrowError::PyArrow(format!("Failed to create PyArrow ChunkedArray: {}", e))
                .into()
        })
}
/// PyCapsule destructor for `"arrow_schema"` capsules: releases the exported
/// `ArrowSchema` (unless a consumer has already moved it) and frees the heap
/// allocation that held it.
unsafe extern "C" fn arrow_schema_capsule_destructor(capsule: *mut pyo3::ffi::PyObject) {
unsafe {
let name = c"arrow_schema";
// GetPointer returns null only on name mismatch (impossible here, since
// this destructor is only installed on capsules created with this name);
// the null check is purely defensive.
let ptr = pyo3::ffi::PyCapsule_GetPointer(capsule, name.as_ptr()) as *mut ArrowSchema;
if !ptr.is_null() {
let schema = &mut *ptr;
// A consumer that moved the structure clears `release` (Arrow C data
// interface contract), so only call it when still set.
if let Some(release) = schema.release {
release(schema);
}
// SAFETY: reclaims the Box allocation made at export time.
let _ = Box::from_raw(ptr);
}
}
}
/// PyCapsule destructor for `"arrow_array"` capsules: releases the exported
/// `ArrowArray` (unless a consumer has already moved it) and frees the heap
/// allocation that held it.
unsafe extern "C" fn arrow_array_capsule_destructor(capsule: *mut pyo3::ffi::PyObject) {
unsafe {
let name = c"arrow_array";
// Null only on name mismatch, which cannot happen for capsules created by
// this module; the check is defensive.
let ptr = pyo3::ffi::PyCapsule_GetPointer(capsule, name.as_ptr()) as *mut ArrowArray;
if !ptr.is_null() {
let array = &mut *ptr;
// `release` is cleared by a consumer that moved the data; only call it
// when still set.
if let Some(release) = array.release {
release(array);
}
// SAFETY: reclaims the Box allocation made at export time.
let _ = Box::from_raw(ptr);
}
}
}
/// PyCapsule destructor for `"arrow_array_stream"` capsules: releases the
/// exported `ArrowArrayStream` (unless a consumer has already moved it) and
/// frees the heap allocation that held it.
unsafe extern "C" fn arrow_stream_capsule_destructor(capsule: *mut pyo3::ffi::PyObject) {
unsafe {
let name = c"arrow_array_stream";
// Null only on name mismatch, which cannot happen for capsules created by
// this module; the check is defensive.
let ptr =
pyo3::ffi::PyCapsule_GetPointer(capsule, name.as_ptr()) as *mut ArrowArrayStream;
if !ptr.is_null() {
let stream = &mut *ptr;
// `release` is cleared by a consumer that moved the stream; only call
// it when still set.
if let Some(release) = stream.release {
release(stream);
}
// SAFETY: reclaims the Box allocation made at export time.
let _ = Box::from_raw(ptr);
}
}
}
/// Exports an `Array` + `Field` as the pair of PyCapsules
/// (`"arrow_schema"`, `"arrow_array"`) defined by the Arrow PyCapsule
/// interface. Ownership of the exported C structures transfers to the
/// capsules; their destructors release and free them.
///
/// # Errors
/// Returns `PyRuntimeError` if either capsule cannot be allocated; in that
/// case all still-owned exports are released and freed before returning.
pub fn array_to_capsules<'py>(
array: Arc<Array>,
field: &Field,
py: Python<'py>,
) -> PyResult<(PyObject, PyObject)> {
let schema = Schema::from(vec![field.clone()]);
// Raw pointers whose allocations we own until a capsule takes them over.
let (arr_ptr, sch_ptr) = export_to_c(array, schema);
let schema_name = c"arrow_schema";
let schema_capsule = unsafe {
let cap = pyo3::ffi::PyCapsule_New(
sch_ptr as *mut std::ffi::c_void,
schema_name.as_ptr(),
Some(arrow_schema_capsule_destructor),
);
if cap.is_null() {
// Schema capsule failed: nothing owns either export yet, so release
// and free both manually before bailing out.
let s = &mut *sch_ptr;
if let Some(release) = s.release {
release(sch_ptr);
}
let _ = Box::from_raw(sch_ptr);
let a = &mut *arr_ptr;
if let Some(release) = a.release {
release(arr_ptr);
}
let _ = Box::from_raw(arr_ptr);
return Err(pyo3::exceptions::PyRuntimeError::new_err(
"Failed to create schema PyCapsule",
));
}
Bound::from_owned_ptr(py, cap)
};
let array_name = c"arrow_array";
let array_capsule = unsafe {
let cap = pyo3::ffi::PyCapsule_New(
arr_ptr as *mut std::ffi::c_void,
array_name.as_ptr(),
Some(arrow_array_capsule_destructor),
);
if cap.is_null() {
// Array capsule failed: only the array is still ours — the schema is
// now owned by `schema_capsule`, whose destructor will clean it up
// when the Bound is dropped by this early return.
let a = &mut *arr_ptr;
if let Some(release) = a.release {
release(arr_ptr);
}
let _ = Box::from_raw(arr_ptr);
return Err(pyo3::exceptions::PyRuntimeError::new_err(
"Failed to create array PyCapsule",
));
}
Bound::from_owned_ptr(py, cap)
};
Ok((schema_capsule.unbind(), array_capsule.unbind()))
}
/// Exports a `Table` as a PyCapsule named `"arrow_array_stream"` (the Arrow
/// PyCapsule interface) containing an `ArrowArrayStream` that yields a single
/// record batch. The table name / metadata are attached via
/// [`build_stream_metadata`].
///
/// # Errors
/// Returns `PyRuntimeError` if the capsule cannot be allocated; the exported
/// stream is released and freed before returning in that case.
pub fn table_to_stream_capsule<'py>(table: &Table, py: Python<'py>) -> PyResult<PyObject> {
    let fields: Vec<Field> = table.cols.iter().map(|fa| (*fa.field).clone()).collect();
    // One (array, single-field schema) pair per column of the single batch.
    let mut columns: Vec<(Arc<Array>, Schema)> = Vec::with_capacity(table.cols.len());
    for fa in &table.cols {
        let array = Arc::new(fa.array.clone());
        let schema = Schema::from(vec![(*fa.field).clone()]);
        columns.push((array, schema));
    }
    let metadata = build_stream_metadata(table);
    let stream = export_record_batch_stream_with_metadata(vec![columns], fields, metadata);
    let stream_ptr = Box::into_raw(stream);
    let capsule_name = c"arrow_array_stream";
    let capsule = unsafe {
        let raw = pyo3::ffi::PyCapsule_New(
            stream_ptr as *mut std::ffi::c_void,
            capsule_name.as_ptr(),
            Some(arrow_stream_capsule_destructor),
        );
        if raw.is_null() {
            // Capsule creation failed: the destructor will never run, so
            // release and free the stream here.
            if let Some(release) = (*stream_ptr).release {
                release(stream_ptr);
            }
            let _ = Box::from_raw(stream_ptr);
            return Err(pyo3::exceptions::PyRuntimeError::new_err(
                "Failed to create stream PyCapsule",
            ));
        }
        Bound::from_owned_ptr(py, raw)
    };
    Ok(capsule.unbind())
}
/// Exports a `SuperTable` as a PyCapsule named `"arrow_array_stream"`
/// containing an `ArrowArrayStream` that yields one record batch per table.
///
/// NOTE(review): the stream's schema fields and metadata are both derived
/// from `batches[0]` — all batches are assumed to share that schema, and the
/// first batch's name/metadata (not the SuperTable's own, as used by
/// `super_table_to_py`) are what get attached. Confirm this is intentional.
///
/// # Errors
/// * `PyValueError` if the SuperTable has no batches (the stream needs a
///   schema to export).
/// * `PyRuntimeError` if the capsule cannot be allocated; the exported
///   stream is released and freed before returning in that case.
pub fn super_table_to_stream_capsule<'py>(
super_table: &SuperTable,
py: Python<'py>,
) -> PyResult<PyObject> {
if super_table.batches.is_empty() {
return Err(pyo3::exceptions::PyValueError::new_err(
"Cannot export empty SuperTable as stream capsule",
));
}
// Schema fields come from the first batch (see NOTE above).
let fields: Vec<Field> = super_table.batches[0]
.cols
.iter()
.map(|fa| (*fa.field).clone())
.collect();
// For each batch: one (array, single-field schema) pair per column.
let batches: Vec<Vec<(Arc<Array>, Schema)>> = super_table
.batches
.iter()
.map(|table| {
table
.cols
.iter()
.map(|fa| {
(
Arc::new(fa.array.clone()),
Schema::from(vec![(*fa.field).clone()]),
)
})
.collect()
})
.collect();
let metadata = build_stream_metadata(&super_table.batches[0]);
let stream = export_record_batch_stream_with_metadata(batches, fields, metadata);
let stream_ptr = Box::into_raw(stream);
let name = c"arrow_array_stream";
let capsule = unsafe {
let cap = pyo3::ffi::PyCapsule_New(
stream_ptr as *mut std::ffi::c_void,
name.as_ptr(),
Some(arrow_stream_capsule_destructor),
);
if cap.is_null() {
// Capsule creation failed: the destructor will never run, so release
// and free the stream here.
let s = &mut *stream_ptr;
if let Some(release) = s.release {
release(stream_ptr);
}
let _ = Box::from_raw(stream_ptr);
return Err(pyo3::exceptions::PyRuntimeError::new_err(
"Failed to create stream PyCapsule",
));
}
Bound::from_owned_ptr(py, cap)
};
Ok(capsule.unbind())
}
/// Exports a `SuperArray` as a PyCapsule named `"arrow_array_stream"`
/// containing an `ArrowArrayStream` that yields one array per chunk.
///
/// # Errors
/// * `PyValueError` if the SuperArray has no chunks.
/// * `PyRuntimeError` if the capsule cannot be allocated; the exported
///   stream is released and freed before returning in that case.
pub fn super_array_to_stream_capsule<'py>(
    super_array: &SuperArray,
    py: Python<'py>,
) -> PyResult<PyObject> {
    let chunks = super_array.chunks();
    if chunks.is_empty() {
        return Err(pyo3::exceptions::PyValueError::new_err(
            "Cannot export empty SuperArray as stream capsule",
        ));
    }
    let field = super_array.field_ref().clone();
    // Wrap each chunk in an Arc for the stream exporter.
    let mut array_chunks: Vec<Arc<Array>> = Vec::with_capacity(chunks.len());
    for chunk in chunks.iter() {
        array_chunks.push(Arc::new(chunk.clone()));
    }
    let stream_ptr = Box::into_raw(export_array_stream(array_chunks, field));
    let capsule_name = c"arrow_array_stream";
    let capsule = unsafe {
        let raw = pyo3::ffi::PyCapsule_New(
            stream_ptr as *mut std::ffi::c_void,
            capsule_name.as_ptr(),
            Some(arrow_stream_capsule_destructor),
        );
        if raw.is_null() {
            // Capsule creation failed: the destructor will never run, so
            // release and free the stream here.
            if let Some(release) = (*stream_ptr).release {
                release(stream_ptr);
            }
            let _ = Box::from_raw(stream_ptr);
            return Err(pyo3::exceptions::PyRuntimeError::new_err(
                "Failed to create stream PyCapsule",
            ));
        }
        Bound::from_owned_ptr(py, raw)
    };
    Ok(capsule.unbind())
}