use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::Mutex;
use crate::segment::SegmentId;
/// Mutable bookkeeping for a `SegmentTracker`, stored inside the tracker's mutex.
struct TrackerInner {
/// Reference count per segment, keyed by the segment's id string. An entry
/// with a count of zero is a known segment with no outstanding references.
ref_counts: HashMap<String, usize>,
/// Segments that were marked for deletion while their reference count was
/// non-zero, keyed by id string. The value is the parsed `SegmentId` that is
/// handed back once the count drops to zero.
pending_deletions: HashMap<String, SegmentId>,
}
/// Reference-counting registry for segments.
///
/// A segment that is marked for deletion while it still has references is
/// parked as "pending" and only reported as deletable once its last reference
/// is released. All state sits behind a single `parking_lot::Mutex`, which is
/// why every method takes `&self`.
pub struct SegmentTracker {
// Guarded bookkeeping: reference counts and pending deletions.
inner: Mutex<TrackerInner>,
}
impl SegmentTracker {
    /// Creates a tracker that knows no segments and has no pending deletions.
    pub fn new() -> Self {
        let state = TrackerInner {
            ref_counts: HashMap::new(),
            pending_deletions: HashMap::new(),
        };
        Self {
            inner: Mutex::new(state),
        }
    }

    /// Makes `segment_id` known to the tracker with a reference count of zero.
    ///
    /// Registering an id that is already tracked leaves its count untouched.
    pub fn register(&self, segment_id: &str) {
        self.inner
            .lock()
            .ref_counts
            .entry(segment_id.to_string())
            .or_insert(0);
    }

    /// Takes one reference on each of `segment_ids` and returns the ids that
    /// were actually acquired, in input order.
    ///
    /// Ids that are currently pending deletion are skipped and therefore absent
    /// from the result. Ids the tracker has not seen before are inserted on the
    /// fly, so acquiring implicitly registers a segment.
    pub fn acquire(&self, segment_ids: &[String]) -> Vec<String> {
        let mut guard = self.inner.lock();
        // Split the guarded state so the filter can read one map while the
        // loop body mutates the other.
        let TrackerInner {
            ref_counts,
            pending_deletions,
        } = &mut *guard;

        let mut granted = Vec::with_capacity(segment_ids.len());
        let eligible = segment_ids
            .iter()
            .filter(|id| !pending_deletions.contains_key(id.as_str()));
        for id in eligible {
            *ref_counts.entry(id.clone()).or_insert(0) += 1;
            granted.push(id.clone());
        }
        granted
    }

    /// Drops one reference on each of `segment_ids` and returns the segments
    /// whose deferred deletion can now proceed.
    ///
    /// Ids unknown to the tracker are ignored and counts saturate at zero. When
    /// a count is zero after the decrement and the segment is pending deletion,
    /// the segment is removed from both maps and its `SegmentId` is returned.
    pub fn release(&self, segment_ids: &[String]) -> Vec<SegmentId> {
        let mut guard = self.inner.lock();
        let TrackerInner {
            ref_counts,
            pending_deletions,
        } = &mut *guard;

        let mut deletable = Vec::new();
        for id in segment_ids {
            let Some(remaining) = ref_counts.get_mut(id) else {
                continue;
            };
            *remaining = remaining.saturating_sub(1);
            if *remaining != 0 {
                continue;
            }
            // Last reference gone: finish a deletion that was deferred earlier.
            if let Some(segment_id) = pending_deletions.remove(id) {
                ref_counts.remove(id);
                deletable.push(segment_id);
            }
        }
        deletable
    }

    /// Requests deletion of `segment_ids` and returns the segments that can be
    /// deleted right away.
    ///
    /// Ids that `SegmentId::from_hex` rejects, and ids the tracker does not
    /// know, are skipped silently. A tracked segment with no references is
    /// forgotten and returned immediately; one that is still referenced is
    /// recorded as pending and surfaces later from [`SegmentTracker::release`].
    pub fn mark_for_deletion(&self, segment_ids: &[String]) -> Vec<SegmentId> {
        let mut guard = self.inner.lock();
        let TrackerInner {
            ref_counts,
            pending_deletions,
        } = &mut *guard;

        let mut deletable = Vec::new();
        for key in segment_ids {
            let parsed = match SegmentId::from_hex(key) {
                Some(parsed) => parsed,
                None => continue,
            };
            match ref_counts.get(key).copied() {
                // Untracked id: nothing to delete or defer.
                None => {}
                Some(0) => {
                    ref_counts.remove(key);
                    deletable.push(parsed);
                }
                Some(_) => {
                    pending_deletions.insert(key.clone(), parsed);
                }
            }
        }
        deletable
    }

    /// Returns `true` while `segment_id` is waiting for its last reference to
    /// be released before it can be deleted.
    pub fn is_pending_deletion(&self, segment_id: &str) -> bool {
        let guard = self.inner.lock();
        guard.pending_deletions.contains_key(segment_id)
    }

    /// Returns the current reference count of `segment_id`, or zero when the
    /// tracker does not know the segment.
    pub fn ref_count(&self, segment_id: &str) -> usize {
        let guard = self.inner.lock();
        match guard.ref_counts.get(segment_id) {
            Some(&count) => count,
            None => 0,
        }
    }
}
impl Default for SegmentTracker {
fn default() -> Self {
Self::new()
}
}
/// A set of segment ids whose references are released on the tracker when the
/// snapshot is dropped.
///
/// If releasing makes any deferred deletions ready, they are passed to the
/// optional `delete_fn`; without one, only a warning is logged.
pub struct SegmentSnapshot {
// Tracker on which `release` is called when the snapshot is dropped.
tracker: Arc<SegmentTracker>,
// Ids handed to `release` on drop.
// NOTE(review): presumably these were acquired by the caller beforehand — confirm at call sites.
segment_ids: Vec<String>,
// Callback that receives the segments reported ready for deletion on drop.
delete_fn: Option<Arc<dyn Fn(Vec<SegmentId>) + Send + Sync>>,
}
impl SegmentSnapshot {
    /// Wraps `segment_ids` in a snapshot without a deletion callback.
    ///
    /// This constructor does not acquire references itself; it only stores the
    /// ids so they are released on `tracker` when the snapshot is dropped.
    pub fn new(tracker: Arc<SegmentTracker>, segment_ids: Vec<String>) -> Self {
        let delete_fn = None;
        Self {
            tracker,
            segment_ids,
            delete_fn,
        }
    }

    /// Like [`SegmentSnapshot::new`], but `delete_fn` is invoked on drop with
    /// any segments whose deferred deletion became ready.
    pub fn with_delete_fn(
        tracker: Arc<SegmentTracker>,
        segment_ids: Vec<String>,
        delete_fn: Arc<dyn Fn(Vec<SegmentId>) + Send + Sync>,
    ) -> Self {
        let delete_fn = Some(delete_fn);
        Self {
            tracker,
            segment_ids,
            delete_fn,
        }
    }

    /// The segment ids held by this snapshot.
    pub fn segment_ids(&self) -> &[String] {
        self.segment_ids.as_slice()
    }

    /// Returns `true` when the snapshot holds no segment ids.
    pub fn is_empty(&self) -> bool {
        self.segment_ids.is_empty()
    }

    /// Number of segment ids held by this snapshot.
    pub fn len(&self) -> usize {
        self.segment_ids.len()
    }
}
impl Drop for SegmentSnapshot {
    /// Releases every held segment id on the tracker and forwards any segments
    /// that became deletable to the deletion callback.
    ///
    /// When no callback was configured, the deletable segments are only
    /// reported through a warning log line.
    fn drop(&mut self) {
        let released = self.tracker.release(&self.segment_ids);
        if released.is_empty() {
            return;
        }

        match self.delete_fn.as_ref() {
            Some(callback) => {
                log::info!(
                    "[segment_snapshot] dropping snapshot, deleting {} deferred segments",
                    released.len()
                );
                callback(released);
            }
            None => {
                log::warn!(
                    "[segment_snapshot] {} segments ready for deletion but no delete_fn provided",
                    released.len()
                );
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    const SEG1: &str = "00000000000000000000000000000001";
    const SEG2: &str = "00000000000000000000000000000002";

    /// Builds the owned id list the tracker API expects.
    fn ids(raw: &[&str]) -> Vec<String> {
        raw.iter().map(|id| id.to_string()).collect()
    }

    /// Creates a tracker with each of `registered` already registered.
    fn tracker_with(registered: &[&str]) -> SegmentTracker {
        let tracker = SegmentTracker::new();
        for id in registered {
            tracker.register(id);
        }
        tracker
    }

    #[test]
    fn test_tracker_register_and_acquire() {
        let tracker = tracker_with(&[SEG1, SEG2]);

        let acquired = tracker.acquire(&ids(&[SEG1, SEG2]));

        assert_eq!(acquired.len(), 2);
        for seg in [SEG1, SEG2] {
            assert_eq!(tracker.ref_count(seg), 1);
        }
    }

    #[test]
    fn test_tracker_release() {
        let tracker = tracker_with(&[SEG1]);
        let seg1 = ids(&[SEG1]);

        tracker.acquire(&seg1);
        tracker.acquire(&seg1);
        assert_eq!(tracker.ref_count(SEG1), 2);

        // Each release drops exactly one reference.
        tracker.release(&seg1);
        assert_eq!(tracker.ref_count(SEG1), 1);
        tracker.release(&seg1);
        assert_eq!(tracker.ref_count(SEG1), 0);
    }

    #[test]
    fn test_tracker_mark_for_deletion_no_refs() {
        let tracker = tracker_with(&[SEG1]);

        let ready = tracker.mark_for_deletion(&ids(&[SEG1]));

        assert_eq!(ready.len(), 1);
        assert!(!tracker.is_pending_deletion(SEG1));
    }

    #[test]
    fn test_tracker_mark_for_deletion_with_refs() {
        let tracker = tracker_with(&[SEG1]);
        let seg1 = ids(&[SEG1]);
        tracker.acquire(&seg1);

        // Still referenced: deletion is deferred.
        let ready = tracker.mark_for_deletion(&seg1);
        assert!(ready.is_empty());
        assert!(tracker.is_pending_deletion(SEG1));

        // Releasing the last reference completes it.
        let deleted = tracker.release(&seg1);
        assert_eq!(deleted.len(), 1);
        assert!(!tracker.is_pending_deletion(SEG1));
    }

    #[test]
    fn test_tracker_double_mark_for_deletion() {
        let tracker = tracker_with(&[SEG1]);
        let seg1 = ids(&[SEG1]);

        let first = tracker.mark_for_deletion(&seg1);
        assert_eq!(first.len(), 1);

        let second = tracker.mark_for_deletion(&seg1);
        assert!(second.is_empty());
    }

    #[test]
    fn test_tracker_acquire_unregistered() {
        let tracker = tracker_with(&[]);

        let acquired = tracker.acquire(&ids(&[SEG1]));

        assert_eq!(acquired.len(), 1);
        assert_eq!(tracker.ref_count(SEG1), 1);
    }

    #[test]
    fn test_tracker_release_without_acquire() {
        let tracker = tracker_with(&[SEG1]);

        let deleted = tracker.release(&ids(&[SEG1]));

        assert!(deleted.is_empty());
        assert_eq!(tracker.ref_count(SEG1), 0);
    }

    #[test]
    fn test_snapshot_drop_triggers_deferred_delete() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let tracker = Arc::new(tracker_with(&[SEG1, SEG2]));
        let both = ids(&[SEG1, SEG2]);

        // Count how many segments the callback is asked to delete.
        let delete_count = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&delete_count);
        let delete_fn: Arc<dyn Fn(Vec<SegmentId>) + Send + Sync> =
            Arc::new(move |deleted: Vec<SegmentId>| {
                counter.fetch_add(deleted.len(), Ordering::SeqCst);
            });

        let acquired = tracker.acquire(&both);
        let snapshot =
            SegmentSnapshot::with_delete_fn(Arc::clone(&tracker), acquired, Arc::clone(&delete_fn));

        let ready = tracker.mark_for_deletion(&both);
        assert!(ready.is_empty());
        for seg in [SEG1, SEG2] {
            assert!(tracker.is_pending_deletion(seg));
        }

        drop(snapshot);

        assert_eq!(delete_count.load(Ordering::SeqCst), 2);
        for seg in [SEG1, SEG2] {
            assert!(!tracker.is_pending_deletion(seg));
        }
    }
}