use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array, TimestampMicrosecondArray};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
use crate::config::MetaColumns;
use crate::error::Result;
/// Name of the metadata column that `enrich_schema` appends when
/// `MetaColumns::exported_at` is enabled (a UTC microsecond timestamp field).
pub const COL_EXPORTED_AT: &str = "_rivet_exported_at";
/// Name of the metadata column that `enrich_schema` appends when
/// `MetaColumns::row_hash` is enabled (a non-nullable `Int64` field).
pub const COL_ROW_HASH: &str = "_rivet_row_hash";
/// Returns `schema` extended with the metadata columns enabled in `meta`.
///
/// * When neither column is enabled, the input schema is returned as-is
///   (a cheap `Arc` clone).
/// * When `meta.exported_at` is set, a non-nullable
///   `Timestamp(Microsecond, "UTC")` field named [`COL_EXPORTED_AT`] is appended.
/// * When `meta.row_hash` is set, a non-nullable `Int64` field named
///   [`COL_ROW_HASH`] is appended (after the timestamp field, if both are on).
///
/// The original fields keep their order, and the schema-level key/value
/// metadata of `schema` is carried over to the enriched schema so that both
/// the enabled and the disabled path preserve it.
pub fn enrich_schema(schema: &SchemaRef, meta: &MetaColumns) -> SchemaRef {
    if !meta.exported_at && !meta.row_hash {
        return schema.clone();
    }

    let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
    if meta.exported_at {
        fields.push(Arc::new(Field::new(
            COL_EXPORTED_AT,
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
            false,
        )));
    }
    if meta.row_hash {
        fields.push(Arc::new(Field::new(COL_ROW_HASH, DataType::Int64, false)));
    }

    // `Schema::new` would start from empty metadata and silently drop whatever
    // the source schema carried; rebuild with the original metadata instead.
    Arc::new(Schema::new_with_metadata(
        fields,
        schema.metadata().clone(),
    ))
}
/// Appends the metadata columns enabled in `meta` to `batch`.
///
/// * `enriched_schema` is the schema of the returned batch; it is expected to
///   be what [`enrich_schema`] produced for this batch's schema and the same
///   `meta`. It is handed to `RecordBatch::try_new` together with the extended
///   column list, and any error from that call is propagated.
/// * `exported_at_us` is the microsecond timestamp value written into every
///   row of the [`COL_EXPORTED_AT`] column (tagged with the `"UTC"` timezone).
///
/// When no metadata column is enabled the batch is returned unchanged (cloned)
/// and `enriched_schema` is not consulted.
pub fn enrich_batch(
    batch: &RecordBatch,
    meta: &MetaColumns,
    enriched_schema: &SchemaRef,
    exported_at_us: i64,
) -> Result<RecordBatch> {
    let any_enabled = meta.exported_at || meta.row_hash;
    if !any_enabled {
        return Ok(batch.clone());
    }

    let row_count = batch.num_rows();
    let mut arrays: Vec<ArrayRef> = batch.columns().to_vec();

    if meta.exported_at {
        // One identical timestamp per row.
        let stamps = vec![Some(exported_at_us); row_count];
        let stamp_array = TimestampMicrosecondArray::from(stamps).with_timezone("UTC");
        arrays.push(Arc::new(stamp_array));
    }

    if meta.row_hash {
        // The hash is computed over the original columns only.
        let hash_array = hash_column(batch, row_count);
        arrays.push(Arc::new(hash_array));
    }

    let enriched = RecordBatch::try_new(enriched_schema.clone(), arrays)?;
    Ok(enriched)
}
/// Byte written in place of a value when the cell is NULL.
const HASH_NULL_MARKER: u8 = 0x00;
/// Byte written after every column to delimit fields within a row.
const HASH_FIELD_SEP: u8 = 0x1f;
/// Byte that prefixes a reserved byte occurring inside a formatted value.
const HASH_ESCAPE: u8 = 0x1b;

/// Escapes reserved bytes in `buf[start..]` by prefixing each with [`HASH_ESCAPE`].
///
/// Reserved bytes are the NULL marker, the field separator and the escape byte
/// itself. A tail that contains none of them is left untouched.
fn escape_reserved_bytes(buf: &mut Vec<u8>, start: usize) {
    let is_reserved =
        |b: u8| matches!(b, HASH_NULL_MARKER | HASH_FIELD_SEP | HASH_ESCAPE);

    // Fast path: the overwhelmingly common case has nothing to escape.
    if !buf[start..].iter().any(|&b| is_reserved(b)) {
        return;
    }

    let raw = buf.split_off(start);
    buf.reserve(raw.len() + 1);
    for byte in raw {
        if is_reserved(byte) {
            buf.push(HASH_ESCAPE);
        }
        buf.push(byte);
    }
}

/// Computes one 64-bit hash per row over all columns of `batch`.
///
/// Each row is serialized as the concatenation, for every column in order, of
/// either the NULL marker (`0x00`) or the column's display-formatted value,
/// followed by the field separator (`0x1f`). The serialized row is hashed with
/// `xxh3_128` and the low 64 bits are kept (`as i64` truncates the `u128`).
///
/// Reserved bytes (`0x00`, `0x1f`, `0x1b`) that appear *inside* a formatted
/// value are escaped with `0x1b`. Without this, a non-null value consisting of
/// the byte `0x00` hashed identically to NULL, and a value containing `0x1f`
/// could shift column boundaries so that distinct rows collided. Values that
/// contain none of the reserved bytes are serialized exactly as they were
/// before escaping was introduced, so their hashes are unchanged.
///
/// `n` is the number of rows to hash; callers pass `batch.num_rows()`.
///
/// Best-effort behavior kept from the original: if no formatter can be built
/// for a column, its non-null cells contribute no bytes (only the separator).
fn hash_column(batch: &RecordBatch, n: usize) -> Int64Array {
    use std::io::Write as IoWrite;
    use xxhash_rust::xxh3::xxh3_128;

    let options = arrow::util::display::FormatOptions::default();
    // One formatter per column, built once; `None` when the column's type
    // cannot be formatted.
    let formatters: Vec<Option<arrow::util::display::ArrayFormatter>> = (0..batch.num_columns())
        .map(|i| {
            arrow::util::display::ArrayFormatter::try_new(batch.column(i).as_ref(), &options).ok()
        })
        .collect();

    // Scratch buffer reused across rows to avoid a per-row allocation.
    let mut buf: Vec<u8> = Vec::with_capacity(256);
    let mut hashes = Vec::with_capacity(n);

    for row in 0..n {
        buf.clear();
        for (col_idx, fmt_opt) in formatters.iter().enumerate() {
            if batch.column(col_idx).is_null(row) {
                buf.push(HASH_NULL_MARKER);
            } else if let Some(fmt) = fmt_opt {
                let value_start = buf.len();
                // Writing into a `Vec<u8>` cannot fail at the I/O layer; a
                // formatting error is deliberately ignored (best-effort hash).
                let _ = write!(buf, "{}", fmt.value(row));
                escape_reserved_bytes(&mut buf, value_start);
            }
            buf.push(HASH_FIELD_SEP);
        }
        // Intentional truncation: keep the low 64 bits of the 128-bit digest.
        hashes.push(xxh3_128(&buf) as i64);
    }

    Int64Array::from(hashes)
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::StringArray;
    use arrow::datatypes::Field;

    /// Shorthand for a `MetaColumns` with the given switches.
    fn meta_with(exported_at: bool, row_hash: bool) -> MetaColumns {
        MetaColumns {
            exported_at,
            row_hash,
        }
    }

    /// Views column `idx` of `batch` as an `Int64Array`, panicking on a type mismatch.
    fn int64_column(batch: &RecordBatch, idx: usize) -> &Int64Array {
        batch
            .column(idx)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap()
    }

    /// Three-row fixture: non-nullable `id` (1, 2, 3) and nullable `name`
    /// whose middle row is NULL.
    fn sample_batch() -> (SchemaRef, RecordBatch) {
        let id_field = Field::new("id", DataType::Int64, false);
        let name_field = Field::new("name", DataType::Utf8, true);
        let schema = Arc::new(Schema::new(vec![id_field, name_field]));

        let ids: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let names: ArrayRef = Arc::new(StringArray::from(vec![
            Some("alice"),
            None,
            Some("charlie"),
        ]));
        let batch = RecordBatch::try_new(schema.clone(), vec![ids, names]).unwrap();

        (schema, batch)
    }

    // With both switches off, neither the schema nor the batch gains columns.
    #[test]
    fn enrich_disabled_is_noop() {
        let (schema, batch) = sample_batch();
        let meta = meta_with(false, false);

        let enriched_schema = enrich_schema(&schema, &meta);
        assert_eq!(enriched_schema.fields().len(), 2);

        let enriched = enrich_batch(&batch, &meta, &enriched_schema, 0).unwrap();
        assert_eq!(enriched.num_columns(), 2);
    }

    // Only the timestamp column is appended and every row carries the given value.
    #[test]
    fn enrich_exported_at_only() {
        let (schema, batch) = sample_batch();
        let meta = meta_with(true, false);

        let enriched_schema = enrich_schema(&schema, &meta);
        assert_eq!(enriched_schema.fields().len(), 3);
        assert_eq!(enriched_schema.field(2).name(), COL_EXPORTED_AT);

        let ts = 1_711_612_800_000_000i64;
        let enriched = enrich_batch(&batch, &meta, &enriched_schema, ts).unwrap();
        assert_eq!(enriched.num_columns(), 3);
        assert_eq!(enriched.num_rows(), 3);

        let stamps = enriched
            .column(2)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert_eq!(stamps.value(0), ts);
        assert_eq!(stamps.value(2), ts);
    }

    // Only the hash column is appended; neighbouring rows hash differently.
    #[test]
    fn enrich_row_hash_only() {
        let (schema, batch) = sample_batch();
        let meta = meta_with(false, true);

        let enriched_schema = enrich_schema(&schema, &meta);
        let hash_field = enriched_schema.field(2);
        assert_eq!(hash_field.name(), COL_ROW_HASH);
        assert_eq!(*hash_field.data_type(), DataType::Int64);

        let enriched = enrich_batch(&batch, &meta, &enriched_schema, 0).unwrap();
        let hashes = int64_column(&enriched, 2);
        assert_ne!(hashes.value(0), hashes.value(1));
        assert_ne!(hashes.value(1), hashes.value(2));
    }

    // Both columns are appended, timestamp first and hash second.
    #[test]
    fn enrich_both_columns() {
        let (schema, batch) = sample_batch();
        let meta = meta_with(true, true);

        let enriched_schema = enrich_schema(&schema, &meta);
        assert_eq!(enriched_schema.fields().len(), 4);
        assert_eq!(enriched_schema.field(2).name(), COL_EXPORTED_AT);
        assert_eq!(enriched_schema.field(3).name(), COL_ROW_HASH);

        let enriched = enrich_batch(&batch, &meta, &enriched_schema, 123456).unwrap();
        assert_eq!(enriched.num_columns(), 4);
        assert_eq!(enriched.num_rows(), 3);
    }

    // Hashing the same batch twice yields the same value for every row.
    #[test]
    fn hash_is_deterministic() {
        let (schema, batch) = sample_batch();
        let meta = meta_with(false, true);
        let enriched_schema = enrich_schema(&schema, &meta);

        let first = enrich_batch(&batch, &meta, &enriched_schema, 0).unwrap();
        let second = enrich_batch(&batch, &meta, &enriched_schema, 0).unwrap();
        let first_hashes = int64_column(&first, 2);
        let second_hashes = int64_column(&second, 2);

        for i in 0..3 {
            assert_eq!(
                first_hashes.value(i),
                second_hashes.value(i),
                "hash should be deterministic for row {i}"
            );
        }
    }

    // A NULL cell and an empty string must not produce the same row hash.
    #[test]
    fn hash_distinguishes_null_from_empty() {
        let val_field = Field::new("val", DataType::Utf8, true);
        let schema = Arc::new(Schema::new(vec![val_field]));
        let values: ArrayRef = Arc::new(StringArray::from(vec![None, Some("")]));
        let batch = RecordBatch::try_new(schema.clone(), vec![values]).unwrap();

        let meta = meta_with(false, true);
        let enriched_schema = enrich_schema(&schema, &meta);
        let enriched = enrich_batch(&batch, &meta, &enriched_schema, 0).unwrap();

        let hashes = int64_column(&enriched, 1);
        assert_ne!(
            hashes.value(0),
            hashes.value(1),
            "NULL and empty string should hash differently"
        );
    }
}