
hermes_core/merge/scheduler.rs

//! Segment manager - coordinates segment registration and background merging
//!
//! This module is only compiled with the "native" feature.
//!
//! The SegmentManager is the SOLE owner of `metadata.json`. All writes to metadata
//! go through this manager, ensuring linearized access and consistency between
//! the in-memory segment list and persisted state.
//!
//! **State separation:**
//! - Building segments: Managed by IndexWriter (pending_builds)
//! - Committed segments: Managed by SegmentManager (metadata.segments)
//! - Merging segments: Subset of committed, tracked here (merging_segments)
//!
//! **Commit workflow:**
//! 1. IndexWriter flushes builders, waits for builds to complete
//! 2. Calls `register_segment()` for each completed segment
//! 3. SegmentManager updates metadata atomically, triggers merge check (non-blocking)
//!
//! **Merge workflow (background):**
//! 1. Acquires segments to merge (marks as merging)
//! 2. Merges into new segment
//! 3. The merge task atomically updates metadata (removes the merged segments, adds the new one)
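//!
//! # Example (illustrative sketch)
//!
//! A minimal sketch of the commit path from a caller's point of view; the
//! `manager`, `seg_id`, and `num_docs` bindings are placeholders owned by the
//! surrounding indexing code, not defined by this module:
//!
//! ```ignore
//! // Registering a freshly built segment persists metadata.json and runs a
//! // non-blocking merge check against the configured MergePolicy.
//! manager.register_segment(seg_id, num_docs).await?;
//!
//! // Optionally block until background merges settle (e.g. before shutdown).
//! manager.wait_for_merges().await;
//! ```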

use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

use parking_lot::RwLock as SyncRwLock;
use tokio::sync::{Notify, RwLock};

use crate::directories::DirectoryWriter;
use crate::error::{Error, Result};
use crate::index::IndexMetadata;
use crate::segment::{SegmentId, SegmentSnapshot, SegmentTracker};
#[cfg(feature = "native")]
use crate::segment::{SegmentMerger, SegmentReader};

use super::{MergePolicy, SegmentInfo};

/// Maximum number of concurrent merge operations
const MAX_CONCURRENT_MERGES: usize = 2;

/// Segment manager - coordinates segment registration and background merging
///
/// This is the SOLE owner of `metadata.json` ensuring linearized access.
/// All segment list modifications and metadata updates go through here.
///
/// Uses RwLock for metadata to allow concurrent reads (search) while
/// writes (indexing/merge) get exclusive access.
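///
/// # Construction (illustrative sketch)
///
/// A hedged sketch of wiring up a manager. `directory`, `schema`, `metadata`,
/// and `SomeMergePolicy` are placeholders for values the surrounding index
/// code already owns; they are not defined by this module:
///
/// ```ignore
/// let manager = SegmentManager::new(
///     Arc::clone(&directory),               // Arc<D: DirectoryWriter>
///     Arc::clone(&schema),                  // Arc<crate::dsl::Schema>
///     metadata,                             // IndexMetadata loaded from metadata.json
///     Box::new(SomeMergePolicy::default()), // any MergePolicy implementation
///     256,                                  // term cache blocks per segment reader
/// );
/// ```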
pub struct SegmentManager<D: DirectoryWriter + 'static> {
    /// Directory for segment operations
    directory: Arc<D>,
    /// Schema for segment operations
    schema: Arc<crate::dsl::Schema>,
    /// Unified metadata (segments + vector index state) - SOLE owner
    /// RwLock allows concurrent reads, exclusive writes
    metadata: Arc<RwLock<IndexMetadata>>,
    /// The merge policy to use
    merge_policy: Box<dyn MergePolicy>,
    /// Count of in-flight background merges
    pending_merges: Arc<AtomicUsize>,
    /// Segments currently being merged (to avoid double-merging)
    merging_segments: Arc<SyncRwLock<HashSet<String>>>,
    /// Term cache blocks for segment readers during merge
    term_cache_blocks: usize,
    /// Notifier for merge completion (avoids busy-waiting)
    merge_complete: Arc<Notify>,
    /// Segment lifecycle tracker for reference counting
    tracker: Arc<SegmentTracker>,
    /// Pause flag to prevent new merges during ANN rebuild
    merge_paused: Arc<AtomicBool>,
}

impl<D: DirectoryWriter + 'static> SegmentManager<D> {
    /// Create a new segment manager with existing metadata
    pub fn new(
        directory: Arc<D>,
        schema: Arc<crate::dsl::Schema>,
        metadata: IndexMetadata,
        merge_policy: Box<dyn MergePolicy>,
        term_cache_blocks: usize,
    ) -> Self {
        // Initialize tracker and register existing segments
        let tracker = Arc::new(SegmentTracker::new());
        for seg_id in metadata.segment_metas.keys() {
            tracker.register(seg_id);
        }

        Self {
            directory,
            schema,
            metadata: Arc::new(RwLock::new(metadata)),
            merge_policy,
            pending_merges: Arc::new(AtomicUsize::new(0)),
            merging_segments: Arc::new(SyncRwLock::new(HashSet::new())),
            term_cache_blocks,
            merge_complete: Arc::new(Notify::new()),
            tracker,
            merge_paused: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Get the current segment IDs (snapshot)
    pub async fn get_segment_ids(&self) -> Vec<String> {
        self.metadata.read().await.segment_ids()
    }

    /// Get the number of pending background merges
    pub fn pending_merge_count(&self) -> usize {
        self.pending_merges.load(Ordering::SeqCst)
    }

    /// Get a clone of the metadata Arc for read access
    pub fn metadata(&self) -> Arc<RwLock<IndexMetadata>> {
        Arc::clone(&self.metadata)
    }

    /// Update metadata with a closure and persist atomically
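    ///
    /// # Example (illustrative sketch)
    ///
    /// A minimal sketch of the closure-based API, assuming a segment ID string
    /// `old_id` is already in hand (not provided by this module):
    ///
    /// ```ignore
    /// // Mutate metadata under the write lock, then persist metadata.json.
    /// manager
    ///     .update_metadata(|meta| {
    ///         meta.remove_segment(&old_id);
    ///     })
    ///     .await?;
    /// ```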
    pub async fn update_metadata<F>(&self, f: F) -> Result<()>
    where
        F: FnOnce(&mut IndexMetadata),
    {
        let mut meta = self.metadata.write().await;
        f(&mut meta);
        meta.save(self.directory.as_ref()).await
    }

    /// Acquire a snapshot of the current segments for reading.
    /// The snapshot holds references, so segments won't be deleted while the snapshot exists.
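    ///
    /// # Example (illustrative sketch)
    ///
    /// A hedged sketch of the intended pattern; `run_search_over` is a
    /// hypothetical consumer of the pinned segments:
    ///
    /// ```ignore
    /// // Pin the current segment set; segments merged away in the meantime
    /// // are only deleted once this snapshot (and any others) are dropped.
    /// let snapshot = manager.acquire_snapshot().await;
    /// run_search_over(&snapshot).await;
    /// drop(snapshot); // releases refs; deferred deletions may now run
    /// ```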
    pub async fn acquire_snapshot(&self) -> SegmentSnapshot<D> {
        let acquired = {
            let meta = self.metadata.read().await;
            let segment_ids = meta.segment_ids();
            self.tracker.acquire(&segment_ids)
        };

        // Provide a deletion callback so deferred segment cleanup happens
        // when this snapshot is dropped (after in-flight searches finish)
        let dir = Arc::clone(&self.directory);
        let delete_fn: Arc<dyn Fn(Vec<SegmentId>) + Send + Sync> = Arc::new(move |segment_ids| {
            let dir = Arc::clone(&dir);
            tokio::spawn(async move {
                for segment_id in segment_ids {
                    log::info!(
                        "[segment_cleanup] deleting deferred segment {}",
                        segment_id.0
                    );
                    let _ = crate::segment::delete_segment(dir.as_ref(), segment_id).await;
                }
            });
        });

        SegmentSnapshot::with_delete_fn(Arc::clone(&self.tracker), acquired, delete_fn)
    }

    /// Get the segment tracker
    pub fn tracker(&self) -> Arc<SegmentTracker> {
        Arc::clone(&self.tracker)
    }

    /// Get the directory
    pub fn directory(&self) -> Arc<D> {
        Arc::clone(&self.directory)
    }
}

/// Native-only methods for SegmentManager (merging, segment registration)
#[cfg(feature = "native")]
impl<D: DirectoryWriter + 'static> SegmentManager<D> {
    /// Register a new segment with its doc count, persist metadata, and trigger merge check
    ///
    /// This is the main entry point for adding segments after builds complete.
    pub async fn register_segment(&self, segment_id: String, num_docs: u32) -> Result<()> {
        {
            let mut meta = self.metadata.write().await;
            if !meta.has_segment(&segment_id) {
                meta.add_segment(segment_id.clone(), num_docs);
                self.tracker.register(&segment_id);
            }
            meta.save(self.directory.as_ref()).await?;
        }

        // Check if we should trigger a merge (non-blocking)
        self.maybe_merge().await;
        Ok(())
    }

    /// Check merge policy and spawn background merges if needed
    /// Uses doc counts from metadata - no segment loading required
    pub async fn maybe_merge(&self) {
        // Get current segment info from metadata (no segment loading!)
        let segments: Vec<SegmentInfo> = {
            let meta = self.metadata.read().await;
            let merging = self.merging_segments.read();

            // Filter out segments currently being merged or pending deletion
            meta.segment_metas
                .iter()
                .filter(|(id, _)| !merging.contains(*id) && !self.tracker.is_pending_deletion(id))
                .map(|(id, info)| SegmentInfo {
                    id: id.clone(),
                    num_docs: info.num_docs,
                    size_bytes: None,
                })
                .collect()
        };

        // Ask merge policy for candidates
        let candidates = self.merge_policy.find_merges(&segments);

        for candidate in candidates {
            if candidate.segment_ids.len() >= 2 {
                self.spawn_merge(candidate.segment_ids);
            }
        }
    }

    /// Pause background merges (used during ANN rebuild to prevent races)
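    ///
    /// One plausible usage pattern (a sketch, not prescribed by this module;
    /// `rebuild_ann_index` is a hypothetical caller-side step):
    ///
    /// ```ignore
    /// manager.pause_merges();          // stop scheduling new merges
    /// manager.wait_for_merges().await; // let in-flight merges drain
    /// rebuild_ann_index().await?;      // hypothetical ANN rebuild
    /// manager.resume_merges();         // allow merging again
    /// ```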
    pub fn pause_merges(&self) {
        self.merge_paused.store(true, Ordering::SeqCst);
    }

    /// Resume background merges
    pub fn resume_merges(&self) {
        self.merge_paused.store(false, Ordering::SeqCst);
    }

    /// Spawn a background merge task
    fn spawn_merge(&self, segment_ids_to_merge: Vec<String>) {
        // Skip if merges are paused (during ANN rebuild)
        if self.merge_paused.load(Ordering::SeqCst) {
            return;
        }
        // Limit concurrent merges to avoid overwhelming the system during heavy indexing
        if self.pending_merges.load(Ordering::SeqCst) >= MAX_CONCURRENT_MERGES {
            return;
        }

        // Atomically check and mark segments as being merged
        // This prevents race conditions where multiple maybe_merge calls
        // could pick the same segments before they're marked
        {
            let mut merging = self.merging_segments.write();
            // Check if any segment is already being merged
            if segment_ids_to_merge.iter().any(|id| merging.contains(id)) {
                // Some segment already being merged, skip this merge
                return;
            }
            // Mark all segments as being merged
            for id in &segment_ids_to_merge {
                merging.insert(id.clone());
            }
        }

        let directory = Arc::clone(&self.directory);
        let schema = Arc::clone(&self.schema);
        let metadata = Arc::clone(&self.metadata);
        let merging_segments = Arc::clone(&self.merging_segments);
        let pending_merges = Arc::clone(&self.pending_merges);
        let merge_complete = Arc::clone(&self.merge_complete);
        let tracker = Arc::clone(&self.tracker);
        let term_cache_blocks = self.term_cache_blocks;

        pending_merges.fetch_add(1, Ordering::SeqCst);

        tokio::spawn(async move {
            let result = Self::do_merge(
                directory.as_ref(),
                &schema,
                &segment_ids_to_merge,
                term_cache_blocks,
                &metadata,
            )
            .await;

            match result {
                Ok((new_segment_id, merged_doc_count)) => {
                    // Register new segment with tracker
                    tracker.register(&new_segment_id);

                    // Atomically update metadata: remove merged segments, add new one with doc count
                    {
                        let mut meta = metadata.write().await;
                        for id in &segment_ids_to_merge {
                            meta.remove_segment(id);
                        }
                        meta.add_segment(new_segment_id, merged_doc_count);
                        if let Err(e) = meta.save(directory.as_ref()).await {
                            log::error!("[merge] Failed to save metadata after merge: {:?}", e);
                        }
                    }

                    // Mark old segments for deletion via tracker (deferred if refs exist)
                    let ready_to_delete = tracker.mark_for_deletion(&segment_ids_to_merge);
                    for segment_id in ready_to_delete {
                        let _ =
                            crate::segment::delete_segment(directory.as_ref(), segment_id).await;
                    }
                }
                Err(e) => {
                    log::error!(
                        "[merge] Background merge failed for segments {:?}: {:?}",
                        segment_ids_to_merge,
                        e
                    );
                }
            }

            // Remove from merging set
            {
                let mut merging = merging_segments.write();
                for id in &segment_ids_to_merge {
                    merging.remove(id);
                }
            }

            // Decrement pending merges counter and notify waiters
            pending_merges.fetch_sub(1, Ordering::SeqCst);
            merge_complete.notify_waiters();
        });
    }

    /// Perform the actual merge operation.
    /// Returns (new_segment_id_hex, total_doc_count).
    /// Used by both background merges and force_merge.
    pub async fn do_merge(
        directory: &D,
        schema: &crate::dsl::Schema,
        segment_ids_to_merge: &[String],
        term_cache_blocks: usize,
        metadata: &RwLock<IndexMetadata>,
    ) -> Result<(String, u32)> {
        let load_start = std::time::Instant::now();

        // Parse segment IDs upfront
        let segment_ids: Vec<SegmentId> = segment_ids_to_merge
            .iter()
            .map(|id_str| {
                SegmentId::from_hex(id_str)
                    .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))
            })
            .collect::<Result<Vec<_>>>()?;

        // Load segment readers in parallel (doc_offset=0; merger computes its own)
        let schema_arc = Arc::new(schema.clone());
        let futures: Vec<_> = segment_ids
            .iter()
            .map(|&sid| {
                let sch = Arc::clone(&schema_arc);
                async move { SegmentReader::open(directory, sid, sch, 0, term_cache_blocks).await }
            })
            .collect();

        let results = futures::future::join_all(futures).await;
        let mut readers = Vec::with_capacity(results.len());
        let mut total_docs = 0u32;
        for (i, result) in results.into_iter().enumerate() {
            match result {
                Ok(r) => {
                    total_docs += r.meta().num_docs;
                    readers.push(r);
                }
                Err(e) => {
                    log::error!(
                        "[merge] Failed to open segment {}: {:?}",
                        segment_ids_to_merge[i],
                        e
                    );
                    return Err(e);
                }
            }
        }

        log::info!(
            "[merge] loaded {} segment readers in {:.1}s",
            readers.len(),
            load_start.elapsed().as_secs_f64()
        );

        // Load trained structures (if any) for ANN-aware merging
        let trained = {
            let meta = metadata.read().await;
            meta.load_trained_structures(directory).await
        };

        let merger = SegmentMerger::new(schema_arc);
        let new_segment_id = SegmentId::new();

        log::info!(
            "[merge] {} segments -> {} (trained={})",
            segment_ids_to_merge.len(),
            new_segment_id.to_hex(),
            trained.as_ref().map_or(0, |t| t.centroids.len())
        );

        let merge_result = merger
            .merge(directory, &readers, new_segment_id, trained.as_ref())
            .await;

        if let Err(e) = merge_result {
            log::error!(
                "[merge] Merge failed for segments {:?} -> {}: {:?}",
                segment_ids_to_merge,
                new_segment_id.to_hex(),
                e
            );
            return Err(e);
        }

        log::info!(
            "[merge] total wall-clock: {:.1}s ({} segments, {} docs)",
            load_start.elapsed().as_secs_f64(),
            readers.len(),
            total_docs,
        );

        // Note: Segment deletion is handled by the caller via tracker
        Ok((new_segment_id.to_hex(), total_docs))
    }

    /// Wait for all pending merges to complete
    pub async fn wait_for_merges(&self) {
        loop {
            // Register for the notification before checking the counter, so a
            // merge that finishes in between cannot be missed (lost wakeup).
            let notified = self.merge_complete.notified();
            tokio::pin!(notified);
            notified.as_mut().enable();
            if self.pending_merges.load(Ordering::SeqCst) == 0 {
                return;
            }
            notified.await;
        }
    }

    /// Replace specific old segments with new ones atomically.
    ///
    /// Only removes `old_to_delete` from metadata (not all segments), then adds
    /// `new_segments`. This is safe against concurrent ingestion: segments committed
    /// between `get_segment_ids()` and this call are preserved.
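    ///
    /// # Example (illustrative sketch)
    ///
    /// A hedged sketch of a force-merge style rewrite built on `do_merge`;
    /// `directory`, `schema`, `metadata_lock`, `old_ids`, and `cache_blocks`
    /// are assumed to already exist in the caller:
    ///
    /// ```ignore
    /// let (new_id, num_docs) = SegmentManager::do_merge(
    ///     directory.as_ref(),
    ///     &schema,
    ///     &old_ids,
    ///     cache_blocks,
    ///     &metadata_lock,
    /// )
    /// .await?;
    /// // Swap the merged-away segments for the new one in a single metadata save.
    /// manager.replace_segments(vec![(new_id, num_docs)], old_ids).await?;
    /// ```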
    pub async fn replace_segments(
        &self,
        new_segments: Vec<(String, u32)>,
        old_to_delete: Vec<String>,
    ) -> Result<()> {
        // Register new segments with tracker
        for (seg_id, _) in &new_segments {
            self.tracker.register(seg_id);
        }

        {
            let mut meta = self.metadata.write().await;
            for id in &old_to_delete {
                meta.remove_segment(id);
            }
            for (seg_id, num_docs) in new_segments {
                meta.add_segment(seg_id, num_docs);
            }
            meta.save(self.directory.as_ref()).await?;
        }

        // Mark old segments for deletion via tracker (deferred if refs exist)
        let ready_to_delete = self.tracker.mark_for_deletion(&old_to_delete);
        for segment_id in ready_to_delete {
            let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
        }
        Ok(())
    }

    /// Clean up orphan segment files that are not registered
    ///
    /// This can happen if the process halts after segment files are written
    /// but before they are registered in metadata.json. Call this on startup
    /// to reclaim disk space from incomplete operations.
    ///
    /// Returns the number of orphan segments deleted.
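    ///
    /// # Example (illustrative sketch)
    ///
    /// Typically called once right after the manager is constructed on startup;
    /// the log line is only illustrative:
    ///
    /// ```ignore
    /// let deleted = manager.cleanup_orphan_segments().await?;
    /// log::info!("reclaimed {} orphan segment(s) on startup", deleted);
    /// ```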
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        let registered_set: HashSet<String> = {
            let meta = self.metadata.read().await;
            meta.segment_metas.keys().cloned().collect()
        };

        // Find all segment files in directory
        let mut orphan_ids: HashSet<String> = HashSet::new();

        // List directory and find segment files
        if let Ok(entries) = self.directory.list_files(std::path::Path::new("")).await {
            for entry in entries {
                let filename = entry.to_string_lossy();
                // Match pattern: seg_{32 hex chars}.{ext}
                if filename.starts_with("seg_") && filename.len() > 37 {
                    // Extract the hex ID (32 chars after "seg_")
                    let hex_part = &filename[4..36];
                    if !registered_set.contains(hex_part) {
                        orphan_ids.insert(hex_part.to_string());
                    }
                }
            }
        }

        // Delete orphan segments
        let mut deleted = 0;
        for hex_id in &orphan_ids {
            if let Some(segment_id) = SegmentId::from_hex(hex_id)
                && crate::segment::delete_segment(self.directory.as_ref(), segment_id)
                    .await
                    .is_ok()
            {
                deleted += 1;
            }
        }

        Ok(deleted)
    }
}