// hermes_core/merge/scheduler.rs
//! Segment manager - coordinates segment registration and background merging
//!
//! This module is only compiled with the "native" feature.
//!
//! The SegmentManager is the SOLE owner of `metadata.json`. All writes to metadata
//! go through this manager, ensuring linearized access and consistency between
//! the in-memory segment list and persisted state.
//!
//! **State separation:**
//! - Building segments: Managed by IndexWriter (pending_builds)
//! - Committed segments: Managed by SegmentManager (metadata.segments)
//! - Merging segments: Subset of committed, tracked here (merging_segments)
//!
//! **Commit workflow:**
//! 1. IndexWriter flushes builders, waits for builds to complete
//! 2. Calls `register_segment()` for each completed segment
//! 3. SegmentManager updates metadata atomically, triggers merge check (non-blocking)
//!
//! **Merge workflow (background):**
//! 1. Acquires segments to merge (marks as merging)
//! 2. Merges into new segment
//! 3. Calls internal `complete_merge()` which atomically updates metadata

24use std::collections::HashSet;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicUsize, Ordering};
27
28use parking_lot::RwLock as SyncRwLock;
29use tokio::sync::{Notify, RwLock};
30
31use crate::directories::DirectoryWriter;
32use crate::error::{Error, Result};
33use crate::index::IndexMetadata;
34use crate::segment::{SegmentId, SegmentSnapshot, SegmentTracker};
35#[cfg(feature = "native")]
36use crate::segment::{SegmentMerger, SegmentReader};
37
38use super::{MergePolicy, SegmentInfo};
39
/// Maximum number of concurrent merge operations.
///
/// Bounds the number of spawned background merge tasks so heavy indexing
/// cannot saturate the runtime with merge work.
const MAX_CONCURRENT_MERGES: usize = 2;
42
43/// Segment manager - coordinates segment registration and background merging
44///
45/// This is the SOLE owner of `metadata.json` ensuring linearized access.
46/// All segment list modifications and metadata updates go through here.
47///
48/// Uses RwLock for metadata to allow concurrent reads (search) while
49/// writes (indexing/merge) get exclusive access.
50pub struct SegmentManager<D: DirectoryWriter + 'static> {
51    /// Directory for segment operations
52    directory: Arc<D>,
53    /// Schema for segment operations
54    schema: Arc<crate::dsl::Schema>,
55    /// Unified metadata (segments + vector index state) - SOLE owner
56    /// RwLock allows concurrent reads, exclusive writes
57    metadata: Arc<RwLock<IndexMetadata>>,
58    /// The merge policy to use
59    merge_policy: Box<dyn MergePolicy>,
60    /// Count of in-flight background merges
61    pending_merges: Arc<AtomicUsize>,
62    /// Segments currently being merged (to avoid double-merging)
63    merging_segments: Arc<SyncRwLock<HashSet<String>>>,
64    /// Term cache blocks for segment readers during merge
65    term_cache_blocks: usize,
66    /// Notifier for merge completion (avoids busy-waiting)
67    merge_complete: Arc<Notify>,
68    /// Segment lifecycle tracker for reference counting
69    tracker: Arc<SegmentTracker>,
70}
71
72impl<D: DirectoryWriter + 'static> SegmentManager<D> {
73    /// Create a new segment manager with existing metadata
74    pub fn new(
75        directory: Arc<D>,
76        schema: Arc<crate::dsl::Schema>,
77        metadata: IndexMetadata,
78        merge_policy: Box<dyn MergePolicy>,
79        term_cache_blocks: usize,
80    ) -> Self {
81        // Initialize tracker and register existing segments
82        let tracker = Arc::new(SegmentTracker::new());
83        for seg_id in metadata.segment_metas.keys() {
84            tracker.register(seg_id);
85        }
86
87        Self {
88            directory,
89            schema,
90            metadata: Arc::new(RwLock::new(metadata)),
91            merge_policy,
92            pending_merges: Arc::new(AtomicUsize::new(0)),
93            merging_segments: Arc::new(SyncRwLock::new(HashSet::new())),
94            term_cache_blocks,
95            merge_complete: Arc::new(Notify::new()),
96            tracker,
97        }
98    }
99
100    /// Get the current segment IDs (snapshot)
101    pub async fn get_segment_ids(&self) -> Vec<String> {
102        self.metadata.read().await.segment_ids()
103    }
104
105    /// Get the number of pending background merges
106    pub fn pending_merge_count(&self) -> usize {
107        self.pending_merges.load(Ordering::SeqCst)
108    }
109
110    /// Get a clone of the metadata Arc for read access
111    pub fn metadata(&self) -> Arc<RwLock<IndexMetadata>> {
112        Arc::clone(&self.metadata)
113    }
114
115    /// Update metadata with a closure and persist atomically
116    pub async fn update_metadata<F>(&self, f: F) -> Result<()>
117    where
118        F: FnOnce(&mut IndexMetadata),
119    {
120        let mut meta = self.metadata.write().await;
121        f(&mut meta);
122        meta.save(self.directory.as_ref()).await
123    }
124
125    /// Acquire a snapshot of current segments for reading
126    /// The snapshot holds references - segments won't be deleted while snapshot exists
127    pub async fn acquire_snapshot(&self) -> SegmentSnapshot<D> {
128        let acquired = {
129            let meta = self.metadata.read().await;
130            let segment_ids = meta.segment_ids();
131            self.tracker.acquire(&segment_ids)
132        };
133
134        SegmentSnapshot::new(
135            Arc::clone(&self.tracker),
136            Arc::clone(&self.directory),
137            acquired,
138        )
139    }
140
141    /// Get the segment tracker
142    pub fn tracker(&self) -> Arc<SegmentTracker> {
143        Arc::clone(&self.tracker)
144    }
145
146    /// Get the directory
147    pub fn directory(&self) -> Arc<D> {
148        Arc::clone(&self.directory)
149    }
150}
151
152/// Native-only methods for SegmentManager (merging, segment registration)
153#[cfg(feature = "native")]
154impl<D: DirectoryWriter + 'static> SegmentManager<D> {
155    /// Register a new segment with its doc count, persist metadata, and trigger merge check
156    ///
157    /// This is the main entry point for adding segments after builds complete.
158    pub async fn register_segment(&self, segment_id: String, num_docs: u32) -> Result<()> {
159        {
160            let mut meta = self.metadata.write().await;
161            if !meta.has_segment(&segment_id) {
162                meta.add_segment(segment_id.clone(), num_docs);
163                self.tracker.register(&segment_id);
164            }
165            meta.save(self.directory.as_ref()).await?;
166        }
167
168        // Check if we should trigger a merge (non-blocking)
169        self.maybe_merge().await;
170        Ok(())
171    }
172
173    /// Check merge policy and spawn background merges if needed
174    /// Uses doc counts from metadata - no segment loading required
175    pub async fn maybe_merge(&self) {
176        // Get current segment info from metadata (no segment loading!)
177        let segments: Vec<SegmentInfo> = {
178            let meta = self.metadata.read().await;
179            let merging = self.merging_segments.read();
180
181            // Filter out segments currently being merged or pending deletion
182            meta.segment_metas
183                .iter()
184                .filter(|(id, _)| !merging.contains(*id) && !self.tracker.is_pending_deletion(id))
185                .map(|(id, info)| SegmentInfo {
186                    id: id.clone(),
187                    num_docs: info.num_docs,
188                    size_bytes: None,
189                })
190                .collect()
191        };
192
193        // Ask merge policy for candidates
194        let candidates = self.merge_policy.find_merges(&segments);
195
196        for candidate in candidates {
197            if candidate.segment_ids.len() >= 2 {
198                self.spawn_merge(candidate.segment_ids);
199            }
200        }
201    }
202
203    /// Spawn a background merge task
204    fn spawn_merge(&self, segment_ids_to_merge: Vec<String>) {
205        // Limit concurrent merges to avoid overwhelming the system during heavy indexing
206        if self.pending_merges.load(Ordering::SeqCst) >= MAX_CONCURRENT_MERGES {
207            return;
208        }
209
210        // Atomically check and mark segments as being merged
211        // This prevents race conditions where multiple maybe_merge calls
212        // could pick the same segments before they're marked
213        {
214            let mut merging = self.merging_segments.write();
215            // Check if any segment is already being merged
216            if segment_ids_to_merge.iter().any(|id| merging.contains(id)) {
217                // Some segment already being merged, skip this merge
218                return;
219            }
220            // Mark all segments as being merged
221            for id in &segment_ids_to_merge {
222                merging.insert(id.clone());
223            }
224        }
225
226        let directory = Arc::clone(&self.directory);
227        let schema = Arc::clone(&self.schema);
228        let metadata = Arc::clone(&self.metadata);
229        let merging_segments = Arc::clone(&self.merging_segments);
230        let pending_merges = Arc::clone(&self.pending_merges);
231        let merge_complete = Arc::clone(&self.merge_complete);
232        let tracker = Arc::clone(&self.tracker);
233        let term_cache_blocks = self.term_cache_blocks;
234
235        pending_merges.fetch_add(1, Ordering::SeqCst);
236
237        tokio::spawn(async move {
238            let result = Self::do_merge(
239                directory.as_ref(),
240                &schema,
241                &segment_ids_to_merge,
242                term_cache_blocks,
243            )
244            .await;
245
246            match result {
247                Ok((new_segment_id, merged_doc_count)) => {
248                    // Register new segment with tracker
249                    tracker.register(&new_segment_id);
250
251                    // Atomically update metadata: remove merged segments, add new one with doc count
252                    {
253                        let mut meta = metadata.write().await;
254                        for id in &segment_ids_to_merge {
255                            meta.remove_segment(id);
256                        }
257                        meta.add_segment(new_segment_id, merged_doc_count);
258                        if let Err(e) = meta.save(directory.as_ref()).await {
259                            eprintln!("[merge] Failed to save metadata after merge: {:?}", e);
260                        }
261                    }
262
263                    // Mark old segments for deletion via tracker (deferred if refs exist)
264                    let ready_to_delete = tracker.mark_for_deletion(&segment_ids_to_merge);
265                    for segment_id in ready_to_delete {
266                        let _ =
267                            crate::segment::delete_segment(directory.as_ref(), segment_id).await;
268                    }
269                }
270                Err(e) => {
271                    eprintln!(
272                        "Background merge failed for segments {:?}: {:?}",
273                        segment_ids_to_merge, e
274                    );
275                }
276            }
277
278            // Remove from merging set
279            {
280                let mut merging = merging_segments.write();
281                for id in &segment_ids_to_merge {
282                    merging.remove(id);
283                }
284            }
285
286            // Decrement pending merges counter and notify waiters
287            pending_merges.fetch_sub(1, Ordering::SeqCst);
288            merge_complete.notify_waiters();
289        });
290    }
291
292    /// Perform the actual merge operation (runs in background task)
293    /// Returns (new_segment_id, total_doc_count)
294    async fn do_merge(
295        directory: &D,
296        schema: &crate::dsl::Schema,
297        segment_ids_to_merge: &[String],
298        term_cache_blocks: usize,
299    ) -> Result<(String, u32)> {
300        // Load segment readers
301        let mut readers = Vec::new();
302        let mut doc_offset = 0u32;
303        let mut total_docs = 0u32;
304
305        for id_str in segment_ids_to_merge {
306            let segment_id = SegmentId::from_hex(id_str)
307                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
308            let reader = match SegmentReader::open(
309                directory,
310                segment_id,
311                Arc::new(schema.clone()),
312                doc_offset,
313                term_cache_blocks,
314            )
315            .await
316            {
317                Ok(r) => r,
318                Err(e) => {
319                    eprintln!("[merge] Failed to open segment {}: {:?}", id_str, e);
320                    return Err(e);
321                }
322            };
323            let num_docs = reader.meta().num_docs;
324            total_docs += num_docs;
325            doc_offset += num_docs;
326            readers.push(reader);
327        }
328
329        // Merge into new segment
330        let merger = SegmentMerger::new(Arc::new(schema.clone()));
331        let new_segment_id = SegmentId::new();
332        if let Err(e) = merger.merge(directory, &readers, new_segment_id).await {
333            eprintln!(
334                "[merge] Merge failed for segments {:?} -> {}: {:?}",
335                segment_ids_to_merge,
336                new_segment_id.to_hex(),
337                e
338            );
339            return Err(e);
340        }
341
342        // Note: Segment deletion is handled by the caller via tracker
343        Ok((new_segment_id.to_hex(), total_docs))
344    }
345
346    /// Wait for all pending merges to complete
347    pub async fn wait_for_merges(&self) {
348        while self.pending_merges.load(Ordering::SeqCst) > 0 {
349            self.merge_complete.notified().await;
350        }
351    }
352
353    /// Replace segment list atomically (for force merge / rebuild)
354    /// new_segments: Vec of (segment_id, num_docs) pairs
355    pub async fn replace_segments(
356        &self,
357        new_segments: Vec<(String, u32)>,
358        old_to_delete: Vec<String>,
359    ) -> Result<()> {
360        // Register new segments with tracker
361        for (seg_id, _) in &new_segments {
362            self.tracker.register(seg_id);
363        }
364
365        {
366            let mut meta = self.metadata.write().await;
367            meta.segment_metas.clear();
368            for (seg_id, num_docs) in new_segments {
369                meta.add_segment(seg_id, num_docs);
370            }
371            meta.save(self.directory.as_ref()).await?;
372        }
373
374        // Mark old segments for deletion via tracker (deferred if refs exist)
375        let ready_to_delete = self.tracker.mark_for_deletion(&old_to_delete);
376        for segment_id in ready_to_delete {
377            let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
378        }
379        Ok(())
380    }
381
382    /// Clean up orphan segment files that are not registered
383    ///
384    /// This can happen if the process halts after segment files are written
385    /// but before they are registered in metadata.json. Call this on startup
386    /// to reclaim disk space from incomplete operations.
387    ///
388    /// Returns the number of orphan segments deleted.
389    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
390        let registered_set: HashSet<String> = {
391            let meta = self.metadata.read().await;
392            meta.segment_metas.keys().cloned().collect()
393        };
394
395        // Find all segment files in directory
396        let mut orphan_ids: HashSet<String> = HashSet::new();
397
398        // List directory and find segment files
399        if let Ok(entries) = self.directory.list_files(std::path::Path::new("")).await {
400            for entry in entries {
401                let filename = entry.to_string_lossy();
402                // Match pattern: seg_{32 hex chars}.{ext}
403                if filename.starts_with("seg_") && filename.len() > 37 {
404                    // Extract the hex ID (32 chars after "seg_")
405                    let hex_part = &filename[4..36];
406                    if !registered_set.contains(hex_part) {
407                        orphan_ids.insert(hex_part.to_string());
408                    }
409                }
410            }
411        }
412
413        // Delete orphan segments
414        let mut deleted = 0;
415        for hex_id in &orphan_ids {
416            if let Some(segment_id) = SegmentId::from_hex(hex_id)
417                && crate::segment::delete_segment(self.directory.as_ref(), segment_id)
418                    .await
419                    .is_ok()
420            {
421                deleted += 1;
422            }
423        }
424
425        Ok(deleted)
426    }
427}