// hermes_core/merge/scheduler.rs
use std::collections::HashSet;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicUsize, Ordering};
8
9use tokio::sync::{Mutex as AsyncMutex, Notify};
10
11use crate::directories::DirectoryWriter;
12use crate::error::{Error, Result};
13use crate::segment::{SegmentId, SegmentMerger, SegmentReader};
14
15use super::{MergePolicy, SegmentInfo};
16
/// Owns the set of live segments and schedules background merges over them.
///
/// Cloned `Arc` handles to the shared state are handed to detached tokio
/// tasks in `spawn_merge`, which is why most fields are `Arc`-wrapped.
pub struct SegmentManager<D: DirectoryWriter + 'static> {
    // Storage backend all segment reads/writes/deletes go through.
    directory: Arc<D>,
    // Index schema; cloned into each merge task.
    schema: Arc<crate::dsl::Schema>,
    // Registry of currently-live segment IDs (hex strings).
    segment_ids: Arc<AsyncMutex<Vec<String>>>,
    // Pluggable policy deciding which segments to merge together.
    merge_policy: Box<dyn MergePolicy>,
    // Number of merges currently in flight (see `wait_for_merges`).
    pending_merges: Arc<AtomicUsize>,
    // Segments reserved by an in-flight merge; excluded from new candidates.
    merging_segments: Arc<AsyncMutex<HashSet<String>>>,
    // Term-dictionary cache size forwarded to `SegmentReader::open`.
    term_cache_blocks: usize,
    // Signalled by each finished merge task after it decrements the counter.
    merge_complete: Arc<Notify>,
}
42
43impl<D: DirectoryWriter + 'static> SegmentManager<D> {
44 pub fn new(
46 directory: Arc<D>,
47 schema: Arc<crate::dsl::Schema>,
48 segment_ids: Vec<String>,
49 merge_policy: Box<dyn MergePolicy>,
50 term_cache_blocks: usize,
51 ) -> Self {
52 Self {
53 directory,
54 schema,
55 segment_ids: Arc::new(AsyncMutex::new(segment_ids)),
56 merge_policy,
57 pending_merges: Arc::new(AtomicUsize::new(0)),
58 merging_segments: Arc::new(AsyncMutex::new(HashSet::new())),
59 term_cache_blocks,
60 merge_complete: Arc::new(Notify::new()),
61 }
62 }
63
    /// Returns a shared handle to the live segment-ID registry.
    ///
    /// Callers share the same lock as the manager, so mutations through this
    /// handle are visible to merge scheduling.
    pub fn segment_ids(&self) -> Arc<AsyncMutex<Vec<String>>> {
        Arc::clone(&self.segment_ids)
    }
68
69 pub async fn get_segment_ids(&self) -> Vec<String> {
71 self.segment_ids.lock().await.clone()
72 }
73
74 pub async fn register_segment(&self, segment_id: String) {
80 {
81 let mut ids = self.segment_ids.lock().await;
82 ids.push(segment_id);
83 }
84
85 self.maybe_merge().await;
87 }
88
    /// Number of background merges currently in flight.
    pub fn pending_merge_count(&self) -> usize {
        self.pending_merges.load(Ordering::SeqCst)
    }
93
94 pub async fn maybe_merge(&self) {
96 let ids = self.segment_ids.lock().await;
98 let merging = self.merging_segments.lock().await;
99
100 let available_segments: Vec<String> = ids
102 .iter()
103 .filter(|id| !merging.contains(*id))
104 .cloned()
105 .collect();
106
107 drop(merging);
108 drop(ids);
109
110 let segments: Vec<SegmentInfo> = available_segments
112 .iter()
113 .enumerate()
114 .map(|(i, id)| SegmentInfo {
115 id: id.clone(),
116 num_docs: ((i + 1) * 1000) as u32,
117 size_bytes: None,
118 })
119 .collect();
120
121 let candidates = self.merge_policy.find_merges(&segments);
123
124 for candidate in candidates {
125 if candidate.segment_ids.len() >= 2 {
126 self.spawn_merge(candidate.segment_ids).await;
127 }
128 }
129 }
130
    /// Reserves `segment_ids_to_merge` and runs the merge on a detached
    /// tokio task.
    ///
    /// Ordering is significant:
    /// 1. the IDs go into `merging_segments` *before* the task is spawned,
    ///    so a concurrent `maybe_merge` cannot select them again;
    /// 2. `pending_merges` is incremented before the spawn and decremented
    ///    by the task itself, so `wait_for_merges` observes this merge;
    /// 3. `notify_waiters` fires only after the counter is decremented.
    async fn spawn_merge(&self, segment_ids_to_merge: Vec<String>) {
        {
            // Reserve the inputs so overlapping merges are not scheduled.
            let mut merging = self.merging_segments.lock().await;
            for id in &segment_ids_to_merge {
                merging.insert(id.clone());
            }
        }

        // Clone everything the 'static task needs out of &self.
        let directory = Arc::clone(&self.directory);
        let schema = Arc::clone(&self.schema);
        let segment_ids = Arc::clone(&self.segment_ids);
        let merging_segments = Arc::clone(&self.merging_segments);
        let pending_merges = Arc::clone(&self.pending_merges);
        let merge_complete = Arc::clone(&self.merge_complete);
        let term_cache_blocks = self.term_cache_blocks;

        pending_merges.fetch_add(1, Ordering::SeqCst);

        tokio::spawn(async move {
            let result = Self::do_merge(
                directory.as_ref(),
                &schema,
                &segment_ids_to_merge,
                term_cache_blocks,
            )
            .await;

            match result {
                Ok(new_segment_id) => {
                    // Swap the merged inputs for the new segment under one
                    // lock acquisition so readers never see a partial state.
                    let mut ids = segment_ids.lock().await;
                    ids.retain(|id| !segment_ids_to_merge.contains(id));
                    ids.push(new_segment_id);
                }
                Err(e) => {
                    // Inputs stay registered; a failed merge loses no data.
                    eprintln!("Background merge failed: {:?}", e);
                }
            }

            // Release the reservation whether the merge succeeded or not.
            let mut merging = merging_segments.lock().await;
            for id in &segment_ids_to_merge {
                merging.remove(id);
            }

            // Decrement before notifying so waiters re-checking the counter
            // see the updated value.
            pending_merges.fetch_sub(1, Ordering::SeqCst);
            merge_complete.notify_waiters();
        });
    }
183
184 async fn do_merge(
186 directory: &D,
187 schema: &crate::dsl::Schema,
188 segment_ids_to_merge: &[String],
189 term_cache_blocks: usize,
190 ) -> Result<String> {
191 let mut readers = Vec::new();
193 let mut doc_offset = 0u32;
194
195 for id_str in segment_ids_to_merge {
196 let segment_id = SegmentId::from_hex(id_str)
197 .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
198 let reader = SegmentReader::open(
199 directory,
200 segment_id,
201 Arc::new(schema.clone()),
202 doc_offset,
203 term_cache_blocks,
204 )
205 .await?;
206 doc_offset += reader.meta().num_docs;
207 readers.push(reader);
208 }
209
210 let merger = SegmentMerger::new(Arc::new(schema.clone()));
212 let new_segment_id = SegmentId::new();
213 merger.merge(directory, &readers, new_segment_id).await?;
214
215 for id_str in segment_ids_to_merge {
217 if let Some(segment_id) = SegmentId::from_hex(id_str) {
218 let _ = crate::segment::delete_segment(directory, segment_id).await;
219 }
220 }
221
222 Ok(new_segment_id.to_hex())
223 }
224
225 pub async fn wait_for_merges(&self) {
227 while self.pending_merges.load(Ordering::SeqCst) > 0 {
228 self.merge_complete.notified().await;
229 }
230 }
231
232 pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
240 let registered_set: HashSet<String> = {
241 let registered_ids = self.segment_ids.lock().await;
242 registered_ids.iter().cloned().collect()
243 };
244
245 let mut orphan_ids: HashSet<String> = HashSet::new();
247
248 if let Ok(entries) = self.directory.list_files(std::path::Path::new("")).await {
250 for entry in entries {
251 let filename = entry.to_string_lossy();
252 if filename.starts_with("seg_") && filename.len() > 37 {
254 let hex_part = &filename[4..36];
256 if !registered_set.contains(hex_part) {
257 orphan_ids.insert(hex_part.to_string());
258 }
259 }
260 }
261 }
262
263 let mut deleted = 0;
265 for hex_id in &orphan_ids {
266 if let Some(segment_id) = SegmentId::from_hex(hex_id)
267 && crate::segment::delete_segment(self.directory.as_ref(), segment_id)
268 .await
269 .is_ok()
270 {
271 deleted += 1;
272 }
273 }
274
275 Ok(deleted)
276 }
277}