Skip to main content

crous_core/
block.rs

1//! Block-level framing for the Crous binary format.
2//!
3//! Blocks are the fundamental container for data in a Crous file.
4//! Each block has:
5//! - A type byte identifying the block kind (data, index, schema, etc.)
6//! - A varint-encoded length
7//! - A compression type byte
8//! - An 8-byte XXH64 checksum of the (uncompressed) payload
9//! - The payload bytes
10//!
11//! This module provides `BlockWriter` for building blocks incrementally
12//! and `BlockReader` for reading them from a byte slice.
13
14use crate::checksum::compute_xxh64;
15use crate::error::{CrousError, Result};
16use crate::varint::{decode_varint, encode_varint_vec};
17use crate::wire::{BlockType, CompressionType};
18
19/// Builder for a single block's payload, with framing support.
20pub struct BlockWriter {
21    block_type: BlockType,
22    compression: CompressionType,
23    payload: Vec<u8>,
24}
25
26impl BlockWriter {
27    /// Create a new block writer of the given type.
28    pub fn new(block_type: BlockType) -> Self {
29        Self {
30            block_type,
31            compression: CompressionType::None,
32            payload: Vec::with_capacity(4096),
33        }
34    }
35
36    /// Set the compression type for this block.
37    pub fn set_compression(&mut self, comp: CompressionType) {
38        self.compression = comp;
39    }
40
41    /// Write raw bytes into the block payload.
42    pub fn write(&mut self, data: &[u8]) {
43        self.payload.extend_from_slice(data);
44    }
45
46    /// Get a mutable reference to the payload buffer.
47    pub fn payload_mut(&mut self) -> &mut Vec<u8> {
48        &mut self.payload
49    }
50
51    /// Get the current payload size.
52    pub fn payload_len(&self) -> usize {
53        self.payload.len()
54    }
55
56    /// Finalize and serialize this block into a byte vector.
57    ///
58    /// Layout: `block_type(1)` | `block_len(varint)` | `comp_type(1)` | `checksum(8)` | `payload`
59    pub fn finish(self) -> Vec<u8> {
60        let checksum = compute_xxh64(&self.payload);
61        let mut out = Vec::with_capacity(1 + 10 + 1 + 8 + self.payload.len());
62
63        out.push(self.block_type as u8);
64        encode_varint_vec(self.payload.len() as u64, &mut out);
65        out.push(self.compression as u8);
66        out.extend_from_slice(&checksum.to_le_bytes());
67        out.extend_from_slice(&self.payload);
68
69        out
70    }
71}
72
/// A parsed block read from binary data.
///
/// The payload is a borrowed sub-slice of the input buffer, so a
/// `BlockReader` cannot outlive the data it was parsed from.
#[derive(Debug)]
pub struct BlockReader<'a> {
    /// The block type decoded from the block's leading type byte.
    pub block_type: BlockType,
    /// The compression type decoded from the header's compression byte.
    pub compression: CompressionType,
    /// The expected checksum, as stored (little-endian) in the block header.
    pub checksum: u64,
    /// The payload bytes (borrowed from input), exactly as stored in the block.
    pub payload: &'a [u8],
}
85
86impl<'a> BlockReader<'a> {
87    /// Parse a block from `data` starting at `offset`.
88    /// Returns `(BlockReader, bytes_consumed)`.
89    pub fn parse(data: &'a [u8], offset: usize) -> Result<(Self, usize)> {
90        let mut pos = offset;
91
92        if pos >= data.len() {
93            return Err(CrousError::UnexpectedEof(pos));
94        }
95
96        let block_type_byte = data[pos];
97        pos += 1;
98        let block_type = BlockType::from_byte(block_type_byte)
99            .ok_or(CrousError::InvalidBlockType(block_type_byte))?;
100
101        let (block_len, varint_bytes) = decode_varint(data, pos)?;
102        pos += varint_bytes;
103        let block_len = block_len as usize;
104
105        if pos >= data.len() {
106            return Err(CrousError::UnexpectedEof(pos));
107        }
108        let comp_byte = data[pos];
109        pos += 1;
110        let compression = CompressionType::from_byte(comp_byte)
111            .ok_or(CrousError::UnknownCompression(comp_byte))?;
112
113        if pos + 8 > data.len() {
114            return Err(CrousError::UnexpectedEof(pos));
115        }
116        let checksum = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
117        pos += 8;
118
119        if pos.checked_add(block_len).is_none_or(|end| end > data.len()) {
120            return Err(CrousError::UnexpectedEof(pos));
121        }
122        let payload = &data[pos..pos + block_len];
123        pos += block_len;
124
125        Ok((
126            Self {
127                block_type,
128                compression,
129                checksum,
130                payload,
131            },
132            pos - offset,
133        ))
134    }
135
136    /// Verify the block's checksum matches its payload.
137    pub fn verify_checksum(&self) -> bool {
138        compute_xxh64(self.payload) == self.checksum
139    }
140}
141
#[cfg(test)]
mod tests {
    use super::*;

    /// Serialize `payload` as a single framed block of kind `block_type`.
    fn framed(block_type: BlockType, payload: &[u8]) -> Vec<u8> {
        let mut w = BlockWriter::new(block_type);
        w.write(payload);
        w.finish()
    }

    /// A written block parses back to the same type, compression and payload.
    #[test]
    fn block_roundtrip() {
        let encoded = framed(BlockType::Data, b"hello world");

        let (block, used) = BlockReader::parse(&encoded, 0).unwrap();

        assert_eq!(used, encoded.len());
        assert_eq!(block.block_type, BlockType::Data);
        assert_eq!(block.compression, CompressionType::None);
        assert_eq!(block.payload, b"hello world");
        assert!(block.verify_checksum());
    }

    /// Flipping payload bits still parses, but the checksum no longer matches.
    #[test]
    fn block_checksum_failure() {
        let mut encoded = framed(BlockType::Data, b"test data");

        // The payload sits at the end of the frame, so the final byte is the
        // last payload byte.
        let tail = encoded
            .last_mut()
            .expect("a framed block is never empty");
        *tail ^= 0xFF;

        let (block, _) = BlockReader::parse(&encoded, 0).unwrap();
        assert!(!block.verify_checksum());
    }

    /// Every block kind survives a write/parse cycle with an empty payload.
    #[test]
    fn block_types() {
        let kinds = [
            BlockType::Data,
            BlockType::Index,
            BlockType::Schema,
            BlockType::StringDict,
        ];
        for kind in kinds {
            let encoded = BlockWriter::new(kind).finish();
            let (block, _) = BlockReader::parse(&encoded, 0).unwrap();
            assert_eq!(block.block_type, kind);
        }
    }
}