1pub mod varint;
9
10use std::collections::VecDeque;
11use std::fs;
12use std::io::{self, BufRead, Read, Write};
13#[cfg(unix)]
14use std::os::unix::fs::FileExt;
15
16use crc32fast::Hasher;
17use fault_injection::{annotate, fallible, maybe};
18
/// Size in bytes of the buffer returned by `frame_header`; an upper bound on
/// the encoded header length.
/// NOTE(review): presumably `CRC_LEN` (4) plus the largest varint encoding
/// (9 bytes) — confirm against the `varint` module.
const MAX_HEADER_SIZE: usize = 13;
/// Number of bytes occupied by the CRC at the start of every frame.
const CRC_LEN: usize = 4;
/// Offset of the first CRC byte within a frame.
const CRC_BEGIN: usize = 0;
/// Offset one past the last CRC byte within a frame.
const CRC_END: usize = CRC_LEN;
/// Offset of the first byte of the varint-encoded payload length, which
/// immediately follows the CRC.
const VARINT_BEGIN: usize = CRC_END;
24
25pub fn write_frame_into<W: io::Write>(mut writer: W, buf: &[u8]) -> io::Result<usize> {
46 let (header_buf, header_end_offset) = frame_header(buf);
47
48 fallible!(writer.write_all(&header_buf[..header_end_offset]));
49 fallible!(writer.write_all(buf));
50
51 Ok(header_end_offset + buf.len())
52}
53
/// A writer adaptor that frames everything written through it.
///
/// Each call to `write` on an `Encoder` emits one complete frame wrapping
/// the bytes passed in that call to the inner writer.
pub struct Encoder<W: Write> {
    /// Destination that receives the encoded frames.
    inner: W,
}

impl<W: Write> Encoder<W> {
    /// Wraps `inner` so that subsequent writes are emitted as frames.
    pub const fn new(inner: W) -> Self {
        Self { inner }
    }
}
67
68impl<W: Write> Write for Encoder<W> {
69 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
74 write_frame_into(&mut self.inner, buf)?;
75 Ok(buf.len())
76 }
77
78 fn flush(&mut self) -> io::Result<()> {
79 self.inner.flush()
80 }
81}
82
/// A reader adaptor that decodes frames from an inner reader and exposes
/// their payload bytes.
pub struct Decoder<R: Read> {
    /// Source of encoded frames.
    inner: R,
    /// Payload bytes that have been decoded but not yet handed to the caller.
    buf: VecDeque<u8>,
    /// Largest payload length accepted for a single frame; passed as
    /// `max_len` when a frame is decoded.
    capacity: usize,
}

impl<R: Read> Decoder<R> {
    /// Frame size limit used by [`Decoder::new`].
    const DEFAULT_CAPACITY: usize = 128 * 1024;

    /// Creates a decoder with a 128 KiB frame size limit. The internal
    /// buffer starts out empty and unallocated.
    pub const fn new(inner: R) -> Self {
        Self {
            inner,
            buf: VecDeque::new(),
            capacity: Self::DEFAULT_CAPACITY,
        }
    }

    /// Creates a decoder whose frame size limit is `capacity`, with the
    /// internal buffer preallocated to hold at least that many bytes.
    pub fn with_capacity(capacity: usize, inner: R) -> Self {
        let buf = VecDeque::with_capacity(capacity);
        Self {
            inner,
            buf,
            capacity,
        }
    }
}
113
114impl<R: Read> Read for Decoder<R> {
115 fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
117 fallible!(self.fill_buf());
118
119 let bytes_copied = usize::try_from(io::copy(&mut self.buf, &mut buf)?).unwrap();
120
121 Ok(bytes_copied)
122 }
123}
124
125impl<R: Read> BufRead for Decoder<R> {
126 fn fill_buf(&mut self) -> io::Result<&[u8]> {
127 if self.buf.is_empty() {
128 fallible!(read_frame_from_reader_into_writer(
129 &mut self.inner,
130 &mut self.buf,
131 self.capacity
132 ));
133 }
134
135 let (l, r) = self.buf.as_slices();
136 assert!(r.is_empty());
137 Ok(l)
138 }
139
140 fn consume(&mut self, amt: usize) {
141 self.buf.drain(..amt);
142 }
143}
144
145#[cfg(unix)]
148pub fn write_frame_at(buf: &[u8], file: &fs::File, at: u64) -> io::Result<usize> {
149 let (header_buf, header_end_offset) = frame_header(buf);
150 let header = &header_buf[..header_end_offset];
151
152 fallible!(file.write_all_at(header, at));
153 fallible!(file.write_all_at(buf, at + header.len() as u64));
154
155 Ok(header_end_offset + buf.len())
156}
157
/// Allocates a boxed byte slice of exactly `len` bytes for use as a read
/// destination.
///
/// Despite the historical name, the returned memory is zero-initialized.
/// The previous raw-`alloc` implementation was unsound: it invoked the
/// allocator with a zero-sized layout when `len == 0`, never checked for a
/// null return, and exposed uninitialized bytes through a safe `Box<[u8]>`.
/// `vec![0; len]` requests zeroed memory from the allocator, so the safe
/// version stays cheap.
///
/// # Panics
///
/// Panics (or aborts) if the allocation cannot be satisfied.
fn uninit_boxed_slice(len: usize) -> Box<[u8]> {
    vec![0_u8; len].into_boxed_slice()
}
169
170pub fn read_frame_from_reader_into_writer<R: io::Read, W: io::Write>(
175 mut reader: R,
176 mut writer: W,
177 max_len: usize,
178) -> io::Result<usize> {
179 let mut crc_bytes = [0; 4];
180 let varint_buf = &mut [0; 9];
181
182 fallible!(reader.read_exact(&mut crc_bytes));
183 fallible!(reader.read_exact(&mut varint_buf[..1]));
184
185 let varint_size = varint::size_of_varint_from_first_byte(varint_buf[0]);
186
187 fallible!(reader.read_exact(&mut varint_buf[1..varint_size]));
188
189 let (buf_len_u64, _varint_len) = varint::deserialize(varint_buf)?;
190
191 let data_len = if let Ok(data_len) = usize::try_from(buf_len_u64) {
192 data_len
193 } else {
194 return Err(annotate!(io::Error::new(
195 io::ErrorKind::InvalidData,
196 "encountered a corrupt varint len or this platform \
197 cannot represent the required data size as a usize"
198 )));
199 };
200
201 if data_len > max_len {
202 return Err(annotate!(io::Error::new(
203 io::ErrorKind::InvalidData,
204 "encountered a varint len that is larger than the \
205 max_len, and is possibly corrupt or was written with \
206 a different configuration.",
207 )));
208 }
209
210 let crc_expected = u32::from_le_bytes(crc_bytes);
211
212 let mut hasher = Hasher::new();
213
214 let mut copy_buf: [u8; 4096] = [0; 4096];
215
216 let mut remainder = data_len;
217 while remainder > 0 {
218 let bytes_to_copy = remainder.min(copy_buf.len());
219
220 fallible!(reader.read(&mut copy_buf[..bytes_to_copy]));
221 fallible!(writer.write_all(©_buf[..bytes_to_copy]));
222
223 hasher.update(©_buf[..bytes_to_copy]);
224
225 remainder -= bytes_to_copy;
226 }
227
228 hasher.update(&varint_buf[..varint_size]);
232
233 let crc_actual = hasher.finalize() ^ 0xFF;
237
238 if crc_actual != crc_expected {
239 return Err(annotate!(io::Error::new(
240 io::ErrorKind::InvalidData,
241 format!(
242 "input buffer crc {} does not match expected crc {}",
243 crc_actual, crc_expected
244 ),
245 )));
246 }
247
248 Ok(data_len)
249}
250
251#[cfg(unix)]
253pub fn read_frame_at(file: &fs::File, at: u64, max_len: usize) -> io::Result<Box<[u8]>> {
254 const FIRST_READ_SIZE: usize = 512;
255
256 let header = &mut [0; FIRST_READ_SIZE];
257
258 match maybe!(file.read_exact_at(header, at)) {
259 Ok(_) => {}
260 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {}
262 Err(e) => return Err(e),
263 }
264
265 let (buf_len_u64, varint_len) = varint::deserialize(&header[VARINT_BEGIN..])?;
266
267 if buf_len_u64 > max_len as u64 {
268 return Err(annotate!(io::Error::new(
269 io::ErrorKind::InvalidData,
270 "encountered a varint len that is larger than the \
271 max_len, and is possibly corrupt or was written with \
272 a different configuration.",
273 )));
274 }
275
276 let buf_len = usize::try_from(buf_len_u64).unwrap();
278
279 let mut buf = uninit_boxed_slice(buf_len);
280
281 let crc_expected = &header[CRC_BEGIN..CRC_END];
282
283 let varint_end = VARINT_BEGIN + varint_len;
284 let potential_inline_len = FIRST_READ_SIZE - varint_end;
285
286 let header_buf_len = potential_inline_len.min(buf_len);
287 let header_buf_begin = varint_end;
288 let header_buf_end = header_buf_begin + header_buf_len;
289
290 buf[..header_buf_len].copy_from_slice(&header[header_buf_begin..header_buf_end]);
291
292 let remainder_buf_begin = header_buf_len;
293
294 fallible!(file.read_exact_at(&mut buf[remainder_buf_begin..], at + FIRST_READ_SIZE as u64));
295
296 let crc_actual = hash(&buf, &header[VARINT_BEGIN..varint_end]);
297
298 if crc_actual != crc_expected {
299 return Err(annotate!(io::Error::new(
300 io::ErrorKind::InvalidData,
301 "input buffer crc does not match expected crc",
302 )));
303 }
304
305 Ok(buf)
306}
307
308fn hash(data_buf: &[u8], varint_buf: &[u8]) -> [u8; CRC_LEN] {
309 let mut hasher = Hasher::new();
310 hasher.update(data_buf);
311 hasher.update(varint_buf);
312
313 (hasher.finalize() ^ 0xFF).to_le_bytes()
317}
318
319pub fn frame_header(buf: &[u8]) -> ([u8; MAX_HEADER_SIZE], usize) {
339 let (varint_buf, varint_size) = varint::get_varint(buf.len() as u64);
341
342 let crc_bytes = hash(buf, &varint_buf[..varint_size]);
343
344 let mut header_buf = [0_u8; MAX_HEADER_SIZE];
345
346 header_buf[CRC_BEGIN..CRC_END].copy_from_slice(&crc_bytes);
348
349 let varint_end = VARINT_BEGIN + varint_size;
351 header_buf[VARINT_BEGIN..varint_end].copy_from_slice(&varint_buf[..varint_size]);
352
353 (header_buf, varint_end)
354}
355
356pub fn parse_frame(buf: &[u8]) -> io::Result<(usize, usize)> {
385 if buf.len() < VARINT_BEGIN + 1 {
386 return Err(annotate!(io::Error::new(
387 io::ErrorKind::InvalidData,
388 "encountered a buffer that is not even large enough to contain a CRC and minimal one-byte varint"
389 )));
390 }
391
392 let expected_crc: [u8; CRC_LEN] = buf[CRC_BEGIN..CRC_END].try_into().unwrap();
393
394 let (buf_len_u64, varint_len) = varint::deserialize(&buf[VARINT_BEGIN..])?;
395
396 let varint_end = VARINT_BEGIN + varint_len;
397 let data_begin = varint_end;
398
399 let data_len = if let Ok(data_len) = usize::try_from(buf_len_u64) {
400 data_len
401 } else {
402 return Err(annotate!(io::Error::new(
403 io::ErrorKind::InvalidData,
404 "encountered a corrupt varint len or this platform \
405 cannot represent the required data size as a usize"
406 )));
407 };
408
409 let data_end = data_begin + data_len;
410
411 if data_end > buf.len() {
412 return Err(annotate!(io::Error::new(
413 io::ErrorKind::InvalidData,
414 format!(
415 "encountered a corrupt varint len or an input \
416 buffer of size {} that does not contain the full \
417 frame of size {}",
418 buf.len(),
419 data_end
420 )
421 )));
422 }
423
424 let data_buf = &buf[data_begin..data_end];
425 let varint_buf = &buf[VARINT_BEGIN..varint_end];
426
427 let actual_crc = hash(data_buf, varint_buf);
428
429 if actual_crc != expected_crc {
430 return Err(annotate!(io::Error::new(
431 io::ErrorKind::InvalidData,
432 "input buffer crc does not match expected crc",
433 )));
434 }
435
436 Ok((data_begin, data_end))
437}