pub mod varint;
use std::collections::VecDeque;
use std::fs;
use std::io::{self, BufRead, Read, Write};
#[cfg(unix)]
use std::os::unix::fs::FileExt;
use crc32fast::Hasher;
use fault_injection::{annotate, fallible, maybe};
/// Number of bytes in the little-endian CRC that opens every frame header.
const CRC_LEN: usize = 4;
/// The CRC occupies header bytes `CRC_BEGIN..CRC_END`.
const CRC_BEGIN: usize = 0;
/// Exclusive end offset of the CRC within the header.
const CRC_END: usize = CRC_BEGIN + CRC_LEN;
/// The payload-length varint starts immediately after the CRC.
const VARINT_BEGIN: usize = CRC_END;
/// Upper bound on the header size: the CRC plus a length varint of at most 9 bytes
/// (matching the 9-byte varint scratch buffer used when reading frames).
const MAX_HEADER_SIZE: usize = CRC_LEN + 9;
/// Writes `buf` to `writer` as one frame: the header (CRC + length varint)
/// followed by the payload bytes.
///
/// Returns the total number of bytes written (header plus payload).
///
/// # Errors
/// Propagates any error from writing the header or the payload.
pub fn write_frame_into<W: io::Write>(mut writer: W, buf: &[u8]) -> io::Result<usize> {
    let (header_storage, header_len) = frame_header(buf);
    let header = &header_storage[..header_len];

    fallible!(writer.write_all(header));
    fallible!(writer.write_all(buf));

    let total = header.len() + buf.len();
    Ok(total)
}
/// A [`Write`] adapter that wraps every buffer handed to `write` in its own
/// CRC-protected frame before passing it on to the inner writer.
pub struct Encoder<W: Write> {
    inner: W,
}

impl<W: Write> Encoder<W> {
    /// Wraps `inner`; everything subsequently written through the encoder is framed.
    pub const fn new(inner: W) -> Encoder<W> {
        Self { inner }
    }
}

impl<W: Write> Write for Encoder<W> {
    /// Emits `buf` as a single frame and reports the whole buffer as consumed.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let payload_len = buf.len();
        write_frame_into(&mut self.inner, buf).map(|_frame_len| payload_len)
    }

    /// Flushes the inner writer.
    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}
/// A [`Read`] / [`BufRead`] adapter that pulls one frame at a time out of
/// `inner`, verifies it, and hands out the payload bytes.
///
/// `capacity` doubles as the largest payload length the decoder accepts when
/// reading a frame.
pub struct Decoder<R: Read> {
    inner: R,
    buf: VecDeque<u8>,
    capacity: usize,
}

impl<R: Read> Decoder<R> {
    /// Creates a decoder over `inner` that accepts payloads of up to 128 KiB.
    /// The internal buffer starts out unallocated.
    pub const fn new(inner: R) -> Decoder<R> {
        let capacity = 128 * 1024;
        let buf = VecDeque::new();
        Self { inner, buf, capacity }
    }

    /// Creates a decoder over `inner` that accepts payloads of up to `capacity`
    /// bytes, reserving that much buffer space up front.
    pub fn with_capacity(capacity: usize, inner: R) -> Decoder<R> {
        Self {
            buf: VecDeque::with_capacity(capacity),
            inner,
            capacity,
        }
    }
}
impl<R: Read> Read for Decoder<R> {
fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
fallible!(self.fill_buf());
let bytes_copied = usize::try_from(io::copy(&mut self.buf, &mut buf)?).unwrap();
Ok(bytes_copied)
}
}
impl<R: Read> BufRead for Decoder<R> {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
if self.buf.is_empty() {
fallible!(read_frame_from_reader_into_writer(
&mut self.inner,
&mut self.buf,
self.capacity
));
}
let (l, r) = self.buf.as_slices();
assert!(r.is_empty());
Ok(l)
}
fn consume(&mut self, amt: usize) {
self.buf.drain(..amt);
}
}
/// Writes `buf` as one frame (header, then payload) into `file` starting at
/// byte offset `at`, using positioned writes.
///
/// Returns the total number of bytes written (header plus payload).
///
/// # Errors
/// Propagates any error from the positioned writes.
#[cfg(unix)]
pub fn write_frame_at(buf: &[u8], file: &fs::File, at: u64) -> io::Result<usize> {
    let (header_storage, header_len) = frame_header(buf);
    let header = &header_storage[..header_len];

    fallible!(file.write_all_at(header, at));
    // The payload begins right where the header ends.
    fallible!(file.write_all_at(buf, at + header_len as u64));

    Ok(header_len + buf.len())
}
/// Allocates a heap byte slice of exactly `len` bytes, all set to zero.
///
/// Despite the name, the memory is always initialized. The buffer is handed
/// out as `&mut [u8]` (e.g. to positioned reads), and exposing uninitialized
/// memory through a `&mut [u8]` is undefined behavior. Going through `vec!`
/// also handles `len == 0` correctly, whereas a raw `std::alloc::alloc` call
/// with a zero-size layout is UB. Allocation failure is handled by the
/// standard allocation-error path instead of dereferencing a null pointer.
fn uninit_boxed_slice(len: usize) -> Box<[u8]> {
    vec![0_u8; len].into_boxed_slice()
}
pub fn read_frame_from_reader_into_writer<R: io::Read, W: io::Write>(
mut reader: R,
mut writer: W,
max_len: usize,
) -> io::Result<usize> {
let mut crc_bytes = [0; 4];
let varint_buf = &mut [0; 9];
fallible!(reader.read_exact(&mut crc_bytes));
fallible!(reader.read_exact(&mut varint_buf[..1]));
let varint_size = varint::size_of_varint_from_first_byte(varint_buf[0]);
fallible!(reader.read_exact(&mut varint_buf[1..varint_size]));
let (buf_len_u64, _varint_len) = varint::deserialize(varint_buf)?;
let data_len = if let Ok(data_len) = usize::try_from(buf_len_u64) {
data_len
} else {
return Err(annotate!(io::Error::new(
io::ErrorKind::InvalidData,
"encountered a corrupt varint len or this platform \
cannot represent the required data size as a usize"
)));
};
if data_len > max_len {
return Err(annotate!(io::Error::new(
io::ErrorKind::InvalidData,
"encountered a varint len that is larger than the \
max_len, and is possibly corrupt or was written with \
a different configuration.",
)));
}
let crc_expected = u32::from_le_bytes(crc_bytes);
let mut hasher = Hasher::new();
let mut copy_buf: [u8; 4096] = [0; 4096];
let mut remainder = data_len;
while remainder > 0 {
let bytes_to_copy = remainder.min(copy_buf.len());
fallible!(reader.read(&mut copy_buf[..bytes_to_copy]));
fallible!(writer.write_all(©_buf[..bytes_to_copy]));
hasher.update(©_buf[..bytes_to_copy]);
remainder -= bytes_to_copy;
}
hasher.update(&varint_buf[..varint_size]);
let crc_actual = hasher.finalize() ^ 0xFF;
if crc_actual != crc_expected {
return Err(annotate!(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"input buffer crc {} does not match expected crc {}",
crc_actual, crc_expected
),
)));
}
Ok(data_len)
}
/// Reads and CRC-verifies the frame that starts at byte offset `at` of `file`,
/// returning its payload.
///
/// A fixed 512-byte prefix is read first. It always holds the header and, for
/// small frames, the entire payload, so only larger frames need a second
/// positioned read for the remaining payload bytes.
///
/// # Errors
/// Returns `InvalidData` if the encoded length exceeds `max_len` or the CRC
/// does not match. Propagates errors from decoding the length varint and from
/// the positioned reads. An `UnexpectedEof` from the first, speculative read is
/// tolerated.
#[cfg(unix)]
pub fn read_frame_at(file: &fs::File, at: u64, max_len: usize) -> io::Result<Box<[u8]>> {
    const FIRST_READ_SIZE: usize = 512;
    let mut prefix = [0_u8; FIRST_READ_SIZE];

    // Near the end of the file the prefix read may come up short; the unread
    // tail stays zeroed and the CRC check below rejects a truly truncated frame.
    if let Err(e) = maybe!(file.read_exact_at(&mut prefix, at)) {
        if e.kind() != io::ErrorKind::UnexpectedEof {
            return Err(e);
        }
    }

    let (payload_len_u64, varint_len) = varint::deserialize(&prefix[VARINT_BEGIN..])?;
    if payload_len_u64 > max_len as u64 {
        return Err(annotate!(io::Error::new(
            io::ErrorKind::InvalidData,
            "encountered a varint len that is larger than the \
             max_len, and is possibly corrupt or was written with \
             a different configuration.",
        )));
    }
    let payload_len = usize::try_from(payload_len_u64).unwrap();
    let mut payload = uninit_boxed_slice(payload_len);

    let varint_end = VARINT_BEGIN + varint_len;
    // Number of payload bytes that already arrived with the prefix read.
    let inline_len = payload_len.min(FIRST_READ_SIZE - varint_end);
    let (inline_part, rest) = payload.split_at_mut(inline_len);
    inline_part.copy_from_slice(&prefix[varint_end..varint_end + inline_len]);
    // Whatever did not fit in the prefix starts right after it in the file.
    fallible!(file.read_exact_at(rest, at + FIRST_READ_SIZE as u64));

    let crc_expected = &prefix[CRC_BEGIN..CRC_END];
    let crc_actual = hash(&payload, &prefix[VARINT_BEGIN..varint_end]);
    if crc_actual != crc_expected {
        return Err(annotate!(io::Error::new(
            io::ErrorKind::InvalidData,
            "input buffer crc does not match expected crc",
        )));
    }
    Ok(payload)
}
/// Computes the frame checksum over the payload followed by the length varint,
/// returned as little-endian bytes ready to be stored in (or compared with) a
/// frame header.
fn hash(data_buf: &[u8], varint_buf: &[u8]) -> [u8; CRC_LEN] {
    let crc = {
        let mut hasher = Hasher::new();
        for part in [data_buf, varint_buf] {
            hasher.update(part);
        }
        hasher.finalize()
    };
    // Only the low byte is flipped. Readers compute the same value, so changing
    // this would break compatibility with existing frames.
    let masked = crc ^ 0xFF;
    masked.to_le_bytes()
}
/// Builds the header for a frame whose payload is `buf`.
///
/// Returns a fixed-size scratch array together with the number of leading
/// bytes of it that make up the header: the CRC followed by the length varint.
pub fn frame_header(buf: &[u8]) -> ([u8; MAX_HEADER_SIZE], usize) {
    let (len_varint, len_varint_size) = varint::get_varint(buf.len() as u64);
    let len_varint = &len_varint[..len_varint_size];

    let mut header = [0_u8; MAX_HEADER_SIZE];
    header[CRC_BEGIN..CRC_END].copy_from_slice(&hash(buf, len_varint));

    let header_len = VARINT_BEGIN + len_varint.len();
    header[VARINT_BEGIN..header_len].copy_from_slice(len_varint);

    (header, header_len)
}
/// Validates the frame at the start of `buf` and locates its payload.
///
/// On success returns `(data_begin, data_end)` such that
/// `&buf[data_begin..data_end]` is the CRC-verified payload. Bytes after the
/// frame are ignored.
///
/// # Errors
/// Returns `InvalidData` if `buf` is too short to hold a CRC and a one-byte
/// varint, if the encoded length does not fit in a `usize`, if the frame
/// extends past the end of `buf`, or if the CRC does not match. Errors from
/// decoding the length varint are propagated.
pub fn parse_frame(buf: &[u8]) -> io::Result<(usize, usize)> {
    if buf.len() < VARINT_BEGIN + 1 {
        return Err(annotate!(io::Error::new(
            io::ErrorKind::InvalidData,
            "encountered a buffer that is not even large enough to contain a CRC and minimal one-byte varint"
        )));
    }
    let expected_crc: [u8; CRC_LEN] = buf[CRC_BEGIN..CRC_END]
        .try_into()
        .expect("buf.len() was checked to cover the CRC");
    let (buf_len_u64, varint_len) = varint::deserialize(&buf[VARINT_BEGIN..])?;
    let varint_end = VARINT_BEGIN + varint_len;
    let data_begin = varint_end;
    let data_len = if let Ok(data_len) = usize::try_from(buf_len_u64) {
        data_len
    } else {
        return Err(annotate!(io::Error::new(
            io::ErrorKind::InvalidData,
            "encountered a corrupt varint len or this platform \
             cannot represent the required data size as a usize"
        )));
    };
    // A corrupt varint can decode to a length near `usize::MAX`. A plain `+`
    // would then overflow: a panic in debug builds, and in release builds a
    // wrapped `data_end` that yields an inverted slice range. Saturating keeps
    // the sum above `buf.len()`, so such frames hit the bounds error below.
    let data_end = data_begin.saturating_add(data_len);
    if data_end > buf.len() {
        return Err(annotate!(io::Error::new(
            io::ErrorKind::InvalidData,
            format!(
                "encountered a corrupt varint len or an input \
                 buffer of size {} that does not contain the full \
                 frame of size {}",
                buf.len(),
                data_end
            )
        )));
    }
    let data_buf = &buf[data_begin..data_end];
    let varint_buf = &buf[VARINT_BEGIN..varint_end];
    let actual_crc = hash(data_buf, varint_buf);
    if actual_crc != expected_crc {
        return Err(annotate!(io::Error::new(
            io::ErrorKind::InvalidData,
            "input buffer crc does not match expected crc",
        )));
    }
    Ok((data_begin, data_end))
}