// hermes_core/merge/segment_manager.rs
//! Segment manager — coordinates segment commit, background merging, and trained structures.
//!
//! Architecture:
//! - **Single mutation queue**: All metadata mutations serialize through `tokio::sync::Mutex<ManagerState>`.
//! - **Concurrent merges**: Multiple non-overlapping merges can run in parallel.
//!   Each merge registers its segment IDs in `MergeInventory` via RAII `MergeGuard`.
//!   New merges are rejected only if they share segments with an active merge.
//! - **Auto-trigger**: Each completed merge re-evaluates the merge policy and spawns
//!   new merges if eligible (cascading merges for higher tiers).
//! - **ArcSwap for trained**: Lock-free reads of trained vector structures.
//!
//! # Locking model (deadlock-free by construction)
//!
//! ```text
//! Lock ordering (acquire in this order):
//!   1. state               — tokio::sync::Mutex, held for mutations + disk I/O
//!   2. merge_inventory     — parking_lot::Mutex (sync), sub-μs hold, RAII via MergeGuard
//!   3. tracker.inner       — parking_lot::Mutex (sync), sub-μs hold
//!
//! Lock-free state:
//!   trained                — arc_swap::ArcSwapOption, no ordering constraint
//!   merge_handles          — tokio::sync::Mutex, never held with state
//! ```
//!
//! **Rule:** Never hold a sync lock while `.await`-ing.

use std::collections::HashSet;
use std::sync::Arc;

use arc_swap::ArcSwapOption;
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::JoinHandle;

use crate::directories::DirectoryWriter;
use crate::error::{Error, Result};
use crate::index::IndexMetadata;
use crate::segment::{SegmentId, SegmentSnapshot, SegmentTracker, TrainedVectorStructures};
#[cfg(feature = "native")]
use crate::segment::{SegmentMerger, SegmentReader};

use super::{MergePolicy, SegmentInfo};

// ============================================================================
// RAII merge tracking
// ============================================================================

/// Tracks which segments are involved in active merges.
///
/// Supports multiple concurrent merges. Each merge registers its segment IDs;
/// a new merge is rejected only if any of its segments overlap with an active merge.
/// Uses RAII via `MergeGuard`: when a merge task ends, the guard drops and
/// its segment IDs are automatically unregistered.
struct MergeInventory {
    // Hex segment IDs currently claimed by in-flight merges (sources + outputs).
    // Sync lock — held only for sub-microsecond membership checks/updates,
    // never across an `.await` (see module-level locking model).
    inner: parking_lot::Mutex<HashSet<String>>,
}
56
57impl MergeInventory {
58    fn new() -> Self {
59        Self {
60            inner: parking_lot::Mutex::new(HashSet::new()),
61        }
62    }
63
64    /// Try to register a merge. Returns `MergeGuard` on success, `None` if
65    /// any of the requested segments are already in an active merge.
66    fn try_register(self: &Arc<Self>, segment_ids: Vec<String>) -> Option<MergeGuard> {
67        let mut inner = self.inner.lock();
68        // Check for overlap with any active merge
69        for id in &segment_ids {
70            if inner.contains(id) {
71                log::debug!(
72                    "[merge_inventory] rejected: {} overlaps with active merge ({} active IDs)",
73                    id,
74                    inner.len()
75                );
76                return None;
77            }
78        }
79        log::debug!(
80            "[merge_inventory] registered {} IDs (total active: {})",
81            segment_ids.len(),
82            inner.len() + segment_ids.len()
83        );
84        for id in &segment_ids {
85            inner.insert(id.clone());
86        }
87        Some(MergeGuard {
88            inventory: Arc::clone(self),
89            segment_ids,
90        })
91    }
92
93    /// Snapshot of all in-merge segment IDs (for cleanup_orphan_segments)
94    fn snapshot(&self) -> HashSet<String> {
95        self.inner.lock().clone()
96    }
97
98    /// Check if a specific segment is currently involved in a merge.
99    fn contains(&self, segment_id: &str) -> bool {
100        self.inner.lock().contains(segment_id)
101    }
102}
103
/// RAII guard for a merge operation.
/// Dropped when the merge task completes (success, failure, or panic) —
/// automatically unregisters this merge's segment IDs from the inventory.
struct MergeGuard {
    // Shared inventory this guard unregisters from on drop.
    inventory: Arc<MergeInventory>,
    // IDs claimed by this merge (callers register sources plus the
    // pre-generated output ID — see `spawn_merge` / `force_merge`).
    segment_ids: Vec<String>,
}
111
112impl Drop for MergeGuard {
113    fn drop(&mut self) {
114        let mut inner = self.inventory.inner.lock();
115        for id in &self.segment_ids {
116            inner.remove(id);
117        }
118    }
119}
120
/// All mutable state behind the single async Mutex.
struct ManagerState {
    // In-memory view of `metadata.json`; mutated and persisted while the
    // `state` lock is held so disk never lags another mutator's view.
    metadata: IndexMetadata,
    // Policy consulted by `maybe_merge` to pick merge candidates.
    merge_policy: Box<dyn MergePolicy>,
}
126
/// Segment manager — coordinates segment commit, background merging, and trained structures.
///
/// SOLE owner of `metadata.json`. All metadata mutations go through `state` Mutex.
pub struct SegmentManager<D: DirectoryWriter + 'static> {
    /// Serializes ALL metadata mutations.
    state: AsyncMutex<ManagerState>,

    /// RAII merge tracking: segments are registered on merge start, automatically
    /// unregistered when the merge task ends (via MergeGuard drop).
    merge_inventory: Arc<MergeInventory>,

    /// In-flight merge JoinHandles — supports multiple concurrent merges.
    /// Never locked while `state` is held (see module-level locking model).
    merge_handles: AsyncMutex<Vec<JoinHandle<()>>>,

    /// Trained vector structures — lock-free reads via ArcSwap.
    trained: ArcSwapOption<TrainedVectorStructures>,

    /// Reference counting for safe segment deletion (sync Mutex for Drop).
    tracker: Arc<SegmentTracker>,

    /// Cached deletion callback for snapshots (avoids allocation per acquire_snapshot).
    delete_fn: Arc<dyn Fn(Vec<SegmentId>) + Send + Sync>,

    /// Directory for segment I/O
    directory: Arc<D>,
    /// Schema for segment operations
    schema: Arc<crate::dsl::Schema>,
    /// Term cache blocks for segment readers during merge
    term_cache_blocks: usize,
    /// Maximum number of concurrent background merges (soft limit, clamped ≥ 1).
    max_concurrent_merges: usize,
}
159
impl<D: DirectoryWriter + 'static> SegmentManager<D> {
    /// Create a new segment manager with existing metadata.
    ///
    /// Registers every segment already present in `metadata` with the tracker
    /// and builds the deferred-deletion callback shared by all snapshots.
    pub fn new(
        directory: Arc<D>,
        schema: Arc<crate::dsl::Schema>,
        metadata: IndexMetadata,
        merge_policy: Box<dyn MergePolicy>,
        term_cache_blocks: usize,
        max_concurrent_merges: usize,
    ) -> Self {
        let tracker = Arc::new(SegmentTracker::new());
        for seg_id in metadata.segment_metas.keys() {
            tracker.register(seg_id);
        }

        // Built once here and cloned per snapshot — avoids allocating a fresh
        // closure on every acquire_snapshot call.
        let delete_fn: Arc<dyn Fn(Vec<SegmentId>) + Send + Sync> = {
            let dir = Arc::clone(&directory);
            Arc::new(move |segment_ids| {
                // Guard: if the tokio runtime is gone (program exit), skip async
                // deletion. Segment files become orphans cleaned up on next startup.
                let Ok(handle) = tokio::runtime::Handle::try_current() else {
                    return;
                };
                let dir = Arc::clone(&dir);
                handle.spawn(async move {
                    for segment_id in segment_ids {
                        log::info!(
                            "[segment_cleanup] deleting deferred segment {}",
                            segment_id.0
                        );
                        // Best-effort: a failed delete leaves an orphan for
                        // the next cleanup_orphan_segments pass.
                        let _ = crate::segment::delete_segment(dir.as_ref(), segment_id).await;
                    }
                });
            })
        };

        Self {
            state: AsyncMutex::new(ManagerState {
                metadata,
                merge_policy,
            }),
            merge_inventory: Arc::new(MergeInventory::new()),
            merge_handles: AsyncMutex::new(Vec::new()),
            trained: ArcSwapOption::new(None),
            tracker,
            delete_fn,
            directory,
            schema,
            term_cache_blocks,
            // Clamp so background merging can always make progress.
            max_concurrent_merges: max_concurrent_merges.max(1),
        }
    }

    // ========================================================================
    // Read path (brief lock or lock-free)
    // ========================================================================

    /// Get the current segment IDs
    pub async fn get_segment_ids(&self) -> Vec<String> {
        self.state.lock().await.metadata.segment_ids()
    }

    /// Get trained vector structures (lock-free via ArcSwap)
    pub fn trained(&self) -> Option<Arc<TrainedVectorStructures>> {
        self.trained.load_full()
    }

    /// Load trained structures from disk and publish to ArcSwap.
    /// Copies metadata under lock, releases lock, then does disk I/O.
    pub async fn load_and_publish_trained(&self) {
        // Copy vector_fields under lock (cheap clone of HashMap<u32, FieldMeta>)
        let vector_fields = {
            let st = self.state.lock().await;
            st.metadata.vector_fields.clone()
        };
        // Disk I/O happens WITHOUT holding the state lock
        let trained =
            IndexMetadata::load_trained_from_fields(&vector_fields, self.directory.as_ref()).await;
        // If loading produced nothing, keep whatever was published before.
        if let Some(t) = trained {
            self.trained.store(Some(Arc::new(t)));
        }
    }

    /// Clear trained structures (sets ArcSwap to None)
    pub(crate) fn clear_trained(&self) {
        self.trained.store(None);
    }

    /// Read metadata with a closure (no persist)
    pub(crate) async fn read_metadata<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&IndexMetadata) -> R,
    {
        let st = self.state.lock().await;
        f(&st.metadata)
    }

    /// Update metadata with a closure and persist atomically.
    /// The mutation and the save happen under the same lock acquisition.
    pub(crate) async fn update_metadata<F>(&self, f: F) -> Result<()>
    where
        F: FnOnce(&mut IndexMetadata),
    {
        let mut st = self.state.lock().await;
        f(&mut st.metadata);
        st.metadata.save(self.directory.as_ref()).await
    }

    /// Acquire a snapshot of current segments for reading.
    /// The snapshot holds references — segments won't be deleted while snapshot exists.
    pub async fn acquire_snapshot(&self) -> SegmentSnapshot {
        // Read the ID list and take refcounts under the same state lock so the
        // snapshot can't reference a segment removed between the two steps.
        let acquired = {
            let st = self.state.lock().await;
            let segment_ids = st.metadata.segment_ids();
            self.tracker.acquire(&segment_ids)
        };

        SegmentSnapshot::with_delete_fn(
            Arc::clone(&self.tracker),
            acquired,
            Arc::clone(&self.delete_fn),
        )
    }

    /// Get the segment tracker
    pub fn tracker(&self) -> Arc<SegmentTracker> {
        Arc::clone(&self.tracker)
    }

    /// Get the directory
    pub fn directory(&self) -> Arc<D> {
        Arc::clone(&self.directory)
    }
}
293
// ============================================================================
// Native-only: commit, merging, force_merge
// ============================================================================

#[cfg(feature = "native")]
impl<D: DirectoryWriter + 'static> SegmentManager<D> {
    /// Atomic commit: register new segments + persist metadata.
    ///
    /// Idempotent per segment: re-committing an already-known ID is a no-op.
    /// Persist happens under the same lock acquisition as the mutation.
    pub async fn commit(&self, new_segments: Vec<(String, u32)>) -> Result<()> {
        let mut st = self.state.lock().await;
        for (segment_id, num_docs) in new_segments {
            if !st.metadata.has_segment(&segment_id) {
                st.metadata.add_segment(segment_id.clone(), num_docs);
                self.tracker.register(&segment_id);
            }
        }
        st.metadata.save(self.directory.as_ref()).await
    }

    /// Evaluate merge policy and spawn background merges for all eligible candidates.
    ///
    /// **Atomicity**: The entire filter → find_merges → spawn_merge sequence runs
    /// under the `state` lock to prevent a TOCTOU race where concurrent callers
    /// both see segments as eligible before either registers them in the inventory.
    /// `spawn_merge` is non-blocking (just `try_register` + `tokio::spawn`), so
    /// holding the state lock through it is safe and sub-microsecond.
    ///
    /// Note: `max_concurrent_merges` is a soft limit — concurrent auto-triggers
    /// may briefly exceed it by one or two due to TOCTOU between slot counting
    /// and handle registration.
    pub async fn maybe_merge(self: &Arc<Self>) {
        // Drain completed handles and check how many slots are available.
        // (merge_handles is never locked while state is held — see lock order.)
        let slots_available = {
            let mut handles = self.merge_handles.lock().await;
            handles.retain(|h| !h.is_finished());
            self.max_concurrent_merges.saturating_sub(handles.len())
        };

        if slots_available == 0 {
            log::debug!("[maybe_merge] at max concurrent merges, skipping");
            return;
        }

        // Hold state lock through spawn_merge to make filter + register atomic.
        // This closes the TOCTOU window where concurrent maybe_merge calls could
        // both see the same segments as eligible before either registers them.
        let new_handles = {
            let st = self.state.lock().await;

            // Exclude segments that are pending deletion OR already in an active merge.
            let segments: Vec<SegmentInfo> = st
                .metadata
                .segment_metas
                .iter()
                .filter(|(id, _)| {
                    !self.tracker.is_pending_deletion(id) && !self.merge_inventory.contains(id)
                })
                .map(|(id, info)| SegmentInfo {
                    id: id.clone(),
                    num_docs: info.num_docs,
                })
                .collect();

            log::debug!("[maybe_merge] {} eligible segments", segments.len());

            let candidates = st.merge_policy.find_merges(&segments);

            if candidates.is_empty() {
                return;
            }

            log::debug!(
                "[maybe_merge] {} merge candidates, {} slots available",
                candidates.len(),
                slots_available
            );

            let mut handles = Vec::new();
            for c in candidates {
                if handles.len() >= slots_available {
                    break;
                }
                // spawn_merge may still decline (overlap raced in) — skip it.
                if let Some(h) = self.spawn_merge(c.segment_ids) {
                    handles.push(h);
                }
            }
            handles
            // state lock released here — after spawn_merge registered IDs in inventory
        };

        if !new_handles.is_empty() {
            self.merge_handles.lock().await.extend(new_handles);
        }
    }

    /// Spawn a background merge task with RAII tracking.
    ///
    /// Pre-generates the output segment ID. `MergeGuard` registers all segment IDs
    /// (old + output) in `merge_inventory`. When the task ends (success, failure, or
    /// panic), the guard drops and segments are automatically unregistered.
    ///
    /// On completion, the task auto-triggers `maybe_merge` to evaluate cascading merges.
    /// Returns the JoinHandle if the merge was spawned, None if it was skipped.
    fn spawn_merge(self: &Arc<Self>, segment_ids_to_merge: Vec<String>) -> Option<JoinHandle<()>> {
        let output_id = SegmentId::new();
        let output_hex = output_id.to_hex();

        // Register the output ID too, so cleanup_orphan_segments won't delete
        // the in-progress output files.
        let mut all_ids = segment_ids_to_merge.clone();
        all_ids.push(output_hex);

        let guard = match self.merge_inventory.try_register(all_ids) {
            Some(g) => g,
            None => {
                log::debug!("[spawn_merge] skipped: segments overlap with active merge");
                return None;
            }
        };

        let sm = Arc::clone(self);
        let ids = segment_ids_to_merge;

        Some(tokio::spawn(async move {
            // Moved into the task so the IDs stay registered for its lifetime.
            let _guard = guard;

            let trained_snap = sm.trained();
            let result = Self::do_merge(
                sm.directory.as_ref(),
                &sm.schema,
                &ids,
                output_id,
                sm.term_cache_blocks,
                trained_snap.as_deref(),
            )
            .await;

            match result {
                Ok((new_id, doc_count)) => {
                    if let Err(e) = sm.replace_segments(&ids, new_id, doc_count).await {
                        log::error!("[merge] Failed to replace segments after merge: {:?}", e);
                    }
                }
                Err(e) => {
                    log::error!(
                        "[merge] Background merge failed for segments {:?}: {:?}",
                        ids,
                        e
                    );
                }
            }
            // _guard drops here → segment IDs unregistered from inventory

            // Auto-trigger: re-evaluate merge policy after this merge completes.
            // The merged output may now be eligible for a higher-tier merge.
            sm.maybe_merge().await;
        }))
    }

    /// Atomically replace old segments with a new merged segment.
    /// Computes merge generation as max(parent gens) + 1 and records ancestors.
    async fn replace_segments(
        &self,
        old_ids: &[String],
        new_id: String,
        doc_count: u32,
    ) -> Result<()> {
        // Register the new segment with the tracker before it becomes visible
        // in metadata, so snapshots can immediately take references on it.
        self.tracker.register(&new_id);

        {
            let mut st = self.state.lock().await;
            // Compute generation from parents before removing them
            let parent_gen = old_ids
                .iter()
                .filter_map(|id| st.metadata.segment_metas.get(id))
                .map(|info| info.generation)
                .max()
                .unwrap_or(0);
            let ancestors: Vec<String> = old_ids.to_vec();

            for id in old_ids {
                st.metadata.remove_segment(id);
            }
            st.metadata
                .add_merged_segment(new_id, doc_count, ancestors, parent_gen + 1);
            // Mutation + persist must be atomic — keep under lock
            st.metadata.save(self.directory.as_ref()).await?;
        }

        // Segments still referenced by live snapshots are deferred; only the
        // IDs returned here are safe to delete right now.
        let ready_to_delete = self.tracker.mark_for_deletion(old_ids);
        for segment_id in ready_to_delete {
            let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
        }
        Ok(())
    }

    /// Perform the actual merge operation (pure function — no shared state access).
    /// `output_segment_id` is pre-generated by the caller so it can be tracked in `merge_inventory`.
    /// Returns (new_segment_id_hex, total_doc_count).
    pub(crate) async fn do_merge(
        directory: &D,
        schema: &Arc<crate::dsl::Schema>,
        segment_ids_to_merge: &[String],
        output_segment_id: SegmentId,
        term_cache_blocks: usize,
        trained: Option<&TrainedVectorStructures>,
    ) -> Result<(String, u32)> {
        let output_hex = output_segment_id.to_hex();
        let load_start = std::time::Instant::now();

        let segment_ids: Vec<SegmentId> = segment_ids_to_merge
            .iter()
            .map(|id_str| {
                SegmentId::from_hex(id_str)
                    .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))
            })
            .collect::<Result<Vec<_>>>()?;

        // Open all source readers concurrently.
        let schema_arc = Arc::clone(schema);
        let futures: Vec<_> = segment_ids
            .iter()
            .map(|&sid| {
                let sch = Arc::clone(&schema_arc);
                async move { SegmentReader::open(directory, sid, sch, term_cache_blocks).await }
            })
            .collect();

        let results = futures::future::join_all(futures).await;
        let mut readers = Vec::with_capacity(results.len());
        let mut total_docs = 0u64;
        for (i, result) in results.into_iter().enumerate() {
            match result {
                Ok(r) => {
                    total_docs += r.meta().num_docs as u64;
                    readers.push(r);
                }
                Err(e) => {
                    log::error!(
                        "[merge] Failed to open segment {}: {:?}",
                        segment_ids_to_merge[i],
                        e
                    );
                    return Err(e);
                }
            }
        }

        // Pre-merge validation: verify each source segment's store doc count
        // matches its metadata. Catching mismatches early avoids building a
        // corrupted merged segment and leaving orphan files on disk.
        for (i, reader) in readers.iter().enumerate() {
            let meta_docs = reader.meta().num_docs;
            let store_docs = reader.store().num_docs();
            if store_docs != meta_docs {
                return Err(Error::Corruption(format!(
                    "pre-merge validation: segment {} store has {} docs but meta says {}",
                    segment_ids_to_merge[i], store_docs, meta_docs
                )));
            }
        }

        log::info!(
            "[merge] loaded {} segment readers in {:.1}s",
            readers.len(),
            load_start.elapsed().as_secs_f64()
        );

        let merger = SegmentMerger::new(Arc::clone(schema));

        log::info!(
            "[merge] {} segments -> {} (trained={})",
            segment_ids_to_merge.len(),
            output_hex,
            trained.map_or(0, |t| t.centroids.len())
        );

        merger
            .merge(directory, &readers, output_segment_id, trained)
            .await?;

        log::info!(
            "[merge] total wall-clock: {:.1}s ({} segments, {} docs)",
            load_start.elapsed().as_secs_f64(),
            readers.len(),
            total_docs,
        );

        // Saturating narrow: doc counts above u32::MAX are clamped.
        Ok((output_hex, total_docs.min(u32::MAX as u64) as u32))
    }

    /// Wait for all current in-flight merges to complete.
    pub async fn wait_for_merging_thread(self: &Arc<Self>) {
        // Take the handles out so the lock is not held across the awaits.
        let handles: Vec<JoinHandle<()>> =
            { std::mem::take(&mut *self.merge_handles.lock().await) };
        for h in handles {
            let _ = h.await;
        }
    }

    /// Wait for all eligible merges to complete, including cascading merges.
    ///
    /// Drains current handles, then loops. Each completed merge auto-triggers
    /// `maybe_merge` (which pushes new handles) before its JoinHandle resolves,
    /// so by the time `h.await` returns all cascading handles are registered.
    pub async fn wait_for_all_merges(self: &Arc<Self>) {
        loop {
            let handles: Vec<JoinHandle<()>> =
                { std::mem::take(&mut *self.merge_handles.lock().await) };
            if handles.is_empty() {
                break;
            }
            for h in handles {
                let _ = h.await;
            }
        }
    }

    /// Force merge all segments into one. Iterates in batches until ≤1 segment remains.
    ///
    /// Each batch is registered in `merge_inventory` via `MergeGuard` to prevent
    /// `maybe_merge` from spawning a conflicting background merge.
    pub async fn force_merge(self: &Arc<Self>) -> Result<()> {
        const FORCE_MERGE_BATCH: usize = 64;

        // Wait for all in-flight background merges (including cascading)
        // before starting forced merges to avoid try_register conflicts.
        self.wait_for_all_merges().await;

        loop {
            let ids_to_merge = self.get_segment_ids().await;
            if ids_to_merge.len() < 2 {
                return Ok(());
            }

            let batch: Vec<String> = ids_to_merge.into_iter().take(FORCE_MERGE_BATCH).collect();

            log::info!("[force_merge] merging batch of {} segments", batch.len());

            let output_id = SegmentId::new();
            let output_hex = output_id.to_hex();

            // Register batch + output in inventory so maybe_merge skips them.
            // Named `_guard` (not `_`) so it lives to the end of the iteration.
            let mut all_ids = batch.clone();
            all_ids.push(output_hex);
            let _guard = match self.merge_inventory.try_register(all_ids) {
                Some(g) => g,
                None => {
                    // A background merge slipped in — wait for it, then retry the loop
                    self.wait_for_merging_thread().await;
                    continue;
                }
            };

            let trained_snap = self.trained();
            let (new_segment_id, total_docs) = Self::do_merge(
                self.directory.as_ref(),
                &self.schema,
                &batch,
                output_id,
                self.term_cache_blocks,
                trained_snap.as_deref(),
            )
            .await?;

            self.replace_segments(&batch, new_segment_id, total_docs)
                .await?;

            // _guard drops here → segments unregistered from inventory
        }
    }

    /// Clean up orphan segment files not registered in metadata.
    ///
    /// Non-blocking: reads both metadata and `merge_inventory` to determine which
    /// segments are legitimate. In-flight merge outputs are protected by the inventory.
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        // Read BOTH sets under the same state lock to prevent TOCTOU:
        // without this, a merge completing between the two reads could make
        // its output segment invisible to both sets → wrongly deleted.
        let (registered_set, in_merge_set) = {
            let st = self.state.lock().await;
            let registered = st
                .metadata
                .segment_metas
                .keys()
                .cloned()
                .collect::<HashSet<String>>();
            let in_merge = self.merge_inventory.snapshot();
            (registered, in_merge)
        };

        let mut orphan_ids: HashSet<String> = HashSet::new();

        if let Ok(entries) = self.directory.list_files(std::path::Path::new("")).await {
            for entry in entries {
                let filename = entry.to_string_lossy();
                // Segment files look like "seg_<32-hex-chars>.<ext>"; bytes
                // 4..36 are the hex segment ID. NOTE(review): byte slicing
                // assumes ASCII filenames — a non-UTF-8 name surviving
                // to_string_lossy could panic on a char boundary; confirm
                // directory entries are always ASCII.
                if filename.starts_with("seg_") && filename.len() > 37 {
                    let hex_part = &filename[4..36];
                    if !registered_set.contains(hex_part) && !in_merge_set.contains(hex_part) {
                        orphan_ids.insert(hex_part.to_string());
                    }
                }
            }
        }

        let mut deleted = 0;
        for hex_id in &orphan_ids {
            if let Some(segment_id) = SegmentId::from_hex(hex_id)
                && crate::segment::delete_segment(self.directory.as_ref(), segment_id)
                    .await
                    .is_ok()
            {
                deleted += 1;
            }
        }

        Ok(deleted)
    }
}
710
#[cfg(test)]
mod tests {
    use super::*;

    /// Dropping the guard must release every ID it registered.
    #[test]
    fn test_inventory_guard_drop_unregisters() {
        let inventory = Arc::new(MergeInventory::new());
        {
            let _held = inventory
                .try_register(vec!["a".into(), "b".into()])
                .unwrap();
            let active = inventory.snapshot();
            assert!(active.contains("a"));
            assert!(active.contains("b"));
        }
        // Guard went out of scope — inventory must be empty again.
        assert!(inventory.snapshot().is_empty());
    }

    /// Two merges over disjoint segment sets may be active simultaneously,
    /// and dropping one guard must not disturb the other's registrations.
    #[test]
    fn test_inventory_concurrent_non_overlapping_merges() {
        let inventory = Arc::new(MergeInventory::new());
        let guard_ab = inventory
            .try_register(vec!["a".into(), "b".into()])
            .unwrap();
        let _guard_cd = inventory
            .try_register(vec!["c".into(), "d".into()])
            .unwrap();
        assert_eq!(inventory.snapshot().len(), 4);

        // Releasing the first merge leaves only the second's IDs.
        drop(guard_ab);
        let remaining = inventory.snapshot();
        assert_eq!(remaining.len(), 2);
        assert!(remaining.contains("c"));
        assert!(remaining.contains("d"));
    }

    /// A merge sharing any segment with an active merge is refused until
    /// the conflicting guard is dropped.
    #[test]
    fn test_inventory_overlapping_merge_rejected() {
        let inventory = Arc::new(MergeInventory::new());
        let conflicting = inventory
            .try_register(vec!["a".into(), "b".into()])
            .unwrap();
        assert!(inventory.try_register(vec!["b".into(), "c".into()]).is_none());
        drop(conflicting);
        assert!(inventory.try_register(vec!["b".into(), "c".into()]).is_some());
    }

    /// snapshot() reflects exactly the registered IDs, nothing more.
    #[test]
    fn test_inventory_snapshot() {
        let inventory = Arc::new(MergeInventory::new());
        let _held = inventory
            .try_register(vec!["x".into(), "y".into()])
            .unwrap();
        let view = inventory.snapshot();
        assert!(view.contains("x"));
        assert!(view.contains("y"));
        assert!(!view.contains("z"));
    }
}