use arrow::{
array::{
Array, ArrayRef, BooleanArray, BooleanBuilder, Float64Array, Float64Builder, Int64Array,
Int64Builder, StringArray, StringBuilder,
},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
use std::sync::Arc;
use crate::core::error::{Error, Result};
use crate::dataframe::DataFrame;
use crate::series::base::Series;
/// Converts every column of `df` into an Arrow array and bundles them into a
/// single [`RecordBatch`].
///
/// Columns are converted in the order reported by `df.column_names()`; each
/// column's Arrow type is inferred from its string values by `column_to_arrow`.
/// A DataFrame without columns yields an empty batch with an empty schema.
///
/// # Errors
///
/// Propagates the first column conversion failure, and returns
/// `Error::InvalidOperation` when Arrow rejects the assembled batch.
pub fn dataframe_to_record_batch(df: &DataFrame) -> Result<RecordBatch> {
    // Convert column by column; `collect` stops at the first failing column.
    let converted = df
        .column_names()
        .iter()
        .map(|name| column_to_arrow(df, name))
        .collect::<Result<Vec<(Field, ArrayRef)>>>()?;

    let (fields, arrays): (Vec<Field>, Vec<ArrayRef>) = converted.into_iter().unzip();
    let schema = Arc::new(Schema::new(fields));

    if arrays.is_empty() {
        // Without columns there is nothing to hand to `try_new`, so build the empty batch directly.
        Ok(RecordBatch::new_empty(schema))
    } else {
        RecordBatch::try_new(schema, arrays)
            .map_err(|e| Error::InvalidOperation(format!("Failed to create RecordBatch: {e}")))
    }
}
pub fn record_batches_to_dataframe(batches: &[RecordBatch]) -> Result<DataFrame> {
if batches.is_empty() {
return Ok(DataFrame::new());
}
let schema = batches[0].schema();
let num_columns = schema.fields().len();
let mut df = DataFrame::new();
for col_idx in 0..num_columns {
let field = schema.field(col_idx);
let col_name = field.name().clone();
let mut values: Vec<String> = Vec::new();
for batch in batches {
let array = batch.column(col_idx);
let col_values = arrow_array_to_strings(array)?;
values.extend(col_values);
}
let series = Series::new(values, Some(col_name.clone()))
.map_err(|e| Error::InvalidOperation(format!("Series creation failed: {e}")))?;
df.add_column(col_name, series)?;
}
Ok(df)
}
/// Converts a single Arrow [`RecordBatch`] into a string-valued `DataFrame`.
///
/// This is a convenience wrapper around [`record_batches_to_dataframe`] for
/// the one-batch case.
///
/// # Errors
///
/// Returns whatever error [`record_batches_to_dataframe`] reports.
pub fn record_batch_to_dataframe(batch: &RecordBatch) -> Result<DataFrame> {
    // View the borrowed batch as a one-element slice instead of cloning it.
    record_batches_to_dataframe(std::slice::from_ref(batch))
}
fn column_to_arrow(df: &DataFrame, col_name: &str) -> Result<(Field, ArrayRef)> {
let row_count = df.row_count();
let values: Vec<Option<String>> = collect_column_values(df, col_name, row_count)?;
let detected = detect_column_type(&values);
match detected {
ColumnKind::Int64 => {
let mut builder = Int64Builder::with_capacity(row_count);
for v in &values {
match v {
None => builder.append_null(),
Some(s) if s == "null" || s.is_empty() => builder.append_null(),
Some(s) => {
let parsed = s.parse::<i64>().map_err(|e| {
Error::InvalidOperation(format!(
"Failed to parse '{s}' as Int64 in column '{col_name}': {e}"
))
})?;
builder.append_value(parsed);
}
}
}
let field = Field::new(col_name, DataType::Int64, true);
Ok((field, Arc::new(builder.finish()) as ArrayRef))
}
ColumnKind::Float64 => {
let mut builder = Float64Builder::with_capacity(row_count);
for v in &values {
match v {
None => builder.append_null(),
Some(s) if s == "null" || s.is_empty() => builder.append_null(),
Some(s) => {
let parsed = s.parse::<f64>().map_err(|e| {
Error::InvalidOperation(format!(
"Failed to parse '{s}' as Float64 in column '{col_name}': {e}"
))
})?;
builder.append_value(parsed);
}
}
}
let field = Field::new(col_name, DataType::Float64, true);
Ok((field, Arc::new(builder.finish()) as ArrayRef))
}
ColumnKind::Boolean => {
let mut builder = BooleanBuilder::with_capacity(row_count);
for v in &values {
match v {
None => builder.append_null(),
Some(s) if s == "null" || s.is_empty() => builder.append_null(),
Some(s) => {
let b = match s.to_lowercase().as_str() {
"true" | "1" | "yes" => true,
_ => false,
};
builder.append_value(b);
}
}
}
let field = Field::new(col_name, DataType::Boolean, true);
Ok((field, Arc::new(builder.finish()) as ArrayRef))
}
ColumnKind::Utf8 => {
let mut builder = StringBuilder::with_capacity(row_count, row_count * 8);
for v in &values {
match v {
None => builder.append_null(),
Some(s) if s == "null" => builder.append_null(),
Some(s) => builder.append_value(s),
}
}
let field = Field::new(col_name, DataType::Utf8, true);
Ok((field, Arc::new(builder.finish()) as ArrayRef))
}
}
}
/// Fetches the string values of column `col_name` from `df`, wrapping each one
/// in `Some`.
///
/// The `_row_count` argument is accepted but not used. This function never
/// produces `None` entries itself.
///
/// # Errors
///
/// Propagates any error from `DataFrame::get_column_string_values`.
fn collect_column_values(
    df: &DataFrame,
    col_name: &str,
    _row_count: usize,
) -> Result<Vec<Option<String>>> {
    let mut cells: Vec<Option<String>> = Vec::new();
    cells.extend(df.get_column_string_values(col_name)?.into_iter().map(Some));
    Ok(cells)
}
/// Arrow column type inferred for a string-valued DataFrame column.
///
/// Produced by `detect_column_type` and consumed by `column_to_arrow`, which
/// maps each variant to the Arrow `DataType` of the same name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ColumnKind {
    /// Every non-null value parses as `i64`.
    Int64,
    /// Every non-null value parses as `f64` (and not all of them as `i64`).
    Float64,
    /// Every non-null value is a boolean spelling and at least one is a word
    /// such as `true`/`false`/`yes`/`no`.
    Boolean,
    /// Fallback: arbitrary text, or a column with no non-null values.
    Utf8,
}
/// Infers the narrowest [`ColumnKind`] that fits every non-null value.
///
/// `None`, empty strings and the literal `"null"` are ignored. The remaining
/// values are classified in this priority order:
///
/// 1. `Boolean` — all values are one of `true/false/yes/no/1/0`
///    (case-insensitive) and at least one is an actual word, so a column of
///    only `0`/`1` stays numeric.
/// 2. `Int64` — all values parse as `i64`.
/// 3. `Float64` — all values parse as `f64`.
/// 4. `Utf8` — anything else, including a column with no non-null values.
fn detect_column_type(values: &[Option<String>]) -> ColumnKind {
    let mut saw_value = false;
    let mut all_boolish = true;
    let mut saw_bool_word = false;
    let mut all_int = true;
    let mut all_float = true;

    for text in values.iter().flatten().map(String::as_str) {
        if text.is_empty() || text == "null" {
            continue;
        }
        saw_value = true;

        match text.to_lowercase().as_str() {
            "true" | "false" | "yes" | "no" => saw_bool_word = true,
            // Digits are compatible with a boolean column but do not prove one.
            "1" | "0" => {}
            _ => all_boolish = false,
        }
        all_int = all_int && text.parse::<i64>().is_ok();
        all_float = all_float && text.parse::<f64>().is_ok();
    }

    if !saw_value {
        return ColumnKind::Utf8;
    }

    if all_boolish && saw_bool_word {
        ColumnKind::Boolean
    } else if all_int {
        ColumnKind::Int64
    } else if all_float {
        ColumnKind::Float64
    } else {
        ColumnKind::Utf8
    }
}
fn arrow_array_to_strings(array: &dyn Array) -> Result<Vec<String>> {
let len = array.len();
let mut out = Vec::with_capacity(len);
match array.data_type() {
DataType::Int64 => {
let arr = array
.as_any()
.downcast_ref::<Int64Array>()
.ok_or_else(|| Error::InvalidOperation("Expected Int64Array".into()))?;
for i in 0..len {
if arr.is_null(i) {
out.push("null".to_string());
} else {
out.push(arr.value(i).to_string());
}
}
}
DataType::Float64 => {
let arr = array
.as_any()
.downcast_ref::<Float64Array>()
.ok_or_else(|| Error::InvalidOperation("Expected Float64Array".into()))?;
for i in 0..len {
if arr.is_null(i) {
out.push("null".to_string());
} else {
out.push(arr.value(i).to_string());
}
}
}
DataType::Boolean => {
let arr = array
.as_any()
.downcast_ref::<BooleanArray>()
.ok_or_else(|| Error::InvalidOperation("Expected BooleanArray".into()))?;
for i in 0..len {
if arr.is_null(i) {
out.push("null".to_string());
} else {
out.push(arr.value(i).to_string());
}
}
}
DataType::Utf8 => {
let arr = array
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| Error::InvalidOperation("Expected StringArray".into()))?;
for i in 0..len {
if arr.is_null(i) {
out.push("null".to_string());
} else {
out.push(arr.value(i).to_string());
}
}
}
other => {
return Err(Error::NotImplemented(format!(
"Arrow type {other:?} conversion is not yet supported"
)));
}
}
Ok(out)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dataframe::DataFrame;
    use crate::series::base::Series;

    /// Appends a string column named `name` to `df`, panicking on failure.
    fn push_text_column(df: &mut DataFrame, name: &str, cells: &[&str]) {
        let owned: Vec<String> = cells.iter().map(|cell| (*cell).to_string()).collect();
        let series = Series::new(owned, Some(name.to_string())).expect("series creation");
        df.add_column(name.to_string(), series).expect("add column");
    }

    /// Fixture: three rows with one integer-looking and one text column.
    fn make_df() -> DataFrame {
        let mut df = DataFrame::new();
        push_text_column(&mut df, "int_col", &["1", "2", "3"]);
        push_text_column(&mut df, "str_col", &["a", "b", "c"]);
        df
    }

    /// Runs type detection over a column in which every cell is present.
    fn detect(cells: &[&str]) -> ColumnKind {
        let vals: Vec<Option<String>> = cells
            .iter()
            .map(|cell| Some((*cell).to_string()))
            .collect();
        detect_column_type(&vals)
    }

    #[test]
    fn test_dataframe_to_record_batch_shape() {
        let batch = dataframe_to_record_batch(&make_df()).expect("conversion");
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 3);
    }

    #[test]
    fn test_record_batch_to_dataframe_roundtrip() {
        let original = make_df();
        let batch = dataframe_to_record_batch(&original).expect("to_record_batch");
        let restored = record_batch_to_dataframe(&batch).expect("to_dataframe");
        assert_eq!(restored.column_names(), original.column_names());
        assert_eq!(restored.row_count(), original.row_count());
    }

    #[test]
    fn test_record_batches_to_dataframe_concat() {
        let original = make_df();
        let batch = dataframe_to_record_batch(&original).expect("to_record_batch");
        let doubled = [batch.clone(), batch];
        let combined = record_batches_to_dataframe(&doubled).expect("concat");
        assert_eq!(combined.row_count(), 6);
        assert_eq!(combined.column_names(), original.column_names());
    }

    #[test]
    fn test_empty_dataframe_to_record_batch() {
        let batch = dataframe_to_record_batch(&DataFrame::new()).expect("empty conversion");
        assert_eq!(batch.num_columns(), 0);
        assert_eq!(batch.num_rows(), 0);
    }

    #[test]
    fn test_detect_column_type_int() {
        assert_eq!(detect(&["1", "2", "100"]), ColumnKind::Int64);
    }

    #[test]
    fn test_detect_column_type_float() {
        assert_eq!(detect(&["1.5", "2.7", "3.0"]), ColumnKind::Float64);
    }

    #[test]
    fn test_detect_column_type_string() {
        assert_eq!(detect(&["hello", "world"]), ColumnKind::Utf8);
    }
}