rusty_leveldb/
log.rs

1//! A log consists of a number of blocks.
2//! A block consists of a number of records and an optional trailer (filler).
3//! A record is a bytestring: [checksum: uint32, length: uint16, type: uint8, data: [u8]]
4//! checksum is the crc32 sum of type and data; type is one of RecordType::{Full/First/Middle/Last}
5
6use crate::crc;
7use crate::error::{err, Result, StatusCode};
8
9use std::io::{Read, Write};
10
11use integer_encoding::FixedInt;
12use integer_encoding::FixedIntWriter;
13
/// Size in bytes of one log block (32 KiB). Records are fragmented so that no fragment crosses
/// a block boundary.
const BLOCK_SIZE: usize = 32 << 10;
/// Size in bytes of a record header: checksum (u32) + length (u16) + type (u8).
const HEADER_SIZE: usize =
    std::mem::size_of::<u32>() + std::mem::size_of::<u16>() + std::mem::size_of::<u8>();
16
/// Type tag stored in the last byte of every record header.
///
/// It tells the reader how a fragment relates to the logical record it belongs to. The
/// discriminants are the exact byte values written to the log.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecordType {
    /// The fragment holds a complete record.
    Full = 1,
    /// The fragment is the first piece of a record that continues in later fragments.
    First = 2,
    /// The fragment is neither the first nor the last piece of a record.
    Middle = 3,
    /// The fragment is the final piece of a record.
    Last = 4,
}
24
25pub struct LogWriter<W: Write> {
26    dst: W,
27    current_block_offset: usize,
28    block_size: usize,
29}
30
31impl<W: Write> LogWriter<W> {
32    pub fn new(writer: W) -> LogWriter<W> {
33        LogWriter {
34            dst: writer,
35            current_block_offset: 0,
36            block_size: BLOCK_SIZE,
37        }
38    }
39
40    /// new_with_off opens a writer starting at some offset of an existing log file. The file must
41    /// have the default block size.
42    pub fn new_with_off(writer: W, off: usize) -> LogWriter<W> {
43        let mut w = LogWriter::new(writer);
44        w.current_block_offset = off % BLOCK_SIZE;
45        w
46    }
47
48    pub fn add_record(&mut self, mut record: &[u8]) -> Result<usize> {
49        let mut first_frag = true;
50        let mut result = Ok(0);
51        while result.is_ok() && !record.is_empty() {
52            assert!(self.block_size > HEADER_SIZE);
53
54            let space_left = self.block_size - self.current_block_offset;
55
56            // Fill up block; go to next block.
57            if space_left < HEADER_SIZE {
58                self.dst.write_all(&vec![0, 0, 0, 0, 0, 0][0..space_left])?;
59                self.current_block_offset = 0;
60            }
61
62            let avail_for_data = self.block_size - self.current_block_offset - HEADER_SIZE;
63
64            let data_frag_len = if record.len() < avail_for_data {
65                record.len()
66            } else {
67                avail_for_data
68            };
69
70            let recordtype;
71
72            if first_frag && data_frag_len == record.len() {
73                recordtype = RecordType::Full;
74            } else if first_frag {
75                recordtype = RecordType::First;
76            } else if data_frag_len == record.len() {
77                recordtype = RecordType::Last;
78            } else {
79                recordtype = RecordType::Middle;
80            }
81
82            result = self.emit_record(recordtype, record, data_frag_len);
83            record = &record[data_frag_len..];
84            first_frag = false;
85        }
86        result
87    }
88
89    fn emit_record(&mut self, t: RecordType, data: &[u8], len: usize) -> Result<usize> {
90        assert!(len < 256 * 256);
91
92        let mut digest = crc::digest();
93        digest.update(&[t as u8]);
94        digest.update(&data[0..len]);
95
96        let chksum = mask_crc(digest.finalize());
97
98        let mut s = 0;
99        s += self.dst.write(&chksum.encode_fixed_vec())?;
100        s += self.dst.write_fixedint(len as u16)?;
101        s += self.dst.write(&[t as u8])?;
102        s += self.dst.write(&data[0..len])?;
103
104        self.current_block_offset += s;
105        Ok(s)
106    }
107
108    pub fn flush(&mut self) -> Result<()> {
109        self.dst.flush()?;
110        Ok(())
111    }
112}
113
114pub struct LogReader<R: Read> {
115    // TODO: Wrap src in a buffer to enhance read performance.
116    src: R,
117    blk_off: usize,
118    blocksize: usize,
119    head_scratch: [u8; 7],
120    checksums: bool,
121}
122
123impl<R: Read> LogReader<R> {
124    pub fn new(src: R, chksum: bool) -> LogReader<R> {
125        LogReader {
126            src,
127            blk_off: 0,
128            blocksize: BLOCK_SIZE,
129            checksums: chksum,
130            head_scratch: [0; 7],
131        }
132    }
133
134    /// EOF is signalled by Ok(0)
135    pub fn read(&mut self, dst: &mut Vec<u8>) -> Result<usize> {
136        let mut checksum: u32;
137        let mut length: u16;
138        let mut typ: u8;
139        let mut dst_offset: usize = 0;
140
141        dst.clear();
142
143        loop {
144            if self.blocksize - self.blk_off < HEADER_SIZE {
145                // skip to next block
146                self.src
147                    .read_exact(&mut self.head_scratch[0..self.blocksize - self.blk_off])?;
148                self.blk_off = 0;
149            }
150
151            let mut bytes_read = self.src.read(&mut self.head_scratch)?;
152
153            // EOF
154            if bytes_read == 0 {
155                return Ok(0);
156            }
157
158            self.blk_off += bytes_read;
159
160            checksum = u32::decode_fixed(&self.head_scratch[0..4]);
161            length = u16::decode_fixed(&self.head_scratch[4..6]);
162            typ = self.head_scratch[6];
163
164            dst.resize(dst_offset + length as usize, 0);
165            bytes_read = self
166                .src
167                .read(&mut dst[dst_offset..dst_offset + length as usize])?;
168            self.blk_off += bytes_read;
169
170            if self.checksums
171                && !self.check_integrity(typ, &dst[dst_offset..dst_offset + bytes_read], checksum)
172            {
173                return err(StatusCode::Corruption, "Invalid Checksum");
174            }
175
176            dst_offset += length as usize;
177
178            if typ == RecordType::Full as u8 {
179                return Ok(dst_offset);
180            } else if typ == RecordType::First as u8 {
181                continue;
182            } else if typ == RecordType::Middle as u8 {
183                continue;
184            } else if typ == RecordType::Last as u8 {
185                return Ok(dst_offset);
186            }
187        }
188    }
189
190    fn check_integrity(&mut self, typ: u8, data: &[u8], expected: u32) -> bool {
191        let mut digest = crc::digest();
192        digest.update(&[typ]);
193        digest.update(data);
194        unmask_crc(expected) == digest.finalize()
195    }
196}
197
/// Constant added to the rotated checksum by `mask_crc` and subtracted again by `unmask_crc`.
const MASK_DELTA: u32 = 0xa282_ead8;
199
200pub fn mask_crc(c: u32) -> u32 {
201    (c.wrapping_shr(15) | c.wrapping_shl(17)).wrapping_add(MASK_DELTA)
202}
203
204pub fn unmask_crc(mc: u32) -> u32 {
205    let rot = mc.wrapping_sub(MASK_DELTA);
206    rot.wrapping_shr(17) | rot.wrapping_shl(15)
207}
208
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Masking must be reversible and must actually change the checksum.
    #[test]
    fn test_crc_mask_crc() {
        let mut digest = crc::digest();
        digest.update("abcde".as_bytes());
        let sum = digest.finalize();
        assert_eq!(sum, unmask_crc(mask_crc(sum)));
        assert!(sum != mask_crc(sum));
    }

    /// Pins the checksum implementation against two known values.
    #[test]
    fn test_crc_sanity() {
        assert_eq!(0x8a9136aa, crc::crc32([0_u8; 32]));
        assert_eq!(0x62a8ab43, crc::crc32([0xff_u8; 32]));
    }

    /// Three small records fit into one block, each preceded by one header.
    #[test]
    fn test_writer() {
        let data = &[
            "hello world. My first log entry.",
            "and my second",
            "and my third",
        ];
        let mut lw = LogWriter::new(Vec::new());
        let total_len: usize = data.iter().map(|d| d.len()).sum();

        for d in data {
            assert!(lw.add_record(d.as_bytes()).is_ok());
        }

        assert_eq!(
            lw.current_block_offset,
            total_len + data.len() * super::HEADER_SIZE
        );
    }

    /// A writer opened at an offset must produce the same bytes as one that wrote everything.
    #[test]
    fn test_writer_append() {
        let data = &[
            "hello world. My first log entry.",
            "and my second",
            "and my third",
        ];

        let mut dst = vec![0_u8; 1024];

        {
            let mut lw = LogWriter::new(Cursor::new(dst.as_mut_slice()));
            for d in data {
                assert!(lw.add_record(d.as_bytes()).is_ok());
            }
        }

        let old = dst.clone();

        // Ensure that new_with_off positions the writer correctly. Some ugly mucking about with
        // cursors and stuff is required.
        {
            let offset = data[0].len() + super::HEADER_SIZE;
            let mut lw =
                LogWriter::new_with_off(Cursor::new(&mut dst.as_mut_slice()[offset..]), offset);
            for d in &data[1..] {
                assert!(lw.add_record(d.as_bytes()).is_ok());
            }
        }
        assert_eq!(old, dst);
    }

    /// Round-trips fragmented records through a tiny block size and checks corruption detection.
    #[test]
    fn test_reader() {
        let data = [
            // fits one block of 17
            "abcdefghi".as_bytes().to_vec(),
            // spans two blocks of 17
            "123456789012".as_bytes().to_vec(),
            // spans three blocks of 17
            "0101010101010101010101".as_bytes().to_vec(),
        ];
        let mut lw = LogWriter::new(Vec::new());
        lw.block_size = super::HEADER_SIZE + 10;

        for e in data.iter() {
            assert!(lw.add_record(e).is_ok());
        }

        assert_eq!(lw.dst.len(), 93);
        // Corrupt first record. wrapping_add keeps the test from panicking on a 0xff byte.
        lw.dst[2] = lw.dst[2].wrapping_add(1);

        let mut lr = LogReader::new(lw.dst.as_slice(), true);
        lr.blocksize = super::HEADER_SIZE + 10;
        let mut dst = Vec::with_capacity(128);

        // First record is corrupted.
        assert_eq!(
            err(StatusCode::Corruption, "Invalid Checksum"),
            lr.read(&mut dst)
        );

        // The remaining records must be readable in order until EOF.
        let mut i = 1;
        loop {
            let n = match lr.read(&mut dst) {
                Ok(n) => n,
                Err(e) => panic!("{}", e),
            };
            if n == 0 {
                break;
            }

            assert_eq!(dst, data[i]);
            i += 1;
        }
        assert_eq!(i, data.len());
    }
}
321}