Skip to main content

amaters_core/storage/
value_log.rs

1//! Value Log (vLog) implementation for WiscKey-style value separation
2//!
3//! The value log stores large values separately from the LSM-Tree to reduce
4//! write amplification. Small values are stored inline, while large values
5//! (>threshold) are stored in the vLog and referenced by pointers.
6//!
7//! Key features:
8//! - Sequential append-only writes for high throughput
9//! - File rotation when files reach size threshold
10//! - Garbage collection to reclaim space from dead values (see `value_log_gc`)
11//! - Value pointers stored in LSM-Tree for indirection
12
13use crate::error::{AmateRSError, ErrorContext, Result};
14use crate::types::{CipherBlob, Key};
15use dashmap::DashMap;
16use parking_lot::RwLock;
17use std::fs::{File, OpenOptions};
18use std::io::{Read, Seek, SeekFrom, Write};
19use std::path::{Path, PathBuf};
20use std::sync::Arc;
21use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
22
23// Re-export GC types for backward compatibility
24pub use super::value_log_gc::{GcConfig, GcResult, GcStats, SegmentStats};
25
26/// Value pointer for referencing values in the vLog
27#[derive(Debug, Clone, PartialEq, Eq)]
28pub struct ValuePointer {
29    /// File ID (vLog file number)
30    pub file_id: u64,
31    /// Offset within the file
32    pub offset: u64,
33    /// Length of the value
34    pub length: u32,
35    /// CRC32 checksum for verification
36    pub checksum: u32,
37}
38
39impl ValuePointer {
40    /// Create a new value pointer
41    pub fn new(file_id: u64, offset: u64, length: u32, checksum: u32) -> Self {
42        Self {
43            file_id,
44            offset,
45            length,
46            checksum,
47        }
48    }
49
50    /// Encode to bytes
51    pub fn encode(&self) -> Vec<u8> {
52        let mut bytes = Vec::with_capacity(24);
53        bytes.extend_from_slice(&self.file_id.to_le_bytes());
54        bytes.extend_from_slice(&self.offset.to_le_bytes());
55        bytes.extend_from_slice(&self.length.to_le_bytes());
56        bytes.extend_from_slice(&self.checksum.to_le_bytes());
57        bytes
58    }
59
60    /// Decode from bytes
61    pub fn decode(bytes: &[u8]) -> Result<Self> {
62        if bytes.len() < 24 {
63            return Err(AmateRSError::SerializationError(ErrorContext::new(
64                "ValuePointer too short",
65            )));
66        }
67
68        let file_id = u64::from_le_bytes(bytes[0..8].try_into().map_err(|_| {
69            AmateRSError::SerializationError(ErrorContext::new("Failed to read file_id"))
70        })?);
71
72        let offset = u64::from_le_bytes(bytes[8..16].try_into().map_err(|_| {
73            AmateRSError::SerializationError(ErrorContext::new("Failed to read offset"))
74        })?);
75
76        let length = u32::from_le_bytes(bytes[16..20].try_into().map_err(|_| {
77            AmateRSError::SerializationError(ErrorContext::new("Failed to read length"))
78        })?);
79
80        let checksum = u32::from_le_bytes(bytes[20..24].try_into().map_err(|_| {
81            AmateRSError::SerializationError(ErrorContext::new("Failed to read checksum"))
82        })?);
83
84        Ok(Self {
85            file_id,
86            offset,
87            length,
88            checksum,
89        })
90    }
91}
92
/// Tunables for the value log.
#[derive(Debug, Clone)]
pub struct ValueLogConfig {
    /// Directory that holds the `vlog_*.log` segment files.
    pub vlog_dir: PathBuf,
    /// Segment size in bytes at which writes roll over to a new file
    /// (default: 1 GiB).
    pub max_file_size: u64,
    /// Size in bytes above which a value is separated into the vLog
    /// (default: 1 KiB).
    pub value_threshold: usize,
    /// Sync after every append rather than relying on buffering
    /// (default: `false`, favouring throughput).
    pub sync_on_write: bool,
    /// Garbage-ratio threshold for garbage collection
    /// (default: `0.5`, i.e. 50% garbage).
    pub gc_threshold: f64,
}

impl Default for ValueLogConfig {
    fn default() -> Self {
        const KIB: usize = 1024;
        const GIB: u64 = 1024 * 1024 * 1024;

        Self {
            vlog_dir: "./vlog".into(),
            max_file_size: GIB,
            value_threshold: KIB,
            sync_on_write: false,
            gc_threshold: 0.5,
        }
    }
}
119
120/// Value log entry
121pub(crate) struct VLogEntry {
122    /// Key (for GC to identify ownership)
123    pub(crate) key: Key,
124    /// Value data
125    pub(crate) value: CipherBlob,
126    /// CRC32 checksum
127    pub(crate) checksum: u32,
128}
129
130impl VLogEntry {
131    pub(crate) fn new(key: Key, value: CipherBlob) -> Self {
132        let mut hasher = crc32fast::Hasher::new();
133        hasher.update(key.as_bytes());
134        hasher.update(value.as_bytes());
135        let checksum = hasher.finalize();
136
137        Self {
138            key,
139            value,
140            checksum,
141        }
142    }
143
144    pub(crate) fn encode(&self) -> Vec<u8> {
145        let mut bytes = Vec::new();
146
147        // Magic number (0x564C4F47 = "VLOG" in hex)
148        bytes.extend_from_slice(&0x564C4F47u32.to_le_bytes());
149
150        // Key length and data
151        bytes.extend_from_slice(&(self.key.len() as u32).to_le_bytes());
152        bytes.extend_from_slice(self.key.as_bytes());
153
154        // Value length and data
155        bytes.extend_from_slice(&(self.value.len() as u32).to_le_bytes());
156        bytes.extend_from_slice(self.value.as_bytes());
157
158        // Checksum
159        bytes.extend_from_slice(&self.checksum.to_le_bytes());
160
161        bytes
162    }
163
164    fn decode(bytes: &[u8]) -> Result<Self> {
165        if bytes.len() < 16 {
166            return Err(AmateRSError::SerializationError(ErrorContext::new(
167                "VLogEntry too short",
168            )));
169        }
170
171        let mut offset = 0;
172
173        // Verify magic number
174        let magic = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
175        if magic != 0x564C4F47 {
176            return Err(AmateRSError::SerializationError(ErrorContext::new(
177                "Invalid vLog entry magic number",
178            )));
179        }
180        offset += 4;
181
182        // Key
183        let key_len = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
184            AmateRSError::SerializationError(ErrorContext::new("Failed to read key length"))
185        })?) as usize;
186        offset += 4;
187
188        let key_bytes = &bytes[offset..offset + key_len];
189        let key = Key::from_slice(key_bytes);
190        offset += key_len;
191
192        // Value
193        let value_len = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
194            AmateRSError::SerializationError(ErrorContext::new("Failed to read value length"))
195        })?) as usize;
196        offset += 4;
197
198        let value_bytes = &bytes[offset..offset + value_len];
199        let value = CipherBlob::new(value_bytes.to_vec());
200        offset += value_len;
201
202        // Checksum
203        let checksum = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
204            AmateRSError::SerializationError(ErrorContext::new("Failed to read checksum"))
205        })?);
206
207        let entry = Self {
208            key,
209            value,
210            checksum,
211        };
212
213        // Verify checksum
214        let mut hasher = crc32fast::Hasher::new();
215        hasher.update(entry.key.as_bytes());
216        hasher.update(entry.value.as_bytes());
217        let calculated = hasher.finalize();
218
219        if calculated != entry.checksum {
220            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
221                "vLog entry checksum mismatch: expected {}, got {}",
222                entry.checksum, calculated
223            ))));
224        }
225
226        Ok(entry)
227    }
228}
229
230/// Value log for storing large values
231pub struct ValueLog {
232    /// Configuration
233    pub(crate) config: ValueLogConfig,
234    /// GC configuration
235    pub(crate) gc_config: GcConfig,
236    /// Current vLog file number
237    pub(crate) current_file_id: Arc<RwLock<u64>>,
238    /// Current file writer
239    pub(crate) writer: Arc<RwLock<std::io::BufWriter<File>>>,
240    /// Current file offset
241    pub(crate) current_offset: Arc<RwLock<u64>>,
242    /// Current file size
243    pub(crate) current_size: Arc<RwLock<u64>>,
244    /// Per-segment statistics
245    pub(crate) segment_stats: Arc<DashMap<u64, SegmentStats>>,
246    /// Whether GC is currently running
247    pub(crate) gc_running: Arc<AtomicBool>,
248    /// Active readers count per segment (prevents deletion during reads)
249    pub(crate) segment_readers: Arc<DashMap<u64, Arc<RwLock<()>>>>,
250    /// Timestamp (millis since UNIX epoch) of the last write operation
251    pub(crate) last_write_time: Arc<AtomicU64>,
252}
253
254impl ValueLog {
255    /// Create a new value log with default configuration
256    pub fn new(vlog_dir: impl AsRef<Path>) -> Result<Self> {
257        let config = ValueLogConfig {
258            vlog_dir: vlog_dir.as_ref().to_path_buf(),
259            ..Default::default()
260        };
261        Self::with_config(config)
262    }
263
264    /// Create a new value log with custom configuration
265    pub fn with_config(config: ValueLogConfig) -> Result<Self> {
266        Self::with_config_and_gc(config, GcConfig::default())
267    }
268
269    /// Create a new value log with custom configuration and GC configuration
270    pub fn with_config_and_gc(config: ValueLogConfig, gc_config: GcConfig) -> Result<Self> {
271        // Create vLog directory
272        std::fs::create_dir_all(&config.vlog_dir).map_err(|e| {
273            AmateRSError::IoError(ErrorContext::new(format!(
274                "Failed to create vLog directory: {}",
275                e
276            )))
277        })?;
278
279        // Find the latest vLog file or create a new one
280        let file_id = Self::find_latest_vlog(&config)?;
281        let file_path = Self::vlog_file_path(&config.vlog_dir, file_id);
282
283        let file = OpenOptions::new()
284            .create(true)
285            .append(true)
286            .open(&file_path)
287            .map_err(|e| {
288                AmateRSError::IoError(ErrorContext::new(format!("Failed to open vLog: {}", e)))
289            })?;
290
291        let current_size = file
292            .metadata()
293            .map_err(|e| {
294                AmateRSError::IoError(ErrorContext::new(format!(
295                    "Failed to get vLog file size: {}",
296                    e
297                )))
298            })?
299            .len();
300
301        let segment_stats = Arc::new(DashMap::new());
302        // Initialize stats for the current segment
303        let mut initial_stats = SegmentStats::new();
304        initial_stats.total_bytes = current_size;
305        initial_stats.live_bytes = current_size;
306        segment_stats.insert(file_id, initial_stats);
307
308        let segment_readers = Arc::new(DashMap::new());
309        segment_readers.insert(file_id, Arc::new(RwLock::new(())));
310
311        let now_millis = std::time::SystemTime::now()
312            .duration_since(std::time::UNIX_EPOCH)
313            .map(|d| d.as_millis() as u64)
314            .unwrap_or(0);
315
316        Ok(Self {
317            config,
318            gc_config,
319            current_file_id: Arc::new(RwLock::new(file_id)),
320            writer: Arc::new(RwLock::new(std::io::BufWriter::new(file))),
321            current_offset: Arc::new(RwLock::new(current_size)),
322            current_size: Arc::new(RwLock::new(current_size)),
323            segment_stats,
324            gc_running: Arc::new(AtomicBool::new(false)),
325            segment_readers,
326            last_write_time: Arc::new(AtomicU64::new(now_millis)),
327        })
328    }
329
330    /// Check if a value should be stored in vLog
331    pub fn should_separate(&self, value: &CipherBlob) -> bool {
332        value.len() > self.config.value_threshold
333    }
334
335    /// Append a value to the vLog and return a pointer
336    pub fn append(&self, key: Key, value: CipherBlob) -> Result<ValuePointer> {
337        let entry = VLogEntry::new(key, value);
338        let entry_bytes = entry.encode();
339        let entry_len = entry_bytes.len() as u64;
340
341        // Get current position
342        let file_id = *self.current_file_id.read();
343        let offset = *self.current_offset.read();
344
345        // Write entry
346        {
347            let mut writer = self.writer.write();
348            writer.write_all(&entry_bytes).map_err(|e| {
349                AmateRSError::IoError(ErrorContext::new(format!(
350                    "Failed to write vLog entry: {}",
351                    e
352                )))
353            })?;
354
355            if self.config.sync_on_write {
356                writer.flush().map_err(|e| {
357                    AmateRSError::IoError(ErrorContext::new(format!("Failed to flush vLog: {}", e)))
358                })?;
359            }
360        }
361
362        // Update offset and size
363        {
364            let mut current_offset = self.current_offset.write();
365            *current_offset += entry_len;
366        }
367        {
368            let mut current_size = self.current_size.write();
369            *current_size += entry_len;
370        }
371
372        // Update segment stats
373        {
374            let mut stats = self
375                .segment_stats
376                .entry(file_id)
377                .or_insert_with(SegmentStats::new);
378            stats.record_write(entry_len);
379        }
380
381        // Update last write timestamp
382        {
383            let now_millis = std::time::SystemTime::now()
384                .duration_since(std::time::UNIX_EPOCH)
385                .map(|d| d.as_millis() as u64)
386                .unwrap_or(0);
387            self.last_write_time.store(now_millis, Ordering::Release);
388        }
389
390        // Check if rotation is needed
391        if *self.current_size.read() >= self.config.max_file_size {
392            self.rotate()?;
393        }
394
395        // Create pointer
396        let pointer = ValuePointer::new(file_id, offset, entry_bytes.len() as u32, entry.checksum);
397
398        Ok(pointer)
399    }
400
401    /// Read a value from the vLog using a pointer
402    pub fn read(&self, pointer: &ValuePointer) -> Result<CipherBlob> {
403        let file_path = Self::vlog_file_path(&self.config.vlog_dir, pointer.file_id);
404
405        // Acquire read lock on the segment to prevent deletion during read
406        let reader_lock = self
407            .segment_readers
408            .entry(pointer.file_id)
409            .or_insert_with(|| Arc::new(RwLock::new(())))
410            .clone();
411        let _read_guard = reader_lock.read();
412
413        // Open file for reading
414        let mut file = File::open(&file_path).map_err(|e| {
415            AmateRSError::IoError(ErrorContext::new(format!(
416                "Failed to open vLog file for reading: {}",
417                e
418            )))
419        })?;
420
421        // Seek to offset
422        file.seek(SeekFrom::Start(pointer.offset)).map_err(|e| {
423            AmateRSError::IoError(ErrorContext::new(format!(
424                "Failed to seek vLog file: {}",
425                e
426            )))
427        })?;
428
429        // Read entry
430        let mut entry_bytes = vec![0u8; pointer.length as usize];
431        file.read_exact(&mut entry_bytes).map_err(|e| {
432            AmateRSError::IoError(ErrorContext::new(format!(
433                "Failed to read vLog entry: {}",
434                e
435            )))
436        })?;
437
438        // Decode entry
439        let entry = VLogEntry::decode(&entry_bytes)?;
440
441        Ok(entry.value)
442    }
443
444    /// Rotate to a new vLog file
445    pub(crate) fn rotate(&self) -> Result<()> {
446        // Flush current file
447        {
448            let mut writer = self.writer.write();
449            writer.flush().map_err(|e| {
450                AmateRSError::IoError(ErrorContext::new(format!("Failed to flush vLog: {}", e)))
451            })?;
452        }
453
454        // Increment file ID
455        let new_file_id = {
456            let mut file_id = self.current_file_id.write();
457            *file_id += 1;
458            *file_id
459        };
460
461        // Create new file
462        let new_path = Self::vlog_file_path(&self.config.vlog_dir, new_file_id);
463        let file = OpenOptions::new()
464            .create(true)
465            .append(true)
466            .open(&new_path)
467            .map_err(|e| {
468                AmateRSError::IoError(ErrorContext::new(format!(
469                    "Failed to create new vLog file: {}",
470                    e
471                )))
472            })?;
473
474        // Update writer
475        {
476            let mut writer = self.writer.write();
477            *writer = std::io::BufWriter::new(file);
478        }
479
480        // Reset offset and size
481        {
482            let mut offset = self.current_offset.write();
483            *offset = 0;
484        }
485        {
486            let mut size = self.current_size.write();
487            *size = 0;
488        }
489
490        // Initialize stats and reader lock for new segment
491        self.segment_stats.insert(new_file_id, SegmentStats::new());
492        self.segment_readers
493            .insert(new_file_id, Arc::new(RwLock::new(())));
494
495        Ok(())
496    }
497
498    /// Find the latest vLog file number
499    pub(crate) fn find_latest_vlog(config: &ValueLogConfig) -> Result<u64> {
500        let mut max_file_id = 0u64;
501
502        if config.vlog_dir.exists() {
503            let entries = std::fs::read_dir(&config.vlog_dir).map_err(|e| {
504                AmateRSError::IoError(ErrorContext::new(format!(
505                    "Failed to read vLog directory: {}",
506                    e
507                )))
508            })?;
509
510            for entry in entries {
511                let entry = entry.map_err(|e| {
512                    AmateRSError::IoError(ErrorContext::new(format!(
513                        "Failed to read directory entry: {}",
514                        e
515                    )))
516                })?;
517
518                let file_name = entry.file_name();
519                let name = file_name.to_string_lossy();
520
521                // Parse vLog file names: vlog_NNNNNNNN.log
522                if name.starts_with("vlog_") && name.ends_with(".log") {
523                    if let Ok(number) = name[5..name.len() - 4].parse::<u64>() {
524                        if number > max_file_id {
525                            max_file_id = number;
526                        }
527                    }
528                }
529            }
530        }
531
532        Ok(max_file_id)
533    }
534
535    /// Generate vLog file path
536    pub(crate) fn vlog_file_path(vlog_dir: &Path, file_id: u64) -> PathBuf {
537        vlog_dir.join(format!("vlog_{:08}.log", file_id))
538    }
539
540    /// Flush buffered writes
541    pub fn flush(&self) -> Result<()> {
542        let mut writer = self.writer.write();
543        writer.flush().map_err(|e| {
544            AmateRSError::IoError(ErrorContext::new(format!("Failed to flush vLog: {}", e)))
545        })?;
546
547        writer.get_ref().sync_all().map_err(|e| {
548            AmateRSError::IoError(ErrorContext::new(format!("Failed to sync vLog: {}", e)))
549        })?;
550
551        Ok(())
552    }
553
554    /// Get current file ID
555    pub fn current_file_id(&self) -> u64 {
556        *self.current_file_id.read()
557    }
558
559    /// Get configuration
560    pub fn config(&self) -> &ValueLogConfig {
561        &self.config
562    }
563
564    /// Get the timestamp (millis since UNIX epoch) of the last write operation
565    pub fn last_write_time_millis(&self) -> u64 {
566        self.last_write_time.load(Ordering::Acquire)
567    }
568
569    /// Get the duration since the last write operation
570    pub fn time_since_last_write(&self) -> std::time::Duration {
571        let last_millis = self.last_write_time_millis();
572        let now_millis = std::time::SystemTime::now()
573            .duration_since(std::time::UNIX_EPOCH)
574            .map(|d| d.as_millis() as u64)
575            .unwrap_or(0);
576        let elapsed_millis = now_millis.saturating_sub(last_millis);
577        std::time::Duration::from_millis(elapsed_millis)
578    }
579}
580
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// Return a per-process scratch directory for `name`, wiping leftovers
    /// from an earlier (possibly aborted) run first.
    ///
    /// `ValueLog` resumes from the highest-numbered `vlog_*.log` it finds, so
    /// stale segments would shift file IDs, offsets and GC ratios and make the
    /// assertions below flaky. The process id keeps concurrently running test
    /// binaries out of each other's directories.
    fn fresh_temp_dir(name: &str) -> PathBuf {
        let dir = env::temp_dir().join(format!("{}_{}", name, std::process::id()));
        std::fs::remove_dir_all(&dir).ok();
        std::fs::create_dir_all(&dir).ok();
        dir
    }

    #[test]
    fn test_value_pointer_encode_decode() -> Result<()> {
        let pointer = ValuePointer::new(42, 1024, 256, 0xDEADBEEF);

        let bytes = pointer.encode();
        let decoded = ValuePointer::decode(&bytes)?;

        assert_eq!(decoded.file_id, 42);
        assert_eq!(decoded.offset, 1024);
        assert_eq!(decoded.length, 256);
        assert_eq!(decoded.checksum, 0xDEADBEEF);

        Ok(())
    }

    #[test]
    fn test_value_pointer_decode_rejects_short_input() {
        let bytes = ValuePointer::new(1, 2, 3, 4).encode();
        assert!(ValuePointer::decode(&bytes[..23]).is_err());
        assert!(ValuePointer::decode(&[]).is_err());
    }

    #[test]
    fn test_vlog_entry_encode_decode() -> Result<()> {
        let key = Key::from_str("test_key");
        let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
        let entry = VLogEntry::new(key.clone(), value.clone());

        let bytes = entry.encode();
        let decoded = VLogEntry::decode(&bytes)?;

        assert_eq!(decoded.key, key);
        assert_eq!(decoded.value, value);
        assert_eq!(decoded.checksum, entry.checksum);

        Ok(())
    }

    #[test]
    fn test_value_log_basic_operations() -> Result<()> {
        let temp_dir = fresh_temp_dir("test_vlog_basic");

        let vlog = ValueLog::new(&temp_dir)?;

        let key = Key::from_str("key1");
        let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);

        let pointer = vlog.append(key.clone(), value.clone())?;
        vlog.flush()?;

        let read_value = vlog.read(&pointer)?;
        assert_eq!(read_value, value);

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    #[test]
    fn test_value_log_should_separate() -> Result<()> {
        let temp_dir = fresh_temp_dir("test_vlog_should_separate");

        let vlog = ValueLog::new(&temp_dir)?;

        // Small value (< 1KB)
        let small = CipherBlob::new(vec![0u8; 512]);
        assert!(!vlog.should_separate(&small));

        // Large value (> 1KB)
        let large = CipherBlob::new(vec![0u8; 2048]);
        assert!(vlog.should_separate(&large));

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    #[test]
    fn test_value_log_multiple_values() -> Result<()> {
        let temp_dir = fresh_temp_dir("test_vlog_multiple");

        let vlog = ValueLog::new(&temp_dir)?;

        let mut pointers = Vec::new();
        for i in 0..10 {
            let key = Key::from_str(&format!("key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 1000]);
            let pointer = vlog.append(key, value)?;
            pointers.push((pointer, i as u8));
        }

        vlog.flush()?;

        for (pointer, expected_byte) in pointers {
            let value = vlog.read(&pointer)?;
            assert_eq!(value.as_bytes()[0], expected_byte);
        }

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    #[test]
    fn test_value_log_rotation() -> Result<()> {
        let temp_dir = fresh_temp_dir("test_vlog_rotation");

        let config = ValueLogConfig {
            vlog_dir: temp_dir.clone(),
            max_file_size: 4096, // Small size to trigger rotation
            sync_on_write: false,
            ..Default::default()
        };

        let vlog = ValueLog::with_config(config)?;

        let initial_file_id = vlog.current_file_id();

        for i in 0..10 {
            let key = Key::from_str(&format!("key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 1000]);
            vlog.append(key, value)?;
        }

        assert!(vlog.current_file_id() > initial_file_id);

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    #[test]
    fn test_value_log_read_after_rotation() -> Result<()> {
        let temp_dir = fresh_temp_dir("test_vlog_read_after_rotation");

        let config = ValueLogConfig {
            vlog_dir: temp_dir.clone(),
            max_file_size: 4096,
            ..Default::default()
        };
        let vlog = ValueLog::with_config(config)?;

        let mut pointers = Vec::new();
        for i in 0..10u8 {
            let key = Key::from_str(&format!("key_{}", i));
            let pointer = vlog.append(key, CipherBlob::new(vec![i; 1000]))?;
            pointers.push((pointer, i));
        }
        vlog.flush()?;

        // Entries must have been spread across more than one segment.
        let first_file = pointers[0].0.file_id;
        assert!(pointers.iter().any(|(p, _)| p.file_id != first_file));

        for (pointer, expected) in &pointers {
            let value = vlog.read(pointer)?;
            assert_eq!(value.len(), 1000);
            assert!(value.as_bytes().iter().all(|&b| b == *expected));
        }

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    #[test]
    fn test_value_log_garbage_collection() -> Result<()> {
        let temp_dir = fresh_temp_dir("test_vlog_gc");

        let vlog = ValueLog::new(&temp_dir)?;

        let mut keys = Vec::new();
        for i in 0..10 {
            let key = Key::from_str(&format!("key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 1000]);
            vlog.append(key.clone(), value)?;
            keys.push(key);
        }

        vlog.flush()?;

        let file_id = vlog.current_file_id();

        // Simulate: keys 0-4 are live, keys 5-9 are dead
        let is_live = |key: &Key| -> bool {
            let key_str = String::from_utf8_lossy(key.as_bytes());
            if let Some(num_str) = key_str.strip_prefix("key_") {
                if let Ok(num) = num_str.parse::<usize>() {
                    return num < 5;
                }
            }
            false
        };

        let ratio = vlog.calculate_garbage_ratio(file_id, is_live)?;
        assert!(ratio > 0.4 && ratio < 0.6); // Should be around 50%

        let stats = vlog.garbage_collect_file(file_id, is_live)?;

        assert_eq!(stats.live_count, 5);
        assert_eq!(stats.dead_count, 5);
        assert!(stats.reclaimed_bytes > 0);

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    #[test]
    fn test_value_log_large_values() -> Result<()> {
        let temp_dir = fresh_temp_dir("test_vlog_large");

        let vlog = ValueLog::new(&temp_dir)?;

        let key = Key::from_str("large_key");
        let large_value = CipherBlob::new(vec![42u8; 10_000]);

        let pointer = vlog.append(key, large_value.clone())?;
        vlog.flush()?;

        let read_value = vlog.read(&pointer)?;

        assert_eq!(read_value, large_value);
        assert_eq!(read_value.len(), 10_000);

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }
}