// hermes_core/merge/scheduler.rs

1//! Segment manager - coordinates segment registration and background merging
2//!
3//! This module is only compiled with the "native" feature.
4//!
5//! The SegmentManager is the SOLE owner of `metadata.json`. All writes to metadata
6//! go through this manager, ensuring linearized access and consistency between
7//! the in-memory segment list and persisted state.
8//!
9//! **State separation:**
10//! - Building segments: Managed by IndexWriter (pending_builds)
11//! - Committed segments: Managed by SegmentManager (metadata.segments)
12//! - Merging segments: Subset of committed, tracked here (merging_segments)
13//!
14//! **Commit workflow:**
15//! 1. IndexWriter flushes builders, waits for builds to complete
16//! 2. Calls `register_segment()` for each completed segment
17//! 3. SegmentManager updates metadata atomically, triggers merge check (non-blocking)
18//!
19//! **Merge workflow (background):**
20//! 1. Acquires segments to merge (marks as merging)
21//! 2. Merges into new segment
22//! 3. Calls internal `complete_merge()` which atomically updates metadata
23
24use std::collections::HashSet;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
27
28use parking_lot::RwLock as SyncRwLock;
29use tokio::sync::{Notify, RwLock};
30
31use crate::directories::DirectoryWriter;
32use crate::error::{Error, Result};
33use crate::index::IndexMetadata;
34use crate::segment::{SegmentId, SegmentSnapshot, SegmentTracker};
35#[cfg(feature = "native")]
36use crate::segment::{SegmentMerger, SegmentReader};
37
38use super::{MergePolicy, SegmentInfo};
39
/// Maximum number of concurrent merge operations.
///
/// Bounds the number of background merge tasks `spawn_merge` will have
/// in flight at once, so merging cannot overwhelm the system during
/// heavy indexing.
const MAX_CONCURRENT_MERGES: usize = 2;
42
/// Segment manager - coordinates segment registration and background merging
///
/// This is the SOLE owner of `metadata.json` ensuring linearized access.
/// All segment list modifications and metadata updates go through here.
///
/// Uses RwLock for metadata to allow concurrent reads (search) while
/// writes (indexing/merge) get exclusive access.
pub struct SegmentManager<D: DirectoryWriter + 'static> {
    /// Directory for segment operations (reads, writes, deletions)
    directory: Arc<D>,
    /// Schema shared with segment readers/mergers
    schema: Arc<crate::dsl::Schema>,
    /// Unified metadata (segments + vector index state) - SOLE owner.
    /// Async RwLock: concurrent reads, exclusive writes; every mutation
    /// is persisted via `IndexMetadata::save` before the lock is released.
    metadata: Arc<RwLock<IndexMetadata>>,
    /// The merge policy used to pick merge candidates
    merge_policy: Box<dyn MergePolicy>,
    /// Count of in-flight background merges (see MAX_CONCURRENT_MERGES)
    pending_merges: Arc<AtomicUsize>,
    /// Segment IDs currently being merged (to avoid double-merging).
    /// Sync (parking_lot) lock: only held for short, non-await sections.
    merging_segments: Arc<SyncRwLock<HashSet<String>>>,
    /// Term cache blocks for segment readers opened during merge
    term_cache_blocks: usize,
    /// Notifier for merge completion (avoids busy-waiting in wait_for_merges)
    merge_complete: Arc<Notify>,
    /// Segment lifecycle tracker for reference counting / deferred deletion
    tracker: Arc<SegmentTracker>,
    /// Pause flag to prevent new merges during ANN rebuild
    merge_paused: Arc<AtomicBool>,
}
73
74impl<D: DirectoryWriter + 'static> SegmentManager<D> {
75    /// Create a new segment manager with existing metadata
76    pub fn new(
77        directory: Arc<D>,
78        schema: Arc<crate::dsl::Schema>,
79        metadata: IndexMetadata,
80        merge_policy: Box<dyn MergePolicy>,
81        term_cache_blocks: usize,
82    ) -> Self {
83        // Initialize tracker and register existing segments
84        let tracker = Arc::new(SegmentTracker::new());
85        for seg_id in metadata.segment_metas.keys() {
86            tracker.register(seg_id);
87        }
88
89        Self {
90            directory,
91            schema,
92            metadata: Arc::new(RwLock::new(metadata)),
93            merge_policy,
94            pending_merges: Arc::new(AtomicUsize::new(0)),
95            merging_segments: Arc::new(SyncRwLock::new(HashSet::new())),
96            term_cache_blocks,
97            merge_complete: Arc::new(Notify::new()),
98            tracker,
99            merge_paused: Arc::new(AtomicBool::new(false)),
100        }
101    }
102
103    /// Get the current segment IDs (snapshot)
104    pub async fn get_segment_ids(&self) -> Vec<String> {
105        self.metadata.read().await.segment_ids()
106    }
107
108    /// Get the number of pending background merges
109    pub fn pending_merge_count(&self) -> usize {
110        self.pending_merges.load(Ordering::SeqCst)
111    }
112
113    /// Get a clone of the metadata Arc for read access
114    pub fn metadata(&self) -> Arc<RwLock<IndexMetadata>> {
115        Arc::clone(&self.metadata)
116    }
117
118    /// Update metadata with a closure and persist atomically
119    pub async fn update_metadata<F>(&self, f: F) -> Result<()>
120    where
121        F: FnOnce(&mut IndexMetadata),
122    {
123        let mut meta = self.metadata.write().await;
124        f(&mut meta);
125        meta.save(self.directory.as_ref()).await
126    }
127
128    /// Acquire a snapshot of current segments for reading
129    /// The snapshot holds references - segments won't be deleted while snapshot exists
130    pub async fn acquire_snapshot(&self) -> SegmentSnapshot<D> {
131        let acquired = {
132            let meta = self.metadata.read().await;
133            let segment_ids = meta.segment_ids();
134            self.tracker.acquire(&segment_ids)
135        };
136
137        // Provide a deletion callback so deferred segment cleanup happens
138        // when this snapshot is dropped (after in-flight searches finish)
139        let dir = Arc::clone(&self.directory);
140        let delete_fn: Arc<dyn Fn(Vec<SegmentId>) + Send + Sync> = Arc::new(move |segment_ids| {
141            let dir = Arc::clone(&dir);
142            tokio::spawn(async move {
143                for segment_id in segment_ids {
144                    log::info!(
145                        "[segment_cleanup] deleting deferred segment {}",
146                        segment_id.0
147                    );
148                    let _ = crate::segment::delete_segment(dir.as_ref(), segment_id).await;
149                }
150            });
151        });
152
153        SegmentSnapshot::with_delete_fn(
154            Arc::clone(&self.tracker),
155            Arc::clone(&self.directory),
156            acquired,
157            delete_fn,
158        )
159    }
160
161    /// Get the segment tracker
162    pub fn tracker(&self) -> Arc<SegmentTracker> {
163        Arc::clone(&self.tracker)
164    }
165
166    /// Get the directory
167    pub fn directory(&self) -> Arc<D> {
168        Arc::clone(&self.directory)
169    }
170}
171
172/// Native-only methods for SegmentManager (merging, segment registration)
173#[cfg(feature = "native")]
174impl<D: DirectoryWriter + 'static> SegmentManager<D> {
175    /// Register a new segment with its doc count, persist metadata, and trigger merge check
176    ///
177    /// This is the main entry point for adding segments after builds complete.
178    pub async fn register_segment(&self, segment_id: String, num_docs: u32) -> Result<()> {
179        {
180            let mut meta = self.metadata.write().await;
181            if !meta.has_segment(&segment_id) {
182                meta.add_segment(segment_id.clone(), num_docs);
183                self.tracker.register(&segment_id);
184            }
185            meta.save(self.directory.as_ref()).await?;
186        }
187
188        // Check if we should trigger a merge (non-blocking)
189        self.maybe_merge().await;
190        Ok(())
191    }
192
193    /// Check merge policy and spawn background merges if needed
194    /// Uses doc counts from metadata - no segment loading required
195    pub async fn maybe_merge(&self) {
196        // Get current segment info from metadata (no segment loading!)
197        let segments: Vec<SegmentInfo> = {
198            let meta = self.metadata.read().await;
199            let merging = self.merging_segments.read();
200
201            // Filter out segments currently being merged or pending deletion
202            meta.segment_metas
203                .iter()
204                .filter(|(id, _)| !merging.contains(*id) && !self.tracker.is_pending_deletion(id))
205                .map(|(id, info)| SegmentInfo {
206                    id: id.clone(),
207                    num_docs: info.num_docs,
208                    size_bytes: None,
209                })
210                .collect()
211        };
212
213        // Ask merge policy for candidates
214        let candidates = self.merge_policy.find_merges(&segments);
215
216        for candidate in candidates {
217            if candidate.segment_ids.len() >= 2 {
218                self.spawn_merge(candidate.segment_ids);
219            }
220        }
221    }
222
223    /// Pause background merges (used during ANN rebuild to prevent races)
224    pub fn pause_merges(&self) {
225        self.merge_paused.store(true, Ordering::SeqCst);
226    }
227
228    /// Resume background merges
229    pub fn resume_merges(&self) {
230        self.merge_paused.store(false, Ordering::SeqCst);
231    }
232
233    /// Spawn a background merge task
234    fn spawn_merge(&self, segment_ids_to_merge: Vec<String>) {
235        // Skip if merges are paused (during ANN rebuild)
236        if self.merge_paused.load(Ordering::SeqCst) {
237            return;
238        }
239        // Limit concurrent merges to avoid overwhelming the system during heavy indexing
240        if self.pending_merges.load(Ordering::SeqCst) >= MAX_CONCURRENT_MERGES {
241            return;
242        }
243
244        // Atomically check and mark segments as being merged
245        // This prevents race conditions where multiple maybe_merge calls
246        // could pick the same segments before they're marked
247        {
248            let mut merging = self.merging_segments.write();
249            // Check if any segment is already being merged
250            if segment_ids_to_merge.iter().any(|id| merging.contains(id)) {
251                // Some segment already being merged, skip this merge
252                return;
253            }
254            // Mark all segments as being merged
255            for id in &segment_ids_to_merge {
256                merging.insert(id.clone());
257            }
258        }
259
260        let directory = Arc::clone(&self.directory);
261        let schema = Arc::clone(&self.schema);
262        let metadata = Arc::clone(&self.metadata);
263        let merging_segments = Arc::clone(&self.merging_segments);
264        let pending_merges = Arc::clone(&self.pending_merges);
265        let merge_complete = Arc::clone(&self.merge_complete);
266        let tracker = Arc::clone(&self.tracker);
267        let term_cache_blocks = self.term_cache_blocks;
268
269        pending_merges.fetch_add(1, Ordering::SeqCst);
270
271        tokio::spawn(async move {
272            let result = Self::do_merge(
273                directory.as_ref(),
274                &schema,
275                &segment_ids_to_merge,
276                term_cache_blocks,
277                &metadata,
278            )
279            .await;
280
281            match result {
282                Ok((new_segment_id, merged_doc_count)) => {
283                    // Register new segment with tracker
284                    tracker.register(&new_segment_id);
285
286                    // Atomically update metadata: remove merged segments, add new one with doc count
287                    {
288                        let mut meta = metadata.write().await;
289                        for id in &segment_ids_to_merge {
290                            meta.remove_segment(id);
291                        }
292                        meta.add_segment(new_segment_id, merged_doc_count);
293                        if let Err(e) = meta.save(directory.as_ref()).await {
294                            eprintln!("[merge] Failed to save metadata after merge: {:?}", e);
295                        }
296                    }
297
298                    // Mark old segments for deletion via tracker (deferred if refs exist)
299                    let ready_to_delete = tracker.mark_for_deletion(&segment_ids_to_merge);
300                    for segment_id in ready_to_delete {
301                        let _ =
302                            crate::segment::delete_segment(directory.as_ref(), segment_id).await;
303                    }
304                }
305                Err(e) => {
306                    eprintln!(
307                        "Background merge failed for segments {:?}: {:?}",
308                        segment_ids_to_merge, e
309                    );
310                }
311            }
312
313            // Remove from merging set
314            {
315                let mut merging = merging_segments.write();
316                for id in &segment_ids_to_merge {
317                    merging.remove(id);
318                }
319            }
320
321            // Decrement pending merges counter and notify waiters
322            pending_merges.fetch_sub(1, Ordering::SeqCst);
323            merge_complete.notify_waiters();
324        });
325    }
326
327    /// Perform the actual merge operation (runs in background task)
328    /// Returns (new_segment_id, total_doc_count)
329    async fn do_merge(
330        directory: &D,
331        schema: &crate::dsl::Schema,
332        segment_ids_to_merge: &[String],
333        term_cache_blocks: usize,
334        metadata: &RwLock<IndexMetadata>,
335    ) -> Result<(String, u32)> {
336        // Load segment readers
337        let mut readers = Vec::new();
338        let mut doc_offset = 0u32;
339        let mut total_docs = 0u32;
340
341        for id_str in segment_ids_to_merge {
342            let segment_id = SegmentId::from_hex(id_str)
343                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
344            let reader = match SegmentReader::open(
345                directory,
346                segment_id,
347                Arc::new(schema.clone()),
348                doc_offset,
349                term_cache_blocks,
350            )
351            .await
352            {
353                Ok(r) => r,
354                Err(e) => {
355                    eprintln!("[merge] Failed to open segment {}: {:?}", id_str, e);
356                    return Err(e);
357                }
358            };
359            let num_docs = reader.meta().num_docs;
360            total_docs += num_docs;
361            doc_offset += num_docs;
362            readers.push(reader);
363        }
364
365        // Check if trained structures exist — if so, use merge_with_ann
366        // to preserve ANN indexes when merging mixed IVF+Flat segments
367        let (trained_centroids, trained_codebooks) = {
368            let meta = metadata.read().await;
369            meta.load_trained_structures(directory).await
370        };
371
372        let merger = SegmentMerger::new(Arc::new(schema.clone()));
373        let new_segment_id = SegmentId::new();
374
375        let merge_result = if !trained_centroids.is_empty() {
376            let trained = crate::segment::TrainedVectorStructures {
377                centroids: trained_centroids,
378                codebooks: trained_codebooks,
379            };
380            merger
381                .merge_with_ann(directory, &readers, new_segment_id, &trained)
382                .await
383        } else {
384            merger.merge(directory, &readers, new_segment_id).await
385        };
386
387        if let Err(e) = merge_result {
388            eprintln!(
389                "[merge] Merge failed for segments {:?} -> {}: {:?}",
390                segment_ids_to_merge,
391                new_segment_id.to_hex(),
392                e
393            );
394            return Err(e);
395        }
396
397        // Note: Segment deletion is handled by the caller via tracker
398        Ok((new_segment_id.to_hex(), total_docs))
399    }
400
401    /// Wait for all pending merges to complete
402    pub async fn wait_for_merges(&self) {
403        while self.pending_merges.load(Ordering::SeqCst) > 0 {
404            self.merge_complete.notified().await;
405        }
406    }
407
408    /// Replace segment list atomically (for force merge / rebuild)
409    /// new_segments: Vec of (segment_id, num_docs) pairs
410    pub async fn replace_segments(
411        &self,
412        new_segments: Vec<(String, u32)>,
413        old_to_delete: Vec<String>,
414    ) -> Result<()> {
415        // Register new segments with tracker
416        for (seg_id, _) in &new_segments {
417            self.tracker.register(seg_id);
418        }
419
420        {
421            let mut meta = self.metadata.write().await;
422            meta.segment_metas.clear();
423            for (seg_id, num_docs) in new_segments {
424                meta.add_segment(seg_id, num_docs);
425            }
426            meta.save(self.directory.as_ref()).await?;
427        }
428
429        // Mark old segments for deletion via tracker (deferred if refs exist)
430        let ready_to_delete = self.tracker.mark_for_deletion(&old_to_delete);
431        for segment_id in ready_to_delete {
432            let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
433        }
434        Ok(())
435    }
436
437    /// Clean up orphan segment files that are not registered
438    ///
439    /// This can happen if the process halts after segment files are written
440    /// but before they are registered in metadata.json. Call this on startup
441    /// to reclaim disk space from incomplete operations.
442    ///
443    /// Returns the number of orphan segments deleted.
444    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
445        let registered_set: HashSet<String> = {
446            let meta = self.metadata.read().await;
447            meta.segment_metas.keys().cloned().collect()
448        };
449
450        // Find all segment files in directory
451        let mut orphan_ids: HashSet<String> = HashSet::new();
452
453        // List directory and find segment files
454        if let Ok(entries) = self.directory.list_files(std::path::Path::new("")).await {
455            for entry in entries {
456                let filename = entry.to_string_lossy();
457                // Match pattern: seg_{32 hex chars}.{ext}
458                if filename.starts_with("seg_") && filename.len() > 37 {
459                    // Extract the hex ID (32 chars after "seg_")
460                    let hex_part = &filename[4..36];
461                    if !registered_set.contains(hex_part) {
462                        orphan_ids.insert(hex_part.to_string());
463                    }
464                }
465            }
466        }
467
468        // Delete orphan segments
469        let mut deleted = 0;
470        for hex_id in &orphan_ids {
471            if let Some(segment_id) = SegmentId::from_hex(hex_id)
472                && crate::segment::delete_segment(self.directory.as_ref(), segment_id)
473                    .await
474                    .is_ok()
475            {
476                deleted += 1;
477            }
478        }
479
480        Ok(deleted)
481    }
482}