use std::{collections::HashSet, fmt, sync::Arc};
use arrow_array::{Array, LargeStringArray, RecordBatch};
use arrow_schema::{DataType, Schema};
use parquet::basic::{Compression, ZstdLevel};
use roaring::RoaringBitmap;
pub use crate::superfile::vector::builder::VectorConfig;
use crate::superfile::{
BuildError, SuperfileReader,
format::{
self,
footer::{encode_parquet_body, splice_index_blobs},
kv,
},
fts::{
builder::FtsBuilder,
tokenize::{AsciiLowerTokenizer, Tokenizer},
},
stats::SuperfileStats,
vector::{builder::VectorBuilder, distance::Metric, reader::ColumnReader},
};
/// Full-text-search configuration for one `LargeUtf8` column of the schema.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FtsConfig {
    /// Name of the schema column whose text is tokenized and indexed.
    pub column: String,
}
/// Configuration consumed by [`SuperfileBuilder::new`].
///
/// Usually created with [`BuilderOptions::new`] or
/// [`BuilderOptions::new_from_reader`]; the public tuning fields may be
/// adjusted afterwards.
#[derive(Clone)]
pub struct BuilderOptions {
    /// Arrow schema that every added batch must match field-for-field.
    pub schema: Arc<Schema>,
    /// Name of the document id column; validated to be `Decimal128(38, 0)`.
    pub id_column: String,
    /// Text columns to index for full-text search (each must be `LargeUtf8`).
    pub fts_columns: Vec<FtsConfig>,
    /// Vector index definitions; their names must not collide with schema fields.
    pub vector_columns: Vec<VectorConfig>,
    /// Tokenizer for the FTS columns; required whenever `fts_columns` is non-empty.
    pub tokenizer: Option<Arc<dyn Tokenizer>>,
    /// Row group size handed to the Parquet body encoder (presumably in rows).
    pub row_group_size: usize,
    /// Compression codec handed to the Parquet body encoder.
    pub compression: Compression,
    /// Page size limit applied to the id column only (presumably bytes).
    pub id_page_size_limit: usize,
}
/// Default `BuilderOptions::id_page_size_limit` (8 KiB, presumably bytes).
pub const DEFAULT_ID_PAGE_SIZE_LIMIT: usize = 8_192;
impl BuilderOptions {
pub fn new(
schema: Arc<Schema>,
id_column: impl Into<String>,
fts_columns: Vec<FtsConfig>,
vector_columns: Vec<VectorConfig>,
tokenizer: Option<Arc<dyn Tokenizer>>,
) -> Self {
Self {
schema,
id_column: id_column.into(),
fts_columns,
vector_columns,
tokenizer,
row_group_size: 65_536,
compression: Compression::ZSTD(
ZstdLevel::try_new(3).expect("zstd level 3 is in the valid 1..=22 range"),
),
id_page_size_limit: DEFAULT_ID_PAGE_SIZE_LIMIT,
}
}
pub fn new_from_reader(reader: &SuperfileReader) -> Self {
let tokenizer = Arc::new(AsciiLowerTokenizer);
let fts_columns = if let Some(fts) = &reader.fts() {
fts.fts_columns_config()
.map(|c| FtsConfig {
column: c.name.clone(),
})
.collect::<Vec<_>>()
} else {
Vec::new()
};
let vector_columns = if let Some(vec) = &reader.vec() {
vec.vector_columns_config()
.map(|v| {
VectorConfig::new(
v.name.clone(),
v.dim,
v.n_cent as usize,
v.rot_seed,
v.metric,
)
.with_rerank_codec(v.rerank_codec)
})
.collect::<Vec<_>>()
} else {
Vec::new()
};
BuilderOptions::new(
reader.schema().clone(),
reader.id_column(),
fts_columns,
vector_columns,
Some(tokenizer),
)
}
fn check_mergeability(
&self,
remote_id_col: &str,
remote_schema: &Arc<Schema>,
remote_fts_columns: Option<Vec<&str>>,
remote_vector_columns: Option<Vec<&ColumnReader>>,
) -> Result<bool, BuildError> {
if self.id_column != *remote_id_col {
return Err(BuildError::IdColumnMismatch(
self.id_column.clone(),
remote_id_col.to_string(),
));
}
if self.schema.fields() != remote_schema.fields() {
return Err(BuildError::SchemaMismatch {
mine: self.schema.to_string(),
other: remote_schema.to_string(),
});
}
if let Some(remote_fts_columns) = remote_fts_columns {
let self_fts_columns = &self.fts_columns;
if self_fts_columns.len() != remote_fts_columns.len() {
return Err(BuildError::FTSSchemaMismatch(format!(
"mismatched column len. self {} vs other {}",
self_fts_columns.len(),
remote_fts_columns.len()
)));
}
for (self_fts_column, remote_fts_column) in
self_fts_columns.iter().zip(remote_fts_columns.iter())
{
if self_fts_column.column != *remote_fts_column {
return Err(BuildError::FTSSchemaMismatch(format!(
"mismatched column name. self {} vs other {}",
self_fts_column.column, remote_fts_column
)));
}
}
}
if let Some(remote_vector_columns) = remote_vector_columns {
let self_vec_columns = &self.vector_columns;
if self_vec_columns.len() != remote_vector_columns.len() {
return Err(BuildError::VectorSchemaMismatch(format!(
"mismatched column len. self {} vs other {}",
self_vec_columns.len(),
remote_vector_columns.len()
)));
}
for (self_vec_column, remote_vector_column) in
self_vec_columns.iter().zip(remote_vector_columns.iter())
{
if self_vec_column.column != remote_vector_column.name {
return Err(BuildError::VectorSchemaMismatch(format!(
"mismatched column name. self {} vs other {}",
self_vec_column.column, remote_vector_column.name
)));
}
if self_vec_column.dim != remote_vector_column.dim {
return Err(BuildError::VectorSchemaMismatch(format!(
"mismatched column dim. self {} vs other {}",
self_vec_column.dim, remote_vector_column.dim
)));
}
}
}
Ok(true)
}
}
impl fmt::Debug for SuperfileBuilder {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SuperfileBuilder")
.field("id_column", &self.opts.id_column)
.field("n_fts_columns", &self.opts.fts_columns.len())
.field("n_vector_columns", &self.opts.vector_columns.len())
.field("n_batches", &self.batches.len())
.field("next_local_doc_id", &self.next_local_doc_id)
.finish()
}
}
/// Accumulates record batches together with their FTS and vector index inputs.
/// `SuperfileBuilder::finish` serialises everything into one superfile buffer.
pub struct SuperfileBuilder {
    /// Validated configuration this builder was created with.
    opts: BuilderOptions,
    /// Schema indices of the FTS columns, in the same order as `opts.fts_columns`.
    fts_col_idxs: Vec<usize>,
    /// Batches buffered until the Parquet body is encoded in `finish`.
    batches: Vec<RecordBatch>,
    /// FTS index builder; `None` when no FTS columns are configured.
    fts_builder: Option<FtsBuilder>,
    /// Vector index builder; `None` when no vector columns are configured.
    vec_builder: Option<VectorBuilder>,
    /// Local doc id for the next added row, i.e. the number of rows added so far.
    next_local_doc_id: u32,
}
impl SuperfileBuilder {
pub fn new(opts: BuilderOptions) -> Result<Self, BuildError> {
let id_idx = opts
.schema
.index_of(&opts.id_column)
.map_err(|_| BuildError::MissingIdColumn(opts.id_column.clone()))?;
let id_field = opts.schema.field(id_idx);
let expected = DataType::Decimal128(38, 0);
if id_field.data_type() != &expected {
return Err(BuildError::IdColumnWrongType(
opts.id_column.clone(),
format!("{:?}", id_field.data_type()),
));
}
let mut fts_col_idxs = Vec::with_capacity(opts.fts_columns.len());
for fc in &opts.fts_columns {
let idx = opts
.schema
.index_of(&fc.column)
.map_err(|_| BuildError::FtsColumnMissing(fc.column.clone()))?;
let f = opts.schema.field(idx);
if f.data_type() != &DataType::LargeUtf8 {
return Err(BuildError::FtsColumnMustBeLargeUtf8 {
column: fc.column.clone(),
actual: format!("{:?}", f.data_type()),
});
}
fts_col_idxs.push(idx);
}
let mut seen_logical: HashSet<&str> = HashSet::new();
for fc in &opts.fts_columns {
check_user_column_name(&fc.column)?;
if !seen_logical.insert(fc.column.as_str()) {
return Err(BuildError::DuplicateLogicalName(fc.column.clone()));
}
}
for vc in &opts.vector_columns {
check_user_column_name(&vc.column)?;
if !seen_logical.insert(vc.column.as_str()) {
return Err(BuildError::DuplicateLogicalName(vc.column.clone()));
}
if opts.schema.index_of(&vc.column).is_ok() {
return Err(BuildError::DuplicateLogicalName(vc.column.clone()));
}
}
if !opts.fts_columns.is_empty() && opts.tokenizer.is_none() {
return Err(BuildError::FtsColumnTypeInvalid {
column: opts.fts_columns[0].column.clone(),
actual: "missing tokenizer in BuilderOptions".to_string(),
});
}
let fts_builder = if opts.fts_columns.is_empty() {
None
} else {
let tk = opts
.tokenizer
.as_ref()
.expect("validated non-empty FTS implies Some tokenizer")
.clone();
let mut fb = FtsBuilder::new(tk);
for fc in &opts.fts_columns {
fb.register_column(fc.column.clone())?;
}
Some(fb)
};
let vec_builder = if opts.vector_columns.is_empty() {
None
} else {
let mut vb = VectorBuilder::new();
for vc in &opts.vector_columns {
vb.register_column(vc.clone())?;
}
Some(vb)
};
Ok(Self {
opts,
fts_col_idxs,
batches: Vec::new(),
fts_builder,
vec_builder,
next_local_doc_id: 0,
})
}
pub fn set_fts_spill_threshold_bytes(&mut self, threshold: usize) {
if let Some(fb) = self.fts_builder.as_mut() {
fb.set_spill_threshold_bytes(threshold);
}
}
pub fn add_batch(&mut self, batch: &RecordBatch, vectors: &[&[f32]]) -> Result<(), BuildError> {
if batch.schema().fields() != self.opts.schema.fields() {
return Err(BuildError::BatchSchemaMismatch);
}
if vectors.len() != self.opts.vector_columns.len() {
return Err(BuildError::VectorCountMismatch {
expected: self.opts.vector_columns.len(),
actual: vectors.len(),
});
}
let n_rows = batch.num_rows() as u32;
for (i, vc) in self.opts.vector_columns.iter().enumerate() {
let expected_total = (n_rows as usize) * vc.dim;
if vectors[i].len() != expected_total {
return Err(BuildError::VectorDimMismatch {
column: vc.column.clone(),
expected: expected_total,
actual: vectors[i].len(),
});
}
}
if let Some(fb) = self.fts_builder.as_mut() {
for (col_id, &schema_idx) in self.fts_col_idxs.iter().enumerate() {
let arr = batch.column(schema_idx);
let strs = arr
.as_any()
.downcast_ref::<LargeStringArray>()
.expect("schema validated as LargeUtf8");
for row in 0..(n_rows as usize) {
let local_doc_id = self.next_local_doc_id + row as u32;
let text = if strs.is_null(row) {
""
} else {
strs.value(row)
};
fb.add_doc(col_id as u32, local_doc_id, text)?;
}
}
}
if let Some(vb) = self.vec_builder.as_mut() {
for (i, vc) in self.opts.vector_columns.iter().enumerate() {
let dim = vc.dim;
for row in 0..(n_rows as usize) {
let start = row * dim;
vb.add(i as u32, &vectors[i][start..start + dim])?;
}
}
}
self.next_local_doc_id += n_rows;
self.batches.push(batch.clone());
Ok(())
}
pub fn add_batch_from_reader(
&mut self,
reader: &SuperfileReader,
deleted_docs_bitmap: Option<Arc<RoaringBitmap>>,
) -> Result<SuperfileStats, BuildError> {
self.opts.check_mergeability(
reader.id_column(),
reader.schema(),
reader.fts().map(|f| f.fts_columns().collect::<Vec<_>>()),
reader
.vec()
.map(|v| v.vector_columns_config().collect::<Vec<_>>()),
)?;
let record_batch = reader
.get_record_batch(deleted_docs_bitmap.clone())
.map_err(|_| BuildError::BatchReadError)?;
let superfile_stats = SuperfileStats::try_compute_from_record_batch(&record_batch)?;
let num_rows = record_batch.num_rows();
let mut vectors: Vec<Vec<f32>> = Vec::new();
if let Some(v) = reader.vec() {
let reader_columns: Vec<_> = v.vector_columns_config().collect();
if reader_columns.len() != self.opts.vector_columns.len() {
return Err(BuildError::VectorDimMismatch {
column: format!(
"vector column count mismatch: expected {}, got {}",
self.opts.vector_columns.len(),
reader_columns.len()
),
expected: self.opts.vector_columns.len(),
actual: reader_columns.len(),
});
}
for (reader_col, builder_col) in reader_columns.iter().zip(&self.opts.vector_columns) {
if reader_col.name != builder_col.column || reader_col.dim != builder_col.dim {
return Err(BuildError::VectorDimMismatch {
column: reader_col.name.clone(),
expected: builder_col.dim,
actual: reader_col.dim,
});
}
let mut this_col_vectors = Vec::with_capacity(builder_col.dim * num_rows);
let result = v
.get_vectors_decoded(&reader_col.name)
.map_err(|_| BuildError::VectorReadError)?;
for (row_idx, single_row) in result.iter().enumerate() {
if let Some(ref bitmap) = deleted_docs_bitmap
&& bitmap.contains(row_idx as u32)
{
continue;
}
this_col_vectors.extend_from_slice(single_row.as_slice());
}
vectors.push(this_col_vectors);
}
}
let slices: Vec<&[f32]> = vectors.iter().map(|row| row.as_slice()).collect();
self.add_batch(&record_batch, &slices)?;
Ok(superfile_stats)
}
pub fn build_from_readers(
readers: &[(Arc<SuperfileReader>, Option<Arc<RoaringBitmap>>)],
) -> Result<(Vec<u8>, SuperfileStats), BuildError> {
let first = readers.first().ok_or(BuildError::BatchReadError)?;
let builder_opts = BuilderOptions::new_from_reader(&first.0);
let mut superfile_builder = SuperfileBuilder::new(builder_opts)?;
let mut stats_collector = Vec::with_capacity(readers.len());
for reader in readers {
let stats = superfile_builder.add_batch_from_reader(&reader.0, reader.1.clone())?;
stats_collector.push(stats);
}
let bytes = superfile_builder.finish()?;
let stats = SuperfileStats::from_children(stats_collector.as_slice());
Ok((bytes, stats))
}
pub fn finish(mut self) -> Result<Vec<u8>, BuildError> {
if self.next_local_doc_id == 0 {
return Ok(Vec::new());
}
let n_docs = self.next_local_doc_id as u64;
let fts_builder = self.fts_builder.take();
let vec_builder = self.vec_builder.take();
let mut kvs: Vec<(String, String)> = vec![
(kv::FORMAT.into(), kv::FORMAT_VALUE.into()),
(kv::FORMAT_VERSION.into(), format::FORMAT_VERSION.into()),
(kv::ID_COLUMN.into(), self.opts.id_column.clone()),
(kv::N_DOCS.into(), n_docs.to_string()),
(kv::BUILDER.into(), crate::BUILDER_ID.to_string()),
];
if !self.opts.fts_columns.is_empty() {
kvs.push((
kv::FTS_COLUMNS.into(),
fts_columns_json(&self.opts.fts_columns),
));
}
if !self.opts.vector_columns.is_empty() {
kvs.push((
kv::VEC_COLUMNS.into(),
vec_columns_json(&self.opts.vector_columns),
));
}
let id_page_limit = [(self.opts.id_column.as_str(), self.opts.id_page_size_limit)];
let encode_body = || {
encode_parquet_body(
&self.opts.schema,
&self.batches,
self.opts.compression,
self.opts.row_group_size,
&id_page_limit,
)
};
let (body, fts_blob, vec_blob) = if vec_builder.is_some() {
let (fts_blob, vec_blob) = finish_index_blobs(fts_builder, vec_builder)?;
let body = encode_body()?;
(body, fts_blob, vec_blob)
} else {
let (body_res, blobs_res) =
rayon::join(encode_body, || finish_index_blobs(fts_builder, vec_builder));
let body = body_res?;
let (fts_blob, vec_blob) = blobs_res?;
(body, fts_blob, vec_blob)
};
let parts = splice_index_blobs(body, &fts_blob, &vec_blob, &kvs)?;
Ok(parts.bytes)
}
}
/// Finishes whichever index builders are present and returns
/// `(fts_blob, vec_blob)`. A missing builder yields an empty blob.
///
/// When both builders exist they are finished concurrently with `rayon::join`.
/// If both fail, the FTS error is the one reported.
fn finish_index_blobs(
    fts_builder: Option<FtsBuilder>,
    vec_builder: Option<VectorBuilder>,
) -> Result<(Vec<u8>, Vec<u8>), BuildError> {
    let blobs = match (fts_builder, vec_builder) {
        (None, None) => (Vec::new(), Vec::new()),
        (None, Some(vectors)) => (Vec::new(), vectors.finish()?),
        (Some(fts), None) => (fts.finish()?, Vec::new()),
        (Some(fts), Some(vectors)) => {
            let (fts_res, vec_res) = rayon::join(|| fts.finish(), || vectors.finish());
            (fts_res?, vec_res?)
        }
    };
    Ok(blobs)
}
/// Rejects user-chosen index column names that contain the FST separator byte
/// or that start with the reserved prefix. The separator check runs first.
fn check_user_column_name(name: &str) -> Result<(), BuildError> {
    let has_separator = name.bytes().any(|b| b == format::FST_SEPARATOR);
    if has_separator {
        Err(BuildError::ReservedSeparatorInColumnName(name.to_string()))
    } else if name.starts_with(format::RESERVED_PREFIX) {
        Err(BuildError::ReservedPrefixInColumnName(name.to_string()))
    } else {
        Ok(())
    }
}
/// Renders the FTS column descriptors as a JSON array for the superfile
/// key/value metadata, for example
/// `[{"name":"title","tokenizer":"ascii_lower"}]`.
///
/// The tokenizer is always written as `ascii_lower`.
fn fts_columns_json(cols: &[FtsConfig]) -> String {
    let entries: Vec<String> = cols
        .iter()
        .map(|cfg| {
            let mut entry = String::from(r#"{"name":""#);
            entry.push_str(&escape_json(&cfg.column));
            entry.push_str(r#"","tokenizer":"ascii_lower"}"#);
            entry
        })
        .collect();
    let mut json = String::from("[");
    json.push_str(&entries.join(","));
    json.push(']');
    json
}
/// Renders the vector column descriptors (column, dim, n_cent, rot_seed,
/// metric) as a JSON array for the superfile key/value metadata.
fn vec_columns_json(cols: &[VectorConfig]) -> String {
    let mut json = String::from("[");
    let mut separator = "";
    for cfg in cols {
        json.push_str(separator);
        separator = ",";
        json.push_str(r#"{"column":""#);
        json.push_str(&escape_json(&cfg.column));
        json.push_str(r#"","dim":"#);
        json.push_str(&cfg.dim.to_string());
        json.push_str(r#","n_cent":"#);
        json.push_str(&cfg.n_cent.to_string());
        json.push_str(r#","rot_seed":"#);
        json.push_str(&cfg.rot_seed.to_string());
        json.push_str(r#","metric":""#);
        json.push_str(metric_str(cfg.metric));
        json.push_str("\"}");
    }
    json.push(']');
    json
}
/// Returns the lowercase identifier written to metadata for a distance metric.
fn metric_str(m: Metric) -> &'static str {
    match m {
        Metric::Cosine => "cosine",
        Metric::NegDot => "negdot",
        Metric::L2Sq => "l2sq",
    }
}
/// Escapes `s` for use inside a JSON string literal.
///
/// Quotes and backslashes are backslash-escaped. `\n`, `\r` and `\t` use their
/// short forms. Any other character below U+0020 becomes `\u00XX`. Everything
/// else, including non-ASCII text, is copied unchanged.
fn escape_json(s: &str) -> String {
    s.chars().fold(String::with_capacity(s.len()), |mut acc, ch| {
        match ch {
            '\\' => acc.push_str("\\\\"),
            '"' => acc.push_str("\\\""),
            '\t' => acc.push_str("\\t"),
            '\n' => acc.push_str("\\n"),
            '\r' => acc.push_str("\\r"),
            ctrl if u32::from(ctrl) < 0x20 => {
                acc.push_str(&format!("\\u{:04x}", u32::from(ctrl)))
            }
            other => acc.push(other),
        }
        acc
    })
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow_array::{Decimal128Array, LargeStringArray, UInt64Array};
use arrow_schema::Field;
use bytes::Bytes;
use roaring::RoaringBitmap;
use super::*;
use crate::{
superfile::{
format::footer::read_kv_metadata, fts::reader::BoolMode,
vector::rerank_codec::RerankCodec,
},
test_helpers::{decimal128_ids, default_tokenizer, default_vector_config},
};
fn schema_with_fts() -> Arc<Schema> {
Arc::new(Schema::new(vec![
Field::new("doc_id", DataType::Decimal128(38, 0), false),
Field::new("title", DataType::LargeUtf8, false),
Field::new("body", DataType::LargeUtf8, false),
]))
}
fn opts_minimal() -> BuilderOptions {
BuilderOptions::new(
schema_with_fts(),
"doc_id",
vec![FtsConfig {
column: "title".into(),
}],
vec![],
Some(default_tokenizer()),
)
}
#[test]
fn new_rejects_missing_id_column() {
let mut opts = opts_minimal();
opts.id_column = "nope".into();
let err = SuperfileBuilder::new(opts).expect_err("expected error");
assert!(matches!(err, BuildError::MissingIdColumn(_)));
}
#[test]
fn new_rejects_id_column_not_decimal128_38_0() {
let cases = [
DataType::UInt64,
DataType::Int64,
DataType::Decimal128(38, 1),
DataType::Decimal128(37, 0),
];
for ty in cases {
let schema = Arc::new(Schema::new(vec![
Field::new("doc_id", ty.clone(), false),
Field::new("title", DataType::LargeUtf8, false),
]));
let opts = BuilderOptions::new(
schema,
"doc_id",
vec![FtsConfig {
column: "title".into(),
}],
vec![],
Some(default_tokenizer()),
);
let err =
SuperfileBuilder::new(opts).expect_err(&format!("expected rejection for {ty:?}"));
assert!(
matches!(err, BuildError::IdColumnWrongType(_, _)),
"wrong error variant for {ty:?}: {err:?}",
);
}
}
#[test]
fn new_rejects_fts_column_missing_from_schema() {
let opts = BuilderOptions::new(
schema_with_fts(),
"doc_id",
vec![FtsConfig {
column: "nope".into(),
}],
vec![],
Some(default_tokenizer()),
);
let err = SuperfileBuilder::new(opts).expect_err("expected error");
assert!(matches!(err, BuildError::FtsColumnMissing(_)));
}
#[test]
fn new_rejects_fts_column_wrong_type() {
let schema = Arc::new(Schema::new(vec![
Field::new("doc_id", DataType::Decimal128(38, 0), false),
Field::new("title", DataType::Utf8, false),
]));
let opts = BuilderOptions::new(
schema,
"doc_id",
vec![FtsConfig {
column: "title".into(),
}],
vec![],
Some(default_tokenizer()),
);
let err = SuperfileBuilder::new(opts).expect_err("expected error");
assert!(matches!(err, BuildError::FtsColumnMustBeLargeUtf8 { .. }));
}
#[test]
fn new_rejects_duplicate_logical_name_across_fts_and_vector() {
let opts = BuilderOptions::new(
schema_with_fts(),
"doc_id",
vec![FtsConfig {
column: "title".into(),
}],
vec![default_vector_config("title", 1)],
Some(default_tokenizer()),
);
let err = SuperfileBuilder::new(opts).expect_err("expected error");
assert!(matches!(err, BuildError::DuplicateLogicalName(_)));
}
#[test]
fn new_rejects_vector_column_collides_with_schema() {
let opts = BuilderOptions::new(
schema_with_fts(),
"doc_id",
vec![],
vec![default_vector_config("body", 1)], None,
);
let err = SuperfileBuilder::new(opts).expect_err("expected error");
assert!(matches!(err, BuildError::DuplicateLogicalName(_)));
}
#[test]
fn new_rejects_reserved_prefix_in_logical_name() {
let opts = BuilderOptions::new(
schema_with_fts(),
"doc_id",
vec![],
vec![default_vector_config("inf.bad", 1)],
None,
);
let err = SuperfileBuilder::new(opts).expect_err("expected error");
assert!(matches!(err, BuildError::ReservedPrefixInColumnName(_)));
}
#[test]
fn new_with_fts_requires_tokenizer() {
let opts = BuilderOptions::new(
schema_with_fts(),
"doc_id",
vec![FtsConfig {
column: "title".into(),
}],
vec![],
None,
);
let err = SuperfileBuilder::new(opts).expect_err("expected error");
assert!(matches!(err, BuildError::FtsColumnTypeInvalid { .. }));
}
fn batch_two_rows(schema: &Arc<Schema>) -> RecordBatch {
let ids = decimal128_ids(vec![10u64, 11]);
let title = LargeStringArray::from(vec!["hello world", "rust async"]);
let body = LargeStringArray::from(vec!["foo bar", "baz quux"]);
RecordBatch::try_new(
schema.clone(),
vec![Arc::new(ids), Arc::new(title), Arc::new(body)],
)
.expect("build RecordBatch")
}
#[test]
fn add_batch_increments_next_local_doc_id() {
let mut b = SuperfileBuilder::new(opts_minimal()).expect("new SuperfileBuilder");
let schema = b.opts.schema.clone();
let batch = batch_two_rows(&schema);
b.add_batch(&batch, &[]).expect("add_batch");
assert_eq!(b.next_local_doc_id, 2);
b.add_batch(&batch, &[]).expect("add_batch");
assert_eq!(b.next_local_doc_id, 4);
}
#[test]
fn add_batch_rejects_schema_mismatch() {
let mut b = SuperfileBuilder::new(opts_minimal()).expect("new SuperfileBuilder");
let other = Arc::new(Schema::new(vec![Field::new(
"doc_id",
DataType::UInt64,
false,
)]));
let bad = RecordBatch::try_new(other, vec![Arc::new(UInt64Array::from(vec![1u64]))])
.expect("build RecordBatch");
let err = b.add_batch(&bad, &[]).expect_err("expected error");
assert!(matches!(err, BuildError::BatchSchemaMismatch));
}
#[test]
fn add_batch_rejects_wrong_vector_count() {
let opts = BuilderOptions::new(
schema_with_fts(),
"doc_id",
vec![],
vec![default_vector_config("emb", 1)],
None,
);
let mut b = SuperfileBuilder::new(opts).expect("new SuperfileBuilder");
let schema = b.opts.schema.clone();
let batch = batch_two_rows(&schema);
let err = b.add_batch(&batch, &[]).expect_err("expected error");
assert!(matches!(err, BuildError::VectorCountMismatch { .. }));
}
#[test]
fn add_batch_rejects_wrong_vector_dim() {
let opts = BuilderOptions::new(
schema_with_fts(),
"doc_id",
vec![],
vec![default_vector_config("emb", 1)],
None,
);
let mut b = SuperfileBuilder::new(opts).expect("new SuperfileBuilder");
let schema = b.opts.schema.clone();
let batch = batch_two_rows(&schema);
let bad: Vec<f32> = vec![0.0; 30];
let err = b
.add_batch(&batch, &[bad.as_slice()])
.expect_err("expected error");
assert!(matches!(err, BuildError::VectorDimMismatch { .. }));
}
#[test]
fn finish_with_no_indexes_produces_valid_parquet() {
let schema = Arc::new(Schema::new(vec![
Field::new("doc_id", DataType::Decimal128(38, 0), false),
Field::new("title", DataType::LargeUtf8, false),
]));
let opts = BuilderOptions::new(schema.clone(), "doc_id", vec![], vec![], None);
let mut b = SuperfileBuilder::new(opts).expect("new SuperfileBuilder");
let ids = decimal128_ids(vec![1u64, 2, 3]);
let titles = LargeStringArray::from(vec!["a", "b", "c"]);
let batch = RecordBatch::try_new(schema, vec![Arc::new(ids), Arc::new(titles)])
.expect("build RecordBatch");
b.add_batch(&batch, &[]).expect("add_batch");
let bytes = b.finish().expect("finish builder");
assert_eq!(&bytes[..4], b"PAR1");
assert_eq!(&bytes[bytes.len() - 4..], b"PAR1");
}
#[test]
fn finish_emits_required_kv_pointers_for_fts() {
let mut b = SuperfileBuilder::new(opts_minimal()).expect("new SuperfileBuilder");
let schema = b.opts.schema.clone();
let batch = batch_two_rows(&schema);
b.add_batch(&batch, &[]).expect("add_batch");
let bytes = b.finish().expect("finish builder");
let kv = read_kv_metadata(&bytes).expect("read kv metadata");
assert_eq!(
kv.get("inf.format").map(String::as_str),
Some("infino-superfile")
);
assert_eq!(kv.get("inf.id_column").map(String::as_str), Some("doc_id"));
assert_eq!(kv.get("inf.n_docs").map(String::as_str), Some("2"));
assert!(kv.contains_key("inf.fts.offset"));
assert!(kv.contains_key("inf.fts.length"));
assert!(kv.contains_key("inf.fts.columns"));
assert!(!kv.contains_key("inf.vec.offset"));
}
#[test]
fn finish_emits_kv_pointers_for_vectors() {
let opts = BuilderOptions::new(
schema_with_fts(),
"doc_id",
vec![],
vec![default_vector_config("emb", 7)],
None,
);
let mut b = SuperfileBuilder::new(opts).expect("new SuperfileBuilder");
let schema = b.opts.schema.clone();
let batch = batch_two_rows(&schema);
let mut v: Vec<f32> = vec![0.0; 32];
v[0] = 1.0;
v[16 + 1] = 1.0;
b.add_batch(&batch, &[v.as_slice()]).expect("add_batch");
let bytes = b.finish().expect("finish builder");
let kv = read_kv_metadata(&bytes).expect("read kv metadata");
assert!(kv.contains_key("inf.vec.offset"));
assert!(kv.contains_key("inf.vec.length"));
assert!(kv.contains_key("inf.vec.columns"));
assert!(!kv.contains_key("inf.fts.offset"));
}
#[test]
fn fts_columns_json_round_trip_shape() {
let cols = vec![
FtsConfig {
column: "title".into(),
},
FtsConfig {
column: "body".into(),
},
];
let s = fts_columns_json(&cols);
assert!(s.starts_with('['));
assert!(s.contains(r#""name":"title""#));
assert!(s.contains(r#""name":"body""#));
assert!(s.contains(r#""tokenizer":"ascii_lower""#));
}
#[test]
fn vec_columns_json_round_trip_shape() {
let cols = vec![VectorConfig {
column: "emb".into(),
dim: 384,
n_cent: 64,
rot_seed: 99,
metric: Metric::L2Sq,
rerank_codec: RerankCodec::Fp32,
}];
let s = vec_columns_json(&cols);
assert!(s.contains(r#""column":"emb""#));
assert!(s.contains(r#""dim":384"#));
assert!(s.contains(r#""n_cent":64"#));
assert!(s.contains(r#""rot_seed":99"#));
assert!(s.contains(r#""metric":"l2sq""#));
}
#[test]
fn escape_json_handles_control_chars() {
assert_eq!(escape_json(r#"a"b"#), r#"a\"b"#);
assert_eq!(escape_json("a\\b"), "a\\\\b");
assert_eq!(escape_json("a\nb"), "a\\nb");
assert_eq!(escape_json("a\x01b"), "a\\u0001b");
}
#[test]
fn add_batch_from_reader_on_empty_builder_produces_identical_superfile() {
let opts = BuilderOptions::new(
schema_with_fts(),
"doc_id",
vec![FtsConfig {
column: "title".into(),
}],
vec![default_vector_config("emb", 7)],
Some(default_tokenizer()),
);
let mut b1 = SuperfileBuilder::new(opts.clone()).expect("new SuperfileBuilder");
let schema = b1.opts.schema.clone();
let batch = batch_two_rows(&schema);
let mut v: Vec<f32> = vec![0.0; 32]; v[0] = 1.0;
v[16 + 1] = 1.0;
b1.add_batch(&batch, &[v.as_slice()]).expect("add_batch");
let original_bytes = b1.finish().expect("finish builder");
let reader = SuperfileReader::open(Bytes::from(original_bytes.clone()))
.expect("open superfile reader");
let mut b2 = SuperfileBuilder::new(opts).expect("new SuperfileBuilder");
let stats = b2
.add_batch_from_reader(&reader, None)
.expect("add_batch_from_reader");
let merged_bytes = b2.finish().expect("finish builder");
assert_eq!(stats.n_docs, 2, "stats should report 2 documents");
assert_eq!(stats.id_min, 10, "id_min should be 10");
assert_eq!(stats.id_max, 11, "id_max should be 11");
assert!(
!stats.scalar_stats.is_empty(),
"scalar_stats should have column entries"
);
assert!(
stats.scalar_stats.contains_key("doc_id"),
"scalar_stats should contain id_column"
);
assert!(
stats.scalar_stats.contains_key("title"),
"scalar_stats should contain FTS column"
);
assert!(
stats.scalar_stats.contains_key("body"),
"scalar_stats should contain body column"
);
let id_agg = stats
.scalar_stats
.get("doc_id")
.expect("doc_id should have stats");
let (id_min_arr, id_max_arr) = (&id_agg.min, &id_agg.max);
let id_min = id_min_arr
.as_any()
.downcast_ref::<Decimal128Array>()
.expect("id min should be Decimal128")
.value(0);
let id_max = id_max_arr
.as_any()
.downcast_ref::<Decimal128Array>()
.expect("id max should be Decimal128")
.value(0);
assert_eq!(id_min, 10i128, "doc_id min should be 10");
assert_eq!(id_max, 11i128, "doc_id max should be 11");
let title_agg = stats
.scalar_stats
.get("title")
.expect("title should have stats");
let (title_min_arr, title_max_arr) = (&title_agg.min, &title_agg.max);
let title_min = title_min_arr
.as_any()
.downcast_ref::<LargeStringArray>()
.expect("title min should be LargeUtf8")
.value(0);
let title_max = title_max_arr
.as_any()
.downcast_ref::<LargeStringArray>()
.expect("title max should be LargeUtf8")
.value(0);
assert_eq!(
title_min, "hello world",
"title min should be 'hello world'"
);
assert_eq!(title_max, "rust async", "title max should be 'rust async'");
let body_agg = stats
.scalar_stats
.get("body")
.expect("body should have stats");
let (body_min_arr, body_max_arr) = (&body_agg.min, &body_agg.max);
let body_min = body_min_arr
.as_any()
.downcast_ref::<LargeStringArray>()
.expect("body min should be LargeUtf8")
.value(0);
let body_max = body_max_arr
.as_any()
.downcast_ref::<LargeStringArray>()
.expect("body max should be LargeUtf8")
.value(0);
assert_eq!(body_min, "baz quux", "body min should be 'baz quux'");
assert_eq!(body_max, "foo bar", "body max should be 'foo bar'");
assert_eq!(
original_bytes, merged_bytes,
"superfile created from reader should be identical to original"
);
}
#[test]
fn add_batch_from_reader_adds_parquet_data_correctly() {
let opts = BuilderOptions::new(
schema_with_fts(),
"doc_id",
vec![FtsConfig {
column: "title".into(),
}],
vec![],
Some(default_tokenizer()),
);
let mut b1 = SuperfileBuilder::new(opts.clone()).expect("new SuperfileBuilder");
let schema = b1.opts.schema.clone();
let batch = batch_two_rows(&schema);
b1.add_batch(&batch, &[]).expect("add_batch");
let bytes = b1.finish().expect("finish builder");
let reader = SuperfileReader::open(Bytes::from(bytes)).expect("open superfile reader");
let reader_batch = reader
.get_record_batch(None)
.expect("get_record_batch from reader");
assert_eq!(reader_batch.num_rows(), 2);
let mut b2 = SuperfileBuilder::new(opts).expect("new SuperfileBuilder");
let stats = b2
.add_batch_from_reader(&reader, None)
.expect("add_batch_from_reader");
assert_eq!(stats.n_docs, 2, "stats should report 2 documents");
assert_eq!(stats.id_min, 10, "id_min should be 10");
assert_eq!(stats.id_max, 11, "id_max should be 11");
assert!(
!stats.scalar_stats.is_empty(),
"scalar_stats should have column entries"
);
let merged_bytes = b2.finish().expect("finish builder");
let reader2 =
SuperfileReader::open(Bytes::from(merged_bytes)).expect("open merged superfile reader");
let merged_batch = reader2
.get_record_batch(None)
.expect("get_record_batch from merged reader");
assert_eq!(merged_batch.num_rows(), 2);
}
#[test]
fn add_batch_from_reader_adds_vectors_correctly() {
let opts = BuilderOptions::new(
schema_with_fts(),
"doc_id",
vec![],
vec![default_vector_config("emb", 7)],
None,
);
let mut b1 = SuperfileBuilder::new(opts.clone()).expect("new SuperfileBuilder");
let schema = b1.opts.schema.clone();
let batch = batch_two_rows(&schema);
let mut v: Vec<f32> = vec![0.0; 32]; v[0] = 1.0;
v[16 + 1] = 1.0;
b1.add_batch(&batch, &[v.as_slice()]).expect("add_batch");
let bytes = b1.finish().expect("finish builder");
let reader = SuperfileReader::open(Bytes::from(bytes)).expect("open superfile reader");
let vectors_before = reader
.vec()
.expect("get vector reader")
.get_vectors_fp32("emb")
.expect("get vectors fp32");
let mut b2 = SuperfileBuilder::new(opts).expect("new SuperfileBuilder");
let stats = b2
.add_batch_from_reader(&reader, None)
.expect("add_batch_from_reader");
assert_eq!(stats.n_docs, 2, "stats should report 2 documents");
assert_eq!(stats.id_min, 10, "id_min should be 10");
assert_eq!(stats.id_max, 11, "id_max should be 11");
assert!(
!stats.scalar_stats.is_empty(),
"scalar_stats should have column entries"
);
let merged_bytes = b2.finish().expect("finish builder");
let reader2 =
SuperfileReader::open(Bytes::from(merged_bytes)).expect("open merged superfile reader");
let vectors_after = reader2
.vec()
.expect("get vector reader")
.get_vectors_fp32("emb")
.expect("get vectors fp32");
assert_eq!(vectors_before.len(), vectors_after.len());
for (v1, v2) in vectors_before.iter().zip(vectors_after.iter()) {
for (val1, val2) in v1.iter().zip(v2.iter()) {
assert!((val1 - val2).abs() < 1e-6);
}
}
}
#[tokio::test]
async fn add_batch_from_reader_adds_fts_correctly() {
let opts = BuilderOptions::new(
schema_with_fts(),
"doc_id",
vec![FtsConfig {
column: "title".into(),
}],
vec![],
Some(default_tokenizer()),
);
let mut b1 = SuperfileBuilder::new(opts.clone()).expect("new SuperfileBuilder");
let schema = b1.opts.schema.clone();
let batch = batch_two_rows(&schema);
b1.add_batch(&batch, &[]).expect("add_batch");
let bytes = b1.finish().expect("finish builder");
let reader = SuperfileReader::open(Bytes::from(bytes)).expect("open superfile reader");
let fts_reader = reader.fts().expect("get fts reader");
let results = fts_reader
.search("title", &["hello"], 10, BoolMode::Or)
.await
.expect("search fts");
assert_eq!(results.len(), 1);
let mut b2 = SuperfileBuilder::new(opts).expect("new SuperfileBuilder");
let stats = b2
.add_batch_from_reader(&reader, None)
.expect("add_batch_from_reader");
assert_eq!(stats.n_docs, 2, "stats should report 2 documents");
assert_eq!(stats.id_min, 10, "id_min should be 10");
assert_eq!(stats.id_max, 11, "id_max should be 11");
assert!(
!stats.scalar_stats.is_empty(),
"scalar_stats should have column entries"
);
let merged_bytes = b2.finish().expect("finish builder");
let reader2 =
SuperfileReader::open(Bytes::from(merged_bytes)).expect("open merged superfile reader");
let fts_reader2 = reader2.fts().expect("get fts reader");
let results2 = fts_reader2
.search("title", &["hello"], 10, BoolMode::Or)
.await
.expect("search fts in merged");
assert_eq!(results2.len(), 1);
}
/// Appending a reader's rows to a builder that already holds data must keep both
/// datasets: rows, vectors and FTS postings from the builder's own batch and from
/// the reader all end up in the finished superfile.
#[tokio::test]
async fn add_batch_from_reader_to_non_empty_builder_includes_both_datasets() {
    let opts = BuilderOptions::new(
        schema_with_fts(),
        "doc_id",
        vec![FtsConfig {
            column: "title".into(),
        }],
        vec![default_vector_config("emb", 7)],
        Some(default_tokenizer()),
    );

    // Dataset 1 (ids 10/11, titles matching "hello") is persisted and reopened so
    // it can be fed back in through `add_batch_from_reader`.
    let mut b1 = SuperfileBuilder::new(opts.clone()).expect("new SuperfileBuilder");
    let schema = b1.opts.schema.clone();
    let batch1 = batch_two_rows(&schema);
    // Two 16-dim vectors flattened row-major: row 0 -> e0, row 1 -> e1.
    let mut v1: Vec<f32> = vec![0.0; 32];
    v1[0] = 1.0;
    v1[16 + 1] = 1.0;
    b1.add_batch(&batch1, &[v1.as_slice()]).expect("add_batch");
    let bytes1 = b1.finish().expect("finish builder");
    let reader1 = SuperfileReader::open(Bytes::from(bytes1)).expect("open reader1");

    // Dataset 2 (ids 20/21, titles containing "foo") goes straight into the merge
    // target, so the target is non-empty before the reader's rows are appended.
    let ids2 = decimal128_ids(vec![20u64, 21]);
    let title2 = LargeStringArray::from(vec!["foo bar", "baz qux"]);
    let body2 = LargeStringArray::from(vec!["quux corge", "grault garply"]);
    let batch2 = RecordBatch::try_new(
        schema,
        vec![Arc::new(ids2), Arc::new(title2), Arc::new(body2)],
    )
    .expect("build RecordBatch");
    // Row-major: row 0 -> e1, row 1 -> e0.
    let mut v2: Vec<f32> = vec![0.0; 32];
    v2[1] = 1.0;
    v2[16] = 1.0;
    let mut merged = SuperfileBuilder::new(opts).expect("new SuperfileBuilder");
    merged
        .add_batch(&batch2, &[v2.as_slice()])
        .expect("add_batch");

    // The stats returned by `add_batch_from_reader` describe only the rows it
    // ingested from `reader1`, not the batch that was already in the builder.
    let stats = merged
        .add_batch_from_reader(&reader1, None)
        .expect("add_batch_from_reader");
    assert_eq!(stats.n_docs, 2, "stats should report 2 documents");
    assert_eq!(stats.id_min, 10, "id_min should be 10");
    assert_eq!(stats.id_max, 11, "id_max should be 11");
    assert!(
        !stats.scalar_stats.is_empty(),
        "scalar_stats should have column entries"
    );

    // The finished file holds all four rows and all four vectors.
    let merged_bytes = merged.finish().expect("finish builder");
    let merged_reader =
        SuperfileReader::open(Bytes::from(merged_bytes)).expect("open merged reader");
    let merged_batch = merged_reader
        .get_record_batch(None)
        .expect("get_record_batch");
    assert_eq!(merged_batch.num_rows(), 4);
    let merged_vectors = merged_reader
        .vec()
        .expect("get vector reader")
        .get_vectors_fp32("emb")
        .expect("get vectors");
    assert_eq!(merged_vectors.len(), 4);

    // FTS terms from both datasets are searchable.
    let fts_reader = merged_reader.fts().expect("get fts reader");
    let hello_results = fts_reader
        .search("title", &["hello"], 10, BoolMode::Or)
        .await
        .expect("search for hello");
    assert!(
        !hello_results.is_empty(),
        "should find 'hello' from first dataset"
    );
    let foo_results = fts_reader
        .search("title", &["foo"], 10, BoolMode::Or)
        .await
        .expect("search for foo");
    assert!(
        !foo_results.is_empty(),
        "should find 'foo' from second dataset"
    );
}
#[test]
fn add_vector_fp32_returns_correct_vectors() {
    // Dimensionality used by `default_vector_config`, per the assertions below.
    const DIM: usize = 16;

    let opts = BuilderOptions::new(
        schema_with_fts(),
        "doc_id",
        vec![],
        vec![default_vector_config("emb", 7)],
        None,
    );
    let mut builder = SuperfileBuilder::new(opts).expect("new SuperfileBuilder");
    let schema = builder.opts.schema.clone();
    let batch = batch_two_rows(&schema);

    // Row-major flattened vectors: row 0 sets component 0; row 1 sets
    // components 0, 1 and 15.
    let mut flat: Vec<f32> = vec![0.0; 2 * DIM];
    for idx in [0, DIM, DIM + 1, 2 * DIM - 1] {
        flat[idx] = 1.0;
    }
    builder
        .add_batch(&batch, &[flat.as_slice()])
        .expect("add_batch");

    let bytes = builder.finish().expect("finish builder");
    let reader = SuperfileReader::open(Bytes::from(bytes)).expect("open superfile reader");
    let vec_reader = reader.vec().expect("get vector reader");
    let vectors = vec_reader
        .get_vectors_fp32("emb")
        .expect("get vectors fp32");

    assert_eq!(vectors.len(), 2, "should have 2 vectors");
    assert_eq!(vectors[0].len(), DIM, "first vector should have 16 dimensions");
    assert_eq!(vectors[1].len(), DIM, "second vector should have 16 dimensions");
    assert!((vectors[0][0] - 1.0).abs() < 1e-6);
    assert!((vectors[0][1] - 0.0).abs() < 1e-6);
    assert!((vectors[1][0] - 1.0).abs() < 1e-6);
    assert!((vectors[1][1] - 1.0).abs() < 1e-6);
    assert!((vectors[1][15] - 1.0).abs() < 1e-6);
}
#[test]
fn add_vector_fp32_rejects_non_fp32_codec() {
    // A vector column stored with the SQ8 residual codec instead of raw fp32.
    let sq8_column = VectorConfig {
        column: "emb".into(),
        dim: 16,
        n_cent: 4,
        rot_seed: 7,
        metric: Metric::L2Sq,
        rerank_codec: RerankCodec::Sq8ResidualEpsilon,
    };
    let opts = BuilderOptions::new(schema_with_fts(), "doc_id", vec![], vec![sq8_column], None);

    let mut builder = SuperfileBuilder::new(opts).expect("new SuperfileBuilder");
    let schema = builder.opts.schema.clone();
    let batch = batch_two_rows(&schema);
    let zeros: Vec<f32> = vec![0.0; 32];
    builder
        .add_batch(&batch, &[zeros.as_slice()])
        .expect("add_batch");

    let bytes = builder.finish().expect("finish builder");
    let reader = SuperfileReader::open(Bytes::from(bytes)).expect("open superfile reader");

    // Raw fp32 retrieval is only valid for Fp32-coded columns.
    let vec_reader = reader.vec().expect("get vector reader");
    let outcome = vec_reader.get_vectors_fp32("emb");
    assert!(outcome.is_err(), "should reject Sq8ResidualEpsilon codec");
}
#[tokio::test]
async fn add_batch_from_reader_queries_work_correctly() {
    let fts_columns = vec![FtsConfig {
        column: "title".into(),
    }];
    let opts = BuilderOptions::new(
        schema_with_fts(),
        "doc_id",
        fts_columns,
        vec![default_vector_config("emb", 7)],
        Some(default_tokenizer()),
    );

    // Source superfile: the two-row fixture plus one 16-dim vector per row
    // (row-major: row 0 -> e0, row 1 -> e1).
    let mut source_builder = SuperfileBuilder::new(opts.clone()).expect("new SuperfileBuilder");
    let schema = source_builder.opts.schema.clone();
    let batch = batch_two_rows(&schema);
    let mut flat: Vec<f32> = vec![0.0; 32];
    flat[0] = 1.0;
    flat[16 + 1] = 1.0;
    source_builder
        .add_batch(&batch, &[flat.as_slice()])
        .expect("add_batch");
    let source_bytes = source_builder.finish().expect("finish builder");
    let source = SuperfileReader::open(Bytes::from(source_bytes)).expect("open reader1");

    // Round-trip through `add_batch_from_reader` into an empty builder.
    let mut rebuilt_builder = SuperfileBuilder::new(opts).expect("new SuperfileBuilder");
    let stats = rebuilt_builder
        .add_batch_from_reader(&source, None)
        .expect("add_batch_from_reader");
    assert_eq!(stats.n_docs, 2, "stats should report 2 documents");
    assert_eq!(stats.id_min, 10, "id_min should be 10");
    assert_eq!(stats.id_max, 11, "id_max should be 11");
    assert!(
        !stats.scalar_stats.is_empty(),
        "scalar_stats should have column entries"
    );

    let rebuilt_bytes = rebuilt_builder.finish().expect("finish builder");
    let rebuilt = SuperfileReader::open(Bytes::from(rebuilt_bytes)).expect("open merged reader");

    // Vector search with an axis-0 query.
    let mut query = [0.0f32; 16];
    query[0] = 1.0;
    let vec_reader = rebuilt.vec().expect("get vector reader");
    let vector_hits = vec_reader
        .search("emb", &query, 10, 4, 100)
        .await
        .expect("vector search");
    assert!(!vector_hits.is_empty(), "vector search should return results");

    // FTS still resolves the fixture's "hello" title.
    let fts_reader = rebuilt.fts().expect("get fts reader");
    let text_hits = fts_reader
        .search("title", &["hello"], 10, BoolMode::Or)
        .await
        .expect("fts search");
    assert!(!text_hits.is_empty(), "fts search should return results");

    let rows = rebuilt.get_record_batch(None).expect("get_record_batch");
    assert_eq!(rows.num_rows(), 2);
}
#[test]
fn build_from_readers_rejects_empty_readers_array() {
    // With no inputs there is nothing to merge, so the call must fail.
    let outcome = SuperfileBuilder::build_from_readers(&[]);
    assert!(matches!(outcome, Err(_)), "should reject empty readers array");
}
/// Deletion-bitmap slot for a `build_from_readers` input that deletes nothing.
///
/// Despite the name, this returns `None` (no bitmap at all) rather than an
/// empty `RoaringBitmap`. The tests that use it expect every row of the
/// corresponding reader to be kept.
fn empty_bitmap() -> Option<Arc<RoaringBitmap>> {
    Option::None
}
/// Merging a single reader must yield a well-formed file (Parquet `PAR1` magic
/// at both ends) with the same rows and per-column scalar stats.
#[test]
fn build_from_readers_single_reader_produces_valid_superfile() {
    let opts = BuilderOptions::new(
        schema_with_fts(),
        "doc_id",
        vec![FtsConfig {
            column: "title".into(),
        }],
        vec![],
        Some(default_tokenizer()),
    );
    let mut b = SuperfileBuilder::new(opts).expect("new SuperfileBuilder");
    let schema = b.opts.schema.clone();
    let batch = batch_two_rows(&schema);
    b.add_batch(&batch, &[]).expect("add_batch");
    let original_bytes = b.finish().expect("finish builder");
    // The buffer is not needed afterwards, so move it into `Bytes` rather than
    // deep-copying the whole file.
    let reader =
        SuperfileReader::open(Bytes::from(original_bytes)).expect("open superfile reader");
    let (merged_bytes, stats) =
        SuperfileBuilder::build_from_readers(&[(Arc::new(reader), empty_bitmap())])
            .expect("build_from_readers");
    assert_eq!(&merged_bytes[..4], b"PAR1");
    assert_eq!(&merged_bytes[merged_bytes.len() - 4..], b"PAR1");
    assert_eq!(stats.n_docs, 2);
    assert_eq!(stats.id_min, 10);
    assert_eq!(stats.id_max, 11);
    assert!(stats.scalar_stats.contains_key("doc_id"));
    assert!(stats.scalar_stats.contains_key("title"));
    assert!(stats.scalar_stats.contains_key("body"));
    let merged_reader =
        SuperfileReader::open(Bytes::from(merged_bytes)).expect("open merged reader");
    let merged_batch = merged_reader
        .get_record_batch(None)
        .expect("get_record_batch");
    assert_eq!(merged_batch.num_rows(), 2);
}
#[test]
fn build_from_readers_merges_multiple_readers_correctly() {
    let opts = BuilderOptions::new(
        schema_with_fts(),
        "doc_id",
        vec![FtsConfig {
            column: "title".into(),
        }],
        vec![],
        Some(default_tokenizer()),
    );

    // First input: the shared two-row fixture (ids 10 and 11).
    let mut first_builder = SuperfileBuilder::new(opts.clone()).expect("new SuperfileBuilder");
    let schema = first_builder.opts.schema.clone();
    let first_batch = batch_two_rows(&schema);
    first_builder.add_batch(&first_batch, &[]).expect("add_batch");
    let first_bytes = first_builder.finish().expect("finish builder");

    // Second input: ids 20 and 21 with distinct text.
    let mut second_builder = SuperfileBuilder::new(opts).expect("new SuperfileBuilder");
    let second_batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(decimal128_ids(vec![20u64, 21])),
            Arc::new(LargeStringArray::from(vec!["foo bar", "baz qux"])),
            Arc::new(LargeStringArray::from(vec!["quux corge", "grault garply"])),
        ],
    )
    .expect("build RecordBatch");
    second_builder.add_batch(&second_batch, &[]).expect("add_batch");
    let second_bytes = second_builder.finish().expect("finish builder");

    let first_reader = SuperfileReader::open(Bytes::from(first_bytes)).expect("open reader1");
    let second_reader = SuperfileReader::open(Bytes::from(second_bytes)).expect("open reader2");
    let inputs = [
        (Arc::new(first_reader), empty_bitmap()),
        (Arc::new(second_reader), empty_bitmap()),
    ];
    let (merged_bytes, stats) =
        SuperfileBuilder::build_from_readers(&inputs).expect("build_from_readers");

    // Stats span both inputs.
    assert_eq!(stats.n_docs, 4, "should have 4 total documents");
    assert_eq!(stats.id_min, 10, "id_min should be 10");
    assert_eq!(stats.id_max, 21, "id_max should be 21");
    assert_eq!(stats.scalar_stats.len(), 3, "should have 3 columns");

    let merged_reader =
        SuperfileReader::open(Bytes::from(merged_bytes)).expect("open merged reader");
    let all_rows = merged_reader
        .get_record_batch(None)
        .expect("get_record_batch");
    assert_eq!(all_rows.num_rows(), 4);
}
#[test]
fn build_from_readers_preserves_vectors_and_fts() {
    let opts = BuilderOptions::new(
        schema_with_fts(),
        "doc_id",
        vec![FtsConfig {
            column: "title".into(),
        }],
        vec![default_vector_config("emb", 7)],
        Some(default_tokenizer()),
    );
    let mut builder = SuperfileBuilder::new(opts).expect("new SuperfileBuilder");
    let schema = builder.opts.schema.clone();
    let batch = batch_two_rows(&schema);

    // Row-major: row 0 -> e0, row 1 -> e1 (16 components each).
    let mut flat: Vec<f32> = vec![0.0; 32];
    flat[0] = 1.0;
    flat[17] = 1.0;
    builder
        .add_batch(&batch, &[flat.as_slice()])
        .expect("add_batch");

    let source = SuperfileReader::open(Bytes::from(builder.finish().expect("finish builder")))
        .expect("open reader");
    let inputs = [(Arc::new(source), empty_bitmap())];
    let (merged_bytes, stats) =
        SuperfileBuilder::build_from_readers(&inputs).expect("build_from_readers");
    assert_eq!(stats.n_docs, 2);
    assert_eq!(stats.id_min, 10);
    assert_eq!(stats.id_max, 11);

    // Both secondary indexes survive the merge.
    let merged = SuperfileReader::open(Bytes::from(merged_bytes)).expect("open merged reader");
    assert!(merged.fts().is_some(), "FTS index should be present");
    assert!(merged.vec().is_some(), "Vector index should be present");
}
#[tokio::test]
async fn build_from_readers_preserves_fts_search_functionality() {
    let opts = BuilderOptions::new(
        schema_with_fts(),
        "doc_id",
        vec![FtsConfig {
            column: "title".into(),
        }],
        vec![],
        Some(default_tokenizer()),
    );

    // Two independent superfiles, each holding the same two-row fixture.
    let mut first_builder = SuperfileBuilder::new(opts.clone()).expect("new SuperfileBuilder");
    let schema = first_builder.opts.schema.clone();
    let fixture = batch_two_rows(&schema);
    first_builder.add_batch(&fixture, &[]).expect("add_batch");
    let first_bytes = first_builder.finish().expect("finish builder");
    let first_reader = SuperfileReader::open(Bytes::from(first_bytes)).expect("open reader");

    let mut second_builder = SuperfileBuilder::new(opts.clone()).expect("new SuperfileBuilder");
    second_builder.add_batch(&fixture, &[]).expect("add batch");
    let second_bytes = second_builder.finish().expect("finish builder");
    let second_reader = SuperfileReader::open(Bytes::from(second_bytes)).expect("open reader");

    let (merged_bytes, stats) = SuperfileBuilder::build_from_readers(&[
        (Arc::new(first_reader), empty_bitmap()),
        (Arc::new(second_reader), empty_bitmap()),
    ])
    .expect("build_from_readers");

    // Duplicate ids are kept, so the row count doubles while the id range does not.
    assert_eq!(stats.n_docs, 4, "should have 4 documents (2 + 2)");
    assert_eq!(stats.id_min, 10);
    assert_eq!(stats.id_max, 11);

    // One "hello" hit per input copy.
    let merged = SuperfileReader::open(Bytes::from(merged_bytes)).expect("open merged reader");
    let merged_fts = merged.fts().expect("get fts reader from merged");
    let hits = merged_fts
        .search("title", &["hello"], 10, BoolMode::Or)
        .await
        .expect("search merged");
    assert_eq!(hits.len(), 2);
}
/// Merging three superfiles with disjoint id ranges (10-11, 20-21, 30-31)
/// produces one file containing all six rows and the combined id range.
#[test]
fn build_from_readers_three_superfiles() {
    let opts = BuilderOptions::new(
        schema_with_fts(),
        "doc_id",
        vec![FtsConfig {
            column: "title".into(),
        }],
        vec![],
        Some(default_tokenizer()),
    );
    let mut bytes_list = Vec::new();
    for base_id in [10u64, 20u64, 30u64] {
        let mut b = SuperfileBuilder::new(opts.clone()).expect("new SuperfileBuilder");
        let schema = b.opts.schema.clone();
        let ids = decimal128_ids(vec![base_id, base_id + 1]);
        let title = LargeStringArray::from(vec!["foo", "bar"]);
        let body = LargeStringArray::from(vec!["baz", "qux"]);
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(ids), Arc::new(title), Arc::new(body)])
                .expect("build RecordBatch");
        b.add_batch(&batch, &[]).expect("add_batch");
        bytes_list.push(b.finish().expect("finish builder"));
    }
    // Consume the serialized files so each buffer is moved into `Bytes` instead of
    // being deep-copied first; `bytes_list` is not needed afterwards.
    let readers: Vec<_> = bytes_list
        .into_iter()
        .map(|bytes| {
            (
                Arc::new(SuperfileReader::open(Bytes::from(bytes)).expect("open reader")),
                empty_bitmap(),
            )
        })
        .collect();
    let (merged_bytes, stats) =
        SuperfileBuilder::build_from_readers(&readers).expect("build_from_readers");
    assert_eq!(stats.n_docs, 6, "should have 6 total documents");
    assert_eq!(stats.id_min, 10, "id_min should be 10");
    assert_eq!(stats.id_max, 31, "id_max should be 31");
    let merged_reader =
        SuperfileReader::open(Bytes::from(merged_bytes)).expect("open merged reader");
    let merged_batch = merged_reader
        .get_record_batch(None)
        .expect("get_record_batch");
    assert_eq!(merged_batch.num_rows(), 6);
}
#[tokio::test]
async fn build_from_readers_with_only_vectors_and_search() {
    // Vector column only: no FTS columns and no tokenizer.
    let opts = BuilderOptions::new(
        schema_with_fts(),
        "doc_id",
        vec![],
        vec![default_vector_config("emb", 7)],
        None,
    );

    // Input 1: fixture rows (ids 10/11); row-major vectors e0 and e1.
    let mut first_builder = SuperfileBuilder::new(opts.clone()).expect("new SuperfileBuilder");
    let schema = first_builder.opts.schema.clone();
    let first_batch = batch_two_rows(&schema);
    let mut first_vectors: Vec<f32> = vec![0.0; 32];
    first_vectors[0] = 1.0;
    first_vectors[16 + 1] = 1.0;
    first_builder
        .add_batch(&first_batch, &[first_vectors.as_slice()])
        .expect("add_batch");
    let first_bytes = first_builder.finish().expect("finish builder");

    // Input 2: ids 20/21; row-major vectors e1 and e2.
    let mut second_builder = SuperfileBuilder::new(opts.clone()).expect("new SuperfileBuilder");
    let second_batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(decimal128_ids(vec![20u64, 21])),
            Arc::new(LargeStringArray::from(vec!["foo bar", "baz qux"])),
            Arc::new(LargeStringArray::from(vec!["quux corge", "grault garply"])),
        ],
    )
    .expect("build RecordBatch");
    let mut second_vectors: Vec<f32> = vec![0.0; 32];
    second_vectors[1] = 1.0;
    second_vectors[16 + 2] = 1.0;
    second_builder
        .add_batch(&second_batch, &[second_vectors.as_slice()])
        .expect("add_batch");
    let second_bytes = second_builder.finish().expect("finish builder");

    let first_reader = SuperfileReader::open(Bytes::from(first_bytes)).expect("open reader1");
    let second_reader = SuperfileReader::open(Bytes::from(second_bytes)).expect("open reader2");
    let (merged_bytes, stats) = SuperfileBuilder::build_from_readers(&[
        (Arc::new(first_reader), empty_bitmap()),
        (Arc::new(second_reader), empty_bitmap()),
    ])
    .expect("build_from_readers");
    assert_eq!(stats.n_docs, 4, "should have 4 total documents");
    assert_eq!(stats.id_min, 10, "id_min should be 10");
    assert_eq!(stats.id_max, 21, "id_max should be 21");

    let merged = SuperfileReader::open(Bytes::from(merged_bytes)).expect("open merged reader");
    assert!(merged.vec().is_some(), "should have vector index");
    assert!(merged.fts().is_none(), "should not have FTS index");
    let rows = merged.get_record_batch(None).expect("get_record_batch");
    assert_eq!(rows.num_rows(), 4, "should have 4 rows (2 + 2)");

    // An axis-0 query with k=10 should surface every merged vector.
    let mut query = [0.0f32; 16];
    query[0] = 1.0;
    let vec_reader = merged.vec().expect("get vector reader");
    let hits = vec_reader
        .search("emb", &query, 10, 4, 100)
        .await
        .expect("vector search");
    assert_eq!(
        hits.len(),
        4,
        "vector search should return all 4 vectors from merged superfiles"
    );
}
#[test]
fn build_from_readers_filters_deleted_documents() {
    let opts = BuilderOptions::new(
        schema_with_fts(),
        "doc_id",
        vec![FtsConfig {
            column: "title".into(),
        }],
        vec![],
        Some(default_tokenizer()),
    );

    // Input 1: fixture rows, ids 10 and 11.
    let mut first_builder = SuperfileBuilder::new(opts.clone()).expect("new SuperfileBuilder");
    let schema = first_builder.opts.schema.clone();
    let first_batch = batch_two_rows(&schema);
    first_builder.add_batch(&first_batch, &[]).expect("add_batch");
    let first_bytes = first_builder.finish().expect("finish builder");

    // Input 2: ids 20 and 21.
    let mut second_builder = SuperfileBuilder::new(opts.clone()).expect("new SuperfileBuilder");
    let second_batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(decimal128_ids(vec![20u64, 21])),
            Arc::new(LargeStringArray::from(vec!["foo bar", "baz qux"])),
            Arc::new(LargeStringArray::from(vec!["quux corge", "grault garply"])),
        ],
    )
    .expect("build RecordBatch");
    second_builder.add_batch(&second_batch, &[]).expect("add_batch");
    let second_bytes = second_builder.finish().expect("finish builder");

    let first_reader = SuperfileReader::open(Bytes::from(first_bytes)).expect("open reader1");
    let second_reader = SuperfileReader::open(Bytes::from(second_bytes)).expect("open reader2");

    // Builds a deletion bitmap containing a single row position.
    let deleting = |row: u32| {
        let mut bitmap = RoaringBitmap::new();
        bitmap.insert(row);
        Some(Arc::new(bitmap))
    };
    // Delete row 0 (id 10) of the first input and row 1 (id 21) of the second.
    let (merged_bytes, stats) = SuperfileBuilder::build_from_readers(&[
        (Arc::new(first_reader), deleting(0)),
        (Arc::new(second_reader), deleting(1)),
    ])
    .expect("build_from_readers");

    assert_eq!(stats.n_docs, 2, "should have 2 documents after filtering");
    assert_eq!(stats.id_min, 11, "id_min should be 11 (from reader1 row 1)");
    assert_eq!(stats.id_max, 20, "id_max should be 20 (from reader2 row 0)");

    let merged = SuperfileReader::open(Bytes::from(merged_bytes)).expect("open merged reader");
    let surviving = merged.get_record_batch(None).expect("get_record_batch");
    assert_eq!(
        surviving.num_rows(),
        2,
        "merged superfile should have 2 rows after filtering deleted documents"
    );
}
#[test]
fn build_from_readers_validates_scalar_stats_min_max_single_reader() {
    // Downcasts a stats array reference to `$ty` and reads its value at index 0.
    macro_rules! first_value {
        ($arr:expr, $ty:ty, $msg:literal) => {
            $arr.as_ref()
                .as_any()
                .downcast_ref::<$ty>()
                .expect($msg)
                .value(0)
        };
    }

    let opts = BuilderOptions::new(
        schema_with_fts(),
        "doc_id",
        vec![FtsConfig {
            column: "title".into(),
        }],
        vec![],
        Some(default_tokenizer()),
    );
    let mut builder = SuperfileBuilder::new(opts).expect("new SuperfileBuilder");
    let schema = builder.opts.schema.clone();
    let batch = batch_two_rows(&schema);
    builder.add_batch(&batch, &[]).expect("add_batch");
    let bytes = builder.finish().expect("finish builder");
    let reader = SuperfileReader::open(Bytes::from(bytes)).expect("open reader");
    let (_, stats) =
        SuperfileBuilder::build_from_readers(&[(Arc::new(reader), empty_bitmap())])
            .expect("build_from_readers");

    // doc_id: Decimal128 column, fixture ids 10..=11.
    let doc_id_agg = stats.scalar_stats.get("doc_id").expect("doc_id column");
    let doc_id_min = first_value!(&doc_id_agg.min, Decimal128Array, "downcast to Decimal128");
    let doc_id_max = first_value!(&doc_id_agg.max, Decimal128Array, "downcast to Decimal128");
    assert_eq!(doc_id_min, 10, "doc_id min should be 10");
    assert_eq!(doc_id_max, 11, "doc_id max should be 11");

    // title: lexicographic min/max over the fixture titles.
    let title_agg = stats.scalar_stats.get("title").expect("title column");
    let title_min =
        first_value!(&title_agg.min, LargeStringArray, "downcast to LargeStringArray");
    let title_max =
        first_value!(&title_agg.max, LargeStringArray, "downcast to LargeStringArray");
    assert_eq!(
        title_min, "hello world",
        "title min should be 'hello world'"
    );
    assert_eq!(title_max, "rust async", "title max should be 'rust async'");

    // body: lexicographic min/max over the fixture bodies.
    let body_agg = stats.scalar_stats.get("body").expect("body column");
    let body_min = first_value!(&body_agg.min, LargeStringArray, "downcast to LargeStringArray");
    let body_max = first_value!(&body_agg.max, LargeStringArray, "downcast to LargeStringArray");
    assert_eq!(body_min, "baz quux", "body min should be 'baz quux'");
    assert_eq!(body_max, "foo bar", "body max should be 'foo bar'");
}
#[test]
fn build_from_readers_validates_scalar_stats_across_multiple_readers() {
    // Downcasts a stats array reference to `$ty` and reads its value at index 0.
    macro_rules! first_value {
        ($arr:expr, $ty:ty, $msg:literal) => {
            $arr.as_ref()
                .as_any()
                .downcast_ref::<$ty>()
                .expect($msg)
                .value(0)
        };
    }

    let opts = BuilderOptions::new(
        schema_with_fts(),
        "doc_id",
        vec![FtsConfig {
            column: "title".into(),
        }],
        vec![],
        Some(default_tokenizer()),
    );

    // Input 1: the shared two-row fixture (ids 10/11).
    let mut first_builder = SuperfileBuilder::new(opts.clone()).expect("new SuperfileBuilder");
    let schema = first_builder.opts.schema.clone();
    let first_batch = batch_two_rows(&schema);
    first_builder.add_batch(&first_batch, &[]).expect("add_batch");
    let first_bytes = first_builder.finish().expect("finish builder");

    // Input 2: ids 20/21 with strings that should become the merged extremes.
    let mut second_builder = SuperfileBuilder::new(opts).expect("new SuperfileBuilder");
    let second_batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(decimal128_ids(vec![20u64, 21])),
            Arc::new(LargeStringArray::from(vec!["alpha", "zeta"])),
            Arc::new(LargeStringArray::from(vec!["aaa", "zzz"])),
        ],
    )
    .expect("build RecordBatch");
    second_builder.add_batch(&second_batch, &[]).expect("add_batch");
    let second_bytes = second_builder.finish().expect("finish builder");

    let first_reader = SuperfileReader::open(Bytes::from(first_bytes)).expect("open reader1");
    let second_reader = SuperfileReader::open(Bytes::from(second_bytes)).expect("open reader2");
    let (_, stats) = SuperfileBuilder::build_from_readers(&[
        (Arc::new(first_reader), empty_bitmap()),
        (Arc::new(second_reader), empty_bitmap()),
    ])
    .expect("build_from_readers");

    let doc_id_agg = stats.scalar_stats.get("doc_id").expect("doc_id column");
    let doc_id_min = first_value!(&doc_id_agg.min, Decimal128Array, "downcast to Decimal128");
    let doc_id_max = first_value!(&doc_id_agg.max, Decimal128Array, "downcast to Decimal128");
    assert_eq!(doc_id_min, 10, "merged doc_id min should be 10");
    assert_eq!(doc_id_max, 21, "merged doc_id max should be 21");

    let title_agg = stats.scalar_stats.get("title").expect("title column");
    let title_min =
        first_value!(&title_agg.min, LargeStringArray, "downcast to LargeStringArray");
    let title_max =
        first_value!(&title_agg.max, LargeStringArray, "downcast to LargeStringArray");
    assert_eq!(title_min, "alpha", "merged title min should be 'alpha'");
    assert_eq!(title_max, "zeta", "merged title max should be 'zeta'");

    let body_agg = stats.scalar_stats.get("body").expect("body column");
    let body_min = first_value!(&body_agg.min, LargeStringArray, "downcast to LargeStringArray");
    let body_max = first_value!(&body_agg.max, LargeStringArray, "downcast to LargeStringArray");
    assert_eq!(body_min, "aaa", "merged body min should be 'aaa'");
    assert_eq!(body_max, "zzz", "merged body max should be 'zzz'");
}
#[test]
fn build_from_readers_validates_scalar_stats_with_string_columns() {
    // Downcasts a stats array reference to `$ty` and reads its value at index 0.
    macro_rules! first_value {
        ($arr:expr, $ty:ty, $msg:literal) => {
            $arr.as_ref()
                .as_any()
                .downcast_ref::<$ty>()
                .expect($msg)
                .value(0)
        };
    }

    let opts = BuilderOptions::new(
        schema_with_fts(),
        "doc_id",
        vec![FtsConfig {
            column: "title".into(),
        }],
        vec![],
        Some(default_tokenizer()),
    );
    let mut builder = SuperfileBuilder::new(opts).expect("new SuperfileBuilder");
    let schema = builder.opts.schema.clone();

    // Strings are inserted in descending order, so min/max must come from
    // comparing values rather than from row position.
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(decimal128_ids(vec![1u64, 2])),
            Arc::new(LargeStringArray::from(vec!["zebra", "apple"])),
            Arc::new(LargeStringArray::from(vec!["xyz", "abc"])),
        ],
    )
    .expect("build RecordBatch");
    builder.add_batch(&batch, &[]).expect("add_batch");
    let bytes = builder.finish().expect("finish builder");
    let reader = SuperfileReader::open(Bytes::from(bytes)).expect("open reader");
    let (_, stats) =
        SuperfileBuilder::build_from_readers(&[(Arc::new(reader), empty_bitmap())])
            .expect("build_from_readers");

    let title_agg = stats.scalar_stats.get("title").expect("title column");
    let title_min =
        first_value!(&title_agg.min, LargeStringArray, "downcast to LargeStringArray");
    let title_max =
        first_value!(&title_agg.max, LargeStringArray, "downcast to LargeStringArray");
    assert_eq!(title_min, "apple", "title min should be 'apple'");
    assert_eq!(title_max, "zebra", "title max should be 'zebra'");

    let body_agg = stats.scalar_stats.get("body").expect("body column");
    let body_min = first_value!(&body_agg.min, LargeStringArray, "downcast to LargeStringArray");
    let body_max = first_value!(&body_agg.max, LargeStringArray, "downcast to LargeStringArray");
    assert_eq!(body_min, "abc", "body min should be 'abc'");
    assert_eq!(body_max, "xyz", "body max should be 'xyz'");
}
#[tokio::test]
async fn add_batch_from_reader_sq8_succeeds_and_search_works() {
    // Single vector column stored with the SQ8 residual codec.
    let sq8_column =
        default_vector_config("emb", 7).with_rerank_codec(RerankCodec::Sq8ResidualEpsilon);
    let sq8_opts =
        BuilderOptions::new(schema_with_fts(), "doc_id", vec![], vec![sq8_column], None);

    // Source superfile: row-major vectors, row 0 -> e0, row 1 -> e1.
    let mut source_builder =
        SuperfileBuilder::new(sq8_opts.clone()).expect("new SuperfileBuilder");
    let schema = source_builder.opts.schema.clone();
    let batch = batch_two_rows(&schema);
    let mut flat: Vec<f32> = vec![0.0; 32];
    flat[0] = 1.0;
    flat[16 + 1] = 1.0;
    source_builder
        .add_batch(&batch, &[flat.as_slice()])
        .expect("add_batch");
    let source_bytes = source_builder.finish().expect("finish builder");
    let source =
        SuperfileReader::open(Bytes::from(source_bytes)).expect("open source reader");

    // Re-ingest through `add_batch_from_reader`, which must accept SQ8 input.
    let mut target_builder = SuperfileBuilder::new(sq8_opts).expect("new SuperfileBuilder");
    target_builder
        .add_batch_from_reader(&source, None)
        .expect("add_batch_from_reader must succeed for Sq8 source");
    let merged_bytes = target_builder.finish().expect("finish merged builder");
    let merged = SuperfileReader::open(Bytes::from(merged_bytes)).expect("open merged reader");
    assert_eq!(merged.n_docs(), 2);

    // The rebuilt file keeps the SQ8 codec for the column.
    let col = merged
        .vec()
        .expect("vector index present")
        .vector_columns_config()
        .next()
        .expect("has column");
    assert_eq!(
        col.rerank_codec,
        RerankCodec::Sq8ResidualEpsilon,
        "merged superfile must carry Sq8ResidualEpsilon codec"
    );

    // An axis-0 query must rank row 0 first.
    let mut query = vec![0.0f32; 16];
    query[0] = 1.0;
    let hits = merged
        .vec()
        .expect("vector reader")
        .search("emb", &query, 1, 4, 100)
        .await
        .expect("vector search on merged Sq8 superfile");
    assert!(!hits.is_empty(), "search should return at least one result");
    assert_eq!(hits[0].0, 0, "top hit for axis-0 query must be doc 0");
}
#[tokio::test]
async fn build_from_readers_fp32_codec_preserved_by_new_from_reader() {
    // Single vector column with the default (Fp32) rerank codec.
    let fp32_opts = BuilderOptions::new(
        schema_with_fts(),
        "doc_id",
        vec![],
        vec![default_vector_config("emb", 7)],
        None,
    );
    let mut builder = SuperfileBuilder::new(fp32_opts).expect("new SuperfileBuilder");
    let schema = builder.opts.schema.clone();
    let batch = batch_two_rows(&schema);

    // Row-major: row 0 -> e0, row 1 -> e1.
    let mut flat: Vec<f32> = vec![0.0; 32];
    flat[0] = 1.0;
    flat[16 + 1] = 1.0;
    builder
        .add_batch(&batch, &[flat.as_slice()])
        .expect("add_batch");
    let source_bytes = builder.finish().expect("finish builder");
    let source = SuperfileReader::open(Bytes::from(source_bytes)).expect("open reader");

    let inputs = [(Arc::new(source), empty_bitmap())];
    let (merged_bytes, stats) =
        SuperfileBuilder::build_from_readers(&inputs).expect("build_from_readers");
    assert_eq!(stats.n_docs, 2);

    // The merged column keeps the source's Fp32 codec.
    let merged = SuperfileReader::open(Bytes::from(merged_bytes)).expect("open merged reader");
    let col = merged
        .vec()
        .expect("vector index")
        .vector_columns_config()
        .next()
        .expect("has column");
    assert_eq!(
        col.rerank_codec,
        RerankCodec::Fp32,
        "build_from_readers must preserve Fp32 codec from source superfile"
    );

    // An axis-0 query must rank row 0 first.
    let mut query = vec![0.0f32; 16];
    query[0] = 1.0;
    let hits = merged
        .vec()
        .expect("vector reader")
        .search("emb", &query, 1, 4, 100)
        .await
        .expect("vector search on merged Fp32 superfile");
    assert!(!hits.is_empty());
    assert_eq!(hits[0].0, 0, "top hit for axis-0 query must be doc 0");
}
#[test]
fn debug_and_set_fts_spill_threshold() {
    // A one-byte threshold; the constant's name says it is meant to force spilling.
    const FORCE_SPILL_THRESHOLD: usize = 1;

    let mut builder = SuperfileBuilder::new(opts_minimal()).expect("new SuperfileBuilder");
    builder.set_fts_spill_threshold_bytes(FORCE_SPILL_THRESHOLD);

    // The Debug rendering must name the type and report its FTS column count.
    let rendered = format!("{:?}", builder);
    assert!(
        rendered.contains("SuperfileBuilder"),
        "debug output names the struct: {rendered}"
    );
    assert!(
        rendered.contains("n_fts_columns"),
        "debug output lists fts columns: {rendered}"
    );
}
}