use crate::prelude::*;
use std::cell::RefCell;
use std::thread_local;
use std::vec::IntoIter;
thread_local! {
static IN_RLE_ENCODE: RefCell<bool> = RefCell::new(false);
}
pub(crate) fn within_rle<T>(f: impl FnOnce() -> Result<T, ()>) -> Result<T, ()> {
if IN_RLE_ENCODE.with(|v| *v.borrow()) {
Err(())
} else {
IN_RLE_ENCODE.with(|v| *v.borrow_mut() = true);
let result = f();
IN_RLE_ENCODE.with(|v| *v.borrow_mut() = false);
result
}
}
pub struct RleIterator<T> {
runs: IntoIter<u64>,
values: IntoIter<T>,
current_run: Option<u64>,
current_value: Option<T>,
}
impl<T: Clone> Iterator for RleIterator<T> {
type Item = T;
fn next(&mut self) -> Option<T> {
match self.current_run {
None => {
self.current_run = Some(self.runs.next().unwrap_or_default());
self.current_value = self.values.next();
self.next()
}
Some(run) => match run {
0 => {
self.current_run = None;
self.current_value.take()
}
_ => {
self.current_run = Some(run - 1);
self.current_value.clone()
}
},
}
}
}
impl<T: Send + Clone> RleIterator<T> {
pub fn new(
runs: Box<DynArrayBranch<'_>>,
values: Box<DynArrayBranch<'_>>,
options: &impl DecodeOptions,
f: impl Send + FnOnce(DynArrayBranch<'_>) -> DecodeResult<IntoIter<T>>,
) -> DecodeResult<Self> {
let (runs, values) = parallel(|| <u64 as Decodable>::DecoderArray::new(*runs, options), || f(*values), options);
let runs = runs?;
let values = values?;
Ok(Self {
current_run: None,
current_value: None,
runs,
values,
})
}
}
pub(crate) struct RLE<S> {
sub_compressors: S,
}
impl<S> RLE<S> {
pub fn new(sub_compressors: S) -> Self {
Self { sub_compressors }
}
}
fn get_runs<T: PartialEq + Copy>(data: &[T]) -> Result<(Vec<u64>, Vec<T>), ()> {
if data.len() < 2 {
return Err(());
}
profile_fn!(rle_get_runs);
let mut runs = Vec::new();
let mut current_run = 0u64;
let mut current_value = data[0];
let mut values = vec![];
for item in data[1..].iter() {
if current_value == *item {
current_run += 1;
} else {
runs.push(current_run);
values.push(current_value);
current_value = *item;
current_run = 0;
}
}
runs.push(current_run);
values.push(current_value);
debug_assert!(runs.len() == values.len());
if values.len() == data.len() {
Err(())
} else {
Ok((runs, values))
}
}
impl<T: PartialEq + Copy + std::fmt::Debug, S: CompressorSet<T>> Compressor<T> for RLE<S> {
fn compress<O: EncodeOptions>(&self, data: &[T], stream: &mut EncoderStream<'_, O>) -> Result<ArrayTypeId, ()> {
profile_method!(compress);
within_rle(|| {
let (runs, values) = get_runs(data)?;
stream.encode_with_id(|stream| compress(&values[..], stream, &self.sub_compressors));
stream.encode_with_id(|stream| runs.flush(stream));
Ok(ArrayTypeId::RLE)
})
}
fn fast_size_for<O: EncodeOptions>(&self, data: &[T], options: &O) -> Result<usize, ()> {
profile_method!(fast_size_for);
within_rle(|| {
let (runs, values) = get_runs(data)?;
let from_values = fast_size_for(&values[..], &self.sub_compressors, options);
let from_runs = Vec::<u64>::fast_size_for_all(&runs[..], options);
let from_ids = 2;
Ok(from_ids + from_runs + from_values)
})
}
}