1use crate::crc;
7use crate::error::{err, Result, StatusCode};
8
9use std::io::{Read, Write};
10
11use integer_encoding::FixedInt;
12use integer_encoding::FixedIntWriter;
13
/// Default size in bytes of one log block (32 KiB). Writers and readers
/// start with this block size and track their position modulo it.
const BLOCK_SIZE: usize = 32 * 1024;
/// Size in bytes of a record header: 4 (masked checksum) + 2 (payload length)
/// + 1 (record type).
const HEADER_SIZE: usize = 4 + 2 + 1;
16
/// Tag stored in the last header byte of every physical log record.
///
/// A logical record that fits into the current block is written as a single
/// `Full` fragment; otherwise it is split into one `First` fragment, zero or
/// more `Middle` fragments and one `Last` fragment. The numeric values are the
/// bytes written to the log.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RecordType {
    /// The fragment holds a complete logical record.
    Full = 1,
    /// First fragment of a record that spans several fragments.
    First = 2,
    /// Interior fragment of a record that spans several fragments.
    Middle = 3,
    /// Final fragment of a record that spans several fragments.
    Last = 4,
}
24
/// Writes length-prefixed, checksummed records to an underlying writer,
/// splitting them into fragments so that no fragment crosses a block boundary.
pub struct LogWriter<W: Write> {
    /// Destination that receives the encoded log bytes.
    dst: W,
    /// Number of bytes already written into the current block.
    current_block_offset: usize,
    /// Size of one block in bytes; initialised to `BLOCK_SIZE`.
    block_size: usize,
}
30
31impl<W: Write> LogWriter<W> {
32 pub fn new(writer: W) -> LogWriter<W> {
33 LogWriter {
34 dst: writer,
35 current_block_offset: 0,
36 block_size: BLOCK_SIZE,
37 }
38 }
39
40 pub fn new_with_off(writer: W, off: usize) -> LogWriter<W> {
43 let mut w = LogWriter::new(writer);
44 w.current_block_offset = off % BLOCK_SIZE;
45 w
46 }
47
48 pub fn add_record(&mut self, mut record: &[u8]) -> Result<usize> {
49 let mut first_frag = true;
50 let mut result = Ok(0);
51 while result.is_ok() && !record.is_empty() {
52 assert!(self.block_size > HEADER_SIZE);
53
54 let space_left = self.block_size - self.current_block_offset;
55
56 if space_left < HEADER_SIZE {
58 self.dst.write_all(&vec![0, 0, 0, 0, 0, 0][0..space_left])?;
59 self.current_block_offset = 0;
60 }
61
62 let avail_for_data = self.block_size - self.current_block_offset - HEADER_SIZE;
63
64 let data_frag_len = if record.len() < avail_for_data {
65 record.len()
66 } else {
67 avail_for_data
68 };
69
70 let recordtype;
71
72 if first_frag && data_frag_len == record.len() {
73 recordtype = RecordType::Full;
74 } else if first_frag {
75 recordtype = RecordType::First;
76 } else if data_frag_len == record.len() {
77 recordtype = RecordType::Last;
78 } else {
79 recordtype = RecordType::Middle;
80 }
81
82 result = self.emit_record(recordtype, record, data_frag_len);
83 record = &record[data_frag_len..];
84 first_frag = false;
85 }
86 result
87 }
88
89 fn emit_record(&mut self, t: RecordType, data: &[u8], len: usize) -> Result<usize> {
90 assert!(len < 256 * 256);
91
92 let mut digest = crc::digest();
93 digest.update(&[t as u8]);
94 digest.update(&data[0..len]);
95
96 let chksum = mask_crc(digest.finalize());
97
98 let mut s = 0;
99 s += self.dst.write(&chksum.encode_fixed_vec())?;
100 s += self.dst.write_fixedint(len as u16)?;
101 s += self.dst.write(&[t as u8])?;
102 s += self.dst.write(&data[0..len])?;
103
104 self.current_block_offset += s;
105 Ok(s)
106 }
107
108 pub fn flush(&mut self) -> Result<()> {
109 self.dst.flush()?;
110 Ok(())
111 }
112}
113
/// Reads records back from a log, reassembling fragmented records and
/// optionally verifying their checksums.
pub struct LogReader<R: Read> {
    /// Source the encoded log bytes are read from.
    src: R,
    /// Number of bytes consumed so far from the current block.
    blk_off: usize,
    /// Size of one block in bytes; initialised to `BLOCK_SIZE`.
    blocksize: usize,
    /// Scratch buffer that receives each record header; also used to absorb
    /// the unused bytes at the end of a block.
    head_scratch: [u8; 7],
    /// Whether record checksums are verified while reading.
    checksums: bool,
}
122
123impl<R: Read> LogReader<R> {
124 pub fn new(src: R, chksum: bool) -> LogReader<R> {
125 LogReader {
126 src,
127 blk_off: 0,
128 blocksize: BLOCK_SIZE,
129 checksums: chksum,
130 head_scratch: [0; 7],
131 }
132 }
133
134 pub fn read(&mut self, dst: &mut Vec<u8>) -> Result<usize> {
136 let mut checksum: u32;
137 let mut length: u16;
138 let mut typ: u8;
139 let mut dst_offset: usize = 0;
140
141 dst.clear();
142
143 loop {
144 if self.blocksize - self.blk_off < HEADER_SIZE {
145 self.src
147 .read_exact(&mut self.head_scratch[0..self.blocksize - self.blk_off])?;
148 self.blk_off = 0;
149 }
150
151 let mut bytes_read = self.src.read(&mut self.head_scratch)?;
152
153 if bytes_read == 0 {
155 return Ok(0);
156 }
157
158 self.blk_off += bytes_read;
159
160 checksum = u32::decode_fixed(&self.head_scratch[0..4]);
161 length = u16::decode_fixed(&self.head_scratch[4..6]);
162 typ = self.head_scratch[6];
163
164 dst.resize(dst_offset + length as usize, 0);
165 bytes_read = self
166 .src
167 .read(&mut dst[dst_offset..dst_offset + length as usize])?;
168 self.blk_off += bytes_read;
169
170 if self.checksums
171 && !self.check_integrity(typ, &dst[dst_offset..dst_offset + bytes_read], checksum)
172 {
173 return err(StatusCode::Corruption, "Invalid Checksum");
174 }
175
176 dst_offset += length as usize;
177
178 if typ == RecordType::Full as u8 {
179 return Ok(dst_offset);
180 } else if typ == RecordType::First as u8 {
181 continue;
182 } else if typ == RecordType::Middle as u8 {
183 continue;
184 } else if typ == RecordType::Last as u8 {
185 return Ok(dst_offset);
186 }
187 }
188 }
189
190 fn check_integrity(&mut self, typ: u8, data: &[u8], expected: u32) -> bool {
191 let mut digest = crc::digest();
192 digest.update(&[typ]);
193 digest.update(data);
194 unmask_crc(expected) == digest.finalize()
195 }
196}
197
/// Constant added (with wrap-around) to the rotated checksum by `mask_crc`
/// and subtracted again by `unmask_crc`.
const MASK_DELTA: u32 = 0xa282ead8;

/// Masks a checksum: rotates `c` right by 15 bits, then adds `MASK_DELTA`
/// with wrap-around.
pub fn mask_crc(c: u32) -> u32 {
    let rotated = c.rotate_right(15);
    rotated.wrapping_add(MASK_DELTA)
}

/// Inverse of `mask_crc`: subtracts `MASK_DELTA` with wrap-around, then
/// rotates the result left by 15 bits.
pub fn unmask_crc(mc: u32) -> u32 {
    mc.wrapping_sub(MASK_DELTA).rotate_left(15)
}
208
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Masking a checksum changes it and unmasking restores it.
    #[test]
    fn test_crc_mask_crc() {
        let mut digest = crc::digest();
        digest.update("abcde".as_bytes());
        let sum = digest.finalize();
        assert_eq!(sum, unmask_crc(mask_crc(sum)));
        assert!(sum != mask_crc(sum));
    }

    /// Pins two known checksum values of the `crc` module.
    #[test]
    fn test_crc_sanity() {
        assert_eq!(0x8a9136aa, crc::crc32([0_u8; 32]));
        assert_eq!(0x62a8ab43, crc::crc32([0xff_u8; 32]));
    }

    /// Three short records fit in one block, so the block offset equals the
    /// payload bytes plus one header per record.
    #[test]
    fn test_writer() {
        let data = &[
            "hello world. My first log entry.",
            "and my second",
            "and my third",
        ];
        let mut lw = LogWriter::new(Vec::new());
        let total_len: usize = data.iter().map(|d| d.len()).sum();

        for d in data {
            // Assert instead of discarding the result so write failures
            // cannot go unnoticed.
            assert!(lw.add_record(d.as_bytes()).is_ok());
        }

        assert_eq!(lw.current_block_offset, total_len + 3 * super::HEADER_SIZE);
    }

    /// Re-writing the tail of a log through `new_with_off` produces the same
    /// bytes as writing the whole log in one go.
    #[test]
    fn test_writer_append() {
        let data = &[
            "hello world. My first log entry.",
            "and my second",
            "and my third",
        ];

        let mut dst = vec![0_u8; 1024];

        {
            let mut lw = LogWriter::new(Cursor::new(dst.as_mut_slice()));
            for d in data {
                assert!(lw.add_record(d.as_bytes()).is_ok());
            }
        }

        let old = dst.clone();

        {
            // Resume right after the first record and write the rest again.
            let offset = data[0].len() + super::HEADER_SIZE;
            let mut lw =
                LogWriter::new_with_off(Cursor::new(&mut dst.as_mut_slice()[offset..]), offset);
            for d in &data[1..] {
                assert!(lw.add_record(d.as_bytes()).is_ok());
            }
        }
        assert_eq!(old, dst);
    }

    /// Writes three records with a tiny block size (forcing fragmentation),
    /// corrupts the first record's checksum and checks that the reader reports
    /// the corruption and then returns the remaining records intact.
    #[test]
    fn test_reader() {
        let data = [
            "abcdefghi".as_bytes().to_vec(),
            "123456789012".as_bytes().to_vec(),
            "0101010101010101010101".as_bytes().to_vec(),
        ];
        let mut lw = LogWriter::new(Vec::new());
        lw.block_size = super::HEADER_SIZE + 10;

        for e in data.iter() {
            assert!(lw.add_record(e).is_ok());
        }

        assert_eq!(lw.dst.len(), 93);
        // Corrupt one byte of the first record's stored checksum; wrapping
        // keeps this from panicking should that byte happen to be 0xff.
        lw.dst[2] = lw.dst[2].wrapping_add(1);

        let mut lr = LogReader::new(lw.dst.as_slice(), true);
        lr.blocksize = super::HEADER_SIZE + 10;
        let mut dst = Vec::with_capacity(128);

        assert_eq!(
            err(StatusCode::Corruption, "Invalid Checksum"),
            lr.read(&mut dst)
        );

        // The first record was consumed by the failed read above.
        let mut i = 1;
        loop {
            match lr.read(&mut dst) {
                Ok(0) => break,
                Ok(_) => {}
                Err(e) => panic!("{}", e),
            }

            assert_eq!(dst, data[i]);
            i += 1;
        }
        assert_eq!(i, data.len());
    }
}
321}