use crate::error::ShardexError;
use crate::shardex_index::ShardexIndex;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Tunable constants used to estimate the memory footprint of a COW index.
/// These feed the heuristics in `CowShardexIndex::estimate_index_memory_usage`
/// and `LazyIndexWriter::memory_overhead`.
#[derive(Debug, Clone)]
pub struct CowMemoryConfig {
/// Estimated bookkeeping bytes per shard (metadata structures).
pub metadata_bytes_per_shard: usize,
/// Assumed typical number of vectors stored per shard.
pub average_vectors_per_shard: usize,
/// Estimated extra bytes retained for each shard touched by a lazy writer.
pub modified_shard_overhead_bytes: usize,
/// Estimated metadata bytes for each newly created shard.
pub new_shard_metadata_bytes: usize,
}
impl Default for CowMemoryConfig {
fn default() -> Self {
Self {
metadata_bytes_per_shard: 256,
average_vectors_per_shard: 1000,
modified_shard_overhead_bytes: 1024 * 1024, new_shard_metadata_bytes: 256,
}
}
}
/// Point-in-time snapshot of copy-on-write index activity, produced by
/// `CowShardexIndex::metrics`.
#[derive(Debug, Clone, Default)]
pub struct CowMetrics {
/// Number of outstanding reader snapshots at the time of the snapshot.
pub active_readers: usize,
/// Number of writers created but not yet committed/discarded.
pub pending_writers: usize,
/// Estimated memory used by the currently published index.
pub memory_usage_bytes: usize,
/// Total clone operations performed (eager and lazy writers).
pub clone_operations: u64,
/// Mean wall-clock time per clone operation (millisecond granularity).
pub average_clone_time: Duration,
/// Total successful commits.
pub commit_count: u64,
/// Mean wall-clock time per commit (millisecond granularity).
pub average_commit_time: Duration,
/// High-water mark of estimated index memory usage.
pub peak_memory_usage_bytes: usize,
}
/// Internal atomic counters backing the public `CowMetrics` snapshot.
/// All counters are updated with relaxed ordering; they are statistics,
/// not synchronization.
struct CowInternalMetrics {
// Count of clone_for_write / clone_for_lazy_write calls.
clone_operations: AtomicU64,
// Cumulative clone duration in milliseconds (for averaging).
clone_time_total_ms: AtomicU64,
// Count of committed writers.
commit_count: AtomicU64,
// Cumulative commit duration in milliseconds (for averaging).
commit_time_total_ms: AtomicU64,
// High-water mark of estimated index memory usage.
peak_memory_usage: AtomicUsize,
// Writers currently alive (decremented in the writers' Drop impls).
active_writers: AtomicUsize,
}
impl Default for CowInternalMetrics {
fn default() -> Self {
Self {
clone_operations: AtomicU64::new(0),
clone_time_total_ms: AtomicU64::new(0),
commit_count: AtomicU64::new(0),
commit_time_total_ms: AtomicU64::new(0),
peak_memory_usage: AtomicUsize::new(0),
active_writers: AtomicUsize::new(0),
}
}
}
/// Copy-on-write wrapper around `ShardexIndex`: readers take cheap `Arc`
/// snapshots while writers mutate a private deep clone that is swapped in
/// atomically on commit.
pub struct CowShardexIndex {
// Currently published index. The RwLock guards only the Arc swap, so the
// lock is held very briefly on both read and commit paths.
inner: Arc<RwLock<Arc<ShardexIndex>>>,
// Shared counters for clone/commit/memory statistics.
metrics: Arc<CowInternalMetrics>,
// Constants driving memory-usage estimation.
memory_config: CowMemoryConfig,
}
/// Writer handle holding a private deep clone of the index. Commit swaps
/// the clone into the shared slot; dropping without commit discards it.
pub struct IndexWriter {
// Some(clone) until committed or discarded; None afterwards.
modified_index: Option<ShardexIndex>,
// Shared slot the modified index is published into on commit.
cow_index_ref: Arc<RwLock<Arc<ShardexIndex>>>,
// Shared metrics; Drop decrements active_writers through this.
metrics_ref: Arc<CowInternalMetrics>,
}
/// Writer handle that defers the deep clone: it records shard-level changes
/// against a snapshot of the original index instead of cloning up front.
pub struct LazyIndexWriter {
// Snapshot of the index as of writer creation.
original_index: Arc<ShardexIndex>,
// Shards modified by this writer, keyed by shard id.
modified_shards: HashMap<crate::identifiers::ShardId, crate::shard::Shard>,
// Metadata for shards newly created by this writer.
new_shards: Vec<crate::shardex_index::ShardexMetadata>,
// Shared slot the result is published into on commit.
cow_index_ref: Arc<RwLock<Arc<ShardexIndex>>>,
// Shared metrics; Drop decrements active_writers through this.
metrics_ref: Arc<CowInternalMetrics>,
// Constants for memory_overhead estimation.
memory_config: CowMemoryConfig,
}
impl CowShardexIndex {
pub fn new(index: ShardexIndex) -> Self {
Self {
inner: Arc::new(RwLock::new(Arc::new(index))),
metrics: Arc::new(CowInternalMetrics::default()),
memory_config: CowMemoryConfig::default(),
}
}
pub fn new_with_memory_config(index: ShardexIndex, memory_config: CowMemoryConfig) -> Self {
Self {
inner: Arc::new(RwLock::new(Arc::new(index))),
metrics: Arc::new(CowInternalMetrics::default()),
memory_config,
}
}
pub fn memory_config(&self) -> &CowMemoryConfig {
&self.memory_config
}
pub fn set_memory_config(&mut self, config: CowMemoryConfig) {
self.memory_config = config;
}
pub fn read(&self) -> Arc<ShardexIndex> {
let guard = self.inner.read();
Arc::clone(&*guard)
}
pub fn clone_for_write(&self) -> Result<IndexWriter, ShardexError> {
let start_time = Instant::now();
let current_index = self.read();
let modified_index = current_index.deep_clone()?;
let clone_duration = start_time.elapsed();
self.metrics
.clone_operations
.fetch_add(1, Ordering::Relaxed);
self.metrics
.clone_time_total_ms
.fetch_add(clone_duration.as_millis() as u64, Ordering::Relaxed);
self.metrics.active_writers.fetch_add(1, Ordering::Relaxed);
let estimated_memory = self.estimate_index_memory_usage(&modified_index);
let current_peak = self.metrics.peak_memory_usage.load(Ordering::Relaxed);
if estimated_memory > current_peak {
self.metrics
.peak_memory_usage
.store(estimated_memory, Ordering::Relaxed);
}
Ok(IndexWriter {
modified_index: Some(modified_index),
cow_index_ref: Arc::clone(&self.inner),
metrics_ref: Arc::clone(&self.metrics),
})
}
pub fn clone_for_lazy_write(&self) -> Result<LazyIndexWriter, ShardexError> {
let start_time = Instant::now();
let current_index = self.read();
let clone_duration = start_time.elapsed();
self.metrics
.clone_operations
.fetch_add(1, Ordering::Relaxed);
self.metrics
.clone_time_total_ms
.fetch_add(clone_duration.as_millis() as u64, Ordering::Relaxed);
self.metrics.active_writers.fetch_add(1, Ordering::Relaxed);
Ok(LazyIndexWriter {
original_index: current_index,
modified_shards: HashMap::new(),
new_shards: Vec::new(),
cow_index_ref: Arc::clone(&self.inner),
metrics_ref: Arc::clone(&self.metrics),
memory_config: self.memory_config.clone(),
})
}
pub fn shard_count(&self) -> usize {
let index_ref = self.read();
index_ref.shard_count()
}
pub fn quick_stats(&self, pending_operations: usize) -> Result<crate::structures::IndexStats, ShardexError> {
let index_ref = self.read();
index_ref.stats(pending_operations)
}
pub fn is_empty(&self) -> bool {
self.shard_count() == 0
}
pub fn metrics(&self) -> CowMetrics {
let current_index = self.read();
let current_memory = self.estimate_index_memory_usage(¤t_index);
let clone_ops = self.metrics.clone_operations.load(Ordering::Relaxed);
let clone_time_ms = self.metrics.clone_time_total_ms.load(Ordering::Relaxed);
let commit_ops = self.metrics.commit_count.load(Ordering::Relaxed);
let commit_time_ms = self.metrics.commit_time_total_ms.load(Ordering::Relaxed);
CowMetrics {
active_readers: (Arc::strong_count(¤t_index)).saturating_sub(2), pending_writers: self.metrics.active_writers.load(Ordering::Relaxed),
memory_usage_bytes: current_memory,
clone_operations: clone_ops,
average_clone_time: if clone_ops > 0 {
Duration::from_millis(clone_time_ms / clone_ops)
} else {
Duration::ZERO
},
commit_count: commit_ops,
average_commit_time: if commit_ops > 0 {
Duration::from_millis(commit_time_ms / commit_ops)
} else {
Duration::ZERO
},
peak_memory_usage_bytes: {
let current_peak = self.metrics.peak_memory_usage.load(Ordering::Relaxed);
let new_peak = current_peak.max(current_memory);
self.metrics
.peak_memory_usage
.store(new_peak, Ordering::Relaxed);
new_peak
},
}
}
pub fn memory_usage(&self) -> usize {
let current_index = self.read();
self.estimate_index_memory_usage(¤t_index)
}
pub fn active_reader_count(&self) -> usize {
let guard = self.inner.read();
(Arc::strong_count(&*guard)).saturating_sub(1)
}
fn estimate_index_memory_usage(&self, index: &ShardexIndex) -> usize {
let base_size = std::mem::size_of::<ShardexIndex>();
let shard_count = index.shard_count();
let metadata_size = shard_count * self.memory_config.metadata_bytes_per_shard;
let vector_memory = shard_count
* index.vector_size()
* std::mem::size_of::<f32>()
* self.memory_config.average_vectors_per_shard;
base_size + metadata_size + vector_memory
}
}
impl IndexWriter {
    /// Mutable access to the private copy of the index.
    ///
    /// # Panics
    /// Panics if the writer was already committed or discarded.
    pub fn index_mut(&mut self) -> &mut ShardexIndex {
        self.modified_index
            .as_mut()
            .expect("IndexWriter has already been consumed")
    }

    /// Shared access to the private copy of the index.
    ///
    /// # Panics
    /// Panics if the writer was already committed or discarded.
    pub fn index(&self) -> &ShardexIndex {
        self.modified_index
            .as_ref()
            .expect("IndexWriter has already been consumed")
    }

    /// Atomically publishes the modified index so all subsequent readers see
    /// it. Readers holding older snapshots keep the previous version.
    ///
    /// # Errors
    /// Currently infallible; the `Result` is kept for interface stability.
    pub fn commit_changes(mut self) -> Result<(), ShardexError> {
        let start_time = Instant::now();
        let modified_index = self
            .modified_index
            .take()
            .expect("IndexWriter has already been committed or discarded");
        let new_index = Arc::new(modified_index);
        {
            // Write lock is held only for the pointer swap.
            let mut guard = self.cow_index_ref.write();
            *guard = new_index;
        }
        let commit_duration = start_time.elapsed();
        self.metrics_ref.commit_count.fetch_add(1, Ordering::Relaxed);
        self.metrics_ref
            .commit_time_total_ms
            .fetch_add(commit_duration.as_millis() as u64, Ordering::Relaxed);
        Ok(())
    }

    /// Abandons the modified copy without publishing it.
    ///
    /// Consuming `self` is sufficient: the private index is dropped and the
    /// `Drop` impl decrements the active-writer counter. The explicit
    /// `take()` + `drop(self)` in the original body were redundant.
    pub fn discard(self) {}

    /// Statistics for the (uncommitted) modified index.
    ///
    /// # Panics
    /// Panics if the writer was already committed or discarded.
    pub fn stats(&self, pending_operations: usize) -> Result<crate::structures::IndexStats, ShardexError> {
        self.modified_index
            .as_ref()
            .expect("IndexWriter has already been consumed")
            .stats(pending_operations)
    }
}
impl LazyIndexWriter {
/// Shard count as it would be after applying pending additions.
pub fn shard_count(&self) -> usize {
self.original_index.shard_count() + self.new_shards.len()
}
/// Vector dimensionality of the underlying index.
pub fn vector_size(&self) -> usize {
self.original_index.vector_size()
}
/// True if this writer has recorded any shard modifications or additions.
pub fn has_modifications(&self) -> bool {
!self.modified_shards.is_empty() || !self.new_shards.is_empty()
}
/// Number of existing shards this writer has modified.
pub fn modified_shard_count(&self) -> usize {
self.modified_shards.len()
}
/// Number of shards this writer has newly created.
pub fn new_shard_count(&self) -> usize {
self.new_shards.len()
}
/// Publishes the result of this writer; a no-op when nothing was modified.
///
/// NOTE(review): `modified_shards` and `new_shards` are never applied to the
/// deep clone before it is swapped into the shared slot — as written, pending
/// modifications appear to be silently dropped on commit. Confirm against the
/// shard-application path before relying on this method.
pub fn commit_changes(self) -> Result<(), ShardexError> {
let start_time = Instant::now();
if !self.has_modifications() {
return Ok(());
}
let new_index = self.original_index.as_ref().deep_clone()?;
let new_index_arc = Arc::new(new_index);
{
// Write lock held only for the pointer swap.
let mut guard = self.cow_index_ref.write();
*guard = new_index_arc;
}
let commit_duration = start_time.elapsed();
self.metrics_ref
.commit_count
.fetch_add(1, Ordering::Relaxed);
self.metrics_ref
.commit_time_total_ms
.fetch_add(commit_duration.as_millis() as u64, Ordering::Relaxed);
Ok(())
}
/// Drops all recorded changes without publishing. The clear() calls are
/// cosmetic: consuming self would free them anyway, and Drop handles the
/// active_writers decrement.
pub fn discard(mut self) {
self.modified_shards.clear();
self.new_shards.clear();
drop(self);
}
/// Estimated extra memory held by this writer (heuristic, based on
/// memory_config constants — not an exact accounting).
pub fn memory_overhead(&self) -> usize {
let modified_shard_memory = self.modified_shards.len() * self.memory_config.modified_shard_overhead_bytes;
let new_shard_memory = self.new_shards.len() * self.memory_config.new_shard_metadata_bytes;
let base_overhead = std::mem::size_of::<LazyIndexWriter>();
base_overhead + modified_shard_memory + new_shard_memory
}
/// Statistics for the original snapshot; pending modifications are NOT
/// reflected here.
pub fn stats(&self, pending_operations: usize) -> Result<crate::structures::IndexStats, ShardexError> {
self.original_index.stats(pending_operations)
}
}
impl Clone for CowShardexIndex {
    /// Produces a second handle to the same underlying index, metrics, and
    /// configuration; both handles observe each other's commits.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        let metrics = Arc::clone(&self.metrics);
        let memory_config = self.memory_config.clone();
        Self {
            inner,
            metrics,
            memory_config,
        }
    }
}
// SAFETY: NOTE(review): these manual unsafe impls bypass the compiler's auto
// traits. Every field is an Arc/RwLock wrapper, which is Send + Sync exactly
// when ShardexIndex is — so if ShardexIndex is auto Send + Sync these impls
// are redundant, and if it is not, they are unsound. Confirm ShardexIndex's
// auto traits and prefer deleting these impls.
unsafe impl Send for CowShardexIndex {}
unsafe impl Sync for CowShardexIndex {}
// SAFETY: NOTE(review): same concern — IndexWriter is auto Send iff
// ShardexIndex is Send; verify rather than asserting it manually.
unsafe impl Send for IndexWriter {}
impl Drop for IndexWriter {
/// Decrements the shared active-writer counter whether the writer was
/// committed, discarded, or simply dropped.
fn drop(&mut self) {
self.metrics_ref
.active_writers
.fetch_sub(1, Ordering::Relaxed);
}
}
// SAFETY: NOTE(review): manual unsafe impl — LazyIndexWriter is auto Send iff
// ShardexIndex and Shard are Send; confirm, and remove this impl if so.
unsafe impl Send for LazyIndexWriter {}
impl Drop for LazyIndexWriter {
/// Decrements the shared active-writer counter whether the writer was
/// committed, discarded, or simply dropped.
fn drop(&mut self) {
self.metrics_ref
.active_writers
.fetch_sub(1, Ordering::Relaxed);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::ShardexConfig;
use crate::test_utils::TestEnvironment;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
// A freshly created COW index has zero shards and reports empty.
#[test]
fn test_cow_index_creation() {
let _test_env = TestEnvironment::new("test_cow_index_creation");
let config = ShardexConfig::new()
.directory_path(_test_env.temp_dir.path())
.vector_size(128)
.shard_size(100);
let index = ShardexIndex::create(config).expect("Failed to create index");
let cow_index = CowShardexIndex::new(index);
assert_eq!(cow_index.shard_count(), 0);
assert!(cow_index.is_empty());
}
// Multiple concurrent read snapshots observe the same published index.
#[test]
fn test_read_access() {
let _test_env = TestEnvironment::new("test_read_access");
let config = ShardexConfig::new()
.directory_path(_test_env.temp_dir.path())
.vector_size(128);
let index = ShardexIndex::create(config).expect("Failed to create index");
let cow_index = CowShardexIndex::new(index);
let read1 = cow_index.read();
let read2 = cow_index.read();
assert_eq!(read1.shard_count(), read2.shard_count());
assert_eq!(read1.shard_count(), 0);
}
// A writer's private copy starts identical to the published index.
#[test]
fn test_clone_for_write() {
let _test_env = TestEnvironment::new("test_clone_for_write");
let config = ShardexConfig::new()
.directory_path(_test_env.temp_dir.path())
.vector_size(128);
let index = ShardexIndex::create(config).expect("Failed to create index");
let cow_index = CowShardexIndex::new(index);
let writer = cow_index
.clone_for_write()
.expect("Failed to create writer");
assert_eq!(writer.index().shard_count(), 0);
}
// Committing an unmodified writer leaves the visible shard count unchanged.
#[test]
fn test_commit_changes() {
let _test_env = TestEnvironment::new("test_commit_changes");
let config = ShardexConfig::new()
.directory_path(_test_env.temp_dir.path())
.vector_size(128);
let index = ShardexIndex::create(config).expect("Failed to create index");
let cow_index = CowShardexIndex::new(index);
let initial_count = cow_index.shard_count();
let writer = cow_index
.clone_for_write()
.expect("Failed to create writer");
writer.commit_changes().expect("Failed to commit changes");
assert_eq!(cow_index.shard_count(), initial_count);
}
// Four threads x ten reads each complete without blocking one another.
#[test]
fn test_concurrent_readers() {
let _test_env = TestEnvironment::new("test_concurrent_readers");
let config = ShardexConfig::new()
.directory_path(_test_env.temp_dir.path())
.vector_size(128);
let index = ShardexIndex::create(config).expect("Failed to create index");
let cow_index = Arc::new(CowShardexIndex::new(index));
let read_count = Arc::new(AtomicUsize::new(0));
let mut handles = vec![];
for _ in 0..4 {
let cow_index_clone = Arc::clone(&cow_index);
let read_count_clone = Arc::clone(&read_count);
let handle = thread::spawn(move || {
for _ in 0..10 {
let _reader = cow_index_clone.read();
read_count_clone.fetch_add(1, Ordering::SeqCst);
thread::sleep(Duration::from_millis(1));
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
assert_eq!(read_count.load(Ordering::SeqCst), 40);
}
// Readers keep making progress while a writer exists and commits.
#[test]
fn test_readers_during_write() {
let _test_env = TestEnvironment::new("test_readers_during_write");
let config = ShardexConfig::new()
.directory_path(_test_env.temp_dir.path())
.vector_size(128);
let index = ShardexIndex::create(config).expect("Failed to create index");
let cow_index = Arc::new(CowShardexIndex::new(index));
let concurrent_reads = Arc::new(AtomicUsize::new(0));
let cow_index_clone = Arc::clone(&cow_index);
let concurrent_reads_clone = Arc::clone(&concurrent_reads);
let reader_handle = thread::spawn(move || {
for _ in 0..100 {
let _reader = cow_index_clone.read();
concurrent_reads_clone.fetch_add(1, Ordering::SeqCst);
thread::sleep(Duration::from_millis(1));
}
});
let writer = cow_index
.clone_for_write()
.expect("Failed to create writer");
thread::sleep(Duration::from_millis(50));
writer.commit_changes().expect("Failed to commit");
reader_handle.join().unwrap();
let read_count = concurrent_reads.load(Ordering::SeqCst);
assert!(read_count > 0, "Expected concurrent reads, got {}", read_count);
}
// Two simultaneous writers each get an independent private copy.
#[test]
fn test_multiple_writers_sequential() {
let _test_env = TestEnvironment::new("test_multiple_writers_sequential");
let config = ShardexConfig::new()
.directory_path(_test_env.temp_dir.path())
.vector_size(128);
let index = ShardexIndex::create(config).expect("Failed to create index");
let cow_index = CowShardexIndex::new(index);
let writer1 = cow_index
.clone_for_write()
.expect("Failed to create writer1");
let writer2 = cow_index
.clone_for_write()
.expect("Failed to create writer2");
assert_eq!(writer1.index().shard_count(), writer2.index().shard_count());
writer1.discard();
writer2.discard();
}
// Cloned CowShardexIndex handles share the same underlying index.
#[test]
fn test_cow_index_clone() {
let _test_env = TestEnvironment::new("test_cow_index_clone");
let config = ShardexConfig::new()
.directory_path(_test_env.temp_dir.path())
.vector_size(128);
let index = ShardexIndex::create(config).expect("Failed to create index");
let cow_index1 = CowShardexIndex::new(index);
let cow_index2 = cow_index1.clone();
assert_eq!(cow_index1.shard_count(), cow_index2.shard_count());
let _writer1 = cow_index1
.clone_for_write()
.expect("Failed to create writer from clone1");
let _writer2 = cow_index2
.clone_for_write()
.expect("Failed to create writer from clone2");
}
// quick_stats reflects the configured vector dimension and empty state.
#[test]
fn test_quick_stats_access() {
let _test_env = TestEnvironment::new("test_quick_stats_access");
let config = ShardexConfig::new()
.directory_path(_test_env.temp_dir.path())
.vector_size(128);
let index = ShardexIndex::create(config).expect("Failed to create index");
let cow_index = CowShardexIndex::new(index);
let stats = cow_index.quick_stats(0).expect("Failed to get quick stats");
assert_eq!(stats.total_shards, 0);
assert_eq!(stats.vector_dimension, 128);
}
// A writer's stats come from its private copy of the index.
#[test]
fn test_writer_stats() {
let _test_env = TestEnvironment::new("test_writer_stats");
let config = ShardexConfig::new()
.directory_path(_test_env.temp_dir.path())
.vector_size(128);
let index = ShardexIndex::create(config).expect("Failed to create index");
let cow_index = CowShardexIndex::new(index);
let writer = cow_index
.clone_for_write()
.expect("Failed to create writer");
let stats = writer.stats(0).expect("Failed to get writer stats");
assert_eq!(stats.vector_dimension, 128);
}
// Compile-time verification of the Send/Sync marker impls.
#[test]
fn test_thread_safety_markers() {
let _test_env = TestEnvironment::new("test_thread_safety_markers");
let config = ShardexConfig::new()
.directory_path(_test_env.temp_dir.path())
.vector_size(128);
let index = ShardexIndex::create(config).expect("Failed to create index");
let cow_index = CowShardexIndex::new(index);
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}
assert_send::<CowShardexIndex>();
assert_sync::<CowShardexIndex>();
assert_send::<IndexWriter>();
let _: Arc<CowShardexIndex> = Arc::new(cow_index);
}
// Clone/commit counters and pending-writer gauge track writer lifecycle.
#[test]
fn test_metrics_tracking() {
let _test_env = TestEnvironment::new("test_metrics_tracking");
let config = ShardexConfig::new()
.directory_path(_test_env.temp_dir.path())
.vector_size(128);
let index = ShardexIndex::create(config).expect("Failed to create index");
let cow_index = CowShardexIndex::new(index);
let initial_metrics = cow_index.metrics();
assert_eq!(initial_metrics.clone_operations, 0);
assert_eq!(initial_metrics.commit_count, 0);
assert_eq!(initial_metrics.pending_writers, 0);
assert!(initial_metrics.memory_usage_bytes > 0);
let writer = cow_index
.clone_for_write()
.expect("Failed to create writer");
let metrics_after_clone = cow_index.metrics();
assert_eq!(metrics_after_clone.clone_operations, 1);
assert_eq!(metrics_after_clone.pending_writers, 1);
assert!(metrics_after_clone.average_clone_time >= Duration::ZERO);
writer.commit_changes().expect("Failed to commit changes");
let metrics_after_commit = cow_index.metrics();
assert_eq!(metrics_after_commit.commit_count, 1);
assert_eq!(metrics_after_commit.pending_writers, 0);
assert!(metrics_after_commit.average_commit_time >= Duration::ZERO);
}
// memory_usage() and the metrics snapshot agree; peak >= current usage.
#[test]
fn test_memory_usage_tracking() {
let _test_env = TestEnvironment::new("test_memory_usage_tracking");
let config = ShardexConfig::new()
.directory_path(_test_env.temp_dir.path())
.vector_size(128);
let index = ShardexIndex::create(config).expect("Failed to create index");
let cow_index = CowShardexIndex::new(index);
let memory_usage = cow_index.memory_usage();
assert!(memory_usage > 0);
let metrics = cow_index.metrics();
assert_eq!(metrics.memory_usage_bytes, memory_usage);
assert!(metrics.peak_memory_usage_bytes >= memory_usage);
}
// Reader count rises as snapshots are taken and falls as they drop.
#[test]
fn test_active_reader_count() {
let _test_env = TestEnvironment::new("test_active_reader_count");
let config = ShardexConfig::new()
.directory_path(_test_env.temp_dir.path())
.vector_size(128);
let index = ShardexIndex::create(config).expect("Failed to create index");
let cow_index = CowShardexIndex::new(index);
let initial_count = cow_index.active_reader_count();
let reader1 = cow_index.read();
let count_after_reader1 = cow_index.active_reader_count();
assert!(
count_after_reader1 >= initial_count,
"Expected reader count to be >= initial, got initial={}, after_reader1={}",
initial_count,
count_after_reader1
);
let reader2 = cow_index.read();
let count_after_reader2 = cow_index.active_reader_count();
assert!(
count_after_reader2 > count_after_reader1,
"Expected reader count to increase further, got after_reader1={}, after_reader2={}",
count_after_reader1,
count_after_reader2
);
drop(reader1);
let count_after_drop1 = cow_index.active_reader_count();
assert!(
count_after_drop1 < count_after_reader2,
"Expected reader count to decrease, got after_reader2={}, after_drop1={}",
count_after_reader2,
count_after_drop1
);
drop(reader2);
let final_count = cow_index.active_reader_count();
assert!(
final_count < count_after_drop1,
"Expected reader count to decrease further, got after_drop1={}, final={}",
count_after_drop1,
final_count
);
assert_eq!(final_count, 0);
}
// Discarding a writer clears the pending gauge without counting a commit.
#[test]
fn test_writer_discard_metrics() {
let _test_env = TestEnvironment::new("test_writer_discard_metrics");
let config = ShardexConfig::new()
.directory_path(_test_env.temp_dir.path())
.vector_size(128);
let index = ShardexIndex::create(config).expect("Failed to create index");
let cow_index = CowShardexIndex::new(index);
let writer = cow_index
.clone_for_write()
.expect("Failed to create writer");
let metrics_with_writer = cow_index.metrics();
assert_eq!(metrics_with_writer.pending_writers, 1);
writer.discard();
let metrics_after_discard = cow_index.metrics();
assert_eq!(metrics_after_discard.pending_writers, 0);
assert_eq!(metrics_after_discard.commit_count, 0); }
// Letting a writer fall out of scope also clears the pending gauge (Drop).
#[test]
fn test_writer_drop_metrics() {
let _test_env = TestEnvironment::new("test_writer_drop_metrics");
let config = ShardexConfig::new()
.directory_path(_test_env.temp_dir.path())
.vector_size(128);
let index = ShardexIndex::create(config).expect("Failed to create index");
let cow_index = CowShardexIndex::new(index);
{
let _writer = cow_index
.clone_for_write()
.expect("Failed to create writer");
let metrics_with_writer = cow_index.metrics();
assert_eq!(metrics_with_writer.pending_writers, 1);
}
let metrics_after_drop = cow_index.metrics();
assert_eq!(metrics_after_drop.pending_writers, 0);
}
// Cloned handles share the same metrics counters.
#[test]
fn test_metrics_across_clones() {
let _test_env = TestEnvironment::new("test_metrics_across_clones");
let config = ShardexConfig::new()
.directory_path(_test_env.temp_dir.path())
.vector_size(128);
let index = ShardexIndex::create(config).expect("Failed to create index");
let cow_index1 = CowShardexIndex::new(index);
let cow_index2 = cow_index1.clone();
let writer = cow_index1
.clone_for_write()
.expect("Failed to create writer");
writer.commit_changes().expect("Failed to commit");
let metrics1 = cow_index1.metrics();
let metrics2 = cow_index2.metrics();
assert_eq!(metrics1.clone_operations, metrics2.clone_operations);
assert_eq!(metrics1.commit_count, metrics2.commit_count);
assert_eq!(metrics1.memory_usage_bytes, metrics2.memory_usage_bytes);
}
// Default CowMetrics is all-zero.
#[test]
fn test_cow_metrics_default() {
let default_metrics = CowMetrics::default();
assert_eq!(default_metrics.active_readers, 0);
assert_eq!(default_metrics.pending_writers, 0);
assert_eq!(default_metrics.memory_usage_bytes, 0);
assert_eq!(default_metrics.clone_operations, 0);
assert_eq!(default_metrics.average_clone_time, Duration::ZERO);
assert_eq!(default_metrics.commit_count, 0);
assert_eq!(default_metrics.average_commit_time, Duration::ZERO);
assert_eq!(default_metrics.peak_memory_usage_bytes, 0);
}
}