Skip to main content

hermes_core/segment/
tracker.rs

//! Segment lifecycle tracker with reference counting.
//!
//! This module provides safe segment deletion by tracking references:
//! - Readers acquire segment snapshots (incrementing ref counts).
//! - When a snapshot is dropped, its ref counts are decremented.
//! - Segments marked for deletion are only deleted once their ref count reaches 0.
//!
//! This prevents "file not found" errors when mergers delete segments
//! that are still being used by active readers.
10
11use std::collections::HashMap;
12use std::sync::Arc;
13
14use parking_lot::RwLock;
15
16use crate::directories::Directory;
17use crate::segment::SegmentId;
18
19/// Tracks segment references and pending deletions
20pub struct SegmentTracker {
21    /// Segment ID -> reference count
22    ref_counts: RwLock<HashMap<String, usize>>,
23    /// Segments marked for deletion (will be deleted when ref count reaches 0)
24    pending_deletions: RwLock<HashMap<String, PendingDeletion>>,
25}
26
27/// Info about a segment pending deletion
28struct PendingDeletion {
29    segment_id: SegmentId,
30}
31
32impl SegmentTracker {
33    /// Create a new segment tracker
34    pub fn new() -> Self {
35        Self {
36            ref_counts: RwLock::new(HashMap::new()),
37            pending_deletions: RwLock::new(HashMap::new()),
38        }
39    }
40
41    /// Register a new segment (called when segment is committed)
42    pub fn register(&self, segment_id: &str) {
43        let mut refs = self.ref_counts.write();
44        refs.entry(segment_id.to_string()).or_insert(0);
45    }
46
47    /// Acquire references to a set of segments (called when taking a snapshot)
48    /// Returns the segment IDs that were successfully acquired
49    pub fn acquire(&self, segment_ids: &[String]) -> Vec<String> {
50        let mut refs = self.ref_counts.write();
51        let pending = self.pending_deletions.read();
52
53        let mut acquired = Vec::with_capacity(segment_ids.len());
54        for id in segment_ids {
55            // Don't acquire segments that are pending deletion
56            if pending.contains_key(id) {
57                continue;
58            }
59            *refs.entry(id.clone()).or_insert(0) += 1;
60            acquired.push(id.clone());
61        }
62        acquired
63    }
64
65    /// Release references to a set of segments (called when snapshot is dropped)
66    /// Returns segment IDs that are now ready for deletion (ref count hit 0 and marked for deletion)
67    pub fn release(&self, segment_ids: &[String]) -> Vec<SegmentId> {
68        let mut refs = self.ref_counts.write();
69        let mut pending = self.pending_deletions.write();
70
71        let mut ready_for_deletion = Vec::new();
72
73        for id in segment_ids {
74            if let Some(count) = refs.get_mut(id) {
75                *count = count.saturating_sub(1);
76
77                // If ref count is 0 and segment is pending deletion, it can be deleted
78                if *count == 0
79                    && let Some(deletion) = pending.remove(id)
80                {
81                    refs.remove(id);
82                    ready_for_deletion.push(deletion.segment_id);
83                }
84            }
85        }
86
87        ready_for_deletion
88    }
89
90    /// Mark segments for deletion (called after merge completes)
91    /// Segments with ref count 0 are returned immediately for deletion
92    /// Segments with refs > 0 are queued for deletion when refs are released
93    pub fn mark_for_deletion(&self, segment_ids: &[String]) -> Vec<SegmentId> {
94        let mut refs = self.ref_counts.write();
95        let mut pending = self.pending_deletions.write();
96
97        let mut ready_for_deletion = Vec::new();
98
99        for id_str in segment_ids {
100            let Some(segment_id) = SegmentId::from_hex(id_str) else {
101                continue;
102            };
103
104            let ref_count = refs.get(id_str).copied().unwrap_or(0);
105
106            if ref_count == 0 {
107                // No refs, can delete immediately
108                refs.remove(id_str);
109                ready_for_deletion.push(segment_id);
110            } else {
111                // Has refs, queue for deletion when refs are released
112                pending.insert(id_str.clone(), PendingDeletion { segment_id });
113            }
114        }
115
116        ready_for_deletion
117    }
118
119    /// Get current segment IDs (excluding those pending deletion)
120    pub fn get_active_segments(&self) -> Vec<String> {
121        let refs = self.ref_counts.read();
122        let pending = self.pending_deletions.read();
123
124        refs.keys()
125            .filter(|id| !pending.contains_key(*id))
126            .cloned()
127            .collect()
128    }
129
130    /// Get the number of active references for a segment
131    pub fn ref_count(&self, segment_id: &str) -> usize {
132        self.ref_counts.read().get(segment_id).copied().unwrap_or(0)
133    }
134
135    /// Check if a segment is pending deletion
136    pub fn is_pending_deletion(&self, segment_id: &str) -> bool {
137        self.pending_deletions.read().contains_key(segment_id)
138    }
139}
140
141impl Default for SegmentTracker {
142    fn default() -> Self {
143        Self::new()
144    }
145}
146
147/// RAII guard that holds references to a snapshot of segments
148/// When dropped, releases all segment references and deletes any
149/// segments that were pending deletion and have no remaining references.
150pub struct SegmentSnapshot<D: Directory + 'static> {
151    tracker: Arc<SegmentTracker>,
152    segment_ids: Vec<String>,
153    /// Kept to satisfy the generic parameter; directory access is via delete_fn closure.
154    _directory: std::marker::PhantomData<Arc<D>>,
155    /// Callback to delete segment files when they become ready for deletion.
156    /// Provided by SegmentManager for native builds; None for read-only paths.
157    delete_fn: Option<Arc<dyn Fn(Vec<SegmentId>) + Send + Sync>>,
158}
159
160impl<D: Directory + 'static> SegmentSnapshot<D> {
161    /// Create a new snapshot holding references to the given segments
162    pub fn new(tracker: Arc<SegmentTracker>, segment_ids: Vec<String>) -> Self {
163        Self {
164            tracker,
165            segment_ids,
166            _directory: std::marker::PhantomData,
167            delete_fn: None,
168        }
169    }
170
171    /// Create a snapshot with a deletion callback for deferred segment cleanup
172    pub fn with_delete_fn(
173        tracker: Arc<SegmentTracker>,
174        segment_ids: Vec<String>,
175        delete_fn: Arc<dyn Fn(Vec<SegmentId>) + Send + Sync>,
176    ) -> Self {
177        Self {
178            tracker,
179            segment_ids,
180            _directory: std::marker::PhantomData,
181            delete_fn: Some(delete_fn),
182        }
183    }
184
185    /// Get the segment IDs in this snapshot
186    pub fn segment_ids(&self) -> &[String] {
187        &self.segment_ids
188    }
189
190    /// Check if this snapshot is empty
191    pub fn is_empty(&self) -> bool {
192        self.segment_ids.is_empty()
193    }
194
195    /// Get the number of segments in this snapshot
196    pub fn len(&self) -> usize {
197        self.segment_ids.len()
198    }
199}
200
201impl<D: Directory + 'static> Drop for SegmentSnapshot<D> {
202    fn drop(&mut self) {
203        let to_delete = self.tracker.release(&self.segment_ids);
204        if !to_delete.is_empty() {
205            if let Some(delete_fn) = &self.delete_fn {
206                log::info!(
207                    "[segment_snapshot] dropping snapshot, deleting {} deferred segments",
208                    to_delete.len()
209                );
210                delete_fn(to_delete);
211            } else {
212                log::warn!(
213                    "[segment_snapshot] {} segments ready for deletion but no delete_fn provided",
214                    to_delete.len()
215                );
216            }
217        }
218    }
219}
220
#[cfg(test)]
mod tests {
    use super::*;

    // Valid 32-char hex segment IDs for testing
    const SEG1: &str = "00000000000000000000000000000001";
    const SEG2: &str = "00000000000000000000000000000002";
    const SEG3: &str = "00000000000000000000000000000003";

    /// Converts string literals into the owned ID list the tracker API takes.
    fn ids(raw: &[&str]) -> Vec<String> {
        raw.iter().map(|id| id.to_string()).collect()
    }

    /// Builds a tracker with the given segments already registered.
    fn tracker_with(registered: &[&str]) -> SegmentTracker {
        let tracker = SegmentTracker::new();
        for id in registered {
            tracker.register(id);
        }
        tracker
    }

    #[test]
    fn test_tracker_register_and_acquire() {
        let tracker = tracker_with(&[SEG1, SEG2]);

        let acquired = tracker.acquire(&ids(&[SEG1, SEG2]));
        assert_eq!(acquired.len(), 2);

        assert_eq!(tracker.ref_count(SEG1), 1);
        assert_eq!(tracker.ref_count(SEG2), 1);
    }

    #[test]
    fn test_tracker_release() {
        let tracker = tracker_with(&[SEG1]);
        let seg1 = ids(&[SEG1]);

        tracker.acquire(&seg1);
        tracker.acquire(&seg1);
        assert_eq!(tracker.ref_count(SEG1), 2);

        tracker.release(&seg1);
        assert_eq!(tracker.ref_count(SEG1), 1);

        tracker.release(&seg1);
        assert_eq!(tracker.ref_count(SEG1), 0);
    }

    #[test]
    fn test_tracker_mark_for_deletion_no_refs() {
        let tracker = tracker_with(&[SEG1]);

        let ready = tracker.mark_for_deletion(&ids(&[SEG1]));
        assert_eq!(ready.len(), 1);
        assert!(!tracker.is_pending_deletion(SEG1));
    }

    #[test]
    fn test_tracker_mark_for_deletion_with_refs() {
        let tracker = tracker_with(&[SEG1]);
        let seg1 = ids(&[SEG1]);

        tracker.acquire(&seg1);

        let ready = tracker.mark_for_deletion(&seg1);
        assert!(ready.is_empty());
        assert!(tracker.is_pending_deletion(SEG1));

        // Release should now return segment for deletion
        let deleted = tracker.release(&seg1);
        assert_eq!(deleted.len(), 1);
        assert!(!tracker.is_pending_deletion(SEG1));
    }

    #[test]
    fn test_tracker_active_segments() {
        let tracker = tracker_with(&[SEG1, SEG2, SEG3]);
        let seg2 = ids(&[SEG2]);

        tracker.acquire(&seg2);
        tracker.mark_for_deletion(&seg2);

        let active = tracker.get_active_segments();
        assert!(active.contains(&SEG1.to_string()));
        assert!(!active.contains(&SEG2.to_string())); // pending deletion
        assert!(active.contains(&SEG3.to_string()));
    }
}