use std::io::Write;
#[cfg(feature = "native")]
use rayon::prelude::*;
use rustc_hash::FxHashMap;
use crate::Result;
#[cfg(feature = "native")]
use crate::dsl::VectorIndexType;
use crate::dsl::{DenseVectorQuantization, Field, Schema};
use crate::segment::format::{DenseVectorTocEntry, write_dense_toc_and_footer};
use crate::segment::vector_data::FlatVectorData;
use crate::DocId;
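/// Accumulates f32 vectors for a single dense-vector field during segment
/// construction. Values are stored contiguously: entry `i` of `doc_ids`
/// owns the slice `vectors[i * dim..(i + 1) * dim]`.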
pub(super) struct DenseVectorBuilder {
pub dim: usize,
pub doc_ids: Vec<(DocId, u16)>,
pub vectors: Vec<f32>,
}
impl DenseVectorBuilder {
pub fn new(dim: usize) -> Self {
Self {
dim,
doc_ids: Vec::with_capacity(16),
vectors: Vec::with_capacity(16 * dim),
}
}
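/// Appends one vector for `(doc_id, ordinal)`; `vector` must have exactly `dim` elements.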
pub fn add(&mut self, doc_id: DocId, ordinal: u16, vector: &[f32]) {
debug_assert_eq!(vector.len(), self.dim, "Vector dimension mismatch");
self.doc_ids.push((doc_id, ordinal));
self.vectors.extend_from_slice(vector);
}
pub fn len(&self) -> usize {
self.doc_ids.len()
}
}
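/// Accumulates bit-packed binary vectors for a single dense-vector field.
/// Each vector occupies `byte_len = ceil(dim_bits / 8)` bytes in `vectors`.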
pub(super) struct BinaryDenseVectorBuilder {
pub dim_bits: usize,
pub byte_len: usize,
pub doc_ids: Vec<(DocId, u16)>,
pub vectors: Vec<u8>,
}
impl BinaryDenseVectorBuilder {
pub fn new(dim_bits: usize) -> Self {
let byte_len = dim_bits.div_ceil(8);
Self {
dim_bits,
byte_len,
doc_ids: Vec::with_capacity(16),
vectors: Vec::with_capacity(16 * byte_len),
}
}
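/// Appends one packed vector for `(doc_id, ordinal)`; `packed_bytes` must be exactly `byte_len` bytes.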
pub fn add(&mut self, doc_id: DocId, ordinal: u16, packed_bytes: &[u8]) {
debug_assert_eq!(
packed_bytes.len(),
self.byte_len,
"Binary vector byte length mismatch: expected {}, got {}",
self.byte_len,
packed_bytes.len()
);
self.doc_ids.push((doc_id, ordinal));
self.vectors.extend_from_slice(packed_bytes);
}
pub fn len(&self) -> usize {
self.doc_ids.len()
}
}
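/// Streams the dense-vector section to `writer`: flat f32 data per field,
/// then any ANN index blobs (built only with the `native` feature), then
/// bit-packed binary vector data, each blob padded to 8-byte alignment,
/// followed by the TOC and footer. Writes nothing if there are no vectors.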
pub(super) fn build_vectors_streaming(
dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
binary_vectors: FxHashMap<u32, BinaryDenseVectorBuilder>,
schema: &Schema,
trained: Option<&super::super::TrainedVectorStructures>,
writer: &mut dyn Write,
) -> Result<()> {
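// Drop empty builders and sort by field id for a deterministic layout.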
let mut fields: Vec<(u32, DenseVectorBuilder)> = dense_vectors
.into_iter()
.filter(|(_, b)| b.len() > 0)
.collect();
fields.sort_by_key(|(id, _)| *id);
let mut binary_fields: Vec<(u32, BinaryDenseVectorBuilder)> = binary_vectors
.into_iter()
.filter(|(_, b)| b.len() > 0)
.collect();
binary_fields.sort_by_key(|(id, _)| *id);
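// Nothing to write: the dense-vector section is omitted entirely.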
if fields.is_empty() && binary_fields.is_empty() {
return Ok(());
}
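// Resolve each f32 field's configured quantization, defaulting to F32.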
let quants: Vec<DenseVectorQuantization> = fields
.iter()
.map(|(field_id, _)| {
schema
.get_field_entry(Field(*field_id))
.and_then(|e| e.dense_vector_config.as_ref())
.map(|c| c.quantization)
.unwrap_or(DenseVectorQuantization::F32)
})
.collect();
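// Precompute each flat blob's serialized size so TOC offsets can be
// assigned while streaming.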
let mut field_sizes: Vec<usize> = Vec::with_capacity(fields.len());
for (i, (_field_id, builder)) in fields.iter().enumerate() {
field_sizes.push(FlatVectorData::serialized_binary_size(
builder.dim,
builder.len(),
quants[i],
));
}
let mut toc: Vec<DenseVectorTocEntry> = Vec::with_capacity(fields.len() * 2 + binary_fields.len());
let mut current_offset = 0u64;
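// With the `native` feature, build ANN index blobs (IVF-RaBitQ or ScaNN)
// in parallel across fields, using trained centroids/codebooks when present.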
#[cfg(feature = "native")]
let ann_blobs: Vec<(u32, u8, Vec<u8>)> = if let Some(trained) = trained {
let ann_blob_fn =
|(field_id, builder): &(u32, DenseVectorBuilder)| -> Option<(u32, u8, Vec<u8>)> {
let config = schema
.get_field_entry(Field(*field_id))
.and_then(|e| e.dense_vector_config.as_ref())?;
let dim = builder.dim;
let blob = match config.index_type {
VectorIndexType::IvfRaBitQ if trained.centroids.contains_key(field_id) => {
let centroids = &trained.centroids[field_id];
let (mut index, codebook) =
super::super::ann_build::new_ivf_rabitq(dim, centroids);
for (i, (doc_id, ordinal)) in builder.doc_ids.iter().enumerate() {
let v = &builder.vectors[i * dim..(i + 1) * dim];
index.add_vector(centroids, &codebook, *doc_id, *ordinal, v);
}
super::super::ann_build::serialize_ivf_rabitq(index, codebook)
.map(|b| (super::super::ann_build::IVF_RABITQ_TYPE, b))
}
VectorIndexType::ScaNN
if trained.centroids.contains_key(field_id)
&& trained.codebooks.contains_key(field_id) =>
{
let centroids = &trained.centroids[field_id];
let codebook = &trained.codebooks[field_id];
let mut index =
super::super::ann_build::new_scann(dim, centroids, codebook);
for (i, (doc_id, ordinal)) in builder.doc_ids.iter().enumerate() {
let v = &builder.vectors[i * dim..(i + 1) * dim];
index.add_vector(centroids, codebook, *doc_id, *ordinal, v);
}
super::super::ann_build::serialize_scann(index, codebook)
.map(|b| (super::super::ann_build::SCANN_TYPE, b))
}
// No ANN index for this field; its flat data is still written below.
_ => return None,
};
match blob {
Ok((index_type, bytes)) => {
log::info!(
"[segment_build] built ANN(type={}) for field {} ({} vectors, {} bytes)",
index_type,
field_id,
builder.doc_ids.len(),
bytes.len()
);
Some((*field_id, index_type, bytes))
}
Err(e) => {
log::warn!(
"[segment_build] ANN serialize failed for field {}: {}",
field_id,
e
);
None
}
}
};
fields.par_iter().filter_map(ann_blob_fn).collect()
} else {
Vec::new()
};
#[cfg(not(feature = "native"))]
let ann_blobs: Vec<(u32, u8, Vec<u8>)> = {
// Trained structures are only consumed by the native ANN build.
let _ = trained;
Vec::new()
};
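// Stream each f32 field's flat vector data and record its TOC entry.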
for (i, (field_id, builder)) in fields.into_iter().enumerate() {
let data_offset = current_offset;
FlatVectorData::serialize_binary_from_flat_streaming(
builder.dim,
&builder.vectors,
&builder.doc_ids,
quants[i],
writer,
)
.map_err(crate::Error::Io)?;
current_offset += field_sizes[i] as u64;
toc.push(DenseVectorTocEntry {
field_id,
index_type: super::super::ann_build::FLAT_TYPE,
offset: data_offset,
size: field_sizes[i] as u64,
});
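// Pad to an 8-byte boundary so the next blob starts aligned.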
let pad = (8 - (current_offset % 8)) % 8;
if pad > 0 {
writer.write_all(&[0u8; 8][..pad as usize])?;
current_offset += pad;
}
}
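// Append the pre-built ANN blobs after the flat data.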
for (field_id, index_type, blob) in ann_blobs {
let data_offset = current_offset;
let blob_len = blob.len() as u64;
writer.write_all(&blob)?;
current_offset += blob_len;
toc.push(DenseVectorTocEntry {
field_id,
index_type,
offset: data_offset,
size: blob_len,
});
let pad = (8 - (current_offset % 8)) % 8;
if pad > 0 {
writer.write_all(&[0u8; 8][..pad as usize])?;
current_offset += pad;
}
}
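// Stream the bit-packed binary fields; their size is the header plus the
// packed data plus the doc-id table.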
for (field_id, builder) in binary_fields.into_iter() {
let data_offset = current_offset;
let byte_len = builder.byte_len;
let num_vectors = builder.len();
let data_size = crate::segment::format::FLAT_BINARY_HEADER_SIZE
+ num_vectors * byte_len
+ num_vectors * crate::segment::format::DOC_ID_ENTRY_SIZE;
FlatVectorData::serialize_binary_from_bits_streaming(
builder.dim_bits,
&builder.vectors,
&builder.doc_ids,
writer,
)
.map_err(crate::Error::Io)?;
current_offset += data_size as u64;
toc.push(DenseVectorTocEntry {
field_id,
index_type: super::super::ann_build::FLAT_TYPE,
offset: data_offset,
size: data_size as u64,
});
let pad = (8 - (current_offset % 8)) % 8;
if pad > 0 {
writer.write_all(&[0u8; 8][..pad as usize])?;
current_offset += pad;
}
}
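// Close the section with the TOC and footer so readers can locate each blob.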
write_dense_toc_and_footer(writer, current_offset, &toc)?;
Ok(())
}