1use crate::wal::config::{PREFIX_META_SIZE, checksum64, debug_print};
2use crate::wal::storage::SharedMmap;
3use rkyv::{Archive, Deserialize, Serialize};
4use std::sync::Arc;
5
6#[derive(Clone, Debug)]
7pub struct Entry {
8 pub data: Vec<u8>,
9}
10
11#[derive(Archive, Deserialize, Serialize, Debug)]
12#[archive(check_bytes)]
13pub(crate) struct Metadata {
14 pub(crate) read_size: usize,
15 pub(crate) owned_by: String,
16 pub(crate) next_block_start: u64,
17 pub(crate) checksum: u64,
18}
19
20#[derive(Clone, Debug)]
21pub struct Block {
22 pub(crate) id: u64,
23 pub(crate) file_path: String,
24 pub(crate) offset: u64,
25 pub(crate) limit: u64,
26 pub(crate) mmap: Arc<SharedMmap>,
27 pub(crate) used: u64,
28}
29
30impl Block {
31 pub(crate) fn write(
32 &self,
33 in_block_offset: u64,
34 data: &[u8],
35 owned_by: &str,
36 next_block_start: u64,
37 ) -> std::io::Result<()> {
38 debug_assert!(
39 in_block_offset + (data.len() as u64 + PREFIX_META_SIZE as u64) <= self.limit
40 );
41
42 let new_meta = Metadata {
43 read_size: data.len(),
44 owned_by: owned_by.to_string(),
45 next_block_start,
46 checksum: checksum64(data),
47 };
48
49 let meta_bytes = rkyv::to_bytes::<_, 256>(&new_meta).map_err(|e| {
50 std::io::Error::new(
51 std::io::ErrorKind::Other,
52 format!("serialize metadata failed: {:?}", e),
53 )
54 })?;
55 if meta_bytes.len() > PREFIX_META_SIZE - 2 {
56 return Err(std::io::Error::new(
57 std::io::ErrorKind::InvalidData,
58 "metadata too large",
59 ));
60 }
61
62 let mut meta_buffer = vec![0u8; PREFIX_META_SIZE];
63 meta_buffer[0] = (meta_bytes.len() & 0xFF) as u8;
65 meta_buffer[1] = ((meta_bytes.len() >> 8) & 0xFF) as u8;
66 meta_buffer[2..2 + meta_bytes.len()].copy_from_slice(&meta_bytes);
68
69 let mut combined = Vec::with_capacity(PREFIX_META_SIZE + data.len());
71 combined.extend_from_slice(&meta_buffer);
72 combined.extend_from_slice(data);
73
74 let file_offset = self.offset + in_block_offset;
75 self.mmap.write(file_offset as usize, &combined);
76 Ok(())
77 }
78
79 pub(crate) fn read(&self, in_block_offset: u64) -> std::io::Result<(Entry, usize)> {
80 let mut meta_buffer = vec![0; PREFIX_META_SIZE];
81 let file_offset = self.offset + in_block_offset;
82 self.mmap.read(file_offset as usize, &mut meta_buffer);
83
84 let meta_len = (meta_buffer[0] as usize) | ((meta_buffer[1] as usize) << 8);
86
87 if meta_len == 0 || meta_len > PREFIX_META_SIZE - 2 {
88 return Err(std::io::Error::new(
89 std::io::ErrorKind::InvalidData,
90 format!("invalid metadata length: {}", meta_len),
91 ));
92 }
93
94 let mut aligned = rkyv::AlignedVec::with_capacity(meta_len);
96 aligned.extend_from_slice(&meta_buffer[2..2 + meta_len]);
97
98 let archived = unsafe { rkyv::archived_root::<Metadata>(&aligned[..]) };
102 let meta: Metadata = archived.deserialize(&mut rkyv::Infallible).map_err(|_| {
103 std::io::Error::new(
104 std::io::ErrorKind::InvalidData,
105 "failed to deserialize metadata",
106 )
107 })?;
108 let actual_entry_size = meta.read_size;
109
110 let new_offset = file_offset + PREFIX_META_SIZE as u64;
112 let mut ret_buffer = vec![0; actual_entry_size];
113 self.mmap.read(new_offset as usize, &mut ret_buffer);
114
115 let expected = meta.checksum;
117 if checksum64(&ret_buffer) != expected {
118 debug_print!(
119 "[reader] checksum mismatch; skipping corrupted entry at offset={} in file={}, block_id={}",
120 in_block_offset,
121 self.file_path,
122 self.id
123 );
124 return Err(std::io::Error::new(
125 std::io::ErrorKind::InvalidData,
126 "checksum mismatch, data corruption detected",
127 ));
128 }
129
130 let consumed = PREFIX_META_SIZE + actual_entry_size;
131 Ok((Entry { data: ret_buffer }, consumed))
132 }
133
134 pub(crate) fn zero_range(&self, in_block_offset: u64, size: u64) -> std::io::Result<()> {
135 let len = size as usize;
138 if len == 0 {
139 return Ok(());
140 }
141 let zeros = vec![0u8; len];
142 let file_offset = self.offset + in_block_offset;
143 self.mmap.write(file_offset as usize, &zeros);
144 Ok(())
145 }
146}