// hermes_core/segment/tracker.rs
use std::collections::HashMap;
12use std::sync::Arc;
13
14use parking_lot::RwLock;
15
16use crate::directories::Directory;
17use crate::segment::SegmentId;
18
19pub struct SegmentTracker {
21 ref_counts: RwLock<HashMap<String, usize>>,
23 pending_deletions: RwLock<HashMap<String, PendingDeletion>>,
25}
26
27struct PendingDeletion {
29 segment_id: SegmentId,
30}
31
32impl SegmentTracker {
33 pub fn new() -> Self {
35 Self {
36 ref_counts: RwLock::new(HashMap::new()),
37 pending_deletions: RwLock::new(HashMap::new()),
38 }
39 }
40
41 pub fn register(&self, segment_id: &str) {
43 let mut refs = self.ref_counts.write();
44 refs.entry(segment_id.to_string()).or_insert(0);
45 }
46
47 pub fn acquire(&self, segment_ids: &[String]) -> Vec<String> {
50 let mut refs = self.ref_counts.write();
51 let pending = self.pending_deletions.read();
52
53 let mut acquired = Vec::with_capacity(segment_ids.len());
54 for id in segment_ids {
55 if pending.contains_key(id) {
57 continue;
58 }
59 *refs.entry(id.clone()).or_insert(0) += 1;
60 acquired.push(id.clone());
61 }
62 acquired
63 }
64
65 pub fn release(&self, segment_ids: &[String]) -> Vec<SegmentId> {
68 let mut refs = self.ref_counts.write();
69 let mut pending = self.pending_deletions.write();
70
71 let mut ready_for_deletion = Vec::new();
72
73 for id in segment_ids {
74 if let Some(count) = refs.get_mut(id) {
75 *count = count.saturating_sub(1);
76
77 if *count == 0
79 && let Some(deletion) = pending.remove(id)
80 {
81 refs.remove(id);
82 ready_for_deletion.push(deletion.segment_id);
83 }
84 }
85 }
86
87 ready_for_deletion
88 }
89
90 pub fn mark_for_deletion(&self, segment_ids: &[String]) -> Vec<SegmentId> {
94 let mut refs = self.ref_counts.write();
95 let mut pending = self.pending_deletions.write();
96
97 let mut ready_for_deletion = Vec::new();
98
99 for id_str in segment_ids {
100 let Some(segment_id) = SegmentId::from_hex(id_str) else {
101 continue;
102 };
103
104 let ref_count = refs.get(id_str).copied().unwrap_or(0);
105
106 if ref_count == 0 {
107 refs.remove(id_str);
109 ready_for_deletion.push(segment_id);
110 } else {
111 pending.insert(id_str.clone(), PendingDeletion { segment_id });
113 }
114 }
115
116 ready_for_deletion
117 }
118
119 pub fn get_active_segments(&self) -> Vec<String> {
121 let refs = self.ref_counts.read();
122 let pending = self.pending_deletions.read();
123
124 refs.keys()
125 .filter(|id| !pending.contains_key(*id))
126 .cloned()
127 .collect()
128 }
129
130 pub fn ref_count(&self, segment_id: &str) -> usize {
132 self.ref_counts.read().get(segment_id).copied().unwrap_or(0)
133 }
134
135 pub fn is_pending_deletion(&self, segment_id: &str) -> bool {
137 self.pending_deletions.read().contains_key(segment_id)
138 }
139}
140
141impl Default for SegmentTracker {
142 fn default() -> Self {
143 Self::new()
144 }
145}
146
147pub struct SegmentSnapshot<D: Directory + 'static> {
151 tracker: Arc<SegmentTracker>,
152 segment_ids: Vec<String>,
153 _directory: std::marker::PhantomData<Arc<D>>,
155 delete_fn: Option<Arc<dyn Fn(Vec<SegmentId>) + Send + Sync>>,
158}
159
160impl<D: Directory + 'static> SegmentSnapshot<D> {
161 pub fn new(tracker: Arc<SegmentTracker>, _directory: Arc<D>, segment_ids: Vec<String>) -> Self {
163 Self {
165 tracker,
166 segment_ids,
167 _directory: std::marker::PhantomData,
168 delete_fn: None,
169 }
170 }
171
172 pub fn with_delete_fn(
174 tracker: Arc<SegmentTracker>,
175 _directory: Arc<D>,
176 segment_ids: Vec<String>,
177 delete_fn: Arc<dyn Fn(Vec<SegmentId>) + Send + Sync>,
178 ) -> Self {
179 Self {
180 tracker,
181 segment_ids,
182 _directory: std::marker::PhantomData,
183 delete_fn: Some(delete_fn),
184 }
185 }
186
187 pub fn segment_ids(&self) -> &[String] {
189 &self.segment_ids
190 }
191
192 pub fn is_empty(&self) -> bool {
194 self.segment_ids.is_empty()
195 }
196
197 pub fn len(&self) -> usize {
199 self.segment_ids.len()
200 }
201}
202
203impl<D: Directory + 'static> Drop for SegmentSnapshot<D> {
204 fn drop(&mut self) {
205 let to_delete = self.tracker.release(&self.segment_ids);
206 if !to_delete.is_empty() {
207 if let Some(delete_fn) = &self.delete_fn {
208 log::info!(
209 "[segment_snapshot] dropping snapshot, deleting {} deferred segments",
210 to_delete.len()
211 );
212 delete_fn(to_delete);
213 } else {
214 log::warn!(
215 "[segment_snapshot] {} segments ready for deletion but no delete_fn provided",
216 to_delete.len()
217 );
218 }
219 }
220 }
221}
222
#[cfg(test)]
mod tests {
    use super::*;

    // 32-character hex strings used as segment ids throughout the tests.
    const SEG1: &str = "00000000000000000000000000000001";
    const SEG2: &str = "00000000000000000000000000000002";
    const SEG3: &str = "00000000000000000000000000000003";

    /// Turns string-literal ids into the owned list the tracker API expects.
    fn ids(raw: &[&str]) -> Vec<String> {
        raw.iter().map(|id| id.to_string()).collect()
    }

    #[test]
    fn test_tracker_register_and_acquire() {
        let tracker = SegmentTracker::new();
        for id in [SEG1, SEG2] {
            tracker.register(id);
        }

        let acquired = tracker.acquire(&ids(&[SEG1, SEG2]));
        assert_eq!(acquired.len(), 2);

        assert_eq!(tracker.ref_count(SEG1), 1);
        assert_eq!(tracker.ref_count(SEG2), 1);
    }

    #[test]
    fn test_tracker_release() {
        let tracker = SegmentTracker::new();
        let seg1 = ids(&[SEG1]);

        tracker.register(SEG1);
        tracker.acquire(&seg1);
        tracker.acquire(&seg1);
        assert_eq!(tracker.ref_count(SEG1), 2);

        tracker.release(&seg1);
        assert_eq!(tracker.ref_count(SEG1), 1);

        tracker.release(&seg1);
        assert_eq!(tracker.ref_count(SEG1), 0);
    }

    #[test]
    fn test_tracker_mark_for_deletion_no_refs() {
        let tracker = SegmentTracker::new();
        tracker.register(SEG1);

        // No references are held, so the segment is ready right away.
        let ready = tracker.mark_for_deletion(&ids(&[SEG1]));
        assert_eq!(ready.len(), 1);
        assert!(!tracker.is_pending_deletion(SEG1));
    }

    #[test]
    fn test_tracker_mark_for_deletion_with_refs() {
        let tracker = SegmentTracker::new();
        let seg1 = ids(&[SEG1]);

        tracker.register(SEG1);
        tracker.acquire(&seg1);

        // A held reference defers the deletion.
        let ready = tracker.mark_for_deletion(&seg1);
        assert!(ready.is_empty());
        assert!(tracker.is_pending_deletion(SEG1));

        // Releasing the last reference completes it.
        let deleted = tracker.release(&seg1);
        assert_eq!(deleted.len(), 1);
        assert!(!tracker.is_pending_deletion(SEG1));
    }

    #[test]
    fn test_tracker_active_segments() {
        let tracker = SegmentTracker::new();
        for id in [SEG1, SEG2, SEG3] {
            tracker.register(id);
        }

        let seg2 = ids(&[SEG2]);
        tracker.acquire(&seg2);
        tracker.mark_for_deletion(&seg2);

        let active = tracker.get_active_segments();
        assert!(active.contains(&SEG1.to_string()));
        assert!(!active.contains(&SEG2.to_string()));
        assert!(active.contains(&SEG3.to_string()));
    }
}