use arrow::array::{DictionaryArray, RecordBatchReader, StringArray, StringDictionaryBuilder};
use arrow::datatypes::ArrowDictionaryKeyType;
use arrow::error::ArrowError;
use arrow::ipc::writer::FileWriter;
pub mod index;
pub mod query;
/// Serializes every record batch yielded by `batches` into an in-memory buffer
/// using arrow's IPC [`FileWriter`].
///
/// The schema written to the output is the one reported by the reader itself.
///
/// # Errors
///
/// Returns the first [`ArrowError`] raised while creating the writer, pulling a
/// batch from the reader, writing a batch, or finalizing the output.
pub fn batches_to_ipc(batches: impl RecordBatchReader) -> Result<Vec<u8>, ArrowError> {
    let mut ipc = FileWriter::try_new(Vec::new(), &batches.schema())?;

    for next in batches {
        // The reader yields `Result`s; surface a read failure before writing.
        let record_batch = next?;
        ipc.write(&record_batch)?;
    }

    ipc.finish()?;
    // Hand back the byte buffer the writer was accumulating into.
    ipc.into_inner()
}
/// Finishes `builder`, returning the array it produced, and replaces `builder`
/// with a new one seeded from that array's dictionary values.
///
/// The replacement builder is created through `new_with_dictionary`, using the
/// length of the returned array as its key capacity.
///
/// # Panics
///
/// Panics if the finished array's values cannot be downcast to a
/// [`StringArray`], or if `new_with_dictionary` returns an error for those
/// values.
pub(crate) fn reset_dictarray_builder<T: ArrowDictionaryKeyType>(
    builder: &mut StringDictionaryBuilder<T>,
) -> DictionaryArray<T> {
    let array = builder.finish();

    // `new_with_dictionary` only needs a reference, and `array` is not moved
    // until the final return, so the values can be borrowed rather than cloned.
    let dict_values = array
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("Failed to downcast to StringArray");

    *builder = StringDictionaryBuilder::<T>::new_with_dictionary(array.len(), dict_values)
        .expect("failed to rebuild dictionary builder from its own finished values");

    array
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{ArrayRef, DictionaryArray, Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use std::sync::Arc;

    /// Builds an Int32-keyed dictionary column over the fixed values `["a", "b", "c"]`.
    fn dict_column(keys: Vec<Option<i32>>) -> ArrayRef {
        let keys = Int32Array::from(keys);
        let values = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
        Arc::new(DictionaryArray::try_new(keys, values).unwrap())
    }

    /// Round-trips two dictionary-encoded batches through `batches_to_ipc` and
    /// checks that both come back with the original schema and column contents.
    #[test]
    fn test_batches_to_ipc_with_dictionary() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "dict_col",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            false,
        )]));

        let array1 = dict_column(vec![Some(0), Some(1), Some(0), Some(2)]);
        let batch1 = RecordBatch::try_new(schema.clone(), vec![array1.clone()]).unwrap();

        let array2 = dict_column(vec![Some(0), Some(0), Some(0), Some(2)]);
        let batch2 = RecordBatch::try_new(schema.clone(), vec![array2.clone()]).unwrap();

        let batches = vec![batch1, batch2];
        let reader = arrow::record_batch::RecordBatchIterator::new(
            batches.into_iter().map(Ok),
            schema.clone(),
        );

        let ipc_bytes = batches_to_ipc(reader).expect("Serialization failed");

        let cursor = std::io::Cursor::new(ipc_bytes);
        let reader =
            arrow::ipc::reader::FileReader::try_new(cursor, None).expect("Deserialization failed");
        let deserialized_batches: Vec<RecordBatch> = reader.collect::<Result<_, _>>().unwrap();

        assert_eq!(deserialized_batches.len(), 2);

        assert_eq!(deserialized_batches[0].schema(), schema);
        assert_eq!(deserialized_batches[0].column(0).as_ref(), array1.as_ref());

        // The second batch must survive the round trip as well, not just the first.
        assert_eq!(deserialized_batches[1].schema(), schema);
        assert_eq!(deserialized_batches[1].column(0).as_ref(), array2.as_ref());
    }
}