hermes_core/merge/
scheduler.rs1use std::collections::HashSet;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicUsize, Ordering};
27
28use parking_lot::RwLock as SyncRwLock;
29use tokio::sync::{Notify, RwLock};
30
31use crate::directories::DirectoryWriter;
32use crate::error::{Error, Result};
33use crate::index::IndexMetadata;
34use crate::segment::{SegmentId, SegmentSnapshot, SegmentTracker};
35#[cfg(feature = "native")]
36use crate::segment::{SegmentMerger, SegmentReader};
37
38use super::{MergePolicy, SegmentInfo};
39
/// Upper bound on the number of background merge tasks allowed to run at the same time.
const MAX_CONCURRENT_MERGES: usize = 2;
42
43pub struct SegmentManager<D: DirectoryWriter + 'static> {
51 directory: Arc<D>,
53 schema: Arc<crate::dsl::Schema>,
55 metadata: Arc<RwLock<IndexMetadata>>,
58 merge_policy: Box<dyn MergePolicy>,
60 pending_merges: Arc<AtomicUsize>,
62 merging_segments: Arc<SyncRwLock<HashSet<String>>>,
64 term_cache_blocks: usize,
66 merge_complete: Arc<Notify>,
68 tracker: Arc<SegmentTracker>,
70}
71
72impl<D: DirectoryWriter + 'static> SegmentManager<D> {
73 pub fn new(
75 directory: Arc<D>,
76 schema: Arc<crate::dsl::Schema>,
77 metadata: IndexMetadata,
78 merge_policy: Box<dyn MergePolicy>,
79 term_cache_blocks: usize,
80 ) -> Self {
81 let tracker = Arc::new(SegmentTracker::new());
83 for seg_id in metadata.segment_metas.keys() {
84 tracker.register(seg_id);
85 }
86
87 Self {
88 directory,
89 schema,
90 metadata: Arc::new(RwLock::new(metadata)),
91 merge_policy,
92 pending_merges: Arc::new(AtomicUsize::new(0)),
93 merging_segments: Arc::new(SyncRwLock::new(HashSet::new())),
94 term_cache_blocks,
95 merge_complete: Arc::new(Notify::new()),
96 tracker,
97 }
98 }
99
100 pub async fn get_segment_ids(&self) -> Vec<String> {
102 self.metadata.read().await.segment_ids()
103 }
104
105 pub fn pending_merge_count(&self) -> usize {
107 self.pending_merges.load(Ordering::SeqCst)
108 }
109
110 pub fn metadata(&self) -> Arc<RwLock<IndexMetadata>> {
112 Arc::clone(&self.metadata)
113 }
114
115 pub async fn update_metadata<F>(&self, f: F) -> Result<()>
117 where
118 F: FnOnce(&mut IndexMetadata),
119 {
120 let mut meta = self.metadata.write().await;
121 f(&mut meta);
122 meta.save(self.directory.as_ref()).await
123 }
124
125 pub async fn acquire_snapshot(&self) -> SegmentSnapshot<D> {
128 let acquired = {
129 let meta = self.metadata.read().await;
130 let segment_ids = meta.segment_ids();
131 self.tracker.acquire(&segment_ids)
132 };
133
134 SegmentSnapshot::new(
135 Arc::clone(&self.tracker),
136 Arc::clone(&self.directory),
137 acquired,
138 )
139 }
140
141 pub fn tracker(&self) -> Arc<SegmentTracker> {
143 Arc::clone(&self.tracker)
144 }
145
146 pub fn directory(&self) -> Arc<D> {
148 Arc::clone(&self.directory)
149 }
150}
151
152#[cfg(feature = "native")]
154impl<D: DirectoryWriter + 'static> SegmentManager<D> {
155 pub async fn register_segment(&self, segment_id: String, num_docs: u32) -> Result<()> {
159 {
160 let mut meta = self.metadata.write().await;
161 if !meta.has_segment(&segment_id) {
162 meta.add_segment(segment_id.clone(), num_docs);
163 self.tracker.register(&segment_id);
164 }
165 meta.save(self.directory.as_ref()).await?;
166 }
167
168 self.maybe_merge().await;
170 Ok(())
171 }
172
173 pub async fn maybe_merge(&self) {
176 let segments: Vec<SegmentInfo> = {
178 let meta = self.metadata.read().await;
179 let merging = self.merging_segments.read();
180
181 meta.segment_metas
183 .iter()
184 .filter(|(id, _)| !merging.contains(*id) && !self.tracker.is_pending_deletion(id))
185 .map(|(id, info)| SegmentInfo {
186 id: id.clone(),
187 num_docs: info.num_docs,
188 size_bytes: None,
189 })
190 .collect()
191 };
192
193 let candidates = self.merge_policy.find_merges(&segments);
195
196 for candidate in candidates {
197 if candidate.segment_ids.len() >= 2 {
198 self.spawn_merge(candidate.segment_ids);
199 }
200 }
201 }
202
203 fn spawn_merge(&self, segment_ids_to_merge: Vec<String>) {
205 if self.pending_merges.load(Ordering::SeqCst) >= MAX_CONCURRENT_MERGES {
207 return;
208 }
209
210 {
214 let mut merging = self.merging_segments.write();
215 if segment_ids_to_merge.iter().any(|id| merging.contains(id)) {
217 return;
219 }
220 for id in &segment_ids_to_merge {
222 merging.insert(id.clone());
223 }
224 }
225
226 let directory = Arc::clone(&self.directory);
227 let schema = Arc::clone(&self.schema);
228 let metadata = Arc::clone(&self.metadata);
229 let merging_segments = Arc::clone(&self.merging_segments);
230 let pending_merges = Arc::clone(&self.pending_merges);
231 let merge_complete = Arc::clone(&self.merge_complete);
232 let tracker = Arc::clone(&self.tracker);
233 let term_cache_blocks = self.term_cache_blocks;
234
235 pending_merges.fetch_add(1, Ordering::SeqCst);
236
237 tokio::spawn(async move {
238 let result = Self::do_merge(
239 directory.as_ref(),
240 &schema,
241 &segment_ids_to_merge,
242 term_cache_blocks,
243 )
244 .await;
245
246 match result {
247 Ok((new_segment_id, merged_doc_count)) => {
248 tracker.register(&new_segment_id);
250
251 {
253 let mut meta = metadata.write().await;
254 for id in &segment_ids_to_merge {
255 meta.remove_segment(id);
256 }
257 meta.add_segment(new_segment_id, merged_doc_count);
258 if let Err(e) = meta.save(directory.as_ref()).await {
259 eprintln!("[merge] Failed to save metadata after merge: {:?}", e);
260 }
261 }
262
263 let ready_to_delete = tracker.mark_for_deletion(&segment_ids_to_merge);
265 for segment_id in ready_to_delete {
266 let _ =
267 crate::segment::delete_segment(directory.as_ref(), segment_id).await;
268 }
269 }
270 Err(e) => {
271 eprintln!(
272 "Background merge failed for segments {:?}: {:?}",
273 segment_ids_to_merge, e
274 );
275 }
276 }
277
278 {
280 let mut merging = merging_segments.write();
281 for id in &segment_ids_to_merge {
282 merging.remove(id);
283 }
284 }
285
286 pending_merges.fetch_sub(1, Ordering::SeqCst);
288 merge_complete.notify_waiters();
289 });
290 }
291
292 async fn do_merge(
295 directory: &D,
296 schema: &crate::dsl::Schema,
297 segment_ids_to_merge: &[String],
298 term_cache_blocks: usize,
299 ) -> Result<(String, u32)> {
300 let mut readers = Vec::new();
302 let mut doc_offset = 0u32;
303 let mut total_docs = 0u32;
304
305 for id_str in segment_ids_to_merge {
306 let segment_id = SegmentId::from_hex(id_str)
307 .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
308 let reader = match SegmentReader::open(
309 directory,
310 segment_id,
311 Arc::new(schema.clone()),
312 doc_offset,
313 term_cache_blocks,
314 )
315 .await
316 {
317 Ok(r) => r,
318 Err(e) => {
319 eprintln!("[merge] Failed to open segment {}: {:?}", id_str, e);
320 return Err(e);
321 }
322 };
323 let num_docs = reader.meta().num_docs;
324 total_docs += num_docs;
325 doc_offset += num_docs;
326 readers.push(reader);
327 }
328
329 let merger = SegmentMerger::new(Arc::new(schema.clone()));
331 let new_segment_id = SegmentId::new();
332 if let Err(e) = merger.merge(directory, &readers, new_segment_id).await {
333 eprintln!(
334 "[merge] Merge failed for segments {:?} -> {}: {:?}",
335 segment_ids_to_merge,
336 new_segment_id.to_hex(),
337 e
338 );
339 return Err(e);
340 }
341
342 Ok((new_segment_id.to_hex(), total_docs))
344 }
345
346 pub async fn wait_for_merges(&self) {
348 while self.pending_merges.load(Ordering::SeqCst) > 0 {
349 self.merge_complete.notified().await;
350 }
351 }
352
353 pub async fn replace_segments(
356 &self,
357 new_segments: Vec<(String, u32)>,
358 old_to_delete: Vec<String>,
359 ) -> Result<()> {
360 for (seg_id, _) in &new_segments {
362 self.tracker.register(seg_id);
363 }
364
365 {
366 let mut meta = self.metadata.write().await;
367 meta.segment_metas.clear();
368 for (seg_id, num_docs) in new_segments {
369 meta.add_segment(seg_id, num_docs);
370 }
371 meta.save(self.directory.as_ref()).await?;
372 }
373
374 let ready_to_delete = self.tracker.mark_for_deletion(&old_to_delete);
376 for segment_id in ready_to_delete {
377 let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
378 }
379 Ok(())
380 }
381
382 pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
390 let registered_set: HashSet<String> = {
391 let meta = self.metadata.read().await;
392 meta.segment_metas.keys().cloned().collect()
393 };
394
395 let mut orphan_ids: HashSet<String> = HashSet::new();
397
398 if let Ok(entries) = self.directory.list_files(std::path::Path::new("")).await {
400 for entry in entries {
401 let filename = entry.to_string_lossy();
402 if filename.starts_with("seg_") && filename.len() > 37 {
404 let hex_part = &filename[4..36];
406 if !registered_set.contains(hex_part) {
407 orphan_ids.insert(hex_part.to_string());
408 }
409 }
410 }
411 }
412
413 let mut deleted = 0;
415 for hex_id in &orphan_ids {
416 if let Some(segment_id) = SegmentId::from_hex(hex_id)
417 && crate::segment::delete_segment(self.directory.as_ref(), segment_id)
418 .await
419 .is_ok()
420 {
421 deleted += 1;
422 }
423 }
424
425 Ok(deleted)
426 }
427}