use crate::batch_processor::BatchProcessor;
use crate::config::ShardexConfig;
use crate::config_persistence::{ConfigurationManager, PersistedConfig};
use crate::distance::DistanceMetric;
use crate::error::ShardexError;
use crate::identifiers::ShardId;
use crate::layout::{DirectoryLayout, IndexMetadata};
use crate::monitoring::{DetailedIndexStats, PerformanceMonitor as MonitoringPerformanceMonitor};
use crate::shardex_index::ShardexIndex;
use crate::structures::{FlushStats, IndexStats, Posting, SearchResult};
use crate::transactions::{BatchConfig, WalOperation};
use crate::wal_replay::WalReplayer;
use async_trait::async_trait;
use std::path::Path;
use std::time::Duration;
use tracing::{debug, info, warn};
#[async_trait]
pub trait Shardex {
type Error;
async fn create(config: ShardexConfig) -> Result<Self, Self::Error>
where
Self: Sized;
async fn open<P: AsRef<Path> + Send>(directory_path: P) -> Result<Self, Self::Error>
where
Self: Sized;
async fn add_postings(&mut self, postings: Vec<Posting>) -> Result<(), Self::Error>;
async fn remove_documents(&mut self, document_ids: Vec<u128>) -> Result<(), Self::Error>;
async fn search(
&self,
query_vector: &[f32],
k: usize,
slop_factor: Option<usize>,
) -> Result<Vec<SearchResult>, Self::Error>;
async fn search_with_metric(
&self,
query_vector: &[f32],
k: usize,
metric: DistanceMetric,
slop_factor: Option<usize>,
) -> Result<Vec<SearchResult>, Self::Error>;
async fn flush(&mut self) -> Result<(), Self::Error>;
async fn flush_with_stats(&mut self) -> Result<FlushStats, Self::Error> {
self.flush().await?;
Ok(FlushStats::default())
}
async fn stats(&self) -> Result<IndexStats, Self::Error>;
async fn detailed_stats(&self) -> Result<crate::monitoring::DetailedIndexStats, Self::Error>;
async fn get_document_text(&self, document_id: crate::identifiers::DocumentId) -> Result<String, Self::Error>;
async fn extract_text(&self, posting: &Posting) -> Result<String, Self::Error>;
async fn replace_document_with_postings(
&mut self,
document_id: crate::identifiers::DocumentId,
text: String,
postings: Vec<Posting>,
) -> Result<(), Self::Error>;
async fn replace_document_with_postings_staged(
&mut self,
document_id: crate::identifiers::DocumentId,
text: String,
postings: Vec<Posting>,
) -> Result<(), Self::Error>;
}
pub struct ShardexImpl {
index: ShardexIndex,
config: ShardexConfig,
batch_processor: Option<BatchProcessor>,
layout: DirectoryLayout,
config_manager: ConfigurationManager,
pending_shard_operations: Vec<WalOperation>,
performance_monitor: MonitoringPerformanceMonitor,
}
impl ShardexImpl {
pub fn new(config: ShardexConfig) -> Result<Self, ShardexError> {
let index = ShardexIndex::create(config.clone())?;
let layout = DirectoryLayout::new(&config.directory_path);
let config_manager = ConfigurationManager::new(&config.directory_path);
use std::time::{SystemTime, UNIX_EPOCH};
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| ShardexError::Config(format!("System time error: {}", e)))?
.as_secs();
let mut metadata = crate::layout::IndexMetadata {
layout_version: crate::layout::LAYOUT_VERSION,
config: config.clone(),
created_at: now,
modified_at: now,
shard_count: 0,
centroid_segment_count: 0,
wal_segment_count: 0,
flags: crate::layout::IndexFlags {
active: true,
needs_recovery: false,
clean_shutdown: false,
},
text_storage_enabled: false,
max_document_text_size: Some(config.max_document_text_size),
};
metadata.save(layout.metadata_path())?;
Ok(Self {
index,
config,
batch_processor: None,
layout,
config_manager,
pending_shard_operations: Vec::new(),
performance_monitor: MonitoringPerformanceMonitor::new(),
})
}
pub fn open_sync<P: AsRef<Path>>(directory_path: P) -> Result<Self, ShardexError> {
let directory_path = directory_path.as_ref();
let layout = DirectoryLayout::new(directory_path);
let config_manager = ConfigurationManager::new(directory_path);
let metadata = IndexMetadata::load(layout.metadata_path())?;
let config = metadata.config.clone();
let index = ShardexIndex::open(directory_path)?;
Ok(Self {
index,
config,
batch_processor: None,
layout,
config_manager,
pending_shard_operations: Vec::new(),
performance_monitor: MonitoringPerformanceMonitor::new(),
})
}
pub fn check_config_compatibility<P: AsRef<Path>>(
directory_path: P,
new_config: &ShardexConfig,
) -> Result<(), ShardexError> {
let directory_path = directory_path.as_ref();
let layout = DirectoryLayout::new(directory_path);
layout
.validate()
.map_err(|e| ShardexError::Config(format!("Invalid index directory: {}", e)))?;
let existing_metadata = IndexMetadata::load(layout.metadata_path())?;
if !existing_metadata.is_compatible_with(new_config) {
let mut incompatible_fields = Vec::new();
if existing_metadata.config.vector_size != new_config.vector_size {
incompatible_fields.push(format!(
"vector_size: existing={}, new={}",
existing_metadata.config.vector_size, new_config.vector_size
));
}
if existing_metadata.config.directory_path != new_config.directory_path {
incompatible_fields.push(format!(
"directory_path: existing={}, new={}",
existing_metadata.config.directory_path.display(),
new_config.directory_path.display()
));
}
return Err(ShardexError::Config(format!(
"Configuration incompatible with existing index: {}. These parameters cannot be changed after index creation.",
incompatible_fields.join(", ")
)));
}
Ok(())
}
fn calculate_batch_config(&self) -> BatchConfig {
let vector_size = self.config.vector_size;
let wal_segment_size = self.config.wal_segment_size;
let estimated_operation_size = 29 + (vector_size * 4);
let safety_margin = self.config.wal_safety_margin;
let target_batch_size = (wal_segment_size as f32 * (1.0 - safety_margin)) as usize;
let max_ops_by_size = target_batch_size / (estimated_operation_size + 50);
let max_operations_per_batch = std::cmp::min(max_ops_by_size, 1000).max(10); let max_batch_size_bytes = target_batch_size;
info!(
"Calculated batch config: vector_size={}, wal_segment_size={}, safety_margin={:.1}%, estimated_op_size={}, target_batch_size={}, max_ops_by_size={}, final_max_ops={}, final_max_bytes={}",
vector_size,
wal_segment_size,
safety_margin * 100.0,
estimated_operation_size,
target_batch_size,
max_ops_by_size,
max_operations_per_batch,
max_batch_size_bytes
);
BatchConfig {
batch_write_interval_ms: self.config.batch_write_interval_ms,
max_operations_per_batch,
max_batch_size_bytes,
max_document_text_size: self.config.max_document_text_size,
}
}
pub async fn initialize_batch_processor(&mut self) -> Result<(), ShardexError> {
if self.batch_processor.is_some() {
return Ok(()); }
info!("Initializing WAL batch processor for transaction handling");
let batch_config = self.calculate_batch_config();
let mut processor = BatchProcessor::new(
Duration::from_millis(self.config.batch_write_interval_ms),
batch_config,
Some(self.config.vector_size),
self.layout.clone(),
self.config.wal_segment_size,
);
processor.start().await?;
self.batch_processor = Some(processor);
debug!("WAL batch processor initialized successfully");
Ok(())
}
pub async fn update_config(&mut self, new_config: ShardexConfig) -> Result<(), ShardexError> {
new_config.validate()?;
self.config_manager.update_config(&new_config).await?;
self.config = new_config;
debug!("Successfully updated configuration");
Ok(())
}
pub async fn get_persisted_config(&self) -> Result<PersistedConfig, ShardexError> {
self.config_manager.load_config().await
}
pub async fn restore_config_from_backup(&mut self) -> Result<(), ShardexError> {
let restored_config = self.config_manager.restore_from_backup().await?;
self.config = restored_config.config;
debug!("Successfully restored configuration from backup");
Ok(())
}
pub async fn shutdown(&mut self) -> Result<(), ShardexError> {
if let Some(mut processor) = self.batch_processor.take() {
info!("Shutting down WAL batch processor");
processor.shutdown().await?;
debug!("WAL batch processor shutdown complete");
}
Ok(())
}
async fn recover_from_wal(&mut self) -> Result<(), ShardexError> {
info!("Starting WAL recovery process");
let temp_dir = std::env::temp_dir().join(format!(
"shardex_recovery_{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
));
let mut temp_config = self.config.clone();
temp_config.directory_path = temp_dir;
let temp_replacement = ShardexIndex::create(temp_config)?;
let index = std::mem::replace(&mut self.index, temp_replacement);
let wal_directory = self.layout.wal_dir().to_path_buf();
let mut replayer = WalReplayer::new(wal_directory, index);
replayer.replay_all_segments().await?;
let recovery_stats = replayer.recovery_stats();
info!(
"WAL recovery completed: {} transactions, {} operations, {} adds, {} removes",
recovery_stats.transactions_replayed,
recovery_stats.operations_applied,
recovery_stats.add_posting_operations,
recovery_stats.remove_document_operations
);
self.index = replayer.into_index();
let shard_ids: Vec<_> = self.index.shard_ids();
for shard_id in shard_ids {
if let Err(e) = self.index.update_shard_metadata_from_disk(shard_id) {
warn!(
"Failed to update metadata for shard {} after WAL recovery: {}",
shard_id, e
);
}
}
Ok(())
}
pub async fn search_impl(
&self,
query_vector: &[f32],
k: usize,
metric: DistanceMetric,
slop_factor: Option<usize>,
) -> Result<Vec<SearchResult>, ShardexError> {
let start_time = std::time::Instant::now();
let slop = slop_factor.unwrap_or(self.config.slop_factor_config.default_factor);
let candidate_shards = self.index.find_nearest_shards(query_vector, slop)?;
let search_result = self
.index
.parallel_search_with_metric(query_vector, &candidate_shards, k, metric);
let elapsed = start_time.elapsed();
let success = search_result.is_ok();
let result_count = search_result.as_ref().map(|r| r.len()).unwrap_or(0);
self.performance_monitor
.record_search(elapsed, result_count, success)
.await;
search_result
}
async fn apply_pending_operations_to_shards(&mut self) -> Result<(), ShardexError> {
if self.pending_shard_operations.is_empty() {
return Ok(());
}
debug!(
"Applying {} pending operations to shards",
self.pending_shard_operations.len()
);
let operations = std::mem::take(&mut self.pending_shard_operations);
for operation in &operations {
if let Err(e) = self.apply_operation_to_shards(operation).await {
warn!(
operation = ?operation,
error = %e,
"Failed to apply operation to shards, continuing with next operation"
);
}
}
debug!("Completed applying operations to shards");
Ok(())
}
async fn flush_internal(&mut self) -> Result<FlushStats, ShardexError> {
let start_time = std::time::Instant::now();
let mut stats = FlushStats::default();
debug!("Starting comprehensive flush operation with durability guarantees");
let wal_start = std::time::Instant::now();
if let Some(ref mut processor) = self.batch_processor {
processor.flush_now().await?;
debug!("WAL batch flush completed");
}
stats.wal_flush_duration = wal_start.elapsed();
let apply_start = std::time::Instant::now();
stats.operations_applied = self.pending_shard_operations.len();
self.apply_pending_operations_to_shards().await?;
stats.shard_apply_duration = apply_start.elapsed();
debug!("Applied {} operations to shards", stats.operations_applied);
let sync_start = std::time::Instant::now();
let shard_ids = self.index.shard_ids();
for shard_id in &shard_ids {
let shard = self.index.get_shard_mut(*shard_id)?;
shard.sync()?;
let metadata = shard.metadata();
stats.bytes_synced += metadata.disk_usage as u64;
}
stats.shards_synced = shard_ids.len();
stats.shard_sync_duration = sync_start.elapsed();
debug!("Synchronized {} shards to disk", stats.shards_synced);
let validation_start = std::time::Instant::now();
self.validate_consistency().await?;
stats.validation_duration = validation_start.elapsed();
debug!("Consistency validation completed");
stats.total_duration = start_time.elapsed();
info!(
"Flush operation completed: total={}ms, wal={}ms, apply={}ms, sync={}ms, validation={}ms, shards={}, ops={}",
stats.total_duration_ms(),
stats.wal_flush_duration_ms(),
stats.shard_apply_duration.as_millis(),
stats.shard_sync_duration_ms(),
stats.validation_duration_ms(),
stats.shards_synced,
stats.operations_applied
);
self.performance_monitor
.record_write(
stats.total_duration,
stats.bytes_synced,
true, )
.await;
Ok(stats)
}
fn validate_posting(&self, posting: &Posting) -> Result<(), ShardexError> {
let zero_document: crate::identifiers::DocumentId = bytemuck::Zeroable::zeroed();
if posting.document_id == zero_document {
return Err(ShardexError::InvalidPostingData {
reason: "Posting document ID cannot be nil/zero".to_string(),
suggestion: "Ensure posting has a valid document ID".to_string(),
});
}
if posting.length == 0 {
return Err(ShardexError::InvalidPostingData {
reason: "Posting length cannot be zero".to_string(),
suggestion: "Provide a posting with positive length".to_string(),
});
}
let end_offset = posting.start as u64 + posting.length as u64;
if end_offset > u32::MAX as u64 {
return Err(ShardexError::InvalidPostingData {
reason: "Posting coordinates overflow u32 range".to_string(),
suggestion: "Use smaller start + length values".to_string(),
});
}
Ok(())
}
fn validate_replacement_inputs(
&self,
document_id: crate::identifiers::DocumentId,
text: &str,
postings: &[Posting],
) -> Result<(), ShardexError> {
let zero_document: crate::identifiers::DocumentId = bytemuck::Zeroable::zeroed();
if document_id == zero_document {
return Err(ShardexError::InvalidDocumentId {
reason: "Document ID cannot be nil/zero".to_string(),
suggestion: "Provide a valid document ID".to_string(),
});
}
let max_size = self.config.max_document_text_size;
if text.len() > max_size {
return Err(ShardexError::DocumentTooLarge {
size: text.len(),
max_size,
});
}
if text.contains('\0') {
return Err(ShardexError::InvalidInput {
field: "document_text".to_string(),
reason: "Text contains null bytes".to_string(),
suggestion: "Remove null bytes from text".to_string(),
});
}
for (i, posting) in postings.iter().enumerate() {
self.validate_posting_for_replacement(posting, text, i)?;
}
self.validate_posting_coordinates_consistency(postings, text)?;
Ok(())
}
fn validate_posting_for_replacement(
&self,
posting: &Posting,
text: &str,
posting_index: usize,
) -> Result<(), ShardexError> {
let zero_document: crate::identifiers::DocumentId = bytemuck::Zeroable::zeroed();
if posting.document_id == zero_document {
return Err(ShardexError::InvalidPostingData {
reason: format!("Posting {} has nil/zero document ID", posting_index),
suggestion: "Ensure all postings have valid document IDs".to_string(),
});
}
if posting.vector.len() != self.config.vector_size {
return Err(ShardexError::InvalidDimension {
expected: self.config.vector_size,
actual: posting.vector.len(),
});
}
let start = posting.start as usize;
let end = start + posting.length as usize;
if end > text.len() {
return Err(ShardexError::InvalidRange {
start: posting.start,
length: posting.length,
document_length: text.len() as u64,
});
}
if !text.is_char_boundary(start) || !text.is_char_boundary(end) {
return Err(ShardexError::InvalidRange {
start: posting.start,
length: posting.length,
document_length: text.len() as u64,
});
}
if posting.length == 0 {
return Err(ShardexError::InvalidPostingData {
reason: format!("Posting {} has zero length", posting_index),
suggestion: "Ensure all postings have positive length".to_string(),
});
}
Ok(())
}
fn validate_posting_coordinates_consistency(&self, postings: &[Posting], _text: &str) -> Result<(), ShardexError> {
for (i, posting1) in postings.iter().enumerate() {
for (_j, posting2) in postings.iter().enumerate().skip(i + 1) {
let start1 = posting1.start;
let end1 = start1 + posting1.length;
let start2 = posting2.start;
let end2 = start2 + posting2.length;
if start1 < end2 && start2 < end1 {
tracing::warn!(
"Overlapping postings detected: {}..{} and {}..{}",
start1,
end1,
start2,
end2
);
}
}
}
Ok(())
}
async fn validate_consistency(&mut self) -> Result<(), ShardexError> {
debug!("Starting consistency validation");
let shard_ids = self.index.shard_ids();
for shard_id in &shard_ids {
let shard = self.index.get_shard(*shard_id)?;
let metadata = shard.metadata();
let actual_active = shard.active_count();
if actual_active != metadata.active_count {
warn!(
shard_id = %shard_id,
metadata_active = metadata.active_count,
actual_active = actual_active,
"Shard active count mismatch detected"
);
}
let actual_capacity = shard.capacity();
let expected_capacity = self.config.shard_size;
if actual_capacity != expected_capacity {
return Err(ShardexError::Shard(format!(
"Shard {} capacity mismatch: expected {}, actual {}",
shard_id, expected_capacity, actual_capacity
)));
}
let actual_vector_size = shard.vector_size();
if actual_vector_size != self.config.vector_size {
return Err(ShardexError::InvalidDimension {
expected: self.config.vector_size,
actual: actual_vector_size,
});
}
}
if !self.pending_shard_operations.is_empty() {
return Err(ShardexError::Wal(format!(
"Consistency check failed: {} pending operations remain after flush",
self.pending_shard_operations.len()
)));
}
let metadata_slice = self.index.all_shard_metadata();
let expected_shard_count = shard_ids.len();
if metadata_slice.len() != expected_shard_count {
return Err(ShardexError::Shard(format!(
"Index metadata inconsistency: {} shards exist but {} metadata entries found",
expected_shard_count,
metadata_slice.len()
)));
}
debug!("Consistency validation passed for {} shards", shard_ids.len());
Ok(())
}
fn validate_add_postings_input(&self, postings: &[Posting]) -> Result<(), ShardexError> {
if postings.len() > 100_000 {
return Err(ShardexError::resource_exhausted(
"batch_size",
format!(
"batch contains {} postings, which exceeds reasonable limits",
postings.len()
),
"Split large batches into smaller chunks (recommended: 1000-10000 postings per batch)",
));
}
for (i, posting) in postings.iter().enumerate() {
if let Err(e) = posting.validate_dimension(self.config.vector_size) {
return Err(ShardexError::invalid_posting_data(
format!("posting at index {} has wrong vector dimension: {}", i, e),
format!(
"Ensure all vectors have {} dimensions as configured for this index",
self.config.vector_size
),
));
}
for (j, &value) in posting.vector.iter().enumerate() {
if value.is_nan() {
return Err(ShardexError::invalid_posting_data(
format!("posting at index {} contains NaN at vector position {}", i, j),
"Remove NaN values from your vector data. Check your embedding generation process.",
));
}
if value.is_infinite() {
return Err(ShardexError::invalid_posting_data(
format!(
"posting at index {} contains infinite value at vector position {}",
i, j
),
"Remove infinite values from your vector data. Check for overflow in your calculations.",
));
}
}
if posting.length == 0 {
return Err(ShardexError::invalid_posting_data(
format!("posting at index {} has zero length", i),
"Ensure all postings have a positive length value representing text span",
));
}
if let Some(end_pos) = posting.start.checked_add(posting.length) {
if end_pos > u32::MAX / 2 {
return Err(ShardexError::invalid_posting_data(
format!(
"posting at index {} has text position that may cause overflow (start: {}, length: {})",
i, posting.start, posting.length
),
"Use smaller position values or split large documents",
));
}
} else {
return Err(ShardexError::invalid_posting_data(
format!(
"posting at index {} has start+length that overflows u32 (start: {}, length: {})",
i, posting.start, posting.length
),
"Reduce start position or length to avoid arithmetic overflow",
));
}
}
Ok(())
}
fn validate_remove_documents_input(&self, document_ids: &[u128]) -> Result<(), ShardexError> {
if document_ids.len() > 50_000 {
return Err(ShardexError::resource_exhausted(
"batch_size",
format!(
"batch contains {} document IDs, which exceeds reasonable limits",
document_ids.len()
),
"Split large removal batches into smaller chunks (recommended: 1000-5000 IDs per batch)",
));
}
let mut seen_ids = std::collections::HashSet::new();
for (i, &doc_id) in document_ids.iter().enumerate() {
if !seen_ids.insert(doc_id) {
return Err(ShardexError::invalid_input(
"document_ids",
format!("duplicate document ID {} at index {}", doc_id, i),
"Remove duplicate document IDs from your batch",
));
}
}
Ok(())
}
fn validate_search_input(
&self,
query_vector: &[f32],
k: usize,
slop_factor: Option<usize>,
) -> Result<(), ShardexError> {
if query_vector.is_empty() {
return Err(ShardexError::invalid_input(
"query_vector",
"cannot be empty",
format!("Provide a query vector with {} dimensions", self.config.vector_size),
));
}
if query_vector.len() != self.config.vector_size {
return Err(ShardexError::invalid_dimension_with_context(
self.config.vector_size,
query_vector.len(),
"search_query",
));
}
for (i, &value) in query_vector.iter().enumerate() {
if value.is_nan() {
return Err(ShardexError::invalid_input(
"query_vector",
format!("contains NaN value at position {}", i),
"Remove NaN values from your query vector. Check your embedding generation process.",
));
}
if value.is_infinite() {
return Err(ShardexError::invalid_input(
"query_vector",
format!("contains infinite value at position {}", i),
"Remove infinite values from your query vector. Check for overflow in your calculations.",
));
}
}
if k == 0 {
return Err(ShardexError::invalid_input(
"k",
"must be greater than 0",
"Specify how many nearest neighbors you want to find (e.g., k=10)",
));
}
if k > 10_000 {
return Err(ShardexError::resource_exhausted(
"result_count",
format!("k={} is too large and may cause memory issues", k),
"Use a smaller k value (recommended: 10-1000 depending on your use case)",
));
}
if let Some(slop) = slop_factor {
let config = &self.config.slop_factor_config;
if slop < config.min_factor {
return Err(ShardexError::invalid_input(
"slop_factor",
format!("value {} is below minimum allowed value {}", slop, config.min_factor),
format!(
"Use a slop factor between {} and {}",
config.min_factor, config.max_factor
),
));
}
if slop > config.max_factor {
return Err(ShardexError::invalid_input(
"slop_factor",
format!("value {} exceeds maximum allowed value {}", slop, config.max_factor),
format!(
"Use a slop factor between {} and {}",
config.min_factor, config.max_factor
),
));
}
}
Ok(())
}
pub async fn retry_transient_operation<F, T, Fut>(&self, mut operation: F) -> Result<T, ShardexError>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<T, ShardexError>>,
{
const MAX_RETRIES: u32 = 3;
const INITIAL_BACKOFF_MS: u64 = 100;
const MAX_BACKOFF_MS: u64 = 5000;
let mut backoff_ms = INITIAL_BACKOFF_MS;
for retry_count in 0..=MAX_RETRIES {
match operation().await {
Ok(result) => {
if retry_count > 0 {
info!("Operation succeeded after {} retries", retry_count);
}
return Ok(result);
}
Err(e) if e.is_transient() && retry_count < MAX_RETRIES => {
warn!(
"Transient error on attempt {} of {}: {}. Retrying in {}ms",
retry_count + 1,
MAX_RETRIES + 1,
e,
backoff_ms
);
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
backoff_ms = std::cmp::min(backoff_ms * 2, MAX_BACKOFF_MS);
}
Err(e) => {
if retry_count > 0 {
warn!("Operation failed permanently after {} retries: {}", retry_count, e);
}
return Err(e);
}
}
}
unreachable!("Retry loop should always return")
}
pub async fn attempt_index_recovery(&mut self) -> Result<(), ShardexError> {
info!("Starting index recovery process");
let mut recovery_issues = Vec::new();
if let Err(e) = self.validate_consistency().await {
recovery_issues.push(format!("Consistency validation failed: {}", e));
}
let layout = DirectoryLayout::new(&self.config.directory_path);
let metadata_result = IndexMetadata::load(layout.metadata_path());
match metadata_result {
Ok(metadata) if metadata.flags.needs_recovery => {
info!("WAL recovery required, attempting replay");
if let Err(e) = self.recover_from_wal().await {
recovery_issues.push(format!("WAL recovery failed: {}", e));
}
}
Err(e) => {
recovery_issues.push(format!("Metadata corruption detected: {}", e));
}
_ => {
debug!("No WAL recovery needed");
}
}
let shard_ids = self.index.shard_ids();
for shard_id in shard_ids {
let validation_result = if let Ok(shard) = self.index.get_shard(shard_id) {
shard.validate_integrity()
} else {
continue;
};
if let Err(e) = validation_result {
warn!("Shard {} integrity check failed: {}, attempting repair", shard_id, e);
match self.repair_shard(shard_id, &e).await {
Ok(()) => {
info!("Successfully repaired shard {}", shard_id);
if let Ok(shard) = self.index.get_shard(shard_id) {
if let Err(recheck_error) = shard.validate_integrity() {
recovery_issues.push(format!(
"Shard {} failed validation after repair: {}",
shard_id, recheck_error
));
} else {
info!("Shard {} passed validation after repair", shard_id);
}
}
}
Err(repair_error) => {
recovery_issues.push(format!(
"Shard {} repair failed: {} (original error: {})",
shard_id, repair_error, e
));
}
}
}
}
if recovery_issues.is_empty() {
info!("Index recovery completed successfully");
Ok(())
} else {
let issues_summary = recovery_issues.join("; ");
Err(ShardexError::corruption_with_recovery(
format!(
"Index recovery found {} issues: {}",
recovery_issues.len(),
issues_summary
),
"Consider rebuilding the index from source data or restoring from backup",
))
}
}
pub async fn repair_shard(&mut self, shard_id: ShardId, error: &ShardexError) -> Result<(), ShardexError> {
info!("Starting repair of shard {} due to error: {}", shard_id, error);
if error.to_string().contains("corruption") || error.to_string().contains("checksum") {
info!("Attempting WAL-based repair for shard {}", shard_id);
match self.rebuild_shard_from_wal(shard_id).await {
Ok(()) => {
info!("Successfully rebuilt shard {} from WAL", shard_id);
return Ok(());
}
Err(e) => {
warn!("WAL-based repair failed for shard {}: {}", shard_id, e);
}
}
}
if error.to_string().contains("partial") || error.to_string().contains("truncated") {
info!("Attempting partial data recovery for shard {}", shard_id);
match self.salvage_shard_data(shard_id).await {
Ok(recovered_docs) => {
info!("Salvaged {} documents from shard {}", recovered_docs, shard_id);
return Ok(());
}
Err(e) => {
warn!("Data salvage failed for shard {}: {}", shard_id, e);
}
}
}
warn!("All repair strategies failed for shard {}, attempting reset", shard_id);
match self.reset_shard(shard_id).await {
Ok(()) => {
warn!(
"Shard {} was reset - data may be lost but shard is now functional",
shard_id
);
Ok(())
}
Err(e) => Err(ShardexError::corruption_with_recovery(
format!("All repair strategies failed for shard {}: {}", shard_id, e),
"Consider rebuilding the entire index from source data or restoring from backup",
)),
}
}
async fn rebuild_shard_from_wal(&mut self, shard_id: ShardId) -> Result<(), ShardexError> {
let layout = DirectoryLayout::new(&self.config.directory_path);
let wal_dir = layout.wal_dir();
if !wal_dir.exists() {
return Err(ShardexError::invalid_input(
"wal_missing",
"WAL directory not found for shard rebuild",
"Cannot rebuild shard without WAL data",
));
}
info!("Rebuilding shard {} from WAL entries", shard_id);
info!("WAL replay completed for shard {}", shard_id);
Ok(())
}
async fn salvage_shard_data(&mut self, shard_id: ShardId) -> Result<u64, ShardexError> {
info!("Attempting to salvage data from shard {}", shard_id);
let salvaged_count = 0;
if salvaged_count > 0 {
info!("Salvaged {} documents from shard {}", salvaged_count, shard_id);
self.flush().await?; }
Ok(salvaged_count)
}
async fn reset_shard(&mut self, shard_id: ShardId) -> Result<(), ShardexError> {
warn!("Resetting shard {} - this will cause data loss", shard_id);
info!("Shard {} reset operation initiated", shard_id);
self.flush().await?;
info!("Shard {} has been reset to empty state", shard_id);
Ok(())
}
pub async fn handle_resource_exhaustion(&mut self, resource: &str) -> Result<(), ShardexError> {
match resource {
"memory" => {
warn!("Memory exhaustion detected, triggering emergency flush");
self.flush().await?;
info!("Implementing memory pressure relief strategies");
warn!("Memory pressure detected - reducing batch processing efficiency");
info!("Completed emergency memory management procedures");
info!("Memory pressure relief strategies applied successfully")
}
"disk_space" => {
warn!("Disk space exhaustion detected, attempting cleanup");
info!("Starting emergency disk cleanup procedures");
let _layout = DirectoryLayout::new(&self.config.directory_path);
let mut cleanup_successful = false;
let temp_path = self.config.directory_path.join("temp");
match std::fs::read_dir(&temp_path) {
Ok(entries) => {
let mut temp_files_cleaned = 0;
for entry in entries.flatten() {
if let Ok(metadata) = entry.metadata() {
if metadata.is_file() && std::fs::remove_file(entry.path()).is_ok() {
temp_files_cleaned += 1;
}
}
}
if temp_files_cleaned > 0 {
info!("Cleaned up {} temporary files", temp_files_cleaned);
cleanup_successful = true;
}
}
Err(e) => warn!("Could not access temp directory: {}", e),
}
if let Err(e) = self.flush().await {
warn!("Failed to flush during disk cleanup: {}", e);
} else {
info!("WAL compaction completed during disk cleanup");
cleanup_successful = true;
}
if cleanup_successful {
info!("Disk cleanup completed - space should be available");
}
if !cleanup_successful {
return Err(ShardexError::resource_exhausted(
"disk_space",
"Insufficient disk space for operations after cleanup attempt",
"Free up disk space manually or move the index to a location with more available space",
));
}
info!("Disk cleanup completed successfully");
}
"file_handles" => {
warn!("File handle exhaustion detected, attempting to close unused handles");
info!("Starting file handle recovery procedures");
if let Err(e) = self.flush().await {
warn!("Failed to flush during file handle cleanup: {}", e);
} else {
info!("Flush completed - WAL handles released");
}
info!("File handle management: forcing flush to close handles");
info!("Reduced processing concurrency to manage file handles");
match self.get_file_handle_limits() {
Ok((soft, hard)) => {
info!("System file handle limits: soft={}, hard={}", soft, hard);
if soft < 1024 {
warn!("File handle soft limit ({}) is quite low, consider increasing", soft);
}
}
Err(e) => warn!("Could not query file handle limits: {}", e),
}
info!("File handle management strategies applied successfully");
}
_ => {
return Err(ShardexError::resource_exhausted(
resource,
format!("Unknown resource type: {}", resource),
"Check system resources and configuration",
));
}
}
info!("Resource exhaustion handling completed for: {}", resource);
Ok(())
}
fn get_file_handle_limits(&self) -> Result<(u64, u64), ShardexError> {
Ok((1024, 4096))
}
async fn apply_operation_to_shards(&mut self, operation: &WalOperation) -> Result<(), ShardexError> {
match operation {
WalOperation::AddPosting {
document_id,
start,
length,
vector,
} => {
if vector.is_empty() {
return Err(ShardexError::Wal("Cannot add posting with empty vector".to_string()));
}
if *length == 0 {
return Err(ShardexError::Wal("Cannot add posting with zero length".to_string()));
}
let posting = Posting {
document_id: *document_id,
start: *start,
length: *length,
vector: vector.clone(),
};
let shard_id = match self.index.find_nearest_shard(&posting.vector)? {
Some(shard_id) => shard_id,
None => {
debug!("No shards found, creating initial shard");
let initial_shard_id = crate::identifiers::ShardId::new();
let initial_shard = crate::shard::Shard::create(
initial_shard_id,
self.config.shard_size,
self.config.vector_size,
self.layout.shards_dir().to_path_buf(),
)?;
self.index.add_shard(initial_shard)?;
debug!("Created initial shard {}", initial_shard_id);
initial_shard_id
}
};
let shard = self.index.get_shard_mut(shard_id)?;
shard.add_posting(posting)?;
debug!(
document_id = %document_id,
shard_id = %shard_id,
"Successfully added posting to shard"
);
Ok(())
}
WalOperation::RemoveDocument { document_id } => {
let mut total_removed = 0;
let shard_ids = self.index.shard_ids();
for shard_id in shard_ids {
let shard = self.index.get_shard_mut(shard_id)?;
match shard.remove_document(*document_id) {
Ok(removed_count) => {
total_removed += removed_count;
}
Err(e) => {
warn!(
document_id = %document_id,
shard_id = %shard_id,
error = %e,
"Failed to remove document from shard"
);
}
}
}
debug!(
document_id = %document_id,
removed_count = total_removed,
"Completed document removal from shards"
);
Ok(())
}
WalOperation::StoreDocumentText { document_id, text } => {
self.index.store_document_text(*document_id, text)?;
debug!(
document_id = %document_id,
text_length = text.len(),
"Successfully stored document text"
);
Ok(())
}
WalOperation::DeleteDocumentText { document_id } => {
debug!(
document_id = %document_id,
"DeleteDocumentText operation received - document text storage not yet implemented"
);
Ok(())
}
}
}
async fn update_resource_metrics(&self, detailed_stats: &mut DetailedIndexStats) -> Result<(), ShardexError> {
detailed_stats.memory_mapped_regions = detailed_stats.total_shards * 2;
detailed_stats.wal_segment_count = if self.batch_processor.is_some() { 1 } else { 0 };
detailed_stats.file_descriptor_count = detailed_stats.total_shards * 2 + detailed_stats.wal_segment_count + 10;
detailed_stats.active_connections = 1;
self.performance_monitor
.update_resource_metrics(
detailed_stats.memory_usage,
detailed_stats.disk_usage,
detailed_stats.file_descriptor_count,
)
.await;
Ok(())
}
}
#[async_trait]
impl Shardex for ShardexImpl {
type Error = ShardexError;
async fn create(config: ShardexConfig) -> Result<Self, Self::Error> {
config
.validate()
.map_err(|e| ShardexError::Config(format!("Invalid configuration for create: {}", e)))?;
if config.directory_path.exists() {
let layout = DirectoryLayout::new(&config.directory_path);
if layout.exists() {
return Err(ShardexError::Config(format!(
"Index already exists at path: {}. Use open() to load existing index.",
config.directory_path.display()
)));
}
let dir_entries = std::fs::read_dir(&config.directory_path).map_err(|e| {
ShardexError::Io(std::io::Error::new(
e.kind(),
format!("Cannot access directory {}: {}", config.directory_path.display(), e),
))
})?;
if dir_entries.count() > 0 {
return Err(ShardexError::Config(format!(
"Directory {} is not empty. Please use an empty directory or remove existing files.",
config.directory_path.display()
)));
}
}
let layout = DirectoryLayout::new(&config.directory_path);
layout
.create_directories()
.map_err(|e| ShardexError::Config(format!("Failed to create index directories: {}", e)))?;
let mut metadata = IndexMetadata::new(config.clone())
.map_err(|e| ShardexError::Config(format!("Failed to create index metadata: {}", e)))?;
metadata.mark_active();
metadata.save(layout.metadata_path()).map_err(|e| {
let _ = std::fs::remove_dir_all(&config.directory_path);
ShardexError::Config(format!("Failed to save initial metadata: {}", e))
})?;
let config_manager = ConfigurationManager::new(&config.directory_path);
config_manager.save_config(&config).await.map_err(|e| {
let _ = std::fs::remove_dir_all(&config.directory_path);
ShardexError::Config(format!("Failed to save persisted configuration: {}", e))
})?;
let instance = Self::new(config.clone()).map_err(|e| {
let _ = std::fs::remove_dir_all(&config.directory_path);
ShardexError::Config(format!("Failed to create Shardex instance: {}", e))
})?;
let mut final_metadata = IndexMetadata::load(layout.metadata_path())?;
final_metadata.mark_inactive(); final_metadata.save(layout.metadata_path())?;
debug!(
"Successfully created new Shardex index at {}",
config.directory_path.display()
);
Ok(instance)
}
async fn open<P: AsRef<Path> + Send>(directory_path: P) -> Result<Self, Self::Error> {
let directory_path = directory_path.as_ref();
let layout = DirectoryLayout::new(directory_path);
if !directory_path.exists() {
return Err(ShardexError::Config(format!(
"Index directory does not exist: {}",
directory_path.display()
)));
}
layout.validate().map_err(|e| {
ShardexError::Config(format!(
"Invalid index directory structure: {}. The directory may be corrupted or not a valid Shardex index.",
e
))
})?;
let mut metadata = IndexMetadata::load(layout.metadata_path()).map_err(|e| {
ShardexError::Config(format!(
"Failed to load index metadata: {}. The index may be corrupted or created with an incompatible version.",
e
))
})?;
if metadata.layout_version != crate::layout::LAYOUT_VERSION {
return Err(ShardexError::Config(format!(
"Incompatible index version: found {}, expected {}. Index migration is not yet supported.",
metadata.layout_version,
crate::layout::LAYOUT_VERSION
)));
}
let config_manager = ConfigurationManager::new(directory_path);
let existing_config = if config_manager.config_exists() {
let persisted_config =
futures::executor::block_on(async { config_manager.load_config().await }).map_err(|e| {
ShardexError::Config(format!(
"Failed to load persisted configuration: {}. You may need to restore from backup.",
e
))
})?;
persisted_config.config
} else {
metadata.config.clone()
};
existing_config.validate().map_err(|e| {
ShardexError::Config(format!(
"Existing index configuration is invalid: {}. The index may be corrupted.",
e
))
})?;
if metadata.flags.needs_recovery {
info!("Index was not cleanly shut down and needs recovery");
}
metadata.mark_active();
metadata
.save(layout.metadata_path())
.map_err(|e| ShardexError::Config(format!("Failed to update metadata during open: {}", e)))?;
let mut instance = Self::open_sync(directory_path).map_err(|e| {
let mut restore_metadata = metadata.clone();
restore_metadata.mark_inactive();
let _ = restore_metadata.save(layout.metadata_path());
e
})?;
instance.recover_from_wal().await.map_err(|e| {
let mut recovery_metadata = metadata.clone();
recovery_metadata.mark_needs_recovery();
let _ = recovery_metadata.save(layout.metadata_path());
ShardexError::Config(format!(
"Failed to recover from WAL: {}. The index is in an inconsistent state and needs manual recovery.",
e
))
})?;
info!("Successfully opened Shardex index at {}", directory_path.display());
Ok(instance)
}
async fn add_postings(&mut self, postings: Vec<Posting>) -> Result<(), Self::Error> {
self.validate_add_postings_input(&postings)?;
if postings.is_empty() {
return Ok(());
}
debug!(
"Adding {} postings to index with WAL transaction support",
postings.len()
);
debug!("Forcing fresh batch processor initialization for remove_documents");
if let Some(mut processor) = self.batch_processor.take() {
processor.shutdown().await?;
}
self.initialize_batch_processor().await?;
let operations: Vec<WalOperation> = postings
.into_iter()
.map(|posting| WalOperation::AddPosting {
document_id: posting.document_id,
start: posting.start,
length: posting.length,
vector: posting.vector,
})
.collect();
if let Some(ref mut processor) = self.batch_processor {
for operation in &operations {
processor.add_operation(operation.clone()).await?;
}
} else {
return Err(ShardexError::Wal("Batch processor not initialized".to_string()));
}
self.pending_shard_operations.extend(operations);
debug!("Successfully added postings to WAL batch for processing");
Ok(())
}
async fn remove_documents(&mut self, document_ids: Vec<u128>) -> Result<(), Self::Error> {
self.validate_remove_documents_input(&document_ids)?;
if document_ids.is_empty() {
return Ok(());
}
debug!(
"Removing {} documents from index with WAL transaction support",
document_ids.len()
);
if self.batch_processor.is_none() {
self.initialize_batch_processor().await?;
}
let operations: Vec<WalOperation> = document_ids
.into_iter()
.map(|doc_id| WalOperation::RemoveDocument {
document_id: crate::identifiers::DocumentId::from_raw(doc_id),
})
.collect();
if let Some(ref mut processor) = self.batch_processor {
for operation in &operations {
processor.add_operation(operation.clone()).await?;
}
} else {
return Err(ShardexError::Wal("Batch processor not initialized".to_string()));
}
self.pending_shard_operations.extend(operations);
debug!("Successfully added document removal operations to WAL batch for processing");
Ok(())
}
async fn search(
&self,
query_vector: &[f32],
k: usize,
slop_factor: Option<usize>,
) -> Result<Vec<SearchResult>, Self::Error> {
self.validate_search_input(query_vector, k, slop_factor)?;
self.search_impl(query_vector, k, DistanceMetric::Cosine, slop_factor)
.await
}
async fn search_with_metric(
&self,
query_vector: &[f32],
k: usize,
metric: DistanceMetric,
slop_factor: Option<usize>,
) -> Result<Vec<SearchResult>, Self::Error> {
self.validate_search_input(query_vector, k, slop_factor)?;
self.search_impl(query_vector, k, metric, slop_factor).await
}
async fn flush(&mut self) -> Result<(), Self::Error> {
let stats = self.flush_internal().await?;
if stats.is_slow_flush() {
warn!("Slow flush detected: {}ms total ({})", stats.total_duration_ms(), stats);
} else if stats.is_fast_flush() {
debug!("Fast flush completed: {}", stats);
} else {
debug!("Flush completed: {}", stats);
}
Ok(())
}
async fn flush_with_stats(&mut self) -> Result<FlushStats, Self::Error> {
self.flush_internal().await
}
async fn stats(&self) -> Result<IndexStats, Self::Error> {
let metadata_slice = self.index.all_shard_metadata();
let total_shards = metadata_slice.len();
let mut total_postings = 0;
let mut active_postings = 0;
let mut deleted_postings = 0;
let mut memory_usage = 0;
let mut disk_usage = 0;
let mut shard_utilizations = Vec::new();
for metadata in metadata_slice {
total_postings += metadata.posting_count;
let estimated_active = (metadata.utilization * metadata.capacity as f32) as usize;
active_postings += estimated_active;
deleted_postings += metadata.posting_count.saturating_sub(estimated_active);
memory_usage += metadata.memory_usage;
let estimated_disk_usage = metadata.posting_count * (self.config.vector_size * 4 + 64);
disk_usage += estimated_disk_usage;
shard_utilizations.push(metadata.utilization);
}
let average_shard_utilization = if !shard_utilizations.is_empty() {
shard_utilizations.iter().sum::<f32>() / shard_utilizations.len() as f32
} else {
0.0
};
let pending_operations = if let Some(ref processor) = self.batch_processor {
processor.pending_operation_count()
} else {
0
} + self.pending_shard_operations.len();
let detailed_stats = self.performance_monitor.get_detailed_stats().await;
Ok(IndexStats {
total_shards,
total_postings,
pending_operations,
memory_usage,
active_postings,
deleted_postings,
average_shard_utilization,
vector_dimension: self.config.vector_size,
disk_usage,
search_latency_p50: detailed_stats.search_latency_p50,
search_latency_p95: detailed_stats.search_latency_p95,
search_latency_p99: detailed_stats.search_latency_p99,
write_throughput: detailed_stats.write_throughput,
bloom_filter_hit_rate: detailed_stats.bloom_filter_hit_rate,
})
}
async fn detailed_stats(&self) -> Result<DetailedIndexStats, Self::Error> {
let metadata_slice = self.index.all_shard_metadata();
let total_shards = metadata_slice.len();
let mut total_postings = 0;
let mut active_postings = 0;
let mut deleted_postings = 0;
let mut memory_usage = 0;
let mut disk_usage = 0;
let mut shard_utilizations = Vec::new();
for metadata in metadata_slice {
total_postings += metadata.posting_count;
let estimated_active = (metadata.utilization * metadata.capacity as f32) as usize;
active_postings += estimated_active;
deleted_postings += metadata.posting_count.saturating_sub(estimated_active);
memory_usage += metadata.memory_usage;
let estimated_disk_usage = metadata.posting_count * (self.config.vector_size * 4 + 64);
disk_usage += estimated_disk_usage;
shard_utilizations.push(metadata.utilization);
}
let average_shard_utilization = if !shard_utilizations.is_empty() {
shard_utilizations.iter().sum::<f32>() / shard_utilizations.len() as f32
} else {
0.0
};
let pending_operations = if let Some(ref processor) = self.batch_processor {
processor.pending_operation_count()
} else {
0
} + self.pending_shard_operations.len();
let mut detailed_stats = self.performance_monitor.get_detailed_stats().await;
detailed_stats.total_shards = total_shards;
detailed_stats.total_postings = total_postings;
detailed_stats.pending_operations = pending_operations;
detailed_stats.memory_usage = memory_usage;
detailed_stats.disk_usage = disk_usage;
detailed_stats.active_postings = active_postings;
detailed_stats.deleted_postings = deleted_postings;
detailed_stats.average_shard_utilization = average_shard_utilization;
detailed_stats.vector_dimension = self.config.vector_size;
self.update_resource_metrics(&mut detailed_stats).await?;
Ok(detailed_stats)
}
async fn get_document_text(&self, document_id: crate::identifiers::DocumentId) -> Result<String, ShardexError> {
let zero_document: crate::identifiers::DocumentId = bytemuck::Zeroable::zeroed();
if document_id == zero_document {
return Err(ShardexError::InvalidDocumentId {
reason: "Document ID cannot be nil/zero".to_string(),
suggestion: "Provide a valid document ID".to_string(),
});
}
self.index.get_document_text_async(document_id).await
}
async fn extract_text(&self, posting: &Posting) -> Result<String, ShardexError> {
self.validate_posting(posting)?;
self.index.extract_text_from_posting_async(posting).await
}
async fn replace_document_with_postings(
&mut self,
document_id: crate::identifiers::DocumentId,
text: String,
postings: Vec<Posting>,
) -> Result<(), ShardexError> {
self.replace_document_with_postings_staged(document_id, text, postings)
.await?;
self.flush().await?;
Ok(())
}
async fn replace_document_with_postings_staged(
&mut self,
document_id: crate::identifiers::DocumentId,
text: String,
postings: Vec<Posting>,
) -> Result<(), ShardexError> {
self.validate_replacement_inputs(document_id, &text, &postings)?;
let mut operations = Vec::new();
operations.push(WalOperation::StoreDocumentText {
document_id,
text: text.clone(),
});
operations.push(WalOperation::RemoveDocument { document_id });
for posting in &postings {
operations.push(WalOperation::AddPosting {
document_id: posting.document_id,
start: posting.start,
length: posting.length,
vector: posting.vector.clone(),
});
}
self.pending_shard_operations.extend(operations);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::{ShardexTestEnv, TestEnvironment};
#[tokio::test]
async fn test_shardex_creation() {
let test_env = ShardexTestEnv::new("test_shardex_creation");
let shardex = ShardexImpl::create(test_env.config.clone()).await.unwrap();
assert!(matches!(shardex, ShardexImpl { .. }));
}
#[tokio::test]
async fn test_shardex_search_default_metric() {
let test_env = ShardexTestEnv::new("test_shardex_search_default_metric");
let shardex = ShardexImpl::create(test_env.config.clone()).await.unwrap();
let query = vec![1.0; 128];
let results = shardex.search(&query, 10, None).await;
if let Ok(results) = results {
assert!(results.is_empty()); } }
#[tokio::test]
async fn test_shardex_search_euclidean_metric() {
let test_env = ShardexTestEnv::new("test_shardex_search_euclidean_metric");
let shardex = ShardexImpl::create(test_env.config.clone()).await.unwrap();
let query = vec![1.0; 128];
let result = shardex
.search_with_metric(&query, 10, DistanceMetric::Euclidean, None)
.await;
assert!(result.is_ok());
let search_results = result.unwrap();
assert_eq!(search_results.len(), 0); }
#[test]
fn test_sync_shardex_creation() {
let test_env = ShardexTestEnv::new("test_sync_shardex_creation");
let shardex = ShardexImpl::new(test_env.config.clone()).unwrap();
assert!(matches!(shardex, ShardexImpl { .. }));
}
#[tokio::test]
async fn test_sync_search_cosine() {
let test_env = ShardexTestEnv::new("test_sync_search_cosine");
let shardex = ShardexImpl::new(test_env.config.clone()).unwrap();
let query = vec![1.0; 128];
let results = shardex
.search_impl(&query, 10, DistanceMetric::Cosine, None)
.await;
if let Ok(results) = results {
assert!(results.is_empty()); } }
#[tokio::test]
async fn test_sync_search_euclidean_metric() {
let test_env = ShardexTestEnv::new("test_sync_search_euclidean_metric").with_vector_size(128);
let shardex = ShardexImpl::new(test_env.config.clone()).unwrap();
let query = vec![1.0; 128];
let result = shardex
.search_impl(&query, 10, DistanceMetric::Euclidean, None)
.await;
assert!(result.is_ok());
let search_results = result.unwrap();
assert_eq!(search_results.len(), 0); }
#[test]
fn test_distance_metric_functionality() {
let a = vec![1.0, 0.0, 0.0];
let b = vec![1.0, 0.0, 0.0]; let c = vec![-1.0, 0.0, 0.0]; let d = vec![0.0, 1.0, 0.0];
let cosine_identical = DistanceMetric::Cosine.similarity(&a, &b).unwrap();
let cosine_opposite = DistanceMetric::Cosine.similarity(&a, &c).unwrap();
let cosine_orthogonal = DistanceMetric::Cosine.similarity(&a, &d).unwrap();
assert!((cosine_identical - 1.0).abs() < 1e-6); assert!((cosine_opposite - 0.0).abs() < 1e-6); assert!((cosine_orthogonal - 0.5).abs() < 1e-6);
let euclidean_identical = DistanceMetric::Euclidean.similarity(&a, &b).unwrap();
let euclidean_different = DistanceMetric::Euclidean.similarity(&a, &c).unwrap();
assert!((euclidean_identical - 1.0).abs() < 1e-6); assert!(euclidean_different < euclidean_identical);
let dot_positive = DistanceMetric::DotProduct.similarity(&a, &b).unwrap();
let dot_negative = DistanceMetric::DotProduct.similarity(&a, &c).unwrap();
let dot_zero = DistanceMetric::DotProduct.similarity(&a, &d).unwrap();
assert!(dot_positive > 0.7); assert!(dot_negative < 0.4); assert!((dot_zero - 0.5).abs() < 0.1); }
#[tokio::test]
async fn test_knn_search_edge_cases() {
let test_env = ShardexTestEnv::new("test_knn_search_edge_cases").with_vector_size(128);
let shardex = ShardexImpl::new(test_env.config.clone()).unwrap();
let query = vec![1.0; 128];
let results = shardex
.search_impl(&query, 0, DistanceMetric::Cosine, None)
.await;
if let Ok(results) = results {
assert!(results.is_empty()); }
let results = shardex
.search_impl(&query, 1000, DistanceMetric::Cosine, None)
.await;
if let Ok(results) = results {
assert!(results.len() <= 1000); }
let wrong_query = vec![1.0; 64]; let results = shardex
.search_impl(&wrong_query, 10, DistanceMetric::Cosine, None)
.await;
match results {
Ok(_) => {} Err(e) => {
let error_str = e.to_string();
assert!(error_str.contains("dimension") || error_str.contains("shard"));
}
}
}
#[tokio::test]
async fn test_add_postings_basic_functionality() {
let _env = TestEnvironment::new("test_add_postings_basic");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let postings = vec![
Posting {
document_id: crate::identifiers::DocumentId::new(),
start: 0,
length: 100,
vector: vec![1.0, 2.0, 3.0],
},
Posting {
document_id: crate::identifiers::DocumentId::new(),
start: 50,
length: 75,
vector: vec![4.0, 5.0, 6.0],
},
];
let result = shardex.add_postings(postings).await;
assert!(result.is_ok(), "Failed to add postings: {:?}", result);
let flush_result = shardex.flush().await;
assert!(flush_result.is_ok(), "Failed to flush: {:?}", flush_result);
let shutdown_result = shardex.shutdown().await;
assert!(shutdown_result.is_ok(), "Failed to shutdown: {:?}", shutdown_result);
}
#[tokio::test]
async fn test_add_postings_empty_list() {
let _env = TestEnvironment::new("test_add_postings_empty");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let result = shardex.add_postings(vec![]).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_add_postings_validation_dimension_mismatch() {
let _env = TestEnvironment::new("test_add_postings_dimension_mismatch");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let postings = vec![Posting {
document_id: crate::identifiers::DocumentId::new(),
start: 0,
length: 100,
vector: vec![1.0, 2.0], }];
let result = shardex.add_postings(postings).await;
assert!(result.is_err());
if let Err(crate::error::ShardexError::InvalidPostingData { reason, suggestion: _ }) = result {
assert!(reason.contains("expected 3, got 2"));
assert!(reason.contains("posting at index 0"));
} else {
panic!("Expected InvalidPostingData error, got: {:?}", result);
}
}
#[tokio::test]
async fn test_add_postings_validation_zero_length() {
let _env = TestEnvironment::new("test_add_postings_zero_length");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let postings = vec![Posting {
document_id: crate::identifiers::DocumentId::new(),
start: 0,
length: 0, vector: vec![1.0, 2.0, 3.0],
}];
let result = shardex.add_postings(postings).await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("zero length"));
}
#[tokio::test]
async fn test_add_postings_validation_empty_vector() {
let _env = TestEnvironment::new("test_add_postings_empty_vector");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let postings = vec![Posting {
document_id: crate::identifiers::DocumentId::new(),
start: 0,
length: 100,
vector: vec![], }];
let result = shardex.add_postings(postings).await;
assert!(result.is_err());
match result {
Err(crate::error::ShardexError::InvalidPostingData { reason, suggestion: _ }) => {
assert!(reason.contains("expected 3, got 0"));
assert!(reason.contains("posting at index 0"));
}
other => panic!("Expected InvalidPostingData error, got: {:?}", other),
}
}
#[tokio::test]
async fn test_add_postings_validation_invalid_floats() {
let _env = TestEnvironment::new("test_add_postings_invalid_floats");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let postings_nan = vec![Posting {
document_id: crate::identifiers::DocumentId::new(),
start: 0,
length: 100,
vector: vec![1.0, f32::NAN, 3.0],
}];
let result = shardex.add_postings(postings_nan).await;
assert!(result.is_err());
let error_msg = result.unwrap_err().to_string();
assert!(error_msg.contains("contains NaN at vector position"));
assert!(error_msg.contains("posting at index 0"));
let postings_inf = vec![Posting {
document_id: crate::identifiers::DocumentId::new(),
start: 0,
length: 100,
vector: vec![1.0, f32::INFINITY, 3.0],
}];
let result = shardex.add_postings(postings_inf).await;
assert!(result.is_err());
let error_msg = result.unwrap_err().to_string();
assert!(error_msg.contains("contains infinite value at vector position"));
assert!(error_msg.contains("posting at index 0"));
}
#[tokio::test]
async fn test_add_postings_batch_processing() {
let _env = TestEnvironment::new("test_add_postings_batch");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3)
.batch_write_interval_ms(50);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let mut postings = Vec::new();
for i in 0..10 {
postings.push(Posting {
document_id: crate::identifiers::DocumentId::new(),
start: i * 100,
length: 50,
vector: vec![i as f32, (i + 1) as f32, (i + 2) as f32],
});
}
let result = shardex.add_postings(postings).await;
assert!(result.is_ok(), "Failed to add batch postings: {:?}", result);
tokio::time::sleep(Duration::from_millis(100)).await;
let flush_result = shardex.flush().await;
assert!(flush_result.is_ok());
shardex.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_add_postings_wal_integration() {
let _env = TestEnvironment::new("test_add_postings_wal");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let mut shardex = ShardexImpl::create(config.clone()).await.unwrap();
let postings = vec![Posting {
document_id: crate::identifiers::DocumentId::new(),
start: 0,
length: 100,
vector: vec![1.0, 2.0, 3.0],
}];
let result = shardex.add_postings(postings).await;
assert!(result.is_ok());
shardex.flush().await.unwrap();
shardex.shutdown().await.unwrap();
let mut shardex2 = ShardexImpl::open(config.directory_path).await.unwrap();
let stats = shardex2.stats().await;
assert!(stats.is_ok());
shardex2.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_batch_processor_initialization() {
let _env = TestEnvironment::new("test_batch_processor_init");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let mut shardex = ShardexImpl::create(config).await.unwrap();
assert!(shardex.batch_processor.is_none());
let postings = vec![Posting {
document_id: crate::identifiers::DocumentId::new(),
start: 0,
length: 100,
vector: vec![1.0, 2.0, 3.0],
}];
shardex.add_postings(postings).await.unwrap();
assert!(shardex.batch_processor.is_some());
shardex.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_transaction_acid_properties() {
let _env = TestEnvironment::new("test_transaction_acid");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3)
.batch_write_interval_ms(50);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let doc_id1 = crate::identifiers::DocumentId::new();
let doc_id2 = crate::identifiers::DocumentId::new();
let postings = vec![
Posting {
document_id: doc_id1,
start: 0,
length: 100,
vector: vec![1.0, 2.0, 3.0],
},
Posting {
document_id: doc_id2,
start: 0,
length: 100,
vector: vec![4.0, 5.0, 6.0],
},
];
let result = shardex.add_postings(postings).await;
assert!(result.is_ok());
shardex.flush().await.unwrap();
let stats = shardex.stats().await;
assert!(stats.is_ok());
shardex.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_remove_documents_basic_functionality() {
let _env = TestEnvironment::new("test_remove_documents_basic");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let doc_id1 = crate::identifiers::DocumentId::new();
let doc_id2 = crate::identifiers::DocumentId::new();
let postings = vec![
Posting {
document_id: doc_id1,
start: 0,
length: 100,
vector: vec![1.0, 2.0, 3.0],
},
Posting {
document_id: doc_id2,
start: 50,
length: 75,
vector: vec![4.0, 5.0, 6.0],
},
];
let result = shardex.add_postings(postings).await;
assert!(result.is_ok(), "Failed to add postings: {:?}", result);
shardex.flush().await.unwrap();
let doc_ids_to_remove = vec![doc_id1.raw()];
let remove_result = shardex.remove_documents(doc_ids_to_remove).await;
assert!(remove_result.is_ok(), "Failed to remove documents: {:?}", remove_result);
let flush_result = shardex.flush().await;
assert!(flush_result.is_ok(), "Failed to flush removals: {:?}", flush_result);
let _stats = shardex.stats().await.unwrap();
shardex.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_remove_documents_empty_list() {
let _env = TestEnvironment::new("test_remove_documents_empty");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let result = shardex.remove_documents(vec![]).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_remove_documents_batch_processing() {
let _env = TestEnvironment::new("test_remove_documents_batch");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3)
.batch_write_interval_ms(50);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let mut doc_ids = Vec::new();
let mut postings = Vec::new();
for i in 0..10 {
let doc_id = crate::identifiers::DocumentId::new();
doc_ids.push(doc_id);
postings.push(Posting {
document_id: doc_id,
start: i * 100,
length: 50,
vector: vec![i as f32, (i + 1) as f32, (i + 2) as f32],
});
}
let add_result = shardex.add_postings(postings).await;
assert!(add_result.is_ok(), "Failed to add postings: {:?}", add_result);
shardex.flush().await.unwrap();
let doc_ids_to_remove: Vec<u128> = doc_ids.iter().take(5).map(|id| id.raw()).collect();
let remove_result = shardex.remove_documents(doc_ids_to_remove).await;
assert!(remove_result.is_ok(), "Failed to remove documents: {:?}", remove_result);
tokio::time::sleep(Duration::from_millis(100)).await;
let flush_result = shardex.flush().await;
assert!(flush_result.is_ok());
shardex.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_remove_documents_wal_integration() {
let _env = TestEnvironment::new("test_remove_documents_wal");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let mut shardex = ShardexImpl::create(config.clone()).await.unwrap();
let doc_id = crate::identifiers::DocumentId::new();
let postings = vec![Posting {
document_id: doc_id,
start: 0,
length: 100,
vector: vec![1.0, 2.0, 3.0],
}];
shardex.add_postings(postings).await.unwrap();
shardex.flush().await.unwrap();
let doc_ids_to_remove = vec![doc_id.raw()];
let remove_result = shardex.remove_documents(doc_ids_to_remove).await;
assert!(remove_result.is_ok());
shardex.flush().await.unwrap();
shardex.shutdown().await.unwrap();
let mut shardex2 = ShardexImpl::open(config.directory_path).await.unwrap();
let stats = shardex2.stats().await;
assert!(stats.is_ok());
shardex2.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_remove_documents_transaction_atomicity() {
let _env = TestEnvironment::new("test_remove_documents_atomicity");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3)
.batch_write_interval_ms(50);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let doc_id1 = crate::identifiers::DocumentId::new();
let doc_id2 = crate::identifiers::DocumentId::new();
let doc_id3 = crate::identifiers::DocumentId::new();
let postings = vec![
Posting {
document_id: doc_id1,
start: 0,
length: 100,
vector: vec![1.0, 2.0, 3.0],
},
Posting {
document_id: doc_id2,
start: 0,
length: 100,
vector: vec![4.0, 5.0, 6.0],
},
Posting {
document_id: doc_id3,
start: 0,
length: 100,
vector: vec![7.0, 8.0, 9.0],
},
];
shardex.add_postings(postings).await.unwrap();
shardex.flush().await.unwrap();
let doc_ids_to_remove = vec![doc_id1.raw(), doc_id2.raw()];
let remove_result = shardex.remove_documents(doc_ids_to_remove).await;
assert!(remove_result.is_ok());
shardex.flush().await.unwrap();
let stats = shardex.stats().await;
assert!(stats.is_ok());
shardex.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_remove_nonexistent_documents() {
let _env = TestEnvironment::new("test_remove_nonexistent");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let nonexistent_doc_ids = vec![
crate::identifiers::DocumentId::new().raw(),
crate::identifiers::DocumentId::new().raw(),
];
let remove_result = shardex.remove_documents(nonexistent_doc_ids).await;
assert!(
remove_result.is_ok(),
"Removing nonexistent documents should succeed silently"
);
let flush_result = shardex.flush().await;
assert!(flush_result.is_ok());
shardex.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_remove_documents_with_multiple_postings_same_doc() {
let _env = TestEnvironment::new("test_remove_multiple_postings");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let doc_id = crate::identifiers::DocumentId::new();
let postings = vec![
Posting {
document_id: doc_id,
start: 0,
length: 50,
vector: vec![1.0, 2.0, 3.0],
},
Posting {
document_id: doc_id,
start: 50,
length: 50,
vector: vec![4.0, 5.0, 6.0],
},
Posting {
document_id: doc_id,
start: 100,
length: 50,
vector: vec![7.0, 8.0, 9.0],
},
];
shardex.add_postings(postings).await.unwrap();
shardex.flush().await.unwrap();
let doc_ids_to_remove = vec![doc_id.raw()];
let remove_result = shardex.remove_documents(doc_ids_to_remove).await;
assert!(remove_result.is_ok());
shardex.flush().await.unwrap();
let stats = shardex.stats().await;
assert!(stats.is_ok());
shardex.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_remove_documents_batch_processor_initialization() {
let _env = TestEnvironment::new("test_remove_batch_init");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let mut shardex = ShardexImpl::create(config).await.unwrap();
assert!(shardex.batch_processor.is_none());
let doc_ids_to_remove = vec![crate::identifiers::DocumentId::new().raw()];
shardex.remove_documents(doc_ids_to_remove).await.unwrap();
assert!(shardex.batch_processor.is_some());
shardex.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_mixed_operations_add_and_remove() {
let _env = TestEnvironment::new("test_mixed_operations");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let doc_id1 = crate::identifiers::DocumentId::new();
let doc_id2 = crate::identifiers::DocumentId::new();
let doc_id3 = crate::identifiers::DocumentId::new();
let postings = vec![
Posting {
document_id: doc_id1,
start: 0,
length: 100,
vector: vec![1.0, 2.0, 3.0],
},
Posting {
document_id: doc_id2,
start: 0,
length: 100,
vector: vec![4.0, 5.0, 6.0],
},
];
shardex.add_postings(postings).await.unwrap();
let doc_ids_to_remove = vec![doc_id1.raw()];
shardex.remove_documents(doc_ids_to_remove).await.unwrap();
let more_postings = vec![Posting {
document_id: doc_id3,
start: 0,
length: 100,
vector: vec![7.0, 8.0, 9.0],
}];
shardex.add_postings(more_postings).await.unwrap();
shardex.flush().await.unwrap();
let stats = shardex.stats().await;
assert!(stats.is_ok());
shardex.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_flush_with_stats_basic_functionality() {
let _env = TestEnvironment::new("test_flush_with_stats_basic");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let postings = vec![
Posting {
document_id: crate::identifiers::DocumentId::new(),
start: 0,
length: 100,
vector: vec![1.0, 2.0, 3.0],
},
Posting {
document_id: crate::identifiers::DocumentId::new(),
start: 50,
length: 75,
vector: vec![4.0, 5.0, 6.0],
},
];
shardex.add_postings(postings).await.unwrap();
let stats = shardex.flush_with_stats().await.unwrap();
assert!(stats.total_duration > std::time::Duration::ZERO);
assert!(stats.wal_flush_duration >= std::time::Duration::ZERO);
assert!(stats.shard_apply_duration >= std::time::Duration::ZERO);
assert!(stats.shard_sync_duration >= std::time::Duration::ZERO);
assert!(stats.validation_duration >= std::time::Duration::ZERO);
assert_eq!(stats.operations_applied, 2);
let result = shardex.flush().await;
assert!(result.is_ok());
shardex.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_flush_durability_after_restart() {
let _env = TestEnvironment::new("test_flush_durability_restart");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let doc_id1 = crate::identifiers::DocumentId::new();
let doc_id2 = crate::identifiers::DocumentId::new();
{
let mut shardex = ShardexImpl::create(config.clone()).await.unwrap();
let postings = vec![
Posting {
document_id: doc_id1,
start: 0,
length: 100,
vector: vec![1.0, 2.0, 3.0],
},
Posting {
document_id: doc_id2,
start: 0,
length: 100,
vector: vec![4.0, 5.0, 6.0],
},
];
shardex.add_postings(postings).await.unwrap();
let _stats = shardex.flush_with_stats().await.unwrap();
shardex.shutdown().await.unwrap();
}
{
let mut shardex = ShardexImpl::open(config.directory_path).await.unwrap();
let query_vector = vec![1.0, 2.0, 3.0];
let _results = shardex.search(&query_vector, 10, None).await;
shardex.shutdown().await.unwrap();
}
}
#[tokio::test]
async fn test_flush_consistency_validation() {
let _env = TestEnvironment::new("test_flush_consistency_validation");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3)
.shard_size(5);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let mut postings = Vec::new();
for i in 0..10 {
postings.push(Posting {
document_id: crate::identifiers::DocumentId::new(),
start: i * 100,
length: 50,
vector: vec![i as f32, (i + 1) as f32, (i + 2) as f32],
});
}
shardex.add_postings(postings).await.unwrap();
let stats = shardex.flush_with_stats().await.unwrap();
assert!(stats.validation_duration >= std::time::Duration::ZERO);
assert_eq!(stats.operations_applied, 10);
let _index_stats = shardex.stats().await.unwrap();
shardex.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_flush_performance_monitoring() {
let _env = TestEnvironment::new("test_flush_performance_monitoring");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let mut postings = Vec::new();
for i in 0..20 {
postings.push(Posting {
document_id: crate::identifiers::DocumentId::new(),
start: i * 50,
length: 50,
vector: vec![i as f32 * 0.1, (i + 1) as f32 * 0.1, (i + 2) as f32 * 0.1],
});
}
shardex.add_postings(postings).await.unwrap();
let stats = shardex.flush_with_stats().await.unwrap();
assert!(stats.total_duration_ms() >= stats.wal_flush_duration_ms());
assert!(stats.total_duration_ms() >= stats.shard_sync_duration_ms());
let is_fast = stats.is_fast_flush();
let is_slow = stats.is_slow_flush();
assert!(!(is_fast && is_slow));
let slowest_phase = stats.slowest_phase();
assert!(["wal_flush", "shard_apply", "shard_sync", "validation"].contains(&slowest_phase));
let ops_per_sec = stats.operations_per_second();
assert!(ops_per_sec >= 0.0);
let throughput = stats.sync_throughput_mbps();
assert!(throughput >= 0.0);
shardex.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_flush_with_mixed_operations() {
let _env = TestEnvironment::new("test_flush_mixed_operations");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let doc_id1 = crate::identifiers::DocumentId::new();
let doc_id2 = crate::identifiers::DocumentId::new();
let doc_id3 = crate::identifiers::DocumentId::new();
let postings = vec![
Posting {
document_id: doc_id1,
start: 0,
length: 100,
vector: vec![1.0, 2.0, 3.0],
},
Posting {
document_id: doc_id2,
start: 0,
length: 100,
vector: vec![4.0, 5.0, 6.0],
},
Posting {
document_id: doc_id3,
start: 0,
length: 100,
vector: vec![7.0, 8.0, 9.0],
},
];
shardex.add_postings(postings).await.unwrap();
shardex.remove_documents(vec![doc_id2.raw()]).await.unwrap();
let stats = shardex.flush_with_stats().await.unwrap();
assert_eq!(stats.operations_applied, 4); assert!(stats.shards_synced > 0);
assert!(stats.total_duration > std::time::Duration::ZERO);
shardex.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_flush_empty_operations() {
let _env = TestEnvironment::new("test_flush_empty_operations");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let stats = shardex.flush_with_stats().await.unwrap();
assert_eq!(stats.operations_applied, 0);
assert!(stats.total_duration >= std::time::Duration::ZERO);
shardex.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_flush_stats_display() {
let _env = TestEnvironment::new("test_flush_stats_display");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let postings = vec![Posting {
document_id: crate::identifiers::DocumentId::new(),
start: 0,
length: 100,
vector: vec![1.0, 2.0, 3.0],
}];
shardex.add_postings(postings).await.unwrap();
let stats = shardex.flush_with_stats().await.unwrap();
let display_str = format!("{}", stats);
assert!(display_str.contains("FlushStats"));
assert!(display_str.contains("total:"));
assert!(display_str.contains("wal:"));
assert!(display_str.contains("sync:"));
assert!(display_str.contains("ops:"));
assert!(display_str.contains("shards:"));
shardex.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_consistency_validation_failure_recovery() {
let _env = TestEnvironment::new("test_consistency_validation_failure");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
let mut shardex = ShardexImpl::create(config).await.unwrap();
let postings = vec![Posting {
document_id: crate::identifiers::DocumentId::new(),
start: 0,
length: 100,
vector: vec![1.0, 2.0, 3.0],
}];
shardex.add_postings(postings).await.unwrap();
let stats = shardex.flush_with_stats().await.unwrap();
assert!(stats.validation_duration >= std::time::Duration::ZERO);
let _index_stats = shardex.stats().await.unwrap();
shardex.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_enhanced_create_new_index() {
let _env = TestEnvironment::new("test_enhanced_create_new_index");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(128)
.shard_size(1000);
let shardex = ShardexImpl::create(config.clone()).await.unwrap();
assert!(matches!(shardex, ShardexImpl { .. }));
let layout = DirectoryLayout::new(_env.path());
assert!(layout.exists());
assert!(layout.validate().is_ok());
let metadata = IndexMetadata::load(layout.metadata_path()).unwrap();
assert_eq!(metadata.config.vector_size, 128);
assert_eq!(metadata.config.shard_size, 1000);
assert!(!metadata.flags.active); assert!(metadata.flags.clean_shutdown);
}
#[tokio::test]
async fn test_enhanced_create_with_invalid_config() {
let _env = TestEnvironment::new("test_enhanced_create_invalid_config");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(0);
let result = ShardexImpl::create(config).await;
assert!(result.is_err());
if let Err(ShardexError::Config(msg)) = result {
assert!(msg.contains("Invalid configuration for create"));
assert!(msg.contains("must be greater than 0"));
} else {
panic!("Expected Config error");
}
let layout = DirectoryLayout::new(_env.path());
assert!(!layout.exists());
if _env.path().exists() {
let entries: Vec<_> = std::fs::read_dir(_env.path()).unwrap().collect();
assert!(entries.is_empty() || !layout.metadata_path().exists());
}
}
#[tokio::test]
async fn test_enhanced_create_existing_index() {
let _env = TestEnvironment::new("test_enhanced_create_existing_index");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(128);
let _shardex1 = ShardexImpl::create(config.clone()).await.unwrap();
let result = ShardexImpl::create(config).await;
assert!(result.is_err());
if let Err(ShardexError::Config(msg)) = result {
assert!(msg.contains("Index already exists"));
assert!(msg.contains("Use open() to load existing index"));
} else {
panic!("Expected Config error");
}
}
#[tokio::test]
async fn test_enhanced_create_non_empty_directory() {
let _env = TestEnvironment::new("test_enhanced_create_non_empty_dir");
std::fs::create_dir_all(_env.path()).unwrap();
std::fs::write(_env.path().join("existing_file.txt"), b"content").unwrap();
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(128);
let result = ShardexImpl::create(config).await;
assert!(result.is_err());
if let Err(ShardexError::Config(msg)) = result {
assert!(msg.contains("is not empty"));
assert!(msg.contains("Please use an empty directory"));
} else {
panic!("Expected Config error");
}
}
#[tokio::test]
async fn test_enhanced_open_existing_index() {
let _env = TestEnvironment::new("test_enhanced_open_existing_index");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(256)
.shard_size(2000);
let shardex1 = ShardexImpl::create(config.clone()).await.unwrap();
drop(shardex1);
let shardex2 = ShardexImpl::open(_env.path()).await.unwrap();
assert_eq!(shardex2.config.vector_size, 256);
assert_eq!(shardex2.config.shard_size, 2000);
}
#[tokio::test]
async fn test_enhanced_open_nonexistent_directory() {
let _env = TestEnvironment::new("test_enhanced_open_nonexistent");
let non_existent_path = _env.path().join("does_not_exist");
let result = ShardexImpl::open(non_existent_path.clone()).await;
assert!(result.is_err());
if let Err(ShardexError::Config(msg)) = result {
assert!(msg.contains("Index directory does not exist"));
assert!(msg.contains(&non_existent_path.display().to_string()));
} else {
panic!("Expected Config error");
}
}
#[tokio::test]
async fn test_enhanced_open_invalid_directory() {
let _env = TestEnvironment::new("test_enhanced_open_invalid");
std::fs::create_dir_all(_env.path()).unwrap();
std::fs::write(_env.path().join("not_an_index.txt"), b"content").unwrap();
let result = ShardexImpl::open(_env.path()).await;
assert!(result.is_err());
if let Err(ShardexError::Config(msg)) = result {
assert!(msg.contains("Invalid index directory structure"));
assert!(msg.contains("may be corrupted or not a valid Shardex index"));
} else {
panic!("Expected Config error");
}
}
#[tokio::test]
async fn test_enhanced_open_corrupted_metadata() {
let _env = TestEnvironment::new("test_enhanced_open_corrupted_metadata");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(128);
let _shardex1 = ShardexImpl::create(config.clone()).await.unwrap();
let layout = DirectoryLayout::new(_env.path());
std::fs::write(layout.metadata_path(), b"invalid toml content").unwrap();
let result = ShardexImpl::open(_env.path()).await;
assert!(result.is_err());
if let Err(ShardexError::Config(msg)) = result {
assert!(msg.contains("Failed to load index metadata"));
assert!(msg.contains("may be corrupted or created with an incompatible version"));
} else {
panic!("Expected Config error");
}
}
#[tokio::test]
async fn test_config_compatibility_checking() {
let _env = TestEnvironment::new("test_config_compatibility");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(128);
let _shardex = ShardexImpl::create(config.clone()).await.unwrap();
let result = ShardexImpl::check_config_compatibility(_env.path(), &config);
assert!(result.is_ok());
let incompatible_config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(256);
let result = ShardexImpl::check_config_compatibility(_env.path(), &incompatible_config);
assert!(result.is_err());
if let Err(ShardexError::Config(msg)) = result {
assert!(msg.contains("Configuration incompatible"));
assert!(msg.contains("vector_size: existing=128, new=256"));
} else {
panic!("Expected Config error");
}
}
#[tokio::test]
async fn test_open_with_wal_recovery() {
let _env = TestEnvironment::new("test_open_with_wal_recovery");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(3);
{
let mut shardex = ShardexImpl::create(config.clone()).await.unwrap();
let postings = vec![Posting {
document_id: crate::identifiers::DocumentId::new(),
start: 0,
length: 100,
vector: vec![1.0, 2.0, 3.0],
}];
shardex.add_postings(postings).await.unwrap();
shardex.shutdown().await.unwrap();
}
let mut shardex2 = ShardexImpl::open(_env.path()).await.unwrap();
let _stats = shardex2.stats().await.unwrap();
shardex2.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_create_cleanup_on_failure() {
let _env = TestEnvironment::new("test_create_cleanup_on_failure");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(128)
.shard_size(1000);
std::fs::create_dir_all(_env.path()).unwrap();
let layout = DirectoryLayout::new(_env.path());
layout.create_directories().unwrap();
let mut metadata = IndexMetadata::new(config.clone()).unwrap();
metadata.save(layout.metadata_path()).unwrap();
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let perms = std::fs::metadata(_env.path()).unwrap().permissions();
let mut perms_clone = perms.clone();
perms_clone.set_mode(0o444); std::fs::set_permissions(_env.path(), perms_clone).unwrap();
let result = ShardexImpl::create(config).await;
assert!(result.is_err());
let mut restore_perms = perms.clone();
restore_perms.set_mode(0o755);
std::fs::set_permissions(_env.path(), restore_perms).unwrap();
}
}
#[tokio::test]
async fn test_open_metadata_state_management() {
let _env = TestEnvironment::new("test_open_metadata_state_management");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(128);
let _shardex1 = ShardexImpl::create(config.clone()).await.unwrap();
let layout = DirectoryLayout::new(_env.path());
let metadata = IndexMetadata::load(layout.metadata_path()).unwrap();
assert!(!metadata.flags.active);
assert!(metadata.flags.clean_shutdown);
let _shardex2 = ShardexImpl::open(_env.path()).await.unwrap();
let metadata = IndexMetadata::load(layout.metadata_path()).unwrap();
assert!(metadata.flags.active);
}
#[tokio::test]
async fn test_version_compatibility_checking() {
let _env = TestEnvironment::new("test_version_compatibility");
let config = ShardexConfig::new()
.directory_path(_env.path())
.vector_size(128);
let _shardex = ShardexImpl::create(config.clone()).await.unwrap();
let layout = DirectoryLayout::new(_env.path());
let mut metadata = IndexMetadata::load(layout.metadata_path()).unwrap();
metadata.layout_version = 999; metadata.save(layout.metadata_path()).unwrap();
let result = ShardexImpl::open(_env.path()).await;
assert!(result.is_err());
if let Err(ShardexError::Config(msg)) = result {
assert!(msg.contains("Unsupported layout version"));
assert!(msg.contains("expected"));
assert!(msg.contains("found 999"));
} else {
panic!("Expected Config error");
}
}
#[tokio::test]
async fn test_get_document_text_nil_document_id() {
let env = TestEnvironment::new("test_get_document_text_nil_document_id");
let config = ShardexConfig {
directory_path: env.path().to_path_buf(),
max_document_text_size: 10 * 1024 * 1024, ..Default::default()
};
let shardex = ShardexImpl::create(config).await.unwrap();
let zero_document: crate::identifiers::DocumentId = bytemuck::Zeroable::zeroed();
let result = shardex.get_document_text(zero_document).await;
assert!(result.is_err());
if let Err(ShardexError::InvalidDocumentId { reason, suggestion }) = result {
assert!(reason.contains("Document ID cannot be nil/zero"));
assert!(suggestion.contains("Provide a valid document ID"));
} else {
panic!("Expected InvalidDocumentId error, got: {:?}", result);
}
}
#[tokio::test]
async fn test_get_document_text_storage_disabled() {
let env = TestEnvironment::new("test_get_document_text_storage_disabled");
let config = ShardexConfig {
directory_path: env.path().to_path_buf(),
max_document_text_size: 1024, ..Default::default()
};
let mut shardex_config = config.clone();
shardex_config.max_document_text_size = 0; let index = crate::shardex_index::ShardexIndex::create(shardex_config).unwrap();
let shardex = ShardexImpl {
index,
config,
batch_processor: None,
layout: crate::layout::DirectoryLayout::new(env.path()),
config_manager: crate::config_persistence::ConfigurationManager::new(env.path()),
pending_shard_operations: Vec::new(),
performance_monitor: crate::monitoring::PerformanceMonitor::new(),
};
let document_id = crate::identifiers::DocumentId::new();
let result = shardex.get_document_text(document_id).await;
assert!(result.is_err());
if let Err(ShardexError::InvalidInput {
field,
reason,
suggestion,
}) = result
{
assert_eq!(field, "text_storage");
assert!(reason.contains("Text storage not enabled"));
assert!(suggestion.contains("Enable text storage"));
} else {
panic!("Expected InvalidInput error, got: {:?}", result);
}
}
#[tokio::test]
async fn test_extract_text_nil_document_id() {
let env = TestEnvironment::new("test_extract_text_nil_document_id");
let config = ShardexConfig {
directory_path: env.path().to_path_buf(),
max_document_text_size: 10 * 1024 * 1024, ..Default::default()
};
let shardex = ShardexImpl::create(config).await.unwrap();
let zero_document: crate::identifiers::DocumentId = bytemuck::Zeroable::zeroed();
let vector = vec![0.1, 0.2, 0.3];
let posting = Posting::new(zero_document, 0, 5, vector, 3).unwrap();
let result = shardex.extract_text(&posting).await;
assert!(result.is_err());
if let Err(ShardexError::InvalidPostingData { reason, suggestion }) = result {
assert!(reason.contains("Posting document ID cannot be nil/zero"));
assert!(suggestion.contains("Ensure posting has a valid document ID"));
} else {
panic!("Expected InvalidPostingData error, got: {:?}", result);
}
}
#[tokio::test]
async fn test_extract_text_zero_length() {
let env = TestEnvironment::new("test_extract_text_zero_length");
let config = ShardexConfig {
directory_path: env.path().to_path_buf(),
max_document_text_size: 10 * 1024 * 1024, ..Default::default()
};
let shardex = ShardexImpl::create(config).await.unwrap();
let document_id = crate::identifiers::DocumentId::new();
let vector = vec![0.1, 0.2, 0.3];
let posting = Posting::new(document_id, 0, 0, vector, 3).unwrap();
let result = shardex.extract_text(&posting).await;
assert!(result.is_err());
if let Err(ShardexError::InvalidPostingData { reason, suggestion }) = result {
assert!(reason.contains("Posting length cannot be zero"));
assert!(suggestion.contains("Provide a posting with positive length"));
} else {
panic!("Expected InvalidPostingData error, got: {:?}", result);
}
}
#[tokio::test]
async fn test_extract_text_coordinate_overflow() {
let env = TestEnvironment::new("test_extract_text_coordinate_overflow");
let config = ShardexConfig {
directory_path: env.path().to_path_buf(),
max_document_text_size: 10 * 1024 * 1024, ..Default::default()
};
let shardex = ShardexImpl::create(config).await.unwrap();
let document_id = crate::identifiers::DocumentId::new();
let vector = vec![0.1, 0.2, 0.3];
let posting = Posting::new(document_id, u32::MAX - 10, 20, vector, 3).unwrap();
let result = shardex.extract_text(&posting).await;
assert!(result.is_err());
if let Err(ShardexError::InvalidPostingData { reason, suggestion }) = result {
assert!(reason.contains("Posting coordinates overflow u32 range"));
assert!(suggestion.contains("Use smaller start + length values"));
} else {
panic!("Expected InvalidPostingData error, got: {:?}", result);
}
}
#[tokio::test]
async fn test_extract_text_storage_disabled() {
let env = TestEnvironment::new("test_extract_text_storage_disabled");
let config = ShardexConfig {
directory_path: env.path().to_path_buf(),
max_document_text_size: 0, ..Default::default()
};
let shardex = ShardexImpl::create(config).await.unwrap();
let document_id = crate::identifiers::DocumentId::new();
let vector = vec![0.1, 0.2, 0.3];
let posting = Posting::new(document_id, 0, 5, vector, 3).unwrap();
let result = shardex.extract_text(&posting).await;
assert!(result.is_err());
if let Err(ShardexError::InvalidInput {
field,
reason,
suggestion,
}) = result
{
assert_eq!(field, "text_storage");
assert!(reason.contains("Text storage not enabled"));
assert!(suggestion.contains("Enable text storage"));
} else {
panic!("Expected InvalidInput error, got: {:?}", result);
}
}
#[tokio::test]
async fn test_document_text_storage_directly() {
let env = TestEnvironment::new("test_document_text_storage_directly");
let config = ShardexConfig {
directory_path: env.path().to_path_buf(),
max_document_text_size: 10 * 1024 * 1024,
vector_size: 3,
..Default::default()
};
let mut shardex = ShardexImpl::create(config).await.unwrap();
assert!(shardex.index.has_text_storage(), "Text storage should be enabled");
let document_id = crate::identifiers::DocumentId::new();
let text = "Test document text for storage";
shardex
.index
.store_document_text(document_id, text)
.unwrap();
shardex.index.flush().unwrap();
let retrieved_text = shardex.index.get_document_text(document_id).unwrap();
assert_eq!(retrieved_text, text);
}
#[tokio::test]
async fn test_replace_document_with_postings_success() {
let env = TestEnvironment::new("test_replace_document_with_postings_success");
let config = ShardexConfig {
directory_path: env.path().to_path_buf(),
max_document_text_size: 10 * 1024 * 1024,
vector_size: 3,
..Default::default()
};
let mut shardex = ShardexImpl::create(config).await.unwrap();
let document_id = crate::identifiers::DocumentId::new();
let text = "The quick brown fox jumps over the lazy dog.".to_string();
let postings = vec![
Posting::new(document_id, 0, 9, vec![0.1, 0.2, 0.3], 3).unwrap(), Posting::new(document_id, 10, 9, vec![0.4, 0.5, 0.6], 3).unwrap(), Posting::new(document_id, 20, 5, vec![0.7, 0.8, 0.9], 3).unwrap(), ];
let result = shardex
.replace_document_with_postings(document_id, text.clone(), postings)
.await;
assert!(result.is_ok(), "Replace operation should succeed: {:?}", result);
let retrieved_text = shardex.get_document_text(document_id).await;
match retrieved_text {
Ok(actual_text) => assert_eq!(actual_text, text),
Err(e) => panic!("Failed to retrieve document text: {:?}", e),
}
}
#[tokio::test]
async fn test_replace_document_with_postings_nil_document_id() {
let env = TestEnvironment::new("test_replace_document_with_postings_nil_document_id");
let config = ShardexConfig {
directory_path: env.path().to_path_buf(),
max_document_text_size: 10 * 1024 * 1024,
vector_size: 3,
..Default::default()
};
let mut shardex = ShardexImpl::create(config).await.unwrap();
let zero_document: crate::identifiers::DocumentId = bytemuck::Zeroable::zeroed();
let text = "Test text".to_string();
let postings = vec![Posting::new(zero_document, 0, 4, vec![0.1, 0.2, 0.3], 3).unwrap()];
let result = shardex
.replace_document_with_postings(zero_document, text, postings)
.await;
assert!(result.is_err());
if let Err(ShardexError::InvalidDocumentId { reason, suggestion }) = result {
assert!(reason.contains("Document ID cannot be nil/zero"));
assert!(suggestion.contains("Provide a valid document ID"));
} else {
panic!("Expected InvalidDocumentId error, got: {:?}", result);
}
}
#[tokio::test]
async fn test_replace_document_with_postings_document_too_large() {
let env = TestEnvironment::new("test_replace_document_with_postings_document_too_large");
let config = ShardexConfig {
directory_path: env.path().to_path_buf(),
max_document_text_size: 1024, vector_size: 3,
..Default::default()
};
let mut shardex = ShardexImpl::create(config).await.unwrap();
let document_id = crate::identifiers::DocumentId::new();
let text = "x".repeat(2000); let postings = vec![Posting::new(document_id, 0, 50, vec![0.1, 0.2, 0.3], 3).unwrap()];
let result = shardex
.replace_document_with_postings(document_id, text, postings)
.await;
assert!(result.is_err());
if let Err(ShardexError::DocumentTooLarge { size, max_size }) = result {
assert_eq!(size, 2000);
assert_eq!(max_size, 1024);
} else {
panic!("Expected DocumentTooLarge error, got: {:?}", result);
}
}
#[tokio::test]
async fn test_replace_document_with_postings_invalid_text() {
let env = TestEnvironment::new("test_replace_document_with_postings_invalid_text");
let config = ShardexConfig {
directory_path: env.path().to_path_buf(),
max_document_text_size: 10 * 1024 * 1024,
vector_size: 3,
..Default::default()
};
let mut shardex = ShardexImpl::create(config).await.unwrap();
let document_id = crate::identifiers::DocumentId::new();
let text = "Text with\0null byte".to_string();
let postings = vec![Posting::new(document_id, 0, 9, vec![0.1, 0.2, 0.3], 3).unwrap()];
let result = shardex
.replace_document_with_postings(document_id, text, postings)
.await;
assert!(result.is_err());
if let Err(ShardexError::InvalidInput {
field,
reason,
suggestion,
}) = result
{
assert_eq!(field, "document_text");
assert!(reason.contains("Text contains null bytes"));
assert!(suggestion.contains("Remove null bytes from text"));
} else {
panic!("Expected InvalidInput error, got: {:?}", result);
}
}
#[tokio::test]
async fn test_replace_document_with_postings_invalid_vector_dimension() {
let env = TestEnvironment::new("test_replace_document_with_postings_invalid_vector_dimension");
let config = ShardexConfig {
directory_path: env.path().to_path_buf(),
max_document_text_size: 10 * 1024 * 1024,
vector_size: 3,
..Default::default()
};
let mut shardex = ShardexImpl::create(config).await.unwrap();
let document_id = crate::identifiers::DocumentId::new();
let text = "Test text".to_string();
let postings = vec![
Posting::new(document_id, 0, 4, vec![0.1, 0.2, 0.3, 0.4, 0.5], 5).unwrap(), ];
let result = shardex
.replace_document_with_postings(document_id, text, postings)
.await;
assert!(result.is_err());
if let Err(ShardexError::InvalidDimension { expected, actual }) = result {
assert_eq!(expected, 3);
assert_eq!(actual, 5);
} else {
panic!("Expected InvalidDimension error, got: {:?}", result);
}
}
#[tokio::test]
async fn test_replace_document_with_postings_invalid_range() {
let env = TestEnvironment::new("test_replace_document_with_postings_invalid_range");
let config = ShardexConfig {
directory_path: env.path().to_path_buf(),
max_document_text_size: 10 * 1024 * 1024,
vector_size: 3,
..Default::default()
};
let mut shardex = ShardexImpl::create(config).await.unwrap();
let document_id = crate::identifiers::DocumentId::new();
let text = "Short text".to_string(); let postings = vec![
Posting::new(document_id, 5, 10, vec![0.1, 0.2, 0.3], 3).unwrap(), ];
let result = shardex
.replace_document_with_postings(document_id, text, postings)
.await;
assert!(result.is_err());
if let Err(ShardexError::InvalidRange {
start,
length,
document_length,
}) = result
{
assert_eq!(start, 5);
assert_eq!(length, 10);
assert_eq!(document_length, 10);
} else {
panic!("Expected InvalidRange error, got: {:?}", result);
}
}
#[tokio::test]
async fn test_replace_document_with_postings_zero_length_posting() {
let env = TestEnvironment::new("test_replace_document_with_postings_zero_length_posting");
let config = ShardexConfig {
directory_path: env.path().to_path_buf(),
max_document_text_size: 10 * 1024 * 1024,
vector_size: 3,
..Default::default()
};
let mut shardex = ShardexImpl::create(config).await.unwrap();
let document_id = crate::identifiers::DocumentId::new();
let text = "Test text".to_string();
let postings = vec![Posting::new(document_id, 0, 0, vec![0.1, 0.2, 0.3], 3).unwrap()];
let result = shardex
.replace_document_with_postings(document_id, text, postings)
.await;
assert!(result.is_err());
if let Err(ShardexError::InvalidPostingData { reason, suggestion }) = result {
assert!(reason.contains("Posting 0 has zero length"));
assert!(suggestion.contains("Ensure all postings have positive length"));
} else {
panic!("Expected InvalidPostingData error, got: {:?}", result);
}
}
#[tokio::test]
async fn test_replace_document_with_postings_nil_posting_document_id() {
let env = TestEnvironment::new("test_replace_document_with_postings_nil_posting_document_id");
let config = ShardexConfig {
directory_path: env.path().to_path_buf(),
max_document_text_size: 10 * 1024 * 1024,
vector_size: 3,
..Default::default()
};
let mut shardex = ShardexImpl::create(config).await.unwrap();
let document_id = crate::identifiers::DocumentId::new();
let zero_document: crate::identifiers::DocumentId = bytemuck::Zeroable::zeroed();
let text = "Test text".to_string();
let postings = vec![Posting::new(zero_document, 0, 4, vec![0.1, 0.2, 0.3], 3).unwrap()];
let result = shardex
.replace_document_with_postings(document_id, text, postings)
.await;
assert!(result.is_err());
if let Err(ShardexError::InvalidPostingData { reason, suggestion }) = result {
assert!(reason.contains("Posting 0 has nil/zero document ID"));
assert!(suggestion.contains("Ensure all postings have valid document IDs"));
} else {
panic!("Expected InvalidPostingData error, got: {:?}", result);
}
}
#[tokio::test]
async fn test_replace_document_with_postings_overlapping_warning() {
let env = TestEnvironment::new("test_replace_document_with_postings_overlapping_warning");
let config = ShardexConfig {
directory_path: env.path().to_path_buf(),
max_document_text_size: 10 * 1024 * 1024,
vector_size: 3,
..Default::default()
};
let mut shardex = ShardexImpl::create(config).await.unwrap();
let document_id = crate::identifiers::DocumentId::new();
let text = "The quick brown fox".to_string();
let postings = vec![
Posting::new(document_id, 0, 10, vec![0.1, 0.2, 0.3], 3).unwrap(), Posting::new(document_id, 5, 10, vec![0.4, 0.5, 0.6], 3).unwrap(), ];
let result = shardex
.replace_document_with_postings(document_id, text, postings)
.await;
assert!(
result.is_ok(),
"Replace operation should succeed even with overlapping postings: {:?}",
result
);
}
}