use std::collections::HashMap;
use std::marker::PhantomData;
use std::mem;
use scroll::{ctx, Endian, Pread, Pwrite, LE};
use crate::error::CodingError;
use crate::filter::{SectFilterSink, VectorFilter};
use crate::section::*;
use crate::sink::*;
/// Fixed-size header at the start of every encoded vector.
///
/// Written field-by-field with `Pwrite` (little-endian in this file). `num_bytes` counts every
/// byte after the leading `u32` length itself, so the whole vector is `num_bytes + 4` bytes
/// (see `whole_length`).
#[repr(C)]
#[derive(Debug, PartialEq, Pwrite)]
pub struct BinaryVector {
// `num_bytes`: vector length excluding this u32 field; `major_type` / `minor_type`: one-byte
// type codes describing the vector and its element type.
num_bytes: u32, major_type: VectorType, minor_type: VectorSubType,
// Reserved bytes; always set to 0 by `BinaryVector::new`.
_padding: u16,
}
/// Major type of an encoded vector, serialized as a single byte in the vector header.
#[repr(u8)]
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum VectorType {
    Empty = 0x01,
    BinSimple = 0x06,
    BinDict = 0x07,
    Delta2 = 0x08,
    Histogram = 0x09,
    FixedSection256 = 0x10,
}

impl VectorType {
    /// Returns the byte code stored on disk for this major type (its `repr(u8)` discriminant).
    pub fn as_num(&self) -> u8 {
        (*self) as u8
    }
}
impl ctx::TryIntoCtx<Endian> for &VectorType {
    type Error = scroll::Error;

    /// Serializes the major type as its single-byte code.
    fn try_into_ctx(self, buf: &mut [u8], ctx: Endian) -> Result<usize, Self::Error> {
        let code: u8 = self.as_num();
        u8::try_into_ctx(code, buf, ctx)
    }
}
/// Minor (element) type of an encoded vector, serialized as a single byte in the header.
#[repr(u8)]
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum VectorSubType {
    Primitive = 0x00,
    STRING = 0x01,
    UTF8 = 0x02,
    FIXEDMAXUTF8 = 0x03,
    DATETIME = 0x04,
    PrimitiveNoMask = 0x05,
    REPEATED = 0x06,
    INT = 0x07,
    IntNoMask = 0x08,
    FixedU64 = 0x10,
    FixedU32 = 0x11,
    FixedF32 = 0x12,
}

impl VectorSubType {
    /// Returns the byte code stored on disk for this minor type (its `repr(u8)` discriminant).
    pub fn as_num(&self) -> u8 {
        (*self) as u8
    }
}
impl ctx::TryIntoCtx<Endian> for &VectorSubType {
    type Error = scroll::Error;

    /// Serializes the minor type as its single-byte code.
    fn try_into_ctx(self, buf: &mut [u8], ctx: Endian) -> Result<usize, Self::Error> {
        let code: u8 = self.as_num();
        u8::try_into_ctx(code, buf, ctx)
    }
}
const NUM_HEADER_BYTES_TOTAL: usize = 16;
const BINARYVECT_HEADER_SIZE: usize = std::mem::size_of::<BinaryVector>();
impl BinaryVector {
pub fn new(major_type: VectorType, minor_type: VectorSubType) -> Self {
Self { num_bytes: NUM_HEADER_BYTES_TOTAL as u32 - 4, major_type, minor_type, _padding: 0 }
}
pub fn whole_length(&self) -> u32 {
self.num_bytes + (mem::size_of::<u32>() as u32)
}
pub fn reset(&mut self) {
self.num_bytes = NUM_HEADER_BYTES_TOTAL as u32 - 4;
}
pub fn write_header(&self, buf: &mut [u8]) -> Result<(), CodingError> {
buf.pwrite_with(self, 0, LE)?;
Ok(())
}
pub fn update_num_bytes(&mut self,
buf: &mut [u8],
num_body_bytes: u32) -> Result<(), CodingError> {
self.num_bytes = num_body_bytes + (NUM_HEADER_BYTES_TOTAL - 4) as u32;
buf.pwrite_with(self.num_bytes, 0, LE)?;
Ok(())
}
}
/// Maps an element type to the `VectorSubType` recorded in a vector's header.
///
/// `VectorReader::try_new` compares the stored minor type against this to reject vectors whose
/// element type does not match.
pub trait BaseSubtypeMapping {
    /// Minor type code for vectors whose elements are `Self`.
    fn vect_subtype() -> VectorSubType;
}

impl BaseSubtypeMapping for u64 {
    fn vect_subtype() -> VectorSubType {
        VectorSubType::FixedU64
    }
}

impl BaseSubtypeMapping for u32 {
    fn vect_subtype() -> VectorSubType {
        VectorSubType::FixedU32
    }
}

impl BaseSubtypeMapping for f32 {
    fn vect_subtype() -> VectorSubType {
        VectorSubType::FixedF32
    }
}
/// Per-vector statistics, stored right after the `BinaryVector` header
/// (at byte offset `BINARYVECT_HEADER_SIZE`).
#[derive(Debug, Copy, Clone, Pread, Pwrite)]
pub struct FixedSectStats {
// Logical number of elements (rows) in the vector, nulls included.
pub num_elements: u32,
// Number of whole sections written as null sections by the appender.
num_null_sections: u16,
}
impl FixedSectStats {
pub fn new() -> Self {
Self { num_elements: 0, num_null_sections: 0 }
}
pub fn reset(&mut self) {
self.num_elements = 0;
self.num_null_sections = 0;
}
pub fn update_num_elems(&mut self, buf: &mut [u8], num_elements: u32) -> Result<(), CodingError> {
self.num_elements = num_elements;
buf.pwrite_with(*self, BINARYVECT_HEADER_SIZE, LE)?;
Ok(())
}
}
/// Extra capacity (at least this many bytes) reserved when a section write into the output
/// buffer fails for lack of space; see `VectorAppender::retry_grow`.
const GROW_BYTES: usize = 4096;
/// Incrementally encodes values of `T` into a `FixedSection256` binary vector.
///
/// Values are buffered in `write_buf` until `FIXED_LEN` of them are available, then encoded as
/// one section by the section writer `W`.
pub struct VectorAppender<T, W>
where T: VectBase + Clone + PartialOrd,
W: FixedSectionWriter<T> {
// Output buffer: header, stats, then encoded sections.
vect_buf: Vec<u8>,
// Byte offset in `vect_buf` where the next section will be written.
offset: usize,
// In-memory copy of the vector header; rewritten into `vect_buf` as sections are added.
header: BinaryVector,
// Values waiting to fill the current section (fewer than `FIXED_LEN` between calls).
write_buf: Vec<T>,
// Running element / null-section counts, mirrored into `vect_buf` after the header.
stats: FixedSectStats,
// Type-level marker for the section writer; no value of `W` is stored.
sect_writer: PhantomData<W> }
impl<T, W> VectorAppender<T, W>
where T: VectBase + Clone + PartialOrd + BaseSubtypeMapping,
      W: FixedSectionWriter<T> {
    /// Creates an appender producing a `FixedSection256` vector whose minor type comes from
    /// `T::vect_subtype()`.
    ///
    /// `initial_capacity` is the starting size of the output buffer in bytes. It is raised to at
    /// least `NUM_HEADER_BYTES_TOTAL`, so the header/stats region always exists and `reader()`
    /// can always slice it. The buffer grows on demand as sections are written.
    ///
    /// # Errors
    /// Returns an error if the initial header cannot be written.
    pub fn try_new(initial_capacity: usize) -> Result<Self, CodingError> {
        let mut new_self = Self {
            vect_buf: vec![0; initial_capacity.max(NUM_HEADER_BYTES_TOTAL)],
            offset: NUM_HEADER_BYTES_TOTAL,
            header: BinaryVector::new(VectorType::FixedSection256, T::vect_subtype()),
            write_buf: Vec::with_capacity(FIXED_LEN),
            stats: FixedSectStats::new(),
            sect_writer: PhantomData,
        };
        new_self.write_header()?;
        Ok(new_self)
    }

    /// Appends every value in `collection`, then calls `finish` with the number of values
    /// consumed, returning the encoded vector.
    ///
    /// # Errors
    /// Propagates errors from `append` and `finish`. Only values from `collection` are counted,
    /// so if values were appended before this call, `finish` reports `InvalidNumRows`.
    pub fn encode_all<C>(&mut self, collection: C) -> Result<Vec<u8>, CodingError>
    where C: IntoIterator<Item = T> {
        let mut count = 0;
        for x in collection {
            count += 1;
            self.append(x)?;
        }
        self.finish(count)
    }

    /// Number of elements appended so far: those in encoded sections plus those still buffered.
    pub fn num_elements(&self) -> usize {
        self.stats.num_elements as usize + self.write_buf.len()
    }

    /// Discards everything appended and rewrites an empty header and stats, keeping the
    /// output buffer's allocation for reuse.
    pub fn reset(&mut self) -> Result<(), CodingError> {
        self.header.reset();
        self.offset = NUM_HEADER_BYTES_TOTAL;
        self.write_buf.clear();
        self.vect_buf.resize(self.vect_buf.capacity(), 0);
        self.stats.reset();
        self.stats.update_num_elems(&mut self.vect_buf, 0)?;
        self.write_header()
    }

    /// Writes the in-memory header into the start of the output buffer.
    fn write_header(&mut self) -> Result<(), CodingError> {
        self.header.write_header(self.vect_buf.as_mut_slice())
    }

    /// Encodes the full `write_buf` as one section with `W`, then updates stats and header.
    fn encode_section(&mut self) -> Result<(), CodingError> {
        assert_eq!(self.write_buf.len(), FIXED_LEN);
        self.offset = self.retry_grow(|s| {
            W::gen_stats_and_write(s.vect_buf.as_mut_slice(), s.offset, &s.write_buf[..])
        })?;
        self.write_buf.clear();
        self.commit_section()
    }

    /// Records one more `FIXED_LEN`-element section ending at `self.offset`: bumps the element
    /// count, rewrites the stats, and rewrites the header's byte count.
    fn commit_section(&mut self) -> Result<(), CodingError> {
        let num_elements = self.stats.num_elements + FIXED_LEN as u32;
        self.stats.update_num_elems(&mut self.vect_buf, num_elements)?;
        self.header.update_num_bytes(self.vect_buf.as_mut_slice(),
                                     (self.offset - NUM_HEADER_BYTES_TOTAL) as u32)
    }

    /// Runs `func`; if it fails because the buffer is too small (`NotEnoughSpace` or
    /// `BadOffset`), grows `vect_buf` by at least `GROW_BYTES` zeroed bytes and retries once.
    fn retry_grow<F, U>(&mut self, mut func: F) -> Result<U, CodingError>
        where F: FnMut(&mut Self) -> Result<U, CodingError> {
        func(self).or_else(|err| {
            match err {
                CodingError::NotEnoughSpace | CodingError::BadOffset(_) => {
                    self.vect_buf.reserve(GROW_BYTES);
                    self.vect_buf.resize(self.vect_buf.capacity(), 0);
                    func(self)
                }
                _ => Err(err),
            }
        })
    }

    /// Appends one value, encoding a section once `FIXED_LEN` values are buffered.
    pub fn append(&mut self, value: T) -> Result<(), CodingError> {
        self.write_buf.push(value);
        if self.write_buf.len() >= FIXED_LEN {
            self.encode_section()
        } else {
            Ok(())
        }
    }

    /// Appends `num_nulls` null (zero) values.
    ///
    /// Nulls first top up any partially filled section. Each remaining run of `FIXED_LEN` nulls
    /// is written as a `NullFixedSect`, and any shorter tail is buffered as zeros.
    pub fn append_nulls(&mut self, num_nulls: usize) -> Result<(), CodingError> {
        let mut left = num_nulls;
        while left > 0 {
            if !self.write_buf.is_empty() {
                let num_to_fill = left.min(FIXED_LEN - self.write_buf.len());
                self.write_buf.resize(self.write_buf.len() + num_to_fill, T::zero());
                left -= num_to_fill;
                if self.write_buf.len() >= FIXED_LEN {
                    self.encode_section()?;
                }
            } else if left >= FIXED_LEN {
                self.offset = self.retry_grow(|s| NullFixedSect::write(s.vect_buf.as_mut_slice(), s.offset))?;
                // The on-disk counter is a u16: saturate instead of overflowing (a debug-build
                // panic) once more than 65535 null sections have been written.
                self.stats.num_null_sections = self.stats.num_null_sections.saturating_add(1);
                self.commit_section()?;
                left -= FIXED_LEN;
            } else {
                self.write_buf.resize(left, T::zero());
                left = 0;
            }
        }
        Ok(())
    }

    /// Pads the vector with nulls to whole sections covering `total_num_rows`, records
    /// `total_num_rows` as the element count, and returns the encoded bytes. The appender is
    /// then reset for reuse.
    ///
    /// # Errors
    /// `InvalidNumRows` if more values were appended than `total_num_rows`, or if
    /// `total_num_rows` exceeds `u32::MAX`. Encoding and write errors are propagated.
    pub fn finish(&mut self, total_num_rows: usize) -> Result<Vec<u8>, CodingError> {
        let total_so_far = self.num_elements();
        if total_so_far > total_num_rows {
            return Err(CodingError::InvalidNumRows(total_num_rows, total_so_far));
        }
        if total_num_rows > u32::max_value() as usize {
            return Err(CodingError::InvalidNumRows(total_num_rows, u32::max_value() as usize));
        }

        // Complete a partially filled section so it gets encoded.
        if !self.write_buf.is_empty() {
            let number_to_fill = FIXED_LEN - self.write_buf.len();
            self.append_nulls(number_to_fill)?;
        }
        // Add whole null sections until the sections cover every requested row.
        while (self.stats.num_elements as usize) < total_num_rows {
            self.append_nulls(FIXED_LEN)?;
        }

        // Sections always hold FIXED_LEN elements; store the true logical row count instead.
        // `update_num_elems` also writes the stats into the buffer.
        self.stats.update_num_elems(self.vect_buf.as_mut_slice(), total_num_rows as u32)?;

        // Copy the encoded bytes out, keeping our own allocation for reuse after `reset`.
        self.vect_buf.resize(self.offset, 0);
        let mut returned_vec = Vec::with_capacity(self.offset);
        returned_vec.append(&mut self.vect_buf);
        self.reset()?;
        Ok(returned_vec)
    }

    /// Returns a reader over the sections encoded so far (buffered values are not included).
    ///
    /// # Panics
    /// Panics if the in-progress buffer fails `VectorReader` validation.
    pub fn reader(&self) -> VectorReader<T> {
        VectorReader::try_new(&self.vect_buf[..self.offset]).expect("Getting reader from appender failed")
    }
}
/// Appender for `u64` vectors using the `AutoEncoder` section writer.
pub type VectorU64Appender = VectorAppender<u64, AutoEncoder>;
/// Appender for `u32` vectors using the `AutoEncoder` section writer.
pub type VectorU32Appender = VectorAppender<u32, AutoEncoder>;
/// Appender for `f32` vectors using the `XorNPMedFixedSect` section writer.
pub type VectorF32XorAppender = VectorAppender<f32, XorNPMedFixedSect<'static>>;
pub struct VectorReader<'buf, T: VectBase> {
vect_bytes: &'buf [u8],
_reader: PhantomData<T>,
}
impl<'buf, T> VectorReader<'buf, T>
where T: VectBase + BaseSubtypeMapping {
pub fn try_new(vect_bytes: &'buf [u8]) -> Result<Self, CodingError> {
let bytes_from_header: u32 = vect_bytes.pread_with(0, LE)?;
let subtype: u8 = vect_bytes.pread_with(offset_of!(BinaryVector, minor_type), LE)?;
if vect_bytes.len() < (bytes_from_header + 4) as usize {
Err(CodingError::InputTooShort)
} else if subtype != T::vect_subtype() as u8 {
Err(CodingError::WrongVectorType(subtype))
} else {
Ok(Self { vect_bytes, _reader: PhantomData })
}
}
pub fn num_elements(&self) -> usize {
self.get_stats().num_elements as usize
}
pub fn total_bytes(&self) -> usize {
self.vect_bytes.len()
}
pub fn num_null_sections(&self) -> Result<usize, CodingError> {
let mut count = 0;
for sect_res in self.sect_iter() {
let sect = sect_res?;
if sect.is_null() { count += 1 }
}
Ok(count)
}
pub fn get_stats(&self) -> FixedSectStats {
self.vect_bytes.pread_with(BINARYVECT_HEADER_SIZE, LE).unwrap()
}
pub fn sect_iter(&self) -> FixedSectIterator<'buf, T> {
FixedSectIterator::new(&self.vect_bytes[NUM_HEADER_BYTES_TOTAL..])
}
pub fn filter_iter<F: SectFilterSink<T>>(&self, f: F) -> VectorFilter<'buf, F, T> {
VectorFilter::new(&self.vect_bytes[NUM_HEADER_BYTES_TOTAL..], f)
}
pub fn iterate(&self) -> VectorItemIter<'buf, T> {
VectorItemIter::new(self.sect_iter(), self.num_elements())
}
pub fn decode_to_sink<Output>(&self, output: &mut Output) -> Result<(), CodingError>
where Output: Sink<T::SI> {
for sect in self.sect_iter() {
sect?.decode(output)?;
}
Ok(())
}
}
/// Diagnostic summary of an encoded vector: size, density, stats, and per-section types.
#[derive(Debug)]
pub struct VectorStats {
    num_bytes: usize,
    bytes_per_elem: f32,
    stats: FixedSectStats,
    sect_types: Vec<SectionType>,
}

impl VectorStats {
    /// Gathers byte and element counts and the type of every section from `reader`.
    ///
    /// `bytes_per_elem` is `inf` for a vector with zero elements (f32 division by zero).
    ///
    /// # Panics
    /// Panics if any section fails to parse.
    pub fn new<'buf, T: VectBase + BaseSubtypeMapping>(reader: &VectorReader<'buf, T>) -> Self {
        let stats = reader.get_stats();
        Self {
            num_bytes: reader.total_bytes(),
            bytes_per_elem: reader.total_bytes() as f32 / stats.num_elements as f32,
            stats,
            sect_types: reader.sect_iter().map(|sect| sect.unwrap().sect_type()).collect(),
        }
    }

    /// Counts how many sections of each `SectionType` the vector contains.
    pub fn sect_types_histogram(&self) -> HashMap<SectionType, usize> {
        let mut map = HashMap::new();
        for &sect_type in &self.sect_types {
            *map.entry(sect_type).or_insert(0) += 1;
        }
        map
    }

    /// Human-readable one-paragraph summary (histogram order follows `HashMap` iteration).
    pub fn summary_string(&self) -> String {
        let keyvalues: Vec<_> = self.sect_types_histogram().iter()
            .map(|(k, v)| format!("{:?}={:?}", k, v)).collect();
        format!("#bytes={:?} #elems={:?} bytes-per-elem={:?}\nsection type hist: {}",
                self.num_bytes, self.stats.num_elements, self.bytes_per_elem,
                keyvalues.join(", "))
    }
}
/// Iterator over the decoded values of a vector, decoding one section at a time.
///
/// Exactly `num_elems` values are produced; each section is decoded into an internal
/// `Section256Sink` and its values are yielded in order.
pub struct VectorItemIter<'buf, T: VectBase> {
    sect_iter: FixedSectIterator<'buf, T>,
    sink: Section256Sink<T>,
    num_elems: usize,
    i: usize,
}

impl<'buf, T: VectBase> VectorItemIter<'buf, T> {
    /// Creates the iterator, eagerly decoding the first section when there are any elements.
    pub fn new(sect_iter: FixedSectIterator<'buf, T>, num_elems: usize) -> Self {
        let mut s = Self {
            sect_iter,
            sink: Section256Sink::<T>::new(),
            num_elems,
            i: 0,
        };
        if num_elems > 0 {
            s.next_section();
        }
        s
    }

    /// Resets the sink and decodes the next section into it.
    ///
    /// # Panics
    /// Panics if decoding the section fails. NOTE(review): if the section iterator is exhausted
    /// or yields an error, the sink is simply left reset and nothing is reported.
    fn next_section(&mut self) {
        self.sink.reset();
        if let Some(Ok(next_sect)) = self.sect_iter.next() {
            next_sect.decode(&mut self.sink).expect("Unexpected end of section");
        }
    }
}

impl<'buf, T: VectBase> Iterator for VectorItemIter<'buf, T> {
    type Item = T;

    fn next(&mut self) -> Option<T> {
        if self.i < self.num_elems {
            let thing = self.sink.values[self.i % FIXED_LEN];
            self.i += 1;
            // Decode the following section only once this one is exhausted and more remain.
            if self.i % FIXED_LEN == 0 && self.i < self.num_elems {
                self.next_section();
            }
            Some(thing)
        } else {
            None
        }
    }

    /// Exact: `next` yields `Some` until `i` reaches `num_elems`, so `collect` can preallocate.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.num_elems.saturating_sub(self.i);
        (remaining, Some(remaining))
    }
}
// Unit tests for appending, finishing, reading, filtering and decoding vectors.
#[cfg(test)]
mod test {
use super::*;
use crate::filter::{EqualsSink, count_hits};
// 500 values: the first FIXED_LEN are encoded as they arrive; finish() pads the rest into a
// second section.
#[test]
fn test_append_u64_nonulls() {
// The header plus stats must fit inside the reserved header region.
assert!(std::mem::size_of::<FixedSectStats>() + BINARYVECT_HEADER_SIZE <= NUM_HEADER_BYTES_TOTAL);
let num_values: usize = 500;
let data: Vec<u64> = (0..num_values as u64).collect();
let mut appender = VectorU64Appender::try_new(1024).unwrap();
{
// A fresh appender exposes an empty, readable vector.
let reader = appender.reader();
assert_eq!(reader.num_elements(), 0);
assert_eq!(reader.sect_iter().count(), 0);
}
data.iter().for_each(|&e| appender.append(e).unwrap());
// Only the first full section is visible before finish(); the rest is still buffered.
let reader = appender.reader();
assert_eq!(reader.num_elements(), 256);
assert_eq!(reader.sect_iter().count(), 1);
let finished_vec = appender.finish(num_values).unwrap();
let reader = VectorReader::try_new(&finished_vec[..]).unwrap();
assert_eq!(reader.num_elements(), num_values);
assert_eq!(reader.sect_iter().count(), 2);
assert_eq!(reader.num_null_sections().unwrap(), 0);
let elems: Vec<u64> = reader.iterate().collect();
assert_eq!(elems, data);
}
// 100 values, then enough nulls to fill that section, one whole null section and 50 more;
// then 50 values. Expect 3 sections, one of them null.
#[test]
fn test_append_u64_mixed_nulls() {
let data1: Vec<u64> = (0..100).collect();
let num_nulls = (256 - data1.len()) + 256 + 50;
let data2: Vec<u64> = (0..50).collect();
let total_elems = data1.len() + data2.len() + num_nulls;
let mut all_data = Vec::<u64>::with_capacity(total_elems);
all_data.extend_from_slice(&data1[..]);
(0..num_nulls).for_each(|_i| all_data.push(0));
all_data.extend_from_slice(&data2[..]);
let mut appender = VectorU64Appender::try_new(1024).unwrap();
data1.iter().for_each(|&e| appender.append(e).unwrap());
appender.append_nulls(num_nulls).unwrap();
data2.iter().for_each(|&e| appender.append(e).unwrap());
let finished_vec = appender.finish(total_elems).unwrap();
let reader = VectorReader::try_new(&finished_vec[..]).unwrap();
assert_eq!(reader.num_elements(), total_elems);
assert_eq!(reader.sect_iter().count(), 3);
assert_eq!(reader.num_null_sections().unwrap(), 1);
assert_eq!(reader.get_stats().num_null_sections, 1);
let elems: Vec<u64> = reader.iterate().collect();
assert_eq!(elems, all_data);
}
// Same kind of mix, starting from a small (300-byte) buffer so that appends have to grow it.
#[test]
fn test_append_u64_mixed_nulls_grow() {
let data1: Vec<u64> = (0..300).collect();
let num_nulls = 350;
let total_elems = (data1.len() + num_nulls) * 2;
let mut all_data = Vec::<u64>::with_capacity(total_elems);
all_data.extend_from_slice(&data1[..]);
(0..num_nulls).for_each(|_i| all_data.push(0));
all_data.extend_from_slice(&data1[..]);
(0..num_nulls).for_each(|_i| all_data.push(0));
let mut appender = VectorU64Appender::try_new(300).unwrap();
data1.iter().for_each(|&e| appender.append(e).unwrap());
appender.append_nulls(num_nulls).unwrap();
data1.iter().for_each(|&e| appender.append(e).unwrap());
appender.append_nulls(num_nulls).unwrap();
let finished_vec = appender.finish(total_elems).unwrap();
let reader = VectorReader::try_new(&finished_vec[..]).unwrap();
println!("summary: {}", VectorStats::new(&reader).summary_string());
assert_eq!(reader.num_elements(), total_elems);
assert_eq!(reader.sect_iter().count(), 6);
assert_eq!(reader.num_null_sections().unwrap(), 2);
let elems: Vec<u64> = reader.iterate().collect();
assert_eq!(elems, all_data);
}
// Filter counting on u32 vectors; the second half reuses the same appender after finish().
#[test]
fn test_append_u32_and_filter() {
let vector_size = 400;
let mut appender = VectorU32Appender::try_new(1024).unwrap();
for i in 0..vector_size {
appender.append((i % 4) + 1).unwrap();
}
let finished_vec = appender.finish(vector_size as usize).unwrap();
let reader = VectorReader::<u32>::try_new(&finished_vec[..]).unwrap();
assert_eq!(reader.num_elements(), vector_size as usize);
assert_eq!(reader.sect_iter().count(), 2);
let filter_iter = reader.filter_iter(EqualsSink::<u32>::new(&3));
let count = count_hits(filter_iter) as u32;
assert_eq!(count, vector_size / 4);
let nonnulls = 300;
let total_elems = nonnulls * 2 + 400;
for i in 0..nonnulls {
appender.append((i % 4) + 1).unwrap();
}
appender.append_nulls(400).unwrap();
for i in 0..nonnulls {
appender.append((i % 4) + 1).unwrap();
}
let finished_vec = appender.finish(total_elems as usize).unwrap();
let reader = VectorReader::<u32>::try_new(&finished_vec[..]).unwrap();
assert_eq!(reader.num_elements(), total_elems as usize);
let filter_iter = reader.filter_iter(EqualsSink::<u32>::new(&3));
let count = count_hits(filter_iter) as u32;
assert_eq!(count, nonnulls * 2 / 4);
// decode_to_sink may emit section padding, so compare only the logical prefix.
let mut sink = VecSink::<u32>::new();
reader.decode_to_sink(&mut sink).unwrap();
let it_data: Vec<u32> = reader.iterate().collect();
assert_eq!(sink.vec[..total_elems as usize], it_data[..]);
}
// 10 * (9999 nulls + 1 value) = 100000 rows, mostly null.
#[test]
fn test_append_u32_large_vector() {
let mut appender = VectorU32Appender::try_new(4096).unwrap();
let vector_size = 100000;
for _ in 0..10 {
appender.append_nulls(9999).unwrap();
appender.append(2).unwrap();
}
assert_eq!(appender.num_elements(), vector_size);
let finished_vec = appender.finish(vector_size).unwrap();
let reader = VectorReader::<u32>::try_new(&finished_vec[..]).unwrap();
assert_eq!(reader.num_elements(), vector_size as usize);
}
// Reading a u32 vector as u64 must fail and report the stored subtype.
#[test]
fn test_read_wrong_type_error() {
let vector_size = 400;
let mut appender = VectorU32Appender::try_new(1024).unwrap();
for i in 0..vector_size {
appender.append((i % 4) + 1).unwrap();
}
let finished_vec = appender.finish(vector_size as usize).unwrap();
let res = VectorReader::<u64>::try_new(&finished_vec[..]);
assert_eq!(res.err().unwrap(), CodingError::WrongVectorType(VectorSubType::FixedU32 as u8))
}
// encode_all + decode_to_sink round trip for the f32 XOR appender.
#[test]
fn test_append_f32_decode() {
let mut appender = VectorF32XorAppender::try_new(2048).unwrap();
let vector_size = 280;
let data: Vec<f32> = (0..vector_size).map(|x| x as f32 / 2.8).collect();
let finished_vec = appender.encode_all(data.clone()).unwrap();
let reader = VectorReader::<f32>::try_new(&finished_vec[..]).unwrap();
assert_eq!(reader.num_elements(), vector_size);
let mut sink = VecSink::<f32>::new();
reader.decode_to_sink(&mut sink).unwrap();
assert_eq!(sink.vec[..vector_size], data[..]);
}
}