Skip to main content

amaters_core/storage/
value_log_gc.rs

1//! Garbage Collection for Value Log
2//!
3//! This module contains GC-related types and methods for the ValueLog.
4//! It handles space reclamation by identifying segments with high dead ratios
5//! and rewriting live entries to new segments.
6
7use crate::error::{AmateRSError, ErrorContext, Result};
8use crate::types::{CipherBlob, Key};
9use std::fs::{File, OpenOptions};
10use std::io::{BufReader, BufWriter, Read, Write};
11use std::path::Path;
12use std::sync::atomic::Ordering;
13use std::time::{Duration, Instant};
14
15use super::value_log::{VLogEntry, ValueLog};
16
/// Live/dead byte and entry accounting for one value-log segment.
///
/// Each appended entry starts out live; invalidating it moves its bytes to the
/// dead side, so `total_bytes == live_bytes + dead_bytes` is maintained by the
/// methods below. The dead ratio drives garbage-collection candidate selection.
#[derive(Debug, Clone)]
pub struct SegmentStats {
    /// Bytes ever written to the segment (live plus dead)
    pub total_bytes: u64,
    /// Bytes still owned by live entries
    pub live_bytes: u64,
    /// Bytes owned by entries that have been invalidated
    pub dead_bytes: u64,
    /// Number of entries ever written to the segment
    pub entry_count: u64,
    /// Number of entries not yet invalidated
    pub live_count: u64,
    /// When this record was created (used as the segment's creation time)
    pub created_at: Instant,
}

impl SegmentStats {
    /// Build an all-zero record stamped with the current time.
    pub(crate) fn new() -> Self {
        let created_at = Instant::now();
        SegmentStats {
            total_bytes: 0,
            live_bytes: 0,
            dead_bytes: 0,
            entry_count: 0,
            live_count: 0,
            created_at,
        }
    }

    /// Account for one freshly appended (live) entry of `entry_bytes` bytes.
    pub(crate) fn record_write(&mut self, entry_bytes: u64) {
        self.total_bytes += entry_bytes;
        self.live_bytes += entry_bytes;
        self.entry_count += 1;
        self.live_count += 1;
    }

    /// Move one entry of `entry_bytes` bytes from the live side to the dead side.
    ///
    /// Both the byte transfer and the live-count decrement are clamped at zero,
    /// so over-reporting can never underflow the counters.
    pub(crate) fn mark_entry_dead(&mut self, entry_bytes: u64) {
        let moved = self.live_bytes.min(entry_bytes);
        self.live_bytes -= moved;
        self.dead_bytes += moved;
        self.live_count = self.live_count.saturating_sub(1);
    }

    /// Fraction of the segment's bytes that are dead; 0.0 for an empty segment.
    pub fn dead_ratio(&self) -> f64 {
        match self.total_bytes {
            0 => 0.0,
            total => self.dead_bytes as f64 / total as f64,
        }
    }
}
74
/// Tuning knobs for value-log garbage collection.
#[derive(Debug, Clone)]
pub struct GcConfig {
    /// Minimum dead/total byte ratio a segment must reach to be collected (default 0.5)
    pub trigger_threshold: f64,
    /// How long a segment must have existed before it may be collected (default one hour)
    pub min_segment_age: Duration,
    /// Budget, in segment bytes, that a single GC run may process (default 256 MiB)
    pub max_gc_bytes_per_run: u64,
}

impl Default for GcConfig {
    fn default() -> Self {
        const ONE_HOUR_SECS: u64 = 60 * 60;
        const MIB: u64 = 1024 * 1024;

        GcConfig {
            trigger_threshold: 0.5,
            min_segment_age: Duration::from_secs(ONE_HOUR_SECS),
            max_gc_bytes_per_run: 256 * MIB,
        }
    }
}
95
/// Summary of one `collect_garbage` run.
#[derive(Debug, Clone)]
pub struct GcResult {
    /// How many segments were reclaimed and deleted
    pub segments_collected: usize,
    /// Estimated bytes freed across all reclaimed segments
    pub bytes_reclaimed: u64,
    /// How many live entries were copied forward out of reclaimed segments
    pub entries_rewritten: u64,
    /// Wall-clock time the run took
    pub duration: Duration,
}
108
/// Outcome of running `garbage_collect_file` on a single vLog file.
#[derive(Debug, Clone)]
pub struct GcStats {
    /// Id of the vLog file that was collected
    pub file_id: u64,
    /// Entries judged live and carried over
    pub live_count: usize,
    /// Entries judged dead and dropped
    pub dead_count: usize,
    /// Size difference between the old file and its replacement
    pub reclaimed_bytes: u64,
}
121
122/// GC-related methods for ValueLog
123impl ValueLog {
124    /// Mark a value as dead/stale in segment stats
125    ///
126    /// This should be called when a key is overwritten or deleted,
127    /// invalidating the old value in the vLog.
128    pub fn mark_dead(&self, pointer: &super::value_log::ValuePointer) {
129        if let Some(mut stats) = self.segment_stats.get_mut(&pointer.file_id) {
130            stats.mark_entry_dead(pointer.length as u64);
131        }
132    }
133
134    /// Get the dead ratio for a given segment (file_id)
135    ///
136    /// Returns dead_bytes / total_bytes, or 0.0 if the segment has no data.
137    pub fn dead_ratio(&self, file_id: u64) -> f64 {
138        self.segment_stats
139            .get(&file_id)
140            .map(|stats| stats.dead_ratio())
141            .unwrap_or(0.0)
142    }
143
144    /// Get a copy of the segment stats for a given file_id
145    pub fn segment_stats(&self, file_id: u64) -> Option<SegmentStats> {
146        self.segment_stats.get(&file_id).map(|s| s.clone())
147    }
148
149    /// Check if GC is currently running
150    pub fn is_gc_running(&self) -> bool {
151        self.gc_running.load(Ordering::Relaxed)
152    }
153
154    /// Get total reclaimable bytes across all segments
155    pub fn total_reclaimable_bytes(&self) -> u64 {
156        self.segment_stats
157            .iter()
158            .map(|entry| entry.value().dead_bytes)
159            .sum()
160    }
161
162    /// Collect garbage across all eligible segments
163    ///
164    /// Finds segments exceeding the dead ratio threshold and old enough,
165    /// then rewrites live entries to new segments and deletes old ones.
166    ///
167    /// `is_live_fn`: Function that checks if a key is still live in the LSM-Tree.
168    /// The function receives the key and should return true if the key's current
169    /// value pointer still points to this entry.
170    pub fn collect_garbage<F>(&self, is_live_fn: F) -> Result<GcResult>
171    where
172        F: Fn(&Key) -> bool,
173    {
174        // Set GC running flag
175        if self
176            .gc_running
177            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
178            .is_err()
179        {
180            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
181                "GC is already running",
182            )));
183        }
184
185        let start = Instant::now();
186        let result = self.collect_garbage_inner(&is_live_fn);
187
188        // Always clear the GC flag
189        self.gc_running.store(false, Ordering::SeqCst);
190
191        result.map(
192            |(segments_collected, bytes_reclaimed, entries_rewritten)| GcResult {
193                segments_collected,
194                bytes_reclaimed,
195                entries_rewritten,
196                duration: start.elapsed(),
197            },
198        )
199    }
200
201    /// Inner GC logic (separated for clean flag management)
202    fn collect_garbage_inner<F>(&self, is_live_fn: &F) -> Result<(usize, u64, u64)>
203    where
204        F: Fn(&Key) -> bool,
205    {
206        let current_file_id = *self.current_file_id.read();
207        let threshold = self.gc_config.trigger_threshold;
208        let min_age = self.gc_config.min_segment_age;
209        let max_bytes = self.gc_config.max_gc_bytes_per_run;
210
211        // Find candidate segments
212        let mut candidates: Vec<(u64, f64, u64)> = Vec::new();
213        for entry in self.segment_stats.iter() {
214            let seg_id = *entry.key();
215            let stats = entry.value();
216
217            // Skip the active segment
218            if seg_id == current_file_id {
219                continue;
220            }
221
222            // Check age
223            if stats.created_at.elapsed() < min_age {
224                continue;
225            }
226
227            // Check dead ratio
228            let ratio = stats.dead_ratio();
229            if ratio >= threshold {
230                candidates.push((seg_id, ratio, stats.total_bytes));
231            }
232        }
233
234        // Sort by dead ratio descending (highest garbage first)
235        candidates.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
236
237        let mut total_segments = 0usize;
238        let mut total_bytes_reclaimed = 0u64;
239        let mut total_entries_rewritten = 0u64;
240        let mut bytes_processed = 0u64;
241
242        for (seg_id, _ratio, seg_bytes) in candidates {
243            if bytes_processed + seg_bytes > max_bytes {
244                break;
245            }
246
247            match self.reclaim_segment(seg_id, is_live_fn) {
248                Ok((reclaimed, rewritten)) => {
249                    total_segments += 1;
250                    total_bytes_reclaimed += reclaimed;
251                    total_entries_rewritten += rewritten;
252                    bytes_processed += seg_bytes;
253                }
254                Err(e) => {
255                    // Log but continue with other segments
256                    tracing::warn!("GC failed for segment {}: {}", seg_id, e);
257                }
258            }
259        }
260
261        Ok((
262            total_segments,
263            total_bytes_reclaimed,
264            total_entries_rewritten,
265        ))
266    }
267
268    /// Reclaim a single segment by rewriting live entries to the active segment
269    ///
270    /// Returns (bytes_reclaimed, entries_rewritten) on success.
271    pub fn reclaim_segment<F>(&self, file_id: u64, is_live_fn: &F) -> Result<(u64, u64)>
272    where
273        F: Fn(&Key) -> bool,
274    {
275        let file_path = Self::vlog_file_path(&self.config.vlog_dir, file_id);
276
277        // Acquire write lock on the segment to ensure no readers during deletion
278        let reader_lock = self
279            .segment_readers
280            .entry(file_id)
281            .or_insert_with(|| std::sync::Arc::new(parking_lot::RwLock::new(())))
282            .clone();
283
284        // Read all live entries first (under read lock, so concurrent reads still work)
285        let (live_entries, original_size) = {
286            let _read_guard = reader_lock.read();
287            self.read_live_entries(file_id, is_live_fn)?
288        };
289
290        let entries_rewritten = live_entries.len() as u64;
291
292        // Write live entries to the current active segment
293        for (key, value) in &live_entries {
294            self.append(key.clone(), value.clone())?;
295        }
296        self.flush()?;
297
298        // Now acquire write lock to safely delete the old segment
299        {
300            let _write_guard = reader_lock.write();
301            if file_path.exists() {
302                std::fs::remove_file(&file_path).map_err(|e| {
303                    AmateRSError::IoError(ErrorContext::new(format!(
304                        "Failed to delete old vLog segment {}: {}",
305                        file_id, e
306                    )))
307                })?;
308            }
309        }
310
311        // Calculate reclaimed bytes
312        let new_live_bytes: u64 = live_entries
313            .iter()
314            .map(|(k, v)| {
315                // entry overhead: magic(4) + key_len(4) + key + value_len(4) + value + checksum(4)
316                (16 + k.len() + v.len()) as u64
317            })
318            .sum();
319        let bytes_reclaimed = original_size.saturating_sub(new_live_bytes);
320
321        // Remove old segment stats and reader lock
322        self.segment_stats.remove(&file_id);
323        self.segment_readers.remove(&file_id);
324
325        Ok((bytes_reclaimed, entries_rewritten))
326    }
327
328    /// Read all live entries from a segment
329    fn read_live_entries<F>(
330        &self,
331        file_id: u64,
332        is_live_fn: &F,
333    ) -> Result<(Vec<(Key, CipherBlob)>, u64)>
334    where
335        F: Fn(&Key) -> bool,
336    {
337        let file_path = Self::vlog_file_path(&self.config.vlog_dir, file_id);
338
339        let old_file = File::open(&file_path).map_err(|e| {
340            AmateRSError::IoError(ErrorContext::new(format!(
341                "Failed to open vLog segment {} for GC: {}",
342                file_id, e
343            )))
344        })?;
345
346        let file_size = old_file
347            .metadata()
348            .map_err(|e| {
349                AmateRSError::IoError(ErrorContext::new(format!(
350                    "Failed to get segment {} size: {}",
351                    file_id, e
352                )))
353            })?
354            .len();
355
356        let mut reader = BufReader::new(old_file);
357        let mut offset = 0u64;
358        let mut live_entries = Vec::new();
359
360        while offset < file_size {
361            match Self::read_next_entry(&mut reader) {
362                Ok(Some((key, value, entry_size))) => {
363                    offset += entry_size as u64;
364                    if is_live_fn(&key) {
365                        live_entries.push((key, value));
366                    }
367                }
368                Ok(None) => break,
369                Err(e) => {
370                    tracing::warn!(
371                        "Error reading entry at offset {} in segment {}: {}",
372                        offset,
373                        file_id,
374                        e
375                    );
376                    break;
377                }
378            }
379        }
380
381        Ok((live_entries, file_size))
382    }
383
384    /// Read the next entry from a reader, returning (key, value, entry_size) or None at EOF
385    fn read_next_entry(reader: &mut BufReader<File>) -> Result<Option<(Key, CipherBlob, usize)>> {
386        // Read magic
387        let mut magic_bytes = [0u8; 4];
388        match reader.read_exact(&mut magic_bytes) {
389            Ok(()) => {}
390            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
391            Err(e) => {
392                return Err(AmateRSError::IoError(ErrorContext::new(format!(
393                    "Failed to read magic: {}",
394                    e
395                ))));
396            }
397        }
398
399        let magic = u32::from_le_bytes(magic_bytes);
400        if magic != 0x564C4F47 {
401            return Ok(None);
402        }
403
404        // Read key length
405        let mut key_len_bytes = [0u8; 4];
406        reader.read_exact(&mut key_len_bytes).map_err(|e| {
407            AmateRSError::IoError(ErrorContext::new(format!(
408                "Failed to read key length: {}",
409                e
410            )))
411        })?;
412        let key_len = u32::from_le_bytes(key_len_bytes) as usize;
413
414        // Read key
415        let mut key_bytes = vec![0u8; key_len];
416        reader.read_exact(&mut key_bytes).map_err(|e| {
417            AmateRSError::IoError(ErrorContext::new(format!("Failed to read key: {}", e)))
418        })?;
419        let key = Key::from_slice(&key_bytes);
420
421        // Read value length
422        let mut value_len_bytes = [0u8; 4];
423        reader.read_exact(&mut value_len_bytes).map_err(|e| {
424            AmateRSError::IoError(ErrorContext::new(format!(
425                "Failed to read value length: {}",
426                e
427            )))
428        })?;
429        let value_len = u32::from_le_bytes(value_len_bytes) as usize;
430
431        // Read value
432        let mut value_bytes = vec![0u8; value_len];
433        reader.read_exact(&mut value_bytes).map_err(|e| {
434            AmateRSError::IoError(ErrorContext::new(format!("Failed to read value: {}", e)))
435        })?;
436        let value = CipherBlob::new(value_bytes);
437
438        // Read checksum
439        let mut checksum_bytes = [0u8; 4];
440        reader.read_exact(&mut checksum_bytes).map_err(|e| {
441            AmateRSError::IoError(ErrorContext::new(format!("Failed to read checksum: {}", e)))
442        })?;
443
444        let entry_size = 4 + 4 + key_len + 4 + value_len + 4;
445
446        Ok(Some((key, value, entry_size)))
447    }
448
449    /// Perform garbage collection on a vLog file
450    ///
451    /// Scans the file and rewrites live values to a new file, discarding dead values.
452    /// This is typically called when a file has too much garbage.
453    ///
454    /// `is_live_fn`: Function that checks if a key is still live in the LSM-Tree
455    pub fn garbage_collect_file<F>(&self, file_id: u64, is_live_fn: F) -> Result<GcStats>
456    where
457        F: Fn(&Key) -> bool,
458    {
459        let file_path = Self::vlog_file_path(&self.config.vlog_dir, file_id);
460
461        // Open old file for reading
462        let old_file = File::open(&file_path).map_err(|e| {
463            AmateRSError::IoError(ErrorContext::new(format!(
464                "Failed to open vLog file for GC: {}",
465                e
466            )))
467        })?;
468
469        let file_size = old_file
470            .metadata()
471            .map_err(|e| {
472                AmateRSError::IoError(ErrorContext::new(format!("Failed to get file size: {}", e)))
473            })?
474            .len();
475
476        let mut reader = BufReader::new(old_file);
477        let mut offset = 0u64;
478
479        let mut live_values = Vec::new();
480        let mut dead_count = 0usize;
481        let mut live_count = 0usize;
482
483        // Scan file and identify live values
484        while offset < file_size {
485            // Read entry length (magic + key_len + key + value_len + value + checksum)
486            let _start_offset = offset;
487
488            // Try to read entry
489            let mut magic_bytes = [0u8; 4];
490            match reader.read_exact(&mut magic_bytes) {
491                Ok(()) => {}
492                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
493                    // End of file
494                    break;
495                }
496                Err(e) => {
497                    return Err(AmateRSError::IoError(ErrorContext::new(format!(
498                        "Failed to read magic: {}",
499                        e
500                    ))));
501                }
502            }
503
504            // Verify magic
505            let magic = u32::from_le_bytes(magic_bytes);
506            if magic != 0x564C4F47 {
507                // Corrupted entry, skip
508                break;
509            }
510
511            // Read key length
512            let mut key_len_bytes = [0u8; 4];
513            reader.read_exact(&mut key_len_bytes).map_err(|e| {
514                AmateRSError::IoError(ErrorContext::new(format!(
515                    "Failed to read key length: {}",
516                    e
517                )))
518            })?;
519            let key_len = u32::from_le_bytes(key_len_bytes) as usize;
520
521            // Read key
522            let mut key_bytes = vec![0u8; key_len];
523            reader.read_exact(&mut key_bytes).map_err(|e| {
524                AmateRSError::IoError(ErrorContext::new(format!("Failed to read key: {}", e)))
525            })?;
526            let key = Key::from_slice(&key_bytes);
527
528            // Read value length
529            let mut value_len_bytes = [0u8; 4];
530            reader.read_exact(&mut value_len_bytes).map_err(|e| {
531                AmateRSError::IoError(ErrorContext::new(format!(
532                    "Failed to read value length: {}",
533                    e
534                )))
535            })?;
536            let value_len = u32::from_le_bytes(value_len_bytes) as usize;
537
538            // Read value
539            let mut value_bytes = vec![0u8; value_len];
540            reader.read_exact(&mut value_bytes).map_err(|e| {
541                AmateRSError::IoError(ErrorContext::new(format!("Failed to read value: {}", e)))
542            })?;
543            let value = CipherBlob::new(value_bytes);
544
545            // Read checksum
546            let mut checksum_bytes = [0u8; 4];
547            reader.read_exact(&mut checksum_bytes).map_err(|e| {
548                AmateRSError::IoError(ErrorContext::new(format!("Failed to read checksum: {}", e)))
549            })?;
550
551            // Calculate entry size
552            let entry_size = 4 + 4 + key_len + 4 + value_len + 4;
553            offset += entry_size as u64;
554
555            // Check if value is live
556            if is_live_fn(&key) {
557                live_values.push((key, value));
558                live_count += 1;
559            } else {
560                dead_count += 1;
561            }
562        }
563
564        // Rewrite live values to new file
565        let new_file_id = Self::find_latest_vlog(&self.config)? + 1;
566        let new_file_path = Self::vlog_file_path(&self.config.vlog_dir, new_file_id);
567
568        let new_file = OpenOptions::new()
569            .create(true)
570            .write(true)
571            .truncate(true)
572            .open(&new_file_path)
573            .map_err(|e| {
574                AmateRSError::IoError(ErrorContext::new(format!(
575                    "Failed to create new vLog file: {}",
576                    e
577                )))
578            })?;
579
580        let mut new_writer = BufWriter::new(new_file);
581
582        for (key, value) in live_values {
583            let entry = VLogEntry::new(key, value);
584            let entry_bytes = entry.encode();
585            new_writer.write_all(&entry_bytes).map_err(|e| {
586                AmateRSError::IoError(ErrorContext::new(format!(
587                    "Failed to write GC entry: {}",
588                    e
589                )))
590            })?;
591        }
592
593        new_writer.flush().map_err(|e| {
594            AmateRSError::IoError(ErrorContext::new(format!("Failed to flush GC file: {}", e)))
595        })?;
596
597        // Delete old file
598        std::fs::remove_file(&file_path).map_err(|e| {
599            AmateRSError::IoError(ErrorContext::new(format!(
600                "Failed to delete old vLog file: {}",
601                e
602            )))
603        })?;
604
605        Ok(GcStats {
606            file_id,
607            live_count,
608            dead_count,
609            reclaimed_bytes: file_size
610                - new_writer
611                    .get_ref()
612                    .metadata()
613                    .map_err(|e| {
614                        AmateRSError::IoError(ErrorContext::new(format!(
615                            "Failed to get new file size: {}",
616                            e
617                        )))
618                    })?
619                    .len(),
620        })
621    }
622
623    /// Calculate garbage ratio for a vLog file
624    ///
625    /// Returns the ratio of dead values to total values.
626    /// This can be used to determine if GC is needed.
627    pub fn calculate_garbage_ratio<F>(&self, file_id: u64, is_live_fn: F) -> Result<f64>
628    where
629        F: Fn(&Key) -> bool,
630    {
631        let file_path = Self::vlog_file_path(&self.config.vlog_dir, file_id);
632
633        let file = File::open(&file_path).map_err(|e| {
634            AmateRSError::IoError(ErrorContext::new(format!(
635                "Failed to open vLog file: {}",
636                e
637            )))
638        })?;
639
640        let file_size = file
641            .metadata()
642            .map_err(|e| {
643                AmateRSError::IoError(ErrorContext::new(format!("Failed to get file size: {}", e)))
644            })?
645            .len();
646
647        let mut reader = BufReader::new(file);
648        let mut offset = 0u64;
649
650        let mut live_bytes = 0u64;
651        let mut dead_bytes = 0u64;
652
653        while offset < file_size {
654            let _start_offset = offset;
655
656            // Try to read entry
657            let mut magic_bytes = [0u8; 4];
658            match reader.read_exact(&mut magic_bytes) {
659                Ok(()) => {}
660                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
661                Err(e) => {
662                    return Err(AmateRSError::IoError(ErrorContext::new(format!(
663                        "Failed to read magic: {}",
664                        e
665                    ))));
666                }
667            }
668
669            let magic = u32::from_le_bytes(magic_bytes);
670            if magic != 0x564C4F47 {
671                break;
672            }
673
674            // Read key length
675            let mut key_len_bytes = [0u8; 4];
676            reader.read_exact(&mut key_len_bytes).map_err(|e| {
677                AmateRSError::IoError(ErrorContext::new(format!(
678                    "Failed to read key length: {}",
679                    e
680                )))
681            })?;
682            let key_len = u32::from_le_bytes(key_len_bytes) as usize;
683
684            // Read key
685            let mut key_bytes = vec![0u8; key_len];
686            reader.read_exact(&mut key_bytes).map_err(|e| {
687                AmateRSError::IoError(ErrorContext::new(format!("Failed to read key: {}", e)))
688            })?;
689            let key = Key::from_slice(&key_bytes);
690
691            // Read value length
692            let mut value_len_bytes = [0u8; 4];
693            reader.read_exact(&mut value_len_bytes).map_err(|e| {
694                AmateRSError::IoError(ErrorContext::new(format!(
695                    "Failed to read value length: {}",
696                    e
697                )))
698            })?;
699            let value_len = u32::from_le_bytes(value_len_bytes) as usize;
700
701            // Skip value
702            let mut value_bytes = vec![0u8; value_len];
703            reader.read_exact(&mut value_bytes).map_err(|e| {
704                AmateRSError::IoError(ErrorContext::new(format!("Failed to read value: {}", e)))
705            })?;
706
707            // Skip checksum
708            let mut checksum_bytes = [0u8; 4];
709            reader.read_exact(&mut checksum_bytes).map_err(|e| {
710                AmateRSError::IoError(ErrorContext::new(format!("Failed to read checksum: {}", e)))
711            })?;
712
713            let entry_size = 4 + 4 + key_len + 4 + value_len + 4;
714            offset += entry_size as u64;
715
716            if is_live_fn(&key) {
717                live_bytes += entry_size as u64;
718            } else {
719                dead_bytes += entry_size as u64;
720            }
721        }
722
723        let total_bytes = live_bytes + dead_bytes;
724        if total_bytes == 0 {
725            Ok(0.0)
726        } else {
727            Ok(dead_bytes as f64 / total_bytes as f64)
728        }
729    }
730}
731
732#[cfg(test)]
733mod tests {
734    use super::*;
735    use crate::storage::value_log::{ValueLog, ValueLogConfig, ValuePointer};
736    use std::env;
737    use std::path::PathBuf;
738
739    /// Helper to create a unique temp directory for each test
740    fn make_test_dir(name: &str) -> PathBuf {
741        let dir = env::temp_dir()
742            .join("amaters_vlog_gc_tests")
743            .join(name)
744            .join(format!("{}", std::process::id()));
745        std::fs::create_dir_all(&dir).ok();
746        // Clean any leftover files from prior runs
747        if let Ok(entries) = std::fs::read_dir(&dir) {
748            for entry in entries.flatten() {
749                std::fs::remove_file(entry.path()).ok();
750            }
751        }
752        dir
753    }
754
755    #[test]
756    fn test_segment_stats_tracking() -> Result<()> {
757        let temp_dir = make_test_dir("segment_stats");
758
759        let vlog = ValueLog::new(&temp_dir)?;
760        let file_id = vlog.current_file_id();
761
762        // Write some entries
763        let mut pointers = Vec::new();
764        for i in 0..5 {
765            let key = Key::from_str(&format!("stats_key_{}", i));
766            let value = CipherBlob::new(vec![i as u8; 500]);
767            let ptr = vlog.append(key, value)?;
768            pointers.push(ptr);
769        }
770        vlog.flush()?;
771
772        // Check stats
773        let stats = vlog
774            .segment_stats(file_id)
775            .expect("stats should exist for current segment");
776        assert_eq!(stats.entry_count, 5);
777        assert_eq!(stats.live_count, 5);
778        assert!(stats.total_bytes > 0);
779        assert_eq!(stats.dead_bytes, 0);
780        assert!((stats.dead_ratio() - 0.0).abs() < f64::EPSILON);
781
782        std::fs::remove_dir_all(&temp_dir).ok();
783        Ok(())
784    }
785
786    #[test]
787    fn test_mark_dead_and_dead_ratio() -> Result<()> {
788        let temp_dir = make_test_dir("mark_dead");
789
790        let vlog = ValueLog::new(&temp_dir)?;
791        let file_id = vlog.current_file_id();
792
793        // Write 4 entries of equal size
794        let mut pointers = Vec::new();
795        for i in 0..4 {
796            let key = Key::from_str(&format!("dead_key_{}", i));
797            let value = CipherBlob::new(vec![i as u8; 200]);
798            let ptr = vlog.append(key, value)?;
799            pointers.push(ptr);
800        }
801        vlog.flush()?;
802
803        // Initially, dead ratio should be 0
804        let ratio = vlog.dead_ratio(file_id);
805        assert!((ratio - 0.0).abs() < f64::EPSILON);
806
807        // Mark 2 of 4 entries dead
808        vlog.mark_dead(&pointers[0]);
809        vlog.mark_dead(&pointers[1]);
810
811        let ratio = vlog.dead_ratio(file_id);
812        // Each entry is same size, so marking 2 of 4 dead ~ 0.5
813        assert!(ratio > 0.45 && ratio < 0.55, "Expected ~0.5, got {}", ratio);
814
815        // Check stats reflect the change
816        let stats = vlog.segment_stats(file_id).expect("stats should exist");
817        assert_eq!(stats.live_count, 2);
818        assert_eq!(stats.entry_count, 4);
819
820        std::fs::remove_dir_all(&temp_dir).ok();
821        Ok(())
822    }
823
824    #[test]
825    fn test_total_reclaimable_bytes() -> Result<()> {
826        let temp_dir = make_test_dir("reclaimable");
827
828        let vlog = ValueLog::new(&temp_dir)?;
829
830        let mut pointers = Vec::new();
831        for i in 0..6 {
832            let key = Key::from_str(&format!("reclaim_key_{}", i));
833            let value = CipherBlob::new(vec![i as u8; 300]);
834            let ptr = vlog.append(key, value)?;
835            pointers.push(ptr);
836        }
837        vlog.flush()?;
838
839        // Initially no reclaimable bytes
840        assert_eq!(vlog.total_reclaimable_bytes(), 0);
841
842        // Mark 3 entries dead
843        for ptr in &pointers[0..3] {
844            vlog.mark_dead(ptr);
845        }
846
847        let reclaimable = vlog.total_reclaimable_bytes();
848        assert!(
849            reclaimable > 0,
850            "Expected reclaimable bytes > 0, got {}",
851            reclaimable
852        );
853
854        std::fs::remove_dir_all(&temp_dir).ok();
855        Ok(())
856    }
857
858    #[test]
859    fn test_gc_correctness() -> Result<()> {
860        let temp_dir = make_test_dir("gc_correctness");
861
862        // Use small file size so writes go to a single file, then rotate
863        let config = ValueLogConfig {
864            vlog_dir: temp_dir.clone(),
865            max_file_size: 100_000,
866            sync_on_write: true,
867            ..Default::default()
868        };
869        let gc_config = GcConfig {
870            trigger_threshold: 0.3,
871            min_segment_age: Duration::from_secs(0), // No age limit for test
872            max_gc_bytes_per_run: 1024 * 1024,
873        };
874
875        let vlog = ValueLog::with_config_and_gc(config, gc_config)?;
876
877        // Write values to segment 0
878        let mut pointers = Vec::new();
879        let mut values = Vec::new();
880        for i in 0..10 {
881            let key = Key::from_str(&format!("gc_key_{}", i));
882            let value = CipherBlob::new(vec![i as u8; 500]);
883            let ptr = vlog.append(key, value.clone())?;
884            pointers.push(ptr);
885            values.push(value);
886        }
887        vlog.flush()?;
888
889        let old_file_id = vlog.current_file_id();
890
891        // Force rotation so old segment is eligible for GC
892        vlog.rotate()?;
893
894        // Mark entries 5-9 as dead
895        for ptr in &pointers[5..10] {
896            vlog.mark_dead(ptr);
897        }
898
899        // Run GC - entries 0-4 are "live"
900        let result = vlog.collect_garbage(|key| {
901            let key_str = String::from_utf8_lossy(key.as_bytes());
902            if let Some(num_str) = key_str.strip_prefix("gc_key_") {
903                if let Ok(num) = num_str.parse::<usize>() {
904                    return num < 5;
905                }
906            }
907            false
908        })?;
909
910        assert_eq!(result.segments_collected, 1);
911        assert!(result.bytes_reclaimed > 0);
912        assert_eq!(result.entries_rewritten, 5);
913
914        // Verify old segment file is deleted
915        let old_path = ValueLog::vlog_file_path(&temp_dir, old_file_id);
916        assert!(
917            !old_path.exists(),
918            "Old segment file should have been deleted"
919        );
920
921        std::fs::remove_dir_all(&temp_dir).ok();
922        Ok(())
923    }
924
925    #[test]
926    fn test_gc_threshold_respected() -> Result<()> {
927        let temp_dir = make_test_dir("gc_threshold");
928
929        let config = ValueLogConfig {
930            vlog_dir: temp_dir.clone(),
931            max_file_size: 100_000,
932            sync_on_write: true,
933            ..Default::default()
934        };
935        let gc_config = GcConfig {
936            trigger_threshold: 0.8, // High threshold
937            min_segment_age: Duration::from_secs(0),
938            max_gc_bytes_per_run: 1024 * 1024,
939        };
940
941        let vlog = ValueLog::with_config_and_gc(config, gc_config)?;
942
943        // Write entries
944        let mut pointers = Vec::new();
945        for i in 0..10 {
946            let key = Key::from_str(&format!("thresh_key_{}", i));
947            let value = CipherBlob::new(vec![i as u8; 300]);
948            let ptr = vlog.append(key, value)?;
949            pointers.push(ptr);
950        }
951        vlog.flush()?;
952
953        let old_file_id = vlog.current_file_id();
954        vlog.rotate()?;
955
956        // Mark only 3 of 10 dead (~30% dead ratio, below 80% threshold)
957        for ptr in &pointers[0..3] {
958            vlog.mark_dead(ptr);
959        }
960
961        // GC should not collect this segment (ratio too low)
962        let result = vlog.collect_garbage(|_| true)?;
963        assert_eq!(
964            result.segments_collected, 0,
965            "GC should not trigger below threshold"
966        );
967
968        // Verify old segment still exists
969        let old_path = ValueLog::vlog_file_path(&temp_dir, old_file_id);
970        assert!(old_path.exists(), "Segment should still exist");
971
972        std::fs::remove_dir_all(&temp_dir).ok();
973        Ok(())
974    }
975
976    #[test]
977    fn test_gc_empty_segment() -> Result<()> {
978        let temp_dir = make_test_dir("gc_empty");
979
980        let config = ValueLogConfig {
981            vlog_dir: temp_dir.clone(),
982            max_file_size: 100_000,
983            sync_on_write: true,
984            ..Default::default()
985        };
986        let gc_config = GcConfig {
987            trigger_threshold: 0.3,
988            min_segment_age: Duration::from_secs(0),
989            max_gc_bytes_per_run: 1024 * 1024,
990        };
991
992        let vlog = ValueLog::with_config_and_gc(config, gc_config)?;
993
994        // Just rotate to create an empty old segment
995        let first_id = vlog.current_file_id();
996        vlog.rotate()?;
997
998        // GC should handle empty segment gracefully (no entries, no dead ratio)
999        let result = vlog.collect_garbage(|_| false)?;
1000        // Empty segment has 0 dead ratio, so it won't be collected
1001        assert_eq!(result.segments_collected, 0);
1002
1003        std::fs::remove_dir_all(&temp_dir).ok();
1004        let _ = first_id;
1005        Ok(())
1006    }
1007
1008    #[test]
1009    fn test_gc_all_dead_segment() -> Result<()> {
1010        let temp_dir = make_test_dir("gc_all_dead");
1011
1012        let config = ValueLogConfig {
1013            vlog_dir: temp_dir.clone(),
1014            max_file_size: 100_000,
1015            sync_on_write: true,
1016            ..Default::default()
1017        };
1018        let gc_config = GcConfig {
1019            trigger_threshold: 0.5,
1020            min_segment_age: Duration::from_secs(0),
1021            max_gc_bytes_per_run: 1024 * 1024,
1022        };
1023
1024        let vlog = ValueLog::with_config_and_gc(config, gc_config)?;
1025
1026        // Write entries
1027        let mut pointers = Vec::new();
1028        for i in 0..5 {
1029            let key = Key::from_str(&format!("alldead_key_{}", i));
1030            let value = CipherBlob::new(vec![i as u8; 200]);
1031            let ptr = vlog.append(key, value)?;
1032            pointers.push(ptr);
1033        }
1034        vlog.flush()?;
1035
1036        let old_file_id = vlog.current_file_id();
1037        vlog.rotate()?;
1038
1039        // Mark ALL entries dead
1040        for ptr in &pointers {
1041            vlog.mark_dead(ptr);
1042        }
1043
1044        // Dead ratio should be 1.0
1045        let ratio = vlog.dead_ratio(old_file_id);
1046        assert!(
1047            (ratio - 1.0).abs() < 0.01,
1048            "Expected ratio ~1.0, got {}",
1049            ratio
1050        );
1051
1052        // GC should collect and rewrite 0 entries
1053        let result = vlog.collect_garbage(|_| false)?;
1054        assert_eq!(result.segments_collected, 1);
1055        assert_eq!(result.entries_rewritten, 0);
1056        assert!(result.bytes_reclaimed > 0);
1057
1058        // Old segment should be deleted
1059        let old_path = ValueLog::vlog_file_path(&temp_dir, old_file_id);
1060        assert!(!old_path.exists());
1061
1062        std::fs::remove_dir_all(&temp_dir).ok();
1063        Ok(())
1064    }
1065
1066    #[test]
1067    fn test_gc_all_live_segment() -> Result<()> {
1068        let temp_dir = make_test_dir("gc_all_live");
1069
1070        let config = ValueLogConfig {
1071            vlog_dir: temp_dir.clone(),
1072            max_file_size: 100_000,
1073            sync_on_write: true,
1074            ..Default::default()
1075        };
1076        let gc_config = GcConfig {
1077            trigger_threshold: 0.3,
1078            min_segment_age: Duration::from_secs(0),
1079            max_gc_bytes_per_run: 1024 * 1024,
1080        };
1081
1082        let vlog = ValueLog::with_config_and_gc(config, gc_config)?;
1083
1084        // Write entries but mark none dead
1085        for i in 0..5 {
1086            let key = Key::from_str(&format!("alllive_key_{}", i));
1087            let value = CipherBlob::new(vec![i as u8; 200]);
1088            vlog.append(key, value)?;
1089        }
1090        vlog.flush()?;
1091        vlog.rotate()?;
1092
1093        // No entries marked dead, so dead ratio = 0, below threshold
1094        let result = vlog.collect_garbage(|_| true)?;
1095        assert_eq!(
1096            result.segments_collected, 0,
1097            "All-live segment should not be collected"
1098        );
1099
1100        std::fs::remove_dir_all(&temp_dir).ok();
1101        Ok(())
1102    }
1103
1104    #[test]
1105    fn test_gc_result_stats_accuracy() -> Result<()> {
1106        let temp_dir = make_test_dir("gc_stats_accuracy");
1107
1108        let config = ValueLogConfig {
1109            vlog_dir: temp_dir.clone(),
1110            max_file_size: 100_000,
1111            sync_on_write: true,
1112            ..Default::default()
1113        };
1114        let gc_config = GcConfig {
1115            trigger_threshold: 0.3,
1116            min_segment_age: Duration::from_secs(0),
1117            max_gc_bytes_per_run: 1024 * 1024,
1118        };
1119
1120        let vlog = ValueLog::with_config_and_gc(config, gc_config)?;
1121
1122        let mut pointers = Vec::new();
1123        for i in 0..8 {
1124            let key = Key::from_str(&format!("acc_key_{}", i));
1125            let value = CipherBlob::new(vec![i as u8; 400]);
1126            let ptr = vlog.append(key, value)?;
1127            pointers.push(ptr);
1128        }
1129        vlog.flush()?;
1130        vlog.rotate()?;
1131
1132        // Mark 6 of 8 dead (75% dead)
1133        for ptr in &pointers[0..6] {
1134            vlog.mark_dead(ptr);
1135        }
1136
1137        let result = vlog.collect_garbage(|key| {
1138            let key_str = String::from_utf8_lossy(key.as_bytes());
1139            if let Some(num_str) = key_str.strip_prefix("acc_key_") {
1140                if let Ok(num) = num_str.parse::<usize>() {
1141                    return num >= 6;
1142                }
1143            }
1144            false
1145        })?;
1146
1147        assert_eq!(result.segments_collected, 1);
1148        assert_eq!(result.entries_rewritten, 2);
1149        assert!(result.bytes_reclaimed > 0);
1150        assert!(result.duration.as_nanos() > 0);
1151
1152        std::fs::remove_dir_all(&temp_dir).ok();
1153        Ok(())
1154    }
1155
1156    #[test]
1157    fn test_is_gc_running_flag() -> Result<()> {
1158        let temp_dir = make_test_dir("gc_running_flag");
1159
1160        let vlog = ValueLog::new(&temp_dir)?;
1161        assert!(!vlog.is_gc_running());
1162
1163        std::fs::remove_dir_all(&temp_dir).ok();
1164        Ok(())
1165    }
1166
1167    #[test]
1168    fn test_concurrent_reads_during_gc() -> Result<()> {
1169        use std::sync::Arc;
1170
1171        let temp_dir = make_test_dir("concurrent_gc");
1172
1173        let config = ValueLogConfig {
1174            vlog_dir: temp_dir.clone(),
1175            max_file_size: 100_000,
1176            sync_on_write: true,
1177            ..Default::default()
1178        };
1179        let gc_config = GcConfig {
1180            trigger_threshold: 0.3,
1181            min_segment_age: Duration::from_secs(0),
1182            max_gc_bytes_per_run: 1024 * 1024,
1183        };
1184
1185        let vlog = Arc::new(ValueLog::with_config_and_gc(config, gc_config)?);
1186
1187        // Write entries to first segment
1188        let mut first_segment_pointers = Vec::new();
1189        for i in 0..10 {
1190            let key = Key::from_str(&format!("conc_key_{}", i));
1191            let value = CipherBlob::new(vec![i as u8; 300]);
1192            let ptr = vlog.append(key, value)?;
1193            first_segment_pointers.push(ptr);
1194        }
1195        vlog.flush()?;
1196
1197        // Rotate to new segment
1198        vlog.rotate()?;
1199
1200        // Write entries to second segment (these will be read concurrently)
1201        let mut second_segment_pointers = Vec::new();
1202        for i in 0..5 {
1203            let key = Key::from_str(&format!("conc2_key_{}", i));
1204            let value = CipherBlob::new(vec![(i + 100) as u8; 300]);
1205            let ptr = vlog.append(key, value)?;
1206            second_segment_pointers.push(ptr);
1207        }
1208        vlog.flush()?;
1209
1210        // Mark first segment entries 5-9 as dead
1211        for ptr in &first_segment_pointers[5..10] {
1212            vlog.mark_dead(ptr);
1213        }
1214
1215        // Spawn reader threads that read from the second segment
1216        let handles: Vec<_> = second_segment_pointers
1217            .iter()
1218            .enumerate()
1219            .map(|(i, ptr)| {
1220                let vlog_clone = Arc::clone(&vlog);
1221                let ptr_clone = ptr.clone();
1222                let expected = (i + 100) as u8;
1223                std::thread::spawn(move || {
1224                    for _ in 0..10 {
1225                        let val = vlog_clone
1226                            .read(&ptr_clone)
1227                            .expect("read should succeed during GC");
1228                        assert_eq!(val.as_bytes()[0], expected);
1229                        std::thread::yield_now();
1230                    }
1231                })
1232            })
1233            .collect();
1234
1235        // Run GC on first segment concurrently with reads on second
1236        let gc_result = vlog.collect_garbage(|key| {
1237            let key_str = String::from_utf8_lossy(key.as_bytes());
1238            if let Some(num_str) = key_str.strip_prefix("conc_key_") {
1239                if let Ok(num) = num_str.parse::<usize>() {
1240                    return num < 5;
1241                }
1242            }
1243            // second segment entries are always live
1244            true
1245        })?;
1246
1247        // Wait for all reader threads
1248        for handle in handles {
1249            handle.join().expect("reader thread should not panic");
1250        }
1251
1252        assert!(gc_result.segments_collected >= 1);
1253
1254        std::fs::remove_dir_all(&temp_dir).ok();
1255        Ok(())
1256    }
1257
1258    #[test]
1259    fn test_space_reclamation_preserves_live_data() -> Result<()> {
1260        let temp_dir = make_test_dir("reclaim_preserves");
1261
1262        let config = ValueLogConfig {
1263            vlog_dir: temp_dir.clone(),
1264            max_file_size: 100_000,
1265            sync_on_write: true,
1266            ..Default::default()
1267        };
1268        let gc_config = GcConfig {
1269            trigger_threshold: 0.2,
1270            min_segment_age: Duration::from_secs(0),
1271            max_gc_bytes_per_run: 1024 * 1024,
1272        };
1273
1274        let vlog = ValueLog::with_config_and_gc(config, gc_config)?;
1275
1276        // Write entries
1277        let mut pointers = Vec::new();
1278        let mut expected_values = Vec::new();
1279        for i in 0..6 {
1280            let key = Key::from_str(&format!("reclaim_key_{}", i));
1281            let value = CipherBlob::new(vec![i as u8; 250]);
1282            let ptr = vlog.append(key, value.clone())?;
1283            pointers.push(ptr);
1284            expected_values.push(value);
1285        }
1286        vlog.flush()?;
1287
1288        let old_file_id = vlog.current_file_id();
1289        vlog.rotate()?;
1290
1291        // Mark entries 0, 2, 4 as dead (keep 1, 3, 5 live)
1292        vlog.mark_dead(&pointers[0]);
1293        vlog.mark_dead(&pointers[2]);
1294        vlog.mark_dead(&pointers[4]);
1295
1296        // Reclaim the old segment
1297        let is_live = |key: &Key| -> bool {
1298            let key_str = String::from_utf8_lossy(key.as_bytes());
1299            if let Some(num_str) = key_str.strip_prefix("reclaim_key_") {
1300                if let Ok(num) = num_str.parse::<usize>() {
1301                    return num % 2 == 1; // odd keys are live
1302                }
1303            }
1304            false
1305        };
1306        let (reclaimed, rewritten) = vlog.reclaim_segment(old_file_id, &is_live)?;
1307
1308        assert_eq!(rewritten, 3);
1309        assert!(reclaimed > 0);
1310
1311        // Old segment file should be gone
1312        let old_path = ValueLog::vlog_file_path(&temp_dir, old_file_id);
1313        assert!(!old_path.exists());
1314
1315        std::fs::remove_dir_all(&temp_dir).ok();
1316        Ok(())
1317    }
1318
1319    #[test]
1320    fn test_dead_ratio_nonexistent_segment() {
1321        let temp_dir = make_test_dir("dead_ratio_noexist");
1322        let vlog = ValueLog::new(&temp_dir).expect("should create vlog");
1323
1324        // Non-existent segment should return 0.0
1325        let ratio = vlog.dead_ratio(9999);
1326        assert!((ratio - 0.0).abs() < f64::EPSILON);
1327
1328        std::fs::remove_dir_all(&temp_dir).ok();
1329    }
1330
1331    #[test]
1332    fn test_gc_config_defaults() {
1333        let gc = GcConfig::default();
1334        assert!((gc.trigger_threshold - 0.5).abs() < f64::EPSILON);
1335        assert_eq!(gc.min_segment_age, Duration::from_secs(3600));
1336        assert_eq!(gc.max_gc_bytes_per_run, 256 * 1024 * 1024);
1337    }
1338
1339    #[test]
1340    fn test_segment_stats_new() {
1341        let stats = SegmentStats::new();
1342        assert_eq!(stats.total_bytes, 0);
1343        assert_eq!(stats.live_bytes, 0);
1344        assert_eq!(stats.dead_bytes, 0);
1345        assert_eq!(stats.entry_count, 0);
1346        assert_eq!(stats.live_count, 0);
1347        assert!((stats.dead_ratio() - 0.0).abs() < f64::EPSILON);
1348    }
1349}