use std::collections::HashMap;
use super::arena_manager::{ArenaManager, ArenaSlot};
use super::block_storage::BlockStorage;
use super::disk_manager::MmapDiskManager;
use super::PersistentARTrieError;
type Result<T> = std::result::Result<T, PersistentARTrieError>;
/// Returns the 64-bit XXH3 digest of `data`.
///
/// The same bytes always produce the same value, which is what lets the
/// digest serve as a cache key for node contents.
#[inline]
fn compute_hash(data: &[u8]) -> u64 {
    use xxhash_rust::xxh3::xxh3_64;

    xxh3_64(data)
}
/// Cache that maps the 64-bit hash of a node's bytes to the [`ArenaSlot`]
/// recorded for those bytes, together with hit/miss/collision counters.
///
/// Entries are keyed by hash only; the node bytes themselves are not stored
/// in this structure.
#[derive(Debug)]
pub struct NodeDeduplicator {
/// Hash of node bytes -> slot most recently inserted for that hash.
cache: HashMap<u64, ArenaSlot>,
/// Number of times `record_hit` has been called since creation or `clear`.
hits: u64,
/// Number of times `record_miss` has been called since creation or `clear`.
misses: u64,
/// Number of times `record_collision` has been called since creation or `clear`.
collisions: u64,
}
impl NodeDeduplicator {
    /// Creates an empty deduplicator with every counter at zero.
    pub fn new() -> Self {
        // A zero-capacity map does not allocate, exactly like `HashMap::new()`.
        Self::with_capacity(0)
    }

    /// Creates an empty deduplicator whose cache is pre-sized for at least
    /// `capacity` entries. All counters start at zero.
    pub fn with_capacity(capacity: usize) -> Self {
        let cache = HashMap::with_capacity(capacity);
        Self {
            cache,
            hits: 0,
            misses: 0,
            collisions: 0,
        }
    }

    /// Computes the cache key for `data`.
    #[inline]
    fn hash(&self, data: &[u8]) -> u64 {
        compute_hash(data)
    }

    /// Returns the slot stored under the hash of `data`, if any.
    ///
    /// Only the hash is compared: a returned slot is not guaranteed to hold
    /// bytes equal to `data`. This method does not touch the counters.
    pub fn lookup(&self, data: &[u8]) -> Option<ArenaSlot> {
        let key = self.hash(data);
        self.cache.get(&key).copied()
    }

    /// Records `slot` under the hash of `data`, replacing any slot already
    /// stored for that hash.
    pub fn insert(&mut self, data: &[u8], slot: ArenaSlot) {
        let key = self.hash(data);
        self.cache.insert(key, slot);
    }

    /// Increments the hit counter by one.
    pub fn record_hit(&mut self) {
        self.hits += 1;
    }

    /// Increments the miss counter by one.
    pub fn record_miss(&mut self) {
        self.misses += 1;
    }

    /// Increments the collision counter by one.
    pub fn record_collision(&mut self) {
        self.collisions += 1;
    }

    /// Returns a snapshot of the current cache size and counters.
    pub fn stats(&self) -> DeduplicatorStats {
        let Self {
            cache,
            hits,
            misses,
            collisions,
        } = self;
        DeduplicatorStats {
            cache_size: cache.len(),
            hits: *hits,
            misses: *misses,
            collisions: *collisions,
        }
    }

    /// Removes every cache entry and resets all counters to zero.
    pub fn clear(&mut self) {
        self.collisions = 0;
        self.misses = 0;
        self.hits = 0;
        self.cache.clear();
    }

    /// Returns the number of entries the cache can hold without reallocating.
    pub fn capacity(&self) -> usize {
        self.cache.capacity()
    }

    /// Returns the number of entries currently in the cache.
    pub fn len(&self) -> usize {
        self.cache.len()
    }

    /// Returns `true` when the cache holds no entries.
    pub fn is_empty(&self) -> bool {
        self.cache.is_empty()
    }
}
impl Default for NodeDeduplicator {
fn default() -> Self {
Self::new()
}
}
/// Snapshot of a deduplicator's cache size and counters.
#[derive(Debug, Clone)]
pub struct DeduplicatorStats {
    /// Number of cache entries at the time of the snapshot.
    pub cache_size: usize,
    /// Number of recorded hits.
    pub hits: u64,
    /// Number of recorded misses.
    pub misses: u64,
    /// Number of recorded collisions.
    pub collisions: u64,
}

impl DeduplicatorStats {
    /// Returns the fraction of recorded accesses that were hits,
    /// i.e. `hits / (hits + misses)`.
    ///
    /// Returns `0.0` when neither a hit nor a miss has been recorded.
    pub fn hit_rate(&self) -> f64 {
        // Sum in u128 so that `hits + misses` can never overflow; for every
        // sum that fits in u64 the result is identical to the u64 computation.
        let total = u128::from(self.hits) + u128::from(self.misses);
        if total == 0 {
            0.0
        } else {
            self.hits as f64 / total as f64
        }
    }

    /// Estimates the bytes saved by deduplication as `hits * avg_node_size`.
    ///
    /// The result saturates at `usize::MAX` instead of overflowing.
    pub fn space_savings(&self, avg_node_size: usize) -> usize {
        // Clamp before casting so that a 64-bit count is never silently
        // truncated on targets where `usize` is narrower than `u64`.
        let hits = self.hits.min(usize::MAX as u64) as usize;
        hits.saturating_mul(avg_node_size)
    }
}
/// Pairs an [`ArenaManager`] with a [`NodeDeduplicator`] so that allocations
/// can reuse a slot previously recorded for the same bytes.
///
/// The storage backend `S` defaults to [`MmapDiskManager`].
#[derive(Debug)]
pub struct DeduplicatingArenaManager<S: BlockStorage = MmapDiskManager> {
/// Underlying allocator that stores and reads back the node bytes.
arena_manager: ArenaManager<S>,
/// Hash -> slot cache consulted by `allocate_dedup`.
dedup: NodeDeduplicator,
/// When `true`, `allocate_dedup` reads the cached slot back and compares it
/// with the incoming bytes before treating a hash match as a hit.
verify_on_hit: bool,
}
impl<S: BlockStorage> DeduplicatingArenaManager<S> {
    /// Wraps `arena_manager` with an empty deduplication cache.
    ///
    /// Verification of cache hits is enabled by default.
    pub fn new(arena_manager: ArenaManager<S>) -> Self {
        Self {
            arena_manager,
            dedup: NodeDeduplicator::new(),
            verify_on_hit: true,
        }
    }

    /// Wraps `arena_manager` with a deduplication cache pre-sized for
    /// `dedup_capacity` entries.
    ///
    /// Verification of cache hits is enabled by default.
    pub fn with_capacity(arena_manager: ArenaManager<S>, dedup_capacity: usize) -> Self {
        Self {
            arena_manager,
            dedup: NodeDeduplicator::with_capacity(dedup_capacity),
            verify_on_hit: true,
        }
    }

    /// Enables (`true`) or disables (`false`) verification of cache hits.
    ///
    /// With verification enabled, `allocate_dedup` reads the cached slot and
    /// compares its bytes with the incoming data before reusing it. With
    /// verification disabled, a matching hash alone is trusted, so two
    /// different payloads that share a hash would be given the same slot.
    pub fn set_verify_on_hit(&mut self, verify: bool) {
        // Store the caller's choice; previously the argument was ignored and
        // the flag was unconditionally forced to `true`.
        self.verify_on_hit = verify;
    }

    /// Allocates `data`, reusing an existing slot when the cache has one for
    /// the same hash.
    ///
    /// * Cache hit (and, if verification is enabled, the stored bytes equal
    ///   `data`): a hit is recorded and the cached slot is returned without
    ///   allocating.
    /// * Hash match whose stored bytes differ (verification enabled only): a
    ///   collision is recorded and the call continues as a miss.
    /// * Miss: a miss is recorded, `data` is allocated in the arena, and the
    ///   new slot is stored in the cache under the hash of `data`, replacing
    ///   any previous entry for that hash.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the arena manager's `read` (during
    /// verification) or `allocate`.
    pub fn allocate_dedup(&mut self, data: &[u8]) -> Result<ArenaSlot> {
        if let Some(slot) = self.dedup.lookup(data) {
            // Without verification the hash match is accepted as-is; with it,
            // the stored bytes must be identical to the incoming ones.
            let is_match = !self.verify_on_hit || self.arena_manager.read(slot)? == data;
            if is_match {
                self.dedup.record_hit();
                return Ok(slot);
            }
            self.dedup.record_collision();
        }

        self.dedup.record_miss();
        let slot = self.arena_manager.allocate(data)?;
        self.dedup.insert(data, slot);
        Ok(slot)
    }

    /// Allocates `data` in the arena without consulting or updating the
    /// deduplication cache or its counters.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the arena manager's `allocate`.
    pub fn allocate_direct(&mut self, data: &[u8]) -> Result<ArenaSlot> {
        self.arena_manager.allocate(data)
    }

    /// Reads the bytes stored at `slot` from the underlying arena manager.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the arena manager's `read`.
    pub fn read(&self, slot: ArenaSlot) -> Result<&[u8]> {
        self.arena_manager.read(slot)
    }

    /// Returns a shared reference to the wrapped arena manager.
    pub fn arena_manager(&self) -> &ArenaManager<S> {
        &self.arena_manager
    }

    /// Returns a mutable reference to the wrapped arena manager.
    pub fn arena_manager_mut(&mut self) -> &mut ArenaManager<S> {
        &mut self.arena_manager
    }

    /// Returns a snapshot of the deduplication cache size and counters.
    pub fn dedup_stats(&self) -> DeduplicatorStats {
        self.dedup.stats()
    }

    /// Empties the deduplication cache and resets its counters.
    ///
    /// Data already stored in the arena is not touched by this call.
    pub fn clear_dedup_cache(&mut self) {
        self.dedup.clear();
    }
}
/// Accumulates deduplication entries in a local [`NodeDeduplicator`] and
/// reports when the number of entries reaches a configured threshold.
#[derive(Debug)]
pub struct BatchDeduplicator {
/// Entries collected since creation or the last `take`.
local: NodeDeduplicator,
/// Entry count at which `should_merge` starts returning `true`.
batch_threshold: usize,
}
impl BatchDeduplicator {
    /// Creates an empty batch that signals a merge once it holds at least
    /// `batch_threshold` entries.
    pub fn new(batch_threshold: usize) -> Self {
        let local = NodeDeduplicator::new();
        Self {
            local,
            batch_threshold,
        }
    }

    /// Returns the slot recorded in the local batch for the hash of `data`,
    /// if any.
    pub fn lookup(&self, data: &[u8]) -> Option<ArenaSlot> {
        self.local.lookup(data)
    }

    /// Records `slot` for `data` in the local batch.
    pub fn insert(&mut self, data: &[u8], slot: ArenaSlot) {
        self.local.insert(data, slot);
    }

    /// Returns `true` once the local batch holds at least `batch_threshold`
    /// entries.
    pub fn should_merge(&self) -> bool {
        self.batch_threshold <= self.local.len()
    }

    /// Hands the accumulated entries to the caller and leaves a fresh,
    /// default deduplicator in their place.
    pub fn take(&mut self) -> NodeDeduplicator {
        let local = &mut self.local;
        std::mem::take(local)
    }

    /// Returns the number of entries currently in the local batch.
    pub fn len(&self) -> usize {
        self.local.len()
    }

    /// Returns `true` when the local batch holds no entries.
    pub fn is_empty(&self) -> bool {
        self.local.is_empty()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a deduplicating manager over a fresh 64 KiB arena.
    fn new_manager() -> DeduplicatingArenaManager<MmapDiskManager> {
        use super::ArenaManager;

        let arena = ArenaManager::<MmapDiskManager>::with_arena_size(64 * 1024);
        DeduplicatingArenaManager::new(arena)
    }

    // A freshly created deduplicator holds no entries.
    #[test]
    fn test_deduplicator_creation() {
        let fresh = NodeDeduplicator::new();
        assert!(fresh.is_empty());
        assert_eq!(fresh.len(), 0);
    }

    // Hashing the same bytes twice yields the same value.
    #[test]
    fn test_hash_consistency() {
        let payload = b"hello world";
        assert_eq!(compute_hash(payload), compute_hash(payload));
    }

    // Two distinct sample inputs hash to different values.
    #[test]
    fn test_hash_different_data() {
        let first = compute_hash(b"hello");
        let second = compute_hash(b"world");
        assert_ne!(first, second);
    }

    // An inserted slot can be looked up again with the same bytes.
    #[test]
    fn test_deduplicator_lookup_insert() {
        let payload = b"test data";
        let expected = ArenaSlot::new(1, 42);
        let mut cache = NodeDeduplicator::new();

        assert!(cache.lookup(payload).is_none());
        cache.insert(payload, expected);
        assert_eq!(cache.lookup(payload), Some(expected));
    }

    // Counters recorded on the deduplicator show up in the stats snapshot.
    #[test]
    fn test_deduplicator_stats() {
        let mut cache = NodeDeduplicator::new();
        cache.record_hit();
        cache.record_hit();
        cache.record_miss();

        let snapshot = cache.stats();
        assert_eq!(snapshot.hits, 2);
        assert_eq!(snapshot.misses, 1);
        let rate = snapshot.hit_rate();
        assert!((rate - 0.666).abs() < 0.01);
    }

    // `clear` drops both the entries and the counters.
    #[test]
    fn test_deduplicator_clear() {
        let payload = b"test";
        let mut cache = NodeDeduplicator::new();
        cache.insert(payload, ArenaSlot::new(0, 0));
        cache.record_hit();
        assert!(!cache.is_empty());

        cache.clear();

        assert!(cache.is_empty());
        assert!(cache.lookup(payload).is_none());
        assert_eq!(cache.stats().hits, 0);
    }

    // Exceeding the threshold triggers a merge; `take` empties the batch.
    #[test]
    fn test_batch_deduplicator() {
        let mut batch = BatchDeduplicator::new(10);
        for idx in 0..15 {
            let payload = format!("data{}", idx);
            batch.insert(payload.as_bytes(), ArenaSlot::new(0, idx));
        }
        assert!(batch.should_merge());
        assert_eq!(batch.len(), 15);

        let drained = batch.take();
        assert_eq!(drained.len(), 15);
        assert!(batch.is_empty());
    }

    // Space savings is the hit count multiplied by the average node size.
    #[test]
    fn test_space_savings_calculation() {
        let snapshot = DeduplicatorStats {
            cache_size: 100,
            hits: 50,
            misses: 100,
            collisions: 2,
        };
        assert_eq!(snapshot.space_savings(200), 10000);
    }

    // With no recorded accesses the hit rate is defined as zero.
    #[test]
    fn test_deduplicator_stats_zero_accesses() {
        let snapshot = DeduplicatorStats {
            cache_size: 0,
            hits: 0,
            misses: 0,
            collisions: 0,
        };
        assert_eq!(
            snapshot.hit_rate(),
            0.0,
            "Hit rate should be 0.0 when no accesses"
        );
    }

    // Deduplication after requesting that hit verification be turned off.
    #[test]
    fn test_dedup_verify_on_hit_false() {
        let mut manager = new_manager();
        manager.set_verify_on_hit(false);

        let payload = b"test data for dedup";
        let first = manager.allocate_dedup(payload).expect("first alloc");
        let second = manager.allocate_dedup(payload).expect("second alloc");
        assert_eq!(first, second, "Same data should return same slot");

        let snapshot = manager.dedup_stats();
        assert_eq!(snapshot.hits, 1, "Should have one cache hit");
        assert_eq!(snapshot.misses, 1, "Should have one initial miss");
    }

    // Deduplication with the default (verifying) configuration.
    #[test]
    fn test_dedup_verify_on_hit_true() {
        let mut manager = new_manager();

        let payload = b"test data for verification";
        let first = manager.allocate_dedup(payload).expect("first alloc");
        let second = manager.allocate_dedup(payload).expect("second alloc");
        assert_eq!(first, second, "Same data should return same slot");

        let snapshot = manager.dedup_stats();
        assert_eq!(snapshot.hits, 1, "Should have one cache hit");
        assert_eq!(snapshot.collisions, 0, "Should have no collisions");
    }

    // Distinct payloads get distinct slots; a repeated payload is reused.
    #[test]
    fn test_dedup_multiple_allocations() {
        let mut manager = new_manager();

        let payload_a = b"data one";
        let payload_b = b"data two";
        let payload_a_again = b"data one";

        let slot_a = manager.allocate_dedup(payload_a).expect("alloc data1");
        let slot_b = manager.allocate_dedup(payload_b).expect("alloc data2");
        let slot_a_again = manager
            .allocate_dedup(payload_a_again)
            .expect("alloc data3");

        assert_ne!(slot_a, slot_b, "Different data should have different slots");
        assert_eq!(slot_a, slot_a_again, "Same data should return same slot");

        let snapshot = manager.dedup_stats();
        assert_eq!(snapshot.hits, 1, "Should have one cache hit");
        assert_eq!(snapshot.misses, 2, "Should have two initial misses");
    }

    // Direct allocation neither deduplicates nor touches the counters.
    #[test]
    fn test_dedup_allocate_direct_bypasses_cache() {
        let mut manager = new_manager();

        let payload = b"bypass data";
        let first = manager.allocate_direct(payload).expect("direct alloc 1");
        let second = manager.allocate_direct(payload).expect("direct alloc 2");
        assert_ne!(first, second, "Direct allocation should not deduplicate");

        let snapshot = manager.dedup_stats();
        assert_eq!(snapshot.hits, 0, "Should have no hits with direct alloc");
        assert_eq!(snapshot.misses, 0, "Should have no misses with direct alloc");
    }
}