1use crate::error::{AmateRSError, ErrorContext, Result};
8use crate::types::{CipherBlob, Key};
9use std::fs::{File, OpenOptions};
10use std::io::{BufReader, BufWriter, Read, Write};
11use std::path::Path;
12use std::sync::atomic::Ordering;
13use std::time::{Duration, Instant};
14
15use super::value_log::{VLogEntry, ValueLog};
16
/// Space-accounting counters for a single vLog segment, used to decide when the
/// segment is worth garbage-collecting.
#[derive(Debug, Clone)]
pub struct SegmentStats {
    /// Bytes recorded as written to the segment via `record_write`.
    pub total_bytes: u64,
    /// Portion of `total_bytes` that has not been marked dead.
    pub live_bytes: u64,
    /// Portion of `total_bytes` moved out of `live_bytes` by `mark_entry_dead`.
    pub dead_bytes: u64,
    /// Number of entries recorded via `record_write`.
    pub entry_count: u64,
    /// Number of recorded entries not yet marked dead (never goes below zero).
    pub live_count: u64,
    /// Moment this stats record was created; GC compares it with the minimum segment age.
    pub created_at: Instant,
}

impl SegmentStats {
    /// Creates an all-zero stats record whose age is measured from now.
    pub(crate) fn new() -> Self {
        let created_at = Instant::now();
        Self {
            created_at,
            live_count: 0,
            entry_count: 0,
            dead_bytes: 0,
            live_bytes: 0,
            total_bytes: 0,
        }
    }

    /// Accounts for one newly written entry of `entry_bytes` bytes, which starts out live.
    pub(crate) fn record_write(&mut self, entry_bytes: u64) {
        self.total_bytes += entry_bytes;
        self.live_bytes += entry_bytes;
        self.entry_count += 1;
        self.live_count += 1;
    }

    /// Moves up to `entry_bytes` bytes from the live to the dead counter and
    /// decrements the live entry count.
    ///
    /// Both adjustments are clamped, so marking more bytes or more entries dead
    /// than are live never underflows.
    pub(crate) fn mark_entry_dead(&mut self, entry_bytes: u64) {
        let moved = self.live_bytes.min(entry_bytes);
        self.live_bytes -= moved;
        self.dead_bytes += moved;
        self.live_count = self.live_count.saturating_sub(1);
    }

    /// Fraction of `total_bytes` that is dead, in `[0.0, 1.0]`; `0.0` when nothing
    /// has been written yet.
    pub fn dead_ratio(&self) -> f64 {
        match self.total_bytes {
            0 => 0.0,
            total => self.dead_bytes as f64 / total as f64,
        }
    }
}
74
/// Tuning knobs for value-log garbage collection.
#[derive(Debug, Clone)]
pub struct GcConfig {
    /// Segments whose dead-byte ratio is at least this value become GC candidates.
    pub trigger_threshold: f64,
    /// Segments whose stats are younger than this are skipped.
    pub min_segment_age: Duration,
    /// Byte budget for a single GC run, measured in the total size of the
    /// segments it processes.
    pub max_gc_bytes_per_run: u64,
}

impl Default for GcConfig {
    /// Collect segments that are at least half garbage and at least one hour old,
    /// processing up to 256 MiB of segments per run.
    fn default() -> Self {
        const HALF_DEAD: f64 = 0.5;
        const ONE_HOUR: Duration = Duration::from_secs(60 * 60);
        const BUDGET_256_MIB: u64 = 256 << 20;

        GcConfig {
            trigger_threshold: HALF_DEAD,
            min_segment_age: ONE_HOUR,
            max_gc_bytes_per_run: BUDGET_256_MIB,
        }
    }
}
95
/// Summary of one garbage-collection pass, as returned by `ValueLog::collect_garbage`.
#[derive(Debug, Clone)]
pub struct GcResult {
    /// Number of segments reclaimed successfully; segments whose reclamation
    /// failed are logged and not counted.
    pub segments_collected: usize,
    /// Estimated bytes freed: for each reclaimed segment, its file size minus the
    /// encoded size of the entries that were rewritten.
    pub bytes_reclaimed: u64,
    /// Number of live entries re-appended to the active segment.
    pub entries_rewritten: u64,
    /// Wall-clock time spent in the pass.
    pub duration: Duration,
}
108
/// Outcome of compacting one vLog file with `ValueLog::garbage_collect_file`.
#[derive(Debug, Clone)]
pub struct GcStats {
    /// Id of the file that was compacted (and then deleted).
    pub file_id: u64,
    /// Number of entries the liveness callback kept; these were copied to the new file.
    pub live_count: usize,
    /// Number of entries the liveness callback rejected; these were dropped.
    pub dead_count: usize,
    /// Size of the old file minus the size of the newly written file.
    pub reclaimed_bytes: u64,
}
121
122impl ValueLog {
124 pub fn mark_dead(&self, pointer: &super::value_log::ValuePointer) {
129 if let Some(mut stats) = self.segment_stats.get_mut(&pointer.file_id) {
130 stats.mark_entry_dead(pointer.length as u64);
131 }
132 }
133
134 pub fn dead_ratio(&self, file_id: u64) -> f64 {
138 self.segment_stats
139 .get(&file_id)
140 .map(|stats| stats.dead_ratio())
141 .unwrap_or(0.0)
142 }
143
144 pub fn segment_stats(&self, file_id: u64) -> Option<SegmentStats> {
146 self.segment_stats.get(&file_id).map(|s| s.clone())
147 }
148
149 pub fn is_gc_running(&self) -> bool {
151 self.gc_running.load(Ordering::Relaxed)
152 }
153
154 pub fn total_reclaimable_bytes(&self) -> u64 {
156 self.segment_stats
157 .iter()
158 .map(|entry| entry.value().dead_bytes)
159 .sum()
160 }
161
162 pub fn collect_garbage<F>(&self, is_live_fn: F) -> Result<GcResult>
171 where
172 F: Fn(&Key) -> bool,
173 {
174 if self
176 .gc_running
177 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
178 .is_err()
179 {
180 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
181 "GC is already running",
182 )));
183 }
184
185 let start = Instant::now();
186 let result = self.collect_garbage_inner(&is_live_fn);
187
188 self.gc_running.store(false, Ordering::SeqCst);
190
191 result.map(
192 |(segments_collected, bytes_reclaimed, entries_rewritten)| GcResult {
193 segments_collected,
194 bytes_reclaimed,
195 entries_rewritten,
196 duration: start.elapsed(),
197 },
198 )
199 }
200
201 fn collect_garbage_inner<F>(&self, is_live_fn: &F) -> Result<(usize, u64, u64)>
203 where
204 F: Fn(&Key) -> bool,
205 {
206 let current_file_id = *self.current_file_id.read();
207 let threshold = self.gc_config.trigger_threshold;
208 let min_age = self.gc_config.min_segment_age;
209 let max_bytes = self.gc_config.max_gc_bytes_per_run;
210
211 let mut candidates: Vec<(u64, f64, u64)> = Vec::new();
213 for entry in self.segment_stats.iter() {
214 let seg_id = *entry.key();
215 let stats = entry.value();
216
217 if seg_id == current_file_id {
219 continue;
220 }
221
222 if stats.created_at.elapsed() < min_age {
224 continue;
225 }
226
227 let ratio = stats.dead_ratio();
229 if ratio >= threshold {
230 candidates.push((seg_id, ratio, stats.total_bytes));
231 }
232 }
233
234 candidates.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
236
237 let mut total_segments = 0usize;
238 let mut total_bytes_reclaimed = 0u64;
239 let mut total_entries_rewritten = 0u64;
240 let mut bytes_processed = 0u64;
241
242 for (seg_id, _ratio, seg_bytes) in candidates {
243 if bytes_processed + seg_bytes > max_bytes {
244 break;
245 }
246
247 match self.reclaim_segment(seg_id, is_live_fn) {
248 Ok((reclaimed, rewritten)) => {
249 total_segments += 1;
250 total_bytes_reclaimed += reclaimed;
251 total_entries_rewritten += rewritten;
252 bytes_processed += seg_bytes;
253 }
254 Err(e) => {
255 tracing::warn!("GC failed for segment {}: {}", seg_id, e);
257 }
258 }
259 }
260
261 Ok((
262 total_segments,
263 total_bytes_reclaimed,
264 total_entries_rewritten,
265 ))
266 }
267
268 pub fn reclaim_segment<F>(&self, file_id: u64, is_live_fn: &F) -> Result<(u64, u64)>
272 where
273 F: Fn(&Key) -> bool,
274 {
275 let file_path = Self::vlog_file_path(&self.config.vlog_dir, file_id);
276
277 let reader_lock = self
279 .segment_readers
280 .entry(file_id)
281 .or_insert_with(|| std::sync::Arc::new(parking_lot::RwLock::new(())))
282 .clone();
283
284 let (live_entries, original_size) = {
286 let _read_guard = reader_lock.read();
287 self.read_live_entries(file_id, is_live_fn)?
288 };
289
290 let entries_rewritten = live_entries.len() as u64;
291
292 for (key, value) in &live_entries {
294 self.append(key.clone(), value.clone())?;
295 }
296 self.flush()?;
297
298 {
300 let _write_guard = reader_lock.write();
301 if file_path.exists() {
302 std::fs::remove_file(&file_path).map_err(|e| {
303 AmateRSError::IoError(ErrorContext::new(format!(
304 "Failed to delete old vLog segment {}: {}",
305 file_id, e
306 )))
307 })?;
308 }
309 }
310
311 let new_live_bytes: u64 = live_entries
313 .iter()
314 .map(|(k, v)| {
315 (16 + k.len() + v.len()) as u64
317 })
318 .sum();
319 let bytes_reclaimed = original_size.saturating_sub(new_live_bytes);
320
321 self.segment_stats.remove(&file_id);
323 self.segment_readers.remove(&file_id);
324
325 Ok((bytes_reclaimed, entries_rewritten))
326 }
327
328 fn read_live_entries<F>(
330 &self,
331 file_id: u64,
332 is_live_fn: &F,
333 ) -> Result<(Vec<(Key, CipherBlob)>, u64)>
334 where
335 F: Fn(&Key) -> bool,
336 {
337 let file_path = Self::vlog_file_path(&self.config.vlog_dir, file_id);
338
339 let old_file = File::open(&file_path).map_err(|e| {
340 AmateRSError::IoError(ErrorContext::new(format!(
341 "Failed to open vLog segment {} for GC: {}",
342 file_id, e
343 )))
344 })?;
345
346 let file_size = old_file
347 .metadata()
348 .map_err(|e| {
349 AmateRSError::IoError(ErrorContext::new(format!(
350 "Failed to get segment {} size: {}",
351 file_id, e
352 )))
353 })?
354 .len();
355
356 let mut reader = BufReader::new(old_file);
357 let mut offset = 0u64;
358 let mut live_entries = Vec::new();
359
360 while offset < file_size {
361 match Self::read_next_entry(&mut reader) {
362 Ok(Some((key, value, entry_size))) => {
363 offset += entry_size as u64;
364 if is_live_fn(&key) {
365 live_entries.push((key, value));
366 }
367 }
368 Ok(None) => break,
369 Err(e) => {
370 tracing::warn!(
371 "Error reading entry at offset {} in segment {}: {}",
372 offset,
373 file_id,
374 e
375 );
376 break;
377 }
378 }
379 }
380
381 Ok((live_entries, file_size))
382 }
383
384 fn read_next_entry(reader: &mut BufReader<File>) -> Result<Option<(Key, CipherBlob, usize)>> {
386 let mut magic_bytes = [0u8; 4];
388 match reader.read_exact(&mut magic_bytes) {
389 Ok(()) => {}
390 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
391 Err(e) => {
392 return Err(AmateRSError::IoError(ErrorContext::new(format!(
393 "Failed to read magic: {}",
394 e
395 ))));
396 }
397 }
398
399 let magic = u32::from_le_bytes(magic_bytes);
400 if magic != 0x564C4F47 {
401 return Ok(None);
402 }
403
404 let mut key_len_bytes = [0u8; 4];
406 reader.read_exact(&mut key_len_bytes).map_err(|e| {
407 AmateRSError::IoError(ErrorContext::new(format!(
408 "Failed to read key length: {}",
409 e
410 )))
411 })?;
412 let key_len = u32::from_le_bytes(key_len_bytes) as usize;
413
414 let mut key_bytes = vec![0u8; key_len];
416 reader.read_exact(&mut key_bytes).map_err(|e| {
417 AmateRSError::IoError(ErrorContext::new(format!("Failed to read key: {}", e)))
418 })?;
419 let key = Key::from_slice(&key_bytes);
420
421 let mut value_len_bytes = [0u8; 4];
423 reader.read_exact(&mut value_len_bytes).map_err(|e| {
424 AmateRSError::IoError(ErrorContext::new(format!(
425 "Failed to read value length: {}",
426 e
427 )))
428 })?;
429 let value_len = u32::from_le_bytes(value_len_bytes) as usize;
430
431 let mut value_bytes = vec![0u8; value_len];
433 reader.read_exact(&mut value_bytes).map_err(|e| {
434 AmateRSError::IoError(ErrorContext::new(format!("Failed to read value: {}", e)))
435 })?;
436 let value = CipherBlob::new(value_bytes);
437
438 let mut checksum_bytes = [0u8; 4];
440 reader.read_exact(&mut checksum_bytes).map_err(|e| {
441 AmateRSError::IoError(ErrorContext::new(format!("Failed to read checksum: {}", e)))
442 })?;
443
444 let entry_size = 4 + 4 + key_len + 4 + value_len + 4;
445
446 Ok(Some((key, value, entry_size)))
447 }
448
449 pub fn garbage_collect_file<F>(&self, file_id: u64, is_live_fn: F) -> Result<GcStats>
456 where
457 F: Fn(&Key) -> bool,
458 {
459 let file_path = Self::vlog_file_path(&self.config.vlog_dir, file_id);
460
461 let old_file = File::open(&file_path).map_err(|e| {
463 AmateRSError::IoError(ErrorContext::new(format!(
464 "Failed to open vLog file for GC: {}",
465 e
466 )))
467 })?;
468
469 let file_size = old_file
470 .metadata()
471 .map_err(|e| {
472 AmateRSError::IoError(ErrorContext::new(format!("Failed to get file size: {}", e)))
473 })?
474 .len();
475
476 let mut reader = BufReader::new(old_file);
477 let mut offset = 0u64;
478
479 let mut live_values = Vec::new();
480 let mut dead_count = 0usize;
481 let mut live_count = 0usize;
482
483 while offset < file_size {
485 let _start_offset = offset;
487
488 let mut magic_bytes = [0u8; 4];
490 match reader.read_exact(&mut magic_bytes) {
491 Ok(()) => {}
492 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
493 break;
495 }
496 Err(e) => {
497 return Err(AmateRSError::IoError(ErrorContext::new(format!(
498 "Failed to read magic: {}",
499 e
500 ))));
501 }
502 }
503
504 let magic = u32::from_le_bytes(magic_bytes);
506 if magic != 0x564C4F47 {
507 break;
509 }
510
511 let mut key_len_bytes = [0u8; 4];
513 reader.read_exact(&mut key_len_bytes).map_err(|e| {
514 AmateRSError::IoError(ErrorContext::new(format!(
515 "Failed to read key length: {}",
516 e
517 )))
518 })?;
519 let key_len = u32::from_le_bytes(key_len_bytes) as usize;
520
521 let mut key_bytes = vec![0u8; key_len];
523 reader.read_exact(&mut key_bytes).map_err(|e| {
524 AmateRSError::IoError(ErrorContext::new(format!("Failed to read key: {}", e)))
525 })?;
526 let key = Key::from_slice(&key_bytes);
527
528 let mut value_len_bytes = [0u8; 4];
530 reader.read_exact(&mut value_len_bytes).map_err(|e| {
531 AmateRSError::IoError(ErrorContext::new(format!(
532 "Failed to read value length: {}",
533 e
534 )))
535 })?;
536 let value_len = u32::from_le_bytes(value_len_bytes) as usize;
537
538 let mut value_bytes = vec![0u8; value_len];
540 reader.read_exact(&mut value_bytes).map_err(|e| {
541 AmateRSError::IoError(ErrorContext::new(format!("Failed to read value: {}", e)))
542 })?;
543 let value = CipherBlob::new(value_bytes);
544
545 let mut checksum_bytes = [0u8; 4];
547 reader.read_exact(&mut checksum_bytes).map_err(|e| {
548 AmateRSError::IoError(ErrorContext::new(format!("Failed to read checksum: {}", e)))
549 })?;
550
551 let entry_size = 4 + 4 + key_len + 4 + value_len + 4;
553 offset += entry_size as u64;
554
555 if is_live_fn(&key) {
557 live_values.push((key, value));
558 live_count += 1;
559 } else {
560 dead_count += 1;
561 }
562 }
563
564 let new_file_id = Self::find_latest_vlog(&self.config)? + 1;
566 let new_file_path = Self::vlog_file_path(&self.config.vlog_dir, new_file_id);
567
568 let new_file = OpenOptions::new()
569 .create(true)
570 .write(true)
571 .truncate(true)
572 .open(&new_file_path)
573 .map_err(|e| {
574 AmateRSError::IoError(ErrorContext::new(format!(
575 "Failed to create new vLog file: {}",
576 e
577 )))
578 })?;
579
580 let mut new_writer = BufWriter::new(new_file);
581
582 for (key, value) in live_values {
583 let entry = VLogEntry::new(key, value);
584 let entry_bytes = entry.encode();
585 new_writer.write_all(&entry_bytes).map_err(|e| {
586 AmateRSError::IoError(ErrorContext::new(format!(
587 "Failed to write GC entry: {}",
588 e
589 )))
590 })?;
591 }
592
593 new_writer.flush().map_err(|e| {
594 AmateRSError::IoError(ErrorContext::new(format!("Failed to flush GC file: {}", e)))
595 })?;
596
597 std::fs::remove_file(&file_path).map_err(|e| {
599 AmateRSError::IoError(ErrorContext::new(format!(
600 "Failed to delete old vLog file: {}",
601 e
602 )))
603 })?;
604
605 Ok(GcStats {
606 file_id,
607 live_count,
608 dead_count,
609 reclaimed_bytes: file_size
610 - new_writer
611 .get_ref()
612 .metadata()
613 .map_err(|e| {
614 AmateRSError::IoError(ErrorContext::new(format!(
615 "Failed to get new file size: {}",
616 e
617 )))
618 })?
619 .len(),
620 })
621 }
622
623 pub fn calculate_garbage_ratio<F>(&self, file_id: u64, is_live_fn: F) -> Result<f64>
628 where
629 F: Fn(&Key) -> bool,
630 {
631 let file_path = Self::vlog_file_path(&self.config.vlog_dir, file_id);
632
633 let file = File::open(&file_path).map_err(|e| {
634 AmateRSError::IoError(ErrorContext::new(format!(
635 "Failed to open vLog file: {}",
636 e
637 )))
638 })?;
639
640 let file_size = file
641 .metadata()
642 .map_err(|e| {
643 AmateRSError::IoError(ErrorContext::new(format!("Failed to get file size: {}", e)))
644 })?
645 .len();
646
647 let mut reader = BufReader::new(file);
648 let mut offset = 0u64;
649
650 let mut live_bytes = 0u64;
651 let mut dead_bytes = 0u64;
652
653 while offset < file_size {
654 let _start_offset = offset;
655
656 let mut magic_bytes = [0u8; 4];
658 match reader.read_exact(&mut magic_bytes) {
659 Ok(()) => {}
660 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
661 Err(e) => {
662 return Err(AmateRSError::IoError(ErrorContext::new(format!(
663 "Failed to read magic: {}",
664 e
665 ))));
666 }
667 }
668
669 let magic = u32::from_le_bytes(magic_bytes);
670 if magic != 0x564C4F47 {
671 break;
672 }
673
674 let mut key_len_bytes = [0u8; 4];
676 reader.read_exact(&mut key_len_bytes).map_err(|e| {
677 AmateRSError::IoError(ErrorContext::new(format!(
678 "Failed to read key length: {}",
679 e
680 )))
681 })?;
682 let key_len = u32::from_le_bytes(key_len_bytes) as usize;
683
684 let mut key_bytes = vec![0u8; key_len];
686 reader.read_exact(&mut key_bytes).map_err(|e| {
687 AmateRSError::IoError(ErrorContext::new(format!("Failed to read key: {}", e)))
688 })?;
689 let key = Key::from_slice(&key_bytes);
690
691 let mut value_len_bytes = [0u8; 4];
693 reader.read_exact(&mut value_len_bytes).map_err(|e| {
694 AmateRSError::IoError(ErrorContext::new(format!(
695 "Failed to read value length: {}",
696 e
697 )))
698 })?;
699 let value_len = u32::from_le_bytes(value_len_bytes) as usize;
700
701 let mut value_bytes = vec![0u8; value_len];
703 reader.read_exact(&mut value_bytes).map_err(|e| {
704 AmateRSError::IoError(ErrorContext::new(format!("Failed to read value: {}", e)))
705 })?;
706
707 let mut checksum_bytes = [0u8; 4];
709 reader.read_exact(&mut checksum_bytes).map_err(|e| {
710 AmateRSError::IoError(ErrorContext::new(format!("Failed to read checksum: {}", e)))
711 })?;
712
713 let entry_size = 4 + 4 + key_len + 4 + value_len + 4;
714 offset += entry_size as u64;
715
716 if is_live_fn(&key) {
717 live_bytes += entry_size as u64;
718 } else {
719 dead_bytes += entry_size as u64;
720 }
721 }
722
723 let total_bytes = live_bytes + dead_bytes;
724 if total_bytes == 0 {
725 Ok(0.0)
726 } else {
727 Ok(dead_bytes as f64 / total_bytes as f64)
728 }
729 }
730}
731
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::value_log::{ValueLog, ValueLogConfig, ValuePointer};
    use std::env;
    use std::path::PathBuf;

    /// Returns a fresh, empty per-process scratch directory for test `name`.
    fn make_test_dir(name: &str) -> PathBuf {
        let dir = env::temp_dir()
            .join("amaters_vlog_gc_tests")
            .join(name)
            .join(std::process::id().to_string());
        // Wipe leftovers (files or sub-directories) from an earlier aborted run.
        std::fs::remove_dir_all(&dir).ok();
        std::fs::create_dir_all(&dir).ok();
        dir
    }

    /// Opens a value log in `dir` with small segments, synchronous writes, no
    /// minimum segment age and a 1 MiB GC budget. Only the trigger threshold
    /// varies between tests.
    fn open_gc_vlog(dir: &std::path::Path, trigger_threshold: f64) -> Result<ValueLog> {
        let config = ValueLogConfig {
            vlog_dir: dir.to_path_buf(),
            max_file_size: 100_000,
            sync_on_write: true,
            ..Default::default()
        };
        let gc_config = GcConfig {
            trigger_threshold,
            min_segment_age: Duration::from_secs(0),
            max_gc_bytes_per_run: 1024 * 1024,
        };
        Ok(ValueLog::with_config_and_gc(config, gc_config)?)
    }

    /// Parses the numeric suffix of a key named `{prefix}{n}`, if it has that shape.
    fn key_index(key: &Key, prefix: &str) -> Option<usize> {
        let text = String::from_utf8_lossy(key.as_bytes());
        text.strip_prefix(prefix)?.parse().ok()
    }

    #[test]
    fn test_segment_stats_tracking() -> Result<()> {
        let temp_dir = make_test_dir("segment_stats");

        let vlog = ValueLog::new(&temp_dir)?;
        let file_id = vlog.current_file_id();

        for i in 0..5 {
            let key = Key::from_str(&format!("stats_key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 500]);
            vlog.append(key, value)?;
        }
        vlog.flush()?;

        let stats = vlog
            .segment_stats(file_id)
            .expect("stats should exist for current segment");
        assert_eq!(stats.entry_count, 5);
        assert_eq!(stats.live_count, 5);
        assert!(stats.total_bytes > 0);
        assert_eq!(stats.dead_bytes, 0);
        assert!(stats.dead_ratio().abs() < f64::EPSILON);

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    #[test]
    fn test_mark_dead_and_dead_ratio() -> Result<()> {
        let temp_dir = make_test_dir("mark_dead");

        let vlog = ValueLog::new(&temp_dir)?;
        let file_id = vlog.current_file_id();

        let mut pointers = Vec::new();
        for i in 0..4 {
            let key = Key::from_str(&format!("dead_key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 200]);
            pointers.push(vlog.append(key, value)?);
        }
        vlog.flush()?;

        assert!(vlog.dead_ratio(file_id).abs() < f64::EPSILON);

        // Two of four equally sized entries dead -> roughly half the bytes are dead.
        vlog.mark_dead(&pointers[0]);
        vlog.mark_dead(&pointers[1]);

        let ratio = vlog.dead_ratio(file_id);
        assert!(ratio > 0.45 && ratio < 0.55, "Expected ~0.5, got {}", ratio);

        let stats = vlog.segment_stats(file_id).expect("stats should exist");
        assert_eq!(stats.live_count, 2);
        assert_eq!(stats.entry_count, 4);

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    #[test]
    fn test_total_reclaimable_bytes() -> Result<()> {
        let temp_dir = make_test_dir("reclaimable");

        let vlog = ValueLog::new(&temp_dir)?;

        let mut pointers = Vec::new();
        for i in 0..6 {
            let key = Key::from_str(&format!("reclaim_key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 300]);
            pointers.push(vlog.append(key, value)?);
        }
        vlog.flush()?;

        assert_eq!(vlog.total_reclaimable_bytes(), 0);

        for ptr in &pointers[0..3] {
            vlog.mark_dead(ptr);
        }

        let reclaimable = vlog.total_reclaimable_bytes();
        assert!(
            reclaimable > 0,
            "Expected reclaimable bytes > 0, got {}",
            reclaimable
        );

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    #[test]
    fn test_gc_correctness() -> Result<()> {
        let temp_dir = make_test_dir("gc_correctness");
        let vlog = open_gc_vlog(&temp_dir, 0.3)?;

        let mut pointers = Vec::new();
        for i in 0..10 {
            let key = Key::from_str(&format!("gc_key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 500]);
            pointers.push(vlog.append(key, value)?);
        }
        vlog.flush()?;

        let old_file_id = vlog.current_file_id();
        // Seal the segment so it becomes eligible for GC.
        vlog.rotate()?;

        for ptr in &pointers[5..10] {
            vlog.mark_dead(ptr);
        }

        let result =
            vlog.collect_garbage(|key| key_index(key, "gc_key_").map_or(false, |n| n < 5))?;

        assert_eq!(result.segments_collected, 1);
        assert!(result.bytes_reclaimed > 0);
        assert_eq!(result.entries_rewritten, 5);

        let old_path = ValueLog::vlog_file_path(&temp_dir, old_file_id);
        assert!(
            !old_path.exists(),
            "Old segment file should have been deleted"
        );

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    #[test]
    fn test_gc_threshold_respected() -> Result<()> {
        let temp_dir = make_test_dir("gc_threshold");
        let vlog = open_gc_vlog(&temp_dir, 0.8)?;

        let mut pointers = Vec::new();
        for i in 0..10 {
            let key = Key::from_str(&format!("thresh_key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 300]);
            pointers.push(vlog.append(key, value)?);
        }
        vlog.flush()?;

        let old_file_id = vlog.current_file_id();
        vlog.rotate()?;

        // 30% dead is below the 80% trigger.
        for ptr in &pointers[0..3] {
            vlog.mark_dead(ptr);
        }

        let result = vlog.collect_garbage(|_| true)?;
        assert_eq!(
            result.segments_collected, 0,
            "GC should not trigger below threshold"
        );

        let old_path = ValueLog::vlog_file_path(&temp_dir, old_file_id);
        assert!(old_path.exists(), "Segment should still exist");

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    #[test]
    fn test_gc_empty_segment() -> Result<()> {
        let temp_dir = make_test_dir("gc_empty");
        let vlog = open_gc_vlog(&temp_dir, 0.3)?;

        let first_id = vlog.current_file_id();
        vlog.rotate()?;

        let result = vlog.collect_garbage(|_| false)?;
        assert_eq!(result.segments_collected, 0);
        // An empty segment reports no garbage, so it can never reach the threshold.
        assert!(vlog.dead_ratio(first_id).abs() < f64::EPSILON);

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    #[test]
    fn test_gc_all_dead_segment() -> Result<()> {
        let temp_dir = make_test_dir("gc_all_dead");
        let vlog = open_gc_vlog(&temp_dir, 0.5)?;

        let mut pointers = Vec::new();
        for i in 0..5 {
            let key = Key::from_str(&format!("alldead_key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 200]);
            pointers.push(vlog.append(key, value)?);
        }
        vlog.flush()?;

        let old_file_id = vlog.current_file_id();
        vlog.rotate()?;

        for ptr in &pointers {
            vlog.mark_dead(ptr);
        }

        let ratio = vlog.dead_ratio(old_file_id);
        assert!(
            (ratio - 1.0).abs() < 0.01,
            "Expected ratio ~1.0, got {}",
            ratio
        );

        let result = vlog.collect_garbage(|_| false)?;
        assert_eq!(result.segments_collected, 1);
        assert_eq!(result.entries_rewritten, 0);
        assert!(result.bytes_reclaimed > 0);

        let old_path = ValueLog::vlog_file_path(&temp_dir, old_file_id);
        assert!(!old_path.exists());

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    #[test]
    fn test_gc_all_live_segment() -> Result<()> {
        let temp_dir = make_test_dir("gc_all_live");
        let vlog = open_gc_vlog(&temp_dir, 0.3)?;

        for i in 0..5 {
            let key = Key::from_str(&format!("alllive_key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 200]);
            vlog.append(key, value)?;
        }
        vlog.flush()?;
        vlog.rotate()?;

        let result = vlog.collect_garbage(|_| true)?;
        assert_eq!(
            result.segments_collected, 0,
            "All-live segment should not be collected"
        );

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    #[test]
    fn test_gc_result_stats_accuracy() -> Result<()> {
        let temp_dir = make_test_dir("gc_stats_accuracy");
        let vlog = open_gc_vlog(&temp_dir, 0.3)?;

        let mut pointers = Vec::new();
        for i in 0..8 {
            let key = Key::from_str(&format!("acc_key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 400]);
            pointers.push(vlog.append(key, value)?);
        }
        vlog.flush()?;
        vlog.rotate()?;

        for ptr in &pointers[0..6] {
            vlog.mark_dead(ptr);
        }

        let result =
            vlog.collect_garbage(|key| key_index(key, "acc_key_").map_or(false, |n| n >= 6))?;

        assert_eq!(result.segments_collected, 1);
        assert_eq!(result.entries_rewritten, 2);
        assert!(result.bytes_reclaimed > 0);
        assert!(result.duration.as_nanos() > 0);

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    #[test]
    fn test_is_gc_running_flag() -> Result<()> {
        let temp_dir = make_test_dir("gc_running_flag");

        let vlog = ValueLog::new(&temp_dir)?;
        assert!(!vlog.is_gc_running());

        // The flag must be cleared again once a pass has finished.
        vlog.collect_garbage(|_| true)?;
        assert!(!vlog.is_gc_running());

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    #[test]
    fn test_concurrent_reads_during_gc() -> Result<()> {
        use std::sync::Arc;

        let temp_dir = make_test_dir("concurrent_gc");
        let vlog = Arc::new(open_gc_vlog(&temp_dir, 0.3)?);

        let mut first_segment_pointers = Vec::new();
        for i in 0..10 {
            let key = Key::from_str(&format!("conc_key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 300]);
            first_segment_pointers.push(vlog.append(key, value)?);
        }
        vlog.flush()?;

        vlog.rotate()?;

        // The second segment is the active one, so GC leaves it alone while
        // readers hammer it.
        let mut second_segment_pointers = Vec::new();
        for i in 0..5 {
            let key = Key::from_str(&format!("conc2_key_{}", i));
            let value = CipherBlob::new(vec![(i + 100) as u8; 300]);
            second_segment_pointers.push(vlog.append(key, value)?);
        }
        vlog.flush()?;

        for ptr in &first_segment_pointers[5..10] {
            vlog.mark_dead(ptr);
        }

        let handles: Vec<_> = second_segment_pointers
            .iter()
            .enumerate()
            .map(|(i, ptr)| {
                let vlog_clone = Arc::clone(&vlog);
                let ptr_clone = ptr.clone();
                let expected = (i + 100) as u8;
                std::thread::spawn(move || {
                    for _ in 0..10 {
                        let val = vlog_clone
                            .read(&ptr_clone)
                            .expect("read should succeed during GC");
                        assert_eq!(val.as_bytes()[0], expected);
                        std::thread::yield_now();
                    }
                })
            })
            .collect();

        // Keys outside the `conc_key_` family are treated as live.
        let gc_result =
            vlog.collect_garbage(|key| key_index(key, "conc_key_").map_or(true, |n| n < 5))?;

        for handle in handles {
            handle.join().expect("reader thread should not panic");
        }

        assert!(gc_result.segments_collected >= 1);

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    #[test]
    fn test_space_reclamation_preserves_live_data() -> Result<()> {
        let temp_dir = make_test_dir("reclaim_preserves");
        let vlog = open_gc_vlog(&temp_dir, 0.2)?;

        let mut pointers = Vec::new();
        for i in 0..6 {
            let key = Key::from_str(&format!("reclaim_key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 250]);
            pointers.push(vlog.append(key, value)?);
        }
        vlog.flush()?;

        let old_file_id = vlog.current_file_id();
        vlog.rotate()?;

        vlog.mark_dead(&pointers[0]);
        vlog.mark_dead(&pointers[2]);
        vlog.mark_dead(&pointers[4]);

        // Odd-numbered keys are the survivors.
        let is_live = |key: &Key| key_index(key, "reclaim_key_").map_or(false, |n| n % 2 == 1);
        let (reclaimed, rewritten) = vlog.reclaim_segment(old_file_id, &is_live)?;

        assert_eq!(rewritten, 3);
        assert!(reclaimed > 0);

        let old_path = ValueLog::vlog_file_path(&temp_dir, old_file_id);
        assert!(!old_path.exists());
        assert!(
            vlog.segment_stats(old_file_id).is_none(),
            "stats of the reclaimed segment should be dropped"
        );

        // The three survivors must have been re-appended to the active segment.
        let active = vlog
            .segment_stats(vlog.current_file_id())
            .expect("active segment should have stats");
        assert_eq!(active.entry_count, 3);

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    #[test]
    fn test_dead_ratio_nonexistent_segment() {
        let temp_dir = make_test_dir("dead_ratio_noexist");
        let vlog = ValueLog::new(&temp_dir).expect("should create vlog");

        assert!(vlog.dead_ratio(9999).abs() < f64::EPSILON);

        std::fs::remove_dir_all(&temp_dir).ok();
    }

    #[test]
    fn test_gc_config_defaults() {
        let gc = GcConfig::default();
        assert!((gc.trigger_threshold - 0.5).abs() < f64::EPSILON);
        assert_eq!(gc.min_segment_age, Duration::from_secs(3600));
        assert_eq!(gc.max_gc_bytes_per_run, 256 * 1024 * 1024);
    }

    #[test]
    fn test_segment_stats_new() {
        let stats = SegmentStats::new();
        assert_eq!(stats.total_bytes, 0);
        assert_eq!(stats.live_bytes, 0);
        assert_eq!(stats.dead_bytes, 0);
        assert_eq!(stats.entry_count, 0);
        assert_eq!(stats.live_count, 0);
        assert!(stats.dead_ratio().abs() < f64::EPSILON);
    }

    #[test]
    fn test_segment_stats_mark_dead_is_clamped() {
        let mut stats = SegmentStats::new();
        stats.record_write(100);
        stats.record_write(50);

        // Marking more bytes dead than are live moves only what is live.
        stats.mark_entry_dead(1_000);
        assert_eq!(stats.live_bytes, 0);
        assert_eq!(stats.dead_bytes, 150);
        assert_eq!(stats.live_count, 1);

        stats.mark_entry_dead(10);
        stats.mark_entry_dead(10);
        assert_eq!(stats.live_count, 0, "live_count must not underflow");
        assert_eq!(stats.dead_bytes, 150);
        assert!((stats.dead_ratio() - 1.0).abs() < f64::EPSILON);
    }
}