1use std::collections::HashSet;
28use std::sync::Arc;
29
30use arc_swap::ArcSwapOption;
31use tokio::sync::Mutex as AsyncMutex;
32use tokio::task::JoinHandle;
33
34use crate::directories::DirectoryWriter;
35use crate::error::{Error, Result};
36use crate::index::IndexMetadata;
37use crate::segment::{SegmentId, SegmentSnapshot, SegmentTracker, TrainedVectorStructures};
38#[cfg(feature = "native")]
39use crate::segment::{SegmentMerger, SegmentReader};
40
41use super::{MergePolicy, SegmentInfo};
42
/// Tracks the set of segment IDs currently involved in in-flight merges
/// (merge inputs plus the reserved output ID), so concurrent merges never
/// operate on overlapping segments.
struct MergeInventory {
    /// Segment IDs (hex strings) reserved by active merges. A synchronous
    /// mutex is fine here: every critical section is short and none of them
    /// crosses an `.await`.
    inner: parking_lot::Mutex<HashSet<String>>,
}
56
57impl MergeInventory {
58 fn new() -> Self {
59 Self {
60 inner: parking_lot::Mutex::new(HashSet::new()),
61 }
62 }
63
64 fn try_register(self: &Arc<Self>, segment_ids: Vec<String>) -> Option<MergeGuard> {
67 let mut inner = self.inner.lock();
68 for id in &segment_ids {
70 if inner.contains(id) {
71 log::debug!(
72 "[merge_inventory] rejected: {} overlaps with active merge ({} active IDs)",
73 id,
74 inner.len()
75 );
76 return None;
77 }
78 }
79 log::debug!(
80 "[merge_inventory] registered {} IDs (total active: {})",
81 segment_ids.len(),
82 inner.len() + segment_ids.len()
83 );
84 for id in &segment_ids {
85 inner.insert(id.clone());
86 }
87 Some(MergeGuard {
88 inventory: Arc::clone(self),
89 segment_ids,
90 })
91 }
92
93 fn snapshot(&self) -> HashSet<String> {
95 self.inner.lock().clone()
96 }
97
98 fn contains(&self, segment_id: &str) -> bool {
100 self.inner.lock().contains(segment_id)
101 }
102}
103
/// RAII guard returned by `MergeInventory::try_register`; unregisters its
/// segment IDs from the inventory when dropped, including when the owning
/// merge task errors out or is aborted.
struct MergeGuard {
    /// Back-reference to the inventory that issued this reservation.
    inventory: Arc<MergeInventory>,
    /// The IDs this guard reserved; removed from the inventory on drop.
    segment_ids: Vec<String>,
}
111
112impl Drop for MergeGuard {
113 fn drop(&mut self) {
114 let mut inner = self.inventory.inner.lock();
115 for id in &self.segment_ids {
116 inner.remove(id);
117 }
118 }
119}
120
/// Mutable state guarded by `SegmentManager::state`: the authoritative index
/// metadata plus the policy that picks merge candidates.
struct ManagerState {
    /// In-memory copy of the index metadata; persisted via `metadata.save()`.
    metadata: IndexMetadata,
    /// Pluggable policy that selects which segments to merge.
    merge_policy: Box<dyn MergePolicy>,
}
126
pub struct SegmentManager<D: DirectoryWriter + 'static> {
    /// Metadata + merge policy. An async mutex because it is held across
    /// `.await` points (e.g. while saving metadata to the directory).
    state: AsyncMutex<ManagerState>,

    /// Segment IDs currently reserved by active merges.
    merge_inventory: Arc<MergeInventory>,

    /// Join handles of in-flight background merge tasks.
    merge_handles: AsyncMutex<Vec<JoinHandle<()>>>,

    /// Lock-free, atomically swappable snapshot of trained vector structures.
    trained: ArcSwapOption<TrainedVectorStructures>,

    /// Tracks live references to segments and defers deletions until unused.
    tracker: Arc<SegmentTracker>,

    /// Callback handed to snapshots so they can delete segments once the
    /// last reader releases them.
    delete_fn: Arc<dyn Fn(Vec<SegmentId>) + Send + Sync>,

    /// Directory the index lives in.
    directory: Arc<D>,
    /// Index schema shared with segment readers and mergers.
    schema: Arc<crate::dsl::Schema>,
    /// Term-cache size (in blocks) passed to segment readers.
    term_cache_blocks: usize,
    /// Upper bound on concurrently running background merges (clamped >= 1).
    max_concurrent_merges: usize,
}
159
impl<D: DirectoryWriter + 'static> SegmentManager<D> {
    /// Builds a manager over an existing `metadata` snapshot, registering all
    /// known segments with the tracker and wiring up the deferred-deletion
    /// callback used by snapshots.
    pub fn new(
        directory: Arc<D>,
        schema: Arc<crate::dsl::Schema>,
        metadata: IndexMetadata,
        merge_policy: Box<dyn MergePolicy>,
        term_cache_blocks: usize,
        max_concurrent_merges: usize,
    ) -> Self {
        let tracker = Arc::new(SegmentTracker::new());
        for seg_id in metadata.segment_metas.keys() {
            tracker.register(seg_id);
        }

        let delete_fn: Arc<dyn Fn(Vec<SegmentId>) + Send + Sync> = {
            let dir = Arc::clone(&directory);
            Arc::new(move |segment_ids| {
                // Outside a tokio runtime there is nowhere to spawn the async
                // delete, so the request is silently dropped.
                // NOTE(review): presumably a later orphan-cleanup pass reclaims
                // these segments — confirm; a warn-level log here would help.
                let Ok(handle) = tokio::runtime::Handle::try_current() else {
                    return;
                };
                let dir = Arc::clone(&dir);
                // Detached task: deletion is fire-and-forget, best-effort.
                handle.spawn(async move {
                    for segment_id in segment_ids {
                        log::info!(
                            "[segment_cleanup] deleting deferred segment {}",
                            segment_id.0
                        );
                        // Errors are ignored; a failed delete leaves an orphan
                        // on disk rather than failing the caller.
                        let _ = crate::segment::delete_segment(dir.as_ref(), segment_id).await;
                    }
                });
            })
        };

        Self {
            state: AsyncMutex::new(ManagerState {
                metadata,
                merge_policy,
            }),
            merge_inventory: Arc::new(MergeInventory::new()),
            merge_handles: AsyncMutex::new(Vec::new()),
            trained: ArcSwapOption::new(None),
            tracker,
            delete_fn,
            directory,
            schema,
            term_cache_blocks,
            // Clamp so at least one merge slot always exists.
            max_concurrent_merges: max_concurrent_merges.max(1),
        }
    }

    /// Returns the IDs of all segments currently registered in the metadata.
    pub async fn get_segment_ids(&self) -> Vec<String> {
        self.state.lock().await.metadata.segment_ids()
    }

    /// Returns the currently published trained vector structures, if any.
    pub fn trained(&self) -> Option<Arc<TrainedVectorStructures>> {
        self.trained.load_full()
    }

    /// Loads trained vector structures for the index's vector fields from the
    /// directory and publishes them atomically. On load failure the previously
    /// published value (if any) is left in place.
    pub async fn load_and_publish_trained(&self) {
        // Clone the field descriptors under the lock, then load without it so
        // the (possibly slow) directory I/O does not block other state users.
        let vector_fields = {
            let st = self.state.lock().await;
            st.metadata.vector_fields.clone()
        };
        let trained =
            IndexMetadata::load_trained_from_fields(&vector_fields, self.directory.as_ref()).await;
        if let Some(t) = trained {
            self.trained.store(Some(Arc::new(t)));
        }
    }

    /// Unpublishes the trained structures (e.g. ahead of retraining).
    pub(crate) fn clear_trained(&self) {
        self.trained.store(None);
    }

    /// Runs `f` against the current metadata while holding the state lock.
    pub(crate) async fn read_metadata<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&IndexMetadata) -> R,
    {
        let st = self.state.lock().await;
        f(&st.metadata)
    }

    /// Applies `f` to the metadata, then persists it to the directory.
    /// The lock is held across the save so updates serialize with each other.
    pub(crate) async fn update_metadata<F>(&self, f: F) -> Result<()>
    where
        F: FnOnce(&mut IndexMetadata),
    {
        let mut st = self.state.lock().await;
        f(&mut st.metadata);
        st.metadata.save(self.directory.as_ref()).await
    }

    /// Acquires a reference-counted snapshot of the current segment set.
    /// While the snapshot is alive its segments cannot be deleted; on drop,
    /// segments marked for deletion are removed via `delete_fn`.
    pub async fn acquire_snapshot(&self) -> SegmentSnapshot {
        // Acquire refs under the state lock so the ID list cannot change
        // between reading it and pinning the segments in the tracker.
        let acquired = {
            let st = self.state.lock().await;
            let segment_ids = st.metadata.segment_ids();
            self.tracker.acquire(&segment_ids)
        };

        SegmentSnapshot::with_delete_fn(
            Arc::clone(&self.tracker),
            acquired,
            Arc::clone(&self.delete_fn),
        )
    }

    /// Returns a handle to the segment tracker.
    pub fn tracker(&self) -> Arc<SegmentTracker> {
        Arc::clone(&self.tracker)
    }

    /// Returns a handle to the underlying directory.
    pub fn directory(&self) -> Arc<D> {
        Arc::clone(&self.directory)
    }
}
293
#[cfg(feature = "native")]
impl<D: DirectoryWriter + 'static> SegmentManager<D> {
    /// Adds freshly written segments `(hex_id, num_docs)` to the metadata,
    /// registering each with the tracker, then persists the metadata.
    /// Already-present IDs are skipped, so re-committing is harmless.
    pub async fn commit(&self, new_segments: Vec<(String, u32)>) -> Result<()> {
        let mut st = self.state.lock().await;
        for (segment_id, num_docs) in new_segments {
            if !st.metadata.has_segment(&segment_id) {
                st.metadata.add_segment(segment_id.clone(), num_docs);
                self.tracker.register(&segment_id);
            }
        }
        st.metadata.save(self.directory.as_ref()).await
    }

    /// Asks the merge policy for candidates and spawns background merge tasks
    /// for as many as fit under `max_concurrent_merges`.
    pub async fn maybe_merge(self: &Arc<Self>) {
        // Prune finished handles first so completed merges free their slots.
        // NOTE(review): the slot count can go stale between this block and the
        // spawn loop if another caller races in — looks like a best-effort
        // bound rather than a hard cap; confirm that is intended.
        let slots_available = {
            let mut handles = self.merge_handles.lock().await;
            handles.retain(|h| !h.is_finished());
            self.max_concurrent_merges.saturating_sub(handles.len())
        };

        if slots_available == 0 {
            log::debug!("[maybe_merge] at max concurrent merges, skipping");
            return;
        }

        let new_handles = {
            let st = self.state.lock().await;

            // Only segments that are neither pending deletion nor already part
            // of an active merge are eligible.
            let segments: Vec<SegmentInfo> = st
                .metadata
                .segment_metas
                .iter()
                .filter(|(id, _)| {
                    !self.tracker.is_pending_deletion(id) && !self.merge_inventory.contains(id)
                })
                .map(|(id, info)| SegmentInfo {
                    id: id.clone(),
                    num_docs: info.num_docs,
                })
                .collect();

            log::debug!("[maybe_merge] {} eligible segments", segments.len());

            let candidates = st.merge_policy.find_merges(&segments);

            if candidates.is_empty() {
                return;
            }

            log::debug!(
                "[maybe_merge] {} merge candidates, {} slots available",
                candidates.len(),
                slots_available
            );

            // Spawn up to `slots_available` merges; spawn_merge may decline a
            // candidate that races with another registration.
            let mut handles = Vec::new();
            for c in candidates {
                if handles.len() >= slots_available {
                    break;
                }
                if let Some(h) = self.spawn_merge(c.segment_ids) {
                    handles.push(h);
                }
            }
            handles
        };

        if !new_handles.is_empty() {
            self.merge_handles.lock().await.extend(new_handles);
        }
    }

    /// Reserves the input segments plus a fresh output ID in the merge
    /// inventory and spawns a detached task that performs the merge, swaps the
    /// result into the metadata, and re-runs `maybe_merge` for cascades.
    /// Returns `None` if any segment overlaps an active merge.
    fn spawn_merge(self: &Arc<Self>, segment_ids_to_merge: Vec<String>) -> Option<JoinHandle<()>> {
        let output_id = SegmentId::new();
        let output_hex = output_id.to_hex();

        // Reserve the output ID alongside the inputs so orphan cleanup never
        // mistakes the in-progress output segment for garbage.
        let mut all_ids = segment_ids_to_merge.clone();
        all_ids.push(output_hex);

        let guard = match self.merge_inventory.try_register(all_ids) {
            Some(g) => g,
            None => {
                log::debug!("[spawn_merge] skipped: segments overlap with active merge");
                return None;
            }
        };

        let sm = Arc::clone(self);
        let ids = segment_ids_to_merge;

        Some(tokio::spawn(async move {
            // Keep the reservation alive for the whole task; dropped on exit
            // (including error paths), which releases the IDs.
            let _guard = guard;

            let trained_snap = sm.trained();
            let result = Self::do_merge(
                sm.directory.as_ref(),
                &sm.schema,
                &ids,
                output_id,
                sm.term_cache_blocks,
                trained_snap.as_deref(),
            )
            .await;

            match result {
                Ok((new_id, doc_count)) => {
                    if let Err(e) = sm.replace_segments(&ids, new_id, doc_count).await {
                        log::error!("[merge] Failed to replace segments after merge: {:?}", e);
                    }
                }
                Err(e) => {
                    // Inputs stay in the metadata; a failed merge loses no data.
                    log::error!(
                        "[merge] Background merge failed for segments {:?}: {:?}",
                        ids,
                        e
                    );
                }
            }
            // Cascade: the new, larger segment may itself be merge-eligible.
            sm.maybe_merge().await;
        }))
    }

    /// Atomically swaps `old_ids` for the merged segment `new_id` in the
    /// metadata, persists it, then marks the old segments for deletion and
    /// deletes any that no live snapshot still references.
    async fn replace_segments(
        &self,
        old_ids: &[String],
        new_id: String,
        doc_count: u32,
    ) -> Result<()> {
        // Register the new segment before exposing it via metadata so readers
        // can immediately acquire references to it.
        self.tracker.register(&new_id);

        {
            let mut st = self.state.lock().await;
            // The merged segment's generation is one past its newest ancestor.
            let parent_gen = old_ids
                .iter()
                .filter_map(|id| st.metadata.segment_metas.get(id))
                .map(|info| info.generation)
                .max()
                .unwrap_or(0);
            let ancestors: Vec<String> = old_ids.to_vec();

            for id in old_ids {
                st.metadata.remove_segment(id);
            }
            st.metadata
                .add_merged_segment(new_id, doc_count, ancestors, parent_gen + 1);
            st.metadata.save(self.directory.as_ref()).await?;
        }

        // Segments still pinned by snapshots are only marked; they get deleted
        // later via the snapshot's delete_fn. Errors are best-effort ignored.
        let ready_to_delete = self.tracker.mark_for_deletion(old_ids);
        for segment_id in ready_to_delete {
            let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
        }
        Ok(())
    }

    /// Merges `segment_ids_to_merge` into a single new segment
    /// `output_segment_id`, validating each input before merging.
    ///
    /// Returns the output segment's hex ID and its document count (saturated
    /// to `u32::MAX`). Errors on invalid IDs, unreadable inputs, store/meta
    /// doc-count mismatch (corruption), or merge failure.
    pub(crate) async fn do_merge(
        directory: &D,
        schema: &Arc<crate::dsl::Schema>,
        segment_ids_to_merge: &[String],
        output_segment_id: SegmentId,
        term_cache_blocks: usize,
        trained: Option<&TrainedVectorStructures>,
    ) -> Result<(String, u32)> {
        let output_hex = output_segment_id.to_hex();
        let load_start = std::time::Instant::now();

        let segment_ids: Vec<SegmentId> = segment_ids_to_merge
            .iter()
            .map(|id_str| {
                SegmentId::from_hex(id_str)
                    .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))
            })
            .collect::<Result<Vec<_>>>()?;

        // Open all segment readers concurrently.
        let schema_arc = Arc::clone(schema);
        let futures: Vec<_> = segment_ids
            .iter()
            .map(|&sid| {
                let sch = Arc::clone(&schema_arc);
                async move { SegmentReader::open(directory, sid, sch, term_cache_blocks).await }
            })
            .collect();

        let results = futures::future::join_all(futures).await;
        let mut readers = Vec::with_capacity(results.len());
        let mut total_docs = 0u64;
        for (i, result) in results.into_iter().enumerate() {
            match result {
                Ok(r) => {
                    total_docs += r.meta().num_docs as u64;
                    readers.push(r);
                }
                Err(e) => {
                    log::error!(
                        "[merge] Failed to open segment {}: {:?}",
                        segment_ids_to_merge[i],
                        e
                    );
                    return Err(e);
                }
            }
        }

        // Pre-merge sanity check: refuse to merge a segment whose doc store
        // disagrees with its metadata, rather than propagate the corruption.
        for (i, reader) in readers.iter().enumerate() {
            let meta_docs = reader.meta().num_docs;
            let store_docs = reader.store().num_docs();
            if store_docs != meta_docs {
                return Err(Error::Corruption(format!(
                    "pre-merge validation: segment {} store has {} docs but meta says {}",
                    segment_ids_to_merge[i], store_docs, meta_docs
                )));
            }
        }

        log::info!(
            "[merge] loaded {} segment readers in {:.1}s",
            readers.len(),
            load_start.elapsed().as_secs_f64()
        );

        let merger = SegmentMerger::new(Arc::clone(schema));

        log::info!(
            "[merge] {} segments -> {} (trained={})",
            segment_ids_to_merge.len(),
            output_hex,
            trained.map_or(0, |t| t.centroids.len())
        );

        merger
            .merge(directory, &readers, output_segment_id, trained)
            .await?;

        log::info!(
            "[merge] total wall-clock: {:.1}s ({} segments, {} docs)",
            load_start.elapsed().as_secs_f64(),
            readers.len(),
            total_docs,
        );

        Ok((output_hex, total_docs.min(u32::MAX as u64) as u32))
    }

    /// Awaits the merge tasks that were in flight when this was called.
    /// Tasks spawned afterwards (e.g. cascades) are not waited for.
    pub async fn wait_for_merging_thread(self: &Arc<Self>) {
        let handles: Vec<JoinHandle<()>> =
            { std::mem::take(&mut *self.merge_handles.lock().await) };
        for h in handles {
            let _ = h.await;
        }
    }

    /// Repeatedly drains and awaits merge handles until none remain, covering
    /// cascading merges that completing tasks spawn via `maybe_merge`.
    pub async fn wait_for_all_merges(self: &Arc<Self>) {
        loop {
            let handles: Vec<JoinHandle<()>> =
                { std::mem::take(&mut *self.merge_handles.lock().await) };
            if handles.is_empty() {
                break;
            }
            for h in handles {
                let _ = h.await;
            }
        }
    }

    /// Synchronously merges the whole index down to a single segment by
    /// repeatedly merging batches of up to `FORCE_MERGE_BATCH` segments.
    /// Returns once fewer than two segments remain.
    pub async fn force_merge(self: &Arc<Self>) -> Result<()> {
        const FORCE_MERGE_BATCH: usize = 64;

        // Let background merges settle first so we don't fight them for IDs.
        self.wait_for_all_merges().await;

        loop {
            let ids_to_merge = self.get_segment_ids().await;
            if ids_to_merge.len() < 2 {
                return Ok(());
            }

            let batch: Vec<String> = ids_to_merge.into_iter().take(FORCE_MERGE_BATCH).collect();

            log::info!("[force_merge] merging batch of {} segments", batch.len());

            let output_id = SegmentId::new();
            let output_hex = output_id.to_hex();

            // Reserve inputs + output, same as spawn_merge.
            let mut all_ids = batch.clone();
            all_ids.push(output_hex);
            let _guard = match self.merge_inventory.try_register(all_ids) {
                Some(g) => g,
                None => {
                    // A background merge still holds some of these IDs; wait
                    // for the current batch of tasks and retry.
                    self.wait_for_merging_thread().await;
                    continue;
                }
            };

            let trained_snap = self.trained();
            let (new_segment_id, total_docs) = Self::do_merge(
                self.directory.as_ref(),
                &self.schema,
                &batch,
                output_id,
                self.term_cache_blocks,
                trained_snap.as_deref(),
            )
            .await?;

            self.replace_segments(&batch, new_segment_id, total_docs)
                .await?;

            // `_guard` drops here, releasing the reservation for the next pass.
        }
    }

    /// Deletes segment files on disk that are neither registered in the
    /// metadata nor part of an active merge (e.g. leftovers from a crash).
    /// Returns the number of orphan segments deleted.
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        // Snapshot both ID sets under the state lock so a concurrent commit
        // can't make a just-added segment look like an orphan.
        let (registered_set, in_merge_set) = {
            let st = self.state.lock().await;
            let registered = st
                .metadata
                .segment_metas
                .keys()
                .cloned()
                .collect::<HashSet<String>>();
            let in_merge = self.merge_inventory.snapshot();
            (registered, in_merge)
        };

        let mut orphan_ids: HashSet<String> = HashSet::new();

        if let Ok(entries) = self.directory.list_files(std::path::Path::new("")).await {
            for entry in entries {
                let filename = entry.to_string_lossy();
                // Assumes names of the form "seg_<32-hex>.<suffix>"; the
                // `> 37` length check implies at least a dot plus one char
                // after the hex — TODO confirm against the actual file naming.
                if filename.starts_with("seg_") && filename.len() > 37 {
                    let hex_part = &filename[4..36];
                    if !registered_set.contains(hex_part) && !in_merge_set.contains(hex_part) {
                        orphan_ids.insert(hex_part.to_string());
                    }
                }
            }
        }

        let mut deleted = 0;
        for hex_id in &orphan_ids {
            if let Some(segment_id) = SegmentId::from_hex(hex_id)
                && crate::segment::delete_segment(self.directory.as_ref(), segment_id)
                    .await
                    .is_ok()
            {
                deleted += 1;
            }
        }

        Ok(deleted)
    }
}
710
#[cfg(test)]
mod tests {
    use super::*;

    /// Dropping the guard must unregister all of its segment IDs.
    #[test]
    fn test_inventory_guard_drop_unregisters() {
        let inv = Arc::new(MergeInventory::new());
        {
            let _guard = inv.try_register(vec!["a".into(), "b".into()]).unwrap();
            let snap = inv.snapshot();
            assert!(snap.contains("a"));
            assert!(snap.contains("b"));
        }
        assert!(inv.snapshot().is_empty());
    }

    /// Merges over disjoint ID sets may be registered concurrently, and
    /// dropping one guard releases only its own IDs.
    #[test]
    fn test_inventory_concurrent_non_overlapping_merges() {
        let inv = Arc::new(MergeInventory::new());
        // Named without the `_` prefix because the binding is used below;
        // underscore-prefixed names signal "intentionally unused".
        let g1 = inv.try_register(vec!["a".into(), "b".into()]).unwrap();
        let _g2 = inv.try_register(vec!["c".into(), "d".into()]).unwrap();
        let snap = inv.snapshot();
        assert_eq!(snap.len(), 4);

        drop(g1);
        let snap = inv.snapshot();
        assert_eq!(snap.len(), 2);
        assert!(snap.contains("c"));
        assert!(snap.contains("d"));
    }

    /// A merge overlapping an active one is rejected until the first
    /// reservation is released.
    #[test]
    fn test_inventory_overlapping_merge_rejected() {
        let inv = Arc::new(MergeInventory::new());
        let g1 = inv.try_register(vec!["a".into(), "b".into()]).unwrap();
        assert!(inv.try_register(vec!["b".into(), "c".into()]).is_none());
        drop(g1);
        assert!(inv.try_register(vec!["b".into(), "c".into()]).is_some());
    }

    /// `snapshot` reflects exactly the registered IDs.
    #[test]
    fn test_inventory_snapshot() {
        let inv = Arc::new(MergeInventory::new());
        let _g = inv.try_register(vec!["x".into(), "y".into()]).unwrap();
        let snap = inv.snapshot();
        assert!(snap.contains("x"));
        assert!(snap.contains("y"));
        assert!(!snap.contains("z"));
    }
}