use crate::error::ShardexError;
use crate::identifiers::{DocumentId, ShardId, TransactionId};
use crate::layout::{DirectoryLayout, FileDiscovery};
use crate::shard::Shard;
use crate::shardex_index::ShardexIndex;
use crate::structures::Posting;
use crate::transactions::{WalOperation, WalTransaction};
use crate::wal::{WalRecordHeader, WalSegment};
use std::collections::HashSet;
use std::path::PathBuf;
use tracing::{info, warn};
/// Aggregated statistics collected while replaying write-ahead-log segments.
///
/// Counters are grouped by phase (segments, transactions, operations) plus
/// text-storage specifics; `errors_encountered` keeps human-readable messages
/// for every non-fatal problem seen during recovery.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct RecoveryStats {
    pub segments_processed: usize,
    pub transactions_replayed: usize,
    pub transactions_skipped: usize,
    pub operations_applied: usize,
    pub errors_encountered: Vec<String>,
    pub add_posting_operations: usize,
    pub remove_document_operations: usize,
    pub text_store_operations: usize,
    pub text_delete_operations: usize,
    pub total_text_bytes_replayed: u64,
    pub text_storage_errors: usize,
}

impl RecoveryStats {
    /// Create an empty statistics record (equivalent to `RecoveryStats::default()`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Append a recovery error message to the error log.
    pub fn add_error<S: Into<String>>(&mut self, error: S) {
        let message = error.into();
        self.errors_encountered.push(message);
    }

    /// True when at least one error has been recorded.
    pub fn has_errors(&self) -> bool {
        !self.errors_encountered.is_empty()
    }

    /// Total number of operations counted across all operation-type counters.
    pub fn total_operations_processed(&self) -> usize {
        [
            self.add_posting_operations,
            self.remove_document_operations,
            self.text_store_operations,
            self.text_delete_operations,
        ]
        .into_iter()
        .sum()
    }

    /// Number of document-text operations (stores plus deletes).
    pub fn total_text_operations(&self) -> usize {
        self.text_store_operations + self.text_delete_operations
    }

    /// True when any text-storage operation failed during replay.
    pub fn has_text_storage_errors(&self) -> bool {
        self.text_storage_errors != 0
    }

    /// Percentage of seen transactions that were replayed (skipped ones count
    /// against the rate). An empty replay reports 100%.
    pub fn success_rate(&self) -> f64 {
        let total = self.transactions_replayed + self.transactions_skipped;
        if total == 0 {
            return 100.0;
        }
        (self.transactions_replayed as f64 / total as f64) * 100.0
    }
}
/// Replays write-ahead-log segments against a `ShardexIndex` during recovery.
///
/// The replayer is consumed after use: the recovered index is taken back via
/// `into_index`, and accumulated statistics are exposed via `recovery_stats`.
pub struct WalReplayer {
    // Directory containing the WAL segment files; its parent is treated as the
    // index root when discovering segments and creating recovery shards.
    wal_directory: PathBuf,
    // Index that recovered operations are applied to.
    shardex_index: ShardexIndex,
    // Transaction ids already applied in this replay session — the idempotency
    // guard that lets the same transaction appear in multiple segments safely.
    processed_transactions: HashSet<TransactionId>,
    // Counters and error log accumulated across all replayed segments.
    recovery_stats: RecoveryStats,
}
impl WalReplayer {
    /// Create a replayer that reads segments from `wal_directory` and applies
    /// the recovered operations to `shardex_index`.
    pub fn new(wal_directory: PathBuf, shardex_index: ShardexIndex) -> Self {
        Self {
            wal_directory,
            shardex_index,
            processed_transactions: HashSet::new(),
            recovery_stats: RecoveryStats::new(),
        }
    }

    /// Read-only view of the statistics accumulated so far.
    pub fn recovery_stats(&self) -> &RecoveryStats {
        &self.recovery_stats
    }

    /// Mutable access to the statistics (e.g. for recording caller-side errors).
    pub fn recovery_stats_mut(&mut self) -> &mut RecoveryStats {
        &mut self.recovery_stats
    }

    /// Discover all WAL segments belonging to this index and replay each one.
    ///
    /// Recovery is best-effort per segment: a segment that fails to open or to
    /// replay is recorded in `recovery_stats` and replay continues with the
    /// next segment.
    ///
    /// # Errors
    ///
    /// Returns an error if the WAL directory has no parent (the index root) or
    /// if segment discovery itself fails.
    pub async fn replay_all_segments(&mut self) -> Result<(), ShardexError> {
        // The index layout is rooted at the parent of the WAL directory.
        let parent_dir = self
            .wal_directory
            .parent()
            .ok_or_else(|| ShardexError::Wal("WAL directory has no parent directory".to_string()))?;
        let layout = DirectoryLayout::new(parent_dir);
        let discovery = FileDiscovery::new(layout);
        let wal_segments = discovery.discover_wal_segments()?;
        if wal_segments.is_empty() {
            return Ok(());
        }
        let mut total_transactions = 0;
        for segment_info in &wal_segments {
            // An unopenable segment must not abort recovery of the remaining
            // segments; record the error and keep going (previously `?` here
            // aborted the whole replay).
            let segment = match WalSegment::open(segment_info.path.clone()) {
                Ok(segment) => segment,
                Err(e) => {
                    self.recovery_stats.add_error(format!(
                        "Failed to replay segment {}: {}",
                        segment_info.path.display(),
                        e
                    ));
                    continue;
                }
            };
            match self.replay_segment(&segment).await {
                Ok(transactions_processed) => {
                    total_transactions += transactions_processed;
                }
                Err(e) => {
                    self.recovery_stats.add_error(format!(
                        "Failed to replay segment {}: {}",
                        segment_info.path.display(),
                        e
                    ));
                }
            }
        }
        if total_transactions > 0 {
            info!(
                segments = wal_segments.len(),
                transactions = total_transactions,
                "WAL replay completed"
            );
        }
        Ok(())
    }

    /// Replay every complete record in `segment`, returning the number of
    /// transactions that were successfully deserialized (whether or not they
    /// were subsequently applied).
    ///
    /// Records with bad checksums or failed deserialization are logged into
    /// `recovery_stats` and skipped; a truncated record terminates the scan.
    pub async fn replay_segment(&mut self, segment: &WalSegment) -> Result<usize, ShardexError> {
        let mut transactions_processed = 0;
        let initial_write_pos = crate::wal::initial_write_position();
        let segment_data = segment.read_segment_data()?;
        let mut current_pos = initial_write_pos;
        let write_pointer = segment.write_pointer();
        while current_pos < write_pointer {
            // Stop when there is no room left for a complete record header.
            if current_pos + WalRecordHeader::SIZE > segment_data.len() {
                break;
            }
            // Header layout: bytes [0..4] little-endian payload length,
            // bytes [4..8] CRC32 of the payload.
            let header_bytes = &segment_data[current_pos..current_pos + WalRecordHeader::SIZE];
            let data_length = u32::from_le_bytes([header_bytes[0], header_bytes[1], header_bytes[2], header_bytes[3]]);
            let checksum = u32::from_le_bytes([header_bytes[4], header_bytes[5], header_bytes[6], header_bytes[7]]);
            let data_length_usize = data_length as usize;
            let record_data_start = current_pos + WalRecordHeader::SIZE;
            let record_data_end = record_data_start + data_length_usize;
            if record_data_end > segment_data.len() || record_data_end > write_pointer {
                self.recovery_stats
                    .add_error(format!("Truncated record at position {}", current_pos));
                break;
            }
            let record_data = &segment_data[record_data_start..record_data_end];
            let expected_checksum = crc32fast::hash(record_data);
            if checksum != expected_checksum {
                // NOTE(review): on a checksum mismatch we skip ahead using the
                // length from the same (possibly corrupt) header, so the next
                // read may land mid-record; subsequent records then fail their
                // own checksum/deserialization and are reported, not applied.
                self.recovery_stats
                    .add_error(format!("Invalid checksum at position {}", current_pos));
                current_pos = record_data_end;
                continue;
            }
            match WalTransaction::deserialize(record_data) {
                Ok(transaction) => {
                    transactions_processed += 1;
                    if self.is_transaction_processed(&transaction.id) {
                        // Idempotency guard: the same transaction can appear in
                        // more than one segment.
                        self.recovery_stats.transactions_skipped += 1;
                    } else {
                        match self.apply_transaction(&transaction).await {
                            Ok(operations_applied) => {
                                self.recovery_stats.transactions_replayed += 1;
                                self.recovery_stats.operations_applied += operations_applied;
                                self.mark_transaction_processed(transaction.id);
                            }
                            Err(e) => {
                                self.recovery_stats
                                    .add_error(format!("Failed to apply transaction {}: {}", transaction.id, e));
                            }
                        }
                    }
                }
                Err(e) => {
                    self.recovery_stats.add_error(format!(
                        "Failed to deserialize transaction at position {}: {}",
                        current_pos, e
                    ));
                }
            }
            current_pos = record_data_end;
        }
        self.recovery_stats.segments_processed += 1;
        Ok(transactions_processed)
    }

    /// Apply every operation in `transaction`, returning how many succeeded.
    ///
    /// Individual operation failures are logged and recorded in the stats; they
    /// do not abort the rest of the transaction.
    async fn apply_transaction(&mut self, transaction: &WalTransaction) -> Result<usize, ShardexError> {
        let mut operations_applied = 0;
        for operation in &transaction.operations {
            match self.apply_operation(operation) {
                Ok(()) => {
                    operations_applied += 1;
                    match operation {
                        WalOperation::AddPosting { .. } => {
                            self.recovery_stats.add_posting_operations += 1;
                        }
                        WalOperation::RemoveDocument { .. } => {
                            self.recovery_stats.remove_document_operations += 1;
                        }
                        // Text operations are already counted (including byte
                        // totals) inside replay_store_document_text /
                        // replay_delete_document_text; incrementing the same
                        // counters here as well double-counted
                        // text_store_operations, text_delete_operations and
                        // total_text_bytes_replayed.
                        WalOperation::StoreDocumentText { .. } | WalOperation::DeleteDocumentText { .. } => {}
                    }
                }
                Err(e) => {
                    warn!(
                        operation = ?operation,
                        error = %e,
                        "Failed to apply operation during WAL replay"
                    );
                    // text_storage_errors is incremented by the text replay
                    // helpers before they can return an error, so it must not
                    // be counted again here.
                    self.recovery_stats
                        .add_error(format!("Failed to apply {:?}: {}", operation, e));
                }
            }
        }
        Ok(operations_applied)
    }

    /// Route a single WAL operation to the appropriate replay handler.
    fn apply_operation(&mut self, op: &WalOperation) -> Result<(), ShardexError> {
        match op {
            WalOperation::AddPosting {
                document_id,
                start,
                length,
                vector,
            } => {
                // Sanity checks: a well-formed WAL never contains these, so
                // treat them as WAL-level errors rather than indexing garbage.
                if vector.is_empty() {
                    return Err(ShardexError::Wal("Cannot add posting with empty vector".to_string()));
                }
                if *length == 0 {
                    return Err(ShardexError::Wal("Cannot add posting with zero length".to_string()));
                }
                let posting = Posting {
                    document_id: *document_id,
                    start: *start,
                    length: *length,
                    vector: vector.clone(),
                };
                // Route the posting to the nearest shard, creating a default
                // shard if the index has none (e.g. WAL-only recovery).
                let shard_id = match self.shardex_index.find_nearest_shard(&posting.vector)? {
                    Some(shard_id) => shard_id,
                    None => {
                        info!("No shards found during WAL replay - creating default shard for recovery");
                        self.create_default_shard_for_recovery(&posting.vector)?
                    }
                };
                let shard = self.shardex_index.get_shard_mut(shard_id)?;
                match shard.add_posting(posting) {
                    Ok(_) => Ok(()),
                    Err(e) => {
                        warn!(
                            document_id = %document_id,
                            shard_id = %shard_id,
                            error = %e,
                            "Failed to add posting to shard during WAL replay"
                        );
                        Err(e)
                    }
                }
            }
            WalOperation::RemoveDocument { document_id } => {
                // A document's postings may span shards, so try every shard and
                // tolerate per-shard failures (best effort).
                let mut total_removed = 0;
                let shard_ids = self.shardex_index.shard_ids();
                for shard_id in shard_ids {
                    let shard = self.shardex_index.get_shard_mut(shard_id)?;
                    match shard.remove_document(*document_id) {
                        Ok(removed_count) => {
                            total_removed += removed_count;
                        }
                        Err(e) => {
                            warn!(
                                document_id = %document_id,
                                shard_id = %shard_id,
                                error = %e,
                                "Failed to remove document from shard during WAL replay"
                            );
                        }
                    }
                }
                if total_removed == 0 {
                    // Not fatal: the document may already have been removed
                    // before the crash or by an earlier replayed operation.
                    warn!(
                        document_id = %document_id,
                        "No postings found to remove for document during WAL replay"
                    );
                }
                Ok(())
            }
            WalOperation::StoreDocumentText { document_id, text } => {
                self.replay_store_document_text(*document_id, text)
            }
            WalOperation::DeleteDocumentText { document_id } => self.replay_delete_document_text(*document_id),
        }
    }

    /// Replay a StoreDocumentText operation.
    ///
    /// Counts the attempt in `text_store_operations`; on success also adds the
    /// text length to `total_text_bytes_replayed`. Storage failures are routed
    /// through `handle_text_storage_error` and reported via the stats rather
    /// than aborting the transaction.
    fn replay_store_document_text(&mut self, document_id: DocumentId, text: &str) -> Result<(), ShardexError> {
        tracing::debug!("Replaying store document text: {} ({} bytes)", document_id, text.len());
        self.recovery_stats.text_store_operations += 1;
        match self.shardex_index.store_document_text(document_id, text) {
            Ok(()) => {
                tracing::debug!("Successfully replayed text storage for document {}", document_id);
                self.recovery_stats.total_text_bytes_replayed += text.len() as u64;
                Ok(())
            }
            Err(e) => {
                tracing::error!("Failed to replay text storage for document {}: {}", document_id, e);
                self.recovery_stats.text_storage_errors += 1;
                self.recovery_stats
                    .add_error(format!("Text storage error for document {}: {}", document_id, e));
                // Best-effort recovery: classify and absorb the error so one
                // bad document does not fail the whole transaction.
                self.handle_text_storage_error(&e, document_id, text)?;
                Ok(())
            }
        }
    }

    /// Replay a DeleteDocumentText operation.
    ///
    /// Deletion is logical (the text store is append-only from the replayer's
    /// point of view), so with storage enabled this only records the count.
    fn replay_delete_document_text(&mut self, document_id: DocumentId) -> Result<(), ShardexError> {
        tracing::debug!("Replaying delete document text: {}", document_id);
        self.recovery_stats.text_delete_operations += 1;
        if self.shardex_index.has_text_storage() {
            tracing::debug!("Document text deletion for {} replayed (logical deletion)", document_id);
            Ok(())
        } else {
            // The WAL references text storage that this index does not have:
            // record the inconsistency but keep replaying.
            tracing::warn!(
                "WAL contains text deletion for document {} but text storage not enabled",
                document_id
            );
            self.recovery_stats.add_error(format!(
                "WAL contains text deletion for document {} but text storage not enabled",
                document_id
            ));
            Ok(())
        }
    }

    /// Classify a text-storage failure seen during replay and decide whether it
    /// is survivable. All current classifications absorb the error (return
    /// `Ok`); the caller has already recorded it in the stats.
    fn handle_text_storage_error(
        &mut self,
        error: &ShardexError,
        document_id: DocumentId,
        _text: &str,
    ) -> Result<(), ShardexError> {
        tracing::error!(
            "Handling text storage error during replay for document {}: {}",
            document_id,
            error
        );
        match error {
            ShardexError::TextCorruption(msg) => {
                if msg.contains("Index file size mismatch") {
                    tracing::warn!("Attempting index file recovery for text storage");
                    Ok(())
                } else if msg.contains("Data file next offset") {
                    tracing::warn!("Attempting data file recovery for text storage");
                    Ok(())
                } else {
                    tracing::error!("Unrecoverable text storage corruption: {}", msg);
                    Ok(())
                }
            }
            ShardexError::DocumentTooLarge { size, max_size } => {
                tracing::warn!(
                    "Document {} too large ({} bytes > {} max) during replay, skipping",
                    document_id,
                    size,
                    max_size
                );
                Ok(())
            }
            _ => {
                tracing::error!("Text storage error during replay: {}", error);
                Ok(())
            }
        }
    }

    /// True if `transaction_id` has already been applied in this replay session.
    pub fn is_transaction_processed(&self, transaction_id: &TransactionId) -> bool {
        self.processed_transactions.contains(transaction_id)
    }

    /// Record `transaction_id` as applied so duplicates are skipped.
    pub fn mark_transaction_processed(&mut self, transaction_id: TransactionId) {
        self.processed_transactions.insert(transaction_id);
    }

    /// Number of distinct transactions applied in this replay session.
    pub fn processed_transaction_count(&self) -> usize {
        self.processed_transactions.len()
    }

    /// Consume the replayer and return the recovered index.
    pub fn into_index(self) -> ShardexIndex {
        self.shardex_index
    }

    /// Create a shard sized from `sample_vector` so that replay can proceed
    /// when the index has no shards yet (e.g. WAL-only recovery).
    fn create_default_shard_for_recovery(&mut self, sample_vector: &[f32]) -> Result<ShardId, ShardexError> {
        let shard_id = ShardId::new();
        let vector_size = sample_vector.len();
        // Arbitrary but reasonable capacity for a recovery shard.
        let default_capacity = 1000;
        let shard = Shard::create(
            shard_id,
            default_capacity,
            vector_size,
            self.wal_directory
                .parent()
                .ok_or_else(|| ShardexError::Wal("WAL directory has no parent for shard creation".to_string()))?
                .to_path_buf(),
        )?;
        self.shardex_index.add_shard(shard)?;
        info!(
            shard_id = %shard_id,
            vector_size = vector_size,
            capacity = default_capacity,
            "Created default shard for WAL replay recovery"
        );
        Ok(shard_id)
    }

    /// Sanity-check text storage after replay.
    ///
    /// # Errors
    ///
    /// Returns `TextCorruption` if storage claims documents but zero bytes.
    /// Missing statistics are only warned about; disabled storage is a no-op.
    pub fn validate_text_storage_after_replay(&self) -> Result<(), ShardexError> {
        if self.shardex_index.has_text_storage() {
            tracing::info!("Validating text storage consistency after WAL replay");
            let stats = self.shardex_index.text_storage_stats();
            match stats {
                Some(stats) => {
                    tracing::info!(
                        "Text storage validation complete: {} documents, {} total bytes",
                        stats.document_count,
                        stats.total_text_size
                    );
                    if stats.document_count > 0 && stats.total_text_size == 0 {
                        return Err(ShardexError::text_corruption(
                            "Text storage has entries but zero total size",
                        ));
                    }
                }
                None => {
                    tracing::warn!("Could not retrieve text storage statistics during validation");
                }
            }
            Ok(())
        } else {
            tracing::debug!("No text storage to validate");
            Ok(())
        }
    }

    /// Render the recovery statistics as a human-readable multi-line report.
    pub fn get_comprehensive_stats(&self) -> String {
        let stats = &self.recovery_stats;
        format!(
            "WAL Recovery Statistics:
- Segments processed: {}
- Transactions replayed: {}
- Transactions skipped: {}
- Total operations applied: {}
- AddPosting operations: {}
- RemoveDocument operations: {}
- StoreDocumentText operations: {}
- DeleteDocumentText operations: {}
- Total text bytes replayed: {}
- Text storage errors: {}
- Total errors encountered: {}
- Success rate: {:.2}%",
            stats.segments_processed,
            stats.transactions_replayed,
            stats.transactions_skipped,
            stats.operations_applied,
            stats.add_posting_operations,
            stats.remove_document_operations,
            stats.text_store_operations,
            stats.text_delete_operations,
            stats.total_text_bytes_replayed,
            stats.text_storage_errors,
            stats.errors_encountered.len(),
            stats.success_rate()
        )
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for `RecoveryStats` bookkeeping and the `WalReplayer`
    //! text-operation replay paths. Replayer tests call `apply_operation`
    //! directly (no WAL segments on disk are required).
    use super::*;
    use crate::config::ShardexConfig;
    use crate::test_utils::TestEnvironment;

    // Fresh stats start zeroed, report 100% success, and record errors.
    #[test]
    fn test_recovery_stats_basic() {
        let mut stats = RecoveryStats::new();
        assert_eq!(stats.segments_processed, 0);
        assert_eq!(stats.transactions_replayed, 0);
        assert_eq!(stats.transactions_skipped, 0);
        assert_eq!(stats.operations_applied, 0);
        assert_eq!(stats.text_store_operations, 0);
        assert_eq!(stats.text_delete_operations, 0);
        assert_eq!(stats.total_text_bytes_replayed, 0);
        assert_eq!(stats.text_storage_errors, 0);
        assert!(stats.errors_encountered.is_empty());
        assert!(!stats.has_errors());
        assert!(!stats.has_text_storage_errors());
        assert_eq!(stats.success_rate(), 100.0);
        stats.add_error("Test error");
        assert!(stats.has_errors());
        assert_eq!(stats.errors_encountered.len(), 1);
    }

    // success_rate = replayed / (replayed + skipped); skipped counts against it.
    #[test]
    fn test_recovery_stats_success_rate() {
        let mut stats = RecoveryStats::new();
        stats.transactions_replayed = 8;
        stats.transactions_skipped = 2;
        assert_eq!(stats.success_rate(), 80.0);
        stats.transactions_replayed = 10;
        stats.transactions_skipped = 0;
        assert_eq!(stats.success_rate(), 100.0);
    }

    // A new replayer stores the WAL path and starts with no processed
    // transactions or errors.
    #[test]
    fn test_wal_replayer_creation() {
        let _test_env = TestEnvironment::new("test_wal_replayer_creation");
        let config = ShardexConfig::new()
            .directory_path(_test_env.path())
            .vector_size(128);
        let index = ShardexIndex::create(config).unwrap();
        let wal_directory = _test_env.path().join("wal");
        let replayer = WalReplayer::new(wal_directory.clone(), index);
        assert_eq!(replayer.wal_directory, wal_directory);
        assert_eq!(replayer.processed_transaction_count(), 0);
        assert!(!replayer.recovery_stats().has_errors());
    }

    // mark_transaction_processed / is_transaction_processed form the
    // idempotency guard used to skip duplicate transactions.
    #[test]
    fn test_transaction_tracking() {
        let _test_env = TestEnvironment::new("test_transaction_tracking");
        let config = ShardexConfig::new()
            .directory_path(_test_env.path())
            .vector_size(128);
        let index = ShardexIndex::create(config).unwrap();
        let wal_directory = _test_env.path().join("wal");
        let mut replayer = WalReplayer::new(wal_directory, index);
        let transaction_id = TransactionId::new();
        assert!(!replayer.is_transaction_processed(&transaction_id));
        assert_eq!(replayer.processed_transaction_count(), 0);
        replayer.mark_transaction_processed(transaction_id);
        assert!(replayer.is_transaction_processed(&transaction_id));
        assert_eq!(replayer.processed_transaction_count(), 1);
    }

    // Text-operation counters feed both total_text_operations and
    // total_operations_processed.
    #[test]
    fn test_recovery_stats_text_operations() {
        let mut stats = RecoveryStats::new();
        stats.text_store_operations = 5;
        stats.text_delete_operations = 3;
        stats.total_text_bytes_replayed = 10240;
        stats.text_storage_errors = 2;
        assert_eq!(stats.total_text_operations(), 8);
        assert!(stats.has_text_storage_errors());
        // Only text counters are set, so the grand total equals the text total.
        assert_eq!(stats.total_operations_processed(), 8);
        stats.add_posting_operations = 10;
        stats.remove_document_operations = 2;
        assert_eq!(stats.total_operations_processed(), 20);
    }

    // End-to-end check of all derived stats on a fully populated record.
    #[test]
    fn test_recovery_stats_comprehensive() {
        let mut stats = RecoveryStats::new();
        stats.segments_processed = 3;
        stats.transactions_replayed = 50;
        stats.transactions_skipped = 5;
        stats.operations_applied = 120;
        stats.add_posting_operations = 80;
        stats.remove_document_operations = 10;
        stats.text_store_operations = 25;
        stats.text_delete_operations = 5;
        stats.total_text_bytes_replayed = 1048576;
        stats.text_storage_errors = 1;
        assert_eq!(stats.total_operations_processed(), 120);
        assert_eq!(stats.total_text_operations(), 30);
        assert!(stats.has_text_storage_errors());
        assert!(!stats.has_errors());
        stats.add_error("Some error");
        assert!(stats.has_errors());
        // 50 replayed out of 55 seen.
        let expected_rate = (50.0 / 55.0) * 100.0;
        assert!((stats.success_rate() - expected_rate).abs() < 0.01);
    }

    // With text storage enabled, store/delete operations replay successfully,
    // the stored text is retrievable, and the counters are updated once each.
    #[tokio::test]
    async fn test_text_operation_replay_with_storage() {
        let _test_env = TestEnvironment::new("test_text_operation_replay_with_storage");
        let config = ShardexConfig::new()
            .directory_path(_test_env.path())
            .vector_size(128)
            .max_document_text_size(1024 * 1024);
        let index = ShardexIndex::create(config).unwrap();
        let wal_directory = _test_env.path().join("wal");
        let mut replayer = WalReplayer::new(wal_directory, index);
        let doc_id = DocumentId::new();
        let test_text = "Test document text for replay";
        let store_op = WalOperation::StoreDocumentText {
            document_id: doc_id,
            text: test_text.to_string(),
        };
        let result = replayer.apply_operation(&store_op);
        assert!(result.is_ok(), "Store operation should succeed");
        let stats = replayer.recovery_stats();
        assert_eq!(stats.text_store_operations, 1);
        assert_eq!(stats.total_text_bytes_replayed, test_text.len() as u64);
        assert_eq!(stats.text_storage_errors, 0);
        // The stored text must round-trip through the index.
        let retrieved_text = replayer.shardex_index.get_document_text(doc_id);
        assert!(retrieved_text.is_ok());
        assert_eq!(retrieved_text.unwrap(), test_text);
        let delete_op = WalOperation::DeleteDocumentText { document_id: doc_id };
        let result = replayer.apply_operation(&delete_op);
        assert!(result.is_ok(), "Delete operation should succeed");
        let stats = replayer.recovery_stats();
        assert_eq!(stats.text_delete_operations, 1);
    }

    // With text storage disabled (max_document_text_size(0)), replay must not
    // fail hard; it records a descriptive error in the stats instead.
    #[tokio::test]
    async fn test_text_operation_replay_without_storage() {
        let _test_env = TestEnvironment::new("test_text_operation_replay_without_storage");
        let config = ShardexConfig::new()
            .directory_path(_test_env.path())
            .vector_size(128)
            .max_document_text_size(0);
        let index = ShardexIndex::create(config).unwrap();
        let wal_directory = _test_env.path().join("wal");
        let mut replayer = WalReplayer::new(wal_directory, index);
        let doc_id = DocumentId::new();
        let test_text = "Test document text for replay";
        let store_op = WalOperation::StoreDocumentText {
            document_id: doc_id,
            text: test_text.to_string(),
        };
        let result = replayer.apply_operation(&store_op);
        assert!(
            result.is_ok(),
            "Store operation should handle missing storage gracefully"
        );
        let stats = replayer.recovery_stats();
        assert!(
            !stats.errors_encountered.is_empty(),
            "Should have logged an error for missing text storage"
        );
        let has_text_storage_error = stats
            .errors_encountered
            .iter()
            .any(|msg| msg.to_lowercase().contains("text storage not enabled"));
        assert!(
            has_text_storage_error,
            "Should have specific error about text storage not enabled"
        );
    }

    // Validation is a no-op when text storage is disabled.
    #[test]
    fn test_validate_text_storage_after_replay_no_storage() {
        let _test_env = TestEnvironment::new("test_validate_text_storage_no_storage");
        let config = ShardexConfig::new()
            .directory_path(_test_env.path())
            .vector_size(128)
            .max_document_text_size(0);
        let index = ShardexIndex::create(config).unwrap();
        let wal_directory = _test_env.path().join("wal");
        let replayer = WalReplayer::new(wal_directory, index);
        let result = replayer.validate_text_storage_after_replay();
        assert!(result.is_ok());
    }

    // Validation passes on an empty (but enabled) text store.
    #[test]
    fn test_validate_text_storage_after_replay_with_storage() {
        let _test_env = TestEnvironment::new("test_validate_text_storage_with_storage");
        let config = ShardexConfig::new()
            .directory_path(_test_env.path())
            .vector_size(128)
            .max_document_text_size(1024 * 1024);
        let index = ShardexIndex::create(config).unwrap();
        let wal_directory = _test_env.path().join("wal");
        let replayer = WalReplayer::new(wal_directory, index);
        let result = replayer.validate_text_storage_after_replay();
        assert!(result.is_ok());
    }

    // The human-readable report includes every counter that was set.
    #[test]
    fn test_comprehensive_stats_display() {
        let _test_env = TestEnvironment::new("test_comprehensive_stats_display");
        let config = ShardexConfig::new()
            .directory_path(_test_env.path())
            .vector_size(128);
        let index = ShardexIndex::create(config).unwrap();
        let wal_directory = _test_env.path().join("wal");
        let mut replayer = WalReplayer::new(wal_directory, index);
        replayer.recovery_stats.segments_processed = 2;
        replayer.recovery_stats.transactions_replayed = 10;
        replayer.recovery_stats.text_store_operations = 5;
        replayer.recovery_stats.text_delete_operations = 2;
        replayer.recovery_stats.total_text_bytes_replayed = 2048;
        replayer.recovery_stats.text_storage_errors = 1;
        let stats_display = replayer.get_comprehensive_stats();
        assert!(stats_display.contains("Segments processed: 2"));
        assert!(stats_display.contains("Transactions replayed: 10"));
        assert!(stats_display.contains("StoreDocumentText operations: 5"));
        assert!(stats_display.contains("DeleteDocumentText operations: 2"));
        assert!(stats_display.contains("Total text bytes replayed: 2048"));
        assert!(stats_display.contains("Text storage errors: 1"));
    }
}