walrus_rust/wal/
block.rs

1use crate::wal::config::{PREFIX_META_SIZE, checksum64, debug_print};
2use crate::wal::storage::SharedMmap;
3use rkyv::{Archive, Deserialize, Serialize};
4use std::sync::Arc;
5
6#[derive(Clone, Debug)]
7pub struct Entry {
8    pub data: Vec<u8>,
9}
10
11#[derive(Archive, Deserialize, Serialize, Debug)]
12#[archive(check_bytes)]
13pub(crate) struct Metadata {
14    pub(crate) read_size: usize,
15    pub(crate) owned_by: String,
16    pub(crate) next_block_start: u64,
17    pub(crate) checksum: u64,
18}
19
20#[derive(Clone, Debug)]
21pub struct Block {
22    pub(crate) id: u64,
23    pub(crate) file_path: String,
24    pub(crate) offset: u64,
25    pub(crate) limit: u64,
26    pub(crate) mmap: Arc<SharedMmap>,
27    pub(crate) used: u64,
28}
29
30impl Block {
31    pub(crate) fn write(
32        &self,
33        in_block_offset: u64,
34        data: &[u8],
35        owned_by: &str,
36        next_block_start: u64,
37    ) -> std::io::Result<()> {
38        debug_assert!(
39            in_block_offset + (data.len() as u64 + PREFIX_META_SIZE as u64) <= self.limit
40        );
41
42        let new_meta = Metadata {
43            read_size: data.len(),
44            owned_by: owned_by.to_string(),
45            next_block_start,
46            checksum: checksum64(data),
47        };
48
49        let meta_bytes = rkyv::to_bytes::<_, 256>(&new_meta).map_err(|e| {
50            std::io::Error::new(
51                std::io::ErrorKind::Other,
52                format!("serialize metadata failed: {:?}", e),
53            )
54        })?;
55        if meta_bytes.len() > PREFIX_META_SIZE - 2 {
56            return Err(std::io::Error::new(
57                std::io::ErrorKind::InvalidData,
58                "metadata too large",
59            ));
60        }
61
62        let mut meta_buffer = vec![0u8; PREFIX_META_SIZE];
63        // Store actual length in first 2 bytes (little endian)
64        meta_buffer[0] = (meta_bytes.len() & 0xFF) as u8;
65        meta_buffer[1] = ((meta_bytes.len() >> 8) & 0xFF) as u8;
66        // Copy actual metadata starting at byte 2
67        meta_buffer[2..2 + meta_bytes.len()].copy_from_slice(&meta_bytes);
68
69        // Combine and write
70        let mut combined = Vec::with_capacity(PREFIX_META_SIZE + data.len());
71        combined.extend_from_slice(&meta_buffer);
72        combined.extend_from_slice(data);
73
74        let file_offset = self.offset + in_block_offset;
75        self.mmap.write(file_offset as usize, &combined);
76        Ok(())
77    }
78
79    pub(crate) fn read(&self, in_block_offset: u64) -> std::io::Result<(Entry, usize)> {
80        let mut meta_buffer = vec![0; PREFIX_META_SIZE];
81        let file_offset = self.offset + in_block_offset;
82        self.mmap.read(file_offset as usize, &mut meta_buffer);
83
84        // Read the actual metadata length from first 2 bytes
85        let meta_len = (meta_buffer[0] as usize) | ((meta_buffer[1] as usize) << 8);
86
87        if meta_len == 0 || meta_len > PREFIX_META_SIZE - 2 {
88            return Err(std::io::Error::new(
89                std::io::ErrorKind::InvalidData,
90                format!("invalid metadata length: {}", meta_len),
91            ));
92        }
93
94        // Deserialize only the actual metadata bytes (skip the 2-byte length prefix)
95        let mut aligned = rkyv::AlignedVec::with_capacity(meta_len);
96        aligned.extend_from_slice(&meta_buffer[2..2 + meta_len]);
97
98        // SAFETY: `aligned` contains bytes we just read from our own file format.
99        // We bounded `meta_len` to PREFIX_META_SIZE and copy into an `AlignedVec`,
100        // which satisfies alignment requirements of rkyv.
101        let archived = unsafe { rkyv::archived_root::<Metadata>(&aligned[..]) };
102        let meta: Metadata = archived.deserialize(&mut rkyv::Infallible).map_err(|_| {
103            std::io::Error::new(
104                std::io::ErrorKind::InvalidData,
105                "failed to deserialize metadata",
106            )
107        })?;
108        let actual_entry_size = meta.read_size;
109
110        // Read the actual data
111        let new_offset = file_offset + PREFIX_META_SIZE as u64;
112        let mut ret_buffer = vec![0; actual_entry_size];
113        self.mmap.read(new_offset as usize, &mut ret_buffer);
114
115        // Verify checksum
116        let expected = meta.checksum;
117        if checksum64(&ret_buffer) != expected {
118            debug_print!(
119                "[reader] checksum mismatch; skipping corrupted entry at offset={} in file={}, block_id={}",
120                in_block_offset,
121                self.file_path,
122                self.id
123            );
124            return Err(std::io::Error::new(
125                std::io::ErrorKind::InvalidData,
126                "checksum mismatch, data corruption detected",
127            ));
128        }
129
130        let consumed = PREFIX_META_SIZE + actual_entry_size;
131        Ok((Entry { data: ret_buffer }, consumed))
132    }
133
134    pub(crate) fn zero_range(&self, in_block_offset: u64, size: u64) -> std::io::Result<()> {
135        // Zero a small region within this block; used to invalidate headers on rollback
136        // Caller ensures size is reasonable (typically PREFIX_META_SIZE)
137        let len = size as usize;
138        if len == 0 {
139            return Ok(());
140        }
141        let zeros = vec![0u8; len];
142        let file_offset = self.offset + in_block_offset;
143        self.mmap.write(file_offset as usize, &zeros);
144        Ok(())
145    }
146}