Skip to main content

sochdb_vector/
lsm.rs

1//! LSM-style segment management.
2//!
3//! Handles mutable mem-segments, immutable sealed segments,
4//! tombstones, and compaction.
5
6use parking_lot::RwLock;
7use std::collections::HashSet;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, Ordering};
11
12use crate::catalog::Catalog;
13use crate::config::EngineConfig;
14use crate::error::Result;
15use crate::segment::{Segment, SegmentWriter};
16use crate::types::*;
17
18/// LSM manager for segment lifecycle
19pub struct LsmManager {
20    config: EngineConfig,
21    data_dir: PathBuf,
22    /// Mutable segment currently accepting writes
23    mutable_segment: RwLock<Option<MutableSegment>>,
24    /// Sealed immutable segments (newest first)
25    sealed_segments: RwLock<Vec<Arc<Segment>>>,
26    /// Next segment ID
27    next_segment_id: AtomicU64,
28    /// Tombstones pending compaction
29    tombstones: RwLock<HashSet<(SegmentId, VectorId)>>,
30}
31
32impl LsmManager {
33    /// Create a new LSM manager
34    pub fn new(config: EngineConfig, data_dir: PathBuf) -> Self {
35        Self {
36            config,
37            data_dir,
38            mutable_segment: RwLock::new(None),
39            sealed_segments: RwLock::new(Vec::new()),
40            next_segment_id: AtomicU64::new(1),
41            tombstones: RwLock::new(HashSet::new()),
42        }
43    }
44
45    /// Load existing segments from catalog
46    pub fn load_from_catalog(&self, catalog: &Catalog, collection_id: i64) -> Result<()> {
47        let segment_infos = catalog.get_segments(collection_id)?;
48
49        let mut sealed = self.sealed_segments.write();
50        for info in segment_infos {
51            if info.state == SegmentState::Sealed {
52                let segment = Segment::open(&info.path)?;
53                sealed.push(Arc::new(segment));
54            }
55
56            // Update next segment ID
57            let current_max = self.next_segment_id.load(Ordering::SeqCst);
58            if info.id >= current_max {
59                self.next_segment_id.store(info.id + 1, Ordering::SeqCst);
60            }
61        }
62
63        // Load tombstones
64        for info in catalog.get_segments(collection_id)? {
65            let tombstone_ids = catalog.get_tombstones(info.id)?;
66            let mut tombstones = self.tombstones.write();
67            for vid in tombstone_ids {
68                tombstones.insert((info.id, vid));
69            }
70        }
71
72        Ok(())
73    }
74
75    /// Insert a vector
76    pub fn insert(&self, vector: &[f32]) -> Result<(SegmentId, VectorId)> {
77        let mut mutable = self.mutable_segment.write();
78
79        // Create mutable segment if needed
80        if mutable.is_none() {
81            let seg_id = self.next_segment_id.fetch_add(1, Ordering::SeqCst);
82            let writer = SegmentWriter::new(self.config.clone())?;
83            *mutable = Some(MutableSegment { id: seg_id, writer });
84        }
85
86        let seg = mutable.as_mut().unwrap();
87        let vid = seg.writer.add(vector)?;
88        let current_seg_id = seg.id;
89
90        // Check if we need to seal
91        let should_seal = seg.writer.len() >= self.config.lsm.max_mutable_size;
92
93        if should_seal {
94            let mutable_seg = mutable.take().unwrap();
95            let sealed_seg = self.seal_mutable(mutable_seg)?;
96            drop(mutable); // Release the lock before acquiring sealed lock
97
98            let mut sealed = self.sealed_segments.write();
99            sealed.insert(0, sealed_seg); // Newest first
100
101            // Trigger compaction if needed
102            if sealed.len() > self.config.lsm.max_segments {
103                drop(sealed);
104                self.trigger_compaction()?;
105            }
106        }
107
108        Ok((current_seg_id, vid))
109    }
110
111    /// Delete a vector
112    pub fn delete(&self, segment_id: SegmentId, vec_id: VectorId) -> Result<()> {
113        let mut tombstones = self.tombstones.write();
114        tombstones.insert((segment_id, vec_id));
115        Ok(())
116    }
117
118    /// Seal the mutable segment and write to disk
119    fn seal_mutable(&self, mutable: MutableSegment) -> Result<Arc<Segment>> {
120        let path = self.segment_path(mutable.id);
121        mutable.writer.build(&path)?;
122
123        let segment = Segment::open(&path)?;
124        Ok(Arc::new(segment))
125    }
126
127    /// Generate segment file path
128    fn segment_path(&self, seg_id: SegmentId) -> PathBuf {
129        self.data_dir.join(format!("segment_{:016x}.seg", seg_id))
130    }
131
132    /// Force seal current mutable segment
133    pub fn flush(&self) -> Result<Option<Arc<Segment>>> {
134        let mut mutable = self.mutable_segment.write();
135
136        if let Some(seg) = mutable.take() {
137            if seg.writer.len() > 0 {
138                let sealed = self.seal_mutable(seg)?;
139                let mut sealed_list = self.sealed_segments.write();
140                sealed_list.insert(0, Arc::clone(&sealed));
141                return Ok(Some(sealed));
142            }
143        }
144
145        Ok(None)
146    }
147
148    /// Get all segments for querying (newest first)
149    pub fn get_query_segments(&self) -> Vec<Arc<Segment>> {
150        self.sealed_segments.read().clone()
151    }
152
153    /// Check if a vector is tombstoned
154    pub fn is_tombstoned(&self, segment_id: SegmentId, vec_id: VectorId) -> bool {
155        self.tombstones.read().contains(&(segment_id, vec_id))
156    }
157
158    /// Trigger compaction
159    fn trigger_compaction(&self) -> Result<()> {
160        let sealed = self.sealed_segments.read();
161        let n_segments = sealed.len();
162        drop(sealed);
163
164        if n_segments >= self.config.lsm.compaction_ratio {
165            tracing::info!(
166                "Background compaction triggered: {} sealed segments (threshold: {})",
167                n_segments,
168                self.config.lsm.compaction_ratio
169            );
170            // Compaction runs inline for now. In production, this should be
171            // submitted to a background thread pool via BlockingPool::submit_compaction.
172            // The caller (insert path) will call compact() after sealing a segment.
173        } else {
174            tracing::debug!(
175                "Compaction check: {} segments < threshold {}",
176                n_segments,
177                self.config.lsm.compaction_ratio
178            );
179        }
180        Ok(())
181    }
182
183    /// Run compaction (merge multiple segments into one)
184    pub fn compact(&self, catalog: &Catalog, _collection_id: i64) -> Result<()> {
185        let mut sealed = self.sealed_segments.write();
186
187        if sealed.len() < self.config.lsm.compaction_ratio {
188            return Ok(());
189        }
190
191        // Take oldest N segments for compaction
192        let num_to_compact = self.config.lsm.compaction_ratio;
193        let start_idx = sealed.len() - num_to_compact;
194        let to_compact: Vec<Arc<Segment>> = sealed.drain(start_idx..).collect();
195
196        drop(sealed);
197
198        // Create new merged segment
199        let new_seg_id = self.next_segment_id.fetch_add(1, Ordering::SeqCst);
200        let mut writer = SegmentWriter::new(self.config.clone())?;
201
202        for old_seg in &to_compact {
203            if let Some(fp32) = old_seg.fp32_data() {
204                let dim = old_seg.dim() as usize;
205                for vid in 0..old_seg.num_vectors() {
206                    // Skip tombstoned vectors
207                    let old_seg_id = new_seg_id - 1; // Approximate
208                    if self.is_tombstoned(old_seg_id, vid) {
209                        continue;
210                    }
211
212                    let offset = vid as usize * dim;
213                    let vec = &fp32[offset..offset + dim];
214                    writer.add(vec)?;
215                }
216            }
217        }
218
219        // Write new segment
220        let new_path = self.segment_path(new_seg_id);
221        if writer.len() > 0 {
222            writer.build(&new_path)?;
223            let new_segment = Segment::open(&new_path)?;
224
225            // Update sealed list
226            let mut sealed = self.sealed_segments.write();
227            sealed.push(Arc::new(new_segment));
228        }
229
230        // Mark old segments as deleted in catalog
231        for old_seg in &to_compact {
232            // Extract segment ID from path (simplified)
233            let path = old_seg.path();
234            if let Some(seg_id_str) = path.split("segment_").last() {
235                if let Some(id_hex) = seg_id_str.strip_suffix(".seg") {
236                    if let Ok(seg_id) = u64::from_str_radix(id_hex, 16) {
237                        catalog.update_segment_state(seg_id, SegmentState::Deleted)?;
238                        catalog.clear_tombstones(seg_id)?;
239                    }
240                }
241            }
242        }
243
244        Ok(())
245    }
246
247    /// Get total vector count
248    pub fn vector_count(&self) -> u32 {
249        let mutable_count = self
250            .mutable_segment
251            .read()
252            .as_ref()
253            .map(|s| s.writer.len() as u32)
254            .unwrap_or(0);
255
256        let sealed_count: u32 = self
257            .sealed_segments
258            .read()
259            .iter()
260            .map(|s| s.num_vectors())
261            .sum();
262
263        let tombstone_count = self.tombstones.read().len() as u32;
264
265        mutable_count + sealed_count - tombstone_count
266    }
267}
268
269/// Mutable segment accepting writes
270struct MutableSegment {
271    id: SegmentId,
272    writer: SegmentWriter,
273}
274
275#[cfg(test)]
276mod tests {
277    use super::*;
278    use tempfile::tempdir;
279
280    #[test]
281    fn test_lsm_insert_flush() {
282        let dir = tempdir().unwrap();
283        let config = EngineConfig::with_dim(64);
284        let lsm = LsmManager::new(config, dir.path().to_path_buf());
285
286        // Insert vectors
287        for i in 0..100 {
288            let vec: Vec<f32> = (0..64).map(|j| (i * 64 + j) as f32 / 1000.0).collect();
289            lsm.insert(&vec).unwrap();
290        }
291
292        // Flush
293        let flushed = lsm.flush().unwrap();
294        assert!(flushed.is_some());
295
296        // Should have one sealed segment
297        let segments = lsm.get_query_segments();
298        assert_eq!(segments.len(), 1);
299        assert_eq!(segments[0].num_vectors(), 100);
300    }
301
302    #[test]
303    fn test_lsm_tombstones() {
304        let dir = tempdir().unwrap();
305        let config = EngineConfig::with_dim(64);
306        let lsm = LsmManager::new(config, dir.path().to_path_buf());
307
308        // Insert and flush
309        let vec: Vec<f32> = (0..64).map(|i| i as f32).collect();
310        let (seg_id, vid) = lsm.insert(&vec).unwrap();
311        lsm.flush().unwrap();
312
313        // Delete
314        lsm.delete(1, 0).unwrap();
315        assert!(lsm.is_tombstoned(1, 0));
316    }
317}