use std::alloc::{GlobalAlloc, Layout, System};
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tempfile::TempDir;
use tokio::fs;
use cqlite_core::platform::Platform;
use cqlite_core::storage::sstable::index_reader::{IndexReader, PartitionIndexEntry};
use cqlite_core::Config;
struct TrackingAllocator {
allocations: AtomicUsize,
deallocations: AtomicUsize,
peak_memory: AtomicUsize,
current_memory: AtomicUsize,
}
impl TrackingAllocator {
const fn new() -> Self {
Self {
allocations: AtomicUsize::new(0),
deallocations: AtomicUsize::new(0),
peak_memory: AtomicUsize::new(0),
current_memory: AtomicUsize::new(0),
}
}
fn reset(&self) {
self.allocations.store(0, Ordering::SeqCst);
self.deallocations.store(0, Ordering::SeqCst);
self.peak_memory.store(0, Ordering::SeqCst);
self.current_memory.store(0, Ordering::SeqCst);
}
fn get_current_memory(&self) -> usize {
self.current_memory.load(Ordering::SeqCst)
}
fn get_allocation_count(&self) -> usize {
self.allocations.load(Ordering::SeqCst)
}
}
unsafe impl GlobalAlloc for TrackingAllocator {
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
unsafe {
let ptr = System.alloc(layout);
if !ptr.is_null() {
self.allocations.fetch_add(1, Ordering::SeqCst);
let new_current = self
.current_memory
.fetch_add(layout.size(), Ordering::SeqCst)
+ layout.size();
let mut peak = self.peak_memory.load(Ordering::SeqCst);
while new_current > peak {
match self.peak_memory.compare_exchange_weak(
peak,
new_current,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => break,
Err(x) => peak = x,
}
}
}
ptr
}
}
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
unsafe {
System.dealloc(ptr, layout);
self.deallocations.fetch_add(1, Ordering::SeqCst);
self.current_memory
.fetch_sub(layout.size(), Ordering::SeqCst);
}
}
}
#[global_allocator]
static TRACKING_ALLOCATOR: TrackingAllocator = TrackingAllocator::new();
#[test]
#[ignore = "Memory benchmark - not a functional test"]
fn test_arc_lookup_table_memory_efficiency() {
TRACKING_ALLOCATOR.reset();
let mut partition_entries = Vec::new();
for i in 0..1000 {
let mut key_digest = vec![0u8; 16];
key_digest[0] = (i % 256) as u8;
key_digest[1] = ((i / 256) % 256) as u8;
partition_entries.push(PartitionIndexEntry {
key_digest: Arc::from(key_digest.into_boxed_slice()),
raw_key: None, data_offset: i as u64 * 4096,
data_size: 4096,
promoted_index: None,
});
}
let initial_memory = TRACKING_ALLOCATOR.get_current_memory();
let initial_allocations = TRACKING_ALLOCATOR.get_allocation_count();
let mut key_lookup: HashMap<Arc<[u8]>, usize> = HashMap::new();
for (index, entry) in partition_entries.iter().enumerate() {
key_lookup.insert(Arc::clone(&entry.key_digest), index);
}
let final_memory = TRACKING_ALLOCATOR.get_current_memory();
let final_allocations = TRACKING_ALLOCATOR.get_allocation_count();
assert_eq!(key_lookup.len(), 1000);
for (expected_index, entry) in partition_entries.iter().enumerate() {
let lookup_result = key_lookup.get(&entry.key_digest);
assert_eq!(lookup_result, Some(&expected_index));
}
println!(
"Memory used for Arc-based lookup table: {} bytes",
final_memory - initial_memory
);
println!("Allocations: {}", final_allocations - initial_allocations);
let actual_memory_increase = final_memory - initial_memory;
assert!(actual_memory_increase < 8000); }
#[test]
#[ignore = "Memory benchmark - not a functional test"]
fn test_memory_comparison_vec_vs_arc() {
TRACKING_ALLOCATOR.reset();
let initial_memory_vec = TRACKING_ALLOCATOR.get_current_memory();
let mut vec_entries = Vec::new();
let mut vec_lookup: HashMap<Vec<u8>, usize> = HashMap::new();
for i in 0..1000 {
let mut key_digest = vec![0u8; 16];
key_digest[0] = (i % 256) as u8;
key_digest[1] = ((i / 256) % 256) as u8;
vec_entries.push(key_digest.clone());
vec_lookup.insert(key_digest, i); }
let final_memory_vec = TRACKING_ALLOCATOR.get_current_memory();
let vec_memory_usage = final_memory_vec - initial_memory_vec;
TRACKING_ALLOCATOR.reset();
let initial_memory_arc = TRACKING_ALLOCATOR.get_current_memory();
let mut arc_entries = Vec::new();
let mut arc_lookup: HashMap<Arc<[u8]>, usize> = HashMap::new();
for i in 0..1000 {
let mut key_digest = vec![0u8; 16];
key_digest[0] = (i % 256) as u8;
key_digest[1] = ((i / 256) % 256) as u8;
let arc_key = Arc::from(key_digest.into_boxed_slice());
arc_entries.push(Arc::clone(&arc_key));
arc_lookup.insert(arc_key, i); }
let final_memory_arc = TRACKING_ALLOCATOR.get_current_memory();
let arc_memory_usage = final_memory_arc - initial_memory_arc;
println!(
"Vec cloning approach memory usage: {} bytes",
vec_memory_usage
);
println!(
"Arc sharing approach memory usage: {} bytes",
arc_memory_usage
);
println!(
"Memory reduction: {} bytes ({:.1}%)",
vec_memory_usage - arc_memory_usage,
((vec_memory_usage - arc_memory_usage) as f64 / vec_memory_usage as f64) * 100.0
);
assert!(arc_memory_usage < vec_memory_usage);
let memory_savings = vec_memory_usage - arc_memory_usage;
assert!(memory_savings > vec_memory_usage / 3);
}
#[tokio::test]
#[ignore = "Memory benchmark - not a functional test"]
async fn test_large_sstable_memory_usage() {
let temp_dir = TempDir::new().unwrap();
let index_file = temp_dir.path().join("large-Index.db");
create_large_index_file(&index_file, 10000).await;
let config = Config::default();
let platform = Arc::new(Platform::new(&config).await.unwrap());
TRACKING_ALLOCATOR.reset();
let initial_memory = TRACKING_ALLOCATOR.get_current_memory();
match IndexReader::open(&index_file, platform).await {
Ok(index_reader) => {
let final_memory = TRACKING_ALLOCATOR.get_current_memory();
let entries = index_reader.get_partition_entries();
println!("Loaded {} partition entries", entries.len());
println!("Memory usage: {} bytes", final_memory - initial_memory);
let bytes_per_entry = (final_memory - initial_memory) / entries.len().max(1);
println!("Average memory per entry: {} bytes", bytes_per_entry);
assert!(bytes_per_entry < 100);
if entries.len() >= 100 {
for i in (0..entries.len()).step_by(entries.len() / 100) {
let entry = &entries[i];
let lookup_result = index_reader.lookup_partition(&entry.key_digest);
assert!(lookup_result.is_some());
}
}
}
Err(e) => {
println!(
"Index reader test with large file failed (expected with mock data): {}",
e
);
}
}
}
#[test]
#[ignore = "Performance benchmark - not a functional test"]
fn benchmark_arc_vs_vec_performance() {
let mut vec_partition_entries = Vec::new();
for i in 0..10000 {
let mut key_digest = vec![0u8; 16];
key_digest[0] = (i % 256) as u8;
key_digest[1] = ((i / 256) % 256) as u8;
key_digest[2] = ((i / 65536) % 256) as u8;
key_digest[3] = ((i / 16777216) % 256) as u8;
vec_partition_entries.push(key_digest);
}
let mut arc_partition_entries = Vec::new();
for i in 0..10000 {
let mut key_digest = vec![0u8; 16];
key_digest[0] = (i % 256) as u8;
key_digest[1] = ((i / 256) % 256) as u8;
key_digest[2] = ((i / 65536) % 256) as u8;
key_digest[3] = ((i / 16777216) % 256) as u8;
arc_partition_entries.push(PartitionIndexEntry {
key_digest: Arc::from(key_digest.into_boxed_slice()),
raw_key: None, data_offset: i as u64 * 4096,
data_size: 4096,
promoted_index: None,
});
}
let start = std::time::Instant::now();
let mut vec_lookup: HashMap<Vec<u8>, usize> = HashMap::new();
for (index, entry) in vec_partition_entries.iter().enumerate() {
vec_lookup.insert(entry.clone(), index); }
let vec_build_time = start.elapsed();
let start = std::time::Instant::now();
for entry in vec_partition_entries.iter().take(1000) {
let _ = vec_lookup.get(entry);
}
let vec_lookup_time = start.elapsed();
let start = std::time::Instant::now();
let mut arc_lookup: HashMap<Arc<[u8]>, usize> = HashMap::new();
for (index, entry) in arc_partition_entries.iter().enumerate() {
arc_lookup.insert(Arc::clone(&entry.key_digest), index); }
let arc_build_time = start.elapsed();
let start = std::time::Instant::now();
for entry in arc_partition_entries.iter().take(1000) {
let _ = arc_lookup.get(&entry.key_digest);
}
let arc_lookup_time = start.elapsed();
println!("=== PERFORMANCE BENCHMARK ===");
println!("Vec cloning approach - Build time: {:?}", vec_build_time);
println!("Vec cloning approach - Lookup time: {:?}", vec_lookup_time);
println!("Arc sharing approach - Build time: {:?}", arc_build_time);
println!("Arc sharing approach - Lookup time: {:?}", arc_lookup_time);
assert!(arc_build_time <= vec_build_time);
let build_improvement = vec_build_time.as_nanos() as f64 / arc_build_time.as_nanos() as f64;
let lookup_improvement = vec_lookup_time.as_nanos() as f64 / arc_lookup_time.as_nanos() as f64;
println!("Build time improvement: {:.2}x", build_improvement);
println!("Lookup time improvement: {:.2}x", lookup_improvement);
assert!(build_improvement >= 1.0);
}
#[test]
fn test_arc_edge_cases() {
let empty_entries: Vec<PartitionIndexEntry> = Vec::new();
let mut empty_lookup: HashMap<Arc<[u8]>, usize> = HashMap::new();
for (index, entry) in empty_entries.iter().enumerate() {
empty_lookup.insert(Arc::clone(&entry.key_digest), index);
}
assert_eq!(empty_lookup.len(), 0);
let single_entry = [PartitionIndexEntry {
key_digest: Arc::from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]),
raw_key: None, data_offset: 1024,
data_size: 4096,
promoted_index: None,
}];
let mut single_lookup: HashMap<Arc<[u8]>, usize> = HashMap::new();
for (index, entry) in single_entry.iter().enumerate() {
single_lookup.insert(Arc::clone(&entry.key_digest), index);
}
assert_eq!(single_lookup.len(), 1);
assert_eq!(single_lookup.get(&single_entry[0].key_digest), Some(&0));
let duplicate_arc = Arc::from([1u8; 16]);
let duplicate_entries = [
PartitionIndexEntry {
key_digest: Arc::clone(&duplicate_arc),
raw_key: None, data_offset: 1024,
data_size: 4096,
promoted_index: None,
},
PartitionIndexEntry {
key_digest: Arc::clone(&duplicate_arc), raw_key: None, data_offset: 2048,
data_size: 4096,
promoted_index: None,
},
];
let mut duplicate_lookup: HashMap<Arc<[u8]>, usize> = HashMap::new();
for (index, entry) in duplicate_entries.iter().enumerate() {
duplicate_lookup.insert(Arc::clone(&entry.key_digest), index);
}
assert_eq!(duplicate_lookup.len(), 1); assert_eq!(duplicate_lookup.get(&duplicate_arc), Some(&1));
assert!(Arc::ptr_eq(
&duplicate_entries[0].key_digest,
&duplicate_entries[1].key_digest
));
}
#[test]
#[ignore = "Property test - not a functional test"]
fn property_test_arc_lookup_correctness() {
use std::collections::HashSet;
for num_entries in [0, 1, 10, 100, 1000].iter() {
let mut partition_entries = Vec::new();
let mut seen_keys = HashSet::new();
for i in 0..*num_entries {
let mut key_digest = vec![0u8; 16];
let key_value = i as u64;
key_digest[0..8].copy_from_slice(&key_value.to_le_bytes());
key_digest[8..16].copy_from_slice(&(key_value.wrapping_mul(17)).to_le_bytes());
assert!(
seen_keys.insert(key_digest.clone()),
"Failed to generate unique key for index {}",
i
);
partition_entries.push(PartitionIndexEntry {
key_digest: Arc::from(key_digest.into_boxed_slice()),
raw_key: None, data_offset: i as u64 * 4096,
data_size: 4096,
promoted_index: None,
});
}
let mut key_lookup: HashMap<Arc<[u8]>, usize> = HashMap::new();
for (index, entry) in partition_entries.iter().enumerate() {
key_lookup.insert(Arc::clone(&entry.key_digest), index);
}
for (expected_index, entry) in partition_entries.iter().enumerate() {
let lookup_result = key_lookup.get(&entry.key_digest);
assert_eq!(
lookup_result,
Some(&expected_index),
"Failed to find entry {} with key digest {:?}",
expected_index,
entry.key_digest
);
}
assert_eq!(key_lookup.len(), partition_entries.len());
for entry in &partition_entries {
if let Some(&index) = key_lookup.get(&entry.key_digest) {
assert_eq!(
index,
partition_entries
.iter()
.position(|e| Arc::ptr_eq(&e.key_digest, &entry.key_digest))
.unwrap()
);
}
}
println!("✓ Arc property test passed for {} entries", num_entries);
}
}
async fn create_large_index_file(path: &std::path::Path, num_entries: usize) {
let mut data = Vec::new();
for i in 0..num_entries {
data.extend_from_slice(&[0x00, 0x10]);
let mut key_digest = vec![0u8; 16];
key_digest[0] = (i % 256) as u8;
key_digest[1] = ((i / 256) % 256) as u8;
key_digest[2] = ((i / 65536) % 256) as u8;
key_digest[3] = ((i / 16777216) % 256) as u8;
data.extend_from_slice(&key_digest);
}
fs::write(path, data).await.unwrap();
}
#[test]
#[ignore = "Memory benchmark - not a functional test"]
fn test_arc_no_memory_leaks() {
TRACKING_ALLOCATOR.reset();
let initial_memory = TRACKING_ALLOCATOR.get_current_memory();
{
for _ in 0..10 {
let mut partition_entries = Vec::new();
for i in 0..100 {
let mut key_digest = vec![0u8; 16];
key_digest[0] = (i % 256) as u8;
partition_entries.push(PartitionIndexEntry {
key_digest: Arc::from(key_digest.into_boxed_slice()),
raw_key: None, data_offset: i as u64 * 4096,
data_size: 4096,
promoted_index: None,
});
}
let mut key_lookup: HashMap<Arc<[u8]>, usize> = HashMap::new();
for (index, entry) in partition_entries.iter().enumerate() {
key_lookup.insert(Arc::clone(&entry.key_digest), index);
}
for entry in &partition_entries {
let _ = key_lookup.get(&entry.key_digest);
}
for entry in &partition_entries {
assert_eq!(Arc::strong_count(&entry.key_digest), 2);
}
}
}
std::hint::black_box(());
let final_memory = TRACKING_ALLOCATOR.get_current_memory();
let memory_difference = final_memory.saturating_sub(initial_memory);
println!(
"Memory difference after Arc test: {} bytes",
memory_difference
);
assert!(memory_difference < 1024); }
#[test]
fn test_arc_reference_counting() {
let key_data: Box<[u8]> =
vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16].into_boxed_slice();
let arc_key = Arc::from(key_data);
assert_eq!(Arc::strong_count(&arc_key), 1);
let entry = PartitionIndexEntry {
key_digest: Arc::clone(&arc_key),
raw_key: None, data_offset: 1024,
data_size: 4096,
promoted_index: None,
};
assert_eq!(Arc::strong_count(&arc_key), 2);
assert_eq!(Arc::strong_count(&entry.key_digest), 2);
let mut lookup: HashMap<Arc<[u8]>, usize> = HashMap::new();
lookup.insert(Arc::clone(&entry.key_digest), 0);
assert_eq!(Arc::strong_count(&arc_key), 3);
drop(lookup);
assert_eq!(Arc::strong_count(&arc_key), 2);
drop(entry);
assert_eq!(Arc::strong_count(&arc_key), 1);
}