barc/
lib.rs

//! **B**ody **Arc**hive container file format, reader and writer.
//!
//! BARC is a container file format for the storage of one to many HTTP
//! request/response dialog records. A fixed length, ASCII-only record head
//! specifies the lengths of a subsequent series of header blocks and bodies
//! (meta, request and response), which are stored as raw (unencoded) bytes.
//! When not using the internal compression feature, the format is easily
//! human readable. With compression, the `barc` CLI tool (*barc-cli* crate)
//! can be used to view records.
//!
//! See sample files in the source tree under `sample/*.barc`.
//!
//! ## Other features:
//!
//! * An additional *meta*-headers block provides more recording details
//!   and can also be used to store application-specific values.
//!
//! * Sequential or random-access reads by record offset (which could be
//!   stored in an external index or database).
//!
//! * Single-writer sessions are guaranteed safe with N concurrent readers
//!   (in or out of process).
//!
//! * Optional per-record gzip or Brotli compression (headers and bodies).
25
26#![warn(rust_2018_idioms)]
27
28use std::cmp;
29use std::error::Error as StdError;
30use std::fs::{File, OpenOptions};
31use std::fmt;
32use std::io;
33use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
34use std::mem;
35use std::ops::{AddAssign, ShlAssign};
36use std::sync::{Arc, Mutex, MutexGuard};
37use std::path::Path;
38
39use bytes::{BytesMut, BufMut};
40use http::header::{HeaderName, HeaderValue};
41use tao_log::{debug, warn};
42use olio::fs::rc::{ReadPos, ReadSlice};
43
44use body_image::{
45    BodyError, BodyImage, BodySink, Dialog, Encoding,
46    Epilog, Prolog, Recorded, RequestRecorded, Tunables
47};
48
/// Boxed, thread-safe, type-erased error (`dyn std::error::Error`), used
/// wherever a cause of arbitrary error type must be carried, e.g. by
/// `BarcError::InvalidHeader`.
///
/// The concrete error type may be recovered with the `downcast`,
/// `downcast_ref` and `is` methods available on `dyn std::error::Error`.
pub type Flaw = Box<dyn StdError + Send + Sync>;
54
55mod compress;
56pub use compress::{
57    CompressStrategy, Compression, EncodeWrapper,
58    GzipCompressStrategy, NoCompressStrategy,
59};
60
61use compress::DecodeWrapper;
62
63#[cfg(feature = "brotli")]
64pub use compress::BrotliCompressStrategy;
65
66use std::convert::TryFrom;
67
/// Size in bytes of the fixed record head, including its CRLF terminator:
/// 54 bytes.
pub const V2_HEAD_SIZE: usize = 54;

/// Largest total record length, not counting the record head:
/// 2<sup>48</sup> - 1 bytes (just under 256 TiB).
///
/// Note: this exceeds the file or partition size limits of many
/// file-systems.
pub const V2_MAX_RECORD: u64 = (1 << 48) - 1;

/// Largest header block (meta, request or response) size, counting any CRLF
/// terminator: 2<sup>20</sup> - 1 bytes (just under 1 MiB).
pub const V2_MAX_HBLOCK: usize = (1 << 20) - 1;

/// Largest request body size, counting any CRLF terminator:
/// 2<sup>40</sup> - 1 bytes (just under 1 TiB).
pub const V2_MAX_REQ_BODY: u64 = (1 << 40) - 1;
87
88/// Meta `HeaderName` for the complete URL used in the request.
89#[inline]
90pub fn hname_meta_url() -> http::header::HeaderName {
91    static NAME: &str = "url";
92    HeaderName::from_static(NAME)
93}
94
95/// Meta `HeaderName` for the HTTP method used in the request, e.g. "GET",
96/// "POST", etc.
97#[inline]
98pub fn hname_meta_method() -> http::header::HeaderName {
99    static NAME: &str = "method";
100    HeaderName::from_static(NAME)
101}
102
103/// Meta `HeaderName` for the response version, e.g. "HTTP/1.1", "HTTP/2.0",
104/// etc.
105#[inline]
106pub fn hname_meta_res_version() -> http::header::HeaderName {
107    static NAME: &str = "response-version";
108    HeaderName::from_static(NAME)
109}
110
111/// Meta `HeaderName` for the response numeric status code, SPACE, and then a
112/// standardized _reason phrase_, e.g. "200 OK". The later is intended only
113/// for human readers.
114#[inline]
115pub fn hname_meta_res_status() -> http::header::HeaderName {
116    static NAME: &str = "response-status";
117    HeaderName::from_static(NAME)
118}
119
120/// Meta `HeaderName` for a list of content or transfer encodings decoded for
121/// the current response body. The value is in HTTP content-encoding header
122/// format, e.g. "chunked, gzip".
123#[inline]
124pub fn hname_meta_res_decoded() -> http::header::HeaderName {
125    static NAME: &str = "response-decoded";
126    HeaderName::from_static(NAME)
127}
128
/// Handle to a BARC file identified by its `Path`, permitting at most one
/// writer at a time (serialized by a `Mutex`) alongside N readers.
pub struct BarcFile {
    /// Owned copy of the path, used to (re-)open the file.
    path: Box<Path>,

    /// Write handle, `None` until the first call to `BarcFile::writer`.
    write_lock: Mutex<Option<File>>,
}
135
/// Write access to a BARC file. The owning `BarcFile`'s write lock is held
/// for as long as this value lives, and released when it is dropped.
pub struct BarcWriter<'a> {
    guard: MutexGuard<'a, Option<File>>,
}
140
141/// BARC file handle for read access. Each reader has its own file handle
142/// and position.
143pub struct BarcReader {
144    file: ReadPos,
145}
146
147/// Error enumeration for all barc module errors.
148///
149/// This may be extended in the future, so exhaustive matching is gently
150/// discouraged with an unused variant.
151#[derive(Debug)]
152pub enum BarcError {
153    /// Error with `BodySink` or `BodyImage`.
154    Body(BodyError),
155
156    /// IO errors, reading from or writing to a BARC file.
157    Io(io::Error),
158
159    /// Unknown `RecordType` byte flag.
160    UnknownRecordType(u8),
161
162    /// Unknown `Compression` byte flag.
163    UnknownCompression(u8),
164
165    /// Decoder unsupported for the `Compression` encoding found on read.
166    DecoderUnsupported(Compression),
167
168    /// Read an incomplete record head.
169    ReadIncompleteRecHead(usize),
170
171    /// Read an invalid record head.
172    ReadInvalidRecHead,
173
174    /// Read an invalid record head hex digit.
175    ReadInvalidRecHeadHex(u8),
176
177    /// Error parsing header name, value or block (with cause)
178    InvalidHeader(Flaw),
179
180    /// Wraps a `DialogConvertError` as used for `Record` to `Dialog`
181    /// conversion.
182    IntoDialog(DialogConvertError),
183
184    /// Unused variant to both enable non-exhaustive matching and warn against
185    /// exhaustive matching.
186    _FutureProof
187}
188
189impl fmt::Display for BarcError {
190    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191        match *self {
192            BarcError::Body(ref be) =>
193                write!(f, "With Body; {}", be),
194            BarcError::Io(ref e) =>
195                write!(f, "{}", e),
196            BarcError::UnknownRecordType(b) =>
197                write!(f, "Unknown record type flag [{}]", b),
198            BarcError::UnknownCompression(b) =>
199                write!(f, "Unknown compression flag [{}]", b),
200            BarcError::DecoderUnsupported(c) =>
201                write!(f, "No decoder for {:?}. Enable the feature?", c),
202            BarcError::ReadIncompleteRecHead(l) =>
203                write!(f, "Incomplete record head, len {}", l),
204            BarcError::ReadInvalidRecHead =>
205                write!(f, "Invalid record head suffix"),
206            BarcError::ReadInvalidRecHeadHex(b) =>
207                write!(f, "Invalid record head hex digit [{}]", b),
208            BarcError::IntoDialog(ref dce) =>
209                write!(f, "Record to Dialog conversion; {}", dce),
210            BarcError::InvalidHeader(ref flaw) =>
211                write!(f, "Invalid header; {}", flaw),
212            BarcError::_FutureProof =>
213                unreachable!("Don't abuse the _FutureProof!")
214        }
215    }
216}
217
218impl StdError for BarcError {
219    fn source(&self) -> Option<&(dyn StdError + 'static)> {
220        match *self {
221            BarcError::Body(ref be)               => Some(be),
222            BarcError::Io(ref e)                  => Some(e),
223            BarcError::InvalidHeader(ref flaw)    => Some(flaw.as_ref()),
224            BarcError::IntoDialog(ref dce)        => Some(dce),
225            _ => None
226        }
227    }
228}
229
230impl From<io::Error> for BarcError {
231    fn from(err: io::Error) -> BarcError {
232        BarcError::Io(err)
233    }
234}
235
236impl From<BodyError> for BarcError {
237    fn from(err: BodyError) -> BarcError {
238        BarcError::Body(err)
239    }
240}
241
242impl From<DialogConvertError> for BarcError {
243    fn from(err: DialogConvertError) -> BarcError {
244        BarcError::IntoDialog(err)
245    }
246}
247
248/// A parsed record head.
249#[derive(Debug)]
250struct RecordHead {
251    len:              u64,
252    rec_type:         RecordType,
253    compress:         Compression,
254    meta:             usize,
255    req_h:            usize,
256    req_b:            u64,
257    res_h:            usize,
258}
259
260/// An owned BARC record with public fields.
261///
262/// Additonal getter methods are found in trait implementations
263/// [`RequestRecorded`](#impl-RequestRecorded), [`Recorded`](#impl-Recorded),
264/// and [`MetaRecorded`](#impl-MetaRecorded).
265#[derive(Clone, Debug, Default)]
266pub struct Record {
267    /// Record type.
268    pub rec_type:         RecordType,
269
270    /// Map of _meta_-headers for values which are not strictly part of the
271    /// HTTP request or response headers. This can be extended with
272    /// application specific name/value pairs.
273    pub meta:             http::HeaderMap,
274
275    /// Map of HTTP request headers.
276    pub req_headers:      http::HeaderMap,
277
278    /// Request body which may or may not be RAM resident.
279    pub req_body:         BodyImage,
280
281    /// Map of HTTP response headers.
282    pub res_headers:      http::HeaderMap,
283
284    /// Response body which may or may not be RAM resident.
285    pub res_body:         BodyImage,
286}
287
288/// Access to BARC `Record` compatible objects by reference, extending
289/// `Recorded` with meta-headers and a record type.
290pub trait MetaRecorded: Recorded {
291    /// Record type.
292    fn rec_type(&self)    -> RecordType;
293
294    /// Map of _meta_-headers for values which are not strictly part of the
295    /// HTTP request or response headers.
296    fn meta(&self)        -> &http::HeaderMap;
297}
298
299impl RequestRecorded for Record {
300    fn req_headers(&self) -> &http::HeaderMap  { &self.req_headers }
301    fn req_body(&self)    -> &BodyImage        { &self.req_body }
302}
303
304impl Recorded for Record {
305    fn res_headers(&self) -> &http::HeaderMap  { &self.res_headers }
306    fn res_body(&self)    -> &BodyImage        { &self.res_body }
307}
308
309impl MetaRecorded for Record {
310    fn rec_type(&self)    -> RecordType        { self.rec_type }
311    fn meta(&self)        -> &http::HeaderMap  { &self.meta }
312}
313
314impl TryFrom<Dialog> for Record {
315    type Error = BarcError;
316
317    /// Attempt to convert `Dialog` to `Record`.  This derives meta headers
318    /// from various `Dialog` fields, and could potentially fail, based on
319    /// header value constraints, with `BarcError::InvalidHeader`. Converting
320    /// `Dialog::url` to the meta *url* header has the most potential, given
321    /// `http::Uri` validation complexity, but any conversion failure would
322    /// suggest an *http* crate bug or breaking change—as currently stated,
323    /// allowed `Uri` bytes are a subset of allowed `HeaderValue` bytes.
324    fn try_from(dialog: Dialog) -> Result<Self, Self::Error> {
325
326        let (prolog, epilog) = dialog.explode();
327        let mut meta = http::HeaderMap::with_capacity(6);
328        let efn = &|e| BarcError::InvalidHeader(Flaw::from(e));
329
330        meta.append(
331            hname_meta_url(),
332            prolog.url.to_string().parse().map_err(efn)?
333        );
334        meta.append(
335            hname_meta_method(),
336            prolog.method.to_string().parse().map_err(efn)?
337        );
338
339        // FIXME: This relies on the debug format of version, e.g. "HTTP/1.1"
340        // which might not be stable, but http::Version doesn't offer an enum
341        // to match on, only constants.
342        let v = format!("{:?}", epilog.version);
343        meta.append(
344            hname_meta_res_version(),
345            v.parse().map_err(efn)?
346        );
347
348        meta.append(
349            hname_meta_res_status(),
350            epilog.status.to_string().parse().map_err(efn)?
351        );
352
353        if !epilog.res_decoded.is_empty() {
354            let mut joined = String::with_capacity(30);
355            for e in epilog.res_decoded {
356                if !joined.is_empty() { joined.push_str(", "); }
357                joined.push_str(&e.to_string());
358            }
359            meta.append(
360                hname_meta_res_decoded(),
361                joined.parse().map_err(efn)?
362            );
363        }
364        Ok(Record {
365            rec_type: RecordType::Dialog,
366            meta,
367            req_headers: prolog.req_headers,
368            req_body:    prolog.req_body,
369            res_headers: epilog.res_headers,
370            res_body:    epilog.res_body,
371        })
372    }
373}
374
375/// Error enumeration for failures when converting from a `Record` to a
376/// `Dialog`.
377///
378/// This error type may also be converted to (wrapped as) a `BarcError`. It
379/// may be extended in the future, so exhaustive matching is gently
380/// discouraged with an unused variant.
381#[derive(Debug)]
382pub enum DialogConvertError {
383    /// No url meta header found.
384    NoMetaUrl,
385
386    /// The url meta header failed to parse as an `http::Uri`.
387    InvalidUrl(http::uri::InvalidUri),
388
389    /// No method meta header found.
390    NoMetaMethod,
391
392    /// The method meta header failed to parse as an `http::Method`.
393    InvalidMethod(http::method::InvalidMethod),
394
395    /// No response-version meta header found.
396    NoMetaResVersion,
397
398    /// The response-version meta header did not match a known value.
399    InvalidVersion(Vec<u8>),
400
401    /// No response-status meta header found.
402    NoMetaResStatus,
403
404    /// The response-status meta header is not in a recognized format.
405    MalformedMetaResStatus,
406
407    /// The response-status meta header failed to be parsed as an
408    /// `http::StatusCode`.
409    InvalidStatusCode(http::status::InvalidStatusCode),
410
411    /// The response-decoded meta header failed to be parsed.
412    InvalidResDecoded(String),
413
414    /// Unused variant to both enable non-exhaustive matching and warn against
415    /// exhaustive matching.
416    _FutureProof
417}
418
419impl fmt::Display for DialogConvertError {
420    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
421        match *self {
422            DialogConvertError::NoMetaUrl =>
423                write!(f, "No url meta header found"),
424            DialogConvertError::InvalidUrl(ref iub) =>
425                write!(f, "Invalid URI: {}", iub),
426            DialogConvertError::NoMetaMethod =>
427                write!(f, "No method meta header found"),
428            DialogConvertError::InvalidMethod(ref im) =>
429                write!(f, "Invalid HTTP Method: {}", im),
430            DialogConvertError::NoMetaResVersion =>
431                write!(f, "No response-version meta header found"),
432            DialogConvertError::InvalidVersion(ref bs) => {
433                if let Ok(s) = String::from_utf8(bs.clone()) {
434                    write!(f, "Invalid HTTP Version: {}", s)
435                } else {
436                    write!(f, "Invalid HTTP Version: {:x?}", bs)
437                }
438            }
439            DialogConvertError::NoMetaResStatus =>
440                write!(f, "No response-status meta header found"),
441            DialogConvertError::MalformedMetaResStatus =>
442                write!(f, "The response-status meta header is malformed"),
443            DialogConvertError::InvalidStatusCode(ref isc) =>
444                write!(f, "Invalid HTTP status code: {}", isc),
445            DialogConvertError::InvalidResDecoded(ref d) =>
446                write!(f, "Invalid response-decoded header value: {}", d),
447            DialogConvertError::_FutureProof =>
448                unreachable!("Don't abuse the _FutureProof!")
449        }
450    }
451}
452
453impl StdError for DialogConvertError {
454    fn source(&self) -> Option<&(dyn StdError + 'static)> {
455        match *self {
456            DialogConvertError::InvalidUrl(ref iub)           => Some(iub),
457            DialogConvertError::InvalidMethod(ref im)         => Some(im),
458            DialogConvertError::InvalidStatusCode(ref isc)    => Some(isc),
459            _ => None
460        }
461    }
462}
463
464impl TryFrom<Record> for Dialog {
465    type Error = DialogConvertError;
466
467    /// Attempt to convert `Record` to `Dialog`. This parses various meta
468    /// header values to produce `Dialog` equivalents such as
469    /// `http::StatusCode` and `http::Method`, which could fail, if the
470    /// `Record` was not originally produced from a `Dialog` or was otherwise
471    /// modified in an unsupported way.
472    fn try_from(rec: Record) -> Result<Self, Self::Error> {
473        let url = if let Some(uv) = rec.meta.get(hname_meta_url()) {
474            http::Uri::try_from(uv.as_bytes())
475                .map_err(DialogConvertError::InvalidUrl)
476        } else {
477            Err(DialogConvertError::NoMetaUrl)
478        }?;
479
480        let method = if let Some(v) = rec.meta.get(hname_meta_method()) {
481            http::Method::from_bytes(v.as_bytes())
482                .map_err(DialogConvertError::InvalidMethod)
483        } else {
484            Err(DialogConvertError::NoMetaMethod)
485        }?;
486
487        let version = if let Some(v) = rec.meta.get(hname_meta_res_version()) {
488            let vb = v.as_bytes();
489            match vb {
490                b"HTTP/0.9" => http::Version::HTTP_09,
491                b"HTTP/1.0" => http::Version::HTTP_10,
492                b"HTTP/1.1" => http::Version::HTTP_11,
493                b"HTTP/2.0" => http::Version::HTTP_2,
494                _ => {
495                    return Err(DialogConvertError::InvalidVersion(vb.to_vec()));
496                }
497            }
498        } else {
499            return Err(DialogConvertError::NoMetaResVersion);
500        };
501
502        let status = if let Some(v) = rec.meta.get(hname_meta_res_status()) {
503            let vbs = v.as_bytes();
504            if vbs.len() >= 3 {
505                http::StatusCode::from_bytes(&vbs[0..3])
506                    .map_err(DialogConvertError::InvalidStatusCode)
507            } else {
508                Err(DialogConvertError::MalformedMetaResStatus)
509            }
510        } else {
511            Err(DialogConvertError::NoMetaResStatus)
512        }?;
513
514        let res_decoded = if let Some(v) = rec.meta.get(hname_meta_res_decoded()) {
515            if let Ok(dcds) = v.to_str() {
516                let mut encodes = Vec::with_capacity(4);
517                for enc in dcds.split(',') {
518                    let enc = enc.trim();
519                    encodes.push(match enc {
520                        "chunked" => Encoding::Chunked,
521                        "deflate" => Encoding::Deflate,
522                        "gzip"    => Encoding::Gzip,
523                        "br"      => Encoding::Brotli,
524                        _ => {
525                            return Err(DialogConvertError::InvalidResDecoded(
526                                enc.to_string()
527                            ));
528                        }
529                    })
530                }
531                encodes
532            } else {
533                return Err(DialogConvertError::InvalidResDecoded(
534                    format!("{:x?}", v.as_bytes())
535                ));
536            }
537        } else {
538            Vec::with_capacity(0)
539        };
540
541        Ok(Dialog::new(
542            Prolog {
543                method,
544                url,
545                req_headers: rec.req_headers,
546                req_body:    rec.req_body
547            },
548            Epilog {
549                version,
550                status,
551                res_decoded,
552                res_headers: rec.res_headers,
553                res_body:    rec.res_body
554            }
555        ))
556    }
557}
558
/// Kind of a BARC record, as flagged in its record head.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RecordType {
    /// Placeholder head, used internally to _reserve_ space for a record
    /// while it is being written.
    Reserved,

    /// A complete HTTP request and response dialog between a client and
    /// server. See `Dialog`.
    Dialog,
}
569
570impl RecordType {
571    /// Return (char) flag for variant.
572    fn flag(self) -> char {
573        match self {
574            RecordType::Reserved => 'R',
575            RecordType::Dialog => 'D',
576        }
577    }
578
579    /// Return variant for (byte) flag, or fail.
580    fn from_byte(f: u8) -> Result<Self, BarcError> {
581        match f {
582            b'R' => Ok(RecordType::Reserved),
583            b'D' => Ok(RecordType::Dialog),
584            _ => Err(BarcError::UnknownRecordType(f))
585        }
586    }
587}
588
589impl Default for RecordType {
590    /// Defaults to `Dialog`.
591    fn default() -> RecordType { RecordType::Dialog }
592}
593
/// Line terminator: ends each header line, and optionally pads blocks and
/// bodies in uncompressed records.
const CRLF: &[u8] = &[b'\r', b'\n'];

/// Readability flags for header/body reads and writes: pad with CRLF, or not.
const WITH_CRLF: bool = true;
const NO_CRLF:   bool = !WITH_CRLF;
598
599const V2_RESERVE_HEAD: RecordHead = RecordHead {
600    len: 0,
601    rec_type: RecordType::Reserved,
602    compress: Compression::Plain,
603    meta: 0,
604    req_h: 0,
605    req_b: 0,
606    res_h: 0
607};
608
609impl BarcFile {
610    /// Return new instance for the specified path, which may be an
611    /// existing file, or one to be created when `writer` is opened.
612    pub fn new<P>(path: P) -> BarcFile
613        where P: AsRef<Path>
614    {
615        // Save off owned path to re-open for readers
616        let path: Box<Path> = path.as_ref().into();
617        let write_lock = Mutex::new(None);
618        BarcFile { path, write_lock }
619    }
620
621    /// Get a writer for this file, opening the file for write (and
622    /// possibly creating it, or erroring) if this is the first time
623    /// called. May block on the write lock, as only one `BarcWriter`
624    /// instance is allowed.
625    pub fn writer(&self) -> Result<BarcWriter<'_>, BarcError> {
626        let mut guard = self.write_lock.lock().unwrap(); // FIXME:
627        // PoisonError is not send, so can't map to BarcError
628
629        if (*guard).is_none() {
630            let file = OpenOptions::new()
631                .create(true)
632                .read(true)
633                .write(true)
634                .open(&self.path)?;
635            // FIXME: Use fs2 crate for: file.try_lock_exclusive()?
636            *guard = Some(file);
637        }
638
639        Ok(BarcWriter { guard })
640    }
641
642    /// Get a reader for this file. Errors if the file does not exist.
643    pub fn reader(&self) -> Result<BarcReader, BarcError> {
644        let file = OpenOptions::new()
645            .read(true)
646            .open(&self.path)?;
647        // FIXME: Use fs2 crate for: file.try_lock_shared()?
648        Ok(BarcReader { file: ReadPos::new(Arc::new(file), 0) })
649    }
650}
651
652impl<'a> BarcWriter<'a> {
653    /// Write a new record, returning the record's offset from the
654    /// start of the BARC file. The writer position is then advanced
655    /// to the end of the file, for the next `write`.
656    pub fn write(
657        &mut self,
658        rec: &dyn MetaRecorded,
659        strategy: &dyn CompressStrategy)
660        -> Result<u64, BarcError>
661    {
662        // BarcFile::writer() guarantees Some(File)
663        let file: &mut File = self.guard.as_mut().unwrap();
664
665        // Write initial head as reserved place holder
666        let start = file.seek(SeekFrom::End(0))?;
667        write_record_head(file, &V2_RESERVE_HEAD)?;
668        file.flush()?;
669
670        // Write the record per strategy
671        let mut head = write_record(file, rec, strategy)?;
672
673        // Use new file offset to indicate total length
674        let end = file.seek(SeekFrom::Current(0))?;
675        let orig_len = head.len;
676        assert!(end >= (start + (V2_HEAD_SIZE as u64)));
677        head.len = end - start - (V2_HEAD_SIZE as u64);
678        if head.compress == Compression::Plain {
679            assert_eq!(orig_len, head.len);
680        } else if orig_len < head.len {
681            warn!("Compression *increased* record size from \
682                   {} to {} bytes",
683                  orig_len, head.len);
684        }
685
686        // Seek back and write final record head, with known size
687        file.seek(SeekFrom::Start(start))?;
688        write_record_head(file, &head)?;
689
690        // Seek to end and flush
691        file.seek(SeekFrom::End(0))?;
692        file.flush()?;
693
694        Ok(start)
695    }
696}
697
698// Write the record, returning a preliminary `RecordHead` with
699// observed (not compressed) lengths.
700fn write_record(
701    file: &File,
702    rec: &dyn MetaRecorded,
703    strategy: &dyn CompressStrategy)
704    -> Result<RecordHead, BarcError>
705{
706    let mut encoder = strategy.wrap_encoder(rec, file)?;
707    let fout = &mut encoder;
708    let compress = fout.mode();
709    let with_crlf = compress == Compression::Plain;
710    let head = {
711        let meta = write_headers(fout, with_crlf, rec.meta())?;
712
713        let req_h = write_headers(fout, with_crlf, rec.req_headers())?;
714        let req_b = write_body(fout, with_crlf, rec.req_body())?;
715
716        let res_h = write_headers(fout, with_crlf, rec.res_headers())?;
717
718        // Compute total thus far, excluding the fixed head length
719        let mut len: u64 = (meta + req_h + res_h) as u64 + req_b;
720
721        assert!((len + rec.res_body().len() + 2) <= V2_MAX_RECORD,
722                "body exceeds size limit");
723
724        let res_b = write_body(fout, with_crlf, rec.res_body())?;
725        len += res_b;
726
727        RecordHead {
728            len, // uncompressed length
729            rec_type: rec.rec_type(),
730            compress,
731            meta,
732            req_h,
733            req_b,
734            res_h }
735    };
736
737    encoder.finish()?;
738    Ok(head)
739}
740
741// Write record head to out, asserting the various length constraints.
742fn write_record_head<W>(out: &mut W, head: &RecordHead)
743    -> Result<(), BarcError>
744    where W: Write + ?Sized
745{
746    // Check input ranges
747    assert!(head.len   <= V2_MAX_RECORD,   "len exceeded");
748    assert!(head.meta  <= V2_MAX_HBLOCK,   "meta exceeded");
749    assert!(head.req_h <= V2_MAX_HBLOCK,   "req_h exceeded");
750    assert!(head.req_b <= V2_MAX_REQ_BODY, "req_b exceeded");
751    assert!(head.res_h <= V2_MAX_HBLOCK,   "res_h exceeded");
752
753    let size = write_all_len(out, format!(
754        // ---6------19---22-----28-----34------45----50------54
755        "BARC2 {:012x} {}{} {:05x} {:05x} {:010x} {:05x}\r\n\r\n",
756        head.len, head.rec_type.flag(), head.compress.flag(),
757        head.meta, head.req_h, head.req_b, head.res_h
758    ).as_bytes())?;
759    assert_eq!(size, V2_HEAD_SIZE, "wrong record head size");
760    Ok(())
761}
762
763/// Write header block to out, with optional CR+LF end padding, and return the
764/// length written. This is primarily an implementation detail of `BarcWriter`,
765/// but is made public for its general diagnostic utility.
766///
767/// The `Write` is passed by reference for backward compatibility with its
768/// original non-generic form as `&mut dyn Write`. [C-RW-VALUE] prefers
769/// pass by value, but this would now be a breaking change.
770/// [`std::io::copy`] is presumably in the same position.
771///
772/// [C-RW-VALUE]: https://rust-lang.github.io/api-guidelines/interoperability.html#generic-readerwriter-functions-take-r-read-and-w-write-by-value-c-rw-value
773/// [`std::io::copy`]: https://doc.rust-lang.org/std/io/fn.copy.html
774pub fn write_headers<W>(
775    out: &mut W,
776    with_crlf: bool,
777    headers: &http::HeaderMap)
778    -> Result<usize, BarcError>
779    where W: Write + ?Sized
780{
781    let mut size = 0;
782    for (key, value) in headers.iter() {
783        size += write_all_len(out, key.as_ref())?;
784        size += write_all_len(out, b": ")?;
785        size += write_all_len(out, value.as_bytes())?;
786        size += write_all_len(out, CRLF)?;
787    }
788    if with_crlf && size > 0 {
789        size += write_all_len(out, CRLF)?;
790    }
791    assert!(size <= V2_MAX_HBLOCK);
792    Ok(size)
793}
794
795/// Write body to out, with optional CR+LF end padding, and return the length
796/// written.  This is primarily an implementation detail of `BarcWriter`, but
797/// is made public for its general diagnostic utility.
798///
799/// The `Write` is passed by reference for backward compatibility with its
800/// original non-generic form as `&mut dyn Write`. [C-RW-VALUE] prefers
801/// pass by value, but this would now be a breaking change.
802///
803/// [C-RW-VALUE]: https://rust-lang.github.io/api-guidelines/interoperability.html#generic-readerwriter-functions-take-r-read-and-w-write-by-value-c-rw-value
804pub fn write_body<W>(out: &mut W, with_crlf: bool, body: &BodyImage)
805    -> Result<u64, BarcError>
806    where W: Write + ?Sized
807{
808    let mut size = body.write_to(out)?;
809    if with_crlf && size > 0 {
810        size += write_all_len(out, CRLF)? as u64;
811    }
812    Ok(size)
813}
814
815// Like `write_all`, but return the length of the provided byte slice.
816fn write_all_len<W>(out: &mut W, bs: &[u8]) -> Result<usize, BarcError>
817    where W: Write + ?Sized
818{
819    out.write_all(bs)?;
820    Ok(bs.len())
821}
822
823impl BarcReader {
824
825    /// Read and return the next Record or None if EOF. The provided Tunables
826    /// `max_body_ram` controls, depending on record sizes and compression,
827    /// whether the request and response bodies are read directly into RAM,
828    /// buffered in a file, or deferred via a `ReadSlice`.
829    pub fn read(&mut self, tune: &Tunables)
830        -> Result<Option<Record>, BarcError>
831    {
832        let fin = &mut self.file;
833        let start = fin.tell();
834
835        let rhead = match read_record_head(fin) {
836            Ok(Some(rh)) => rh,
837            Ok(None) => return Ok(None),
838            Err(e) => return Err(e)
839        };
840
841        let rec_type = rhead.rec_type;
842
843        // With a concurrent writer, its possible to see an incomplete,
844        // Reserved record head, followed by an empty or partial record
845        // payload.  In this case, seek back to start and return None.
846        if rec_type == RecordType::Reserved {
847            fin.seek(SeekFrom::Start(start))?;
848            return Ok(None);
849        }
850
851        if rhead.compress != Compression::Plain {
852            let end = fin.tell() + rhead.len;
853            let rec = read_compressed(
854                fin.subslice(fin.tell(), end), &rhead, tune
855            )?;
856            fin.seek(SeekFrom::Start(end))?;
857            return Ok(Some(rec))
858        }
859
860        let meta = read_headers(fin, WITH_CRLF, rhead.meta)?;
861        let req_headers = read_headers(fin, WITH_CRLF, rhead.req_h)?;
862
863        let req_body = if rhead.req_b <= tune.max_body_ram() {
864            read_body_ram(fin, WITH_CRLF, rhead.req_b as usize)
865        } else {
866            slice_body(fin, rhead.req_b)
867        }?;
868        let res_headers = read_headers(fin, WITH_CRLF, rhead.res_h)?;
869
870        let body_len = rhead.len - (fin.tell() - start - (V2_HEAD_SIZE as u64));
871
872        let res_body = if body_len <= tune.max_body_ram() {
873            read_body_ram(fin, WITH_CRLF, body_len as usize)
874        } else {
875            slice_body(fin, body_len)
876        }?;
877
878        Ok(Some(Record { rec_type, meta, req_headers, req_body,
879                         res_headers, res_body }))
880    }
881
882    /// Returns the current offset in bytes of this reader, which starts as 0
883    /// and is advanced by each succesful return from `read` or updated via
884    /// `seek`.
885    pub fn offset(&self) -> u64 {
886        self.file.tell()
887    }
888
889    /// Seek to a known byte offset (e.g. 0 or as returned from
890    /// `BarcWriter::write` or `offset`) from the start of the BARC file. This
891    /// effects subsequent calls to `read`, which may error if the position is
892    /// not to a valid record head.
893    pub fn seek(&mut self, offset: u64) -> Result<(), BarcError> {
894        self.file.seek(SeekFrom::Start(offset))?;
895        Ok(())
896    }
897}
898
899// Read and return a compressed `Record`. This is specialized for
900// NO_CRLF and since bodies can't be directly mapped from the file.
901fn read_compressed(rslice: ReadSlice, rhead: &RecordHead, tune: &Tunables)
902    -> Result<Record, BarcError>
903{
904    // Decoder over limited `ReadSlice` of compressed record len
905    let mut wrapper = DecodeWrapper::new(
906        rhead.compress,
907        rslice,
908        tune.buffer_size_ram())?;
909    let fin = &mut wrapper;
910
911    let rec_type = rhead.rec_type;
912
913    let meta = read_headers(fin, NO_CRLF, rhead.meta)?;
914
915    let req_headers = read_headers(fin, NO_CRLF, rhead.req_h)?;
916
917    let req_body = if rhead.req_b <= tune.max_body_ram() {
918        read_body_ram(fin, NO_CRLF, rhead.req_b as usize)?
919    } else {
920        read_body_fs(fin, rhead.req_b, tune)?
921    };
922
923    let res_headers = read_headers(fin, NO_CRLF, rhead.res_h)?;
924
925    // When compressed, we don't actually know the final size of the
926    // response body, so start small and use read_to_body. This may
927    // return `Ram` or `FsRead` states.
928    let res_body = BodyImage::read_from(fin, 4096, tune)?;
929
930    Ok(Record { rec_type, meta, req_headers, req_body, res_headers, res_body })
931}
932
933// Return RecordHead or None if EOF
934fn read_record_head<R>(rin: &mut R)
935    -> Result<Option<RecordHead>, BarcError>
936    where R: Read + ?Sized
937{
938    let mut buf = [0u8; V2_HEAD_SIZE];
939
940    let size = read_record_head_buf(rin, &mut buf)?;
941    if size == 0 {
942        return Ok(None);
943    }
944    if size != V2_HEAD_SIZE {
945        return Err(BarcError::ReadIncompleteRecHead(size));
946    }
947    if &buf[0..6] != b"BARC2 " {
948        return Err(BarcError::ReadInvalidRecHead);
949    }
950
951    let len       = parse_hex(&buf[6..18])?;
952    let rec_type  = RecordType::from_byte(buf[19])?;
953    let compress  = Compression::from_byte(buf[20])?;
954    let meta      = parse_hex(&buf[22..27])?;
955    let req_h     = parse_hex(&buf[28..33])?;
956    let req_b     = parse_hex(&buf[34..44])?;
957    let res_h     = parse_hex(&buf[45..50])?;
958    Ok(Some(RecordHead { len, rec_type, compress, meta, req_h, req_b, res_h }))
959}
960
961// Like `Read::read_exact` but we need to distinguish 0 bytes read
962// (EOF) from partial bytes read (a format error), so it also returns
963// the number of bytes read.
964fn read_record_head_buf<R>(rin: &mut R, mut buf: &mut [u8])
965    -> Result<usize, BarcError>
966    where R: Read + ?Sized
967{
968    let mut size = 0;
969    loop {
970        match rin.read(buf) {
971            Ok(0) => break,
972            Ok(n) => {
973                size += n;
974                if size >= V2_HEAD_SIZE {
975                    break;
976                }
977                let t = buf;
978                buf = &mut t[n..];
979            }
980            Err(e) => {
981                if e.kind() == ErrorKind::Interrupted {
982                    continue;
983                } else {
984                    return Err(e.into());
985                }
986            }
987        }
988    }
989    Ok(size)
990}
991
992// Read lowercase hexadecimal unsigned value directly from bytes.
993fn parse_hex<T>(buf: &[u8]) -> Result<T, BarcError>
994    where T: AddAssign<T> + From<u8> + ShlAssign<u8>
995{
996    let mut v = T::from(0u8);
997    for d in buf {
998        v <<= 4u8;
999        if *d >= b'0' && *d <= b'9' {
1000            v += T::from(*d - b'0');
1001        } else if *d >= b'a' && *d <= b'f' {
1002            v += T::from(10 + (*d - b'a'));
1003        } else {
1004            return Err(BarcError::ReadInvalidRecHeadHex(*d));
1005        }
1006    }
1007    Ok(v)
1008}
1009
1010// Reader header block of len bytes to HeaderMap.
1011fn read_headers<R>(rin: &mut R, with_crlf: bool, len: usize)
1012    -> Result<http::HeaderMap, BarcError>
1013    where R: Read + ?Sized
1014{
1015    if len == 0 {
1016        return Ok(http::HeaderMap::with_capacity(0));
1017    }
1018
1019    assert!(len > 2);
1020
1021    let tlen = if with_crlf { len } else { len + 2 };
1022    let mut buf = BytesMut::with_capacity(tlen);
1023    unsafe {
1024        let b = &mut *(
1025            buf.chunk_mut() as *mut _
1026                as *mut [mem::MaybeUninit<u8>]
1027                as *mut [u8]
1028        );
1029        rin.read_exact(&mut b[..len])?;
1030        buf.advance_mut(len);
1031    }
1032
1033    // Add CRLF for parsing if its not already present
1034    // (e.g. Compression::Plain padding)
1035    if !with_crlf {
1036        buf.put_slice(CRLF)
1037    }
1038    parse_headers(&buf[..])
1039}
1040
1041// Parse header byte slice to HeaderMap.
1042fn parse_headers(buf: &[u8]) -> Result<http::HeaderMap, BarcError> {
1043    let mut headbuf = [httparse::EMPTY_HEADER; 128];
1044
1045    // FIXME: httparse will return TooManyHeaders if headbuf isn't
1046    // large enough. Hyper 0.11.15 allocates 100, so 128 is room for
1047    // _even more_ (sigh). Might be better to just replace this with
1048    // our own parser, as the grammar isn't particularly complex.
1049
1050    match httparse::parse_headers(buf, &mut headbuf) {
1051        Ok(httparse::Status::Complete((size, heads))) => {
1052            let mut hmap = http::HeaderMap::with_capacity(heads.len());
1053            assert_eq!(size, buf.len());
1054            for h in heads {
1055                let name = h.name.parse::<HeaderName>()
1056                    .map_err(|e| BarcError::InvalidHeader(e.into()))?;
1057                let value = HeaderValue::from_bytes(h.value)
1058                    .map_err(|e| BarcError::InvalidHeader(e.into()))?;
1059                hmap.append(name, value);
1060            }
1061            Ok(hmap)
1062        }
1063        Ok(httparse::Status::Partial) => {
1064            Err(BarcError::InvalidHeader(
1065                Box::new(httparse::Error::TooManyHeaders)
1066            ))
1067        }
1068        Err(e) => Err(BarcError::InvalidHeader(e.into()))
1069    }
1070}
1071
1072// Read into `BodyImage` of state `Ram` as a single buffer.
1073fn read_body_ram<R>(rin: &mut R, with_crlf: bool, len: usize)
1074    -> Result<BodyImage, BarcError>
1075    where R: Read + ?Sized
1076{
1077    if len == 0 {
1078        return Ok(BodyImage::empty());
1079    }
1080
1081    assert!(!with_crlf || len > 2);
1082
1083    let mut buf = BytesMut::with_capacity(len);
1084    unsafe {
1085        let b = &mut *(
1086            buf.chunk_mut() as *mut _
1087                as *mut [mem::MaybeUninit<u8>]
1088                as *mut [u8]
1089        );
1090        rin.read_exact(&mut b[..len])?;
1091        let l = if with_crlf { len - 2 } else { len };
1092        buf.advance_mut(l);
1093    }
1094
1095    Ok(BodyImage::from_slice(buf.freeze()))
1096}
1097
1098// Read into `BodyImage` state `FsRead`. Assumes no CRLF terminator
1099// (only used for compressed records).
1100fn read_body_fs<R>(rin: &mut R, len: u64, tune: &Tunables)
1101    -> Result<BodyImage, BarcError>
1102    where R: Read + ?Sized
1103{
1104    if len == 0 {
1105        return Ok(BodyImage::empty());
1106    }
1107
1108    let mut body = BodySink::with_fs(tune.temp_dir())?;
1109    let mut buf = BytesMut::with_capacity(tune.buffer_size_fs());
1110    loop {
1111        let rlen = {
1112            let b = unsafe { &mut *(
1113                buf.chunk_mut() as *mut _
1114                    as *mut [mem::MaybeUninit<u8>]
1115                    as *mut [u8]
1116            )};
1117            let limit = cmp::min(b.len() as u64, len - body.len()) as usize;
1118            assert!(limit > 0);
1119            match rin.read(&mut b[..limit]) {
1120                Ok(l) => l,
1121                Err(e) => {
1122                    if e.kind() == ErrorKind::Interrupted {
1123                        continue;
1124                    } else {
1125                        return Err(e.into());
1126                    }
1127                }
1128            }
1129        };
1130        if rlen == 0 {
1131            break;
1132        }
1133        unsafe { buf.advance_mut(rlen); }
1134        debug!("Write (Fs) buffer len {}", rlen);
1135        body.write_all(&buf)?;
1136
1137        if body.len() < len {
1138            buf.clear();
1139        } else {
1140            assert_eq!(body.len(), len);
1141            break;
1142        }
1143    }
1144    let body = body.prepare()?;
1145    Ok(body)
1146}
1147
1148// Return `BodyImage::FsReadSlice` for an uncompressed body in file, at the
1149// current offset of `ReadPos`, for the given length.
1150fn slice_body(rp: &mut ReadPos, len: u64) -> Result<BodyImage, BarcError> {
1151    assert!(len > 2);
1152    let offset = rp.tell();
1153
1154    // Seek past the body, as if read.
1155    rp.seek(SeekFrom::Current(len as i64))?;
1156
1157    let rslice = rp.subslice(offset, offset + len - 2); // - CRLF
1158
1159    // Safety: There is only appending writes, so within reason, the slice
1160    // (and any later memory mapping) should be safe from concurrent
1161    // modification and UB. The `allow(unused_unsafe)` is because the method
1162    // is actually not flagged as `unsafe` when the *memmap* feature is
1163    // disabled.
1164    #[allow(unused_unsafe)]
1165    {
1166        Ok(unsafe { BodyImage::from_read_slice(rslice) })
1167    }
1168}
1169
// Unit tests: write/read round trips for each compression strategy, reads of
// the checked-in `sample/*.barc` files, and error cases for bad or truncated
// input.
#[cfg(test)]
mod barc_tests {
    use std::convert::TryInto;

    use std::fs;
    use std::mem::size_of;
    use std::path::{Path, PathBuf};
    use http::header::{AGE, REFERER, VIA};
    use super::*;
    use body_image::Tuner;
    use piccolog::test_logger;
    use tao_log::debugv;

    // Return the path for test output file `name` in `target/testmp`, one
    // directory above this crate's manifest dir. Creates that directory if
    // needed and removes any file left at the path by a previous run.
    fn barc_test_file(name: &str) -> Result<PathBuf, Flaw> {
        let target = env!("CARGO_MANIFEST_DIR");
        let path = format!("{}/../target/testmp", target);
        let tpath = Path::new(&path);
        fs::create_dir_all(tpath)?;

        let fname = tpath.join(name);
        if fname.exists() {
            fs::remove_file(&fname)?;
        }
        Ok(fname)
    }

    // Compile-time checks that `BarcError` converts into a `Flaw`.
    fn is_flaw(_f: Flaw) -> bool { true }
    fn is_barc_error(e: BarcError) -> bool {
        assert!(is_flaw(e.into()));
        true
    }

    #[test]
    fn test_barc_error_as_flaw() {
        assert!(is_barc_error(BarcError::ReadInvalidRecHead));
        assert!(is_barc_error(DialogConvertError::NoMetaUrl.into()));
    }

    // Keep the error types small, since they are carried in every `Result`
    // returned by this crate's readers and writers.
    #[test]
    fn test_barc_error_sizes() {
        assert!(test_logger());
        assert!(debugv!(size_of::<BarcError>()) <= 40);
        assert!(debugv!(size_of::<DialogConvertError>()) <= 32);
    }

    #[test]
    fn test_write_read_small() {
        let fname = barc_test_file("small.barc").unwrap();
        let strategy = NoCompressStrategy::default();
        write_read_small(&fname, &strategy).unwrap();
    }

    #[test]
    fn test_write_read_small_gzip() {
        assert!(test_logger());
        let fname = barc_test_file("small_gzip.barc").unwrap();
        let strategy = GzipCompressStrategy::default().set_min_len(0);
        write_read_small(&fname, &strategy).unwrap();
        assert_compression(&fname, Compression::Gzip);
    }

    #[cfg(feature = "brotli")]
    #[test]
    fn test_write_read_small_brotli() {
        assert!(test_logger());
        let fname = barc_test_file("small_brotli.barc").unwrap();
        let strategy = BrotliCompressStrategy::default().set_min_len(0);
        write_read_small(&fname, &strategy).unwrap();
        assert_compression(&fname, Compression::Brotli);
    }

    // Write a single small Dialog record to `fname` using `strategy`, read
    // it back, and check header counts and body lengths. Then check that no
    // further record follows.
    fn write_read_small(fname: &PathBuf, strategy: &dyn CompressStrategy)
        -> Result<(), Flaw>
    {
        let bfile = BarcFile::new(fname);

        let req_body_str = "REQUEST BODY";
        let res_body_str = "RESPONSE BODY";

        let rec_type = RecordType::Dialog;
        let mut meta = http::HeaderMap::new();
        meta.insert(AGE, "0".parse()?);

        let mut req_headers = http::HeaderMap::new();
        req_headers.insert(REFERER, "http:://other.com".parse()?);
        req_headers.insert(
            http::header::CONTENT_TYPE,
            "text/plain".parse().unwrap()
        );

        let req_body = BodyImage::from_slice(req_body_str);

        let mut res_headers = http::HeaderMap::new();
        res_headers.insert(VIA, "test".parse()?);
        res_headers.insert(
            http::header::CONTENT_TYPE,
            "text/plain".parse().unwrap()
        );

        let res_body = BodyImage::from_slice(res_body_str);

        let mut writer = bfile.writer()?;
        assert!(fname.exists()); // on writer creation
        writer.write(&Record { rec_type, meta,
                               req_headers, req_body,
                               res_headers, res_body },
                     strategy)?;

        let tune = Tunables::new();
        let mut reader = bfile.reader()?;
        let record = debugv!(reader.read(&tune))?.unwrap();

        assert_eq!(record.rec_type, RecordType::Dialog);
        assert_eq!(record.meta.len(), 1);
        assert_eq!(record.req_headers.len(), 2);
        assert_eq!(record.req_body.len(), req_body_str.len() as u64);
        assert_eq!(record.res_headers.len(), 2);
        assert_eq!(record.res_body.len(), res_body_str.len() as u64);

        let record = reader.read(&tune)?;
        assert!(record.is_none());
        Ok(())
    }

    #[test]
    fn test_write_read_empty_record() {
        assert!(test_logger());
        let fname = barc_test_file("empty_record.barc").unwrap();
        let strategy = NoCompressStrategy::default();
        write_read_empty_record(&fname, &strategy).unwrap();
    }

    // With `set_min_len(1)`, the empty record is expected to be written
    // uncompressed (`Compression::Plain`).
    #[test]
    fn test_write_read_empty_record_gzip() {
        assert!(test_logger());
        let fname = barc_test_file("empty_record_gzip.barc").unwrap();
        let strategy = GzipCompressStrategy::default().set_min_len(1);
        write_read_empty_record(&fname, &strategy).unwrap();
        assert_compression(&fname, Compression::Plain);
    }

    #[cfg(feature = "brotli")]
    #[test]
    fn test_write_read_empty_record_brotli() {
        assert!(test_logger());
        let fname = barc_test_file("empty_record_brotli.barc").unwrap();
        let strategy = BrotliCompressStrategy::default().set_min_len(1);
        write_read_empty_record(&fname, &strategy).unwrap();
        assert_compression(&fname, Compression::Plain);
    }

    // Write a default (empty) record to `fname` using `strategy`, read it
    // back, and check that every block is empty and no further record
    // follows.
    fn write_read_empty_record(fname: &PathBuf, strategy: &dyn CompressStrategy)
        -> Result<(), Flaw>
    {
        assert!(test_logger());
        let bfile = BarcFile::new(fname);

        let mut writer = bfile.writer()?;

        writer.write(&Record::default(), strategy)?;

        let tune = Tunables::new();
        let mut reader = bfile.reader()?;
        let record = debugv!(reader.read(&tune))?.unwrap();

        assert_eq!(record.rec_type, RecordType::Dialog);
        assert_eq!(record.meta.len(), 0);
        assert_eq!(record.req_headers.len(), 0);
        assert_eq!(record.req_body.len(), 0);
        assert_eq!(record.res_headers.len(), 0);
        assert_eq!(record.res_body.len(), 0);

        assert!(reader.read(&tune)?.is_none());
        Ok(())
    }

    #[test]
    fn test_write_read_large() {
        assert!(test_logger());
        let fname = barc_test_file("large.barc").unwrap();
        let strategy = NoCompressStrategy::default();
        write_read_large(&fname, &strategy).unwrap();
    }

    #[test]
    fn test_write_read_large_gzip() {
        assert!(test_logger());
        let fname = barc_test_file("large_gzip.barc").unwrap();
        let strategy = GzipCompressStrategy::default().set_min_len(0xa359b);
        write_read_large(&fname, &strategy).unwrap();
        assert_compression(&fname, Compression::Gzip);
    }

    #[test]
    fn test_write_read_large_gzip_0() {
        assert!(test_logger());
        let fname = barc_test_file("large_gzip_0.barc").unwrap();
        let strategy = GzipCompressStrategy::default().set_compression_level(0);
        write_read_large(&fname, &strategy).unwrap();
        assert_compression(&fname, Compression::Gzip);
    }

    #[cfg(feature = "brotli")]
    #[test]
    fn test_write_read_large_brotli() {
        assert!(test_logger());
        let fname = barc_test_file("large_brotli.barc").unwrap();
        let strategy = BrotliCompressStrategy::default().set_min_len(0xa359b);
        write_read_large(&fname, &strategy).unwrap();
        assert_compression(&fname, Compression::Brotli);
    }

    // Assert that the first record head in `fname` has compression `comp`.
    fn assert_compression(fname: &PathBuf, comp: Compression) {
        let mut file = File::open(fname).unwrap();
        let rhead = read_record_head(&mut file).unwrap().unwrap();
        assert_eq!(rhead.compress, comp);
    }

    // Write a record whose request and response bodies are built from many
    // repeated chunks, read it back, and check header counts and body
    // lengths.
    fn write_read_large(fname: &PathBuf, strategy: &dyn CompressStrategy)
        -> Result<(), Flaw>
    {
        let bfile = BarcFile::new(fname);

        let mut writer = bfile.writer()?;

        let lorem_ipsum =
           "Lorem ipsum dolor sit amet, consectetur adipiscing elit, \
            sed do eiusmod tempor incididunt ut labore et dolore magna \
            aliqua. Ut enim ad minim veniam, quis nostrud exercitation \
            ullamco laboris nisi ut aliquip ex ea commodo \
            consequat. Duis aute irure dolor in reprehenderit in \
            voluptate velit esse cillum dolore eu fugiat nulla \
            pariatur. Excepteur sint occaecat cupidatat non proident, \
            sunt in culpa qui officia deserunt mollit anim id est \
            laborum. ";

        let req_reps =   500;
        let res_reps = 1_000;

        let mut req_body = BodySink::with_ram_buffers(req_reps);
        for _ in 0..req_reps {
            req_body.push(lorem_ipsum)?;
        }
        let req_body = req_body.prepare()?;

        let mut res_body = BodySink::with_ram_buffers(res_reps);
        for _ in 0..res_reps {
            res_body.push(lorem_ipsum)?;
        }
        let res_body = res_body.prepare()?;

        let mut res_headers = http::HeaderMap::default();
        res_headers.insert(http::header::CONTENT_TYPE, "text/plain".parse()?);

        let mut req_headers = res_headers.clone();
        req_headers.insert(http::header::USER_AGENT,
                           "barc large tester".parse()?);

        let mut meta = http::HeaderMap::default();
        meta.insert(hname_meta_res_decoded(), "identity".parse()?);

        writer.write(
            &Record { req_body, req_headers, res_body, res_headers, meta,
                      ..Record::default() },
            strategy)?;

        let tune = Tunables::new();
        let mut reader = bfile.reader()?;
        let record = debugv!(reader.read(&tune))?.unwrap();

        assert_eq!(record.rec_type, RecordType::Dialog);
        assert_eq!(record.meta.len(), 1);
        assert_eq!(record.req_headers.len(), 2);
        assert_eq!(record.req_body.len(),
                   (lorem_ipsum.len() * req_reps) as u64);
        assert_eq!(record.res_headers.len(), 1);
        assert_eq!(record.res_body.len(),
                   (lorem_ipsum.len() * res_reps) as u64);

        assert!(reader.read(&tune)?.is_none());
        Ok(())
    }

    // A reader opened on an empty file sees records that a writer appends
    // afterward. This holds both when seeking to the offset the writer
    // returns and when continuing with sequential reads.
    #[test]
    fn test_write_read_parallel() {
        assert!(test_logger());
        let fname = barc_test_file("parallel.barc").unwrap();
        let bfile = BarcFile::new(&fname);
        // Temp writer to ensure file is created
        {
            let mut _writer = bfile.writer().unwrap();
        }

        let res_body_str = "RESPONSE BODY";

        // Establish reader.
        let tune = Tunables::new();
        let mut reader = bfile.reader().unwrap();
        let record = reader.read(&tune).unwrap();
        assert!(record.is_none());

        // Write record with new writer
        let mut writer = bfile.writer().unwrap();
        let res_body = BodyImage::from_slice(res_body_str);

        let offset = writer.write(&Record {
            res_body, ..Record::default() },
            &NoCompressStrategy::default()).unwrap();
        assert_eq!(offset, 0);
        reader.seek(offset).unwrap();

        let record = debugv!(reader.read(&tune)).unwrap().unwrap();

        assert_eq!(record.rec_type, RecordType::Dialog);
        assert_eq!(record.meta.len(), 0);
        assert_eq!(record.req_headers.len(), 0);
        assert_eq!(record.req_body.len(), 0);
        assert_eq!(record.res_headers.len(), 0);
        assert_eq!(record.res_body.len(), res_body_str.len() as u64);

        let record = reader.read(&tune).unwrap();
        assert!(record.is_none());

        // Write another, empty
        writer.write(&Record::default(),
                     &NoCompressStrategy::default()).unwrap();

        let record = reader.read(&tune).unwrap().unwrap();
        assert_eq!(record.rec_type, RecordType::Dialog);
        assert_eq!(record.res_body.len(), 0);

        let record = reader.read(&tune).unwrap();
        assert!(record.is_none());
    }

    // Read the sample record with default tunables. Its 1270-byte response
    // body is expected to be held in RAM.
    #[test]
    fn test_read_sample() {
        assert!(test_logger());
        let tune = Tunables::new();
        let bfile = BarcFile::new("sample/example.barc");
        let mut reader = bfile.reader().unwrap();
        let record = debugv!(reader.read(&tune)).unwrap().unwrap();

        assert_eq!(record.rec_type, RecordType::Dialog);
        assert_eq!(record.meta.len(), 5);
        assert_eq!(record.req_headers.len(), 4);
        assert!(record.req_body.is_empty());
        assert_eq!(record.res_headers.len(), 11);

        assert!(record.res_body.is_ram());
        let mut br = record.res_body.reader();
        let mut buf = Vec::with_capacity(2048);
        br.read_to_end(&mut buf).unwrap();
        assert_eq!(buf.len(), 1270);
        assert_eq!(&buf[0..15], b"<!doctype html>");
        assert_eq!(&buf[(buf.len()-8)..], b"</html>\n");

        let record = reader.read(&tune).unwrap();
        assert!(record.is_none());
    }

    // Round-trip a sample record through `Dialog` and back. Compare fields,
    // with bodies compared by length only.
    #[test]
    fn test_record_convert_dialog() {
        let tune = Tunables::new();
        let bfile = BarcFile::new("sample/example.barc");
        let mut reader = bfile.reader().unwrap();
        let rc1 = reader.read(&tune).unwrap().unwrap();

        let dl: Dialog = rc1.clone().try_into().unwrap();
        let rc2: Record = dl.try_into().unwrap();
        assert_eq!(rc1.rec_type, rc2.rec_type);
        assert_eq!(rc1.meta, rc2.meta);
        assert_eq!(rc1.req_headers, rc2.req_headers);
        assert_eq!(rc1.req_body.len(), rc2.req_body.len());
        assert_eq!(rc1.res_headers, rc2.res_headers);
        assert_eq!(rc1.res_body.len(), rc2.res_body.len());
    }

    #[test]
    fn test_record_convert_dialog_204() {
        let tune = Tunables::new();
        let bfile = BarcFile::new("sample/204_no_body.barc");
        let mut reader = bfile.reader().unwrap();
        let rc1 = reader.read(&tune).unwrap().unwrap();

        let dl: Dialog = rc1.clone().try_into().unwrap();
        let rc2: Record = dl.try_into().unwrap();
        assert_eq!(rc1.rec_type, rc2.rec_type);
        assert_eq!(rc1.meta, rc2.meta);
        assert_eq!(rc1.req_headers, rc2.req_headers);
        assert_eq!(rc1.req_body.len(), rc2.req_body.len());
        assert_eq!(rc1.res_headers, rc2.res_headers);
        assert_eq!(rc1.res_body.len(), rc2.res_body.len());
    }

    // With `max_body_ram` below the response body length, the body is
    // expected not to be held in RAM, yet it still reads back intact.
    #[test]
    fn test_read_sample_larger() {
        assert!(test_logger());
        let record = {
            let tune = Tuner::new()
                .set_max_body_ram(1024) // < 1270 expected length
                .finish();

            let bfile = BarcFile::new("sample/example.barc");
            let mut reader = bfile.reader().unwrap();
            let r = reader.read(&tune).unwrap().unwrap();

            let next = reader.read(&tune).unwrap();
            assert!(next.is_none());
            r
        };
        debugv!(&record);

        assert!(!record.res_body.is_ram());
        let mut br = record.res_body.reader();
        let mut buf = Vec::with_capacity(2048);
        br.read_to_end(&mut buf).unwrap();
        assert_eq!(buf.len(), 1270);
        assert_eq!(&buf[0..15], b"<!doctype html>");
        assert_eq!(&buf[(buf.len()-8)..], b"</html>\n");
    }

    // Like `test_read_sample_larger`, but memory-maps the response body
    // before reading it.
    #[cfg(feature = "mmap")]
    #[test]
    fn test_read_sample_mapped() {
        assert!(test_logger());
        let mut record = {
            let tune = Tuner::new()
                .set_max_body_ram(1024) // < 1270 expected length
                .finish();

            let bfile = BarcFile::new("sample/example.barc");
            let mut reader = bfile.reader().unwrap();
            let r = reader.read(&tune).unwrap().unwrap();

            let next = reader.read(&tune).unwrap();
            assert!(next.is_none());
            r
        };
        record.res_body.mem_map().unwrap();
        debugv!(&record);

        assert!(!record.res_body.is_ram());
        let mut br = record.res_body.reader();
        let mut buf = Vec::with_capacity(2048);
        br.read_to_end(&mut buf).unwrap();
        assert_eq!(buf.len(), 1270);
        assert_eq!(&buf[0..15], b"<!doctype html>");
        assert_eq!(&buf[(buf.len()-8)..], b"</html>\n");
    }

    #[test]
    fn test_read_empty_file() {
        assert!(test_logger());
        let tune = Tunables::new();
        let bfile = BarcFile::new("sample/empty.barc");
        let mut reader = bfile.reader().unwrap();
        let record = reader.read(&tune).unwrap();
        assert!(record.is_none());

        // Shouldn't have moved
        let record = debugv!(reader.read(&tune)).unwrap();
        assert!(record.is_none());
    }

    #[test]
    fn test_read_over_reserved() {
        assert!(test_logger());
        let tune = Tunables::new();
        let bfile = BarcFile::new("sample/reserved.barc");
        let mut reader = bfile.reader().unwrap();
        let record = debugv!(reader.read(&tune)).unwrap();

        assert!(record.is_none());

        // Should seek back to do it again
        let record = reader.read(&tune).unwrap();
        assert!(record.is_none());
    }

    // Reading from offset 1 is expected to fail with an incomplete head of
    // `V2_HEAD_SIZE - 1` bytes.
    #[test]
    fn test_read_short_record_head() {
        let tune = Tunables::new();
        let bfile = BarcFile::new("sample/reserved.barc");
        let mut reader = bfile.reader().unwrap();

        // Seek to bad position
        reader.seek(1).unwrap();

        if let Err(e) = reader.read(&tune) {
            if let BarcError::ReadIncompleteRecHead(l) = e {
                assert_eq!(l, V2_HEAD_SIZE - 1);
                let em = e.to_string();
                assert!(em.contains("Incomplete"), "{}", em)
            } else {
                panic!("Other error: {}", e);
            }
        } else {
            panic!("Should not succeed!");
        }
    }

    // Reading from offset 1 of a valid file lands on bytes lacking the
    // `BARC2 ` prefix.
    #[test]
    fn test_read_bad_record_head() {
        let tune = Tunables::new();
        let bfile = BarcFile::new("sample/example.barc");
        let mut reader = bfile.reader().unwrap();

        // Seek to bad position
        reader.seek(1).unwrap();

        if let Err(e) = reader.read(&tune) {
            if let BarcError::ReadInvalidRecHead = e {
                //expected
            } else {
                panic!("Other error: {}", e);
            }
        } else {
            panic!("Should not succeed!");
        }
    }

    // A truncated record surfaces as an `UnexpectedEof` I/O error.
    #[test]
    fn test_read_truncated() {
        let tune = Tunables::new();
        let bfile = BarcFile::new("sample/truncated.barc");
        let mut reader = bfile.reader().unwrap();
        if let Err(e) = reader.read(&tune) {
            if let BarcError::Io(ioe) = e {
                assert_eq!(ErrorKind::UnexpectedEof, ioe.kind());
            } else {
                panic!("Other error type {:?}", e);
            }
        } else {
            panic!("Should not succeed!");
        }
    }

    #[test]
    fn test_read_204_no_body() {
        assert!(test_logger());
        let tune = Tunables::new();
        let bfile = BarcFile::new("sample/204_no_body.barc");
        let mut reader = bfile.reader().unwrap();
        let record = debugv!(reader.read(&tune)).unwrap().unwrap();

        assert_eq!(record.rec_type, RecordType::Dialog);
        assert_eq!(record.meta.len(), 4);
        assert_eq!(record.req_headers.len(), 4);
        assert!(record.req_body.is_empty());
        assert_eq!(record.res_headers.len(), 9);

        assert!(record.res_body.is_empty());
    }
}