cqlite_core/storage/sstable/reader/
integrity.rs1use super::{IntegrityCheckResult, IntegrityStatus, SSTableReader, SSTableReaderHealthMetrics};
7use crate::types::Value;
8use crate::Result;
9
10use log::{debug, info};
11use std::io::SeekFrom;
12use tokio::io::AsyncSeekExt;
13
14#[cfg(feature = "tombstones")]
15use super::super::tombstone_merger::GenerationValue;
16
17#[cfg(feature = "tombstones")]
18use crate::{types::TableId, RowKey};
19
20#[cfg(feature = "tombstones")]
21use log::warn;
22
23impl SSTableReader {
24 pub async fn get_health_metrics(&self) -> Result<SSTableReaderHealthMetrics> {
26 let stats = self.stats().await?;
27
28 let cache_hit_rate = self.calculate_cache_hit_rate();
30
31 let memory_usage = self.estimate_memory_usage();
32
33 Ok(SSTableReaderHealthMetrics {
34 file_path: self.file_path.clone(),
35 file_accessible: self.file_path.exists(),
36 header_version: self.header.cassandra_version,
37 total_file_size: stats.file_size,
38 estimated_memory_usage: memory_usage,
39 block_cache_entries: self.block_cache.len(),
40 block_cache_hit_rate: cache_hit_rate,
41 compression_enabled: self.compression_reader.is_some(),
42 compression_algorithm: self.header.compression.algorithm.clone(),
43 bloom_filter_enabled: self.bloom_filter.is_some(),
44 index_available: self.index.is_some(),
45 generation: self.generation,
46 last_error: None,
47 })
48 }
49
50 pub async fn perform_integrity_check(&self) -> Result<IntegrityCheckResult> {
52 debug!("Starting integrity check for {:?}", self.file_path);
53
54 let mut result = IntegrityCheckResult {
55 file_path: self.file_path.clone(),
56 total_blocks_checked: 0,
57 corrupted_blocks: Vec::new(),
58 checksum_mismatches: 0,
59 unreadable_blocks: 0,
60 total_entries: 0,
61 parsing_errors: Vec::new(),
62 overall_status: IntegrityStatus::Healthy,
63 };
64
65 let original_position = {
67 let mut file_guard = self.file.lock().await;
68 file_guard.stream_position().await.unwrap_or(0)
69 };
70
71 let header_size = self.calculate_header_size();
73 {
74 let mut file_guard = self.file.lock().await;
75 file_guard.seek(SeekFrom::Start(header_size as u64)).await?;
76 }
77
78 while let Some(block_data) = self.read_next_block().await.ok().flatten() {
80 result.total_blocks_checked += 1;
81
82 match self.parse_block_entries(&block_data, None) {
84 Ok(entries) => {
85 result.total_entries += entries.len();
86 }
87 Err(e) => {
88 result
89 .parsing_errors
90 .push(format!("Block {}: {}", result.total_blocks_checked, e));
91 result.corrupted_blocks.push(result.total_blocks_checked);
92 }
93 }
94
95 if result.total_blocks_checked % 100 == 0 {
97 tokio::task::yield_now().await;
98 }
99 }
100
101 {
103 let mut file_guard = self.file.lock().await;
104 file_guard.seek(SeekFrom::Start(original_position)).await?;
105 }
106
107 result.overall_status =
109 if !result.corrupted_blocks.is_empty() || !result.parsing_errors.is_empty() {
110 IntegrityStatus::Corrupted
111 } else if result.checksum_mismatches > 0 {
112 IntegrityStatus::Degraded
113 } else {
114 IntegrityStatus::Healthy
115 };
116
117 info!(
118 "Integrity check completed for {:?}: {:?}, {} blocks checked, {} entries",
119 self.file_path,
120 result.overall_status,
121 result.total_blocks_checked,
122 result.total_entries
123 );
124
125 Ok(result)
126 }
127
/// Decides whether `value` should be kept (`true`) or dropped (`false`).
///
/// A value is dropped when the tombstone merger's fast check flags it, or
/// when it carries a TTL and the current time is past `write_time + ttl`.
/// Everything else is kept.
///
/// If the system clock cannot be read, the current time falls back to `0`
/// (with a warning), which means TTL expiry is effectively not applied.
#[cfg(feature = "tombstones")]
pub(super) fn filter_tombstone(&self, value: &Value) -> bool {
    let write_time = self.extract_write_time_from_value(value);

    // Cheap rejection first.
    if self
        .tombstone_merger
        .fast_tombstone_check(value, write_time)
    {
        return false;
    }

    if let Some(ttl) = self.extract_ttl_from_value(value) {
        let current_time = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_micros() as i64)
            .unwrap_or_else(|e| {
                warn!("Failed to get system time: {}; using fallback value 0", e);
                0
            });

        // NOTE(review): `current_time` is in microseconds; this comparison
        // assumes `write_time` and `ttl` use the same unit — confirm.
        // Saturating addition keeps an extreme TTL from overflowing `i64`
        // (which would panic in debug builds and wrap in release builds).
        if current_time > write_time.saturating_add(ttl) {
            return false;
        }
    }

    true
}
160
161 #[cfg(not(feature = "tombstones"))]
175 pub(super) fn filter_tombstone(&self, value: &Value) -> bool {
176 use crate::types::TombstoneType;
177 !matches!(
179 value,
180 Value::Tombstone(info)
181 if info.tombstone_type == TombstoneType::RowTombstone
182 )
183 }
184
/// Merges per-key generation values and returns the entries that survive.
///
/// `entries` is processed in batches of at most `BATCH_SIZE` key groups. Each
/// batch is handed to the tombstone merger. Keys whose merged value is `None`
/// are dropped, and the remaining values are kept only if
/// `should_include_value_after_merge` accepts them. Input order is preserved
/// across batches.
///
/// The input vector is consumed batch by batch, so no entry is cloned.
///
/// # Errors
///
/// Propagates errors from `batch_merge_with_tombstones` and from
/// `should_include_value_after_merge`.
#[cfg(feature = "tombstones")]
pub async fn filter_with_multi_generation_merge(
    &self,
    table_id: &TableId,
    entries: Vec<(RowKey, Vec<GenerationValue>)>,
) -> Result<Vec<(RowKey, Value)>> {
    const BATCH_SIZE: usize = 1000;

    // Captured up front because `entries` is consumed below.
    let total_groups = entries.len();
    // Ceiling division: a trailing partial batch still counts as a batch.
    let total_batches =
        total_groups / BATCH_SIZE + usize::from(total_groups % BATCH_SIZE != 0);

    let mut results = Vec::new();

    log::debug!(
        "Processing {} key groups for multi-generation merge",
        total_groups
    );

    let mut remaining = entries.into_iter();

    for batch_idx in 0..total_batches {
        // Move the next (up to) BATCH_SIZE groups out of the input.
        let batch_entries: Vec<_> = remaining.by_ref().take(BATCH_SIZE).collect();

        log::debug!(
            "Processing batch {}/{} with {} entries",
            batch_idx + 1,
            total_batches,
            batch_entries.len()
        );

        let merged_results = self
            .tombstone_merger
            .batch_merge_with_tombstones(batch_entries, BATCH_SIZE)?;

        for (key, merged_value) in merged_results {
            if let Some(value) = merged_value {
                if self.should_include_value_after_merge(&value, table_id, &key)? {
                    results.push((key, value));
                }
            } else {
                log::debug!("Value for key {:?} was completely tombstoned", key);
            }
        }
    }

    log::debug!(
        "Multi-generation merge completed: {} final results from {} input groups",
        results.len(),
        total_groups
    );

    Ok(results)
}
237
/// Reports whether a merged value still carries data worth returning.
///
/// Rejected: `Null`, any `Tombstone`, empty lists/sets/maps, and UDTs whose
/// fields are all unset. A `Frozen` value is judged by the value it wraps.
/// Every other variant is accepted.
///
/// `_table_id` and `_key` are not inspected; they are only forwarded to the
/// recursive call for `Frozen` values.
#[cfg(feature = "tombstones")]
#[allow(clippy::only_used_in_recursion)]
fn should_include_value_after_merge(
    &self,
    value: &Value,
    _table_id: &TableId,
    _key: &RowKey,
) -> Result<bool> {
    let keep = match value {
        // Nothing left to return.
        Value::Null | Value::Tombstone(_) => false,

        // Collections count only when they hold at least one element.
        Value::List(items) => !items.is_empty(),
        Value::Set(members) => !members.is_empty(),
        Value::Map(pairs) => !pairs.is_empty(),

        // A UDT counts when at least one field has a value.
        Value::Udt(udt) => udt.fields.iter().any(|field| field.value.is_some()),

        // Look through the frozen wrapper.
        Value::Frozen(boxed_value) => {
            return self.should_include_value_after_merge(boxed_value, _table_id, _key);
        }

        _ => true,
    };

    Ok(keep)
}
274
/// Returns the TTL stored on a tombstone value, if any.
///
/// Non-tombstone values never yield a TTL.
#[cfg(feature = "tombstones")]
fn extract_ttl_from_value(&self, value: &Value) -> Option<i64> {
    if let Value::Tombstone(info) = value {
        info.ttl
    } else {
        None
    }
}
283
/// Returns the timestamp to treat as the write time of `value`.
///
/// For a tombstone this is its `deletion_time`. For any other value it is the
/// current wall-clock time in microseconds since the Unix epoch, or `0` (with
/// a warning) if the system clock cannot be read.
#[cfg(feature = "tombstones")]
fn extract_write_time_from_value(&self, value: &Value) -> i64 {
    if let Value::Tombstone(info) = value {
        return info.deletion_time;
    }

    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_micros() as i64,
        Err(e) => {
            warn!("Failed to get system time: {}; using fallback value 0", e);
            0
        }
    }
}
298}