use std::{
cmp::Ordering,
collections::BinaryHeap,
env, fs,
fs::File,
hash::{BuildHasher, Hash},
io::{self, BufReader, BufWriter, Error, ErrorKind, Read, Seek, SeekFrom, Write},
mem,
path::{Path, PathBuf},
sync::Arc,
time::{Duration, Instant},
vec::IntoIter,
};
use bumpalo::Bump;
use hashbrown::hash_map::{HashMap as HbHashMap, RawEntryMut};
use memmap2::Mmap;
use rustc_hash::{FxBuildHasher, FxHashMap};
use crate::superfile::{
BuildError,
format::{
self, FST_SEPARATOR,
checksum::{crc32c, crc32c_append},
fts::HEADER_SIZE as FTS_HEADER_SIZE,
},
fts::{
bm25,
dict::{DictBuilder, StreamingDictBuilder},
fst_value::FstValue,
posting::{BLOCK_LEN, Block, EncodedBlock, encode_block},
tokenize::{AsciiLowerTokenizer, Tokenizer},
},
};
/// Interning map from term text to its dense per-column term id.
///
/// The `&'static str` keys are only nominally `'static`: `intern_term_id`
/// produces them by extending the lifetime of strings allocated in a `Bump`
/// arena, so a map of this type must never be read after that arena is
/// dropped or reset.
type TermIdMap = HbHashMap<&'static str, u32, FxBuildHasher>;
/// Optional timing and counter breakdown collected while finishing an index.
///
/// Collection is switched on by the presence of the `INFINO_FTS_PROFILE`
/// environment variable (any value); every counter and timer starts at zero.
#[derive(Default)]
struct FinishProfile {
    /// Whether profiling output/timing is active for this build.
    enabled: bool,
    // Per-term encode counters and timers (accumulated by the encode path).
    encode_calls: u64,
    encode_df1: u64,
    encode_pfor: u64,
    encode_total: Duration,
    encode_block_build: Duration,
    encode_meta_write: Duration,
    encode_skip_write: Duration,
    encode_block_write: Duration,
    fst_insert: Duration,
    // Phase timers for the spilled finish path and blob assembly.
    partition_flush: Duration,
    lex_rank_build: Duration,
    partition_sort: Duration,
    mmap_open: Duration,
    scratch_cleanup: Duration,
    fst_close: Duration,
    postings_close: Duration,
    doc_lengths_emit: Duration,
    blob_copy: Duration,
}

impl FinishProfile {
    /// Returns a zeroed profile whose `enabled` flag reflects whether
    /// `INFINO_FTS_PROFILE` is present in the process environment.
    fn from_env() -> Self {
        let enabled = env::var_os("INFINO_FTS_PROFILE").is_some();
        Self {
            enabled,
            ..Default::default()
        }
    }
}
/// Size in bytes of one per-term metadata record (presumably in the postings
/// section — confirm against the encoder).
pub(crate) const TERM_META_SIZE: usize = 20;
/// Size in bytes of one skip-list entry (layout defined by the encoder).
pub(crate) const SKIP_ENTRY_SIZE: usize = 16;
/// Size in bytes of one doc-lengths table entry (layout defined elsewhere).
pub(crate) const DOC_LENGTHS_ENTRY_SIZE: usize = 16;
/// Default estimated in-RAM accumulator size (256 MiB) above which a column
/// spills its postings to on-disk partitions.
pub const DEFAULT_SPILL_THRESHOLD_BYTES: usize = 256 << 20;
/// Default number of spill partitions per spilled column (must be a power of two).
pub const DEFAULT_SPILL_PARTITIONS: usize = 128;
/// Default partition size (256 MiB) up to which a partition is sorted fully in RAM.
pub const DEFAULT_MAX_PARTITION_BYTES: u64 = 256 << 20;
/// Buffer capacity (64 KiB) for partition/chunk readers and writers.
const PARTITION_BUF_SIZE: usize = 64 << 10;
/// Estimated fixed accounting cost of a newly seen term in the in-RAM accumulator.
const ACCUM_NEW_TERM_FIXED_BYTES: usize = 24;
/// Estimated accounting cost of one `(doc_id, tf)` posting in the in-RAM accumulator.
const ACCUM_POSTING_BYTES: usize = 8;
/// Below this many triples a comparison sort is used instead of the counting sort.
const RADIX_SORT_MIN_TRIPLES: usize = 256;
/// Upper bound on the initial capacity reserved for an external-merge chunk.
const EXTERNAL_MERGE_CHUNK_CAP_TRIPLES: usize = 1 << 20;
/// Batch size for sorted output (not referenced in this section — confirm usage).
const SORT_OUTPUT_BATCH_TRIPLES: usize = 1 << 12;
/// Per-column document statistics gathered while documents are added.
struct ColumnState {
    /// Column name as passed to `register_column`.
    name: String,
    /// Token count of each added document, in insertion order
    /// (each entry clamped to `u32::MAX`).
    doc_lengths: Vec<u32>,
    /// Sum of unclamped token counts over all documents, saturating at `u64::MAX`.
    total_tokens: u64,
}
enum ColumnPostings {
InRam {
terms: FxHashMap<Box<str>, Vec<(u32, u32)>>,
bytes: usize,
},
Spilled {
partitions: Vec<SpillPartition>,
term_to_id: TermIdMap,
id_to_term: Vec<&'static str>,
dense_doc_tf: Vec<u32>,
updated_terms: Vec<u32>,
term_arena: Bump,
},
}
impl ColumnPostings {
fn new() -> Self {
Self::InRam {
terms: FxHashMap::default(),
bytes: 0,
}
}
fn is_spilled(&self) -> bool {
matches!(self, Self::Spilled { .. })
}
}
/// Number of triples buffered per partition before they are handed to the
/// partition's writer. 341 * 12 bytes = 4092 bytes, presumably chosen to keep
/// one batch just under 4 KiB.
const SPILL_BATCH_TRIPLES: usize = 341;
/// One on-disk spill partition of a column.
struct SpillPartition {
    /// Path of the partition file in the scratch directory.
    path: PathBuf,
    /// Buffered writer; `None` once it has been flushed and taken during finish.
    writer: Option<BufWriter<File>>,
    /// Triples not yet handed to `writer`.
    batch: Vec<Triple>,
}
/// Serialized size of one triple (3 x u32 = 12 bytes).
const TRIPLE_BYTES: usize = mem::size_of::<Triple>();
/// A spilled posting: `[term_id, doc_id, tf]`, stored little-endian on disk.
type Triple = [u32; 3];
/// Term id stored in the first slot of a triple.
#[inline(always)]
fn triple_term_id(t: &Triple) -> u32 {
    let [term_id, _, _] = *t;
    term_id
}
/// Document id stored in the second slot of a triple.
#[inline(always)]
fn triple_doc_id(t: &Triple) -> u32 {
    let [_, doc_id, _] = *t;
    doc_id
}
/// Term frequency stored in the third slot of a triple.
#[inline(always)]
fn triple_tf(t: &Triple) -> u32 {
    let [_, _, tf] = *t;
    tf
}
/// Writes one triple as three little-endian `u32`s (big-endian hosts only;
/// little-endian hosts write whole batches as raw bytes instead).
#[cfg(not(target_endian = "little"))]
#[inline(always)]
fn write_triple<W: Write>(w: &mut W, term_id: u32, doc_id: u32, tf: u32) -> Result<(), BuildError> {
    let mut encoded = [0u8; TRIPLE_BYTES];
    for (dst, field) in encoded.chunks_exact_mut(4).zip([term_id, doc_id, tf]) {
        dst.copy_from_slice(&field.to_le_bytes());
    }
    w.write_all(&encoded)?;
    Ok(())
}
/// Appends a triple to the partition's batch, flushing the batch to the
/// partition writer once it reaches `SPILL_BATCH_TRIPLES`.
#[inline(always)]
fn push_triple_batched(
    partition: &mut SpillPartition,
    term_id: u32,
    doc_id: u32,
    tf: u32,
) -> Result<(), BuildError> {
    let pending = &mut partition.batch;
    pending.push([term_id, doc_id, tf]);
    if pending.len() < SPILL_BATCH_TRIPLES {
        return Ok(());
    }
    flush_partition_batch(partition)
}
#[inline]
fn flush_partition_batch(partition: &mut SpillPartition) -> Result<(), BuildError> {
if partition.batch.is_empty() {
return Ok(());
}
let writer = partition
.writer
.as_mut()
.expect("partition writer is open before finish");
#[cfg(target_endian = "little")]
{
writer.write_all(bytemuck::cast_slice::<Triple, u8>(&partition.batch))?;
}
#[cfg(not(target_endian = "little"))]
{
for t in &partition.batch {
write_triple(writer, t[0], t[1], t[2])?;
}
}
partition.batch.clear();
Ok(())
}
/// Reads an entire spill partition file into memory as triples.
///
/// The file must consist of whole little-endian `[term_id, doc_id, tf]`
/// records of `TRIPLE_BYTES` each; an empty file yields an empty vector.
///
/// The records are read straight into a `Vec<Triple>`, which is 4-byte
/// aligned by construction. Reading into a `Vec<u8>` and reinterpreting it
/// would be wrong for two reasons:
/// - a `Vec<u8>` allocation is only guaranteed 1-byte alignment, so the cast
///   could fail spuriously;
/// - it would keep both a byte copy and a triple copy of the partition alive
///   at the same time, doubling peak memory.
///
/// # Errors
///
/// - I/O errors from opening or reading the file.
/// - `InvalidData` if the file length is not a multiple of `TRIPLE_BYTES`.
/// - `InvalidData` if the record count does not fit in `usize`.
fn read_partition_triples(path: &Path) -> Result<Vec<Triple>, BuildError> {
    let mut f = File::open(path)?;
    let len = f.metadata()?.len();
    if len % TRIPLE_BYTES as u64 != 0 {
        return Err(BuildError::Io(Error::new(
            ErrorKind::InvalidData,
            format!(
                "spill partition {path:?} length {} not a multiple of {}",
                len, TRIPLE_BYTES
            ),
        )));
    }
    let n = usize::try_from(len / TRIPLE_BYTES as u64).map_err(|_| {
        BuildError::Io(Error::new(
            ErrorKind::InvalidData,
            format!("spill partition {path:?} length {len} does not fit in memory"),
        ))
    })?;
    let mut triples: Vec<Triple> = vec![[0u32; 3]; n];
    f.read_exact(bytemuck::cast_slice_mut::<Triple, u8>(&mut triples))?;
    // On-disk fields are little-endian; convert in place on big-endian hosts.
    #[cfg(not(target_endian = "little"))]
    {
        for field in triples.iter_mut().flat_map(|t| t.iter_mut()) {
            *field = u32::from_le(*field);
        }
    }
    Ok(triples)
}
/// Orders term ids lexicographically (bytewise) by their term text.
///
/// Returns `(rank, by_lex)`:
/// - `by_lex` lists term ids in lexicographic order.
/// - `rank[id]` is the position of `id` within `by_lex`.
fn build_lex_rank(id_to_term: &[&str]) -> (Vec<u32>, Vec<u32>) {
    let term_count = id_to_term.len();
    let mut lex_order: Vec<u32> = (0..term_count as u32).collect();
    lex_order.sort_unstable_by_key(|&id| {
        let term: &str = id_to_term[id as usize];
        term.as_bytes()
    });
    let mut rank_of = vec![0u32; term_count];
    for (position, &id) in (0u32..).zip(lex_order.iter()) {
        rank_of[id as usize] = position;
    }
    (rank_of, lex_order)
}
/// Hashes `key` with a hasher freshly built from `hash_builder`, matching the
/// hash a map using that builder computes for the same key.
#[inline(always)]
fn compute_hash<Q: Hash + ?Sized, S: BuildHasher>(hash_builder: &S, key: &Q) -> u64 {
    BuildHasher::hash_one(hash_builder, key)
}
/// Returns the dense id for `term`, assigning the next id if it is new.
///
/// A new term is copied into `arena`, appended to `id_to_term` (so ids equal
/// insertion order and `id_to_term[id]` is the term) and inserted into
/// `term_to_id`. The returned `bool` is `true` when the term was newly
/// interned. The hash is computed once and reused for lookup and insert.
#[inline(always)]
fn intern_term_id(
    term_to_id: &mut TermIdMap,
    id_to_term: &mut Vec<&'static str>,
    arena: &Bump,
    term: &str,
) -> (u32, bool) {
    let hash = compute_hash(term_to_id.hasher(), term);
    match term_to_id
        .raw_entry_mut()
        .from_hash(hash, |existing| *existing == term)
    {
        RawEntryMut::Occupied(entry) => (*entry.get(), false),
        RawEntryMut::Vacant(entry) => {
            // NOTE(review): `as u32` silently truncates beyond u32::MAX distinct terms.
            let id = id_to_term.len() as u32;
            let arena_str: &str = arena.alloc_str(term);
            // SAFETY: this is a caller contract that the types do not enforce.
            // - The string really borrows `arena`; the `'static` lifetime is a lie.
            // - It stays valid only while `arena` is alive and not reset.
            // - So `term_to_id` and `id_to_term` must not be read after
            //   `arena` is dropped or reset.
            let static_str: &'static str = unsafe { std::mem::transmute(arena_str) };
            id_to_term.push(static_str);
            // Reuses the precomputed hash. It matches what the map would
            // compute, because both hash the same `str` with the map's own
            // hasher via `compute_hash`.
            entry.insert_hashed_nocheck(hash, static_str, id);
            (id, true)
        }
    }
}
/// Heap entry for the k-way merge of sorted chunk files.
///
/// The ordering is reversed so that `BinaryHeap` (a max-heap) pops the entry
/// with the smallest `(sort_key, reader_idx)` first. Equality and ordering
/// look only at those two fields.
struct MergeEntry {
    sort_key: u64,
    term_id: u32,
    tf: u32,
    reader_idx: usize,
}

impl PartialEq for MergeEntry {
    fn eq(&self, other: &Self) -> bool {
        matches!(self.cmp(other), Ordering::Equal)
    }
}

impl Eq for MergeEntry {}

impl PartialOrd for MergeEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        let mine = (self.sort_key, self.reader_idx);
        let theirs = (other.sort_key, other.reader_idx);
        mine.cmp(&theirs).reverse()
    }
}
/// Packs `(lex_rank, doc_id)` into one `u64` whose natural order is
/// lexicographic: rank in the high 32 bits, doc id in the low 32 bits.
#[inline(always)]
fn pack_sort_key(lex_rank: u32, doc_id: u32) -> u64 {
    let high = u64::from(lex_rank) << 32;
    high | u64::from(doc_id)
}
/// Source of one partition's triples in `(lex_rank[term_id], doc_id)` order.
enum PartitionIter {
    /// The partition fit in memory and was sorted in RAM.
    InMemory(IntoIter<Triple>),
    /// k-way merge over individually sorted on-disk chunks.
    Merge {
        readers: Vec<BufReader<File>>,
        heap: BinaryHeap<MergeEntry>,
        /// Held so the chunk file paths stay associated with the iterator.
        _chunk_paths: Vec<PathBuf>,
    },
}

impl PartitionIter {
    /// Yields the next triple, or `None` when exhausted.
    ///
    /// In merge mode, the reader that supplied the popped triple is refilled
    /// before the triple is returned. A read error is returned in place of
    /// the popped triple.
    fn next_with(&mut self, lex_rank: &[u32]) -> Option<Result<Triple, BuildError>> {
        let (readers, heap) = match self {
            PartitionIter::InMemory(it) => return it.next().map(Ok),
            PartitionIter::Merge { readers, heap, .. } => (readers, heap),
        };
        let head = heap.pop()?;
        let source = head.reader_idx;
        // The low 32 bits of the sort key carry the doc id.
        let emitted: Triple = [head.term_id, head.sort_key as u32, head.tf];
        match read_one_triple(&mut readers[source]) {
            Err(e) => Some(Err(e)),
            Ok(None) => Some(Ok(emitted)),
            Ok(Some(refill)) => {
                let term_id = triple_term_id(&refill);
                let doc_id = triple_doc_id(&refill);
                heap.push(MergeEntry {
                    sort_key: pack_sort_key(lex_rank[term_id as usize], doc_id),
                    term_id,
                    tf: triple_tf(&refill),
                    reader_idx: source,
                });
                Some(Ok(emitted))
            }
        }
    }
}
fn read_one_triple<R: Read>(r: &mut R) -> Result<Option<Triple>, BuildError> {
let mut buf = [0u8; TRIPLE_BYTES];
match r.read_exact(&mut buf) {
Ok(()) => {}
Err(e) if e.kind() == ErrorKind::UnexpectedEof => return Ok(None),
Err(e) => return Err(BuildError::Io(e)),
}
Ok(Some([
u32::from_le_bytes(buf[0..4].try_into().expect("slice len 4")),
u32::from_le_bytes(buf[4..8].try_into().expect("slice len 4")),
u32::from_le_bytes(buf[8..12].try_into().expect("slice len 4")),
]))
}
/// Creates (or truncates) `path` and writes `triples` to it in the
/// little-endian on-disk triple format, flushing before returning.
fn write_triples_sorted(triples: &[Triple], path: &Path) -> Result<(), BuildError> {
    let file = File::create(path)?;
    let mut out = BufWriter::with_capacity(PARTITION_BUF_SIZE, file);
    #[cfg(target_endian = "little")]
    {
        out.write_all(bytemuck::cast_slice::<Triple, u8>(triples))?;
    }
    #[cfg(not(target_endian = "little"))]
    {
        for &[term_id, doc_id, tf] in triples {
            write_triple(&mut out, term_id, doc_id, tf)?;
        }
    }
    out.flush()?;
    Ok(())
}
/// Sorts `chunk` in place and writes it to
/// `{partition_label}_sorted{chunk_idx}.bin` in `scratch_dir`.
///
/// Afterwards it clears `chunk` (keeping its capacity for reuse) and appends
/// the new file's path to `out_paths`.
fn spill_sorted_chunk(
    chunk: &mut Vec<Triple>,
    scratch_dir: &Path,
    partition_label: &str,
    chunk_idx: usize,
    lex_rank: &[u32],
    out_paths: &mut Vec<PathBuf>,
) -> Result<(), BuildError> {
    let file_name = format!("{partition_label}_sorted{chunk_idx}.bin");
    let chunk_path = scratch_dir.join(file_name);
    radix_sort_triples_by_lex_rank(chunk, lex_rank);
    write_triples_sorted(chunk, &chunk_path)?;
    chunk.clear();
    #[cfg(test)]
    finish_debug::record_chunk_path(&chunk_path);
    out_paths.push(chunk_path);
    Ok(())
}
#[cfg(test)]
mod finish_debug {
    //! Test-only, per-thread record of the sorted chunk files written by the
    //! external merge path, so tests can observe chunking behavior.
    use std::{
        cell::RefCell,
        path::{Path, PathBuf},
    };
    thread_local! {
        static OBSERVED_CHUNKS: RefCell<Vec<PathBuf>> = const { RefCell::new(Vec::new()) };
    }
    /// Forgets all recorded chunk paths for the current thread.
    pub fn reset() {
        OBSERVED_CHUNKS.with(|cell| cell.borrow_mut().clear());
    }
    /// Records one chunk path for the current thread.
    pub fn record_chunk_path(path: &Path) {
        let owned = PathBuf::from(path);
        OBSERVED_CHUNKS.with(move |cell| cell.borrow_mut().push(owned));
    }
    /// Returns a copy of the chunk paths recorded on the current thread, in order.
    pub fn observed() -> Vec<PathBuf> {
        OBSERVED_CHUNKS.with(|cell| cell.borrow().to_vec())
    }
}
/// Sorts `triples` by `(lex_rank[term_id], doc_id)`.
///
/// Small inputs (fewer than `RADIX_SORT_MIN_TRIPLES`) use a comparison sort on
/// the full key. Larger inputs use one stable counting-sort pass keyed by
/// lex rank. That pass orders doc ids only by preserving input order, so it
/// relies on each term's triples already appearing in ascending doc id order,
/// which is how postings are appended.
///
/// All indexing is bounds-checked. Term ids come from scratch files on disk,
/// and an out-of-range id must panic rather than cause undefined behavior.
///
/// # Panics
///
/// - Panics if a triple's term id is not a valid index into `lex_rank`, or a
///   rank exceeds `lex_rank.len()`.
/// - On the counting-sort path, panics if there are more than `u32::MAX`
///   triples, because the bucket offsets are `u32`.
fn radix_sort_triples_by_lex_rank(triples: &mut Vec<Triple>, lex_rank: &[u32]) {
    let n = triples.len();
    if n < RADIX_SORT_MIN_TRIPLES {
        triples.sort_unstable_by(|a, b| {
            lex_rank[triple_term_id(a) as usize]
                .cmp(&lex_rank[triple_term_id(b) as usize])
                .then(triple_doc_id(a).cmp(&triple_doc_id(b)))
        });
        return;
    }
    // Offsets are u32 to halve the histogram's memory for large vocabularies,
    // so every count must be representable in u32.
    assert!(
        u32::try_from(n).is_ok(),
        "radix_sort_triples_by_lex_rank: {n} triples exceed u32 bucket offsets"
    );
    let vocab_size = lex_rank.len();
    // Histogram of triples per lex rank.
    let mut offsets: Vec<u32> = vec![0u32; vocab_size + 1];
    for t in triples.iter() {
        let rank = lex_rank[triple_term_id(t) as usize] as usize;
        offsets[rank] += 1;
    }
    // Exclusive prefix sum: offsets[r] becomes the first output slot of rank r.
    let mut sum: u32 = 0;
    for c in offsets.iter_mut() {
        let count = *c;
        *c = sum;
        sum += count;
    }
    debug_assert_eq!(sum as usize, n, "histogram total != triple count");
    // Stable scatter: equal ranks keep their input (doc id) order.
    let mut out: Vec<Triple> = vec![[0u32; 3]; n];
    for t in triples.iter() {
        let rank = lex_rank[triple_term_id(t) as usize] as usize;
        let slot = &mut offsets[rank];
        out[*slot as usize] = *t;
        *slot += 1;
    }
    *triples = out;
}
/// Opens a spill partition as an iterator over its triples in
/// `(lex_rank[term_id], doc_id)` order.
///
/// There are two strategies, chosen by file size:
/// - If the file is at most `max_partition_bytes`, it is loaded and sorted in
///   memory.
/// - Otherwise it is read in chunks of up to `max_partition_bytes / TRIPLE_BYTES`
///   triples. Each chunk is sorted and written to its own scratch file
///   (`{partition_label}_sorted{i}.bin`), and the returned iterator k-way
///   merges those files through a heap.
///
/// # Errors
///
/// I/O and data errors from reading the partition or writing/opening chunk files.
fn open_partition_sorted(
    partition_path: &Path,
    max_partition_bytes: u64,
    scratch_dir: &Path,
    partition_label: &str,
    lex_rank: &[u32],
) -> Result<PartitionIter, BuildError> {
    let len = fs::metadata(partition_path)?.len();
    // Fast path: the whole partition fits the in-memory budget.
    if len <= max_partition_bytes {
        let mut triples = read_partition_triples(partition_path)?;
        radix_sort_triples_by_lex_rank(&mut triples, lex_rank);
        return Ok(PartitionIter::InMemory(triples.into_iter()));
    }
    // NOTE(review): this is 0 when max_partition_bytes < TRIPLE_BYTES, which
    // makes every triple its own chunk (correct but very file-heavy).
    let chunk_triples = (max_partition_bytes as usize) / TRIPLE_BYTES;
    let mut sorted_chunk_paths: Vec<PathBuf> = Vec::new();
    let mut r = BufReader::with_capacity(PARTITION_BUF_SIZE, File::open(partition_path)?);
    // Initial capacity is capped; the chunk grows on demand up to chunk_triples.
    let mut chunk: Vec<Triple> =
        Vec::with_capacity(chunk_triples.min(EXTERNAL_MERGE_CHUNK_CAP_TRIPLES));
    let mut chunk_idx: usize = 0;
    // Phase 1: cut the partition into sorted chunk files.
    while let Some(t) = read_one_triple(&mut r)? {
        chunk.push(t);
        if chunk.len() >= chunk_triples {
            spill_sorted_chunk(
                &mut chunk,
                scratch_dir,
                partition_label,
                chunk_idx,
                lex_rank,
                &mut sorted_chunk_paths,
            )?;
            chunk_idx += 1;
        }
    }
    // Trailing partial chunk.
    if !chunk.is_empty() {
        spill_sorted_chunk(
            &mut chunk,
            scratch_dir,
            partition_label,
            chunk_idx,
            lex_rank,
            &mut sorted_chunk_paths,
        )?;
    }
    // Phase 2: open one reader per chunk and seed the merge heap with each
    // reader's first triple.
    let mut readers: Vec<BufReader<File>> = Vec::with_capacity(sorted_chunk_paths.len());
    for p in &sorted_chunk_paths {
        readers.push(BufReader::with_capacity(PARTITION_BUF_SIZE, File::open(p)?));
    }
    let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::with_capacity(readers.len());
    for (idx, reader) in readers.iter_mut().enumerate() {
        if let Some(t) = read_one_triple(reader)? {
            let term_id = triple_term_id(&t);
            let doc_id = triple_doc_id(&t);
            heap.push(MergeEntry {
                sort_key: pack_sort_key(lex_rank[term_id as usize], doc_id),
                term_id,
                tf: triple_tf(&t),
                reader_idx: idx,
            });
        }
    }
    Ok(PartitionIter::Merge {
        readers,
        heap,
        _chunk_paths: sorted_chunk_paths,
    })
}
/// Incremental builder for the full-text-search section.
///
/// Columns are registered, then documents are added per column. Postings
/// accumulate in RAM and spill to partition files in a private scratch
/// directory once a column's estimated size exceeds the spill threshold.
/// `finish`/`finish_to` consume the builder and emit the serialized output.
pub struct FtsBuilder {
    /// Tokenizer applied to every added document.
    tokenizer: Arc<dyn Tokenizer>,
    /// Per-column stats, indexed by column id.
    columns: Vec<ColumnState>,
    /// Per-column posting accumulators, indexed by column id.
    postings: Vec<ColumnPostings>,
    /// Temporary directory owning all scratch files; removed when dropped.
    scratch_dir: tempfile::TempDir,
    /// Estimated in-RAM bytes per column before it spills.
    spill_threshold_bytes: usize,
    /// Number of spill partitions per spilled column (a power of two).
    spill_partitions: usize,
    /// Partition size budget passed to the finish-time partition sort.
    max_partition_bytes: u64,
    /// One past the largest local doc id seen in any column.
    n_docs: u32,
    /// Per-document scratch arena for the in-RAM path; reset on every in-RAM add.
    bump: Bump,
}
impl FtsBuilder {
    /// Creates a builder with default spill settings whose scratch files live
    /// in a fresh temporary directory (`tempfile::tempdir`).
    ///
    /// # Panics
    ///
    /// Panics if the temporary directory cannot be created; use
    /// `FtsBuilder::with_scratch` for a fallible constructor.
    pub fn new(tokenizer: Arc<dyn Tokenizer>) -> Self {
        let scratch_dir = tempfile::tempdir().expect("create FtsBuilder scratch tempdir");
        Self::from_parts(tokenizer, scratch_dir)
    }

    /// Creates a builder whose scratch files live in a new temporary
    /// directory, prefixed `infino-fts-`, created inside `scratch`.
    ///
    /// # Errors
    ///
    /// Returns an I/O error if the temporary directory cannot be created.
    pub fn with_scratch(
        tokenizer: Arc<dyn Tokenizer>,
        scratch: PathBuf,
    ) -> Result<Self, BuildError> {
        let scratch_dir = tempfile::Builder::new()
            .prefix("infino-fts-")
            .tempdir_in(&scratch)?;
        Ok(Self::from_parts(tokenizer, scratch_dir))
    }

    /// Shared constructor: no columns, zero documents, default spill settings.
    fn from_parts(tokenizer: Arc<dyn Tokenizer>, scratch_dir: tempfile::TempDir) -> Self {
        Self {
            tokenizer,
            columns: Vec::new(),
            postings: Vec::new(),
            scratch_dir,
            spill_threshold_bytes: DEFAULT_SPILL_THRESHOLD_BYTES,
            spill_partitions: DEFAULT_SPILL_PARTITIONS,
            max_partition_bytes: DEFAULT_MAX_PARTITION_BYTES,
            n_docs: 0,
            bump: Bump::new(),
        }
    }

    /// Sets the estimated accumulator size (bytes) above which an in-RAM
    /// column spills to disk partitions. Affects subsequent `add_doc` calls.
    ///
    /// # Panics
    ///
    /// Panics if `threshold` is 0.
    pub fn set_spill_threshold_bytes(&mut self, threshold: usize) {
        assert!(
            threshold > 0,
            "FtsBuilder: spill_threshold_bytes must be > 0"
        );
        self.spill_threshold_bytes = threshold;
    }

    /// Sets the number of spill partitions used by each spilled column.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` in any of these cases:
    /// - a column has already been registered;
    /// - `n` is 0;
    /// - `n` is not a power of two. Partitions are selected with
    ///   `term_id & (n - 1)`, which needs a power of two.
    pub fn set_spill_partitions(&mut self, n: usize) -> Result<(), BuildError> {
        if !self.columns.is_empty() {
            return Err(BuildError::Io(Error::new(
                ErrorKind::InvalidInput,
                "FtsBuilder::set_spill_partitions must be called before any register_column",
            )));
        }
        // Checked before the power-of-two test so 0 gets its own message.
        if n == 0 {
            return Err(BuildError::Io(Error::new(
                ErrorKind::InvalidInput,
                "FtsBuilder: spill_partitions must be ≥ 1",
            )));
        }
        if !n.is_power_of_two() {
            return Err(BuildError::Io(Error::new(
                ErrorKind::InvalidInput,
                format!("FtsBuilder: spill_partitions must be a power of two; got {n}"),
            )));
        }
        self.spill_partitions = n;
        Ok(())
    }

    /// Sets the partition size budget (bytes) passed to the finish-time
    /// partition sort. Presumably partitions larger than this are sorted in
    /// chunks rather than wholly in RAM — confirm in `sort_partition_to_file`.
    ///
    /// # Panics
    ///
    /// Panics if `bytes` is 0.
    pub fn set_max_partition_bytes(&mut self, bytes: u64) {
        assert!(bytes > 0, "FtsBuilder: max_partition_bytes must be > 0");
        self.max_partition_bytes = bytes;
    }

    /// Registers a new column and returns its id (registration order, from 0).
    ///
    /// # Errors
    ///
    /// Rejects a name that:
    /// - contains `FST_SEPARATOR`;
    /// - starts with `format::RESERVED_PREFIX`;
    /// - is already registered.
    pub fn register_column(&mut self, name: String) -> Result<u32, BuildError> {
        if name.as_bytes().contains(&FST_SEPARATOR) {
            return Err(BuildError::ReservedSeparatorInColumnName(name));
        }
        if name.starts_with(format::RESERVED_PREFIX) {
            return Err(BuildError::ReservedPrefixInColumnName(name));
        }
        if self.columns.iter().any(|c| c.name == name) {
            return Err(BuildError::DuplicateColumnName(name));
        }
        let column_id = self.columns.len() as u32;
        self.columns.push(ColumnState {
            name,
            doc_lengths: Vec::new(),
            total_tokens: 0,
        });
        self.postings.push(ColumnPostings::new());
        Ok(column_id)
    }

    /// Creates `n_partitions` empty spill files `fts_col{column_id}_part{i}.bin`
    /// in `scratch_dir`, each with a buffered writer and an empty batch.
    fn open_partitions_for_column(
        scratch_dir: &Path,
        column_id: u32,
        n_partitions: usize,
    ) -> Result<Vec<SpillPartition>, BuildError> {
        let mut partitions = Vec::with_capacity(n_partitions);
        for partition in 0..n_partitions {
            let path = scratch_dir.join(format!("fts_col{column_id}_part{partition}.bin"));
            let file = File::create(&path)?;
            partitions.push(SpillPartition {
                path,
                writer: Some(BufWriter::with_capacity(PARTITION_BUF_SIZE, file)),
                batch: Vec::with_capacity(SPILL_BATCH_TRIPLES),
            });
        }
        Ok(partitions)
    }

    /// Moves a column's in-RAM postings into its spill partitions.
    ///
    /// Each term is interned, so ids follow the hash map's iteration order.
    /// Every `(doc_id, tf)` of the term is appended as a triple to partition
    /// `term_id & (n_partitions - 1)`, which puts all of a term's postings in
    /// the same partition.
    fn flush_in_ram_to_partitions(
        terms: FxHashMap<Box<str>, Vec<(u32, u32)>>,
        partitions: &mut [SpillPartition],
        term_to_id: &mut TermIdMap,
        id_to_term: &mut Vec<&'static str>,
        arena: &Bump,
    ) -> Result<(), BuildError> {
        let n_part = partitions.len();
        debug_assert!(
            n_part.is_power_of_two(),
            "spill_partitions must be a power of 2; got {n_part}"
        );
        let mask = n_part - 1;
        for (term, postings) in terms {
            let (term_id, _is_new) = intern_term_id(term_to_id, id_to_term, arena, &term);
            let p = (term_id as usize) & mask;
            for (doc_id, tf) in postings {
                push_triple_batched(&mut partitions[p], term_id, doc_id, tf)?;
            }
        }
        Ok(())
    }

    /// Tokenizes `text` as document `local_doc_id` of column `column_id`,
    /// recording its length and per-term frequencies.
    ///
    /// `local_doc_id` must be the column's next consecutive id (0, 1, 2, ...).
    ///
    /// # Errors
    ///
    /// - `FtsColumnTypeInvalid` for an unregistered `column_id`.
    /// - I/O errors while writing spill partitions.
    pub fn add_doc(
        &mut self,
        column_id: u32,
        local_doc_id: u32,
        text: &str,
    ) -> Result<(), BuildError> {
        let col_idx = column_id as usize;
        if col_idx >= self.columns.len() {
            return Err(BuildError::FtsColumnTypeInvalid {
                column: format!("(unregistered column_id {column_id})"),
                actual: "n/a".to_string(),
            });
        }
        // NOTE(review): only checked in debug builds. In release, a gap or a
        // repeat silently misaligns `doc_lengths` (indexed by position) with
        // the doc ids stored in postings.
        debug_assert!(
            local_doc_id as usize == self.columns[col_idx].doc_lengths.len(),
            "FtsBuilder::add_doc: local_doc_id ({local_doc_id}) must equal \
             this column's next index ({}); doc_ids must be consecutive \
             from 0 within a column",
            self.columns[col_idx].doc_lengths.len(),
        );
        if self.postings[col_idx].is_spilled() {
            self.add_doc_spilled(col_idx, local_doc_id, text)
        } else {
            self.add_doc_inram(col_idx, local_doc_id, text)
        }
    }

    /// Adds one document to a column that has already spilled.
    ///
    /// Per-document term frequencies are counted in the dense `dense_doc_tf`
    /// scratch (indexed by term id). `updated_terms` lists the ids touched by
    /// this document. Afterwards each touched slot is emitted as a triple and
    /// reset to 0, so the scratch is all-zero between documents.
    #[inline(always)]
    fn add_doc_spilled(
        &mut self,
        col_idx: usize,
        local_doc_id: u32,
        text: &str,
    ) -> Result<(), BuildError> {
        let tokenizer = &self.tokenizer;
        // Fast path: a concrete ASCII tokenizer can be called without dynamic dispatch.
        let ascii_tok = tokenizer
            .as_ref()
            .as_any()
            .downcast_ref::<AsciiLowerTokenizer>();
        let mut tokens_in_doc: u64 = 0;
        let col_post = &mut self.postings[col_idx];
        let (partitions, term_to_id, id_to_term, dense_doc_tf, updated_terms, term_arena) =
            match col_post {
                ColumnPostings::Spilled {
                    partitions,
                    term_to_id,
                    id_to_term,
                    dense_doc_tf,
                    updated_terms,
                    term_arena,
                } => (
                    partitions,
                    term_to_id,
                    id_to_term,
                    dense_doc_tf,
                    updated_terms,
                    term_arena,
                ),
                ColumnPostings::InRam { .. } => {
                    unreachable!("add_doc_spilled called on InRam column")
                }
            };
        let n_part = partitions.len();
        debug_assert!(
            n_part.is_power_of_two(),
            "spill_partitions must be a power of 2"
        );
        // n_part >= 1 is guaranteed by set_spill_partitions, so this cannot underflow.
        let mask = n_part - 1;
        updated_terms.clear();
        let mut on_token = |tok: &str| {
            tokens_in_doc += 1;
            let (term_id, is_new) = intern_term_id(term_to_id, id_to_term, term_arena, tok);
            let idx = term_id as usize;
            // Keep dense_doc_tf.len() == id_to_term.len(): new ids are always the next index.
            if is_new {
                debug_assert_eq!(idx, dense_doc_tf.len());
                dense_doc_tf.push(0);
            }
            // SAFETY: `idx < dense_doc_tf.len()` for two reasons:
            // - `dense_doc_tf` was sized to `id_to_term.len()` when the column
            //   spilled;
            // - every later id was pushed just above, so every interned id
            //   has a slot.
            let slot = unsafe { dense_doc_tf.get_unchecked_mut(idx) };
            // First occurrence in this document: remember the term for emission.
            if *slot == 0 {
                updated_terms.push(term_id);
            }
            *slot += 1;
        };
        if let Some(ascii) = ascii_tok {
            ascii.tokenize_each_inline(text, &mut on_token);
        } else {
            tokenizer.tokenize_each(text, &mut on_token);
        }
        let col = &mut self.columns[col_idx];
        let dl_clamped: u32 = tokens_in_doc.min(u32::MAX as u64) as u32;
        col.doc_lengths.push(dl_clamped);
        col.total_tokens = col.total_tokens.saturating_add(tokens_in_doc);
        let docs_now = local_doc_id.saturating_add(1);
        if docs_now > self.n_docs {
            self.n_docs = docs_now;
        }
        // NOTE(review): if a push fails below, the slots of the remaining
        // updated terms stay non-zero and would corrupt the next document's counts.
        for &term_id in updated_terms.iter() {
            let idx = term_id as usize;
            // SAFETY: `updated_terms` only holds ids that were indexed (in
            // bounds) in the tokenizer callback above.
            let slot = unsafe { dense_doc_tf.get_unchecked_mut(idx) };
            let tf = *slot;
            *slot = 0;
            let p = (term_id as usize) & mask;
            // SAFETY: `x & (n_part - 1) <= n_part - 1`, so `p < n_part`
            // (with n_part >= 1).
            let partition = unsafe { partitions.get_unchecked_mut(p) };
            push_triple_batched(partition, term_id, local_doc_id, tf)?;
        }
        Ok(())
    }

    /// Adds one document to a column still accumulating in RAM.
    ///
    /// Steps:
    /// 1. Count term frequencies for this document in a local map whose keys
    ///    live in `self.bump` (reset at entry).
    /// 2. Merge the counts into the column's term -> postings map and update
    ///    the size estimate.
    /// 3. If the estimate now exceeds the spill threshold, move every posting
    ///    (including this document's) into newly created spill partitions and
    ///    switch the column to `Spilled`.
    #[inline(always)]
    fn add_doc_inram(
        &mut self,
        col_idx: usize,
        local_doc_id: u32,
        text: &str,
    ) -> Result<(), BuildError> {
        let tokenizer = &self.tokenizer;
        let ascii_tok = tokenizer
            .as_ref()
            .as_any()
            .downcast_ref::<AsciiLowerTokenizer>();
        let mut tokens_in_doc: u64 = 0;
        self.bump.reset();
        let bump = &self.bump;
        let mut tf_per_term: HbHashMap<&'static str, u32, FxBuildHasher> =
            HbHashMap::with_hasher(FxBuildHasher);
        let mut on_token = |tok: &str| {
            tokens_in_doc += 1;
            let hash = compute_hash(tf_per_term.hasher(), tok);
            match tf_per_term
                .raw_entry_mut()
                .from_hash(hash, |existing| *existing == tok)
            {
                RawEntryMut::Occupied(mut e) => {
                    *e.get_mut() += 1;
                }
                RawEntryMut::Vacant(e) => {
                    let bumped: &str = bump.alloc_str(tok);
                    // SAFETY: `bumped` lives in `self.bump`, which is reset
                    // only at the start of the next `add_doc_inram` call.
                    // `tf_per_term`, the only holder of these references, is
                    // consumed before this call returns.
                    let extended: &'static str = unsafe { std::mem::transmute(bumped) };
                    e.insert_hashed_nocheck(hash, extended, 1);
                }
            }
        };
        if let Some(ascii) = ascii_tok {
            ascii.tokenize_each_inline(text, &mut on_token);
        } else {
            tokenizer.tokenize_each(text, &mut on_token);
        }
        let col = &mut self.columns[col_idx];
        let dl_clamped: u32 = tokens_in_doc.min(u32::MAX as u64) as u32;
        col.doc_lengths.push(dl_clamped);
        col.total_tokens = col.total_tokens.saturating_add(tokens_in_doc);
        let docs_now = local_doc_id.saturating_add(1);
        if docs_now > self.n_docs {
            self.n_docs = docs_now;
        }
        let column_id = col_idx as u32;
        let col_post = &mut self.postings[col_idx];
        let (terms, bytes) = match col_post {
            ColumnPostings::InRam { terms, bytes } => (terms, bytes),
            ColumnPostings::Spilled { .. } => {
                unreachable!("add_doc_inram called on Spilled column")
            }
        };
        // Estimated growth of the accumulator caused by this document.
        let mut new_bytes: usize = 0;
        for (term, tf) in tf_per_term {
            let term_len = term.len();
            match terms.get_mut(term) {
                Some(acc) => {
                    acc.push((local_doc_id, tf));
                    new_bytes = new_bytes.saturating_add(ACCUM_POSTING_BYTES);
                }
                None => {
                    // Copy out of the bump arena into an owned key.
                    terms.insert(Box::<str>::from(term), vec![(local_doc_id, tf)]);
                    new_bytes = new_bytes.saturating_add(
                        ACCUM_NEW_TERM_FIXED_BYTES + term_len + ACCUM_POSTING_BYTES,
                    );
                }
            }
        }
        let new_total = bytes.saturating_add(new_bytes);
        if new_total > self.spill_threshold_bytes {
            // One-way transition to the spilled representation.
            let drained = mem::take(terms);
            let mut partitions = Self::open_partitions_for_column(
                self.scratch_dir.path(),
                column_id,
                self.spill_partitions,
            )?;
            let term_arena = Bump::new();
            let mut term_to_id: TermIdMap = TermIdMap::default();
            let mut id_to_term: Vec<&'static str> = Vec::with_capacity(drained.len());
            Self::flush_in_ram_to_partitions(
                drained,
                &mut partitions,
                &mut term_to_id,
                &mut id_to_term,
                &term_arena,
            )?;
            // Invariant relied on by add_doc_spilled: one zeroed tf slot per interned term.
            let dense_doc_tf = vec![0u32; id_to_term.len()];
            let updated_terms: Vec<u32> = Vec::new();
            // The arena moves into the enum together with the references into
            // it; Bump's heap chunks do not move, so those references stay valid.
            *col_post = ColumnPostings::Spilled {
                partitions,
                term_to_id,
                id_to_term,
                dense_doc_tf,
                updated_terms,
                term_arena,
            };
        } else {
            *bytes = new_total;
        }
        Ok(())
    }

    /// Consumes the builder and returns the serialized index as a byte vector.
    ///
    /// # Errors
    ///
    /// Propagates any error from `finish_to`.
    pub fn finish(self) -> Result<Vec<u8>, BuildError> {
        let mut blob = Vec::new();
        self.finish_to(&mut blob)?;
        Ok(blob)
    }

    /// Consumes the builder and writes the serialized index to `w`.
    ///
    /// Uses the streaming (on-disk dictionary) path if any column spilled,
    /// otherwise the fully in-memory path.
    pub fn finish_to<W: Write>(self, w: W) -> Result<(), BuildError> {
        if self.postings.iter().any(|c| c.is_spilled()) {
            self.finish_to_spilled(w)
        } else {
            self.finish_to_inram(w)
        }
    }

    /// Finish path used when no column spilled.
    ///
    /// Columns are processed in name order and each column's terms in
    /// bytewise order. Every term is encoded into a scratch postings file and
    /// inserted into an in-memory dictionary builder. The result is then
    /// assembled by `assemble_and_write_blob`.
    fn finish_to_inram<W: Write>(self, mut w: W) -> Result<(), BuildError> {
        let FtsBuilder {
            tokenizer: _,
            columns,
            postings,
            scratch_dir,
            spill_threshold_bytes: _,
            spill_partitions: _,
            max_partition_bytes: _,
            n_docs,
            bump: _,
        } = self;
        let n_columns = columns.len() as u32;
        let mut n_terms_total_usize: usize = 0;
        // Keep each column's original id alongside its state so per-column
        // outputs can be stored by id after processing in name order.
        let mut work: Vec<(usize, ColumnState, ColumnPostings)> = columns
            .into_iter()
            .zip(postings)
            .enumerate()
            .map(|(orig_idx, (state, posting_state))| (orig_idx, state, posting_state))
            .collect();
        // NOTE(review): emitting by column name, then term, gives sorted
        // dictionary keys only if FST_SEPARATOR sorts below every byte that
        // can appear in a column name — confirm.
        work.sort_unstable_by(|a, b| a.1.name.cmp(&b.1.name));
        // Average document length per column (0.0 for a column with no documents).
        let mut avgdl_per_col: Vec<f32> = vec![0.0; n_columns as usize];
        for (orig_idx, state, _) in &work {
            let n = state.doc_lengths.len() as u64;
            avgdl_per_col[*orig_idx] = if n == 0 {
                0.0
            } else {
                (state.total_tokens as f32) / (n as f32)
            };
        }
        let scratch_path = scratch_dir.path().to_path_buf();
        let postings_path = scratch_path.join("infino_fts_postings.bin");
        let mut postings_writer = BufWriter::new(File::create(&postings_path)?);
        let mut postings_len: u64 = 0;
        let mut postings_crc_acc: u32 = 0;
        let mut key_buf: Vec<u8> = Vec::with_capacity(64);
        let mut term_scratch = TermScratch::default();
        let mut finish_profile = FinishProfile::from_env();
        let mut fst_inram = DictBuilder::new();
        let mut doc_lengths_by_orig_col: Vec<Option<Vec<u32>>> =
            (0..n_columns as usize).map(|_| None).collect();
        for (orig_col_idx, col_state, posting_state) in work.drain(..) {
            let ColumnState {
                name: col_name,
                doc_lengths: col_doc_lengths_owned,
                total_tokens: _,
            } = col_state;
            let col_name_bytes = col_name.as_bytes();
            let avgdl = avgdl_per_col[orig_col_idx];
            let col_doc_lengths: &[u32] = &col_doc_lengths_owned;
            let terms = match posting_state {
                ColumnPostings::InRam { terms, bytes: _ } => terms,
                ColumnPostings::Spilled { .. } => unreachable!(
                    "finish_to_inram dispatched on !any_spilled; \
                     Spilled column cannot appear here"
                ),
            };
            type InRamEntries = Vec<(Box<str>, Vec<(u32, u32)>)>;
            let mut entries: InRamEntries = terms.into_iter().collect();
            // Bytewise term order, matching the lex order of the spilled path.
            entries.sort_unstable_by(|a, b| a.0.as_bytes().cmp(b.0.as_bytes()));
            for (term, postings) in entries {
                encode_and_emit_term(
                    &term,
                    &postings,
                    col_name_bytes,
                    col_doc_lengths,
                    avgdl,
                    n_docs,
                    &mut key_buf,
                    &mut postings_writer,
                    &mut postings_crc_acc,
                    &mut postings_len,
                    Some(&mut fst_inram),
                    None,
                    &mut finish_profile,
                    &mut term_scratch,
                )?;
                n_terms_total_usize += 1;
            }
            doc_lengths_by_orig_col[orig_col_idx] = Some(col_doc_lengths_owned);
        }
        assemble_and_write_blob(
            BlobAssemblyInputs {
                postings_writer,
                postings_path,
                postings_crc_acc,
                postings_len,
                fst_sink: FstSinkFinish::InRam(fst_inram),
                n_columns,
                n_docs,
                n_terms_total_usize,
                avgdl_per_col,
                doc_lengths_by_orig_col,
                scratch_dir,
                finish_profile,
            },
            &mut w,
        )
    }

    /// Finish path used when at least one column spilled.
    ///
    /// The dictionary is streamed to a scratch file. In-RAM columns are
    /// emitted as in `finish_to_inram`. Each spilled column is handled as
    /// follows:
    /// 1. Flush all partitions.
    /// 2. Rank its terms lexicographically.
    /// 3. Sort each partition to a `.sorted.bin` file.
    /// 4. Memory-map the sorted files.
    /// 5. Walk terms in lex order, consuming each term's contiguous run of
    ///    triples from its partition.
    /// 6. Delete the sorted files.
    fn finish_to_spilled<W: Write>(self, mut w: W) -> Result<(), BuildError> {
        let FtsBuilder {
            tokenizer: _,
            columns,
            postings,
            scratch_dir,
            spill_threshold_bytes: _,
            spill_partitions: _,
            max_partition_bytes,
            n_docs,
            bump: _,
        } = self;
        let n_columns = columns.len() as u32;
        let mut n_terms_total_usize: usize = 0;
        let mut work: Vec<(usize, ColumnState, ColumnPostings)> = columns
            .into_iter()
            .zip(postings)
            .enumerate()
            .map(|(orig_idx, (state, posting_state))| (orig_idx, state, posting_state))
            .collect();
        work.sort_unstable_by(|a, b| a.1.name.cmp(&b.1.name));
        let mut avgdl_per_col: Vec<f32> = vec![0.0; n_columns as usize];
        for (orig_idx, state, _) in &work {
            let n = state.doc_lengths.len() as u64;
            avgdl_per_col[*orig_idx] = if n == 0 {
                0.0
            } else {
                (state.total_tokens as f32) / (n as f32)
            };
        }
        let scratch_path = scratch_dir.path().to_path_buf();
        let postings_path = scratch_path.join("infino_fts_postings.bin");
        let mut postings_writer = BufWriter::new(File::create(&postings_path)?);
        let mut postings_len: u64 = 0;
        let mut postings_crc_acc: u32 = 0;
        let mut key_buf: Vec<u8> = Vec::with_capacity(64);
        let mut term_scratch = TermScratch::default();
        let mut finish_profile = FinishProfile::from_env();
        let fst_streaming_path = scratch_path.join("infino_fts_dict.bin");
        let mut fst_streaming = {
            let fst_file = File::create(&fst_streaming_path)?;
            let bw = BufWriter::new(fst_file);
            StreamingDictBuilder::new(bw).map_err(map_fst_err)?
        };
        // Push every pending batch to disk and close the partition writers so
        // the partition files are complete before they are sorted.
        let partition_flush_start = finish_profile.enabled.then(Instant::now);
        for (_, _, cp) in &mut work {
            if let ColumnPostings::Spilled { partitions, .. } = cp {
                for partition in partitions {
                    flush_partition_batch(partition)?;
                    if let Some(mut writer) = partition.writer.take() {
                        writer.flush()?;
                    }
                }
            }
        }
        if let Some(t) = partition_flush_start {
            finish_profile.partition_flush += t.elapsed();
        }
        let mut doc_lengths_by_orig_col: Vec<Option<Vec<u32>>> =
            (0..n_columns as usize).map(|_| None).collect();
        for (orig_col_idx, col_state, posting_state) in work.drain(..) {
            let ColumnState {
                name: col_name,
                doc_lengths: col_doc_lengths_owned,
                total_tokens: _,
            } = col_state;
            let col_name_bytes = col_name.as_bytes();
            let avgdl = avgdl_per_col[orig_col_idx];
            let col_doc_lengths: &[u32] = &col_doc_lengths_owned;
            match posting_state {
                ColumnPostings::InRam { terms, bytes: _ } => {
                    type InRamEntries = Vec<(Box<str>, Vec<(u32, u32)>)>;
                    let mut entries: InRamEntries = terms.into_iter().collect();
                    entries.sort_unstable_by(|a, b| a.0.as_bytes().cmp(b.0.as_bytes()));
                    for (term, postings) in entries {
                        encode_and_emit_term(
                            &term,
                            &postings,
                            col_name_bytes,
                            col_doc_lengths,
                            avgdl,
                            n_docs,
                            &mut key_buf,
                            &mut postings_writer,
                            &mut postings_crc_acc,
                            &mut postings_len,
                            None,
                            Some(&mut fst_streaming),
                            &mut finish_profile,
                            &mut term_scratch,
                        )?;
                        n_terms_total_usize += 1;
                    }
                }
                ColumnPostings::Spilled {
                    partitions,
                    term_to_id,
                    id_to_term,
                    dense_doc_tf: _,
                    updated_terms: _,
                    term_arena,
                } => {
                    // The lookup map is no longer needed; free it before sorting.
                    drop(term_to_id);
                    let lex_rank_start = finish_profile.enabled.then(Instant::now);
                    let (lex_rank, term_id_in_lex_order) = build_lex_rank(&id_to_term);
                    if let Some(t) = lex_rank_start {
                        finish_profile.lex_rank_build += t.elapsed();
                    }
                    // Presumably each sorted file is ordered by
                    // (lex_rank[term_id], doc_id); the traversal below relies
                    // on that (confirm in sort_partition_to_file).
                    let sort_start = finish_profile.enabled.then(Instant::now);
                    let mut sorted_files: Vec<PathBuf> = Vec::with_capacity(partitions.len());
                    for (partition_idx, partition) in partitions.iter().enumerate() {
                        let sorted_path = scratch_path.join(format!(
                            "fts_col{orig_col_idx}_part{partition_idx}.sorted.bin"
                        ));
                        sort_partition_to_file(
                            &partition.path,
                            &sorted_path,
                            max_partition_bytes,
                            &scratch_path,
                            &format!("c{orig_col_idx}_p{partition_idx}"),
                            &lex_rank,
                        )?;
                        sorted_files.push(sorted_path);
                    }
                    if let Some(t) = sort_start {
                        finish_profile.partition_sort += t.elapsed();
                    }
                    let mmap_start = finish_profile.enabled.then(Instant::now);
                    let mut mmaps: Vec<Mmap> = Vec::with_capacity(sorted_files.len());
                    for p in &sorted_files {
                        let f = File::open(p)?;
                        // NOTE(review): mapping is UB if the file is modified
                        // while mapped; these are private scratch files.
                        let mmap = unsafe { Mmap::map(&f)? };
                        mmaps.push(mmap);
                    }
                    if let Some(t) = mmap_start {
                        finish_profile.mmap_open += t.elapsed();
                    }
                    // View each map as triples. This panics if a file length
                    // is not a multiple of TRIPLE_BYTES; mmap bases are
                    // page-aligned, so the u32 alignment holds.
                    let sorted_slices: Vec<&[Triple]> = mmaps
                        .iter()
                        .map(|m| {
                            if m.is_empty() {
                                &[][..]
                            } else {
                                bytemuck::cast_slice::<u8, Triple>(&m[..])
                            }
                        })
                        .collect();
                    // Same routing as at spill time: term_id & (n_partitions - 1).
                    let mask = (sorted_slices.len() - 1) as u32;
                    let mut cursors: Vec<usize> = vec![0usize; sorted_slices.len()];
                    let mut group: Vec<(u32, u32)> = Vec::new();
                    let merge_profile_start = Instant::now();
                    let encode_calls_before = finish_profile.encode_calls;
                    let encode_df1_before = finish_profile.encode_df1;
                    let encode_pfor_before = finish_profile.encode_pfor;
                    let encode_total_before = finish_profile.encode_total;
                    let encode_block_build_before = finish_profile.encode_block_build;
                    let encode_meta_write_before = finish_profile.encode_meta_write;
                    let encode_skip_write_before = finish_profile.encode_skip_write;
                    let encode_block_write_before = finish_profile.encode_block_write;
                    let fst_insert_before = finish_profile.fst_insert;
                    // Visit terms in global lex order. Each term's postings
                    // form one contiguous run at its partition's cursor,
                    // because each partition is ordered by lex rank.
                    for &term_id in &term_id_in_lex_order {
                        let p = (term_id & mask) as usize;
                        let slice = sorted_slices[p];
                        let mut pos = cursors[p];
                        group.clear();
                        while pos < slice.len() {
                            let t = &slice[pos];
                            if triple_term_id(t) != term_id {
                                break;
                            }
                            group.push((triple_doc_id(t), triple_tf(t)));
                            pos += 1;
                        }
                        cursors[p] = pos;
                        if group.is_empty() {
                            continue;
                        }
                        let term_bytes: &str = id_to_term[term_id as usize];
                        encode_and_emit_term(
                            term_bytes,
                            &group,
                            col_name_bytes,
                            col_doc_lengths,
                            avgdl,
                            n_docs,
                            &mut key_buf,
                            &mut postings_writer,
                            &mut postings_crc_acc,
                            &mut postings_len,
                            None,
                            Some(&mut fst_streaming),
                            &mut finish_profile,
                            &mut term_scratch,
                        )?;
                        n_terms_total_usize += 1;
                    }
                    debug_assert!(
                        cursors
                            .iter()
                            .zip(sorted_slices.iter())
                            .all(|(c, s)| *c == s.len()),
                        "lex-order partition traversal did not drain all triples; \
                         partition assignment or sort invariant violated"
                    );
                    if finish_profile.enabled {
                        let merge_total = merge_profile_start.elapsed();
                        let encode_total = finish_profile.encode_total - encode_total_before;
                        let non_encode = merge_total.saturating_sub(encode_total);
                        eprintln!(
                            "[fts-profile] col='{}' merge_total={:.3}s non_encode_merge={:.3}s encode_total={:.3}s calls={} df1={} pfor={} block_build={:.3}s meta_write={:.3}s skip_write={:.3}s block_write={:.3}s fst_insert={:.3}s",
                            col_name,
                            merge_total.as_secs_f64(),
                            non_encode.as_secs_f64(),
                            encode_total.as_secs_f64(),
                            finish_profile.encode_calls - encode_calls_before,
                            finish_profile.encode_df1 - encode_df1_before,
                            finish_profile.encode_pfor - encode_pfor_before,
                            (finish_profile.encode_block_build - encode_block_build_before)
                                .as_secs_f64(),
                            (finish_profile.encode_meta_write - encode_meta_write_before)
                                .as_secs_f64(),
                            (finish_profile.encode_skip_write - encode_skip_write_before)
                                .as_secs_f64(),
                            (finish_profile.encode_block_write - encode_block_write_before)
                                .as_secs_f64(),
                            (finish_profile.fst_insert - fst_insert_before).as_secs_f64(),
                        );
                    }
                    // Release this column's resources before the next column.
                    // Unmap before deleting; removal is best-effort because
                    // the scratch dir is deleted anyway.
                    let cleanup_start = finish_profile.enabled.then(Instant::now);
                    drop(sorted_slices);
                    drop(mmaps);
                    for p in &sorted_files {
                        let _ = fs::remove_file(p);
                    }
                    drop(partitions);
                    // id_to_term borrows term_arena, so drop it first.
                    drop(id_to_term);
                    drop(term_arena);
                    drop(lex_rank);
                    if let Some(t) = cleanup_start {
                        finish_profile.scratch_cleanup += t.elapsed();
                    }
                }
            }
            doc_lengths_by_orig_col[orig_col_idx] = Some(col_doc_lengths_owned);
        }
        assemble_and_write_blob(
            BlobAssemblyInputs {
                postings_writer,
                postings_path,
                postings_crc_acc,
                postings_len,
                fst_sink: FstSinkFinish::Streaming {
                    builder: fst_streaming,
                    path: fst_streaming_path,
                },
                n_columns,
                n_docs,
                n_terms_total_usize,
                avgdl_per_col,
                doc_lengths_by_orig_col,
                scratch_dir,
                finish_profile,
            },
            &mut w,
        )
    }
}
/// Everything `assemble_and_write_blob` needs to finalise the scratch
/// streams and write the complete FTS blob.
///
/// NOTE: fields are dropped in declaration order if this struct is ever
/// dropped whole; `assemble_and_write_blob` destructures it immediately.
struct BlobAssemblyInputs {
/// Open writer for the scratch postings file; still lacks its CRC trailer.
postings_writer: BufWriter<File>,
/// Path of the scratch postings file, re-read when copying into the blob.
postings_path: PathBuf,
/// Running CRC32C accumulated over every postings byte written so far.
postings_crc_acc: u32,
/// Number of postings bytes written so far (excluding the CRC trailer).
postings_len: u64,
/// Unfinished FST dictionary builder (in RAM or streaming to disk).
fst_sink: FstSinkFinish,
/// Number of registered columns; one doc-lengths directory entry each.
n_columns: u32,
/// Document count written into the blob header.
n_docs: u32,
/// Total number of distinct (column, term) keys emitted; must fit in u32.
n_terms_total_usize: usize,
/// Average document length per column, indexed by column id.
avgdl_per_col: Vec<f32>,
/// Per-column document lengths, indexed by column id; every slot is
/// expected to be `Some` by the time the blob is assembled.
doc_lengths_by_orig_col: Vec<Option<Vec<u32>>>,
/// Scratch directory holding the spill files; dropped once they are copied.
scratch_dir: tempfile::TempDir,
/// Optional timing counters, reported on stderr when enabled.
finish_profile: FinishProfile,
}
/// The FST dictionary builder in the state it was left after all terms were
/// inserted; `assemble_and_write_blob` finishes it and embeds its bytes.
enum FstSinkFinish {
/// Builder held entirely in memory; `finish()` yields the FST bytes.
InRam(DictBuilder),
/// Builder writing through a buffered file at `path`; the file is re-read
/// after finishing to checksum it and copy it into the blob.
Streaming {
builder: StreamingDictBuilder<BufWriter<File>>,
path: PathBuf,
},
}
fn assemble_and_write_blob<W: Write>(
inputs: BlobAssemblyInputs,
w: &mut W,
) -> Result<(), BuildError> {
let BlobAssemblyInputs {
mut postings_writer,
postings_path,
postings_crc_acc,
mut postings_len,
fst_sink,
n_columns,
n_docs,
n_terms_total_usize,
avgdl_per_col,
mut doc_lengths_by_orig_col,
scratch_dir,
mut finish_profile,
} = inputs;
debug_assert!(
n_terms_total_usize <= u32::MAX as usize,
"term count overflows u32"
);
let n_terms_total = n_terms_total_usize as u32;
let postings_close_start = finish_profile.enabled.then(Instant::now);
let postings_crc = postings_crc_acc;
let postings_crc_le = postings_crc.to_le_bytes();
postings_writer.write_all(&postings_crc_le)?;
postings_writer.flush()?;
drop(postings_writer);
postings_len += postings_crc_le.len() as u64;
if let Some(t) = postings_close_start {
finish_profile.postings_close += t.elapsed();
}
enum FstSource {
InRam(Vec<u8>),
Streamed { path: PathBuf, len: u64, crc: u32 },
}
let fst_close_start = finish_profile.enabled.then(Instant::now);
let fst_source = match fst_sink {
FstSinkFinish::InRam(db) => {
let mut bytes = db.finish();
let crc = crc32c(&bytes);
bytes.extend_from_slice(&crc.to_le_bytes());
FstSource::InRam(bytes)
}
FstSinkFinish::Streaming {
builder,
path: fst_streaming_path,
} => {
let mut bw = builder.finish().map_err(map_fst_err)?;
bw.flush()?;
let write_file = bw
.into_inner()
.map_err(|e| BuildError::Io(e.into_error()))?;
drop(write_file);
let mut read_file = File::open(&fst_streaming_path)?;
let fst_body_len = read_file.metadata()?.len();
read_file.seek(SeekFrom::Start(0))?;
let mut reader = BufReader::with_capacity(PARTITION_BUF_SIZE, read_file);
let mut crc: u32 = 0;
let mut buf = vec![0u8; PARTITION_BUF_SIZE];
loop {
let n = match reader.read(&mut buf) {
Ok(0) => break,
Ok(n) => n,
Err(e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return Err(BuildError::Io(e)),
};
crc = crc32c_append(crc, &buf[..n]);
}
drop(reader);
FstSource::Streamed {
path: fst_streaming_path,
len: fst_body_len + 4,
crc,
}
}
};
if let Some(t) = fst_close_start {
finish_profile.fst_close += t.elapsed();
}
let dl_emit_start = finish_profile.enabled.then(Instant::now);
let fst_total_len: u64 = match &fst_source {
FstSource::InRam(bytes) => bytes.len() as u64,
FstSource::Streamed { len, .. } => *len,
};
let header_size: u64 = FTS_HEADER_SIZE as u64;
let fst_offset: u64 = header_size;
let postings_offset: u64 = fst_offset + fst_total_len;
let doc_lengths_table_offset: u64 = postings_offset + postings_len;
let mut doc_lengths_array_offset: u64 =
doc_lengths_table_offset + (n_columns as u64) * (DOC_LENGTHS_ENTRY_SIZE as u64) + 4 ;
let mut dir_buf: Vec<u8> = Vec::with_capacity(n_columns as usize * DOC_LENGTHS_ENTRY_SIZE);
let mut arrays_buf: Vec<u8> = Vec::new();
for i in 0..n_columns as usize {
let avgdl_x1000 = (avgdl_per_col[i] * format::fts::AVGDL_FIXED_POINT_SCALE)
.max(0.0)
.min(u32::MAX as f32) as u32;
dir_buf.extend_from_slice(&(i as u32).to_le_bytes());
dir_buf.extend_from_slice(&doc_lengths_array_offset.to_le_bytes());
dir_buf.extend_from_slice(&avgdl_x1000.to_le_bytes());
let col_dls = doc_lengths_by_orig_col[i]
.take()
.expect("doc_lengths recorded for every registered column");
let array_start = arrays_buf.len();
#[cfg(target_endian = "little")]
arrays_buf.extend_from_slice(bytemuck::cast_slice::<u32, u8>(&col_dls));
#[cfg(not(target_endian = "little"))]
for &dl in &col_dls {
arrays_buf.extend_from_slice(&dl.to_le_bytes());
}
let array_bytes = &arrays_buf[array_start..];
let array_crc = crc32c(array_bytes);
arrays_buf.extend_from_slice(&array_crc.to_le_bytes());
doc_lengths_array_offset += (col_dls.len() as u64) * 4 + 4;
}
let dir_crc = crc32c(&dir_buf);
dir_buf.extend_from_slice(&dir_crc.to_le_bytes());
if let Some(t) = dl_emit_start {
finish_profile.doc_lengths_emit += t.elapsed();
}
let blob_copy_start = finish_profile.enabled.then(Instant::now);
let mut header = Vec::with_capacity(header_size as usize);
header.extend_from_slice(format::fts::MAGIC); header.extend_from_slice(&format::fts::VERSION.to_le_bytes()); header.extend_from_slice(&n_columns.to_le_bytes()); header.extend_from_slice(&n_docs.to_le_bytes()); header.extend_from_slice(&n_terms_total.to_le_bytes()); header.extend_from_slice(&fst_offset.to_le_bytes()); header.extend_from_slice(&postings_offset.to_le_bytes()); header.extend_from_slice(&doc_lengths_table_offset.to_le_bytes()); debug_assert_eq!(header.len(), header_size as usize, "header size mismatch");
w.write_all(&header)?;
match fst_source {
FstSource::InRam(bytes) => w.write_all(&bytes)?,
FstSource::Streamed { path, crc, .. } => {
let mut reader = BufReader::with_capacity(PARTITION_BUF_SIZE, File::open(&path)?);
io::copy(&mut reader, w)?;
w.write_all(&crc.to_le_bytes())?;
}
}
let mut postings_reader =
BufReader::with_capacity(PARTITION_BUF_SIZE, File::open(&postings_path)?);
io::copy(&mut postings_reader, w)?;
drop(postings_reader);
drop(scratch_dir);
w.write_all(&dir_buf)?;
w.write_all(&arrays_buf)?;
if let Some(t) = blob_copy_start {
finish_profile.blob_copy += t.elapsed();
}
if finish_profile.enabled {
eprintln!(
"[fts-finish] partition_flush={:.3}s lex_rank={:.3}s partition_sort={:.3}s mmap_open={:.3}s scratch_cleanup={:.3}s postings_close={:.3}s fst_close={:.3}s doc_lengths_emit={:.3}s blob_copy={:.3}s",
finish_profile.partition_flush.as_secs_f64(),
finish_profile.lex_rank_build.as_secs_f64(),
finish_profile.partition_sort.as_secs_f64(),
finish_profile.mmap_open.as_secs_f64(),
finish_profile.scratch_cleanup.as_secs_f64(),
finish_profile.postings_close.as_secs_f64(),
finish_profile.fst_close.as_secs_f64(),
finish_profile.doc_lengths_emit.as_secs_f64(),
finish_profile.blob_copy.as_secs_f64(),
);
}
Ok(())
}
/// Converts an `fst` construction error into `BuildError::Io` with kind
/// `ErrorKind::InvalidData`, carrying the original error as the payload.
#[inline]
fn map_fst_err(e: fst::Error) -> BuildError {
    let io_err = Error::new(ErrorKind::InvalidData, e);
    BuildError::Io(io_err)
}
/// Buffers reused across `encode_and_emit_term` calls, so that encoding a
/// term does not allocate fresh vectors every time.
#[derive(Default)]
struct TermScratch {
    /// Serialised bytes of one term's posting (metadata, skip table, blocks)
    /// before they are handed to the postings writer.
    term_buf: Vec<u8>,
    /// Encoded posting blocks for the term currently being emitted.
    encoded_blocks: Vec<EncodedBlock>,
    /// Minimum document length within each block, parallel to `encoded_blocks`.
    min_dl_per_block: Vec<u32>,
    /// Doc ids of the block being assembled.
    doc_ids: Vec<u32>,
    /// Term frequencies of the block being assembled, parallel to `doc_ids`.
    tfs: Vec<u32>,
}
/// Encodes one term's posting list, appends it to the postings stream and
/// registers the term in the FST dictionary.
///
/// The FST key is `col_name_bytes ++ FST_SEPARATOR ++ term`, built in
/// `key_buf`. A term with document frequency 1 is stored inline in its FST
/// value via `FstValue::pack_inline` and writes no postings bytes. Any other
/// term is appended to `postings_writer` as:
///
/// * a `TERM_META_SIZE`-byte header: `u32` df, `u64` offset of this header
///   from the start of the postings stream, `u32` total posting length,
///   `u32` block count;
/// * one `SKIP_ENTRY_SIZE`-byte skip entry per block: `u32` last doc id,
///   `u32` block offset relative to the header, `u32` fixed-point BM25 upper
///   bound, `u32` zero padding;
/// * the encoded blocks, in order.
///
/// Its FST value is `FstValue::pack_pfor(header_offset, posting_length)`.
///
/// `pairs` holds `(doc_id, tf)` with strictly increasing `doc_id`; every
/// `doc_id` must index into `col_doc_lengths` (out-of-range ids panic). The
/// key goes into `fst_entries_inram` when it is `Some`, otherwise into
/// `fst_streaming` when that is `Some`.
///
/// # Errors
///
/// Returns an error if writing to `postings_writer` or the streaming FST
/// insertion fails.
// The argument list mirrors separately borrowed caller state (writer, CRC,
// length, FST sinks, profile, scratch), hence the lint allowance.
#[allow(clippy::too_many_arguments)]
fn encode_and_emit_term<W: Write>(
    term: &str,
    pairs: &[(u32, u32)],
    col_name_bytes: &[u8],
    col_doc_lengths: &[u32],
    avgdl: f32,
    n_docs: u32,
    key_buf: &mut Vec<u8>,
    postings_writer: &mut W,
    postings_crc_acc: &mut u32,
    postings_len: &mut u64,
    fst_entries_inram: Option<&mut DictBuilder>,
    fst_streaming: Option<&mut StreamingDictBuilder<BufWriter<File>>>,
    profile: &mut FinishProfile,
    scratch: &mut TermScratch,
) -> Result<(), BuildError> {
    let encode_start = profile.enabled.then(Instant::now);
    profile.encode_calls += 1;
    key_buf.clear();
    key_buf.extend_from_slice(col_name_bytes);
    key_buf.push(FST_SEPARATOR);
    key_buf.extend_from_slice(term.as_bytes());
    debug_assert!(
        pairs.windows(2).all(|w| w[0].0 < w[1].0),
        "posting list not sorted by doc_id"
    );
    let df = pairs.len() as u64;
    let fst_value: u64 = if df == 1 {
        profile.encode_df1 += 1;
        let (doc_id, tf) = pairs[0];
        FstValue::pack_inline(doc_id, tf)
    } else {
        profile.encode_pfor += 1;
        let idf_t = bm25::idf(n_docs as u64, df);
        let encoded_blocks = &mut scratch.encoded_blocks;
        let min_dl_per_block = &mut scratch.min_dl_per_block;
        encoded_blocks.clear();
        min_dl_per_block.clear();

        let block_build_start = profile.enabled.then(Instant::now);
        let mut block_doc_ids = mem::take(&mut scratch.doc_ids);
        let mut block_tfs = mem::take(&mut scratch.tfs);
        // `reserve` counts from `len`, not from `capacity`: clear first so a
        // single `reserve(BLOCK_LEN)` guarantees room for a full block.
        block_doc_ids.clear();
        block_doc_ids.reserve(BLOCK_LEN);
        block_tfs.clear();
        block_tfs.reserve(BLOCK_LEN);
        for chunk in pairs.chunks(BLOCK_LEN) {
            block_doc_ids.clear();
            block_tfs.clear();
            block_doc_ids.extend(chunk.iter().map(|&(d, _)| d));
            block_tfs.extend(chunk.iter().map(|&(_, t)| t));
            // Shortest document in the block feeds the block-max BM25 bound.
            let min_dl = chunk
                .iter()
                .map(|&(doc_id, _)| col_doc_lengths[doc_id as usize])
                .min()
                .unwrap_or(0);
            min_dl_per_block.push(min_dl);
            // Lend the scratch vectors to a `Block`, then take them back so
            // their allocations are reused for the next chunk.
            let block = Block {
                doc_ids: mem::take(&mut block_doc_ids),
                tfs: mem::take(&mut block_tfs),
            };
            encoded_blocks.push(encode_block(&block));
            block_doc_ids = block.doc_ids;
            block_tfs = block.tfs;
        }
        scratch.doc_ids = block_doc_ids;
        scratch.tfs = block_tfs;
        if let Some(start) = block_build_start {
            profile.encode_block_build += start.elapsed();
        }

        let num_blocks = encoded_blocks.len() as u32;
        let metadata_offset = *postings_len;
        let skip_table_size = encoded_blocks.len() * SKIP_ENTRY_SIZE;
        let blocks_total_size: usize = encoded_blocks.iter().map(|b| b.bytes.len()).sum();
        let postings_length = (TERM_META_SIZE + skip_table_size + blocks_total_size) as u64;
        debug_assert!(df <= u32::MAX as u64, "df overflows u32");
        debug_assert!(
            postings_length <= u32::MAX as u64,
            "single-term posting > 4 GiB"
        );
        let term_buf = &mut scratch.term_buf;
        term_buf.clear();
        // With `len == 0`, this guarantees capacity for the whole posting.
        term_buf.reserve(postings_length as usize);

        let meta_write_start = profile.enabled.then(Instant::now);
        term_buf.extend_from_slice(&(df as u32).to_le_bytes());
        term_buf.extend_from_slice(&metadata_offset.to_le_bytes());
        term_buf.extend_from_slice(&(postings_length as u32).to_le_bytes());
        term_buf.extend_from_slice(&num_blocks.to_le_bytes());
        debug_assert_eq!(term_buf.len(), TERM_META_SIZE);
        if let Some(start) = meta_write_start {
            profile.encode_meta_write += start.elapsed();
        }

        // Block offsets are relative to the start of this term's header.
        let mut block_offset: u32 = (TERM_META_SIZE + skip_table_size) as u32;
        let skip_write_start = profile.enabled.then(Instant::now);
        for (blk, &min_dl) in encoded_blocks.iter().zip(min_dl_per_block.iter()) {
            let max_bm25 = bm25::block_upper_bound(idf_t, blk.max_tf, min_dl, avgdl);
            // Round up so the stored bound never underestimates the true max.
            let max_bm25_x1000 = (max_bm25 * format::fts::BLOCK_MAX_BM25_FIXED_POINT_SCALE)
                .ceil()
                .max(0.0)
                .min(u32::MAX as f32) as u32;
            term_buf.extend_from_slice(&blk.last_doc_id.to_le_bytes());
            term_buf.extend_from_slice(&block_offset.to_le_bytes());
            term_buf.extend_from_slice(&max_bm25_x1000.to_le_bytes());
            term_buf.extend_from_slice(&0u32.to_le_bytes());
            block_offset += blk.bytes.len() as u32;
        }
        if let Some(start) = skip_write_start {
            profile.encode_skip_write += start.elapsed();
        }

        let block_write_start = profile.enabled.then(Instant::now);
        for blk in encoded_blocks.iter() {
            term_buf.extend_from_slice(&blk.bytes);
        }
        debug_assert_eq!(term_buf.len(), postings_length as usize);
        write_counted(postings_writer, postings_crc_acc, postings_len, term_buf)?;
        if let Some(start) = block_write_start {
            profile.encode_block_write += start.elapsed();
        }
        FstValue::pack_pfor(metadata_offset, postings_length as u32)
    };

    let fst_insert_start = profile.enabled.then(Instant::now);
    if let Some(db) = fst_entries_inram {
        db.insert(key_buf, fst_value);
    } else if let Some(sb) = fst_streaming {
        sb.insert_sorted(key_buf, fst_value).map_err(map_fst_err)?;
    }
    if let Some(start) = fst_insert_start {
        profile.fst_insert += start.elapsed();
    }
    if let Some(start) = encode_start {
        profile.encode_total += start.elapsed();
    }
    Ok(())
}
fn write_counted<W: Write>(
w: &mut W,
crc_acc: &mut u32,
len: &mut u64,
bytes: &[u8],
) -> Result<(), BuildError> {
w.write_all(bytes)?;
*crc_acc = crc32c_append(*crc_acc, bytes);
*len += bytes.len() as u64;
Ok(())
}
/// Streams the triples of the spill partition at `in_path` in sorted order
/// (as produced by `open_partition_sorted`) and writes them to `out_path`.
///
/// Triples are buffered in batches of `SORT_OUTPUT_BATCH_TRIPLES` before
/// being written. On little-endian targets a batch is written as its raw
/// bytes; elsewhere each triple goes through `write_triple`.
fn sort_partition_to_file(
    in_path: &Path,
    out_path: &Path,
    max_partition_bytes: u64,
    scratch_dir: &Path,
    partition_label: &str,
    lex_rank: &[u32],
) -> Result<(), BuildError> {
    // Writes one buffered batch of triples to the output file.
    fn flush_batch(out: &mut BufWriter<File>, pending: &[Triple]) -> Result<(), BuildError> {
        #[cfg(target_endian = "little")]
        {
            out.write_all(bytemuck::cast_slice::<Triple, u8>(pending))?;
        }
        #[cfg(not(target_endian = "little"))]
        for t in pending {
            write_triple(&mut *out, t[0], t[1], t[2])?;
        }
        Ok(())
    }

    let mut sorted = open_partition_sorted(
        in_path,
        max_partition_bytes,
        scratch_dir,
        partition_label,
        lex_rank,
    )?;
    let mut out = BufWriter::with_capacity(PARTITION_BUF_SIZE, File::create(out_path)?);
    let mut pending: Vec<Triple> = Vec::with_capacity(SORT_OUTPUT_BATCH_TRIPLES);
    while let Some(next) = sorted.next_with(lex_rank) {
        pending.push(next?);
        if pending.len() == SORT_OUTPUT_BATCH_TRIPLES {
            flush_batch(&mut out, &pending)?;
            pending.clear();
        }
    }
    if !pending.is_empty() {
        flush_batch(&mut out, &pending)?;
    }
    out.flush()?;
    Ok(())
}
/// Unit and round-trip tests for the FTS builder: column registration,
/// doc accumulation, blob layout, spill/external-merge equivalence, and the
/// partition-file and merge-heap helpers.
#[cfg(test)]
mod tests {
use super::*;
use crate::test_helpers::default_tokenizer as tokenizer;
/// Column ids are handed out as 0, 1, 2 in registration order.
#[test]
fn register_column_returns_sequential_ids() {
let mut b = FtsBuilder::new(tokenizer());
assert_eq!(
b.register_column("title".into()).expect("register column"),
0
);
assert_eq!(
b.register_column("body".into()).expect("register column"),
1
);
assert_eq!(b.register_column("tag".into()).expect("register column"), 2);
}
/// A column name containing the `\x1F` byte is rejected.
#[test]
fn register_column_rejects_separator_byte() {
let mut b = FtsBuilder::new(tokenizer());
let bad = String::from("ti\x1Ftle");
let err = b.register_column(bad).expect_err("expected error");
assert!(matches!(err, BuildError::ReservedSeparatorInColumnName(_)));
}
/// A column name starting with `inf.` is rejected as a reserved prefix.
#[test]
fn register_column_rejects_reserved_prefix() {
let mut b = FtsBuilder::new(tokenizer());
let err = b
.register_column("inf.title".into())
.expect_err("expected error");
assert!(matches!(err, BuildError::ReservedPrefixInColumnName(_)));
}
/// Registering the same column name twice fails.
#[test]
fn register_column_rejects_duplicates() {
let mut b = FtsBuilder::new(tokenizer());
b.register_column("title".into()).expect("register column");
let err = b
.register_column("title".into())
.expect_err("expected error");
assert!(matches!(err, BuildError::DuplicateColumnName(_)));
}
/// `add_doc` with an unregistered column id returns an error.
#[test]
fn add_doc_unknown_column_id_errors() {
let mut b = FtsBuilder::new(tokenizer());
b.register_column("title".into()).expect("register column");
let err = b.add_doc(99, 0, "text").expect_err("expected error");
assert!(matches!(err, BuildError::FtsColumnTypeInvalid { .. }));
}
/// A term repeated within one doc still yields a single hit for that doc
/// after a build + reader round trip.
#[tokio::test]
async fn add_doc_accumulates_tf_within_doc() {
use bytes::Bytes;
use crate::superfile::fts::reader::{BoolMode, FtsReader};
let mut b = FtsBuilder::new(tokenizer());
b.register_column("title".into()).expect("register column");
b.add_doc(0, 0, "rust rust rust async").expect("add doc");
let blob = Bytes::from(b.finish().expect("finish"));
let r =
FtsReader::open(blob, r#"[{"name":"title","tokenizer":"ascii_lower"}]"#).expect("open");
let rust_hits = r
.search("title", &["rust"], 10, BoolMode::Or)
.await
.expect("rust search");
let async_hits = r
.search("title", &["async"], 10, BoolMode::Or)
.await
.expect("async search");
assert_eq!(rust_hits.len(), 1);
assert_eq!(rust_hits[0].0, 0);
assert_eq!(async_hits.len(), 1);
assert_eq!(async_hits[0].0, 0);
}
/// The same term indexed in two columns is searchable per column only;
/// terms unique to one column never leak into the other.
#[tokio::test]
async fn cross_column_same_term_stays_isolated_through_round_trip() {
use bytes::Bytes;
use crate::superfile::fts::reader::{BoolMode, FtsReader};
let mut b = FtsBuilder::new(tokenizer());
let title_id = b.register_column("title".into()).expect("register title");
let body_id = b.register_column("body".into()).expect("register body");
b.add_doc(title_id, 0, "rust tokio")
.expect("add title doc 0");
b.add_doc(body_id, 0, "rust async").expect("add body doc 0");
b.add_doc(body_id, 1, "rust").expect("add body doc 1");
b.add_doc(title_id, 1, "rust").expect("add title doc 1");
let blob = Bytes::from(b.finish().expect("finish"));
let json = r#"[{"name":"title","tokenizer":"ascii_lower"},{"name":"body","tokenizer":"ascii_lower"}]"#;
let r = FtsReader::open(blob, json).expect("open");
let hits_t = r
.search("title", &["rust"], 10, BoolMode::Or)
.await
.expect("title search");
let ids_t: Vec<u32> = hits_t.iter().map(|(d, _)| *d).collect();
assert_eq!(ids_t.len(), 2, "title 'rust' hit count");
assert!(ids_t.contains(&0));
assert!(ids_t.contains(&1));
let hits_b = r
.search("body", &["rust"], 10, BoolMode::Or)
.await
.expect("body search");
let ids_b: Vec<u32> = hits_b.iter().map(|(d, _)| *d).collect();
assert_eq!(ids_b.len(), 2, "body 'rust' hit count");
assert!(ids_b.contains(&0));
assert!(ids_b.contains(&1));
let hits_async_in_title = r
.search("title", &["async"], 10, BoolMode::Or)
.await
.expect("title async search");
assert!(
hits_async_in_title.is_empty(),
"title must not return 'async' (lives only in body)"
);
let hits_tokio_in_body = r
.search("body", &["tokio"], 10, BoolMode::Or)
.await
.expect("body tokio search");
assert!(
hits_tokio_in_body.is_empty(),
"body must not return 'tokio' (lives only in title)"
);
}
/// Per-doc token counts are recorded (an empty doc gets 0) and summed into
/// `total_tokens`.
#[test]
fn add_doc_tracks_doc_lengths_clamped() {
let mut b = FtsBuilder::new(tokenizer());
b.register_column("body".into()).expect("register column");
b.add_doc(0, 0, "alpha beta gamma").expect("add doc");
b.add_doc(0, 1, "").expect("add doc"); b.add_doc(0, 2, "delta").expect("add doc");
let col = &b.columns[0];
assert_eq!(col.doc_lengths, vec![3, 0, 1]);
assert_eq!(col.total_tokens, 4);
}
/// `n_docs` reflects the number of docs added so far.
#[test]
fn add_doc_updates_n_docs_per_call() {
let mut b = FtsBuilder::new(tokenizer());
b.register_column("body".into()).expect("register column");
b.add_doc(0, 0, "a").expect("add doc");
b.add_doc(0, 1, "b").expect("add doc");
b.add_doc(0, 2, "c").expect("add doc");
assert_eq!(b.n_docs, 3);
}
/// Checks the header field byte ranges: magic [0..8], version [8..12],
/// n_columns [12..16], n_docs [16..20], n_terms [20..24], FST offset
/// [24..32] (expected to be 48, i.e. directly after the header).
#[test]
fn finish_emits_valid_header() {
let mut b = FtsBuilder::new(tokenizer());
b.register_column("title".into()).expect("register column");
b.add_doc(0, 0, "hello world").expect("add doc");
let blob = b.finish().expect("finish");
assert_eq!(&blob[0..8], format::fts::MAGIC);
let version = u32::from_le_bytes([blob[8], blob[9], blob[10], blob[11]]);
assert_eq!(version, format::fts::VERSION);
let n_cols = u32::from_le_bytes([blob[12], blob[13], blob[14], blob[15]]);
assert_eq!(n_cols, 1);
let n_docs = u32::from_le_bytes([blob[16], blob[17], blob[18], blob[19]]);
assert_eq!(n_docs, 1);
let n_terms = u32::from_le_bytes([blob[20], blob[21], blob[22], blob[23]]);
assert_eq!(n_terms, 2);
let mut buf = [0u8; 8];
buf.copy_from_slice(&blob[24..32]);
let fst_off = u64::from_le_bytes(buf);
assert_eq!(fst_off, 48);
}
/// `finish()` and `finish_to(Vec)` produce identical bytes for the same input.
#[test]
fn finish_to_matches_finish_byte_for_byte() {
// Builds a fresh single-column builder over a fixed four-doc corpus.
fn build() -> FtsBuilder {
let mut b = FtsBuilder::new(tokenizer());
b.register_column("title".into()).expect("register title");
for (i, text) in [
"rust async rust",
"tokio runtime",
"rust search engine",
"async search",
]
.iter()
.enumerate()
{
b.add_doc(0, i as u32, text).expect("add doc");
}
b
}
let via_finish = build().finish().expect("finish");
let mut via_finish_to = Vec::new();
build()
.finish_to(&mut via_finish_to)
.expect("finish_to Vec");
assert_eq!(via_finish_to, via_finish);
}
/// A blob written via `finish_to` into a buffered file can be read back
/// and searched.
#[tokio::test]
async fn finish_to_temp_file_round_trips_through_reader() {
use std::io::BufWriter;
use bytes::Bytes;
use crate::superfile::fts::reader::{BoolMode, FtsReader};
let mut b = FtsBuilder::new(tokenizer());
b.register_column("title".into()).expect("register title");
for i in 0..256u32 {
b.add_doc(0, i, &format!("common term{i:03}"))
.expect("add doc");
}
let tmp = tempfile::tempdir().expect("tempdir");
let path = tmp.path().join("fts.blob");
// Scope the writer so it is dropped before the file is read back.
{
let file = File::create(&path).expect("create blob");
let writer = BufWriter::new(file);
b.finish_to(writer).expect("finish_to file");
}
let blob = fs::read(&path).expect("read blob");
let r = FtsReader::open(
Bytes::from(blob),
r#"[{"name":"title","tokenizer":"ascii_lower"}]"#,
)
.expect("open FTS reader");
let hits = r
.search("title", &["common"], 10, BoolMode::Or)
.await
.expect("search");
assert_eq!(hits.len(), 10);
}
/// A builder with a column but no docs still emits the magic and zero
/// n_docs / n_terms.
#[test]
fn finish_with_no_docs_still_produces_valid_blob() {
let mut b = FtsBuilder::new(tokenizer());
b.register_column("title".into()).expect("register column");
let blob = b.finish().expect("finish");
assert_eq!(&blob[0..8], format::fts::MAGIC);
assert_eq!(
u32::from_le_bytes([blob[16], blob[17], blob[18], blob[19]]),
0
);
assert_eq!(
u32::from_le_bytes([blob[20], blob[21], blob[22], blob[23]]),
0
);
}
/// Under the default spill threshold a small build keeps postings in RAM and
/// creates no `fts_col*.bin` files under the scratch parent.
#[test]
fn small_build_stays_in_ram_no_spill_files_created() {
let parent = tempfile::tempdir().expect("parent");
let mut b = FtsBuilder::with_scratch(tokenizer(), parent.path().to_path_buf())
.expect("with_scratch");
b.register_column("body".into()).expect("register col");
for i in 0..100u32 {
b.add_doc(0, i, &format!("alpha beta gamma{i}"))
.expect("add doc");
}
for cp in &b.postings {
assert!(
!cp.is_spilled(),
"small build must not have spilled to disk"
);
}
let mut spill_files_found = 0usize;
for entry in walkdir_files(parent.path()) {
let name = entry.file_name().to_string_lossy().to_string();
if name.starts_with("fts_col") && name.ends_with(".bin") {
spill_files_found += 1;
}
}
assert_eq!(
spill_files_found, 0,
"small build must not pre-create posting spill files"
);
let _blob = b.finish().expect("finish");
}
/// A 16 KiB spill threshold forces spilling, writes `fts_col*.bin` files,
/// and still yields a blob byte-identical to the in-RAM build.
#[test]
fn build_above_threshold_spills_and_matches_in_ram_byte_for_byte() {
// Registers one column and adds 1000 docs with shared and unique terms.
fn build_corpus(b: &mut FtsBuilder) {
b.register_column("body".into()).expect("register col");
for i in 0..1000u32 {
b.add_doc(
0,
i,
&format!("common shared term{i:04} payload{i:04} extra word{i:04}"),
)
.expect("add doc");
}
}
let mut baseline = FtsBuilder::new(tokenizer());
build_corpus(&mut baseline);
for cp in &baseline.postings {
assert!(!cp.is_spilled(), "baseline must stay in RAM");
}
let baseline_blob = baseline.finish().expect("finish baseline");
let parent = tempfile::tempdir().expect("parent");
let mut spilled = FtsBuilder::with_scratch(tokenizer(), parent.path().to_path_buf())
.expect("with_scratch");
spilled.set_spill_threshold_bytes(16 * 1024);
build_corpus(&mut spilled);
let any_spilled = spilled.postings.iter().any(|c| c.is_spilled());
assert!(any_spilled, "low threshold must force spill");
let mut spill_files_found = 0usize;
for entry in walkdir_files(parent.path()) {
let name = entry.file_name().to_string_lossy().to_string();
if name.starts_with("fts_col") && name.ends_with(".bin") {
spill_files_found += 1;
}
}
assert!(
spill_files_found > 0,
"spilled build must materialise at least one fts_col*.bin partition file on disk"
);
let spilled_blob = spilled.finish().expect("finish spilled");
assert_eq!(
spilled_blob, baseline_blob,
"streaming-FST + spill path must produce byte-identical blob"
);
}
/// Recursively collects the regular files under `root` (depth-first via an
/// explicit stack). Unreadable directories and entries are skipped silently.
fn walkdir_files(root: &Path) -> Vec<fs::DirEntry> {
let mut out = Vec::new();
let mut stack: Vec<PathBuf> = vec![root.to_path_buf()];
while let Some(dir) = stack.pop() {
let rd = match fs::read_dir(&dir) {
Ok(r) => r,
Err(_) => continue,
};
for entry in rd.flatten() {
let ft = match entry.file_type() {
Ok(t) => t,
Err(_) => continue,
};
if ft.is_dir() {
stack.push(entry.path());
} else if ft.is_file() {
out.push(entry);
}
}
}
out
}
/// With a 1 KiB partition budget the build takes the external-merge path
/// (observed via `finish_debug`) and must still match the default-budget
/// spilled build byte for byte.
#[test]
fn external_merge_path_matches_in_memory_path_byte_for_byte() {
// Registers one column and adds 600 docs with a shared and unique terms.
fn build_corpus(builder: &mut FtsBuilder) {
builder
.register_column("body".into())
.expect("register col");
for i in 0..600u32 {
builder
.add_doc(0, i, &format!("common term{i:04} payload{i:04}"))
.expect("add doc");
}
}
let mut baseline = FtsBuilder::new(tokenizer());
baseline.set_spill_threshold_bytes(1);
build_corpus(&mut baseline);
let baseline_blob = baseline.finish().expect("finish baseline");
// Reset the debug record so only the tight build's chunks are observed.
finish_debug::reset();
let mut tight = FtsBuilder::new(tokenizer());
tight.set_spill_threshold_bytes(1);
tight.set_max_partition_bytes(1024);
build_corpus(&mut tight);
let tight_blob = tight.finish().expect("finish tight");
assert_eq!(
tight_blob, baseline_blob,
"external-merge path must produce identical blob bytes"
);
let chunks = finish_debug::observed();
assert!(
!chunks.is_empty(),
"external-merge path must have written at least one sorted-chunk file; \
observed chunks were empty (test no longer exercises the over-budget branch)"
);
}
/// After `finish`, the parent passed to `with_scratch` has the same number
/// of entries as before the builder was created.
#[test]
fn scratch_dir_under_with_scratch_is_removed_after_finish() {
let parent = tempfile::tempdir().expect("parent tempdir");
let dir_count_before = fs::read_dir(parent.path()).expect("read parent").count();
let mut b = FtsBuilder::with_scratch(tokenizer(), parent.path().to_path_buf())
.expect("with_scratch");
b.register_column("body".into()).expect("register col");
b.add_doc(0, 0, "alpha beta gamma").expect("add doc");
let _blob = b.finish().expect("finish");
let dir_count_after = fs::read_dir(parent.path()).expect("read parent").count();
assert_eq!(
dir_count_after, dir_count_before,
"FtsBuilder scratch tempdir leaked under override path"
);
}
/// A non-default partition count (256) still produces a searchable blob.
#[tokio::test]
async fn configurable_spill_partitions_round_trips_through_reader() {
use bytes::Bytes;
use crate::superfile::fts::reader::{BoolMode, FtsReader};
let mut b = FtsBuilder::new(tokenizer());
b.set_spill_partitions(256).expect("set partitions");
b.register_column("body".into()).expect("register col");
for i in 0..50u32 {
b.add_doc(0, i, &format!("alpha beta gamma{i:02}"))
.expect("add doc");
}
let blob = b.finish().expect("finish");
let r = FtsReader::open(
Bytes::from(blob),
r#"[{"name":"body","tokenizer":"ascii_lower"}]"#,
)
.expect("open reader");
let hits = r
.search("body", &["alpha"], 100, BoolMode::Or)
.await
.expect("search alpha");
assert_eq!(hits.len(), 50, "alpha is in every doc");
}
/// Header section offsets are strictly increasing (FST < postings <
/// directory) and the directory offset lies within the blob.
#[test]
fn finish_offsets_are_consistent() {
let mut b = FtsBuilder::new(tokenizer());
b.register_column("body".into()).expect("register column");
for i in 0..10 {
b.add_doc(0, i, &format!("term{i} common"))
.expect("add doc");
}
let blob = b.finish().expect("finish");
let mut buf = [0u8; 8];
buf.copy_from_slice(&blob[24..32]);
let fst_off = u64::from_le_bytes(buf) as usize;
buf.copy_from_slice(&blob[32..40]);
let postings_off = u64::from_le_bytes(buf) as usize;
buf.copy_from_slice(&blob[40..48]);
let dir_off = u64::from_le_bytes(buf) as usize;
assert_eq!(fst_off, 48);
assert!(postings_off > fst_off, "postings after FST");
assert!(dir_off > postings_off, "directory after postings");
assert!(dir_off <= blob.len(), "directory offset within blob");
}
/// The partition count can no longer be changed once a column is registered.
#[test]
fn set_spill_partitions_rejects_after_register_column() {
let mut b = FtsBuilder::new(tokenizer());
b.register_column("body".into()).expect("register col");
let err = b.set_spill_partitions(16).expect_err("expected error");
match err {
BuildError::Io(e) => {
assert!(e.to_string().contains("before any register_column"));
}
other => panic!("expected Io error, got {:?}", other),
}
}
/// A partition count of zero is rejected.
#[test]
fn set_spill_partitions_rejects_zero() {
let mut b = FtsBuilder::new(tokenizer());
let err = b.set_spill_partitions(0).expect_err("expected error");
match err {
BuildError::Io(e) => assert!(e.to_string().contains("must be ≥ 1")),
other => panic!("expected Io error, got {:?}", other),
}
}
/// A partition count that is not a power of two is rejected.
#[test]
fn set_spill_partitions_rejects_non_power_of_two() {
const NON_PO2: usize = 7;
let mut b = FtsBuilder::new(tokenizer());
let err = b.set_spill_partitions(NON_PO2).expect_err("expected error");
match err {
BuildError::Io(e) => assert!(e.to_string().contains("power of two")),
other => panic!("expected Io error, got {:?}", other),
}
}
/// An empty partition file decodes to no triples.
#[test]
fn read_partition_triples_empty_file_is_empty() {
let dir = tempfile::tempdir().expect("tempdir");
let path = dir.path().join("empty.part");
fs::write(&path, []).expect("write empty file");
let triples = read_partition_triples(&path).expect("read empty");
assert!(triples.is_empty());
}
/// Six little-endian u32s decode to two triples in file order.
#[test]
fn read_partition_triples_round_trips_le_bytes() {
let dir = tempfile::tempdir().expect("tempdir");
let path = dir.path().join("good.part");
let mut bytes = Vec::new();
for field in [3u32, 4, 5, 6, 7, 8] {
bytes.extend_from_slice(&field.to_le_bytes());
}
fs::write(&path, &bytes).expect("write triples");
let triples = read_partition_triples(&path).expect("read triples");
assert_eq!(triples, vec![[3u32, 4, 5], [6u32, 7, 8]]);
}
/// A file whose length is not a multiple of `TRIPLE_BYTES` is rejected with
/// `ErrorKind::InvalidData`.
#[test]
fn read_partition_triples_rejects_non_multiple_length() {
let dir = tempfile::tempdir().expect("tempdir");
let path = dir.path().join("ragged.part");
fs::write(&path, vec![0u8; TRIPLE_BYTES + 1]).expect("write ragged file");
let err = read_partition_triples(&path).expect_err("expected error");
match err {
BuildError::Io(e) => {
assert_eq!(e.kind(), ErrorKind::InvalidData);
assert!(e.to_string().contains("not a multiple"));
}
other => panic!("expected Io error, got {:?}", other),
}
}
/// `MergeEntry` compares in reverse `sort_key` order, so the max-heap
/// `BinaryHeap` pops the smallest key first. Equality ignores `term_id` and `tf`.
#[test]
fn merge_entry_orders_by_sort_key_reversed() {
let small = MergeEntry {
sort_key: 10,
term_id: 1,
tf: 1,
reader_idx: 0,
};
let large = MergeEntry {
sort_key: 20,
term_id: 2,
tf: 1,
reader_idx: 1,
};
assert_eq!(small.cmp(&large), Ordering::Greater);
assert_eq!(small.partial_cmp(&large), Some(Ordering::Greater));
assert!(small != large);
let small_dup = MergeEntry {
sort_key: 10,
term_id: 9,
tf: 9,
reader_idx: 0,
};
assert!(small == small_dup);
let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
heap.push(large);
heap.push(small);
assert_eq!(heap.pop().expect("non-empty heap").sort_key, 10);
}
}