Skip to main content

hermes_core/segment/
tracker.rs

//! Segment lifecycle tracker with reference counting.
//!
//! This module makes segment deletion safe by tracking references:
//! - Readers acquire segment snapshots, which increments the segments' ref counts
//! - When a snapshot is dropped, those ref counts are decremented
//! - Segments marked for deletion are only deleted once their ref count reaches 0
//!
//! This prevents "file not found" errors when mergers delete segments
//! that are still being used by active readers.
10
11use std::collections::HashMap;
12use std::sync::Arc;
13
14use parking_lot::RwLock;
15
16use crate::directories::Directory;
17use crate::segment::SegmentId;
18
19/// Tracks segment references and pending deletions
20pub struct SegmentTracker {
21    /// Segment ID -> reference count
22    ref_counts: RwLock<HashMap<String, usize>>,
23    /// Segments marked for deletion (will be deleted when ref count reaches 0)
24    pending_deletions: RwLock<HashMap<String, PendingDeletion>>,
25}
26
27/// Info about a segment pending deletion
28struct PendingDeletion {
29    segment_id: SegmentId,
30}
31
32impl SegmentTracker {
33    /// Create a new segment tracker
34    pub fn new() -> Self {
35        Self {
36            ref_counts: RwLock::new(HashMap::new()),
37            pending_deletions: RwLock::new(HashMap::new()),
38        }
39    }
40
41    /// Register a new segment (called when segment is committed)
42    pub fn register(&self, segment_id: &str) {
43        let mut refs = self.ref_counts.write();
44        refs.entry(segment_id.to_string()).or_insert(0);
45    }
46
47    /// Acquire references to a set of segments (called when taking a snapshot)
48    /// Returns the segment IDs that were successfully acquired
49    pub fn acquire(&self, segment_ids: &[String]) -> Vec<String> {
50        let mut refs = self.ref_counts.write();
51        let pending = self.pending_deletions.read();
52
53        let mut acquired = Vec::with_capacity(segment_ids.len());
54        for id in segment_ids {
55            // Don't acquire segments that are pending deletion
56            if pending.contains_key(id) {
57                continue;
58            }
59            *refs.entry(id.clone()).or_insert(0) += 1;
60            acquired.push(id.clone());
61        }
62        acquired
63    }
64
65    /// Release references to a set of segments (called when snapshot is dropped)
66    /// Returns segment IDs that are now ready for deletion (ref count hit 0 and marked for deletion)
67    pub fn release(&self, segment_ids: &[String]) -> Vec<SegmentId> {
68        let mut refs = self.ref_counts.write();
69        let mut pending = self.pending_deletions.write();
70
71        let mut ready_for_deletion = Vec::new();
72
73        for id in segment_ids {
74            if let Some(count) = refs.get_mut(id) {
75                *count = count.saturating_sub(1);
76
77                // If ref count is 0 and segment is pending deletion, it can be deleted
78                if *count == 0
79                    && let Some(deletion) = pending.remove(id)
80                {
81                    refs.remove(id);
82                    ready_for_deletion.push(deletion.segment_id);
83                }
84            }
85        }
86
87        ready_for_deletion
88    }
89
90    /// Mark segments for deletion (called after merge completes)
91    /// Segments with ref count 0 are returned immediately for deletion
92    /// Segments with refs > 0 are queued for deletion when refs are released
93    pub fn mark_for_deletion(&self, segment_ids: &[String]) -> Vec<SegmentId> {
94        let mut refs = self.ref_counts.write();
95        let mut pending = self.pending_deletions.write();
96
97        let mut ready_for_deletion = Vec::new();
98
99        for id_str in segment_ids {
100            let Some(segment_id) = SegmentId::from_hex(id_str) else {
101                continue;
102            };
103
104            let ref_count = refs.get(id_str).copied().unwrap_or(0);
105
106            if ref_count == 0 {
107                // No refs, can delete immediately
108                refs.remove(id_str);
109                ready_for_deletion.push(segment_id);
110            } else {
111                // Has refs, queue for deletion when refs are released
112                pending.insert(id_str.clone(), PendingDeletion { segment_id });
113            }
114        }
115
116        ready_for_deletion
117    }
118
119    /// Get current segment IDs (excluding those pending deletion)
120    pub fn get_active_segments(&self) -> Vec<String> {
121        let refs = self.ref_counts.read();
122        let pending = self.pending_deletions.read();
123
124        refs.keys()
125            .filter(|id| !pending.contains_key(*id))
126            .cloned()
127            .collect()
128    }
129
130    /// Get the number of active references for a segment
131    pub fn ref_count(&self, segment_id: &str) -> usize {
132        self.ref_counts.read().get(segment_id).copied().unwrap_or(0)
133    }
134
135    /// Check if a segment is pending deletion
136    pub fn is_pending_deletion(&self, segment_id: &str) -> bool {
137        self.pending_deletions.read().contains_key(segment_id)
138    }
139}
140
141impl Default for SegmentTracker {
142    fn default() -> Self {
143        Self::new()
144    }
145}
146
147/// RAII guard that holds references to a snapshot of segments
148/// When dropped, releases all segment references
149pub struct SegmentSnapshot<D: Directory + 'static> {
150    tracker: Arc<SegmentTracker>,
151    segment_ids: Vec<String>,
152    directory: Arc<D>,
153}
154
155impl<D: Directory + 'static> SegmentSnapshot<D> {
156    /// Create a new snapshot holding references to the given segments
157    pub fn new(tracker: Arc<SegmentTracker>, directory: Arc<D>, segment_ids: Vec<String>) -> Self {
158        // Acquire is already done by caller
159        Self {
160            tracker,
161            segment_ids,
162            directory,
163        }
164    }
165
166    /// Get the segment IDs in this snapshot
167    pub fn segment_ids(&self) -> &[String] {
168        &self.segment_ids
169    }
170
171    /// Check if this snapshot is empty
172    pub fn is_empty(&self) -> bool {
173        self.segment_ids.is_empty()
174    }
175
176    /// Get the number of segments in this snapshot
177    pub fn len(&self) -> usize {
178        self.segment_ids.len()
179    }
180}
181
182impl<D: Directory + 'static> Drop for SegmentSnapshot<D> {
183    fn drop(&mut self) {
184        // Just release refs - actual deletion handled by SegmentManager for native
185        let _to_delete = self.tracker.release(&self.segment_ids);
186        let _ = _to_delete; // Suppress unused warning
187        let _ = &self.directory; // Suppress unused warning
188    }
189}
190
#[cfg(test)]
mod tests {
    use super::*;

    // Valid 32-character hex strings, usable as segment IDs in the tests below
    const SEG1: &str = "00000000000000000000000000000001";
    const SEG2: &str = "00000000000000000000000000000002";
    const SEG3: &str = "00000000000000000000000000000003";

    /// Turn borrowed IDs into the owned strings the tracker API expects
    fn owned(ids: &[&str]) -> Vec<String> {
        ids.iter().map(|id| (*id).to_string()).collect()
    }

    /// Fresh tracker with the given segments registered, in order
    fn tracker_with(ids: &[&str]) -> SegmentTracker {
        let tracker = SegmentTracker::new();
        for id in ids {
            tracker.register(id);
        }
        tracker
    }

    #[test]
    fn test_tracker_register_and_acquire() {
        let tracker = tracker_with(&[SEG1, SEG2]);

        let acquired = tracker.acquire(&owned(&[SEG1, SEG2]));
        assert_eq!(acquired.len(), 2);

        for seg in [SEG1, SEG2] {
            assert_eq!(tracker.ref_count(seg), 1);
        }
    }

    #[test]
    fn test_tracker_release() {
        let tracker = tracker_with(&[SEG1]);
        let seg1 = owned(&[SEG1]);

        tracker.acquire(&seg1);
        tracker.acquire(&seg1);
        assert_eq!(tracker.ref_count(SEG1), 2);

        // Each release takes the count down by exactly one
        for expected in [1, 0] {
            tracker.release(&seg1);
            assert_eq!(tracker.ref_count(SEG1), expected);
        }
    }

    #[test]
    fn test_tracker_mark_for_deletion_no_refs() {
        let tracker = tracker_with(&[SEG1]);

        // Unreferenced segment: returned immediately, never queued
        let ready = tracker.mark_for_deletion(&owned(&[SEG1]));
        assert_eq!(ready.len(), 1);
        assert!(!tracker.is_pending_deletion(SEG1));
    }

    #[test]
    fn test_tracker_mark_for_deletion_with_refs() {
        let tracker = tracker_with(&[SEG1]);
        let seg1 = owned(&[SEG1]);
        tracker.acquire(&seg1);

        // Referenced segment: deletion is deferred
        let ready = tracker.mark_for_deletion(&seg1);
        assert!(ready.is_empty());
        assert!(tracker.is_pending_deletion(SEG1));

        // Dropping the last reference hands the segment over for deletion
        let deleted = tracker.release(&seg1);
        assert_eq!(deleted.len(), 1);
        assert!(!tracker.is_pending_deletion(SEG1));
    }

    #[test]
    fn test_tracker_active_segments() {
        let tracker = tracker_with(&[SEG1, SEG2, SEG3]);
        let seg2 = owned(&[SEG2]);

        tracker.acquire(&seg2);
        tracker.mark_for_deletion(&seg2);

        let active = tracker.get_active_segments();
        let is_active = |seg: &str| active.iter().any(|id| id.as_str() == seg);
        assert!(is_active(SEG1));
        assert!(!is_active(SEG2)); // pending deletion
        assert!(is_active(SEG3));
    }
}