1use crate::crc;
7use crate::error::{err, Result, StatusCode};
8
9use std::io::{Read, Write};
10
11use integer_encoding::FixedInt;
12use integer_encoding::FixedIntWriter;
13
/// Default size in bytes of one log block. A fragment (header plus payload)
/// is never written across a block boundary.
const BLOCK_SIZE: usize = 32 * 1024;
/// Size in bytes of a fragment header: a 4-byte masked checksum, a 2-byte
/// payload length and a 1-byte record type.
const HEADER_SIZE: usize = 4 + 2 + 1;
16
/// Tag stored in the last byte of every fragment header. It tells the reader
/// how the fragment relates to the logical record it belongs to.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RecordType {
    /// The fragment contains a whole record.
    Full = 1,
    /// The fragment is the first of a record split over several fragments.
    First = 2,
    /// The fragment is neither the first nor the last of a split record.
    Middle = 3,
    /// The fragment is the final one of a split record.
    Last = 4,
}
24
/// Writes records to `dst` as a sequence of checksummed fragments laid out in
/// fixed-size blocks.
pub struct LogWriter<W: Write> {
    /// Destination that receives the encoded fragments.
    dst: W,
    /// Number of bytes already written into the current block.
    current_block_offset: usize,
    /// Size of one block in bytes.
    block_size: usize,
}
30
31impl<W: Write> LogWriter<W> {
32 pub fn new(writer: W) -> LogWriter<W> {
33 LogWriter {
34 dst: writer,
35 current_block_offset: 0,
36 block_size: BLOCK_SIZE,
37 }
38 }
39
40 pub fn new_with_off(writer: W, off: usize) -> LogWriter<W> {
43 let mut w = LogWriter::new(writer);
44 w.current_block_offset = off % BLOCK_SIZE;
45 w
46 }
47
48 pub fn add_record(&mut self, mut record: &[u8]) -> Result<usize> {
49 let mut first_frag = true;
50 let mut result = Ok(0);
51 while result.is_ok() && !record.is_empty() {
52 assert!(self.block_size > HEADER_SIZE);
53
54 let space_left = self.block_size - self.current_block_offset;
55
56 if space_left < HEADER_SIZE {
58 self.dst.write_all(&vec![0, 0, 0, 0, 0, 0][0..space_left])?;
59 self.current_block_offset = 0;
60 }
61
62 let avail_for_data = self.block_size - self.current_block_offset - HEADER_SIZE;
63
64 let data_frag_len = if record.len() < avail_for_data {
65 record.len()
66 } else {
67 avail_for_data
68 };
69
70 let recordtype;
71
72 if first_frag && data_frag_len == record.len() {
73 recordtype = RecordType::Full;
74 } else if first_frag {
75 recordtype = RecordType::First;
76 } else if data_frag_len == record.len() {
77 recordtype = RecordType::Last;
78 } else {
79 recordtype = RecordType::Middle;
80 }
81
82 result = self.emit_record(recordtype, record, data_frag_len);
83 record = &record[data_frag_len..];
84 first_frag = false;
85 }
86 result
87 }
88
89 fn emit_record(&mut self, t: RecordType, data: &[u8], len: usize) -> Result<usize> {
90 assert!(len < 256 * 256);
91
92 let mut digest = crc::digest();
93 digest.update(&[t as u8]);
94 digest.update(&data[0..len]);
95
96 let chksum = mask_crc(digest.finalize());
97
98 let mut s = 0;
99 s += self.dst.write(&chksum.encode_fixed_vec())?;
100 s += self.dst.write_fixedint(len as u16)?;
101 s += self.dst.write(&[t as u8])?;
102 s += self.dst.write(&data[0..len])?;
103
104 self.current_block_offset += s;
105 Ok(s)
106 }
107
108 pub fn flush(&mut self) -> Result<()> {
109 self.dst.flush()?;
110 Ok(())
111 }
112}
113
/// Reads back records from a source containing checksummed fragments laid out
/// in fixed-size blocks.
pub struct LogReader<R: Read> {
    /// Source the fragments are read from.
    src: R,
    /// Number of bytes already consumed from the current block.
    blk_off: usize,
    /// Size of one block in bytes.
    blocksize: usize,
    /// Scratch buffer that receives each fragment header (and skipped block
    /// tails).
    head_scratch: [u8; 7],
    /// Whether fragment checksums are verified while reading.
    checksums: bool,
}
122
123impl<R: Read> LogReader<R> {
124 pub fn new(src: R, chksum: bool) -> LogReader<R> {
125 LogReader {
126 src,
127 blk_off: 0,
128 blocksize: BLOCK_SIZE,
129 checksums: chksum,
130 head_scratch: [0; 7],
131 }
132 }
133
134 pub fn read(&mut self, dst: &mut Vec<u8>) -> Result<usize> {
136 let mut checksum: u32;
137 let mut length: u16;
138 let mut typ: u8;
139 let mut dst_offset: usize = 0;
140
141 dst.clear();
142
143 loop {
144 if self.blocksize - self.blk_off < HEADER_SIZE {
145 self.src
147 .read_exact(&mut self.head_scratch[0..self.blocksize - self.blk_off])?;
148 self.blk_off = 0;
149 }
150
151 let mut bytes_read = self.src.read(&mut self.head_scratch)?;
152
153 if bytes_read == 0 {
155 return Ok(0);
156 }
157
158 self.blk_off += bytes_read;
159
160 checksum = u32::decode_fixed(&self.head_scratch[0..4]);
161 length = u16::decode_fixed(&self.head_scratch[4..6]);
162 typ = self.head_scratch[6];
163
164 dst.resize(dst_offset + length as usize, 0);
165 bytes_read = self
166 .src
167 .read(&mut dst[dst_offset..dst_offset + length as usize])?;
168 self.blk_off += bytes_read;
169
170 if self.checksums
171 && !self.check_integrity(typ, &dst[dst_offset..dst_offset + bytes_read], checksum)
172 {
173 return err(StatusCode::Corruption, "Invalid Checksum");
174 }
175
176 dst_offset += length as usize;
177
178 if typ == RecordType::Full as u8 || typ == RecordType::Last as u8 {
179 return Ok(dst_offset);
180 } else if typ == RecordType::First as u8 || typ == RecordType::Middle as u8 {
181 continue;
182 }
183 }
184 }
185
186 fn check_integrity(&mut self, typ: u8, data: &[u8], expected: u32) -> bool {
187 let mut digest = crc::digest();
188 digest.update(&[typ]);
189 digest.update(data);
190 unmask_crc(expected) == digest.finalize()
191 }
192}
193
/// Constant added to the rotated checksum by `mask_crc`.
const MASK_DELTA: u32 = 0xa282ead8;

/// Masks a checksum for storage: rotates `c` right by 15 bits and adds
/// `MASK_DELTA` (wrapping on overflow).
pub fn mask_crc(c: u32) -> u32 {
    c.rotate_right(15).wrapping_add(MASK_DELTA)
}

/// Inverse of `mask_crc`: subtracts `MASK_DELTA` (wrapping) and rotates the
/// result left by 15 bits.
pub fn unmask_crc(mc: u32) -> u32 {
    mc.wrapping_sub(MASK_DELTA).rotate_left(15)
}
204
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Masking must be reversible and must actually change the value.
    #[test]
    fn test_crc_mask_crc() {
        let mut digest = crc::digest();
        digest.update("abcde".as_bytes());
        let sum = digest.finalize();
        assert_eq!(sum, unmask_crc(mask_crc(sum)));
        assert!(sum != mask_crc(sum));
    }

    /// Pins the checksum implementation against two known values.
    #[test]
    fn test_crc_sanity() {
        assert_eq!(0x8a9136aa, crc::crc32([0_u8; 32]));
        assert_eq!(0x62a8ab43, crc::crc32([0xff_u8; 32]));
    }

    /// Three small records fit into one block, each with its own header.
    #[test]
    fn test_writer() {
        let data = &[
            "hello world. My first log entry.",
            "and my second",
            "and my third",
        ];
        let mut lw = LogWriter::new(Vec::new());
        let total_len: usize = data.iter().map(|d| d.len()).sum();

        for d in data {
            lw.add_record(d.as_bytes()).unwrap();
        }

        assert_eq!(
            lw.current_block_offset,
            total_len + data.len() * super::HEADER_SIZE
        );
    }

    /// Re-writing the tail of a log through `new_with_off` must produce the
    /// same bytes as writing everything with a single writer.
    #[test]
    fn test_writer_append() {
        let data = &[
            "hello world. My first log entry.",
            "and my second",
            "and my third",
        ];

        let mut dst = vec![0_u8; 1024];

        {
            let mut lw = LogWriter::new(Cursor::new(dst.as_mut_slice()));
            for d in data {
                lw.add_record(d.as_bytes()).unwrap();
            }
        }

        let old = dst.clone();

        {
            // Start right after the first record and write the remaining two
            // again on top of the existing bytes.
            let offset = data[0].len() + super::HEADER_SIZE;
            let mut lw =
                LogWriter::new_with_off(Cursor::new(&mut dst.as_mut_slice()[offset..]), offset);
            for d in &data[1..] {
                lw.add_record(d.as_bytes()).unwrap();
            }
        }
        assert_eq!(old, dst);
    }

    /// Round-trips records through a tiny block size so that they get
    /// fragmented, with the first record's checksum deliberately corrupted.
    #[test]
    fn test_reader() {
        let data = [
            "abcdefghi".as_bytes().to_vec(),
            "123456789012".as_bytes().to_vec(),
            "0101010101010101010101".as_bytes().to_vec(),
        ];
        let mut lw = LogWriter::new(Vec::new());
        lw.block_size = super::HEADER_SIZE + 10;

        for e in data.iter() {
            assert!(lw.add_record(e).is_ok());
        }

        assert_eq!(lw.dst.len(), 93);
        // Corrupt a byte of the first fragment's checksum.
        lw.dst[2] = lw.dst[2].wrapping_add(1);

        let mut lr = LogReader::new(lw.dst.as_slice(), true);
        lr.blocksize = super::HEADER_SIZE + 10;
        let mut dst = Vec::with_capacity(128);

        // The first record is reported as corrupt...
        assert_eq!(
            err(StatusCode::Corruption, "Invalid Checksum"),
            lr.read(&mut dst)
        );

        // ...and the remaining ones are still readable.
        let mut i = 1;
        loop {
            match lr.read(&mut dst) {
                Ok(0) => break,
                Ok(_) => {}
                Err(e) => panic!("{}", e),
            }

            assert_eq!(dst, data[i]);
            i += 1;
        }
        assert_eq!(i, data.len());
    }
}