#![feature(allocator_api)]
#![feature(slice_ptr_get)]
#![feature(portable_simd)]
use pyo3::prelude::*;
use std::sync::Arc;
pub mod error;
pub mod ffi;
pub mod types;
pub use error::{PyMinarrowError, PyMinarrowResult};
pub use types::{PyArray, PyChunkedArray, PyField, PyRecordBatch, PyTable};
pub use minarrow::{Array, Field, FieldArray, MaskedArray, NumericArray, SuperArray, SuperTable, Table, TextArray};
/// Returns the given array unchanged.
///
/// The argument is extracted from Python into a `PyArray` and converted back on
/// return. Calling this from Python therefore exercises the full single-array
/// import/export path with no Rust-side processing in between.
#[pyfunction]
fn echo_array(arr: PyArray) -> PyResult<PyArray> {
    Ok(arr)
}
/// Returns the given record batch unchanged.
///
/// The argument is extracted from Python into a `PyRecordBatch` and converted
/// back on return. This makes it a round-trip probe for the batch conversion path.
#[pyfunction]
fn echo_batch(batch: PyRecordBatch) -> PyResult<PyRecordBatch> {
    Ok(batch)
}
/// Describes an array as `"MinArrow Array: len=<len>, null_count=<nulls>"`.
#[pyfunction]
fn array_info(arr: PyArray) -> PyResult<String> {
    let array = arr.inner();
    Ok(format!(
        "MinArrow Array: len={len}, null_count={nulls}",
        len = array.len(),
        nulls = array.null_count(),
    ))
}
/// Describes a record batch as `"MinArrow Table: rows=<rows>, cols=<cols>"`.
#[pyfunction]
fn batch_info(batch: PyRecordBatch) -> PyResult<String> {
    let table = batch.inner();
    Ok(format!(
        "MinArrow Table: rows={rows}, cols={cols}",
        rows = table.n_rows(),
        cols = table.n_cols(),
    ))
}
/// Returns the given (possibly multi-batch) table unchanged.
///
/// The argument is extracted from Python into a `PyTable` and converted back on
/// return. This makes it a round-trip probe for the table conversion path.
#[pyfunction]
fn echo_table(table: PyTable) -> PyResult<PyTable> {
    Ok(table)
}
/// Returns the given chunked array unchanged.
///
/// The argument is extracted from Python into a `PyChunkedArray` and converted
/// back on return. This makes it a round-trip probe for the chunked-array
/// conversion path.
#[pyfunction]
fn echo_chunked(arr: PyChunkedArray) -> PyResult<PyChunkedArray> {
    Ok(arr)
}
/// Describes a table as
/// `"MinArrow SuperTable: batches=<batches>, rows=<rows>, cols=<cols>"`.
///
/// The column count is taken from the schema length, and the row count from the
/// table's `n_rows` field.
#[pyfunction]
fn table_info(table: PyTable) -> PyResult<String> {
    let super_table = table.inner();
    Ok(format!(
        "MinArrow SuperTable: batches={batches}, rows={rows}, cols={cols}",
        batches = super_table.batches.len(),
        rows = super_table.n_rows,
        cols = super_table.schema.len(),
    ))
}
/// Describes a chunked array as `"MinArrow SuperArray: chunks=<chunks>, len=<len>"`.
#[pyfunction]
fn chunked_info(arr: PyChunkedArray) -> PyResult<String> {
    let super_array = arr.inner();
    Ok(format!(
        "MinArrow SuperArray: chunks={chunks}, len={len}",
        chunks = super_array.n_chunks(),
        len = super_array.len(),
    ))
}
/// Exports a single array as an Arrow C Data Interface `(schema, array)` capsule pair.
///
/// The pair is wrapped in an [`ArrowArrayWrapper`] so Python consumers can obtain
/// it through `__arrow_c_array__`.
///
/// # Errors
/// Propagates any error returned by `ffi::to_py::array_to_capsules`.
#[pyfunction]
fn export_array_capsule(py: Python, arr: PyArray) -> PyResult<ArrowArrayWrapper> {
    let field_array = arr.field_array();
    // The exporter takes the array behind an `Arc`, so the inner array is cloned
    // into a fresh one. The field metadata is only borrowed.
    let shared_array = Arc::new(field_array.array.clone());
    let (schema, array) = ffi::to_py::array_to_capsules(shared_array, &field_array.field, py)?;
    Ok(ArrowArrayWrapper {
        schema_capsule: Some(schema),
        array_capsule: Some(array),
    })
}
/// Exports a record batch as an Arrow C stream capsule wrapped in an [`ArrowStream`].
///
/// # Errors
/// Propagates any error returned by `ffi::to_py::table_to_stream_capsule`.
#[pyfunction]
fn export_batch_stream_capsule(py: Python, batch: PyRecordBatch) -> PyResult<ArrowStream> {
    let stream_capsule = ffi::to_py::table_to_stream_capsule(batch.inner(), py)?;
    Ok(ArrowStream {
        capsule: Some(stream_capsule),
    })
}
/// Exports a (multi-batch) table as an Arrow C stream capsule wrapped in an
/// [`ArrowStream`].
///
/// # Errors
/// Propagates any error returned by `ffi::to_py::super_table_to_stream_capsule`.
#[pyfunction]
fn export_table_stream_capsule(py: Python, table: PyTable) -> PyResult<ArrowStream> {
    let stream_capsule = ffi::to_py::super_table_to_stream_capsule(table.inner(), py)?;
    Ok(ArrowStream {
        capsule: Some(stream_capsule),
    })
}
/// Exports a chunked array as an Arrow C stream capsule wrapped in an [`ArrowStream`].
///
/// # Errors
/// Propagates any error returned by `ffi::to_py::super_array_to_stream_capsule`.
#[pyfunction]
fn export_chunked_stream_capsule(py: Python, arr: PyChunkedArray) -> PyResult<ArrowStream> {
    let stream_capsule = ffi::to_py::super_array_to_stream_capsule(arr.inner(), py)?;
    Ok(ArrowStream {
        capsule: Some(stream_capsule),
    })
}
/// Single-use holder for an Arrow C stream capsule, exposed to Python as `ArrowStream`.
///
/// Implements `__arrow_c_stream__` from the Arrow PyCapsule interface so Python
/// consumers can import the stream. The capsule is handed out once, and any
/// later call raises `ValueError`.
#[pyclass(name = "ArrowStream")]
struct ArrowStream {
    /// The stream capsule waiting to be handed out. It becomes `None` once a
    /// caller has taken it.
    capsule: Option<PyObject>,
}

#[pymethods]
impl ArrowStream {
    /// Hands the stream capsule to the caller and marks it consumed.
    ///
    /// `requested_schema` is accepted for protocol compatibility but ignored.
    /// The stream is always returned in the schema it was exported with.
    ///
    /// # Errors
    /// Raises `ValueError` if the capsule has already been taken.
    #[pyo3(signature = (requested_schema=None))]
    fn __arrow_c_stream__(&mut self, requested_schema: Option<PyObject>) -> PyResult<PyObject> {
        let _ = requested_schema;
        match self.capsule.take() {
            Some(capsule) => Ok(capsule),
            None => Err(pyo3::exceptions::PyValueError::new_err(
                "ArrowStream capsule has already been consumed",
            )),
        }
    }
}
/// Single-use holder for an Arrow C Data Interface `(schema, array)` capsule pair,
/// exposed to Python as `ArrowArray`.
///
/// Implements `__arrow_c_array__` from the Arrow PyCapsule interface. The two
/// capsules are handed out together once, and any later call raises `ValueError`.
#[pyclass(name = "ArrowArray")]
struct ArrowArrayWrapper {
    /// Schema capsule; becomes `None` once consumed.
    schema_capsule: Option<PyObject>,
    /// Array capsule; becomes `None` once consumed.
    array_capsule: Option<PyObject>,
}

#[pymethods]
impl ArrowArrayWrapper {
    /// Hands the `(schema, array)` capsules to the caller and marks them consumed.
    ///
    /// `requested_schema` is accepted for protocol compatibility but ignored.
    /// The capsules are always returned in the schema they were exported with.
    ///
    /// # Errors
    /// Raises `ValueError` if the capsules have already been taken.
    #[pyo3(signature = (requested_schema=None))]
    fn __arrow_c_array__(
        &mut self,
        requested_schema: Option<PyObject>,
    ) -> PyResult<(PyObject, PyObject)> {
        let _ = requested_schema;
        // Both slots are emptied before the check, so after any call the wrapper
        // holds nothing.
        let (Some(schema), Some(array)) = (self.schema_capsule.take(), self.array_capsule.take())
        else {
            return Err(pyo3::exceptions::PyValueError::new_err(
                "ArrowArray capsules have already been consumed",
            ));
        };
        Ok((schema, array))
    }
}
/// Builds a fixed five-row table named `"sample"` and exports it as an Arrow C stream.
///
/// Columns, all declared non-nullable:
/// - `id`: `Int64`, values `1..=5`
/// - `score`: `Float64`, `id * 1.1`
/// - `label`: `String`, `"item_<id>"`
///
/// # Errors
/// Propagates any error returned by `ffi::to_py::table_to_stream_capsule`.
#[pyfunction]
fn generate_sample_batch(py: Python) -> PyResult<ArrowStream> {
    use minarrow::ffi::arrow_dtype::ArrowType;

    let mut ids = minarrow::IntegerArray::<i64>::default();
    let mut scores = minarrow::FloatArray::<f64>::default();
    let mut labels = minarrow::StringArray::<u32>::default();
    for id in 1..=5 {
        ids.push(id);
        scores.push(id as f64 * 1.1);
        labels.push_str(&format!("item_{id}"));
    }

    let columns = vec![
        FieldArray::new(
            Field::new("id", ArrowType::Int64, false, None),
            Array::from_int64(ids),
        ),
        FieldArray::new(
            Field::new("score", ArrowType::Float64, false, None),
            Array::from_float64(scores),
        ),
        FieldArray::new(
            Field::new("label", ArrowType::String, false, None),
            Array::from_string32(labels),
        ),
    ];
    let table = Table::new("sample".to_string(), Some(columns));

    let stream_capsule = ffi::to_py::table_to_stream_capsule(&table, py)?;
    Ok(ArrowStream {
        capsule: Some(stream_capsule),
    })
}
/// Builds the nullable `Int64` array `[10, null, 30, null, 50]` under the field
/// name `values` and exports it as a `(schema, array)` capsule pair.
///
/// # Errors
/// Propagates any error returned by `ffi::to_py::array_to_capsules`.
#[pyfunction]
fn generate_nullable_array(py: Python) -> PyResult<ArrowArrayWrapper> {
    use minarrow::ffi::arrow_dtype::ArrowType;

    let mut values = minarrow::IntegerArray::<i64>::default();
    // `None` entries become nulls in the resulting array.
    for slot in [Some(10), None, Some(30), None, Some(50)] {
        if let Some(value) = slot {
            values.push(value);
        } else {
            values.push_null();
        }
    }

    let field = Field::new("values", ArrowType::Int64, true, None);
    let shared_array = Arc::new(Array::from_int64(values));
    let (schema_capsule, array_capsule) =
        ffi::to_py::array_to_capsules(shared_array, &field, py)?;
    Ok(ArrowArrayWrapper {
        schema_capsule: Some(schema_capsule),
        array_capsule: Some(array_capsule),
    })
}
// Initialiser for the `minarrow_pyo3` Python extension module.
//
// Sets `__doc__` and `__version__`, then registers the exported functions and
// the two capsule holder classes. Registration order matches the attribute
// order Python sees.
#[pymodule]
fn minarrow_pyo3(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add("__doc__", "PyO3 bindings for MinArrow - zero-copy Arrow interop with Python")?;
    m.add("__version__", env!("CARGO_PKG_VERSION"))?;

    // Round-trip, introspection and capsule-export functions.
    let core_functions = [
        wrap_pyfunction!(echo_array, m)?,
        wrap_pyfunction!(echo_batch, m)?,
        wrap_pyfunction!(echo_table, m)?,
        wrap_pyfunction!(echo_chunked, m)?,
        wrap_pyfunction!(array_info, m)?,
        wrap_pyfunction!(batch_info, m)?,
        wrap_pyfunction!(table_info, m)?,
        wrap_pyfunction!(chunked_info, m)?,
        wrap_pyfunction!(export_array_capsule, m)?,
        wrap_pyfunction!(export_batch_stream_capsule, m)?,
        wrap_pyfunction!(export_table_stream_capsule, m)?,
        wrap_pyfunction!(export_chunked_stream_capsule, m)?,
    ];
    for function in core_functions {
        m.add_function(function)?;
    }

    // Single-use capsule holders returned by the export and generator functions.
    m.add_class::<ArrowStream>()?;
    m.add_class::<ArrowArrayWrapper>()?;

    // Fixed sample-data generators.
    let generators = [
        wrap_pyfunction!(generate_sample_batch, m)?,
        wrap_pyfunction!(generate_nullable_array, m)?,
    ];
    for function in generators {
        m.add_function(function)?;
    }

    Ok(())
}