// hermes_core/merge/segment_manager.rs
1//! Segment manager — coordinates segment commit, background merging, and trained structures.
2//!
3//! Architecture:
4//! - **Single mutation queue**: All metadata mutations serialize through `tokio::sync::Mutex<ManagerState>`.
5//! - **Concurrent merges**: Multiple non-overlapping merges can run in parallel.
6//!   Each merge registers its segment IDs in `MergeInventory` via RAII `MergeGuard`.
7//!   New merges are rejected only if they share segments with an active merge.
8//! - **Auto-trigger**: Each completed merge re-evaluates the merge policy and spawns
9//!   new merges if eligible (cascading merges for higher tiers).
10//! - **ArcSwap for trained**: Lock-free reads of trained vector structures.
11//!
12//! # Locking model (deadlock-free by construction)
13//!
14//! ```text
15//! Lock ordering (acquire in this order):
16//!   1. state               — tokio::sync::Mutex, held for mutations + disk I/O
17//!   2. merge_inventory     — parking_lot::Mutex (sync), sub-μs hold, RAII via MergeGuard
18//!   3. tracker.inner       — parking_lot::Mutex (sync), sub-μs hold
19//!
20//! Lock-free state:
21//!   trained                — arc_swap::ArcSwapOption, no ordering constraint
22//!   merge_handles          — tokio::sync::Mutex, never held with state
23//! ```
24//!
25//! **Rule:** Never hold a sync lock while `.await`-ing.
26
27use std::collections::HashSet;
28use std::sync::Arc;
29
30use arc_swap::ArcSwapOption;
31use tokio::sync::Mutex as AsyncMutex;
32use tokio::task::JoinHandle;
33
34use crate::directories::DirectoryWriter;
35use crate::error::{Error, Result};
36use crate::index::IndexMetadata;
37use crate::segment::{SegmentId, SegmentSnapshot, SegmentTracker, TrainedVectorStructures};
38#[cfg(feature = "native")]
39use crate::segment::{SegmentMerger, SegmentReader};
40
41use super::{MergePolicy, SegmentInfo};
42
43// ============================================================================
44// RAII merge tracking
45// ============================================================================
46
47/// Tracks which segments are involved in active merges.
48///
49/// Supports multiple concurrent merges. Each merge registers its segment IDs;
50/// a new merge is rejected only if any of its segments overlap with an active merge.
51/// Uses RAII via `MergeGuard`: when a merge task ends, the guard drops and
52/// its segment IDs are automatically unregistered.
struct MergeInventory {
    /// Hex segment IDs currently claimed by in-flight merges.
    /// Sync mutex with sub-μs hold times only (see module locking model);
    /// never held across an `.await`.
    inner: parking_lot::Mutex<HashSet<String>>,
}
56
57impl MergeInventory {
58    fn new() -> Self {
59        Self {
60            inner: parking_lot::Mutex::new(HashSet::new()),
61        }
62    }
63
64    /// Try to register a merge. Returns `MergeGuard` on success, `None` if
65    /// any of the requested segments are already in an active merge.
66    fn try_register(self: &Arc<Self>, segment_ids: Vec<String>) -> Option<MergeGuard> {
67        let mut inner = self.inner.lock();
68        // Check for overlap with any active merge
69        for id in &segment_ids {
70            if inner.contains(id) {
71                log::debug!(
72                    "[merge_inventory] rejected: {} overlaps with active merge ({} active IDs)",
73                    id,
74                    inner.len()
75                );
76                return None;
77            }
78        }
79        log::debug!(
80            "[merge_inventory] registered {} IDs (total active: {})",
81            segment_ids.len(),
82            inner.len() + segment_ids.len()
83        );
84        for id in &segment_ids {
85            inner.insert(id.clone());
86        }
87        Some(MergeGuard {
88            inventory: Arc::clone(self),
89            segment_ids,
90        })
91    }
92
93    /// Snapshot of all in-merge segment IDs (for cleanup_orphan_segments)
94    fn snapshot(&self) -> HashSet<String> {
95        self.inner.lock().clone()
96    }
97
98    /// Check if a specific segment is currently involved in a merge.
99    fn contains(&self, segment_id: &str) -> bool {
100        self.inner.lock().contains(segment_id)
101    }
102}
103
104/// RAII guard for a merge operation.
105/// Dropped when the merge task completes (success, failure, or panic) —
106/// automatically unregisters this merge's segment IDs from the inventory.
struct MergeGuard {
    /// Shared inventory to unregister from when this guard drops.
    inventory: Arc<MergeInventory>,
    /// The exact IDs this merge registered; removed from the inventory on drop.
    segment_ids: Vec<String>,
}
111
112impl Drop for MergeGuard {
113    fn drop(&mut self) {
114        let mut inner = self.inventory.inner.lock();
115        for id in &self.segment_ids {
116            inner.remove(id);
117        }
118    }
119}
120
/// All mutable state behind the single async Mutex.
struct ManagerState {
    /// Authoritative segment list; persisted to disk via `metadata.save`.
    metadata: IndexMetadata,
    /// Policy deciding which segments are eligible to merge
    /// (queried under the state lock in `maybe_merge`).
    merge_policy: Box<dyn MergePolicy>,
}
126
/// Segment manager — coordinates segment commit, background merging, and trained structures.
///
/// SOLE owner of `metadata.json`. All metadata mutations go through `state` Mutex.
pub struct SegmentManager<D: DirectoryWriter + 'static> {
    /// Serializes ALL metadata mutations. Deliberately held across disk I/O
    /// (`metadata.save`) so mutation + persist stay atomic.
    state: AsyncMutex<ManagerState>,

    /// RAII merge tracking: segments are registered on merge start, automatically
    /// unregistered when the merge task ends (via MergeGuard drop).
    merge_inventory: Arc<MergeInventory>,

    /// In-flight merge JoinHandles — supports multiple concurrent merges.
    /// Per the module locking model, never held together with `state`.
    merge_handles: AsyncMutex<Vec<JoinHandle<()>>>,

    /// Trained vector structures — lock-free reads via ArcSwap.
    trained: ArcSwapOption<TrainedVectorStructures>,

    /// Reference counting for safe segment deletion (sync Mutex for Drop).
    tracker: Arc<SegmentTracker>,

    /// Cached deletion callback for snapshots (avoids allocation per acquire_snapshot).
    delete_fn: Arc<dyn Fn(Vec<SegmentId>) + Send + Sync>,

    /// Directory for segment I/O
    directory: Arc<D>,
    /// Schema for segment operations
    schema: Arc<crate::dsl::Schema>,
    /// Term cache blocks for segment readers during merge
    term_cache_blocks: usize,
    /// Maximum number of concurrent background merges
    /// (soft limit — see `maybe_merge`; clamped to ≥ 1 in `new`).
    max_concurrent_merges: usize,
}
159
160impl<D: DirectoryWriter + 'static> SegmentManager<D> {
161    /// Create a new segment manager with existing metadata
162    pub fn new(
163        directory: Arc<D>,
164        schema: Arc<crate::dsl::Schema>,
165        metadata: IndexMetadata,
166        merge_policy: Box<dyn MergePolicy>,
167        term_cache_blocks: usize,
168        max_concurrent_merges: usize,
169    ) -> Self {
170        let tracker = Arc::new(SegmentTracker::new());
171        for seg_id in metadata.segment_metas.keys() {
172            tracker.register(seg_id);
173        }
174
175        let delete_fn: Arc<dyn Fn(Vec<SegmentId>) + Send + Sync> = {
176            let dir = Arc::clone(&directory);
177            Arc::new(move |segment_ids| {
178                // Guard: if the tokio runtime is gone (program exit), skip async
179                // deletion. Segment files become orphans cleaned up on next startup.
180                let Ok(handle) = tokio::runtime::Handle::try_current() else {
181                    return;
182                };
183                let dir = Arc::clone(&dir);
184                handle.spawn(async move {
185                    for segment_id in segment_ids {
186                        log::info!(
187                            "[segment_cleanup] deleting deferred segment {}",
188                            segment_id.0
189                        );
190                        let _ = crate::segment::delete_segment(dir.as_ref(), segment_id).await;
191                    }
192                });
193            })
194        };
195
196        Self {
197            state: AsyncMutex::new(ManagerState {
198                metadata,
199                merge_policy,
200            }),
201            merge_inventory: Arc::new(MergeInventory::new()),
202            merge_handles: AsyncMutex::new(Vec::new()),
203            trained: ArcSwapOption::new(None),
204            tracker,
205            delete_fn,
206            directory,
207            schema,
208            term_cache_blocks,
209            max_concurrent_merges: max_concurrent_merges.max(1),
210        }
211    }
212
213    // ========================================================================
214    // Read path (brief lock or lock-free)
215    // ========================================================================
216
217    /// Get the current segment IDs
218    pub async fn get_segment_ids(&self) -> Vec<String> {
219        self.state.lock().await.metadata.segment_ids()
220    }
221
222    /// Get trained vector structures (lock-free via ArcSwap)
223    pub fn trained(&self) -> Option<Arc<TrainedVectorStructures>> {
224        self.trained.load_full()
225    }
226
227    /// Load trained structures from disk and publish to ArcSwap.
228    /// Copies metadata under lock, releases lock, then does disk I/O.
229    pub async fn load_and_publish_trained(&self) {
230        // Copy vector_fields under lock (cheap clone of HashMap<u32, FieldMeta>)
231        let vector_fields = {
232            let st = self.state.lock().await;
233            st.metadata.vector_fields.clone()
234        };
235        // Disk I/O happens WITHOUT holding the state lock
236        let trained =
237            IndexMetadata::load_trained_from_fields(&vector_fields, self.directory.as_ref()).await;
238        if let Some(t) = trained {
239            self.trained.store(Some(Arc::new(t)));
240        }
241    }
242
243    /// Clear trained structures (sets ArcSwap to None)
244    pub(crate) fn clear_trained(&self) {
245        self.trained.store(None);
246    }
247
248    /// Read metadata with a closure (no persist)
249    pub(crate) async fn read_metadata<F, R>(&self, f: F) -> R
250    where
251        F: FnOnce(&IndexMetadata) -> R,
252    {
253        let st = self.state.lock().await;
254        f(&st.metadata)
255    }
256
257    /// Update metadata with a closure and persist atomically
258    pub(crate) async fn update_metadata<F>(&self, f: F) -> Result<()>
259    where
260        F: FnOnce(&mut IndexMetadata),
261    {
262        let mut st = self.state.lock().await;
263        f(&mut st.metadata);
264        st.metadata.save(self.directory.as_ref()).await
265    }
266
267    /// Acquire a snapshot of current segments for reading.
268    /// The snapshot holds references — segments won't be deleted while snapshot exists.
269    pub async fn acquire_snapshot(&self) -> SegmentSnapshot {
270        let acquired = {
271            let st = self.state.lock().await;
272            let segment_ids = st.metadata.segment_ids();
273            self.tracker.acquire(&segment_ids)
274        };
275
276        SegmentSnapshot::with_delete_fn(
277            Arc::clone(&self.tracker),
278            acquired,
279            Arc::clone(&self.delete_fn),
280        )
281    }
282
283    /// Get the segment tracker
284    pub fn tracker(&self) -> Arc<SegmentTracker> {
285        Arc::clone(&self.tracker)
286    }
287
288    /// Get the directory
289    pub fn directory(&self) -> Arc<D> {
290        Arc::clone(&self.directory)
291    }
292}
293
294// ============================================================================
295// Native-only: commit, merging, force_merge
296// ============================================================================
297
298#[cfg(feature = "native")]
299impl<D: DirectoryWriter + 'static> SegmentManager<D> {
300    /// Atomic commit: register new segments + persist metadata.
301    pub async fn commit(&self, new_segments: Vec<(String, u32)>) -> Result<()> {
302        let mut st = self.state.lock().await;
303        for (segment_id, num_docs) in new_segments {
304            if !st.metadata.has_segment(&segment_id) {
305                st.metadata.add_segment(segment_id.clone(), num_docs);
306                self.tracker.register(&segment_id);
307            }
308        }
309        st.metadata.save(self.directory.as_ref()).await
310    }
311
312    /// Evaluate merge policy and spawn background merges for all eligible candidates.
313    ///
314    /// **Atomicity**: The entire filter → find_merges → spawn_merge sequence runs
315    /// under the `state` lock to prevent a TOCTOU race where concurrent callers
316    /// both see segments as eligible before either registers them in the inventory.
317    /// `spawn_merge` is non-blocking (just `try_register` + `tokio::spawn`), so
318    /// holding the state lock through it is safe and sub-microsecond.
319    ///
320    /// Note: `max_concurrent_merges` is a soft limit — concurrent auto-triggers
321    /// may briefly exceed it by one or two due to TOCTOU between slot counting
322    /// and handle registration.
323    pub async fn maybe_merge(self: &Arc<Self>) {
324        // Drain completed handles and check how many slots are available
325        let slots_available = {
326            let mut handles = self.merge_handles.lock().await;
327            handles.retain(|h| !h.is_finished());
328            self.max_concurrent_merges.saturating_sub(handles.len())
329        };
330
331        if slots_available == 0 {
332            log::debug!("[maybe_merge] at max concurrent merges, skipping");
333            return;
334        }
335
336        // Hold state lock through spawn_merge to make filter + register atomic.
337        // This closes the TOCTOU window where concurrent maybe_merge calls could
338        // both see the same segments as eligible before either registers them.
339        let new_handles = {
340            let st = self.state.lock().await;
341
342            // Exclude segments that are pending deletion OR already in an active merge.
343            let segments: Vec<SegmentInfo> = st
344                .metadata
345                .segment_metas
346                .iter()
347                .filter(|(id, _)| {
348                    !self.tracker.is_pending_deletion(id) && !self.merge_inventory.contains(id)
349                })
350                .map(|(id, info)| SegmentInfo {
351                    id: id.clone(),
352                    num_docs: info.num_docs,
353                })
354                .collect();
355
356            log::debug!("[maybe_merge] {} eligible segments", segments.len());
357
358            let candidates = st.merge_policy.find_merges(&segments);
359
360            if candidates.is_empty() {
361                return;
362            }
363
364            log::debug!(
365                "[maybe_merge] {} merge candidates, {} slots available",
366                candidates.len(),
367                slots_available
368            );
369
370            let mut handles = Vec::new();
371            for c in candidates {
372                if handles.len() >= slots_available {
373                    break;
374                }
375                if let Some(h) = self.spawn_merge(c.segment_ids) {
376                    handles.push(h);
377                }
378            }
379            handles
380            // state lock released here — after spawn_merge registered IDs in inventory
381        };
382
383        if !new_handles.is_empty() {
384            self.merge_handles.lock().await.extend(new_handles);
385        }
386    }
387
388    /// Spawn a background merge task with RAII tracking.
389    ///
390    /// Pre-generates the output segment ID. `MergeGuard` registers all segment IDs
391    /// (old + output) in `merge_inventory`. When the task ends (success, failure, or
392    /// panic), the guard drops and segments are automatically unregistered.
393    ///
394    /// On completion, the task auto-triggers `maybe_merge` to evaluate cascading merges.
395    /// Returns the JoinHandle if the merge was spawned, None if it was skipped.
396    fn spawn_merge(self: &Arc<Self>, segment_ids_to_merge: Vec<String>) -> Option<JoinHandle<()>> {
397        let output_id = SegmentId::new();
398        let output_hex = output_id.to_hex();
399
400        let mut all_ids = segment_ids_to_merge.clone();
401        all_ids.push(output_hex);
402
403        let guard = match self.merge_inventory.try_register(all_ids) {
404            Some(g) => g,
405            None => {
406                log::debug!("[spawn_merge] skipped: segments overlap with active merge");
407                return None;
408            }
409        };
410
411        let sm = Arc::clone(self);
412        let ids = segment_ids_to_merge;
413
414        Some(tokio::spawn(async move {
415            let _guard = guard;
416
417            let trained_snap = sm.trained();
418            let result = Self::do_merge(
419                sm.directory.as_ref(),
420                &sm.schema,
421                &ids,
422                output_id,
423                sm.term_cache_blocks,
424                trained_snap.as_deref(),
425                false,
426            )
427            .await;
428
429            match result {
430                Ok((new_id, doc_count)) => {
431                    if let Err(e) = sm.replace_segments(&ids, new_id, doc_count).await {
432                        log::error!("[merge] Failed to replace segments after merge: {:?}", e);
433                    }
434                }
435                Err(e) => {
436                    log::error!(
437                        "[merge] Background merge failed for segments {:?}: {:?}",
438                        ids,
439                        e
440                    );
441                }
442            }
443            // _guard drops here → segment IDs unregistered from inventory
444
445            // Auto-trigger: re-evaluate merge policy after this merge completes.
446            // The merged output may now be eligible for a higher-tier merge.
447            sm.maybe_merge().await;
448        }))
449    }
450
451    /// Atomically replace old segments with a new merged segment.
452    /// Computes merge generation as max(parent gens) + 1 and records ancestors.
453    async fn replace_segments(
454        &self,
455        old_ids: &[String],
456        new_id: String,
457        doc_count: u32,
458    ) -> Result<()> {
459        self.tracker.register(&new_id);
460
461        {
462            let mut st = self.state.lock().await;
463            // Compute generation from parents before removing them
464            let parent_gen = old_ids
465                .iter()
466                .filter_map(|id| st.metadata.segment_metas.get(id))
467                .map(|info| info.generation)
468                .max()
469                .unwrap_or(0);
470            let ancestors: Vec<String> = old_ids.to_vec();
471
472            for id in old_ids {
473                st.metadata.remove_segment(id);
474            }
475            st.metadata
476                .add_merged_segment(new_id, doc_count, ancestors, parent_gen + 1);
477            // Mutation + persist must be atomic — keep under lock
478            st.metadata.save(self.directory.as_ref()).await?;
479        }
480
481        let ready_to_delete = self.tracker.mark_for_deletion(old_ids);
482        for segment_id in ready_to_delete {
483            let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
484        }
485        Ok(())
486    }
487
488    /// Perform the actual merge operation (pure function — no shared state access).
489    /// `output_segment_id` is pre-generated by the caller so it can be tracked in `merge_inventory`.
490    /// When `force_reorder` is true, all BMP fields get record-level BP reordering.
491    /// Returns (new_segment_id_hex, total_doc_count).
492    pub(crate) async fn do_merge(
493        directory: &D,
494        schema: &Arc<crate::dsl::Schema>,
495        segment_ids_to_merge: &[String],
496        output_segment_id: SegmentId,
497        term_cache_blocks: usize,
498        trained: Option<&TrainedVectorStructures>,
499        force_reorder: bool,
500    ) -> Result<(String, u32)> {
501        let output_hex = output_segment_id.to_hex();
502        let load_start = std::time::Instant::now();
503
504        let segment_ids: Vec<SegmentId> = segment_ids_to_merge
505            .iter()
506            .map(|id_str| {
507                SegmentId::from_hex(id_str)
508                    .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))
509            })
510            .collect::<Result<Vec<_>>>()?;
511
512        let schema_arc = Arc::clone(schema);
513        let futures: Vec<_> = segment_ids
514            .iter()
515            .map(|&sid| {
516                let sch = Arc::clone(&schema_arc);
517                async move { SegmentReader::open(directory, sid, sch, term_cache_blocks).await }
518            })
519            .collect();
520
521        let results = futures::future::join_all(futures).await;
522        let mut readers = Vec::with_capacity(results.len());
523        let mut total_docs = 0u64;
524        for (i, result) in results.into_iter().enumerate() {
525            match result {
526                Ok(r) => {
527                    total_docs += r.meta().num_docs as u64;
528                    readers.push(r);
529                }
530                Err(e) => {
531                    log::error!(
532                        "[merge] Failed to open segment {}: {:?}",
533                        segment_ids_to_merge[i],
534                        e
535                    );
536                    return Err(e);
537                }
538            }
539        }
540
541        // Pre-merge validation: verify each source segment's store doc count
542        // matches its metadata. Catching mismatches early avoids building a
543        // corrupted merged segment and leaving orphan files on disk.
544        for (i, reader) in readers.iter().enumerate() {
545            let meta_docs = reader.meta().num_docs;
546            let store_docs = reader.store().num_docs();
547            if store_docs != meta_docs {
548                return Err(Error::Corruption(format!(
549                    "pre-merge validation: segment {} store has {} docs but meta says {}",
550                    segment_ids_to_merge[i], store_docs, meta_docs
551                )));
552            }
553        }
554
555        log::info!(
556            "[merge] loaded {} segment readers in {:.1}s",
557            readers.len(),
558            load_start.elapsed().as_secs_f64()
559        );
560
561        let merger = SegmentMerger::new(Arc::clone(schema)).with_force_reorder(force_reorder);
562
563        log::info!(
564            "[merge] {} segments -> {} (trained={}, force_reorder={})",
565            segment_ids_to_merge.len(),
566            output_hex,
567            trained.map_or(0, |t| t.centroids.len()),
568            force_reorder,
569        );
570
571        merger
572            .merge(directory, &readers, output_segment_id, trained)
573            .await?;
574
575        log::info!(
576            "[merge] total wall-clock: {:.1}s ({} segments, {} docs)",
577            load_start.elapsed().as_secs_f64(),
578            readers.len(),
579            total_docs,
580        );
581
582        if total_docs > u32::MAX as u64 {
583            return Err(Error::Internal(format!(
584                "Merged segment doc count ({}) exceeds u32::MAX",
585                total_docs
586            )));
587        }
588        Ok((output_hex, total_docs as u32))
589    }
590
591    /// Abort all in-flight merge tasks without waiting for completion.
592    /// Used during index deletion to stop background work immediately.
593    pub async fn abort_merges(&self) {
594        let handles: Vec<JoinHandle<()>> =
595            { std::mem::take(&mut *self.merge_handles.lock().await) };
596        for h in handles {
597            h.abort();
598        }
599    }
600
601    /// Wait for all current in-flight merges to complete.
602    pub async fn wait_for_merging_thread(self: &Arc<Self>) {
603        let handles: Vec<JoinHandle<()>> =
604            { std::mem::take(&mut *self.merge_handles.lock().await) };
605        for h in handles {
606            let _ = h.await;
607        }
608    }
609
610    /// Wait for all eligible merges to complete, including cascading merges.
611    ///
612    /// Drains current handles, then loops. Each completed merge auto-triggers
613    /// `maybe_merge` (which pushes new handles) before its JoinHandle resolves,
614    /// so by the time `h.await` returns all cascading handles are registered.
615    pub async fn wait_for_all_merges(self: &Arc<Self>) {
616        loop {
617            let handles: Vec<JoinHandle<()>> =
618                { std::mem::take(&mut *self.merge_handles.lock().await) };
619            if handles.is_empty() {
620                break;
621            }
622            for h in handles {
623                let _ = h.await;
624            }
625        }
626    }
627
628    /// Force merge all segments into one. Iterates in batches until ≤1 segment remains.
629    ///
630    /// Each batch is registered in `merge_inventory` via `MergeGuard` to prevent
631    /// `maybe_merge` from spawning a conflicting background merge.
632    pub async fn force_merge(self: &Arc<Self>) -> Result<()> {
633        const FORCE_MERGE_BATCH: usize = 64;
634
635        // Wait for all in-flight background merges (including cascading)
636        // before starting forced merges to avoid try_register conflicts.
637        self.wait_for_all_merges().await;
638
639        loop {
640            let ids_to_merge = self.get_segment_ids().await;
641            if ids_to_merge.len() < 2 {
642                return Ok(());
643            }
644
645            let batch: Vec<String> = ids_to_merge.into_iter().take(FORCE_MERGE_BATCH).collect();
646
647            log::info!("[force_merge] merging batch of {} segments", batch.len());
648
649            let output_id = SegmentId::new();
650            let output_hex = output_id.to_hex();
651
652            // Register batch + output in inventory so maybe_merge skips them.
653            let mut all_ids = batch.clone();
654            all_ids.push(output_hex);
655            let _guard = match self.merge_inventory.try_register(all_ids) {
656                Some(g) => g,
657                None => {
658                    // A background merge slipped in — wait for it, then retry the loop
659                    self.wait_for_merging_thread().await;
660                    continue;
661                }
662            };
663
664            let trained_snap = self.trained();
665            let (new_segment_id, total_docs) = Self::do_merge(
666                self.directory.as_ref(),
667                &self.schema,
668                &batch,
669                output_id,
670                self.term_cache_blocks,
671                trained_snap.as_deref(),
672                false,
673            )
674            .await?;
675
676            self.replace_segments(&batch, new_segment_id, total_docs)
677                .await?;
678
679            // _guard drops here → segments unregistered from inventory
680        }
681    }
682
683    /// Reorder all segments via Recursive Graph Bisection (BP) for better BMP pruning.
684    ///
685    /// Each segment is individually rebuilt with record-level BP reordering:
686    /// ordinals are shuffled across blocks so that similar content clusters tightly.
687    /// Non-BMP fields pass through unchanged (identity-copied via merge).
688    ///
689    /// Uses the same locking infrastructure as merge: `MergeInventory` prevents
690    /// concurrent operations on the same segment, `replace_segments()` does atomic
691    /// metadata swap.
692    pub async fn reorder_segments(self: &Arc<Self>) -> Result<()> {
693        self.wait_for_all_merges().await;
694        let segment_ids = self.get_segment_ids().await;
695
696        if segment_ids.is_empty() {
697            log::info!("[reorder] no segments to reorder");
698            return Ok(());
699        }
700
701        log::info!("[reorder] reordering {} segments", segment_ids.len());
702
703        for seg_id in segment_ids {
704            let output_id = SegmentId::new();
705            let output_hex = output_id.to_hex();
706
707            let all_ids = vec![seg_id.clone(), output_hex];
708            let _guard = match self.merge_inventory.try_register(all_ids) {
709                Some(g) => g,
710                None => {
711                    log::warn!("[reorder] segment {} in active merge, skipping", seg_id);
712                    continue;
713                }
714            };
715
716            let trained_snap = self.trained();
717            let (new_id, total_docs) = Self::do_merge(
718                self.directory.as_ref(),
719                &self.schema,
720                std::slice::from_ref(&seg_id),
721                output_id,
722                self.term_cache_blocks,
723                trained_snap.as_deref(),
724                true, // force_reorder
725            )
726            .await?;
727
728            self.replace_segments(&[seg_id], new_id, total_docs).await?;
729        }
730
731        log::info!("[reorder] all segments reordered");
732        Ok(())
733    }
734
735    /// Clean up orphan segment files not registered in metadata.
736    ///
737    /// Non-blocking: reads both metadata and `merge_inventory` to determine which
738    /// segments are legitimate. In-flight merge outputs are protected by the inventory.
739    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
740        // Read BOTH sets under the same state lock to prevent TOCTOU:
741        // without this, a merge completing between the two reads could make
742        // its output segment invisible to both sets → wrongly deleted.
743        let (registered_set, in_merge_set) = {
744            let st = self.state.lock().await;
745            let registered = st
746                .metadata
747                .segment_metas
748                .keys()
749                .cloned()
750                .collect::<HashSet<String>>();
751            let in_merge = self.merge_inventory.snapshot();
752            (registered, in_merge)
753        };
754
755        let mut orphan_ids: HashSet<String> = HashSet::new();
756
757        if let Ok(entries) = self.directory.list_files(std::path::Path::new("")).await {
758            for entry in entries {
759                let filename = entry.to_string_lossy();
760                if filename.starts_with("seg_") && filename.len() > 37 {
761                    let hex_part = &filename[4..36];
762                    if !registered_set.contains(hex_part) && !in_merge_set.contains(hex_part) {
763                        orphan_ids.insert(hex_part.to_string());
764                    }
765                }
766            }
767        }
768
769        let mut deleted = 0;
770        for hex_id in &orphan_ids {
771            if let Some(segment_id) = SegmentId::from_hex(hex_id)
772                && crate::segment::delete_segment(self.directory.as_ref(), segment_id)
773                    .await
774                    .is_ok()
775            {
776                deleted += 1;
777            }
778        }
779
780        Ok(deleted)
781    }
782}
783
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_inventory_guard_drop_unregisters() {
        let inv = Arc::new(MergeInventory::new());
        {
            let _guard = inv.try_register(vec!["a".into(), "b".into()]).unwrap();
            let snap = inv.snapshot();
            assert!(snap.contains("a"));
            assert!(snap.contains("b"));
        }
        // Guard went out of scope → its IDs must be released.
        assert!(inv.snapshot().is_empty());
    }

    #[test]
    fn test_inventory_concurrent_non_overlapping_merges() {
        let inv = Arc::new(MergeInventory::new());
        let first = inv.try_register(vec!["a".into(), "b".into()]).unwrap();
        // A disjoint segment set may register while the first is active.
        let _second = inv.try_register(vec!["c".into(), "d".into()]).unwrap();
        assert_eq!(inv.snapshot().len(), 4);

        // Dropping one guard releases only that merge's segments.
        drop(first);
        let remaining = inv.snapshot();
        assert_eq!(remaining.len(), 2);
        assert!(remaining.contains("c"));
        assert!(remaining.contains("d"));
    }

    #[test]
    fn test_inventory_overlapping_merge_rejected() {
        let inv = Arc::new(MergeInventory::new());
        let first = inv.try_register(vec!["a".into(), "b".into()]).unwrap();
        // Shares "b" with the active merge → must be rejected.
        assert!(inv.try_register(vec!["b".into(), "c".into()]).is_none());
        // Once the first merge ends, the same request goes through.
        drop(first);
        assert!(inv.try_register(vec!["b".into(), "c".into()]).is_some());
    }

    #[test]
    fn test_inventory_snapshot() {
        let inv = Arc::new(MergeInventory::new());
        let _held = inv.try_register(vec!["x".into(), "y".into()]).unwrap();
        let snap = inv.snapshot();
        assert!(snap.contains("x"));
        assert!(snap.contains("y"));
        assert!(!snap.contains("z"));
    }
}