hermes_core/merge/
scheduler.rs

1//! Segment manager - coordinates segment registration and background merging
2//!
3//! This module is only compiled with the "native" feature.
4
5use std::collections::HashSet;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicUsize, Ordering};
8
9use tokio::sync::{Mutex as AsyncMutex, Notify};
10
11use crate::directories::DirectoryWriter;
12use crate::error::{Error, Result};
13use crate::segment::{SegmentId, SegmentMerger, SegmentReader};
14
15use super::{MergePolicy, SegmentInfo};
16
17/// Segment manager - coordinates segment registration and background merging
18///
19/// This is the central point for:
20/// - Tracking all segment IDs
21/// - Registering new segments (from builds or merges)
22/// - Triggering merge checks when segments are added
23/// - Coordinating background merge tasks
24pub struct SegmentManager<D: DirectoryWriter + 'static> {
25    /// Directory for segment operations
26    directory: Arc<D>,
27    /// Schema for segment operations
28    schema: Arc<crate::dsl::Schema>,
29    /// List of committed segment IDs (hex strings)
30    segment_ids: Arc<AsyncMutex<Vec<String>>>,
31    /// The merge policy to use
32    merge_policy: Box<dyn MergePolicy>,
33    /// Count of in-flight background merges
34    pending_merges: Arc<AtomicUsize>,
35    /// Segments currently being merged (to avoid double-merging)
36    merging_segments: Arc<AsyncMutex<HashSet<String>>>,
37    /// Term cache blocks for segment readers during merge
38    term_cache_blocks: usize,
39    /// Notifier for merge completion (avoids busy-waiting)
40    merge_complete: Arc<Notify>,
41}
42
43impl<D: DirectoryWriter + 'static> SegmentManager<D> {
44    /// Create a new segment manager
45    pub fn new(
46        directory: Arc<D>,
47        schema: Arc<crate::dsl::Schema>,
48        segment_ids: Vec<String>,
49        merge_policy: Box<dyn MergePolicy>,
50        term_cache_blocks: usize,
51    ) -> Self {
52        Self {
53            directory,
54            schema,
55            segment_ids: Arc::new(AsyncMutex::new(segment_ids)),
56            merge_policy,
57            pending_merges: Arc::new(AtomicUsize::new(0)),
58            merging_segments: Arc::new(AsyncMutex::new(HashSet::new())),
59            term_cache_blocks,
60            merge_complete: Arc::new(Notify::new()),
61        }
62    }
63
64    /// Get a clone of the segment_ids Arc for sharing with background tasks
65    pub fn segment_ids(&self) -> Arc<AsyncMutex<Vec<String>>> {
66        Arc::clone(&self.segment_ids)
67    }
68
69    /// Get the current segment IDs
70    pub async fn get_segment_ids(&self) -> Vec<String> {
71        self.segment_ids.lock().await.clone()
72    }
73
74    /// Register a new segment and trigger merge check
75    ///
76    /// This is the main entry point for adding segments. It:
77    /// 1. Adds the segment ID to the list
78    /// 2. Checks the merge policy and spawns background merges if needed
79    pub async fn register_segment(&self, segment_id: String) {
80        {
81            let mut ids = self.segment_ids.lock().await;
82            ids.push(segment_id);
83        }
84
85        // Check if we should trigger a merge
86        self.maybe_merge().await;
87    }
88
89    /// Get the number of pending background merges
90    pub fn pending_merge_count(&self) -> usize {
91        self.pending_merges.load(Ordering::SeqCst)
92    }
93
94    /// Check merge policy and spawn background merges if needed
95    pub async fn maybe_merge(&self) {
96        // Get current segment info (excluding segments being merged)
97        let ids = self.segment_ids.lock().await;
98        let merging = self.merging_segments.lock().await;
99
100        // Filter out segments currently being merged
101        let available_segments: Vec<String> = ids
102            .iter()
103            .filter(|id| !merging.contains(*id))
104            .cloned()
105            .collect();
106
107        drop(merging);
108        drop(ids);
109
110        // Build segment info - we estimate doc count based on segment age (newer = smaller)
111        let segments: Vec<SegmentInfo> = available_segments
112            .iter()
113            .enumerate()
114            .map(|(i, id)| SegmentInfo {
115                id: id.clone(),
116                num_docs: ((i + 1) * 1000) as u32,
117                size_bytes: None,
118            })
119            .collect();
120
121        // Ask merge policy for candidates
122        let candidates = self.merge_policy.find_merges(&segments);
123
124        for candidate in candidates {
125            if candidate.segment_ids.len() >= 2 {
126                self.spawn_merge(candidate.segment_ids).await;
127            }
128        }
129    }
130
131    /// Spawn a background merge task
132    async fn spawn_merge(&self, segment_ids_to_merge: Vec<String>) {
133        // Mark segments as being merged
134        {
135            let mut merging = self.merging_segments.lock().await;
136            for id in &segment_ids_to_merge {
137                merging.insert(id.clone());
138            }
139        }
140
141        let directory = Arc::clone(&self.directory);
142        let schema = Arc::clone(&self.schema);
143        let segment_ids = Arc::clone(&self.segment_ids);
144        let merging_segments = Arc::clone(&self.merging_segments);
145        let pending_merges = Arc::clone(&self.pending_merges);
146        let merge_complete = Arc::clone(&self.merge_complete);
147        let term_cache_blocks = self.term_cache_blocks;
148
149        pending_merges.fetch_add(1, Ordering::SeqCst);
150
151        tokio::spawn(async move {
152            let result = Self::do_merge(
153                directory.as_ref(),
154                &schema,
155                &segment_ids_to_merge,
156                term_cache_blocks,
157            )
158            .await;
159
160            match result {
161                Ok(new_segment_id) => {
162                    // Update segment list: remove merged segments, add new one
163                    let mut ids = segment_ids.lock().await;
164                    ids.retain(|id| !segment_ids_to_merge.contains(id));
165                    ids.push(new_segment_id);
166                }
167                Err(e) => {
168                    eprintln!("Background merge failed: {:?}", e);
169                }
170            }
171
172            // Remove from merging set
173            let mut merging = merging_segments.lock().await;
174            for id in &segment_ids_to_merge {
175                merging.remove(id);
176            }
177
178            // Decrement pending merges counter and notify waiters
179            pending_merges.fetch_sub(1, Ordering::SeqCst);
180            merge_complete.notify_waiters();
181        });
182    }
183
184    /// Perform the actual merge operation (runs in background task)
185    async fn do_merge(
186        directory: &D,
187        schema: &crate::dsl::Schema,
188        segment_ids_to_merge: &[String],
189        term_cache_blocks: usize,
190    ) -> Result<String> {
191        // Load segment readers
192        let mut readers = Vec::new();
193        let mut doc_offset = 0u32;
194
195        for id_str in segment_ids_to_merge {
196            let segment_id = SegmentId::from_hex(id_str)
197                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
198            let reader = SegmentReader::open(
199                directory,
200                segment_id,
201                Arc::new(schema.clone()),
202                doc_offset,
203                term_cache_blocks,
204            )
205            .await?;
206            doc_offset += reader.meta().num_docs;
207            readers.push(reader);
208        }
209
210        // Merge into new segment
211        let merger = SegmentMerger::new(Arc::new(schema.clone()));
212        let new_segment_id = SegmentId::new();
213        merger.merge(directory, &readers, new_segment_id).await?;
214
215        // Delete old segments
216        for id_str in segment_ids_to_merge {
217            if let Some(segment_id) = SegmentId::from_hex(id_str) {
218                let _ = crate::segment::delete_segment(directory, segment_id).await;
219            }
220        }
221
222        Ok(new_segment_id.to_hex())
223    }
224
225    /// Wait for all pending merges to complete
226    pub async fn wait_for_merges(&self) {
227        while self.pending_merges.load(Ordering::SeqCst) > 0 {
228            self.merge_complete.notified().await;
229        }
230    }
231
232    /// Clean up orphan segment files that are not registered
233    ///
234    /// This can happen if the process halts after segment files are written
235    /// but before they are registered in segments.json. Call this on startup
236    /// to reclaim disk space from incomplete operations.
237    ///
238    /// Returns the number of orphan segments deleted.
239    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
240        let registered_set: HashSet<String> = {
241            let registered_ids = self.segment_ids.lock().await;
242            registered_ids.iter().cloned().collect()
243        };
244
245        // Find all segment files in directory
246        let mut orphan_ids: HashSet<String> = HashSet::new();
247
248        // List directory and find segment files
249        if let Ok(entries) = self.directory.list_files(std::path::Path::new("")).await {
250            for entry in entries {
251                let filename = entry.to_string_lossy();
252                // Match pattern: seg_{32 hex chars}.{ext}
253                if filename.starts_with("seg_") && filename.len() > 37 {
254                    // Extract the hex ID (32 chars after "seg_")
255                    let hex_part = &filename[4..36];
256                    if !registered_set.contains(hex_part) {
257                        orphan_ids.insert(hex_part.to_string());
258                    }
259                }
260            }
261        }
262
263        // Delete orphan segments
264        let mut deleted = 0;
265        for hex_id in &orphan_ids {
266            if let Some(segment_id) = SegmentId::from_hex(hex_id)
267                && crate::segment::delete_segment(self.directory.as_ref(), segment_id)
268                    .await
269                    .is_ok()
270            {
271                deleted += 1;
272            }
273        }
274
275        Ok(deleted)
276    }
277}