ben/encode/mod.rs
1//! This module contains the main encoding functions for turning an
2//! input JSONL or BEN file into a BEN or XBEN file.
3//!
4//! Any input JSONL file is expected to be in the standard
5//!
6//! ```json
7//! {"assignment": [...], "sample": #}
8//! ```
9//!
10//! format.
11//!
12//! The BEN format is
13//! a simple bit-packed run-length encoded assignment vector with
14//! some special headers that allow the decoder to know how many
15//! bytes to read for each sample.
16//!
17//!
18//! The XBEN format uses LZMA2 dictionary compression on
19//! a byte-level decompressed version of the BEN format (known as ben32)
20//! to achieve better compression ratios than we could achieve with applying
21//! LZMA2 compression directly to the BEN format.
22
23pub mod relabel;
24pub mod translate;
25
26use crate::utils::*;
27use serde_json::Value;
28use std::io::{self, BufRead, Read, Result, Write};
29use xz2::stream::MtStreamBuilder;
30use xz2::write::XzEncoder;
31
32use self::translate::ben_to_ben32_lines;
33use super::{log, logln, BenVariant};
34
/// A struct to make the writing of BEN files easier
/// and more ergonomic.
///
/// The encoder owns the underlying writer. In the `MkvChain` variant the
/// most recent sample is buffered together with a repetition count and is
/// only written out when a different sample arrives or when the encoder is
/// finished or dropped.
///
/// # Example
///
/// ```
/// use ben::{encode::BenEncoder, BenVariant};
///
/// let mut buffer = Vec::new();
/// let mut ben_encoder = BenEncoder::new(&mut buffer, BenVariant::Standard);
///
/// ben_encoder.write_assignment(vec![1, 1, 1, 2, 2, 2]).unwrap();
/// ```
pub struct BenEncoder<W: Write> {
    // Destination that receives the header and every encoded sample.
    writer: W,
    // Encoded bytes of the sample currently being counted (MkvChain only).
    previous_sample: Vec<u8>,
    // Number of consecutive occurrences of `previous_sample` not yet written.
    count: u16,
    // Selects between the Standard and MkvChain on-disk layouts.
    variant: BenVariant,
    // Set by `finish` so that a later call (e.g. from `Drop`) is a no-op.
    complete: bool,
}
55
56impl<W: Write> BenEncoder<W> {
57 /// Create a new BenEncoder instance and handles
58 /// the BEN file header.
59 ///
60 /// # Arguments
61 ///
62 /// * `writer` - A writer to write the BEN file to
63 /// * `variant` - The BEN variant to use (Standard or MkvChain)
64 ///
65 /// # Returns
66 ///
67 /// A new BenEncoder instance
68 pub fn new(mut writer: W, variant: BenVariant) -> Self {
69 match variant {
70 BenVariant::Standard => {
71 writer.write_all(b"STANDARD BEN FILE").unwrap();
72 }
73 BenVariant::MkvChain => {
74 writer.write_all(b"MKVCHAIN BEN FILE").unwrap();
75 }
76 }
77 BenEncoder {
78 writer,
79 previous_sample: Vec::new(),
80 count: 0,
81 complete: false,
82 variant: variant,
83 }
84 }
85
86 /// Write a run-length encoded assignment vector to the
87 /// BEN file.
88 ///
89 /// # Arguments
90 ///
91 /// * `rle_vec` - A run-length encoded assignment vector to write
92 ///
93 /// # Returns
94 ///
95 /// A Result type that contains the result of the operation
96 pub fn write_rle(&mut self, rle_vec: Vec<(u16, u16)>) -> Result<()> {
97 match self.variant {
98 BenVariant::Standard => {
99 let encoded = encode_ben_vec_from_rle(rle_vec);
100 self.writer.write_all(&encoded)?;
101 Ok(())
102 }
103 BenVariant::MkvChain => {
104 let encoded = encode_ben_vec_from_rle(rle_vec);
105 if encoded == self.previous_sample {
106 self.count += 1;
107 } else {
108 if self.count > 0 {
109 self.writer.write_all(&self.previous_sample)?;
110 self.writer.write_all(&self.count.to_be_bytes())?;
111 }
112 self.previous_sample = encoded;
113 self.count = 1;
114 }
115 Ok(())
116 }
117 }
118 }
119
120 /// Write an assignment vector to the BEN file.
121 ///
122 /// # Arguments
123 ///
124 /// * `assign_vec` - An assignment vector to write
125 ///
126 /// # Returns
127 ///
128 /// A Result type that contains the result of the operation
129 pub fn write_assignment(&mut self, assign_vec: Vec<u16>) -> Result<()> {
130 let rle_vec = assign_to_rle(assign_vec);
131 self.write_rle(rle_vec)?;
132 Ok(())
133 }
134
135 /// Write a JSON value containing an assignment vector to the BEN file.
136 ///
137 /// # Arguments
138 ///
139 /// * `data` - A JSON value containing an assignment vector
140 ///
141 /// # Returns
142 ///
143 /// A Result type that contains the result of the operation
144 pub fn write_json_value(&mut self, data: Value) -> Result<()> {
145 let assign_vec = data["assignment"].as_array().ok_or_else(|| {
146 io::Error::new(
147 io::ErrorKind::InvalidData,
148 "'assignment' field either missing or is not an array of integers",
149 )
150 })?;
151 let converted_vec = assign_vec
152 .into_iter()
153 .map(|x| {
154 let u = x.as_u64().ok_or_else(|| {
155 io::Error::new(
156 io::ErrorKind::InvalidData,
157 format!(
158 "The value '{}' could not be unwrapped as an unsigned 64 bit integer.",
159 x
160 ),
161 )
162 })?;
163
164 u16::try_from(u).map_err(|_| {
165 io::Error::new(
166 io::ErrorKind::InvalidData,
167 format!("The value '{}' is too large to fit in a u16.", u),
168 )
169 })
170 })
171 .collect::<Result<Vec<u16>>>()?;
172
173 let rle_vec = assign_to_rle(converted_vec);
174 self.write_rle(rle_vec)?;
175 Ok(())
176 }
177
178 /// Cleanup function to make sure the last sample is written
179 /// to the BEN file if using the MkvChain variant.
180 ///
181 /// This function is automatically called when the BenEncoder
182 /// goes out of scope, but can be called manually if desired.
183 ///
184 /// # Returns
185 ///
186 /// A Result type that contains the result of the operation
187 ///
188 /// # Errors
189 ///
190 /// This function will return an error if the writer encounters
191 /// an error while writing the last sample to the BEN file.
192 pub fn finish(&mut self) -> Result<()> {
193 if self.complete {
194 return Ok(());
195 }
196 if self.variant == BenVariant::MkvChain && self.count > 0 {
197 self.writer
198 .write_all(&self.previous_sample)
199 .expect("Error while writing last line to file");
200 self.writer
201 .write_all(&self.count.to_be_bytes())
202 .expect("Error while writing last count to file");
203 }
204 self.complete = true;
205 Ok(())
206 }
207}
208
209impl<W: Write> Drop for BenEncoder<W> {
210 /// Make sure to finish writing the BEN file when the
211 /// BenEncoder goes out of scope.
212 fn drop(&mut self) {
213 let _ = self.finish();
214 }
215}
216
/// A struct to make the writing of XBEN files easier
/// and more ergonomic.
///
/// All output goes through the wrapped `XzEncoder`. In the `MkvChain`
/// variant the most recent ben32-encoded sample is buffered together with
/// a repetition count until a different sample arrives or the encoder is
/// dropped.
pub struct XBenEncoder<W: Write> {
    // XZ stream that receives the header and the ben32 data.
    encoder: XzEncoder<W>,
    // ben32 bytes of the sample currently being counted (MkvChain only).
    previous_sample: Vec<u8>,
    // Number of consecutive occurrences of `previous_sample` not yet written.
    count: u16,
    // Selects between the Standard and MkvChain layouts.
    variant: BenVariant,
}
225
226impl<W: Write> XBenEncoder<W> {
227 /// Create a new XBenEncoder instance and handles
228 /// the XBEN file header.
229 ///
230 /// # Arguments
231 ///
232 /// * `encoder` - An XzEncoder to write the XBEN file to
233 /// * `variant` - The BEN variant to use (Standard or MkvChain)
234 ///
235 /// # Returns
236 ///
237 /// A new XBenEncoder instance
238 pub fn new(mut encoder: XzEncoder<W>, variant: BenVariant) -> Self {
239 match variant {
240 BenVariant::Standard => {
241 encoder.write_all(b"STANDARD BEN FILE").unwrap();
242 XBenEncoder {
243 encoder,
244 previous_sample: Vec::new(),
245 count: 0,
246 variant: BenVariant::Standard,
247 }
248 }
249 BenVariant::MkvChain => {
250 encoder.write_all(b"MKVCHAIN BEN FILE").unwrap();
251 XBenEncoder {
252 encoder,
253 previous_sample: Vec::new(),
254 count: 0,
255 variant: BenVariant::MkvChain,
256 }
257 }
258 }
259 }
260
261 /// Write a an assigment vector encoded as a JSON value
262 /// to the XBEN file.
263 ///
264 /// # Arguments
265 ///
266 /// * `data` - A JSON value containing an assignment vector
267 ///
268 /// # Returns
269 ///
270 /// A Result type that contains the result of the operation
271 pub fn write_json_value(&mut self, data: Value) -> Result<()> {
272 let encoded = encode_ben32_line(data);
273 match self.variant {
274 BenVariant::Standard => {
275 self.encoder.write_all(&encoded)?;
276 }
277 BenVariant::MkvChain => {
278 if encoded == self.previous_sample {
279 self.count += 1;
280 } else {
281 if self.count > 0 {
282 self.encoder.write_all(&self.previous_sample)?;
283 self.encoder.write_all(&self.count.to_be_bytes())?;
284 }
285 self.previous_sample = encoded;
286 self.count = 1;
287 }
288 }
289 }
290 Ok(())
291 }
292
293 /// Converts a raw BEN assignment file into to an XBEN file.
294 /// This function will check to see if the header is there and then
295 /// handle it accordingly.
296 ///
297 /// # Arguments
298 ///
299 /// * `reader` - A buffered reader for the input BEN file
300 ///
301 /// # Returns
302 ///
303 /// A Result type that contains the result of the operation
304 pub fn write_ben_file(&mut self, mut reader: impl BufRead) -> Result<()> {
305 let peek = reader.fill_buf()?;
306 let has_banner = peek.len() >= 17
307 && (peek.starts_with(b"STANDARD BEN FILE") || peek.starts_with(b"MKVCHAIN BEN FILE"));
308
309 if has_banner {
310 reader.consume(17);
311 }
312
313 ben_to_ben32_lines(&mut reader, &mut self.encoder, self.variant)
314 }
315}
316
317impl<W: Write> Drop for XBenEncoder<W> {
318 /// Make sure to finish writing the XBEN file when the
319 /// XBenEncoder goes out of scope.
320 fn drop(&mut self) {
321 if self.variant == BenVariant::MkvChain && self.count > 0 {
322 self.encoder
323 .write_all(&self.previous_sample)
324 .expect("Error writing last line to file");
325 self.encoder
326 .write_all(&self.count.to_be_bytes())
327 .expect("Error writing last line count to file");
328 }
329 }
330}
331
332/// This function takes a json encoded line containing an assignment
333/// vector and a sample number and encodes the assignment vector
334/// into a binary format known as "ben32". The ben32 format serves
335/// as an intermediate format that allows for efficient compression
336/// of BEN files using LZMA2 compression methods.
337///
338/// # Arguments
339///
340/// * `data` - A JSON object containing an assignment vector and a sample number
341///
342/// # Returns
343///
344/// A vector of bytes containing the ben32 encoded assignment vector
345fn encode_ben32_line(data: Value) -> Vec<u8> {
346 let assign_vec = data["assignment"].as_array().unwrap();
347 let mut prev_assign: u16 = 0;
348 let mut count: u16 = 0;
349 let mut first = true;
350
351 let mut ret = Vec::new();
352
353 for assignment in assign_vec {
354 let assign = assignment.as_u64().unwrap() as u16;
355 if first {
356 prev_assign = assign;
357 count = 1;
358 first = false;
359 continue;
360 }
361 if assign == prev_assign {
362 count += 1;
363 } else {
364 let encoded = (prev_assign as u32) << 16 | count as u32;
365 ret.extend(&encoded.to_be_bytes());
366 // Reset for next run
367 prev_assign = assign;
368 count = 1;
369 }
370 }
371
372 // Handle the last run
373 if count > 0 {
374 let encoded = (prev_assign as u32) << 16 | count as u32;
375 ret.extend(&encoded.to_be_bytes());
376 }
377
378 ret.extend([0, 0, 0, 0]);
379 ret
380}
381
382/// This function takes a JSONL file and compresses it to the
383/// XBEN format.
384///
385/// The JSONL file is assumed to be formatted in the standard
386///
387/// ```json
388/// {"assignment": [...], "sample": #}
389/// ```
390///
391/// format. While the BEN format is
392/// a simple bit-packed (streamable!) run-length encoded assignment
393/// vector, the XBEN format uses LZMA2 dictionary compression on
394/// the byte level to achieve better compression ratios. In order
395/// to use XBEN files, the `decode_xben_to_ben` function must be
396/// used to decode the file back into a BEN format.
397///
398/// # Arguments
399///
400/// * `reader` - A buffered reader for the input file
401/// * `writer` - A writer for the output file
402/// * `variant` - The BEN variant to use (Standard or MkvChain)
403/// * `n_threads` - The number of threads to use for compression (optional)
404/// * `compression_level` - The compression level to use (0-9, optional)
405///
406/// # Returns
407///
408/// A Result type that contains the result of the operation
409pub fn encode_jsonl_to_xben<R: BufRead, W: Write>(
410 reader: R,
411 writer: W,
412 variant: BenVariant,
413 n_threads: Option<u32>,
414 compression_level: Option<u32>,
415) -> Result<()> {
416 let mut n_cpus: u32 = n_threads.unwrap_or(1);
417 n_cpus = n_cpus
418 .min(
419 std::thread::available_parallelism()
420 .map(|n| n.get())
421 .unwrap_or(1) as u32,
422 )
423 .max(1);
424
425 let level = compression_level.unwrap_or(9).min(9).max(0);
426
427 let mt = MtStreamBuilder::new()
428 .threads(n_cpus)
429 .preset(level)
430 .block_size(0)
431 .encoder()
432 .expect("init MT encoder");
433 let encoder = XzEncoder::new_stream(writer, mt);
434 let mut ben_encoder = XBenEncoder::new(encoder, variant);
435
436 let mut line_num = 1;
437
438 for line_result in reader.lines() {
439 log!("Encoding line: {}\r", line_num);
440 line_num += 1;
441 let line = line_result?;
442 let data: Value = serde_json::from_str(&line).expect("Error parsing JSON from line");
443
444 ben_encoder.write_json_value(data)?;
445 }
446
447 logln!();
448 logln!("Done!");
449
450 Ok(())
451}
452
453/// This is a convenience function that applies level 9 LZMA2 compression
454/// to a general file.
455///
456/// # Arguments
457///
458/// * `reader` - A buffered reader for the input file
459/// * `writer` - A writer for the output file
460///
461/// # Returns
462///
463/// A Result type that contains the result of the operation
464///
465/// # Example
466///
467/// ```
468/// use ben::encode::xz_compress;
469/// use lipsum::lipsum;
470/// use std::io::{BufReader, BufWriter};
471///
472/// let input = lipsum(100);
473/// let reader = BufReader::new(input.as_bytes());
474///
475/// let mut output_buffer = Vec::new();
476/// let writer = BufWriter::new(&mut output_buffer);
477///
478/// xz_compress(reader, writer, Some(1), Some(1)).unwrap();
479///
480/// println!("{:?}", output_buffer);
481/// ```
482pub fn xz_compress<R: BufRead, W: Write>(
483 mut reader: R,
484 writer: W,
485 n_threads: Option<u32>,
486 compression_level: Option<u32>,
487) -> Result<()> {
488 let mut buff = [0; 4096];
489 // let mut encoder = XzEncoder::new(writer, 1);
490
491 let mut n_cpus: u32 = n_threads.unwrap_or(1);
492 n_cpus = n_cpus
493 .min(
494 std::thread::available_parallelism()
495 .map(|n| n.get())
496 .unwrap_or(1) as u32,
497 )
498 .max(1);
499
500 let level = compression_level.unwrap_or(9).min(9).max(0);
501
502 let mt = MtStreamBuilder::new()
503 .threads(n_cpus)
504 .preset(level)
505 .block_size(0)
506 .encoder()
507 .expect("init MT encoder");
508 let mut encoder = XzEncoder::new_stream(writer, mt);
509
510 while let Ok(count) = reader.read(&mut buff) {
511 if count == 0 {
512 break;
513 }
514 encoder.write_all(&buff[..count])?;
515 }
516 drop(encoder); // Make sure to flush and finish compression
517 Ok(())
518}
519
520/// This function takes in a standard assignment vector and encodes
521/// it into a bit-packed ben version.
522///
523/// # Arguments
524///
525/// * `assign_vec` - A vector of u16 values representing the assignment vector
526///
527/// # Returns
528///
529/// A vector of bytes containing the bit-packed ben encoded assignment vector
530pub fn encode_ben_vec_from_assign(assign_vec: Vec<u16>) -> Vec<u8> {
531 let rle_vec: Vec<(u16, u16)> = assign_to_rle(assign_vec);
532 encode_ben_vec_from_rle(rle_vec)
533}
534
/// This function takes a run-length encoded assignment vector and
/// encodes it into a bit-packed ben version.
///
/// The output starts with a six-byte header: the number of bits used per
/// value, the number of bits used per run length, and the big-endian
/// `u32` count of payload bytes that follow. The payload stores every run
/// as its value followed by its length, most significant bit first, with
/// the final byte zero-padded on the right.
///
/// An empty `rle_vec` produces a header describing zero payload bytes
/// (`[1, 0, 0, 0, 0, 0]`).
///
/// # Arguments
///
/// * `rle_vec` - A vector of tuples containing the value and length of each run
///
/// # Returns
///
/// A vector of bytes containing the bit-packed ben encoded assignment vector
///
/// # Panics
///
/// Panics if the payload would need more bytes than fit in the `u32`
/// length field of the header.
pub fn encode_ben_vec_from_rle(rle_vec: Vec<(u16, u16)>) -> Vec<u8> {
    let max_val: u16 = rle_vec.iter().map(|&(val, _)| val).max().unwrap_or(0);
    let max_len: u16 = rle_vec.iter().map(|&(_, len)| len).max().unwrap_or(0);

    // A value always occupies at least one bit, even when every value is 0.
    let max_val_bits: u8 = (16 - max_val.leading_zeros() as u8).max(1);
    let max_len_bits: u8 = 16 - max_len.leading_zeros() as u8;
    let assign_bits: u32 = u32::from(max_val_bits) + u32::from(max_len_bits);

    // Do the size arithmetic in 64 bits so it cannot overflow.
    let total_bits: u64 = u64::from(assign_bits) * rle_vec.len() as u64;
    let payload_bytes: u64 = (total_bits + 7) / 8;
    assert!(
        payload_bytes <= u64::from(u32::MAX),
        "encoded sample does not fit in the 32-bit BEN length field"
    );
    let n_bytes: u32 = payload_bytes as u32;

    let mut output_vec: Vec<u8> = Vec::with_capacity(6 + n_bytes as usize);
    output_vec.push(max_val_bits);
    output_vec.push(max_len_bits);
    output_vec.extend_from_slice(&n_bytes.to_be_bytes());

    // Bit accumulator: at most 7 leftover bits plus one 32-bit run.
    let mut acc: u64 = 0;
    let mut acc_bits: u32 = 0;

    for (val, len) in rle_vec {
        acc = (acc << assign_bits) | (u64::from(val) << max_len_bits) | u64::from(len);
        acc_bits += assign_bits;

        while acc_bits >= 8 {
            acc_bits -= 8;
            // The cast keeps exactly the next eight not-yet-emitted bits.
            output_vec.push((acc >> acc_bits) as u8);
        }

        // Drop the bits that were already emitted.
        acc &= (1u64 << acc_bits) - 1;
    }

    if acc_bits > 0 {
        // Left-align the remaining bits in the final byte.
        output_vec.push((acc << (8 - acc_bits)) as u8);
    }

    output_vec
}
601
602/// This function takes a JSONL file and compresses it into
603/// the BEN format.
604///
605/// The JSONL file is assumed to be formatted in the standard
606///
607/// ```json
608/// {"assignment": [...], "sample": #}
609/// ```
610///
611/// format.
612///
613/// # Arguments
614///
615/// * `reader` - A buffered reader for the input file
616/// * `writer` - A writer for the output file
617/// * `variant` - The BEN variant to use (Standard or MkvChain)
618///
619/// # Returns
620///
621/// A Result type that contains the result of the operation
622///
623/// # Example
624///
625/// ```
626/// use std::io::{BufReader, BufWriter};
627/// use serde_json::json;
628/// use ben::{encode::encode_jsonl_to_ben, BenVariant};
629///
630/// let input = r#"{"assignment": [1,1,1,2,2,2], "sample": 1}"#.to_string()
631/// + "\n"
632/// + r#"{"assignment": [1,1,2,2,1,2], "sample": 2}"#;
633///
634/// let reader = BufReader::new(input.as_bytes());
635/// let mut write_buffer = Vec::new();
636/// let mut writer = BufWriter::new(&mut write_buffer);
637///
638/// encode_jsonl_to_ben(reader, writer, BenVariant::Standard).unwrap();
639///
640/// println!("{:?}", write_buffer);
641/// // This will output
642/// // [83, 84, 65, 78, 68, 65, 82, 68, 32,
643/// // 66, 69, 78, 32, 70, 73, 76, 69, 2,
644/// // 2, 0, 0, 0, 1, 123, 2, 2, 0, 0, 0,
645/// // 2, 106, 89]
646/// ```
647///
648pub fn encode_jsonl_to_ben<R: BufRead, W: Write>(
649 reader: R,
650 writer: W,
651 variant: BenVariant,
652) -> Result<()> {
653 let mut line_num = 1;
654 let mut ben_encoder = BenEncoder::new(writer, variant);
655 for line_result in reader.lines() {
656 log!("Encoding line: {}\r", line_num);
657 line_num += 1;
658 let line = line_result?; // Handle potential I/O errors for each line
659 let data: Value = serde_json::from_str(&line).expect("Error parsing JSON from line");
660
661 ben_encoder.write_json_value(data)?;
662 }
663 logln!();
664 logln!("Done!"); // Print newline after progress bar
665 Ok(())
666}
667
668/// This function takes a BEN file and encodes it into an XBEN
669/// file using bit-to-byte decompression followed by LZMA2 compression.
670///
671/// # Arguments
672///
673/// * `reader` - A buffered reader for the input file
674/// * `writer` - A writer for the output file
675/// * `n_threads` - The number of threads to use for compression (optional)
676/// * `compression_level` - The compression level to use (0-9, optional)
677///
678/// # Returns
679///
680/// A Result type that contains the result of the operation
681pub fn encode_ben_to_xben<R: BufRead, W: Write>(
682 mut reader: R,
683 writer: W,
684 n_threads: Option<u32>,
685 compression_level: Option<u32>,
686) -> Result<()> {
687 let mut check_buffer = [0u8; 17];
688 reader.read_exact(&mut check_buffer)?;
689
690 let mut n_cpus: u32 = n_threads.unwrap_or(1);
691 n_cpus = n_cpus
692 .min(
693 std::thread::available_parallelism()
694 .map(|n| n.get())
695 .unwrap_or(1) as u32,
696 )
697 .max(1);
698
699 let level = compression_level.unwrap_or(9).min(9).max(0);
700
701 let mt = MtStreamBuilder::new()
702 .threads(n_cpus)
703 .preset(level)
704 .block_size(0)
705 .encoder()
706 .expect("init MT encoder");
707 let encoder = XzEncoder::new_stream(writer, mt);
708
709 let mut ben_encoder = match &check_buffer {
710 b"STANDARD BEN FILE" => XBenEncoder::new(encoder, BenVariant::Standard),
711 b"MKVCHAIN BEN FILE" => XBenEncoder::new(encoder, BenVariant::MkvChain),
712 _ => {
713 return Err(io::Error::new(
714 io::ErrorKind::InvalidData,
715 "Invalid file format",
716 ));
717 }
718 };
719
720 ben_encoder.write_ben_file(reader)?;
721
722 Ok(())
723}
724
725#[cfg(test)]
726#[path = "tests/encode_tests.rs"]
727mod tests;