// hermes_core/merge/scheduler.rs

use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use tokio::sync::{Mutex as AsyncMutex, Notify};

use crate::directories::DirectoryWriter;
use crate::error::{Error, Result};
use crate::segment::{SegmentId, SegmentMerger, SegmentReader};

use super::{MergePolicy, SegmentInfo};
/// Coordinates background segment merges: tracks the live segment list,
/// schedules merge tasks picked by a [`MergePolicy`], and exposes hooks to
/// wait for / count in-flight merges.
pub struct SegmentManager<D: DirectoryWriter + 'static> {
    /// Directory the segments live in; shared with spawned merge tasks.
    directory: Arc<D>,
    /// Index schema; cloned into each merge task.
    schema: Arc<crate::dsl::Schema>,
    /// Live list of registered segment IDs (hex strings), shared with merge tasks.
    segment_ids: Arc<AsyncMutex<Vec<String>>>,
    /// Policy that selects which registered segments to combine.
    merge_policy: Box<dyn MergePolicy>,
    /// Number of background merge tasks currently in flight.
    pending_merges: Arc<AtomicUsize>,
    /// Segment IDs currently claimed by a merge task; prevents double-scheduling.
    merging_segments: Arc<AsyncMutex<HashSet<String>>>,
    /// Cache size forwarded to every `SegmentReader` opened for merging
    /// (unit: blocks, per `SegmentReader::open`).
    term_cache_blocks: usize,
    /// Notified each time a background merge task finishes (success or failure).
    merge_complete: Arc<Notify>,
}
43impl<D: DirectoryWriter + 'static> SegmentManager<D> {
44 pub fn new(
46 directory: Arc<D>,
47 schema: Arc<crate::dsl::Schema>,
48 segment_ids: Vec<String>,
49 merge_policy: Box<dyn MergePolicy>,
50 term_cache_blocks: usize,
51 ) -> Self {
52 Self {
53 directory,
54 schema,
55 segment_ids: Arc::new(AsyncMutex::new(segment_ids)),
56 merge_policy,
57 pending_merges: Arc::new(AtomicUsize::new(0)),
58 merging_segments: Arc::new(AsyncMutex::new(HashSet::new())),
59 term_cache_blocks,
60 merge_complete: Arc::new(Notify::new()),
61 }
62 }
63
64 pub fn segment_ids(&self) -> Arc<AsyncMutex<Vec<String>>> {
66 Arc::clone(&self.segment_ids)
67 }
68
69 pub async fn get_segment_ids(&self) -> Vec<String> {
71 self.segment_ids.lock().await.clone()
72 }
73
74 pub async fn register_segment(&self, segment_id: String) {
80 {
81 let mut ids = self.segment_ids.lock().await;
82 ids.push(segment_id);
83 }
84
85 self.maybe_merge().await;
87 }
88
89 pub fn pending_merge_count(&self) -> usize {
91 self.pending_merges.load(Ordering::SeqCst)
92 }
93
94 pub async fn maybe_merge(&self) {
96 let ids = self.segment_ids.lock().await;
98 let merging = self.merging_segments.lock().await;
99
100 let available_segments: Vec<String> = ids
102 .iter()
103 .filter(|id| !merging.contains(*id))
104 .cloned()
105 .collect();
106
107 drop(merging);
108 drop(ids);
109
110 let segments: Vec<SegmentInfo> = available_segments
112 .iter()
113 .enumerate()
114 .map(|(i, id)| SegmentInfo {
115 id: id.clone(),
116 num_docs: ((i + 1) * 1000) as u32,
117 size_bytes: None,
118 })
119 .collect();
120
121 let candidates = self.merge_policy.find_merges(&segments);
123
124 for candidate in candidates {
125 if candidate.segment_ids.len() >= 2 {
126 self.spawn_merge(candidate.segment_ids).await;
127 }
128 }
129 }
130
131 async fn spawn_merge(&self, segment_ids_to_merge: Vec<String>) {
133 {
135 let mut merging = self.merging_segments.lock().await;
136 for id in &segment_ids_to_merge {
137 merging.insert(id.clone());
138 }
139 }
140
141 let directory = Arc::clone(&self.directory);
142 let schema = Arc::clone(&self.schema);
143 let segment_ids = Arc::clone(&self.segment_ids);
144 let merging_segments = Arc::clone(&self.merging_segments);
145 let pending_merges = Arc::clone(&self.pending_merges);
146 let merge_complete = Arc::clone(&self.merge_complete);
147 let term_cache_blocks = self.term_cache_blocks;
148
149 pending_merges.fetch_add(1, Ordering::SeqCst);
150
151 tokio::spawn(async move {
152 let result = Self::do_merge(
153 directory.as_ref(),
154 &schema,
155 &segment_ids_to_merge,
156 term_cache_blocks,
157 )
158 .await;
159
160 match result {
161 Ok(new_segment_id) => {
162 let mut ids = segment_ids.lock().await;
164 ids.retain(|id| !segment_ids_to_merge.contains(id));
165 ids.push(new_segment_id);
166 }
167 Err(e) => {
168 eprintln!(
169 "Background merge failed for segments {:?}: {:?}",
170 segment_ids_to_merge, e
171 );
172 }
173 }
174
175 let mut merging = merging_segments.lock().await;
177 for id in &segment_ids_to_merge {
178 merging.remove(id);
179 }
180
181 pending_merges.fetch_sub(1, Ordering::SeqCst);
183 merge_complete.notify_waiters();
184 });
185 }
186
187 async fn do_merge(
189 directory: &D,
190 schema: &crate::dsl::Schema,
191 segment_ids_to_merge: &[String],
192 term_cache_blocks: usize,
193 ) -> Result<String> {
194 let mut readers = Vec::new();
196 let mut doc_offset = 0u32;
197
198 for id_str in segment_ids_to_merge {
199 let segment_id = SegmentId::from_hex(id_str)
200 .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
201 let reader = SegmentReader::open(
202 directory,
203 segment_id,
204 Arc::new(schema.clone()),
205 doc_offset,
206 term_cache_blocks,
207 )
208 .await?;
209 doc_offset += reader.meta().num_docs;
210 readers.push(reader);
211 }
212
213 let merger = SegmentMerger::new(Arc::new(schema.clone()));
215 let new_segment_id = SegmentId::new();
216 merger.merge(directory, &readers, new_segment_id).await?;
217
218 for id_str in segment_ids_to_merge {
220 if let Some(segment_id) = SegmentId::from_hex(id_str) {
221 let _ = crate::segment::delete_segment(directory, segment_id).await;
222 }
223 }
224
225 Ok(new_segment_id.to_hex())
226 }
227
228 pub async fn wait_for_merges(&self) {
230 while self.pending_merges.load(Ordering::SeqCst) > 0 {
231 self.merge_complete.notified().await;
232 }
233 }
234
235 pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
243 let registered_set: HashSet<String> = {
244 let registered_ids = self.segment_ids.lock().await;
245 registered_ids.iter().cloned().collect()
246 };
247
248 let mut orphan_ids: HashSet<String> = HashSet::new();
250
251 if let Ok(entries) = self.directory.list_files(std::path::Path::new("")).await {
253 for entry in entries {
254 let filename = entry.to_string_lossy();
255 if filename.starts_with("seg_") && filename.len() > 37 {
257 let hex_part = &filename[4..36];
259 if !registered_set.contains(hex_part) {
260 orphan_ids.insert(hex_part.to_string());
261 }
262 }
263 }
264 }
265
266 let mut deleted = 0;
268 for hex_id in &orphan_ids {
269 if let Some(segment_id) = SegmentId::from_hex(hex_id)
270 && crate::segment::delete_segment(self.directory.as_ref(), segment_id)
271 .await
272 .is_ok()
273 {
274 deleted += 1;
275 }
276 }
277
278 Ok(deleted)
279 }
280}