use alloc::{
boxed::Box,
format,
string::{String, ToString},
sync::Arc,
vec,
vec::Vec,
};
use core::ops::{Add, Sub};
use arrow::{
array::{Array, ArrayRef, BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array},
datatypes::DataType,
error::ArrowError,
record_batch::RecordBatch,
};
use arrow_arith::numeric::{add, sub};
use gguppy_core::data::{ChunkedSlices, DataFrameAdapter, SeriesAdapter, SeriesAdapterError};
/// A named Arrow array exposed through the [`SeriesAdapter`] trait.
///
/// The array is held behind an [`ArrayRef`] (`Arc<dyn Array>`), so cloning the adapter only
/// bumps a reference count; the column data itself is shared, not copied.
#[derive(Debug, Clone)]
pub struct ArrowSeriesAdapter {
    /// The underlying Arrow array holding the series values.
    pub array: ArrayRef,
    /// Free-form label for the series; arithmetic results are named like `(a+b)` / `(a-b)`.
    pub name: String,
}
/// [`SeriesAdapter`] implementation backed by a single Arrow array.
impl SeriesAdapter<'_> for ArrowSeriesAdapter {
    type LibError = ArrowError;

    /// Returns the series label stored in `self.name`.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns the number of slots in the wrapped array (null slots included).
    fn len(&self) -> usize {
        self.array.len()
    }

    /// Returns `true` when the wrapped array has no slots.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Exposes the array's values as a single-chunk [`ChunkedSlices`].
    ///
    /// Int32/Int64/Float32/Float64 arrays are borrowed directly from their value buffers.
    /// Boolean arrays are unpacked into a freshly allocated `[bool]` (see the note in that arm).
    /// The null bitmap is never consulted here, so null positions carry whatever value sits
    /// in the underlying buffer.
    ///
    /// # Errors
    /// - [`SeriesAdapterError::DowncastError`] if the data type is one of the supported ones
    ///   but the array cannot be downcast to the matching concrete Arrow array type.
    /// - [`SeriesAdapterError::UnsupportedDataType`] for every other data type.
    fn as_chunked_slices(&self) -> Result<ChunkedSlices<'_>, SeriesAdapterError> {
        let data_type = self.array.data_type();
        let any_array = self.array.as_any();

        // Builds the "data type matched but the concrete downcast failed" error.
        let downcast_failure = |expected: &str| SeriesAdapterError::DowncastError {
            expected_type: expected.to_string(),
            actual_type: format!("{:?}", data_type),
            reason: "From arrow".to_string(),
        };

        match data_type {
            DataType::Int32 => any_array
                .downcast_ref::<Int32Array>()
                .map(|typed| ChunkedSlices::I32(vec![typed.values()]))
                .ok_or_else(|| downcast_failure("Int32Array")),
            DataType::Int64 => any_array
                .downcast_ref::<Int64Array>()
                .map(|typed| ChunkedSlices::I64(vec![typed.values()]))
                .ok_or_else(|| downcast_failure("Int64Array")),
            DataType::Float32 => any_array
                .downcast_ref::<Float32Array>()
                .map(|typed| ChunkedSlices::F32(vec![typed.values()]))
                .ok_or_else(|| downcast_failure("Float32Array")),
            DataType::Float64 => any_array
                .downcast_ref::<Float64Array>()
                .map(|typed| ChunkedSlices::F64(vec![typed.values()]))
                .ok_or_else(|| downcast_failure("Float64Array")),
            DataType::Boolean => {
                let typed = any_array
                    .downcast_ref::<BooleanArray>()
                    .ok_or_else(|| downcast_failure("BooleanArray"))?;
                // Arrow stores booleans bit-packed, so there is no `[bool]` buffer to borrow.
                // NOTE(review): the unpacked copy is intentionally leaked so it outlives the
                // borrow of `self`; every call on a boolean series leaks `len()` bools. Fixing
                // this needs a cache field on the struct (an API change).
                let unpacked: Vec<bool> = (0..typed.len()).map(|idx| typed.value(idx)).collect();
                let leaked: &'static [bool] = Box::leak(unpacked.into_boxed_slice());
                Ok(ChunkedSlices::Bool(vec![leaked]))
            }
            other => Err(SeriesAdapterError::UnsupportedDataType(format!(
                "{:?}",
                other
            ))),
        }
    }
}
/// Element-wise `lhs + rhs` using Arrow's `numeric::add` kernel.
///
/// The result is named `(<lhs name>+<rhs name>)`.
///
/// # Errors
/// Propagates the [`ArrowError`] returned by the `add` kernel.
impl Add for ArrowSeriesAdapter {
    type Output = Result<Self, ArrowError>;

    fn add(self, rhs: Self) -> Self::Output {
        let Self {
            array: lhs_array,
            name: lhs_name,
        } = self;
        let Self {
            array: rhs_array,
            name: rhs_name,
        } = rhs;

        let summed = add(&lhs_array, &rhs_array)?;
        let combined_name = format!("({lhs_name}+{rhs_name})");

        Ok(Self {
            array: summed,
            name: combined_name,
        })
    }
}
/// Element-wise `lhs - rhs` using Arrow's `numeric::sub` kernel.
///
/// The result is named `(<lhs name>-<rhs name>)`.
///
/// # Errors
/// Propagates the [`ArrowError`] returned by the `sub` kernel.
impl Sub for ArrowSeriesAdapter {
    type Output = Result<Self, ArrowError>;

    fn sub(self, rhs: Self) -> Self::Output {
        let Self {
            array: lhs_array,
            name: lhs_name,
        } = self;
        let Self {
            array: rhs_array,
            name: rhs_name,
        } = rhs;

        let difference = sub(&lhs_array, &rhs_array)?;
        let combined_name = format!("({lhs_name}-{rhs_name})");

        Ok(Self {
            array: difference,
            name: combined_name,
        })
    }
}
pub struct ArrowDataFrameAdapter(RecordBatch);
impl<'d> DataFrameAdapter<'d> for ArrowDataFrameAdapter {
type DataFrame = ArrowDataFrameAdapter;
type Series = ArrowSeriesAdapter;
type LibError = ArrowError;
fn column_names(&self) -> Vec<String> {
self.0
.schema()
.fields()
.iter()
.map(|field| field.name().clone())
.collect()
}
fn col(&'d self, name: &str) -> Result<Self::Series, Self::LibError> {
let column_index = self.0.schema().index_of(name).map_err(|_| {
ArrowError::InvalidArgumentError(format!("Column \"{name}\" not found"))
})?;
let array = self.0.column(column_index).clone();
Ok(ArrowSeriesAdapter {
array,
name: name.to_string(),
})
}
fn select(&'d self, names: Vec<&str>) -> Result<Self::DataFrame, Self::LibError> {
let selected_columns: Result<Vec<ArrayRef>, ArrowError> = names
.iter()
.map(|&name| {
let column_index = self.0.schema().index_of(name).map_err(|_| {
ArrowError::InvalidArgumentError(format!("Column \"{name}\" not found"))
})?;
Ok(self.0.column(column_index).clone())
})
.collect();
let selected_columns = selected_columns?;
let selected_indices: Result<Vec<usize>, ArrowError> = names
.iter()
.map(|&name| {
let column_index = self.0.schema().index_of(name).map_err(|_| {
ArrowError::InvalidArgumentError(format!("Column {name} not found"))
})?;
Ok(column_index)
})
.collect();
let selected_indices = selected_indices?;
let selected_schema = self.0.schema().project(&selected_indices)?;
let selected_record_batch =
RecordBatch::try_new(Arc::new(selected_schema), selected_columns)?;
Ok(ArrowDataFrameAdapter(selected_record_batch))
}
fn shape(&self) -> (usize, usize) {
(self.0.num_rows(), self.0.num_columns())
}
}
#[cfg(test)]
mod test {
    use arrow::array::StringArray;

    use super::*;

    #[test]
    fn series_adapter_name() {
        let series = ArrowSeriesAdapter {
            array: Arc::new(Int32Array::from(vec![1, 2, 3])),
            name: "test".to_string(),
        };
        assert_eq!(series.name(), "test");
    }

    #[test]
    fn series_adapter_len() {
        let series = ArrowSeriesAdapter {
            array: Arc::new(Int32Array::from(vec![1, 2, 3])),
            name: "test".to_string(),
        };
        assert_eq!(series.len(), 3);
        assert!(!series.is_empty());
    }

    #[test]
    fn series_adapter_empty() {
        let series = ArrowSeriesAdapter {
            array: Arc::new(Int32Array::from(vec![0i32; 0])),
            name: "empty".to_string(),
        };
        assert_eq!(series.len(), 0);
        assert!(series.is_empty());
    }

    #[test]
    fn series_adapter_as_chunked_slice() {
        let series = ArrowSeriesAdapter {
            array: Arc::new(Int32Array::from(vec![1, 2, 3])),
            name: "test".to_string(),
        };
        let chunked_slices = series.as_chunked_slices();
        assert!(chunked_slices.is_ok());
        assert!(matches!(chunked_slices, Ok(ChunkedSlices::I32(_))));
        if let Ok(ChunkedSlices::I32(slices)) = chunked_slices {
            assert_eq!(slices.len(), 1);
            assert_eq!(slices[0], &[1, 2, 3]);
        }
    }

    #[test]
    fn series_adapter_as_chunked_slices_float64() {
        let series = ArrowSeriesAdapter {
            array: Arc::new(Float64Array::from(vec![1.5, -2.0, 0.25])),
            name: "floats".to_string(),
        };
        match series.as_chunked_slices() {
            Ok(ChunkedSlices::F64(slices)) => {
                assert_eq!(slices.len(), 1);
                assert_eq!(slices[0], &[1.5, -2.0, 0.25]);
            }
            _ => panic!("expected a single Float64 chunk"),
        }
    }

    #[test]
    fn series_adapter_as_chunked_slices_boolean() {
        let series = ArrowSeriesAdapter {
            array: Arc::new(BooleanArray::from(vec![true, false, true])),
            name: "flags".to_string(),
        };
        match series.as_chunked_slices() {
            Ok(ChunkedSlices::Bool(slices)) => {
                assert_eq!(slices.len(), 1);
                assert_eq!(slices[0], &[true, false, true]);
            }
            _ => panic!("expected a single Boolean chunk"),
        }
    }

    #[test]
    fn series_adapter_as_chunked_slices_unsupported() {
        let series = ArrowSeriesAdapter {
            array: Arc::new(StringArray::from(vec!["a", "b"])),
            name: "strings".to_string(),
        };
        assert!(matches!(
            series.as_chunked_slices(),
            Err(SeriesAdapterError::UnsupportedDataType(_))
        ));
    }

    #[test]
    fn series_adapter_add() {
        let series1 = ArrowSeriesAdapter {
            array: Arc::new(Int32Array::from(vec![1, 2, 3])),
            name: "series1".to_string(),
        };
        let series2 = ArrowSeriesAdapter {
            array: Arc::new(Int32Array::from(vec![4, 5, 6])),
            name: "series2".to_string(),
        };
        let result = series1 + series2;
        assert!(result.is_ok());
        let result_series = result.unwrap();
        assert_eq!(result_series.name, "(series1+series2)");
        assert_eq!(result_series.len(), 3);
        assert!(matches!(result_series.array.data_type(), DataType::Int32));
        assert_eq!(
            result_series
                .array
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap()
                .values(),
            &[5, 7, 9]
        );
    }

    #[test]
    fn series_adapter_sub() {
        let series1 = ArrowSeriesAdapter {
            array: Arc::new(Int32Array::from(vec![4, 5, 6])),
            name: "series1".to_string(),
        };
        let series2 = ArrowSeriesAdapter {
            array: Arc::new(Int32Array::from(vec![1, 2, 3])),
            name: "series2".to_string(),
        };
        let result = series1 - series2;
        assert!(result.is_ok());
        let result_series = result.unwrap();
        assert_eq!(result_series.name, "(series1-series2)");
        assert_eq!(result_series.len(), 3);
        assert!(matches!(result_series.array.data_type(), DataType::Int32));
        assert_eq!(
            result_series
                .array
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap()
                .values(),
            &[3, 3, 3]
        );
    }

    #[test]
    fn dataframe_adapter_column_names() {
        let arrays: Vec<(&str, ArrayRef)> = vec![
            ("alpha", Arc::new(Int32Array::from(vec![1, 2, 3]))),
            ("beta", Arc::new(Int32Array::from(vec![4, 5, 6]))),
            ("gamma", Arc::new(Int32Array::from(vec![7, 8, 9]))),
        ];
        let df = RecordBatch::try_from_iter(arrays).unwrap();
        let adapter = ArrowDataFrameAdapter(df);
        let names = adapter.column_names();
        assert_eq!(names.len(), 3);
        assert!(names.contains(&"alpha".to_string()));
        assert!(names.contains(&"beta".to_string()));
        assert!(names.contains(&"gamma".to_string()));
        assert_eq!(
            names,
            vec!["alpha".to_string(), "beta".to_string(), "gamma".to_string()]
        );
    }

    #[test]
    fn dataframe_adapter_col() {
        let arrays: Vec<(&str, ArrayRef)> = vec![
            ("numbers", Arc::new(Int32Array::from(vec![10, 20, 30]))),
            ("letters", Arc::new(StringArray::from(vec!["x", "y", "z"]))),
        ];
        let df = RecordBatch::try_from_iter(arrays).unwrap();
        let adapter = ArrowDataFrameAdapter(df);
        let numbers_col = adapter.col("numbers").unwrap();
        assert_eq!(numbers_col.name(), "numbers");
        assert_eq!(numbers_col.len(), 3);
        let letters_col = adapter.col("letters").unwrap();
        assert_eq!(letters_col.name(), "letters");
        assert_eq!(letters_col.len(), 3);
        let result = adapter.col("nonexistent");
        assert!(result.is_err());
        assert!(matches!(result, Err(ArrowError::InvalidArgumentError(_))));
        if let Err(ArrowError::InvalidArgumentError(name)) = result {
            assert_eq!(name.to_string(), "Column \"nonexistent\" not found");
        }
    }

    #[test]
    fn dataframe_adapter_select() {
        let arrays: Vec<(&str, ArrayRef)> = vec![
            ("a", Arc::new(Int32Array::from(vec![1, 2, 3]))),
            ("b", Arc::new(Int32Array::from(vec![4, 5, 6]))),
            ("c", Arc::new(Int32Array::from(vec![7, 8, 9]))),
            ("d", Arc::new(StringArray::from(vec!["x", "y", "z"]))),
        ];
        let df = RecordBatch::try_from_iter(arrays).unwrap();
        let adapter = ArrowDataFrameAdapter(df);
        let selected = adapter.select(vec!["a", "c"]).unwrap();
        assert_eq!(selected.shape(), (3, 2));
        assert_eq!(
            selected.column_names(),
            vec!["a".to_string(), "c".to_string()]
        );
        let single = adapter.select(vec!["d"]).unwrap();
        assert_eq!(single.shape(), (3, 1));
        assert_eq!(single.column_names(), vec!["d".to_string()]);
        let empty = adapter.select(vec![]);
        assert!(empty.is_err());
        assert!(matches!(empty, Err(ArrowError::InvalidArgumentError(_))));
        if let Err(ArrowError::InvalidArgumentError(name)) = empty {
            assert_eq!(
                name.to_string(),
                "must either specify a row count or at least one column"
            );
        }
    }

    #[test]
    fn dataframe_adapter_select_missing_column() {
        let arrays: Vec<(&str, ArrayRef)> = vec![
            ("a", Arc::new(Int32Array::from(vec![1, 2, 3]))),
            ("b", Arc::new(Int32Array::from(vec![4, 5, 6]))),
        ];
        let df = RecordBatch::try_from_iter(arrays).unwrap();
        let adapter = ArrowDataFrameAdapter(df);
        match adapter.select(vec!["a", "missing"]) {
            Err(ArrowError::InvalidArgumentError(message)) => {
                assert_eq!(message, "Column \"missing\" not found");
            }
            _ => panic!("expected a missing-column error"),
        }
    }
}