use std::{
collections::{
HashMap,
HashSet,
},
path::PathBuf,
sync::Arc,
};
use parking_lot::RwLock;
use crate::segment::Segment;
/// A tracked segment together with the directory it lives in on disk.
type SegmentEntry = (Arc<Segment>, PathBuf);

/// Bookkeeping for shared segments and their deferred deletion.
///
/// Segments are handed out as `Arc<Segment>`. A segment marked for deletion stays
/// tracked until [`SegmentRegistry::cleanup`] sees that the registry holds the only
/// remaining strong reference; then its directory is removed.
pub struct SegmentRegistry {
    /// Root path supplied at construction and exposed through `base_path()`.
    base_path: PathBuf,
    /// Ids of segments that are currently live.
    live_segments: RwLock<HashSet<u64>>,
    /// Every tracked segment, keyed by id, along with its on-disk directory.
    segments: RwLock<HashMap<u64, SegmentEntry>>,
    /// Ids that were marked for deletion and have not been reclaimed yet.
    pending_deletion: RwLock<HashSet<u64>>,
}
impl SegmentRegistry {
    /// Creates an empty registry associated with `base_path`.
    pub fn new(base_path: PathBuf) -> Self {
        Self {
            base_path,
            live_segments: RwLock::new(HashSet::new()),
            segments: RwLock::new(HashMap::new()),
            pending_deletion: RwLock::new(HashSet::new()),
        }
    }

    /// Registers `segment`, stored on disk at `path`, as live under `segment.id()`.
    ///
    /// Registering an id that is already tracked replaces the previous entry.
    pub fn register(&self, segment: Arc<Segment>, path: PathBuf) {
        let id = segment.id();
        // Lock order used by every multi-lock method:
        // live_segments -> pending_deletion -> segments.
        let mut live = self.live_segments.write();
        let mut segments = self.segments.write();
        live.insert(id);
        segments.insert(id, (segment, path));
    }

    /// Moves `segment_id` from the live set to the pending-deletion queue.
    ///
    /// The segment stays retrievable through [`get`](Self::get) until
    /// [`cleanup`](Self::cleanup) reclaims it.
    pub fn mark_for_deletion(&self, segment_id: u64) {
        let mut live = self.live_segments.write();
        let mut pending = self.pending_deletion.write();
        live.remove(&segment_id);
        pending.insert(segment_id);
    }

    /// Returns a new reference to the tracked segment `segment_id`, live or pending.
    ///
    /// While the returned `Arc` is alive, `cleanup` will not reclaim the segment.
    pub fn get(&self, segment_id: u64) -> Option<Arc<Segment>> {
        self.segments
            .read()
            .get(&segment_id)
            .map(|(seg, _)| Arc::clone(seg))
    }

    /// Returns `true` if `segment_id` is currently in the live set.
    pub fn is_live(&self, segment_id: u64) -> bool {
        self.live_segments.read().contains(&segment_id)
    }

    /// Number of live segments.
    pub fn live_count(&self) -> usize {
        self.live_segments.read().len()
    }

    /// Number of ids queued for deletion but not yet reclaimed.
    pub fn pending_deletion_count(&self) -> usize {
        self.pending_deletion.read().len()
    }

    /// Reclaims pending segments that are no longer referenced outside the registry.
    ///
    /// A pending segment is reclaimed only when the registry holds its sole strong
    /// reference. Reclaiming removes it from the registry, drops it, and removes its
    /// directory if the directory exists. Pending ids with no tracked segment, for
    /// example ids marked without ever being registered, are dropped from the queue.
    /// They are not counted as deletions; otherwise they would stay pending forever.
    ///
    /// Filesystem work runs after the registry locks are released, so concurrent
    /// readers are not blocked on disk I/O. A failed directory removal is logged and
    /// does not stop the remaining deletions.
    ///
    /// Returns `(segments_reclaimed, bytes_freed)`. `bytes_freed` is the sum of
    /// `size_in_bytes()` over the reclaimed segments.
    pub fn cleanup(&self) -> (usize, u64) {
        let reclaimed = self.take_reclaimable();
        let count = reclaimed.len();
        let bytes_freed: u64 = reclaimed
            .into_iter()
            .map(|(id, segment, path)| Self::destroy_segment(id, segment, &path))
            .sum();
        (count, bytes_freed)
    }

    /// Detaches every reclaimable pending segment from the registry and returns them.
    ///
    /// Also prunes pending ids that have no tracked segment.
    fn take_reclaimable(&self) -> Vec<(u64, Arc<Segment>, PathBuf)> {
        let mut pending = self.pending_deletion.write();
        let mut segments = self.segments.write();
        let mut reclaimed = Vec::new();
        // While the `segments` write guard is held, `get` cannot hand out new clones.
        // A strong count of 1 therefore means nobody else holds the segment.
        pending.retain(|&id| {
            let reclaimable = match segments.get(&id) {
                // Nothing is tracked under this id: discard the stale mark.
                None => return false,
                Some((segment, _)) => Arc::strong_count(segment) == 1,
            };
            if !reclaimable {
                // Still referenced elsewhere; keep it queued for a later pass.
                return true;
            }
            if let Some((segment, path)) = segments.remove(&id) {
                reclaimed.push((id, segment, path));
            }
            false
        });
        reclaimed
    }

    /// Drops a detached segment and removes its directory from disk, if present.
    ///
    /// Returns the segment's size as reported just before it is dropped.
    fn destroy_segment(id: u64, segment: Arc<Segment>, path: &std::path::Path) -> u64 {
        let size: u64 = segment.size_in_bytes();
        // Drop the (last) reference before deleting the directory. Any resources the
        // segment may hold are then released first.
        drop(segment);
        if path.exists() {
            if let Err(e) = std::fs::remove_dir_all(path) {
                tracing::error!(
                    segment_id = id,
                    path = ?path,
                    error = ?e,
                    "Failed to delete segment directory"
                );
            } else {
                tracing::debug!(segment_id = id, path = ?path, "Deleted segment directory");
            }
        }
        size
    }

    /// Removes `segment_id` from all bookkeeping immediately and returns the segment.
    ///
    /// Outstanding references are ignored, and the segment's directory is not deleted.
    pub fn force_remove(&self, segment_id: u64) -> Option<Arc<Segment>> {
        let mut live = self.live_segments.write();
        let mut pending = self.pending_deletion.write();
        let mut segments = self.segments.write();
        live.remove(&segment_id);
        pending.remove(&segment_id);
        segments.remove(&segment_id).map(|(seg, _)| seg)
    }

    /// Returns a consistent snapshot of the registry's counters.
    pub fn stats(&self) -> RegistryStats {
        // Hold all three read guards at once, in the registry's lock order, so the
        // counts describe a single state instead of three separate moments.
        let live = self.live_segments.read();
        let pending = self.pending_deletion.read();
        let segments = self.segments.read();
        RegistryStats {
            live: live.len(),
            pending_deletion: pending.len(),
            total_tracked: segments.len(),
        }
    }

    /// The base path this registry was created with.
    pub fn base_path(&self) -> &PathBuf {
        &self.base_path
    }

    /// Forgets every tracked segment without touching the filesystem (tests only).
    #[cfg(test)]
    pub fn clear(&self) {
        self.live_segments.write().clear();
        self.segments.write().clear();
        self.pending_deletion.write().clear();
    }
}
impl Default for SegmentRegistry {
fn default() -> Self {
Self::new(PathBuf::from("."))
}
}
/// Counters describing the state of a segment registry at one point in time.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct RegistryStats {
    /// Number of segments currently marked live.
    pub live: usize,
    /// Number of segment ids queued for deletion but not yet reclaimed.
    pub pending_deletion: usize,
    /// Number of segments the registry still holds a reference to.
    pub total_tracked: usize,
}
impl std::fmt::Display for RegistryStats {
    /// Renders a one-line summary: live, pending-deletion and total counts.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            live,
            pending_deletion,
            total_tracked,
        } = self;
        f.write_fmt(format_args!(
            "Registry: {} live, {} pending deletion, {} total",
            live, pending_deletion, total_tracked
        ))
    }
}
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;
    use crate::{
        segment::Segment,
        segment_builder::SegmentBuilder,
    };

    /// Builds segment `id` inside a fresh temporary directory.
    ///
    /// Dropping the returned `TempDir` removes that directory.
    fn create_test_segment_with_dir(id: u64) -> (Arc<Segment>, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let builder = SegmentBuilder::new(temp_dir.path().to_path_buf()).unwrap();
        let segment = builder.new_segment(id, 12345, 64 * 1024 * 1024).unwrap();
        (segment, temp_dir)
    }

    #[test]
    fn test_registry_creation() {
        let registry = SegmentRegistry::new(PathBuf::from("."));
        assert_eq!(registry.live_count(), 0);
        assert_eq!(registry.pending_deletion_count(), 0);
    }

    #[test]
    fn test_register_segment() {
        let registry = SegmentRegistry::new(PathBuf::from("."));
        let (segment, temp_dir) = create_test_segment_with_dir(1);
        registry.register(segment.clone(), temp_dir.path().to_path_buf());
        assert_eq!(registry.live_count(), 1);
        assert!(registry.is_live(1));
        // One reference held by this test, one by the registry.
        assert_eq!(Arc::strong_count(&segment), 2);
    }

    #[test]
    fn test_mark_for_deletion() {
        let registry = SegmentRegistry::new(PathBuf::from("."));
        let (segment, temp_dir) = create_test_segment_with_dir(1);
        registry.register(segment.clone(), temp_dir.path().to_path_buf());
        assert!(registry.is_live(1));
        registry.mark_for_deletion(1);
        assert!(!registry.is_live(1));
        assert_eq!(registry.pending_deletion_count(), 1);
    }

    #[test]
    fn test_cleanup_with_no_external_refs() {
        let registry = SegmentRegistry::new(PathBuf::from("."));
        let (segment, temp_dir) = create_test_segment_with_dir(1);
        let path = temp_dir.path().to_path_buf();
        registry.register(segment.clone(), path.clone());
        drop(segment);
        registry.mark_for_deletion(1);
        assert_eq!(registry.pending_deletion_count(), 1);
        let (deleted, _bytes_freed) = registry.cleanup();
        assert_eq!(deleted, 1);
        assert_eq!(registry.pending_deletion_count(), 0);
        assert_eq!(registry.live_count(), 0);
    }

    #[test]
    fn test_cleanup_with_external_refs() {
        let registry = SegmentRegistry::new(PathBuf::from("."));
        let (segment, temp_dir) = create_test_segment_with_dir(1);
        let path = temp_dir.path().to_path_buf();
        registry.register(segment.clone(), path.clone());
        registry.mark_for_deletion(1);
        assert_eq!(registry.pending_deletion_count(), 1);
        // The test still holds a reference, so nothing may be reclaimed yet.
        let (deleted, _) = registry.cleanup();
        assert_eq!(deleted, 0);
        assert_eq!(registry.pending_deletion_count(), 1);
        drop(segment);
        let (deleted, _) = registry.cleanup();
        assert_eq!(deleted, 1);
        assert_eq!(registry.pending_deletion_count(), 0);
    }

    #[test]
    fn test_get_segment() {
        let registry = SegmentRegistry::new(PathBuf::from("."));
        let (segment, temp_dir) = create_test_segment_with_dir(1);
        registry.register(segment.clone(), temp_dir.path().to_path_buf());
        let retrieved = registry.get(1);
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().id(), 1);
        let non_existent = registry.get(999);
        assert!(non_existent.is_none());
    }

    #[test]
    fn test_force_remove() {
        let registry = SegmentRegistry::new(PathBuf::from("."));
        let (segment, temp_dir) = create_test_segment_with_dir(1);
        registry.register(segment.clone(), temp_dir.path().to_path_buf());
        assert!(registry.is_live(1));
        let removed = registry.force_remove(1);
        assert!(removed.is_some());
        assert!(!registry.is_live(1));
        assert_eq!(registry.live_count(), 0);
    }

    #[test]
    fn test_stats() {
        let registry = SegmentRegistry::new(PathBuf::from("."));
        let (seg1, temp1) = create_test_segment_with_dir(1);
        let (seg2, temp2) = create_test_segment_with_dir(2);
        registry.register(seg1, temp1.path().to_path_buf());
        registry.register(seg2.clone(), temp2.path().to_path_buf());
        let stats = registry.stats();
        assert_eq!(stats.live, 2);
        assert_eq!(stats.pending_deletion, 0);
        assert_eq!(stats.total_tracked, 2);
        registry.mark_for_deletion(1);
        let stats = registry.stats();
        assert_eq!(stats.live, 1);
        assert_eq!(stats.pending_deletion, 1);
        assert_eq!(stats.total_tracked, 2);
    }

    #[test]
    fn test_concurrent_access() {
        use std::thread;

        let registry = Arc::new(SegmentRegistry::new(PathBuf::from(".")));
        let handles: Vec<_> = (0..10)
            .map(|i| {
                let reg = Arc::clone(&registry);
                thread::spawn(move || {
                    let (segment, temp_dir) = create_test_segment_with_dir(i);
                    reg.register(segment, temp_dir.path().to_path_buf());
                })
            })
            .collect();
        for handle in handles {
            handle.join().unwrap();
        }
        assert_eq!(registry.live_count(), 10);
    }

    #[test]
    fn test_cleanup_concurrent_readers() {
        use std::{
            sync::atomic::{
                AtomicBool,
                Ordering,
            },
            thread,
        };

        let registry = Arc::new(SegmentRegistry::new(PathBuf::from(".")));
        let mut temp_dirs = Vec::new();
        for i in 0..10 {
            let (segment, temp_dir) = create_test_segment_with_dir(i);
            registry.register(segment, temp_dir.path().to_path_buf());
            registry.mark_for_deletion(i);
            temp_dirs.push(temp_dir);
        }

        let done = Arc::new(AtomicBool::new(false));
        let handles: Vec<_> = (0..10)
            .map(|_| {
                let reg = Arc::clone(&registry);
                let done = Arc::clone(&done);
                thread::spawn(move || {
                    let mut reads = 0u64;
                    while !done.load(Ordering::Relaxed) {
                        for id in 0..10 {
                            let _ = reg.get(id);
                            reads += 1;
                        }
                    }
                    reads
                })
            })
            .collect();

        let cleanup_reg = Arc::clone(&registry);
        let cleanup_handle = thread::spawn(move || {
            let mut total_deleted = 0;
            // Readers briefly hold clones obtained through `get`, which makes cleanup
            // skip a segment for that pass. Retry until the queue drains instead of
            // relying on a fixed number of passes; the cap only guards against hangs.
            for _ in 0..100_000 {
                total_deleted += cleanup_reg.cleanup().0;
                if cleanup_reg.pending_deletion_count() == 0 {
                    break;
                }
                thread::yield_now();
            }
            total_deleted
        });

        let total_deleted = cleanup_handle.join().unwrap();
        done.store(true, Ordering::Relaxed);
        let total_reads: u64 = handles.into_iter().map(|h| h.join().unwrap()).sum();
        assert_eq!(total_deleted, 10);
        assert_eq!(registry.pending_deletion_count(), 0);
        assert!(total_reads > 0, "readers should have completed some reads");
    }

    #[test]
    fn test_cleanup_idempotent() {
        let registry = SegmentRegistry::new(PathBuf::from("."));
        let (segment, temp_dir) = create_test_segment_with_dir(1);
        registry.register(segment, temp_dir.path().to_path_buf());
        registry.mark_for_deletion(1);
        let (deleted, _) = registry.cleanup();
        assert_eq!(deleted, 1);
        let (deleted, _) = registry.cleanup();
        assert_eq!(deleted, 0);
        assert_eq!(registry.pending_deletion_count(), 0);
    }

    #[test]
    fn test_cleanup_with_active_reader() {
        let registry = SegmentRegistry::new(PathBuf::from("."));
        let (segment, temp_dir) = create_test_segment_with_dir(1);
        registry.register(segment, temp_dir.path().to_path_buf());
        let reader_ref = registry.get(1).unwrap();
        assert_eq!(Arc::strong_count(&reader_ref), 2);
        registry.mark_for_deletion(1);
        let (deleted, _) = registry.cleanup();
        assert_eq!(deleted, 0);
        assert_eq!(registry.pending_deletion_count(), 1);
        drop(reader_ref);
        let (deleted, _) = registry.cleanup();
        assert_eq!(deleted, 1);
        assert_eq!(registry.pending_deletion_count(), 0);
    }

    #[test]
    fn test_cleanup_deletes_files() {
        let registry = SegmentRegistry::new(PathBuf::from("."));
        let (segment, temp_dir) = create_test_segment_with_dir(1);
        let path = temp_dir.path().to_path_buf();
        registry.register(segment, path.clone());
        assert!(path.exists());
        registry.mark_for_deletion(1);
        let (deleted, _bytes_freed) = registry.cleanup();
        assert_eq!(deleted, 1);
        assert!(!path.exists(), "segment directory should be deleted");
    }

    #[test]
    fn test_cleanup_does_not_delete_live_files() {
        let registry = SegmentRegistry::new(PathBuf::from("."));
        let (segment, temp_dir) = create_test_segment_with_dir(1);
        let path = temp_dir.path().to_path_buf();
        registry.register(segment, path.clone());
        let (deleted, _) = registry.cleanup();
        assert_eq!(deleted, 0);
        assert!(
            path.exists(),
            "live segment directory should NOT be deleted"
        );
    }

    #[test]
    fn test_cleanup_returns_bytes_freed() {
        let registry = SegmentRegistry::new(PathBuf::from("."));
        let (segment, temp_dir) = create_test_segment_with_dir(1);
        let path = temp_dir.path().to_path_buf();
        // The segment is not modified after this point, so its size stays stable.
        let expected_bytes = segment.size_in_bytes();
        registry.register(segment, path.clone());
        registry.mark_for_deletion(1);
        let (deleted, bytes_freed) = registry.cleanup();
        assert_eq!(deleted, 1);
        assert_eq!(
            bytes_freed, expected_bytes,
            "cleanup should report the reclaimed segment's size"
        );
        assert!(!path.exists(), "segment directory should be deleted");
    }
}