hermes_core/segment/
tracker.rs1use std::collections::HashMap;
11use std::sync::Arc;
12
13use parking_lot::Mutex;
14
15use crate::segment::SegmentId;
16
17struct TrackerInner {
19 ref_counts: HashMap<String, usize>,
20 pending_deletions: HashMap<String, SegmentId>,
21}
22
23pub struct SegmentTracker {
25 inner: Mutex<TrackerInner>,
26}
27
28impl SegmentTracker {
29 pub fn new() -> Self {
31 Self {
32 inner: Mutex::new(TrackerInner {
33 ref_counts: HashMap::new(),
34 pending_deletions: HashMap::new(),
35 }),
36 }
37 }
38
39 pub fn register(&self, segment_id: &str) {
41 let mut inner = self.inner.lock();
42 inner.ref_counts.entry(segment_id.to_string()).or_insert(0);
43 }
44
45 pub fn acquire(&self, segment_ids: &[String]) -> Vec<String> {
48 let mut inner = self.inner.lock();
49 let mut acquired = Vec::with_capacity(segment_ids.len());
50 for id in segment_ids {
51 if inner.pending_deletions.contains_key(id) {
52 continue;
53 }
54 *inner.ref_counts.entry(id.clone()).or_insert(0) += 1;
55 acquired.push(id.clone());
56 }
57 acquired
58 }
59
60 pub fn release(&self, segment_ids: &[String]) -> Vec<SegmentId> {
63 let mut inner = self.inner.lock();
64 let mut ready_for_deletion = Vec::new();
65
66 for id in segment_ids {
67 if let Some(count) = inner.ref_counts.get_mut(id) {
68 *count = count.saturating_sub(1);
69
70 if *count == 0
71 && let Some(segment_id) = inner.pending_deletions.remove(id)
72 {
73 inner.ref_counts.remove(id);
74 ready_for_deletion.push(segment_id);
75 }
76 }
77 }
78
79 ready_for_deletion
80 }
81
82 pub fn mark_for_deletion(&self, segment_ids: &[String]) -> Vec<SegmentId> {
86 let mut inner = self.inner.lock();
87 let mut ready_for_deletion = Vec::new();
88
89 for id_str in segment_ids {
90 let Some(segment_id) = SegmentId::from_hex(id_str) else {
91 continue;
92 };
93
94 let Some(&ref_count) = inner.ref_counts.get(id_str) else {
96 continue;
97 };
98
99 if ref_count == 0 {
100 inner.ref_counts.remove(id_str);
101 ready_for_deletion.push(segment_id);
102 } else {
103 inner.pending_deletions.insert(id_str.clone(), segment_id);
104 }
105 }
106
107 ready_for_deletion
108 }
109
110 pub fn is_pending_deletion(&self, segment_id: &str) -> bool {
112 self.inner.lock().pending_deletions.contains_key(segment_id)
113 }
114
115 pub fn ref_count(&self, segment_id: &str) -> usize {
117 self.inner
118 .lock()
119 .ref_counts
120 .get(segment_id)
121 .copied()
122 .unwrap_or(0)
123 }
124}
125
126impl Default for SegmentTracker {
127 fn default() -> Self {
128 Self::new()
129 }
130}
131
132pub struct SegmentSnapshot {
137 tracker: Arc<SegmentTracker>,
138 segment_ids: Vec<String>,
139 delete_fn: Option<Arc<dyn Fn(Vec<SegmentId>) + Send + Sync>>,
141}
142
143impl SegmentSnapshot {
144 pub fn new(tracker: Arc<SegmentTracker>, segment_ids: Vec<String>) -> Self {
146 Self {
147 tracker,
148 segment_ids,
149 delete_fn: None,
150 }
151 }
152
153 pub fn with_delete_fn(
155 tracker: Arc<SegmentTracker>,
156 segment_ids: Vec<String>,
157 delete_fn: Arc<dyn Fn(Vec<SegmentId>) + Send + Sync>,
158 ) -> Self {
159 Self {
160 tracker,
161 segment_ids,
162 delete_fn: Some(delete_fn),
163 }
164 }
165
166 pub fn segment_ids(&self) -> &[String] {
168 &self.segment_ids
169 }
170
171 pub fn is_empty(&self) -> bool {
173 self.segment_ids.is_empty()
174 }
175
176 pub fn len(&self) -> usize {
178 self.segment_ids.len()
179 }
180}
181
182impl Drop for SegmentSnapshot {
183 fn drop(&mut self) {
184 let to_delete = self.tracker.release(&self.segment_ids);
185 if !to_delete.is_empty() {
186 if let Some(delete_fn) = &self.delete_fn {
187 log::info!(
188 "[segment_snapshot] dropping snapshot, deleting {} deferred segments",
189 to_delete.len()
190 );
191 delete_fn(to_delete);
192 } else {
193 log::warn!(
194 "[segment_snapshot] {} segments ready for deletion but no delete_fn provided",
195 to_delete.len()
196 );
197 }
198 }
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::*;

    const SEG1: &str = "00000000000000000000000000000001";
    const SEG2: &str = "00000000000000000000000000000002";

    /// Builds the owned id list that the tracker API takes.
    fn to_ids(raw: &[&str]) -> Vec<String> {
        raw.iter().map(|id| id.to_string()).collect()
    }

    #[test]
    fn test_tracker_register_and_acquire() {
        let tracker = SegmentTracker::new();
        tracker.register(SEG1);
        tracker.register(SEG2);

        let held = tracker.acquire(&to_ids(&[SEG1, SEG2]));

        assert_eq!(held.len(), 2);
        assert_eq!(tracker.ref_count(SEG1), 1);
        assert_eq!(tracker.ref_count(SEG2), 1);
    }

    #[test]
    fn test_tracker_release() {
        let tracker = SegmentTracker::new();
        let seg1 = to_ids(&[SEG1]);

        tracker.register(SEG1);
        tracker.acquire(&seg1);
        tracker.acquire(&seg1);
        assert_eq!(tracker.ref_count(SEG1), 2);

        tracker.release(&seg1);
        assert_eq!(tracker.ref_count(SEG1), 1);

        tracker.release(&seg1);
        assert_eq!(tracker.ref_count(SEG1), 0);
    }

    #[test]
    fn test_tracker_mark_for_deletion_no_refs() {
        let tracker = SegmentTracker::new();
        tracker.register(SEG1);

        // No references are held, so the segment is deletable immediately.
        let ready = tracker.mark_for_deletion(&to_ids(&[SEG1]));

        assert_eq!(ready.len(), 1);
        assert!(!tracker.is_pending_deletion(SEG1));
    }

    #[test]
    fn test_tracker_mark_for_deletion_with_refs() {
        let tracker = SegmentTracker::new();
        let seg1 = to_ids(&[SEG1]);

        tracker.register(SEG1);
        tracker.acquire(&seg1);

        // A held reference defers the deletion.
        let ready = tracker.mark_for_deletion(&seg1);
        assert!(ready.is_empty());
        assert!(tracker.is_pending_deletion(SEG1));

        // Releasing the last reference hands the segment back for deletion.
        let deleted = tracker.release(&seg1);
        assert_eq!(deleted.len(), 1);
        assert!(!tracker.is_pending_deletion(SEG1));
    }

    #[test]
    fn test_tracker_double_mark_for_deletion() {
        let tracker = SegmentTracker::new();
        let seg1 = to_ids(&[SEG1]);
        tracker.register(SEG1);

        let first = tracker.mark_for_deletion(&seg1);
        assert_eq!(first.len(), 1);

        // The first call removed the segment, so the second finds nothing.
        let second = tracker.mark_for_deletion(&seg1);
        assert!(second.is_empty());
    }

    #[test]
    fn test_tracker_acquire_unregistered() {
        let tracker = SegmentTracker::new();

        // Acquiring an id that was never registered starts tracking it.
        let held = tracker.acquire(&to_ids(&[SEG1]));

        assert_eq!(held.len(), 1);
        assert_eq!(tracker.ref_count(SEG1), 1);
    }

    #[test]
    fn test_tracker_release_without_acquire() {
        let tracker = SegmentTracker::new();
        tracker.register(SEG1);

        // The count saturates at zero instead of underflowing.
        let deleted = tracker.release(&to_ids(&[SEG1]));

        assert!(deleted.is_empty());
        assert_eq!(tracker.ref_count(SEG1), 0);
    }

    #[test]
    fn test_snapshot_drop_triggers_deferred_delete() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let tracker = Arc::new(SegmentTracker::new());
        tracker.register(SEG1);
        tracker.register(SEG2);
        let both = to_ids(&[SEG1, SEG2]);

        // Counts how many segment ids reach the deletion callback.
        let deleted_total = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&deleted_total);
        let delete_fn: Arc<dyn Fn(Vec<SegmentId>) + Send + Sync> =
            Arc::new(move |ids: Vec<SegmentId>| {
                counter.fetch_add(ids.len(), Ordering::SeqCst);
            });

        let held = tracker.acquire(&both);
        let snapshot =
            SegmentSnapshot::with_delete_fn(Arc::clone(&tracker), held, Arc::clone(&delete_fn));

        // The snapshot still holds both segments, so deletion is deferred.
        let ready = tracker.mark_for_deletion(&both);
        assert!(ready.is_empty());
        assert!(tracker.is_pending_deletion(SEG1));
        assert!(tracker.is_pending_deletion(SEG2));

        // Dropping the snapshot releases both and runs the callback.
        drop(snapshot);
        assert_eq!(deleted_total.load(Ordering::SeqCst), 2);
        assert!(!tracker.is_pending_deletion(SEG1));
        assert!(!tracker.is_pending_deletion(SEG2));
    }
}