use crate::query::{ColumnInfo, QueryMetadata, QueryResult, QueryRow};
use crate::schema::CqlType;
use crate::types::{DataType, Value};
use crate::util::value_fmt::ValueFormatter;
use arrow::array::{
ArrayRef, BinaryArray, BooleanArray, Date32Array, Float32Array, Float64Array, Int16Array,
Int32Array, Int64Array, Int8Array, ListArray, MapArray, StringArray, StructArray,
Time64NanosecondArray, TimestampMillisecondArray,
};
use arrow::buffer::{NullBuffer, OffsetBuffer};
use arrow::datatypes::{DataType as ArrowDataType, Field, Fields, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::WriterProperties;
use std::collections::HashMap;
use std::fs::File;
use std::io::Write;
use std::sync::Arc;
use thiserror::Error;
/// Compression codec applied when writing Parquet output.
///
/// Defaults to [`ParquetCompression::Snappy`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ParquetCompression {
    /// Snappy compression (the default).
    #[default]
    Snappy,
    /// Zstandard compression at the `parquet` crate's default level.
    Zstd,
    /// No compression.
    Uncompressed,
}
impl ParquetCompression {
    /// Translates this option into the `parquet` crate's [`Compression`] codec.
    fn to_parquet(self) -> Compression {
        match self {
            Self::Uncompressed => Compression::UNCOMPRESSED,
            Self::Snappy => Compression::SNAPPY,
            Self::Zstd => Compression::ZSTD(ZstdLevel::default()),
        }
    }
}
/// Settings controlling how [`ParquetWriter::write`] exports a result set.
#[derive(Clone, Debug)]
pub struct ParquetExportOptions {
    /// Upper bound on the number of exported rows; `None` exports every row.
    pub row_limit: Option<usize>,
    /// Requested number of rows per Parquet row group (default 10 000).
    pub row_group_size: usize,
    /// Codec used for the written file (default Snappy).
    pub compression: ParquetCompression,
}
impl Default for ParquetExportOptions {
    /// No row limit, 10 000-row groups and the default (Snappy) codec.
    fn default() -> Self {
        let compression = ParquetCompression::default();
        Self {
            compression,
            row_group_size: 10_000,
            row_limit: None,
        }
    }
}
#[derive(Debug, Error)]
pub enum ParquetExportError {
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("Arrow error: {0}")]
Arrow(#[from] arrow::error::ArrowError),
#[error("Parquet error: {0}")]
Parquet(#[from] ::parquet::errors::ParquetError),
#[error("{0}")]
InvalidValue(String),
#[error("invalid Parquet export options: {0}")]
InvalidOptions(String),
}
/// Scale (digits after the decimal point) every exported CQL `decimal` is rescaled to.
const DECIMAL_FIXED_SCALE: i32 = 9;
/// Precision used for Arrow `Decimal128` columns (both `decimal` and `varint`).
const DECIMAL_MAX_PRECISION: u8 = 38;
/// Arrow field-metadata key naming an extension type.
const ARROW_EXTENSION_NAME_KEY: &'static str = "ARROW:extension:name";
/// Extension name attached to 16-byte UUID fields.
const ARROW_UUID_EXTENSION_NAME: &'static str = "arrow.uuid";
/// Converts an arbitrary-precision integer into an `i128`.
///
/// The big-endian two's-complement encoding of `n` is sign-extended into a
/// 16-byte buffer and reinterpreted as an `i128`.
///
/// # Errors
/// Returns [`ParquetExportError::InvalidValue`] when the encoding needs more
/// than 16 bytes.
fn bigint_to_i128(n: &num_bigint::BigInt) -> Result<i128, ParquetExportError> {
    let encoded = n.to_signed_bytes_be();
    let start = match 16usize.checked_sub(encoded.len()) {
        Some(start) => start,
        None => {
            return Err(ParquetExportError::InvalidValue(
                "BigInt value requires more than 16 bytes; cannot fit in i128".to_string(),
            ))
        }
    };
    // Sign-extend: negative values are padded with 0xFF bytes, others with 0x00.
    let fill = if n.sign() == num_bigint::Sign::Minus {
        u8::MAX
    } else {
        0
    };
    let mut word = [fill; 16];
    word[start..].copy_from_slice(&encoded);
    Ok(i128::from_be_bytes(word))
}
/// Stateless entry point that encodes a [`QueryResult`] as Parquet bytes held in memory.
pub struct ParquetWriter;
impl ParquetWriter {
/// Encodes `result` as a Parquet file and returns its bytes.
///
/// A result without column metadata yields a file with an empty schema.
/// Otherwise, at most `options.row_limit` rows (all rows when `None`) are
/// converted into a single record batch and written with
/// `options.compression`.
///
/// NOTE(review): `options.row_group_size` is not passed to `write_parquet`
/// here; confirm whether it is applied anywhere else.
///
/// # Errors
/// Propagates value-conversion and Arrow errors, plus any error returned by
/// `write_parquet`.
pub fn write(
    result: &QueryResult,
    options: &ParquetExportOptions,
) -> Result<Vec<u8>, ParquetExportError> {
    let columns = &result.metadata.columns;
    if columns.is_empty() {
        return Self::write_empty_parquet(options);
    }
    let schema = Self::build_schema(columns)?;
    let total_rows = result.rows.len();
    let kept_rows = options
        .row_limit
        .map_or(total_rows, |limit| limit.min(total_rows));
    let rows = &result.rows[..kept_rows];
    let arrays = Self::convert_to_arrays(columns, rows)?;
    let batch = RecordBatch::try_new(Arc::new(schema), arrays)?;
    Self::write_parquet(&batch, options.compression)
}
/// Writes a Parquet file whose schema has no columns and whose single batch has no rows.
fn write_empty_parquet(options: &ParquetExportOptions) -> Result<Vec<u8>, ParquetExportError> {
    let empty_batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
    Self::write_parquet(&empty_batch, options.compression)
}
/// Builds the Arrow schema with one field per column, preserving column order.
fn build_schema(columns: &[ColumnInfo]) -> Result<Schema, ParquetExportError> {
    let mut fields = Vec::with_capacity(columns.len());
    for column in columns {
        fields.push(Self::column_to_field(column));
    }
    Ok(Schema::new(fields))
}
/// Chooses the Arrow field for a column.
///
/// A CQL-specific field is used when the column declares a CQL type that has
/// one. Otherwise the generic `DataType` mapping is used.
fn column_to_field(col: &ColumnInfo) -> Field {
    let cql_field = col
        .cql_type
        .as_ref()
        .and_then(|cql_type| Self::cql_type_to_arrow_field(&col.name, cql_type, col.nullable));
    match cql_field {
        Some(field) => field,
        None => Field::new(
            &col.name,
            Self::data_type_to_arrow(&col.data_type),
            col.nullable,
        ),
    }
}
fn cql_type_to_arrow_field(name: &str, cql_type: &CqlType, nullable: bool) -> Option<Field> {
match cql_type {
CqlType::Date => Some(Field::new(name, ArrowDataType::Date32, nullable)),
CqlType::Time => Some(Field::new(
name,
ArrowDataType::Time64(TimeUnit::Nanosecond),
nullable,
)),
CqlType::Decimal => Some(Field::new(
name,
ArrowDataType::Decimal128(DECIMAL_MAX_PRECISION, DECIMAL_FIXED_SCALE as i8),
nullable,
)),
CqlType::Varint => {
Some(Field::new(
name,
ArrowDataType::Decimal128(DECIMAL_MAX_PRECISION, 0),
nullable,
))
}
CqlType::Duration => {
Some(Field::new(name, ArrowDataType::Utf8, nullable))
}
CqlType::Uuid | CqlType::TimeUuid => {
let mut meta = HashMap::new();
meta.insert(
ARROW_EXTENSION_NAME_KEY.to_string(),
ARROW_UUID_EXTENSION_NAME.to_string(),
);
Some(
Field::new(name, ArrowDataType::FixedSizeBinary(16), nullable)
.with_metadata(meta),
)
}
CqlType::Inet => Some(Field::new(name, ArrowDataType::Utf8, nullable)),
CqlType::Counter => Some(Field::new(name, ArrowDataType::Int64, nullable)),
CqlType::List(inner) | CqlType::Set(inner) => {
let item_type = Self::cql_type_to_arrow_data_type(inner);
let item_field = Arc::new(Field::new("item", item_type, true));
Some(Field::new(name, ArrowDataType::List(item_field), nullable))
}
CqlType::Frozen(inner) => Self::cql_type_to_arrow_field(name, inner, nullable),
CqlType::Map(key_type, val_type) => {
let key_arrow = Self::cql_type_to_arrow_data_type(key_type);
let val_arrow = Self::cql_type_to_arrow_data_type(val_type);
let entries_field = Arc::new(Field::new(
"entries",
ArrowDataType::Struct(Fields::from(vec![
Field::new("key", key_arrow, false),
Field::new("value", val_arrow, true),
])),
false,
));
Some(Field::new(
name,
ArrowDataType::Map(entries_field, false),
nullable,
))
}
CqlType::Tuple(element_types) => {
if element_types.is_empty() {
return Some(Field::new(name, ArrowDataType::Utf8, nullable));
}
let struct_type = Self::cql_type_to_arrow_data_type(cql_type);
Some(Field::new(name, struct_type, nullable))
}
CqlType::Udt(_udt_name, udt_fields) => {
if udt_fields.is_empty() {
return Some(Field::new(name, ArrowDataType::Utf8, nullable));
}
let struct_type = Self::cql_type_to_arrow_data_type(cql_type);
Some(Field::new(name, struct_type, nullable))
}
_ => None,
}
}
fn cql_type_to_arrow_data_type(cql_type: &CqlType) -> ArrowDataType {
match cql_type {
CqlType::Boolean => ArrowDataType::Boolean,
CqlType::TinyInt => ArrowDataType::Int8,
CqlType::SmallInt => ArrowDataType::Int16,
CqlType::Int => ArrowDataType::Int32,
CqlType::BigInt => ArrowDataType::Int64,
CqlType::Counter => ArrowDataType::Int64,
CqlType::Float => ArrowDataType::Float32,
CqlType::Double => ArrowDataType::Float64,
CqlType::Text | CqlType::Ascii | CqlType::Varchar => ArrowDataType::Utf8,
CqlType::Blob => ArrowDataType::Binary,
CqlType::Timestamp => {
ArrowDataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into()))
}
CqlType::Date => ArrowDataType::Date32,
CqlType::Time => ArrowDataType::Time64(TimeUnit::Nanosecond),
CqlType::Decimal => {
ArrowDataType::Decimal128(DECIMAL_MAX_PRECISION, DECIMAL_FIXED_SCALE as i8)
}
CqlType::Varint => ArrowDataType::Decimal128(DECIMAL_MAX_PRECISION, 0),
CqlType::Duration => ArrowDataType::Utf8,
CqlType::Uuid | CqlType::TimeUuid => ArrowDataType::FixedSizeBinary(16),
CqlType::Inet => ArrowDataType::Utf8,
CqlType::List(inner) | CqlType::Set(inner) => {
let item_type = Self::cql_type_to_arrow_data_type(inner);
ArrowDataType::List(Arc::new(Field::new("item", item_type, true)))
}
CqlType::Frozen(inner) => Self::cql_type_to_arrow_data_type(inner),
CqlType::Map(key_type, val_type) => {
let key_arrow = Self::cql_type_to_arrow_data_type(key_type);
let val_arrow = Self::cql_type_to_arrow_data_type(val_type);
ArrowDataType::Map(
Arc::new(Field::new(
"entries",
ArrowDataType::Struct(Fields::from(vec![
Field::new("key", key_arrow, false),
Field::new("value", val_arrow, true),
])),
false,
)),
false,
)
}
CqlType::Tuple(element_types) => {
if element_types.is_empty() {
return ArrowDataType::Utf8;
}
let struct_fields: Vec<Field> = element_types
.iter()
.enumerate()
.map(|(i, t)| {
Field::new(
format!("field_{i}"),
Self::cql_type_to_arrow_data_type(t),
true, )
})
.collect();
ArrowDataType::Struct(Fields::from(struct_fields))
}
CqlType::Udt(_udt_name, udt_fields) => {
if udt_fields.is_empty() {
return ArrowDataType::Utf8;
}
let struct_fields: Vec<Field> = udt_fields
.iter()
.map(|(field_name, field_type)| {
Field::new(
field_name.as_str(),
Self::cql_type_to_arrow_data_type(field_type),
true, )
})
.collect();
ArrowDataType::Struct(Fields::from(struct_fields))
}
CqlType::Custom(_) => ArrowDataType::Utf8,
}
}
/// Builds an Arrow array for a CQL-typed column (or collection element) from
/// one optional [`Value`] per slot.
///
/// Slot handling:
/// - The output always has exactly `values.len()` slots.
/// - `None`, `Value::Null` and `frozen<>`-wrapped values are unwrapped or turned into nulls.
/// - Scalar slots holding an unexpected `Value` variant are exported as null.
/// - Decimal, varint and UUID slots holding an unexpected variant are rejected.
///
/// Type handling:
/// - Collections, tuples and UDTs recurse into their element types.
/// - Empty tuples/UDTs and custom types are rendered as text through [`ValueFormatter`].
///
/// # Errors
/// Returns [`ParquetExportError::InvalidValue`] in these cases:
/// - a decimal or varint is outside the Decimal128(38, _) range;
/// - a decimal, varint or UUID slot holds an unexpected variant;
/// - a map contains a null key;
/// - a collection's flattened element count exceeds the `i32` offset range.
///
/// Arrow builder errors are propagated.
fn build_typed_value_array(
    cql_type: &CqlType,
    values: &[Option<&Value>],
) -> Result<ArrayRef, ParquetExportError> {
    let effective_type = Self::unwrap_frozen_type(cql_type);
    match effective_type {
        CqlType::Boolean => {
            let arr: Vec<Option<bool>> = Self::collect_scalar_values(values, |v| match v {
                Value::Boolean(b) => Some(*b),
                _ => None,
            });
            Ok(Arc::new(BooleanArray::from(arr)))
        }
        CqlType::TinyInt => {
            let arr: Vec<Option<i8>> = Self::collect_scalar_values(values, |v| match v {
                Value::TinyInt(i) => Some(*i),
                _ => None,
            });
            Ok(Arc::new(Int8Array::from(arr)))
        }
        CqlType::SmallInt => {
            let arr: Vec<Option<i16>> = Self::collect_scalar_values(values, |v| match v {
                Value::SmallInt(i) => Some(*i),
                _ => None,
            });
            Ok(Arc::new(Int16Array::from(arr)))
        }
        CqlType::Int => {
            let arr: Vec<Option<i32>> = Self::collect_scalar_values(values, |v| match v {
                Value::Integer(i) => Some(*i),
                _ => None,
            });
            Ok(Arc::new(Int32Array::from(arr)))
        }
        CqlType::BigInt => {
            let arr: Vec<Option<i64>> = Self::collect_scalar_values(values, |v| match v {
                Value::BigInt(i) => Some(*i),
                _ => None,
            });
            Ok(Arc::new(Int64Array::from(arr)))
        }
        CqlType::Counter => {
            let arr: Vec<Option<i64>> = Self::collect_scalar_values(values, |v| match v {
                Value::Counter(c) => Some(*c),
                Value::BigInt(i) => Some(*i),
                _ => None,
            });
            Ok(Arc::new(Int64Array::from(arr)))
        }
        CqlType::Float => {
            let arr: Vec<Option<f32>> = Self::collect_scalar_values(values, |v| match v {
                Value::Float32(f) => Some(*f),
                _ => None,
            });
            Ok(Arc::new(Float32Array::from(arr)))
        }
        CqlType::Double => {
            let arr: Vec<Option<f64>> = Self::collect_scalar_values(values, |v| match v {
                Value::Float(f) => Some(*f),
                Value::Float32(f) => Some(*f as f64),
                _ => None,
            });
            Ok(Arc::new(Float64Array::from(arr)))
        }
        CqlType::Text | CqlType::Ascii | CqlType::Varchar => {
            let arr: Vec<Option<String>> = Self::collect_scalar_values(values, |v| match v {
                Value::Text(s) => Some(s.clone()),
                _ => None,
            });
            Ok(Arc::new(StringArray::from(arr)))
        }
        CqlType::Blob => {
            let owned: Vec<Option<Vec<u8>>> = Self::collect_scalar_values(values, |v| match v {
                Value::Blob(b) => Some(b.clone()),
                _ => None,
            });
            let refs: Vec<Option<&[u8]>> = owned.iter().map(|o| o.as_deref()).collect();
            Ok(Arc::new(BinaryArray::from(refs)))
        }
        CqlType::Timestamp => {
            let arr: Vec<Option<i64>> = Self::collect_scalar_values(values, |v| match v {
                Value::Timestamp(ts) => Some(*ts),
                _ => None,
            });
            Ok(Arc::new(
                TimestampMillisecondArray::from(arr).with_timezone("UTC"),
            ))
        }
        CqlType::Date => {
            let arr: Vec<Option<i32>> = Self::collect_scalar_values(values, |v| match v {
                Value::Date(d) => Some(*d),
                _ => None,
            });
            Ok(Arc::new(Date32Array::from(arr)))
        }
        CqlType::Time => {
            let arr: Vec<Option<i64>> = Self::collect_scalar_values(values, |v| match v {
                Value::Time(t) => Some(*t),
                _ => None,
            });
            Ok(Arc::new(Time64NanosecondArray::from(arr)))
        }
        CqlType::Decimal => {
            let mut builder = arrow::array::Decimal128Builder::new()
                .with_precision_and_scale(DECIMAL_MAX_PRECISION, DECIMAL_FIXED_SCALE as i8)?;
            for opt in values {
                match Self::unwrap_frozen_value(*opt) {
                    Some(Value::Decimal { scale, unscaled }) => {
                        let rescaled = Self::rescale_decimal(*scale, unscaled)?;
                        builder.append_value(rescaled);
                    }
                    Some(Value::Null) | None => builder.append_null(),
                    Some(other) => {
                        return Err(ParquetExportError::InvalidValue(format!(
                            "expected Decimal value in element, got {:?}",
                            other
                        )));
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        CqlType::Varint => {
            use num_bigint::BigInt;
            let mut builder = arrow::array::Decimal128Builder::new()
                .with_precision_and_scale(DECIMAL_MAX_PRECISION, 0)?;
            // Largest magnitude that fits in Decimal128(38, 0); computed once per call.
            let max_abs = BigInt::from(10i64).pow(38u32) - BigInt::from(1i64);
            for opt in values {
                match Self::unwrap_frozen_value(*opt) {
                    Some(Value::Varint(bytes)) if bytes.is_empty() => builder.append_value(0),
                    Some(Value::Varint(bytes)) => {
                        let bigint = BigInt::from_signed_bytes_be(bytes);
                        let abs_val = if bigint.sign() == num_bigint::Sign::Minus {
                            -bigint.clone()
                        } else {
                            bigint.clone()
                        };
                        if abs_val > max_abs {
                            return Err(ParquetExportError::InvalidValue(
                                "varint element exceeds Decimal128(38, 0) range".to_string(),
                            ));
                        }
                        builder.append_value(bigint_to_i128(&bigint)?);
                    }
                    Some(Value::Null) | None => builder.append_null(),
                    Some(other) => {
                        return Err(ParquetExportError::InvalidValue(format!(
                            "expected Varint value in element, got {:?}",
                            other
                        )));
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        CqlType::Duration => {
            let arr: Vec<Option<String>> = Self::collect_scalar_values(values, |v| match v {
                Value::Duration { .. } => Some(ValueFormatter::format_value(v)),
                _ => None,
            });
            Ok(Arc::new(StringArray::from(arr)))
        }
        CqlType::Uuid | CqlType::TimeUuid => {
            let mut builder = arrow::array::FixedSizeBinaryBuilder::new(16);
            for opt in values {
                match Self::unwrap_frozen_value(*opt) {
                    Some(Value::Uuid(bytes)) => builder.append_value(bytes)?,
                    Some(Value::Null) | None => builder.append_null(),
                    Some(other) => {
                        return Err(ParquetExportError::InvalidValue(format!(
                            "expected Uuid value in element, got {:?}",
                            other
                        )));
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        CqlType::Inet => {
            let arr: Vec<Option<String>> = Self::collect_scalar_values(values, |v| match v {
                Value::Inet(_) => Some(ValueFormatter::format_value(v)),
                _ => None,
            });
            Ok(Arc::new(StringArray::from(arr)))
        }
        CqlType::List(inner) | CqlType::Set(inner) => {
            let element_type = Self::cql_type_to_arrow_data_type(inner);
            let item_field = Arc::new(Field::new("item", element_type, true));
            let mut offsets: Vec<i32> = Vec::with_capacity(values.len() + 1);
            offsets.push(0);
            let mut flat_elements: Vec<Option<&Value>> = Vec::new();
            let mut null_bitmap: Vec<bool> = Vec::with_capacity(values.len());
            for opt in values {
                match Self::unwrap_frozen_value(*opt) {
                    Some(Value::List(items)) | Some(Value::Set(items)) => {
                        null_bitmap.push(true);
                        for item in items {
                            flat_elements.push(Some(item));
                        }
                    }
                    // Nulls, missing cells and values of any other variant become a null list.
                    _ => null_bitmap.push(false),
                }
                offsets.push(Self::collection_offset(flat_elements.len())?);
            }
            let elements_array = Self::build_typed_value_array(inner, &flat_elements)?;
            let offset_buffer = OffsetBuffer::new(offsets.into());
            let null_buffer = NullBuffer::from(null_bitmap);
            Ok(Arc::new(ListArray::new(
                item_field,
                offset_buffer,
                elements_array,
                Some(null_buffer),
            )))
        }
        // Unreachable in practice: `effective_type` has every frozen<> layer removed.
        CqlType::Frozen(inner) => Self::build_typed_value_array(inner, values),
        CqlType::Map(key_type, val_type) => {
            let key_arrow = Self::cql_type_to_arrow_data_type(key_type);
            let val_arrow = Self::cql_type_to_arrow_data_type(val_type);
            let mut offsets: Vec<i32> = Vec::with_capacity(values.len() + 1);
            offsets.push(0);
            let mut flat_keys: Vec<Option<&Value>> = Vec::new();
            let mut flat_vals: Vec<Option<&Value>> = Vec::new();
            let mut null_bitmap: Vec<bool> = Vec::with_capacity(values.len());
            for opt in values {
                match Self::unwrap_frozen_value(*opt) {
                    Some(Value::Map(pairs)) => {
                        null_bitmap.push(true);
                        for (k, val) in pairs {
                            // The "key" child field is non-nullable in the Arrow map layout.
                            if matches!(k, Value::Null) {
                                return Err(ParquetExportError::InvalidValue(
                                    "null key in map is not allowed in Arrow MapArray"
                                        .to_string(),
                                ));
                            }
                            flat_keys.push(Some(k));
                            flat_vals.push(Some(val));
                        }
                    }
                    // Nulls, missing cells and values of any other variant become a null map.
                    _ => null_bitmap.push(false),
                }
                offsets.push(Self::collection_offset(flat_keys.len())?);
            }
            let key_array = Self::build_typed_value_array(key_type, &flat_keys)?;
            let val_array = Self::build_typed_value_array(val_type, &flat_vals)?;
            let struct_fields = Fields::from(vec![
                Field::new("key", key_arrow, false),
                Field::new("value", val_arrow, true),
            ]);
            let entries_array =
                StructArray::new(struct_fields.clone(), vec![key_array, val_array], None);
            let map_field = Arc::new(Field::new(
                "entries",
                ArrowDataType::Struct(struct_fields),
                false,
            ));
            let offset_buffer = OffsetBuffer::new(offsets.into());
            let null_buffer = NullBuffer::from(null_bitmap);
            Ok(Arc::new(MapArray::new(
                map_field,
                offset_buffer,
                entries_array,
                Some(null_buffer),
                false,
            )))
        }
        CqlType::Tuple(element_types) => {
            if element_types.is_empty() {
                return Ok(Self::stringify_values(values));
            }
            let n_fields = element_types.len();
            let unwrapped: Vec<Option<&Value>> = values
                .iter()
                .map(|opt| Self::unwrap_frozen_value(*opt))
                .collect();
            let null_bitmap: Vec<bool> = unwrapped
                .iter()
                .map(|v| !matches!(v, Some(Value::Null) | None))
                .collect();
            let null_sentinel = Value::Null;
            let mut child_arrays: Vec<ArrayRef> = Vec::with_capacity(n_fields);
            for (field_idx, element_type) in element_types.iter().enumerate() {
                // Missing tuple positions and null/non-tuple rows feed a null child slot.
                let child_values: Vec<Option<&Value>> = unwrapped
                    .iter()
                    .copied()
                    .map(|row| match row {
                        Some(Value::Tuple(items)) => Some(
                            items
                                .get(field_idx)
                                .map(|v| v as &Value)
                                .unwrap_or(&null_sentinel),
                        ),
                        _ => Some(&null_sentinel),
                    })
                    .collect();
                child_arrays.push(Self::build_typed_value_array(element_type, &child_values)?);
            }
            let struct_fields: Fields = Fields::from(
                element_types
                    .iter()
                    .enumerate()
                    .map(|(i, t)| {
                        Field::new(
                            format!("field_{i}"),
                            Self::cql_type_to_arrow_data_type(t),
                            true,
                        )
                    })
                    .collect::<Vec<_>>(),
            );
            let null_buffer = NullBuffer::from(null_bitmap);
            Ok(Arc::new(StructArray::new(
                struct_fields,
                child_arrays,
                Some(null_buffer),
            )))
        }
        CqlType::Udt(_udt_name, udt_fields) => {
            if udt_fields.is_empty() {
                return Ok(Self::stringify_values(values));
            }
            let unwrapped: Vec<Option<&Value>> = values
                .iter()
                .map(|opt| Self::unwrap_frozen_value(*opt))
                .collect();
            let null_bitmap: Vec<bool> = unwrapped
                .iter()
                .map(|v| !matches!(v, Some(Value::Null) | None))
                .collect();
            let null_sentinel = Value::Null;
            let mut child_arrays: Vec<ArrayRef> = Vec::with_capacity(udt_fields.len());
            for (field_name, field_type) in udt_fields.iter() {
                // Absent or unset UDT fields and null/non-UDT rows feed a null child slot.
                let child_values: Vec<Option<&Value>> = unwrapped
                    .iter()
                    .copied()
                    .map(|row| match row {
                        Some(Value::Udt(udt_val)) => Some(
                            udt_val
                                .fields
                                .iter()
                                .find(|f| &f.name == field_name)
                                .and_then(|f| f.value.as_ref().map(|v| v as &Value))
                                .unwrap_or(&null_sentinel),
                        ),
                        _ => Some(&null_sentinel),
                    })
                    .collect();
                child_arrays.push(Self::build_typed_value_array(field_type, &child_values)?);
            }
            let struct_fields: Fields = Fields::from(
                udt_fields
                    .iter()
                    .map(|(field_name, field_type)| {
                        Field::new(
                            field_name.as_str(),
                            Self::cql_type_to_arrow_data_type(field_type),
                            true,
                        )
                    })
                    .collect::<Vec<_>>(),
            );
            let null_buffer = NullBuffer::from(null_bitmap);
            Ok(Arc::new(StructArray::new(
                struct_fields,
                child_arrays,
                Some(null_buffer),
            )))
        }
        CqlType::Custom(_) => Ok(Self::stringify_values(values)),
    }
}
/// Maps every slot to `Option<T>`, preserving length.
///
/// Frozen wrappers are removed. `None` slots and slots for which `extract`
/// returns `None` become null.
fn collect_scalar_values<T>(
    values: &[Option<&Value>],
    extract: impl Fn(&Value) -> Option<T>,
) -> Vec<Option<T>> {
    values
        .iter()
        .map(|opt| Self::unwrap_frozen_value(*opt).and_then(&extract))
        .collect()
}
/// Converts a flattened child count into an Arrow `i32` list/map offset.
fn collection_offset(len: usize) -> Result<i32, ParquetExportError> {
    i32::try_from(len).map_err(|_| {
        ParquetExportError::InvalidValue(
            "collection elements exceed the i32 offset range of Arrow List/Map arrays"
                .to_string(),
        )
    })
}
/// Renders each non-null slot as text via `ValueFormatter`; `None`/`Value::Null` become null.
fn stringify_values(values: &[Option<&Value>]) -> ArrayRef {
    let arr: Vec<Option<String>> = values
        .iter()
        .map(|opt| match opt {
            Some(Value::Null) | None => None,
            Some(v) => Some(ValueFormatter::format_value(v)),
        })
        .collect();
    Arc::new(StringArray::from(arr))
}
fn unwrap_frozen_type(cql_type: &CqlType) -> &CqlType {
let mut t = cql_type;
while let CqlType::Frozen(inner) = t {
t = inner.as_ref();
}
t
}
/// Strips every `Value::Frozen` wrapper from an optional value.
///
/// This mirrors `unwrap_frozen_type`, which peels all `frozen<>` layers from
/// the declared type. If only one layer were removed here, a nested
/// `Frozen(Frozen(..))` value would not match the type-directed arms of the
/// array builders and would be exported as null.
fn unwrap_frozen_value(v: Option<&Value>) -> Option<&Value> {
    let mut current = v;
    while let Some(Value::Frozen(inner)) = current {
        current = Some(inner.as_ref());
    }
    current
}
fn data_type_to_arrow(data_type: &DataType) -> ArrowDataType {
match data_type {
DataType::Null => ArrowDataType::Null,
DataType::Boolean => ArrowDataType::Boolean,
DataType::TinyInt => ArrowDataType::Int8,
DataType::SmallInt => ArrowDataType::Int16,
DataType::Integer => ArrowDataType::Int32,
DataType::BigInt => ArrowDataType::Int64,
DataType::Float32 => ArrowDataType::Float32,
DataType::Float => ArrowDataType::Float64,
DataType::Text => ArrowDataType::Utf8,
DataType::Blob => ArrowDataType::Binary,
DataType::Timestamp => {
ArrowDataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into()))
}
DataType::Uuid => ArrowDataType::FixedSizeBinary(16),
DataType::Json => ArrowDataType::Utf8,
DataType::List => {
ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::Utf8, true)))
}
DataType::Set => {
ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::Utf8, true)))
}
DataType::Map => ArrowDataType::Map(
Arc::new(Field::new(
"entries",
ArrowDataType::Struct(Fields::from(vec![
Field::new("key", ArrowDataType::Utf8, false),
Field::new("value", ArrowDataType::Utf8, true),
])),
false,
)),
false,
),
DataType::Tuple => ArrowDataType::Utf8, DataType::Udt => ArrowDataType::Utf8, DataType::Frozen => ArrowDataType::Utf8,
DataType::Tombstone => ArrowDataType::Utf8,
}
}
/// Converts every column to an Arrow array, in column order.
///
/// Stops at the first column that fails to convert and returns its error.
fn convert_to_arrays(
    columns: &[ColumnInfo],
    rows: &[crate::query::QueryRow],
) -> Result<Vec<ArrayRef>, ParquetExportError> {
    let mut arrays = Vec::with_capacity(columns.len());
    for column in columns {
        arrays.push(Self::convert_column_to_array(column, rows)?);
    }
    Ok(arrays)
}
/// Builds the Arrow array for one column across all `rows`.
///
/// If the column declares a CQL type, that type is inspected with its
/// frozen<> layers removed. When it has a dedicated encoder, that encoder is
/// used. Otherwise the column's generic `DataType` picks the builder.
fn convert_column_to_array(
    col: &ColumnInfo,
    rows: &[crate::query::QueryRow],
) -> Result<ArrayRef, ParquetExportError> {
    if let Some(declared) = &col.cql_type {
        let specialised: Option<Result<ArrayRef, ParquetExportError>> =
            match Self::unwrap_frozen_type(declared) {
                CqlType::Date => Some(Self::build_date32_array(col, rows)),
                CqlType::Time => Some(Self::build_time64_ns_array(col, rows)),
                CqlType::Decimal => Some(Self::build_decimal128_array(col, rows)),
                CqlType::Varint => Some(Self::build_varint_as_decimal128_array(col, rows)),
                CqlType::Duration => Some(Self::build_duration_utf8_array(col, rows)),
                CqlType::Uuid | CqlType::TimeUuid => {
                    Some(Self::build_uuid_fixed_binary_array(col, rows))
                }
                CqlType::Inet => Some(Self::build_inet_utf8_array(col, rows)),
                CqlType::Counter => Some(Self::build_int64_array(col, rows)),
                CqlType::List(_)
                | CqlType::Set(_)
                | CqlType::Map(_, _)
                | CqlType::Tuple(_)
                | CqlType::Udt(_, _) => {
                    let cells: Vec<Option<&Value>> =
                        rows.iter().map(|row| row.values.get(&col.name)).collect();
                    // The declared (possibly frozen) type is passed through; the builder unwraps it.
                    Some(Self::build_typed_value_array(declared, &cells))
                }
                _ => None,
            };
        if let Some(array) = specialised {
            return array;
        }
    }
    match &col.data_type {
        DataType::Boolean => Self::build_boolean_array(col, rows),
        DataType::TinyInt => Self::build_int8_array(col, rows),
        DataType::SmallInt => Self::build_int16_array(col, rows),
        DataType::Integer => Self::build_int32_array(col, rows),
        DataType::BigInt => Self::build_int64_array(col, rows),
        DataType::Float32 => Self::build_float32_array(col, rows),
        DataType::Float => Self::build_float64_array(col, rows),
        DataType::Text | DataType::Json => Self::build_string_array(col, rows),
        DataType::Blob => Self::build_binary_array(col, rows),
        DataType::Timestamp => Self::build_timestamp_array(col, rows),
        DataType::Uuid => Self::build_uuid_array(col, rows),
        DataType::List | DataType::Set => Self::build_list_array(col, rows),
        DataType::Map => Self::build_map_array(col, rows),
        // No dedicated encoder: export the textual rendering of each value.
        DataType::Tuple
        | DataType::Udt
        | DataType::Frozen
        | DataType::Tombstone
        | DataType::Null => Self::build_string_array(col, rows),
    }
}
/// Builds a Boolean column; missing, null or non-boolean cells become null.
fn build_boolean_array(
    col: &ColumnInfo,
    rows: &[crate::query::QueryRow],
) -> Result<ArrayRef, ParquetExportError> {
    let mut flags: Vec<Option<bool>> = Vec::with_capacity(rows.len());
    for row in rows {
        flags.push(match row.values.get(&col.name) {
            Some(Value::Boolean(flag)) => Some(*flag),
            _ => None,
        });
    }
    Ok(Arc::new(BooleanArray::from(flags)))
}
/// Builds an Int8 column from `Value::TinyInt`; every other cell becomes null.
fn build_int8_array(
    col: &ColumnInfo,
    rows: &[crate::query::QueryRow],
) -> Result<ArrayRef, ParquetExportError> {
    let mut numbers: Vec<Option<i8>> = Vec::with_capacity(rows.len());
    for row in rows {
        numbers.push(match row.values.get(&col.name) {
            Some(Value::TinyInt(n)) => Some(*n),
            _ => None,
        });
    }
    Ok(Arc::new(Int8Array::from(numbers)))
}
/// Builds an Int16 column from `Value::SmallInt`; every other cell becomes null.
fn build_int16_array(
    col: &ColumnInfo,
    rows: &[crate::query::QueryRow],
) -> Result<ArrayRef, ParquetExportError> {
    let mut numbers: Vec<Option<i16>> = Vec::with_capacity(rows.len());
    for row in rows {
        numbers.push(match row.values.get(&col.name) {
            Some(Value::SmallInt(n)) => Some(*n),
            _ => None,
        });
    }
    Ok(Arc::new(Int16Array::from(numbers)))
}
/// Builds an Int32 column.
///
/// `Value::Integer` and `Value::Date` (as raw day counts) are accepted;
/// every other cell becomes null.
fn build_int32_array(
    col: &ColumnInfo,
    rows: &[crate::query::QueryRow],
) -> Result<ArrayRef, ParquetExportError> {
    let mut numbers: Vec<Option<i32>> = Vec::with_capacity(rows.len());
    for row in rows {
        numbers.push(match row.values.get(&col.name) {
            Some(Value::Integer(n) | Value::Date(n)) => Some(*n),
            _ => None,
        });
    }
    Ok(Arc::new(Int32Array::from(numbers)))
}
/// Builds an Int64 column.
///
/// `Value::BigInt`, `Value::Counter` and `Value::Time` (as its raw `i64`)
/// are accepted; every other cell becomes null.
fn build_int64_array(
    col: &ColumnInfo,
    rows: &[crate::query::QueryRow],
) -> Result<ArrayRef, ParquetExportError> {
    let mut numbers: Vec<Option<i64>> = Vec::with_capacity(rows.len());
    for row in rows {
        numbers.push(match row.values.get(&col.name) {
            Some(Value::BigInt(n) | Value::Counter(n) | Value::Time(n)) => Some(*n),
            _ => None,
        });
    }
    Ok(Arc::new(Int64Array::from(numbers)))
}
/// Builds a Float32 column from `Value::Float32`; every other cell becomes null.
fn build_float32_array(
    col: &ColumnInfo,
    rows: &[crate::query::QueryRow],
) -> Result<ArrayRef, ParquetExportError> {
    let mut floats: Vec<Option<f32>> = Vec::with_capacity(rows.len());
    for row in rows {
        floats.push(match row.values.get(&col.name) {
            Some(Value::Float32(x)) => Some(*x),
            _ => None,
        });
    }
    Ok(Arc::new(Float32Array::from(floats)))
}
/// Builds a Float64 column.
///
/// `Value::Float` is used as-is and `Value::Float32` is widened losslessly;
/// every other cell becomes null.
fn build_float64_array(
    col: &ColumnInfo,
    rows: &[crate::query::QueryRow],
) -> Result<ArrayRef, ParquetExportError> {
    let mut floats: Vec<Option<f64>> = Vec::with_capacity(rows.len());
    for row in rows {
        floats.push(match row.values.get(&col.name) {
            Some(Value::Float(x)) => Some(*x),
            Some(Value::Float32(x)) => Some(f64::from(*x)),
            _ => None,
        });
    }
    Ok(Arc::new(Float64Array::from(floats)))
}
/// Builds a Utf8 column.
///
/// Text is copied verbatim, JSON is serialised with `to_string`, and any
/// other non-null value is rendered through `ValueFormatter`. Missing and
/// null cells become null.
fn build_string_array(
    col: &ColumnInfo,
    rows: &[crate::query::QueryRow],
) -> Result<ArrayRef, ParquetExportError> {
    let mut strings: Vec<Option<String>> = Vec::with_capacity(rows.len());
    for row in rows {
        let cell = match row.values.get(&col.name) {
            None | Some(Value::Null) => None,
            Some(Value::Text(text)) => Some(text.clone()),
            Some(Value::Json(json)) => Some(json.to_string()),
            Some(other) => Some(ValueFormatter::format_value(other)),
        };
        strings.push(cell);
    }
    Ok(Arc::new(StringArray::from(strings)))
}
/// Builds a Binary column from `Value::Blob` (borrowed, not copied); other cells become null.
fn build_binary_array(
    col: &ColumnInfo,
    rows: &[crate::query::QueryRow],
) -> Result<ArrayRef, ParquetExportError> {
    let mut blobs: Vec<Option<&[u8]>> = Vec::with_capacity(rows.len());
    for row in rows {
        blobs.push(match row.values.get(&col.name) {
            Some(Value::Blob(bytes)) => Some(bytes.as_slice()),
            _ => None,
        });
    }
    Ok(Arc::new(BinaryArray::from(blobs)))
}
/// Builds a millisecond Timestamp column tagged with the "UTC" timezone.
///
/// Only `Value::Timestamp` cells are kept; every other cell becomes null.
fn build_timestamp_array(
    col: &ColumnInfo,
    rows: &[crate::query::QueryRow],
) -> Result<ArrayRef, ParquetExportError> {
    let mut millis: Vec<Option<i64>> = Vec::with_capacity(rows.len());
    for row in rows {
        millis.push(match row.values.get(&col.name) {
            Some(Value::Timestamp(ts)) => Some(*ts),
            _ => None,
        });
    }
    let array = TimestampMillisecondArray::from(millis).with_timezone("UTC");
    Ok(Arc::new(array))
}
/// Builds a FixedSizeBinary(16) column from `Value::Uuid`; every other cell becomes null.
fn build_uuid_array(
    col: &ColumnInfo,
    rows: &[crate::query::QueryRow],
) -> Result<ArrayRef, ParquetExportError> {
    let mut builder = arrow::array::FixedSizeBinaryBuilder::new(16);
    for row in rows {
        match row.values.get(&col.name) {
            Some(Value::Uuid(uuid)) => builder.append_value(uuid)?,
            _ => builder.append_null(),
        }
    }
    Ok(Arc::new(builder.finish()))
}
/// Builds a Date32 column from `Value::Date` day counts; every other cell becomes null.
fn build_date32_array(
    col: &ColumnInfo,
    rows: &[crate::query::QueryRow],
) -> Result<ArrayRef, ParquetExportError> {
    let mut days: Vec<Option<i32>> = Vec::with_capacity(rows.len());
    for row in rows {
        days.push(match row.values.get(&col.name) {
            Some(Value::Date(day)) => Some(*day),
            _ => None,
        });
    }
    Ok(Arc::new(Date32Array::from(days)))
}
/// Builds a nanosecond Time64 column from `Value::Time`; every other cell becomes null.
fn build_time64_ns_array(
    col: &ColumnInfo,
    rows: &[crate::query::QueryRow],
) -> Result<ArrayRef, ParquetExportError> {
    let mut nanos: Vec<Option<i64>> = Vec::with_capacity(rows.len());
    for row in rows {
        nanos.push(match row.values.get(&col.name) {
            Some(Value::Time(ns)) => Some(*ns),
            _ => None,
        });
    }
    Ok(Arc::new(Time64NanosecondArray::from(nanos)))
}
fn rescale_decimal(scale: i32, unscaled: &[u8]) -> Result<i128, ParquetExportError> {
use num_bigint::BigInt;
if unscaled.is_empty() {
return Ok(0i128);
}
let bigint = BigInt::from_signed_bytes_be(unscaled);
let delta = DECIMAL_FIXED_SCALE - scale;
let rescaled = if delta == 0 {
bigint
} else if delta > 0 {
let factor = BigInt::from(10i64).pow(delta as u32);
bigint * factor
} else {
let factor = BigInt::from(10i64).pow((-delta) as u32);
bigint / factor
};
let max_abs = BigInt::from(10i64).pow(38u32) - BigInt::from(1i64);
let abs_rescaled = if rescaled.sign() == num_bigint::Sign::Minus {
-rescaled.clone()
} else {
rescaled.clone()
};
if abs_rescaled > max_abs {
return Err(ParquetExportError::InvalidValue(format!(
"Decimal value exceeds Decimal128(38, {DECIMAL_FIXED_SCALE}) range after rescaling"
)));
}
bigint_to_i128(&rescaled)
}
/// Builds a Decimal128(38, `DECIMAL_FIXED_SCALE`) column.
///
/// Each `Value::Decimal` is rescaled to the fixed export scale. Missing and
/// null cells become null.
///
/// # Errors
/// Returns [`ParquetExportError::InvalidValue`], prefixed with the column
/// name, when a value is out of range or is not a decimal.
fn build_decimal128_array(
    col: &ColumnInfo,
    rows: &[crate::query::QueryRow],
) -> Result<ArrayRef, ParquetExportError> {
    let mut builder = arrow::array::Decimal128Builder::new()
        .with_precision_and_scale(DECIMAL_MAX_PRECISION, DECIMAL_FIXED_SCALE as i8)?;
    for row in rows {
        let cell = row.values.get(&col.name);
        if let Some(Value::Decimal { scale, unscaled }) = cell {
            let value = Self::rescale_decimal(*scale, unscaled).map_err(|e| {
                ParquetExportError::InvalidValue(format!("Column '{}': {e}", col.name))
            })?;
            builder.append_value(value);
        } else if let Some(other) = cell.filter(|v| !matches!(v, Value::Null)) {
            return Err(ParquetExportError::InvalidValue(format!(
                "Column '{}': expected Decimal value, got {:?}",
                col.name, other
            )));
        } else {
            builder.append_null();
        }
    }
    Ok(Arc::new(builder.finish()))
}
/// Builds a Decimal128(38, 0) column from `Value::Varint` big-endian
/// two's-complement bytes.
///
/// An empty byte string is exported as 0. Missing and null cells become null.
///
/// # Errors
/// Returns [`ParquetExportError::InvalidValue`], prefixed with the column
/// name, when a varint needs more than 38 digits or a cell is not a varint.
fn build_varint_as_decimal128_array(
    col: &ColumnInfo,
    rows: &[crate::query::QueryRow],
) -> Result<ArrayRef, ParquetExportError> {
    use num_bigint::BigInt;
    let mut builder = arrow::array::Decimal128Builder::new()
        .with_precision_and_scale(DECIMAL_MAX_PRECISION, 0)?;
    // Largest magnitude representable with 38 decimal digits; identical for every row.
    let limit = BigInt::from(10i64).pow(38u32) - BigInt::from(1i64);
    for row in rows {
        let bytes = match row.values.get(&col.name) {
            Some(Value::Varint(bytes)) => bytes,
            Some(Value::Null) | None => {
                builder.append_null();
                continue;
            }
            Some(other) => {
                return Err(ParquetExportError::InvalidValue(format!(
                    "Column '{}': expected Varint value, got {:?}",
                    col.name, other
                )));
            }
        };
        if bytes.is_empty() {
            builder.append_value(0);
            continue;
        }
        let number = BigInt::from_signed_bytes_be(bytes);
        let magnitude = if number.sign() == num_bigint::Sign::Minus {
            -number.clone()
        } else {
            number.clone()
        };
        if magnitude > limit {
            return Err(ParquetExportError::InvalidValue(format!(
                "Column '{}': varint value exceeds Decimal128(38, 0) range",
                col.name
            )));
        }
        let value = bigint_to_i128(&number).map_err(|e| {
            ParquetExportError::InvalidValue(format!("Column '{}': {e}", col.name))
        })?;
        builder.append_value(value);
    }
    Ok(Arc::new(builder.finish()))
}
/// Builds a `Utf8` array holding the `ValueFormatter` rendering of each
/// `Value::Duration` in `col`.
///
/// Cells that are `Null`, missing, or hold any other variant become nulls.
/// This builder never returns an error.
fn build_duration_utf8_array(
    col: &ColumnInfo,
    rows: &[crate::query::QueryRow],
) -> Result<ArrayRef, ParquetExportError> {
    let rendered: Vec<Option<String>> = rows
        .iter()
        .map(|row| {
            row.values
                .get(&col.name)
                .filter(|value| matches!(value, Value::Duration { .. }))
                .map(ValueFormatter::format_value)
        })
        .collect();
    Ok(Arc::new(StringArray::from(rendered)))
}
/// Builds a `FixedSizeBinary(16)` array from the `Value::Uuid` cells of `col`.
///
/// `Null` and missing values become nulls.
///
/// # Errors
///
/// Propagates builder errors from `append_value`. Returns
/// [`ParquetExportError::InvalidValue`] if a non-null value is not a `Value::Uuid`.
fn build_uuid_fixed_binary_array(
    col: &ColumnInfo,
    rows: &[crate::query::QueryRow],
) -> Result<ArrayRef, ParquetExportError> {
    const UUID_BYTE_WIDTH: i32 = 16;
    let mut uuids = arrow::array::FixedSizeBinaryBuilder::new(UUID_BYTE_WIDTH);
    for cell in rows.iter().map(|row| row.values.get(&col.name)) {
        match cell {
            None | Some(Value::Null) => uuids.append_null(),
            Some(Value::Uuid(raw)) => uuids.append_value(raw)?,
            Some(unexpected) => {
                return Err(ParquetExportError::InvalidValue(format!(
                    "Column '{}': expected Uuid value, got {:?}",
                    col.name, unexpected
                )));
            }
        }
    }
    Ok(Arc::new(uuids.finish()))
}
/// Builds a `Utf8` array holding the `ValueFormatter` rendering of each
/// `Value::Inet` in `col`.
///
/// Cells that are `Null`, missing, or hold any other variant become nulls.
/// This builder never returns an error.
fn build_inet_utf8_array(
    col: &ColumnInfo,
    rows: &[crate::query::QueryRow],
) -> Result<ArrayRef, ParquetExportError> {
    let values: Vec<Option<String>> = rows
        .iter()
        .map(|row| {
            row.values.get(&col.name).and_then(|value| match value {
                // Format the borrowed value directly. Rebuilding a Value::Inet
                // from cloned bytes only cost an allocation per row.
                Value::Inet(_) => Some(ValueFormatter::format_value(value)),
                _ => None,
            })
        })
        .collect();
    Ok(Arc::new(StringArray::from(values)))
}
fn build_list_array(
col: &ColumnInfo,
rows: &[crate::query::QueryRow],
) -> Result<ArrayRef, ParquetExportError> {
let mut offsets: Vec<i32> = vec![0];
let mut values: Vec<Option<String>> = Vec::new();
let mut null_bitmap: Vec<bool> = Vec::new();
for row in rows {
match row.values.get(&col.name) {
Some(Value::List(items)) | Some(Value::Set(items)) => {
null_bitmap.push(true);
for item in items {
values.push(Some(ValueFormatter::format_value(item)));
}
offsets.push(values.len() as i32);
}
Some(Value::Null) | None => {
null_bitmap.push(false);
offsets.push(values.len() as i32);
}
_ => {
null_bitmap.push(false);
offsets.push(values.len() as i32);
}
}
}
let values_array = Arc::new(StringArray::from(values)) as ArrayRef;
let field = Arc::new(Field::new("item", ArrowDataType::Utf8, true));
let offset_buffer = OffsetBuffer::new(offsets.into());
let null_buffer = NullBuffer::from(null_bitmap);
Ok(Arc::new(ListArray::new(
field,
offset_buffer,
values_array,
Some(null_buffer),
)))
}
fn build_map_array(
col: &ColumnInfo,
rows: &[crate::query::QueryRow],
) -> Result<ArrayRef, ParquetExportError> {
let mut offsets: Vec<i32> = vec![0];
let mut keys: Vec<Option<String>> = Vec::new();
let mut values: Vec<Option<String>> = Vec::new();
let mut null_bitmap: Vec<bool> = Vec::new();
for row in rows {
match row.values.get(&col.name) {
Some(Value::Map(pairs)) => {
null_bitmap.push(true);
for (k, v) in pairs {
keys.push(Some(ValueFormatter::format_value(k)));
values.push(Some(ValueFormatter::format_value(v)));
}
offsets.push(keys.len() as i32);
}
Some(Value::Null) | None => {
null_bitmap.push(false);
offsets.push(keys.len() as i32);
}
_ => {
null_bitmap.push(false);
offsets.push(keys.len() as i32);
}
}
}
let key_array = Arc::new(StringArray::from(keys)) as ArrayRef;
let value_array = Arc::new(StringArray::from(values)) as ArrayRef;
let struct_fields = Fields::from(vec![
Field::new("key", ArrowDataType::Utf8, false),
Field::new("value", ArrowDataType::Utf8, true),
]);
let entries_array =
StructArray::new(struct_fields.clone(), vec![key_array, value_array], None);
let map_field = Arc::new(Field::new(
"entries",
ArrowDataType::Struct(struct_fields),
false,
));
let offset_buffer = OffsetBuffer::new(offsets.into());
let null_buffer = NullBuffer::from(null_bitmap);
Ok(Arc::new(MapArray::new(
map_field,
offset_buffer,
entries_array,
Some(null_buffer),
false,
)))
}
/// Serializes a single `RecordBatch` into an in-memory Parquet file.
///
/// Only the compression codec is configured here; every other writer
/// property uses the parquet crate's defaults.
///
/// # Errors
///
/// Propagates any error from creating, writing to, or closing the `ArrowWriter`.
fn write_parquet(
    batch: &RecordBatch,
    compression: ParquetCompression,
) -> Result<Vec<u8>, ParquetExportError> {
    let properties = WriterProperties::builder()
        .set_compression(compression.to_parquet())
        .build();
    let mut encoded: Vec<u8> = Vec::new();
    {
        // The writer mutably borrows `encoded`; closing it inside this scope
        // releases the borrow before the bytes are returned.
        let mut arrow_writer =
            ArrowWriter::try_new(&mut encoded, batch.schema(), Some(properties))?;
        arrow_writer.write(batch)?;
        arrow_writer.close()?;
    }
    Ok(encoded)
}
}
/// Incremental Parquet writer for query results that arrive in chunks.
///
/// Rows are buffered and handed to the wrapped [`ArrowWriter`] in row groups
/// of a fixed size.
pub struct StreamingParquetWriter<W>
where
    W: Write + Send,
{
    /// Arrow schema shared by every batch written.
    schema: Arc<Schema>,
    /// Column descriptions used to convert buffered rows into Arrow arrays.
    columns: Vec<ColumnInfo>,
    /// Number of buffered rows that triggers writing a row group.
    row_group_size: usize,
    /// Underlying Parquet writer; taken (set to `None`) when the writer is finalized.
    writer: Option<ArrowWriter<W>>,
    /// Rows accepted but not yet converted and written.
    row_buffer: Vec<QueryRow>,
    /// Running count of rows accepted by the writer.
    rows_written: u64,
}
impl<W: Write + Send> StreamingParquetWriter<W> {
    /// Creates a writer that streams query rows into `output` as Parquet.
    ///
    /// The Arrow schema is derived once from `metadata.columns`. Rows are
    /// buffered until `options.row_group_size` of them are available, then
    /// converted and handed to the underlying [`ArrowWriter`]. That writer is
    /// also configured with `options.compression`.
    ///
    /// `options.row_limit` is not consulted by this writer.
    ///
    /// # Errors
    ///
    /// * [`ParquetExportError::InvalidOptions`] if `options.row_group_size` is 0.
    /// * Any error from schema construction or from creating the `ArrowWriter`.
    pub fn new(
        output: W,
        metadata: &QueryMetadata,
        options: &ParquetExportOptions,
    ) -> Result<Self, ParquetExportError> {
        // Cap on rows reserved up front. A huge row_group_size should not turn
        // into a huge (or overflowing) allocation before any row arrives; the
        // buffer still grows on demand past this.
        const MAX_PREALLOCATED_ROWS: usize = 65_536;
        if options.row_group_size == 0 {
            return Err(ParquetExportError::InvalidOptions(
                "row_group_size must be greater than 0".to_string(),
            ));
        }
        let schema = Arc::new(ParquetWriter::build_schema(&metadata.columns)?);
        let props = WriterProperties::builder()
            .set_compression(options.compression.to_parquet())
            .set_max_row_group_size(options.row_group_size)
            .build();
        let arrow_writer = ArrowWriter::try_new(output, Arc::clone(&schema), Some(props))?;
        Ok(Self {
            writer: Some(arrow_writer),
            schema,
            columns: metadata.columns.clone(),
            row_buffer: Vec::with_capacity(options.row_group_size.min(MAX_PREALLOCATED_ROWS)),
            row_group_size: options.row_group_size,
            rows_written: 0,
        })
    }

    /// Buffers `rows` and writes out every complete row group now available.
    ///
    /// Returns the number of rows this call flushed to the underlying writer,
    /// which is a multiple of the row-group size. Leftover rows stay buffered
    /// until a later call or [`finalize`](Self::finalize).
    ///
    /// # Errors
    ///
    /// Returns [`ParquetExportError::InvalidOptions`] if the writer has already
    /// been finalized. Also propagates conversion and write errors from flushing.
    pub fn write_chunk(&mut self, rows: &[QueryRow]) -> Result<usize, ParquetExportError> {
        // Reject before buffering. Once finalized, nothing flushes the buffer,
        // so rows accepted here would never reach the output.
        if self.writer.is_none() {
            return Err(Self::finalized_error());
        }
        self.row_buffer.extend_from_slice(rows);
        self.rows_written += rows.len() as u64;
        let mut flushed = 0;
        while self.row_buffer.len() >= self.row_group_size {
            let chunk: Vec<QueryRow> = self.row_buffer.drain(..self.row_group_size).collect();
            self.write_row_group(&chunk)?;
            flushed += chunk.len();
        }
        Ok(flushed)
    }

    /// Writes any buffered rows and closes the underlying [`ArrowWriter`].
    ///
    /// `ArrowWriter::close` is what completes the Parquet file. A second call
    /// after a successful finalize does nothing and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Propagates conversion, write, and close errors.
    pub fn finalize(&mut self) -> Result<(), ParquetExportError> {
        if !self.row_buffer.is_empty() {
            let remaining = std::mem::take(&mut self.row_buffer);
            self.write_row_group(&remaining)?;
        }
        if let Some(writer) = self.writer.take() {
            writer.close()?;
        }
        Ok(())
    }

    /// Total rows accepted by [`write_chunk`](Self::write_chunk), including
    /// rows that are still buffered.
    pub fn rows_written(&self) -> u64 {
        self.rows_written
    }

    /// Converts `rows` into one `RecordBatch` and writes it to the underlying writer.
    fn write_row_group(&mut self, rows: &[QueryRow]) -> Result<(), ParquetExportError> {
        let writer = self.writer.as_mut().ok_or_else(Self::finalized_error)?;
        let arrays = ParquetWriter::convert_to_arrays(&self.columns, rows)?;
        let batch = RecordBatch::try_new(Arc::clone(&self.schema), arrays)?;
        writer.write(&batch)?;
        Ok(())
    }

    /// Error reported when rows are written after [`finalize`](Self::finalize).
    fn finalized_error() -> ParquetExportError {
        ParquetExportError::InvalidOptions(
            "writer already finalized - cannot write more rows".to_string(),
        )
    }
}
/// Convenience constructor for a [`StreamingParquetWriter`] that writes into an open [`File`].
///
/// # Errors
///
/// Exactly the errors of [`StreamingParquetWriter::new`].
pub fn create_streaming_parquet_writer(
    file: File,
    metadata: &QueryMetadata,
    options: &ParquetExportOptions,
) -> Result<StreamingParquetWriter<File>, ParquetExportError> {
    let streaming = StreamingParquetWriter::<File>::new(file, metadata, options)?;
    Ok(streaming)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::query::{ColumnInfo, QueryRow};
    use crate::{RowKey, Value};
    use arrow::array::{Array, FixedSizeBinaryArray};
    use bytes::Bytes;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use std::collections::HashMap;
    use std::error::Error as StdError;

    fn default_options() -> ParquetExportOptions {
        ParquetExportOptions::default()
    }

    /// Decodes `bytes` as Parquet and returns the first record batch.
    fn read_parquet_back(bytes: &[u8]) -> Result<RecordBatch, Box<dyn StdError>> {
        let bytes = Bytes::copy_from_slice(bytes);
        let builder = ParquetRecordBatchReaderBuilder::try_new(bytes)?;
        let mut reader = builder.build()?;
        reader
            .next()
            .ok_or_else(|| "No batches in Parquet file".to_string())?
            .map_err(|e| Box::new(e) as Box<dyn StdError>)
    }

    #[test]
    fn test_empty_result() {
        let result = QueryResult::new();
        let bytes = ParquetWriter::write(&result, &default_options()).unwrap();
        assert!(!bytes.is_empty());
        assert_eq!(&bytes[0..4], b"PAR1");
    }

    #[test]
    fn test_boolean_values() {
        let mut result = QueryResult::new();
        result.metadata.columns = vec![ColumnInfo::new(
            "bool_col".to_string(),
            DataType::Boolean,
            true,
            0,
        )];
        let mut values = HashMap::new();
        values.insert("bool_col".to_string(), Value::Boolean(true));
        let row = QueryRow::with_values(RowKey::new(vec![1]), values);
        result.rows.push(row);
        let bytes = ParquetWriter::write(&result, &default_options()).unwrap();
        let batch = read_parquet_back(&bytes).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 1);
    }

    #[test]
    fn test_integer_types() {
        let mut result = QueryResult::new();
        result.metadata.columns = vec![
            ColumnInfo::new("tiny".to_string(), DataType::TinyInt, false, 0),
            ColumnInfo::new("small".to_string(), DataType::SmallInt, false, 1),
            ColumnInfo::new("int".to_string(), DataType::Integer, false, 2),
            ColumnInfo::new("big".to_string(), DataType::BigInt, false, 3),
        ];
        let mut values = HashMap::new();
        values.insert("tiny".to_string(), Value::TinyInt(127));
        values.insert("small".to_string(), Value::SmallInt(32767));
        values.insert("int".to_string(), Value::Integer(2147483647));
        values.insert("big".to_string(), Value::BigInt(9223372036854775807));
        let row = QueryRow::with_values(RowKey::new(vec![1]), values);
        result.rows.push(row);
        let bytes = ParquetWriter::write(&result, &default_options()).unwrap();
        let batch = read_parquet_back(&bytes).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 4);
    }

    #[test]
    fn test_float_types() {
        let mut result = QueryResult::new();
        result.metadata.columns = vec![
            ColumnInfo::new("f32".to_string(), DataType::Float32, false, 0),
            ColumnInfo::new("f64".to_string(), DataType::Float, false, 1),
        ];
        let mut values = HashMap::new();
        values.insert("f32".to_string(), Value::Float32(3.5));
        values.insert("f64".to_string(), Value::Float(2.75));
        let row = QueryRow::with_values(RowKey::new(vec![1]), values);
        result.rows.push(row);
        let bytes = ParquetWriter::write(&result, &default_options()).unwrap();
        let batch = read_parquet_back(&bytes).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 2);
    }

    #[test]
    fn test_text_values() {
        let mut result = QueryResult::new();
        result.metadata.columns = vec![ColumnInfo::new(
            "text_col".to_string(),
            DataType::Text,
            false,
            0,
        )];
        let mut values = HashMap::new();
        values.insert(
            "text_col".to_string(),
            Value::Text("Hello, Parquet!".to_string()),
        );
        let row = QueryRow::with_values(RowKey::new(vec![1]), values);
        result.rows.push(row);
        let bytes = ParquetWriter::write(&result, &default_options()).unwrap();
        let batch = read_parquet_back(&bytes).unwrap();
        assert_eq!(batch.num_rows(), 1);
        let col = batch.column(0);
        let string_array = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_array.value(0), "Hello, Parquet!");
    }

    #[test]
    fn test_blob_values() {
        let mut result = QueryResult::new();
        result.metadata.columns = vec![ColumnInfo::new(
            "blob_col".to_string(),
            DataType::Blob,
            false,
            0,
        )];
        let mut values = HashMap::new();
        values.insert(
            "blob_col".to_string(),
            Value::Blob(vec![0xDE, 0xAD, 0xBE, 0xEF]),
        );
        let row = QueryRow::with_values(RowKey::new(vec![1]), values);
        result.rows.push(row);
        let bytes = ParquetWriter::write(&result, &default_options()).unwrap();
        let batch = read_parquet_back(&bytes).unwrap();
        assert_eq!(batch.num_rows(), 1);
        let col = batch.column(0);
        let binary_array = col.as_any().downcast_ref::<BinaryArray>().unwrap();
        assert_eq!(binary_array.value(0), &[0xDE, 0xAD, 0xBE, 0xEF]);
    }

    #[test]
    fn test_timestamp_values() {
        let mut result = QueryResult::new();
        result.metadata.columns = vec![ColumnInfo::new(
            "ts_col".to_string(),
            DataType::Timestamp,
            false,
            0,
        )];
        let mut values = HashMap::new();
        values.insert("ts_col".to_string(), Value::Timestamp(1673778645123));
        let row = QueryRow::with_values(RowKey::new(vec![1]), values);
        result.rows.push(row);
        let bytes = ParquetWriter::write(&result, &default_options()).unwrap();
        let batch = read_parquet_back(&bytes).unwrap();
        assert_eq!(batch.num_rows(), 1);
    }

    #[test]
    fn test_uuid_values() {
        let mut result = QueryResult::new();
        result.metadata.columns = vec![ColumnInfo::new(
            "uuid_col".to_string(),
            DataType::Uuid,
            false,
            0,
        )];
        let uuid_bytes = [
            0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf0, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66,
            0x77, 0x88,
        ];
        let mut values = HashMap::new();
        values.insert("uuid_col".to_string(), Value::Uuid(uuid_bytes));
        let row = QueryRow::with_values(RowKey::new(vec![1]), values);
        result.rows.push(row);
        let bytes = ParquetWriter::write(&result, &default_options()).unwrap();
        let batch = read_parquet_back(&bytes).unwrap();
        assert_eq!(batch.num_rows(), 1);
        let col = batch.column(0);
        let uuid_array = col.as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap();
        assert_eq!(uuid_array.value(0), uuid_bytes);
    }

    #[test]
    fn test_null_values() {
        let mut result = QueryResult::new();
        result.metadata.columns = vec![ColumnInfo::new(
            "nullable_col".to_string(),
            DataType::Text,
            true,
            0,
        )];
        let mut values1 = HashMap::new();
        values1.insert(
            "nullable_col".to_string(),
            Value::Text("present".to_string()),
        );
        result
            .rows
            .push(QueryRow::with_values(RowKey::new(vec![1]), values1));
        let mut values2 = HashMap::new();
        values2.insert("nullable_col".to_string(), Value::Null);
        result
            .rows
            .push(QueryRow::with_values(RowKey::new(vec![2]), values2));
        let bytes = ParquetWriter::write(&result, &default_options()).unwrap();
        let batch = read_parquet_back(&bytes).unwrap();
        assert_eq!(batch.num_rows(), 2);
        let col = batch.column(0);
        let string_array = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert!(string_array.is_valid(0));
        assert!(!string_array.is_valid(1));
    }

    #[test]
    fn test_list_values() {
        let mut result = QueryResult::new();
        result.metadata.columns = vec![ColumnInfo::new(
            "list_col".to_string(),
            DataType::List,
            false,
            0,
        )];
        let mut values = HashMap::new();
        values.insert(
            "list_col".to_string(),
            Value::List(vec![
                Value::Integer(1),
                Value::Integer(2),
                Value::Integer(3),
            ]),
        );
        let row = QueryRow::with_values(RowKey::new(vec![1]), values);
        result.rows.push(row);
        let bytes = ParquetWriter::write(&result, &default_options()).unwrap();
        let batch = read_parquet_back(&bytes).unwrap();
        assert_eq!(batch.num_rows(), 1);
    }

    #[test]
    fn test_map_values() {
        let mut result = QueryResult::new();
        result.metadata.columns = vec![ColumnInfo::new(
            "map_col".to_string(),
            DataType::Map,
            false,
            0,
        )];
        let mut values = HashMap::new();
        values.insert(
            "map_col".to_string(),
            Value::Map(vec![
                (Value::Text("key1".to_string()), Value::Integer(1)),
                (Value::Text("key2".to_string()), Value::Integer(2)),
            ]),
        );
        let row = QueryRow::with_values(RowKey::new(vec![1]), values);
        result.rows.push(row);
        let bytes = ParquetWriter::write(&result, &default_options()).unwrap();
        let batch = read_parquet_back(&bytes).unwrap();
        assert_eq!(batch.num_rows(), 1);
    }

    #[test]
    fn test_config_limit() {
        let mut result = QueryResult::new();
        result.metadata.columns = vec![ColumnInfo::new(
            "id".to_string(),
            DataType::Integer,
            false,
            0,
        )];
        for i in 1..=10 {
            let mut values = HashMap::new();
            values.insert("id".to_string(), Value::Integer(i));
            let row = QueryRow::with_values(RowKey::new(vec![i as u8]), values);
            result.rows.push(row);
        }
        let options = ParquetExportOptions {
            row_limit: Some(3),
            ..Default::default()
        };
        let bytes = ParquetWriter::write(&result, &options).unwrap();
        let batch = read_parquet_back(&bytes).unwrap();
        assert_eq!(
            batch.num_rows(),
            3,
            "Limit should restrict output to 3 rows"
        );
    }

    #[test]
    fn test_multiple_rows() {
        let mut result = QueryResult::new();
        result.metadata.columns = vec![
            ColumnInfo::new("id".to_string(), DataType::Integer, false, 0),
            ColumnInfo::new("name".to_string(), DataType::Text, false, 1),
        ];
        for i in 1..=5 {
            let mut values = HashMap::new();
            values.insert("id".to_string(), Value::Integer(i));
            values.insert("name".to_string(), Value::Text(format!("row_{i}")));
            let row = QueryRow::with_values(RowKey::new(vec![i as u8]), values);
            result.rows.push(row);
        }
        let bytes = ParquetWriter::write(&result, &default_options()).unwrap();
        let batch = read_parquet_back(&bytes).unwrap();
        assert_eq!(batch.num_rows(), 5);
        assert_eq!(batch.num_columns(), 2);
    }

    #[test]
    fn test_counter_values() {
        let mut result = QueryResult::new();
        result.metadata.columns = vec![ColumnInfo::new(
            "counter_col".to_string(),
            DataType::BigInt,
            false,
            0,
        )];
        let mut values = HashMap::new();
        values.insert("counter_col".to_string(), Value::Counter(1000000));
        let row = QueryRow::with_values(RowKey::new(vec![1]), values);
        result.rows.push(row);
        let bytes = ParquetWriter::write(&result, &default_options()).unwrap();
        let batch = read_parquet_back(&bytes).unwrap();
        assert_eq!(batch.num_rows(), 1);
    }

    #[test]
    fn test_parquet_magic_bytes() {
        let mut result = QueryResult::new();
        result.metadata.columns = vec![ColumnInfo::new(
            "col".to_string(),
            DataType::Integer,
            false,
            0,
        )];
        let mut values = HashMap::new();
        values.insert("col".to_string(), Value::Integer(42));
        result
            .rows
            .push(QueryRow::with_values(RowKey::new(vec![1]), values));
        let bytes = ParquetWriter::write(&result, &default_options()).unwrap();
        assert_eq!(&bytes[0..4], b"PAR1", "Should start with PAR1 magic bytes");
        assert_eq!(
            &bytes[bytes.len() - 4..],
            b"PAR1",
            "Should end with PAR1 magic bytes"
        );
    }

    #[test]
    fn test_streaming_rejects_zero_row_group_size() {
        let result = QueryResult::new();
        let options = ParquetExportOptions {
            row_group_size: 0,
            ..Default::default()
        };
        let mut sink: Vec<u8> = Vec::new();
        let outcome = StreamingParquetWriter::new(&mut sink, &result.metadata, &options);
        assert!(matches!(
            outcome,
            Err(ParquetExportError::InvalidOptions(_))
        ));
    }

    #[test]
    fn test_streaming_writer_splits_row_groups() {
        let mut result = QueryResult::new();
        result.metadata.columns = vec![ColumnInfo::new(
            "id".to_string(),
            DataType::Integer,
            false,
            0,
        )];
        let rows: Vec<QueryRow> = (1..=5)
            .map(|i| {
                let mut values = HashMap::new();
                values.insert("id".to_string(), Value::Integer(i));
                QueryRow::with_values(RowKey::new(vec![i as u8]), values)
            })
            .collect();
        let options = ParquetExportOptions {
            row_group_size: 2,
            ..Default::default()
        };
        let mut sink: Vec<u8> = Vec::new();
        {
            let mut writer =
                StreamingParquetWriter::new(&mut sink, &result.metadata, &options).unwrap();
            // 3 rows in: one full group of 2 is flushed, 1 stays buffered.
            assert_eq!(writer.write_chunk(&rows[..3]).unwrap(), 2);
            // 1 buffered + 2 new: another group of 2 is flushed, 1 stays buffered.
            assert_eq!(writer.write_chunk(&rows[3..]).unwrap(), 2);
            writer.finalize().unwrap();
            assert_eq!(writer.rows_written(), 5);
        }
        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(sink)).unwrap();
        assert_eq!(builder.metadata().num_row_groups(), 3);
        let total_rows: usize = builder
            .build()
            .unwrap()
            .map(|batch| batch.unwrap().num_rows())
            .sum();
        assert_eq!(total_rows, 5);
    }
}