crc_frame/
lib.rs

1//! Simple tools for reading and writing crc-checked frames of bytes.
2//! * Uses crc32fast for a 4-byte crc
3//! * Uses varint for frame sizing
4//! * Tested against libfuzzer
5//!
6//! The `varint` module is also public for direct use.
7
8pub mod varint;
9
10use std::collections::VecDeque;
11use std::fs;
12use std::io::{self, BufRead, Read, Write};
13#[cfg(unix)]
14use std::os::unix::fs::FileExt;
15
16use crc32fast::Hasher;
17use fault_injection::{annotate, fallible, maybe};
18
19const MAX_HEADER_SIZE: usize = 13;
20const CRC_LEN: usize = 4;
21const CRC_BEGIN: usize = 0;
22const CRC_END: usize = CRC_LEN;
23const VARINT_BEGIN: usize = CRC_END;
24
25/// Write a crc'd frame into the provided `Write` instance. Returns the
26/// number of bytes written in total, including the varint size and crc.
27/// This is always equivalent to a [`std::io::Write::write_all`] call
28/// due to the impossibility to write partial frames.
29///
30/// # Examples
31///
32/// ```
33/// use crc_frame::{write_frame_into, parse_frame};
34///
35/// let data = b"12345";
36///
37/// let mut buf = vec![];
38///
39/// write_frame_into(&mut buf, data).unwrap();
40///
41/// let (begin, end) = parse_frame(&buf).unwrap();
42///
43/// assert_eq!(&buf[begin..end], data);
44/// ```
45pub fn write_frame_into<W: io::Write>(mut writer: W, buf: &[u8]) -> io::Result<usize> {
46    let (header_buf, header_end_offset) = frame_header(buf);
47
48    fallible!(writer.write_all(&header_buf[..header_end_offset]));
49    fallible!(writer.write_all(buf));
50
51    Ok(header_end_offset + buf.len())
52}
53
/// A writer adapter that turns every buffer handed to its
/// [`Write::write`] implementation into one crc'ed frame on
/// the inner writer. It can be stacked with other encoders
/// (compression, serialization, ...).
pub struct Encoder<W: Write> {
    /// Destination that receives the framed bytes.
    inner: W,
}

impl<W: Write> Encoder<W> {
    /// Creates an encoder that emits its frames into `inner`.
    pub const fn new(inner: W) -> Self {
        Self { inner }
    }
}
67
68impl<W: Write> Write for Encoder<W> {
69    /// Write a crc'd frame into the provided `Write` instance. Returns the
70    /// number of bytes written in total, including the varint size and crc.
71    /// This is always equivalent to a [`std::io::Write::write_all`] call
72    /// due to the impossibility to write partial frames.
73    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
74        write_frame_into(&mut self.inner, buf)?;
75        Ok(buf.len())
76    }
77
78    fn flush(&mut self) -> io::Result<()> {
79        self.inner.flush()
80    }
81}
82
/// A reader adapter that parses the bytes of the inner `Read`
/// as crc'ed frames and hands out the frame contents. It can be
/// stacked with other decoders (decompression, deserialization, ...).
///
/// Data is pulled from the inner `Read` one frame at a time, whenever
/// the internal buffer has been fully consumed.
pub struct Decoder<R: Read> {
    /// Source of framed bytes.
    inner: R,
    // NB: buf always contains at most one single frame
    buf: VecDeque<u8>,
    /// Upper bound on the payload length of a frame; passed as `max_len`
    /// when a frame is read from `inner`.
    capacity: usize,
}

impl<R: Read> Decoder<R> {
    /// Frame size limit used by [`Decoder::new`].
    const DEFAULT_CAPACITY: usize = 128 * 1024;

    /// Creates a decoder over `inner` with a 128 KiB frame size limit.
    /// No buffer space is allocated up front.
    pub const fn new(inner: R) -> Self {
        Self {
            inner,
            buf: VecDeque::new(),
            capacity: Self::DEFAULT_CAPACITY,
        }
    }

    /// Creates a decoder over `inner` whose frame size limit is
    /// `capacity`, pre-allocating that much buffer space.
    pub fn with_capacity(capacity: usize, inner: R) -> Self {
        let buf = VecDeque::with_capacity(capacity);

        Self {
            inner,
            buf,
            capacity,
        }
    }
}
113
114impl<R: Read> Read for Decoder<R> {
115    /// Fills up to one frame into the provided buffer.
116    fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
117        fallible!(self.fill_buf());
118
119        let bytes_copied = usize::try_from(io::copy(&mut self.buf, &mut buf)?).unwrap();
120
121        Ok(bytes_copied)
122    }
123}
124
125impl<R: Read> BufRead for Decoder<R> {
126    fn fill_buf(&mut self) -> io::Result<&[u8]> {
127        if self.buf.is_empty() {
128            fallible!(read_frame_from_reader_into_writer(
129                &mut self.inner,
130                &mut self.buf,
131                self.capacity
132            ));
133        }
134
135        let (l, r) = self.buf.as_slices();
136        assert!(r.is_empty());
137        Ok(l)
138    }
139
140    fn consume(&mut self, amt: usize) {
141        self.buf.drain(..amt);
142    }
143}
144
145/// Write a crc'd frame into the provided `File` at the given offset.
146/// Returns the number of bytes written in total, including the varint size and crc.
147#[cfg(unix)]
148pub fn write_frame_at(buf: &[u8], file: &fs::File, at: u64) -> io::Result<usize> {
149    let (header_buf, header_end_offset) = frame_header(buf);
150    let header = &header_buf[..header_end_offset];
151
152    fallible!(file.write_all_at(header, at));
153    fallible!(file.write_all_at(buf, at + header.len() as u64));
154
155    Ok(header_end_offset + buf.len())
156}
157
/// Allocates a boxed byte slice of exactly `len` bytes.
///
/// Despite the historical name, the returned memory is zero-filled. The
/// previous raw-allocator implementation was unsound: it called `alloc`
/// with a zero-sized layout when `len == 0`, never checked for a null
/// pointer on allocation failure, and exposed uninitialized bytes through
/// a `Box<[u8]>`. A zeroed `Vec` allocation has none of these problems.
fn uninit_boxed_slice(len: usize) -> Box<[u8]> {
    vec![0_u8; len].into_boxed_slice()
}
169
170/// Read a frame out of the provided `Read` implementation. This calls
171/// [`std::io::Read::read_exact`] many times under the hood, so it's a
172/// good idea to use a buffered source of data to avoid high numbers of
173/// syscalls.
174pub fn read_frame_from_reader_into_writer<R: io::Read, W: io::Write>(
175    mut reader: R,
176    mut writer: W,
177    max_len: usize,
178) -> io::Result<usize> {
179    let mut crc_bytes = [0; 4];
180    let varint_buf = &mut [0; 9];
181
182    fallible!(reader.read_exact(&mut crc_bytes));
183    fallible!(reader.read_exact(&mut varint_buf[..1]));
184
185    let varint_size = varint::size_of_varint_from_first_byte(varint_buf[0]);
186
187    fallible!(reader.read_exact(&mut varint_buf[1..varint_size]));
188
189    let (buf_len_u64, _varint_len) = varint::deserialize(varint_buf)?;
190
191    let data_len = if let Ok(data_len) = usize::try_from(buf_len_u64) {
192        data_len
193    } else {
194        return Err(annotate!(io::Error::new(
195            io::ErrorKind::InvalidData,
196            "encountered a corrupt varint len or this platform \
197            cannot represent the required data size as a usize"
198        )));
199    };
200
201    if data_len > max_len {
202        return Err(annotate!(io::Error::new(
203            io::ErrorKind::InvalidData,
204            "encountered a varint len that is larger than the \
205            max_len, and is possibly corrupt or was written with \
206            a different configuration.",
207        )));
208    }
209
210    let crc_expected = u32::from_le_bytes(crc_bytes);
211
212    let mut hasher = Hasher::new();
213
214    let mut copy_buf: [u8; 4096] = [0; 4096];
215
216    let mut remainder = data_len;
217    while remainder > 0 {
218        let bytes_to_copy = remainder.min(copy_buf.len());
219
220        fallible!(reader.read(&mut copy_buf[..bytes_to_copy]));
221        fallible!(writer.write_all(&copy_buf[..bytes_to_copy]));
222
223        hasher.update(&copy_buf[..bytes_to_copy]);
224
225        remainder -= bytes_to_copy;
226    }
227
228    //fallible!(reader.read_exact(&mut data_buf[..]));
229
230    // NB: only hash varint after we finish hashing data
231    hasher.update(&varint_buf[..varint_size]);
232
233    // We XOR one byte in the crc to make it non-zero
234    // for empty buffers, which forces bit flips to
235    // materialize in a crc mismatch more often.
236    let crc_actual = hasher.finalize() ^ 0xFF;
237
238    if crc_actual != crc_expected {
239        return Err(annotate!(io::Error::new(
240            io::ErrorKind::InvalidData,
241            format!(
242                "input buffer crc {} does not match expected crc {}",
243                crc_actual, crc_expected
244            ),
245        )));
246    }
247
248    Ok(data_len)
249}
250
251/// Read a frame out of the provided `File`
252#[cfg(unix)]
253pub fn read_frame_at(file: &fs::File, at: u64, max_len: usize) -> io::Result<Box<[u8]>> {
254    const FIRST_READ_SIZE: usize = 512;
255
256    let header = &mut [0; FIRST_READ_SIZE];
257
258    match maybe!(file.read_exact_at(header, at)) {
259        Ok(_) => {}
260        // it's OK if we do a short read because of CRC check
261        Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {}
262        Err(e) => return Err(e),
263    }
264
265    let (buf_len_u64, varint_len) = varint::deserialize(&header[VARINT_BEGIN..])?;
266
267    if buf_len_u64 > max_len as u64 {
268        return Err(annotate!(io::Error::new(
269            io::ErrorKind::InvalidData,
270            "encountered a varint len that is larger than the \
271            max_len, and is possibly corrupt or was written with \
272            a different configuration.",
273        )));
274    }
275
276    // at this point we know that the buffer len fits in a usize
277    let buf_len = usize::try_from(buf_len_u64).unwrap();
278
279    let mut buf = uninit_boxed_slice(buf_len);
280
281    let crc_expected = &header[CRC_BEGIN..CRC_END];
282
283    let varint_end = VARINT_BEGIN + varint_len;
284    let potential_inline_len = FIRST_READ_SIZE - varint_end;
285
286    let header_buf_len = potential_inline_len.min(buf_len);
287    let header_buf_begin = varint_end;
288    let header_buf_end = header_buf_begin + header_buf_len;
289
290    buf[..header_buf_len].copy_from_slice(&header[header_buf_begin..header_buf_end]);
291
292    let remainder_buf_begin = header_buf_len;
293
294    fallible!(file.read_exact_at(&mut buf[remainder_buf_begin..], at + FIRST_READ_SIZE as u64));
295
296    let crc_actual = hash(&buf, &header[VARINT_BEGIN..varint_end]);
297
298    if crc_actual != crc_expected {
299        return Err(annotate!(io::Error::new(
300            io::ErrorKind::InvalidData,
301            "input buffer crc does not match expected crc",
302        )));
303    }
304
305    Ok(buf)
306}
307
308fn hash(data_buf: &[u8], varint_buf: &[u8]) -> [u8; CRC_LEN] {
309    let mut hasher = Hasher::new();
310    hasher.update(data_buf);
311    hasher.update(varint_buf);
312
313    // We XOR one byte in the crc to make it non-zero
314    // for empty buffers, which forces bit flips to
315    // materialize in a crc mismatch more often.
316    (hasher.finalize() ^ 0xFF).to_le_bytes()
317}
318
319/// Return an array which contains the crc and varint for
320/// a given buffer, and a `usize` that is the length of
321/// the provided array which corresponds to the valid
322/// varint and crc. Returns an array instead of a Vec<u8>
323/// to avoid allocations.
324///
325/// # Examples
326///
327/// ```
328/// use crc_frame::frame_header;
329///
330/// let buf = b"12345";
331///
332/// let (header_buf, header_len) = frame_header(buf);
333///
334/// let mut out = vec![];
335/// out.extend_from_slice(&header_buf[..header_len]);
336/// out.extend_from_slice(buf);
337/// ```
338pub fn frame_header(buf: &[u8]) -> ([u8; MAX_HEADER_SIZE], usize) {
339    // write the buf len varint into the header buffer
340    let (varint_buf, varint_size) = varint::get_varint(buf.len() as u64);
341
342    let crc_bytes = hash(buf, &varint_buf[..varint_size]);
343
344    let mut header_buf = [0_u8; MAX_HEADER_SIZE];
345
346    // write crc
347    header_buf[CRC_BEGIN..CRC_END].copy_from_slice(&crc_bytes);
348
349    // write varint
350    let varint_end = VARINT_BEGIN + varint_size;
351    header_buf[VARINT_BEGIN..varint_end].copy_from_slice(&varint_buf[..varint_size]);
352
353    (header_buf, varint_end)
354}
355
356/// Reads a header out of an arbitrary buffer, checks the crc,
357/// and if the crc matches the corresponding bytes, returns
358/// the start and end offsets in the buffer for the inner
359/// bytes. The end offset of the buffer is also the end
360/// offset for this entire frame, so you may advance
361/// any cursors to this point for reading the next frame
362/// in a sequence of bytes etc...
363///
364/// # Examples
365///
366/// ```
367/// use crc_frame::{write_frame_into, parse_frame};
368///
369/// let data = b"12345";
370///
371/// let mut buf = vec![];
372///
373/// write_frame_into(&mut buf, data).unwrap();
374/// write_frame_into(&mut buf, data).unwrap();
375///
376/// let (begin_1, end_1) = parse_frame(&buf).unwrap();
377///
378/// assert_eq!(&buf[begin_1..end_1], data);
379///
380/// let (begin_2, end_2) = parse_frame(&buf[end_1..]).unwrap();
381///
382/// assert_eq!(&buf[begin_2..end_2], data);
383/// ```
384pub fn parse_frame(buf: &[u8]) -> io::Result<(usize, usize)> {
385    if buf.len() < VARINT_BEGIN + 1 {
386        return Err(annotate!(io::Error::new(
387            io::ErrorKind::InvalidData,
388            "encountered a buffer that is not even large enough to contain a CRC and minimal one-byte varint"
389        )));
390    }
391
392    let expected_crc: [u8; CRC_LEN] = buf[CRC_BEGIN..CRC_END].try_into().unwrap();
393
394    let (buf_len_u64, varint_len) = varint::deserialize(&buf[VARINT_BEGIN..])?;
395
396    let varint_end = VARINT_BEGIN + varint_len;
397    let data_begin = varint_end;
398
399    let data_len = if let Ok(data_len) = usize::try_from(buf_len_u64) {
400        data_len
401    } else {
402        return Err(annotate!(io::Error::new(
403            io::ErrorKind::InvalidData,
404            "encountered a corrupt varint len or this platform \
405            cannot represent the required data size as a usize"
406        )));
407    };
408
409    let data_end = data_begin + data_len;
410
411    if data_end > buf.len() {
412        return Err(annotate!(io::Error::new(
413            io::ErrorKind::InvalidData,
414            format!(
415                "encountered a corrupt varint len or an input \
416                buffer of size {} that does not contain the full \
417                frame of size {}",
418                buf.len(),
419                data_end
420            )
421        )));
422    }
423
424    let data_buf = &buf[data_begin..data_end];
425    let varint_buf = &buf[VARINT_BEGIN..varint_end];
426
427    let actual_crc = hash(data_buf, varint_buf);
428
429    if actual_crc != expected_crc {
430        return Err(annotate!(io::Error::new(
431            io::ErrorKind::InvalidData,
432            "input buffer crc does not match expected crc",
433        )));
434    }
435
436    Ok((data_begin, data_end))
437}