tdb_succinct/
logarray.rs

1#![allow(clippy::precedence, clippy::verbose_bit_mask)]
2
3//! Code for storing, loading, and using log arrays.
4//!
5//! A log array is a contiguous sequence of N unsigned integers, with each value occupying exactly
6//! W bits. By choosing W as the minimal bit width required for the largest value in the array, the
7//! whole sequence can be compressed while increasing the constant cost of indexing by a few
8//! operations over the typical byte-aligned array.
9//!
10//! The log array operations in this module use the following implementation:
11//!
12//! 1. The input buffer can be evenly divided into L+1 words, where a word is 64 bits.
13//! 2. The first L words are the data buffer, a contiguous sequence of elements, where an element
14//!    is an unsigned integer represented by W bits.
//! 3. The L+1 word is the control word and contains the following sequence:
//!    1. a 32-bit unsigned integer holding the lower 32 bits of N, the number of elements,
//!    2. an 8-bit unsigned integer representing W, the number of bits used to store each element,
//!       and
//!    3. a 24-bit unsigned integer holding the upper 24 bits of N.
20//!
21//! # Notes
22//!
23//! * All integers are stored in a standard big-endian encoding.
24//! * The maximum bit width W is 64.
//! * The maximum number of elements is 2^56-1.
26//!
27//! # Naming
28//!
29//! Because of the ambiguity of the English language and possibility to confuse the meanings of the
30//! words used to describe aspects of this code, we try to use the following definitions
31//! consistently throughout:
32//!
33//! * buffer: a contiguous sequence of bytes
34//!
35//! * size: the number of bytes in a buffer
36//!
37//! * word: a 64-bit contiguous sequence aligned on 8-byte boundaries starting at the beginning of
38//!     the input buffer
39//!
40//! * element: a logical unsigned integer value that is a member of the log array
41//!
42//! * index: the logical address of an element in the data buffer. A physical index is preceded by
43//!     word, byte, or bit to indicate the address precision of the index.
44//!
45//! * offset: the number of bits preceding the msb of an element within the first word containing
46//!     that element
47//!
48//! * width: the number of bits that every element occupies in the log array
49//!
50//! * length: the number of elements in the log array
51
52use crate::storage::{FileLoad, SyncableFile};
53
54use super::util::{self, calculate_width};
55use byteorder::{BigEndian, ByteOrder};
56use bytes::{Buf, BufMut, Bytes, BytesMut};
57use futures::stream::{Stream, StreamExt};
58use std::{cmp::Ordering, convert::TryFrom, error, fmt, io};
59use tokio::io::{AsyncReadExt, AsyncWriteExt};
60use tokio_util::codec::{Decoder, FramedRead};
61
62use itertools::Itertools;
63
// Compile-time guard: this module assumes that `usize` is at least 32 bits (4 bytes) wide. On a
// narrower target the subtraction below underflows during constant evaluation, which aborts the
// build. (The reported error mentions the overflow rather than the actual cause.)
const _: usize = 0 - (std::mem::size_of::<usize>() < 4) as usize;
67
68/// An in-memory log array
69#[derive(Clone)]
70pub struct LogArray {
71    /// Index of the first accessible element
72    ///
73    /// For an original log array, this is initialized to 0. For a slice, this is the index to the
74    /// first element of the slice.
75    first: u64,
76
77    /// Number of accessible elements
78    ///
79    /// For an original log array, this is initialized to the value read from the control word. For
80    /// a slice, it is the length of the slice.
81    len: u64,
82
83    /// Bit width of each element
84    width: u8,
85
86    /// Shared reference to the input buffer
87    ///
88    /// Index 0 points to the first byte of the first element. The last word is the control word.
89    input_buf: Bytes,
90}
91
92impl std::fmt::Debug for LogArray {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        write!(f, "LogArray([{}])", self.iter().format(", "))
95    }
96}
97
/// An error that occurred during a log array operation.
#[derive(Debug, PartialEq)]
pub enum LogArrayError {
    /// The input buffer (size in bytes) cannot even hold the control word.
    InputBufferTooSmall(usize),
    /// The bit width is larger than 64.
    WidthTooLarge(u8),
    /// The input buffer size disagrees with the length and width. The fields are, in order: the
    /// actual buffer size, the expected buffer size, the number of elements, and the bit width.
    UnexpectedInputBufferSize(u64, u64, u64, u8),
}

impl LogArrayError {
    /// Check that the input buffer is large enough to contain the 8-byte control word.
    fn validate_input_buf_size(input_buf_size: usize) -> Result<(), Self> {
        if input_buf_size >= 8 {
            Ok(())
        } else {
            Err(Self::InputBufferTooSmall(input_buf_size))
        }
    }

    /// Reject widths above 64, then return `(actual, expected)` buffer sizes in bytes.
    ///
    /// The expected size covers the data words needed for `len` elements of `width` bits each,
    /// plus one word for the control word.
    fn actual_and_expected_buf_size(
        input_buf_size: usize,
        len: u64,
        width: u8,
    ) -> Result<(u64, u64), Self> {
        if width > 64 {
            return Err(Self::WidthTooLarge(width));
        }

        // Adding 127 before dividing by 64 rounds the data bits up to whole words (+63) and
        // accounts for the control word (+64). Multiplying by 8 converts words to bytes.
        let data_bits = len * u64::from(width);
        let expected = ((data_bits + 127) >> 6) << 3;
        let actual = u64::try_from(input_buf_size).unwrap();

        Ok((actual, expected))
    }

    /// Validate the number of elements and bit width against the input buffer size.
    ///
    /// The bit width should be no greater than 64 since each word is 64 bits.
    ///
    /// The input buffer size must be exactly the multiple of 8 that holds the encoded elements
    /// plus the control word.
    fn validate_len_and_width(input_buf_size: usize, len: u64, width: u8) -> Result<(), Self> {
        let (actual, expected) = Self::actual_and_expected_buf_size(input_buf_size, len, width)?;

        if actual == expected {
            Ok(())
        } else {
            Err(Self::UnexpectedInputBufferSize(
                actual, expected, len, width,
            ))
        }
    }

    /// Validate the number of elements and bit width against the input buffer size, tolerating
    /// trailing bytes.
    ///
    /// The bit width should be no greater than 64 since each word is 64 bits.
    ///
    /// The input buffer size must be at least the multiple of 8 that holds the encoded elements
    /// plus the control word. It is allowed to be larger.
    fn validate_len_and_width_trailing(
        input_buf_size: usize,
        len: u64,
        width: u8,
    ) -> Result<(), Self> {
        let (actual, expected) = Self::actual_and_expected_buf_size(input_buf_size, len, width)?;

        if actual >= expected {
            Ok(())
        } else {
            Err(Self::UnexpectedInputBufferSize(
                actual, expected, len, width,
            ))
        }
    }
}

impl fmt::Display for LogArrayError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::InputBufferTooSmall(size) => {
                write!(f, "expected input buffer size ({}) >= 8", size)
            }
            Self::WidthTooLarge(width) => write!(f, "expected width ({}) <= 64", width),
            Self::UnexpectedInputBufferSize(actual, expected, len, width) => write!(
                f,
                "expected input buffer size ({}) to be {} for {} elements and width {}",
                actual, expected, len, width
            ),
        }
    }
}

impl error::Error for LogArrayError {}

/// A log array error becomes an I/O error of kind `InvalidData`.
impl From<LogArrayError> for io::Error {
    fn from(err: LogArrayError) -> Self {
        Self::new(io::ErrorKind::InvalidData, err)
    }
}
203
204#[derive(Clone)]
205pub struct LogArrayIterator {
206    logarray: LogArray,
207    pos: usize,
208    end: usize,
209}
210
211impl Iterator for LogArrayIterator {
212    type Item = u64;
213    fn next(&mut self) -> Option<u64> {
214        if self.pos == self.end {
215            None
216        } else {
217            let result = self.logarray.entry(self.pos);
218            self.pos += 1;
219
220            Some(result)
221        }
222    }
223}
224
/// Largest element count accepted when building a control word: 2^56 - 1.
const MAX_LOGARRAY_LEN: u64 = u64::MAX >> 8;
226
227pub fn parse_control_word(buf: &[u8]) -> (u64, u8) {
228    let len_1 = BigEndian::read_u32(buf) as u64;
229    let width = buf[4];
230    let len_2 = (BigEndian::read_u32(&buf[4..]) & 0xFFFFFF) as u64; // ignore width byte
231    let len: u64 = (len_2 << 32) + len_1;
232
233    (len, width)
234}
235
236/// Read the length and bit width from the control word buffer. `buf` must start at the first word
237/// after the data buffer. `input_buf_size` is used for validation.
238fn read_control_word(buf: &[u8], input_buf_size: usize) -> Result<(u64, u8), LogArrayError> {
239    let (len, width) = parse_control_word(buf);
240    LogArrayError::validate_len_and_width(input_buf_size, len, width)?;
241    Ok((len, width))
242}
243
244/// Read the length and bit width from the control word buffer. `buf` must start at the first word
245/// after the data buffer. `input_buf_size` is used for validation, where it is allowed to be larger than expected.
246fn read_control_word_trailing(
247    buf: &[u8],
248    input_buf_size: usize,
249) -> Result<(u64, u8), LogArrayError> {
250    let (len, width) = parse_control_word(buf);
251    LogArrayError::validate_len_and_width_trailing(input_buf_size, len, width)?;
252    Ok((len, width))
253}
254
/// Number of bytes occupied by the data words of a log array with `len` elements of `width` bits
/// each. The bit count is rounded up to whole 64-bit words; the control word is not included.
fn logarray_length_from_len_width(len: u64, width: u8) -> usize {
    let total_bits = width as usize * len as usize;
    // A partially filled last word still occupies a full word.
    let partial_word = usize::from(total_bits % 64 != 0);

    (total_bits / 64 + partial_word) * 8
}
262
263pub fn logarray_length_from_control_word(buf: &[u8]) -> usize {
264    let (len, width) = parse_control_word(buf);
265
266    logarray_length_from_len_width(len, width)
267}
268
269impl LogArray {
270    /// Construct a `LogArray` by parsing a `Bytes` buffer.
271    pub fn parse(input_buf: Bytes) -> Result<LogArray, LogArrayError> {
272        let input_buf_size = input_buf.len();
273        LogArrayError::validate_input_buf_size(input_buf_size)?;
274        let (len, width) = read_control_word(&input_buf[input_buf_size - 8..], input_buf_size)?;
275        Ok(LogArray {
276            first: 0,
277            len,
278            width,
279            input_buf,
280        })
281    }
282
283    pub fn parse_header_first(mut input_buf: Bytes) -> Result<(LogArray, Bytes), LogArrayError> {
284        let input_buf_size = input_buf.len();
285        LogArrayError::validate_input_buf_size(input_buf_size)?;
286        let (len, width) = read_control_word_trailing(&input_buf[..8], input_buf_size)?;
287        let num_bytes = logarray_length_from_len_width(len, width);
288        input_buf.advance(8);
289        let rest = input_buf.split_off(num_bytes);
290        Ok((
291            LogArray {
292                first: 0,
293                len,
294                width,
295                input_buf,
296            },
297            rest,
298        ))
299    }
300
301    /// Returns the number of elements.
302    pub fn len(&self) -> usize {
303        // `usize::try_from` succeeds if `std::mem::size_of::<usize>()` >= 4.
304        usize::try_from(self.len).unwrap()
305    }
306
307    /// Returns `true` if there are no elements.
308    pub fn is_empty(&self) -> bool {
309        self.len == 0
310    }
311
312    /// Returns the bit width.
313    pub fn width(&self) -> u8 {
314        self.width
315    }
316
317    /// Reads the data buffer and returns the element at the `index`.
318    ///
319    /// Panics if `index` is >= the length of the log array.
320    pub fn entry(&self, index: usize) -> u64 {
321        debug_assert!(
322            index < self.len(),
323            "expected index ({}) < length ({})",
324            index,
325            self.len
326        );
327
328        // `usize::try_from` succeeds if `std::mem::size_of::<usize>()` >= 4.
329        let bit_index = usize::from(self.width) * (usize::try_from(self.first).unwrap() + index);
330
331        // Calculate the byte index from the bit index.
332        let byte_index = bit_index >> 6 << 3;
333
334        let buf = &self.input_buf;
335
336        // Read the first word.
337        let first_word = BigEndian::read_u64(&buf[byte_index..]);
338
339        // This is the minimum number of leading zeros that a decoded value should have.
340        let leading_zeros = 64 - self.width;
341
342        // Get the bit offset in `first_word`.
343        let offset = (bit_index & 0b11_1111) as u8;
344
345        // If the element fits completely in `first_word`, we can return it immediately.
346        if offset + self.width <= 64 {
347            // Decode by introducing leading zeros and shifting all the way to the right.
348            return first_word << offset >> leading_zeros;
349        }
350
351        // At this point, we have an element split over `first_word` and `second_word`. The bottom
352        // bits of `first_word` become the upper bits of the decoded value, and the top bits of
353        // `second_word` become the lower bits of the decoded value.
354
355        // Read the second word
356        let second_word = BigEndian::read_u64(&buf[byte_index + 8..]);
357
358        // These are the bit widths of the important parts in `first_word` and `second_word`.
359        let first_width = 64 - offset;
360        let second_width = self.width - first_width;
361
362        // These are the parts of the element with the unimportant parts removed.
363
364        // Introduce leading zeros and trailing zeros where the `second_part` will go.
365        let first_part = first_word << offset >> offset << second_width;
366
367        // Introduce leading zeros where the `first_part` will go.
368        let second_part = second_word >> 64 - second_width;
369
370        // Decode by combining the first and second parts.
371        first_part | second_part
372    }
373
374    pub fn iter(&self) -> LogArrayIterator {
375        LogArrayIterator {
376            logarray: self.clone(),
377            pos: 0,
378            end: self.len(),
379        }
380    }
381
382    /// Returns a logical slice of the elements in a log array.
383    ///
384    /// Panics if `index` + `length` is >= the length of the log array.
385    pub fn slice(&self, offset: usize, len: usize) -> LogArray {
386        let offset = offset as u64;
387        let len = len as u64;
388        let slice_end = offset.checked_add(len).unwrap_or_else(|| {
389            panic!("overflow from slice offset ({}) + length ({})", offset, len)
390        });
391        assert!(
392            slice_end <= self.len,
393            "expected slice offset ({}) + length ({}) <= source length ({})",
394            offset,
395            len,
396            self.len
397        );
398        LogArray {
399            first: self.first + offset,
400            len,
401            width: self.width,
402            input_buf: self.input_buf.clone(),
403        }
404    }
405}
406
407/// write a logarray directly to an AsyncWrite
408pub struct LogArrayBufBuilder<B: BufMut> {
409    /// Destination of the log array data
410    buf: B,
411    /// Bit width of an element
412    width: u8,
413    /// Storage for the next word to be written to the buffer
414    current: u64,
415    /// Bit offset in `current` for the msb of the next encoded element
416    offset: u8,
417    /// Number of elements written to the buffer
418    count: u64,
419}
420
421impl<D: std::ops::DerefMut<Target = BytesMut> + BufMut> LogArrayBufBuilder<D> {
422    pub fn reserve(&mut self, additional: usize) {
423        self.buf.reserve(additional * self.width as usize / 8);
424    }
425}
426
427impl<B: BufMut> LogArrayBufBuilder<B> {
428    pub fn new(buf: B, width: u8) -> Self {
429        Self {
430            buf,
431            width,
432            // Zero is needed for bitwise OR-ing new values.
433            current: 0,
434            // Start at the beginning of `current`.
435            offset: 0,
436            // No elements have been written.
437            count: 0,
438        }
439    }
440
441    pub fn count(&self) -> u64 {
442        self.count
443    }
444
445    pub fn push(&mut self, val: u64) {
446        // This is the minimum number of leading zeros that a decoded value should have.
447        let leading_zeros = u64::BITS - self.width as u32;
448
449        // If `val` does not fit in the `width`, return an error.
450        if val.leading_zeros() < u32::from(leading_zeros) {
451            panic!("expected value ({}) to fit in {} bits", val, self.width);
452        }
453
454        // Otherwise, push `val` onto the log array.
455        // Advance the element count since we know we're going to write `val`.
456        self.count += 1;
457
458        // Write the first part of `val` to `current`, putting the msb of `val` at the `offset`
459        // bit. This may be either the upper bits of `val` only or all of it. We check later.
460        self.current |= val << leading_zeros >> self.offset;
461
462        // Increment `offset` past `val`.
463        self.offset += self.width;
464
465        // Check if the new `offset` is larger than 64.
466        if self.offset >= 64 {
467            // We have filled `current`, so write it to the destination.
468            //util::write_u64(&mut self.file, self.current).await?;
469            self.buf.put_u64(self.current);
470            // Wrap the offset with the word size.
471            self.offset -= 64;
472
473            // Initialize the new `current`.
474            self.current = if self.offset == 0 {
475                // Zero is needed for bitwise OR-ing new values.
476                0
477            } else {
478                // This is the second part of `val`: the lower bits.
479                val << 64 - self.offset
480            };
481        }
482    }
483
484    pub fn push_vec(&mut self, vals: Vec<u64>) {
485        for val in vals {
486            self.push(val);
487        }
488    }
489
490    fn finalize_data(&mut self) {
491        if u64::from(self.count) * u64::from(self.width) & 0b11_1111 != 0 {
492            self.buf.put_u64(self.current);
493        }
494    }
495
496    pub fn finalize(mut self) -> B {
497        self.finalize_data();
498
499        self.write_control_word();
500        self.buf
501    }
502
503    pub(crate) fn finalize_without_control_word(mut self) {
504        self.finalize_data();
505    }
506
507    fn write_control_word(&mut self) {
508        let len = self.count;
509        let width = self.width;
510
511        let buf = control_word(len, width);
512        self.buf.put_slice(&buf);
513    }
514}
515
516pub(crate) fn control_word(len: u64, width: u8) -> [u8; 8] {
517    if len > MAX_LOGARRAY_LEN {
518        panic!(
519            "length is too large for control word of a logarray: {} (limit is {}",
520            len, MAX_LOGARRAY_LEN
521        );
522    }
523    let mut buf = [0; 8];
524    let len_1 = (len & 0xFFFFFFFF) as u32;
525    let len_2 = ((len >> 32) & 0xFFFFFF) as u32;
526    BigEndian::write_u32(&mut buf, len_1);
527    BigEndian::write_u32(&mut buf[4..], len_2);
528    buf[4] = width;
529
530    buf
531}
532
533pub struct LateLogArrayBufBuilder<B: BufMut> {
534    /// Destination of the log array data
535    buf: B,
536    /// NOTE: remove pub
537    pub vals: Vec<u64>,
538    width: u8,
539}
540
541impl<B: BufMut> LateLogArrayBufBuilder<B> {
542    pub fn new(buf: B) -> Self {
543        Self {
544            buf,
545            vals: Vec::new(),
546            width: 0,
547        }
548    }
549
550    pub fn count(&self) -> u64 {
551        self.vals.len() as u64
552    }
553
554    pub fn push(&mut self, val: u64) {
555        self.vals.push(val);
556        let width = calculate_width(val);
557        if self.width < width {
558            self.width = width;
559        }
560    }
561
562    pub fn push_vec(&mut self, vals: Vec<u64>) {
563        for val in vals {
564            self.push(val)
565        }
566    }
567
568    pub fn last(&mut self) -> Option<u64> {
569        self.vals.last().copied()
570    }
571
572    pub fn pop(&mut self) -> Option<u64> {
573        self.vals.pop()
574    }
575
576    pub fn finalize(mut self) -> B {
577        let mut builder = LogArrayBufBuilder::new(&mut self.buf, self.width);
578        builder.push_vec(self.vals);
579        builder.finalize();
580        self.buf
581    }
582
583    pub fn finalize_header_first(mut self) -> B {
584        let control_word = control_word(self.count(), self.width);
585        self.buf.put(control_word.as_ref());
586        let mut builder = LogArrayBufBuilder::new(&mut self.buf, self.width);
587        builder.push_vec(self.vals);
588        builder.finalize_without_control_word();
589        self.buf
590    }
591}
592
593/// write a logarray directly to an AsyncWrite
594pub struct LogArrayFileBuilder<W: SyncableFile> {
595    /// Destination of the log array data
596    file: W,
597    /// Bit width of an element
598    width: u8,
599    /// Storage for the next word to be written to the buffer
600    current: u64,
601    /// Bit offset in `current` for the msb of the next encoded element
602    offset: u8,
603    /// Number of elements written to the buffer
604    count: u64,
605}
606
607impl<W: SyncableFile> LogArrayFileBuilder<W> {
608    pub fn new(w: W, width: u8) -> LogArrayFileBuilder<W> {
609        LogArrayFileBuilder {
610            file: w,
611            width,
612            // Zero is needed for bitwise OR-ing new values.
613            current: 0,
614            // Start at the beginning of `current`.
615            offset: 0,
616            // No elements have been written.
617            count: 0,
618        }
619    }
620
621    pub fn count(&self) -> u64 {
622        self.count
623    }
624
625    pub async fn push(&mut self, val: u64) -> io::Result<()> {
626        // This is the minimum number of leading zeros that a decoded value should have.
627        let leading_zeros = 64 - self.width;
628
629        // If `val` does not fit in the `width`, return an error.
630        if val.leading_zeros() < u32::from(leading_zeros) {
631            return Err(io::Error::new(
632                io::ErrorKind::InvalidData,
633                format!("expected value ({}) to fit in {} bits", val, self.width),
634            ));
635        }
636
637        // Otherwise, push `val` onto the log array.
638        // Advance the element count since we know we're going to write `val`.
639        self.count += 1;
640
641        // Write the first part of `val` to `current`, putting the msb of `val` at the `offset`
642        // bit. This may be either the upper bits of `val` only or all of it. We check later.
643        self.current |= val << leading_zeros >> self.offset;
644
645        // Increment `offset` past `val`.
646        self.offset += self.width;
647
648        // Check if the new `offset` is larger than 64.
649        if self.offset >= 64 {
650            // We have filled `current`, so write it to the destination.
651            util::write_u64(&mut self.file, self.current).await?;
652            // Wrap the offset with the word size.
653            self.offset -= 64;
654
655            // Initialize the new `current`.
656            self.current = if self.offset == 0 {
657                // Zero is needed for bitwise OR-ing new values.
658                0
659            } else {
660                // This is the second part of `val`: the lower bits.
661                val << 64 - self.offset
662            };
663        }
664
665        Ok(())
666    }
667
668    pub async fn push_vec(&mut self, vals: Vec<u64>) -> io::Result<()> {
669        for val in vals {
670            self.push(val).await?;
671        }
672
673        Ok(())
674    }
675
676    pub async fn push_all<S: Stream<Item = io::Result<u64>> + Unpin>(
677        &mut self,
678        mut vals: S,
679    ) -> io::Result<()> {
680        while let Some(val) = vals.next().await {
681            self.push(val?).await?;
682        }
683
684        Ok(())
685    }
686
687    async fn finalize_data(&mut self) -> io::Result<()> {
688        if self.count * u64::from(self.width) & 0b11_1111 != 0 {
689            util::write_u64(&mut self.file, self.current).await?;
690        }
691
692        Ok(())
693    }
694
695    pub async fn finalize(mut self) -> io::Result<()> {
696        let len = self.count;
697        let width = self.width;
698
699        // Write the final data word.
700        self.finalize_data().await?;
701
702        // Write the control word.
703        let buf = control_word(len, width);
704        self.file.write_all(&buf).await?;
705
706        self.file.flush().await?;
707        self.file.sync_all().await?;
708
709        Ok(())
710    }
711}
712
/// State for decoding log array elements one at a time from a stream of words.
struct LogArrayDecoder {
    /// The most recent word read from the buffer
    current: u64,
    /// Bit width of an element
    width: u8,
    /// Bit offset from the msb of `current` to the msb of the next encoded element
    offset: u8,
    /// Number of elements still to decode
    remaining: u64,
}

impl fmt::Debug for LogArrayDecoder {
    /// Formats all fields, showing `current` as a zero-padded 64-bit binary literal.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "LogArrayDecoder {{ current: {:#066b}, width: {:?}, offset: {:?}, remaining: {:?} }}",
            self.current, self.width, self.offset, self.remaining
        )
    }
}

impl LogArrayDecoder {
    /// Construct a new `LogArrayDecoder` for `remaining` elements of `width` bits each.
    ///
    /// Nothing is validated here. The caller must have validated `width` and `remaining`.
    fn new_unchecked(width: u8, remaining: u64) -> Self {
        Self {
            // Placeholder: with `offset` at 64 no bits of `current` are left to consume, so
            // `decode()` reads a fresh word before using it.
            current: 0,
            width,
            offset: 64,
            remaining,
        }
    }
}
755
756impl Decoder for LogArrayDecoder {
757    type Item = u64;
758    type Error = io::Error;
759
760    /// Decode the next element of the log array.
761    fn decode(&mut self, bytes: &mut BytesMut) -> Result<Option<u64>, io::Error> {
762        // If we have no elements remaining to decode, clean up and exit.
763        if self.remaining == 0 {
764            bytes.clear();
765            return Ok(None);
766        }
767
768        // At this point, we have at least one element to decode.
769
770        // Declare some immutable working values. After this, `self.<field>` only appears on the
771        // lhs of `=`.
772        let first_word = self.current;
773        let offset = self.offset;
774        let width = self.width;
775
776        // This is the minimum number of leading zeros that a decoded value should have.
777        let leading_zeros = 64 - width;
778
779        // If the next element fits completely in `first_word`, we can return it immediately.
780        if offset + width <= 64 {
781            // Increment to the msb of the next element.
782            self.offset += width;
783            // Decrement since we're returning a decoded element.
784            self.remaining -= 1;
785            // Decode by introducing leading zeros and shifting all the way to the right.
786            return Ok(Some(first_word << offset >> leading_zeros));
787        }
788
789        // At this point, we need to read another word because we do not have enough bits in
790        // `first_word` to decode.
791
792        // If there isn't a full word available in the buffer, stop until there is.
793        if bytes.len() < 8 {
794            return Ok(None);
795        }
796
797        // Load the `second_word` and advance `bytes` by 1 word.
798        let second_word = BigEndian::read_u64(&bytes.split_to(8));
799        self.current = second_word;
800
801        // Decrement to indicate we will return another decoded element.
802        self.remaining -= 1;
803
804        // If the `offset` is 64, it means that the element is completely included in the
805        // `second_word`.
806        if offset == 64 {
807            // Increment the `offset` to the msb of the next element.
808            self.offset = width;
809
810            // Decode by shifting all the way to the right. Since the msb of `second_word` and the
811            // encoded value are the same, this naturally introduces leading zeros.
812            return Ok(Some(second_word >> leading_zeros));
813        }
814
815        // At this point, we have an element split over `first_word` and `second_word`. The bottom
816        // bits of `first_word` become the upper bits of the decoded value, and the top bits of
817        // `second_word` become the lower bits of the decoded value.
818
819        // These are the bit widths of the important parts in `first_word` and `second_word`.
820        let first_width = 64 - offset;
821        let second_width = width - first_width;
822
823        // These are the parts of the element with the unimportant parts removed.
824
825        // Introduce leading zeros and trailing zeros where the `second_part` will go.
826        let first_part = first_word << offset >> offset << second_width;
827
828        // Introduce leading zeros where the `first_part` will go.
829        let second_part = second_word >> 64 - second_width;
830
831        // Increment the `offset` to the msb of the next element.
832        self.offset = second_width;
833
834        // Decode by combining the first and second parts.
835        Ok(Some(first_part | second_part))
836    }
837}
838
839pub async fn logarray_file_get_length_and_width<F: FileLoad>(f: F) -> io::Result<(u64, u8)> {
840    LogArrayError::validate_input_buf_size(f.size().await?)?;
841
842    let mut buf = [0; 8];
843    f.open_read_from(f.size().await? - 8)
844        .await?
845        .read_exact(&mut buf)
846        .await?;
847    Ok(read_control_word(&buf, f.size().await?)?)
848}
849
850pub async fn logarray_stream_entries<F: 'static + FileLoad>(
851    f: F,
852) -> io::Result<impl Stream<Item = io::Result<u64>> + Unpin + Send> {
853    let (len, width) = logarray_file_get_length_and_width(f.clone()).await?;
854    Ok(FramedRead::new(
855        f.open_read().await?,
856        LogArrayDecoder::new_unchecked(width, len),
857    ))
858}
859
860#[derive(Clone)]
861pub struct MonotonicLogArray(LogArray);
862
863impl std::fmt::Debug for MonotonicLogArray {
864    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
865        write!(f, "MonotonicLogArray([{}])", self.iter().format(", "))
866    }
867}
868
869impl MonotonicLogArray {
870    pub fn from_logarray(logarray: LogArray) -> MonotonicLogArray {
871        if cfg!(debug_assertions) {
872            // Validate that the elements are monotonically increasing.
873            let mut iter = logarray.iter();
874            if let Some(mut pred) = iter.next() {
875                for succ in iter {
876                    assert!(
877                        pred <= succ,
878                        "not monotonic: expected predecessor ({}) <= successor ({})",
879                        pred,
880                        succ
881                    );
882                    pred = succ;
883                }
884            }
885        }
886
887        MonotonicLogArray(logarray)
888    }
889
890    pub fn parse(bytes: Bytes) -> Result<MonotonicLogArray, LogArrayError> {
891        let logarray = LogArray::parse(bytes)?;
892
893        Ok(Self::from_logarray(logarray))
894    }
895
896    pub fn parse_header_first(bytes: Bytes) -> Result<(MonotonicLogArray, Bytes), LogArrayError> {
897        let (logarray, remainder) = LogArray::parse_header_first(bytes)?;
898
899        Ok((Self::from_logarray(logarray), remainder))
900    }
901
902    pub fn len(&self) -> usize {
903        self.0.len()
904    }
905
906    pub fn is_empty(&self) -> bool {
907        self.0.is_empty()
908    }
909
910    pub fn entry(&self, index: usize) -> u64 {
911        self.0.entry(index)
912    }
913
914    pub fn iter(&self) -> LogArrayIterator {
915        self.0.iter()
916    }
917
918    pub fn index_of(&self, element: u64) -> Option<usize> {
919        let index = self.nearest_index_of(element);
920        if index >= self.len() || self.entry(index) != element {
921            None
922        } else {
923            Some(index)
924        }
925    }
926
927    pub fn nearest_index_of(&self, element: u64) -> usize {
928        if self.is_empty() {
929            return 0;
930        }
931
932        let mut min = 0;
933        let mut max = self.len() - 1;
934        while min <= max {
935            let mid = (min + max) / 2;
936            match element.cmp(&self.entry(mid)) {
937                Ordering::Equal => return mid,
938                Ordering::Greater => min = mid + 1,
939                Ordering::Less => {
940                    if mid == 0 {
941                        return 0;
942                    }
943                    max = mid - 1
944                }
945            }
946        }
947
948        (min + max) / 2 + 1
949    }
950
951    pub fn slice(&self, offset: usize, len: usize) -> MonotonicLogArray {
952        Self(self.0.slice(offset, len))
953    }
954}
955
956impl From<LogArray> for MonotonicLogArray {
957    fn from(l: LogArray) -> Self {
958        Self::from_logarray(l)
959    }
960}
961
#[cfg(test)]
mod tests {
    //! Unit tests for log array parsing, building, decoding, streaming, and monotonic lookups.
    //!
    //! Async tests await their futures directly instead of driving them through a nested blocking
    //! executor, so the tokio runtime thread is never blocked from inside async code.

    use super::*;
    use crate::storage::memory::MemoryBackedStore;
    use crate::storage::FileStore;
    use crate::util::stream_iter_ok;
    use futures::stream::TryStreamExt;

    /// Checks the `Display` output of each error variant and the conversion into `io::Error`.
    #[test]
    fn log_array_error() {
        // Display
        assert_eq!(
            "expected input buffer size (7) >= 8",
            LogArrayError::InputBufferTooSmall(7).to_string()
        );
        assert_eq!(
            "expected width (69) <= 64",
            LogArrayError::WidthTooLarge(69).to_string()
        );
        assert_eq!(
            "expected input buffer size (9) to be 8 for 0 elements and width 17",
            LogArrayError::UnexpectedInputBufferSize(9, 8, 0, 17).to_string()
        );

        // From<LogArrayError> for io::Error
        assert_eq!(
            io::Error::new(
                io::ErrorKind::InvalidData,
                LogArrayError::InputBufferTooSmall(7)
            )
            .to_string(),
            io::Error::from(LogArrayError::InputBufferTooSmall(7)).to_string()
        );
    }

    /// A buffer must be at least 8 bytes (one control word) to be accepted.
    #[test]
    fn validate_input_buf_size() {
        let val = |buf_size| LogArrayError::validate_input_buf_size(buf_size);
        let err = |buf_size| Err(LogArrayError::InputBufferTooSmall(buf_size));
        assert_eq!(err(7), val(7));
        assert_eq!(Ok(()), val(8));
        assert_eq!(Ok(()), val(9));
        assert_eq!(Ok(()), val(usize::max_value()));
    }

    /// The buffer size must match exactly what the element count and width require.
    #[test]
    fn validate_len_and_width() {
        let val =
            |buf_size, len, width| LogArrayError::validate_len_and_width(buf_size, len, width);

        let err = |width| Err(LogArrayError::WidthTooLarge(width));

        // width: 65
        assert_eq!(err(65), val(0, 0, 65));

        let err = |buf_size, expected, len, width| {
            Err(LogArrayError::UnexpectedInputBufferSize(
                buf_size, expected, len, width,
            ))
        };

        // width: 0
        assert_eq!(err(0, 8, 0, 0), val(0, 0, 0));

        // width: 1
        assert_eq!(Ok(()), val(8, 0, 1));
        assert_eq!(err(9, 8, 0, 1), val(9, 0, 1));
        assert_eq!(Ok(()), val(16, 1, 1));

        // width: 64
        assert_eq!(Ok(()), val(16, 1, 64));
        assert_eq!(err(16, 24, 2, 64), val(16, 2, 64));
        assert_eq!(err(24, 16, 1, 64), val(24, 1, 64));

        #[cfg(target_pointer_width = "64")]
        assert_eq!(
            Ok(()),
            val(
                usize::try_from(u64::from(u32::max_value()) + 1 << 3).unwrap(),
                u32::max_value() as u64,
                64
            )
        );

        // width: 5
        assert_eq!(err(16, 24, 13, 5), val(16, 13, 5));
        assert_eq!(Ok(()), val(24, 13, 5));
    }

    /// A buffer holding only a zeroed control word parses as an empty array.
    #[test]
    pub fn empty() {
        let logarray = LogArray::parse(Bytes::from([0u8; 8].as_ref())).unwrap();
        assert!(logarray.is_empty());
        assert!(MonotonicLogArray::from_logarray(logarray).is_empty());
    }

    /// A late-bound builder that only ever saw the value 0 still produces a readable array.
    #[test]
    pub fn late_logarray_just_zero() {
        let buf = BytesMut::new();
        let mut builder = LateLogArrayBufBuilder::new(buf);
        builder.push(0);
        let logarray_buf = builder.finalize().freeze();
        let logarray = LogArray::parse(logarray_buf).unwrap();
        assert_eq!(logarray.entry(0_usize), 0_u64);
    }

    /// Pushing a value that does not fit in the configured width panics.
    #[tokio::test]
    #[should_panic(expected = "expected value (8) to fit in 3 bits")]
    async fn log_array_file_builder_panic() {
        let store = MemoryBackedStore::new();
        let mut builder = LogArrayFileBuilder::new(store.open_write().await.unwrap(), 3);
        builder.push(8).await.unwrap();
    }

    /// Values written through the file builder are read back unchanged by index.
    #[tokio::test]
    async fn generate_then_parse_works() {
        let store = MemoryBackedStore::new();
        let mut builder = LogArrayFileBuilder::new(store.open_write().await.unwrap(), 5);
        builder
            .push_all(stream_iter_ok(vec![1, 3, 2, 5, 12, 31, 18]))
            .await
            .unwrap();
        builder.finalize().await.unwrap();

        let content = store.map().await.unwrap();

        let logarray = LogArray::parse(content).unwrap();

        assert_eq!(1, logarray.entry(0));
        assert_eq!(3, logarray.entry(1));
        assert_eq!(2, logarray.entry(2));
        assert_eq!(5, logarray.entry(3));
        assert_eq!(12, logarray.entry(4));
        assert_eq!(31, logarray.entry(5));
        assert_eq!(18, logarray.entry(6));
    }

    // One data word holding the 17-bit elements 1, 2 and 3; the underscores mark where each
    // element ends.
    const TEST0_DATA: [u8; 8] = [
        0b00000000,
        0b00000000,
        0b1_0000000,
        0b00000000,
        0b10_000000,
        0b00000000,
        0b011_00000,
        0b00000000,
    ];
    // Control word for `TEST0_DATA`: 3 elements of width 17.
    const TEST0_CONTROL: [u8; 8] = [0, 0, 0, 3, 17, 0, 0, 0];
    // A follow-up data word; its leading bits complete the element that starts at the end of
    // `TEST0_DATA` when more than three 17-bit elements are decoded.
    const TEST1_DATA: [u8; 8] = [
        0b0100_0000,
        0b00000000,
        0b00101_000,
        0b00000000,
        0b000110_00,
        0b00000000,
        0b0000111_0,
        0b00000000,
    ];

    /// Builds the three-element, width-17 array described by `TEST0_DATA` and `TEST0_CONTROL`.
    fn test0_logarray() -> LogArray {
        let mut content = Vec::new();
        content.extend_from_slice(&TEST0_DATA);
        content.extend_from_slice(&TEST0_CONTROL);
        LogArray::parse(Bytes::from(content)).unwrap()
    }

    /// Reading one past the last index panics.
    #[test]
    #[should_panic(expected = "expected index (3) < length (3)")]
    fn entry_panic() {
        let _ = test0_logarray().entry(3);
    }

    /// A slice that extends past the end of the source panics.
    #[test]
    #[should_panic(expected = "expected slice offset (2) + length (2) <= source length (3)")]
    fn slice_panic1() {
        let _ = test0_logarray().slice(2, 2);
    }

    /// A slice offset beyond `u32::MAX` panics.
    #[test]
    #[should_panic(expected = "expected slice offset (4294967296)")]
    #[cfg(target_pointer_width = "64")]
    fn slice_panic2() {
        let _ = test0_logarray().slice(usize::try_from(u32::max_value()).unwrap() + 1, 2);
    }

    /// Indexing a slice is bounded by the slice length, not the source length.
    #[test]
    #[should_panic(expected = "expected index (2) < length (2)")]
    fn slice_entry_panic() {
        let _ = test0_logarray().slice(1, 2).entry(2);
    }

    /// In debug builds, wrapping a decreasing array as monotonic panics.
    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "not monotonic: expected predecessor (2) <= successor (1)")]
    fn monotonic_panic() {
        let content = [0u8, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 2, 32, 0, 0, 0].as_ref();
        MonotonicLogArray::from_logarray(LogArray::parse(Bytes::from(content)).unwrap());
    }

    /// Exercises the decoder across element counts, its `Debug` output, and input that arrives
    /// in more than one chunk.
    #[test]
    fn decode() {
        let mut decoder = LogArrayDecoder::new_unchecked(17, 1);
        let mut bytes = BytesMut::from(TEST0_DATA.as_ref());
        assert_eq!(Some(1), Decoder::decode(&mut decoder, &mut bytes).unwrap());
        assert_eq!(None, Decoder::decode(&mut decoder, &mut bytes).unwrap());
        decoder = LogArrayDecoder::new_unchecked(17, 4);
        bytes = BytesMut::from(TEST0_DATA.as_ref());
        assert_eq!(Some(1), Decoder::decode(&mut decoder, &mut bytes).unwrap());
        assert_eq!(
            "LogArrayDecoder { current: \
             0b0000000000000000100000000000000010000000000000000110000000000000, width: 17, \
             offset: 17, remaining: 3 }",
            format!("{:?}", decoder)
        );
        assert_eq!(Some(2), Decoder::decode(&mut decoder, &mut bytes).unwrap());
        assert_eq!(Some(3), Decoder::decode(&mut decoder, &mut bytes).unwrap());
        // The fourth element straddles the word boundary, so it needs more input first.
        assert_eq!(None, Decoder::decode(&mut decoder, &mut bytes).unwrap());
        bytes.extend(TEST1_DATA.iter());
        assert_eq!(Some(4), Decoder::decode(&mut decoder, &mut bytes).unwrap());
        assert_eq!(None, Decoder::decode(&mut decoder, &mut bytes).unwrap());
    }

    /// Each malformed file is rejected with the matching `LogArrayError`.
    #[tokio::test]
    async fn logarray_file_get_length_and_width_errors() {
        // Too small to contain a control word.
        let store = MemoryBackedStore::new();
        let mut writer = store.open_write().await.unwrap();
        writer.write_all(&[0, 0, 0]).await.unwrap();
        writer.sync_all().await.unwrap();
        assert_eq!(
            io::Error::from(LogArrayError::InputBufferTooSmall(3)).to_string(),
            logarray_file_get_length_and_width(store)
                .await
                .err()
                .unwrap()
                .to_string()
        );

        // Width above 64.
        let store = MemoryBackedStore::new();
        let mut writer = store.open_write().await.unwrap();
        writer.write_all(&[0, 0, 0, 0, 65, 0, 0, 0]).await.unwrap();
        writer.sync_all().await.unwrap();
        assert_eq!(
            io::Error::from(LogArrayError::WidthTooLarge(65)).to_string(),
            logarray_file_get_length_and_width(store)
                .await
                .err()
                .unwrap()
                .to_string()
        );

        // Control word announces one element but there is no data word.
        let store = MemoryBackedStore::new();
        let mut writer = store.open_write().await.unwrap();
        writer.write_all(&[0, 0, 0, 1, 17, 0, 0, 0]).await.unwrap();
        writer.sync_all().await.unwrap();
        assert_eq!(
            io::Error::from(LogArrayError::UnexpectedInputBufferSize(8, 16, 1, 17)).to_string(),
            logarray_file_get_length_and_width(store)
                .await
                .err()
                .unwrap()
                .to_string()
        );
    }

    /// Values written through the file builder come back in order from the entry stream.
    #[tokio::test]
    async fn generate_then_stream_works() {
        let store = MemoryBackedStore::new();
        let mut builder = LogArrayFileBuilder::new(store.open_write().await.unwrap(), 5);
        builder.push_all(stream_iter_ok(0..31)).await.unwrap();
        builder.finalize().await.unwrap();

        let entries: Vec<u64> = logarray_stream_entries(store)
            .await
            .unwrap()
            .try_collect::<Vec<u64>>()
            .await
            .unwrap();
        let expected: Vec<u64> = (0..31).collect();
        assert_eq!(expected, entries);
    }

    /// Iterating a parsed array yields the values that were written.
    #[tokio::test]
    async fn iterate_over_logarray() {
        let store = MemoryBackedStore::new();
        let mut builder = LogArrayFileBuilder::new(store.open_write().await.unwrap(), 5);
        let original = vec![1, 3, 2, 5, 12, 31, 18];
        builder
            .push_all(stream_iter_ok(original.clone()))
            .await
            .unwrap();
        builder.finalize().await.unwrap();

        let content = store.map().await.unwrap();

        let logarray = LogArray::parse(content).unwrap();

        let result: Vec<u64> = logarray.iter().collect();

        assert_eq!(original, result);
    }

    /// Iterating a slice yields only the elements inside the slice.
    #[tokio::test]
    async fn iterate_over_logarray_slice() {
        let store = MemoryBackedStore::new();
        let mut builder = LogArrayFileBuilder::new(store.open_write().await.unwrap(), 5);
        let original: Vec<u64> = vec![1, 3, 2, 5, 12, 31, 18];
        builder.push_all(stream_iter_ok(original)).await.unwrap();
        builder.finalize().await.unwrap();

        let content = store.map().await.unwrap();

        let logarray = LogArray::parse(content).unwrap();
        let slice = logarray.slice(2, 3);

        let result: Vec<u64> = slice.iter().collect();

        assert_eq!([2, 5, 12], result.as_ref());
    }

    /// `index_of` finds every stored value and reports `None` for a missing one.
    #[tokio::test]
    async fn monotonic_logarray_index_lookup() {
        let store = MemoryBackedStore::new();
        let mut builder = LogArrayFileBuilder::new(store.open_write().await.unwrap(), 5);
        let original = vec![1, 3, 5, 6, 7, 10, 11, 15, 16, 18, 20, 25, 31];
        builder
            .push_all(stream_iter_ok(original.clone()))
            .await
            .unwrap();
        builder.finalize().await.unwrap();

        let content = store.map().await.unwrap();

        let logarray = LogArray::parse(content).unwrap();
        let monotonic = MonotonicLogArray::from_logarray(logarray);

        for (i, &val) in original.iter().enumerate() {
            assert_eq!(i, monotonic.index_of(val).unwrap());
        }

        assert_eq!(None, monotonic.index_of(12));
        assert_eq!(original.len(), monotonic.len());
    }

    /// `nearest_index_of` returns the matching index or the insertion position for every probe.
    #[tokio::test]
    async fn monotonic_logarray_near_index_lookup() {
        let store = MemoryBackedStore::new();
        let mut builder = LogArrayFileBuilder::new(store.open_write().await.unwrap(), 5);
        let original = vec![3, 5, 6, 7, 10, 11, 15, 16, 18, 20, 25, 31];
        builder
            .push_all(stream_iter_ok(original.clone()))
            .await
            .unwrap();
        builder.finalize().await.unwrap();

        let content = store.map().await.unwrap();

        let logarray = LogArray::parse(content).unwrap();
        let monotonic = MonotonicLogArray::from_logarray(logarray);

        for (i, &val) in original.iter().enumerate() {
            assert_eq!(i, monotonic.index_of(val).unwrap());
        }

        let nearest: Vec<_> = (1..=32).map(|i| monotonic.nearest_index_of(i)).collect();
        let expected = vec![
            0, 0, 0, 1, 1, 2, 3, 4, 4, 4, 5, 6, 6, 6, 6, 7, 8, 8, 9, 9, 10, 10, 10, 10, 10, 11, 11,
            11, 11, 11, 11, 12,
        ];
        assert_eq!(expected, nearest);
    }

    /// Sixteen 4-bit values fill a data word exactly and still round-trip.
    #[tokio::test]
    async fn writing_64_bits_of_data() {
        let store = MemoryBackedStore::new();
        let original = vec![1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8];
        let mut builder = LogArrayFileBuilder::new(store.open_write().await.unwrap(), 4);
        builder
            .push_all(stream_iter_ok(original.clone()))
            .await
            .unwrap();
        builder.finalize().await.unwrap();

        let content = store.map().await.unwrap();
        let logarray = LogArray::parse(content).unwrap();
        assert_eq!(original, logarray.iter().collect::<Vec<_>>());
        assert_eq!(16, logarray.len());
        assert_eq!(4, logarray.width());
    }

    /// A 56-bit element count survives a control word encode/decode round trip.
    #[test]
    fn large_control_word() {
        let num: u64 = 0xFF_FFFF_FFFF_FFFF;
        let width: u8 = 32;

        let control_word = control_word(num, width);
        assert_eq!([255, 255, 255, 255, 32, 255, 255, 255], control_word);
        let (out_num, out_width) = parse_control_word(&control_word);
        assert_eq!(num, out_num);
        assert_eq!(width, out_width);
    }
}