Skip to main content

hermes_core/merge/
scheduler.rs

//! Segment manager - coordinates segment registration and background merging
//!
//! This module is only compiled with the "native" feature.
//!
//! The SegmentManager is the SOLE owner of `metadata.json`. All writes to metadata
//! go through this manager, ensuring linearized access and consistency between
//! the in-memory segment list and persisted state.
//!
//! **State separation:**
//! - Building segments: Managed by IndexWriter (pending_builds)
//! - Committed segments: Managed by SegmentManager (metadata.segments)
//! - Merging segments: Subset of committed, tracked here (merging_segments)
//!
//! **Commit workflow:**
//! 1. IndexWriter flushes builders and waits for builds to complete
//! 2. IndexWriter calls `register_segment()` for each completed segment
//! 3. SegmentManager updates metadata atomically, then triggers a merge check
//!    (merges themselves run in spawned background tasks)
//!
//! **Merge workflow (background):**
//! 1. Acquires segments to merge (marks them as merging)
//! 2. Merges them into a new segment
//! 3. On success, swaps the merged segments for the new one in metadata and
//!    persists it while holding the metadata write lock
23
24use std::collections::HashSet;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
27
28use parking_lot::RwLock as SyncRwLock;
29use tokio::sync::{Notify, RwLock};
30
31use crate::directories::DirectoryWriter;
32use crate::error::{Error, Result};
33use crate::index::IndexMetadata;
34use crate::segment::{SegmentId, SegmentSnapshot, SegmentTracker};
35#[cfg(feature = "native")]
36use crate::segment::{SegmentMerger, SegmentReader};
37
38use super::{MergePolicy, SegmentInfo};
39
/// Upper bound on background merge tasks in flight at the same time;
/// `spawn_merge` declines to start another once this many are pending.
const MAX_CONCURRENT_MERGES: usize = 2;
42
43/// Segment manager - coordinates segment registration and background merging
44///
45/// This is the SOLE owner of `metadata.json` ensuring linearized access.
46/// All segment list modifications and metadata updates go through here.
47///
48/// Uses RwLock for metadata to allow concurrent reads (search) while
49/// writes (indexing/merge) get exclusive access.
50pub struct SegmentManager<D: DirectoryWriter + 'static> {
51    /// Directory for segment operations
52    directory: Arc<D>,
53    /// Schema for segment operations
54    schema: Arc<crate::dsl::Schema>,
55    /// Unified metadata (segments + vector index state) - SOLE owner
56    /// RwLock allows concurrent reads, exclusive writes
57    metadata: Arc<RwLock<IndexMetadata>>,
58    /// The merge policy to use
59    merge_policy: Box<dyn MergePolicy>,
60    /// Count of in-flight background merges
61    pending_merges: Arc<AtomicUsize>,
62    /// Segments currently being merged (to avoid double-merging)
63    merging_segments: Arc<SyncRwLock<HashSet<String>>>,
64    /// Term cache blocks for segment readers during merge
65    term_cache_blocks: usize,
66    /// Notifier for merge completion (avoids busy-waiting)
67    merge_complete: Arc<Notify>,
68    /// Segment lifecycle tracker for reference counting
69    tracker: Arc<SegmentTracker>,
70    /// Pause flag to prevent new merges during ANN rebuild
71    merge_paused: Arc<AtomicBool>,
72}
73
74impl<D: DirectoryWriter + 'static> SegmentManager<D> {
75    /// Create a new segment manager with existing metadata
76    pub fn new(
77        directory: Arc<D>,
78        schema: Arc<crate::dsl::Schema>,
79        metadata: IndexMetadata,
80        merge_policy: Box<dyn MergePolicy>,
81        term_cache_blocks: usize,
82    ) -> Self {
83        // Initialize tracker and register existing segments
84        let tracker = Arc::new(SegmentTracker::new());
85        for seg_id in metadata.segment_metas.keys() {
86            tracker.register(seg_id);
87        }
88
89        Self {
90            directory,
91            schema,
92            metadata: Arc::new(RwLock::new(metadata)),
93            merge_policy,
94            pending_merges: Arc::new(AtomicUsize::new(0)),
95            merging_segments: Arc::new(SyncRwLock::new(HashSet::new())),
96            term_cache_blocks,
97            merge_complete: Arc::new(Notify::new()),
98            tracker,
99            merge_paused: Arc::new(AtomicBool::new(false)),
100        }
101    }
102
103    /// Get the current segment IDs (snapshot)
104    pub async fn get_segment_ids(&self) -> Vec<String> {
105        self.metadata.read().await.segment_ids()
106    }
107
108    /// Get the number of pending background merges
109    pub fn pending_merge_count(&self) -> usize {
110        self.pending_merges.load(Ordering::SeqCst)
111    }
112
113    /// Get a clone of the metadata Arc for read access
114    pub fn metadata(&self) -> Arc<RwLock<IndexMetadata>> {
115        Arc::clone(&self.metadata)
116    }
117
118    /// Update metadata with a closure and persist atomically
119    pub async fn update_metadata<F>(&self, f: F) -> Result<()>
120    where
121        F: FnOnce(&mut IndexMetadata),
122    {
123        let mut meta = self.metadata.write().await;
124        f(&mut meta);
125        meta.save(self.directory.as_ref()).await
126    }
127
128    /// Acquire a snapshot of current segments for reading
129    /// The snapshot holds references - segments won't be deleted while snapshot exists
130    pub async fn acquire_snapshot(&self) -> SegmentSnapshot<D> {
131        let acquired = {
132            let meta = self.metadata.read().await;
133            let segment_ids = meta.segment_ids();
134            self.tracker.acquire(&segment_ids)
135        };
136
137        SegmentSnapshot::new(
138            Arc::clone(&self.tracker),
139            Arc::clone(&self.directory),
140            acquired,
141        )
142    }
143
144    /// Get the segment tracker
145    pub fn tracker(&self) -> Arc<SegmentTracker> {
146        Arc::clone(&self.tracker)
147    }
148
149    /// Get the directory
150    pub fn directory(&self) -> Arc<D> {
151        Arc::clone(&self.directory)
152    }
153}
154
155/// Native-only methods for SegmentManager (merging, segment registration)
156#[cfg(feature = "native")]
157impl<D: DirectoryWriter + 'static> SegmentManager<D> {
158    /// Register a new segment with its doc count, persist metadata, and trigger merge check
159    ///
160    /// This is the main entry point for adding segments after builds complete.
161    pub async fn register_segment(&self, segment_id: String, num_docs: u32) -> Result<()> {
162        {
163            let mut meta = self.metadata.write().await;
164            if !meta.has_segment(&segment_id) {
165                meta.add_segment(segment_id.clone(), num_docs);
166                self.tracker.register(&segment_id);
167            }
168            meta.save(self.directory.as_ref()).await?;
169        }
170
171        // Check if we should trigger a merge (non-blocking)
172        self.maybe_merge().await;
173        Ok(())
174    }
175
176    /// Check merge policy and spawn background merges if needed
177    /// Uses doc counts from metadata - no segment loading required
178    pub async fn maybe_merge(&self) {
179        // Get current segment info from metadata (no segment loading!)
180        let segments: Vec<SegmentInfo> = {
181            let meta = self.metadata.read().await;
182            let merging = self.merging_segments.read();
183
184            // Filter out segments currently being merged or pending deletion
185            meta.segment_metas
186                .iter()
187                .filter(|(id, _)| !merging.contains(*id) && !self.tracker.is_pending_deletion(id))
188                .map(|(id, info)| SegmentInfo {
189                    id: id.clone(),
190                    num_docs: info.num_docs,
191                    size_bytes: None,
192                })
193                .collect()
194        };
195
196        // Ask merge policy for candidates
197        let candidates = self.merge_policy.find_merges(&segments);
198
199        for candidate in candidates {
200            if candidate.segment_ids.len() >= 2 {
201                self.spawn_merge(candidate.segment_ids);
202            }
203        }
204    }
205
206    /// Pause background merges (used during ANN rebuild to prevent races)
207    pub fn pause_merges(&self) {
208        self.merge_paused.store(true, Ordering::SeqCst);
209    }
210
211    /// Resume background merges
212    pub fn resume_merges(&self) {
213        self.merge_paused.store(false, Ordering::SeqCst);
214    }
215
216    /// Spawn a background merge task
217    fn spawn_merge(&self, segment_ids_to_merge: Vec<String>) {
218        // Skip if merges are paused (during ANN rebuild)
219        if self.merge_paused.load(Ordering::SeqCst) {
220            return;
221        }
222        // Limit concurrent merges to avoid overwhelming the system during heavy indexing
223        if self.pending_merges.load(Ordering::SeqCst) >= MAX_CONCURRENT_MERGES {
224            return;
225        }
226
227        // Atomically check and mark segments as being merged
228        // This prevents race conditions where multiple maybe_merge calls
229        // could pick the same segments before they're marked
230        {
231            let mut merging = self.merging_segments.write();
232            // Check if any segment is already being merged
233            if segment_ids_to_merge.iter().any(|id| merging.contains(id)) {
234                // Some segment already being merged, skip this merge
235                return;
236            }
237            // Mark all segments as being merged
238            for id in &segment_ids_to_merge {
239                merging.insert(id.clone());
240            }
241        }
242
243        let directory = Arc::clone(&self.directory);
244        let schema = Arc::clone(&self.schema);
245        let metadata = Arc::clone(&self.metadata);
246        let merging_segments = Arc::clone(&self.merging_segments);
247        let pending_merges = Arc::clone(&self.pending_merges);
248        let merge_complete = Arc::clone(&self.merge_complete);
249        let tracker = Arc::clone(&self.tracker);
250        let term_cache_blocks = self.term_cache_blocks;
251
252        pending_merges.fetch_add(1, Ordering::SeqCst);
253
254        tokio::spawn(async move {
255            let result = Self::do_merge(
256                directory.as_ref(),
257                &schema,
258                &segment_ids_to_merge,
259                term_cache_blocks,
260                &metadata,
261            )
262            .await;
263
264            match result {
265                Ok((new_segment_id, merged_doc_count)) => {
266                    // Register new segment with tracker
267                    tracker.register(&new_segment_id);
268
269                    // Atomically update metadata: remove merged segments, add new one with doc count
270                    {
271                        let mut meta = metadata.write().await;
272                        for id in &segment_ids_to_merge {
273                            meta.remove_segment(id);
274                        }
275                        meta.add_segment(new_segment_id, merged_doc_count);
276                        if let Err(e) = meta.save(directory.as_ref()).await {
277                            eprintln!("[merge] Failed to save metadata after merge: {:?}", e);
278                        }
279                    }
280
281                    // Mark old segments for deletion via tracker (deferred if refs exist)
282                    let ready_to_delete = tracker.mark_for_deletion(&segment_ids_to_merge);
283                    for segment_id in ready_to_delete {
284                        let _ =
285                            crate::segment::delete_segment(directory.as_ref(), segment_id).await;
286                    }
287                }
288                Err(e) => {
289                    eprintln!(
290                        "Background merge failed for segments {:?}: {:?}",
291                        segment_ids_to_merge, e
292                    );
293                }
294            }
295
296            // Remove from merging set
297            {
298                let mut merging = merging_segments.write();
299                for id in &segment_ids_to_merge {
300                    merging.remove(id);
301                }
302            }
303
304            // Decrement pending merges counter and notify waiters
305            pending_merges.fetch_sub(1, Ordering::SeqCst);
306            merge_complete.notify_waiters();
307        });
308    }
309
310    /// Perform the actual merge operation (runs in background task)
311    /// Returns (new_segment_id, total_doc_count)
312    async fn do_merge(
313        directory: &D,
314        schema: &crate::dsl::Schema,
315        segment_ids_to_merge: &[String],
316        term_cache_blocks: usize,
317        metadata: &RwLock<IndexMetadata>,
318    ) -> Result<(String, u32)> {
319        // Load segment readers
320        let mut readers = Vec::new();
321        let mut doc_offset = 0u32;
322        let mut total_docs = 0u32;
323
324        for id_str in segment_ids_to_merge {
325            let segment_id = SegmentId::from_hex(id_str)
326                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
327            let reader = match SegmentReader::open(
328                directory,
329                segment_id,
330                Arc::new(schema.clone()),
331                doc_offset,
332                term_cache_blocks,
333            )
334            .await
335            {
336                Ok(r) => r,
337                Err(e) => {
338                    eprintln!("[merge] Failed to open segment {}: {:?}", id_str, e);
339                    return Err(e);
340                }
341            };
342            let num_docs = reader.meta().num_docs;
343            total_docs += num_docs;
344            doc_offset += num_docs;
345            readers.push(reader);
346        }
347
348        // Check if trained structures exist — if so, use merge_with_ann
349        // to preserve ANN indexes when merging mixed IVF+Flat segments
350        let (trained_centroids, trained_codebooks) = {
351            let meta = metadata.read().await;
352            meta.load_trained_structures(directory).await
353        };
354
355        let merger = SegmentMerger::new(Arc::new(schema.clone()));
356        let new_segment_id = SegmentId::new();
357
358        let merge_result = if !trained_centroids.is_empty() {
359            let trained = crate::segment::TrainedVectorStructures {
360                centroids: trained_centroids,
361                codebooks: trained_codebooks,
362            };
363            merger
364                .merge_with_ann(directory, &readers, new_segment_id, &trained)
365                .await
366        } else {
367            merger.merge(directory, &readers, new_segment_id).await
368        };
369
370        if let Err(e) = merge_result {
371            eprintln!(
372                "[merge] Merge failed for segments {:?} -> {}: {:?}",
373                segment_ids_to_merge,
374                new_segment_id.to_hex(),
375                e
376            );
377            return Err(e);
378        }
379
380        // Note: Segment deletion is handled by the caller via tracker
381        Ok((new_segment_id.to_hex(), total_docs))
382    }
383
384    /// Wait for all pending merges to complete
385    pub async fn wait_for_merges(&self) {
386        while self.pending_merges.load(Ordering::SeqCst) > 0 {
387            self.merge_complete.notified().await;
388        }
389    }
390
391    /// Replace segment list atomically (for force merge / rebuild)
392    /// new_segments: Vec of (segment_id, num_docs) pairs
393    pub async fn replace_segments(
394        &self,
395        new_segments: Vec<(String, u32)>,
396        old_to_delete: Vec<String>,
397    ) -> Result<()> {
398        // Register new segments with tracker
399        for (seg_id, _) in &new_segments {
400            self.tracker.register(seg_id);
401        }
402
403        {
404            let mut meta = self.metadata.write().await;
405            meta.segment_metas.clear();
406            for (seg_id, num_docs) in new_segments {
407                meta.add_segment(seg_id, num_docs);
408            }
409            meta.save(self.directory.as_ref()).await?;
410        }
411
412        // Mark old segments for deletion via tracker (deferred if refs exist)
413        let ready_to_delete = self.tracker.mark_for_deletion(&old_to_delete);
414        for segment_id in ready_to_delete {
415            let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
416        }
417        Ok(())
418    }
419
420    /// Clean up orphan segment files that are not registered
421    ///
422    /// This can happen if the process halts after segment files are written
423    /// but before they are registered in metadata.json. Call this on startup
424    /// to reclaim disk space from incomplete operations.
425    ///
426    /// Returns the number of orphan segments deleted.
427    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
428        let registered_set: HashSet<String> = {
429            let meta = self.metadata.read().await;
430            meta.segment_metas.keys().cloned().collect()
431        };
432
433        // Find all segment files in directory
434        let mut orphan_ids: HashSet<String> = HashSet::new();
435
436        // List directory and find segment files
437        if let Ok(entries) = self.directory.list_files(std::path::Path::new("")).await {
438            for entry in entries {
439                let filename = entry.to_string_lossy();
440                // Match pattern: seg_{32 hex chars}.{ext}
441                if filename.starts_with("seg_") && filename.len() > 37 {
442                    // Extract the hex ID (32 chars after "seg_")
443                    let hex_part = &filename[4..36];
444                    if !registered_set.contains(hex_part) {
445                        orphan_ids.insert(hex_part.to_string());
446                    }
447                }
448            }
449        }
450
451        // Delete orphan segments
452        let mut deleted = 0;
453        for hex_id in &orphan_ids {
454            if let Some(segment_id) = SegmentId::from_hex(hex_id)
455                && crate::segment::delete_segment(self.directory.as_ref(), segment_id)
456                    .await
457                    .is_ok()
458            {
459                deleted += 1;
460            }
461        }
462
463        Ok(deleted)
464    }
465}