use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{SystemTime, UNIX_EPOCH};
use ahash::AHashMap;
use serde::{Deserialize, Serialize};
use crate::error::{LaurusError, Result};
use crate::storage::structured::{StructReader, StructWriter};
use crate::storage::{Storage, StorageInput, StorageOutput};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeletionConfig {
pub compaction_threshold: f64,
pub auto_compaction: bool,
pub compaction_interval_secs: u64,
pub max_bitmap_memory_mb: u64,
pub deletion_batch_size: usize,
pub enable_deletion_log: bool,
}
impl Default for DeletionConfig {
fn default() -> Self {
DeletionConfig {
compaction_threshold: 0.3,
auto_compaction: true,
compaction_interval_secs: 300, max_bitmap_memory_mb: 64,
deletion_batch_size: 1000,
enable_deletion_log: true,
}
}
}
#[derive(Debug)]
pub struct DeletionBitmap {
pub segment_id: String,
pub deleted_docs: RwLock<ahash::AHashSet<u64>>,
pub total_docs: AtomicU64,
pub min_doc_id: u64,
pub max_doc_id: u64,
pub deleted_count: AtomicU64,
pub last_modified: AtomicU64,
pub version: AtomicU64,
}
impl DeletionBitmap {
pub fn new(segment_id: String, min_doc_id: u64, max_doc_id: u64) -> Self {
let total_docs = if max_doc_id >= min_doc_id {
max_doc_id - min_doc_id + 1
} else {
0
};
DeletionBitmap {
segment_id,
deleted_docs: RwLock::new(ahash::AHashSet::new()),
total_docs: AtomicU64::new(total_docs),
min_doc_id,
max_doc_id,
deleted_count: AtomicU64::new(0),
last_modified: AtomicU64::new(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs(),
),
version: AtomicU64::new(1),
}
}
pub fn delete_document(&self, doc_id: u64) -> Result<bool> {
if doc_id < self.min_doc_id || doc_id > self.max_doc_id {
return Err(LaurusError::index(format!(
"Document ID {doc_id} is out of range [{}, {}] for segment {}",
self.min_doc_id, self.max_doc_id, self.segment_id
)));
}
let mut docs = self.deleted_docs.write().unwrap();
let was_already_deleted = docs.contains(&doc_id);
if !was_already_deleted {
docs.insert(doc_id);
self.deleted_count.fetch_add(1, Ordering::SeqCst);
self.last_modified.store(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs(),
Ordering::SeqCst,
);
self.version.fetch_add(1, Ordering::SeqCst);
}
Ok(!was_already_deleted)
}
pub fn resize(&self, new_size: u64) {
self.total_docs.store(new_size, Ordering::SeqCst);
}
pub fn is_deleted(&self, doc_id: u64) -> bool {
self.deleted_docs.read().unwrap().contains(&doc_id)
}
pub fn deletion_ratio(&self) -> f64 {
if self.total_docs.load(Ordering::SeqCst) == 0 {
0.0
} else {
self.deleted_count.load(Ordering::SeqCst) as f64
/ self.total_docs.load(Ordering::SeqCst) as f64
}
}
pub fn live_count(&self) -> u64 {
self.total_docs.load(Ordering::SeqCst) - self.deleted_count.load(Ordering::SeqCst)
}
pub fn needs_compaction(&self, threshold: f64) -> bool {
self.deletion_ratio() > threshold
}
pub fn get_deleted_docs(&self) -> Vec<u64> {
let docs = self.deleted_docs.read().unwrap();
docs.iter().cloned().collect()
}
pub fn memory_usage(&self) -> usize {
std::mem::size_of::<Self>() +
self.deleted_docs.read().unwrap().capacity() / 8 + self.segment_id.capacity()
}
pub fn write_to_storage<W: StorageOutput>(&self, writer: &mut StructWriter<W>) -> Result<()> {
writer.write_u32(0x44454C42)?; writer.write_u32(3)?;
writer.write_string(&self.segment_id)?;
writer.write_u64(self.total_docs.load(Ordering::SeqCst))?;
writer.write_u64(self.deleted_count.load(Ordering::SeqCst))?;
writer.write_u64(self.last_modified.load(Ordering::SeqCst))?;
writer.write_u64(self.version.load(Ordering::SeqCst))?;
writer.write_u64(self.min_doc_id)?;
writer.write_u64(self.max_doc_id)?;
let docs = self.deleted_docs.read().unwrap();
writer.write_varint(docs.len() as u64)?;
for &doc_id in docs.iter() {
writer.write_u64(doc_id)?;
}
Ok(())
}
pub fn read_from_storage<R: StorageInput>(reader: &mut StructReader<R>) -> Result<Self> {
let magic = reader.read_u32()?;
if magic != 0x44454C42 {
return Err(LaurusError::index("Invalid deletion bitmap format"));
}
let version = reader.read_u32()?;
if version == 1 {
let segment_id = reader.read_string()?;
let total_docs = reader.read_u64()?;
let deleted_count = reader.read_u64()?;
let last_modified = reader.read_u64()?;
let bitmap_version = reader.read_u64()?;
let _bitmap_size = reader.read_varint()? as usize;
let bitmap_bytes = reader.read_bytes()?;
let bitvec = bit_vec::BitVec::from_bytes(&bitmap_bytes);
let mut deleted_docs = ahash::AHashSet::new();
let mut min_doc_id = u64::MAX;
let mut max_doc_id = 0;
for (idx, bit) in bitvec.iter().enumerate() {
if bit {
let doc_id = idx as u64;
deleted_docs.insert(doc_id);
min_doc_id = min_doc_id.min(doc_id);
max_doc_id = max_doc_id.max(doc_id);
}
}
if total_docs > 0 && deleted_docs.is_empty() {
min_doc_id = 0;
max_doc_id = total_docs - 1;
} else if total_docs == 0 {
min_doc_id = 0;
max_doc_id = 0;
}
Ok(DeletionBitmap {
segment_id,
deleted_docs: RwLock::new(deleted_docs),
total_docs: AtomicU64::new(total_docs),
min_doc_id,
max_doc_id,
deleted_count: AtomicU64::new(deleted_count),
last_modified: AtomicU64::new(last_modified),
version: AtomicU64::new(bitmap_version),
})
} else if version == 2 {
let segment_id = reader.read_string()?;
let total_docs = reader.read_u64()?;
let deleted_count = reader.read_u64()?;
let last_modified = reader.read_u64()?;
let bitmap_version = reader.read_u64()?;
let deleted_id_count = reader.read_varint()? as usize;
let mut deleted_docs = ahash::AHashSet::with_capacity(deleted_id_count);
let mut min_doc_id = u64::MAX;
let mut max_doc_id = 0;
for _ in 0..deleted_id_count {
let doc_id = reader.read_u64()?;
deleted_docs.insert(doc_id);
min_doc_id = min_doc_id.min(doc_id);
max_doc_id = max_doc_id.max(doc_id);
}
if total_docs > 0 && deleted_docs.is_empty() {
min_doc_id = 0;
max_doc_id = total_docs - 1;
} else if total_docs == 0 {
min_doc_id = 0;
max_doc_id = 0;
}
Ok(DeletionBitmap {
segment_id,
deleted_docs: RwLock::new(deleted_docs),
total_docs: AtomicU64::new(total_docs),
min_doc_id,
max_doc_id,
deleted_count: AtomicU64::new(deleted_count),
last_modified: AtomicU64::new(last_modified),
version: AtomicU64::new(bitmap_version),
})
} else if version == 3 {
let segment_id = reader.read_string()?;
let total_docs = reader.read_u64()?;
let deleted_count = reader.read_u64()?;
let last_modified = reader.read_u64()?;
let bitmap_version = reader.read_u64()?;
let min_doc_id = reader.read_u64()?;
let max_doc_id = reader.read_u64()?;
let deleted_id_count = reader.read_varint()? as usize;
let mut deleted_docs = ahash::AHashSet::with_capacity(deleted_id_count);
for _ in 0..deleted_id_count {
deleted_docs.insert(reader.read_u64()?);
}
Ok(DeletionBitmap {
segment_id,
deleted_docs: RwLock::new(deleted_docs),
total_docs: AtomicU64::new(total_docs),
min_doc_id,
max_doc_id,
deleted_count: AtomicU64::new(deleted_count),
last_modified: AtomicU64::new(last_modified),
version: AtomicU64::new(bitmap_version),
})
} else {
Err(LaurusError::index(format!(
"Unsupported bitmap version: {version}"
)))
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeletionLogEntry {
pub timestamp: u64,
pub segment_id: String,
pub doc_id: u64,
pub reason: String,
pub sequence: u64,
}
#[derive(Debug)]
pub struct DeletionLog {
storage: Arc<dyn Storage>,
sequence: std::sync::atomic::AtomicU64,
log_path: String,
}
impl DeletionLog {
pub fn new(storage: Arc<dyn Storage>, log_path: String) -> Result<Self> {
let log = DeletionLog {
storage,
sequence: std::sync::atomic::AtomicU64::new(0),
log_path,
};
log.load_sequence()?;
Ok(log)
}
pub fn log_deletion(&self, segment_id: &str, doc_id: u64, reason: &str) -> Result<()> {
let entry = DeletionLogEntry {
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs(),
segment_id: segment_id.to_string(),
doc_id,
reason: reason.to_string(),
sequence: self
.sequence
.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
};
let output = self.storage.create_output_append(&self.log_path)?;
let mut writer = StructWriter::new(output);
let json = serde_json::to_string(&entry)?;
writer.write_string(&json)?;
writer.write_u8(b'\n')?; writer.close()?;
Ok(())
}
fn load_sequence(&self) -> Result<()> {
if let Ok(input) = self.storage.open_input(&self.log_path) {
let mut reader = StructReader::new(input)?;
let mut max_sequence = 0;
while !reader.is_eof() {
if let Ok(json) = reader.read_string() {
if let Ok(entry) = serde_json::from_str::<DeletionLogEntry>(&json) {
max_sequence = max_sequence.max(entry.sequence);
}
if reader.read_u8().is_err() {
break;
}
} else {
break;
}
}
self.sequence
.store(max_sequence + 1, std::sync::atomic::Ordering::SeqCst);
}
Ok(())
}
}
#[derive(Debug, Clone, Default)]
pub struct DeletionStats {
pub segments_tracked: usize,
pub total_docs: u64,
pub total_deleted: u64,
pub overall_deletion_ratio: f64,
pub segments_needing_compaction: usize,
pub bitmap_memory_usage: usize,
pub deletion_operations: u64,
pub compaction_operations: u64,
}
#[derive(Debug, Clone)]
pub struct GlobalDeletionState {
pub total_documents: u64,
pub total_deleted: u64,
pub global_deletion_ratio: f64,
pub compaction_candidates: Vec<String>,
pub last_compaction: u64,
pub reclaimable_space: u64,
}
impl Default for GlobalDeletionState {
fn default() -> Self {
Self::new()
}
}
impl GlobalDeletionState {
pub fn new() -> Self {
GlobalDeletionState {
total_documents: 0,
total_deleted: 0,
global_deletion_ratio: 0.0,
compaction_candidates: Vec::new(),
last_compaction: 0,
reclaimable_space: 0,
}
}
pub fn needs_global_compaction(&self, threshold: f64) -> bool {
self.global_deletion_ratio > threshold
}
pub fn efficiency_metrics(&self) -> (f64, u64, usize) {
(
self.global_deletion_ratio,
self.reclaimable_space,
self.compaction_candidates.len(),
)
}
}
#[derive(Debug)]
pub struct DeletionManager {
config: DeletionConfig,
storage: Arc<dyn Storage>,
bitmaps: RwLock<AHashMap<String, Arc<DeletionBitmap>>>,
deletion_log: Option<DeletionLog>,
stats: RwLock<DeletionStats>,
global_state: RwLock<GlobalDeletionState>,
}
impl DeletionManager {
pub fn new(config: DeletionConfig, storage: Arc<dyn Storage>) -> Result<Self> {
let deletion_log = if config.enable_deletion_log {
Some(DeletionLog::new(
storage.clone(),
"deletions.log".to_string(),
)?)
} else {
None
};
let manager = DeletionManager {
config,
storage,
bitmaps: RwLock::new(AHashMap::new()),
deletion_log,
stats: RwLock::new(DeletionStats::default()),
global_state: RwLock::new(GlobalDeletionState::new()),
};
manager.load_bitmaps()?;
manager.update_global_state()?;
Ok(manager)
}
pub fn get_bitmap(&self, segment_id: &str) -> Option<Arc<DeletionBitmap>> {
self.bitmaps.read().unwrap().get(segment_id).cloned()
}
pub fn initialize_segment(
&self,
segment_id: &str,
min_doc_id: u64,
max_doc_id: u64,
) -> Result<()> {
let bitmaps = self.bitmaps.read().unwrap();
if bitmaps.contains_key(segment_id) {
return Ok(());
}
drop(bitmaps);
let bitmap = Arc::new(DeletionBitmap::new(
segment_id.to_string(),
min_doc_id,
max_doc_id,
));
{
let mut bitmaps = self.bitmaps.write().unwrap();
bitmaps.insert(segment_id.to_string(), bitmap);
}
self.save_bitmap(segment_id)?;
self.update_stats();
let _ = self.update_global_state();
Ok(())
}
pub fn resize_segment(&self, segment_id: &str, new_size: u64) -> Result<()> {
let bitmaps = self.bitmaps.read().unwrap();
if let Some(bitmap) = bitmaps.get(segment_id) {
bitmap.resize(new_size);
self.save_bitmap(segment_id)?;
}
Ok(())
}
pub fn delete_document(&self, segment_id: &str, doc_id: u64, reason: &str) -> Result<bool> {
let was_deleted = {
let bitmaps = self.bitmaps.read().unwrap();
if let Some(bitmap) = bitmaps.get(segment_id) {
bitmap.delete_document(doc_id)?
} else {
return Err(LaurusError::index(format!(
"Segment {segment_id} not found in deletion manager"
)));
}
};
if let Some(ref log) = self.deletion_log {
log.log_deletion(segment_id, doc_id, reason)?;
}
if was_deleted {
self.save_bitmap(segment_id)?;
self.update_stats();
let _ = self.update_global_state();
}
Ok(was_deleted)
}
pub fn delete_documents(&self, segment_id: &str, doc_ids: &[u64], reason: &str) -> Result<u64> {
let mut deleted_count = 0;
for chunk in doc_ids.chunks(self.config.deletion_batch_size) {
{
let bitmaps = self.bitmaps.read().unwrap();
if let Some(bitmap) = bitmaps.get(segment_id) {
for &doc_id in chunk {
if bitmap.delete_document(doc_id)? {
deleted_count += 1;
}
}
} else {
return Err(LaurusError::index(format!(
"Segment {segment_id} not found in deletion manager"
)));
}
}
if let Some(ref log) = self.deletion_log {
for &doc_id in chunk {
log.log_deletion(segment_id, doc_id, reason)?;
}
}
}
if deleted_count > 0 {
self.save_bitmap(segment_id)?;
self.update_stats();
let _ = self.update_global_state();
}
Ok(deleted_count)
}
pub fn is_deleted(&self, segment_id: &str, doc_id: u64) -> bool {
let bitmaps = self.bitmaps.read().unwrap();
if let Some(bitmap) = bitmaps.get(segment_id) {
bitmap.is_deleted(doc_id)
} else {
false
}
}
pub fn get_deletion_ratio(&self, segment_id: &str) -> f64 {
let bitmaps = self.bitmaps.read().unwrap();
if let Some(bitmap) = bitmaps.get(segment_id) {
bitmap.deletion_ratio()
} else {
0.0
}
}
pub fn get_compaction_candidates(&self) -> Vec<String> {
let bitmaps = self.bitmaps.read().unwrap();
bitmaps
.values()
.filter(|bitmap| bitmap.needs_compaction(self.config.compaction_threshold))
.map(|bitmap| bitmap.segment_id.clone())
.collect()
}
pub fn get_deleted_docs(&self, segment_id: &str) -> Vec<u64> {
let bitmaps = self.bitmaps.read().unwrap();
if let Some(bitmap) = bitmaps.get(segment_id) {
bitmap.get_deleted_docs()
} else {
Vec::new()
}
}
pub fn remove_segment(&self, segment_id: &str) -> Result<()> {
{
let mut bitmaps = self.bitmaps.write().unwrap();
bitmaps.remove(segment_id);
}
let bitmap_file = format!("{segment_id}.delmap");
let _ = self.storage.delete_file(&bitmap_file);
self.update_stats();
let _ = self.update_global_state();
Ok(())
}
fn save_bitmap(&self, segment_id: &str) -> Result<()> {
let bitmaps = self.bitmaps.read().unwrap();
if let Some(bitmap) = bitmaps.get(segment_id) {
let bitmap_file = format!("{segment_id}.delmap");
let output = self.storage.create_output(&bitmap_file)?;
let mut writer = StructWriter::new(output);
bitmap.write_to_storage(&mut writer)?;
writer.close()?;
}
Ok(())
}
fn load_bitmaps(&self) -> Result<()> {
let files = self.storage.list_files()?;
for file in files {
if file.ends_with(".delmap") {
let input = self.storage.open_input(&file)?;
let mut reader = StructReader::new(input)?;
if let Ok(bitmap) = DeletionBitmap::read_from_storage(&mut reader) {
let mut bitmaps = self.bitmaps.write().unwrap();
bitmaps.insert(bitmap.segment_id.clone(), Arc::new(bitmap));
}
}
}
self.update_stats();
let _ = self.update_global_state();
Ok(())
}
fn update_stats(&self) {
let bitmaps = self.bitmaps.read().unwrap();
let mut stats = self.stats.write().unwrap();
stats.segments_tracked = bitmaps.len();
stats.total_docs = bitmaps
.values()
.map(|b| b.total_docs.load(Ordering::SeqCst))
.sum();
stats.total_deleted = bitmaps
.values()
.map(|b| b.deleted_count.load(Ordering::SeqCst))
.sum();
if stats.total_docs > 0 {
stats.overall_deletion_ratio = stats.total_deleted as f64 / stats.total_docs as f64;
}
stats.segments_needing_compaction = bitmaps
.values()
.filter(|b| b.needs_compaction(self.config.compaction_threshold))
.count();
stats.bitmap_memory_usage = bitmaps.values().map(|b| b.memory_usage()).sum();
}
pub fn get_stats(&self) -> DeletionStats {
self.stats.read().unwrap().clone()
}
pub fn get_config(&self) -> &DeletionConfig {
&self.config
}
pub fn get_global_state(&self) -> GlobalDeletionState {
self.global_state.read().unwrap().clone()
}
pub fn restore_global_state(&self, state: GlobalDeletionState) -> Result<()> {
let mut global_state = self.global_state.write().unwrap();
*global_state = state;
Ok(())
}
pub fn update_global_state(&self) -> Result<()> {
let bitmaps = self.bitmaps.read().unwrap();
let mut global_state = self.global_state.write().unwrap();
global_state.total_documents = bitmaps
.values()
.map(|b| b.total_docs.load(Ordering::SeqCst))
.sum();
global_state.total_deleted = bitmaps
.values()
.map(|b| b.deleted_count.load(Ordering::SeqCst))
.sum();
if global_state.total_documents > 0 {
global_state.global_deletion_ratio =
global_state.total_deleted as f64 / global_state.total_documents as f64;
} else {
global_state.global_deletion_ratio = 0.0;
}
global_state.compaction_candidates = bitmaps
.values()
.filter(|b| b.needs_compaction(self.config.compaction_threshold))
.map(|b| b.segment_id.clone())
.collect();
global_state.reclaimable_space = bitmaps
.values()
.map(|b| {
if b.needs_compaction(self.config.compaction_threshold) {
(b.deletion_ratio() * b.total_docs.load(Ordering::SeqCst) as f64 * 100.0) as u64 } else {
0
}
})
.sum();
Ok(())
}
pub fn should_trigger_auto_compaction(&self) -> bool {
if !self.config.auto_compaction {
return false;
}
let global_state = self.global_state.read().unwrap();
let current_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let time_threshold = global_state.last_compaction + self.config.compaction_interval_secs;
let time_to_compact = current_time >= time_threshold;
let ratio_threshold =
global_state.needs_global_compaction(self.config.compaction_threshold);
let has_candidates = !global_state.compaction_candidates.is_empty();
time_to_compact && ratio_threshold && has_candidates
}
pub fn mark_compaction_completed(&self, segments_compacted: &[String]) -> Result<()> {
for segment_id in segments_compacted {
self.remove_segment(segment_id)?;
}
let mut global_state = self.global_state.write().unwrap();
global_state.last_compaction = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let mut stats = self.stats.write().unwrap();
stats.compaction_operations += 1;
drop(global_state);
self.update_global_state()?;
Ok(())
}
pub fn get_deletion_report(&self) -> DeletionReport {
let stats = self.stats.read().unwrap();
let global_state = self.global_state.read().unwrap();
let bitmaps = self.bitmaps.read().unwrap();
let segment_reports: Vec<SegmentDeletionReport> = bitmaps
.values()
.map(|bitmap| SegmentDeletionReport {
segment_id: bitmap.segment_id.clone(),
total_docs: bitmap.total_docs.load(Ordering::SeqCst),
deleted_docs: bitmap.deleted_count.load(Ordering::SeqCst),
deletion_ratio: bitmap.deletion_ratio(),
needs_compaction: bitmap.needs_compaction(self.config.compaction_threshold),
memory_usage: bitmap.memory_usage(),
last_modified: bitmap.last_modified.load(Ordering::SeqCst),
})
.collect();
DeletionReport {
global_state: global_state.clone(),
deletion_stats: stats.clone(),
segment_reports,
auto_compaction_enabled: self.config.auto_compaction,
next_compaction_due: global_state.last_compaction
+ self.config.compaction_interval_secs,
}
}
}
#[derive(Debug, Clone)]
pub struct SegmentDeletionReport {
pub segment_id: String,
pub total_docs: u64,
pub deleted_docs: u64,
pub deletion_ratio: f64,
pub needs_compaction: bool,
pub memory_usage: usize,
pub last_modified: u64,
}
#[derive(Debug, Clone)]
pub struct DeletionReport {
pub global_state: GlobalDeletionState,
pub deletion_stats: DeletionStats,
pub segment_reports: Vec<SegmentDeletionReport>,
pub auto_compaction_enabled: bool,
pub next_compaction_due: u64,
}
impl DeletionReport {
pub fn summary(&self) -> (f64, usize, u64, bool) {
(
self.global_state.global_deletion_ratio,
self.global_state.compaction_candidates.len(),
self.global_state.reclaimable_space,
self.global_state.needs_global_compaction(0.3), )
}
pub fn segments_by_urgency(&self) -> (Vec<String>, Vec<String>, Vec<String>) {
let mut urgent = Vec::new();
let mut moderate = Vec::new();
let mut low = Vec::new();
for report in &self.segment_reports {
if report.deletion_ratio > 0.5 {
urgent.push(report.segment_id.clone());
} else if report.deletion_ratio > 0.3 {
moderate.push(report.segment_id.clone());
} else if report.deletion_ratio > 0.1 {
low.push(report.segment_id.clone());
}
}
(urgent, moderate, low)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::memory::MemoryStorage;
use crate::storage::memory::MemoryStorageConfig;
#[test]
fn test_deletion_bitmap_creation() {
let bitmap = DeletionBitmap::new("seg001".to_string(), 0, 999);
assert_eq!(bitmap.segment_id, "seg001");
assert_eq!(bitmap.total_docs.load(Ordering::SeqCst), 1000);
assert_eq!(bitmap.deleted_count.load(Ordering::SeqCst), 0);
assert_eq!(bitmap.deletion_ratio(), 0.0);
assert_eq!(bitmap.live_count(), 1000);
}
#[test]
fn test_deletion_bitmap_operations() {
let bitmap = DeletionBitmap::new("seg001".to_string(), 0, 99);
assert!(bitmap.delete_document(5).unwrap());
assert!(bitmap.delete_document(10).unwrap());
assert!(bitmap.delete_document(15).unwrap());
assert!(bitmap.is_deleted(5));
assert!(bitmap.is_deleted(10));
assert!(bitmap.is_deleted(15));
assert!(!bitmap.is_deleted(20));
assert_eq!(bitmap.deleted_count.load(Ordering::SeqCst), 3);
assert_eq!(bitmap.live_count(), 97);
assert_eq!(bitmap.deletion_ratio(), 0.03);
assert!(!bitmap.delete_document(5).unwrap());
assert_eq!(bitmap.deleted_count.load(Ordering::SeqCst), 3);
}
#[test]
fn test_deletion_bitmap_out_of_range() {
let bitmap = DeletionBitmap::new("seg001".to_string(), 0, 99);
let result = bitmap.delete_document(150);
assert!(result.is_err());
assert!(!bitmap.is_deleted(150));
}
#[test]
fn test_deletion_manager_creation() {
let config = DeletionConfig::default();
let storage = Arc::new(MemoryStorage::new(MemoryStorageConfig::default()));
let manager = DeletionManager::new(config, storage).unwrap();
let stats = manager.get_stats();
assert_eq!(stats.segments_tracked, 0);
assert_eq!(stats.total_docs, 0);
}
#[test]
fn test_deletion_manager_operations() {
let config = DeletionConfig::default();
let storage = Arc::new(MemoryStorage::new(MemoryStorageConfig::default()));
let manager = DeletionManager::new(config, storage).unwrap();
manager.initialize_segment("seg001", 0, 999).unwrap();
assert!(
manager
.delete_document("seg001", 100, "test deletion")
.unwrap()
);
assert!(
manager
.delete_document("seg001", 200, "test deletion")
.unwrap()
);
assert!(manager.is_deleted("seg001", 100));
assert!(manager.is_deleted("seg001", 200));
assert!(!manager.is_deleted("seg001", 300));
let ratio = manager.get_deletion_ratio("seg001");
assert_eq!(ratio, 0.002);
let stats = manager.get_stats();
assert_eq!(stats.segments_tracked, 1);
assert_eq!(stats.total_deleted, 2);
}
#[test]
fn test_batch_deletion() {
let config = DeletionConfig::default();
let storage = Arc::new(MemoryStorage::new(MemoryStorageConfig::default()));
let manager = DeletionManager::new(config, storage).unwrap();
manager.initialize_segment("seg001", 0, 19).unwrap();
let doc_ids = vec![1, 2, 3, 4, 5]; let deleted_count = manager
.delete_documents("seg001", &doc_ids, "batch deletion")
.unwrap();
assert_eq!(deleted_count, 5);
for &doc_id in &doc_ids {
assert!(manager.is_deleted("seg001", doc_id));
}
let ratio = manager.get_deletion_ratio("seg001");
assert_eq!(ratio, 0.25); }
#[test]
fn test_compaction_candidates() {
let config = DeletionConfig {
compaction_threshold: 0.1, ..Default::default()
};
let storage = Arc::new(MemoryStorage::new(MemoryStorageConfig::default()));
let manager = DeletionManager::new(config, storage).unwrap();
manager.initialize_segment("seg001", 0, 9).unwrap(); manager.initialize_segment("seg002", 0, 9).unwrap();
let doc_ids: Vec<u64> = vec![0, 1]; manager
.delete_documents("seg001", &doc_ids, "test")
.unwrap();
manager.delete_documents("seg002", &[0], "test").unwrap();
let candidates = manager.get_compaction_candidates();
assert_eq!(candidates.len(), 1); assert!(candidates.contains(&"seg001".to_string()));
}
#[test]
fn test_global_deletion_state() {
let config = DeletionConfig::default();
let storage = Arc::new(MemoryStorage::new(MemoryStorageConfig::default()));
let manager = DeletionManager::new(config, storage).unwrap();
manager.initialize_segment("seg001", 0, 9).unwrap(); manager.initialize_segment("seg002", 0, 19).unwrap();
let doc_ids1: Vec<u64> = (0..4).collect(); manager
.delete_documents("seg001", &doc_ids1, "test")
.unwrap();
let doc_ids2: Vec<u64> = (0..2).collect(); manager
.delete_documents("seg002", &doc_ids2, "test")
.unwrap();
let global_state = manager.get_global_state();
assert_eq!(global_state.total_documents, 30); assert_eq!(global_state.total_deleted, 6); assert!((global_state.global_deletion_ratio - 0.2).abs() < 0.001); assert!(!global_state.compaction_candidates.is_empty());
assert!(
global_state
.compaction_candidates
.contains(&"seg001".to_string())
);
}
#[test]
fn test_deletion_report() {
let config = DeletionConfig::default();
let storage = Arc::new(MemoryStorage::new(MemoryStorageConfig::default()));
let manager = DeletionManager::new(config, storage).unwrap();
manager.initialize_segment("seg001", 0, 9).unwrap(); manager.initialize_segment("seg002", 0, 19).unwrap();
let doc_ids: Vec<u64> = (0..4).collect(); manager
.delete_documents("seg001", &doc_ids, "test")
.unwrap();
let report = manager.get_deletion_report();
assert_eq!(report.segment_reports.len(), 2);
assert_eq!(report.global_state.total_documents, 30); assert_eq!(report.global_state.total_deleted, 4);
assert!(report.auto_compaction_enabled);
let (urgent, moderate, low) = report.segments_by_urgency();
assert_eq!(urgent.len(), 0); assert_eq!(moderate.len(), 1); assert_eq!(low.len(), 0);
let (ratio, candidates, _space, needs_compaction) = report.summary();
assert!((ratio - 0.133333).abs() < 0.001); assert_eq!(candidates, 1);
assert!(!needs_compaction); }
#[test]
fn test_auto_compaction_trigger() {
let config = DeletionConfig {
auto_compaction: true,
compaction_threshold: 0.2, compaction_interval_secs: 1, ..Default::default()
};
let storage = Arc::new(MemoryStorage::new(MemoryStorageConfig::default()));
let manager = DeletionManager::new(config, storage).unwrap();
manager.initialize_segment("seg001", 0, 99).unwrap();
let doc_ids: Vec<u64> = (0..25).collect(); manager
.delete_documents("seg001", &doc_ids, "test")
.unwrap();
std::thread::sleep(std::time::Duration::from_secs(2));
assert!(manager.should_trigger_auto_compaction());
}
#[test]
fn test_mark_compaction_completed() {
let config = DeletionConfig::default();
let storage = Arc::new(MemoryStorage::new(MemoryStorageConfig::default()));
let manager = DeletionManager::new(config, storage).unwrap();
manager.initialize_segment("seg001", 0, 999).unwrap();
manager.initialize_segment("seg002", 0, 1999).unwrap();
let compacted_segments = vec!["seg001".to_string()];
manager
.mark_compaction_completed(&compacted_segments)
.unwrap();
let global_state = manager.get_global_state();
assert_eq!(global_state.total_documents, 2000);
let stats = manager.get_stats();
assert_eq!(stats.compaction_operations, 1);
}
}