amaters_core/storage/
value_log.rs

1//! Value Log (vLog) implementation for WiscKey-style value separation
2//!
3//! The value log stores large values separately from the LSM-Tree to reduce
4//! write amplification. Small values are stored inline, while large values
5//! (>threshold) are stored in the vLog and referenced by pointers.
6//!
7//! Key features:
8//! - Sequential append-only writes for high throughput
9//! - File rotation when files reach size threshold
10//! - Garbage collection to reclaim space from dead values
11//! - Value pointers stored in LSM-Tree for indirection
12
13use crate::error::{AmateRSError, ErrorContext, Result};
14use crate::types::{CipherBlob, Key};
15use parking_lot::RwLock;
16use std::fs::{File, OpenOptions};
17use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
18use std::path::{Path, PathBuf};
19use std::sync::Arc;
20
21/// Value pointer for referencing values in the vLog
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct ValuePointer {
24    /// File ID (vLog file number)
25    pub file_id: u64,
26    /// Offset within the file
27    pub offset: u64,
28    /// Length of the value
29    pub length: u32,
30    /// CRC32 checksum for verification
31    pub checksum: u32,
32}
33
34impl ValuePointer {
35    /// Create a new value pointer
36    pub fn new(file_id: u64, offset: u64, length: u32, checksum: u32) -> Self {
37        Self {
38            file_id,
39            offset,
40            length,
41            checksum,
42        }
43    }
44
45    /// Encode to bytes
46    pub fn encode(&self) -> Vec<u8> {
47        let mut bytes = Vec::with_capacity(24);
48        bytes.extend_from_slice(&self.file_id.to_le_bytes());
49        bytes.extend_from_slice(&self.offset.to_le_bytes());
50        bytes.extend_from_slice(&self.length.to_le_bytes());
51        bytes.extend_from_slice(&self.checksum.to_le_bytes());
52        bytes
53    }
54
55    /// Decode from bytes
56    pub fn decode(bytes: &[u8]) -> Result<Self> {
57        if bytes.len() < 24 {
58            return Err(AmateRSError::SerializationError(ErrorContext::new(
59                "ValuePointer too short",
60            )));
61        }
62
63        let file_id = u64::from_le_bytes(bytes[0..8].try_into().map_err(|_| {
64            AmateRSError::SerializationError(ErrorContext::new("Failed to read file_id"))
65        })?);
66
67        let offset = u64::from_le_bytes(bytes[8..16].try_into().map_err(|_| {
68            AmateRSError::SerializationError(ErrorContext::new("Failed to read offset"))
69        })?);
70
71        let length = u32::from_le_bytes(bytes[16..20].try_into().map_err(|_| {
72            AmateRSError::SerializationError(ErrorContext::new("Failed to read length"))
73        })?);
74
75        let checksum = u32::from_le_bytes(bytes[20..24].try_into().map_err(|_| {
76            AmateRSError::SerializationError(ErrorContext::new("Failed to read checksum"))
77        })?);
78
79        Ok(Self {
80            file_id,
81            offset,
82            length,
83            checksum,
84        })
85    }
86}
87
/// Tunable settings for a value log instance.
#[derive(Debug, Clone)]
pub struct ValueLogConfig {
    /// Directory for vLog files
    pub vlog_dir: PathBuf,
    /// Maximum file size before rotation (default: 1GB)
    pub max_file_size: u64,
    /// Value size threshold for separation (default: 1KB)
    pub value_threshold: usize,
    /// Whether to sync after each write (default: false for performance)
    pub sync_on_write: bool,
    /// Garbage collection threshold (default: 0.5 = 50% garbage)
    pub gc_threshold: f64,
}

impl Default for ValueLogConfig {
    /// Defaults: `./vlog` directory, 1 GiB files, 1 KiB separation threshold,
    /// no sync on write, and a 50% garbage threshold.
    fn default() -> Self {
        const ONE_KIB: usize = 1 << 10;
        const ONE_GIB: u64 = 1 << 30;

        let vlog_dir = PathBuf::from("./vlog");
        Self {
            vlog_dir,
            max_file_size: ONE_GIB,
            value_threshold: ONE_KIB,
            sync_on_write: false,
            gc_threshold: 0.5,
        }
    }
}
114
115/// Value log entry
116struct VLogEntry {
117    /// Key (for GC to identify ownership)
118    key: Key,
119    /// Value data
120    value: CipherBlob,
121    /// CRC32 checksum
122    checksum: u32,
123}
124
125impl VLogEntry {
126    fn new(key: Key, value: CipherBlob) -> Self {
127        let mut hasher = crc32fast::Hasher::new();
128        hasher.update(key.as_bytes());
129        hasher.update(value.as_bytes());
130        let checksum = hasher.finalize();
131
132        Self {
133            key,
134            value,
135            checksum,
136        }
137    }
138
139    fn encode(&self) -> Vec<u8> {
140        let mut bytes = Vec::new();
141
142        // Magic number (0x564C4F47 = "VLOG" in hex)
143        bytes.extend_from_slice(&0x564C4F47u32.to_le_bytes());
144
145        // Key length and data
146        bytes.extend_from_slice(&(self.key.len() as u32).to_le_bytes());
147        bytes.extend_from_slice(self.key.as_bytes());
148
149        // Value length and data
150        bytes.extend_from_slice(&(self.value.len() as u32).to_le_bytes());
151        bytes.extend_from_slice(self.value.as_bytes());
152
153        // Checksum
154        bytes.extend_from_slice(&self.checksum.to_le_bytes());
155
156        bytes
157    }
158
159    fn decode(bytes: &[u8]) -> Result<Self> {
160        if bytes.len() < 16 {
161            return Err(AmateRSError::SerializationError(ErrorContext::new(
162                "VLogEntry too short",
163            )));
164        }
165
166        let mut offset = 0;
167
168        // Verify magic number
169        let magic = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
170        if magic != 0x564C4F47 {
171            return Err(AmateRSError::SerializationError(ErrorContext::new(
172                "Invalid vLog entry magic number",
173            )));
174        }
175        offset += 4;
176
177        // Key
178        let key_len = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
179            AmateRSError::SerializationError(ErrorContext::new("Failed to read key length"))
180        })?) as usize;
181        offset += 4;
182
183        let key_bytes = &bytes[offset..offset + key_len];
184        let key = Key::from_slice(key_bytes);
185        offset += key_len;
186
187        // Value
188        let value_len = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
189            AmateRSError::SerializationError(ErrorContext::new("Failed to read value length"))
190        })?) as usize;
191        offset += 4;
192
193        let value_bytes = &bytes[offset..offset + value_len];
194        let value = CipherBlob::new(value_bytes.to_vec());
195        offset += value_len;
196
197        // Checksum
198        let checksum = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
199            AmateRSError::SerializationError(ErrorContext::new("Failed to read checksum"))
200        })?);
201
202        let entry = Self {
203            key,
204            value,
205            checksum,
206        };
207
208        // Verify checksum
209        let mut hasher = crc32fast::Hasher::new();
210        hasher.update(entry.key.as_bytes());
211        hasher.update(entry.value.as_bytes());
212        let calculated = hasher.finalize();
213
214        if calculated != entry.checksum {
215            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
216                "vLog entry checksum mismatch: expected {}, got {}",
217                entry.checksum, calculated
218            ))));
219        }
220
221        Ok(entry)
222    }
223}
224
/// Value log for storing large values
///
/// Entries are appended to the current vLog file through a buffered writer.
/// Each piece of mutable state sits behind its own lock, so the methods take
/// `&self`.
pub struct ValueLog {
    /// Configuration
    config: ValueLogConfig,
    /// Current vLog file number (the file new entries are appended to)
    current_file_id: Arc<RwLock<u64>>,
    /// Current file writer (buffered; data may not be on disk until flushed)
    writer: Arc<RwLock<BufWriter<File>>>,
    /// Current file offset: where the next appended entry will start
    current_offset: Arc<RwLock<u64>>,
    /// Current file size, compared against `max_file_size` to trigger rotation
    current_size: Arc<RwLock<u64>>,
}
238
239impl ValueLog {
240    /// Create a new value log with default configuration
241    pub fn new(vlog_dir: impl AsRef<Path>) -> Result<Self> {
242        let config = ValueLogConfig {
243            vlog_dir: vlog_dir.as_ref().to_path_buf(),
244            ..Default::default()
245        };
246        Self::with_config(config)
247    }
248
249    /// Create a new value log with custom configuration
250    pub fn with_config(config: ValueLogConfig) -> Result<Self> {
251        // Create vLog directory
252        std::fs::create_dir_all(&config.vlog_dir).map_err(|e| {
253            AmateRSError::IoError(ErrorContext::new(format!(
254                "Failed to create vLog directory: {}",
255                e
256            )))
257        })?;
258
259        // Find the latest vLog file or create a new one
260        let file_id = Self::find_latest_vlog(&config)?;
261        let file_path = Self::vlog_file_path(&config.vlog_dir, file_id);
262
263        let file = OpenOptions::new()
264            .create(true)
265            .append(true)
266            .open(&file_path)
267            .map_err(|e| {
268                AmateRSError::IoError(ErrorContext::new(format!("Failed to open vLog: {}", e)))
269            })?;
270
271        let current_size = file
272            .metadata()
273            .map_err(|e| {
274                AmateRSError::IoError(ErrorContext::new(format!(
275                    "Failed to get vLog file size: {}",
276                    e
277                )))
278            })?
279            .len();
280
281        Ok(Self {
282            config,
283            current_file_id: Arc::new(RwLock::new(file_id)),
284            writer: Arc::new(RwLock::new(BufWriter::new(file))),
285            current_offset: Arc::new(RwLock::new(current_size)),
286            current_size: Arc::new(RwLock::new(current_size)),
287        })
288    }
289
290    /// Check if a value should be stored in vLog
291    pub fn should_separate(&self, value: &CipherBlob) -> bool {
292        value.len() > self.config.value_threshold
293    }
294
295    /// Append a value to the vLog and return a pointer
296    pub fn append(&self, key: Key, value: CipherBlob) -> Result<ValuePointer> {
297        let entry = VLogEntry::new(key, value);
298        let entry_bytes = entry.encode();
299        let entry_len = entry_bytes.len() as u64;
300
301        // Get current position
302        let file_id = *self.current_file_id.read();
303        let offset = *self.current_offset.read();
304
305        // Write entry
306        {
307            let mut writer = self.writer.write();
308            writer.write_all(&entry_bytes).map_err(|e| {
309                AmateRSError::IoError(ErrorContext::new(format!(
310                    "Failed to write vLog entry: {}",
311                    e
312                )))
313            })?;
314
315            if self.config.sync_on_write {
316                writer.flush().map_err(|e| {
317                    AmateRSError::IoError(ErrorContext::new(format!("Failed to flush vLog: {}", e)))
318                })?;
319            }
320        }
321
322        // Update offset and size
323        {
324            let mut current_offset = self.current_offset.write();
325            *current_offset += entry_len;
326        }
327        {
328            let mut current_size = self.current_size.write();
329            *current_size += entry_len;
330        }
331
332        // Check if rotation is needed
333        if *self.current_size.read() >= self.config.max_file_size {
334            self.rotate()?;
335        }
336
337        // Create pointer
338        let pointer = ValuePointer::new(file_id, offset, entry_bytes.len() as u32, entry.checksum);
339
340        Ok(pointer)
341    }
342
343    /// Read a value from the vLog using a pointer
344    pub fn read(&self, pointer: &ValuePointer) -> Result<CipherBlob> {
345        let file_path = Self::vlog_file_path(&self.config.vlog_dir, pointer.file_id);
346
347        // Open file for reading
348        let mut file = File::open(&file_path).map_err(|e| {
349            AmateRSError::IoError(ErrorContext::new(format!(
350                "Failed to open vLog file for reading: {}",
351                e
352            )))
353        })?;
354
355        // Seek to offset
356        file.seek(SeekFrom::Start(pointer.offset)).map_err(|e| {
357            AmateRSError::IoError(ErrorContext::new(format!(
358                "Failed to seek vLog file: {}",
359                e
360            )))
361        })?;
362
363        // Read entry
364        let mut entry_bytes = vec![0u8; pointer.length as usize];
365        file.read_exact(&mut entry_bytes).map_err(|e| {
366            AmateRSError::IoError(ErrorContext::new(format!(
367                "Failed to read vLog entry: {}",
368                e
369            )))
370        })?;
371
372        // Decode entry
373        let entry = VLogEntry::decode(&entry_bytes)?;
374
375        Ok(entry.value)
376    }
377
378    /// Rotate to a new vLog file
379    fn rotate(&self) -> Result<()> {
380        // Flush current file
381        {
382            let mut writer = self.writer.write();
383            writer.flush().map_err(|e| {
384                AmateRSError::IoError(ErrorContext::new(format!("Failed to flush vLog: {}", e)))
385            })?;
386        }
387
388        // Increment file ID
389        let new_file_id = {
390            let mut file_id = self.current_file_id.write();
391            *file_id += 1;
392            *file_id
393        };
394
395        // Create new file
396        let new_path = Self::vlog_file_path(&self.config.vlog_dir, new_file_id);
397        let file = OpenOptions::new()
398            .create(true)
399            .append(true)
400            .open(&new_path)
401            .map_err(|e| {
402                AmateRSError::IoError(ErrorContext::new(format!(
403                    "Failed to create new vLog file: {}",
404                    e
405                )))
406            })?;
407
408        // Update writer
409        {
410            let mut writer = self.writer.write();
411            *writer = BufWriter::new(file);
412        }
413
414        // Reset offset and size
415        {
416            let mut offset = self.current_offset.write();
417            *offset = 0;
418        }
419        {
420            let mut size = self.current_size.write();
421            *size = 0;
422        }
423
424        Ok(())
425    }
426
427    /// Find the latest vLog file number
428    fn find_latest_vlog(config: &ValueLogConfig) -> Result<u64> {
429        let mut max_file_id = 0u64;
430
431        if config.vlog_dir.exists() {
432            let entries = std::fs::read_dir(&config.vlog_dir).map_err(|e| {
433                AmateRSError::IoError(ErrorContext::new(format!(
434                    "Failed to read vLog directory: {}",
435                    e
436                )))
437            })?;
438
439            for entry in entries {
440                let entry = entry.map_err(|e| {
441                    AmateRSError::IoError(ErrorContext::new(format!(
442                        "Failed to read directory entry: {}",
443                        e
444                    )))
445                })?;
446
447                let file_name = entry.file_name();
448                let name = file_name.to_string_lossy();
449
450                // Parse vLog file names: vlog_NNNNNNNN.log
451                if name.starts_with("vlog_") && name.ends_with(".log") {
452                    if let Ok(number) = name[5..name.len() - 4].parse::<u64>() {
453                        if number > max_file_id {
454                            max_file_id = number;
455                        }
456                    }
457                }
458            }
459        }
460
461        Ok(max_file_id)
462    }
463
464    /// Generate vLog file path
465    fn vlog_file_path(vlog_dir: &Path, file_id: u64) -> PathBuf {
466        vlog_dir.join(format!("vlog_{:08}.log", file_id))
467    }
468
469    /// Flush buffered writes
470    pub fn flush(&self) -> Result<()> {
471        let mut writer = self.writer.write();
472        writer.flush().map_err(|e| {
473            AmateRSError::IoError(ErrorContext::new(format!("Failed to flush vLog: {}", e)))
474        })?;
475
476        writer.get_ref().sync_all().map_err(|e| {
477            AmateRSError::IoError(ErrorContext::new(format!("Failed to sync vLog: {}", e)))
478        })?;
479
480        Ok(())
481    }
482
483    /// Get current file ID
484    pub fn current_file_id(&self) -> u64 {
485        *self.current_file_id.read()
486    }
487
488    /// Get configuration
489    pub fn config(&self) -> &ValueLogConfig {
490        &self.config
491    }
492
493    /// Perform garbage collection on a vLog file
494    ///
495    /// Scans the file and rewrites live values to a new file, discarding dead values.
496    /// This is typically called when a file has too much garbage.
497    ///
498    /// `is_live_fn`: Function that checks if a key is still live in the LSM-Tree
499    pub fn garbage_collect<F>(&self, file_id: u64, is_live_fn: F) -> Result<GcStats>
500    where
501        F: Fn(&Key) -> bool,
502    {
503        let file_path = Self::vlog_file_path(&self.config.vlog_dir, file_id);
504
505        // Open old file for reading
506        let old_file = File::open(&file_path).map_err(|e| {
507            AmateRSError::IoError(ErrorContext::new(format!(
508                "Failed to open vLog file for GC: {}",
509                e
510            )))
511        })?;
512
513        let file_size = old_file
514            .metadata()
515            .map_err(|e| {
516                AmateRSError::IoError(ErrorContext::new(format!("Failed to get file size: {}", e)))
517            })?
518            .len();
519
520        let mut reader = BufReader::new(old_file);
521        let mut offset = 0u64;
522
523        let mut live_values = Vec::new();
524        let mut dead_count = 0usize;
525        let mut live_count = 0usize;
526
527        // Scan file and identify live values
528        while offset < file_size {
529            // Read entry length (magic + key_len + key + value_len + value + checksum)
530            let start_offset = offset;
531
532            // Try to read entry
533            let mut magic_bytes = [0u8; 4];
534            match reader.read_exact(&mut magic_bytes) {
535                Ok(()) => {}
536                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
537                    // End of file
538                    break;
539                }
540                Err(e) => {
541                    return Err(AmateRSError::IoError(ErrorContext::new(format!(
542                        "Failed to read magic: {}",
543                        e
544                    ))));
545                }
546            }
547
548            // Verify magic
549            let magic = u32::from_le_bytes(magic_bytes);
550            if magic != 0x564C4F47 {
551                // Corrupted entry, skip
552                break;
553            }
554
555            // Read key length
556            let mut key_len_bytes = [0u8; 4];
557            reader.read_exact(&mut key_len_bytes).map_err(|e| {
558                AmateRSError::IoError(ErrorContext::new(format!(
559                    "Failed to read key length: {}",
560                    e
561                )))
562            })?;
563            let key_len = u32::from_le_bytes(key_len_bytes) as usize;
564
565            // Read key
566            let mut key_bytes = vec![0u8; key_len];
567            reader.read_exact(&mut key_bytes).map_err(|e| {
568                AmateRSError::IoError(ErrorContext::new(format!("Failed to read key: {}", e)))
569            })?;
570            let key = Key::from_slice(&key_bytes);
571
572            // Read value length
573            let mut value_len_bytes = [0u8; 4];
574            reader.read_exact(&mut value_len_bytes).map_err(|e| {
575                AmateRSError::IoError(ErrorContext::new(format!(
576                    "Failed to read value length: {}",
577                    e
578                )))
579            })?;
580            let value_len = u32::from_le_bytes(value_len_bytes) as usize;
581
582            // Read value
583            let mut value_bytes = vec![0u8; value_len];
584            reader.read_exact(&mut value_bytes).map_err(|e| {
585                AmateRSError::IoError(ErrorContext::new(format!("Failed to read value: {}", e)))
586            })?;
587            let value = CipherBlob::new(value_bytes);
588
589            // Read checksum
590            let mut checksum_bytes = [0u8; 4];
591            reader.read_exact(&mut checksum_bytes).map_err(|e| {
592                AmateRSError::IoError(ErrorContext::new(format!("Failed to read checksum: {}", e)))
593            })?;
594
595            // Calculate entry size
596            let entry_size = 4 + 4 + key_len + 4 + value_len + 4;
597            offset += entry_size as u64;
598
599            // Check if value is live
600            if is_live_fn(&key) {
601                live_values.push((key, value));
602                live_count += 1;
603            } else {
604                dead_count += 1;
605            }
606        }
607
608        // Rewrite live values to new file
609        let new_file_id = Self::find_latest_vlog(&self.config)? + 1;
610        let new_file_path = Self::vlog_file_path(&self.config.vlog_dir, new_file_id);
611
612        let new_file = OpenOptions::new()
613            .create(true)
614            .write(true)
615            .truncate(true)
616            .open(&new_file_path)
617            .map_err(|e| {
618                AmateRSError::IoError(ErrorContext::new(format!(
619                    "Failed to create new vLog file: {}",
620                    e
621                )))
622            })?;
623
624        let mut new_writer = BufWriter::new(new_file);
625
626        for (key, value) in live_values {
627            let entry = VLogEntry::new(key, value);
628            let entry_bytes = entry.encode();
629            new_writer.write_all(&entry_bytes).map_err(|e| {
630                AmateRSError::IoError(ErrorContext::new(format!(
631                    "Failed to write GC entry: {}",
632                    e
633                )))
634            })?;
635        }
636
637        new_writer.flush().map_err(|e| {
638            AmateRSError::IoError(ErrorContext::new(format!("Failed to flush GC file: {}", e)))
639        })?;
640
641        // Delete old file
642        std::fs::remove_file(&file_path).map_err(|e| {
643            AmateRSError::IoError(ErrorContext::new(format!(
644                "Failed to delete old vLog file: {}",
645                e
646            )))
647        })?;
648
649        Ok(GcStats {
650            file_id,
651            live_count,
652            dead_count,
653            reclaimed_bytes: file_size
654                - new_writer
655                    .get_ref()
656                    .metadata()
657                    .map_err(|e| {
658                        AmateRSError::IoError(ErrorContext::new(format!(
659                            "Failed to get new file size: {}",
660                            e
661                        )))
662                    })?
663                    .len(),
664        })
665    }
666
667    /// Calculate garbage ratio for a vLog file
668    ///
669    /// Returns the ratio of dead values to total values.
670    /// This can be used to determine if GC is needed.
671    pub fn calculate_garbage_ratio<F>(&self, file_id: u64, is_live_fn: F) -> Result<f64>
672    where
673        F: Fn(&Key) -> bool,
674    {
675        let file_path = Self::vlog_file_path(&self.config.vlog_dir, file_id);
676
677        let file = File::open(&file_path).map_err(|e| {
678            AmateRSError::IoError(ErrorContext::new(format!(
679                "Failed to open vLog file: {}",
680                e
681            )))
682        })?;
683
684        let file_size = file
685            .metadata()
686            .map_err(|e| {
687                AmateRSError::IoError(ErrorContext::new(format!("Failed to get file size: {}", e)))
688            })?
689            .len();
690
691        let mut reader = BufReader::new(file);
692        let mut offset = 0u64;
693
694        let mut live_bytes = 0u64;
695        let mut dead_bytes = 0u64;
696
697        while offset < file_size {
698            let start_offset = offset;
699
700            // Try to read entry
701            let mut magic_bytes = [0u8; 4];
702            match reader.read_exact(&mut magic_bytes) {
703                Ok(()) => {}
704                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
705                Err(e) => {
706                    return Err(AmateRSError::IoError(ErrorContext::new(format!(
707                        "Failed to read magic: {}",
708                        e
709                    ))));
710                }
711            }
712
713            let magic = u32::from_le_bytes(magic_bytes);
714            if magic != 0x564C4F47 {
715                break;
716            }
717
718            // Read key length
719            let mut key_len_bytes = [0u8; 4];
720            reader.read_exact(&mut key_len_bytes).map_err(|e| {
721                AmateRSError::IoError(ErrorContext::new(format!(
722                    "Failed to read key length: {}",
723                    e
724                )))
725            })?;
726            let key_len = u32::from_le_bytes(key_len_bytes) as usize;
727
728            // Read key
729            let mut key_bytes = vec![0u8; key_len];
730            reader.read_exact(&mut key_bytes).map_err(|e| {
731                AmateRSError::IoError(ErrorContext::new(format!("Failed to read key: {}", e)))
732            })?;
733            let key = Key::from_slice(&key_bytes);
734
735            // Read value length
736            let mut value_len_bytes = [0u8; 4];
737            reader.read_exact(&mut value_len_bytes).map_err(|e| {
738                AmateRSError::IoError(ErrorContext::new(format!(
739                    "Failed to read value length: {}",
740                    e
741                )))
742            })?;
743            let value_len = u32::from_le_bytes(value_len_bytes) as usize;
744
745            // Skip value
746            let mut value_bytes = vec![0u8; value_len];
747            reader.read_exact(&mut value_bytes).map_err(|e| {
748                AmateRSError::IoError(ErrorContext::new(format!("Failed to read value: {}", e)))
749            })?;
750
751            // Skip checksum
752            let mut checksum_bytes = [0u8; 4];
753            reader.read_exact(&mut checksum_bytes).map_err(|e| {
754                AmateRSError::IoError(ErrorContext::new(format!("Failed to read checksum: {}", e)))
755            })?;
756
757            let entry_size = 4 + 4 + key_len + 4 + value_len + 4;
758            offset += entry_size as u64;
759
760            if is_live_fn(&key) {
761                live_bytes += entry_size as u64;
762            } else {
763                dead_bytes += entry_size as u64;
764            }
765        }
766
767        let total_bytes = live_bytes + dead_bytes;
768        if total_bytes == 0 {
769            Ok(0.0)
770        } else {
771            Ok(dead_bytes as f64 / total_bytes as f64)
772        }
773    }
774}
775
/// Garbage collection statistics
///
/// Returned by `ValueLog::garbage_collect` to summarize one collection run.
#[derive(Debug, Clone)]
pub struct GcStats {
    /// File ID that was garbage collected
    pub file_id: u64,
    /// Number of live values kept (rewritten to the new file)
    pub live_count: usize,
    /// Number of dead values removed (not rewritten)
    pub dead_count: usize,
    /// Bytes reclaimed: size of the old file minus size of the rewritten file
    pub reclaimed_bytes: u64,
}
788
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// Create an empty scratch directory named `name` under the system temp dir.
    ///
    /// Any directory left over from an earlier run is removed first. A run
    /// that failed before its cleanup step leaves vLog files behind, and those
    /// would be picked up by `ValueLog::new` and change entry counts and
    /// garbage ratios in the next run.
    fn fresh_temp_dir(name: &str) -> PathBuf {
        let dir = env::temp_dir().join(name);
        std::fs::remove_dir_all(&dir).ok();
        std::fs::create_dir_all(&dir).ok();
        dir
    }

    /// A pointer survives an encode/decode round trip unchanged.
    #[test]
    fn test_value_pointer_encode_decode() -> Result<()> {
        let pointer = ValuePointer::new(42, 1024, 256, 0xDEADBEEF);

        let bytes = pointer.encode();
        let decoded = ValuePointer::decode(&bytes)?;

        assert_eq!(decoded.file_id, 42);
        assert_eq!(decoded.offset, 1024);
        assert_eq!(decoded.length, 256);
        assert_eq!(decoded.checksum, 0xDEADBEEF);

        Ok(())
    }

    /// An entry survives an encode/decode round trip unchanged.
    #[test]
    fn test_vlog_entry_encode_decode() -> Result<()> {
        let key = Key::from_str("test_key");
        let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
        let entry = VLogEntry::new(key.clone(), value.clone());

        let bytes = entry.encode();
        let decoded = VLogEntry::decode(&bytes)?;

        assert_eq!(decoded.key, key);
        assert_eq!(decoded.value, value);
        assert_eq!(decoded.checksum, entry.checksum);

        Ok(())
    }

    /// A single appended value can be read back through its pointer.
    #[test]
    fn test_value_log_basic_operations() -> Result<()> {
        let temp_dir = fresh_temp_dir("test_vlog_basic");

        let vlog = ValueLog::new(&temp_dir)?;

        let key = Key::from_str("key1");
        let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);

        // Append value
        let pointer = vlog.append(key.clone(), value.clone())?;
        vlog.flush()?; // Flush to ensure data is on disk

        // Read value back
        let read_value = vlog.read(&pointer)?;

        assert_eq!(read_value, value);

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    /// Values are separated only when larger than the default 1KB threshold.
    #[test]
    fn test_value_log_should_separate() -> Result<()> {
        let temp_dir = fresh_temp_dir("test_vlog_should_separate");

        let vlog = ValueLog::new(&temp_dir)?;

        // Small value (< 1KB)
        let small = CipherBlob::new(vec![0u8; 512]);
        assert!(!vlog.should_separate(&small));

        // Large value (> 1KB)
        let large = CipherBlob::new(vec![0u8; 2048]);
        assert!(vlog.should_separate(&large));

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    /// Several appended values are each readable through their own pointer.
    #[test]
    fn test_value_log_multiple_values() -> Result<()> {
        let temp_dir = fresh_temp_dir("test_vlog_multiple");

        let vlog = ValueLog::new(&temp_dir)?;

        let mut pointers = Vec::new();

        // Write multiple values
        for i in 0..10 {
            let key = Key::from_str(&format!("key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 1000]);
            let pointer = vlog.append(key, value)?;
            pointers.push((pointer, i as u8));
        }

        vlog.flush()?; // Flush to ensure all data is on disk

        // Read values back
        for (pointer, expected_byte) in pointers {
            let value = vlog.read(&pointer)?;
            assert_eq!(value.as_bytes()[0], expected_byte);
        }

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    /// Writing past `max_file_size` moves the log on to a new file.
    #[test]
    fn test_value_log_rotation() -> Result<()> {
        let temp_dir = fresh_temp_dir("test_vlog_rotation");

        let config = ValueLogConfig {
            vlog_dir: temp_dir.clone(),
            max_file_size: 4096, // Small size to trigger rotation
            sync_on_write: false,
            ..Default::default()
        };

        let vlog = ValueLog::with_config(config)?;

        let initial_file_id = vlog.current_file_id();

        // Write enough data to trigger rotation
        for i in 0..10 {
            let key = Key::from_str(&format!("key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 1000]);
            vlog.append(key, value)?;
        }

        // File ID should have increased
        assert!(vlog.current_file_id() > initial_file_id);

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    /// GC keeps live entries, drops dead ones, and reports reclaimed space.
    #[test]
    fn test_value_log_garbage_collection() -> Result<()> {
        let temp_dir = fresh_temp_dir("test_vlog_gc");

        let vlog = ValueLog::new(&temp_dir)?;

        // Write some values
        let mut keys = Vec::new();
        for i in 0..10 {
            let key = Key::from_str(&format!("key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 1000]);
            vlog.append(key.clone(), value)?;
            keys.push(key);
        }

        vlog.flush()?;

        let file_id = vlog.current_file_id();

        // Simulate: keys 0-4 are live, keys 5-9 are dead
        let is_live = |key: &Key| -> bool {
            let key_str = String::from_utf8_lossy(key.as_bytes());
            if let Some(num_str) = key_str.strip_prefix("key_") {
                if let Ok(num) = num_str.parse::<usize>() {
                    return num < 5;
                }
            }
            false
        };

        // Calculate garbage ratio
        let ratio = vlog.calculate_garbage_ratio(file_id, is_live)?;
        assert!(ratio > 0.4 && ratio < 0.6); // Should be around 50%

        // Perform GC
        let stats = vlog.garbage_collect(file_id, is_live)?;

        assert_eq!(stats.live_count, 5);
        assert_eq!(stats.dead_count, 5);
        assert!(stats.reclaimed_bytes > 0);

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }

    /// A 10KB value round-trips through the log intact.
    #[test]
    fn test_value_log_large_values() -> Result<()> {
        let temp_dir = fresh_temp_dir("test_vlog_large");

        let vlog = ValueLog::new(&temp_dir)?;

        // Write a large value (10KB)
        let key = Key::from_str("large_key");
        let large_value = CipherBlob::new(vec![42u8; 10_000]);

        let pointer = vlog.append(key, large_value.clone())?;
        vlog.flush()?;

        // Read it back
        let read_value = vlog.read(&pointer)?;

        assert_eq!(read_value, large_value);
        assert_eq!(read_value.len(), 10_000);

        std::fs::remove_dir_all(&temp_dir).ok();
        Ok(())
    }
}