hermes_core/merge/
scheduler.rs1use std::collections::HashSet;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicUsize, Ordering};
27
28use tokio::sync::{Mutex as AsyncMutex, Notify};
29
30use crate::directories::DirectoryWriter;
31use crate::error::{Error, Result};
32use crate::index::IndexMetadata;
33use crate::segment::{SegmentId, SegmentMerger, SegmentReader, SegmentSnapshot, SegmentTracker};
34
35use super::{MergePolicy, SegmentInfo};
36
37pub struct SegmentManager<D: DirectoryWriter + 'static> {
42 directory: Arc<D>,
44 schema: Arc<crate::dsl::Schema>,
46 metadata: Arc<AsyncMutex<IndexMetadata>>,
48 merge_policy: Box<dyn MergePolicy>,
50 pending_merges: Arc<AtomicUsize>,
52 merging_segments: Arc<AsyncMutex<HashSet<String>>>,
54 term_cache_blocks: usize,
56 merge_complete: Arc<Notify>,
58 tracker: Arc<SegmentTracker>,
60}
61
62impl<D: DirectoryWriter + 'static> SegmentManager<D> {
63 pub fn new(
65 directory: Arc<D>,
66 schema: Arc<crate::dsl::Schema>,
67 metadata: IndexMetadata,
68 merge_policy: Box<dyn MergePolicy>,
69 term_cache_blocks: usize,
70 ) -> Self {
71 let tracker = Arc::new(SegmentTracker::new());
73 for seg_id in &metadata.segments {
74 tracker.register(seg_id);
75 }
76
77 Self {
78 directory,
79 schema,
80 metadata: Arc::new(AsyncMutex::new(metadata)),
81 merge_policy,
82 pending_merges: Arc::new(AtomicUsize::new(0)),
83 merging_segments: Arc::new(AsyncMutex::new(HashSet::new())),
84 term_cache_blocks,
85 merge_complete: Arc::new(Notify::new()),
86 tracker,
87 }
88 }
89
90 pub async fn get_segment_ids(&self) -> Vec<String> {
92 self.metadata.lock().await.segments.clone()
93 }
94
95 pub async fn register_segment(&self, segment_id: String) -> Result<()> {
104 {
105 let mut meta = self.metadata.lock().await;
106 if !meta.segments.contains(&segment_id) {
107 meta.segments.push(segment_id.clone());
108 self.tracker.register(&segment_id);
109 }
110 meta.save(self.directory.as_ref()).await?;
111 }
112
113 self.maybe_merge().await;
115 Ok(())
116 }
117
118 pub fn pending_merge_count(&self) -> usize {
120 self.pending_merges.load(Ordering::SeqCst)
121 }
122
123 pub async fn maybe_merge(&self) {
125 let meta = self.metadata.lock().await;
127 let merging = self.merging_segments.lock().await;
128
129 let available_segments: Vec<String> = meta
131 .segments
132 .iter()
133 .filter(|id| !merging.contains(*id) && !self.tracker.is_pending_deletion(id))
134 .cloned()
135 .collect();
136
137 drop(merging);
138 drop(meta);
139
140 let segments: Vec<SegmentInfo> = available_segments
142 .iter()
143 .enumerate()
144 .map(|(i, id)| SegmentInfo {
145 id: id.clone(),
146 num_docs: ((i + 1) * 1000) as u32,
147 size_bytes: None,
148 })
149 .collect();
150
151 let candidates = self.merge_policy.find_merges(&segments);
153
154 for candidate in candidates {
155 if candidate.segment_ids.len() >= 2 {
156 self.spawn_merge(candidate.segment_ids).await;
157 }
158 }
159 }
160
161 async fn spawn_merge(&self, segment_ids_to_merge: Vec<String>) {
163 {
165 let mut merging = self.merging_segments.lock().await;
166 for id in &segment_ids_to_merge {
167 merging.insert(id.clone());
168 }
169 }
170
171 let directory = Arc::clone(&self.directory);
172 let schema = Arc::clone(&self.schema);
173 let metadata = Arc::clone(&self.metadata);
174 let merging_segments = Arc::clone(&self.merging_segments);
175 let pending_merges = Arc::clone(&self.pending_merges);
176 let merge_complete = Arc::clone(&self.merge_complete);
177 let tracker = Arc::clone(&self.tracker);
178 let term_cache_blocks = self.term_cache_blocks;
179
180 pending_merges.fetch_add(1, Ordering::SeqCst);
181
182 tokio::spawn(async move {
183 let result = Self::do_merge(
184 directory.as_ref(),
185 &schema,
186 &segment_ids_to_merge,
187 term_cache_blocks,
188 )
189 .await;
190
191 match result {
192 Ok(new_segment_id) => {
193 tracker.register(&new_segment_id);
195
196 let mut meta = metadata.lock().await;
198 meta.segments
199 .retain(|id| !segment_ids_to_merge.contains(id));
200 meta.segments.push(new_segment_id);
201 if let Err(e) = meta.save(directory.as_ref()).await {
202 eprintln!("[merge] Failed to save metadata after merge: {:?}", e);
203 }
204 drop(meta);
205
206 let ready_to_delete = tracker.mark_for_deletion(&segment_ids_to_merge);
208 for segment_id in ready_to_delete {
209 let _ =
210 crate::segment::delete_segment(directory.as_ref(), segment_id).await;
211 }
212 }
213 Err(e) => {
214 eprintln!(
215 "Background merge failed for segments {:?}: {:?}",
216 segment_ids_to_merge, e
217 );
218 }
219 }
220
221 let mut merging = merging_segments.lock().await;
223 for id in &segment_ids_to_merge {
224 merging.remove(id);
225 }
226
227 pending_merges.fetch_sub(1, Ordering::SeqCst);
229 merge_complete.notify_waiters();
230 });
231 }
232
233 async fn do_merge(
235 directory: &D,
236 schema: &crate::dsl::Schema,
237 segment_ids_to_merge: &[String],
238 term_cache_blocks: usize,
239 ) -> Result<String> {
240 let mut readers = Vec::new();
242 let mut doc_offset = 0u32;
243
244 for id_str in segment_ids_to_merge {
245 let segment_id = SegmentId::from_hex(id_str)
246 .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
247 let reader = match SegmentReader::open(
248 directory,
249 segment_id,
250 Arc::new(schema.clone()),
251 doc_offset,
252 term_cache_blocks,
253 )
254 .await
255 {
256 Ok(r) => r,
257 Err(e) => {
258 eprintln!("[merge] Failed to open segment {}: {:?}", id_str, e);
259 return Err(e);
260 }
261 };
262 doc_offset += reader.meta().num_docs;
263 readers.push(reader);
264 }
265
266 let merger = SegmentMerger::new(Arc::new(schema.clone()));
268 let new_segment_id = SegmentId::new();
269 if let Err(e) = merger.merge(directory, &readers, new_segment_id).await {
270 eprintln!(
271 "[merge] Merge failed for segments {:?} -> {}: {:?}",
272 segment_ids_to_merge,
273 new_segment_id.to_hex(),
274 e
275 );
276 return Err(e);
277 }
278
279 Ok(new_segment_id.to_hex())
281 }
282
283 pub async fn wait_for_merges(&self) {
285 while self.pending_merges.load(Ordering::SeqCst) > 0 {
286 self.merge_complete.notified().await;
287 }
288 }
289
290 pub fn metadata(&self) -> Arc<AsyncMutex<IndexMetadata>> {
292 Arc::clone(&self.metadata)
293 }
294
295 pub async fn update_metadata<F>(&self, f: F) -> Result<()>
297 where
298 F: FnOnce(&mut IndexMetadata),
299 {
300 let mut meta = self.metadata.lock().await;
301 f(&mut meta);
302 meta.save(self.directory.as_ref()).await
303 }
304
305 pub async fn replace_segments(
307 &self,
308 new_segments: Vec<String>,
309 old_to_delete: Vec<String>,
310 ) -> Result<()> {
311 for seg_id in &new_segments {
313 self.tracker.register(seg_id);
314 }
315
316 {
317 let mut meta = self.metadata.lock().await;
318 meta.segments = new_segments;
319 meta.save(self.directory.as_ref()).await?;
320 }
321
322 let ready_to_delete = self.tracker.mark_for_deletion(&old_to_delete);
324 for segment_id in ready_to_delete {
325 let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
326 }
327 Ok(())
328 }
329
330 pub async fn acquire_snapshot(&self) -> SegmentSnapshot<D> {
336 let meta = self.metadata.lock().await;
338 let acquired = self.tracker.acquire(&meta.segments);
339 drop(meta); SegmentSnapshot::new(
342 Arc::clone(&self.tracker),
343 Arc::clone(&self.directory),
344 acquired,
345 )
346 }
347
348 pub fn tracker(&self) -> Arc<SegmentTracker> {
350 Arc::clone(&self.tracker)
351 }
352
353 pub fn directory(&self) -> Arc<D> {
355 Arc::clone(&self.directory)
356 }
357
358 pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
366 let registered_set: HashSet<String> = {
367 let meta = self.metadata.lock().await;
368 meta.segments.iter().cloned().collect()
369 };
370
371 let mut orphan_ids: HashSet<String> = HashSet::new();
373
374 if let Ok(entries) = self.directory.list_files(std::path::Path::new("")).await {
376 for entry in entries {
377 let filename = entry.to_string_lossy();
378 if filename.starts_with("seg_") && filename.len() > 37 {
380 let hex_part = &filename[4..36];
382 if !registered_set.contains(hex_part) {
383 orphan_ids.insert(hex_part.to_string());
384 }
385 }
386 }
387 }
388
389 let mut deleted = 0;
391 for hex_id in &orphan_ids {
392 if let Some(segment_id) = SegmentId::from_hex(hex_id)
393 && crate::segment::delete_segment(self.directory.as_ref(), segment_id)
394 .await
395 .is_ok()
396 {
397 deleted += 1;
398 }
399 }
400
401 Ok(deleted)
402 }
403}