1use std::collections::HashSet;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
27
28use parking_lot::RwLock as SyncRwLock;
29use tokio::sync::{Notify, RwLock};
30
31use crate::directories::DirectoryWriter;
32use crate::error::{Error, Result};
33use crate::index::IndexMetadata;
34use crate::segment::{SegmentId, SegmentSnapshot, SegmentTracker};
35#[cfg(feature = "native")]
36use crate::segment::{SegmentMerger, SegmentReader};
37
38use super::{MergePolicy, SegmentInfo};
39
/// Maximum number of background merge tasks allowed to run at the same time.
const MAX_CONCURRENT_MERGES: usize = 2;
42
/// Owns the set of live segments for an index: tracks them in metadata,
/// schedules background merges, and coordinates deferred deletion.
pub struct SegmentManager<D: DirectoryWriter + 'static> {
    /// Storage backend used to persist metadata and delete segment files.
    directory: Arc<D>,
    /// Index schema, cloned into merge tasks and segment readers.
    schema: Arc<crate::dsl::Schema>,
    /// Authoritative list of registered segments; persisted via `save`.
    metadata: Arc<RwLock<IndexMetadata>>,
    /// Policy that decides which segments should be merged together.
    merge_policy: Box<dyn MergePolicy>,
    /// Number of background merge tasks currently in flight.
    pending_merges: Arc<AtomicUsize>,
    /// IDs of segments currently reserved by an active merge, so two merges
    /// never claim the same source segment.
    merging_segments: Arc<SyncRwLock<HashSet<String>>>,
    /// Cache size passed to `SegmentReader::open` (term-cache block count —
    /// exact unit defined by the reader; see `SegmentReader`).
    term_cache_blocks: usize,
    /// Signalled (via `notify_waiters`) each time a merge task finishes.
    merge_complete: Arc<Notify>,
    /// Reference tracker used for snapshot refcounting and deferred deletes.
    tracker: Arc<SegmentTracker>,
    /// When set, `spawn_merge` refuses to start new merges.
    merge_paused: Arc<AtomicBool>,
}
73
74impl<D: DirectoryWriter + 'static> SegmentManager<D> {
75 pub fn new(
77 directory: Arc<D>,
78 schema: Arc<crate::dsl::Schema>,
79 metadata: IndexMetadata,
80 merge_policy: Box<dyn MergePolicy>,
81 term_cache_blocks: usize,
82 ) -> Self {
83 let tracker = Arc::new(SegmentTracker::new());
85 for seg_id in metadata.segment_metas.keys() {
86 tracker.register(seg_id);
87 }
88
89 Self {
90 directory,
91 schema,
92 metadata: Arc::new(RwLock::new(metadata)),
93 merge_policy,
94 pending_merges: Arc::new(AtomicUsize::new(0)),
95 merging_segments: Arc::new(SyncRwLock::new(HashSet::new())),
96 term_cache_blocks,
97 merge_complete: Arc::new(Notify::new()),
98 tracker,
99 merge_paused: Arc::new(AtomicBool::new(false)),
100 }
101 }
102
103 pub async fn get_segment_ids(&self) -> Vec<String> {
105 self.metadata.read().await.segment_ids()
106 }
107
108 pub fn pending_merge_count(&self) -> usize {
110 self.pending_merges.load(Ordering::SeqCst)
111 }
112
113 pub fn metadata(&self) -> Arc<RwLock<IndexMetadata>> {
115 Arc::clone(&self.metadata)
116 }
117
118 pub async fn update_metadata<F>(&self, f: F) -> Result<()>
120 where
121 F: FnOnce(&mut IndexMetadata),
122 {
123 let mut meta = self.metadata.write().await;
124 f(&mut meta);
125 meta.save(self.directory.as_ref()).await
126 }
127
128 pub async fn acquire_snapshot(&self) -> SegmentSnapshot<D> {
131 let acquired = {
132 let meta = self.metadata.read().await;
133 let segment_ids = meta.segment_ids();
134 self.tracker.acquire(&segment_ids)
135 };
136
137 let dir = Arc::clone(&self.directory);
140 let delete_fn: Arc<dyn Fn(Vec<SegmentId>) + Send + Sync> = Arc::new(move |segment_ids| {
141 let dir = Arc::clone(&dir);
142 tokio::spawn(async move {
143 for segment_id in segment_ids {
144 log::info!(
145 "[segment_cleanup] deleting deferred segment {}",
146 segment_id.0
147 );
148 let _ = crate::segment::delete_segment(dir.as_ref(), segment_id).await;
149 }
150 });
151 });
152
153 SegmentSnapshot::with_delete_fn(Arc::clone(&self.tracker), acquired, delete_fn)
154 }
155
156 pub fn tracker(&self) -> Arc<SegmentTracker> {
158 Arc::clone(&self.tracker)
159 }
160
161 pub fn directory(&self) -> Arc<D> {
163 Arc::clone(&self.directory)
164 }
165}
166
167#[cfg(feature = "native")]
169impl<D: DirectoryWriter + 'static> SegmentManager<D> {
170 pub async fn register_segment(&self, segment_id: String, num_docs: u32) -> Result<()> {
174 {
175 let mut meta = self.metadata.write().await;
176 if !meta.has_segment(&segment_id) {
177 meta.add_segment(segment_id.clone(), num_docs);
178 self.tracker.register(&segment_id);
179 }
180 meta.save(self.directory.as_ref()).await?;
181 }
182
183 self.maybe_merge().await;
185 Ok(())
186 }
187
188 pub async fn maybe_merge(&self) {
191 let segments: Vec<SegmentInfo> = {
193 let meta = self.metadata.read().await;
194 let merging = self.merging_segments.read();
195
196 meta.segment_metas
198 .iter()
199 .filter(|(id, _)| !merging.contains(*id) && !self.tracker.is_pending_deletion(id))
200 .map(|(id, info)| SegmentInfo {
201 id: id.clone(),
202 num_docs: info.num_docs,
203 size_bytes: None,
204 })
205 .collect()
206 };
207
208 let candidates = self.merge_policy.find_merges(&segments);
210
211 for candidate in candidates {
212 if candidate.segment_ids.len() >= 2 {
213 self.spawn_merge(candidate.segment_ids);
214 }
215 }
216 }
217
218 pub fn pause_merges(&self) {
220 self.merge_paused.store(true, Ordering::SeqCst);
221 }
222
223 pub fn resume_merges(&self) {
225 self.merge_paused.store(false, Ordering::SeqCst);
226 }
227
228 fn spawn_merge(&self, segment_ids_to_merge: Vec<String>) {
230 if self.merge_paused.load(Ordering::SeqCst) {
232 return;
233 }
234 if self.pending_merges.load(Ordering::SeqCst) >= MAX_CONCURRENT_MERGES {
236 return;
237 }
238
239 {
243 let mut merging = self.merging_segments.write();
244 if segment_ids_to_merge.iter().any(|id| merging.contains(id)) {
246 return;
248 }
249 for id in &segment_ids_to_merge {
251 merging.insert(id.clone());
252 }
253 }
254
255 let directory = Arc::clone(&self.directory);
256 let schema = Arc::clone(&self.schema);
257 let metadata = Arc::clone(&self.metadata);
258 let merging_segments = Arc::clone(&self.merging_segments);
259 let pending_merges = Arc::clone(&self.pending_merges);
260 let merge_complete = Arc::clone(&self.merge_complete);
261 let tracker = Arc::clone(&self.tracker);
262 let term_cache_blocks = self.term_cache_blocks;
263
264 pending_merges.fetch_add(1, Ordering::SeqCst);
265
266 tokio::spawn(async move {
267 let result = Self::do_merge(
268 directory.as_ref(),
269 &schema,
270 &segment_ids_to_merge,
271 term_cache_blocks,
272 &metadata,
273 )
274 .await;
275
276 match result {
277 Ok((new_segment_id, merged_doc_count)) => {
278 tracker.register(&new_segment_id);
280
281 {
283 let mut meta = metadata.write().await;
284 for id in &segment_ids_to_merge {
285 meta.remove_segment(id);
286 }
287 meta.add_segment(new_segment_id, merged_doc_count);
288 if let Err(e) = meta.save(directory.as_ref()).await {
289 eprintln!("[merge] Failed to save metadata after merge: {:?}", e);
290 }
291 }
292
293 let ready_to_delete = tracker.mark_for_deletion(&segment_ids_to_merge);
295 for segment_id in ready_to_delete {
296 let _ =
297 crate::segment::delete_segment(directory.as_ref(), segment_id).await;
298 }
299 }
300 Err(e) => {
301 eprintln!(
302 "Background merge failed for segments {:?}: {:?}",
303 segment_ids_to_merge, e
304 );
305 }
306 }
307
308 {
310 let mut merging = merging_segments.write();
311 for id in &segment_ids_to_merge {
312 merging.remove(id);
313 }
314 }
315
316 pending_merges.fetch_sub(1, Ordering::SeqCst);
318 merge_complete.notify_waiters();
319 });
320 }
321
322 pub async fn do_merge(
326 directory: &D,
327 schema: &crate::dsl::Schema,
328 segment_ids_to_merge: &[String],
329 term_cache_blocks: usize,
330 metadata: &RwLock<IndexMetadata>,
331 ) -> Result<(String, u32)> {
332 let mut readers = Vec::new();
334 let mut doc_offset = 0u32;
335 let mut total_docs = 0u32;
336
337 for id_str in segment_ids_to_merge {
338 let segment_id = SegmentId::from_hex(id_str)
339 .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
340 let reader = match SegmentReader::open(
341 directory,
342 segment_id,
343 Arc::new(schema.clone()),
344 doc_offset,
345 term_cache_blocks,
346 )
347 .await
348 {
349 Ok(r) => r,
350 Err(e) => {
351 eprintln!("[merge] Failed to open segment {}: {:?}", id_str, e);
352 return Err(e);
353 }
354 };
355 let num_docs = reader.meta().num_docs;
356 total_docs += num_docs;
357 doc_offset += num_docs;
358 readers.push(reader);
359 }
360
361 let trained = {
363 let meta = metadata.read().await;
364 meta.load_trained_structures(directory).await
365 };
366
367 let merger = SegmentMerger::new(Arc::new(schema.clone()));
368 let new_segment_id = SegmentId::new();
369
370 log::info!(
371 "[merge] {} segments -> {} (trained={})",
372 segment_ids_to_merge.len(),
373 new_segment_id.to_hex(),
374 trained.as_ref().map_or(0, |t| t.centroids.len())
375 );
376
377 let merge_result = merger
378 .merge(directory, &readers, new_segment_id, trained.as_ref())
379 .await;
380
381 if let Err(e) = merge_result {
382 eprintln!(
383 "[merge] Merge failed for segments {:?} -> {}: {:?}",
384 segment_ids_to_merge,
385 new_segment_id.to_hex(),
386 e
387 );
388 return Err(e);
389 }
390
391 Ok((new_segment_id.to_hex(), total_docs))
393 }
394
395 pub async fn wait_for_merges(&self) {
397 while self.pending_merges.load(Ordering::SeqCst) > 0 {
398 self.merge_complete.notified().await;
399 }
400 }
401
402 pub async fn replace_segments(
408 &self,
409 new_segments: Vec<(String, u32)>,
410 old_to_delete: Vec<String>,
411 ) -> Result<()> {
412 for (seg_id, _) in &new_segments {
414 self.tracker.register(seg_id);
415 }
416
417 {
418 let mut meta = self.metadata.write().await;
419 for id in &old_to_delete {
420 meta.remove_segment(id);
421 }
422 for (seg_id, num_docs) in new_segments {
423 meta.add_segment(seg_id, num_docs);
424 }
425 meta.save(self.directory.as_ref()).await?;
426 }
427
428 let ready_to_delete = self.tracker.mark_for_deletion(&old_to_delete);
430 for segment_id in ready_to_delete {
431 let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
432 }
433 Ok(())
434 }
435
436 pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
444 let registered_set: HashSet<String> = {
445 let meta = self.metadata.read().await;
446 meta.segment_metas.keys().cloned().collect()
447 };
448
449 let mut orphan_ids: HashSet<String> = HashSet::new();
451
452 if let Ok(entries) = self.directory.list_files(std::path::Path::new("")).await {
454 for entry in entries {
455 let filename = entry.to_string_lossy();
456 if filename.starts_with("seg_") && filename.len() > 37 {
458 let hex_part = &filename[4..36];
460 if !registered_set.contains(hex_part) {
461 orphan_ids.insert(hex_part.to_string());
462 }
463 }
464 }
465 }
466
467 let mut deleted = 0;
469 for hex_id in &orphan_ids {
470 if let Some(segment_id) = SegmentId::from_hex(hex_id)
471 && crate::segment::delete_segment(self.directory.as_ref(), segment_id)
472 .await
473 .is_ok()
474 {
475 deleted += 1;
476 }
477 }
478
479 Ok(deleted)
480 }
481}