use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::io::Write;
use byteorder::{LittleEndian, WriteBytesExt};
use rustc_hash::FxHashMap;
/// Maps a (doc_id, ordinal) pair to its dense "virtual id": the pair's rank
/// in the sorted list of all surviving pairs (built by `from_sorted_pairs`).
pub(crate) struct VidLookup {
    // Key: (doc id, ordinal); value: dense u32 virtual id.
    map: FxHashMap<(crate::DocId, u16), u32>,
}
impl VidLookup {
pub fn from_sorted_pairs(vid_pairs: &[(crate::DocId, u16)]) -> Self {
let mut map = FxHashMap::with_capacity_and_hasher(vid_pairs.len(), Default::default());
for (vid, &pair) in vid_pairs.iter().enumerate() {
map.insert(pair, vid as u32);
}
Self { map }
}
#[inline]
pub fn get(&self, key: (crate::DocId, u16)) -> u32 {
self.map[&key]
}
}
use crate::DocId;
use crate::segment::format::BMP_BLOB_MAGIC_V13;
use crate::segment::reader::bmp::BMP_SUPERBLOCK_SIZE;
/// Builds a V13 BMP blob from per-dimension postings and streams it to
/// `writer`, returning the total number of bytes written (0 when no posting
/// survives pruning and quantization).
///
/// Stream layout, in write order:
/// 1. per-block term data: `[num_terms: u16][dim_ids: u32 * n]`
///    `[cumulative posting offsets: u16 * (n + 1)][(local_slot, impact)` byte
///    pairs`]`,
/// 2. zero padding to an 8-byte boundary, then the block offset table
///    (u64 LE, one entry per block plus a trailing sentinel),
/// 3. packed 4-bit grid followed by the superblock grid
///    (see `stream_write_grids`),
/// 4. doc-id map (u32 LE) then ordinal map (u16 LE), each padded out to
///    `num_virtual_docs` entries with `u32::MAX` / `0`,
/// 5. the fixed 64-byte footer (see `write_v13_footer`).
///
/// * `postings`: dim_id -> `(doc_id, ordinal, weight)` entries.
/// * `bmp_block_size`: requested block size, capped at 256 so a block-local
///   slot fits in one `u8`.
/// * `weight_threshold`: postings with `|weight|` below this are dropped —
///   except in dimensions holding fewer than `min_terms` postings.
/// * `pruning_fraction`: when `Some(f)` with `f < 1.0`, dimensions with at
///   least `min_terms` postings keep only their strongest `f` fraction.
/// * `dims`: total dimension count recorded in the grids and footer.
/// * `max_weight`: fixed quantization scale (shared across segments).
#[allow(clippy::too_many_arguments)]
pub(crate) fn build_bmp_blob(
    mut postings: FxHashMap<u32, Vec<(DocId, u16, f32)>>,
    bmp_block_size: u32,
    weight_threshold: f32,
    pruning_fraction: Option<f32>,
    dims: u32,
    max_weight: f32,
    min_terms: usize,
    writer: &mut dyn Write,
) -> std::io::Result<u64> {
    if postings.is_empty() {
        return Ok(0);
    }
    // Optional pruning: in each sufficiently large dimension, keep only the
    // strongest `fraction` of postings by |weight|, then restore
    // (doc_id, ordinal) order.
    for (_dim_id, dim_postings) in postings.iter_mut() {
        if let Some(fraction) = pruning_fraction
            && dim_postings.len() >= min_terms
            && fraction < 1.0
        {
            dim_postings.sort_unstable_by(|a, b| {
                b.2.abs()
                    .partial_cmp(&a.2.abs())
                    .unwrap_or(std::cmp::Ordering::Equal)
            });
            // Round up, and never prune a dimension down to zero postings.
            let keep = ((dim_postings.len() as f64 * fraction as f64).ceil() as usize).max(1);
            dim_postings.truncate(keep);
            dim_postings.sort_unstable_by_key(|(doc_id, ordinal, _)| (*doc_id, *ordinal));
        }
    }
    // Collect every (doc_id, ordinal) pair that survives both the weight
    // threshold and quantization; these become the "virtual documents".
    let max_dim_postings: usize = postings.values().map(|v| v.len()).max().unwrap_or(0);
    let mut vid_set: rustc_hash::FxHashSet<(DocId, u16)> =
        rustc_hash::FxHashSet::with_capacity_and_hasher(max_dim_postings, Default::default());
    for dim_postings in postings.values() {
        // Small dimensions bypass the weight threshold entirely.
        let skip_threshold = dim_postings.len() < min_terms;
        for &(doc_id, ordinal, weight) in dim_postings {
            let abs_w = weight.abs();
            if !skip_threshold && abs_w < weight_threshold {
                continue;
            }
            if quantize_weight(abs_w, max_weight) > 0 {
                vid_set.insert((doc_id, ordinal));
            }
        }
    }
    if vid_set.is_empty() {
        return Ok(0);
    }
    let max_weight_scale = max_weight;
    // Sort the pairs; a pair's rank in this order is its virtual id.
    let mut vid_pairs: Vec<(DocId, u16)> = vid_set.into_iter().collect();
    vid_pairs.sort_unstable();
    let num_real_docs = vid_pairs.len();
    let vid_lookup = VidLookup::from_sorted_pairs(&vid_pairs);
    // Cap at 256 so a block-local slot fits in a u8 (see `local_slot` below).
    let effective_block_size = bmp_block_size.min(256);
    // Pad the virtual doc space to a whole number of blocks.
    let num_virtual_docs =
        num_real_docs.div_ceil(effective_block_size as usize) * effective_block_size as usize;
    let num_blocks = num_virtual_docs / effective_block_size as usize;
    // Move postings into dense, dim-id-sorted vectors for indexed access.
    let mut dim_ids: Vec<u32> = postings.keys().copied().collect();
    dim_ids.sort_unstable();
    let dim_vecs: Vec<Vec<(DocId, u16, f32)>> = dim_ids
        .iter()
        .map(|&d| postings.remove(&d).unwrap_or_default())
        .collect();
    drop(postings);
    let dim_slices: Vec<&[(DocId, u16, f32)]> = dim_vecs.iter().map(|v| v.as_slice()).collect();
    let dim_skip_threshold: Vec<bool> = dim_slices.iter().map(|s| s.len() < min_terms).collect();
    let num_dims = dim_ids.len();
    // `cursors[d]` is the position of dim `d`'s next unconsumed posting.
    let mut cursors: Vec<usize> = vec![0; num_dims];
    // Min-heap (via `Reverse`) on (block_id, dim_id, dim_idx): blocks are
    // emitted in ascending order and, within a block, dims ascending.
    let mut heap: BinaryHeap<Reverse<(u32, u32, usize)>> = BinaryHeap::with_capacity(num_dims);
    let bs64 = effective_block_size as u64;
    // Seed the heap with each dim's first posting that passes the filters.
    for (dim_idx, &dim_id) in dim_ids.iter().enumerate() {
        let posts = dim_slices[dim_idx];
        let skip_wt = dim_skip_threshold[dim_idx];
        for (pos, &(doc_id, ordinal, weight)) in posts.iter().enumerate() {
            let abs_w = weight.abs();
            if !skip_wt && abs_w < weight_threshold {
                continue;
            }
            let impact = quantize_weight(abs_w, max_weight_scale);
            if impact == 0 {
                continue;
            }
            let virtual_id = vid_lookup.get((doc_id, ordinal)) as u64;
            let block_id = (virtual_id / bs64) as u32;
            cursors[dim_idx] = pos;
            heap.push(Reverse((block_id, dim_id, dim_idx)));
            break;
        }
    }
    if heap.is_empty() {
        return Ok(0);
    }
    // Start offset of each block's data, relative to the start of the block
    // data section; empty blocks repeat the previous offset (zero length).
    let mut block_data_starts: Vec<u64> = Vec::with_capacity(num_blocks + 1);
    // (dim_id, block_id, max impact) tuples feeding the impact grids.
    let mut grid_entries: Vec<(u32, u32, u8)> = Vec::new();
    let mut total_terms: u32 = 0;
    let mut total_postings: u32 = 0;
    let mut cumulative_bytes: u64 = 0;
    // Highest block id whose start offset has been recorded (-1 = none yet).
    let mut last_block_filled: i64 = -1;
    // Reusable per-block scratch buffers.
    let mut blk_buf: Vec<u8> = Vec::with_capacity(4096);
    let mut blk_dim_ids: Vec<u32> = Vec::new();
    let mut blk_posting_counts: Vec<u16> = Vec::new();
    let mut blk_postings: Vec<u8> = Vec::new();
    while let Some(&Reverse((block_id, _, _))) = heap.peek() {
        // Record zero-length spans for any blocks with no postings at all,
        // then the start offset of the current block.
        for _ in (last_block_filled + 1) as u32..block_id {
            block_data_starts.push(cumulative_bytes);
        }
        block_data_starts.push(cumulative_bytes);
        last_block_filled = block_id as i64;
        blk_dim_ids.clear();
        blk_posting_counts.clear();
        blk_postings.clear();
        // Drain every dim whose next posting falls in this block.
        while let Some(&Reverse((bid, dim_id, dim_idx))) = heap.peek() {
            if bid != block_id {
                break;
            }
            heap.pop();
            let posts = dim_slices[dim_idx];
            let skip_wt = dim_skip_threshold[dim_idx];
            let mut pos = cursors[dim_idx];
            let mut max_impact = 0u8;
            let mut next_block: Option<u32> = None;
            let mut term_posting_count: u16 = 0;
            blk_dim_ids.push(dim_id);
            // Consume this dim's postings while they stay in this block.
            while pos < posts.len() {
                let (doc_id, ordinal, weight) = posts[pos];
                let abs_w = weight.abs();
                if !skip_wt && abs_w < weight_threshold {
                    pos += 1;
                    continue;
                }
                let impact = quantize_weight(abs_w, max_weight_scale);
                if impact == 0 {
                    pos += 1;
                    continue;
                }
                let virtual_id = vid_lookup.get((doc_id, ordinal)) as u64;
                let bid2 = (virtual_id / bs64) as u32;
                if bid2 != block_id {
                    // First posting past this block; remember where it lives.
                    next_block = Some(bid2);
                    break;
                }
                // Payload is (slot-within-block, quantized impact) byte pairs.
                let local_slot = (virtual_id % bs64) as u8;
                blk_postings.push(local_slot);
                blk_postings.push(impact);
                term_posting_count += 1;
                max_impact = max_impact.max(impact);
                pos += 1;
            }
            blk_posting_counts.push(term_posting_count);
            total_postings += term_posting_count as u32;
            total_terms += 1;
            grid_entries.push((dim_id, block_id, max_impact));
            if let Some(nb) = next_block {
                // Re-queue this dim at the block of its next valid posting.
                cursors[dim_idx] = pos;
                heap.push(Reverse((nb, dim_id, dim_idx)));
            } else {
                // The loop above only exits without `next_block` when
                // `pos == posts.len()`, so this defensive forward scan is
                // normally a no-op and the dim simply leaves the heap.
                cursors[dim_idx] = pos;
                while pos < posts.len() {
                    let (doc_id, ordinal, weight) = posts[pos];
                    let abs_w = weight.abs();
                    if skip_wt || abs_w >= weight_threshold {
                        let impact = quantize_weight(abs_w, max_weight_scale);
                        if impact > 0 {
                            let virtual_id = vid_lookup.get((doc_id, ordinal)) as u64;
                            let nb = (virtual_id / bs64) as u32;
                            cursors[dim_idx] = pos;
                            heap.push(Reverse((nb, dim_id, dim_idx)));
                            break;
                        }
                    }
                    pos += 1;
                }
            }
        }
        // Serialize the block: term count, dim ids, cumulative posting
        // offsets (n + 1 entries), then the (slot, impact) payload.
        if !blk_dim_ids.is_empty() {
            blk_buf.clear();
            let nt = blk_dim_ids.len();
            blk_buf.extend_from_slice(&(nt as u16).to_le_bytes());
            for &did in &blk_dim_ids {
                blk_buf.extend_from_slice(&did.to_le_bytes());
            }
            let mut cum: u16 = 0;
            for &count in &blk_posting_counts {
                blk_buf.extend_from_slice(&cum.to_le_bytes());
                cum += count;
            }
            // Trailing offset equals the block's total posting count.
            blk_buf.extend_from_slice(&cum.to_le_bytes());
            blk_buf.extend_from_slice(&blk_postings);
            writer.write_all(&blk_buf)?;
            cumulative_bytes += blk_buf.len() as u64;
        }
    }
    // Offsets for trailing empty blocks, plus the final sentinel entry.
    for _ in (last_block_filled + 1) as u32..num_blocks as u32 {
        block_data_starts.push(cumulative_bytes);
    }
    block_data_starts.push(cumulative_bytes);
    // Grids are streamed row-by-row in ascending (dim_id, block_id) order.
    grid_entries.sort_unstable();
    log::info!(
        "[bmp_build] V13 vectors={} padded={} blocks={} dims={} \
         terms={} postings={} grid_entries={}",
        num_real_docs,
        num_virtual_docs,
        num_blocks,
        dims,
        total_terms,
        total_postings,
        grid_entries.len(),
    );
    // Free the posting storage before the remaining (potentially large)
    // section writes.
    drop(dim_slices); drop(dim_vecs);
    drop(vid_lookup);
    let mut bytes_written: u64 = cumulative_bytes;
    // Pad to an 8-byte boundary so the u64 offset table is aligned.
    let padding = (8 - (bytes_written % 8) as usize) % 8;
    if padding > 0 {
        writer.write_all(&[0u8; 8][..padding])?;
        bytes_written += padding as u64;
    }
    bytes_written += write_u64_slice_le(writer, &block_data_starts)?;
    drop(block_data_starts);
    let grid_offset = bytes_written;
    let (packed_bytes, sb_bytes) =
        stream_write_grids(&grid_entries, dims as usize, num_blocks, writer)?;
    let sb_grid_offset = bytes_written + packed_bytes;
    bytes_written += packed_bytes + sb_bytes;
    drop(grid_entries);
    // vid -> doc_id map, padded to the virtual doc count with u32::MAX.
    let doc_map_offset = bytes_written;
    for &(doc_id, _) in &vid_pairs {
        writer.write_u32::<LittleEndian>(doc_id)?;
    }
    for _ in num_real_docs..num_virtual_docs {
        writer.write_u32::<LittleEndian>(u32::MAX)?;
    }
    bytes_written += num_virtual_docs as u64 * 4;
    // vid -> ordinal map, padded with zeros.
    for &(_, ord) in &vid_pairs {
        writer.write_u16::<LittleEndian>(ord)?;
    }
    for _ in num_real_docs..num_virtual_docs {
        writer.write_u16::<LittleEndian>(0)?;
    }
    bytes_written += num_virtual_docs as u64 * 2;
    drop(vid_pairs);
    write_v13_footer(
        writer,
        total_terms,
        total_postings,
        grid_offset,
        sb_grid_offset,
        num_blocks as u32,
        dims,
        effective_block_size,
        num_virtual_docs as u32,
        max_weight_scale,
        doc_map_offset,
        num_real_docs as u32,
    )?;
    // The footer has a fixed 64-byte size.
    bytes_written += 64;
    Ok(bytes_written)
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn write_v13_footer(
writer: &mut dyn Write,
total_terms: u32,
total_postings: u32,
grid_offset: u64,
sb_grid_offset: u64,
num_blocks: u32,
dims: u32,
bmp_block_size: u32,
num_virtual_docs: u32,
max_weight_scale: f32,
doc_map_offset: u64,
num_real_docs: u32,
) -> std::io::Result<()> {
writer.write_u32::<LittleEndian>(total_terms)?; writer.write_u32::<LittleEndian>(total_postings)?; writer.write_u64::<LittleEndian>(grid_offset)?; writer.write_u64::<LittleEndian>(sb_grid_offset)?; writer.write_u32::<LittleEndian>(num_blocks)?; writer.write_u32::<LittleEndian>(dims)?; writer.write_u32::<LittleEndian>(bmp_block_size)?; writer.write_u32::<LittleEndian>(num_virtual_docs)?; writer.write_f32::<LittleEndian>(max_weight_scale)?; writer.write_u64::<LittleEndian>(doc_map_offset)?; writer.write_u32::<LittleEndian>(num_real_docs)?; writer.write_u32::<LittleEndian>(0)?; writer.write_u32::<LittleEndian>(BMP_BLOB_MAGIC_V13)?; Ok(())
}
/// Writes `data` to `writer` as little-endian u64 values and returns the
/// number of bytes written (`data.len() * 8`; 0 for an empty slice).
pub(crate) fn write_u64_slice_le(writer: &mut dyn Write, data: &[u64]) -> std::io::Result<u64> {
    if data.is_empty() {
        return Ok(0);
    }
    #[cfg(target_endian = "little")]
    {
        // SAFETY: on a little-endian target the in-memory bytes of a u64
        // slice are already the desired output layout; viewing the same
        // allocation as `&[u8]` of 8x the length is in-bounds, aligned
        // (u8 alignment is 1), and lives only for this read.
        let raw: &[u8] =
            unsafe { std::slice::from_raw_parts(data.as_ptr().cast::<u8>(), data.len() * 8) };
        writer.write_all(raw)?;
    }
    #[cfg(target_endian = "big")]
    {
        // Portable fallback: convert each value explicitly.
        for &value in data {
            writer.write_all(&value.to_le_bytes())?;
        }
    }
    Ok(data.len() as u64 * 8)
}
/// Compresses an 8-bit impact into 4 bits by scaling with ceiling rounding,
/// so every nonzero input maps to a nonzero output (0 -> 0, 255 -> 15).
#[inline]
pub(crate) fn quantize_u8_to_u4_ceil(val: u8) -> u8 {
    match val {
        0 => 0,
        v => (v as u16 * 15).div_ceil(255) as u8,
    }
}
/// Streams the impact grids for all dimensions from `grid_entries`
/// ((dim_id, block_id, impact) tuples, sorted by dim then block).
///
/// Two sections are written back to back, each one row per dimension:
/// 1. the packed grid — one 4-bit impact per (dim, block), even block ids
///    in the low nibble of a byte, odd ids in the high nibble;
/// 2. the superblock grid — one full 8-bit maximum impact per
///    (dim, superblock of `BMP_SUPERBLOCK_SIZE` blocks).
///
/// Returns `(packed_bytes, superblock_bytes)` written.
pub(crate) fn stream_write_grids(
    grid_entries: &[(u32, u32, u8)],
    num_dims: usize,
    num_blocks: usize,
    writer: &mut dyn Write,
) -> std::io::Result<(u64, u64)> {
    let packed_row_size = num_blocks.div_ceil(2);
    let num_superblocks = num_blocks.div_ceil(BMP_SUPERBLOCK_SIZE as usize);
    let mut packed_row = vec![0u8; packed_row_size];
    let mut sb_row = vec![0u8; num_superblocks];

    // Pass 1: packed 4-bit grid.
    let mut idx = 0;
    for dim in 0..num_dims as u32 {
        packed_row.fill(0);
        while let Some(&(d, block, impact)) = grid_entries.get(idx) {
            if d != dim {
                break;
            }
            let b = block as usize;
            // Even block id -> low nibble (shift 0), odd -> high nibble.
            let shift = if b % 2 == 0 { 0 } else { 4 };
            packed_row[b / 2] |= quantize_u8_to_u4_ceil(impact) << shift;
            idx += 1;
        }
        writer.write_all(&packed_row)?;
    }
    let packed_bytes = (num_dims * packed_row_size) as u64;

    // Pass 2: 8-bit superblock maxima.
    idx = 0;
    for dim in 0..num_dims as u32 {
        sb_row.fill(0);
        while let Some(&(d, block, impact)) = grid_entries.get(idx) {
            if d != dim {
                break;
            }
            let sb = block as usize / BMP_SUPERBLOCK_SIZE as usize;
            sb_row[sb] = sb_row[sb].max(impact);
            idx += 1;
        }
        writer.write_all(&sb_row)?;
    }
    let sb_bytes = (num_dims * num_superblocks) as u64;
    Ok((packed_bytes, sb_bytes))
}
/// On-disk size of one grid-run record: dim_id (u32) + block_id (u32) +
/// impact (u8) = 9 bytes (see `write_grid_run` / `GridRunReader`).
const GRID_ENTRY_DISK_SIZE: usize = 9;
#[cfg(feature = "native")]
/// Streaming cursor over a sorted grid-run file produced by `write_grid_run`.
pub(crate) struct GridRunReader {
    reader: std::io::BufReader<std::fs::File>,
    /// The entry under the cursor, or `None` once the run is exhausted.
    pub current: Option<(u32, u32, u8)>,
}
#[cfg(feature = "native")]
impl GridRunReader {
    /// Opens the run file at `path` and primes `current` with its first
    /// entry (or `None` for an empty file).
    pub fn open(path: &std::path::Path) -> std::io::Result<Self> {
        let mut reader =
            std::io::BufReader::with_capacity(256 * 1024, std::fs::File::open(path)?);
        let current = Self::read_entry(&mut reader)?;
        Ok(Self { reader, current })
    }

    /// Decodes the next 9-byte (dim_id, block_id, impact) record, returning
    /// `None` at end of file.
    fn read_entry(
        reader: &mut std::io::BufReader<std::fs::File>,
    ) -> std::io::Result<Option<(u32, u32, u8)>> {
        use std::io::Read;
        let mut raw = [0u8; GRID_ENTRY_DISK_SIZE];
        if let Err(e) = reader.read_exact(&mut raw) {
            // EOF just means the run is finished; anything else is a real error.
            return if e.kind() == std::io::ErrorKind::UnexpectedEof {
                Ok(None)
            } else {
                Err(e)
            };
        }
        let dim_id = u32::from_le_bytes(raw[0..4].try_into().unwrap());
        let block_id = u32::from_le_bytes(raw[4..8].try_into().unwrap());
        Ok(Some((dim_id, block_id, raw[8])))
    }

    /// Moves the cursor one entry forward.
    pub fn advance(&mut self) -> std::io::Result<()> {
        self.current = Self::read_entry(&mut self.reader)?;
        Ok(())
    }

    /// Rewinds to the start of the file and re-primes `current`.
    pub fn reset(&mut self) -> std::io::Result<()> {
        use std::io::Seek;
        self.reader.seek(std::io::SeekFrom::Start(0))?;
        self.current = Self::read_entry(&mut self.reader)?;
        Ok(())
    }
}
#[cfg(feature = "native")]
/// Serializes `entries` to `path` as a flat run of 9-byte little-endian
/// records (dim_id u32, block_id u32, impact u8), readable by
/// `GridRunReader`.
pub(crate) fn write_grid_run(
    entries: &[(u32, u32, u8)],
    path: &std::path::Path,
) -> std::io::Result<()> {
    use std::io::BufWriter;
    let mut out = BufWriter::with_capacity(256 * 1024, std::fs::File::create(path)?);
    let mut record = [0u8; GRID_ENTRY_DISK_SIZE];
    for &(dim_id, block_id, impact) in entries {
        record[0..4].copy_from_slice(&dim_id.to_le_bytes());
        record[4..8].copy_from_slice(&block_id.to_le_bytes());
        record[8] = impact;
        out.write_all(&record)?;
    }
    // Flush explicitly so write errors surface instead of being swallowed
    // by the BufWriter drop.
    out.flush()?;
    Ok(())
}
#[cfg(feature = "native")]
/// Merged-run variant of `stream_write_grids`: emits the same two grid
/// sections, but pulls (dim_id, block_id, impact) entries from several
/// on-disk runs instead of one in-memory slice.
///
/// Each run is assumed sorted by dim id (entries are consumed per dim, in
/// run order). Returns `(packed_bytes, superblock_bytes)` written.
pub(crate) fn stream_write_grids_merged(
    run_readers: &mut [GridRunReader],
    num_dims: usize,
    num_blocks: usize,
    writer: &mut dyn Write,
) -> std::io::Result<(u64, u64)> {
    // Two packed 4-bit impacts per byte.
    let packed_row_size = num_blocks.div_ceil(2);
    let num_superblocks = num_blocks.div_ceil(BMP_SUPERBLOCK_SIZE as usize);
    let mut row_buf = vec![0u8; packed_row_size];
    let mut sb_row = vec![0u8; num_superblocks];
    // Pass 1: packed 4-bit grid, one row per dimension. Impacts from
    // different runs landing on the same (dim, block) nibble are OR-ed.
    for dim_id in 0..num_dims as u32 {
        row_buf.fill(0);
        for reader in run_readers.iter_mut() {
            while let Some((d, block_id, impact)) = reader.current {
                if d != dim_id {
                    break;
                }
                let b = block_id as usize;
                let q4 = quantize_u8_to_u4_ceil(impact);
                // Even block ids go in the low nibble, odd in the high.
                if b.is_multiple_of(2) {
                    row_buf[b / 2] |= q4;
                } else {
                    row_buf[b / 2] |= q4 << 4;
                }
                reader.advance()?;
            }
        }
        writer.write_all(&row_buf)?;
    }
    let packed_bytes = (num_dims * packed_row_size) as u64;
    // Rewind every run before the second pass re-reads all entries.
    for reader in run_readers.iter_mut() {
        reader.reset()?;
    }
    // Pass 2: superblock grid holding the full 8-bit max impact per
    // (dim, superblock).
    for dim_id in 0..num_dims as u32 {
        sb_row.fill(0);
        for reader in run_readers.iter_mut() {
            while let Some((d, block_id, impact)) = reader.current {
                if d != dim_id {
                    break;
                }
                let sb = block_id as usize / BMP_SUPERBLOCK_SIZE as usize;
                if impact > sb_row[sb] {
                    sb_row[sb] = impact;
                }
                reader.advance()?;
            }
        }
        writer.write_all(&sb_row)?;
    }
    let sb_bytes = (num_dims * num_superblocks) as u64;
    Ok((packed_bytes, sb_bytes))
}
/// Scales `weight` into the u8 impact range [0, 255] relative to
/// `max_scale`, rounding to nearest; a non-positive scale yields 0 for
/// every weight.
#[inline]
fn quantize_weight(weight: f32, max_scale: f32) -> u8 {
    if max_scale > 0.0 {
        (weight / max_scale * 255.0).round().clamp(0.0, 255.0) as u8
    } else {
        0
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Quantization against a fixed scale: full scale -> 255, half -> 128,
    // zero -> 0; the scale, not the weight magnitude, sets the mapping.
    #[test]
    fn test_quantize_weight() {
        assert_eq!(quantize_weight(1.0, 1.0), 255);
        assert_eq!(quantize_weight(0.5, 1.0), 128);
        assert_eq!(quantize_weight(0.0, 1.0), 0);
        assert_eq!(quantize_weight(1.0, 2.0), 128);
    }

    // Empty postings must produce an empty (zero-byte) blob.
    #[test]
    fn test_build_bmp_blob_empty() {
        let postings = FxHashMap::default();
        let mut buf = Vec::new();
        let size = build_bmp_blob(postings, 64, 0.0, None, 105879, 5.0, 4, &mut buf).unwrap();
        assert_eq!(size, 0);
        assert!(buf.is_empty());
    }

    // A small blob: the reported size matches the buffer length and the V13
    // magic word occupies the final 4 bytes of the footer.
    #[test]
    fn test_build_bmp_blob_basic() {
        let mut postings = FxHashMap::default();
        postings.insert(0u32, vec![(0u32, 0u16, 1.0f32), (1, 0, 0.5)]);
        postings.insert(1, vec![(0, 0, 0.8)]);
        let mut buf = Vec::new();
        let size = build_bmp_blob(postings, 64, 0.0, None, 105879, 5.0, 4, &mut buf).unwrap();
        assert!(size > 0);
        assert_eq!(buf.len(), size as usize);
        let footer_start = buf.len() - 4;
        let magic = u32::from_le_bytes(buf[footer_start..].try_into().unwrap());
        assert_eq!(magic, BMP_BLOB_MAGIC_V13);
    }

    // Distinct ordinals of the same doc count as separate virtual docs:
    // 3 real docs, padded up to one 64-slot block. Footer field offsets:
    // num_virtual_docs at bytes 36..40, num_real_docs at 52..56.
    #[test]
    fn test_build_bmp_blob_multi_ordinal() {
        let mut postings = FxHashMap::default();
        postings.insert(0u32, vec![(0u32, 0u16, 1.0f32), (0, 1, 0.8), (1, 0, 0.5)]);
        let mut buf = Vec::new();
        let size = build_bmp_blob(postings, 64, 0.0, None, 105879, 5.0, 4, &mut buf).unwrap();
        assert!(size > 0);
        let footer_start = buf.len() - 64;
        let fb = &buf[footer_start..];
        let num_virtual_docs = u32::from_le_bytes(fb[36..40].try_into().unwrap());
        assert_eq!(num_virtual_docs, 64);
        let num_real_docs = u32::from_le_bytes(fb[52..56].try_into().unwrap());
        assert_eq!(num_real_docs, 3);
    }

    // The footer records the fixed `max_weight` scale passed in (5.0), not
    // the observed maximum weight (2.0). Scale lives at footer bytes 40..44.
    #[test]
    fn test_build_bmp_blob_fixed_scale() {
        let mut postings = FxHashMap::default();
        postings.insert(0u32, vec![(0u32, 0u16, 2.0f32), (1, 0, 1.0)]);
        let mut buf = Vec::new();
        let size = build_bmp_blob(postings, 64, 0.0, None, 105879, 5.0, 4, &mut buf).unwrap();
        assert!(size > 0);
        let footer_start = buf.len() - 64;
        let fb = &buf[footer_start..];
        let scale = f32::from_le_bytes(fb[40..44].try_into().unwrap());
        assert!((scale - 5.0).abs() < 0.001, "scale={}, expected 5.0", scale);
    }

    // Two segments with different weight distributions but the same fixed
    // `max_weight` must store identical scales, so quantized impacts are
    // comparable across segments.
    #[test]
    fn test_fixed_scale_across_segments() {
        let mut postings_a = FxHashMap::default();
        postings_a.insert(0u32, vec![(0u32, 0u16, 3.0f32), (1, 0, 1.5)]);
        let mut postings_b = FxHashMap::default();
        postings_b.insert(0u32, vec![(0u32, 0u16, 1.0f32), (1, 0, 0.5)]);
        let mut buf_a = Vec::new();
        build_bmp_blob(postings_a, 64, 0.0, None, 105879, 5.0, 4, &mut buf_a).unwrap();
        let scale_a = f32::from_le_bytes(
            buf_a[buf_a.len() - 64 + 40..buf_a.len() - 64 + 44]
                .try_into()
                .unwrap(),
        );
        let mut buf_b = Vec::new();
        build_bmp_blob(postings_b, 64, 0.0, None, 105879, 5.0, 4, &mut buf_b).unwrap();
        let scale_b = f32::from_le_bytes(
            buf_b[buf_b.len() - 64 + 40..buf_b.len() - 64 + 44]
                .try_into()
                .unwrap(),
        );
        assert_eq!(
            scale_a, scale_b,
            "Fixed max_weight scales must be identical"
        );
        assert!((scale_a - 5.0).abs() < 0.001);
    }
}