barrique 1.0.0

Portable binary serialization format
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
use crate::decode::{ReadError, Reader};
use crate::encode::{write_to_uninit, Encode, WriteError, Writer};
use crate::lz4::{compress_bound, CompressStream, DecompressStream};

use core::marker::PhantomData;
use core::ptr::copy_nonoverlapping;
use core::slice::SliceIndex;
use core::num::NonZeroU64;

use alloc::vec::Vec;

use twox_hash::XxHash64;

/// Maximum size of region body.
pub(crate) const REGION_SIZE: usize = 64 * 1024 - 1;

/// Returns the maximum possible size of an encoded region stream
/// containing a value sized in `size` bytes.
///
/// This does not account for possible early state switches in
/// non-last regions because that directly depends
/// on [`Encode`] implementation
pub const fn max_encoded_size(size: usize) -> usize {
    // A value occupies `full_regions` complete regions plus a `tail`
    // of leftover bytes; every region (including the tail one) pays
    // for its own header.
    let full_regions = size / REGION_SIZE;
    let tail = size % REGION_SIZE;

    full_regions * compress_bound(REGION_SIZE)
        + compress_bound(tail)
        + size.div_ceil(REGION_SIZE) * HEADER_SIZE
}

/// Seed of a region hash.
///
/// This struct allows disabling computation of a hash for the region header;
/// however, the field will not be truncated.
///
/// # Examples
///
/// Passing an enabled seed:
///
/// ```
/// use barrique::encode::StreamEncoder;
/// use barrique::region::Seed;
///
/// let seed = Seed::new(0);
/// // Now, if we call an actual `Encode` implementation with this bearer,
/// // resulting stream will contain hash
/// let _ = StreamEncoder::new(vec![], seed, Default::default());
/// ```
///
/// If you wish to disable hash computation:
///
/// ```
/// use barrique::region::Seed;
///
/// // An empty seed disables hash computation entirely;
/// // `Default::default()` yields the same disabled seed
/// let seed = Seed::empty();
/// # let _ = seed;
/// ```
#[derive(Default, Copy, Clone)]
#[repr(transparent)]
pub struct Seed {
    // Zero means "hashing disabled"; `Seed::new` offsets user seeds
    // by one so they can never collide with the disabled state.
    inner: u64,
}

// Public methods
impl Seed {
    /// Constructs a new [`Seed`] indicating an enabled hash field.
    ///
    /// Note: actual seed value is incremented by one as the zero seed
    /// disables the hash computation
    pub fn new(seed: u64) -> Self {
        Self {
            inner: seed.saturating_add(1),
        }
    }

    /// Constructs an empty seed, disabling hash field
    pub fn empty() -> Self {
        Self { inner: 0 }
    }
}

// Private methods
impl Seed {
    /// Produces a hash of `bytes` when hashing is enabled; an empty
    /// (zero) seed always yields `0`
    pub(crate) fn hash(&self, bytes: &[u8]) -> u64 {
        match self.inner {
            0 => 0,
            seed => XxHash64::oneshot(seed, bytes),
        }
    }

    /// Returns `true` if this seed equals `0`, indicating explicitly
    /// disabled hash inclusion
    pub(crate) fn is_empty(&self) -> bool {
        matches!(self.inner, 0)
    }
}

impl From<u64> for Seed {
    fn from(seed: u64) -> Self {
        Self::new(seed)
    }
}

impl From<NonZeroU64> for Seed {
    fn from(seed: NonZeroU64) -> Self {
        Self::new(seed.get())
    }
}

/// A hint of capacity required for bearer to stream a pipeline.
///
/// Bearers are inherently pairs of buffers holding some amount of
/// bytes and operating pointwise on given state of these buffers
/// and the size of these buffers is limited to 64 KiB. This enum
/// is essentially capacity value which will be used to allocate
/// a bearer.
///
/// # Example
///
/// An example of hinting size of a value which will be encoded:
///
/// ```
/// use barrique::region::AllocOrd;
/// use barrique::encode::StreamEncoder;
///
/// let value = String::from("Hello, world!");
/// let ord = AllocOrd::Auto(&value);
///
/// // `AllocOrd::Auto` usually applied to encode bearers only
/// let mut encoder = StreamEncoder::new(vec![], 0, ord);
///
/// // Now, if we call value's implementation, bearer will exactly
/// // fit encoded bytes without any remaining capacity:
/// // ... <String as Encode>::encode(&mut encoder, &value);
/// ```
///
/// # Default value
///
/// Default value of [`AllocOrd`] is 4 KiB
pub enum AllocOrd<T = PhantomData<()>>
where
    T: Encode,
{
    /// An explicitly specified capacity in bytes.
    /// Negative values are treated as zero capacity
    Manual(isize),
    /// Capacity derived from `<T as Encode>::size_of` method call
    Auto(T),
    /// Hint to allocate the maximum capacity possible
    Full,
}

impl Default for AllocOrd {
    fn default() -> Self {
        AllocOrd::Manual(4 * 1024)
    }
}

impl<T: Encode> AllocOrd<T> {
    /// Returns capacity hinted in bytes
    #[inline]
    pub(crate) fn cap(&self) -> usize {
        match self {
            Self::Manual(cap) => (*cap).max(0) as usize,
            Self::Auto(hint) => hint.size_of(),
            Self::Full => REGION_SIZE * 2,
        }
    }
}

/// An error type returned by region processing related methods
#[derive(Debug, thiserror::Error)]
pub enum RegionError {
    /// The underlying [`Reader`] failed while providing region bytes
    #[error("Failed to read data into a region buffer")]
    ReadFailure(#[from] ReadError),
    /// The underlying [`Writer`] failed to allocate the requested arena
    #[error("Failed to allocate capacity to write a region buffer")]
    WriteFailure(#[from] WriteError),
    /// Region header declared a body size that the source cannot provide
    #[error("Invalid region size hint")]
    InvalidSizeTip,
    /// Region body failed to (de)compress
    #[error("Malformed region")]
    MalformedRegion,
    /// Hash stored in the region header does not match the decompressed body
    #[error("Hash is not valid for contiguous region")]
    InvalidHash,
    /// Requested cursor/range exceeds the current buffer contents
    #[error("Requested region operation is out of bounds of current capacity")]
    OutOfBounds,
}

/// Two uninitialized region buffers, used for region streaming.
///
/// LZ4 streaming compression implemented using `_continue` functions, which
/// require to keep data referred in the stream alive. Only one additional region
/// stored since LZ4 window size is 64 KiB, which is the same as region size.
///
/// The `previous` is used only in an even switch, meaning it will be used
/// after first 64 KiB span streamed, which is the reason of why it is
/// wrapped into [`Option`], escaping additional 64 KiB allocation for
/// smaller passes
struct DoubleBuffer {
    // Buffer currently being filled / drained
    curr: Vec<u8>,
    // Previously streamed buffer, kept alive for the LZ4 window;
    // `None` until more than one region's worth of data is hinted or streamed
    prev: Option<Vec<u8>>,
}

impl DoubleBuffer {
    /// Constructs a new [`DoubleBuffer`] capable of streaming `cap` bytes.
    ///
    /// `prev` is only allocated when the hinted capacity spans more than
    /// one region; each buffer is capped at `REGION_SIZE`
    fn new(cap: usize) -> Self {
        let curr = Vec::with_capacity(cap.min(REGION_SIZE));
        let prev =
            (cap > REGION_SIZE).then(|| Vec::with_capacity((cap - REGION_SIZE).min(REGION_SIZE)));

        Self { curr, prev }
    }

    /// Swaps current and previous buffers.
    ///
    /// Lazily allocates `prev` if it was never created (i.e. the initial
    /// capacity hint claimed a single-region payload but more data arrived)
    fn swap(&mut self) {
        let prev = self.prev.get_or_insert_with(|| {
            // This branch is called only in case of incorrect `AllocOrd` hint,
            // so the allocation is small
            let predict = if self.curr.len() > 4 * 1024 /* 4 KiB */ { self.curr.len() / 4 } else { 256 };
            Vec::with_capacity(predict)
        });
        core::mem::swap(&mut self.curr, prev);
    }

    /// Invokes `pass` method of the given `authority` with access to current buffer.
    ///
    /// Access to contents of [`DoubleBuffer`] achieved via [`SwitchAuthority`] only, implementations
    /// of which are limited to [`Push`] and [`Pull`]
    fn authorize_pass<S: SwitchAuthority>(&mut self, authority: &mut S) -> Result<(), RegionError> {
        authority.pass(&mut self.curr)
    }

    /// Returns the length of the current buffer
    fn len(&self) -> usize {
        self.curr.len()
    }

    /// Returns requested range or index of current buffer,
    /// or `None` when out of bounds of initialized contents
    fn get<I>(&self, idx: I) -> Option<&I::Output>
    where
        I: SliceIndex<[u8]>,
    {
        self.curr.get(idx)
    }

    /// Extends current buffer with `src` bytes.
    ///
    /// Raw-pointer copy is used (rather than `extend_from_slice`) because
    /// `src` may have been unsafely derived from this very buffer by the
    /// caller; see `RegionBuffer::write_nonoverlapping`.
    ///
    /// # Safety
    ///
    /// - ranges of `src` and allocation of current buffer must not overlap
    unsafe fn extend_nonoverlapping(&mut self, src: &[u8]) {
        self.curr.reserve(src.len());
        let dst = self.curr.spare_capacity_mut();
        unsafe {
            // Safety:
            // - caller guarantees ranges of `src` and current vector to
            //   not overlap.
            // - capacity is sufficient to accommodate `src` since `reserve` was invoked
            copy_nonoverlapping(src.as_ptr(), dst.as_mut_ptr().cast(), src.len());

            // Safety: we've just initialized these bytes
            self.curr.set_len(self.curr.len() + src.len())
        }
    }
}

/// A buffer for operations on region stream, containing a [`DoubleBuffer`]
/// and a cursor for tracking current position
pub struct RegionBuffer {
    // Backing pair of region-sized buffers
    buffer: DoubleBuffer,
    // Read position within the current buffer; reset to 0 on each pass
    cursor: usize,
}

impl RegionBuffer {
    /// Constructs an empty region buffer capable of streaming `cap` bytes
    pub fn new(cap: usize) -> Self {
        Self {
            buffer: DoubleBuffer::new(cap),
            cursor: 0,
        }
    }

    /// Returns remaining capacity of this region buffer.
    ///
    /// Saturates to `0` instead of underflowing: `write_nonoverlapping`
    /// does not enforce the `REGION_SIZE` limit, so the buffer may
    /// temporarily hold more than one region's worth of bytes
    pub fn remaining_cap(&self) -> usize {
        REGION_SIZE.saturating_sub(self.buffer.len())
    }

    /// Returns remaining length of this region buffer.
    ///
    /// Saturates to `0` in case the cursor outlives a `swap` that
    /// shrank the current buffer below the cursor position
    pub fn remaining_len(&self) -> usize {
        self.buffer.len().saturating_sub(self.cursor)
    }

    /// Calls a pass implementation for provided [`SwitchAuthority`], performing
    /// region state switch on the current region buffer.
    ///
    /// Cursor will be reset to `0` after a successful pass
    pub fn pass<S: SwitchAuthority>(&mut self, authority: &mut S) -> Result<(), RegionError> {
        self.buffer.authorize_pass(authority)?;
        self.cursor = 0;

        Ok(())
    }

    /// Swaps the current buffer with previous
    pub fn swap(&mut self) {
        self.buffer.swap();
    }

    /// Reads `n` bytes of this region buffer from the current cursor position,
    /// advancing the cursor on success.
    ///
    /// Returns [`None`] when the requested range exceeds initialized contents
    /// (or `cursor + n` overflows `usize`)
    pub fn read(&mut self, n: usize) -> Option<&[u8]> {
        // `checked_add` avoids a debug-build overflow panic before the
        // range check gets a chance to reject the request
        let end = self.cursor.checked_add(n)?;
        let bytes = self.buffer.get(self.cursor..end)?;
        self.cursor = end;

        Some(bytes)
    }

    /// Writes `src` bytes into this region buffer.
    ///
    /// Inner buffer will be reallocated if capacity is insufficient
    /// to hold `src` more bytes.
    ///
    /// # Safety
    ///
    /// - ranges of `src` and internal buffer allocation must not overlap. This
    ///   is only possible if given `src` derived from `read` method result
    pub unsafe fn write_nonoverlapping(&mut self, src: &[u8]) {
        self.buffer.extend_nonoverlapping(src);
    }
}

/// Byte size of region header
const HEADER_SIZE: usize = 12;

/// In-memory representation of region header.
///
/// Byte format of region header stated in specification declared as following:
///
/// ```non-rust
/// RegionHeader:
/// [CompressedSize; 2][AllocSize; 2][RegionHash; 8]
/// ```
///
/// `CompressedSize` and `AllocSize` are 2-byte unsigned integers indicating compressed
/// contents and raw contents lengths accordingly. `RegionHash` is a result of XXHASH64
/// hash function performed on raw (decompressed) contents of this region
#[derive(Debug)]
struct RegionHeader {
    // Length of the compressed region body in bytes
    compressed_size: u16,
    // Length of the raw (decompressed) region body in bytes
    alloc_size: u16,
    // XXHASH64 of the raw contents; zero when hashing is disabled
    hash: u64,
}

impl RegionHeader {
    /// Constructs a new [`RegionHeader`] from serialized header bytes
    /// (little-endian fields, as per the layout above).
    ///
    /// # Panics
    ///
    /// Panics if `bytes` length is less than [`HEADER_SIZE`]
    fn from_bytes(bytes: &[u8]) -> Self {
        assert!(
            bytes.len() >= HEADER_SIZE,
            "Attempt to create a header from slice with length less than HEADER_SIZE"
        );

        let compressed_size = u16::from_le_bytes(bytes[..2].try_into().unwrap());
        let alloc_size = u16::from_le_bytes(bytes[2..4].try_into().unwrap());
        let hash = u64::from_le_bytes(bytes[4..12].try_into().unwrap());

        Self { compressed_size, alloc_size, hash }
    }

    /// Serializes this [`RegionHeader`] into little-endian bytes
    fn into_bytes(self) -> [u8; HEADER_SIZE] {
        let mut buf = [0u8; HEADER_SIZE];

        // `copy_from_slice` throughout: `u8` is `Copy`, so this is a plain
        // memcpy (the previous mix of `clone_from_slice` was inconsistent)
        buf[..2].copy_from_slice(&self.compressed_size.to_le_bytes());
        buf[2..4].copy_from_slice(&self.alloc_size.to_le_bytes());
        buf[4..12].copy_from_slice(&self.hash.to_le_bytes());

        buf
    }
}

// Sealed-trait pattern: `SwitchAuthority` requires `private::Sealed`,
// which downstream crates cannot implement, restricting authorities
// to `Push` and `Pull`.
mod private {
    use super::*;

    /// Marker supertrait preventing external [`SwitchAuthority`] impls
    pub trait Sealed {}

    impl<W: Writer> Sealed for Push<W> {}
    impl<R: Reader> Sealed for Pull<R> {}
}

/// A trait defining authorized implementation of region switch
/// fiduciary to get access to region buffer contents.
///
/// The region switch is a pass of current state of a region buffer
/// which main point is to prepare the buffer for streaming next
/// region of bytes.
///
/// This trait is sealed: only [`Pull`] and [`Push`] implement it.
///
/// # Pull switch
///
/// The [`Pull`] authority implementation is a switch of read pipeline:
/// new region is read, decompressed and copied into the given
/// region buffer.
///
/// # Push switch
///
/// The [`Push`] authority implementation is a switch of write pipeline:
/// current contents of the region buffer compressed, structured into
/// a region format and flushed into internal destination
pub trait SwitchAuthority: private::Sealed {
    /// Perform a state switch on the `buf` passed.
    ///
    /// # Errors
    ///
    /// Returns [`RegionError`] when the underlying I/O, (de)compression
    /// or hash verification fails
    fn pass(&mut self, buf: &mut Vec<u8>) -> Result<(), RegionError>;
}

/// A [`SwitchAuthority`] implementation for [`StreamDecoder`] operations
///
/// [`StreamDecoder`]: crate::decode::StreamDecoder
pub(crate) struct Pull<R>
where
    R: Reader,
{
    // Stateful LZ4 decompression stream (keeps the 64 KiB window)
    stream: DecompressStream,
    // Seed used to verify region hashes; empty seed yields hash 0
    seed: Seed,
    // Source the encoded regions are read from
    source: R,
}

impl<R> Pull<R>
where
    R: Reader,
{
    /// Constructs a new [`Pull`] authority with `src` source [`Reader`] and `seed`
    pub fn new(src: R, seed: Seed) -> Self {
        Self {
            stream: DecompressStream::new(),
            source: src,
            seed,
        }
    }

    /// Returns contained region hash seed
    pub const fn seed(&self) -> Seed {
        self.seed
    }
}

impl<R> SwitchAuthority for Pull<R>
where
    R: Reader,
{
    /// Reads next region from the [`Reader`] source, decompresses the body to the region buffer and
    /// verifies a hash of resulting data. Buffer swap must be performed *before* the pass.
    ///
    /// Note: `read_borrow` does not advance the source, so the second
    /// `read_borrow(size)` below re-borrows the header bytes together with
    /// the body; the header prefix is skipped when decompressing and the
    /// source is advanced once at the end.
    ///
    /// This method will panic if [`Reader`] implementation returned invalid result
    fn pass(&mut self, buf: &mut Vec<u8>) -> Result<(), RegionError> {
        let header = RegionHeader::from_bytes(self.source.read_borrow(HEADER_SIZE)?);

        // Total span of this region: header plus compressed body
        let size = HEADER_SIZE + header.compressed_size as usize;
        let bytes = self.source.read_borrow(size).map_err(|e| match e {
            // A short source here means the header lied about the body size
            ReadError::OutOfBounds => RegionError::InvalidSizeTip,
            #[cfg_attr(not(feature = "std"), allow(unreachable_patterns))]
            _ => e.into(),
        })?;

        // `Reader` is not an unsafe trait, so check of incorrect implementation required. For
        // region header, constructor will panic in case of slice with length less
        // than requested `HEADER_SIZE`
        assert_eq!(
            bytes.len(),
            size,
            "Mismatch between Reader implementation result and actual length requested"
        );

        unsafe {
            // Safety:
            // - length set to `0` so no elements need to be initialized.
            // - `0` is always less or equal to capacity
            buf.set_len(0);
        }

        buf.reserve(header.alloc_size as usize);
        unsafe {
            // Safety:
            // - decompressed data stored in internal buffer inside `RegionBuffer`, semantics
            //   of which guarantees region buffer to live long enough.
            // - extremely unlikely that single memory allocation exceeds `c_int::MAX`
            let init = self
                .stream
                .decompress(&bytes[HEADER_SIZE..], buf.spare_capacity_mut())
                .ok_or(RegionError::MalformedRegion)?;

            // Safety: `..init` bytes are initialized by `decompress` call
            // and within bounds of `buf` capacity since it was given a
            // slice from `spare_capacity_mut`
            buf.set_len(init);
        }

        // NOTE(review): an empty seed hashes to 0, so decoding a stream that
        // was written with a non-empty seed using an empty one fails here
        // with `InvalidHash` — presumably encoder and decoder must agree on
        // the seed; confirm this is the intended contract.
        if self.seed.hash(buf) != header.hash {
            return Err(RegionError::InvalidHash);
        }

        self.source.advance(size);
        Ok(())
    }
}

/// A [`SwitchAuthority`] implementation for [`WriteBearer`] operations
///
/// [`WriteBearer`]: crate::encode::WriteBearer
pub(crate) struct Push<W>
where
    W: Writer,
{
    // Stateful LZ4 compression stream (keeps the 64 KiB window)
    stream: CompressStream,
    // Seed used to compute region hashes; empty seed yields hash 0
    seed: Seed,
    // Destination the encoded regions are flushed into
    writer: W,
}

impl<W> Push<W>
where
    W: Writer,
{
    /// Creates a new [`Push`] with `dst` [`Writer`] destination and `seed`
    pub fn new(dst: W, seed: Seed) -> Self {
        Self {
            stream: CompressStream::new(),
            writer: dst,
            seed,
        }
    }

    /// Return contained region hash seed
    pub const fn seed(&self) -> Seed {
        self.seed
    }
}

impl<W> SwitchAuthority for Push<W>
where
    W: Writer,
{
    /// Serializes initialized part of `buf` and flushed it into internal [`Writer`].
    /// Buffer swap must be performed *after* a pass.
    ///
    /// This method will panic in case of invalid result of [`Writer`] implementation.
    fn pass(&mut self, buf: &mut Vec<u8>) -> Result<(), RegionError> {
        // Situation when region buffer appears empty (e.g. unnecessary flush) is not dangerous,
        // but since LZ4 wrapper treats 0 as an error, this check is performed
        // to improve error clarity
        if buf.is_empty() {
            return Ok(());
        }

        // Worst-case arena: compressed body upper bound plus the header
        let size = compress_bound(buf.len()) + HEADER_SIZE;
        let arena = self.writer.allocate(size)?;

        // Similarly to `Pull` implementation, we check for incorrect implementation
        assert_eq!(
            arena.len(),
            size,
            "Mismatch between Writer implementation result and actual length requested"
        );

        let compressed = unsafe {
            // Safety:
            // - uncompressed data stored in internal buffer inside `RegionBuffer`, semantics
            //   of which guarantees region buffer to live long enough.
            // - extremely unlikely that single memory allocation exceeds `c_int::MAX`
            self.stream
                .compress(buf, &mut arena[HEADER_SIZE..])
                .ok_or(RegionError::MalformedRegion)?
        };

        // NOTE(review): `compressed as u16` silently truncates if the LZ4
        // output ever exceeds `u16::MAX`. For incompressible input near
        // REGION_SIZE (65535 bytes) `compress_bound` exceeds 65535, so
        // confirm the stream wrapper guarantees `compressed <= u16::MAX`
        // for REGION_SIZE-bounded input. `buf.len() as u16` is safe since
        // `buf.len() <= REGION_SIZE == 65535`.
        let header = RegionHeader {
            compressed_size: compressed as u16,
            alloc_size: buf.len() as u16,
            hash: self.seed.hash(buf),
        };
        write_to_uninit(header.into_bytes().as_slice(), arena);

        unsafe {
            // Safety:
            // - length set to `0` so no elements need to be initialized.
            // - `0` is always less or equal to capacity.
            // Note: this is necessary because `extend_nonoverlapping` method extends
            // from the length, which is, unlike the cursor, not reset by the
            // region buffer
            buf.set_len(0);

            // Safety:
            // - `compress` call initialized `HEADER_SIZE..compressed` range,
            //   header range initialized by `write_to_uninit` above.
            // - commitment of `n` can not overflow since `compressed` can't be
            //   greater than `compress_bound`, which is the size of requested
            //   allocation without header bytes
            self.writer.commit(HEADER_SIZE + compressed);
        }
        Ok(())
    }
}
}