use arrow_array::{
array::{
Array, BinaryArray, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array,
Int64Array, Int8Array, LargeBinaryArray, LargeStringArray, StringArray, UInt16Array,
UInt32Array, UInt64Array, UInt8Array,
},
RecordBatch,
};
use arrow_schema::DataType;
use zer_core::record::{FieldValue, Record};
use crate::DatasetConfig;
/// Converts the cell at position `row` of the Arrow column `col` into a [`FieldValue`].
///
/// Mapping, keyed on `col.data_type()`:
///
/// * a null cell (any type) becomes `FieldValue::Null`;
/// * `Boolean` becomes `FieldValue::Bool`;
/// * `Int8`/`Int16`/`Int32`/`Int64` and `UInt8`/`UInt16`/`UInt32` become
///   `FieldValue::Int`, widened to `i64`;
/// * `UInt64` becomes `FieldValue::UInt`, so the full `u64` range is kept;
/// * `Float32`/`Float64` become `FieldValue::Float`, widened to `f64`;
/// * `Utf8`/`LargeUtf8` become `FieldValue::Text` holding an owned copy;
/// * `Binary`/`LargeBinary` become `FieldValue::Bytes` holding an owned copy;
/// * any other data type becomes `FieldValue::Text` holding the `Debug` rendering of
///   the *data type* — not of the cell's value.
///
/// # Panics
///
/// Panics if the column cannot be downcast to the concrete array type that matches
/// its reported data type.
pub fn arrow_cell_to_field_value(col: &dyn Array, row: usize) -> FieldValue {
    // Downcasts `col` to the named concrete array type and reads the value at `row`.
    macro_rules! cell {
        ($array:ty) => {
            col.as_any()
                .downcast_ref::<$array>()
                .unwrap()
                .value(row)
        };
    }

    // Nulls are handled up front so the typed reads below only see valid slots.
    if col.is_null(row) {
        return FieldValue::Null;
    }

    match col.data_type() {
        DataType::Boolean => FieldValue::Bool(cell!(BooleanArray)),

        // Every integer width except UInt64 fits losslessly in an i64.
        DataType::Int8 => FieldValue::Int(i64::from(cell!(Int8Array))),
        DataType::Int16 => FieldValue::Int(i64::from(cell!(Int16Array))),
        DataType::Int32 => FieldValue::Int(i64::from(cell!(Int32Array))),
        DataType::Int64 => FieldValue::Int(cell!(Int64Array)),
        DataType::UInt8 => FieldValue::Int(i64::from(cell!(UInt8Array))),
        DataType::UInt16 => FieldValue::Int(i64::from(cell!(UInt16Array))),
        DataType::UInt32 => FieldValue::Int(i64::from(cell!(UInt32Array))),
        DataType::UInt64 => FieldValue::UInt(cell!(UInt64Array)),

        DataType::Float32 => FieldValue::Float(f64::from(cell!(Float32Array))),
        DataType::Float64 => FieldValue::Float(cell!(Float64Array)),

        DataType::Utf8 => FieldValue::Text(String::from(cell!(StringArray))),
        DataType::LargeUtf8 => FieldValue::Text(String::from(cell!(LargeStringArray))),

        DataType::Binary => FieldValue::Bytes(Vec::from(cell!(BinaryArray))),
        DataType::LargeBinary => FieldValue::Bytes(Vec::from(cell!(LargeBinaryArray))),

        // Unsupported types: emit the type's Debug name as a text placeholder.
        other => FieldValue::Text(format!("{other:?}")),
    }
}
fn key_from_cell(col: &dyn Array, row: usize, row_idx: usize) -> String {
if col.is_null(row) {
return row_idx.to_string();
}
match arrow_cell_to_field_value(col, row) {
FieldValue::Text(s) => s,
FieldValue::Int(i) => i.to_string(),
FieldValue::UInt(u) => u.to_string(),
FieldValue::Float(f) => f.to_string(),
FieldValue::Bool(b) => b.to_string(),
_ => row_idx.to_string(),
}
}
/// Conversion of an Arrow container into a list of [`Record`]s.
///
/// The implementation in this file is for `RecordBatch`.
pub trait ArrowIngest {
/// Consumes `self` and returns the records built from it, using the settings in `config`.
fn into_records(self, config: &DatasetConfig) -> Vec<Record>;
}
impl ArrowIngest for RecordBatch {
    /// Turns every row of the batch into one [`Record`].
    ///
    /// The record key comes from the first column whose name equals
    /// `config.key_column`, rendered via `key_from_cell`. When no such column exists,
    /// the row index rendered as a decimal string is used. Each record is created with
    /// `Record::from_key(&config.source, &key)` and then receives one field per column
    /// (the key column included), named after the schema field and holding the value
    /// produced by [`arrow_cell_to_field_value`].
    ///
    /// Records are returned in row order.
    fn into_records(self, config: &DatasetConfig) -> Vec<Record> {
        let schema = self.schema();

        // Resolve the key column once; it is the same for every row.
        let key_col_idx = schema
            .fields()
            .iter()
            .position(|field| field.name() == &config.key_column);

        (0..self.num_rows())
            .map(|row| {
                let key = match key_col_idx {
                    Some(idx) => key_from_cell(self.column(idx).as_ref(), row, row),
                    None => row.to_string(),
                };

                // Thread the record through `insert`, one schema field / column pair at a time.
                schema.fields().iter().zip(self.columns()).fold(
                    Record::from_key(&config.source, &key),
                    |record, (field, column)| {
                        record.insert(
                            field.name().clone(),
                            arrow_cell_to_field_value(column.as_ref(), row),
                        )
                    },
                )
            })
            .collect()
    }
}
// Unit tests for the RecordBatch -> Record conversion: key selection, id/source
// propagation, and per-type cell conversion.
#[cfg(test)]
mod tests {
use super::*;
use arrow_array::{
BinaryArray, BooleanArray, Float64Array, Int32Array, Int64Array, StringArray, UInt64Array,
};
use arrow_schema::{DataType, Field, Schema};
use std::sync::Arc;
use zer_core::record::derive_record_id;
// Shared fixture: three rows over five nullable columns (id, name, age, score, active).
// Rows 0 and 1 are fully populated; row 2 is null in every column.
fn make_batch() -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, true),
Field::new("name", DataType::Utf8, true),
Field::new("age", DataType::Int64, true),
Field::new("score", DataType::Float64, true),
Field::new("active", DataType::Boolean, true),
]));
RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec![
Some("k001"),
Some("k002"),
None::<&str>,
])),
Arc::new(StringArray::from(vec![Some("Alice"), Some("Bob"), None])),
Arc::new(Int64Array::from(vec![Some(30i64), Some(25i64), None])),
Arc::new(Float64Array::from(vec![Some(0.9f64), Some(0.7f64), None])),
Arc::new(BooleanArray::from(vec![Some(true), Some(false), None])),
],
)
.unwrap()
}
// One record per row; keys come from the "id" column, and the null id in row 2
// falls back to the row index.
#[test]
fn batch_into_records_count_and_keys() {
let config = DatasetConfig::new("src", "id");
let records = make_batch().into_records(&config);
assert_eq!(records.len(), 3);
assert_eq!(records[0].key, "k001");
assert_eq!(records[1].key, "k002");
assert_eq!(records[2].key, "2");
}
// Record ids equal derive_record_id(source, key).
#[test]
fn batch_into_records_ids_derived_from_key() {
let config = DatasetConfig::new("src", "id");
let records = make_batch().into_records(&config);
assert_eq!(records[0].id, derive_record_id("src", "k001"));
assert_eq!(records[1].id, derive_record_id("src", "k002"));
}
// Every record carries the source name from the config.
#[test]
fn batch_into_records_source_attached() {
let config = DatasetConfig::new("brp", "id");
let records = make_batch().into_records(&config);
for r in &records {
assert_eq!(r.source.as_deref(), Some("brp"));
}
}
// When the configured key column is absent from the schema, keys are row indices.
#[test]
fn batch_missing_key_column_falls_back_to_row_index() {
let config = DatasetConfig::new("src", "nonexistent");
let records = make_batch().into_records(&config);
assert_eq!(records[0].key, "0");
assert_eq!(records[1].key, "1");
assert_eq!(records[2].key, "2");
}
// Utf8 cells become Text; a null cell becomes Null.
#[test]
fn batch_into_records_string_column() {
let config = DatasetConfig::new("src", "id");
let records = make_batch().into_records(&config);
assert_eq!(
records[0].get("name"),
Some(&FieldValue::Text("Alice".into()))
);
assert_eq!(records[2].get("name"), Some(&FieldValue::Null));
}
// Int64 cells become Int; a null cell becomes Null.
#[test]
fn batch_into_records_int64_column() {
let config = DatasetConfig::new("src", "id");
let records = make_batch().into_records(&config);
assert_eq!(records[0].get("age"), Some(&FieldValue::Int(30)));
assert_eq!(records[2].get("age"), Some(&FieldValue::Null));
}
// Float64 cells become Float.
#[test]
fn batch_into_records_float64_column() {
let config = DatasetConfig::new("src", "id");
let records = make_batch().into_records(&config);
assert_eq!(records[0].get("score"), Some(&FieldValue::Float(0.9)));
}
// Boolean cells become Bool for both true and false.
#[test]
fn batch_into_records_boolean_column() {
let config = DatasetConfig::new("src", "id");
let records = make_batch().into_records(&config);
assert_eq!(records[0].get("active"), Some(&FieldValue::Bool(true)));
assert_eq!(records[1].get("active"), Some(&FieldValue::Bool(false)));
}
// u64::MAX does not fit in i64; it must survive as UInt both in the field and in the key.
#[test]
fn batch_uint64_preserved() {
let schema = Arc::new(Schema::new(vec![Field::new(
"uid",
DataType::UInt64,
false,
)]));
let batch = RecordBatch::try_new(schema, vec![Arc::new(UInt64Array::from(vec![u64::MAX]))])
.unwrap();
let config = DatasetConfig::new("src", "uid");
let records = batch.into_records(&config);
assert_eq!(records[0].get("uid"), Some(&FieldValue::UInt(u64::MAX)));
assert_eq!(records[0].key, u64::MAX.to_string());
}
// Int32 cells are widened into the i64-backed Int variant.
#[test]
fn batch_int32_widened_to_int64() {
let schema = Arc::new(Schema::new(vec![Field::new("val", DataType::Int32, false)]));
let batch =
RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![42i32]))]).unwrap();
let config = DatasetConfig::new("src", "nonexistent");
let records = batch.into_records(&config);
assert_eq!(records[0].get("val"), Some(&FieldValue::Int(42)));
}
// Binary cells become Bytes holding a copy of the payload.
#[test]
fn batch_binary_column() {
let schema = Arc::new(Schema::new(vec![Field::new(
"blob",
DataType::Binary,
false,
)]));
let batch = RecordBatch::try_new(
schema,
vec![Arc::new(BinaryArray::from_vec(vec![&[1u8, 2u8, 3u8][..]]))],
)
.unwrap();
let config = DatasetConfig::new("src", "nonexistent");
let records = batch.into_records(&config);
assert_eq!(
records[0].get("blob"),
Some(&FieldValue::Bytes(vec![1, 2, 3]))
);
}
}