Skip to main content

hermes_core/segment/
tracker.rs

//! Segment lifecycle tracker with reference counting.
//!
//! Provides safe segment deletion by tracking references:
//! - Readers acquire segment snapshots, incrementing ref counts.
//! - When a snapshot is dropped, its ref counts are decremented.
//! - Segments marked for deletion are only deleted once their ref count reaches 0.
//!
//! Uses a single `parking_lot::Mutex` for all state (sub-μs holds, needed for sync Drop).
9
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use parking_lot::Mutex;

use crate::segment::SegmentId;
16
17/// Internal state protected by single Mutex
18struct TrackerInner {
19    ref_counts: HashMap<String, usize>,
20    pending_deletions: HashMap<String, SegmentId>,
21}
22
23/// Tracks segment references and pending deletions
24pub struct SegmentTracker {
25    inner: Mutex<TrackerInner>,
26}
27
28impl SegmentTracker {
29    /// Create a new segment tracker
30    pub fn new() -> Self {
31        Self {
32            inner: Mutex::new(TrackerInner {
33                ref_counts: HashMap::new(),
34                pending_deletions: HashMap::new(),
35            }),
36        }
37    }
38
39    /// Register a new segment (called when segment is committed)
40    pub fn register(&self, segment_id: &str) {
41        let mut inner = self.inner.lock();
42        inner.ref_counts.entry(segment_id.to_string()).or_insert(0);
43    }
44
45    /// Acquire references to a set of segments (called when taking a snapshot)
46    /// Returns the segment IDs that were successfully acquired
47    pub fn acquire(&self, segment_ids: &[String]) -> Vec<String> {
48        let mut inner = self.inner.lock();
49        let mut acquired = Vec::with_capacity(segment_ids.len());
50        for id in segment_ids {
51            if inner.pending_deletions.contains_key(id) {
52                continue;
53            }
54            *inner.ref_counts.entry(id.clone()).or_insert(0) += 1;
55            acquired.push(id.clone());
56        }
57        acquired
58    }
59
60    /// Release references to a set of segments (called when snapshot is dropped)
61    /// Returns segment IDs that are now ready for deletion
62    pub fn release(&self, segment_ids: &[String]) -> Vec<SegmentId> {
63        let mut inner = self.inner.lock();
64        let mut ready_for_deletion = Vec::new();
65
66        for id in segment_ids {
67            if let Some(count) = inner.ref_counts.get_mut(id) {
68                *count = count.saturating_sub(1);
69
70                if *count == 0
71                    && let Some(segment_id) = inner.pending_deletions.remove(id)
72                {
73                    inner.ref_counts.remove(id);
74                    ready_for_deletion.push(segment_id);
75                }
76            }
77        }
78
79        ready_for_deletion
80    }
81
82    /// Mark segments for deletion (called after merge completes)
83    /// Segments with ref count 0 are returned immediately for deletion
84    /// Segments with refs > 0 are queued for deletion when refs are released
85    pub fn mark_for_deletion(&self, segment_ids: &[String]) -> Vec<SegmentId> {
86        let mut inner = self.inner.lock();
87        let mut ready_for_deletion = Vec::new();
88
89        for id_str in segment_ids {
90            let Some(segment_id) = SegmentId::from_hex(id_str) else {
91                continue;
92            };
93
94            let ref_count = inner.ref_counts.get(id_str).copied().unwrap_or(0);
95
96            if ref_count == 0 {
97                inner.ref_counts.remove(id_str);
98                ready_for_deletion.push(segment_id);
99            } else {
100                inner.pending_deletions.insert(id_str.clone(), segment_id);
101            }
102        }
103
104        ready_for_deletion
105    }
106
107    /// Check if a segment is pending deletion
108    pub fn is_pending_deletion(&self, segment_id: &str) -> bool {
109        self.inner.lock().pending_deletions.contains_key(segment_id)
110    }
111
112    /// Get the number of active references for a segment
113    pub fn ref_count(&self, segment_id: &str) -> usize {
114        self.inner
115            .lock()
116            .ref_counts
117            .get(segment_id)
118            .copied()
119            .unwrap_or(0)
120    }
121
122    /// Get current segment IDs (excluding those pending deletion)
123    pub fn get_active_segments(&self) -> Vec<String> {
124        let inner = self.inner.lock();
125        inner
126            .ref_counts
127            .keys()
128            .filter(|id| !inner.pending_deletions.contains_key(*id))
129            .cloned()
130            .collect()
131    }
132}
133
134impl Default for SegmentTracker {
135    fn default() -> Self {
136        Self::new()
137    }
138}
139
140/// RAII guard that holds references to a snapshot of segments.
141/// When dropped, releases all segment references and triggers deferred deletion.
142///
143/// Not generic over Directory — the delete callback abstracts away directory access.
144pub struct SegmentSnapshot {
145    tracker: Arc<SegmentTracker>,
146    segment_ids: Vec<String>,
147    /// Callback to delete segment files when they become ready for deletion.
148    delete_fn: Option<Arc<dyn Fn(Vec<SegmentId>) + Send + Sync>>,
149}
150
151impl SegmentSnapshot {
152    /// Create a new snapshot holding references to the given segments
153    pub fn new(tracker: Arc<SegmentTracker>, segment_ids: Vec<String>) -> Self {
154        Self {
155            tracker,
156            segment_ids,
157            delete_fn: None,
158        }
159    }
160
161    /// Create a snapshot with a deletion callback for deferred segment cleanup
162    pub fn with_delete_fn(
163        tracker: Arc<SegmentTracker>,
164        segment_ids: Vec<String>,
165        delete_fn: Arc<dyn Fn(Vec<SegmentId>) + Send + Sync>,
166    ) -> Self {
167        Self {
168            tracker,
169            segment_ids,
170            delete_fn: Some(delete_fn),
171        }
172    }
173
174    /// Get the segment IDs in this snapshot
175    pub fn segment_ids(&self) -> &[String] {
176        &self.segment_ids
177    }
178
179    /// Check if this snapshot is empty
180    pub fn is_empty(&self) -> bool {
181        self.segment_ids.is_empty()
182    }
183
184    /// Get the number of segments in this snapshot
185    pub fn len(&self) -> usize {
186        self.segment_ids.len()
187    }
188}
189
190impl Drop for SegmentSnapshot {
191    fn drop(&mut self) {
192        let to_delete = self.tracker.release(&self.segment_ids);
193        if !to_delete.is_empty() {
194            if let Some(delete_fn) = &self.delete_fn {
195                log::info!(
196                    "[segment_snapshot] dropping snapshot, deleting {} deferred segments",
197                    to_delete.len()
198                );
199                delete_fn(to_delete);
200            } else {
201                log::warn!(
202                    "[segment_snapshot] {} segments ready for deletion but no delete_fn provided",
203                    to_delete.len()
204                );
205            }
206        }
207    }
208}
209
#[cfg(test)]
mod tests {
    use super::*;

    const SEG1: &str = "00000000000000000000000000000001";
    const SEG2: &str = "00000000000000000000000000000002";
    const SEG3: &str = "00000000000000000000000000000003";

    /// Converts id literals into the owned form the tracker API takes.
    fn ids(raw: &[&str]) -> Vec<String> {
        raw.iter().map(|&id| id.to_owned()).collect()
    }

    #[test]
    fn test_tracker_register_and_acquire() {
        let tracker = SegmentTracker::new();
        for seg in [SEG1, SEG2] {
            tracker.register(seg);
        }

        let acquired = tracker.acquire(&ids(&[SEG1, SEG2]));
        assert_eq!(acquired.len(), 2);
        for seg in [SEG1, SEG2] {
            assert_eq!(tracker.ref_count(seg), 1);
        }
    }

    #[test]
    fn test_tracker_release() {
        let tracker = SegmentTracker::new();
        let seg1 = ids(&[SEG1]);

        tracker.register(SEG1);
        for _ in 0..2 {
            tracker.acquire(&seg1);
        }
        assert_eq!(tracker.ref_count(SEG1), 2);

        // Each release drops exactly one reference.
        for expected in [1, 0] {
            tracker.release(&seg1);
            assert_eq!(tracker.ref_count(SEG1), expected);
        }
    }

    #[test]
    fn test_tracker_mark_for_deletion_no_refs() {
        let tracker = SegmentTracker::new();
        tracker.register(SEG1);

        // Unreferenced: deletable right away, never queued.
        let ready = tracker.mark_for_deletion(&ids(&[SEG1]));
        assert_eq!(ready.len(), 1);
        assert!(!tracker.is_pending_deletion(SEG1));
    }

    #[test]
    fn test_tracker_mark_for_deletion_with_refs() {
        let tracker = SegmentTracker::new();
        let seg1 = ids(&[SEG1]);
        tracker.register(SEG1);
        tracker.acquire(&seg1);

        // Still referenced: deletion is deferred.
        assert!(tracker.mark_for_deletion(&seg1).is_empty());
        assert!(tracker.is_pending_deletion(SEG1));

        // Dropping the last reference makes it deletable.
        assert_eq!(tracker.release(&seg1).len(), 1);
        assert!(!tracker.is_pending_deletion(SEG1));
    }

    #[test]
    fn test_tracker_active_segments() {
        let tracker = SegmentTracker::new();
        for seg in [SEG1, SEG2, SEG3] {
            tracker.register(seg);
        }

        let seg2 = ids(&[SEG2]);
        tracker.acquire(&seg2);
        tracker.mark_for_deletion(&seg2);

        let active = tracker.get_active_segments();
        let is_active = |seg: &str| active.iter().any(|id| id == seg);
        assert!(is_active(SEG1));
        assert!(!is_active(SEG2));
        assert!(is_active(SEG3));
    }
}