use std::sync::Arc;
use arrow::compute::{concat_batches, take};
use arrow_array::{
ArrayRef, Decimal128Array, Float32Array, RecordBatch, RecordBatchOptions, UInt32Array,
};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::{
error::{DataFusionError, Result as DfResult},
logical_expr::Expr,
scalar::ScalarValue,
};
use futures::{TryStreamExt, future::try_join_all};
use object_store::{ObjectStore, path::Path};
use parquet::arrow::{
ProjectionMask,
async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder},
};
use rayon::prelude::*;
use tokio::sync::oneshot;
use crate::{
superfile::{
SuperfileReader,
reader::{rank_back_indices, row_selection_for_ids},
},
supertable::{
handle::SupertableReader,
manifest::SuperfileUri,
options::{DECIMAL128_PRECISION, DECIMAL128_SCALE},
query::{SuperfileHit, superfile_reader::superfile_reader},
},
};
/// Resolves `hits` into a single [`RecordBatch`], choosing output columns by name.
///
/// The selectable columns are the reader's scalar schema followed by the
/// synthetic [`SCORE_COLUMN`]. When `projection` is `None`, the batch holds the
/// id column and the score, in that order.
///
/// # Errors
///
/// Returns [`DataFusionError::Execution`], prefixed with `what`, for the first
/// requested name that is not part of the output schema; otherwise forwards
/// whatever [`resolve_hits`] returns.
pub(crate) async fn resolve_hits_named(
    reader: &SupertableReader,
    hits: &[SuperfileHit],
    projection: Option<&[&str]>,
    what: &str,
) -> DfResult<RecordBatch> {
    let scalar_schema = reader.options().scalar_schema();
    let output_schema = output_schema_with_score(&scalar_schema);

    // Fallback projection used when the caller names no columns.
    let default_names = [reader.options().id_column.as_str(), SCORE_COLUMN];
    let names = projection.unwrap_or(&default_names[..]);

    // Translate names to positions in the output schema, failing on the first miss.
    let mut indices: Vec<usize> = Vec::with_capacity(names.len());
    for &name in names {
        let position = output_schema.index_of(name).map_err(|_| {
            DataFusionError::Execution(format!("{what}: unknown column {name:?}"))
        })?;
        indices.push(position);
    }

    resolve_hits(
        reader,
        hits,
        &scalar_schema,
        &output_schema,
        Some(indices.as_slice()),
    )
    .await
}
/// Name of the synthetic score column that `output_schema_with_score` appends
/// after the scalar columns.
pub(crate) const SCORE_COLUMN: &str = "score";
/// Builds the hit output schema: every field of `scalar_schema`, in order,
/// followed by a non-nullable `Float32` field named [`SCORE_COLUMN`].
pub(crate) fn output_schema_with_score(scalar_schema: &SchemaRef) -> SchemaRef {
    let score_field = Field::new(SCORE_COLUMN, DataType::Float32, false);
    let fields: Vec<Field> = scalar_schema
        .fields()
        .iter()
        .map(|field| Field::clone(field))
        .chain(std::iter::once(score_field))
        .collect();
    Arc::new(Schema::new(fields))
}
pub(crate) async fn resolve_hits(
reader: &SupertableReader,
hits: &[SuperfileHit],
scalar_schema: &SchemaRef,
output_schema: &SchemaRef,
projection: Option<&[usize]>,
) -> DfResult<RecordBatch> {
let projected_schema = match projection {
Some(indices) => Arc::new(
output_schema
.project(indices)
.map_err(|e| DataFusionError::Execution(e.to_string()))?,
),
None => Arc::clone(output_schema),
};
if hits.is_empty() {
return Ok(RecordBatch::new_empty(projected_schema));
}
let score_idx = scalar_schema.fields().len();
let requested: Vec<usize> = match projection {
Some(indices) => indices.to_vec(),
None => (0..output_schema.fields().len()).collect(),
};
let mut needed: Vec<&str> = Vec::new();
for &p in &requested {
if p != score_idx {
let name = scalar_schema.field(p).name().as_str();
if !needed.contains(&name) {
needed.push(name);
}
}
}
let id_column = reader.options().id_column.as_str();
let resolved = if needed.is_empty() {
None
} else if needed == [id_column] {
match resolve_ids_arithmetic(reader, hits) {
Some(batch) => Some(batch?),
None => Some(resolve_columns(reader, hits, &needed).await?),
}
} else {
Some(resolve_columns(reader, hits, &needed).await?)
};
let score = Arc::new(Float32Array::from_iter_values(hits.iter().map(|h| h.score))) as ArrayRef;
let mut columns: Vec<ArrayRef> = Vec::with_capacity(requested.len());
for &p in &requested {
if p == score_idx {
columns.push(Arc::clone(&score));
} else {
let name = scalar_schema.field(p).name();
let rb = resolved
.as_ref()
.expect("a scalar column is projected => columns resolved");
let idx = rb
.schema()
.index_of(name)
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
columns.push(Arc::clone(rb.column(idx)));
}
}
RecordBatch::try_new_with_options(
projected_schema,
columns,
&RecordBatchOptions::new().with_row_count(Some(hits.len())),
)
.map_err(|e| DataFusionError::Execution(e.to_string()))
}
/// Tries to compute the id column for `hits` from manifest metadata alone,
/// without opening any superfile.
///
/// For a superfile whose manifest entry satisfies
/// `id_max - id_min + 1 == n_docs` (with `n_docs > 0`), the id of local doc `d`
/// is taken to be `id_min + d`.
///
/// Returns `None` — meaning "use the decoding path instead" — when:
/// - a hit references a superfile that is not in the manifest,
/// - the id span overflows, is empty, or does not equal `n_docs`,
/// - a hit's `local_doc_id` is not below the entry's `n_docs`, because the
///   arithmetic would then yield an id outside `[id_min, id_max]`.
///
/// Returns `Some(Err(_))` only when the Arrow array or batch cannot be built.
fn resolve_ids_arithmetic(
    reader: &SupertableReader,
    hits: &[SuperfileHit],
) -> Option<DfResult<RecordBatch>> {
    let manifest = reader.manifest();
    // Per-superfile `(uri, id_min, n_docs)` for entries already validated, so
    // the manifest is scanned once per distinct superfile.
    let mut memo: Vec<(SuperfileUri, i128, i128)> = Vec::new();
    let mut ids: Vec<i128> = Vec::with_capacity(hits.len());
    for hit in hits {
        let (base, n_docs) = match memo.iter().find(|(uri, _, _)| *uri == hit.superfile) {
            Some((_, base, n_docs)) => (*base, *n_docs),
            None => {
                let entry = manifest
                    .superfiles
                    .iter()
                    .find(|e| e.uri == hit.superfile)?;
                let n_docs = i128::from(entry.n_docs);
                let span = entry.id_max.checked_sub(entry.id_min)?.checked_add(1)?;
                if n_docs == 0 || span != n_docs {
                    return None;
                }
                memo.push((hit.superfile, entry.id_min, n_docs));
                (entry.id_min, n_docs)
            }
        };
        let local = i128::from(hit.local_doc_id);
        if local >= n_docs {
            // Out-of-range local id: the shortcut cannot vouch for the result,
            // so defer to the decoding path rather than fabricate an id.
            return None;
        }
        // Cannot overflow: `local < n_docs` and `base + n_docs - 1 == id_max`.
        ids.push(base + local);
    }
    let array = match Decimal128Array::from_iter_values(ids)
        .with_precision_and_scale(DECIMAL128_PRECISION, DECIMAL128_SCALE)
    {
        Ok(a) => a,
        Err(e) => return Some(Err(DataFusionError::Execution(e.to_string()))),
    };
    let schema = Arc::new(Schema::new(vec![Field::new(
        reader.options().id_column.clone(),
        DataType::Decimal128(DECIMAL128_PRECISION, DECIMAL128_SCALE),
        false,
    )]));
    Some(
        RecordBatch::try_new(schema, vec![Arc::new(array) as ArrayRef])
            .map_err(|e| DataFusionError::Execution(e.to_string())),
    )
}
/// Decodes the columns `names` for every hit by reading the owning superfiles,
/// returning one batch whose rows follow the order of `hits`.
///
/// Hits are grouped per superfile in first-seen order. Readers whose
/// `parquet_bytes()` is `Some` are decoded on `manifest.options.reader_pool`
/// via `take_by_local_doc_ids`; the remaining readers are streamed through
/// `take_rows_object_store`. Both groups are awaited together, then the
/// per-superfile batches are concatenated and re-ordered back into hit order.
///
/// NOTE(review): `hits` must be non-empty — `per_superfile[0]` below panics
/// otherwise. The only caller visible in this file, `resolve_hits`, returns
/// early on empty hits.
///
/// # Errors
///
/// Returns `DataFusionError::Execution` when a superfile cannot be opened, when
/// a non-warm reader has no storage backend or no object-store handle, when the
/// reader pool drops the result, or when any decode/concat/take step fails.
async fn resolve_columns(
reader: &SupertableReader,
hits: &[SuperfileHit],
names: &[&str],
) -> DfResult<RecordBatch> {
// Group hits by superfile in first-seen order. For hit `k`, `placement[k]`
// is (superfile slot, row position inside that superfile's local-id list).
let mut seg_order: Vec<SuperfileUri> = Vec::new();
let mut seg_locals: Vec<Vec<u32>> = Vec::new();
let mut placement: Vec<(usize, usize)> = Vec::with_capacity(hits.len());
for hit in hits {
let seg_idx = match seg_order.iter().position(|s| *s == hit.superfile) {
Some(i) => i,
None => {
seg_order.push(hit.superfile);
seg_locals.push(Vec::new());
seg_order.len() - 1
}
};
let row = seg_locals[seg_idx].len();
seg_locals[seg_idx].push(hit.local_doc_id);
placement.push((seg_idx, row));
}
let manifest = reader.manifest();
let store = &manifest.options.store;
let disk_cache = manifest.options.disk_cache.as_ref();
let storage = manifest.options.storage.as_ref();
// Open one reader per distinct superfile, concurrently.
let opened = try_join_all(
seg_order
.iter()
.map(|uri| superfile_reader(store, disk_cache, storage, uri, None)),
)
.await
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
// Split by whether the reader already holds the parquet bytes ("warm") or
// not ("cold"). Warm inputs are owned so they can move onto the pool thread.
let mut warm_inputs: Vec<(usize, Arc<SuperfileReader>, Vec<u32>)> = Vec::new();
let mut cold_units: Vec<(usize, &SuperfileUri, &Arc<SuperfileReader>, &[u32])> = Vec::new();
for (i, ((uri, rd), locals)) in seg_order
.iter()
.zip(opened.iter())
.zip(seg_locals.iter())
.enumerate()
{
if rd.parquet_bytes().is_some() {
warm_inputs.push((i, Arc::clone(rd), locals.clone()));
} else {
cold_units.push((i, uri, rd, locals.as_slice()));
}
}
// Warm wave: decode on the reader pool and hand the result back through a
// oneshot channel so this async task does not do the decode work itself.
let warm_wave = async {
if warm_inputs.is_empty() {
return Ok::<Vec<(usize, RecordBatch)>, DataFusionError>(Vec::new());
}
// The spawned closure must own its data, hence the owned column names.
let owned_names: Vec<String> = names.iter().map(|s| (*s).to_string()).collect();
let pool = Arc::clone(&manifest.options.reader_pool);
let inputs = warm_inputs;
let (tx, rx) = oneshot::channel();
pool.spawn(move || {
let name_refs: Vec<&str> = owned_names.iter().map(String::as_str).collect();
let result: Result<Vec<(usize, RecordBatch)>, _> = inputs
.into_par_iter()
.map(|(i, sf, locals)| {
sf.take_by_local_doc_ids(&locals, &name_refs)
.map(|batch| (i, batch))
})
.collect();
// A send error only means the receiver is gone; nothing left to do.
let _ = tx.send(result);
});
rx.await
.map_err(|_| {
DataFusionError::Execution("resolve decode: reader pool dropped result".into())
})?
.map_err(|e| DataFusionError::Execution(e.to_string()))
};
// Cold wave: stream the selected rows of each remaining superfile through
// the storage backend's object store.
let cold_wave = try_join_all(cold_units.into_iter().map(
|(i, uri, reader, locals)| {
let storage = storage.cloned();
// Total size from the manifest entry's subsection offsets, when recorded.
let file_size = manifest
.superfiles
.iter()
.find(|e| e.uri == *uri)
.and_then(|e| e.subsection_offsets.as_ref())
.map(|o| o.total_size);
async move {
let storage = storage.ok_or_else(|| {
DataFusionError::Execution(format!(
"resolve_hits needs row bytes for {uri:?}, but the reader was lazy and no storage backend is attached"
))
})?;
let (store, path) =
storage.object_store_handle(&uri.storage_path()).ok_or_else(|| {
DataFusionError::Execution(format!(
"resolve_hits: storage backend exposes no object_store handle for {uri:?}"
))
})?;
take_rows_object_store(
store,
path,
file_size,
reader.schema(),
reader.n_docs(),
locals,
names,
)
.await
.map(|batch| (i, batch))
}
},
));
// Drive both waves concurrently, then file each batch under its superfile slot.
let (warm_done, cold_done) = tokio::join!(warm_wave, cold_wave);
let mut slots: Vec<Option<RecordBatch>> = vec![None; seg_order.len()];
for (i, batch) in warm_done?.into_iter().chain(cold_done?) {
slots[i] = Some(batch);
}
let per_superfile: Vec<RecordBatch> = slots
.into_iter()
.map(|s| s.expect("invariant: every superfile resolved by exactly one wave"))
.collect();
// The first batch's schema is used for the concatenation.
let cat_schema = per_superfile[0].schema();
let combined = concat_batches(&cat_schema, &per_superfile)
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
// `offsets[s]` is the row in `combined` where superfile slot `s` begins.
let mut offsets: Vec<u32> = Vec::with_capacity(per_superfile.len());
let mut acc: u32 = 0;
for batch in &per_superfile {
offsets.push(acc);
acc += batch.num_rows() as u32;
}
// Map every hit to its row in `combined`, restoring the original hit order.
let reorder =
UInt32Array::from_iter_values(placement.iter().map(|(s, r)| offsets[*s] + *r as u32));
let mut columns: Vec<ArrayRef> = Vec::with_capacity(combined.num_columns());
for column in combined.columns() {
columns.push(
take(column, &reorder, None).map_err(|e| DataFusionError::Execution(e.to_string()))?,
);
}
RecordBatch::try_new(combined.schema(), columns)
.map_err(|e| DataFusionError::Execution(e.to_string()))
}
async fn take_rows_object_store(
store: Arc<dyn ObjectStore>,
path: Path,
file_size: Option<u64>,
file_schema: &SchemaRef,
n_docs: u64,
local_doc_ids: &[u32],
names: &[&str],
) -> DfResult<RecordBatch> {
let mut col_indices = Vec::with_capacity(names.len());
let mut out_fields: Vec<Field> = Vec::with_capacity(names.len());
for &name in names {
let idx = file_schema
.index_of(name)
.map_err(|_| DataFusionError::Execution(format!("unknown column {name}")))?;
col_indices.push(idx);
out_fields.push(file_schema.field(idx).clone());
}
let out_schema = Arc::new(Schema::new(out_fields));
if local_doc_ids.is_empty() {
return Ok(RecordBatch::new_empty(out_schema));
}
for &d in local_doc_ids {
if u64::from(d) >= n_docs {
return Err(DataFusionError::Execution(format!(
"doc id {d} out of range (n_docs={n_docs})"
)));
}
}
let (sorted, selection) = row_selection_for_ids(local_doc_ids);
let mut object_reader = ParquetObjectReader::new(store, path);
if let Some(size) = file_size.filter(|&s| s > 0) {
object_reader = object_reader.with_file_size(size);
}
let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
.await
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
let mask = ProjectionMask::roots(builder.parquet_schema(), col_indices.iter().copied());
let stream = builder
.with_projection(mask)
.with_row_selection(selection)
.build()
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
let batches: Vec<RecordBatch> = stream
.try_collect()
.await
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
if batches.is_empty() {
return Ok(RecordBatch::new_empty(out_schema));
}
let read_schema = batches[0].schema();
let selected = concat_batches(&read_schema, &batches)
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
let indices = rank_back_indices(local_doc_ids, &sorted);
let mut columns: Vec<ArrayRef> = Vec::with_capacity(names.len());
for &name in names {
let idx = selected
.schema()
.index_of(name)
.map_err(|_| DataFusionError::Execution(format!("unknown column {name}")))?;
columns.push(
take(selected.column(idx), &indices, None)
.map_err(|e| DataFusionError::Execution(e.to_string()))?,
);
}
RecordBatch::try_new(out_schema, columns).map_err(|e| DataFusionError::Execution(e.to_string()))
}
/// Extracts an owned `String` from a string-literal expression.
///
/// Accepts non-null `Utf8`, `LargeUtf8` and `Utf8View` literals.
///
/// # Errors
///
/// Returns [`DataFusionError::Plan`], naming the argument as `what`, for any
/// other expression (including null string literals).
pub(crate) fn arg_to_string(expr: &Expr, what: &str) -> DfResult<String> {
    if let Expr::Literal(
        ScalarValue::Utf8(Some(text))
        | ScalarValue::LargeUtf8(Some(text))
        | ScalarValue::Utf8View(Some(text)),
        _,
    ) = expr
    {
        return Ok(text.clone());
    }
    Err(DataFusionError::Plan(format!(
        "{what} must be a string literal, got {expr:?}"
    )))
}
/// Extracts a `usize` from an integer-literal expression.
///
/// Accepts non-null `Int64`, `Int32`, `UInt64` and `UInt32` literals. Values
/// are widened losslessly to `i128` before the range check, so a `UInt64`
/// above `i64::MAX` is no longer wrapped into a negative number.
///
/// # Errors
///
/// Returns [`DataFusionError::Plan`], naming the argument as `what`, when the
/// expression is not one of the accepted literals, when the value is negative,
/// or when it does not fit in `usize` on the current platform.
pub(crate) fn arg_to_usize(expr: &Expr, what: &str) -> DfResult<usize> {
    let n: i128 = match expr {
        Expr::Literal(ScalarValue::Int64(Some(n)), _) => i128::from(*n),
        Expr::Literal(ScalarValue::Int32(Some(n)), _) => i128::from(*n),
        Expr::Literal(ScalarValue::UInt64(Some(n)), _) => i128::from(*n),
        Expr::Literal(ScalarValue::UInt32(Some(n)), _) => i128::from(*n),
        other => {
            return Err(DataFusionError::Plan(format!(
                "{what} must be an integer literal, got {other:?}"
            )));
        }
    };
    if n < 0 {
        return Err(DataFusionError::Plan(format!(
            "{what} must be >= 0, got {n}"
        )));
    }
    usize::try_from(n)
        .map_err(|_| DataFusionError::Plan(format!("{what} is too large for usize, got {n}")))
}
// Unit tests for hit resolution: literal-argument parsing, output schema
// construction, `resolve_hits*` through in-memory and disk-cache-backed
// supertables, and `take_rows_object_store` against an in-memory object store.
#[cfg(test)]
mod tests {
use arrow_array::{Array, FixedSizeListArray, LargeStringArray};
use arrow_schema::Field;
use bytes::Bytes;
use datafusion::prelude::lit;
use object_store::{ObjectStore, ObjectStoreExt, PutPayload, memory, path::Path as ObjPath};
use rayon::ThreadPoolBuilder;
use tempfile::TempDir;
use super::*;
use crate::{
storage::{LocalFsStorageProvider, StorageProvider},
superfile::{
builder::{BuilderOptions, FtsConfig, SuperfileBuilder, VectorConfig},
fts::reader::BoolMode,
vector::{distance::Metric, rerank_codec::RerankCodec},
},
supertable::{
Supertable, SupertableOptions,
reader_cache::{ColdFetchMode, DiskCacheConfig, DiskCacheStore},
},
test_helpers::{
build_title_batch, decimal128_id_field, decimal128_ids, default_supertable_options,
default_tokenizer as tok,
},
};
/// A string literal is accepted; an integer literal is rejected.
#[test]
fn arg_to_string_accepts_utf8_literal_rejects_int() {
assert_eq!(
arg_to_string(&lit("emb"), "column").expect("utf8 literal"),
"emb"
);
assert!(arg_to_string(&lit(3_i64), "column").is_err());
}
/// `LargeUtf8` and `Utf8View` literals are accepted alongside `Utf8`.
#[test]
fn arg_to_string_accepts_large_utf8_and_utf8_view() {
let large = Expr::Literal(ScalarValue::LargeUtf8(Some("body".into())), None);
assert_eq!(arg_to_string(&large, "column").expect("large utf8"), "body");
let view = Expr::Literal(ScalarValue::Utf8View(Some("title".into())), None);
assert_eq!(arg_to_string(&view, "column").expect("utf8 view"), "title");
}
/// A non-negative integer parses; a negative integer and a string are rejected.
#[test]
fn arg_to_usize_accepts_int_rejects_negative_and_nonint() {
assert_eq!(arg_to_usize(&lit(10_i64), "k").expect("int literal"), 10);
assert!(arg_to_usize(&lit(-1_i64), "k").is_err());
assert!(arg_to_usize(&lit("nope"), "k").is_err());
}
/// `Int32`, `UInt64` and `UInt32` literals are all accepted.
#[test]
fn arg_to_usize_accepts_all_integer_widths() {
let i32e = Expr::Literal(ScalarValue::Int32(Some(7)), None);
let u64e = Expr::Literal(ScalarValue::UInt64(Some(8)), None);
let u32e = Expr::Literal(ScalarValue::UInt32(Some(9)), None);
assert_eq!(arg_to_usize(&i32e, "k").expect("i32"), 7);
assert_eq!(arg_to_usize(&u64e, "k").expect("u64"), 8);
assert_eq!(arg_to_usize(&u32e, "k").expect("u32"), 9);
}
/// The output schema is the scalar schema plus a trailing `Float32` score field.
#[test]
fn output_schema_appends_score() {
let s = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
let out = output_schema_with_score(&s);
assert_eq!(out.fields().len(), 2);
assert_eq!(out.field(1).name(), "score");
assert_eq!(out.field(1).data_type(), &DataType::Float32);
}
/// `FixedSizeList` of `dim` nullable `Float32` items named `item`.
fn fixed_list_f32(dim: usize) -> DataType {
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
dim as i32,
)
}
/// Supertable options with a `title` text column (FTS-indexed) and an `emb`
/// vector column of width `dim`, using a single-threaded writer pool.
fn options_title_emb(dim: usize) -> SupertableOptions {
let pool = Arc::new(
ThreadPoolBuilder::new()
.num_threads(1)
.build()
.expect("pool"),
);
let schema = Arc::new(Schema::new(vec![
Field::new("title", DataType::LargeUtf8, false),
Field::new("emb", fixed_list_f32(dim), false),
]));
SupertableOptions::new(
schema,
vec![FtsConfig {
column: "title".into(),
}],
vec![VectorConfig {
column: "emb".into(),
dim,
n_cent: 4,
rot_seed: 7,
metric: Metric::Cosine,
rerank_codec: RerankCodec::Fp32,
}],
Some(tok()),
)
.expect("valid options")
.with_writer_pool(pool)
}
/// Builds a batch of `titles` with one-hot embeddings: row `i` has `1.0` at
/// position `i % dim` and `0.0` elsewhere.
fn build_batch(titles: &[&str], dim: usize, schema: Arc<Schema>) -> RecordBatch {
let n = titles.len();
let title_arr = LargeStringArray::from(titles.to_vec());
let mut flat = Vec::<f32>::with_capacity(n * dim);
for i in 0..n {
for d in 0..dim {
flat.push(if d == i % dim { 1.0 } else { 0.0 });
}
}
let fsl = FixedSizeListArray::try_new(
Arc::new(Field::new("item", DataType::Float32, true)),
dim as i32,
Arc::new(Float32Array::from(flat)) as ArrayRef,
None,
)
.expect("FSL");
RecordBatch::try_new(schema, vec![Arc::new(title_arr), Arc::new(fsl)]).expect("batch")
}
/// Creates a supertable and commits four titled rows, two of which contain "rust".
fn demo(dim: usize) -> Supertable {
let st = Supertable::create(options_title_emb(dim)).expect("create");
let mut w = st.writer().expect("writer");
let schema = st.options().schema.clone();
w.append(&build_batch(
&["rust async", "python data", "rust systems", "go routines"],
dim,
schema,
))
.expect("append");
w.commit().expect("commit");
st
}
/// Projecting only `_id` through `bm25_search` yields a single `_id` column
/// with one row per matching doc. The test name refers to the arithmetic fast
/// path; the assertions check only the resulting shape.
#[test]
fn resolve_hits_named_id_only_takes_arithmetic_fast_path() {
let st = demo(16);
let batches = st
.reader()
.bm25_search("title", "rust", 10, BoolMode::Or, Some(&["_id"]))
.expect("bm25_search _id");
let b = &batches[0];
assert_eq!(b.num_columns(), 1);
assert_eq!(b.schema().field(0).name(), "_id");
assert_eq!(b.num_rows(), 2, "two docs contain 'rust'");
}
/// With no projection the result columns are `_id` then `score`.
#[test]
fn resolve_hits_named_default_is_id_and_score() {
let st = demo(16);
let batches = st
.reader()
.bm25_search("title", "rust", 10, BoolMode::Or, None)
.expect("bm25_search default");
let b = &batches[0];
assert_eq!(b.num_columns(), 2);
assert_eq!(b.schema().field(0).name(), "_id");
assert_eq!(b.schema().field(1).name(), "score");
}
/// Projecting a scalar column returns its decoded values for the matching docs.
#[test]
fn resolve_hits_named_scalar_column_decodes_via_resolve_columns() {
let st = demo(16);
let batches = st
.reader()
.bm25_search(
"title",
"rust",
10,
BoolMode::Or,
Some(&["_id", "title", "score"]),
)
.expect("bm25_search title");
let b = &batches[0];
assert_eq!(b.num_columns(), 3);
let titles = b
.column(1)
.as_any()
.downcast_ref::<LargeStringArray>()
.expect("title col");
for i in 0..titles.len() {
assert!(titles.value(i).contains("rust"));
}
}
/// Projecting a column that does not exist is an error.
#[test]
fn resolve_hits_named_unknown_column_errors() {
let st = demo(16);
let res = st
.reader()
.bm25_search("title", "rust", 10, BoolMode::Or, Some(&["nope"]));
assert!(res.is_err(), "unknown projected column must error");
}
/// A query with no matches produces zero rows in total.
#[test]
fn resolve_hits_named_empty_hits_returns_empty_batch() {
let st = demo(16);
let batches = st
.reader()
.bm25_search("title", "nonexistentterm", 10, BoolMode::Or, Some(&["_id"]))
.expect("bm25_search empty");
let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
assert_eq!(total, 0);
}
/// Two hand-built hits on the first superfile: local doc 0 (score 1.5) and the
/// last local doc (score 0.5).
fn two_hits(reader: &SupertableReader) -> Vec<SuperfileHit> {
let entry = Arc::clone(&reader.manifest().superfiles[0]);
vec![
SuperfileHit {
superfile: entry.uri,
local_doc_id: 0,
score: 1.5,
},
SuperfileHit {
superfile: entry.uri,
local_doc_id: (entry.n_docs - 1) as u32,
score: 0.5,
},
]
}
/// Projecting only the score index yields a single score column carrying the
/// hits' scores in hit order.
#[test]
fn resolve_hits_score_only_synthesizes_score_without_decoding_scalars() {
let st = demo(16);
let reader = st.reader();
let hits = two_hits(&reader);
let scalar_schema = reader.options().scalar_schema();
let output_schema = output_schema_with_score(&scalar_schema);
// The score column sits immediately after the scalar columns.
let score_idx = scalar_schema.fields().len();
let batch = reader
.block_on(resolve_hits(
&reader,
&hits,
&scalar_schema,
&output_schema,
Some(&[score_idx]),
))
.expect("score-only resolve");
assert_eq!(batch.num_columns(), 1);
assert_eq!(batch.schema().field(0).name(), SCORE_COLUMN);
assert_eq!(batch.num_rows(), hits.len());
let scores = batch
.column(0)
.as_any()
.downcast_ref::<Float32Array>()
.expect("score col");
assert_eq!(scores.value(0), 1.5);
assert_eq!(scores.value(1), 0.5);
}
/// A `None` projection returns every output column: `_id` first, score last.
#[test]
fn resolve_hits_none_projection_returns_all_scalar_columns_and_score() {
let st = demo(16);
let reader = st.reader();
let hits = two_hits(&reader);
let scalar_schema = reader.options().scalar_schema();
let output_schema = output_schema_with_score(&scalar_schema);
let batch = reader
.block_on(resolve_hits(
&reader,
&hits,
&scalar_schema,
&output_schema,
None,
))
.expect("none-projection resolve");
assert_eq!(batch.num_columns(), output_schema.fields().len());
assert_eq!(batch.num_rows(), hits.len());
let last = batch.num_columns() - 1;
assert_eq!(batch.schema().field(last).name(), SCORE_COLUMN);
assert_eq!(batch.schema().field(0).name(), "_id");
}
/// An empty projection yields a zero-column batch that still reports one row per hit.
#[test]
fn resolve_hits_empty_projection_preserves_hit_row_count() {
let st = demo(16);
let reader = st.reader();
let hits = two_hits(&reader);
let scalar_schema = reader.options().scalar_schema();
let output_schema = output_schema_with_score(&scalar_schema);
let batch = reader
.block_on(resolve_hits(
&reader,
&hits,
&scalar_schema,
&output_schema,
Some(&[]),
))
.expect("empty-projection resolve");
assert_eq!(batch.num_columns(), 0);
assert_eq!(batch.num_rows(), hits.len());
}
/// The arithmetic path maps local doc `d` to `id_min + d` for the first and
/// last docs of the first superfile.
#[test]
fn resolve_ids_arithmetic_maps_local_ids_via_manifest_span() {
let st = demo(16);
let reader = st.reader();
let entry = Arc::clone(&reader.manifest().superfiles[0]);
let last = (entry.n_docs - 1) as u32;
let hits = vec![
SuperfileHit {
superfile: entry.uri,
local_doc_id: 0,
score: 0.0,
},
SuperfileHit {
superfile: entry.uri,
local_doc_id: last,
score: 0.0,
},
];
let batch = resolve_ids_arithmetic(&reader, &hits)
.expect("contiguous span => Some")
.expect("ok batch");
assert_eq!(batch.num_columns(), 1);
assert_eq!(batch.schema().field(0).name(), "_id");
let ids = batch
.column(0)
.as_any()
.downcast_ref::<Decimal128Array>()
.expect("decimal id col");
assert_eq!(ids.value(0), entry.id_min);
assert_eq!(ids.value(1), entry.id_min + i128::from(last));
}
/// A hit on a superfile missing from the manifest makes the arithmetic path return `None`.
#[test]
fn resolve_ids_arithmetic_returns_none_when_superfile_absent() {
let st = demo(16);
let reader = st.reader();
let hits = vec![SuperfileHit {
superfile: SuperfileUri::new_v4(),
local_doc_id: 0,
score: 0.0,
}];
assert!(
resolve_ids_arithmetic(&reader, &hits).is_none(),
"unknown superfile must abandon the arithmetic fast path",
);
}
// Titles stored, in row order, in the superfile built by `titled_superfile_bytes`.
const TITLES: [&str; 5] = ["alpha", "bravo", "charlie", "delta", "echo"];
/// Builds a superfile with a `doc_id` id column and a `title` column holding
/// `TITLES`, and returns its serialized bytes.
fn titled_superfile_bytes() -> Bytes {
let schema: Arc<Schema> = Arc::new(Schema::new(vec![
decimal128_id_field("doc_id"),
Field::new("title", DataType::LargeUtf8, false),
]));
let opts = BuilderOptions::new(schema.clone(), "doc_id", vec![], vec![], None);
let mut b = SuperfileBuilder::new(opts).expect("builder");
let ids = decimal128_ids(0..TITLES.len() as u64);
let title = LargeStringArray::from(TITLES.to_vec());
let batch = RecordBatch::try_new(schema, vec![Arc::new(ids), Arc::new(title)])
.expect("build batch");
b.add_batch(&batch, &[]).expect("add_batch");
Bytes::from(b.finish().expect("finish builder"))
}
/// Opens `bytes` as a superfile and returns its schema and document count.
fn schema_and_n_docs(bytes: &Bytes) -> (SchemaRef, u64) {
let reader = SuperfileReader::open(bytes.clone()).expect("open");
(Arc::clone(reader.schema()), reader.n_docs())
}
/// Stores `bytes` in a fresh in-memory object store and returns the store and path.
async fn object_store_with(bytes: &Bytes) -> (Arc<dyn ObjectStore>, ObjPath) {
let store: Arc<dyn ObjectStore> = Arc::new(memory::InMemory::new());
let path = ObjPath::from("data/seg.sf.parquet");
store
.put(&path, PutPayload::from_bytes(bytes.clone()))
.await
.expect("put superfile into object store");
(store, path)
}
/// Collects column 0 of `batch` (expected to be a `LargeStringArray`) into owned strings.
fn titles_of(batch: &RecordBatch) -> Vec<String> {
let arr = batch
.column(0)
.as_any()
.downcast_ref::<LargeStringArray>()
.expect("title col");
(0..arr.len()).map(|i| arr.value(i).to_string()).collect()
}
/// Rows come back in the order the ids were requested, not in file order.
#[tokio::test]
async fn take_rows_object_store_streams_rows_in_caller_rank_order() {
let bytes = titled_superfile_bytes();
let (schema, n_docs) = schema_and_n_docs(&bytes);
let (store, path) = object_store_with(&bytes).await;
let batch = take_rows_object_store(
store,
path,
Some(bytes.len() as u64),
&schema,
n_docs,
&[2, 0, 3],
&["title"],
)
.await
.expect("take rows");
assert_eq!(batch.num_columns(), 1);
assert_eq!(batch.schema().field(0).name(), "title");
assert_eq!(titles_of(&batch), vec!["charlie", "alpha", "delta"]);
}
/// A repeated id yields a repeated row at each requested position.
#[tokio::test]
async fn take_rows_object_store_ranks_back_duplicate_ids() {
let bytes = titled_superfile_bytes();
let (schema, n_docs) = schema_and_n_docs(&bytes);
let (store, path) = object_store_with(&bytes).await;
let batch = take_rows_object_store(
store,
path,
Some(bytes.len() as u64),
&schema,
n_docs,
&[1, 1, 0],
&["title"],
)
.await
.expect("take rows");
assert_eq!(titles_of(&batch), vec!["bravo", "bravo", "alpha"]);
}
/// Reading still works when no file size is supplied.
#[tokio::test]
async fn take_rows_object_store_discovers_size_when_file_size_none() {
let bytes = titled_superfile_bytes();
let (schema, n_docs) = schema_and_n_docs(&bytes);
let (store, path) = object_store_with(&bytes).await;
let batch = take_rows_object_store(store, path, None, &schema, n_docs, &[4], &["title"])
.await
.expect("take rows");
assert_eq!(titles_of(&batch), vec!["echo"]);
}
/// An empty id list returns an empty batch that keeps the projected schema.
#[tokio::test]
async fn take_rows_object_store_empty_ids_returns_empty_batch() {
let bytes = titled_superfile_bytes();
let (schema, n_docs) = schema_and_n_docs(&bytes);
let (store, path) = object_store_with(&bytes).await;
let batch = take_rows_object_store(
store,
path,
Some(bytes.len() as u64),
&schema,
n_docs,
&[],
&["title"],
)
.await
.expect("empty ids");
assert_eq!(batch.num_rows(), 0);
assert_eq!(batch.schema().field(0).name(), "title");
}
/// An id equal to `n_docs` is rejected with an "out of range" error.
#[tokio::test]
async fn take_rows_object_store_out_of_range_id_errors() {
let bytes = titled_superfile_bytes();
let (schema, n_docs) = schema_and_n_docs(&bytes);
let (store, path) = object_store_with(&bytes).await;
let err = take_rows_object_store(
store,
path,
Some(bytes.len() as u64),
&schema,
n_docs,
&[n_docs as u32],
&["title"],
)
.await
.expect_err("doc id past n_docs must error");
assert!(
err.to_string().contains("out of range"),
"expected an out-of-range error, got {err}",
);
}
/// A column name missing from the file schema is rejected with an "unknown column" error.
#[tokio::test]
async fn take_rows_object_store_unknown_column_errors() {
let bytes = titled_superfile_bytes();
let (schema, n_docs) = schema_and_n_docs(&bytes);
let (store, path) = object_store_with(&bytes).await;
let err = take_rows_object_store(
store,
path,
Some(bytes.len() as u64),
&schema,
n_docs,
&[0],
&["nope"],
)
.await
.expect_err("unknown column must error");
assert!(
err.to_string().contains("unknown column"),
"expected an unknown-column error, got {err}",
);
}
/// Creates a producer supertable on `storage` and commits four titled rows.
fn commit_titles_to(storage: &Arc<dyn StorageProvider>) {
let producer =
Supertable::create(default_supertable_options().with_storage(Arc::clone(storage)))
.expect("create producer");
let mut w = producer.writer().expect("writer");
w.append(&build_title_batch(&[
"rust async",
"python data",
"rust systems",
"go routines",
]))
.expect("append");
w.commit().expect("commit");
}
/// Opens a consumer supertable over `consumer_storage` with a disk cache rooted
/// at `cache_dir`, configured for `LazyForegroundWithBackgroundFill` cold fetches.
fn open_cold(
consumer_storage: Arc<dyn StorageProvider>,
cache_dir: &TempDir,
) -> (Arc<DiskCacheStore>, Supertable) {
let cfg = DiskCacheConfig {
cache_root: cache_dir.path().to_path_buf(),
cold_fetch_mode: ColdFetchMode::LazyForegroundWithBackgroundFill,
mmap_cold_threshold_secs: 0,
mmap_sweep_interval_secs: 0,
..Default::default()
};
let cache =
DiskCacheStore::new_unpinned(Arc::clone(&consumer_storage), cfg).expect("cache");
let consumer = Supertable::open(
default_supertable_options()
.with_storage(consumer_storage)
.with_disk_cache(Arc::clone(&cache)),
)
.expect("open consumer");
(cache, consumer)
}
/// Commits the title rows to local-filesystem storage and opens a cache-backed
/// consumer over it. The temp dirs are returned so the caller keeps them alive.
fn cold_consumer() -> (TempDir, TempDir, Arc<DiskCacheStore>, Supertable) {
let storage_dir = TempDir::new().expect("storage tempdir");
let cache_dir = TempDir::new().expect("cache tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(storage_dir.path()).expect("provider"));
commit_titles_to(&storage);
let (cache, consumer) = open_cold(Arc::clone(&storage), &cache_dir);
(storage_dir, cache_dir, cache, consumer)
}
/// A scalar projection on the cache-backed consumer returns the matching titles
/// and records at least one cold fetch in the cache stats.
#[test]
fn resolve_columns_cold_path_streams_scalar_via_object_store() {
let (_sd, _cd, cache, consumer) = cold_consumer();
let batches = consumer
.reader()
.bm25_search("title", "rust", 10, BoolMode::Or, Some(&["title", "score"]))
.expect("cold bm25 with scalar projection");
let b = &batches[0];
assert_eq!(b.num_columns(), 2);
assert_eq!(b.schema().field(0).name(), "title");
assert_eq!(b.schema().field(1).name(), "score");
let titles = b
.column(0)
.as_any()
.downcast_ref::<LargeStringArray>()
.expect("title col");
assert_eq!(titles.len(), 2, "two docs contain 'rust'");
for i in 0..titles.len() {
assert!(titles.value(i).contains("rust"));
}
assert!(
cache.stats().n_cold_fetches >= 1,
"scalar resolution must cold-fetch lazy readers; got {}",
cache.stats().n_cold_fetches,
);
}
/// A no-match query on the cache-backed consumer yields zero rows. The
/// assertion checks only the row count, not whether readers were opened.
#[test]
fn resolve_columns_cold_path_empty_hits_opens_no_readers() {
let (_sd, _cd, _cache, consumer) = cold_consumer();
let batches = consumer
.reader()
.bm25_search(
"title",
"nonexistentterm",
10,
BoolMode::Or,
Some(&["title", "score"]),
)
.expect("cold bm25 with no matches");
let rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
assert_eq!(rows, 0);
}
}