1use parking_lot::RwLock;
7use std::collections::HashSet;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, Ordering};
11
12use crate::catalog::Catalog;
13use crate::config::EngineConfig;
14use crate::error::Result;
15use crate::segment::{Segment, SegmentWriter};
16use crate::types::*;
17
/// Manages the segments of an LSM-style store: one in-memory mutable segment
/// that receives inserts, a list of sealed on-disk segments, and a set of
/// tombstones marking deleted vectors.
pub struct LsmManager {
    /// Engine configuration; its `lsm` section supplies the sealing and
    /// compaction thresholds, and a clone is passed to every new `SegmentWriter`.
    config: EngineConfig,
    /// Directory in which segment files are created.
    data_dir: PathBuf,
    /// Segment currently accepting inserts; `None` until the first insert
    /// after construction, a seal, or a flush.
    mutable_segment: RwLock<Option<MutableSegment>>,
    /// Sealed segments. Freshly sealed segments are inserted at the front;
    /// compaction consumes from the back and appends its output there.
    sealed_segments: RwLock<Vec<Arc<Segment>>>,
    /// Next segment id to hand out (starts at 1).
    next_segment_id: AtomicU64,
    /// `(segment id, vector id)` pairs that have been marked as deleted.
    tombstones: RwLock<HashSet<(SegmentId, VectorId)>>,
}
31
32impl LsmManager {
33 pub fn new(config: EngineConfig, data_dir: PathBuf) -> Self {
35 Self {
36 config,
37 data_dir,
38 mutable_segment: RwLock::new(None),
39 sealed_segments: RwLock::new(Vec::new()),
40 next_segment_id: AtomicU64::new(1),
41 tombstones: RwLock::new(HashSet::new()),
42 }
43 }
44
45 pub fn load_from_catalog(&self, catalog: &Catalog, collection_id: i64) -> Result<()> {
47 let segment_infos = catalog.get_segments(collection_id)?;
48
49 let mut sealed = self.sealed_segments.write();
50 for info in segment_infos {
51 if info.state == SegmentState::Sealed {
52 let segment = Segment::open(&info.path)?;
53 sealed.push(Arc::new(segment));
54 }
55
56 let current_max = self.next_segment_id.load(Ordering::SeqCst);
58 if info.id >= current_max {
59 self.next_segment_id.store(info.id + 1, Ordering::SeqCst);
60 }
61 }
62
63 for info in catalog.get_segments(collection_id)? {
65 let tombstone_ids = catalog.get_tombstones(info.id)?;
66 let mut tombstones = self.tombstones.write();
67 for vid in tombstone_ids {
68 tombstones.insert((info.id, vid));
69 }
70 }
71
72 Ok(())
73 }
74
75 pub fn insert(&self, vector: &[f32]) -> Result<(SegmentId, VectorId)> {
77 let mut mutable = self.mutable_segment.write();
78
79 if mutable.is_none() {
81 let seg_id = self.next_segment_id.fetch_add(1, Ordering::SeqCst);
82 let writer = SegmentWriter::new(self.config.clone())?;
83 *mutable = Some(MutableSegment { id: seg_id, writer });
84 }
85
86 let seg = mutable.as_mut().unwrap();
87 let vid = seg.writer.add(vector)?;
88 let current_seg_id = seg.id;
89
90 let should_seal = seg.writer.len() >= self.config.lsm.max_mutable_size;
92
93 if should_seal {
94 let mutable_seg = mutable.take().unwrap();
95 let sealed_seg = self.seal_mutable(mutable_seg)?;
96 drop(mutable); let mut sealed = self.sealed_segments.write();
99 sealed.insert(0, sealed_seg); if sealed.len() > self.config.lsm.max_segments {
103 drop(sealed);
104 self.trigger_compaction()?;
105 }
106 }
107
108 Ok((current_seg_id, vid))
109 }
110
111 pub fn delete(&self, segment_id: SegmentId, vec_id: VectorId) -> Result<()> {
113 let mut tombstones = self.tombstones.write();
114 tombstones.insert((segment_id, vec_id));
115 Ok(())
116 }
117
118 fn seal_mutable(&self, mutable: MutableSegment) -> Result<Arc<Segment>> {
120 let path = self.segment_path(mutable.id);
121 mutable.writer.build(&path)?;
122
123 let segment = Segment::open(&path)?;
124 Ok(Arc::new(segment))
125 }
126
127 fn segment_path(&self, seg_id: SegmentId) -> PathBuf {
129 self.data_dir.join(format!("segment_{:016x}.seg", seg_id))
130 }
131
132 pub fn flush(&self) -> Result<Option<Arc<Segment>>> {
134 let mut mutable = self.mutable_segment.write();
135
136 if let Some(seg) = mutable.take() {
137 if seg.writer.len() > 0 {
138 let sealed = self.seal_mutable(seg)?;
139 let mut sealed_list = self.sealed_segments.write();
140 sealed_list.insert(0, Arc::clone(&sealed));
141 return Ok(Some(sealed));
142 }
143 }
144
145 Ok(None)
146 }
147
148 pub fn get_query_segments(&self) -> Vec<Arc<Segment>> {
150 self.sealed_segments.read().clone()
151 }
152
153 pub fn is_tombstoned(&self, segment_id: SegmentId, vec_id: VectorId) -> bool {
155 self.tombstones.read().contains(&(segment_id, vec_id))
156 }
157
158 fn trigger_compaction(&self) -> Result<()> {
160 let sealed = self.sealed_segments.read();
161 let n_segments = sealed.len();
162 drop(sealed);
163
164 if n_segments >= self.config.lsm.compaction_ratio {
165 tracing::info!(
166 "Background compaction triggered: {} sealed segments (threshold: {})",
167 n_segments,
168 self.config.lsm.compaction_ratio
169 );
170 } else {
174 tracing::debug!(
175 "Compaction check: {} segments < threshold {}",
176 n_segments,
177 self.config.lsm.compaction_ratio
178 );
179 }
180 Ok(())
181 }
182
183 pub fn compact(&self, catalog: &Catalog, _collection_id: i64) -> Result<()> {
185 let mut sealed = self.sealed_segments.write();
186
187 if sealed.len() < self.config.lsm.compaction_ratio {
188 return Ok(());
189 }
190
191 let num_to_compact = self.config.lsm.compaction_ratio;
193 let start_idx = sealed.len() - num_to_compact;
194 let to_compact: Vec<Arc<Segment>> = sealed.drain(start_idx..).collect();
195
196 drop(sealed);
197
198 let new_seg_id = self.next_segment_id.fetch_add(1, Ordering::SeqCst);
200 let mut writer = SegmentWriter::new(self.config.clone())?;
201
202 for old_seg in &to_compact {
203 if let Some(fp32) = old_seg.fp32_data() {
204 let dim = old_seg.dim() as usize;
205 for vid in 0..old_seg.num_vectors() {
206 let old_seg_id = new_seg_id - 1; if self.is_tombstoned(old_seg_id, vid) {
209 continue;
210 }
211
212 let offset = vid as usize * dim;
213 let vec = &fp32[offset..offset + dim];
214 writer.add(vec)?;
215 }
216 }
217 }
218
219 let new_path = self.segment_path(new_seg_id);
221 if writer.len() > 0 {
222 writer.build(&new_path)?;
223 let new_segment = Segment::open(&new_path)?;
224
225 let mut sealed = self.sealed_segments.write();
227 sealed.push(Arc::new(new_segment));
228 }
229
230 for old_seg in &to_compact {
232 let path = old_seg.path();
234 if let Some(seg_id_str) = path.split("segment_").last() {
235 if let Some(id_hex) = seg_id_str.strip_suffix(".seg") {
236 if let Ok(seg_id) = u64::from_str_radix(id_hex, 16) {
237 catalog.update_segment_state(seg_id, SegmentState::Deleted)?;
238 catalog.clear_tombstones(seg_id)?;
239 }
240 }
241 }
242 }
243
244 Ok(())
245 }
246
247 pub fn vector_count(&self) -> u32 {
249 let mutable_count = self
250 .mutable_segment
251 .read()
252 .as_ref()
253 .map(|s| s.writer.len() as u32)
254 .unwrap_or(0);
255
256 let sealed_count: u32 = self
257 .sealed_segments
258 .read()
259 .iter()
260 .map(|s| s.num_vectors())
261 .sum();
262
263 let tombstone_count = self.tombstones.read().len() as u32;
264
265 mutable_count + sealed_count - tombstone_count
266 }
267}
268
/// The in-memory segment that is currently accepting inserts.
struct MutableSegment {
    /// Id allocated for this segment; also determines the file name it is
    /// written to when sealed.
    id: SegmentId,
    /// Writer that accumulates the inserted vectors until the segment is built.
    writer: SegmentWriter,
}
274
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Inserting 100 vectors and flushing yields exactly one sealed segment
    /// that holds all of them.
    #[test]
    fn test_lsm_insert_flush() {
        let dir = tempdir().unwrap();
        let config = EngineConfig::with_dim(64);
        let lsm = LsmManager::new(config, dir.path().to_path_buf());

        for i in 0..100 {
            let vec: Vec<f32> = (0..64).map(|j| (i * 64 + j) as f32 / 1000.0).collect();
            lsm.insert(&vec).unwrap();
        }

        let flushed = lsm.flush().unwrap();
        assert!(flushed.is_some());

        let segments = lsm.get_query_segments();
        assert_eq!(segments.len(), 1);
        assert_eq!(segments[0].num_vectors(), 100);
    }

    /// A vector is reported as tombstoned only after it has been deleted.
    #[test]
    fn test_lsm_tombstones() {
        let dir = tempdir().unwrap();
        let config = EngineConfig::with_dim(64);
        let lsm = LsmManager::new(config, dir.path().to_path_buf());

        let vec: Vec<f32> = (0..64).map(|i| i as f32).collect();
        // Use the ids handed back by `insert` rather than assuming (1, 0).
        let (seg_id, vid) = lsm.insert(&vec).unwrap();
        lsm.flush().unwrap();

        assert!(!lsm.is_tombstoned(seg_id, vid));

        lsm.delete(seg_id, vid).unwrap();
        assert!(lsm.is_tombstoned(seg_id, vid));
    }
}
317}