hermes_core/segment/
tracker.rs1use std::collections::HashMap;
11use std::sync::Arc;
12
13use parking_lot::Mutex;
14
15use crate::segment::SegmentId;
16
17struct TrackerInner {
19 ref_counts: HashMap<String, usize>,
20 pending_deletions: HashMap<String, SegmentId>,
21}
22
23pub struct SegmentTracker {
25 inner: Mutex<TrackerInner>,
26}
27
28impl SegmentTracker {
29 pub fn new() -> Self {
31 Self {
32 inner: Mutex::new(TrackerInner {
33 ref_counts: HashMap::new(),
34 pending_deletions: HashMap::new(),
35 }),
36 }
37 }
38
39 pub fn register(&self, segment_id: &str) {
41 let mut inner = self.inner.lock();
42 inner.ref_counts.entry(segment_id.to_string()).or_insert(0);
43 }
44
45 pub fn acquire(&self, segment_ids: &[String]) -> Vec<String> {
48 let mut inner = self.inner.lock();
49 let mut acquired = Vec::with_capacity(segment_ids.len());
50 for id in segment_ids {
51 if inner.pending_deletions.contains_key(id) {
52 continue;
53 }
54 *inner.ref_counts.entry(id.clone()).or_insert(0) += 1;
55 acquired.push(id.clone());
56 }
57 acquired
58 }
59
60 pub fn release(&self, segment_ids: &[String]) -> Vec<SegmentId> {
63 let mut inner = self.inner.lock();
64 let mut ready_for_deletion = Vec::new();
65
66 for id in segment_ids {
67 if let Some(count) = inner.ref_counts.get_mut(id) {
68 *count = count.saturating_sub(1);
69
70 if *count == 0
71 && let Some(segment_id) = inner.pending_deletions.remove(id)
72 {
73 inner.ref_counts.remove(id);
74 ready_for_deletion.push(segment_id);
75 }
76 }
77 }
78
79 ready_for_deletion
80 }
81
82 pub fn mark_for_deletion(&self, segment_ids: &[String]) -> Vec<SegmentId> {
86 let mut inner = self.inner.lock();
87 let mut ready_for_deletion = Vec::new();
88
89 for id_str in segment_ids {
90 let Some(segment_id) = SegmentId::from_hex(id_str) else {
91 continue;
92 };
93
94 let ref_count = inner.ref_counts.get(id_str).copied().unwrap_or(0);
95
96 if ref_count == 0 {
97 inner.ref_counts.remove(id_str);
98 ready_for_deletion.push(segment_id);
99 } else {
100 inner.pending_deletions.insert(id_str.clone(), segment_id);
101 }
102 }
103
104 ready_for_deletion
105 }
106
107 pub fn is_pending_deletion(&self, segment_id: &str) -> bool {
109 self.inner.lock().pending_deletions.contains_key(segment_id)
110 }
111
112 pub fn ref_count(&self, segment_id: &str) -> usize {
114 self.inner
115 .lock()
116 .ref_counts
117 .get(segment_id)
118 .copied()
119 .unwrap_or(0)
120 }
121
122 pub fn get_active_segments(&self) -> Vec<String> {
124 let inner = self.inner.lock();
125 inner
126 .ref_counts
127 .keys()
128 .filter(|id| !inner.pending_deletions.contains_key(*id))
129 .cloned()
130 .collect()
131 }
132}
133
134impl Default for SegmentTracker {
135 fn default() -> Self {
136 Self::new()
137 }
138}
139
140pub struct SegmentSnapshot {
145 tracker: Arc<SegmentTracker>,
146 segment_ids: Vec<String>,
147 delete_fn: Option<Arc<dyn Fn(Vec<SegmentId>) + Send + Sync>>,
149}
150
151impl SegmentSnapshot {
152 pub fn new(tracker: Arc<SegmentTracker>, segment_ids: Vec<String>) -> Self {
154 Self {
155 tracker,
156 segment_ids,
157 delete_fn: None,
158 }
159 }
160
161 pub fn with_delete_fn(
163 tracker: Arc<SegmentTracker>,
164 segment_ids: Vec<String>,
165 delete_fn: Arc<dyn Fn(Vec<SegmentId>) + Send + Sync>,
166 ) -> Self {
167 Self {
168 tracker,
169 segment_ids,
170 delete_fn: Some(delete_fn),
171 }
172 }
173
174 pub fn segment_ids(&self) -> &[String] {
176 &self.segment_ids
177 }
178
179 pub fn is_empty(&self) -> bool {
181 self.segment_ids.is_empty()
182 }
183
184 pub fn len(&self) -> usize {
186 self.segment_ids.len()
187 }
188}
189
190impl Drop for SegmentSnapshot {
191 fn drop(&mut self) {
192 let to_delete = self.tracker.release(&self.segment_ids);
193 if !to_delete.is_empty() {
194 if let Some(delete_fn) = &self.delete_fn {
195 log::info!(
196 "[segment_snapshot] dropping snapshot, deleting {} deferred segments",
197 to_delete.len()
198 );
199 delete_fn(to_delete);
200 } else {
201 log::warn!(
202 "[segment_snapshot] {} segments ready for deletion but no delete_fn provided",
203 to_delete.len()
204 );
205 }
206 }
207 }
208}
209
210#[cfg(test)]
211mod tests {
212 use super::*;
213
214 const SEG1: &str = "00000000000000000000000000000001";
215 const SEG2: &str = "00000000000000000000000000000002";
216 const SEG3: &str = "00000000000000000000000000000003";
217
218 #[test]
219 fn test_tracker_register_and_acquire() {
220 let tracker = SegmentTracker::new();
221
222 tracker.register(SEG1);
223 tracker.register(SEG2);
224
225 let acquired = tracker.acquire(&[SEG1.to_string(), SEG2.to_string()]);
226 assert_eq!(acquired.len(), 2);
227
228 assert_eq!(tracker.ref_count(SEG1), 1);
229 assert_eq!(tracker.ref_count(SEG2), 1);
230 }
231
232 #[test]
233 fn test_tracker_release() {
234 let tracker = SegmentTracker::new();
235
236 tracker.register(SEG1);
237 tracker.acquire(&[SEG1.to_string()]);
238 tracker.acquire(&[SEG1.to_string()]);
239
240 assert_eq!(tracker.ref_count(SEG1), 2);
241
242 tracker.release(&[SEG1.to_string()]);
243 assert_eq!(tracker.ref_count(SEG1), 1);
244
245 tracker.release(&[SEG1.to_string()]);
246 assert_eq!(tracker.ref_count(SEG1), 0);
247 }
248
249 #[test]
250 fn test_tracker_mark_for_deletion_no_refs() {
251 let tracker = SegmentTracker::new();
252
253 tracker.register(SEG1);
254
255 let ready = tracker.mark_for_deletion(&[SEG1.to_string()]);
256 assert_eq!(ready.len(), 1);
257 assert!(!tracker.is_pending_deletion(SEG1));
258 }
259
260 #[test]
261 fn test_tracker_mark_for_deletion_with_refs() {
262 let tracker = SegmentTracker::new();
263
264 tracker.register(SEG1);
265 tracker.acquire(&[SEG1.to_string()]);
266
267 let ready = tracker.mark_for_deletion(&[SEG1.to_string()]);
268 assert!(ready.is_empty());
269 assert!(tracker.is_pending_deletion(SEG1));
270
271 let deleted = tracker.release(&[SEG1.to_string()]);
272 assert_eq!(deleted.len(), 1);
273 assert!(!tracker.is_pending_deletion(SEG1));
274 }
275
276 #[test]
277 fn test_tracker_active_segments() {
278 let tracker = SegmentTracker::new();
279
280 tracker.register(SEG1);
281 tracker.register(SEG2);
282 tracker.register(SEG3);
283
284 tracker.acquire(&[SEG2.to_string()]);
285 tracker.mark_for_deletion(&[SEG2.to_string()]);
286
287 let active = tracker.get_active_segments();
288 assert!(active.contains(&SEG1.to_string()));
289 assert!(!active.contains(&SEG2.to_string()));
290 assert!(active.contains(&SEG3.to_string()));
291 }
292}