use std::convert::{From, TryFrom};
use std::ptr::{addr_of, addr_of_mut};
use std::sync::Arc;
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::ffi::Py_uintptr_t;
use pyo3::import_exception;
use pyo3::prelude::*;
use pyo3::types::{PyDict, PyList, PyTuple};
use crate::array::{make_array, Array, ArrayData};
use crate::datatypes::{DataType, Field, Schema};
use crate::error::ArrowError;
use crate::ffi;
use crate::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
use crate::ffi_stream::{
export_reader_into_raw, ArrowArrayStreamReader, FFI_ArrowArrayStream,
};
use crate::record_batch::RecordBatch;
import_exception!(pyarrow, ArrowException);
pub type PyArrowException = ArrowException;
fn to_py_err(err: ArrowError) -> PyErr {
PyArrowException::new_err(err.to_string())
}
pub trait FromPyArrow: Sized {
fn from_pyarrow(value: &PyAny) -> PyResult<Self>;
}
pub trait ToPyArrow {
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject>;
}
pub trait IntoPyArrow {
fn into_pyarrow(self, py: Python) -> PyResult<PyObject>;
}
impl<T: ToPyArrow> IntoPyArrow for T {
fn into_pyarrow(self, py: Python) -> PyResult<PyObject> {
self.to_pyarrow(py)
}
}
fn validate_class(expected: &str, value: &PyAny) -> PyResult<()> {
let pyarrow = PyModule::import(value.py(), "pyarrow")?;
let class = pyarrow.getattr(expected)?;
if !value.is_instance(class)? {
let expected_module = class.getattr("__module__")?.extract::<&str>()?;
let expected_name = class.getattr("__name__")?.extract::<&str>()?;
let found_class = value.get_type();
let found_module = found_class.getattr("__module__")?.extract::<&str>()?;
let found_name = found_class.getattr("__name__")?.extract::<&str>()?;
return Err(PyTypeError::new_err(format!(
"Expected instance of {}.{}, got {}.{}",
expected_module, expected_name, found_module, found_name
)));
}
Ok(())
}
impl FromPyArrow for DataType {
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
validate_class("DataType", value)?;
let c_schema = FFI_ArrowSchema::empty();
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
value.call_method1("_export_to_c", (c_schema_ptr as Py_uintptr_t,))?;
let dtype = DataType::try_from(&c_schema).map_err(to_py_err)?;
Ok(dtype)
}
}
impl ToPyArrow for DataType {
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
let module = py.import("pyarrow")?;
let class = module.getattr("DataType")?;
let dtype =
class.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
Ok(dtype.into())
}
}
impl FromPyArrow for Field {
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
validate_class("Field", value)?;
let c_schema = FFI_ArrowSchema::empty();
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
value.call_method1("_export_to_c", (c_schema_ptr as Py_uintptr_t,))?;
let field = Field::try_from(&c_schema).map_err(to_py_err)?;
Ok(field)
}
}
impl ToPyArrow for Field {
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
let module = py.import("pyarrow")?;
let class = module.getattr("Field")?;
let dtype =
class.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
Ok(dtype.into())
}
}
impl FromPyArrow for Schema {
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
validate_class("Schema", value)?;
let c_schema = FFI_ArrowSchema::empty();
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
value.call_method1("_export_to_c", (c_schema_ptr as Py_uintptr_t,))?;
let schema = Schema::try_from(&c_schema).map_err(to_py_err)?;
Ok(schema)
}
}
impl ToPyArrow for Schema {
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
let module = py.import("pyarrow")?;
let class = module.getattr("Schema")?;
let schema =
class.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
Ok(schema.into())
}
}
impl FromPyArrow for ArrayData {
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
validate_class("Array", value)?;
let mut array = FFI_ArrowArray::empty();
let mut schema = FFI_ArrowSchema::empty();
value.call_method1(
"_export_to_c",
(
addr_of_mut!(array) as Py_uintptr_t,
addr_of_mut!(schema) as Py_uintptr_t,
),
)?;
let ffi_array = ffi::ArrowArray::new(array, schema);
let data = ArrayData::try_from(ffi_array).map_err(to_py_err)?;
Ok(data)
}
}
impl ToPyArrow for ArrayData {
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
let array = FFI_ArrowArray::new(self);
let schema = FFI_ArrowSchema::try_from(self.data_type()).map_err(to_py_err)?;
let module = py.import("pyarrow")?;
let class = module.getattr("Array")?;
let array = class.call_method1(
"_import_from_c",
(
addr_of!(array) as Py_uintptr_t,
addr_of!(schema) as Py_uintptr_t,
),
)?;
Ok(array.to_object(py))
}
}
impl<T: FromPyArrow> FromPyArrow for Vec<T> {
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
let list = value.downcast::<PyList>()?;
list.iter().map(|x| T::from_pyarrow(x)).collect()
}
}
impl<T: ToPyArrow> ToPyArrow for Vec<T> {
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
let values = self
.iter()
.map(|v| v.to_pyarrow(py))
.collect::<PyResult<Vec<_>>>()?;
Ok(values.to_object(py))
}
}
impl FromPyArrow for RecordBatch {
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
validate_class("RecordBatch", value)?;
let schema = value.getattr("schema")?;
let schema = Arc::new(Schema::from_pyarrow(schema)?);
let arrays = value.getattr("columns")?.downcast::<PyList>()?;
let arrays = arrays
.iter()
.map(|a| Ok(make_array(ArrayData::from_pyarrow(a)?)))
.collect::<PyResult<_>>()?;
let batch = RecordBatch::try_new(schema, arrays).map_err(to_py_err)?;
Ok(batch)
}
}
impl ToPyArrow for RecordBatch {
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
let mut py_arrays = vec![];
let schema = self.schema();
let columns = self.columns().iter();
for array in columns {
py_arrays.push(array.to_data().to_pyarrow(py)?);
}
let py_schema = schema.to_pyarrow(py)?;
let module = py.import("pyarrow")?;
let class = module.getattr("RecordBatch")?;
let args = (py_arrays,);
let kwargs = PyDict::new(py);
kwargs.set_item("schema", py_schema)?;
let record = class.call_method("from_arrays", args, Some(kwargs))?;
Ok(PyObject::from(record))
}
}
impl FromPyArrow for ArrowArrayStreamReader {
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
validate_class("RecordBatchReader", value)?;
let mut stream = FFI_ArrowArrayStream::empty();
let stream_ptr = &mut stream as *mut FFI_ArrowArrayStream;
let args = PyTuple::new(value.py(), [stream_ptr as Py_uintptr_t]);
value.call_method1("_export_to_c", args)?;
let stream_reader = ArrowArrayStreamReader::try_new(stream)
.map_err(|err| PyValueError::new_err(err.to_string()))?;
Ok(stream_reader)
}
}
impl IntoPyArrow for ArrowArrayStreamReader {
fn into_pyarrow(self, py: Python) -> PyResult<PyObject> {
let mut stream = FFI_ArrowArrayStream::empty();
unsafe { export_reader_into_raw(Box::new(self), &mut stream) };
let stream_ptr = (&mut stream) as *mut FFI_ArrowArrayStream;
let module = py.import("pyarrow")?;
let class = module.getattr("RecordBatchReader")?;
let args = PyTuple::new(py, [stream_ptr as Py_uintptr_t]);
let reader = class.call_method1("_import_from_c", args)?;
Ok(PyObject::from(reader))
}
}
#[derive(Debug)]
pub struct PyArrowType<T: FromPyArrow + IntoPyArrow>(pub T);
impl<'source, T: FromPyArrow + IntoPyArrow> FromPyObject<'source> for PyArrowType<T> {
fn extract(value: &'source PyAny) -> PyResult<Self> {
Ok(Self(T::from_pyarrow(value)?))
}
}
impl<T: FromPyArrow + IntoPyArrow> IntoPy<PyObject> for PyArrowType<T> {
fn into_py(self, py: Python) -> PyObject {
match self.0.into_pyarrow(py) {
Ok(obj) => obj,
Err(err) => err.to_object(py),
}
}
}
impl<T: FromPyArrow + IntoPyArrow> From<T> for PyArrowType<T> {
fn from(s: T) -> Self {
Self(s)
}
}