use crate::FromArrowRobj;
use anyhow::Result;
use arrow::{
array::{make_array, ArrayData},
datatypes::{DataType, Field, Schema},
ffi::{self, FFI_ArrowArray, FFI_ArrowSchema},
ffi_stream::{self, ArrowArrayStreamReader, FFI_ArrowArrayStream},
record_batch::RecordBatch,
};
use extendr_api::prelude::*;
/// Calls `nanoarrow::nanoarrow_pointer_addr_chr()` on `robj` and returns its result.
///
/// Presumably this yields the address of the C structure wrapped by a nanoarrow
/// R object (the `_chr` suffix suggests a character string — confirm against the
/// nanoarrow R documentation).
///
/// # Errors
///
/// Returns the extendr error produced if the R call itself fails.
///
/// # Panics
///
/// Panics if `nanoarrow::nanoarrow_pointer_addr_chr` cannot be evaluated
/// (e.g. the package is not installed) or does not evaluate to a function.
pub fn nanoarrow_addr(robj: &Robj) -> Result<Robj, Error> {
    R!("nanoarrow::nanoarrow_pointer_addr_chr")
        .expect("`nanoarrow` must be installed")
        .as_function()
        // Name the function that is actually looked up above.
        .expect("`nanoarrow_pointer_addr_chr()` must be available")
        .call(pairlist!(robj))
}
/// Invokes `nanoarrow::nanoarrow_pointer_export(source, dest)` from Rust.
///
/// `dest` is forwarded to R unchanged; callers in this file pass the address of
/// a Rust-allocated Arrow C struct rendered as a decimal string.
///
/// # Errors
///
/// Returns the extendr error produced if the R call itself fails.
///
/// # Panics
///
/// Panics if `nanoarrow::nanoarrow_pointer_export` cannot be evaluated or does
/// not evaluate to a function.
pub fn nanoarrow_export(source: &Robj, dest: String) -> Result<Robj, Error> {
    let exporter_obj = R!("nanoarrow::nanoarrow_pointer_export")
        .expect("`nanoarrow` must be installed");
    let exporter = exporter_obj
        .as_function()
        .expect("`nanoarrow_pointer_export()` must be available");
    exporter.call(pairlist!(source, dest))
}
impl FromArrowRobj for Field {
    /// Converts an R `nanoarrow_schema` or `Field` object into an arrow-rs [`Field`].
    ///
    /// An empty `FFI_ArrowSchema` is allocated on the Rust side and its address
    /// (as a decimal string) is handed to the R-side exporter — either
    /// `nanoarrow_export` or the object's `export_to_c` method — after which the
    /// populated struct is converted with `Field::try_from`.
    ///
    /// # Errors
    ///
    /// Returns an error if `robj` inherits from neither `nanoarrow_schema` nor
    /// `Field`, if `export_to_c` is missing or not a function, if the R-side
    /// export call fails, or if the exported schema cannot be converted.
    fn from_arrow_robj(robj: &Robj) -> std::result::Result<Self, anyhow::Error> {
        // The exporter writes into `c_schema` through this address, so it must be
        // derived from a mutable place rather than a shared `&` borrow.
        let mut c_schema = FFI_ArrowSchema::empty();
        let c_schema_ptr = std::ptr::addr_of_mut!(c_schema) as usize;

        if robj.inherits("nanoarrow_schema") {
            // Propagate export failures instead of importing the still-empty struct.
            nanoarrow_export(robj, c_schema_ptr.to_string()).map_err(|e| {
                anyhow::anyhow!("failed to export `nanoarrow_schema`: {:?}", e)
            })?;
            return Ok(Field::try_from(&c_schema)?);
        }
        if !robj.inherits("Field") {
            return Err(anyhow::anyhow!(
                "did not find a `Field` or `nanoarrow_schema`"
            ));
        }
        let export_to_c = robj
            .dollar("export_to_c")
            .map_err(|e| anyhow::anyhow!("`export_to_c()` method is not available: {:?}", e))?
            .as_function()
            .ok_or_else(|| anyhow::anyhow!("`export_to_c` is not a function"))?;
        export_to_c
            .call(pairlist!(c_schema_ptr.to_string()))
            .map_err(|e| anyhow::anyhow!("`export_to_c()` failed: {:?}", e))?;
        Ok(Field::try_from(&c_schema)?)
    }
}
impl FromArrowRobj for DataType {
    /// Converts an R `nanoarrow_schema` or `DataType` object into an arrow-rs [`DataType`].
    ///
    /// An empty `FFI_ArrowSchema` is allocated on the Rust side and its address
    /// (as a decimal string) is handed to the R-side exporter — either
    /// `nanoarrow_export` or the object's `export_to_c` method — after which the
    /// populated struct is converted with `DataType::try_from`.
    ///
    /// # Errors
    ///
    /// Returns an error if `robj` inherits from neither `nanoarrow_schema` nor
    /// `DataType`, if `export_to_c` is missing or not a function, if the R-side
    /// export call fails, or if the exported schema cannot be converted.
    fn from_arrow_robj(robj: &Robj) -> std::result::Result<Self, anyhow::Error> {
        // The exporter writes into `c_schema` through this address, so it must be
        // derived from a mutable place rather than a shared `&` borrow.
        let mut c_schema = FFI_ArrowSchema::empty();
        let c_schema_ptr = std::ptr::addr_of_mut!(c_schema) as usize;

        if robj.inherits("nanoarrow_schema") {
            // Propagate export failures instead of importing the still-empty struct.
            nanoarrow_export(robj, c_schema_ptr.to_string()).map_err(|e| {
                anyhow::anyhow!("failed to export `nanoarrow_schema`: {:?}", e)
            })?;
            return Ok(DataType::try_from(&c_schema)?);
        }
        if !robj.inherits("DataType") {
            return Err(anyhow::anyhow!(
                "did not find a `DataType` or `nanoarrow_schema`"
            ));
        }
        let export_to_c = robj
            .dollar("export_to_c")
            .map_err(|e| anyhow::anyhow!("`export_to_c()` method is not available: {:?}", e))?
            .as_function()
            .ok_or_else(|| anyhow::anyhow!("`export_to_c` is not a function"))?;
        export_to_c
            .call(pairlist!(c_schema_ptr.to_string()))
            .map_err(|e| anyhow::anyhow!("`export_to_c()` failed: {:?}", e))?;
        Ok(DataType::try_from(&c_schema)?)
    }
}
impl FromArrowRobj for Schema {
    /// Converts an R `nanoarrow_schema` or `Schema` object into an arrow-rs [`Schema`].
    ///
    /// An empty `FFI_ArrowSchema` is allocated on the Rust side and its address
    /// (as a decimal string) is handed to the R-side exporter — either
    /// `nanoarrow_export` or the object's `export_to_c` method — after which the
    /// populated struct is converted with `Schema::try_from`.
    ///
    /// # Errors
    ///
    /// Returns an error if `robj` inherits from neither `nanoarrow_schema` nor
    /// `Schema`, if `export_to_c` is missing or not a function, if the R-side
    /// export call fails, or if the exported schema cannot be converted.
    fn from_arrow_robj(robj: &Robj) -> std::result::Result<Self, anyhow::Error> {
        // The exporter writes into `c_schema` through this address, so it must be
        // derived from a mutable place rather than a shared `&` borrow.
        let mut c_schema = FFI_ArrowSchema::empty();
        let c_schema_ptr = std::ptr::addr_of_mut!(c_schema) as usize;

        if robj.inherits("nanoarrow_schema") {
            // Propagate export failures instead of importing the still-empty struct.
            nanoarrow_export(robj, c_schema_ptr.to_string()).map_err(|e| {
                anyhow::anyhow!("failed to export `nanoarrow_schema`: {:?}", e)
            })?;
            return Ok(Schema::try_from(&c_schema)?);
        }
        if !robj.inherits("Schema") {
            return Err(anyhow::anyhow!(
                "did not find a `Schema` or `nanoarrow_schema`"
            ));
        }
        let export_to_c = robj
            .dollar("export_to_c")
            .map_err(|e| anyhow::anyhow!("`export_to_c()` method is not available: {:?}", e))?
            .as_function()
            .ok_or_else(|| anyhow::anyhow!("`export_to_c` is not a function"))?;
        export_to_c
            .call(pairlist!(c_schema_ptr.to_string()))
            .map_err(|e| anyhow::anyhow!("`export_to_c()` failed: {:?}", e))?;
        Ok(Schema::try_from(&c_schema)?)
    }
}
impl FromArrowRobj for ArrayData {
    /// Converts an R `nanoarrow_array` or `Array` object into arrow-rs [`ArrayData`].
    ///
    /// Empty `FFI_ArrowArray` / `FFI_ArrowSchema` structs are allocated on the
    /// Rust side and their addresses (as decimal strings) are handed to the
    /// R-side exporter:
    /// - for a `nanoarrow_array`, the schema is obtained with
    ///   `nanoarrow::infer_nanoarrow_schema()` and both objects are exported
    ///   with `nanoarrow_export`;
    /// - for an `Array`, the object's `export_to_c(array_ptr, schema_ptr)` is called.
    ///
    /// The populated structs are then imported with `ffi::from_ffi`.
    ///
    /// # Errors
    ///
    /// Returns an error if `robj` is neither kind of object, if any R-side lookup
    /// or export call fails, or if the import through `ffi::from_ffi` fails.
    fn from_arrow_robj(robj: &Robj) -> std::result::Result<Self, anyhow::Error> {
        // The exporter writes into these structs through their addresses, so the
        // addresses must be derived from mutable places, not shared `&` borrows.
        let mut c_array = FFI_ArrowArray::empty();
        let mut c_schema = FFI_ArrowSchema::empty();
        let c_array_ptr = std::ptr::addr_of_mut!(c_array) as usize;
        let c_schema_ptr = std::ptr::addr_of_mut!(c_schema) as usize;

        if robj.inherits("nanoarrow_array") {
            let robj_schema = R!("nanoarrow::infer_nanoarrow_schema")
                .map_err(|e| anyhow::anyhow!("`nanoarrow` must be installed: {:?}", e))?
                .as_function()
                .ok_or_else(|| {
                    anyhow::anyhow!("`infer_nanoarrow_schema()` must be available")
                })?
                .call(pairlist!(robj))
                .map_err(|e| anyhow::anyhow!("unable to infer nanoarrow schema: {:?}", e))?;
            // Propagate export failures instead of importing still-empty structs.
            nanoarrow_export(robj, c_array_ptr.to_string()).map_err(|e| {
                anyhow::anyhow!("failed to export `nanoarrow_array`: {:?}", e)
            })?;
            nanoarrow_export(&robj_schema, c_schema_ptr.to_string()).map_err(|e| {
                anyhow::anyhow!("failed to export inferred `nanoarrow_schema`: {:?}", e)
            })?;
        } else if robj.inherits("Array") {
            let export_to_c = robj
                .dollar("export_to_c")
                .map_err(|e| {
                    anyhow::anyhow!("`export_to_c()` method is not available: {:?}", e)
                })?
                .as_function()
                .ok_or_else(|| anyhow::anyhow!("`export_to_c` is not a function"))?;
            export_to_c
                .call(pairlist!(c_array_ptr.to_string(), c_schema_ptr.to_string()))
                .map_err(|e| anyhow::anyhow!("`export_to_c()` failed: {:?}", e))?;
        } else {
            return Err(anyhow::anyhow!(
                "did not find an `Array` or `nanoarrow_array`"
            ));
        }

        // SAFETY: both structs were filled in by the R-side exporter, whose call
        // returned successfully above (assumes the exporter fully initialises
        // them per the Arrow C data interface when it succeeds). `from_ffi`
        // takes ownership of `c_array`; `c_schema` is only borrowed.
        let data = unsafe { ffi::from_ffi(c_array, &c_schema) }?;
        Ok(data)
    }
}
impl FromArrowRobj for RecordBatch {
fn from_arrow_robj(robj: &Robj) -> std::result::Result<Self, anyhow::Error> {
if robj.inherits("nanoarrow_array_stream") {
let stream = ffi_stream::FFI_ArrowArrayStream::empty();
let c_stream_ptr = &stream as *const FFI_ArrowArrayStream as usize;
let _ = nanoarrow_export(robj, c_stream_ptr.to_string());
let res = ArrowArrayStreamReader::try_new(stream)?;
let r2 = res.into_iter().map(|xi| xi.unwrap()).nth(0).unwrap();
return Ok(r2);
}
if !robj.inherits("RecordBatch") {
return Err(anyhow::anyhow!(
"did not find a `RecordBatch` or `nanoarrow_array_stream`"
));
}
let array = FFI_ArrowArray::empty();
let schema = FFI_ArrowSchema::empty();
let c_array_ptr = &array as *const FFI_ArrowArray as usize;
let c_schema_ptr = &schema as *const FFI_ArrowSchema as usize;
let export_to_c = robj
.dollar("export_to_c")
.expect("export_to_c() method to be available")
.as_function()
.unwrap();
let _ = export_to_c.call(pairlist!(c_array_ptr.to_string(), c_schema_ptr.to_string()));
let res = unsafe { ffi::from_ffi(array, &schema)? };
let schema = Schema::try_from(&schema)?;
let res_arrays = res
.child_data()
.iter()
.map(|xi| make_array(xi.clone()))
.collect::<Vec<_>>();
Ok(RecordBatch::try_new(schema.into(), res_arrays)?)
}
}
impl FromArrowRobj for ArrowArrayStreamReader {
    /// Wraps an R `nanoarrow_array_stream` in an arrow-rs [`ArrowArrayStreamReader`].
    ///
    /// An empty `FFI_ArrowArrayStream` is allocated on the Rust side. Its address
    /// (as a decimal string) is passed to `nanoarrow_export`, and the populated
    /// stream is then handed to `ArrowArrayStreamReader::try_new`.
    ///
    /// # Errors
    ///
    /// Returns an error if `robj` is not a `nanoarrow_array_stream`, if the
    /// R-side export fails, or if the reader cannot be constructed.
    fn from_arrow_robj(robj: &Robj) -> std::result::Result<Self, anyhow::Error> {
        if !robj.inherits("nanoarrow_array_stream") {
            return Err(anyhow::anyhow!("did not find `nanoarrow_array_stream`"));
        }
        // The exporter writes into `stream` through this address, so it must be
        // derived from a mutable place rather than a shared `&` borrow.
        let mut stream = ffi_stream::FFI_ArrowArrayStream::empty();
        let c_stream_ptr = std::ptr::addr_of_mut!(stream) as usize;
        // Propagate export failures instead of wrapping a still-empty stream.
        nanoarrow_export(robj, c_stream_ptr.to_string()).map_err(|e| {
            anyhow::anyhow!("failed to export `nanoarrow_array_stream`: {:?}", e)
        })?;
        Ok(ArrowArrayStreamReader::try_new(stream)?)
    }
}