1use crate::checksum::compute_xxh64;
15use crate::error::{CrousError, Result};
16use crate::varint::{decode_varint, encode_varint_vec};
17use crate::wire::{BlockType, CompressionType};
18
19pub struct BlockWriter {
21 block_type: BlockType,
22 compression: CompressionType,
23 payload: Vec<u8>,
24}
25
26impl BlockWriter {
27 pub fn new(block_type: BlockType) -> Self {
29 Self {
30 block_type,
31 compression: CompressionType::None,
32 payload: Vec::with_capacity(4096),
33 }
34 }
35
36 pub fn set_compression(&mut self, comp: CompressionType) {
38 self.compression = comp;
39 }
40
41 pub fn write(&mut self, data: &[u8]) {
43 self.payload.extend_from_slice(data);
44 }
45
46 pub fn payload_mut(&mut self) -> &mut Vec<u8> {
48 &mut self.payload
49 }
50
51 pub fn payload_len(&self) -> usize {
53 self.payload.len()
54 }
55
56 pub fn finish(self) -> Vec<u8> {
60 let checksum = compute_xxh64(&self.payload);
61 let mut out = Vec::with_capacity(1 + 10 + 1 + 8 + self.payload.len());
62
63 out.push(self.block_type as u8);
64 encode_varint_vec(self.payload.len() as u64, &mut out);
65 out.push(self.compression as u8);
66 out.extend_from_slice(&checksum.to_le_bytes());
67 out.extend_from_slice(&self.payload);
68
69 out
70 }
71}
72
73#[derive(Debug)]
75pub struct BlockReader<'a> {
76 pub block_type: BlockType,
78 pub compression: CompressionType,
80 pub checksum: u64,
82 pub payload: &'a [u8],
84}
85
86impl<'a> BlockReader<'a> {
87 pub fn parse(data: &'a [u8], offset: usize) -> Result<(Self, usize)> {
90 let mut pos = offset;
91
92 if pos >= data.len() {
93 return Err(CrousError::UnexpectedEof(pos));
94 }
95
96 let block_type_byte = data[pos];
97 pos += 1;
98 let block_type = BlockType::from_byte(block_type_byte)
99 .ok_or(CrousError::InvalidBlockType(block_type_byte))?;
100
101 let (block_len, varint_bytes) = decode_varint(data, pos)?;
102 pos += varint_bytes;
103 let block_len = block_len as usize;
104
105 if pos >= data.len() {
106 return Err(CrousError::UnexpectedEof(pos));
107 }
108 let comp_byte = data[pos];
109 pos += 1;
110 let compression = CompressionType::from_byte(comp_byte)
111 .ok_or(CrousError::UnknownCompression(comp_byte))?;
112
113 if pos + 8 > data.len() {
114 return Err(CrousError::UnexpectedEof(pos));
115 }
116 let checksum = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
117 pos += 8;
118
119 if pos.checked_add(block_len).is_none_or(|end| end > data.len()) {
120 return Err(CrousError::UnexpectedEof(pos));
121 }
122 let payload = &data[pos..pos + block_len];
123 pos += block_len;
124
125 Ok((
126 Self {
127 block_type,
128 compression,
129 checksum,
130 payload,
131 },
132 pos - offset,
133 ))
134 }
135
136 pub fn verify_checksum(&self) -> bool {
138 compute_xxh64(self.payload) == self.checksum
139 }
140}
141
#[cfg(test)]
mod tests {
    use super::*;

    /// A written block parses back with the same metadata and payload, and
    /// its stored checksum validates.
    #[test]
    fn block_roundtrip() {
        let encoded = {
            let mut w = BlockWriter::new(BlockType::Data);
            w.write(b"hello world");
            w.finish()
        };

        let (block, used) = BlockReader::parse(&encoded, 0).unwrap();

        assert_eq!(used, encoded.len());
        assert_eq!(block.block_type, BlockType::Data);
        assert_eq!(block.compression, CompressionType::None);
        assert_eq!(block.payload, b"hello world");
        assert!(block.verify_checksum());
    }

    /// Corrupting a payload byte still parses, but checksum verification fails.
    #[test]
    fn block_checksum_failure() {
        let mut encoded = {
            let mut w = BlockWriter::new(BlockType::Data);
            w.write(b"test data");
            w.finish()
        };

        // The payload is the tail of the frame, so the final byte is payload data.
        *encoded.last_mut().unwrap() ^= 0xFF;

        let (block, _) = BlockReader::parse(&encoded, 0).unwrap();
        assert!(!block.verify_checksum());
    }

    /// Each listed block type survives a write/parse cycle with an empty payload.
    #[test]
    fn block_types() {
        let kinds = [
            BlockType::Data,
            BlockType::Index,
            BlockType::Schema,
            BlockType::StringDict,
        ];

        for kind in kinds {
            let encoded = BlockWriter::new(kind).finish();
            let (block, _) = BlockReader::parse(&encoded, 0).unwrap();
            assert_eq!(block.block_type, kind);
        }
    }
}