1use crate::error::{AmateRSError, ErrorContext, Result};
14use crate::types::{CipherBlob, Key};
15use parking_lot::RwLock;
16use std::fs::{File, OpenOptions};
17use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
18use std::path::{Path, PathBuf};
19use std::sync::Arc;
20
21#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct ValuePointer {
24 pub file_id: u64,
26 pub offset: u64,
28 pub length: u32,
30 pub checksum: u32,
32}
33
34impl ValuePointer {
35 pub fn new(file_id: u64, offset: u64, length: u32, checksum: u32) -> Self {
37 Self {
38 file_id,
39 offset,
40 length,
41 checksum,
42 }
43 }
44
45 pub fn encode(&self) -> Vec<u8> {
47 let mut bytes = Vec::with_capacity(24);
48 bytes.extend_from_slice(&self.file_id.to_le_bytes());
49 bytes.extend_from_slice(&self.offset.to_le_bytes());
50 bytes.extend_from_slice(&self.length.to_le_bytes());
51 bytes.extend_from_slice(&self.checksum.to_le_bytes());
52 bytes
53 }
54
55 pub fn decode(bytes: &[u8]) -> Result<Self> {
57 if bytes.len() < 24 {
58 return Err(AmateRSError::SerializationError(ErrorContext::new(
59 "ValuePointer too short",
60 )));
61 }
62
63 let file_id = u64::from_le_bytes(bytes[0..8].try_into().map_err(|_| {
64 AmateRSError::SerializationError(ErrorContext::new("Failed to read file_id"))
65 })?);
66
67 let offset = u64::from_le_bytes(bytes[8..16].try_into().map_err(|_| {
68 AmateRSError::SerializationError(ErrorContext::new("Failed to read offset"))
69 })?);
70
71 let length = u32::from_le_bytes(bytes[16..20].try_into().map_err(|_| {
72 AmateRSError::SerializationError(ErrorContext::new("Failed to read length"))
73 })?);
74
75 let checksum = u32::from_le_bytes(bytes[20..24].try_into().map_err(|_| {
76 AmateRSError::SerializationError(ErrorContext::new("Failed to read checksum"))
77 })?);
78
79 Ok(Self {
80 file_id,
81 offset,
82 length,
83 checksum,
84 })
85 }
86}
87
88#[derive(Debug, Clone)]
90pub struct ValueLogConfig {
91 pub vlog_dir: PathBuf,
93 pub max_file_size: u64,
95 pub value_threshold: usize,
97 pub sync_on_write: bool,
99 pub gc_threshold: f64,
101}
102
103impl Default for ValueLogConfig {
104 fn default() -> Self {
105 Self {
106 vlog_dir: PathBuf::from("./vlog"),
107 max_file_size: 1024 * 1024 * 1024, value_threshold: 1024, sync_on_write: false,
110 gc_threshold: 0.5,
111 }
112 }
113}
114
115struct VLogEntry {
117 key: Key,
119 value: CipherBlob,
121 checksum: u32,
123}
124
125impl VLogEntry {
126 fn new(key: Key, value: CipherBlob) -> Self {
127 let mut hasher = crc32fast::Hasher::new();
128 hasher.update(key.as_bytes());
129 hasher.update(value.as_bytes());
130 let checksum = hasher.finalize();
131
132 Self {
133 key,
134 value,
135 checksum,
136 }
137 }
138
139 fn encode(&self) -> Vec<u8> {
140 let mut bytes = Vec::new();
141
142 bytes.extend_from_slice(&0x564C4F47u32.to_le_bytes());
144
145 bytes.extend_from_slice(&(self.key.len() as u32).to_le_bytes());
147 bytes.extend_from_slice(self.key.as_bytes());
148
149 bytes.extend_from_slice(&(self.value.len() as u32).to_le_bytes());
151 bytes.extend_from_slice(self.value.as_bytes());
152
153 bytes.extend_from_slice(&self.checksum.to_le_bytes());
155
156 bytes
157 }
158
159 fn decode(bytes: &[u8]) -> Result<Self> {
160 if bytes.len() < 16 {
161 return Err(AmateRSError::SerializationError(ErrorContext::new(
162 "VLogEntry too short",
163 )));
164 }
165
166 let mut offset = 0;
167
168 let magic = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
170 if magic != 0x564C4F47 {
171 return Err(AmateRSError::SerializationError(ErrorContext::new(
172 "Invalid vLog entry magic number",
173 )));
174 }
175 offset += 4;
176
177 let key_len = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
179 AmateRSError::SerializationError(ErrorContext::new("Failed to read key length"))
180 })?) as usize;
181 offset += 4;
182
183 let key_bytes = &bytes[offset..offset + key_len];
184 let key = Key::from_slice(key_bytes);
185 offset += key_len;
186
187 let value_len = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
189 AmateRSError::SerializationError(ErrorContext::new("Failed to read value length"))
190 })?) as usize;
191 offset += 4;
192
193 let value_bytes = &bytes[offset..offset + value_len];
194 let value = CipherBlob::new(value_bytes.to_vec());
195 offset += value_len;
196
197 let checksum = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
199 AmateRSError::SerializationError(ErrorContext::new("Failed to read checksum"))
200 })?);
201
202 let entry = Self {
203 key,
204 value,
205 checksum,
206 };
207
208 let mut hasher = crc32fast::Hasher::new();
210 hasher.update(entry.key.as_bytes());
211 hasher.update(entry.value.as_bytes());
212 let calculated = hasher.finalize();
213
214 if calculated != entry.checksum {
215 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
216 "vLog entry checksum mismatch: expected {}, got {}",
217 entry.checksum, calculated
218 ))));
219 }
220
221 Ok(entry)
222 }
223}
224
225pub struct ValueLog {
227 config: ValueLogConfig,
229 current_file_id: Arc<RwLock<u64>>,
231 writer: Arc<RwLock<BufWriter<File>>>,
233 current_offset: Arc<RwLock<u64>>,
235 current_size: Arc<RwLock<u64>>,
237}
238
239impl ValueLog {
240 pub fn new(vlog_dir: impl AsRef<Path>) -> Result<Self> {
242 let config = ValueLogConfig {
243 vlog_dir: vlog_dir.as_ref().to_path_buf(),
244 ..Default::default()
245 };
246 Self::with_config(config)
247 }
248
249 pub fn with_config(config: ValueLogConfig) -> Result<Self> {
251 std::fs::create_dir_all(&config.vlog_dir).map_err(|e| {
253 AmateRSError::IoError(ErrorContext::new(format!(
254 "Failed to create vLog directory: {}",
255 e
256 )))
257 })?;
258
259 let file_id = Self::find_latest_vlog(&config)?;
261 let file_path = Self::vlog_file_path(&config.vlog_dir, file_id);
262
263 let file = OpenOptions::new()
264 .create(true)
265 .append(true)
266 .open(&file_path)
267 .map_err(|e| {
268 AmateRSError::IoError(ErrorContext::new(format!("Failed to open vLog: {}", e)))
269 })?;
270
271 let current_size = file
272 .metadata()
273 .map_err(|e| {
274 AmateRSError::IoError(ErrorContext::new(format!(
275 "Failed to get vLog file size: {}",
276 e
277 )))
278 })?
279 .len();
280
281 Ok(Self {
282 config,
283 current_file_id: Arc::new(RwLock::new(file_id)),
284 writer: Arc::new(RwLock::new(BufWriter::new(file))),
285 current_offset: Arc::new(RwLock::new(current_size)),
286 current_size: Arc::new(RwLock::new(current_size)),
287 })
288 }
289
290 pub fn should_separate(&self, value: &CipherBlob) -> bool {
292 value.len() > self.config.value_threshold
293 }
294
295 pub fn append(&self, key: Key, value: CipherBlob) -> Result<ValuePointer> {
297 let entry = VLogEntry::new(key, value);
298 let entry_bytes = entry.encode();
299 let entry_len = entry_bytes.len() as u64;
300
301 let file_id = *self.current_file_id.read();
303 let offset = *self.current_offset.read();
304
305 {
307 let mut writer = self.writer.write();
308 writer.write_all(&entry_bytes).map_err(|e| {
309 AmateRSError::IoError(ErrorContext::new(format!(
310 "Failed to write vLog entry: {}",
311 e
312 )))
313 })?;
314
315 if self.config.sync_on_write {
316 writer.flush().map_err(|e| {
317 AmateRSError::IoError(ErrorContext::new(format!("Failed to flush vLog: {}", e)))
318 })?;
319 }
320 }
321
322 {
324 let mut current_offset = self.current_offset.write();
325 *current_offset += entry_len;
326 }
327 {
328 let mut current_size = self.current_size.write();
329 *current_size += entry_len;
330 }
331
332 if *self.current_size.read() >= self.config.max_file_size {
334 self.rotate()?;
335 }
336
337 let pointer = ValuePointer::new(file_id, offset, entry_bytes.len() as u32, entry.checksum);
339
340 Ok(pointer)
341 }
342
343 pub fn read(&self, pointer: &ValuePointer) -> Result<CipherBlob> {
345 let file_path = Self::vlog_file_path(&self.config.vlog_dir, pointer.file_id);
346
347 let mut file = File::open(&file_path).map_err(|e| {
349 AmateRSError::IoError(ErrorContext::new(format!(
350 "Failed to open vLog file for reading: {}",
351 e
352 )))
353 })?;
354
355 file.seek(SeekFrom::Start(pointer.offset)).map_err(|e| {
357 AmateRSError::IoError(ErrorContext::new(format!(
358 "Failed to seek vLog file: {}",
359 e
360 )))
361 })?;
362
363 let mut entry_bytes = vec![0u8; pointer.length as usize];
365 file.read_exact(&mut entry_bytes).map_err(|e| {
366 AmateRSError::IoError(ErrorContext::new(format!(
367 "Failed to read vLog entry: {}",
368 e
369 )))
370 })?;
371
372 let entry = VLogEntry::decode(&entry_bytes)?;
374
375 Ok(entry.value)
376 }
377
378 fn rotate(&self) -> Result<()> {
380 {
382 let mut writer = self.writer.write();
383 writer.flush().map_err(|e| {
384 AmateRSError::IoError(ErrorContext::new(format!("Failed to flush vLog: {}", e)))
385 })?;
386 }
387
388 let new_file_id = {
390 let mut file_id = self.current_file_id.write();
391 *file_id += 1;
392 *file_id
393 };
394
395 let new_path = Self::vlog_file_path(&self.config.vlog_dir, new_file_id);
397 let file = OpenOptions::new()
398 .create(true)
399 .append(true)
400 .open(&new_path)
401 .map_err(|e| {
402 AmateRSError::IoError(ErrorContext::new(format!(
403 "Failed to create new vLog file: {}",
404 e
405 )))
406 })?;
407
408 {
410 let mut writer = self.writer.write();
411 *writer = BufWriter::new(file);
412 }
413
414 {
416 let mut offset = self.current_offset.write();
417 *offset = 0;
418 }
419 {
420 let mut size = self.current_size.write();
421 *size = 0;
422 }
423
424 Ok(())
425 }
426
427 fn find_latest_vlog(config: &ValueLogConfig) -> Result<u64> {
429 let mut max_file_id = 0u64;
430
431 if config.vlog_dir.exists() {
432 let entries = std::fs::read_dir(&config.vlog_dir).map_err(|e| {
433 AmateRSError::IoError(ErrorContext::new(format!(
434 "Failed to read vLog directory: {}",
435 e
436 )))
437 })?;
438
439 for entry in entries {
440 let entry = entry.map_err(|e| {
441 AmateRSError::IoError(ErrorContext::new(format!(
442 "Failed to read directory entry: {}",
443 e
444 )))
445 })?;
446
447 let file_name = entry.file_name();
448 let name = file_name.to_string_lossy();
449
450 if name.starts_with("vlog_") && name.ends_with(".log") {
452 if let Ok(number) = name[5..name.len() - 4].parse::<u64>() {
453 if number > max_file_id {
454 max_file_id = number;
455 }
456 }
457 }
458 }
459 }
460
461 Ok(max_file_id)
462 }
463
464 fn vlog_file_path(vlog_dir: &Path, file_id: u64) -> PathBuf {
466 vlog_dir.join(format!("vlog_{:08}.log", file_id))
467 }
468
469 pub fn flush(&self) -> Result<()> {
471 let mut writer = self.writer.write();
472 writer.flush().map_err(|e| {
473 AmateRSError::IoError(ErrorContext::new(format!("Failed to flush vLog: {}", e)))
474 })?;
475
476 writer.get_ref().sync_all().map_err(|e| {
477 AmateRSError::IoError(ErrorContext::new(format!("Failed to sync vLog: {}", e)))
478 })?;
479
480 Ok(())
481 }
482
483 pub fn current_file_id(&self) -> u64 {
485 *self.current_file_id.read()
486 }
487
488 pub fn config(&self) -> &ValueLogConfig {
490 &self.config
491 }
492
493 pub fn garbage_collect<F>(&self, file_id: u64, is_live_fn: F) -> Result<GcStats>
500 where
501 F: Fn(&Key) -> bool,
502 {
503 let file_path = Self::vlog_file_path(&self.config.vlog_dir, file_id);
504
505 let old_file = File::open(&file_path).map_err(|e| {
507 AmateRSError::IoError(ErrorContext::new(format!(
508 "Failed to open vLog file for GC: {}",
509 e
510 )))
511 })?;
512
513 let file_size = old_file
514 .metadata()
515 .map_err(|e| {
516 AmateRSError::IoError(ErrorContext::new(format!("Failed to get file size: {}", e)))
517 })?
518 .len();
519
520 let mut reader = BufReader::new(old_file);
521 let mut offset = 0u64;
522
523 let mut live_values = Vec::new();
524 let mut dead_count = 0usize;
525 let mut live_count = 0usize;
526
527 while offset < file_size {
529 let start_offset = offset;
531
532 let mut magic_bytes = [0u8; 4];
534 match reader.read_exact(&mut magic_bytes) {
535 Ok(()) => {}
536 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
537 break;
539 }
540 Err(e) => {
541 return Err(AmateRSError::IoError(ErrorContext::new(format!(
542 "Failed to read magic: {}",
543 e
544 ))));
545 }
546 }
547
548 let magic = u32::from_le_bytes(magic_bytes);
550 if magic != 0x564C4F47 {
551 break;
553 }
554
555 let mut key_len_bytes = [0u8; 4];
557 reader.read_exact(&mut key_len_bytes).map_err(|e| {
558 AmateRSError::IoError(ErrorContext::new(format!(
559 "Failed to read key length: {}",
560 e
561 )))
562 })?;
563 let key_len = u32::from_le_bytes(key_len_bytes) as usize;
564
565 let mut key_bytes = vec![0u8; key_len];
567 reader.read_exact(&mut key_bytes).map_err(|e| {
568 AmateRSError::IoError(ErrorContext::new(format!("Failed to read key: {}", e)))
569 })?;
570 let key = Key::from_slice(&key_bytes);
571
572 let mut value_len_bytes = [0u8; 4];
574 reader.read_exact(&mut value_len_bytes).map_err(|e| {
575 AmateRSError::IoError(ErrorContext::new(format!(
576 "Failed to read value length: {}",
577 e
578 )))
579 })?;
580 let value_len = u32::from_le_bytes(value_len_bytes) as usize;
581
582 let mut value_bytes = vec![0u8; value_len];
584 reader.read_exact(&mut value_bytes).map_err(|e| {
585 AmateRSError::IoError(ErrorContext::new(format!("Failed to read value: {}", e)))
586 })?;
587 let value = CipherBlob::new(value_bytes);
588
589 let mut checksum_bytes = [0u8; 4];
591 reader.read_exact(&mut checksum_bytes).map_err(|e| {
592 AmateRSError::IoError(ErrorContext::new(format!("Failed to read checksum: {}", e)))
593 })?;
594
595 let entry_size = 4 + 4 + key_len + 4 + value_len + 4;
597 offset += entry_size as u64;
598
599 if is_live_fn(&key) {
601 live_values.push((key, value));
602 live_count += 1;
603 } else {
604 dead_count += 1;
605 }
606 }
607
608 let new_file_id = Self::find_latest_vlog(&self.config)? + 1;
610 let new_file_path = Self::vlog_file_path(&self.config.vlog_dir, new_file_id);
611
612 let new_file = OpenOptions::new()
613 .create(true)
614 .write(true)
615 .truncate(true)
616 .open(&new_file_path)
617 .map_err(|e| {
618 AmateRSError::IoError(ErrorContext::new(format!(
619 "Failed to create new vLog file: {}",
620 e
621 )))
622 })?;
623
624 let mut new_writer = BufWriter::new(new_file);
625
626 for (key, value) in live_values {
627 let entry = VLogEntry::new(key, value);
628 let entry_bytes = entry.encode();
629 new_writer.write_all(&entry_bytes).map_err(|e| {
630 AmateRSError::IoError(ErrorContext::new(format!(
631 "Failed to write GC entry: {}",
632 e
633 )))
634 })?;
635 }
636
637 new_writer.flush().map_err(|e| {
638 AmateRSError::IoError(ErrorContext::new(format!("Failed to flush GC file: {}", e)))
639 })?;
640
641 std::fs::remove_file(&file_path).map_err(|e| {
643 AmateRSError::IoError(ErrorContext::new(format!(
644 "Failed to delete old vLog file: {}",
645 e
646 )))
647 })?;
648
649 Ok(GcStats {
650 file_id,
651 live_count,
652 dead_count,
653 reclaimed_bytes: file_size
654 - new_writer
655 .get_ref()
656 .metadata()
657 .map_err(|e| {
658 AmateRSError::IoError(ErrorContext::new(format!(
659 "Failed to get new file size: {}",
660 e
661 )))
662 })?
663 .len(),
664 })
665 }
666
667 pub fn calculate_garbage_ratio<F>(&self, file_id: u64, is_live_fn: F) -> Result<f64>
672 where
673 F: Fn(&Key) -> bool,
674 {
675 let file_path = Self::vlog_file_path(&self.config.vlog_dir, file_id);
676
677 let file = File::open(&file_path).map_err(|e| {
678 AmateRSError::IoError(ErrorContext::new(format!(
679 "Failed to open vLog file: {}",
680 e
681 )))
682 })?;
683
684 let file_size = file
685 .metadata()
686 .map_err(|e| {
687 AmateRSError::IoError(ErrorContext::new(format!("Failed to get file size: {}", e)))
688 })?
689 .len();
690
691 let mut reader = BufReader::new(file);
692 let mut offset = 0u64;
693
694 let mut live_bytes = 0u64;
695 let mut dead_bytes = 0u64;
696
697 while offset < file_size {
698 let start_offset = offset;
699
700 let mut magic_bytes = [0u8; 4];
702 match reader.read_exact(&mut magic_bytes) {
703 Ok(()) => {}
704 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
705 Err(e) => {
706 return Err(AmateRSError::IoError(ErrorContext::new(format!(
707 "Failed to read magic: {}",
708 e
709 ))));
710 }
711 }
712
713 let magic = u32::from_le_bytes(magic_bytes);
714 if magic != 0x564C4F47 {
715 break;
716 }
717
718 let mut key_len_bytes = [0u8; 4];
720 reader.read_exact(&mut key_len_bytes).map_err(|e| {
721 AmateRSError::IoError(ErrorContext::new(format!(
722 "Failed to read key length: {}",
723 e
724 )))
725 })?;
726 let key_len = u32::from_le_bytes(key_len_bytes) as usize;
727
728 let mut key_bytes = vec![0u8; key_len];
730 reader.read_exact(&mut key_bytes).map_err(|e| {
731 AmateRSError::IoError(ErrorContext::new(format!("Failed to read key: {}", e)))
732 })?;
733 let key = Key::from_slice(&key_bytes);
734
735 let mut value_len_bytes = [0u8; 4];
737 reader.read_exact(&mut value_len_bytes).map_err(|e| {
738 AmateRSError::IoError(ErrorContext::new(format!(
739 "Failed to read value length: {}",
740 e
741 )))
742 })?;
743 let value_len = u32::from_le_bytes(value_len_bytes) as usize;
744
745 let mut value_bytes = vec![0u8; value_len];
747 reader.read_exact(&mut value_bytes).map_err(|e| {
748 AmateRSError::IoError(ErrorContext::new(format!("Failed to read value: {}", e)))
749 })?;
750
751 let mut checksum_bytes = [0u8; 4];
753 reader.read_exact(&mut checksum_bytes).map_err(|e| {
754 AmateRSError::IoError(ErrorContext::new(format!("Failed to read checksum: {}", e)))
755 })?;
756
757 let entry_size = 4 + 4 + key_len + 4 + value_len + 4;
758 offset += entry_size as u64;
759
760 if is_live_fn(&key) {
761 live_bytes += entry_size as u64;
762 } else {
763 dead_bytes += entry_size as u64;
764 }
765 }
766
767 let total_bytes = live_bytes + dead_bytes;
768 if total_bytes == 0 {
769 Ok(0.0)
770 } else {
771 Ok(dead_bytes as f64 / total_bytes as f64)
772 }
773 }
774}
775
776#[derive(Debug, Clone)]
778pub struct GcStats {
779 pub file_id: u64,
781 pub live_count: usize,
783 pub dead_count: usize,
785 pub reclaimed_bytes: u64,
787}
788
789#[cfg(test)]
790mod tests {
791 use super::*;
792 use std::env;
793
794 #[test]
795 fn test_value_pointer_encode_decode() -> Result<()> {
796 let pointer = ValuePointer::new(42, 1024, 256, 0xDEADBEEF);
797
798 let bytes = pointer.encode();
799 let decoded = ValuePointer::decode(&bytes)?;
800
801 assert_eq!(decoded.file_id, 42);
802 assert_eq!(decoded.offset, 1024);
803 assert_eq!(decoded.length, 256);
804 assert_eq!(decoded.checksum, 0xDEADBEEF);
805
806 Ok(())
807 }
808
809 #[test]
810 fn test_vlog_entry_encode_decode() -> Result<()> {
811 let key = Key::from_str("test_key");
812 let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
813 let entry = VLogEntry::new(key.clone(), value.clone());
814
815 let bytes = entry.encode();
816 let decoded = VLogEntry::decode(&bytes)?;
817
818 assert_eq!(decoded.key, key);
819 assert_eq!(decoded.value, value);
820 assert_eq!(decoded.checksum, entry.checksum);
821
822 Ok(())
823 }
824
825 #[test]
826 fn test_value_log_basic_operations() -> Result<()> {
827 let temp_dir = env::temp_dir().join("test_vlog_basic");
828 std::fs::create_dir_all(&temp_dir).ok();
829
830 let vlog = ValueLog::new(&temp_dir)?;
831
832 let key = Key::from_str("key1");
833 let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
834
835 let pointer = vlog.append(key.clone(), value.clone())?;
837 vlog.flush()?; let read_value = vlog.read(&pointer)?;
841
842 assert_eq!(read_value, value);
843
844 std::fs::remove_dir_all(&temp_dir).ok();
845 Ok(())
846 }
847
848 #[test]
849 fn test_value_log_should_separate() -> Result<()> {
850 let temp_dir = env::temp_dir().join("test_vlog_should_separate");
851 std::fs::create_dir_all(&temp_dir).ok();
852
853 let vlog = ValueLog::new(&temp_dir)?;
854
855 let small = CipherBlob::new(vec![0u8; 512]);
857 assert!(!vlog.should_separate(&small));
858
859 let large = CipherBlob::new(vec![0u8; 2048]);
861 assert!(vlog.should_separate(&large));
862
863 std::fs::remove_dir_all(&temp_dir).ok();
864 Ok(())
865 }
866
867 #[test]
868 fn test_value_log_multiple_values() -> Result<()> {
869 let temp_dir = env::temp_dir().join("test_vlog_multiple");
870 std::fs::create_dir_all(&temp_dir).ok();
871
872 let vlog = ValueLog::new(&temp_dir)?;
873
874 let mut pointers = Vec::new();
875
876 for i in 0..10 {
878 let key = Key::from_str(&format!("key_{}", i));
879 let value = CipherBlob::new(vec![i as u8; 1000]);
880 let pointer = vlog.append(key, value)?;
881 pointers.push((pointer, i as u8));
882 }
883
884 vlog.flush()?; for (pointer, expected_byte) in pointers {
888 let value = vlog.read(&pointer)?;
889 assert_eq!(value.as_bytes()[0], expected_byte);
890 }
891
892 std::fs::remove_dir_all(&temp_dir).ok();
893 Ok(())
894 }
895
896 #[test]
897 fn test_value_log_rotation() -> Result<()> {
898 let temp_dir = env::temp_dir().join("test_vlog_rotation");
899 std::fs::create_dir_all(&temp_dir).ok();
900
901 let config = ValueLogConfig {
902 vlog_dir: temp_dir.clone(),
903 max_file_size: 4096, sync_on_write: false,
905 ..Default::default()
906 };
907
908 let vlog = ValueLog::with_config(config)?;
909
910 let initial_file_id = vlog.current_file_id();
911
912 for i in 0..10 {
914 let key = Key::from_str(&format!("key_{}", i));
915 let value = CipherBlob::new(vec![i as u8; 1000]);
916 vlog.append(key, value)?;
917 }
918
919 assert!(vlog.current_file_id() > initial_file_id);
921
922 std::fs::remove_dir_all(&temp_dir).ok();
923 Ok(())
924 }
925
926 #[test]
927 fn test_value_log_garbage_collection() -> Result<()> {
928 let temp_dir = env::temp_dir().join("test_vlog_gc");
929 std::fs::create_dir_all(&temp_dir).ok();
930
931 let vlog = ValueLog::new(&temp_dir)?;
932
933 let mut keys = Vec::new();
935 for i in 0..10 {
936 let key = Key::from_str(&format!("key_{}", i));
937 let value = CipherBlob::new(vec![i as u8; 1000]);
938 vlog.append(key.clone(), value)?;
939 keys.push(key);
940 }
941
942 vlog.flush()?;
943
944 let file_id = vlog.current_file_id();
945
946 let is_live = |key: &Key| -> bool {
948 let key_str = String::from_utf8_lossy(key.as_bytes());
949 if let Some(num_str) = key_str.strip_prefix("key_") {
950 if let Ok(num) = num_str.parse::<usize>() {
951 return num < 5;
952 }
953 }
954 false
955 };
956
957 let ratio = vlog.calculate_garbage_ratio(file_id, is_live)?;
959 assert!(ratio > 0.4 && ratio < 0.6); let stats = vlog.garbage_collect(file_id, is_live)?;
963
964 assert_eq!(stats.live_count, 5);
965 assert_eq!(stats.dead_count, 5);
966 assert!(stats.reclaimed_bytes > 0);
967
968 std::fs::remove_dir_all(&temp_dir).ok();
969 Ok(())
970 }
971
972 #[test]
973 fn test_value_log_large_values() -> Result<()> {
974 let temp_dir = env::temp_dir().join("test_vlog_large");
975 std::fs::create_dir_all(&temp_dir).ok();
976
977 let vlog = ValueLog::new(&temp_dir)?;
978
979 let key = Key::from_str("large_key");
981 let large_value = CipherBlob::new(vec![42u8; 10_000]);
982
983 let pointer = vlog.append(key, large_value.clone())?;
984 vlog.flush()?;
985
986 let read_value = vlog.read(&pointer)?;
988
989 assert_eq!(read_value, large_value);
990 assert_eq!(read_value.len(), 10_000);
991
992 std::fs::remove_dir_all(&temp_dir).ok();
993 Ok(())
994 }
995}