use super::{IntegrityCheckResult, IntegrityStatus, SSTableReader, SSTableReaderHealthMetrics};
use crate::types::Value;
use crate::Result;
use log::{debug, info};
use std::io::SeekFrom;
use tokio::io::AsyncSeekExt;
#[cfg(feature = "tombstones")]
use super::super::tombstone_merger::GenerationValue;
#[cfg(feature = "tombstones")]
use crate::{types::TableId, RowKey};
#[cfg(feature = "tombstones")]
use log::warn;
impl SSTableReader {
/// Builds a point-in-time health snapshot for this reader.
///
/// The snapshot combines the reader statistics, the computed cache hit rate
/// and the estimated memory usage with flags that report which optional
/// components (compression reader, bloom filter, index) are present.
/// `file_accessible` reflects whether `file_path` exists at call time and
/// `last_error` is always reported as `None`.
///
/// # Errors
///
/// Propagates any error returned by `self.stats()`.
pub async fn get_health_metrics(&self) -> Result<SSTableReaderHealthMetrics> {
    let file_stats = self.stats().await?;
    let hit_rate = self.calculate_cache_hit_rate();
    let estimated_memory = self.estimate_memory_usage();

    let metrics = SSTableReaderHealthMetrics {
        file_path: self.file_path.clone(),
        file_accessible: self.file_path.exists(),
        header_version: self.header.cassandra_version,
        total_file_size: file_stats.file_size,
        estimated_memory_usage: estimated_memory,
        block_cache_entries: self.block_cache.len(),
        block_cache_hit_rate: hit_rate,
        compression_enabled: self.compression_reader.is_some(),
        compression_algorithm: self.header.compression.algorithm.clone(),
        bloom_filter_enabled: self.bloom_filter.is_some(),
        index_available: self.index.is_some(),
        generation: self.generation,
        last_error: None,
    };
    Ok(metrics)
}
pub async fn perform_integrity_check(&self) -> Result<IntegrityCheckResult> {
debug!("Starting integrity check for {:?}", self.file_path);
let mut result = IntegrityCheckResult {
file_path: self.file_path.clone(),
total_blocks_checked: 0,
corrupted_blocks: Vec::new(),
checksum_mismatches: 0,
unreadable_blocks: 0,
total_entries: 0,
parsing_errors: Vec::new(),
overall_status: IntegrityStatus::Healthy,
};
let original_position = {
let mut file_guard = self.file.lock().await;
file_guard.stream_position().await.unwrap_or(0)
};
let header_size = self.calculate_header_size();
{
let mut file_guard = self.file.lock().await;
file_guard.seek(SeekFrom::Start(header_size as u64)).await?;
}
while let Some(block_data) = self.read_next_block().await.ok().flatten() {
result.total_blocks_checked += 1;
match self.parse_block_entries(&block_data, None) {
Ok(entries) => {
result.total_entries += entries.len();
}
Err(e) => {
result
.parsing_errors
.push(format!("Block {}: {}", result.total_blocks_checked, e));
result.corrupted_blocks.push(result.total_blocks_checked);
}
}
if result.total_blocks_checked % 100 == 0 {
tokio::task::yield_now().await;
}
}
{
let mut file_guard = self.file.lock().await;
file_guard.seek(SeekFrom::Start(original_position)).await?;
}
result.overall_status =
if !result.corrupted_blocks.is_empty() || !result.parsing_errors.is_empty() {
IntegrityStatus::Corrupted
} else if result.checksum_mismatches > 0 {
IntegrityStatus::Degraded
} else {
IntegrityStatus::Healthy
};
info!(
"Integrity check completed for {:?}: {:?}, {} blocks checked, {} entries",
self.file_path,
result.overall_status,
result.total_blocks_checked,
result.total_entries
);
Ok(result)
}
/// Decides whether `value` should be kept after tombstone and TTL filtering.
///
/// Returns `false` when the tombstone merger's fast check flags the value at
/// its write time, or when the value carries a TTL and the current time is
/// past `write_time + ttl`. Returns `true` in every other case.
///
/// If the system clock reports a time before the Unix epoch, the current
/// time falls back to `0` and a warning is logged.
#[cfg(feature = "tombstones")]
pub(super) fn filter_tombstone(&self, value: &Value) -> bool {
    let write_time = self.extract_write_time_from_value(value);
    if self
        .tombstone_merger
        .fast_tombstone_check(value, write_time)
    {
        return false;
    }

    if let Some(ttl) = self.extract_ttl_from_value(value) {
        let current_time = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_micros() as i64)
            .unwrap_or_else(|e| {
                warn!("Failed to get system time: {}; using fallback value 0", e);
                0
            });
        // Saturate instead of overflowing: an extreme stored timestamp or TTL
        // must not panic (debug) or wrap into the past (release).
        // NOTE(review): `ttl` is added directly to a microsecond timestamp —
        // confirm it is stored in microseconds as well.
        let expires_at = write_time.saturating_add(ttl);
        if current_time > expires_at {
            return false;
        }
    }

    true
}
/// Decides whether `value` should be kept when full tombstone support is
/// compiled out.
///
/// Only row tombstones are dropped (`false`); every other value, including
/// tombstones of other types, is kept (`true`).
#[cfg(not(feature = "tombstones"))]
pub(super) fn filter_tombstone(&self, value: &Value) -> bool {
    use crate::types::TombstoneType;

    match value {
        Value::Tombstone(info) => info.tombstone_type != TombstoneType::RowTombstone,
        _ => true,
    }
}
/// Merges multi-generation values per key and returns the survivors.
///
/// `entries` is processed in batches of `BATCH_SIZE` key groups. Each batch
/// is handed to the tombstone merger; keys whose merge result is `None` are
/// dropped (and logged), and the remaining values are kept only if
/// `should_include_value_after_merge` accepts them.
///
/// The input vector is consumed batch by batch, so entries are moved into
/// the merger rather than cloned.
///
/// # Errors
///
/// Propagates the first error returned by the tombstone merger or by
/// `should_include_value_after_merge`.
#[cfg(feature = "tombstones")]
pub async fn filter_with_multi_generation_merge(
    &self,
    table_id: &TableId,
    entries: Vec<(RowKey, Vec<GenerationValue>)>,
) -> Result<Vec<(RowKey, Value)>> {
    const BATCH_SIZE: usize = 1000;

    // Captured up front because `entries` is consumed below.
    let total_groups = entries.len();
    // Ceiling division: a trailing partial batch counts as one batch.
    let total_batches = total_groups / BATCH_SIZE + usize::from(total_groups % BATCH_SIZE != 0);

    let mut results = Vec::new();
    log::debug!(
        "Processing {} key groups for multi-generation merge",
        total_groups
    );

    let mut pending = entries.into_iter();
    for batch_idx in 0..total_batches {
        // Move up to BATCH_SIZE entries out of the input without cloning.
        let batch_entries: Vec<_> = pending.by_ref().take(BATCH_SIZE).collect();
        log::debug!(
            "Processing batch {}/{} with {} entries",
            batch_idx + 1,
            total_batches,
            batch_entries.len()
        );

        let merged_results = self
            .tombstone_merger
            .batch_merge_with_tombstones(batch_entries, BATCH_SIZE)?;
        for (key, merged_value) in merged_results {
            if let Some(value) = merged_value {
                if self.should_include_value_after_merge(&value, table_id, &key)? {
                    results.push((key, value));
                }
            } else {
                log::debug!("Value for key {:?} was completely tombstoned", key);
            }
        }
    }

    log::debug!(
        "Multi-generation merge completed: {} final results from {} input groups",
        results.len(),
        total_groups
    );
    Ok(results)
}
/// Reports whether a merged value still carries data worth returning.
///
/// `Frozen` wrappers are peeled off first and the innermost value is judged:
/// `Null` and tombstones are excluded, collections are kept only when
/// non-empty, a UDT is kept only when at least one field has a value, and
/// every other value is kept.
///
/// `_table_id` and `_key` are accepted but not consulted. The function
/// always returns `Ok`.
#[cfg(feature = "tombstones")]
fn should_include_value_after_merge(
    &self,
    value: &Value,
    _table_id: &TableId,
    _key: &RowKey,
) -> Result<bool> {
    // Unwrap any number of nested `Frozen` layers before inspecting.
    let mut current = value;
    while let Value::Frozen(inner) = current {
        current = &**inner;
    }

    let keep = match current {
        Value::Null | Value::Tombstone(_) => false,
        Value::List(items) => !items.is_empty(),
        Value::Set(items) => !items.is_empty(),
        Value::Map(pairs) => !pairs.is_empty(),
        Value::Udt(udt) => udt.fields.iter().any(|field| field.value.is_some()),
        _ => true,
    };
    Ok(keep)
}
/// Returns the TTL recorded on a tombstone value, if any.
///
/// Non-tombstone values never yield a TTL.
#[cfg(feature = "tombstones")]
fn extract_ttl_from_value(&self, value: &Value) -> Option<i64> {
    if let Value::Tombstone(info) = value {
        info.ttl
    } else {
        None
    }
}
/// Returns the timestamp used as the write time of `value`.
///
/// For a tombstone this is its `deletion_time`. For any other value the
/// current system time in microseconds since the Unix epoch is used; if the
/// clock reports a time before the epoch, a warning is logged and `0` is
/// returned.
#[cfg(feature = "tombstones")]
fn extract_write_time_from_value(&self, value: &Value) -> i64 {
    if let Value::Tombstone(info) = value {
        return info.deletion_time;
    }

    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_micros() as i64,
        Err(e) => {
            warn!("Failed to get system time: {}; using fallback value 0", e);
            0
        }
    }
}
}