Skip to main content

cqlite_core/storage/sstable/reader/
integrity.rs

//! Integrity checking and health monitoring methods for `SSTableReader`.
//!
//! This module contains methods for checking SSTable integrity, monitoring reader
//! health, and filtering tombstones out of read results.
5
6use super::{IntegrityCheckResult, IntegrityStatus, SSTableReader, SSTableReaderHealthMetrics};
7use crate::types::Value;
8use crate::Result;
9
10use log::{debug, info};
11use std::io::SeekFrom;
12use tokio::io::AsyncSeekExt;
13
14#[cfg(feature = "tombstones")]
15use super::super::tombstone_merger::GenerationValue;
16
17#[cfg(feature = "tombstones")]
18use crate::{types::TableId, RowKey};
19
20#[cfg(feature = "tombstones")]
21use log::warn;
22
23impl SSTableReader {
24    /// Get comprehensive reader health and performance metrics
25    pub async fn get_health_metrics(&self) -> Result<SSTableReaderHealthMetrics> {
26        let stats = self.stats().await?;
27
28        // Calculate actual cache hit rate from atomic counters
29        let cache_hit_rate = self.calculate_cache_hit_rate();
30
31        let memory_usage = self.estimate_memory_usage();
32
33        Ok(SSTableReaderHealthMetrics {
34            file_path: self.file_path.clone(),
35            file_accessible: self.file_path.exists(),
36            header_version: self.header.cassandra_version,
37            total_file_size: stats.file_size,
38            estimated_memory_usage: memory_usage,
39            block_cache_entries: self.block_cache.len(),
40            block_cache_hit_rate: cache_hit_rate,
41            compression_enabled: self.compression_reader.is_some(),
42            compression_algorithm: self.header.compression.algorithm.clone(),
43            bloom_filter_enabled: self.bloom_filter.is_some(),
44            index_available: self.index.is_some(),
45            generation: self.generation,
46            last_error: None,
47        })
48    }
49
50    /// Perform integrity check on the SSTable file
51    pub async fn perform_integrity_check(&self) -> Result<IntegrityCheckResult> {
52        debug!("Starting integrity check for {:?}", self.file_path);
53
54        let mut result = IntegrityCheckResult {
55            file_path: self.file_path.clone(),
56            total_blocks_checked: 0,
57            corrupted_blocks: Vec::new(),
58            checksum_mismatches: 0,
59            unreadable_blocks: 0,
60            total_entries: 0,
61            parsing_errors: Vec::new(),
62            overall_status: IntegrityStatus::Healthy,
63        };
64
65        // Save current position
66        let original_position = {
67            let mut file_guard = self.file.lock().await;
68            file_guard.stream_position().await.unwrap_or(0)
69        };
70
71        // Reset to data section
72        let header_size = self.calculate_header_size();
73        {
74            let mut file_guard = self.file.lock().await;
75            file_guard.seek(SeekFrom::Start(header_size as u64)).await?;
76        }
77
78        // Check each block
79        while let Some(block_data) = self.read_next_block().await.ok().flatten() {
80            result.total_blocks_checked += 1;
81
82            // Try to parse block entries
83            match self.parse_block_entries(&block_data, None) {
84                Ok(entries) => {
85                    result.total_entries += entries.len();
86                }
87                Err(e) => {
88                    result
89                        .parsing_errors
90                        .push(format!("Block {}: {}", result.total_blocks_checked, e));
91                    result.corrupted_blocks.push(result.total_blocks_checked);
92                }
93            }
94
95            // Yield control periodically
96            if result.total_blocks_checked % 100 == 0 {
97                tokio::task::yield_now().await;
98            }
99        }
100
101        // Restore original position
102        {
103            let mut file_guard = self.file.lock().await;
104            file_guard.seek(SeekFrom::Start(original_position)).await?;
105        }
106
107        // Determine overall status
108        result.overall_status =
109            if !result.corrupted_blocks.is_empty() || !result.parsing_errors.is_empty() {
110                IntegrityStatus::Corrupted
111            } else if result.checksum_mismatches > 0 {
112                IntegrityStatus::Degraded
113            } else {
114                IntegrityStatus::Healthy
115            };
116
117        info!(
118            "Integrity check completed for {:?}: {:?}, {} blocks checked, {} entries",
119            self.file_path,
120            result.overall_status,
121            result.total_blocks_checked,
122            result.total_entries
123        );
124
125        Ok(result)
126    }
127
128    /// Enhanced tombstone filtering using TombstoneMerger
129    #[cfg(feature = "tombstones")]
130    pub(super) fn filter_tombstone(&self, value: &Value) -> bool {
131        // Use the fast tombstone check for performance
132        let write_time = self.extract_write_time_from_value(value);
133
134        if self
135            .tombstone_merger
136            .fast_tombstone_check(value, write_time)
137        {
138            // Value is deleted by tombstone
139            return false;
140        }
141
142        // Check for TTL expiration on regular values
143        if let Some(ttl) = self.extract_ttl_from_value(value) {
144            let current_time = std::time::SystemTime::now()
145                .duration_since(std::time::UNIX_EPOCH)
146                .map(|d| d.as_micros() as i64)
147                .unwrap_or_else(|e| {
148                    warn!("Failed to get system time: {}; using fallback value 0", e);
149                    0
150                });
151
152            if current_time > write_time + ttl {
153                // Value has expired
154                return false;
155            }
156        }
157
158        true // Keep valid, non-deleted values
159    }
160
161    /// Simple tombstone filtering (fallback when tombstones feature is disabled).
162    ///
163    /// Row tombstones (`Value::Tombstone(RowTombstone)`) are always filtered out of
164    /// user-facing scan/get results, regardless of the `tombstones` feature flag.
165    /// This prevents deleted rows that are still present on disk (either from a live
166    /// SSTable that contains a tombstone entry, or from a post-compaction SSTable
167    /// that preserved tombstone rows for GC purposes) from appearing in query results.
168    ///
169    /// Cell tombstones (`Value::Tombstone(CellTombstone)`) within a Map are NOT
170    /// filtered here — they are preserved so callers can inspect them.  If a caller
171    /// needs to suppress null-cell entries, it should do so at the query layer.
172    ///
173    /// (Issue #505)
174    #[cfg(not(feature = "tombstones"))]
175    pub(super) fn filter_tombstone(&self, value: &Value) -> bool {
176        use crate::types::TombstoneType;
177        // Filter out row-level tombstones; keep everything else.
178        !matches!(
179            value,
180            Value::Tombstone(info)
181                if info.tombstone_type == TombstoneType::RowTombstone
182        )
183    }
184
185    /// Enhanced multi-generation tombstone filtering for compaction
186    #[cfg(feature = "tombstones")]
187    pub async fn filter_with_multi_generation_merge(
188        &self,
189        table_id: &TableId,
190        entries: Vec<(RowKey, Vec<GenerationValue>)>,
191    ) -> Result<Vec<(RowKey, Value)>> {
192        let mut results = Vec::new();
193
194        log::debug!(
195            "Processing {} key groups for multi-generation merge",
196            entries.len()
197        );
198
199        // Use batch processing for better performance
200        const BATCH_SIZE: usize = 1000;
201
202        let batches: Vec<_> = entries.chunks(BATCH_SIZE).collect();
203
204        for (batch_idx, batch) in batches.iter().enumerate() {
205            log::debug!(
206                "Processing batch {}/{} with {} entries",
207                batch_idx + 1,
208                batches.len(),
209                batch.len()
210            );
211
212            let batch_entries = batch.to_vec();
213            let merged_results = self
214                .tombstone_merger
215                .batch_merge_with_tombstones(batch_entries, BATCH_SIZE)?;
216
217            for (key, merged_value) in merged_results {
218                if let Some(value) = merged_value {
219                    if self.should_include_value_after_merge(&value, table_id, &key)? {
220                        results.push((key, value));
221                    }
222                } else {
223                    // Value was completely tombstoned
224                    log::debug!("Value for key {:?} was completely tombstoned", key);
225                }
226            }
227        }
228
229        log::debug!(
230            "Multi-generation merge completed: {} final results from {} input groups",
231            results.len(),
232            entries.len()
233        );
234
235        Ok(results)
236    }
237
238    /// Enhanced filtering logic for post-merge values including collection validation
239    #[cfg(feature = "tombstones")]
240    #[allow(clippy::only_used_in_recursion)]
241    fn should_include_value_after_merge(
242        &self,
243        value: &Value,
244        _table_id: &TableId,
245        _key: &RowKey,
246    ) -> Result<bool> {
247        match value {
248            // Skip null values
249            Value::Null => Ok(false),
250
251            // For collections, check if they have valid content
252            Value::List(list) => Ok(!list.is_empty()),
253            Value::Set(set) => Ok(!set.is_empty()),
254            Value::Map(map) => Ok(!map.is_empty()),
255
256            // For UDTs, check if they have non-null fields
257            Value::Udt(udt) => {
258                let has_non_null_fields = udt.fields.iter().any(|field| field.value.is_some());
259                Ok(has_non_null_fields)
260            }
261
262            // For frozen values, recursively check the inner value
263            Value::Frozen(boxed_value) => {
264                self.should_include_value_after_merge(boxed_value, _table_id, _key)
265            }
266
267            // Tombstones should not be included in final results
268            Value::Tombstone(_) => Ok(false),
269
270            // All other value types are included
271            _ => Ok(true),
272        }
273    }
274
275    /// Extract TTL from value metadata
276    #[cfg(feature = "tombstones")]
277    fn extract_ttl_from_value(&self, value: &Value) -> Option<i64> {
278        match value {
279            Value::Tombstone(info) => info.ttl,
280            _ => None, // Regular values would have TTL in SSTable metadata
281        }
282    }
283
284    /// Extract write time from value
285    #[cfg(feature = "tombstones")]
286    fn extract_write_time_from_value(&self, value: &Value) -> i64 {
287        match value {
288            Value::Tombstone(info) => info.deletion_time,
289            _ => std::time::SystemTime::now()
290                .duration_since(std::time::UNIX_EPOCH)
291                .map(|d| d.as_micros() as i64)
292                .unwrap_or_else(|e| {
293                    warn!("Failed to get system time: {}; using fallback value 0", e);
294                    0
295                }),
296        }
297    }
298}