1use crate::error::{AmateRSError, ErrorContext, Result};
14use crate::types::{CipherBlob, Key};
15use dashmap::DashMap;
16use parking_lot::RwLock;
17use std::fs::{File, OpenOptions};
18use std::io::{Read, Seek, SeekFrom, Write};
19use std::path::{Path, PathBuf};
20use std::sync::Arc;
21use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
22
23pub use super::value_log_gc::{GcConfig, GcResult, GcStats, SegmentStats};
25
/// Location of a single entry inside the value log.
///
/// Produced by [`ValueLog::append`] and consumed by [`ValueLog::read`]; it can be
/// persisted with [`ValuePointer::encode`] / [`ValuePointer::decode`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ValuePointer {
    /// Id of the segment file that holds the entry.
    pub file_id: u64,
    /// Byte offset at which the encoded entry starts within that segment.
    pub offset: u64,
    /// Size in bytes of the complete encoded entry.
    pub length: u32,
    /// CRC32 of the entry's key and value, as computed when it was appended.
    pub checksum: u32,
}
38
39impl ValuePointer {
40 pub fn new(file_id: u64, offset: u64, length: u32, checksum: u32) -> Self {
42 Self {
43 file_id,
44 offset,
45 length,
46 checksum,
47 }
48 }
49
50 pub fn encode(&self) -> Vec<u8> {
52 let mut bytes = Vec::with_capacity(24);
53 bytes.extend_from_slice(&self.file_id.to_le_bytes());
54 bytes.extend_from_slice(&self.offset.to_le_bytes());
55 bytes.extend_from_slice(&self.length.to_le_bytes());
56 bytes.extend_from_slice(&self.checksum.to_le_bytes());
57 bytes
58 }
59
60 pub fn decode(bytes: &[u8]) -> Result<Self> {
62 if bytes.len() < 24 {
63 return Err(AmateRSError::SerializationError(ErrorContext::new(
64 "ValuePointer too short",
65 )));
66 }
67
68 let file_id = u64::from_le_bytes(bytes[0..8].try_into().map_err(|_| {
69 AmateRSError::SerializationError(ErrorContext::new("Failed to read file_id"))
70 })?);
71
72 let offset = u64::from_le_bytes(bytes[8..16].try_into().map_err(|_| {
73 AmateRSError::SerializationError(ErrorContext::new("Failed to read offset"))
74 })?);
75
76 let length = u32::from_le_bytes(bytes[16..20].try_into().map_err(|_| {
77 AmateRSError::SerializationError(ErrorContext::new("Failed to read length"))
78 })?);
79
80 let checksum = u32::from_le_bytes(bytes[20..24].try_into().map_err(|_| {
81 AmateRSError::SerializationError(ErrorContext::new("Failed to read checksum"))
82 })?);
83
84 Ok(Self {
85 file_id,
86 offset,
87 length,
88 checksum,
89 })
90 }
91}
92
/// Tuning knobs for a [`ValueLog`].
#[derive(Clone, Debug)]
pub struct ValueLogConfig {
    /// Directory containing the value-log segment files.
    pub vlog_dir: PathBuf,
    /// Segment size in bytes at which the active segment is rotated. It is checked
    /// after each append, so a segment can exceed it by up to one entry.
    pub max_file_size: u64,
    /// Values strictly larger than this many bytes are reported as separable by
    /// `ValueLog::should_separate`.
    pub value_threshold: usize,
    /// When `true`, every append flushes the writer's in-memory buffer to the OS
    /// (this is a buffer flush, not an fsync).
    pub sync_on_write: bool,
    /// Garbage-ratio threshold for collection — presumably segments whose dead
    /// fraction exceeds it are collected; TODO confirm against `value_log_gc`.
    pub gc_threshold: f64,
}

impl Default for ValueLogConfig {
    /// `./vlog`, 1 GiB segments, 1 KiB separation threshold, no per-write flush,
    /// and a 0.5 GC threshold.
    fn default() -> Self {
        const ONE_GIB: u64 = 1 << 30;

        Self {
            gc_threshold: 0.5,
            sync_on_write: false,
            value_threshold: 1024,
            max_file_size: ONE_GIB,
            vlog_dir: PathBuf::from("./vlog"),
        }
    }
}
119
120pub(crate) struct VLogEntry {
122 pub(crate) key: Key,
124 pub(crate) value: CipherBlob,
126 pub(crate) checksum: u32,
128}
129
130impl VLogEntry {
131 pub(crate) fn new(key: Key, value: CipherBlob) -> Self {
132 let mut hasher = crc32fast::Hasher::new();
133 hasher.update(key.as_bytes());
134 hasher.update(value.as_bytes());
135 let checksum = hasher.finalize();
136
137 Self {
138 key,
139 value,
140 checksum,
141 }
142 }
143
144 pub(crate) fn encode(&self) -> Vec<u8> {
145 let mut bytes = Vec::new();
146
147 bytes.extend_from_slice(&0x564C4F47u32.to_le_bytes());
149
150 bytes.extend_from_slice(&(self.key.len() as u32).to_le_bytes());
152 bytes.extend_from_slice(self.key.as_bytes());
153
154 bytes.extend_from_slice(&(self.value.len() as u32).to_le_bytes());
156 bytes.extend_from_slice(self.value.as_bytes());
157
158 bytes.extend_from_slice(&self.checksum.to_le_bytes());
160
161 bytes
162 }
163
164 fn decode(bytes: &[u8]) -> Result<Self> {
165 if bytes.len() < 16 {
166 return Err(AmateRSError::SerializationError(ErrorContext::new(
167 "VLogEntry too short",
168 )));
169 }
170
171 let mut offset = 0;
172
173 let magic = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
175 if magic != 0x564C4F47 {
176 return Err(AmateRSError::SerializationError(ErrorContext::new(
177 "Invalid vLog entry magic number",
178 )));
179 }
180 offset += 4;
181
182 let key_len = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
184 AmateRSError::SerializationError(ErrorContext::new("Failed to read key length"))
185 })?) as usize;
186 offset += 4;
187
188 let key_bytes = &bytes[offset..offset + key_len];
189 let key = Key::from_slice(key_bytes);
190 offset += key_len;
191
192 let value_len = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
194 AmateRSError::SerializationError(ErrorContext::new("Failed to read value length"))
195 })?) as usize;
196 offset += 4;
197
198 let value_bytes = &bytes[offset..offset + value_len];
199 let value = CipherBlob::new(value_bytes.to_vec());
200 offset += value_len;
201
202 let checksum = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
204 AmateRSError::SerializationError(ErrorContext::new("Failed to read checksum"))
205 })?);
206
207 let entry = Self {
208 key,
209 value,
210 checksum,
211 };
212
213 let mut hasher = crc32fast::Hasher::new();
215 hasher.update(entry.key.as_bytes());
216 hasher.update(entry.value.as_bytes());
217 let calculated = hasher.finalize();
218
219 if calculated != entry.checksum {
220 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
221 "vLog entry checksum mismatch: expected {}, got {}",
222 entry.checksum, calculated
223 ))));
224 }
225
226 Ok(entry)
227 }
228}
229
230pub struct ValueLog {
232 pub(crate) config: ValueLogConfig,
234 pub(crate) gc_config: GcConfig,
236 pub(crate) current_file_id: Arc<RwLock<u64>>,
238 pub(crate) writer: Arc<RwLock<std::io::BufWriter<File>>>,
240 pub(crate) current_offset: Arc<RwLock<u64>>,
242 pub(crate) current_size: Arc<RwLock<u64>>,
244 pub(crate) segment_stats: Arc<DashMap<u64, SegmentStats>>,
246 pub(crate) gc_running: Arc<AtomicBool>,
248 pub(crate) segment_readers: Arc<DashMap<u64, Arc<RwLock<()>>>>,
250 pub(crate) last_write_time: Arc<AtomicU64>,
252}
253
254impl ValueLog {
255 pub fn new(vlog_dir: impl AsRef<Path>) -> Result<Self> {
257 let config = ValueLogConfig {
258 vlog_dir: vlog_dir.as_ref().to_path_buf(),
259 ..Default::default()
260 };
261 Self::with_config(config)
262 }
263
264 pub fn with_config(config: ValueLogConfig) -> Result<Self> {
266 Self::with_config_and_gc(config, GcConfig::default())
267 }
268
269 pub fn with_config_and_gc(config: ValueLogConfig, gc_config: GcConfig) -> Result<Self> {
271 std::fs::create_dir_all(&config.vlog_dir).map_err(|e| {
273 AmateRSError::IoError(ErrorContext::new(format!(
274 "Failed to create vLog directory: {}",
275 e
276 )))
277 })?;
278
279 let file_id = Self::find_latest_vlog(&config)?;
281 let file_path = Self::vlog_file_path(&config.vlog_dir, file_id);
282
283 let file = OpenOptions::new()
284 .create(true)
285 .append(true)
286 .open(&file_path)
287 .map_err(|e| {
288 AmateRSError::IoError(ErrorContext::new(format!("Failed to open vLog: {}", e)))
289 })?;
290
291 let current_size = file
292 .metadata()
293 .map_err(|e| {
294 AmateRSError::IoError(ErrorContext::new(format!(
295 "Failed to get vLog file size: {}",
296 e
297 )))
298 })?
299 .len();
300
301 let segment_stats = Arc::new(DashMap::new());
302 let mut initial_stats = SegmentStats::new();
304 initial_stats.total_bytes = current_size;
305 initial_stats.live_bytes = current_size;
306 segment_stats.insert(file_id, initial_stats);
307
308 let segment_readers = Arc::new(DashMap::new());
309 segment_readers.insert(file_id, Arc::new(RwLock::new(())));
310
311 let now_millis = std::time::SystemTime::now()
312 .duration_since(std::time::UNIX_EPOCH)
313 .map(|d| d.as_millis() as u64)
314 .unwrap_or(0);
315
316 Ok(Self {
317 config,
318 gc_config,
319 current_file_id: Arc::new(RwLock::new(file_id)),
320 writer: Arc::new(RwLock::new(std::io::BufWriter::new(file))),
321 current_offset: Arc::new(RwLock::new(current_size)),
322 current_size: Arc::new(RwLock::new(current_size)),
323 segment_stats,
324 gc_running: Arc::new(AtomicBool::new(false)),
325 segment_readers,
326 last_write_time: Arc::new(AtomicU64::new(now_millis)),
327 })
328 }
329
330 pub fn should_separate(&self, value: &CipherBlob) -> bool {
332 value.len() > self.config.value_threshold
333 }
334
335 pub fn append(&self, key: Key, value: CipherBlob) -> Result<ValuePointer> {
337 let entry = VLogEntry::new(key, value);
338 let entry_bytes = entry.encode();
339 let entry_len = entry_bytes.len() as u64;
340
341 let file_id = *self.current_file_id.read();
343 let offset = *self.current_offset.read();
344
345 {
347 let mut writer = self.writer.write();
348 writer.write_all(&entry_bytes).map_err(|e| {
349 AmateRSError::IoError(ErrorContext::new(format!(
350 "Failed to write vLog entry: {}",
351 e
352 )))
353 })?;
354
355 if self.config.sync_on_write {
356 writer.flush().map_err(|e| {
357 AmateRSError::IoError(ErrorContext::new(format!("Failed to flush vLog: {}", e)))
358 })?;
359 }
360 }
361
362 {
364 let mut current_offset = self.current_offset.write();
365 *current_offset += entry_len;
366 }
367 {
368 let mut current_size = self.current_size.write();
369 *current_size += entry_len;
370 }
371
372 {
374 let mut stats = self
375 .segment_stats
376 .entry(file_id)
377 .or_insert_with(SegmentStats::new);
378 stats.record_write(entry_len);
379 }
380
381 {
383 let now_millis = std::time::SystemTime::now()
384 .duration_since(std::time::UNIX_EPOCH)
385 .map(|d| d.as_millis() as u64)
386 .unwrap_or(0);
387 self.last_write_time.store(now_millis, Ordering::Release);
388 }
389
390 if *self.current_size.read() >= self.config.max_file_size {
392 self.rotate()?;
393 }
394
395 let pointer = ValuePointer::new(file_id, offset, entry_bytes.len() as u32, entry.checksum);
397
398 Ok(pointer)
399 }
400
401 pub fn read(&self, pointer: &ValuePointer) -> Result<CipherBlob> {
403 let file_path = Self::vlog_file_path(&self.config.vlog_dir, pointer.file_id);
404
405 let reader_lock = self
407 .segment_readers
408 .entry(pointer.file_id)
409 .or_insert_with(|| Arc::new(RwLock::new(())))
410 .clone();
411 let _read_guard = reader_lock.read();
412
413 let mut file = File::open(&file_path).map_err(|e| {
415 AmateRSError::IoError(ErrorContext::new(format!(
416 "Failed to open vLog file for reading: {}",
417 e
418 )))
419 })?;
420
421 file.seek(SeekFrom::Start(pointer.offset)).map_err(|e| {
423 AmateRSError::IoError(ErrorContext::new(format!(
424 "Failed to seek vLog file: {}",
425 e
426 )))
427 })?;
428
429 let mut entry_bytes = vec![0u8; pointer.length as usize];
431 file.read_exact(&mut entry_bytes).map_err(|e| {
432 AmateRSError::IoError(ErrorContext::new(format!(
433 "Failed to read vLog entry: {}",
434 e
435 )))
436 })?;
437
438 let entry = VLogEntry::decode(&entry_bytes)?;
440
441 Ok(entry.value)
442 }
443
444 pub(crate) fn rotate(&self) -> Result<()> {
446 {
448 let mut writer = self.writer.write();
449 writer.flush().map_err(|e| {
450 AmateRSError::IoError(ErrorContext::new(format!("Failed to flush vLog: {}", e)))
451 })?;
452 }
453
454 let new_file_id = {
456 let mut file_id = self.current_file_id.write();
457 *file_id += 1;
458 *file_id
459 };
460
461 let new_path = Self::vlog_file_path(&self.config.vlog_dir, new_file_id);
463 let file = OpenOptions::new()
464 .create(true)
465 .append(true)
466 .open(&new_path)
467 .map_err(|e| {
468 AmateRSError::IoError(ErrorContext::new(format!(
469 "Failed to create new vLog file: {}",
470 e
471 )))
472 })?;
473
474 {
476 let mut writer = self.writer.write();
477 *writer = std::io::BufWriter::new(file);
478 }
479
480 {
482 let mut offset = self.current_offset.write();
483 *offset = 0;
484 }
485 {
486 let mut size = self.current_size.write();
487 *size = 0;
488 }
489
490 self.segment_stats.insert(new_file_id, SegmentStats::new());
492 self.segment_readers
493 .insert(new_file_id, Arc::new(RwLock::new(())));
494
495 Ok(())
496 }
497
498 pub(crate) fn find_latest_vlog(config: &ValueLogConfig) -> Result<u64> {
500 let mut max_file_id = 0u64;
501
502 if config.vlog_dir.exists() {
503 let entries = std::fs::read_dir(&config.vlog_dir).map_err(|e| {
504 AmateRSError::IoError(ErrorContext::new(format!(
505 "Failed to read vLog directory: {}",
506 e
507 )))
508 })?;
509
510 for entry in entries {
511 let entry = entry.map_err(|e| {
512 AmateRSError::IoError(ErrorContext::new(format!(
513 "Failed to read directory entry: {}",
514 e
515 )))
516 })?;
517
518 let file_name = entry.file_name();
519 let name = file_name.to_string_lossy();
520
521 if name.starts_with("vlog_") && name.ends_with(".log") {
523 if let Ok(number) = name[5..name.len() - 4].parse::<u64>() {
524 if number > max_file_id {
525 max_file_id = number;
526 }
527 }
528 }
529 }
530 }
531
532 Ok(max_file_id)
533 }
534
535 pub(crate) fn vlog_file_path(vlog_dir: &Path, file_id: u64) -> PathBuf {
537 vlog_dir.join(format!("vlog_{:08}.log", file_id))
538 }
539
540 pub fn flush(&self) -> Result<()> {
542 let mut writer = self.writer.write();
543 writer.flush().map_err(|e| {
544 AmateRSError::IoError(ErrorContext::new(format!("Failed to flush vLog: {}", e)))
545 })?;
546
547 writer.get_ref().sync_all().map_err(|e| {
548 AmateRSError::IoError(ErrorContext::new(format!("Failed to sync vLog: {}", e)))
549 })?;
550
551 Ok(())
552 }
553
554 pub fn current_file_id(&self) -> u64 {
556 *self.current_file_id.read()
557 }
558
559 pub fn config(&self) -> &ValueLogConfig {
561 &self.config
562 }
563
564 pub fn last_write_time_millis(&self) -> u64 {
566 self.last_write_time.load(Ordering::Acquire)
567 }
568
569 pub fn time_since_last_write(&self) -> std::time::Duration {
571 let last_millis = self.last_write_time_millis();
572 let now_millis = std::time::SystemTime::now()
573 .duration_since(std::time::UNIX_EPOCH)
574 .map(|d| d.as_millis() as u64)
575 .unwrap_or(0);
576 let elapsed_millis = now_millis.saturating_sub(last_millis);
577 std::time::Duration::from_millis(elapsed_millis)
578 }
579}
580
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// Scratch directory for one test.
    ///
    /// - The process id is appended to the name, so concurrent runs don't share a directory.
    /// - Any leftover from an earlier run is wiped first; a stale segment would be
    ///   reopened by `ValueLog` and skew file ids and GC stats.
    /// - The directory is removed on drop, even when an assertion fails.
    ///
    /// Declare it before the `ValueLog` so the log (and its open files) drops first.
    struct TestDir(PathBuf);

    impl TestDir {
        fn new(name: &str) -> Self {
            let path = env::temp_dir().join(format!("{}_{}", name, std::process::id()));
            let _ = std::fs::remove_dir_all(&path);
            std::fs::create_dir_all(&path).expect("failed to create test directory");
            Self(path)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TestDir {
        fn drop(&mut self) {
            // Best effort: a cleanup failure must not mask the test's own result.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    #[test]
    fn test_value_pointer_encode_decode() -> Result<()> {
        let pointer = ValuePointer::new(42, 1024, 256, 0xDEADBEEF);

        let bytes = pointer.encode();
        assert_eq!(bytes.len(), 24);
        let decoded = ValuePointer::decode(&bytes)?;

        assert_eq!(decoded.file_id, 42);
        assert_eq!(decoded.offset, 1024);
        assert_eq!(decoded.length, 256);
        assert_eq!(decoded.checksum, 0xDEADBEEF);

        // One byte short must be rejected rather than misread.
        assert!(ValuePointer::decode(&bytes[..23]).is_err());

        Ok(())
    }

    #[test]
    fn test_vlog_entry_encode_decode() -> Result<()> {
        let key = Key::from_str("test_key");
        let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
        let entry = VLogEntry::new(key.clone(), value.clone());

        let bytes = entry.encode();
        let decoded = VLogEntry::decode(&bytes)?;

        assert_eq!(decoded.key, key);
        assert_eq!(decoded.value, value);
        assert_eq!(decoded.checksum, entry.checksum);

        let mut bad_magic = bytes.clone();
        bad_magic[0] ^= 0xFF;
        assert!(VLogEntry::decode(&bad_magic).is_err());

        // The last value byte sits just before the trailing 4-byte checksum.
        let mut corrupted = bytes.clone();
        let last_value_byte = bytes.len() - 5;
        corrupted[last_value_byte] ^= 0xFF;
        assert!(VLogEntry::decode(&corrupted).is_err());

        Ok(())
    }

    #[test]
    fn test_value_log_basic_operations() -> Result<()> {
        let dir = TestDir::new("test_vlog_basic");
        let vlog = ValueLog::new(dir.path())?;

        let key = Key::from_str("key1");
        let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);

        let pointer = vlog.append(key, value.clone())?;
        // Appends are buffered; flush so the independent read handle sees them.
        vlog.flush()?;
        let read_value = vlog.read(&pointer)?;

        assert_eq!(read_value, value);
        Ok(())
    }

    #[test]
    fn test_value_log_should_separate() -> Result<()> {
        let dir = TestDir::new("test_vlog_should_separate");
        let vlog = ValueLog::new(dir.path())?;

        // Default threshold is 1024 bytes and the comparison is strict.
        assert!(!vlog.should_separate(&CipherBlob::new(vec![0u8; 512])));
        assert!(!vlog.should_separate(&CipherBlob::new(vec![0u8; 1024])));
        assert!(vlog.should_separate(&CipherBlob::new(vec![0u8; 1025])));
        assert!(vlog.should_separate(&CipherBlob::new(vec![0u8; 2048])));

        Ok(())
    }

    #[test]
    fn test_value_log_multiple_values() -> Result<()> {
        let dir = TestDir::new("test_vlog_multiple");
        let vlog = ValueLog::new(dir.path())?;

        let mut pointers = Vec::new();
        for i in 0..10 {
            let key = Key::from_str(&format!("key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 1000]);
            pointers.push((vlog.append(key, value)?, i as u8));
        }

        vlog.flush()?;
        for (pointer, expected_byte) in pointers {
            let value = vlog.read(&pointer)?;
            assert_eq!(value, CipherBlob::new(vec![expected_byte; 1000]));
        }

        Ok(())
    }

    #[test]
    fn test_value_log_rotation() -> Result<()> {
        let dir = TestDir::new("test_vlog_rotation");
        let config = ValueLogConfig {
            vlog_dir: dir.path().to_path_buf(),
            // Small enough that ten ~1 KB entries must span several segments.
            max_file_size: 4096,
            sync_on_write: false,
            ..Default::default()
        };
        let vlog = ValueLog::with_config(config)?;

        let initial_file_id = vlog.current_file_id();

        let mut written = Vec::new();
        for i in 0..10 {
            let key = Key::from_str(&format!("key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 1000]);
            let pointer = vlog.append(key, value.clone())?;
            written.push((pointer, value));
        }

        assert!(vlog.current_file_id() > initial_file_id);
        assert!(written.iter().any(|(p, _)| p.file_id > initial_file_id));

        // Entries must stay readable after their segment has been rotated away.
        vlog.flush()?;
        for (pointer, expected) in &written {
            assert_eq!(&vlog.read(pointer)?, expected);
        }

        Ok(())
    }

    #[test]
    fn test_value_log_garbage_collection() -> Result<()> {
        let dir = TestDir::new("test_vlog_gc");
        let vlog = ValueLog::new(dir.path())?;

        for i in 0..10 {
            let key = Key::from_str(&format!("key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 1000]);
            vlog.append(key, value)?;
        }
        vlog.flush()?;

        let file_id = vlog.current_file_id();

        // Keys key_0..key_4 are live; key_5..key_9 are garbage.
        let is_live = |key: &Key| -> bool {
            let key_str = String::from_utf8_lossy(key.as_bytes());
            key_str
                .strip_prefix("key_")
                .and_then(|num| num.parse::<usize>().ok())
                .map_or(false, |num| num < 5)
        };

        let ratio = vlog.calculate_garbage_ratio(file_id, is_live)?;
        assert!(ratio > 0.4 && ratio < 0.6);

        let stats = vlog.garbage_collect_file(file_id, is_live)?;
        assert_eq!(stats.live_count, 5);
        assert_eq!(stats.dead_count, 5);
        assert!(stats.reclaimed_bytes > 0);

        Ok(())
    }

    #[test]
    fn test_value_log_large_values() -> Result<()> {
        let dir = TestDir::new("test_vlog_large");
        let vlog = ValueLog::new(dir.path())?;

        let key = Key::from_str("large_key");
        let large_value = CipherBlob::new(vec![42u8; 10_000]);

        let pointer = vlog.append(key, large_value.clone())?;
        vlog.flush()?;

        let read_value = vlog.read(&pointer)?;
        assert_eq!(read_value, large_value);
        assert_eq!(read_value.len(), 10_000);

        Ok(())
    }
}