use std::sync::Arc;
use crate::types::{
schema::{ColumnType, TableSchema},
value::FieldValue,
MeruError, Result,
};
use arrow_array::{
ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array,
RecordBatch, StringArray, UInt64Array,
};
use arrow_schema::{DataType, Field, Schema};
use crate::sql::ChangeRecord;
/// Builds the Arrow schema for a table's change feed.
///
/// The schema is three fixed metadata columns — `seq` (UInt64), `op` (Utf8),
/// `pk_bytes` (Binary), all non-nullable — followed by one field per user
/// column of `table`, each marked nullable (a DELETE record carries no row
/// payload, so user values may be absent).
pub fn change_feed_schema(table: &TableSchema) -> Arc<Schema> {
    let meta = [
        Field::new("seq", DataType::UInt64, false),
        Field::new("op", DataType::Utf8, false),
        Field::new("pk_bytes", DataType::Binary, false),
    ];
    let user = table
        .columns
        .iter()
        .map(|c| Field::new(&c.name, column_type_to_arrow(&c.col_type), true));
    let fields: Vec<Field> = meta.into_iter().chain(user).collect();
    Arc::new(Schema::new(fields))
}
/// Maps a Meru column type onto the Arrow data type used in change-feed
/// batches. Fixed-length byte arrays are widened to the variable-length
/// `Binary` type (the length bound is not carried into Arrow).
fn column_type_to_arrow(col_type: &ColumnType) -> DataType {
    match col_type {
        ColumnType::Boolean => DataType::Boolean,
        ColumnType::Int32 => DataType::Int32,
        ColumnType::Int64 => DataType::Int64,
        ColumnType::Float => DataType::Float32,
        ColumnType::Double => DataType::Float64,
        ColumnType::ByteArray => DataType::Binary,
        ColumnType::FixedLenByteArray(_) => DataType::Binary,
    }
}
/// Converts a slice of change records into a single Arrow [`RecordBatch`]
/// matching [`change_feed_schema`] for `table`.
///
/// Columns are assembled in schema order: `seq`, `op`, `pk_bytes`, then one
/// array per user column. An empty `records` slice yields a zero-row batch.
///
/// # Errors
/// Returns `MeruError::InvalidArgument` if Arrow rejects the assembled
/// columns (e.g. an array/schema mismatch).
pub fn records_to_record_batch(
    records: &[ChangeRecord],
    table: &TableSchema,
) -> Result<RecordBatch> {
    let schema = change_feed_schema(table);
    let mut arrays: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());

    // Fixed metadata columns first.
    arrays.push(Arc::new(UInt64Array::from(
        records.iter().map(|r| r.seq).collect::<Vec<u64>>(),
    )));
    arrays.push(Arc::new(StringArray::from(
        records.iter().map(|r| r.op.as_sql_str()).collect::<Vec<&str>>(),
    )));
    arrays.push(Arc::new(BinaryArray::from(
        records.iter().map(|r| r.pk_bytes.as_slice()).collect::<Vec<&[u8]>>(),
    )));

    // Then one array per user column, in table order.
    for (idx, col) in table.columns.iter().enumerate() {
        arrays.push(build_user_column(&col.col_type, records, idx)?);
    }

    RecordBatch::try_new(schema, arrays).map_err(|e| {
        MeruError::InvalidArgument(format!("change-feed RecordBatch assembly failed: {e}"))
    })
}
fn build_user_column(
col_type: &ColumnType,
records: &[ChangeRecord],
col_idx: usize,
) -> Result<ArrayRef> {
fn field_at(r: &ChangeRecord, col_idx: usize) -> Option<&FieldValue> {
r.row.fields.get(col_idx).and_then(|f| f.as_ref())
}
match col_type {
ColumnType::Boolean => Ok(Arc::new(BooleanArray::from(
records
.iter()
.map(|r| match field_at(r, col_idx) {
Some(FieldValue::Boolean(v)) => Some(*v),
_ => None,
})
.collect::<Vec<_>>(),
))),
ColumnType::Int32 => Ok(Arc::new(Int32Array::from(
records
.iter()
.map(|r| match field_at(r, col_idx) {
Some(FieldValue::Int32(v)) => Some(*v),
_ => None,
})
.collect::<Vec<_>>(),
))),
ColumnType::Int64 => Ok(Arc::new(Int64Array::from(
records
.iter()
.map(|r| match field_at(r, col_idx) {
Some(FieldValue::Int64(v)) => Some(*v),
_ => None,
})
.collect::<Vec<_>>(),
))),
ColumnType::Float => Ok(Arc::new(Float32Array::from(
records
.iter()
.map(|r| match field_at(r, col_idx) {
Some(FieldValue::Float(v)) => Some(*v),
_ => None,
})
.collect::<Vec<_>>(),
))),
ColumnType::Double => Ok(Arc::new(Float64Array::from(
records
.iter()
.map(|r| match field_at(r, col_idx) {
Some(FieldValue::Double(v)) => Some(*v),
_ => None,
})
.collect::<Vec<_>>(),
))),
ColumnType::ByteArray | ColumnType::FixedLenByteArray(_) => {
let owned: Vec<Option<bytes::Bytes>> = records
.iter()
.map(|r| match field_at(r, col_idx) {
Some(FieldValue::Bytes(b)) => Some(b.clone()),
_ => None,
})
.collect();
let refs: Vec<Option<&[u8]>> = owned.iter().map(|o| o.as_deref()).collect();
Ok(Arc::new(BinaryArray::from_opt_vec(refs)))
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sql::ChangeOp;
    use crate::types::{
        schema::ColumnDef,
        value::{FieldValue, Row},
    };
    use arrow_array::Array;

    /// Two-column fixture table: `id` (Int64, PK, non-null) and `v`
    /// (Int64, nullable).
    fn fixture_table() -> TableSchema {
        TableSchema {
            table_name: "arrow-test".into(),
            columns: vec![
                ColumnDef {
                    name: "id".into(),
                    col_type: ColumnType::Int64,
                    nullable: false,
                    ..Default::default()
                },
                ColumnDef {
                    name: "v".into(),
                    col_type: ColumnType::Int64,
                    nullable: true,
                    ..Default::default()
                },
            ],
            primary_key: vec![0],
            ..Default::default()
        }
    }

    /// Downcasts batch column `idx` to the concrete array type `T`.
    fn col<T: 'static>(batch: &RecordBatch, idx: usize) -> &T {
        batch.column(idx).as_any().downcast_ref::<T>().unwrap()
    }

    #[test]
    fn schema_includes_meta_and_user_columns() {
        let schema = change_feed_schema(&fixture_table());
        assert_eq!(schema.fields().len(), 5);
        let names: Vec<&str> = (0..5).map(|i| schema.field(i).name().as_str()).collect();
        assert_eq!(names, ["seq", "op", "pk_bytes", "id", "v"]);
        assert_eq!(schema.field(0).data_type(), &DataType::UInt64);
    }

    #[test]
    fn records_to_batch_populates_all_columns() {
        let table = fixture_table();
        let records = vec![
            ChangeRecord {
                seq: 1,
                op: ChangeOp::Insert,
                row: Row::new(vec![
                    Some(FieldValue::Int64(1)),
                    Some(FieldValue::Int64(100)),
                ]),
                pk_bytes: vec![0xAA, 0xBB],
            },
            ChangeRecord {
                seq: 2,
                op: ChangeOp::Update,
                row: Row::new(vec![
                    Some(FieldValue::Int64(1)),
                    Some(FieldValue::Int64(200)),
                ]),
                pk_bytes: vec![0xAA, 0xBB],
            },
        ];
        let batch = records_to_record_batch(&records, &table).unwrap();
        assert_eq!(batch.num_rows(), 2);

        let seqs = col::<UInt64Array>(&batch, 0);
        assert_eq!(seqs.value(0), 1);
        assert_eq!(seqs.value(1), 2);

        let ops = col::<StringArray>(&batch, 1);
        assert_eq!(ops.value(0), "INSERT");
        assert_eq!(ops.value(1), "UPDATE");

        let v = col::<Int64Array>(&batch, 4);
        assert_eq!(v.value(0), 100);
        assert_eq!(v.value(1), 200);
    }

    #[test]
    fn delete_with_empty_row_produces_null_user_columns() {
        let table = fixture_table();
        let records = vec![ChangeRecord {
            seq: 10,
            op: ChangeOp::Delete,
            row: Row::default(),
            pk_bytes: vec![0xFF],
        }];
        let batch = records_to_record_batch(&records, &table).unwrap();
        assert_eq!(batch.num_rows(), 1);

        // User columns carry nulls for a DELETE with no row payload.
        assert!(col::<Int64Array>(&batch, 3).is_null(0));
        assert!(col::<Int64Array>(&batch, 4).is_null(0));

        // Metadata columns are still populated.
        assert_eq!(col::<UInt64Array>(&batch, 0).value(0), 10);
        assert_eq!(col::<StringArray>(&batch, 1).value(0), "DELETE");
    }

    #[test]
    fn empty_records_produces_schema_with_zero_rows() {
        let batch = records_to_record_batch(&[], &fixture_table()).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.num_columns(), 5);
    }
}