Skip to main content

hermes_core/merge/
scheduler.rs

1//! Segment manager - coordinates segment registration and background merging
2//!
3//! This module is only compiled with the "native" feature.
4
5use std::collections::HashSet;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicUsize, Ordering};
8
9use tokio::sync::{Mutex as AsyncMutex, Notify};
10
11use crate::directories::DirectoryWriter;
12use crate::error::{Error, Result};
13use crate::segment::{SegmentId, SegmentMerger, SegmentReader};
14
15use super::{MergePolicy, SegmentInfo};
16
17/// Segment manager - coordinates segment registration and background merging
18///
19/// This is the central point for:
20/// - Tracking all segment IDs
21/// - Registering new segments (from builds or merges)
22/// - Triggering merge checks when segments are added
23/// - Coordinating background merge tasks
24pub struct SegmentManager<D: DirectoryWriter + 'static> {
25    /// Directory for segment operations
26    directory: Arc<D>,
27    /// Schema for segment operations
28    schema: Arc<crate::dsl::Schema>,
29    /// List of committed segment IDs (hex strings)
30    segment_ids: Arc<AsyncMutex<Vec<String>>>,
31    /// The merge policy to use
32    merge_policy: Box<dyn MergePolicy>,
33    /// Count of in-flight background merges
34    pending_merges: Arc<AtomicUsize>,
35    /// Segments currently being merged (to avoid double-merging)
36    merging_segments: Arc<AsyncMutex<HashSet<String>>>,
37    /// Term cache blocks for segment readers during merge
38    term_cache_blocks: usize,
39    /// Notifier for merge completion (avoids busy-waiting)
40    merge_complete: Arc<Notify>,
41}
42
43impl<D: DirectoryWriter + 'static> SegmentManager<D> {
44    /// Create a new segment manager
45    pub fn new(
46        directory: Arc<D>,
47        schema: Arc<crate::dsl::Schema>,
48        segment_ids: Vec<String>,
49        merge_policy: Box<dyn MergePolicy>,
50        term_cache_blocks: usize,
51    ) -> Self {
52        Self {
53            directory,
54            schema,
55            segment_ids: Arc::new(AsyncMutex::new(segment_ids)),
56            merge_policy,
57            pending_merges: Arc::new(AtomicUsize::new(0)),
58            merging_segments: Arc::new(AsyncMutex::new(HashSet::new())),
59            term_cache_blocks,
60            merge_complete: Arc::new(Notify::new()),
61        }
62    }
63
64    /// Get a clone of the segment_ids Arc for sharing with background tasks
65    pub fn segment_ids(&self) -> Arc<AsyncMutex<Vec<String>>> {
66        Arc::clone(&self.segment_ids)
67    }
68
69    /// Get the current segment IDs
70    pub async fn get_segment_ids(&self) -> Vec<String> {
71        self.segment_ids.lock().await.clone()
72    }
73
74    /// Register a new segment and trigger merge check
75    ///
76    /// This is the main entry point for adding segments. It:
77    /// 1. Adds the segment ID to the list
78    /// 2. Checks the merge policy and spawns background merges if needed
79    pub async fn register_segment(&self, segment_id: String) {
80        {
81            let mut ids = self.segment_ids.lock().await;
82            ids.push(segment_id);
83        }
84
85        // Check if we should trigger a merge
86        self.maybe_merge().await;
87    }
88
89    /// Get the number of pending background merges
90    pub fn pending_merge_count(&self) -> usize {
91        self.pending_merges.load(Ordering::SeqCst)
92    }
93
94    /// Check merge policy and spawn background merges if needed
95    pub async fn maybe_merge(&self) {
96        // Get current segment info (excluding segments being merged)
97        let ids = self.segment_ids.lock().await;
98        let merging = self.merging_segments.lock().await;
99
100        // Filter out segments currently being merged
101        let available_segments: Vec<String> = ids
102            .iter()
103            .filter(|id| !merging.contains(*id))
104            .cloned()
105            .collect();
106
107        drop(merging);
108        drop(ids);
109
110        // Build segment info - we estimate doc count based on segment age (newer = smaller)
111        let segments: Vec<SegmentInfo> = available_segments
112            .iter()
113            .enumerate()
114            .map(|(i, id)| SegmentInfo {
115                id: id.clone(),
116                num_docs: ((i + 1) * 1000) as u32,
117                size_bytes: None,
118            })
119            .collect();
120
121        // Ask merge policy for candidates
122        let candidates = self.merge_policy.find_merges(&segments);
123
124        for candidate in candidates {
125            if candidate.segment_ids.len() >= 2 {
126                self.spawn_merge(candidate.segment_ids).await;
127            }
128        }
129    }
130
131    /// Spawn a background merge task
132    async fn spawn_merge(&self, segment_ids_to_merge: Vec<String>) {
133        // Mark segments as being merged
134        {
135            let mut merging = self.merging_segments.lock().await;
136            for id in &segment_ids_to_merge {
137                merging.insert(id.clone());
138            }
139        }
140
141        let directory = Arc::clone(&self.directory);
142        let schema = Arc::clone(&self.schema);
143        let segment_ids = Arc::clone(&self.segment_ids);
144        let merging_segments = Arc::clone(&self.merging_segments);
145        let pending_merges = Arc::clone(&self.pending_merges);
146        let merge_complete = Arc::clone(&self.merge_complete);
147        let term_cache_blocks = self.term_cache_blocks;
148
149        pending_merges.fetch_add(1, Ordering::SeqCst);
150
151        tokio::spawn(async move {
152            let result = Self::do_merge(
153                directory.as_ref(),
154                &schema,
155                &segment_ids_to_merge,
156                term_cache_blocks,
157            )
158            .await;
159
160            match result {
161                Ok(new_segment_id) => {
162                    // Update segment list: remove merged segments, add new one
163                    let mut ids = segment_ids.lock().await;
164                    ids.retain(|id| !segment_ids_to_merge.contains(id));
165                    ids.push(new_segment_id);
166                }
167                Err(e) => {
168                    eprintln!(
169                        "Background merge failed for segments {:?}: {:?}",
170                        segment_ids_to_merge, e
171                    );
172                }
173            }
174
175            // Remove from merging set
176            let mut merging = merging_segments.lock().await;
177            for id in &segment_ids_to_merge {
178                merging.remove(id);
179            }
180
181            // Decrement pending merges counter and notify waiters
182            pending_merges.fetch_sub(1, Ordering::SeqCst);
183            merge_complete.notify_waiters();
184        });
185    }
186
187    /// Perform the actual merge operation (runs in background task)
188    async fn do_merge(
189        directory: &D,
190        schema: &crate::dsl::Schema,
191        segment_ids_to_merge: &[String],
192        term_cache_blocks: usize,
193    ) -> Result<String> {
194        // Load segment readers
195        let mut readers = Vec::new();
196        let mut doc_offset = 0u32;
197
198        for id_str in segment_ids_to_merge {
199            let segment_id = SegmentId::from_hex(id_str)
200                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
201            let reader = SegmentReader::open(
202                directory,
203                segment_id,
204                Arc::new(schema.clone()),
205                doc_offset,
206                term_cache_blocks,
207            )
208            .await?;
209            doc_offset += reader.meta().num_docs;
210            readers.push(reader);
211        }
212
213        // Merge into new segment
214        let merger = SegmentMerger::new(Arc::new(schema.clone()));
215        let new_segment_id = SegmentId::new();
216        merger.merge(directory, &readers, new_segment_id).await?;
217
218        // Delete old segments
219        for id_str in segment_ids_to_merge {
220            if let Some(segment_id) = SegmentId::from_hex(id_str) {
221                let _ = crate::segment::delete_segment(directory, segment_id).await;
222            }
223        }
224
225        Ok(new_segment_id.to_hex())
226    }
227
228    /// Wait for all pending merges to complete
229    pub async fn wait_for_merges(&self) {
230        while self.pending_merges.load(Ordering::SeqCst) > 0 {
231            self.merge_complete.notified().await;
232        }
233    }
234
235    /// Clean up orphan segment files that are not registered
236    ///
237    /// This can happen if the process halts after segment files are written
238    /// but before they are registered in segments.json. Call this on startup
239    /// to reclaim disk space from incomplete operations.
240    ///
241    /// Returns the number of orphan segments deleted.
242    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
243        let registered_set: HashSet<String> = {
244            let registered_ids = self.segment_ids.lock().await;
245            registered_ids.iter().cloned().collect()
246        };
247
248        // Find all segment files in directory
249        let mut orphan_ids: HashSet<String> = HashSet::new();
250
251        // List directory and find segment files
252        if let Ok(entries) = self.directory.list_files(std::path::Path::new("")).await {
253            for entry in entries {
254                let filename = entry.to_string_lossy();
255                // Match pattern: seg_{32 hex chars}.{ext}
256                if filename.starts_with("seg_") && filename.len() > 37 {
257                    // Extract the hex ID (32 chars after "seg_")
258                    let hex_part = &filename[4..36];
259                    if !registered_set.contains(hex_part) {
260                        orphan_ids.insert(hex_part.to_string());
261                    }
262                }
263            }
264        }
265
266        // Delete orphan segments
267        let mut deleted = 0;
268        for hex_id in &orphan_ids {
269            if let Some(segment_id) = SegmentId::from_hex(hex_id)
270                && crate::segment::delete_segment(self.directory.as_ref(), segment_id)
271                    .await
272                    .is_ok()
273            {
274                deleted += 1;
275            }
276        }
277
278        Ok(deleted)
279    }
280}