use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::Arc;
use rustc_hash::FxHashMap;
use super::builder::{SegmentBuilder, SegmentBuilderConfig};
use super::reader::SegmentReader;
use super::store::StoreMerger;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{FieldType, Schema};
use crate::structures::{
BlockPostingList, PostingList, RaBitQConfig, RaBitQIndex, SSTableWriter, TERMINATED, TermInfo,
};
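/// Statistics collected while merging segments: counters for processed
/// terms, peak/current memory estimates, and the byte sizes of each output
/// file (term dictionary, postings, store, vectors).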
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
pub terms_processed: usize,
pub postings_merged: usize,
pub peak_memory_bytes: usize,
pub current_memory_bytes: usize,
pub term_dict_bytes: usize,
pub postings_bytes: usize,
pub store_bytes: usize,
pub vectors_bytes: usize,
}
impl MergeStats {
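/// Formats a byte count as a human-readable string (B, KB, MB, or GB).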
pub fn format_memory(bytes: usize) -> String {
if bytes >= 1024 * 1024 * 1024 {
format!("{:.2} GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
} else if bytes >= 1024 * 1024 {
format!("{:.2} MB", bytes as f64 / (1024.0 * 1024.0))
} else if bytes >= 1024 {
format!("{:.2} KB", bytes as f64 / 1024.0)
} else {
format!("{} B", bytes)
}
}
}
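/// Heap entry for the k-way merge over per-segment term dictionaries.
/// `Ord` compares keys in reverse so that `BinaryHeap` (a max-heap) pops the
/// smallest term key first; `doc_offset` is added to the segment's local doc
/// ids to remap them into the merged segment's doc id space.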
struct MergeEntry {
key: Vec<u8>,
term_info: TermInfo,
segment_idx: usize,
doc_offset: u32,
}
impl PartialEq for MergeEntry {
fn eq(&self, other: &Self) -> bool {
self.key == other.key
}
}
impl Eq for MergeEntry {}
impl PartialOrd for MergeEntry {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for MergeEntry {
fn cmp(&self, other: &Self) -> Ordering {
other.key.cmp(&self.key)
}
}
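/// Merges multiple segments into a single new segment, combining term
/// dictionaries, postings, document stores, and dense-vector indexes.
///
/// A minimal usage sketch; names like `dir`, `readers`, and `next_id` are
/// assumed to come from the surrounding index machinery:
///
/// ```ignore
/// let merger = SegmentMerger::new(schema.clone());
/// let meta = merger.merge(&dir, &readers, next_id).await?;
/// ```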
pub struct SegmentMerger {
schema: Arc<Schema>,
}
impl SegmentMerger {
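/// Creates a merger for the given schema.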
pub fn new(schema: Arc<Schema>) -> Self {
Self { schema }
}
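/// Merges `segments` into a new segment identified by `new_segment_id`,
/// discarding the collected [`MergeStats`].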
pub async fn merge<D: Directory + DirectoryWriter>(
&self,
dir: &D,
segments: &[SegmentReader],
new_segment_id: SegmentId,
) -> Result<SegmentMeta> {
let (meta, _stats) = self.merge_with_stats(dir, segments, new_segment_id).await?;
Ok(meta)
}
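/// Merges `segments` and returns both the new segment's metadata and the
/// collected [`MergeStats`]. Uses the optimized path (store stacking and
/// posting-list concatenation) when no segment store has a dictionary and no
/// field is indexed with positions; otherwise falls back to a full rebuild
/// through [`SegmentBuilder`].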
pub async fn merge_with_stats<D: Directory + DirectoryWriter>(
&self,
dir: &D,
segments: &[SegmentReader],
new_segment_id: SegmentId,
) -> Result<(SegmentMeta, MergeStats)> {
let can_stack_stores = segments.iter().all(|s| !s.store_has_dict());
let has_positions = self
.schema
.fields()
.any(|(_, entry)| entry.positions.is_some());
if can_stack_stores && !has_positions {
self.merge_optimized_with_stats(dir, segments, new_segment_id)
.await
} else {
self.merge_rebuild_with_stats(dir, segments, new_segment_id)
.await
}
}
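/// Optimized merge path: k-way merges the term dictionaries and postings,
/// stacks the document stores block-by-block, and merges dense-vector
/// indexes without re-adding documents.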
async fn merge_optimized_with_stats<D: Directory + DirectoryWriter>(
&self,
dir: &D,
segments: &[SegmentReader],
new_segment_id: SegmentId,
) -> Result<(SegmentMeta, MergeStats)> {
let mut stats = MergeStats::default();
let files = SegmentFiles::new(new_segment_id.0);
let mut term_dict_data = Vec::new();
let mut postings_data = Vec::new();
let terms_processed = self
.merge_postings_with_stats(
segments,
&mut term_dict_data,
&mut postings_data,
&mut stats,
)
.await?;
stats.terms_processed = terms_processed;
stats.term_dict_bytes = term_dict_data.len();
stats.postings_bytes = postings_data.len();
let current_mem = term_dict_data.capacity() + postings_data.capacity();
stats.current_memory_bytes = current_mem;
stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);
let mut store_data = Vec::new();
{
let mut store_merger = StoreMerger::new(&mut store_data);
for segment in segments {
let raw_blocks = segment.store_raw_blocks();
let data_slice = segment.store_data_slice();
store_merger.append_store(data_slice, &raw_blocks).await?;
}
store_merger.finish()?;
}
stats.store_bytes = store_data.len();
let current_mem =
term_dict_data.capacity() + postings_data.capacity() + store_data.capacity();
stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);
dir.write(&files.term_dict, &term_dict_data).await?;
dir.write(&files.postings, &postings_data).await?;
dir.write(&files.store, &store_data).await?;
drop(term_dict_data);
drop(postings_data);
drop(store_data);
let vectors_bytes = self
.merge_dense_vectors_with_stats(dir, segments, &files)
.await?;
stats.vectors_bytes = vectors_bytes;
let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
for segment in segments {
for (&field_id, field_stats) in &segment.meta().field_stats {
let entry = merged_field_stats.entry(field_id).or_default();
entry.total_tokens += field_stats.total_tokens;
entry.doc_count += field_stats.doc_count;
}
}
let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
let meta = SegmentMeta {
id: new_segment_id.0,
num_docs: total_docs,
field_stats: merged_field_stats,
};
dir.write(&files.meta, &meta.serialize()?).await?;
log::info!(
"Merge complete: {} terms, output: term_dict={}, postings={}, store={}, vectors={}",
stats.terms_processed,
MergeStats::format_memory(stats.term_dict_bytes),
MergeStats::format_memory(stats.postings_bytes),
MergeStats::format_memory(stats.store_bytes),
MergeStats::format_memory(stats.vectors_bytes),
);
Ok((meta, stats))
}
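/// Fallback merge path: re-adds every document from every source segment
/// into a fresh [`SegmentBuilder`] and builds the new segment from scratch.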
async fn merge_rebuild_with_stats<D: Directory + DirectoryWriter>(
&self,
dir: &D,
segments: &[SegmentReader],
new_segment_id: SegmentId,
) -> Result<(SegmentMeta, MergeStats)> {
let mut stats = MergeStats::default();
let mut builder =
SegmentBuilder::new((*self.schema).clone(), SegmentBuilderConfig::default())?;
for segment in segments {
for doc_id in 0..segment.num_docs() {
if let Some(doc) = segment.doc(doc_id).await? {
builder.add_document(doc)?;
}
if doc_id.is_multiple_of(10_000) {
let builder_stats = builder.stats();
stats.current_memory_bytes = builder_stats.estimated_memory_bytes;
stats.peak_memory_bytes =
stats.peak_memory_bytes.max(stats.current_memory_bytes);
}
}
}
let meta = builder.build(dir, new_segment_id).await?;
Ok((meta, stats))
}
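/// K-way merges the term dictionaries of all segments using a min-heap of
/// [`MergeEntry`] values. Terms present in a single segment have their
/// postings copied with remapped doc ids; terms present in several segments
/// have their postings merged. The merged term dictionary is written into
/// `term_dict`, posting data into `postings_out`; returns the number of
/// terms processed.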
async fn merge_postings_with_stats(
&self,
segments: &[SegmentReader],
term_dict: &mut Vec<u8>,
postings_out: &mut Vec<u8>,
stats: &mut MergeStats,
) -> Result<usize> {
let mut doc_offsets = Vec::with_capacity(segments.len());
let mut offset = 0u32;
for segment in segments {
doc_offsets.push(offset);
offset += segment.num_docs();
}
let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();
let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
for (seg_idx, iter) in iterators.iter_mut().enumerate() {
if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
heap.push(MergeEntry {
key,
term_info,
segment_idx: seg_idx,
doc_offset: doc_offsets[seg_idx],
});
}
}
let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::new();
let mut terms_processed = 0usize;
while !heap.is_empty() {
let first = heap.pop().unwrap();
let current_key = first.key.clone();
let mut sources: Vec<(usize, TermInfo, u32)> =
vec![(first.segment_idx, first.term_info, first.doc_offset)];
if let Some((key, term_info)) = iterators[first.segment_idx]
.next()
.await
.map_err(crate::Error::from)?
{
heap.push(MergeEntry {
key,
term_info,
segment_idx: first.segment_idx,
doc_offset: doc_offsets[first.segment_idx],
});
}
while let Some(entry) = heap.peek() {
if entry.key != current_key {
break;
}
let entry = heap.pop().unwrap();
sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));
if let Some((key, term_info)) = iterators[entry.segment_idx]
.next()
.await
.map_err(crate::Error::from)?
{
heap.push(MergeEntry {
key,
term_info,
segment_idx: entry.segment_idx,
doc_offset: doc_offsets[entry.segment_idx],
});
}
}
let term_info = if sources.len() == 1 {
let (seg_idx, source_info, seg_doc_offset) = &sources[0];
self.copy_term_posting(
&segments[*seg_idx],
source_info,
*seg_doc_offset,
postings_out,
)
.await?
} else {
self.merge_term_postings(segments, &sources, postings_out)
.await?
};
term_results.push((current_key, term_info));
terms_processed += 1;
if terms_processed.is_multiple_of(100_000) {
log::debug!("Merge progress: {} terms processed", terms_processed);
}
}
log::info!(
"Merge complete: {} terms processed from {} segments",
terms_processed,
segments.len()
);
let results_mem = term_results.capacity() * std::mem::size_of::<(Vec<u8>, TermInfo)>();
stats.current_memory_bytes = results_mem + postings_out.capacity();
stats.peak_memory_bytes = stats.peak_memory_bytes.max(stats.current_memory_bytes);
let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
for (key, term_info) in term_results {
writer.insert(&key, &term_info)?;
}
writer.finish()?;
Ok(terms_processed)
}
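/// Copies a term's postings from a single source segment, remapping doc ids
/// by `doc_offset`. Small postings are re-inlined into the [`TermInfo`] when
/// possible; otherwise the block posting list is re-encoded and appended to
/// `postings_out`.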
async fn copy_term_posting(
&self,
segment: &SegmentReader,
source_info: &TermInfo,
doc_offset: u32,
postings_out: &mut Vec<u8>,
) -> Result<TermInfo> {
if let Some((doc_ids, term_freqs)) = source_info.decode_inline() {
let remapped_ids: Vec<u32> = doc_ids.iter().map(|&id| id + doc_offset).collect();
if let Some(inline) = TermInfo::try_inline(&remapped_ids, &term_freqs) {
return Ok(inline);
}
let mut pl = PostingList::with_capacity(remapped_ids.len());
for (doc_id, tf) in remapped_ids.into_iter().zip(term_freqs.into_iter()) {
pl.push(doc_id, tf);
}
let posting_offset = postings_out.len() as u64;
let block_list = BlockPostingList::from_posting_list(&pl)?;
let mut encoded = Vec::new();
block_list.serialize(&mut encoded)?;
postings_out.extend_from_slice(&encoded);
return Ok(TermInfo::external(
posting_offset,
encoded.len() as u32,
pl.doc_count(),
));
}
let (offset, len) = source_info.external_info().unwrap();
let posting_bytes = segment.read_postings(offset, len).await?;
let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
let mut remapped = PostingList::with_capacity(source_postings.doc_count() as usize);
let mut iter = source_postings.iterator();
while iter.doc() != TERMINATED {
remapped.add(iter.doc() + doc_offset, iter.term_freq());
iter.advance();
}
let doc_ids: Vec<u32> = remapped.iter().map(|p| p.doc_id).collect();
let term_freqs: Vec<u32> = remapped.iter().map(|p| p.term_freq).collect();
if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
return Ok(inline);
}
let posting_offset = postings_out.len() as u64;
let block_list = BlockPostingList::from_posting_list(&remapped)?;
let mut encoded = Vec::new();
block_list.serialize(&mut encoded)?;
postings_out.extend_from_slice(&encoded);
Ok(TermInfo::external(
posting_offset,
encoded.len() as u32,
remapped.doc_count(),
))
}
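/// Merges a term's postings from several source segments. When every source
/// is an external block posting list, the blocks are concatenated directly;
/// otherwise the postings are decoded, remapped, and re-encoded (or inlined
/// if small enough).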
async fn merge_term_postings(
&self,
segments: &[SegmentReader],
sources: &[(usize, TermInfo, u32)],
postings_out: &mut Vec<u8>,
) -> Result<TermInfo> {
let mut sorted_sources: Vec<_> = sources.to_vec();
sorted_sources.sort_by_key(|(_, _, doc_offset)| *doc_offset);
let all_external = sorted_sources
.iter()
.all(|(_, term_info, _)| term_info.external_info().is_some());
if all_external && sorted_sources.len() > 1 {
let mut block_sources = Vec::with_capacity(sorted_sources.len());
for (seg_idx, term_info, doc_offset) in &sorted_sources {
let segment = &segments[*seg_idx];
let (offset, len) = term_info.external_info().unwrap();
let posting_bytes = segment.read_postings(offset, len).await?;
let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
block_sources.push((source_postings, *doc_offset));
}
let merged_blocks = BlockPostingList::concatenate_blocks(&block_sources)?;
let posting_offset = postings_out.len() as u64;
let mut encoded = Vec::new();
merged_blocks.serialize(&mut encoded)?;
postings_out.extend_from_slice(&encoded);
return Ok(TermInfo::external(
posting_offset,
encoded.len() as u32,
merged_blocks.doc_count(),
));
}
let mut merged = PostingList::new();
for (seg_idx, term_info, doc_offset) in &sorted_sources {
let segment = &segments[*seg_idx];
if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
merged.add(doc_id + doc_offset, tf);
}
} else {
let (offset, len) = term_info.external_info().unwrap();
let posting_bytes = segment.read_postings(offset, len).await?;
let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
let mut iter = source_postings.iterator();
while iter.doc() != TERMINATED {
merged.add(iter.doc() + doc_offset, iter.term_freq());
iter.advance();
}
}
}
let doc_ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
let term_freqs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();
if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
return Ok(inline);
}
let posting_offset = postings_out.len() as u64;
let block_list = BlockPostingList::from_posting_list(&merged)?;
let mut encoded = Vec::new();
block_list.serialize(&mut encoded)?;
postings_out.extend_from_slice(&encoded);
Ok(TermInfo::external(
posting_offset,
encoded.len() as u32,
merged.doc_count(),
))
}
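/// Merges dense-vector indexes for the optimized path. Tries to merge
/// existing ScaNN (IVF-PQ) indexes first, then IVF-RaBitQ indexes, and
/// finally rebuilds a flat RaBitQ index from raw vectors. Returns the number
/// of bytes written to the vectors file.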
async fn merge_dense_vectors_with_stats<D: Directory + DirectoryWriter>(
&self,
dir: &D,
segments: &[SegmentReader],
files: &SegmentFiles,
) -> Result<usize> {
use byteorder::{LittleEndian, WriteBytesExt};
let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();
for (field, entry) in self.schema.fields() {
if !matches!(entry.field_type, FieldType::DenseVector) {
continue;
}
let scann_indexes: Vec<_> = segments
.iter()
.filter_map(|s| s.get_scann_vector_index(field))
.collect();
if scann_indexes.len()
== segments
.iter()
.filter(|s| s.has_dense_vector_index(field))
.count()
&& !scann_indexes.is_empty()
{
let refs: Vec<&crate::structures::IVFPQIndex> =
scann_indexes.iter().map(|(idx, _)| idx.as_ref()).collect();
let mut doc_offsets = Vec::with_capacity(segments.len());
let mut offset = 0u32;
for segment in segments {
doc_offsets.push(offset);
offset += segment.num_docs();
}
match crate::structures::IVFPQIndex::merge(&refs, &doc_offsets) {
Ok(merged) => {
let bytes = merged
.to_bytes()
.map_err(|e| crate::Error::Serialization(e.to_string()))?;
field_indexes.push((field.0, 2u8, bytes));
continue;
}
Err(e) => {
log::warn!("ScaNN merge failed: {}, falling back to IVF", e);
}
}
}
let ivf_indexes: Vec<_> = segments
.iter()
.filter_map(|s| s.get_ivf_vector_index(field))
.collect();
if ivf_indexes.len()
== segments
.iter()
.filter(|s| s.has_dense_vector_index(field))
.count()
&& !ivf_indexes.is_empty()
{
let refs: Vec<&crate::structures::IVFRaBitQIndex> =
ivf_indexes.iter().map(|arc| arc.as_ref()).collect();
let mut doc_offsets = Vec::with_capacity(segments.len());
let mut offset = 0u32;
for segment in segments {
doc_offsets.push(offset);
offset += segment.num_docs();
}
match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offsets) {
Ok(merged) => {
let bytes = merged
.to_bytes()
.map_err(|e| crate::Error::Serialization(e.to_string()))?;
field_indexes.push((field.0, 1u8, bytes));
continue;
}
Err(e) => {
log::warn!("IVF merge failed: {}, falling back to rebuild", e);
}
}
}
let mut all_vectors: Vec<Vec<f32>> = Vec::new();
for segment in segments {
if let Some(index) = segment.get_dense_vector_index(field)
&& let Some(raw_vecs) = &index.raw_vectors
{
all_vectors.extend(raw_vecs.iter().cloned());
}
}
if !all_vectors.is_empty() {
let dim = all_vectors[0].len();
let config = RaBitQConfig::new(dim);
let merged_index = RaBitQIndex::build(config, &all_vectors, true);
let index_bytes = serde_json::to_vec(&merged_index)
.map_err(|e| crate::Error::Serialization(e.to_string()))?;
field_indexes.push((field.0, 0u8, index_bytes));
}
}
if !field_indexes.is_empty() {
field_indexes.sort_by_key(|(id, _, _)| *id);
let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
let mut output = Vec::new();
output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;
let mut current_offset = header_size as u64;
for (field_id, index_type, data) in &field_indexes {
output.write_u32::<LittleEndian>(*field_id)?;
output.write_u8(*index_type)?;
output.write_u64::<LittleEndian>(current_offset)?;
output.write_u64::<LittleEndian>(data.len() as u64)?;
current_offset += data.len() as u64;
}
for (_, _, data) in field_indexes {
output.extend_from_slice(&data);
}
let output_size = output.len();
dir.write(&files.vectors, &output).await?;
return Ok(output_size);
}
Ok(0)
}
}
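/// Pre-trained vector quantization structures (coarse centroids and PQ
/// codebooks) keyed by field id, used when merging with ANN index building.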
pub struct TrainedVectorStructures {
pub centroids: FxHashMap<u32, Arc<crate::structures::CoarseCentroids>>,
pub codebooks: FxHashMap<u32, Arc<crate::structures::PQCodebook>>,
}
impl SegmentMerger {
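/// Merges `segments` and builds ANN vector indexes from the supplied
/// pre-trained structures instead of merging the segments' existing vector
/// indexes.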
pub async fn merge_with_ann<D: Directory + DirectoryWriter>(
&self,
dir: &D,
segments: &[SegmentReader],
new_segment_id: SegmentId,
trained: &TrainedVectorStructures,
) -> Result<SegmentMeta> {
let files = SegmentFiles::new(new_segment_id.0);
let mut term_dict_data = Vec::new();
let mut postings_data = Vec::new();
let mut stats = MergeStats::default();
self.merge_postings_with_stats(
segments,
&mut term_dict_data,
&mut postings_data,
&mut stats,
)
.await?;
let mut store_data = Vec::new();
{
let mut store_merger = StoreMerger::new(&mut store_data);
for segment in segments {
let raw_blocks = segment.store_raw_blocks();
let data_slice = segment.store_data_slice();
store_merger.append_store(data_slice, &raw_blocks).await?;
}
store_merger.finish()?;
}
dir.write(&files.term_dict, &term_dict_data).await?;
dir.write(&files.postings, &postings_data).await?;
dir.write(&files.store, &store_data).await?;
drop(term_dict_data);
drop(postings_data);
drop(store_data);
let vectors_bytes = self
.build_ann_vectors(dir, segments, &files, trained)
.await?;
let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
for segment in segments {
for (&field_id, field_stats) in &segment.meta().field_stats {
let entry = merged_field_stats.entry(field_id).or_default();
entry.total_tokens += field_stats.total_tokens;
entry.doc_count += field_stats.doc_count;
}
}
let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
let meta = SegmentMeta {
id: new_segment_id.0,
num_docs: total_docs,
field_stats: merged_field_stats,
};
dir.write(&files.meta, &meta.serialize()?).await?;
log::info!(
"ANN merge complete: {} docs, vectors={}",
total_docs,
MergeStats::format_memory(vectors_bytes)
);
Ok(meta)
}
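/// Builds per-field ANN indexes (IVF-RaBitQ or ScaNN) from the flat vectors
/// of all source segments, using the pre-trained centroids and codebooks.
/// Falls back to storing flat vector data when no trained structures are
/// available for a field. Returns the number of bytes written.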
async fn build_ann_vectors<D: Directory + DirectoryWriter>(
&self,
dir: &D,
segments: &[SegmentReader],
files: &SegmentFiles,
trained: &TrainedVectorStructures,
) -> Result<usize> {
use crate::dsl::VectorIndexType;
use byteorder::{LittleEndian, WriteBytesExt};
let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();
for (field, entry) in self.schema.fields() {
if !matches!(entry.field_type, FieldType::DenseVector) || !entry.indexed {
continue;
}
let config = match &entry.dense_vector_config {
Some(c) => c,
None => continue,
};
let mut all_vectors: Vec<Vec<f32>> = Vec::new();
let mut all_doc_ids: Vec<u32> = Vec::new();
let mut doc_offset = 0u32;
for segment in segments {
if let Some(super::VectorIndex::Flat(flat_data)) =
segment.vector_indexes().get(&field.0)
{
for (vec, &local_doc_id) in
flat_data.vectors.iter().zip(flat_data.doc_ids.iter())
{
all_vectors.push(vec.clone());
all_doc_ids.push(doc_offset + local_doc_id);
}
}
doc_offset += segment.num_docs();
}
if all_vectors.is_empty() {
continue;
}
let dim = config.index_dim();
match config.index_type {
VectorIndexType::IvfRaBitQ => {
if let Some(centroids) = trained.centroids.get(&field.0) {
let rabitq_config = crate::structures::RaBitQConfig::new(dim);
let codebook = crate::structures::RaBitQCodebook::new(rabitq_config);
let ivf_config = crate::structures::IVFRaBitQConfig::new(dim)
.with_store_raw(config.store_raw);
let ivf_index = crate::structures::IVFRaBitQIndex::build(
ivf_config,
centroids,
&codebook,
&all_vectors,
Some(&all_doc_ids),
);
let index_data = super::builder::IVFRaBitQIndexData {
centroids: (**centroids).clone(),
codebook,
index: ivf_index,
};
let bytes = index_data
.to_bytes()
.map_err(|e| crate::Error::Serialization(e.to_string()))?;
field_indexes.push((field.0, 1u8, bytes));
log::info!(
"Built IVF-RaBitQ index for field {} with {} vectors",
field.0,
all_vectors.len()
);
continue;
}
}
VectorIndexType::ScaNN => {
if let (Some(centroids), Some(codebook)) = (
trained.centroids.get(&field.0),
trained.codebooks.get(&field.0),
) {
let ivf_pq_config = crate::structures::IVFPQConfig::new(dim);
let ivf_pq_index = crate::structures::IVFPQIndex::build(
ivf_pq_config,
centroids,
codebook,
&all_vectors,
Some(&all_doc_ids),
);
let index_data = super::builder::ScaNNIndexData {
centroids: (**centroids).clone(),
codebook: (**codebook).clone(),
index: ivf_pq_index,
};
let bytes = index_data
.to_bytes()
.map_err(|e| crate::Error::Serialization(e.to_string()))?;
field_indexes.push((field.0, 2u8, bytes));
log::info!(
"Built ScaNN index for field {} with {} vectors",
field.0,
all_vectors.len()
);
continue;
}
}
_ => {}
}
let flat_data = super::builder::FlatVectorData {
dim,
vectors: all_vectors,
doc_ids: all_doc_ids,
};
let bytes = serde_json::to_vec(&flat_data)
.map_err(|e| crate::Error::Serialization(e.to_string()))?;
field_indexes.push((field.0, 3u8, bytes));
}
if !field_indexes.is_empty() {
field_indexes.sort_by_key(|(id, _, _)| *id);
let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
let mut output = Vec::new();
output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;
let mut current_offset = header_size as u64;
for (field_id, index_type, data) in &field_indexes {
output.write_u32::<LittleEndian>(*field_id)?;
output.write_u8(*index_type)?;
output.write_u64::<LittleEndian>(current_offset)?;
output.write_u64::<LittleEndian>(data.len() as u64)?;
current_offset += data.len() as u64;
}
for (_, _, data) in field_indexes {
output.extend_from_slice(&data);
}
let output_size = output.len();
dir.write(&files.vectors, &output).await?;
return Ok(output_size);
}
Ok(0)
}
}
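/// Best-effort deletion of all files belonging to `segment_id`; failures of
/// individual deletions are ignored.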
pub async fn delete_segment<D: Directory + DirectoryWriter>(
dir: &D,
segment_id: SegmentId,
) -> Result<()> {
let files = SegmentFiles::new(segment_id.0);
let _ = dir.delete(&files.term_dict).await;
let _ = dir.delete(&files.postings).await;
let _ = dir.delete(&files.store).await;
let _ = dir.delete(&files.meta).await;
let _ = dir.delete(&files.vectors).await;
Ok(())
}