use crate::lexical::core::field::FieldValue;
use std::sync::Arc;
use ahash::AHashSet;
use crate::error::{LaurusError, Result};
use crate::lexical::core::document::Document;
use crate::lexical::index::inverted::core::posting::TermPostingIndex;
use crate::lexical::index::inverted::reader::InvertedIndexReader;
use crate::lexical::index::inverted::segment::SegmentInfo;
use crate::lexical::index::inverted::segment::manager::{
ManagedSegmentInfo, MergeCandidate, MergeStrategy,
};
use crate::lexical::index::structures::dictionary::TermDictionaryBuilder;
use crate::lexical::index::structures::dictionary::TermInfo;
use crate::lexical::reader::LexicalIndexReader;
use crate::storage::Storage;
use crate::storage::structured::StructWriter;
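/// Configuration options controlling how segments are merged.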
#[derive(Debug, Clone)]
pub struct MergeConfig {
    /// Approximate memory budget for merge buffers, in megabytes.
    pub max_memory_mb: u64,
    /// Number of documents re-indexed per batch.
    pub batch_size: usize,
    /// Whether merged segment files should be compressed.
    pub enable_compression: bool,
    /// Whether documents marked as deleted are dropped instead of copied.
    pub remove_deleted_docs: bool,
    /// Whether merged documents are sorted by doc id before writing.
    pub sort_by_doc_id: bool,
    /// Whether to re-open and sanity-check the segment after merging.
    pub verify_after_merge: bool,
}
impl Default for MergeConfig {
fn default() -> Self {
MergeConfig {
max_memory_mb: 256,
batch_size: 10000,
enable_compression: true,
remove_deleted_docs: true,
sort_by_doc_id: true,
verify_after_merge: true,
}
}
}
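/// Statistics gathered over the course of a single merge.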
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of segments that went into the merge.
    pub segments_merged: usize,
    /// Live documents carried into the new segment.
    pub docs_processed: u64,
    /// Deleted documents dropped during the merge.
    pub deleted_docs_removed: u64,
    /// Combined size of the input segments, in bytes.
    pub size_before: u64,
    /// Size of the merged segment, in bytes.
    pub size_after: u64,
    /// Wall-clock duration of the merge, in milliseconds.
    pub merge_time_ms: u64,
    /// `size_after / size_before`; `1.0` when `size_before` is zero.
    pub compression_ratio: f64,
    /// Distinct terms in the merged index.
    pub terms_merged: u64,
    /// Documents tracked by the merged posting index.
    pub postings_merged: u64,
    /// Shard the merged segment belongs to.
    pub shard_id: u16,
}
impl MergeStats {
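    /// Space reclaimed by the merge, as a percentage of the pre-merge size;
    /// returns `0.0` when there was no input data or the merge did not shrink it.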
pub fn space_savings(&self) -> f64 {
if self.size_before == 0 {
0.0
} else {
            (self.size_before.saturating_sub(self.size_after) as f64 / self.size_before as f64)
                * 100.0
}
}
}
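/// Outcome of a merge: the new segment's metadata, merge statistics, and the
/// paths of the files written.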
#[derive(Debug)]
pub struct MergeResult {
pub new_segment: ManagedSegmentInfo,
pub stats: MergeStats,
pub file_paths: Vec<String>,
}
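/// Executes segment merges against a storage backend according to a
/// `MergeConfig` and the strategy carried by each `MergeCandidate`.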
#[derive(Debug)]
pub struct MergeEngine {
config: MergeConfig,
storage: Arc<dyn Storage>,
}
impl MergeEngine {
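    /// Creates a merge engine with the given configuration and storage backend.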
pub fn new(config: MergeConfig, storage: Arc<dyn Storage>) -> Self {
MergeEngine { config, storage }
}
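    /// Merges the segments referenced by `candidate` into a single new segment
    /// named `merged_{next_generation}`, dispatching on the candidate's merge
    /// strategy. Returns an error if none of the candidate ids match `segments`.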
pub fn merge_segments(
&self,
candidate: &MergeCandidate,
segments: &[ManagedSegmentInfo],
next_generation: u64,
) -> Result<MergeResult> {
let start_millis = crate::util::time::now_millis();
let segments_to_merge: Vec<_> = segments
.iter()
.filter(|seg| candidate.segments.contains(&seg.segment_info.segment_id))
.collect();
if segments_to_merge.is_empty() {
return Err(LaurusError::index("No segments found to merge"));
}
let new_segment_id = format!("merged_{next_generation}");
let mut stats = MergeStats {
segments_merged: segments_to_merge.len(),
size_before: segments_to_merge.iter().map(|s| s.size_bytes).sum(),
..Default::default()
};
let merge_result = match candidate.strategy {
MergeStrategy::SizeBased => self.merge_by_size(&segments_to_merge, &new_segment_id)?,
MergeStrategy::DeletionBased => {
self.merge_by_deletion(&segments_to_merge, &new_segment_id)?
}
MergeStrategy::TimeBased => self.merge_by_time(&segments_to_merge, &new_segment_id)?,
MergeStrategy::Balanced => self.merge_balanced(&segments_to_merge, &new_segment_id)?,
};
let end_millis = crate::util::time::now_millis();
stats.merge_time_ms = end_millis.saturating_sub(start_millis);
stats.size_after = merge_result.new_segment.size_bytes;
stats.compression_ratio = if stats.size_before > 0 {
stats.size_after as f64 / stats.size_before as f64
} else {
1.0
};
let mut final_result = merge_result;
final_result.stats = stats;
if self.config.verify_after_merge {
self.verify_merged_segment(&final_result.new_segment)?;
}
Ok(final_result)
}
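    /// Size-based strategy: merges the smallest segments first.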
fn merge_by_size(
&self,
segments: &[&ManagedSegmentInfo],
new_segment_id: &str,
) -> Result<MergeResult> {
let mut sorted_segments = segments.to_vec();
sorted_segments.sort_by_key(|s| s.size_bytes);
self.perform_merge(&sorted_segments, new_segment_id)
}
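    /// Deletion-based strategy: merges the segments with the highest deletion
    /// ratio first.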
fn merge_by_deletion(
&self,
segments: &[&ManagedSegmentInfo],
new_segment_id: &str,
) -> Result<MergeResult> {
let mut sorted_segments = segments.to_vec();
        sorted_segments.sort_by(|a, b| b.deletion_ratio().total_cmp(&a.deletion_ratio()));
self.perform_merge(&sorted_segments, new_segment_id)
}
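    /// Time-based strategy: merges the oldest segments first.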
fn merge_by_time(
&self,
segments: &[&ManagedSegmentInfo],
new_segment_id: &str,
) -> Result<MergeResult> {
let mut sorted_segments = segments.to_vec();
sorted_segments.sort_by_key(|s| s.created_at);
self.perform_merge(&sorted_segments, new_segment_id)
}
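    /// Balanced strategy: ranks segments by a composite score that favours
    /// small, heavily deleted, and old segments.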
fn merge_balanced(
&self,
segments: &[&ManagedSegmentInfo],
new_segment_id: &str,
) -> Result<MergeResult> {
let mut scored_segments: Vec<_> = segments
.iter()
.map(|seg| {
                let size_score = 1.0 / (seg.size_bytes as f64 + 1.0);
                let deletion_score = seg.deletion_ratio() * 2.0;
                let age_score = 1.0 / (seg.created_at as f64 + 1.0);
                let composite_score = size_score + deletion_score + age_score;
(*seg, composite_score)
})
.collect();
        scored_segments.sort_by(|a, b| b.1.total_cmp(&a.1));
let sorted_segments: Vec<_> = scored_segments.into_iter().map(|(seg, _)| seg).collect();
self.perform_merge(&sorted_segments, new_segment_id)
}
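    /// Core merge routine shared by all strategies: collects deletion bitmaps,
    /// streams the surviving documents into a fresh in-memory index, and
    /// writes the result out as a new segment.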
fn perform_merge(
&self,
segments: &[&ManagedSegmentInfo],
new_segment_id: &str,
) -> Result<MergeResult> {
let mut stats = MergeStats {
segments_merged: segments.len(),
shard_id: segments
.first()
.map(|s| s.segment_info.shard_id)
.unwrap_or(0),
..Default::default()
};
let mut merged_index = TermPostingIndex::new();
let mut all_documents = Vec::new();
let mut deleted_doc_ids = AHashSet::<u64>::new();
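        // Phase 1: collect deleted doc ids from each segment's deletion bitmap.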
for segment in segments {
if segment.segment_info.has_deletions {
let bitmap_file = format!("{}.delmap", segment.segment_info.segment_id);
if let Ok(input) = self.storage.open_input(&bitmap_file) {
use crate::maintenance::deletion::DeletionBitmap;
use crate::storage::structured::StructReader;
if let Ok(mut reader) = StructReader::new(input)
&& let Ok(bitmap) = DeletionBitmap::read_from_storage(&mut reader)
{
for doc_id in bitmap.get_deleted_docs() {
deleted_doc_ids.insert(doc_id);
}
}
}
}
}
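        // Phase 2: stream the surviving documents from each segment and
        // re-index them in batches.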
for segment in segments {
let segment_reader = self.load_segment_reader(&segment.segment_info)?;
let mut batch_docs = Vec::new();
for doc_id in segment_reader.doc_ids()? {
if self.config.remove_deleted_docs && deleted_doc_ids.contains(&doc_id) {
stats.deleted_docs_removed += 1;
continue;
}
if let Some(document) = segment_reader.document(doc_id)? {
batch_docs.push((doc_id, document));
                    if batch_docs.len() >= self.config.batch_size {
                        self.process_document_batch(&mut merged_index, &batch_docs)?;
                        stats.docs_processed += batch_docs.len() as u64;
                        // `append` drains `batch_docs`, leaving it empty for the next batch.
                        all_documents.append(&mut batch_docs);
                    }
}
}
if !batch_docs.is_empty() {
                self.process_document_batch(&mut merged_index, &batch_docs)?;
stats.docs_processed += batch_docs.len() as u64;
all_documents.extend(batch_docs);
}
}
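        // Phase 3: order the surviving documents and assemble the new segment.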
if self.config.sort_by_doc_id {
all_documents.sort_by_key(|(doc_id, _)| *doc_id);
}
let min_doc_id = all_documents.iter().map(|(id, _)| *id).min().unwrap_or(0);
let max_doc_id = all_documents.iter().map(|(id, _)| *id).max().unwrap_or(0);
let segment_info = SegmentInfo {
segment_id: new_segment_id.to_string(),
doc_count: all_documents.len() as u64,
min_doc_id,
max_doc_id,
            // Deleted documents were filtered out above (when `remove_deleted_docs`
            // is set), so the merged segment starts clean; the merge generation is
            // carried in the segment id rather than in this field.
            generation: 0,
            has_deletions: false,
            shard_id: stats.shard_id,
};
let file_paths = self.write_merged_segment(&segment_info, &merged_index, &all_documents)?;
let size_bytes = file_paths
.iter()
.map(|path| {
self.storage
.metadata(path)
.map(|meta| meta.size)
.unwrap_or(0)
})
.sum();
let mut managed_info = ManagedSegmentInfo::new(segment_info);
managed_info.size_bytes = size_bytes;
managed_info.file_paths = file_paths.clone();
stats.terms_merged = merged_index.term_count();
stats.postings_merged = merged_index.doc_count();
Ok(MergeResult {
new_segment: managed_info,
stats,
file_paths,
})
}
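    /// Opens an `InvertedIndexReader` scoped to a single segment.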
fn load_segment_reader(
&self,
segment_info: &SegmentInfo,
) -> Result<Box<dyn LexicalIndexReader>> {
let segments = vec![segment_info.clone()];
let config = crate::lexical::index::inverted::reader::InvertedIndexReaderConfig::default();
let reader = InvertedIndexReader::new(segments, self.storage.clone(), config)?;
Ok(Box::new(reader) as Box<dyn LexicalIndexReader>)
}
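    /// Adds a batch of documents to the in-memory merged index.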
fn process_document_batch(
&self,
merged_index: &mut TermPostingIndex,
        documents: &[(u64, Document)],
) -> Result<()> {
for (doc_id, document) in documents {
            // NOTE: simplified re-indexing. Each field *name* is recorded as a
            // single term with frequency 1 and no positions; field values are
            // not re-analyzed here.
            let document_terms: Vec<(String, u32, Option<Vec<u32>>)> = document
                .fields
                .keys()
                .map(|field_name| (field_name.clone(), 1, None))
                .collect();
merged_index.add_document(*doc_id, document_terms);
}
Ok(())
}
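    /// Writes the merged segment to storage as three files: the posting index
    /// (`.idx`), the term dictionary (`.dict`), and the document store
    /// (`.docs`). Returns the paths of the files created.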
fn write_merged_segment(
&self,
segment_info: &SegmentInfo,
merged_index: &TermPostingIndex,
documents: &[(u64, Document)],
) -> Result<Vec<String>> {
let mut file_paths = Vec::new();
let index_file = format!("{}.idx", segment_info.segment_id);
{
let output = self.storage.create_output(&index_file)?;
let mut writer = StructWriter::new(output);
merged_index.write_to_storage(&mut writer)?;
writer.close()?;
file_paths.push(index_file);
}
let dict_file = format!("{}.dict", segment_info.segment_id);
{
let output = self.storage.create_output(&dict_file)?;
let mut writer = StructWriter::new(output);
let mut builder = TermDictionaryBuilder::new();
for term in merged_index.terms() {
                // Terms come from the merged index itself, so the posting list
                // should always be present; skip defensively if it is not.
                let Some(postings) = merged_index.get_posting_list(term) else {
                    continue;
                };
                let term_info = TermInfo::new(
                    0,                          // posting offset placeholder
                    postings.len() as u64 * 16, // estimated size, assuming ~16 bytes per posting
                    postings.len() as u64,      // document frequency
                    postings.iter().map(|p| p.frequency as u64).sum(), // total term frequency
                );
                builder.add_term(term.clone(), term_info);
}
let dictionary = builder.build_sorted();
dictionary.write_to_storage(&mut writer)?;
writer.close()?;
file_paths.push(dict_file);
}
let docs_file = format!("{}.docs", segment_info.segment_id);
{
let output = self.storage.create_output(&docs_file)?;
let mut writer = StructWriter::new(output);
writer.write_varint(documents.len() as u64)?;
for (doc_id, document) in documents {
writer.write_u64(*doc_id)?;
writer.write_varint(document.fields.len() as u64)?;
for (field_name, field_value) in &document.fields {
writer.write_string(field_name)?;
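                    // Non-text values are serialized as display strings, which is
                    // lossy for `Bytes` and `Vector` fields.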
let field_str = match field_value {
FieldValue::Text(s) => s.clone(),
FieldValue::Int64(i) => i.to_string(),
FieldValue::Float64(f) => f.to_string(),
FieldValue::Bool(b) => b.to_string(),
FieldValue::Bytes(data, mime) => {
format!("[blob: {:?} ({} bytes)]", mime, data.len())
}
FieldValue::DateTime(dt) => dt.to_rfc3339(),
FieldValue::Geo(lat, lon) => {
format!("{},{}", lat, lon)
}
FieldValue::Vector(v) => format!("[vector: {} dims]", v.len()),
FieldValue::Null => "null".to_string(),
};
writer.write_string(&field_str)?;
}
}
writer.close()?;
file_paths.push(docs_file);
}
Ok(file_paths)
}
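    /// Re-opens the merged segment and checks that its document count matches
    /// its metadata. This is a shallow check; postings are not verified.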
fn verify_merged_segment(&self, segment: &ManagedSegmentInfo) -> Result<()> {
let reader = self.load_segment_reader(&segment.segment_info)?;
if reader.doc_count() != segment.segment_info.doc_count {
return Err(LaurusError::index("Document count mismatch after merge"));
}
Ok(())
}
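    /// Returns the engine's merge configuration.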
pub fn get_config(&self) -> &MergeConfig {
&self.config
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::lexical::index::inverted::segment::SegmentInfo;
use crate::lexical::index::inverted::segment::manager::ManagedSegmentInfo;
use crate::storage::memory::MemoryStorage;
use crate::storage::memory::MemoryStorageConfig;
#[allow(dead_code)]
fn create_test_segment(id: &str, doc_count: u64) -> ManagedSegmentInfo {
let segment_info = SegmentInfo {
segment_id: id.to_string(),
doc_count,
min_doc_id: 0,
max_doc_id: doc_count.saturating_sub(1),
generation: 1,
has_deletions: false,
            shard_id: 0,
        };
ManagedSegmentInfo::new(segment_info)
}
#[test]
fn test_merge_engine_creation() {
let config = MergeConfig::default();
let storage = Arc::new(MemoryStorage::new(MemoryStorageConfig::default()));
let engine = MergeEngine::new(config, storage);
assert_eq!(engine.config.batch_size, 10000);
assert!(engine.config.remove_deleted_docs);
}
#[test]
fn test_merge_config_default() {
let config = MergeConfig::default();
assert_eq!(config.max_memory_mb, 256);
assert_eq!(config.batch_size, 10000);
assert!(config.enable_compression);
assert!(config.remove_deleted_docs);
assert!(config.sort_by_doc_id);
assert!(config.verify_after_merge);
}
#[test]
fn test_merge_stats_space_savings() {
let stats = MergeStats {
size_before: 1000,
size_after: 800,
..Default::default()
};
assert_eq!(stats.space_savings(), 20.0);
let stats_zero = MergeStats {
size_before: 0,
size_after: 0,
..Default::default()
};
assert_eq!(stats_zero.space_savings(), 0.0);
}
}