Skip to main content

hermes_core/segment/
tracker.rs

1//! Segment lifecycle tracker with reference counting
2//!
3//! Provides safe segment deletion by tracking references:
4//! - Readers acquire segment snapshots (incrementing ref counts)
5//! - When snapshot is dropped, ref counts are decremented
6//! - Segments marked for deletion are only deleted when ref count reaches 0
7//!
8//! Uses a single `parking_lot::Mutex` for all state (sub-μs holds, needed for sync Drop).
9
10use std::collections::HashMap;
11use std::sync::Arc;
12
13use parking_lot::Mutex;
14
15use crate::segment::SegmentId;
16
17/// Internal state protected by single Mutex
18struct TrackerInner {
19    ref_counts: HashMap<String, usize>,
20    pending_deletions: HashMap<String, SegmentId>,
21}
22
23/// Tracks segment references and pending deletions
24pub struct SegmentTracker {
25    inner: Mutex<TrackerInner>,
26}
27
28impl SegmentTracker {
29    /// Create a new segment tracker
30    pub fn new() -> Self {
31        Self {
32            inner: Mutex::new(TrackerInner {
33                ref_counts: HashMap::new(),
34                pending_deletions: HashMap::new(),
35            }),
36        }
37    }
38
39    /// Register a new segment (called when segment is committed)
40    pub fn register(&self, segment_id: &str) {
41        let mut inner = self.inner.lock();
42        inner.ref_counts.entry(segment_id.to_string()).or_insert(0);
43    }
44
45    /// Acquire references to a set of segments (called when taking a snapshot)
46    /// Returns the segment IDs that were successfully acquired
47    pub fn acquire(&self, segment_ids: &[String]) -> Vec<String> {
48        let mut inner = self.inner.lock();
49        let mut acquired = Vec::with_capacity(segment_ids.len());
50        for id in segment_ids {
51            if inner.pending_deletions.contains_key(id) {
52                continue;
53            }
54            *inner.ref_counts.entry(id.clone()).or_insert(0) += 1;
55            acquired.push(id.clone());
56        }
57        acquired
58    }
59
60    /// Release references to a set of segments (called when snapshot is dropped)
61    /// Returns segment IDs that are now ready for deletion
62    pub fn release(&self, segment_ids: &[String]) -> Vec<SegmentId> {
63        let mut inner = self.inner.lock();
64        let mut ready_for_deletion = Vec::new();
65
66        for id in segment_ids {
67            if let Some(count) = inner.ref_counts.get_mut(id) {
68                *count = count.saturating_sub(1);
69
70                if *count == 0
71                    && let Some(segment_id) = inner.pending_deletions.remove(id)
72                {
73                    inner.ref_counts.remove(id);
74                    ready_for_deletion.push(segment_id);
75                }
76            }
77        }
78
79        ready_for_deletion
80    }
81
82    /// Mark segments for deletion (called after merge completes)
83    /// Segments with ref count 0 are returned immediately for deletion
84    /// Segments with refs > 0 are queued for deletion when refs are released
85    pub fn mark_for_deletion(&self, segment_ids: &[String]) -> Vec<SegmentId> {
86        let mut inner = self.inner.lock();
87        let mut ready_for_deletion = Vec::new();
88
89        for id_str in segment_ids {
90            let Some(segment_id) = SegmentId::from_hex(id_str) else {
91                continue;
92            };
93
94            // Skip segments not tracked (already deleted or never registered)
95            let Some(&ref_count) = inner.ref_counts.get(id_str) else {
96                continue;
97            };
98
99            if ref_count == 0 {
100                inner.ref_counts.remove(id_str);
101                ready_for_deletion.push(segment_id);
102            } else {
103                inner.pending_deletions.insert(id_str.clone(), segment_id);
104            }
105        }
106
107        ready_for_deletion
108    }
109
110    /// Check if a segment is pending deletion
111    pub fn is_pending_deletion(&self, segment_id: &str) -> bool {
112        self.inner.lock().pending_deletions.contains_key(segment_id)
113    }
114
115    /// Get the number of active references for a segment
116    pub fn ref_count(&self, segment_id: &str) -> usize {
117        self.inner
118            .lock()
119            .ref_counts
120            .get(segment_id)
121            .copied()
122            .unwrap_or(0)
123    }
124}
125
126impl Default for SegmentTracker {
127    fn default() -> Self {
128        Self::new()
129    }
130}
131
132/// RAII guard that holds references to a snapshot of segments.
133/// When dropped, releases all segment references and triggers deferred deletion.
134///
135/// Not generic over Directory — the delete callback abstracts away directory access.
136pub struct SegmentSnapshot {
137    tracker: Arc<SegmentTracker>,
138    segment_ids: Vec<String>,
139    /// Callback to delete segment files when they become ready for deletion.
140    delete_fn: Option<Arc<dyn Fn(Vec<SegmentId>) + Send + Sync>>,
141}
142
143impl SegmentSnapshot {
144    /// Create a new snapshot holding references to the given segments
145    pub fn new(tracker: Arc<SegmentTracker>, segment_ids: Vec<String>) -> Self {
146        Self {
147            tracker,
148            segment_ids,
149            delete_fn: None,
150        }
151    }
152
153    /// Create a snapshot with a deletion callback for deferred segment cleanup
154    pub fn with_delete_fn(
155        tracker: Arc<SegmentTracker>,
156        segment_ids: Vec<String>,
157        delete_fn: Arc<dyn Fn(Vec<SegmentId>) + Send + Sync>,
158    ) -> Self {
159        Self {
160            tracker,
161            segment_ids,
162            delete_fn: Some(delete_fn),
163        }
164    }
165
166    /// Get the segment IDs in this snapshot
167    pub fn segment_ids(&self) -> &[String] {
168        &self.segment_ids
169    }
170
171    /// Check if this snapshot is empty
172    pub fn is_empty(&self) -> bool {
173        self.segment_ids.is_empty()
174    }
175
176    /// Get the number of segments in this snapshot
177    pub fn len(&self) -> usize {
178        self.segment_ids.len()
179    }
180}
181
182impl Drop for SegmentSnapshot {
183    fn drop(&mut self) {
184        let to_delete = self.tracker.release(&self.segment_ids);
185        if !to_delete.is_empty() {
186            if let Some(delete_fn) = &self.delete_fn {
187                log::info!(
188                    "[segment_snapshot] dropping snapshot, deleting {} deferred segments",
189                    to_delete.len()
190                );
191                delete_fn(to_delete);
192            } else {
193                log::warn!(
194                    "[segment_snapshot] {} segments ready for deletion but no delete_fn provided",
195                    to_delete.len()
196                );
197            }
198        }
199    }
200}
201
#[cfg(test)]
mod tests {
    use super::*;

    const SEG1: &str = "00000000000000000000000000000001";
    const SEG2: &str = "00000000000000000000000000000002";

    /// Turn string literals into the owned ID list the tracker API takes.
    fn ids(raw: &[&str]) -> Vec<String> {
        raw.iter().copied().map(String::from).collect()
    }

    #[test]
    fn test_tracker_register_and_acquire() {
        let tracker = SegmentTracker::new();
        for seg in [SEG1, SEG2] {
            tracker.register(seg);
        }

        let acquired = tracker.acquire(&ids(&[SEG1, SEG2]));
        assert_eq!(acquired.len(), 2);

        assert_eq!(tracker.ref_count(SEG1), 1);
        assert_eq!(tracker.ref_count(SEG2), 1);
    }

    #[test]
    fn test_tracker_release() {
        let tracker = SegmentTracker::new();
        let seg1 = ids(&[SEG1]);

        tracker.register(SEG1);
        tracker.acquire(&seg1);
        tracker.acquire(&seg1);
        assert_eq!(tracker.ref_count(SEG1), 2);

        // Each release drops exactly one reference.
        tracker.release(&seg1);
        assert_eq!(tracker.ref_count(SEG1), 1);

        tracker.release(&seg1);
        assert_eq!(tracker.ref_count(SEG1), 0);
    }

    #[test]
    fn test_tracker_mark_for_deletion_no_refs() {
        let tracker = SegmentTracker::new();
        tracker.register(SEG1);

        // Unreferenced segment: returned immediately, never queued.
        let ready = tracker.mark_for_deletion(&ids(&[SEG1]));
        assert_eq!(ready.len(), 1);
        assert!(!tracker.is_pending_deletion(SEG1));
    }

    #[test]
    fn test_tracker_mark_for_deletion_with_refs() {
        let tracker = SegmentTracker::new();
        let seg1 = ids(&[SEG1]);

        tracker.register(SEG1);
        tracker.acquire(&seg1);

        // Referenced segment: deletion is deferred.
        let ready = tracker.mark_for_deletion(&seg1);
        assert!(ready.is_empty());
        assert!(tracker.is_pending_deletion(SEG1));

        // Releasing the last reference hands the segment back for deletion.
        let deleted = tracker.release(&seg1);
        assert_eq!(deleted.len(), 1);
        assert!(!tracker.is_pending_deletion(SEG1));
    }

    #[test]
    fn test_tracker_double_mark_for_deletion() {
        let tracker = SegmentTracker::new();
        let seg1 = ids(&[SEG1]);
        tracker.register(SEG1);

        let first = tracker.mark_for_deletion(&seg1);
        assert_eq!(first.len(), 1);

        // Second mark: segment already removed from ref_counts, should return empty
        let second = tracker.mark_for_deletion(&seg1);
        assert!(second.is_empty());
    }

    #[test]
    fn test_tracker_acquire_unregistered() {
        let tracker = SegmentTracker::new();

        // Acquire a segment that was never registered — or_insert(0) makes it ref_count=1
        let acquired = tracker.acquire(&ids(&[SEG1]));
        assert_eq!(acquired.len(), 1);
        assert_eq!(tracker.ref_count(SEG1), 1);
    }

    #[test]
    fn test_tracker_release_without_acquire() {
        let tracker = SegmentTracker::new();
        tracker.register(SEG1);

        // Release without acquire — should not panic, ref stays at 0 (saturating_sub)
        let deleted = tracker.release(&ids(&[SEG1]));
        assert!(deleted.is_empty());
        assert_eq!(tracker.ref_count(SEG1), 0);
    }

    #[test]
    fn test_snapshot_drop_triggers_deferred_delete() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let tracker = Arc::new(SegmentTracker::new());
        tracker.register(SEG1);
        tracker.register(SEG2);
        let both = ids(&[SEG1, SEG2]);

        // The callback just counts how many segments it was asked to delete.
        let delete_count = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&delete_count);
        let delete_fn: Arc<dyn Fn(Vec<SegmentId>) + Send + Sync> =
            Arc::new(move |segments: Vec<SegmentId>| {
                counter.fetch_add(segments.len(), Ordering::SeqCst);
            });

        // Take a snapshot holding refs to both segments
        let acquired = tracker.acquire(&both);
        let snapshot =
            SegmentSnapshot::with_delete_fn(Arc::clone(&tracker), acquired, Arc::clone(&delete_fn));

        // Mark both for deletion — should be deferred (refs > 0)
        let ready = tracker.mark_for_deletion(&both);
        assert!(ready.is_empty());
        assert!(tracker.is_pending_deletion(SEG1));
        assert!(tracker.is_pending_deletion(SEG2));

        // Drop snapshot → refs go to 0 → delete_fn called
        drop(snapshot);
        assert_eq!(delete_count.load(Ordering::SeqCst), 2);
        assert!(!tracker.is_pending_deletion(SEG1));
        assert!(!tracker.is_pending_deletion(SEG2));
    }
}