#![warn(rust_2018_idioms)]
use std::cmp;
use std::error::Error as StdError;
use std::fs::{File, OpenOptions};
use std::fmt;
use std::io;
use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
use std::mem;
use std::ops::{AddAssign, ShlAssign};
use std::sync::{Arc, Mutex, MutexGuard};
use std::path::Path;
use bytes::{BytesMut, BufMut};
use http::header::{HeaderName, HeaderValue};
use tao_log::{debug, warn};
use olio::fs::rc::{ReadPos, ReadSlice};
use body_image::{
BodyError, BodyImage, BodySink, Dialog, Encoding,
Epilog, Prolog, Recorded, RequestRecorded, Tunables
};
pub type Flaw = Box<dyn StdError + Send + Sync + 'static>;
mod compress;
pub use compress::{
CompressStrategy, Compression, EncodeWrapper,
GzipCompressStrategy, NoCompressStrategy,
};
use compress::DecodeWrapper;
#[cfg(feature = "brotli")]
pub use compress::BrotliCompressStrategy;
use std::convert::TryFrom;
pub const V2_HEAD_SIZE: usize = 54;
pub const V2_MAX_RECORD: u64 = 0xffff_ffff_ffff;
pub const V2_MAX_HBLOCK: usize = 0xf_ffff;
pub const V2_MAX_REQ_BODY: u64 = 0xff_ffff_ffff;
#[inline]
pub fn hname_meta_url() -> http::header::HeaderName {
static NAME: &str = "url";
HeaderName::from_static(NAME)
}
#[inline]
pub fn hname_meta_method() -> http::header::HeaderName {
static NAME: &str = "method";
HeaderName::from_static(NAME)
}
#[inline]
pub fn hname_meta_res_version() -> http::header::HeaderName {
static NAME: &str = "response-version";
HeaderName::from_static(NAME)
}
#[inline]
pub fn hname_meta_res_status() -> http::header::HeaderName {
static NAME: &str = "response-status";
HeaderName::from_static(NAME)
}
#[inline]
pub fn hname_meta_res_decoded() -> http::header::HeaderName {
static NAME: &str = "response-decoded";
HeaderName::from_static(NAME)
}
pub struct BarcFile {
path: Box<Path>,
write_lock: Mutex<Option<File>>,
}
pub struct BarcWriter<'a> {
guard: MutexGuard<'a, Option<File>>
}
pub struct BarcReader {
file: ReadPos,
}
#[derive(Debug)]
pub enum BarcError {
Body(BodyError),
Io(io::Error),
UnknownRecordType(u8),
UnknownCompression(u8),
DecoderUnsupported(Compression),
ReadIncompleteRecHead(usize),
ReadInvalidRecHead,
ReadInvalidRecHeadHex(u8),
InvalidHeader(Flaw),
IntoDialog(DialogConvertError),
_FutureProof
}
impl fmt::Display for BarcError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
BarcError::Body(ref be) =>
write!(f, "With Body; {}", be),
BarcError::Io(ref e) =>
write!(f, "{}", e),
BarcError::UnknownRecordType(b) =>
write!(f, "Unknown record type flag [{}]", b),
BarcError::UnknownCompression(b) =>
write!(f, "Unknown compression flag [{}]", b),
BarcError::DecoderUnsupported(c) =>
write!(f, "No decoder for {:?}. Enable the feature?", c),
BarcError::ReadIncompleteRecHead(l) =>
write!(f, "Incomplete record head, len {}", l),
BarcError::ReadInvalidRecHead =>
write!(f, "Invalid record head suffix"),
BarcError::ReadInvalidRecHeadHex(b) =>
write!(f, "Invalid record head hex digit [{}]", b),
BarcError::IntoDialog(ref dce) =>
write!(f, "Record to Dialog conversion; {}", dce),
BarcError::InvalidHeader(ref flaw) =>
write!(f, "Invalid header; {}", flaw),
BarcError::_FutureProof =>
unreachable!("Don't abuse the _FutureProof!")
}
}
}
impl StdError for BarcError {
fn source(&self) -> Option<&(dyn StdError + 'static)> {
match *self {
BarcError::Body(ref be) => Some(be),
BarcError::Io(ref e) => Some(e),
BarcError::InvalidHeader(ref flaw) => Some(flaw.as_ref()),
BarcError::IntoDialog(ref dce) => Some(dce),
_ => None
}
}
}
impl From<io::Error> for BarcError {
fn from(err: io::Error) -> BarcError {
BarcError::Io(err)
}
}
impl From<BodyError> for BarcError {
fn from(err: BodyError) -> BarcError {
BarcError::Body(err)
}
}
impl From<DialogConvertError> for BarcError {
fn from(err: DialogConvertError) -> BarcError {
BarcError::IntoDialog(err)
}
}
#[derive(Debug)]
struct RecordHead {
len: u64,
rec_type: RecordType,
compress: Compression,
meta: usize,
req_h: usize,
req_b: u64,
res_h: usize,
}
#[derive(Clone, Debug, Default)]
pub struct Record {
pub rec_type: RecordType,
pub meta: http::HeaderMap,
pub req_headers: http::HeaderMap,
pub req_body: BodyImage,
pub res_headers: http::HeaderMap,
pub res_body: BodyImage,
}
pub trait MetaRecorded: Recorded {
fn rec_type(&self) -> RecordType;
fn meta(&self) -> &http::HeaderMap;
}
impl RequestRecorded for Record {
fn req_headers(&self) -> &http::HeaderMap { &self.req_headers }
fn req_body(&self) -> &BodyImage { &self.req_body }
}
impl Recorded for Record {
fn res_headers(&self) -> &http::HeaderMap { &self.res_headers }
fn res_body(&self) -> &BodyImage { &self.res_body }
}
impl MetaRecorded for Record {
fn rec_type(&self) -> RecordType { self.rec_type }
fn meta(&self) -> &http::HeaderMap { &self.meta }
}
impl TryFrom<Dialog> for Record {
type Error = BarcError;
fn try_from(dialog: Dialog) -> Result<Self, Self::Error> {
let (prolog, epilog) = dialog.explode();
let mut meta = http::HeaderMap::with_capacity(6);
let efn = &|e| BarcError::InvalidHeader(Flaw::from(e));
meta.append(
hname_meta_url(),
prolog.url.to_string().parse().map_err(efn)?
);
meta.append(
hname_meta_method(),
prolog.method.to_string().parse().map_err(efn)?
);
let v = format!("{:?}", epilog.version);
meta.append(
hname_meta_res_version(),
v.parse().map_err(efn)?
);
meta.append(
hname_meta_res_status(),
epilog.status.to_string().parse().map_err(efn)?
);
if !epilog.res_decoded.is_empty() {
let mut joined = String::with_capacity(30);
for e in epilog.res_decoded {
if !joined.is_empty() { joined.push_str(", "); }
joined.push_str(&e.to_string());
}
meta.append(
hname_meta_res_decoded(),
joined.parse().map_err(efn)?
);
}
Ok(Record {
rec_type: RecordType::Dialog,
meta,
req_headers: prolog.req_headers,
req_body: prolog.req_body,
res_headers: epilog.res_headers,
res_body: epilog.res_body,
})
}
}
#[derive(Debug)]
pub enum DialogConvertError {
NoMetaUrl,
InvalidUrl(http::uri::InvalidUri),
NoMetaMethod,
InvalidMethod(http::method::InvalidMethod),
NoMetaResVersion,
InvalidVersion(Vec<u8>),
NoMetaResStatus,
MalformedMetaResStatus,
InvalidStatusCode(http::status::InvalidStatusCode),
InvalidResDecoded(String),
_FutureProof
}
impl fmt::Display for DialogConvertError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
DialogConvertError::NoMetaUrl =>
write!(f, "No url meta header found"),
DialogConvertError::InvalidUrl(ref iub) =>
write!(f, "Invalid URI: {}", iub),
DialogConvertError::NoMetaMethod =>
write!(f, "No method meta header found"),
DialogConvertError::InvalidMethod(ref im) =>
write!(f, "Invalid HTTP Method: {}", im),
DialogConvertError::NoMetaResVersion =>
write!(f, "No response-version meta header found"),
DialogConvertError::InvalidVersion(ref bs) => {
if let Ok(s) = String::from_utf8(bs.clone()) {
write!(f, "Invalid HTTP Version: {}", s)
} else {
write!(f, "Invalid HTTP Version: {:x?}", bs)
}
}
DialogConvertError::NoMetaResStatus =>
write!(f, "No response-status meta header found"),
DialogConvertError::MalformedMetaResStatus =>
write!(f, "The response-status meta header is malformed"),
DialogConvertError::InvalidStatusCode(ref isc) =>
write!(f, "Invalid HTTP status code: {}", isc),
DialogConvertError::InvalidResDecoded(ref d) =>
write!(f, "Invalid response-decoded header value: {}", d),
DialogConvertError::_FutureProof =>
unreachable!("Don't abuse the _FutureProof!")
}
}
}
impl StdError for DialogConvertError {
fn source(&self) -> Option<&(dyn StdError + 'static)> {
match *self {
DialogConvertError::InvalidUrl(ref iub) => Some(iub),
DialogConvertError::InvalidMethod(ref im) => Some(im),
DialogConvertError::InvalidStatusCode(ref isc) => Some(isc),
_ => None
}
}
}
impl TryFrom<Record> for Dialog {
type Error = DialogConvertError;
fn try_from(rec: Record) -> Result<Self, Self::Error> {
let url = if let Some(uv) = rec.meta.get(hname_meta_url()) {
http::Uri::try_from(uv.as_bytes())
.map_err(DialogConvertError::InvalidUrl)
} else {
Err(DialogConvertError::NoMetaUrl)
}?;
let method = if let Some(v) = rec.meta.get(hname_meta_method()) {
http::Method::from_bytes(v.as_bytes())
.map_err(DialogConvertError::InvalidMethod)
} else {
Err(DialogConvertError::NoMetaMethod)
}?;
let version = if let Some(v) = rec.meta.get(hname_meta_res_version()) {
let vb = v.as_bytes();
match vb {
b"HTTP/0.9" => http::Version::HTTP_09,
b"HTTP/1.0" => http::Version::HTTP_10,
b"HTTP/1.1" => http::Version::HTTP_11,
b"HTTP/2.0" => http::Version::HTTP_2,
_ => {
return Err(DialogConvertError::InvalidVersion(vb.to_vec()));
}
}
} else {
return Err(DialogConvertError::NoMetaResVersion);
};
let status = if let Some(v) = rec.meta.get(hname_meta_res_status()) {
let vbs = v.as_bytes();
if vbs.len() >= 3 {
http::StatusCode::from_bytes(&vbs[0..3])
.map_err(DialogConvertError::InvalidStatusCode)
} else {
Err(DialogConvertError::MalformedMetaResStatus)
}
} else {
Err(DialogConvertError::NoMetaResStatus)
}?;
let res_decoded = if let Some(v) = rec.meta.get(hname_meta_res_decoded()) {
if let Ok(dcds) = v.to_str() {
let mut encodes = Vec::with_capacity(4);
for enc in dcds.split(',') {
let enc = enc.trim();
encodes.push(match enc {
"chunked" => Encoding::Chunked,
"deflate" => Encoding::Deflate,
"gzip" => Encoding::Gzip,
"br" => Encoding::Brotli,
_ => {
return Err(DialogConvertError::InvalidResDecoded(
enc.to_string()
));
}
})
}
encodes
} else {
return Err(DialogConvertError::InvalidResDecoded(
format!("{:x?}", v.as_bytes())
));
}
} else {
Vec::with_capacity(0)
};
Ok(Dialog::new(
Prolog {
method,
url,
req_headers: rec.req_headers,
req_body: rec.req_body
},
Epilog {
version,
status,
res_decoded,
res_headers: rec.res_headers,
res_body: rec.res_body
}
))
}
}
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RecordType {
Reserved,
Dialog,
}
impl RecordType {
fn flag(self) -> char {
match self {
RecordType::Reserved => 'R',
RecordType::Dialog => 'D',
}
}
fn from_byte(f: u8) -> Result<Self, BarcError> {
match f {
b'R' => Ok(RecordType::Reserved),
b'D' => Ok(RecordType::Dialog),
_ => Err(BarcError::UnknownRecordType(f))
}
}
}
impl Default for RecordType {
fn default() -> RecordType { RecordType::Dialog }
}
const CRLF: &[u8] = b"\r\n";
const WITH_CRLF: bool = true;
const NO_CRLF: bool = false;
const V2_RESERVE_HEAD: RecordHead = RecordHead {
len: 0,
rec_type: RecordType::Reserved,
compress: Compression::Plain,
meta: 0,
req_h: 0,
req_b: 0,
res_h: 0
};
impl BarcFile {
pub fn new<P>(path: P) -> BarcFile
where P: AsRef<Path>
{
let path: Box<Path> = path.as_ref().into();
let write_lock = Mutex::new(None);
BarcFile { path, write_lock }
}
pub fn writer(&self) -> Result<BarcWriter<'_>, BarcError> {
let mut guard = self.write_lock.lock().unwrap();
if (*guard).is_none() {
let file = OpenOptions::new()
.create(true)
.read(true)
.write(true)
.open(&self.path)?;
*guard = Some(file);
}
Ok(BarcWriter { guard })
}
pub fn reader(&self) -> Result<BarcReader, BarcError> {
let file = OpenOptions::new()
.read(true)
.open(&self.path)?;
Ok(BarcReader { file: ReadPos::new(Arc::new(file), 0) })
}
}
impl<'a> BarcWriter<'a> {
pub fn write(
&mut self,
rec: &dyn MetaRecorded,
strategy: &dyn CompressStrategy)
-> Result<u64, BarcError>
{
let file: &mut File = self.guard.as_mut().unwrap();
let start = file.seek(SeekFrom::End(0))?;
write_record_head(file, &V2_RESERVE_HEAD)?;
file.flush()?;
let mut head = write_record(file, rec, strategy)?;
let end = file.seek(SeekFrom::Current(0))?;
let orig_len = head.len;
assert!(end >= (start + (V2_HEAD_SIZE as u64)));
head.len = end - start - (V2_HEAD_SIZE as u64);
if head.compress == Compression::Plain {
assert_eq!(orig_len, head.len);
} else if orig_len < head.len {
warn!("Compression *increased* record size from \
{} to {} bytes",
orig_len, head.len);
}
file.seek(SeekFrom::Start(start))?;
write_record_head(file, &head)?;
file.seek(SeekFrom::End(0))?;
file.flush()?;
Ok(start)
}
}
fn write_record(
file: &File,
rec: &dyn MetaRecorded,
strategy: &dyn CompressStrategy)
-> Result<RecordHead, BarcError>
{
let mut encoder = strategy.wrap_encoder(rec, file)?;
let fout = &mut encoder;
let compress = fout.mode();
let with_crlf = compress == Compression::Plain;
let head = {
let meta = write_headers(fout, with_crlf, rec.meta())?;
let req_h = write_headers(fout, with_crlf, rec.req_headers())?;
let req_b = write_body(fout, with_crlf, rec.req_body())?;
let res_h = write_headers(fout, with_crlf, rec.res_headers())?;
let mut len: u64 = (meta + req_h + res_h) as u64 + req_b;
assert!((len + rec.res_body().len() + 2) <= V2_MAX_RECORD,
"body exceeds size limit");
let res_b = write_body(fout, with_crlf, rec.res_body())?;
len += res_b;
RecordHead {
len,
rec_type: rec.rec_type(),
compress,
meta,
req_h,
req_b,
res_h }
};
encoder.finish()?;
Ok(head)
}
fn write_record_head<W>(out: &mut W, head: &RecordHead)
-> Result<(), BarcError>
where W: Write + ?Sized
{
assert!(head.len <= V2_MAX_RECORD, "len exceeded");
assert!(head.meta <= V2_MAX_HBLOCK, "meta exceeded");
assert!(head.req_h <= V2_MAX_HBLOCK, "req_h exceeded");
assert!(head.req_b <= V2_MAX_REQ_BODY, "req_b exceeded");
assert!(head.res_h <= V2_MAX_HBLOCK, "res_h exceeded");
let size = write_all_len(out, format!(
"BARC2 {:012x} {}{} {:05x} {:05x} {:010x} {:05x}\r\n\r\n",
head.len, head.rec_type.flag(), head.compress.flag(),
head.meta, head.req_h, head.req_b, head.res_h
).as_bytes())?;
assert_eq!(size, V2_HEAD_SIZE, "wrong record head size");
Ok(())
}
pub fn write_headers<W>(
out: &mut W,
with_crlf: bool,
headers: &http::HeaderMap)
-> Result<usize, BarcError>
where W: Write + ?Sized
{
let mut size = 0;
for (key, value) in headers.iter() {
size += write_all_len(out, key.as_ref())?;
size += write_all_len(out, b": ")?;
size += write_all_len(out, value.as_bytes())?;
size += write_all_len(out, CRLF)?;
}
if with_crlf && size > 0 {
size += write_all_len(out, CRLF)?;
}
assert!(size <= V2_MAX_HBLOCK);
Ok(size)
}
pub fn write_body<W>(out: &mut W, with_crlf: bool, body: &BodyImage)
-> Result<u64, BarcError>
where W: Write + ?Sized
{
let mut size = body.write_to(out)?;
if with_crlf && size > 0 {
size += write_all_len(out, CRLF)? as u64;
}
Ok(size)
}
fn write_all_len<W>(out: &mut W, bs: &[u8]) -> Result<usize, BarcError>
where W: Write + ?Sized
{
out.write_all(bs)?;
Ok(bs.len())
}
impl BarcReader {
pub fn read(&mut self, tune: &Tunables)
-> Result<Option<Record>, BarcError>
{
let fin = &mut self.file;
let start = fin.tell();
let rhead = match read_record_head(fin) {
Ok(Some(rh)) => rh,
Ok(None) => return Ok(None),
Err(e) => return Err(e)
};
let rec_type = rhead.rec_type;
if rec_type == RecordType::Reserved {
fin.seek(SeekFrom::Start(start))?;
return Ok(None);
}
if rhead.compress != Compression::Plain {
let end = fin.tell() + rhead.len;
let rec = read_compressed(
fin.subslice(fin.tell(), end), &rhead, tune
)?;
fin.seek(SeekFrom::Start(end))?;
return Ok(Some(rec))
}
let meta = read_headers(fin, WITH_CRLF, rhead.meta)?;
let req_headers = read_headers(fin, WITH_CRLF, rhead.req_h)?;
let req_body = if rhead.req_b <= tune.max_body_ram() {
read_body_ram(fin, WITH_CRLF, rhead.req_b as usize)
} else {
slice_body(fin, rhead.req_b)
}?;
let res_headers = read_headers(fin, WITH_CRLF, rhead.res_h)?;
let body_len = rhead.len - (fin.tell() - start - (V2_HEAD_SIZE as u64));
let res_body = if body_len <= tune.max_body_ram() {
read_body_ram(fin, WITH_CRLF, body_len as usize)
} else {
slice_body(fin, body_len)
}?;
Ok(Some(Record { rec_type, meta, req_headers, req_body,
res_headers, res_body }))
}
pub fn offset(&self) -> u64 {
self.file.tell()
}
pub fn seek(&mut self, offset: u64) -> Result<(), BarcError> {
self.file.seek(SeekFrom::Start(offset))?;
Ok(())
}
}
fn read_compressed(rslice: ReadSlice, rhead: &RecordHead, tune: &Tunables)
-> Result<Record, BarcError>
{
let mut wrapper = DecodeWrapper::new(
rhead.compress,
rslice,
tune.buffer_size_ram())?;
let fin = &mut wrapper;
let rec_type = rhead.rec_type;
let meta = read_headers(fin, NO_CRLF, rhead.meta)?;
let req_headers = read_headers(fin, NO_CRLF, rhead.req_h)?;
let req_body = if rhead.req_b <= tune.max_body_ram() {
read_body_ram(fin, NO_CRLF, rhead.req_b as usize)?
} else {
read_body_fs(fin, rhead.req_b, tune)?
};
let res_headers = read_headers(fin, NO_CRLF, rhead.res_h)?;
let res_body = BodyImage::read_from(fin, 4096, tune)?;
Ok(Record { rec_type, meta, req_headers, req_body, res_headers, res_body })
}
fn read_record_head<R>(rin: &mut R)
-> Result<Option<RecordHead>, BarcError>
where R: Read + ?Sized
{
let mut buf = [0u8; V2_HEAD_SIZE];
let size = read_record_head_buf(rin, &mut buf)?;
if size == 0 {
return Ok(None);
}
if size != V2_HEAD_SIZE {
return Err(BarcError::ReadIncompleteRecHead(size));
}
if &buf[0..6] != b"BARC2 " {
return Err(BarcError::ReadInvalidRecHead);
}
let len = parse_hex(&buf[6..18])?;
let rec_type = RecordType::from_byte(buf[19])?;
let compress = Compression::from_byte(buf[20])?;
let meta = parse_hex(&buf[22..27])?;
let req_h = parse_hex(&buf[28..33])?;
let req_b = parse_hex(&buf[34..44])?;
let res_h = parse_hex(&buf[45..50])?;
Ok(Some(RecordHead { len, rec_type, compress, meta, req_h, req_b, res_h }))
}
fn read_record_head_buf<R>(rin: &mut R, mut buf: &mut [u8])
-> Result<usize, BarcError>
where R: Read + ?Sized
{
let mut size = 0;
loop {
match rin.read(buf) {
Ok(0) => break,
Ok(n) => {
size += n;
if size >= V2_HEAD_SIZE {
break;
}
let t = buf;
buf = &mut t[n..];
}
Err(e) => {
if e.kind() == ErrorKind::Interrupted {
continue;
} else {
return Err(e.into());
}
}
}
}
Ok(size)
}
fn parse_hex<T>(buf: &[u8]) -> Result<T, BarcError>
where T: AddAssign<T> + From<u8> + ShlAssign<u8>
{
let mut v = T::from(0u8);
for d in buf {
v <<= 4u8;
if *d >= b'0' && *d <= b'9' {
v += T::from(*d - b'0');
} else if *d >= b'a' && *d <= b'f' {
v += T::from(10 + (*d - b'a'));
} else {
return Err(BarcError::ReadInvalidRecHeadHex(*d));
}
}
Ok(v)
}
fn read_headers<R>(rin: &mut R, with_crlf: bool, len: usize)
-> Result<http::HeaderMap, BarcError>
where R: Read + ?Sized
{
if len == 0 {
return Ok(http::HeaderMap::with_capacity(0));
}
assert!(len > 2);
let tlen = if with_crlf { len } else { len + 2 };
let mut buf = BytesMut::with_capacity(tlen);
unsafe {
let b = &mut *(
buf.chunk_mut() as *mut _
as *mut [mem::MaybeUninit<u8>]
as *mut [u8]
);
rin.read_exact(&mut b[..len])?;
buf.advance_mut(len);
}
if !with_crlf {
buf.put_slice(CRLF)
}
parse_headers(&buf[..])
}
fn parse_headers(buf: &[u8]) -> Result<http::HeaderMap, BarcError> {
let mut headbuf = [httparse::EMPTY_HEADER; 128];
match httparse::parse_headers(buf, &mut headbuf) {
Ok(httparse::Status::Complete((size, heads))) => {
let mut hmap = http::HeaderMap::with_capacity(heads.len());
assert_eq!(size, buf.len());
for h in heads {
let name = h.name.parse::<HeaderName>()
.map_err(|e| BarcError::InvalidHeader(e.into()))?;
let value = HeaderValue::from_bytes(h.value)
.map_err(|e| BarcError::InvalidHeader(e.into()))?;
hmap.append(name, value);
}
Ok(hmap)
}
Ok(httparse::Status::Partial) => {
Err(BarcError::InvalidHeader(
Box::new(httparse::Error::TooManyHeaders)
))
}
Err(e) => Err(BarcError::InvalidHeader(e.into()))
}
}
fn read_body_ram<R>(rin: &mut R, with_crlf: bool, len: usize)
-> Result<BodyImage, BarcError>
where R: Read + ?Sized
{
if len == 0 {
return Ok(BodyImage::empty());
}
assert!(!with_crlf || len > 2);
let mut buf = BytesMut::with_capacity(len);
unsafe {
let b = &mut *(
buf.chunk_mut() as *mut _
as *mut [mem::MaybeUninit<u8>]
as *mut [u8]
);
rin.read_exact(&mut b[..len])?;
let l = if with_crlf { len - 2 } else { len };
buf.advance_mut(l);
}
Ok(BodyImage::from_slice(buf.freeze()))
}
fn read_body_fs<R>(rin: &mut R, len: u64, tune: &Tunables)
-> Result<BodyImage, BarcError>
where R: Read + ?Sized
{
if len == 0 {
return Ok(BodyImage::empty());
}
let mut body = BodySink::with_fs(tune.temp_dir())?;
let mut buf = BytesMut::with_capacity(tune.buffer_size_fs());
loop {
let rlen = {
let b = unsafe { &mut *(
buf.chunk_mut() as *mut _
as *mut [mem::MaybeUninit<u8>]
as *mut [u8]
)};
let limit = cmp::min(b.len() as u64, len - body.len()) as usize;
assert!(limit > 0);
match rin.read(&mut b[..limit]) {
Ok(l) => l,
Err(e) => {
if e.kind() == ErrorKind::Interrupted {
continue;
} else {
return Err(e.into());
}
}
}
};
if rlen == 0 {
break;
}
unsafe { buf.advance_mut(rlen); }
debug!("Write (Fs) buffer len {}", rlen);
body.write_all(&buf)?;
if body.len() < len {
buf.clear();
} else {
assert_eq!(body.len(), len);
break;
}
}
let body = body.prepare()?;
Ok(body)
}
fn slice_body(rp: &mut ReadPos, len: u64) -> Result<BodyImage, BarcError> {
assert!(len > 2);
let offset = rp.tell();
rp.seek(SeekFrom::Current(len as i64))?;
let rslice = rp.subslice(offset, offset + len - 2);
#[allow(unused_unsafe)]
{
Ok(unsafe { BodyImage::from_read_slice(rslice) })
}
}
#[cfg(test)]
mod barc_tests {
use std::convert::TryInto;
use std::fs;
use std::mem::size_of;
use std::path::{Path, PathBuf};
use http::header::{AGE, REFERER, VIA};
use super::*;
use body_image::Tuner;
use piccolog::test_logger;
use tao_log::debugv;
fn barc_test_file(name: &str) -> Result<PathBuf, Flaw> {
let target = env!("CARGO_MANIFEST_DIR");
let path = format!("{}/../target/testmp", target);
let tpath = Path::new(&path);
fs::create_dir_all(tpath)?;
let fname = tpath.join(name);
if fname.exists() {
fs::remove_file(&fname)?;
}
Ok(fname)
}
fn is_flaw(_f: Flaw) -> bool { true }
fn is_barc_error(e: BarcError) -> bool {
assert!(is_flaw(e.into()));
true
}
#[test]
fn test_barc_error_as_flaw() {
assert!(is_barc_error(BarcError::ReadInvalidRecHead));
assert!(is_barc_error(DialogConvertError::NoMetaUrl.into()));
}
#[test]
fn test_barc_error_sizes() {
assert!(test_logger());
assert!(debugv!(size_of::<BarcError>()) <= 40);
assert!(debugv!(size_of::<DialogConvertError>()) <= 32);
}
#[test]
fn test_write_read_small() {
let fname = barc_test_file("small.barc").unwrap();
let strategy = NoCompressStrategy::default();
write_read_small(&fname, &strategy).unwrap();
}
#[test]
fn test_write_read_small_gzip() {
assert!(test_logger());
let fname = barc_test_file("small_gzip.barc").unwrap();
let strategy = GzipCompressStrategy::default().set_min_len(0);
write_read_small(&fname, &strategy).unwrap();
assert_compression(&fname, Compression::Gzip);
}
#[cfg(feature = "brotli")]
#[test]
fn test_write_read_small_brotli() {
assert!(test_logger());
let fname = barc_test_file("small_brotli.barc").unwrap();
let strategy = BrotliCompressStrategy::default().set_min_len(0);
write_read_small(&fname, &strategy).unwrap();
assert_compression(&fname, Compression::Brotli);
}
fn write_read_small(fname: &PathBuf, strategy: &dyn CompressStrategy)
-> Result<(), Flaw>
{
let bfile = BarcFile::new(fname);
let req_body_str = "REQUEST BODY";
let res_body_str = "RESPONSE BODY";
let rec_type = RecordType::Dialog;
let mut meta = http::HeaderMap::new();
meta.insert(AGE, "0".parse()?);
let mut req_headers = http::HeaderMap::new();
req_headers.insert(REFERER, "http:://other.com".parse()?);
req_headers.insert(
http::header::CONTENT_TYPE,
"text/plain".parse().unwrap()
);
let req_body = BodyImage::from_slice(req_body_str);
let mut res_headers = http::HeaderMap::new();
res_headers.insert(VIA, "test".parse()?);
res_headers.insert(
http::header::CONTENT_TYPE,
"text/plain".parse().unwrap()
);
let res_body = BodyImage::from_slice(res_body_str);
let mut writer = bfile.writer()?;
assert!(fname.exists());
writer.write(&Record { rec_type, meta,
req_headers, req_body,
res_headers, res_body },
strategy)?;
let tune = Tunables::new();
let mut reader = bfile.reader()?;
let record = debugv!(reader.read(&tune))?.unwrap();
assert_eq!(record.rec_type, RecordType::Dialog);
assert_eq!(record.meta.len(), 1);
assert_eq!(record.req_headers.len(), 2);
assert_eq!(record.req_body.len(), req_body_str.len() as u64);
assert_eq!(record.res_headers.len(), 2);
assert_eq!(record.res_body.len(), res_body_str.len() as u64);
let record = reader.read(&tune)?;
assert!(record.is_none());
Ok(())
}
#[test]
fn test_write_read_empty_record() {
assert!(test_logger());
let fname = barc_test_file("empty_record.barc").unwrap();
let strategy = NoCompressStrategy::default();
write_read_empty_record(&fname, &strategy).unwrap();
}
#[test]
fn test_write_read_empty_record_gzip() {
assert!(test_logger());
let fname = barc_test_file("empty_record_gzip.barc").unwrap();
let strategy = GzipCompressStrategy::default().set_min_len(1);
write_read_empty_record(&fname, &strategy).unwrap();
assert_compression(&fname, Compression::Plain);
}
#[cfg(feature = "brotli")]
#[test]
fn test_write_read_empty_record_brotli() {
assert!(test_logger());
let fname = barc_test_file("empty_record_brotli.barc").unwrap();
let strategy = BrotliCompressStrategy::default().set_min_len(1);
write_read_empty_record(&fname, &strategy).unwrap();
assert_compression(&fname, Compression::Plain);
}
fn write_read_empty_record(fname: &PathBuf, strategy: &dyn CompressStrategy)
-> Result<(), Flaw>
{
assert!(test_logger());
let bfile = BarcFile::new(fname);
let mut writer = bfile.writer()?;
writer.write(&Record::default(), strategy)?;
let tune = Tunables::new();
let mut reader = bfile.reader()?;
let record = debugv!(reader.read(&tune))?.unwrap();
assert_eq!(record.rec_type, RecordType::Dialog);
assert_eq!(record.meta.len(), 0);
assert_eq!(record.req_headers.len(), 0);
assert_eq!(record.req_body.len(), 0);
assert_eq!(record.res_headers.len(), 0);
assert_eq!(record.res_body.len(), 0);
assert!(reader.read(&tune)?.is_none());
Ok(())
}
#[test]
fn test_write_read_large() {
assert!(test_logger());
let fname = barc_test_file("large.barc").unwrap();
let strategy = NoCompressStrategy::default();
write_read_large(&fname, &strategy).unwrap();
}
#[test]
fn test_write_read_large_gzip() {
assert!(test_logger());
let fname = barc_test_file("large_gzip.barc").unwrap();
let strategy = GzipCompressStrategy::default().set_min_len(0xa359b);
write_read_large(&fname, &strategy).unwrap();
assert_compression(&fname, Compression::Gzip);
}
#[test]
fn test_write_read_large_gzip_0() {
assert!(test_logger());
let fname = barc_test_file("large_gzip_0.barc").unwrap();
let strategy = GzipCompressStrategy::default().set_compression_level(0);
write_read_large(&fname, &strategy).unwrap();
assert_compression(&fname, Compression::Gzip);
}
#[cfg(feature = "brotli")]
#[test]
fn test_write_read_large_brotli() {
assert!(test_logger());
let fname = barc_test_file("large_brotli.barc").unwrap();
let strategy = BrotliCompressStrategy::default().set_min_len(0xa359b);
write_read_large(&fname, &strategy).unwrap();
assert_compression(&fname, Compression::Brotli);
}
fn assert_compression(fname: &PathBuf, comp: Compression) {
let mut file = File::open(fname).unwrap();
let rhead = read_record_head(&mut file).unwrap().unwrap();
assert_eq!(rhead.compress, comp);
}
fn write_read_large(fname: &PathBuf, strategy: &dyn CompressStrategy)
-> Result<(), Flaw>
{
let bfile = BarcFile::new(fname);
let mut writer = bfile.writer()?;
let lorem_ipsum =
"Lorem ipsum dolor sit amet, consectetur adipiscing elit, \
sed do eiusmod tempor incididunt ut labore et dolore magna \
aliqua. Ut enim ad minim veniam, quis nostrud exercitation \
ullamco laboris nisi ut aliquip ex ea commodo \
consequat. Duis aute irure dolor in reprehenderit in \
voluptate velit esse cillum dolore eu fugiat nulla \
pariatur. Excepteur sint occaecat cupidatat non proident, \
sunt in culpa qui officia deserunt mollit anim id est \
laborum. ";
let req_reps = 500;
let res_reps = 1_000;
let mut req_body = BodySink::with_ram_buffers(req_reps);
for _ in 0..req_reps {
req_body.push(lorem_ipsum)?;
}
let req_body = req_body.prepare()?;
let mut res_body = BodySink::with_ram_buffers(res_reps);
for _ in 0..res_reps {
res_body.push(lorem_ipsum)?;
}
let res_body = res_body.prepare()?;
let mut res_headers = http::HeaderMap::default();
res_headers.insert(http::header::CONTENT_TYPE, "text/plain".parse()?);
let mut req_headers = res_headers.clone();
req_headers.insert(http::header::USER_AGENT,
"barc large tester".parse()?);
let mut meta = http::HeaderMap::default();
meta.insert(hname_meta_res_decoded(), "identity".parse()?);
writer.write(
&Record { req_body, req_headers, res_body, res_headers, meta,
..Record::default() },
strategy)?;
let tune = Tunables::new();
let mut reader = bfile.reader()?;
let record = debugv!(reader.read(&tune))?.unwrap();
assert_eq!(record.rec_type, RecordType::Dialog);
assert_eq!(record.meta.len(), 1);
assert_eq!(record.req_headers.len(), 2);
assert_eq!(record.req_body.len(),
(lorem_ipsum.len() * req_reps) as u64);
assert_eq!(record.res_headers.len(), 1);
assert_eq!(record.res_body.len(),
(lorem_ipsum.len() * res_reps) as u64);
assert!(reader.read(&tune)?.is_none());
Ok(())
}
#[test]
fn test_write_read_parallel() {
assert!(test_logger());
let fname = barc_test_file("parallel.barc").unwrap();
let bfile = BarcFile::new(&fname);
{
let mut _writer = bfile.writer().unwrap();
}
let res_body_str = "RESPONSE BODY";
let tune = Tunables::new();
let mut reader = bfile.reader().unwrap();
let record = reader.read(&tune).unwrap();
assert!(record.is_none());
let mut writer = bfile.writer().unwrap();
let res_body = BodyImage::from_slice(res_body_str);
let offset = writer.write(&Record {
res_body, ..Record::default() },
&NoCompressStrategy::default()).unwrap();
assert_eq!(offset, 0);
reader.seek(offset).unwrap();
let record = debugv!(reader.read(&tune)).unwrap().unwrap();
assert_eq!(record.rec_type, RecordType::Dialog);
assert_eq!(record.meta.len(), 0);
assert_eq!(record.req_headers.len(), 0);
assert_eq!(record.req_body.len(), 0);
assert_eq!(record.res_headers.len(), 0);
assert_eq!(record.res_body.len(), res_body_str.len() as u64);
let record = reader.read(&tune).unwrap();
assert!(record.is_none());
writer.write(&Record::default(),
&NoCompressStrategy::default()).unwrap();
let record = reader.read(&tune).unwrap().unwrap();
assert_eq!(record.rec_type, RecordType::Dialog);
assert_eq!(record.res_body.len(), 0);
let record = reader.read(&tune).unwrap();
assert!(record.is_none());
}
#[test]
fn test_read_sample() {
assert!(test_logger());
let tune = Tunables::new();
let bfile = BarcFile::new("sample/example.barc");
let mut reader = bfile.reader().unwrap();
let record = debugv!(reader.read(&tune)).unwrap().unwrap();
assert_eq!(record.rec_type, RecordType::Dialog);
assert_eq!(record.meta.len(), 5);
assert_eq!(record.req_headers.len(), 4);
assert!(record.req_body.is_empty());
assert_eq!(record.res_headers.len(), 11);
assert!(record.res_body.is_ram());
let mut br = record.res_body.reader();
let mut buf = Vec::with_capacity(2048);
br.read_to_end(&mut buf).unwrap();
assert_eq!(buf.len(), 1270);
assert_eq!(&buf[0..15], b"<!doctype html>");
assert_eq!(&buf[(buf.len()-8)..], b"</html>\n");
let record = reader.read(&tune).unwrap();
assert!(record.is_none());
}
#[test]
fn test_record_convert_dialog() {
let tune = Tunables::new();
let bfile = BarcFile::new("sample/example.barc");
let mut reader = bfile.reader().unwrap();
let rc1 = reader.read(&tune).unwrap().unwrap();
let dl: Dialog = rc1.clone().try_into().unwrap();
let rc2: Record = dl.try_into().unwrap();
assert_eq!(rc1.rec_type, rc2.rec_type);
assert_eq!(rc1.meta, rc2.meta);
assert_eq!(rc1.req_headers, rc2.req_headers);
assert_eq!(rc1.req_body.len(), rc2.req_body.len());
assert_eq!(rc1.res_headers, rc2.res_headers);
assert_eq!(rc1.res_body.len(), rc2.res_body.len());
}
#[test]
fn test_record_convert_dialog_204() {
let tune = Tunables::new();
let bfile = BarcFile::new("sample/204_no_body.barc");
let mut reader = bfile.reader().unwrap();
let rc1 = reader.read(&tune).unwrap().unwrap();
let dl: Dialog = rc1.clone().try_into().unwrap();
let rc2: Record = dl.try_into().unwrap();
assert_eq!(rc1.rec_type, rc2.rec_type);
assert_eq!(rc1.meta, rc2.meta);
assert_eq!(rc1.req_headers, rc2.req_headers);
assert_eq!(rc1.req_body.len(), rc2.req_body.len());
assert_eq!(rc1.res_headers, rc2.res_headers);
assert_eq!(rc1.res_body.len(), rc2.res_body.len());
}
#[test]
fn test_read_sample_larger() {
assert!(test_logger());
let record = {
let tune = Tuner::new()
.set_max_body_ram(1024)
.finish();
let bfile = BarcFile::new("sample/example.barc");
let mut reader = bfile.reader().unwrap();
let r = reader.read(&tune).unwrap().unwrap();
let next = reader.read(&tune).unwrap();
assert!(next.is_none());
r
};
debugv!(&record);
assert!(!record.res_body.is_ram());
let mut br = record.res_body.reader();
let mut buf = Vec::with_capacity(2048);
br.read_to_end(&mut buf).unwrap();
assert_eq!(buf.len(), 1270);
assert_eq!(&buf[0..15], b"<!doctype html>");
assert_eq!(&buf[(buf.len()-8)..], b"</html>\n");
}
#[cfg(feature = "mmap")]
#[test]
fn test_read_sample_mapped() {
assert!(test_logger());
let mut record = {
let tune = Tuner::new()
.set_max_body_ram(1024)
.finish();
let bfile = BarcFile::new("sample/example.barc");
let mut reader = bfile.reader().unwrap();
let r = reader.read(&tune).unwrap().unwrap();
let next = reader.read(&tune).unwrap();
assert!(next.is_none());
r
};
record.res_body.mem_map().unwrap();
debugv!(&record);
assert!(!record.res_body.is_ram());
let mut br = record.res_body.reader();
let mut buf = Vec::with_capacity(2048);
br.read_to_end(&mut buf).unwrap();
assert_eq!(buf.len(), 1270);
assert_eq!(&buf[0..15], b"<!doctype html>");
assert_eq!(&buf[(buf.len()-8)..], b"</html>\n");
}
#[test]
fn test_read_empty_file() {
assert!(test_logger());
let tune = Tunables::new();
let bfile = BarcFile::new("sample/empty.barc");
let mut reader = bfile.reader().unwrap();
let record = reader.read(&tune).unwrap();
assert!(record.is_none());
let record = debugv!(reader.read(&tune)).unwrap();
assert!(record.is_none());
}
#[test]
fn test_read_over_reserved() {
assert!(test_logger());
let tune = Tunables::new();
let bfile = BarcFile::new("sample/reserved.barc");
let mut reader = bfile.reader().unwrap();
let record = debugv!(reader.read(&tune)).unwrap();
assert!(record.is_none());
let record = reader.read(&tune).unwrap();
assert!(record.is_none());
}
#[test]
fn test_read_short_record_head() {
let tune = Tunables::new();
let bfile = BarcFile::new("sample/reserved.barc");
let mut reader = bfile.reader().unwrap();
reader.seek(1).unwrap();
if let Err(e) = reader.read(&tune) {
if let BarcError::ReadIncompleteRecHead(l) = e {
assert_eq!(l, V2_HEAD_SIZE - 1);
let em = e.to_string();
assert!(em.contains("Incomplete"), em)
} else {
panic!("Other error: {}", e);
}
} else {
panic!("Should not succeed!");
}
}
#[test]
fn test_read_bad_record_head() {
let tune = Tunables::new();
let bfile = BarcFile::new("sample/example.barc");
let mut reader = bfile.reader().unwrap();
reader.seek(1).unwrap();
if let Err(e) = reader.read(&tune) {
if let BarcError::ReadInvalidRecHead = e {
} else {
panic!("Other error: {}", e);
}
} else {
panic!("Should not succeed!");
}
}
#[test]
fn test_read_truncated() {
let tune = Tunables::new();
let bfile = BarcFile::new("sample/truncated.barc");
let mut reader = bfile.reader().unwrap();
if let Err(e) = reader.read(&tune) {
if let BarcError::Io(ioe) = e {
assert_eq!(ErrorKind::UnexpectedEof, ioe.kind());
} else {
panic!("Other error type {:?}", e);
}
} else {
panic!("Should not succeed!");
}
}
#[test]
fn test_read_204_no_body() {
assert!(test_logger());
let tune = Tunables::new();
let bfile = BarcFile::new("sample/204_no_body.barc");
let mut reader = bfile.reader().unwrap();
let record = debugv!(reader.read(&tune)).unwrap().unwrap();
assert_eq!(record.rec_type, RecordType::Dialog);
assert_eq!(record.meta.len(), 4);
assert_eq!(record.req_headers.len(), 4);
assert!(record.req_body.is_empty());
assert_eq!(record.res_headers.len(), 9);
assert!(record.res_body.is_empty());
}
}