use std::io::Read;
use bytes::{BufMut, BytesMut};
use snafu::OptionExt;
use crate::{
encoding::{
integer::{
rle_v2::{EncodingType, MAX_RUN_LENGTH},
util::{
extract_run_length_from_header, read_ints, read_varint_zigzagged,
rle_v2_decode_bit_width, rle_v2_encode_bit_width, write_aligned_packed_ints,
write_varint_zigzagged,
},
EncodingSign, SignedEncoding, VarintSerde,
},
util::read_u8,
},
error::{OrcError, OutOfSpecSnafu, Result},
};
use super::NInt;
/// Decodes a single delta-encoded run from `reader`, appending every decoded
/// value to `out_ints`.
///
/// `header` is the first header byte of the run, which the caller has already
/// consumed; the second header byte and the rest of the run are read from
/// `reader`. `deltas` is a scratch buffer for the bit-packed deltas: it is
/// cleared and refilled when the run carries packed deltas, so its previous
/// contents are irrelevant.
///
/// # Errors
///
/// Returns an error if reading from `reader` fails, if applying a delta
/// over/underflows `N`, if the delta base is `i64::MIN` (its magnitude is not
/// representable), or if a run with a non-zero delta bit width declares fewer
/// than two values. Values pushed to `out_ints` before the error are left in
/// place.
pub fn read_delta_values<N: NInt, R: Read, S: EncodingSign>(
    reader: &mut R,
    out_ints: &mut Vec<N>,
    deltas: &mut Vec<i64>,
    header: u8,
) -> Result<()> {
    const OVERFLOW_MSG: &str = "over/underflow when decoding delta integer";

    // Bits 1..=5 of the first header byte carry the encoded delta bit width.
    let encoded_delta_bit_width = (header >> 1) & 0x1f;
    // An encoded width of 0 is kept as 0 rather than decoded: it selects the
    // fixed-delta branch below, where no packed deltas follow.
    let delta_bit_width = if encoded_delta_bit_width == 0 {
        encoded_delta_bit_width as usize
    } else {
        rle_v2_decode_bit_width(encoded_delta_bit_width)
    };

    let second_byte = read_u8(reader)?;
    let length = extract_run_length_from_header(header, second_byte);

    let base_value = read_varint_zigzagged::<N, _, S>(reader)?;
    out_ints.push(base_value);

    // The sign of the delta base fixes the direction of the whole run; the
    // magnitudes are then applied with either addition or subtraction.
    let delta_base = read_varint_zigzagged::<i64, _, SignedEncoding>(reader)?;
    let op: fn(N, i64) -> Option<N> = if delta_base.is_positive() {
        |acc, delta| acc.add_i64(delta)
    } else {
        |acc, delta| acc.sub_i64(delta)
    };
    // `abs()` would overflow on `i64::MIN` (panic in debug, wrap in release),
    // so reject that input explicitly instead.
    let delta_base = delta_base
        .checked_abs()
        .context(OutOfSpecSnafu { msg: OVERFLOW_MSG })?;

    if delta_bit_width == 0 {
        // Fixed delta: every remaining value is the previous one stepped by
        // the same delta base.
        (1..length).try_fold(base_value, |acc, _| {
            let acc = op(acc, delta_base).context(OutOfSpecSnafu { msg: OVERFLOW_MSG })?;
            out_ints.push(acc);
            Ok::<_, OrcError>(acc)
        })?;
    } else {
        // The run length counts the base value and the value produced by the
        // delta base, so only `length - 2` packed deltas follow. A shorter run
        // would underflow that subtraction, so treat it as malformed input.
        let packed_count = length.checked_sub(2).context(OutOfSpecSnafu {
            msg: "delta run with non-zero delta bit width must contain at least 2 values",
        })?;

        deltas.clear();
        let second_value =
            op(base_value, delta_base).context(OutOfSpecSnafu { msg: OVERFLOW_MSG })?;
        out_ints.push(second_value);

        read_ints(deltas, packed_count, delta_bit_width, reader)?;
        let mut acc = second_value;
        for &delta in deltas.iter() {
            acc = op(acc, delta).context(OutOfSpecSnafu { msg: OVERFLOW_MSG })?;
            out_ints.push(acc);
        }
    }
    Ok(())
}
/// Appends to `writer` a delta run whose deltas are not all identical.
///
/// The output is the two-byte delta header, then `base_value` as a varint
/// using the sign handling of `S`, then `first_delta` as a signed varint, and
/// finally `subsequent_deltas` bit-packed at a width derived from `max_delta`.
///
/// `max_delta` must be greater than zero; this is only checked in debug builds.
pub fn write_varying_delta<N: NInt, S: EncodingSign>(
    writer: &mut BytesMut,
    base_value: N,
    first_delta: i64,
    max_delta: i64,
    subsequent_deltas: &[i64],
) {
    debug_assert!(
        max_delta > 0,
        "varying deltas must have at least one non-zero delta"
    );

    // A width of 1 is widened to 2.
    // NOTE(review): presumably a 1-bit width would share the header code that
    // the reader interprets as "fixed delta" — confirm against
    // `rle_v2_encode_bit_width`.
    let packed_width = match max_delta.closest_aligned_bit_width() {
        1 => 2,
        width => width,
    };
    // The run length also counts the base value and the first delta.
    let value_count = subsequent_deltas.len() + 2;

    writer.put_slice(&derive_delta_header(packed_width, value_count));
    write_varint_zigzagged::<_, S>(writer, base_value);
    write_varint_zigzagged::<_, SignedEncoding>(writer, first_delta);
    write_aligned_packed_ints(writer, packed_width, subsequent_deltas);
}
/// Appends to `writer` a delta run in which every step is the same
/// `fixed_delta`.
///
/// The output is the two-byte delta header with a delta width of 0, then
/// `base_value` as a varint using the sign handling of `S`, then `fixed_delta`
/// as a signed varint. No packed deltas are written.
///
/// `subsequent_deltas_len` is the number of values after the first two; the
/// run length stored in the header is `subsequent_deltas_len + 2`.
pub fn write_fixed_delta<N: NInt, S: EncodingSign>(
    writer: &mut BytesMut,
    base_value: N,
    fixed_delta: i64,
    subsequent_deltas_len: usize,
) {
    // The base value and the value produced by the first delta are part of
    // the run as well.
    let value_count = subsequent_deltas_len + 2;
    let header_bytes = derive_delta_header(0, value_count);

    writer.put_slice(&header_bytes);
    write_varint_zigzagged::<_, S>(writer, base_value);
    write_varint_zigzagged::<_, SignedEncoding>(writer, fixed_delta);
}
/// Builds the two header bytes of a delta run.
///
/// The first byte combines the delta encoding-type bits, the delta width code
/// shifted left by one, and the high bit of `run_length - 1`. The second byte
/// holds the low eight bits of `run_length - 1`.
///
/// A `delta_width` of 0 is stored as code 0 without going through
/// `rle_v2_encode_bit_width`. `run_length` must lie in `1..=MAX_RUN_LENGTH`;
/// this is only checked in debug builds.
fn derive_delta_header(delta_width: usize, run_length: usize) -> [u8; 2] {
    debug_assert!(
        (1..=MAX_RUN_LENGTH).contains(&run_length),
        "delta run length cannot exceed 512 values"
    );

    // The header stores the run length minus one.
    let stored_length = run_length as u16 - 1;
    let width_code = match delta_width {
        0 => 0,
        width => rle_v2_encode_bit_width(width),
    };

    // Big-endian order yields the high byte first, then the low byte.
    let [length_high, length_low] = stored_length.to_be_bytes();
    [
        EncodingType::Delta.to_header() | (width_code << 1) | length_high,
        length_low,
    ]
}
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use crate::encoding::integer::UnsignedEncoding;

    use super::*;

    /// Deltas that follow the first delta in the varying-delta round trips.
    const VARYING_DELTAS: [i64; 16] = [
        1, 6, 98, 12, 65, 9, 0, 0, 1, 128, 643, 129, 469, 123, 4572, 124,
    ];

    /// Decodes one delta run from `encoded`.
    ///
    /// The first byte is passed to `read_delta_values` as the already-consumed
    /// header byte and the remainder is supplied as the reader. The scratch
    /// buffer for packed deltas is local to this helper so that it can never
    /// be confused with the deltas a test feeds to the writer.
    fn decode_run(encoded: &[u8]) -> Vec<i64> {
        let mut out = vec![];
        let mut scratch = vec![];
        read_delta_values::<i64, _, UnsignedEncoding>(
            &mut Cursor::new(&encoded[1..]),
            &mut out,
            &mut scratch,
            encoded[0],
        )
        .unwrap();
        out
    }

    #[test]
    fn test_fixed_delta_positive() {
        let mut buf = BytesMut::new();
        write_fixed_delta::<i64, UnsignedEncoding>(&mut buf, 0, 10, 100 - 2);

        let out = decode_run(&buf);

        let expected = (0..100).map(|i| i * 10).collect::<Vec<i64>>();
        assert_eq!(expected, out);
    }

    #[test]
    fn test_fixed_delta_negative() {
        let mut buf = BytesMut::new();
        write_fixed_delta::<i64, UnsignedEncoding>(&mut buf, 10_000, -63, 150 - 2);

        let out = decode_run(&buf);

        let expected = (0..150).map(|i| 10_000 - i * 63).collect::<Vec<i64>>();
        assert_eq!(expected, out);
    }

    #[test]
    fn test_varying_delta_positive() {
        let max = *VARYING_DELTAS.iter().max().unwrap();

        let mut buf = BytesMut::new();
        // The real deltas must reach the writer; passing an empty slice here
        // would reduce the test to a two-value run.
        write_varying_delta::<i64, UnsignedEncoding>(&mut buf, 0, 10, max, &VARYING_DELTAS);

        let out = decode_run(&buf);

        // Base value, base + first delta, then each further delta added on.
        let mut expected: Vec<i64> = vec![0, 10];
        for &delta in VARYING_DELTAS.iter() {
            let previous = expected[expected.len() - 1];
            expected.push(previous + delta);
        }
        assert_eq!(expected, out);
    }

    #[test]
    fn test_varying_delta_negative() {
        let max = *VARYING_DELTAS.iter().max().unwrap();

        let mut buf = BytesMut::new();
        // A negative first delta makes the run descending; the following
        // deltas are written as magnitudes.
        write_varying_delta::<i64, UnsignedEncoding>(&mut buf, 10_000, -1, max, &VARYING_DELTAS);

        let out = decode_run(&buf);

        // Base value, base - 1, then each further delta subtracted.
        let mut expected: Vec<i64> = vec![10_000, 9_999];
        for &delta in VARYING_DELTAS.iter() {
            let previous = expected[expected.len() - 1];
            expected.push(previous - delta);
        }
        assert_eq!(expected, out);
    }

    #[test]
    fn test_i32_add_sub_i64() {
        // The i64 operand exceeds the i32 range, but the result fits in i32.
        let v = i32::MIN;
        let i = (i32::MAX as i64) * 2;
        let add_result = v.add_i64(i);
        assert_eq!(add_result, Some(2147483646));

        let v = i32::MIN;
        let i = -(i32::MAX as i64) * 2;
        let sub_result = v.sub_i64(i);
        assert_eq!(sub_result, Some(2147483646));
    }

    #[test]
    fn test_i16_add_sub_i64() {
        // The i64 operand exceeds the i16 range, but the result fits in i16.
        let v = i16::MIN;
        let i = (i16::MAX as i64) * 2;
        let add_result = v.add_i64(i);
        assert_eq!(add_result, Some(32766i16));

        let v = i16::MIN;
        let i = -(i16::MAX as i64) * 2;
        let sub_result = v.sub_i64(i);
        assert_eq!(sub_result, Some(32766i16));
    }
}