// parquet 32.0.0
//
// Apache Parquet implementation in Rust
// Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Contains all supported decoders for Parquet.

use num::traits::WrappingAdd;
use num::FromPrimitive;
use std::{cmp, marker::PhantomData, mem};

use super::rle::RleDecoder;

use crate::basic::*;
use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::{
    bit_util::{self, BitReader},
    memory::ByteBufferPtr,
};

pub(crate) mod private {
    use super::*;

    /// Per-value-type hook used to pick a [`Decoder`] implementation for a
    /// [`DataType`] whose native type is the implementor. Not every [`Decoder`]
    /// is valid for every [`DataType`] (and therefore every [`ParquetValueType`]),
    /// so each value type decides which encodings it accepts.
    pub trait GetDecoder {
        /// Builds a boxed decoder for `encoding`. The default implementation
        /// accepts only what `get_decoder_default` accepts.
        fn get_decoder<T: DataType<T = Self>>(
            descr: ColumnDescPtr,
            encoding: Encoding,
        ) -> Result<Box<dyn Decoder<T>>> {
            get_decoder_default(descr, encoding)
        }
    }

    /// Fallback shared by all value types: only `PLAIN` yields a decoder, every
    /// other encoding is turned into an error.
    fn get_decoder_default<T: DataType>(
        descr: ColumnDescPtr,
        encoding: Encoding,
    ) -> Result<Box<dyn Decoder<T>>> {
        let failure = match encoding {
            Encoding::PLAIN => {
                return Ok(Box::new(PlainDecoder::new(descr.type_length())));
            }
            // Dictionary decoders are not created through this function.
            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => general_err!(
                "Cannot initialize this encoding through this function"
            ),
            // Known encodings that the requesting value type did not claim.
            Encoding::RLE
            | Encoding::DELTA_BINARY_PACKED
            | Encoding::DELTA_BYTE_ARRAY
            | Encoding::DELTA_LENGTH_BYTE_ARRAY => general_err!(
                "Encoding {} is not supported for type",
                encoding
            ),
            other => nyi_err!("Encoding {} is not supported", other),
        };
        Err(failure)
    }

    impl GetDecoder for bool {
        /// Booleans additionally accept `RLE`.
        fn get_decoder<T: DataType<T = Self>>(
            descr: ColumnDescPtr,
            encoding: Encoding,
        ) -> Result<Box<dyn Decoder<T>>> {
            if matches!(encoding, Encoding::RLE) {
                return Ok(Box::new(RleValueDecoder::new()));
            }
            get_decoder_default(descr, encoding)
        }
    }

    impl GetDecoder for i32 {
        /// `i32` additionally accepts `DELTA_BINARY_PACKED`.
        fn get_decoder<T: DataType<T = Self>>(
            descr: ColumnDescPtr,
            encoding: Encoding,
        ) -> Result<Box<dyn Decoder<T>>> {
            if matches!(encoding, Encoding::DELTA_BINARY_PACKED) {
                return Ok(Box::new(DeltaBitPackDecoder::new()));
            }
            get_decoder_default(descr, encoding)
        }
    }

    impl GetDecoder for i64 {
        /// `i64` additionally accepts `DELTA_BINARY_PACKED`.
        fn get_decoder<T: DataType<T = Self>>(
            descr: ColumnDescPtr,
            encoding: Encoding,
        ) -> Result<Box<dyn Decoder<T>>> {
            if matches!(encoding, Encoding::DELTA_BINARY_PACKED) {
                return Ok(Box::new(DeltaBitPackDecoder::new()));
            }
            get_decoder_default(descr, encoding)
        }
    }

    // Floating point types only use the default set of decoders.
    impl GetDecoder for f32 {}
    impl GetDecoder for f64 {}

    impl GetDecoder for ByteArray {
        /// Byte arrays additionally accept both delta byte-array encodings.
        fn get_decoder<T: DataType<T = Self>>(
            descr: ColumnDescPtr,
            encoding: Encoding,
        ) -> Result<Box<dyn Decoder<T>>> {
            match encoding {
                Encoding::DELTA_LENGTH_BYTE_ARRAY => {
                    Ok(Box::new(DeltaLengthByteArrayDecoder::new()))
                }
                Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
                other => get_decoder_default(descr, other),
            }
        }
    }

    impl GetDecoder for FixedLenByteArray {
        /// Fixed-length byte arrays additionally accept `DELTA_BYTE_ARRAY`.
        fn get_decoder<T: DataType<T = Self>>(
            descr: ColumnDescPtr,
            encoding: Encoding,
        ) -> Result<Box<dyn Decoder<T>>> {
            if matches!(encoding, Encoding::DELTA_BYTE_ARRAY) {
                return Ok(Box::new(DeltaByteArrayDecoder::new()));
            }
            get_decoder_default(descr, encoding)
        }
    }

    // Int96 only uses the default set of decoders.
    impl GetDecoder for Int96 {}
}

// ----------------------------------------------------------------------
// Decoders

/// Decoder interface for Parquet values of the data type `T`.
pub trait Decoder<T: DataType>: Send {
    /// Points the decoder at `data`, which is expected to hold `num_values` values.
    fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()>;

    /// Decodes values into `buffer`, trying to fill it completely.
    ///
    /// Returns how many values were actually decoded. This equals `buffer.len()`
    /// unless fewer values than that remain in the decoder.
    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize>;

    /// Decodes values into `buffer` while leaving "spaces" at the positions of
    /// null values.
    ///
    /// `null_count` is the number of nulls expected in `buffer` after reading.
    /// `valid_bits` holds one validity bit per slot of `buffer` and should contain
    ///   at least `buffer.len()` bits.
    ///
    /// Returns the actual number of values decoded.
    ///
    /// # Panics
    ///
    /// Panics if `null_count` is greater than `buffer.len()`.
    fn get_spaced(
        &mut self,
        buffer: &mut [T::T],
        null_count: usize,
        valid_bits: &[u8],
    ) -> Result<usize> {
        assert!(buffer.len() >= null_count);

        // TODO: check validity of the input arguments?
        if null_count == 0 {
            return self.get(buffer);
        }

        let total_slots = buffer.len();
        let expected = total_slots - null_count;
        let decoded = self.get(buffer)?;
        if decoded != expected {
            return Err(general_err!(
                "Number of values read: {}, doesn't match expected: {}",
                decoded,
                expected
            ));
        }

        // The decoded values sit densely at the front of `buffer`. Walk the valid
        // slots from the back and swap each pending value into its final slot.
        let mut pending = decoded;
        for slot in (0..total_slots)
            .rev()
            .filter(|&idx| bit_util::get_bit(valid_bits, idx))
        {
            pending -= 1;
            buffer.swap(slot, pending);
        }

        Ok(total_slots)
    }

    /// Returns the number of values left in this decoder stream.
    fn values_left(&self) -> usize;

    /// Returns the encoding for this decoder.
    fn encoding(&self) -> Encoding;

    /// Skips up to `num_values` values in this decoder stream and returns the
    /// number of values skipped.
    fn skip(&mut self, num_values: usize) -> Result<usize>;
}

/// Gets a decoder for the column descriptor `descr` and encoding type `encoding`.
///
/// NOTE: the primitive type in `descr` MUST match the data type `T`, otherwise
/// disastrous consequence could occur.
pub fn get_decoder<T: DataType>(
    descr: ColumnDescPtr,
    encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
    use self::private::GetDecoder;
    T::T::get_decoder(descr, encoding)
}

// ----------------------------------------------------------------------
// PLAIN Decoding

/// State used while decoding PLAIN-encoded data. It is stored inside
/// [`PlainDecoder`] and handed to the value type's `set_data`, `decode` and `skip`
/// functions.
#[derive(Default)]
pub struct PlainDecoderDetails {
    // The remaining number of values in the byte array
    pub(crate) num_values: usize,

    // The current starting index in the byte array. Not used when `T` is bool.
    pub(crate) start: usize,

    // The length for the type `T`. Only used when `T` is `FixedLenByteArrayType`
    pub(crate) type_length: i32,

    // The byte array to decode from. Not set if `T` is bool.
    pub(crate) data: Option<ByteBufferPtr>,

    // Read `data` bit by bit. Only set if `T` is bool.
    pub(crate) bit_reader: Option<BitReader>,
}

/// Plain decoding that supports all types.
/// Values are encoded back to back. For native types, data is encoded as little endian.
/// Floating point types are encoded in IEEE.
/// See [`PlainEncoder`](crate::encoding::PlainEncoder) for more information.
pub struct PlainDecoder<T: DataType> {
    // The decoding state (remaining values, position, input buffer); the `Decoder`
    // implementation passes it to the functions of `T::T`.
    inner: PlainDecoderDetails,

    // Zero-sized marker that lets the struct carry the generic parameter `T`
    // without storing a value of that type.
    _phantom: PhantomData<T>,
}

impl<T: DataType> PlainDecoder<T> {
    /// Creates a new plain decoder that has no data attached yet.
    ///
    /// `type_length` is stored in the decoding state; every other field of the
    /// state starts at its default (zero counters, no buffer, no bit reader).
    pub fn new(type_length: i32) -> Self {
        let inner = PlainDecoderDetails {
            type_length,
            ..Default::default()
        };
        Self {
            inner,
            _phantom: PhantomData,
        }
    }
}

/// [`Decoder`] implementation for PLAIN encoding. Every operation is forwarded to
/// the associated functions of the value type `T::T`, operating on `self.inner`.
impl<T: DataType> Decoder<T> for PlainDecoder<T> {
    /// Passes `data` and `num_values` to `T::T::set_data`; always returns `Ok(())`.
    #[inline]
    fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> {
        T::T::set_data(&mut self.inner, data, num_values);
        Ok(())
    }

    /// Returns the `num_values` counter kept in the decoding state.
    #[inline]
    fn values_left(&self) -> usize {
        self.inner.num_values
    }

    /// Always [`Encoding::PLAIN`].
    #[inline]
    fn encoding(&self) -> Encoding {
        Encoding::PLAIN
    }

    /// Decodes into `buffer` via `T::T::decode` and returns its result.
    #[inline]
    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
        T::T::decode(buffer, &mut self.inner)
    }

    /// Skips values via `T::T::skip` and returns its result.
    #[inline]
    fn skip(&mut self, num_values: usize) -> Result<usize> {
        T::T::skip(&mut self.inner, num_values)
    }
}

// ----------------------------------------------------------------------
// RLE_DICTIONARY/PLAIN_DICTIONARY Decoding

/// Dictionary decoder.
/// The dictionary encoding builds a dictionary of values encountered in a given column.
/// The dictionary is stored in a dictionary page per column chunk.
/// See [`DictEncoder`](crate::encoding::DictEncoder) for more information.
pub struct DictDecoder<T: DataType> {
    // The dictionary, which maps ids to the values
    dictionary: Vec<T::T>,

    // Whether `dictionary` has been initialized (set by `set_dict`)
    has_dictionary: bool,

    // The decoder for the value ids; `None` until `set_data` is called
    rle_decoder: Option<RleDecoder>,

    // Number of values left in the data stream
    num_values: usize,
}

impl<T: DataType> Default for DictDecoder<T> {
    /// Returns the same value as [`DictDecoder::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl<T: DataType> DictDecoder<T> {
    /// Creates a new dictionary decoder with an empty dictionary and no data.
    pub fn new() -> Self {
        Self {
            num_values: 0,
            rle_decoder: None,
            has_dictionary: false,
            dictionary: Vec::new(),
        }
    }

    /// Fills the dictionary with all values remaining in `decoder` and marks the
    /// dictionary as initialized.
    pub fn set_dict(&mut self, mut decoder: Box<dyn Decoder<T>>) -> Result<()> {
        // Size the dictionary to the number of entries the decoder reports, then
        // decode straight into it.
        let entry_count = decoder.values_left();
        self.dictionary.resize(entry_count, T::T::default());
        decoder.get(&mut self.dictionary[..])?;
        self.has_dictionary = true;
        Ok(())
    }
}

impl<T: DataType> Decoder<T> for DictDecoder<T> {
    fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> {
        // First byte in `data` is bit width
        let bit_width = data.as_ref()[0];
        let mut rle_decoder = RleDecoder::new(bit_width);
        rle_decoder.set_data(data.start_from(1));
        self.num_values = num_values;
        self.rle_decoder = Some(rle_decoder);
        Ok(())
    }

    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
        assert!(self.rle_decoder.is_some());
        assert!(self.has_dictionary, "Must call set_dict() first!");

        let rle = self.rle_decoder.as_mut().unwrap();
        let num_values = cmp::min(buffer.len(), self.num_values);
        rle.get_batch_with_dict(&self.dictionary[..], buffer, num_values)
    }

    /// Number of values left in this decoder stream
    fn values_left(&self) -> usize {
        self.num_values
    }

    fn encoding(&self) -> Encoding {
        Encoding::RLE_DICTIONARY
    }

    fn skip(&mut self, num_values: usize) -> Result<usize> {
        assert!(self.rle_decoder.is_some());
        assert!(self.has_dictionary, "Must call set_dict() first!");

        let rle = self.rle_decoder.as_mut().unwrap();
        let num_values = cmp::min(num_values, self.num_values);
        rle.skip(num_values)
    }
}

// ----------------------------------------------------------------------
// RLE Decoding

/// RLE/Bit-Packing hybrid decoding for values.
/// Currently is used only for data pages v2 and supports boolean types.
/// See [`RleValueEncoder`](crate::encoding::RleValueEncoder) for more information.
pub struct RleValueDecoder<T: DataType> {
    // Number of values that have not yet been returned or skipped
    values_left: usize,
    // Underlying RLE decoder; always created with a bit width of 1
    decoder: RleDecoder,
    // Zero-sized marker that lets the struct carry the generic parameter `T`
    _phantom: PhantomData<T>,
}

impl<T: DataType> Default for RleValueDecoder<T> {
    /// Returns the same value as [`RleValueDecoder::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl<T: DataType> RleValueDecoder<T> {
    /// Creates a new RLE value decoder with no values left and an RLE decoder of
    /// bit width 1.
    pub fn new() -> Self {
        let decoder = RleDecoder::new(1);
        Self {
            decoder,
            values_left: 0,
            _phantom: PhantomData,
        }
    }
}

impl<T: DataType> Decoder<T> for RleValueDecoder<T> {
    #[inline]
    fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> {
        // Only support RLE value reader for boolean values with bit width of 1.
        ensure_phys_ty!(Type::BOOLEAN, "RleValueDecoder only supports BoolType");

        // We still need to remove prefix of i32 from the stream.
        const I32_SIZE: usize = mem::size_of::<i32>();
        let data_size = bit_util::read_num_bytes::<i32>(I32_SIZE, data.as_ref()) as usize;
        self.decoder = RleDecoder::new(1);
        self.decoder.set_data(data.range(I32_SIZE, data_size));
        self.values_left = num_values;
        Ok(())
    }

    #[inline]
    fn values_left(&self) -> usize {
        self.values_left
    }

    #[inline]
    fn encoding(&self) -> Encoding {
        Encoding::RLE
    }

    #[inline]
    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
        let num_values = cmp::min(buffer.len(), self.values_left);
        let values_read = self.decoder.get_batch(&mut buffer[..num_values])?;
        self.values_left -= values_read;
        Ok(values_read)
    }

    #[inline]
    fn skip(&mut self, num_values: usize) -> Result<usize> {
        let num_values = cmp::min(num_values, self.values_left);
        let values_skipped = self.decoder.skip(num_values)?;
        self.values_left -= values_skipped;
        Ok(values_skipped)
    }
}

// ----------------------------------------------------------------------
// DELTA_BINARY_PACKED Decoding

/// Delta binary packed decoder.
/// Supports INT32 and INT64 types.
/// See [`DeltaBitPackEncoder`](crate::encoding::DeltaBitPackEncoder) for more
/// information.
pub struct DeltaBitPackDecoder<T: DataType> {
    /// Bit-level reader over the page data passed to `set_data`
    bit_reader: BitReader,
    /// Set to `true` by `set_data`; asserted by `get` and `get_offset`
    initialized: bool,

    // Header info
    /// The number of values in each block
    block_size: usize,
    /// The number of values that remain to be read in the current page
    values_left: usize,
    /// The number of mini-blocks in each block
    mini_blocks_per_block: usize,
    /// The number of values in each mini block
    values_per_mini_block: usize,

    // Per block info
    /// The minimum delta in the block
    min_delta: T::T,
    /// The byte offset of the end of the current block
    block_end_offset: usize,
    /// The index on the current mini block
    mini_block_idx: usize,
    /// The bit widths of each mini block in the current block
    mini_block_bit_widths: Vec<u8>,
    /// The number of values remaining in the current mini block
    mini_block_remaining: usize,

    /// The first value from the block header if not consumed
    first_value: Option<T::T>,
    /// The last value to compute offsets from
    last_value: T::T,
}

impl<T: DataType> Default for DeltaBitPackDecoder<T>
where
    T::T: Default + FromPrimitive + WrappingAdd + Copy,
{
    /// Returns the same value as [`DeltaBitPackDecoder::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl<T: DataType> DeltaBitPackDecoder<T>
where
    T::T: Default + FromPrimitive + WrappingAdd + Copy,
{
    /// Creates a new delta bit packed decoder in the uninitialized state.
    pub fn new() -> Self {
        Self {
            bit_reader: BitReader::from(Vec::new()),
            initialized: false,
            // Header info
            block_size: 0,
            values_left: 0,
            mini_blocks_per_block: 0,
            values_per_mini_block: 0,
            // Per block info
            min_delta: T::T::default(),
            block_end_offset: 0,
            mini_block_idx: 0,
            mini_block_bit_widths: Vec::new(),
            mini_block_remaining: 0,
            // Value tracking
            first_value: None,
            last_value: T::T::default(),
        }
    }

    /// Returns the current byte offset into the data.
    ///
    /// # Panics
    ///
    /// Panics if `set_data` has not been called.
    pub fn get_offset(&self) -> usize {
        assert!(self.initialized, "Bit reader is not initialized");
        let reader_offset = self.bit_reader.get_byte_offset();
        if self.values_left == 0 {
            // If we've exhausted this page report the end of the current block
            // as we may not have consumed the trailing padding
            //
            // The max is necessary to handle pages which don't contain more than
            // one value and therefore have no blocks, but still contain a page header
            reader_offset.max(self.block_end_offset)
        } else {
            reader_offset
        }
    }

    /// Reads the header of the next block and positions the decoder on its first
    /// mini block.
    #[inline]
    fn next_block(&mut self) -> Result<()> {
        let raw_min_delta = self
            .bit_reader
            .get_zigzag_vlq_int()
            .ok_or_else(|| eof_err!("Not enough data to decode 'min_delta'"))?;
        self.min_delta = T::T::from_i64(raw_min_delta)
            .ok_or_else(|| general_err!("'min_delta' too large"))?;

        // One bit-width byte per mini block follows the minimum delta.
        self.mini_block_bit_widths.clear();
        self.bit_reader.get_aligned_bytes(
            &mut self.mini_block_bit_widths,
            self.mini_blocks_per_block,
        );

        // Compute the end offset of the current block
        let per_mini_block = self.values_per_mini_block;
        let mut end_offset = self.bit_reader.get_byte_offset();
        let mut pending_values = self.values_left;
        for width in self.mini_block_bit_widths.iter_mut() {
            if pending_values == 0 {
                // Specification requires handling arbitrary bit widths
                // for trailing mini blocks
                *width = 0;
            }
            pending_values = pending_values.saturating_sub(per_mini_block);
            end_offset += *width as usize * per_mini_block / 8;
        }
        self.block_end_offset = end_offset;

        if self.mini_block_bit_widths.len() != self.mini_blocks_per_block {
            return Err(eof_err!("insufficient mini block bit widths"));
        }

        self.mini_block_idx = 0;
        self.mini_block_remaining = per_mini_block;

        Ok(())
    }

    /// Advances to the next mini block, starting a new block when the current
    /// block has no mini blocks left.
    #[inline]
    fn next_mini_block(&mut self) -> Result<()> {
        let following = self.mini_block_idx + 1;
        if following >= self.mini_block_bit_widths.len() {
            return self.next_block();
        }

        self.mini_block_idx = following;
        self.mini_block_remaining = self.values_per_mini_block;
        Ok(())
    }
}

impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T>
where
    T::T: Default + FromPrimitive + WrappingAdd + Copy,
{
    /// Attaches a page of DELTA_BINARY_PACKED data and parses its header
    /// (block size, mini blocks per block, total value count, first value).
    ///
    /// The second argument is ignored: the total number of values is taken from
    /// the header.
    ///
    /// # Errors
    ///
    /// Returns an error if the header is truncated or if its fields are invalid:
    /// a block size that is not a multiple of 128, zero mini blocks per block,
    /// a block size that is not a multiple of the mini block count, or a mini
    /// block size that is not a multiple of 32.
    // # of total values is derived from encoding
    #[inline]
    fn set_data(&mut self, data: ByteBufferPtr, _index: usize) -> Result<()> {
        self.bit_reader = BitReader::new(data);
        self.initialized = true;

        // Read header information
        self.block_size = self
            .bit_reader
            .get_vlq_int()
            .ok_or_else(|| eof_err!("Not enough data to decode 'block_size'"))?
            .try_into()
            .map_err(|_| general_err!("invalid 'block_size'"))?;

        self.mini_blocks_per_block = self
            .bit_reader
            .get_vlq_int()
            .ok_or_else(|| eof_err!("Not enough data to decode 'mini_blocks_per_block'"))?
            .try_into()
            .map_err(|_| general_err!("invalid 'mini_blocks_per_block'"))?;

        self.values_left = self
            .bit_reader
            .get_vlq_int()
            .ok_or_else(|| eof_err!("Not enough data to decode 'values_left'"))?
            .try_into()
            .map_err(|_| general_err!("invalid 'values_left'"))?;

        let first_value = self
            .bit_reader
            .get_zigzag_vlq_int()
            .ok_or_else(|| eof_err!("Not enough data to decode 'first_value'"))?;

        self.first_value = Some(
            T::T::from_i64(first_value)
                .ok_or_else(|| general_err!("first value too large"))?,
        );

        if self.block_size % 128 != 0 {
            return Err(general_err!(
                "'block_size' must be a multiple of 128, got {}",
                self.block_size
            ));
        }

        // Guard the modulo and division below: a zero here would otherwise panic
        // with a division by zero on corrupt input.
        if self.mini_blocks_per_block == 0 {
            return Err(general_err!("cannot have zero miniblock per block"));
        }

        if self.block_size % self.mini_blocks_per_block != 0 {
            return Err(general_err!(
                "'block_size' must be a multiple of 'mini_blocks_per_block' got {} and {}",
                self.block_size, self.mini_blocks_per_block
            ));
        }

        // Reset decoding state
        self.mini_block_idx = 0;
        self.values_per_mini_block = self.block_size / self.mini_blocks_per_block;
        self.mini_block_remaining = 0;
        self.mini_block_bit_widths.clear();

        if self.values_per_mini_block % 32 != 0 {
            return Err(general_err!(
                "'values_per_mini_block' must be a multiple of 32 got {}",
                self.values_per_mini_block
            ));
        }

        Ok(())
    }

    /// Decodes up to `buffer.len()` values (bounded by the values left) into
    /// `buffer` and returns the number decoded.
    ///
    /// Returns `Ok(0)` without touching any state when `buffer` is empty or no
    /// values are left.
    ///
    /// # Panics
    ///
    /// Panics if `set_data` has not been called.
    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
        assert!(self.initialized, "Bit reader is not initialized");

        let to_read = buffer.len().min(self.values_left);
        // Also covers an exhausted page: the header's first value must not be
        // consumed when there is nothing left to read, otherwise `values_left`
        // would underflow.
        if to_read == 0 {
            return Ok(0);
        }

        let mut read = 0;

        if let Some(value) = self.first_value.take() {
            self.last_value = value;
            buffer[0] = value;
            read += 1;
            self.values_left -= 1;
        }

        while read != to_read {
            if self.mini_block_remaining == 0 {
                self.next_mini_block()?;
            }

            let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
            let batch_to_read = self.mini_block_remaining.min(to_read - read);

            let batch_read = self
                .bit_reader
                .get_batch(&mut buffer[read..read + batch_to_read], bit_width);

            if batch_read != batch_to_read {
                return Err(general_err!(
                    "Expected to read {} values from miniblock got {}",
                    batch_to_read,
                    batch_read
                ));
            }

            // At this point we have read the deltas to `buffer` we now need to offset
            // these to get back to the original values that were encoded
            for v in &mut buffer[read..read + batch_read] {
                // It is OK for deltas to contain "overflowed" values after encoding,
                // e.g. i64::MAX - i64::MIN, so we use `wrapping_add` to "overflow" again and
                // restore original value.
                *v = v
                    .wrapping_add(&self.min_delta)
                    .wrapping_add(&self.last_value);

                self.last_value = *v;
            }

            read += batch_read;
            self.mini_block_remaining -= batch_read;
            self.values_left -= batch_read;
        }

        Ok(to_read)
    }

    /// Returns the number of values that remain to be read in the current page.
    fn values_left(&self) -> usize {
        self.values_left
    }

    /// Always [`Encoding::DELTA_BINARY_PACKED`].
    fn encoding(&self) -> Encoding {
        Encoding::DELTA_BINARY_PACKED
    }

    /// Skips up to `num_values` values (bounded by the values left) and returns
    /// the number skipped.
    ///
    /// The skipped deltas are still decoded into a scratch buffer, because every
    /// value depends on the previous one and `last_value` must stay current.
    fn skip(&mut self, num_values: usize) -> Result<usize> {
        let mut skip = 0;
        let to_skip = num_values.min(self.values_left);
        if to_skip == 0 {
            return Ok(0);
        }

        // try to consume first value in header.
        if let Some(value) = self.first_value.take() {
            self.last_value = value;
            skip += 1;
            self.values_left -= 1;
        }

        let mini_block_batch_size = match T::T::PHYSICAL_TYPE {
            Type::INT32 => 32,
            Type::INT64 => 64,
            _ => unreachable!(),
        };

        let mut skip_buffer = vec![T::T::default(); mini_block_batch_size];
        while skip < to_skip {
            if self.mini_block_remaining == 0 {
                self.next_mini_block()?;
            }

            let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
            // A mini block may hold more values than the scratch buffer, so cap
            // each step at the buffer size; the loop picks up the remainder.
            let mini_block_to_skip = self
                .mini_block_remaining
                .min(to_skip - skip)
                .min(skip_buffer.len());

            let skip_count = self
                .bit_reader
                .get_batch(&mut skip_buffer[0..mini_block_to_skip], bit_width);

            if skip_count != mini_block_to_skip {
                return Err(general_err!(
                    "Expected to skip {} values from mini block got {}.",
                    mini_block_to_skip,
                    skip_count
                ));
            }

            for v in &mut skip_buffer[0..skip_count] {
                *v = v
                    .wrapping_add(&self.min_delta)
                    .wrapping_add(&self.last_value);

                self.last_value = *v;
            }

            skip += mini_block_to_skip;
            self.mini_block_remaining -= mini_block_to_skip;
            self.values_left -= mini_block_to_skip;
        }

        Ok(to_skip)
    }
}

// ----------------------------------------------------------------------
// DELTA_LENGTH_BYTE_ARRAY Decoding

/// Delta length byte array decoder.
/// Only applied to byte arrays to separate the length values and the data, the lengths
/// are encoded using DELTA_BINARY_PACKED encoding.
/// See [`DeltaLengthByteArrayEncoder`](crate::encoding::DeltaLengthByteArrayEncoder)
/// for more information.
pub struct DeltaLengthByteArrayDecoder<T: DataType> {
    // Lengths for each byte array in `data`, decoded up front by `set_data`
    // TODO: add memory tracker to this
    lengths: Vec<i32>,

    // Current index into `lengths`
    current_idx: usize,

    // Concatenated byte array data; `None` until `set_data` is called
    data: Option<ByteBufferPtr>,

    // Offset into `data`, always point to the beginning of next byte array.
    offset: usize,

    // Number of values left in this decoder stream
    num_values: usize,

    // Placeholder to allow `T` as generic parameter
    _phantom: PhantomData<T>,
}

impl<T: DataType> Default for DeltaLengthByteArrayDecoder<T> {
    /// Returns the same value as [`DeltaLengthByteArrayDecoder::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl<T: DataType> DeltaLengthByteArrayDecoder<T> {
    /// Creates a new delta length byte array decoder with no lengths and no data
    /// attached.
    pub fn new() -> Self {
        Self {
            data: None,
            lengths: Vec::new(),
            num_values: 0,
            current_idx: 0,
            offset: 0,
            _phantom: PhantomData,
        }
    }
}

impl<T: DataType> Decoder<T> for DeltaLengthByteArrayDecoder<T> {
    /// Attaches a page of DELTA_LENGTH_BYTE_ARRAY data.
    ///
    /// All lengths are decoded up front with a [`DeltaBitPackDecoder`]; the bytes
    /// after the length section become the concatenated value data. The number
    /// of values is taken from the decoded length stream.
    ///
    /// # Errors
    ///
    /// Returns an error if `T` is not a byte array type, if the length section
    /// cannot be decoded, or if any decoded length is negative.
    fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> {
        match T::get_physical_type() {
            Type::BYTE_ARRAY => {
                let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
                len_decoder.set_data(data.all(), num_values)?;
                let num_lengths = len_decoder.values_left();
                self.lengths.resize(num_lengths, 0);
                len_decoder.get(&mut self.lengths[..])?;

                // Reject corrupt input here so that `get` and `skip` can convert
                // lengths to `usize` without changing their value.
                if self.lengths.iter().any(|&len| len < 0) {
                    return Err(general_err!(
                        "DeltaLengthByteArrayDecoder encountered a negative length"
                    ));
                }

                self.data = Some(data.start_from(len_decoder.get_offset()));
                self.offset = 0;
                self.current_idx = 0;
                self.num_values = num_lengths;
                Ok(())
            }
            _ => Err(general_err!(
                "DeltaLengthByteArrayDecoder only support ByteArrayType"
            )),
        }
    }

    /// Decodes up to `buffer.len()` byte arrays (bounded by the values left),
    /// each referencing a range of the attached data, and returns the number
    /// decoded.
    ///
    /// # Panics
    ///
    /// Panics if `set_data` has not been called.
    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
        match T::get_physical_type() {
            Type::BYTE_ARRAY => {
                assert!(self.data.is_some());

                let data = self.data.as_ref().unwrap();
                let num_values = cmp::min(buffer.len(), self.num_values);

                for item in buffer.iter_mut().take(num_values) {
                    let len = self.lengths[self.current_idx] as usize;

                    item.as_mut_any()
                        .downcast_mut::<ByteArray>()
                        .unwrap()
                        .set_data(data.range(self.offset, len));

                    self.offset += len;
                    self.current_idx += 1;
                }

                self.num_values -= num_values;
                Ok(num_values)
            }
            _ => Err(general_err!(
                "DeltaLengthByteArrayDecoder only support ByteArrayType"
            )),
        }
    }

    /// Returns the number of values left in this decoder stream.
    fn values_left(&self) -> usize {
        self.num_values
    }

    /// Always [`Encoding::DELTA_LENGTH_BYTE_ARRAY`].
    fn encoding(&self) -> Encoding {
        Encoding::DELTA_LENGTH_BYTE_ARRAY
    }

    /// Skips up to `num_values` byte arrays (bounded by the values left) by
    /// advancing the data offset past them, and returns the number skipped.
    fn skip(&mut self, num_values: usize) -> Result<usize> {
        match T::get_physical_type() {
            Type::BYTE_ARRAY => {
                let num_values = cmp::min(num_values, self.num_values);

                // Accumulate in `usize`, as `get` does: an `i32` sum overflows
                // once the skipped values exceed 2 GiB in total.
                let next_offset: usize = self.lengths
                    [self.current_idx..self.current_idx + num_values]
                    .iter()
                    .map(|&len| len as usize)
                    .sum();

                self.current_idx += num_values;
                self.offset += next_offset;

                self.num_values -= num_values;
                Ok(num_values)
            }
            other_type => Err(general_err!(
                "DeltaLengthByteArrayDecoder not support {}, only support byte array",
                other_type
            )),
        }
    }
}

// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY Decoding

/// Delta byte array decoder.
/// Prefix lengths are encoded using `DELTA_BINARY_PACKED` encoding, Suffixes are stored
/// using `DELTA_LENGTH_BYTE_ARRAY` encoding.
/// See [`DeltaByteArrayEncoder`](crate::encoding::DeltaByteArrayEncoder) for more
/// information.
pub struct DeltaByteArrayDecoder<T: DataType> {
    // Prefix lengths for each byte array
    // TODO: add memory tracker to this
    prefix_lengths: Vec<i32>,

    // The current index into `prefix_lengths`,
    current_idx: usize,

    // Decoder for all suffixes, the # of which should be the same as
    // `prefix_lengths.len()`. `None` until `set_data` is called.
    suffix_decoder: Option<DeltaLengthByteArrayDecoder<ByteArrayType>>,

    // The last byte array, used to derive the current prefix
    previous_value: Vec<u8>,

    // Number of values left
    num_values: usize,

    // Placeholder to allow `T` as generic parameter
    _phantom: PhantomData<T>,
}

impl<T: DataType> Default for DeltaByteArrayDecoder<T> {
    /// Returns the same empty decoder as [`DeltaByteArrayDecoder::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl<T: DataType> DeltaByteArrayDecoder<T> {
    /// Creates a new delta byte array decoder with no data attached and no values
    /// left to read.
    pub fn new() -> Self {
        Self {
            num_values: 0,
            current_idx: 0,
            prefix_lengths: Vec::new(),
            previous_value: Vec::new(),
            suffix_decoder: None,
            _phantom: PhantomData,
        }
    }
}

impl<T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
    /// Attaches a `DELTA_BYTE_ARRAY` encoded page to this decoder.
    ///
    /// All prefix lengths are decoded eagerly; the bytes that follow them are
    /// handed to a suffix decoder. The number of values left is taken from the
    /// decoded prefix lengths.
    ///
    /// # Errors
    ///
    /// Returns an error if `T` is neither `ByteArrayType` nor
    /// `FixedLenByteArrayType`, or if either nested decoder rejects the data.
    fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> {
        match T::get_physical_type() {
            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
                let mut prefix_len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
                prefix_len_decoder.set_data(data.all(), num_values)?;
                let num_prefixes = prefix_len_decoder.values_left();
                self.prefix_lengths.resize(num_prefixes, 0);
                prefix_len_decoder.get(&mut self.prefix_lengths[..])?;

                let mut suffix_decoder = DeltaLengthByteArrayDecoder::new();
                suffix_decoder
                    .set_data(data.start_from(prefix_len_decoder.get_offset()), num_values)?;
                self.suffix_decoder = Some(suffix_decoder);
                self.num_values = num_prefixes;
                self.current_idx = 0;
                self.previous_value.clear();
                Ok(())
            }
            _ => {
                Err(general_err!(
                    "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
                ))
            }
        }
    }

    /// Decodes up to `buffer.len()` values into `buffer` and returns how many
    /// values were written.
    ///
    /// # Errors
    ///
    /// Returns an error if `T` is not a supported type, if the suffix decoder
    /// fails or runs out of values, or if a prefix length is negative or longer
    /// than the previously decoded value.
    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
        match T::get_physical_type() {
            ty @ Type::BYTE_ARRAY | ty @ Type::FIXED_LEN_BYTE_ARRAY => {
                let num_values = cmp::min(buffer.len(), self.num_values);
                let mut v = [ByteArray::new()];
                for item in buffer.iter_mut().take(num_values) {
                    // Process suffix
                    // TODO: this is awkward - maybe we should add a non-vectorized API?
                    let suffix_decoder = self
                        .suffix_decoder
                        .as_mut()
                        .ok_or_else(|| general_err!("decoder not initialized"))?;
                    // A page with fewer suffixes than prefixes would otherwise leave
                    // `v[0]` unset (or stale) and make `data()` below misbehave.
                    if suffix_decoder.get(&mut v[..])? != 1 {
                        return Err(general_err!(
                            "DeltaByteArrayDecoder ran out of suffixes before prefixes"
                        ));
                    }
                    let suffix = v[0].data();

                    // Extract current prefix length, can be 0. It comes straight from
                    // the page, so validate it instead of panicking while slicing.
                    let prefix_len = self.prefix_lengths[self.current_idx];
                    if prefix_len < 0 || prefix_len as usize > self.previous_value.len() {
                        return Err(general_err!(
                            "DeltaByteArrayDecoder prefix length {} is invalid for a previous value of {} bytes",
                            prefix_len,
                            self.previous_value.len()
                        ));
                    }

                    // Concatenate prefix with suffix in place: the retained prefix is
                    // already at the front of `previous_value`.
                    self.previous_value.truncate(prefix_len as usize);
                    self.previous_value.extend_from_slice(suffix);

                    let data = ByteBufferPtr::new(self.previous_value.clone());

                    match ty {
                        Type::BYTE_ARRAY => item
                            .as_mut_any()
                            .downcast_mut::<ByteArray>()
                            .unwrap()
                            .set_data(data),
                        Type::FIXED_LEN_BYTE_ARRAY => item
                            .as_mut_any()
                            .downcast_mut::<FixedLenByteArray>()
                            .unwrap()
                            .set_data(data),
                        _ => unreachable!(),
                    };

                    self.current_idx += 1;
                }

                self.num_values -= num_values;
                Ok(num_values)
            }
            _ => {
                Err(general_err!(
                    "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
                ))
            }
        }
    }

    /// Returns how many values have not yet been read or skipped.
    fn values_left(&self) -> usize {
        self.num_values
    }

    /// Reports the encoding handled by this decoder: `DELTA_BYTE_ARRAY`.
    fn encoding(&self) -> Encoding {
        Encoding::DELTA_BYTE_ARRAY
    }

    /// Skips up to `num_values` values and returns how many were skipped.
    ///
    /// Values still have to be decoded because each one depends on its
    /// predecessor, but the scratch buffer is bounded by the number of values
    /// left rather than by the caller-supplied count.
    fn skip(&mut self, num_values: usize) -> Result<usize> {
        let to_skip = cmp::min(num_values, self.num_values);
        let mut buffer = vec![T::T::default(); to_skip];
        self.get(&mut buffer)
    }
}

#[cfg(test)]
mod tests {
    use super::{super::encoding::*, *};

    use std::f32::consts::PI as PI_f32;
    use std::f64::consts::PI as PI_f64;
    use std::sync::Arc;

    use crate::schema::types::{
        ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType,
    };
    use crate::util::test_common::rand_gen::RandGen;

    #[test]
    fn test_get_decoders() {
        // Encodings that can be created directly for the given type
        create_and_check_decoder::<Int32Type>(Encoding::PLAIN, None);
        create_and_check_decoder::<Int32Type>(Encoding::DELTA_BINARY_PACKED, None);
        create_and_check_decoder::<ByteArrayType>(
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            None,
        );
        create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, None);
        create_and_check_decoder::<BoolType>(Encoding::RLE, None);

        // Both dictionary encodings are rejected with the same initialization error
        for dict_encoding in [Encoding::RLE_DICTIONARY, Encoding::PLAIN_DICTIONARY] {
            let expected =
                general_err!("Cannot initialize this encoding through this function");
            create_and_check_decoder::<Int32Type>(dict_encoding, Some(expected));
        }

        // Byte array encodings are rejected for INT32
        let expected =
            general_err!("Encoding DELTA_LENGTH_BYTE_ARRAY is not supported for type");
        create_and_check_decoder::<Int32Type>(
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Some(expected),
        );
        let expected =
            general_err!("Encoding DELTA_BYTE_ARRAY is not supported for type");
        create_and_check_decoder::<Int32Type>(Encoding::DELTA_BYTE_ARRAY, Some(expected));

        // Encoding without any decoder implementation
        let expected = nyi_err!("Encoding BIT_PACKED is not supported");
        create_and_check_decoder::<Int32Type>(Encoding::BIT_PACKED, Some(expected));
    }

    #[test]
    fn test_plain_decode_int32() {
        // Plain-encoded INT32 values must decode back to the original values
        let values = vec![42, 18, 52];
        let encoded = ByteBufferPtr::new(Int32Type::to_byte_array(&values[..]));
        let mut decoded = vec![0; 3];
        test_plain_decode::<Int32Type>(encoded, 3, -1, &mut decoded[..], &values[..]);
    }

    #[test]
    fn test_plain_skip_int32() {
        // Skip the first of three values; the last two must still be readable
        let values = vec![42, 18, 52];
        let encoded = ByteBufferPtr::new(Int32Type::to_byte_array(&values[..]));
        test_plain_skip::<Int32Type>(encoded, 3, 1, -1, &values[1..]);
    }

    #[test]
    fn test_plain_skip_all_int32() {
        // Ask to skip 5 values when only 3 are available
        let values = vec![42, 18, 52];
        let encoded = ByteBufferPtr::new(Int32Type::to_byte_array(&values[..]));
        test_plain_skip::<Int32Type>(encoded, 3, 5, -1, &[]);
    }

    #[test]
    fn test_plain_decode_int32_spaced() {
        let values = [42, 18, 52];
        // Bits 1, 3 and 6 of the validity byte are set; every other slot is a null
        let validity = [0b01001010];
        let null_count = 5;
        let expected = [0, 42, 0, 18, 0, 0, 52, 0];

        let encoded = ByteBufferPtr::new(Int32Type::to_byte_array(&values[..]));
        let mut decoded = vec![0; 8];
        test_plain_decode_spaced::<Int32Type>(
            encoded,
            3,
            -1,
            &mut decoded[..],
            null_count,
            &validity,
            &expected[..],
        );
    }

    #[test]
    fn test_plain_decode_int64() {
        // Plain-encoded INT64 values must decode back to the original values
        let values = vec![42, 18, 52];
        let encoded = ByteBufferPtr::new(Int64Type::to_byte_array(&values[..]));
        let mut decoded = vec![0; 3];
        test_plain_decode::<Int64Type>(encoded, 3, -1, &mut decoded[..], &values[..]);
    }

    #[test]
    fn test_plain_skip_int64() {
        // Skip the first two of three values; only the last one remains
        let values = vec![42, 18, 52];
        let encoded = ByteBufferPtr::new(Int64Type::to_byte_array(&values[..]));
        test_plain_skip::<Int64Type>(encoded, 3, 2, -1, &values[2..]);
    }

    #[test]
    fn test_plain_skip_all_int64() {
        // Skip exactly as many values as are available
        let values = vec![42, 18, 52];
        let encoded = ByteBufferPtr::new(Int64Type::to_byte_array(&values[..]));
        test_plain_skip::<Int64Type>(encoded, 3, 3, -1, &[]);
    }

    #[test]
    fn test_plain_decode_float() {
        // Plain-encoded FLOAT values must decode back to the original values
        let values = vec![PI_f32, 2.414, 12.51];
        let encoded = ByteBufferPtr::new(FloatType::to_byte_array(&values[..]));
        let mut decoded = vec![0.0; 3];
        test_plain_decode::<FloatType>(encoded, 3, -1, &mut decoded[..], &values[..]);
    }

    #[test]
    fn test_plain_skip_float() {
        // Skip the first of three values; the last two must still be readable
        let values = vec![PI_f32, 2.414, 12.51];
        let encoded = ByteBufferPtr::new(FloatType::to_byte_array(&values[..]));
        test_plain_skip::<FloatType>(encoded, 3, 1, -1, &values[1..]);
    }

    #[test]
    fn test_plain_skip_all_float() {
        // Ask to skip 4 values when only 3 are available
        let values = vec![PI_f32, 2.414, 12.51];
        let encoded = ByteBufferPtr::new(FloatType::to_byte_array(&values[..]));
        test_plain_skip::<FloatType>(encoded, 3, 4, -1, &[]);
    }

    #[test]
    fn test_plain_skip_double() {
        // Skip the first of three values; the last two must still be readable
        let values = vec![PI_f64, 2.414f64, 12.51f64];
        let encoded = ByteBufferPtr::new(DoubleType::to_byte_array(&values[..]));
        test_plain_skip::<DoubleType>(encoded, 3, 1, -1, &values[1..]);
    }

    #[test]
    fn test_plain_skip_all_double() {
        // Ask to skip 5 values when only 3 are available
        let values = vec![PI_f64, 2.414f64, 12.51f64];
        let encoded = ByteBufferPtr::new(DoubleType::to_byte_array(&values[..]));
        test_plain_skip::<DoubleType>(encoded, 3, 5, -1, &[]);
    }

    #[test]
    fn test_plain_decode_double() {
        // Plain-encoded DOUBLE values must decode back to the original values
        let values = vec![PI_f64, 2.414f64, 12.51f64];
        let encoded = ByteBufferPtr::new(DoubleType::to_byte_array(&values[..]));
        let mut decoded = vec![0.0f64; 3];
        test_plain_decode::<DoubleType>(encoded, 3, -1, &mut decoded[..], &values[..]);
    }

    #[test]
    fn test_plain_decode_int96() {
        // Fill four INT96 values, one triple of words each
        let words = [(11, 22, 33), (44, 55, 66), (10, 20, 30), (40, 50, 60)];
        let mut values = vec![Int96::new(); 4];
        for (value, (e0, e1, e2)) in values.iter_mut().zip(words) {
            value.set_data(e0, e1, e2);
        }

        let encoded = ByteBufferPtr::new(Int96Type::to_byte_array(&values[..]));
        let mut decoded = vec![Int96::new(); 4];
        test_plain_decode::<Int96Type>(encoded, 4, -1, &mut decoded[..], &values[..]);
    }

    #[test]
    fn test_plain_skip_int96() {
        // Fill four INT96 values, one triple of words each
        let words = [(11, 22, 33), (44, 55, 66), (10, 20, 30), (40, 50, 60)];
        let mut values = vec![Int96::new(); 4];
        for (value, (e0, e1, e2)) in values.iter_mut().zip(words) {
            value.set_data(e0, e1, e2);
        }

        // Skip the first two values; the last two must still be readable
        let encoded = ByteBufferPtr::new(Int96Type::to_byte_array(&values[..]));
        test_plain_skip::<Int96Type>(encoded, 4, 2, -1, &values[2..]);
    }

    #[test]
    fn test_plain_skip_all_int96() {
        // Fill four INT96 values, one triple of words each
        let words = [(11, 22, 33), (44, 55, 66), (10, 20, 30), (40, 50, 60)];
        let mut values = vec![Int96::new(); 4];
        for (value, (e0, e1, e2)) in values.iter_mut().zip(words) {
            value.set_data(e0, e1, e2);
        }

        // Ask to skip 8 values when only 4 are available
        let encoded = ByteBufferPtr::new(Int96Type::to_byte_array(&values[..]));
        test_plain_skip::<Int96Type>(encoded, 4, 8, -1, &[]);
    }

    #[test]
    fn test_plain_decode_bool() {
        // Plain-encoded BOOLEAN values must decode back to the original values
        let values = vec![
            false, true, false, false, true, false, true, true, false, true,
        ];
        let encoded = ByteBufferPtr::new(BoolType::to_byte_array(&values[..]));
        let mut decoded = vec![false; 10];
        test_plain_decode::<BoolType>(encoded, 10, -1, &mut decoded[..], &values[..]);
    }

    #[test]
    fn test_plain_skip_bool() {
        // Skip the first five of ten values; the last five must still be readable
        let values = vec![
            false, true, false, false, true, false, true, true, false, true,
        ];
        let encoded = ByteBufferPtr::new(BoolType::to_byte_array(&values[..]));
        test_plain_skip::<BoolType>(encoded, 10, 5, -1, &values[5..]);
    }

    #[test]
    fn test_plain_skip_all_bool() {
        // Ask to skip 20 values when only 10 are available
        let values = vec![
            false, true, false, false, true, false, true, true, false, true,
        ];
        let encoded = ByteBufferPtr::new(BoolType::to_byte_array(&values[..]));
        test_plain_skip::<BoolType>(encoded, 10, 20, -1, &[]);
    }

    #[test]
    fn test_plain_decode_byte_array() {
        // Two variable-length values holding the bytes of "hello" and "parquet"
        let mut values = vec![ByteArray::new(); 2];
        for (value, text) in values.iter_mut().zip(["hello", "parquet"]) {
            value.set_data(ByteBufferPtr::new(text.as_bytes().to_vec()));
        }

        let encoded = ByteBufferPtr::new(ByteArrayType::to_byte_array(&values[..]));
        let mut decoded = vec![ByteArray::new(); 2];
        test_plain_decode::<ByteArrayType>(
            encoded,
            2,
            -1,
            &mut decoded[..],
            &values[..],
        );
    }

    #[test]
    fn test_plain_skip_byte_array() {
        // Two variable-length values holding the bytes of "hello" and "parquet"
        let mut values = vec![ByteArray::new(); 2];
        for (value, text) in values.iter_mut().zip(["hello", "parquet"]) {
            value.set_data(ByteBufferPtr::new(text.as_bytes().to_vec()));
        }

        // Skip the first value; the second must still be readable
        let encoded = ByteBufferPtr::new(ByteArrayType::to_byte_array(&values[..]));
        test_plain_skip::<ByteArrayType>(encoded, 2, 1, -1, &values[1..]);
    }

    #[test]
    fn test_plain_skip_all_byte_array() {
        // Two variable-length values holding the bytes of "hello" and "parquet"
        let mut values = vec![ByteArray::new(); 2];
        for (value, text) in values.iter_mut().zip(["hello", "parquet"]) {
            value.set_data(ByteBufferPtr::new(text.as_bytes().to_vec()));
        }

        // Skip exactly as many values as are available
        let encoded = ByteBufferPtr::new(ByteArrayType::to_byte_array(&values[..]));
        test_plain_skip::<ByteArrayType>(encoded, 2, 2, -1, &[]);
    }

    #[test]
    fn test_plain_decode_fixed_len_byte_array() {
        // Three 4-byte values, matching the type length of 4 passed below
        let mut values = vec![FixedLenByteArray::default(); 3];
        for (value, text) in values.iter_mut().zip(["bird", "come", "flow"]) {
            value.set_data(ByteBufferPtr::new(text.as_bytes().to_vec()));
        }

        let encoded =
            ByteBufferPtr::new(FixedLenByteArrayType::to_byte_array(&values[..]));
        let mut decoded = vec![FixedLenByteArray::default(); 3];
        test_plain_decode::<FixedLenByteArrayType>(
            encoded,
            3,
            4,
            &mut decoded[..],
            &values[..],
        );
    }

    #[test]
    fn test_plain_skip_fixed_len_byte_array() {
        // Three 4-byte values, matching the type length of 4 passed below
        let mut values = vec![FixedLenByteArray::default(); 3];
        for (value, text) in values.iter_mut().zip(["bird", "come", "flow"]) {
            value.set_data(ByteBufferPtr::new(text.as_bytes().to_vec()));
        }

        // Skip the first value; the last two must still be readable
        let encoded =
            ByteBufferPtr::new(FixedLenByteArrayType::to_byte_array(&values[..]));
        test_plain_skip::<FixedLenByteArrayType>(encoded, 3, 1, 4, &values[1..]);
    }

    #[test]
    fn test_plain_skip_all_fixed_len_byte_array() {
        // Three 4-byte values, matching the type length of 4 passed below
        let mut values = vec![FixedLenByteArray::default(); 3];
        for (value, text) in values.iter_mut().zip(["bird", "come", "flow"]) {
            value.set_data(ByteBufferPtr::new(text.as_bytes().to_vec()));
        }

        // Ask to skip 6 values when only 3 are available
        let encoded =
            ByteBufferPtr::new(FixedLenByteArrayType::to_byte_array(&values[..]));
        test_plain_skip::<FixedLenByteArrayType>(encoded, 3, 6, 4, &[]);
    }

    /// Decodes `data` with a [`PlainDecoder`] into `buffer` and checks the outcome.
    ///
    /// `num_values` is passed to `set_data` and `type_length` to the decoder
    /// constructor. After decoding, no values may be left and `buffer` must equal
    /// `expected`.
    fn test_plain_decode<T: DataType>(
        data: ByteBufferPtr,
        num_values: usize,
        type_length: i32,
        buffer: &mut [T::T],
        expected: &[T::T],
    ) {
        let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
        // `expect` (rather than `assert!(.. .is_ok())`) keeps the underlying error
        // in the failure message.
        decoder.set_data(data, num_values).expect("ok to set data");
        decoder.get(buffer).expect("ok to decode");
        assert_eq!(decoder.values_left(), 0);
        assert_eq!(buffer, expected);
    }

    /// Skips `skip` values with a [`PlainDecoder`] and checks what is left.
    ///
    /// When `skip` covers all `num_values` values, the decoder must report having
    /// skipped exactly `num_values` and a following `get` must return nothing.
    /// Otherwise exactly `skip` values must be skipped and the remaining values
    /// must equal `expected`.
    fn test_plain_skip<T: DataType>(
        data: ByteBufferPtr,
        num_values: usize,
        skip: usize,
        type_length: i32,
        expected: &[T::T],
    ) {
        let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
        // `expect` (rather than `assert!(.. .is_ok())`) keeps the underlying error
        // in the failure message, like the other calls in this helper.
        decoder.set_data(data, num_values).expect("setting data");
        let skipped = decoder.skip(skip).expect("skipping values");

        if skip >= num_values {
            assert_eq!(skipped, num_values);

            let mut buffer = vec![T::T::default(); 1];
            let remaining = decoder.get(&mut buffer).expect("getting remaining values");
            assert_eq!(remaining, 0);
        } else {
            assert_eq!(skipped, skip);
            let mut buffer = vec![T::T::default(); num_values - skip];
            let remaining = decoder.get(&mut buffer).expect("getting remaining values");
            assert_eq!(remaining, num_values - skip);
            assert_eq!(decoder.values_left(), 0);
            assert_eq!(buffer, expected);
        }
    }

    /// Decodes `data` with [`PlainDecoder::get_spaced`] and checks the outcome.
    ///
    /// `num_nulls` and `valid_bits` are forwarded to `get_spaced`. The call must
    /// report `num_values + num_nulls` slots, leave no values in the decoder and
    /// produce `expected` in `buffer`.
    fn test_plain_decode_spaced<T: DataType>(
        data: ByteBufferPtr,
        num_values: usize,
        type_length: i32,
        buffer: &mut [T::T],
        num_nulls: usize,
        valid_bits: &[u8],
        expected: &[T::T],
    ) {
        let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
        // `expect` (rather than `assert!(.. .is_ok())`) keeps the underlying error
        // in the failure message.
        decoder.set_data(data, num_values).expect("ok to set data");
        let decoded = decoder
            .get_spaced(buffer, num_nulls, valid_bits)
            .expect("ok to decode spaced");
        assert_eq!(num_values + num_nulls, decoded);
        assert_eq!(decoder.values_left(), 0);
        assert_eq!(buffer, expected);
    }

    #[test]
    #[should_panic(expected = "RleValueEncoder only supports BoolType")]
    fn test_rle_value_encode_int32_not_supported() {
        // Feeding INT32 values to the RLE value encoder is expected to panic
        let mut rle_encoder = RleValueEncoder::<Int32Type>::new();
        let values = [1, 2, 3, 4];
        rle_encoder.put(&values).unwrap();
    }

    #[test]
    #[should_panic(expected = "RleValueDecoder only supports BoolType")]
    fn test_rle_value_decode_int32_not_supported() {
        // Attaching data to an INT32 RLE value decoder is expected to panic
        let mut rle_decoder = RleValueDecoder::<Int32Type>::new();
        let page = ByteBufferPtr::new(vec![5, 0, 0, 0]);
        rle_decoder.set_data(page, 1).unwrap();
    }

    #[test]
    fn test_rle_value_decode_bool_decode() {
        // Each chunk is written with its own 'put' call on the same encoder
        let first = BoolType::gen_vec(-1, 256);
        let second = BoolType::gen_vec(-1, 257);
        let third = BoolType::gen_vec(-1, 126);
        test_rle_value_decode::<BoolType>(vec![first, second, third]);
    }

    #[test]
    #[should_panic(expected = "Bit reader is not initialized")]
    fn test_delta_bit_packed_not_initialized_offset() {
        // get_offset() must fail when set_data() has never been called
        let fresh_decoder = DeltaBitPackDecoder::<Int32Type>::new();
        fresh_decoder.get_offset();
    }

    #[test]
    #[should_panic(expected = "Bit reader is not initialized")]
    fn test_delta_bit_packed_not_initialized_get() {
        // get() must fail when set_data() has never been called
        let mut fresh_decoder = DeltaBitPackDecoder::<Int32Type>::new();
        let mut output: Vec<i32> = Vec::new();
        fresh_decoder.get(&mut output).unwrap();
    }

    #[test]
    fn test_delta_bit_packed_int32_empty() {
        // A single `put` call that writes no values at all
        test_delta_bit_packed_decode::<Int32Type>(vec![Vec::new()]);
    }

    #[test]
    fn test_delta_bit_packed_int32_repeat() {
        // The sequence 1..=8 repeated four times (32 values)
        let block: Vec<i32> = (0..4).flat_map(|_| 1..=8).collect();
        test_delta_bit_packed_decode::<Int32Type>(vec![block]);
    }

    #[test]
    fn test_skip_delta_bit_packed_int32_repeat() {
        // The sequence 1..=8 repeated four times (32 values)
        let block: Vec<i32> = (0..4).flat_map(|_| 1..=8).collect();
        // First skip part of the data, then more than is available
        for skip in [10, 100] {
            test_skip::<Int32Type>(block.clone(), Encoding::DELTA_BINARY_PACKED, skip);
        }
    }

    #[test]
    fn test_delta_bit_packed_int32_uneven() {
        // Eleven values with mixed signs
        let values = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
        test_delta_bit_packed_decode::<Int32Type>(vec![values]);
    }

    #[test]
    fn test_skip_delta_bit_packed_int32_uneven() {
        let values = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
        // First skip part of the data, then more than is available
        for skip in [5, 100] {
            test_skip::<Int32Type>(values.clone(), Encoding::DELTA_BINARY_PACKED, skip);
        }
    }

    #[test]
    fn test_delta_bit_packed_int32_same_values() {
        // Sixteen copies of one value, first positive and then negative
        for value in [127, -127] {
            let block = vec![value; 16];
            test_delta_bit_packed_decode::<Int32Type>(vec![block]);
        }
    }

    #[test]
    fn test_skip_delta_bit_packed_int32_same_values() {
        // Sixteen copies of one value, first positive and then negative
        for value in [127, -127] {
            let block = vec![value; 16];
            // First skip part of the data, then more than is available
            test_skip::<Int32Type>(block.clone(), Encoding::DELTA_BINARY_PACKED, 5);
            test_skip::<Int32Type>(block, Encoding::DELTA_BINARY_PACKED, 100);
        }
    }

    #[test]
    fn test_delta_bit_packed_int32_min_max() {
        // Values at both ends of the i32 range
        let (lo, hi) = (i32::MIN, i32::MAX);
        let block = vec![lo, lo, lo, hi, lo, hi, lo, hi];
        test_delta_bit_packed_decode::<Int32Type>(vec![block]);
    }

    #[test]
    fn test_skip_delta_bit_packed_int32_min_max() {
        // Values at both ends of the i32 range
        let (lo, hi) = (i32::MIN, i32::MAX);
        let block = vec![lo, lo, lo, hi, lo, hi, lo, hi];
        // First skip part of the data, then more than is available
        test_skip::<Int32Type>(block.clone(), Encoding::DELTA_BINARY_PACKED, 5);
        test_skip::<Int32Type>(block, Encoding::DELTA_BINARY_PACKED, 100);
    }

    #[test]
    fn test_delta_bit_packed_int32_multiple_blocks() {
        // Each chunk is written with its own 'put' call on the same encoder
        let first = Int32Type::gen_vec(-1, 64);
        let second = Int32Type::gen_vec(-1, 128);
        let third = Int32Type::gen_vec(-1, 64);
        test_delta_bit_packed_decode::<Int32Type>(vec![first, second, third]);
    }

    #[test]
    fn test_delta_bit_packed_int32_data_across_blocks() {
        // Each chunk is written with its own 'put' call on the same encoder
        let first = Int32Type::gen_vec(-1, 256);
        let second = Int32Type::gen_vec(-1, 257);
        test_delta_bit_packed_decode::<Int32Type>(vec![first, second]);
    }

    #[test]
    fn test_delta_bit_packed_int32_with_empty_blocks() {
        // An empty `put` call sits between two non-empty ones
        let first = Int32Type::gen_vec(-1, 128);
        let empty = Vec::new();
        let last = Int32Type::gen_vec(-1, 64);
        test_delta_bit_packed_decode::<Int32Type>(vec![first, empty, last]);
    }

    #[test]
    fn test_delta_bit_packed_int64_empty() {
        // A single `put` call that writes no values at all
        test_delta_bit_packed_decode::<Int64Type>(vec![Vec::new()]);
    }

    #[test]
    fn test_delta_bit_packed_int64_min_max() {
        // Alternate between both ends of the i64 range. Uses the associated
        // constants, like the INT32 variant of this test, instead of the legacy
        // `min_value()` / `max_value()` functions.
        let block_data = vec![
            i64::MIN,
            i64::MAX,
            i64::MIN,
            i64::MAX,
            i64::MIN,
            i64::MAX,
            i64::MIN,
            i64::MAX,
        ];
        test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
    }

    #[test]
    fn test_delta_bit_packed_int64_multiple_blocks() {
        // Each chunk is written with its own 'put' call on the same encoder
        let first = Int64Type::gen_vec(-1, 64);
        let second = Int64Type::gen_vec(-1, 128);
        let third = Int64Type::gen_vec(-1, 64);
        test_delta_bit_packed_decode::<Int64Type>(vec![first, second, third]);
    }

    #[test]
    fn test_delta_bit_packed_decoder_sample() {
        let encoded = ByteBufferPtr::new(vec![
            128, 1, 4, 3, 58, 28, 6, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
            0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        ]);
        let mut delta_decoder: DeltaBitPackDecoder<Int32Type> =
            DeltaBitPackDecoder::new();
        delta_decoder.set_data(encoded, 3).unwrap();

        // check exact offsets, because when reading partial values we end up with
        // some data not being read from bit reader
        assert_eq!(delta_decoder.get_offset(), 5);

        let mut decoded = vec![0, 0, 0];
        delta_decoder.get(&mut decoded).unwrap();
        assert_eq!(delta_decoder.get_offset(), 34);
        assert_eq!(decoded, vec![29, 43, 89]);
    }

    #[test]
    fn test_delta_bit_packed_padding() {
        // Hand-built page: a header followed by two blocks, the second of which
        // carries a non-zero bit width for a miniblock that holds no data.
        let header = vec![
            // Each field below is read back with `get_vlq_int` further down

            // Block Size - 256
            128,
            2,
            // Miniblocks in block,
            4,
            // Total value count - 419
            128 + 35,
            3,
            // First value - 7
            7,
        ];

        // Block Header
        let block1_header = vec![
            0, // Min delta
            0, 1, 0, 0, // Bit widths
        ];

        // Mini-block 1 - bit width 0 => 0 bytes
        // Mini-block 2 - bit width 1 => 8 bytes
        // Mini-block 3 - bit width 0 => 0 bytes
        // Mini-block 4 - bit width 0 => 0 bytes
        let block1 = vec![0xFF; 8];

        // Block Header
        let block2_header = vec![
            0, // Min delta
            0, 1, 2, 0xFF, // Bit widths, including non-zero padding
        ];

        // Mini-block 1 - bit width 0 => 0 bytes
        // Mini-block 2 - bit width 1 => 8 bytes
        // Mini-block 3 - bit width 2 => 16 bytes
        // Mini-block 4 - padding => no bytes
        let block2 = vec![0xFF; 24];

        // Concatenate header and blocks into one contiguous page
        let data: Vec<u8> = header
            .into_iter()
            .chain(block1_header)
            .chain(block1)
            .chain(block2_header)
            .chain(block2)
            .collect();

        let length = data.len();

        // Sanity-check the header fields before handing the page to the decoder
        let ptr = ByteBufferPtr::new(data);
        let mut reader = BitReader::new(ptr.clone());
        assert_eq!(reader.get_vlq_int().unwrap(), 256);
        assert_eq!(reader.get_vlq_int().unwrap(), 4);
        assert_eq!(reader.get_vlq_int().unwrap(), 419);
        assert_eq!(reader.get_vlq_int().unwrap(), 7);

        // Test output buffer larger than needed and not exact multiple of block size
        let mut output = vec![0_i32; 420];

        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
        decoder.set_data(ptr.clone(), 0).unwrap();
        assert_eq!(decoder.get(&mut output).unwrap(), 419);
        // The whole page, including the padded bit width, must have been consumed
        assert_eq!(decoder.get_offset(), length);

        // Test with truncated buffer
        decoder.set_data(ptr.range(0, 12), 0).unwrap();
        let err = decoder.get(&mut output).unwrap_err().to_string();
        assert!(
            err.contains("Expected to read 64 values from miniblock got 8"),
            "{}",
            err
        );
    }

    #[test]
    fn test_delta_byte_array_same_arrays() {
        // Every value is the same six bytes, written over three `put` calls
        let array = || ByteArray::from(vec![1, 2, 3, 4, 5, 6]);
        let data = vec![
            vec![array()],
            vec![array(), array()],
            vec![array(), array()],
        ];
        test_delta_byte_array_decode(data);
    }

    #[test]
    fn test_delta_byte_array_unique_arrays() {
        // Five distinct values of varying length, written over three `put` calls
        let array = |bytes: &[u8]| ByteArray::from(bytes.to_vec());
        let data = vec![
            vec![array(&[1])],
            vec![array(&[2, 3]), array(&[4, 5, 6])],
            vec![array(&[7, 8]), array(&[9, 0, 1, 2])],
        ];
        test_delta_byte_array_decode(data);
    }

    #[test]
    fn test_delta_byte_array_single_array() {
        // One `put` call carrying exactly one value
        let only_value = ByteArray::from(vec![1, 2, 3, 4, 5, 6]);
        test_delta_byte_array_decode(vec![vec![only_value]]);
    }

    /// Round-trips `data` through the `RLE` encoding via `test_encode_decode`.
    fn test_rle_value_decode<T: DataType>(data: Vec<Vec<T::T>>) {
        test_encode_decode::<T>(data, Encoding::RLE);
    }

    /// Round-trips `data` through the `DELTA_BINARY_PACKED` encoding via
    /// `test_encode_decode`.
    fn test_delta_bit_packed_decode<T: DataType>(data: Vec<Vec<T::T>>) {
        test_encode_decode::<T>(data, Encoding::DELTA_BINARY_PACKED);
    }

    /// Round-trips byte array `data` through the `DELTA_BYTE_ARRAY` encoding via
    /// `test_encode_decode`.
    fn test_delta_byte_array_decode(data: Vec<Vec<ByteArray>>) {
        test_encode_decode::<ByteArrayType>(data, Encoding::DELTA_BYTE_ARRAY);
    }

    /// Encodes `data` with `encoding`, decodes it again and checks the round trip.
    ///
    /// Input data represents a vector of data slices to write, one `put()` call
    /// per slice. For example,
    ///   `vec![vec![1, 2, 3]]` invokes `put()` once and writes {1, 2, 3}
    ///   `vec![vec![1, 2], vec![3]]` invokes `put()` twice and writes {1, 2, 3}
    fn test_encode_decode<T: DataType>(data: Vec<Vec<T::T>>, encoding: Encoding) {
        // Type length should not really matter for encode/decode test,
        // otherwise change it based on type
        let descr = create_test_col_desc_ptr(-1, T::get_physical_type());

        // Write every slice with its own `put()` call
        let mut encoder = get_encoder::<T>(encoding).expect("get encoder");
        for chunk in data.iter() {
            encoder.put(chunk.as_slice()).expect("ok to encode");
        }
        let encoded = encoder.flush_buffer().expect("ok to flush buffer");

        // The decoder is expected to yield all slices back to back
        let expected: Vec<T::T> = data.iter().flatten().cloned().collect();
        let total = expected.len();

        let mut decoder = get_decoder::<T>(descr, encoding).expect("get decoder");
        let mut decoded = vec![T::T::default(); total];
        decoder.set_data(encoded, total).expect("ok to set data");

        // Keep reading into the unfilled tail until the decoder is drained
        let mut num_decoded = 0;
        while decoder.values_left() > 0 {
            let read = decoder
                .get(&mut decoded[num_decoded..])
                .expect("ok to decode");
            num_decoded += read;
        }
        assert_eq!(num_decoded, total);
        assert_eq!(decoded, expected);
    }

    /// Encodes `data` with `encoding`, skips `skip` values in the decoder and
    /// checks what is left.
    ///
    /// * `skip >= data.len()`: the first skip must consume every value and a
    ///   second skip must report `0`.
    /// * otherwise: exactly `skip` values are skipped and a following `get()`
    ///   must return the remaining tail `data[skip..]`.
    fn test_skip<T: DataType>(data: Vec<T::T>, encoding: Encoding, skip: usize) {
        // Type length should not really matter for encode/decode test,
        // otherwise change it based on type
        let descr = create_test_col_desc_ptr(-1, T::get_physical_type());

        // Encode the whole input with one `put()` call
        let mut encoder = get_encoder::<T>(encoding).expect("get encoder");
        encoder.put(&data).expect("ok to encode");
        let encoded = encoder.flush_buffer().expect("ok to flush buffer");

        let total = data.len();
        let mut decoder = get_decoder::<T>(descr, encoding).expect("get decoder");
        decoder.set_data(encoded, total).expect("ok to set data");

        // Both scenarios start with the same skip request
        let skipped = decoder.skip(skip).expect("ok to skip");

        if skip >= total {
            // Skipping past the end is clamped to the number of available values
            assert_eq!(skipped, total);

            // Nothing is left, so another skip is a no-op
            let skipped_again = decoder.skip(skip).expect("ok to skip again");
            assert_eq!(skipped_again, 0);
            return;
        }

        assert_eq!(skipped, skip);

        // The values after the skipped prefix must decode unchanged
        let remaining = total - skip;
        let mut buffer = vec![T::T::default(); remaining];
        let fetched = decoder.get(&mut buffer).expect("ok to decode");
        assert_eq!(remaining, fetched);
        assert_eq!(buffer.as_slice(), &data[skip..]);
    }

    /// Requests a decoder for `T` with `encoding` and verifies the outcome.
    ///
    /// With `err == Some(e)` decoder creation must fail and the error's string
    /// form must equal that of `e`. With `err == None` creation must succeed and
    /// the decoder must report `encoding` as its encoding.
    fn create_and_check_decoder<T: DataType>(
        encoding: Encoding,
        err: Option<ParquetError>,
    ) {
        let descr = create_test_col_desc_ptr(-1, T::get_physical_type());
        let outcome = get_decoder::<T>(descr, encoding);

        if let Some(expected_err) = err {
            // Errors are compared through their rendered messages
            let actual_err = outcome.err().unwrap();
            assert_eq!(actual_err.to_string(), expected_err.to_string());
        } else {
            assert_eq!(outcome.unwrap().encoding(), encoding);
        }
    }

    // Creates a test column descriptor for a primitive column named "t" with
    // physical type `t` and type length `type_len`, using an empty column path.
    fn create_test_col_desc_ptr(type_len: i32, t: Type) -> ColumnDescPtr {
        let primitive = SchemaType::primitive_type_builder("t", t)
            .with_length(type_len)
            .build()
            .unwrap();

        // NOTE(review): the two zeros are presumably the max definition and
        // repetition levels; confirm against `ColumnDescriptor::new`.
        let descriptor =
            ColumnDescriptor::new(Arc::new(primitive), 0, 0, ColumnPath::new(vec![]));
        Arc::new(descriptor)
    }

    /// Converts `v` into the 4 bytes of its `u32` representation in native byte
    /// order.
    ///
    /// The cast keeps only the low 32 bits, so larger values are truncated.
    fn usize_to_bytes(v: usize) -> [u8; 4] {
        let narrowed = v as u32;
        u32::to_ne_bytes(narrowed)
    }

    /// A util trait to convert slices of different types to byte arrays.
    ///
    /// The byte layout produced for a given `T` is defined by the individual
    /// implementations.
    trait ToByteArray<T: DataType> {
        /// Serializes `data` into a newly allocated byte vector.
        // `to_byte_array` is an associated function without a `self` receiver,
        // which is why the clippy naming lint for `to_*` methods is silenced.
        #[allow(clippy::wrong_self_convention)]
        fn to_byte_array(data: &[T::T]) -> Vec<u8>;
    }

    // Implements `ToByteArray` for each listed data type by viewing the value
    // slice as raw bytes (`slice_as_bytes`) and copying them into a new vector.
    macro_rules! to_byte_array_impl {
        ($($ty:ty),+ $(,)?) => {
            $(
                impl ToByteArray<$ty> for $ty {
                    #[allow(clippy::wrong_self_convention)]
                    fn to_byte_array(data: &[<$ty as DataType>::T]) -> Vec<u8> {
                        Vec::from(<$ty as DataType>::T::slice_as_bytes(data))
                    }
                }
            )+
        };
    }

    to_byte_array_impl!(Int32Type, Int64Type, FloatType, DoubleType);

    // Bit-packs booleans: every group of up to 8 values becomes one byte, with
    // the first value of the group in the least significant bit. Unused high
    // bits of a trailing partial byte stay zero.
    impl ToByteArray<BoolType> for BoolType {
        #[allow(clippy::wrong_self_convention)]
        fn to_byte_array(data: &[bool]) -> Vec<u8> {
            data.chunks(8)
                .map(|group| {
                    group
                        .iter()
                        .enumerate()
                        .filter(|&(_, &set)| set)
                        .fold(0u8, |byte, (bit, _)| byte | (1u8 << bit))
                })
                .collect()
        }
    }

    // Concatenates the `as_bytes()` representation of every value, in order.
    impl ToByteArray<Int96Type> for Int96Type {
        #[allow(clippy::wrong_self_convention)]
        fn to_byte_array(data: &[Int96]) -> Vec<u8> {
            data.iter()
                .flat_map(|value| value.as_bytes().iter().copied())
                .collect()
        }
    }

    // Writes each value as a 4-byte length prefix (see `usize_to_bytes`)
    // followed by the value's bytes.
    impl ToByteArray<ByteArrayType> for ByteArrayType {
        #[allow(clippy::wrong_self_convention)]
        fn to_byte_array(data: &[ByteArray]) -> Vec<u8> {
            let mut out = Vec::new();
            for value in data.iter() {
                let payload = value.data();
                out.extend_from_slice(&usize_to_bytes(payload.len()));
                out.extend_from_slice(payload);
            }
            out
        }
    }

    // Concatenates the raw bytes of every value, in order, without any length
    // prefix or separator.
    impl ToByteArray<FixedLenByteArrayType> for FixedLenByteArrayType {
        #[allow(clippy::wrong_self_convention)]
        fn to_byte_array(data: &[FixedLenByteArray]) -> Vec<u8> {
            data.iter()
                .flat_map(|value| value.data().iter().copied())
                .collect()
        }
    }
}