use parking_lot::RwLock;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::catalog::Catalog;
use crate::config::EngineConfig;
use crate::error::Result;
use crate::segment::{Segment, SegmentWriter};
use crate::types::*;
/// Coordinates one in-memory mutable segment, a list of sealed on-disk
/// segments, and a set of tombstones marking deleted vectors.
///
/// Every field that changes after construction sits behind a lock or an
/// atomic, so all methods take `&self`.
pub struct LsmManager {
/// Engine configuration; the `lsm.*` thresholds read by this type come from here.
config: EngineConfig,
/// Directory in which sealed segment files are created.
data_dir: PathBuf,
/// The segment currently accepting inserts; `None` until the first insert
/// after construction, a seal, or a flush.
mutable_segment: RwLock<Option<MutableSegment>>,
/// Sealed segments. `insert`/`flush` place newly sealed segments at index 0,
/// while `compact` appends its output at the end.
sealed_segments: RwLock<Vec<Arc<Segment>>>,
/// Next segment id to hand out; starts at 1 and is advanced with `fetch_add`.
next_segment_id: AtomicU64,
/// `(segment id, vector id)` pairs that have been deleted.
tombstones: RwLock<HashSet<(SegmentId, VectorId)>>,
}
impl LsmManager {
/// Builds an empty manager rooted at `data_dir`.
///
/// The manager starts with no mutable segment, no sealed segments, no
/// tombstones, and with segment ids beginning at 1.
pub fn new(config: EngineConfig, data_dir: PathBuf) -> Self {
    let mutable_segment = RwLock::new(None);
    let sealed_segments = RwLock::new(Vec::new());
    let next_segment_id = AtomicU64::new(1);
    let tombstones = RwLock::new(HashSet::new());

    Self {
        config,
        data_dir,
        mutable_segment,
        sealed_segments,
        next_segment_id,
        tombstones,
    }
}
pub fn load_from_catalog(&self, catalog: &Catalog, collection_id: i64) -> Result<()> {
let segment_infos = catalog.get_segments(collection_id)?;
let mut sealed = self.sealed_segments.write();
for info in segment_infos {
if info.state == SegmentState::Sealed {
let segment = Segment::open(&info.path)?;
sealed.push(Arc::new(segment));
}
let current_max = self.next_segment_id.load(Ordering::SeqCst);
if info.id >= current_max {
self.next_segment_id.store(info.id + 1, Ordering::SeqCst);
}
}
for info in catalog.get_segments(collection_id)? {
let tombstone_ids = catalog.get_tombstones(info.id)?;
let mut tombstones = self.tombstones.write();
for vid in tombstone_ids {
tombstones.insert((info.id, vid));
}
}
Ok(())
}
/// Appends `vector` to the mutable segment, creating that segment on demand.
///
/// Returns the id of the segment the vector was written to together with the
/// vector id reported by the segment writer.
///
/// When the mutable segment reaches `config.lsm.max_mutable_size` entries it
/// is sealed to disk and placed at the front of the sealed list. The mutable
/// lock is held until the sealed segment has been published (the same
/// mutable -> sealed lock order `flush` uses), so concurrent sealers cannot
/// publish out of order and the data is never absent from both places.
/// If the sealed list then exceeds `config.lsm.max_segments`, the compaction
/// check is run after all locks have been released.
///
/// # Errors
///
/// Propagates errors from creating the writer, adding the vector, sealing
/// the segment, or the compaction check.
///
/// NOTE(review): the full segment is taken out of the mutable slot before it
/// is sealed, so a seal failure drops the buffered vectors — confirm whether
/// callers rely on a write-ahead log to recover from that.
pub fn insert(&self, vector: &[f32]) -> Result<(SegmentId, VectorId)> {
    let mut mutable = self.mutable_segment.write();

    if mutable.is_none() {
        // Create the writer first so a failure does not consume a segment id.
        let writer = SegmentWriter::new(self.config.clone())?;
        let id = self.next_segment_id.fetch_add(1, Ordering::SeqCst);
        *mutable = Some(MutableSegment { id, writer });
    }

    let seg = mutable
        .as_mut()
        .expect("mutable segment is initialised just above");
    let vid = seg.writer.add(vector)?;
    let current_seg_id = seg.id;

    if seg.writer.len() < self.config.lsm.max_mutable_size {
        return Ok((current_seg_id, vid));
    }

    let full_segment = mutable
        .take()
        .expect("mutable segment is initialised just above");
    let sealed_seg = self.seal_mutable(full_segment)?;

    // Publish while still holding the mutable lock; the sealed guard is
    // scoped so it is released before the compaction check re-reads the list.
    let over_limit = {
        let mut sealed = self.sealed_segments.write();
        sealed.insert(0, sealed_seg);
        sealed.len() > self.config.lsm.max_segments
    };
    drop(mutable);

    if over_limit {
        self.trigger_compaction()?;
    }
    Ok((current_seg_id, vid))
}
/// Marks the vector `vec_id` of segment `segment_id` as deleted by recording
/// a tombstone for it.
///
/// The pair is not checked against existing segments, and recording the same
/// pair twice has no additional effect. Always returns `Ok(())`.
pub fn delete(&self, segment_id: SegmentId, vec_id: VectorId) -> Result<()> {
    self.tombstones.write().insert((segment_id, vec_id));
    Ok(())
}
fn seal_mutable(&self, mutable: MutableSegment) -> Result<Arc<Segment>> {
let path = self.segment_path(mutable.id);
mutable.writer.build(&path)?;
let segment = Segment::open(&path)?;
Ok(Arc::new(segment))
}
/// Returns the file path for segment `seg_id`: the data directory joined
/// with `segment_<id as 16 zero-padded lowercase hex digits>.seg`.
fn segment_path(&self, seg_id: SegmentId) -> PathBuf {
    let file_name = format!("segment_{:016x}.seg", seg_id);
    self.data_dir.join(file_name)
}
/// Seals the current mutable segment, if it holds any vectors.
///
/// Returns `Ok(Some(segment))` with the newly sealed segment (which is also
/// placed at the front of the sealed list), or `Ok(None)` when there was no
/// mutable segment or it was empty. In every case the mutable slot is left
/// empty afterwards.
///
/// # Errors
///
/// Propagates sealing errors. The mutable slot has already been emptied at
/// that point, so the buffered segment is not restored on failure.
pub fn flush(&self) -> Result<Option<Arc<Segment>>> {
    let mut slot = self.mutable_segment.write();

    let pending = match slot.take() {
        Some(seg) if seg.writer.len() > 0 => seg,
        _ => return Ok(None),
    };

    let segment = self.seal_mutable(pending)?;
    self.sealed_segments.write().insert(0, Arc::clone(&segment));
    Ok(Some(segment))
}
/// Returns a snapshot of the sealed segment list, in list order.
///
/// Only the `Arc` handles are cloned; the segments themselves are shared.
/// The mutable segment is not included.
pub fn get_query_segments(&self) -> Vec<Arc<Segment>> {
    let sealed = self.sealed_segments.read();
    sealed.iter().map(Arc::clone).collect()
}
/// Reports whether a tombstone has been recorded for vector `vec_id` of
/// segment `segment_id`.
pub fn is_tombstoned(&self, segment_id: SegmentId, vec_id: VectorId) -> bool {
    let key = (segment_id, vec_id);
    let tombstones = self.tombstones.read();
    tombstones.contains(&key)
}
/// Compares the number of sealed segments with `config.lsm.compaction_ratio`
/// and logs the outcome.
///
/// This function only emits a log line (`info` when the threshold is
/// reached, `debug` otherwise); it does not itself run a compaction.
/// Always returns `Ok(())`.
fn trigger_compaction(&self) -> Result<()> {
    let threshold = self.config.lsm.compaction_ratio;
    let n_segments = self.sealed_segments.read().len();

    if n_segments < threshold {
        tracing::debug!(
            "Compaction check: {} segments < threshold {}",
            n_segments,
            threshold
        );
        return Ok(());
    }

    tracing::info!(
        "Background compaction triggered: {} sealed segments (threshold: {})",
        n_segments,
        threshold
    );
    Ok(())
}
pub fn compact(&self, catalog: &Catalog, _collection_id: i64) -> Result<()> {
let mut sealed = self.sealed_segments.write();
if sealed.len() < self.config.lsm.compaction_ratio {
return Ok(());
}
let num_to_compact = self.config.lsm.compaction_ratio;
let start_idx = sealed.len() - num_to_compact;
let to_compact: Vec<Arc<Segment>> = sealed.drain(start_idx..).collect();
drop(sealed);
let new_seg_id = self.next_segment_id.fetch_add(1, Ordering::SeqCst);
let mut writer = SegmentWriter::new(self.config.clone())?;
for old_seg in &to_compact {
if let Some(fp32) = old_seg.fp32_data() {
let dim = old_seg.dim() as usize;
for vid in 0..old_seg.num_vectors() {
let old_seg_id = new_seg_id - 1; if self.is_tombstoned(old_seg_id, vid) {
continue;
}
let offset = vid as usize * dim;
let vec = &fp32[offset..offset + dim];
writer.add(vec)?;
}
}
}
let new_path = self.segment_path(new_seg_id);
if writer.len() > 0 {
writer.build(&new_path)?;
let new_segment = Segment::open(&new_path)?;
let mut sealed = self.sealed_segments.write();
sealed.push(Arc::new(new_segment));
}
for old_seg in &to_compact {
let path = old_seg.path();
if let Some(seg_id_str) = path.split("segment_").last() {
if let Some(id_hex) = seg_id_str.strip_suffix(".seg") {
if let Ok(seg_id) = u64::from_str_radix(id_hex, 16) {
catalog.update_segment_state(seg_id, SegmentState::Deleted)?;
catalog.clear_tombstones(seg_id)?;
}
}
}
}
Ok(())
}
/// Estimates the number of live vectors: entries in the mutable segment plus
/// entries in all sealed segments, minus the number of recorded tombstones.
///
/// The three counts are read under separate, briefly held locks, so the
/// result is a best-effort snapshot under concurrent modification.
///
/// The arithmetic is done in `u64` and saturates: the result is clamped to
/// `0` when there are more tombstones than stored vectors (`delete` does not
/// validate its ids, so that can happen) and to `u32::MAX` at the top end,
/// instead of panicking or wrapping.
pub fn vector_count(&self) -> u32 {
    let mutable_count = self
        .mutable_segment
        .read()
        .as_ref()
        .map_or(0u64, |s| s.writer.len() as u64);
    let sealed_count: u64 = self
        .sealed_segments
        .read()
        .iter()
        .map(|s| u64::from(s.num_vectors()))
        .sum();
    let tombstone_count = self.tombstones.read().len() as u64;

    let live = mutable_count
        .saturating_add(sealed_count)
        .saturating_sub(tombstone_count);
    // The clamp makes the narrowing cast lossless.
    live.min(u64::from(u32::MAX)) as u32
}
}
/// The in-memory segment currently accepting inserts, paired with the id it
/// will keep once it is sealed to disk.
struct MutableSegment {
/// Segment id, allocated from `LsmManager::next_segment_id` when the segment is created.
id: SegmentId,
/// Writer that buffers added vectors and later builds the segment file.
writer: SegmentWriter,
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Inserting 100 vectors and flushing yields exactly one sealed segment
    /// that holds all of them.
    #[test]
    fn test_lsm_insert_flush() {
        let dir = tempdir().unwrap();
        let config = EngineConfig::with_dim(64);
        let lsm = LsmManager::new(config, dir.path().to_path_buf());
        for i in 0..100 {
            let vec: Vec<f32> = (0..64).map(|j| (i * 64 + j) as f32 / 1000.0).collect();
            lsm.insert(&vec).unwrap();
        }
        let flushed = lsm.flush().unwrap();
        assert!(flushed.is_some());
        let segments = lsm.get_query_segments();
        assert_eq!(segments.len(), 1);
        assert_eq!(segments[0].num_vectors(), 100);
    }

    /// A vector is reported as tombstoned only after it has been deleted,
    /// using the ids that `insert` actually returned.
    #[test]
    fn test_lsm_tombstones() {
        let dir = tempdir().unwrap();
        let config = EngineConfig::with_dim(64);
        let lsm = LsmManager::new(config, dir.path().to_path_buf());
        let vec: Vec<f32> = (0..64).map(|i| i as f32).collect();
        let (seg_id, vid) = lsm.insert(&vec).unwrap();
        lsm.flush().unwrap();
        assert!(!lsm.is_tombstoned(seg_id, vid));
        lsm.delete(seg_id, vid).unwrap();
        assert!(lsm.is_tombstoned(seg_id, vid));
    }
}