// hermes_core/merge/scheduler.rs

//! Segment manager - coordinates segment registration and background merging
//!
//! This module is only compiled with the "native" feature.
//!
//! The SegmentManager is the SOLE owner of `metadata.json`. All writes to metadata
//! go through this manager, ensuring linearized access and consistency between
//! the in-memory segment list and persisted state.
//!
//! **State separation:**
//! - Building segments: managed by IndexWriter (pending_builds)
//! - Committed segments: managed by SegmentManager (metadata.segments)
//! - Merging segments: subset of committed, tracked here (merging_segments)
//!
//! **Commit workflow:**
//! 1. IndexWriter flushes builders, waits for builds to complete
//! 2. Calls `register_segment()` for each completed segment
//! 3. SegmentManager updates metadata atomically, triggers merge check (non-blocking)
//!
//! **Merge workflow (background):**
//! 1. Acquires segments to merge (marks as merging)
//! 2. Merges into new segment
//! 3. The spawned task atomically updates metadata and retires the old segments
23
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use tokio::sync::{Mutex as AsyncMutex, Notify};

use crate::directories::DirectoryWriter;
use crate::error::{Error, Result};
use crate::index::IndexMetadata;
use crate::segment::{SegmentId, SegmentMerger, SegmentReader, SegmentSnapshot, SegmentTracker};

use super::{MergePolicy, SegmentInfo};
36
/// Segment manager - coordinates segment registration and background merging
///
/// This is the SOLE owner of `metadata.json` ensuring linearized access.
/// All segment list modifications and metadata updates go through here.
///
/// Every field that background merge tasks need is wrapped in `Arc` so it can
/// be cloned into `tokio::spawn`ed futures that outlive a borrow of `self`.
pub struct SegmentManager<D: DirectoryWriter + 'static> {
    /// Directory used for all segment I/O (reads, writes, deletes)
    directory: Arc<D>,
    /// Schema handed to segment readers and the merger
    schema: Arc<crate::dsl::Schema>,
    /// Unified metadata (segments + vector index state) - SOLE owner.
    /// Async mutex because it is held across `.await` while saving to disk.
    metadata: Arc<AsyncMutex<IndexMetadata>>,
    /// The merge policy that selects which committed segments to merge
    merge_policy: Box<dyn MergePolicy>,
    /// Count of in-flight background merges (observed by `wait_for_merges`)
    pending_merges: Arc<AtomicUsize>,
    /// Segments currently being merged (to avoid double-merging)
    merging_segments: Arc<AsyncMutex<HashSet<String>>>,
    /// Term cache blocks for segment readers during merge
    term_cache_blocks: usize,
    /// Notifier signalled on merge completion (avoids busy-waiting)
    merge_complete: Arc<Notify>,
    /// Segment lifecycle tracker for reference counting / deferred deletion
    tracker: Arc<SegmentTracker>,
}
61
62impl<D: DirectoryWriter + 'static> SegmentManager<D> {
63    /// Create a new segment manager with existing metadata
64    pub fn new(
65        directory: Arc<D>,
66        schema: Arc<crate::dsl::Schema>,
67        metadata: IndexMetadata,
68        merge_policy: Box<dyn MergePolicy>,
69        term_cache_blocks: usize,
70    ) -> Self {
71        // Initialize tracker and register existing segments
72        let tracker = Arc::new(SegmentTracker::new());
73        for seg_id in &metadata.segments {
74            tracker.register(seg_id);
75        }
76
77        Self {
78            directory,
79            schema,
80            metadata: Arc::new(AsyncMutex::new(metadata)),
81            merge_policy,
82            pending_merges: Arc::new(AtomicUsize::new(0)),
83            merging_segments: Arc::new(AsyncMutex::new(HashSet::new())),
84            term_cache_blocks,
85            merge_complete: Arc::new(Notify::new()),
86            tracker,
87        }
88    }
89
90    /// Get the current segment IDs (snapshot)
91    pub async fn get_segment_ids(&self) -> Vec<String> {
92        self.metadata.lock().await.segments.clone()
93    }
94
95    /// Register a new segment, persist metadata, and trigger merge check
96    ///
97    /// This is the main entry point for adding segments after builds complete.
98    /// It atomically:
99    /// 1. Adds the segment ID to the list
100    /// 2. Registers with tracker for reference counting
101    /// 3. Persists metadata to disk
102    /// 4. Triggers merge check (spawns background merges if needed)
103    pub async fn register_segment(&self, segment_id: String) -> Result<()> {
104        {
105            let mut meta = self.metadata.lock().await;
106            if !meta.segments.contains(&segment_id) {
107                meta.segments.push(segment_id.clone());
108                self.tracker.register(&segment_id);
109            }
110            meta.save(self.directory.as_ref()).await?;
111        }
112
113        // Check if we should trigger a merge (non-blocking)
114        self.maybe_merge().await;
115        Ok(())
116    }
117
118    /// Get the number of pending background merges
119    pub fn pending_merge_count(&self) -> usize {
120        self.pending_merges.load(Ordering::SeqCst)
121    }
122
123    /// Check merge policy and spawn background merges if needed
124    pub async fn maybe_merge(&self) {
125        // Get current segment info (excluding segments being merged or pending deletion)
126        let meta = self.metadata.lock().await;
127        let merging = self.merging_segments.lock().await;
128
129        // Filter out segments currently being merged or pending deletion
130        let available_segments: Vec<String> = meta
131            .segments
132            .iter()
133            .filter(|id| !merging.contains(*id) && !self.tracker.is_pending_deletion(id))
134            .cloned()
135            .collect();
136
137        drop(merging);
138        drop(meta);
139
140        // Build segment info - we estimate doc count based on segment age (newer = smaller)
141        let segments: Vec<SegmentInfo> = available_segments
142            .iter()
143            .enumerate()
144            .map(|(i, id)| SegmentInfo {
145                id: id.clone(),
146                num_docs: ((i + 1) * 1000) as u32,
147                size_bytes: None,
148            })
149            .collect();
150
151        // Ask merge policy for candidates
152        let candidates = self.merge_policy.find_merges(&segments);
153
154        for candidate in candidates {
155            if candidate.segment_ids.len() >= 2 {
156                self.spawn_merge(candidate.segment_ids).await;
157            }
158        }
159    }
160
    /// Spawn a background merge task
    ///
    /// Marks `segment_ids_to_merge` in `merging_segments` so concurrent
    /// `maybe_merge` calls will not select them again, increments
    /// `pending_merges`, then spawns a task that:
    /// 1. merges the segments via `do_merge`,
    /// 2. on success swaps the old IDs for the new one in metadata and persists,
    /// 3. marks the old segments for deletion (deferred while snapshots hold refs),
    /// 4. always clears the merging set, decrements the counter, and notifies
    ///    `wait_for_merges` waiters — even on failure.
    async fn spawn_merge(&self, segment_ids_to_merge: Vec<String>) {
        // Mark segments as being merged
        {
            let mut merging = self.merging_segments.lock().await;
            for id in &segment_ids_to_merge {
                merging.insert(id.clone());
            }
        }

        // Clone shared handles into the task; `self` is not `'static`.
        let directory = Arc::clone(&self.directory);
        let schema = Arc::clone(&self.schema);
        let metadata = Arc::clone(&self.metadata);
        let merging_segments = Arc::clone(&self.merging_segments);
        let pending_merges = Arc::clone(&self.pending_merges);
        let merge_complete = Arc::clone(&self.merge_complete);
        let tracker = Arc::clone(&self.tracker);
        let term_cache_blocks = self.term_cache_blocks;

        // Incremented before spawn so pending_merge_count never under-reports.
        pending_merges.fetch_add(1, Ordering::SeqCst);

        tokio::spawn(async move {
            let result = Self::do_merge(
                directory.as_ref(),
                &schema,
                &segment_ids_to_merge,
                term_cache_blocks,
            )
            .await;

            match result {
                Ok(new_segment_id) => {
                    // Register new segment with tracker
                    tracker.register(&new_segment_id);

                    // Atomically update metadata: remove merged segments, add new one, persist
                    let mut meta = metadata.lock().await;
                    meta.segments
                        .retain(|id| !segment_ids_to_merge.contains(id));
                    meta.segments.push(new_segment_id);
                    // NOTE(review): on save failure the in-memory list and the
                    // on-disk metadata.json diverge until the next successful
                    // save — confirm this best-effort behavior is intended.
                    if let Err(e) = meta.save(directory.as_ref()).await {
                        eprintln!("[merge] Failed to save metadata after merge: {:?}", e);
                    }
                    drop(meta);

                    // Mark old segments for deletion via tracker (deferred if refs exist)
                    let ready_to_delete = tracker.mark_for_deletion(&segment_ids_to_merge);
                    for segment_id in ready_to_delete {
                        let _ =
                            crate::segment::delete_segment(directory.as_ref(), segment_id).await;
                    }
                }
                Err(e) => {
                    // On failure the source segments stay in metadata untouched.
                    eprintln!(
                        "Background merge failed for segments {:?}: {:?}",
                        segment_ids_to_merge, e
                    );
                }
            }

            // Remove from merging set so the segments become mergeable again
            let mut merging = merging_segments.lock().await;
            for id in &segment_ids_to_merge {
                merging.remove(id);
            }

            // Decrement pending merges counter and notify waiters
            pending_merges.fetch_sub(1, Ordering::SeqCst);
            merge_complete.notify_waiters();
        });
    }
232
233    /// Perform the actual merge operation (runs in background task)
234    async fn do_merge(
235        directory: &D,
236        schema: &crate::dsl::Schema,
237        segment_ids_to_merge: &[String],
238        term_cache_blocks: usize,
239    ) -> Result<String> {
240        // Load segment readers
241        let mut readers = Vec::new();
242        let mut doc_offset = 0u32;
243
244        for id_str in segment_ids_to_merge {
245            let segment_id = SegmentId::from_hex(id_str)
246                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
247            let reader = match SegmentReader::open(
248                directory,
249                segment_id,
250                Arc::new(schema.clone()),
251                doc_offset,
252                term_cache_blocks,
253            )
254            .await
255            {
256                Ok(r) => r,
257                Err(e) => {
258                    eprintln!("[merge] Failed to open segment {}: {:?}", id_str, e);
259                    return Err(e);
260                }
261            };
262            doc_offset += reader.meta().num_docs;
263            readers.push(reader);
264        }
265
266        // Merge into new segment
267        let merger = SegmentMerger::new(Arc::new(schema.clone()));
268        let new_segment_id = SegmentId::new();
269        if let Err(e) = merger.merge(directory, &readers, new_segment_id).await {
270            eprintln!(
271                "[merge] Merge failed for segments {:?} -> {}: {:?}",
272                segment_ids_to_merge,
273                new_segment_id.to_hex(),
274                e
275            );
276            return Err(e);
277        }
278
279        // Note: Segment deletion is handled by the caller via tracker
280        Ok(new_segment_id.to_hex())
281    }
282
283    /// Wait for all pending merges to complete
284    pub async fn wait_for_merges(&self) {
285        while self.pending_merges.load(Ordering::SeqCst) > 0 {
286            self.merge_complete.notified().await;
287        }
288    }
289
290    /// Get a clone of the metadata Arc for read access
291    pub fn metadata(&self) -> Arc<AsyncMutex<IndexMetadata>> {
292        Arc::clone(&self.metadata)
293    }
294
295    /// Update metadata with a closure and persist atomically
296    pub async fn update_metadata<F>(&self, f: F) -> Result<()>
297    where
298        F: FnOnce(&mut IndexMetadata),
299    {
300        let mut meta = self.metadata.lock().await;
301        f(&mut meta);
302        meta.save(self.directory.as_ref()).await
303    }
304
305    /// Replace segment list atomically (for force merge / rebuild)
306    pub async fn replace_segments(
307        &self,
308        new_segments: Vec<String>,
309        old_to_delete: Vec<String>,
310    ) -> Result<()> {
311        // Register new segments with tracker
312        for seg_id in &new_segments {
313            self.tracker.register(seg_id);
314        }
315
316        {
317            let mut meta = self.metadata.lock().await;
318            meta.segments = new_segments;
319            meta.save(self.directory.as_ref()).await?;
320        }
321
322        // Mark old segments for deletion via tracker (deferred if refs exist)
323        let ready_to_delete = self.tracker.mark_for_deletion(&old_to_delete);
324        for segment_id in ready_to_delete {
325            let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
326        }
327        Ok(())
328    }
329
330    /// Acquire a snapshot of current segments for reading
331    /// The snapshot holds references - segments won't be deleted while snapshot exists
332    ///
333    /// This is atomic: we hold the metadata lock while acquiring refs to prevent
334    /// a race where a merge completes between getting segment IDs and acquiring refs.
335    pub async fn acquire_snapshot(&self) -> SegmentSnapshot<D> {
336        // Hold metadata lock while acquiring refs - this prevents race with merge completion
337        let meta = self.metadata.lock().await;
338        let acquired = self.tracker.acquire(&meta.segments);
339        drop(meta); // Safe to drop now - refs are acquired
340
341        SegmentSnapshot::new(
342            Arc::clone(&self.tracker),
343            Arc::clone(&self.directory),
344            acquired,
345        )
346    }
347
348    /// Get the segment tracker
349    pub fn tracker(&self) -> Arc<SegmentTracker> {
350        Arc::clone(&self.tracker)
351    }
352
353    /// Get the directory
354    pub fn directory(&self) -> Arc<D> {
355        Arc::clone(&self.directory)
356    }
357
358    /// Clean up orphan segment files that are not registered
359    ///
360    /// This can happen if the process halts after segment files are written
361    /// but before they are registered in metadata.json. Call this on startup
362    /// to reclaim disk space from incomplete operations.
363    ///
364    /// Returns the number of orphan segments deleted.
365    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
366        let registered_set: HashSet<String> = {
367            let meta = self.metadata.lock().await;
368            meta.segments.iter().cloned().collect()
369        };
370
371        // Find all segment files in directory
372        let mut orphan_ids: HashSet<String> = HashSet::new();
373
374        // List directory and find segment files
375        if let Ok(entries) = self.directory.list_files(std::path::Path::new("")).await {
376            for entry in entries {
377                let filename = entry.to_string_lossy();
378                // Match pattern: seg_{32 hex chars}.{ext}
379                if filename.starts_with("seg_") && filename.len() > 37 {
380                    // Extract the hex ID (32 chars after "seg_")
381                    let hex_part = &filename[4..36];
382                    if !registered_set.contains(hex_part) {
383                        orphan_ids.insert(hex_part.to_string());
384                    }
385                }
386            }
387        }
388
389        // Delete orphan segments
390        let mut deleted = 0;
391        for hex_id in &orphan_ids {
392            if let Some(segment_id) = SegmentId::from_hex(hex_id)
393                && crate::segment::delete_segment(self.directory.as_ref(), segment_id)
394                    .await
395                    .is_ok()
396            {
397                deleted += 1;
398            }
399        }
400
401        Ok(deleted)
402    }
403}