pub mod relabel;
pub mod translate;
use crate::utils::*;
use serde_json::Value;
use std::io::{self, BufRead, Read, Result, Write};
use xz2::stream::MtStreamBuilder;
use xz2::write::XzEncoder;
use self::translate::ben_to_ben32_lines;
use super::{log, logln, BenVariant};
/// Streaming encoder that writes BEN-encoded samples to an underlying writer.
///
/// In the `MkvChain` variant consecutive identical samples are collapsed into
/// a single encoded sample followed by a big-endian `u16` repetition count.
pub struct BenEncoder<W: Write> {
/// Destination that receives the banner and every encoded sample.
writer: W,
/// Encoded bytes of the most recent sample (only maintained for `MkvChain`).
previous_sample: Vec<u8>,
/// Number of consecutive times `previous_sample` has been seen so far.
count: u16,
/// Which flavour of the BEN format is being produced.
variant: BenVariant,
/// Set by `finish` so that repeated calls (including the one made on drop)
/// do not write the trailing run again.
complete: bool,
}
impl<W: Write> BenEncoder<W> {
    /// Creates an encoder and immediately writes the 17-byte banner that
    /// identifies the chosen `variant`.
    ///
    /// # Panics
    ///
    /// Panics if the banner cannot be written to `writer`; the constructor has
    /// no way to report the error through its return type.
    pub fn new(mut writer: W, variant: BenVariant) -> Self {
        let banner: &[u8] = match variant {
            BenVariant::Standard => b"STANDARD BEN FILE",
            BenVariant::MkvChain => b"MKVCHAIN BEN FILE",
        };
        writer
            .write_all(banner)
            .expect("failed to write the BEN banner to the output");
        BenEncoder {
            writer,
            previous_sample: Vec::new(),
            count: 0,
            complete: false,
            variant,
        }
    }

    /// Writes the buffered `MkvChain` run (encoded sample followed by its
    /// big-endian repetition count), if there is one.
    fn write_pending_run(&mut self) -> Result<()> {
        if self.count > 0 {
            self.writer.write_all(&self.previous_sample)?;
            self.writer.write_all(&self.count.to_be_bytes())?;
        }
        Ok(())
    }

    /// Encodes one run-length-encoded sample of `(value, length)` pairs.
    ///
    /// For `Standard` the encoded sample is written immediately. For
    /// `MkvChain` it is buffered: identical consecutive samples only bump a
    /// counter, and the buffered sample is written once a different sample
    /// arrives (or when [`finish`](Self::finish) is called).
    ///
    /// # Errors
    ///
    /// Returns any I/O error raised by the underlying writer.
    pub fn write_rle(&mut self, rle_vec: Vec<(u16, u16)>) -> Result<()> {
        let encoded = encode_ben_vec_from_rle(rle_vec);
        match self.variant {
            BenVariant::Standard => self.writer.write_all(&encoded),
            BenVariant::MkvChain => {
                if encoded == self.previous_sample {
                    if self.count == u16::MAX {
                        // The on-disk counter is a u16: emit the full run and
                        // start a new run of the same sample instead of
                        // overflowing.
                        self.write_pending_run()?;
                        self.count = 1;
                    } else {
                        self.count += 1;
                    }
                } else {
                    self.write_pending_run()?;
                    self.previous_sample = encoded;
                    self.count = 1;
                }
                Ok(())
            }
        }
    }

    /// Converts a flat assignment vector to run-length form and writes it via
    /// [`write_rle`](Self::write_rle).
    ///
    /// # Errors
    ///
    /// Returns any I/O error raised by the underlying writer.
    pub fn write_assignment(&mut self, assign_vec: Vec<u16>) -> Result<()> {
        self.write_rle(assign_to_rle(assign_vec))
    }

    /// Extracts the `"assignment"` array from a JSON value and writes it as a
    /// sample.
    ///
    /// # Errors
    ///
    /// Returns `InvalidData` when `"assignment"` is missing or not an array,
    /// or when an element is not an unsigned integer that fits in a `u16`.
    /// I/O errors from the underlying writer are propagated.
    pub fn write_json_value(&mut self, data: Value) -> Result<()> {
        let assign_vec = data["assignment"].as_array().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                "'assignment' field either missing or is not an array of integers",
            )
        })?;
        let converted_vec = assign_vec
            .iter()
            .map(|x| {
                let u = x.as_u64().ok_or_else(|| {
                    io::Error::new(
                        io::ErrorKind::InvalidData,
                        format!(
                            "The value '{}' could not be unwrapped as an unsigned 64 bit integer.",
                            x
                        ),
                    )
                })?;
                u16::try_from(u).map_err(|_| {
                    io::Error::new(
                        io::ErrorKind::InvalidData,
                        format!("The value '{}' is too large to fit in a u16.", u),
                    )
                })
            })
            .collect::<Result<Vec<u16>>>()?;
        self.write_assignment(converted_vec)
    }

    /// Writes the trailing buffered `MkvChain` run, if any. Subsequent calls
    /// are no-ops.
    ///
    /// # Errors
    ///
    /// Returns any I/O error raised while writing the final run. The encoder
    /// is marked complete before the write is attempted so that the implicit
    /// call made on drop never re-attempts a partially written run.
    pub fn finish(&mut self) -> Result<()> {
        if self.complete {
            return Ok(());
        }
        self.complete = true;
        if self.variant == BenVariant::MkvChain {
            self.write_pending_run()?;
        }
        Ok(())
    }
}
/// Finalises the encoder when it goes out of scope.
impl<W: Write> Drop for BenEncoder<W> {
    /// Calls `finish` and deliberately discards its outcome, because `drop`
    /// has no way to hand an error back to the caller.
    fn drop(&mut self) {
        match self.finish() {
            Ok(()) | Err(_) => {}
        }
    }
}
/// Streaming encoder that writes 32-bit run-length encoded samples through
/// an xz compressor.
///
/// In the `MkvChain` variant consecutive identical samples are collapsed into
/// a single encoded sample followed by a big-endian `u16` repetition count.
pub struct XBenEncoder<W: Write> {
/// xz stream that receives the banner and every encoded sample.
encoder: XzEncoder<W>,
/// Encoded bytes of the most recent sample (only maintained for `MkvChain`).
previous_sample: Vec<u8>,
/// Number of consecutive times `previous_sample` has been seen so far.
count: u16,
/// Which flavour of the format is being produced.
variant: BenVariant,
}
impl<W: Write> XBenEncoder<W> {
pub fn new(mut encoder: XzEncoder<W>, variant: BenVariant) -> Self {
match variant {
BenVariant::Standard => {
encoder.write_all(b"STANDARD BEN FILE").unwrap();
XBenEncoder {
encoder,
previous_sample: Vec::new(),
count: 0,
variant: BenVariant::Standard,
}
}
BenVariant::MkvChain => {
encoder.write_all(b"MKVCHAIN BEN FILE").unwrap();
XBenEncoder {
encoder,
previous_sample: Vec::new(),
count: 0,
variant: BenVariant::MkvChain,
}
}
}
}
pub fn write_json_value(&mut self, data: Value) -> Result<()> {
let encoded = encode_ben32_line(data);
match self.variant {
BenVariant::Standard => {
self.encoder.write_all(&encoded)?;
}
BenVariant::MkvChain => {
if encoded == self.previous_sample {
self.count += 1;
} else {
if self.count > 0 {
self.encoder.write_all(&self.previous_sample)?;
self.encoder.write_all(&self.count.to_be_bytes())?;
}
self.previous_sample = encoded;
self.count = 1;
}
}
}
Ok(())
}
pub fn write_ben_file(&mut self, mut reader: impl BufRead) -> Result<()> {
let peek = reader.fill_buf()?;
let has_banner = peek.len() >= 17
&& (peek.starts_with(b"STANDARD BEN FILE") || peek.starts_with(b"MKVCHAIN BEN FILE"));
if has_banner {
reader.consume(17);
}
ben_to_ben32_lines(&mut reader, &mut self.encoder, self.variant)
}
}
/// Flushes the buffered `MkvChain` run when the encoder goes out of scope.
impl<W: Write> Drop for XBenEncoder<W> {
    /// Writes the last buffered sample and its repetition count.
    ///
    /// # Panics
    ///
    /// Panics if either write fails, because `drop` cannot return an error.
    /// When the thread is already unwinding from another panic the failure is
    /// ignored instead: a second panic at that point would abort the process.
    fn drop(&mut self) {
        if self.variant != BenVariant::MkvChain || self.count == 0 {
            return;
        }
        if let Err(err) = self.encoder.write_all(&self.previous_sample) {
            if !std::thread::panicking() {
                panic!("Error writing last line to file: {:?}", err);
            }
            // Do not append a count to a sample that was not fully written.
            return;
        }
        if let Err(err) = self.encoder.write_all(&self.count.to_be_bytes()) {
            if !std::thread::panicking() {
                panic!("Error writing last line count to file: {:?}", err);
            }
        }
    }
}
fn encode_ben32_line(data: Value) -> Vec<u8> {
let assign_vec = data["assignment"].as_array().unwrap();
let mut prev_assign: u16 = 0;
let mut count: u16 = 0;
let mut first = true;
let mut ret = Vec::new();
for assignment in assign_vec {
let assign = assignment.as_u64().unwrap() as u16;
if first {
prev_assign = assign;
count = 1;
first = false;
continue;
}
if assign == prev_assign {
count += 1;
} else {
let encoded = (prev_assign as u32) << 16 | count as u32;
ret.extend(&encoded.to_be_bytes());
prev_assign = assign;
count = 1;
}
}
if count > 0 {
let encoded = (prev_assign as u32) << 16 | count as u32;
ret.extend(&encoded.to_be_bytes());
}
ret.extend([0, 0, 0, 0]);
ret
}
pub fn encode_jsonl_to_xben<R: BufRead, W: Write>(
reader: R,
writer: W,
variant: BenVariant,
n_threads: Option<u32>,
compression_level: Option<u32>,
) -> Result<()> {
let mut n_cpus: u32 = n_threads.unwrap_or(1);
n_cpus = n_cpus
.min(
std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1) as u32,
)
.max(1);
let level = compression_level.unwrap_or(9).min(9).max(0);
let mt = MtStreamBuilder::new()
.threads(n_cpus)
.preset(level)
.block_size(0)
.encoder()
.expect("init MT encoder");
let encoder = XzEncoder::new_stream(writer, mt);
let mut ben_encoder = XBenEncoder::new(encoder, variant);
let mut line_num = 1;
for line_result in reader.lines() {
log!("Encoding line: {}\r", line_num);
line_num += 1;
let line = line_result?;
let data: Value = serde_json::from_str(&line).expect("Error parsing JSON from line");
ben_encoder.write_json_value(data)?;
}
logln!();
logln!("Done!");
Ok(())
}
pub fn xz_compress<R: BufRead, W: Write>(
mut reader: R,
writer: W,
n_threads: Option<u32>,
compression_level: Option<u32>,
) -> Result<()> {
let mut buff = [0; 4096];
let mut n_cpus: u32 = n_threads.unwrap_or(1);
n_cpus = n_cpus
.min(
std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1) as u32,
)
.max(1);
let level = compression_level.unwrap_or(9).min(9).max(0);
let mt = MtStreamBuilder::new()
.threads(n_cpus)
.preset(level)
.block_size(0)
.encoder()
.expect("init MT encoder");
let mut encoder = XzEncoder::new_stream(writer, mt);
while let Ok(count) = reader.read(&mut buff) {
if count == 0 {
break;
}
encoder.write_all(&buff[..count])?;
}
drop(encoder); Ok(())
}
/// Encodes a flat assignment vector as a single BEN frame.
///
/// The vector is first turned into `(value, run length)` pairs with
/// `assign_to_rle`; the pairs are then bit-packed by
/// `encode_ben_vec_from_rle`.
pub fn encode_ben_vec_from_assign(assign_vec: Vec<u16>) -> Vec<u8> {
    encode_ben_vec_from_rle(assign_to_rle(assign_vec))
}
/// Bit-packs a run-length-encoded sample into a single BEN frame.
///
/// Frame layout:
///
/// * byte 0 - number of bits used for each value (at least 1),
/// * byte 1 - number of bits used for each run length,
/// * bytes 2..6 - big-endian `u32` payload size in bytes,
/// * payload - every `(value, length)` pair written most-significant bit
///   first using the two widths above, zero-padded to a whole byte.
///
/// An empty `rle_vec` produces a frame with an empty payload
/// (`[1, 0, 0, 0, 0, 0]`) instead of panicking.
pub fn encode_ben_vec_from_rle(rle_vec: Vec<(u16, u16)>) -> Vec<u8> {
    let max_val: u16 = rle_vec.iter().map(|&(val, _)| val).max().unwrap_or(0);
    let max_len: u16 = rle_vec.iter().map(|&(_, len)| len).max().unwrap_or(0);

    // Width of the widest value / run length; a value always gets >= 1 bit.
    let max_val_bits: u8 = (16 - max_val.leading_zeros() as u8).max(1);
    let max_len_bits: u8 = 16 - max_len.leading_zeros() as u8;

    let assign_bits: u32 = u32::from(max_val_bits) + u32::from(max_len_bits);
    let total_bits: u32 = assign_bits * rle_vec.len() as u32;
    // Round up to a whole number of bytes.
    let n_bytes: u32 = total_bits / 8 + u32::from(total_bits % 8 != 0);

    let mut output_vec: Vec<u8> = Vec::with_capacity(6 + n_bytes as usize);
    output_vec.push(max_val_bits);
    output_vec.push(max_len_bits);
    output_vec.extend_from_slice(&n_bytes.to_be_bytes());

    // `pending` holds the bits not yet written; fewer than 8 of them remain
    // between fields, so adding a field of at most 16 bits always fits.
    let mut pending: u32 = 0;
    let mut pending_bits: u8 = 0;
    for (val, len) in rle_vec {
        for (field, width) in [(val, max_val_bits), (len, max_len_bits)] {
            pending = (pending << width) | u32::from(field);
            pending_bits += width;
            while pending_bits >= 8 {
                pending_bits -= 8;
                output_vec.push((pending >> pending_bits) as u8);
                // Keep only the bits that have not been emitted yet.
                pending &= !(u32::MAX << pending_bits);
            }
        }
    }
    if pending_bits > 0 {
        // Left-align the leftover bits in the final byte.
        output_vec.push((pending << (8 - pending_bits)) as u8);
    }
    output_vec
}
pub fn encode_jsonl_to_ben<R: BufRead, W: Write>(
reader: R,
writer: W,
variant: BenVariant,
) -> Result<()> {
let mut line_num = 1;
let mut ben_encoder = BenEncoder::new(writer, variant);
for line_result in reader.lines() {
log!("Encoding line: {}\r", line_num);
line_num += 1;
let line = line_result?; let data: Value = serde_json::from_str(&line).expect("Error parsing JSON from line");
ben_encoder.write_json_value(data)?;
}
logln!();
logln!("Done!"); Ok(())
}
/// Converts a BEN stream read from `reader` into an xz-compressed stream
/// written to `writer`.
///
/// The first 17 bytes of the input must be one of the two BEN banners; they
/// select the variant of the output.
///
/// * `n_threads` - requested number of compression threads (default 1); it is
///   capped at the parallelism reported by the system and is at least 1.
/// * `compression_level` - xz preset (default 9), capped at 9.
///
/// # Errors
///
/// Returns an error if the banner cannot be read, `InvalidData` if it is not
/// a known banner, and any error raised while converting the body.
///
/// # Panics
///
/// Panics if the multithreaded xz encoder cannot be initialised.
pub fn encode_ben_to_xben<R: BufRead, W: Write>(
    mut reader: R,
    writer: W,
    n_threads: Option<u32>,
    compression_level: Option<u32>,
) -> Result<()> {
    let mut check_buffer = [0u8; 17];
    reader.read_exact(&mut check_buffer)?;

    // Validate the banner before creating the compressor so that nothing is
    // written to `writer` for an input that is rejected.
    let variant = match &check_buffer {
        b"STANDARD BEN FILE" => BenVariant::Standard,
        b"MKVCHAIN BEN FILE" => BenVariant::MkvChain,
        _ => {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid file format",
            ));
        }
    };

    let available = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1) as u32;
    let n_cpus = n_threads.unwrap_or(1).min(available).max(1);
    let level = compression_level.unwrap_or(9).min(9);

    let mt = MtStreamBuilder::new()
        .threads(n_cpus)
        .preset(level)
        .block_size(0)
        .encoder()
        .expect("init MT encoder");
    let encoder = XzEncoder::new_stream(writer, mt);

    let mut ben_encoder = XBenEncoder::new(encoder, variant);
    ben_encoder.write_ben_file(reader)
}
#[cfg(test)]
#[path = "tests/encode_tests.rs"]
mod tests;