use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::RwLock;
use crate::directories::DirectoryWriter;
use crate::segment::SegmentId;
pub struct SegmentTracker {
ref_counts: RwLock<HashMap<String, usize>>,
pending_deletions: RwLock<HashMap<String, PendingDeletion>>,
}
struct PendingDeletion {
segment_id: SegmentId,
}
impl SegmentTracker {
pub fn new() -> Self {
Self {
ref_counts: RwLock::new(HashMap::new()),
pending_deletions: RwLock::new(HashMap::new()),
}
}
pub fn register(&self, segment_id: &str) {
let mut refs = self.ref_counts.write();
refs.entry(segment_id.to_string()).or_insert(0);
}
pub fn acquire(&self, segment_ids: &[String]) -> Vec<String> {
let mut refs = self.ref_counts.write();
let pending = self.pending_deletions.read();
let mut acquired = Vec::with_capacity(segment_ids.len());
for id in segment_ids {
if pending.contains_key(id) {
continue;
}
*refs.entry(id.clone()).or_insert(0) += 1;
acquired.push(id.clone());
}
acquired
}
pub fn release(&self, segment_ids: &[String]) -> Vec<SegmentId> {
let mut refs = self.ref_counts.write();
let mut pending = self.pending_deletions.write();
let mut ready_for_deletion = Vec::new();
for id in segment_ids {
if let Some(count) = refs.get_mut(id) {
*count = count.saturating_sub(1);
if *count == 0
&& let Some(deletion) = pending.remove(id)
{
refs.remove(id);
ready_for_deletion.push(deletion.segment_id);
}
}
}
ready_for_deletion
}
pub fn mark_for_deletion(&self, segment_ids: &[String]) -> Vec<SegmentId> {
let mut refs = self.ref_counts.write();
let mut pending = self.pending_deletions.write();
let mut ready_for_deletion = Vec::new();
for id_str in segment_ids {
let Some(segment_id) = SegmentId::from_hex(id_str) else {
continue;
};
let ref_count = refs.get(id_str).copied().unwrap_or(0);
if ref_count == 0 {
refs.remove(id_str);
ready_for_deletion.push(segment_id);
} else {
pending.insert(id_str.clone(), PendingDeletion { segment_id });
}
}
ready_for_deletion
}
pub fn get_active_segments(&self) -> Vec<String> {
let refs = self.ref_counts.read();
let pending = self.pending_deletions.read();
refs.keys()
.filter(|id| !pending.contains_key(*id))
.cloned()
.collect()
}
pub fn ref_count(&self, segment_id: &str) -> usize {
self.ref_counts.read().get(segment_id).copied().unwrap_or(0)
}
pub fn is_pending_deletion(&self, segment_id: &str) -> bool {
self.pending_deletions.read().contains_key(segment_id)
}
}
impl Default for SegmentTracker {
fn default() -> Self {
Self::new()
}
}
pub struct SegmentSnapshot<D: DirectoryWriter + 'static> {
tracker: Arc<SegmentTracker>,
segment_ids: Vec<String>,
directory: Arc<D>,
}
impl<D: DirectoryWriter + 'static> SegmentSnapshot<D> {
pub fn new(tracker: Arc<SegmentTracker>, directory: Arc<D>, segment_ids: Vec<String>) -> Self {
Self {
tracker,
segment_ids,
directory,
}
}
pub fn segment_ids(&self) -> &[String] {
&self.segment_ids
}
pub fn is_empty(&self) -> bool {
self.segment_ids.is_empty()
}
pub fn len(&self) -> usize {
self.segment_ids.len()
}
}
impl<D: DirectoryWriter + 'static> Drop for SegmentSnapshot<D> {
fn drop(&mut self) {
let to_delete = self.tracker.release(&self.segment_ids);
if !to_delete.is_empty() {
let dir = Arc::clone(&self.directory);
tokio::spawn(async move {
for segment_id in to_delete {
let _ = crate::segment::delete_segment(dir.as_ref(), segment_id).await;
}
});
}
}
}
#[cfg(test)]
mod tests {
use super::*;
const SEG1: &str = "00000000000000000000000000000001";
const SEG2: &str = "00000000000000000000000000000002";
const SEG3: &str = "00000000000000000000000000000003";
#[test]
fn test_tracker_register_and_acquire() {
let tracker = SegmentTracker::new();
tracker.register(SEG1);
tracker.register(SEG2);
let acquired = tracker.acquire(&[SEG1.to_string(), SEG2.to_string()]);
assert_eq!(acquired.len(), 2);
assert_eq!(tracker.ref_count(SEG1), 1);
assert_eq!(tracker.ref_count(SEG2), 1);
}
#[test]
fn test_tracker_release() {
let tracker = SegmentTracker::new();
tracker.register(SEG1);
tracker.acquire(&[SEG1.to_string()]);
tracker.acquire(&[SEG1.to_string()]);
assert_eq!(tracker.ref_count(SEG1), 2);
tracker.release(&[SEG1.to_string()]);
assert_eq!(tracker.ref_count(SEG1), 1);
tracker.release(&[SEG1.to_string()]);
assert_eq!(tracker.ref_count(SEG1), 0);
}
#[test]
fn test_tracker_mark_for_deletion_no_refs() {
let tracker = SegmentTracker::new();
tracker.register(SEG1);
let ready = tracker.mark_for_deletion(&[SEG1.to_string()]);
assert_eq!(ready.len(), 1);
assert!(!tracker.is_pending_deletion(SEG1));
}
#[test]
fn test_tracker_mark_for_deletion_with_refs() {
let tracker = SegmentTracker::new();
tracker.register(SEG1);
tracker.acquire(&[SEG1.to_string()]);
let ready = tracker.mark_for_deletion(&[SEG1.to_string()]);
assert!(ready.is_empty());
assert!(tracker.is_pending_deletion(SEG1));
let deleted = tracker.release(&[SEG1.to_string()]);
assert_eq!(deleted.len(), 1);
assert!(!tracker.is_pending_deletion(SEG1));
}
#[test]
fn test_tracker_active_segments() {
let tracker = SegmentTracker::new();
tracker.register(SEG1);
tracker.register(SEG2);
tracker.register(SEG3);
tracker.acquire(&[SEG2.to_string()]);
tracker.mark_for_deletion(&[SEG2.to_string()]);
let active = tracker.get_active_segments();
assert!(active.contains(&SEG1.to_string()));
assert!(!active.contains(&SEG2.to_string())); assert!(active.contains(&SEG3.to_string()));
}
}