hermes_core/segment/
tracker.rs1use std::collections::HashMap;
12use std::sync::Arc;
13
14use parking_lot::RwLock;
15
16use crate::directories::Directory;
17use crate::segment::SegmentId;
18
19pub struct SegmentTracker {
21 ref_counts: RwLock<HashMap<String, usize>>,
23 pending_deletions: RwLock<HashMap<String, PendingDeletion>>,
25}
26
27struct PendingDeletion {
29 segment_id: SegmentId,
30}
31
32impl SegmentTracker {
33 pub fn new() -> Self {
35 Self {
36 ref_counts: RwLock::new(HashMap::new()),
37 pending_deletions: RwLock::new(HashMap::new()),
38 }
39 }
40
41 pub fn register(&self, segment_id: &str) {
43 let mut refs = self.ref_counts.write();
44 refs.entry(segment_id.to_string()).or_insert(0);
45 }
46
47 pub fn acquire(&self, segment_ids: &[String]) -> Vec<String> {
50 let mut refs = self.ref_counts.write();
51 let pending = self.pending_deletions.read();
52
53 let mut acquired = Vec::with_capacity(segment_ids.len());
54 for id in segment_ids {
55 if pending.contains_key(id) {
57 continue;
58 }
59 *refs.entry(id.clone()).or_insert(0) += 1;
60 acquired.push(id.clone());
61 }
62 acquired
63 }
64
65 pub fn release(&self, segment_ids: &[String]) -> Vec<SegmentId> {
68 let mut refs = self.ref_counts.write();
69 let mut pending = self.pending_deletions.write();
70
71 let mut ready_for_deletion = Vec::new();
72
73 for id in segment_ids {
74 if let Some(count) = refs.get_mut(id) {
75 *count = count.saturating_sub(1);
76
77 if *count == 0
79 && let Some(deletion) = pending.remove(id)
80 {
81 refs.remove(id);
82 ready_for_deletion.push(deletion.segment_id);
83 }
84 }
85 }
86
87 ready_for_deletion
88 }
89
90 pub fn mark_for_deletion(&self, segment_ids: &[String]) -> Vec<SegmentId> {
94 let mut refs = self.ref_counts.write();
95 let mut pending = self.pending_deletions.write();
96
97 let mut ready_for_deletion = Vec::new();
98
99 for id_str in segment_ids {
100 let Some(segment_id) = SegmentId::from_hex(id_str) else {
101 continue;
102 };
103
104 let ref_count = refs.get(id_str).copied().unwrap_or(0);
105
106 if ref_count == 0 {
107 refs.remove(id_str);
109 ready_for_deletion.push(segment_id);
110 } else {
111 pending.insert(id_str.clone(), PendingDeletion { segment_id });
113 }
114 }
115
116 ready_for_deletion
117 }
118
119 pub fn get_active_segments(&self) -> Vec<String> {
121 let refs = self.ref_counts.read();
122 let pending = self.pending_deletions.read();
123
124 refs.keys()
125 .filter(|id| !pending.contains_key(*id))
126 .cloned()
127 .collect()
128 }
129
130 pub fn ref_count(&self, segment_id: &str) -> usize {
132 self.ref_counts.read().get(segment_id).copied().unwrap_or(0)
133 }
134
135 pub fn is_pending_deletion(&self, segment_id: &str) -> bool {
137 self.pending_deletions.read().contains_key(segment_id)
138 }
139}
140
141impl Default for SegmentTracker {
142 fn default() -> Self {
143 Self::new()
144 }
145}
146
147pub struct SegmentSnapshot<D: Directory + 'static> {
150 tracker: Arc<SegmentTracker>,
151 segment_ids: Vec<String>,
152 directory: Arc<D>,
153}
154
155impl<D: Directory + 'static> SegmentSnapshot<D> {
156 pub fn new(tracker: Arc<SegmentTracker>, directory: Arc<D>, segment_ids: Vec<String>) -> Self {
158 Self {
160 tracker,
161 segment_ids,
162 directory,
163 }
164 }
165
166 pub fn segment_ids(&self) -> &[String] {
168 &self.segment_ids
169 }
170
171 pub fn is_empty(&self) -> bool {
173 self.segment_ids.is_empty()
174 }
175
176 pub fn len(&self) -> usize {
178 self.segment_ids.len()
179 }
180}
181
182impl<D: Directory + 'static> Drop for SegmentSnapshot<D> {
183 fn drop(&mut self) {
184 let _to_delete = self.tracker.release(&self.segment_ids);
186 let _ = _to_delete; let _ = &self.directory; }
189}
190
#[cfg(test)]
mod tests {
    use super::*;

    // 32-character hex ids used as segment ids throughout the tests.
    const SEG1: &str = "00000000000000000000000000000001";
    const SEG2: &str = "00000000000000000000000000000002";
    const SEG3: &str = "00000000000000000000000000000003";

    /// Builds the owned id list that the tracker methods take.
    fn ids(raw: &[&str]) -> Vec<String> {
        raw.iter().map(|s| (*s).to_owned()).collect()
    }

    /// Acquiring registered segments returns them all and bumps each count to one.
    #[test]
    fn test_tracker_register_and_acquire() {
        let tracker = SegmentTracker::new();
        for seg in [SEG1, SEG2] {
            tracker.register(seg);
        }

        let acquired = tracker.acquire(&ids(&[SEG1, SEG2]));
        assert_eq!(acquired.len(), 2);

        assert_eq!(tracker.ref_count(SEG1), 1);
        assert_eq!(tracker.ref_count(SEG2), 1);
    }

    /// Each release lowers the count by exactly one.
    #[test]
    fn test_tracker_release() {
        let tracker = SegmentTracker::new();
        let seg1 = ids(&[SEG1]);

        tracker.register(SEG1);
        tracker.acquire(&seg1);
        tracker.acquire(&seg1);
        assert_eq!(tracker.ref_count(SEG1), 2);

        tracker.release(&seg1);
        assert_eq!(tracker.ref_count(SEG1), 1);

        tracker.release(&seg1);
        assert_eq!(tracker.ref_count(SEG1), 0);
    }

    /// An unreferenced segment is reported as deletable immediately.
    #[test]
    fn test_tracker_mark_for_deletion_no_refs() {
        let tracker = SegmentTracker::new();
        tracker.register(SEG1);

        let ready = tracker.mark_for_deletion(&ids(&[SEG1]));
        assert_eq!(ready.len(), 1);
        assert!(!tracker.is_pending_deletion(SEG1));
    }

    /// A referenced segment stays pending until its last reference is released.
    #[test]
    fn test_tracker_mark_for_deletion_with_refs() {
        let tracker = SegmentTracker::new();
        let seg1 = ids(&[SEG1]);

        tracker.register(SEG1);
        tracker.acquire(&seg1);

        let ready = tracker.mark_for_deletion(&seg1);
        assert!(ready.is_empty());
        assert!(tracker.is_pending_deletion(SEG1));

        let deleted = tracker.release(&seg1);
        assert_eq!(deleted.len(), 1);
        assert!(!tracker.is_pending_deletion(SEG1));
    }

    /// Segments that are pending deletion are not listed as active.
    #[test]
    fn test_tracker_active_segments() {
        let tracker = SegmentTracker::new();
        for seg in [SEG1, SEG2, SEG3] {
            tracker.register(seg);
        }

        let seg2 = ids(&[SEG2]);
        tracker.acquire(&seg2);
        tracker.mark_for_deletion(&seg2);

        let active = tracker.get_active_segments();
        assert!(active.contains(&SEG1.to_string()));
        assert!(!active.contains(&SEG2.to_string()));
        assert!(active.contains(&SEG3.to_string()));
    }
}