ben/encode/
mod.rs

1//! This module contains the main encoding functions for turning an
2//! input JSONL or BEN file into a BEN or XBEN file.
3//!
4//! Any input JSONL file is expected to be in the standard
5//!
6//! ```json
7//! {"assignment": [...], "sample": #}
8//! ```
9//!
10//! format.
11//!
12//! The BEN format is
13//! a simple bit-packed run-length encoded assignment vector with
14//! some special headers that allow the decoder to know how many
15//! bytes to read for each sample.
16//!
17//!
18//! The XBEN format uses LZMA2 dictionary compression on
19//! a byte-level decompressed version of the BEN format (known as ben32)
20//! to achieve better compression ratios than we could achieve with applying
21//! LZMA2 compression directly to the BEN format.
22
23pub mod relabel;
24pub mod translate;
25
26use crate::utils::*;
27use serde_json::Value;
28use std::io::{self, BufRead, Read, Result, Write};
29use xz2::stream::MtStreamBuilder;
30use xz2::write::XzEncoder;
31
32use self::translate::ben_to_ben32_lines;
33use super::{log, logln, BenVariant};
34
35/// A struct to make the writing of BEN files easier
36/// and more ergonomic.
37///
38/// # Example
39///
40/// ```
41/// use ben::{encode::BenEncoder, BenVariant};
42///
43/// let mut buffer = Vec::new();
44/// let mut ben_encoder = BenEncoder::new(&mut buffer, BenVariant::Standard);
45///
46/// ben_encoder.write_assignment(vec![1, 1, 1, 2, 2, 2]);
47/// ```
48pub struct BenEncoder<W: Write> {
49    writer: W,
50    previous_sample: Vec<u8>,
51    count: u16,
52    variant: BenVariant,
53    complete: bool,
54}
55
56impl<W: Write> BenEncoder<W> {
57    /// Create a new BenEncoder instance and handles
58    /// the BEN file header.
59    ///
60    /// # Arguments
61    ///
62    /// * `writer` - A writer to write the BEN file to
63    /// * `variant` - The BEN variant to use (Standard or MkvChain)
64    ///
65    /// # Returns
66    ///
67    /// A new BenEncoder instance
68    pub fn new(mut writer: W, variant: BenVariant) -> Self {
69        match variant {
70            BenVariant::Standard => {
71                writer.write_all(b"STANDARD BEN FILE").unwrap();
72            }
73            BenVariant::MkvChain => {
74                writer.write_all(b"MKVCHAIN BEN FILE").unwrap();
75            }
76        }
77        BenEncoder {
78            writer,
79            previous_sample: Vec::new(),
80            count: 0,
81            complete: false,
82            variant: variant,
83        }
84    }
85
86    /// Write a run-length encoded assignment vector to the
87    /// BEN file.
88    ///
89    /// # Arguments
90    ///
91    /// * `rle_vec` - A run-length encoded assignment vector to write
92    ///
93    /// # Returns
94    ///
95    /// A Result type that contains the result of the operation
96    pub fn write_rle(&mut self, rle_vec: Vec<(u16, u16)>) -> Result<()> {
97        match self.variant {
98            BenVariant::Standard => {
99                let encoded = encode_ben_vec_from_rle(rle_vec);
100                self.writer.write_all(&encoded)?;
101                Ok(())
102            }
103            BenVariant::MkvChain => {
104                let encoded = encode_ben_vec_from_rle(rle_vec);
105                if encoded == self.previous_sample {
106                    self.count += 1;
107                } else {
108                    if self.count > 0 {
109                        self.writer.write_all(&self.previous_sample)?;
110                        self.writer.write_all(&self.count.to_be_bytes())?;
111                    }
112                    self.previous_sample = encoded;
113                    self.count = 1;
114                }
115                Ok(())
116            }
117        }
118    }
119
120    /// Write an assignment vector to the BEN file.
121    ///
122    /// # Arguments
123    ///
124    /// * `assign_vec` - An assignment vector to write
125    ///
126    /// # Returns
127    ///
128    /// A Result type that contains the result of the operation
129    pub fn write_assignment(&mut self, assign_vec: Vec<u16>) -> Result<()> {
130        let rle_vec = assign_to_rle(assign_vec);
131        self.write_rle(rle_vec)?;
132        Ok(())
133    }
134
135    /// Write a JSON value containing an assignment vector to the BEN file.
136    ///
137    /// # Arguments
138    ///
139    /// * `data` - A JSON value containing an assignment vector
140    ///
141    /// # Returns
142    ///
143    /// A Result type that contains the result of the operation
144    pub fn write_json_value(&mut self, data: Value) -> Result<()> {
145        let assign_vec = data["assignment"].as_array().ok_or_else(|| {
146            io::Error::new(
147                io::ErrorKind::InvalidData,
148                "'assignment' field either missing or is not an array of integers",
149            )
150        })?;
151        let converted_vec = assign_vec
152            .into_iter()
153            .map(|x| {
154                let u = x.as_u64().ok_or_else(|| {
155                    io::Error::new(
156                        io::ErrorKind::InvalidData,
157                        format!(
158                            "The value '{}' could not be unwrapped as an unsigned 64 bit integer.",
159                            x
160                        ),
161                    )
162                })?;
163
164                u16::try_from(u).map_err(|_| {
165                    io::Error::new(
166                        io::ErrorKind::InvalidData,
167                        format!("The value '{}' is too large to fit in a u16.", u),
168                    )
169                })
170            })
171            .collect::<Result<Vec<u16>>>()?;
172
173        let rle_vec = assign_to_rle(converted_vec);
174        self.write_rle(rle_vec)?;
175        Ok(())
176    }
177
178    /// Cleanup function to make sure the last sample is written
179    /// to the BEN file if using the MkvChain variant.
180    ///
181    /// This function is automatically called when the BenEncoder
182    /// goes out of scope, but can be called manually if desired.
183    ///
184    /// # Returns
185    ///
186    /// A Result type that contains the result of the operation
187    ///
188    /// # Errors
189    ///
190    /// This function will return an error if the writer encounters
191    /// an error while writing the last sample to the BEN file.
192    pub fn finish(&mut self) -> Result<()> {
193        if self.complete {
194            return Ok(());
195        }
196        if self.variant == BenVariant::MkvChain && self.count > 0 {
197            self.writer
198                .write_all(&self.previous_sample)
199                .expect("Error while writing last line to file");
200            self.writer
201                .write_all(&self.count.to_be_bytes())
202                .expect("Error while writing last count to file");
203        }
204        self.complete = true;
205        Ok(())
206    }
207}
208
209impl<W: Write> Drop for BenEncoder<W> {
210    /// Make sure to finish writing the BEN file when the
211    /// BenEncoder goes out of scope.
212    fn drop(&mut self) {
213        let _ = self.finish();
214    }
215}
216
217/// A struct to make the writing of XBEN files easier
218/// and more ergonomic.
219pub struct XBenEncoder<W: Write> {
220    encoder: XzEncoder<W>,
221    previous_sample: Vec<u8>,
222    count: u16,
223    variant: BenVariant,
224}
225
226impl<W: Write> XBenEncoder<W> {
227    /// Create a new XBenEncoder instance and handles
228    /// the XBEN file header.
229    ///
230    /// # Arguments
231    ///
232    /// * `encoder` - An XzEncoder to write the XBEN file to
233    /// * `variant` - The BEN variant to use (Standard or MkvChain)
234    ///
235    /// # Returns
236    ///
237    /// A new XBenEncoder instance
238    pub fn new(mut encoder: XzEncoder<W>, variant: BenVariant) -> Self {
239        match variant {
240            BenVariant::Standard => {
241                encoder.write_all(b"STANDARD BEN FILE").unwrap();
242                XBenEncoder {
243                    encoder,
244                    previous_sample: Vec::new(),
245                    count: 0,
246                    variant: BenVariant::Standard,
247                }
248            }
249            BenVariant::MkvChain => {
250                encoder.write_all(b"MKVCHAIN BEN FILE").unwrap();
251                XBenEncoder {
252                    encoder,
253                    previous_sample: Vec::new(),
254                    count: 0,
255                    variant: BenVariant::MkvChain,
256                }
257            }
258        }
259    }
260
261    /// Write a an assigment vector encoded as a JSON value
262    /// to the XBEN file.
263    ///
264    /// # Arguments
265    ///
266    /// * `data` - A JSON value containing an assignment vector
267    ///
268    /// # Returns
269    ///
270    /// A Result type that contains the result of the operation
271    pub fn write_json_value(&mut self, data: Value) -> Result<()> {
272        let encoded = encode_ben32_line(data);
273        match self.variant {
274            BenVariant::Standard => {
275                self.encoder.write_all(&encoded)?;
276            }
277            BenVariant::MkvChain => {
278                if encoded == self.previous_sample {
279                    self.count += 1;
280                } else {
281                    if self.count > 0 {
282                        self.encoder.write_all(&self.previous_sample)?;
283                        self.encoder.write_all(&self.count.to_be_bytes())?;
284                    }
285                    self.previous_sample = encoded;
286                    self.count = 1;
287                }
288            }
289        }
290        Ok(())
291    }
292
293    /// Converts a raw BEN assignment file into to an XBEN file.
294    /// This function will check to see if the header is there and then
295    /// handle it accordingly.
296    ///
297    /// # Arguments
298    ///
299    /// * `reader` - A buffered reader for the input BEN file
300    ///
301    /// # Returns
302    ///
303    /// A Result type that contains the result of the operation
304    pub fn write_ben_file(&mut self, mut reader: impl BufRead) -> Result<()> {
305        let peek = reader.fill_buf()?;
306        let has_banner = peek.len() >= 17
307            && (peek.starts_with(b"STANDARD BEN FILE") || peek.starts_with(b"MKVCHAIN BEN FILE"));
308
309        if has_banner {
310            reader.consume(17);
311        }
312
313        ben_to_ben32_lines(&mut reader, &mut self.encoder, self.variant)
314    }
315}
316
317impl<W: Write> Drop for XBenEncoder<W> {
318    /// Make sure to finish writing the XBEN file when the
319    /// XBenEncoder goes out of scope.
320    fn drop(&mut self) {
321        if self.variant == BenVariant::MkvChain && self.count > 0 {
322            self.encoder
323                .write_all(&self.previous_sample)
324                .expect("Error writing last line to file");
325            self.encoder
326                .write_all(&self.count.to_be_bytes())
327                .expect("Error writing last line count to file");
328        }
329    }
330}
331
332/// This function takes a json encoded line containing an assignment
333/// vector and a sample number and encodes the assignment vector
334/// into a binary format known as "ben32". The ben32 format serves
335/// as an intermediate format that allows for efficient compression
336/// of BEN files using LZMA2 compression methods.
337///
338/// # Arguments
339///
340/// * `data` - A JSON object containing an assignment vector and a sample number
341///
342/// # Returns
343///
344/// A vector of bytes containing the ben32 encoded assignment vector
345fn encode_ben32_line(data: Value) -> Vec<u8> {
346    let assign_vec = data["assignment"].as_array().unwrap();
347    let mut prev_assign: u16 = 0;
348    let mut count: u16 = 0;
349    let mut first = true;
350
351    let mut ret = Vec::new();
352
353    for assignment in assign_vec {
354        let assign = assignment.as_u64().unwrap() as u16;
355        if first {
356            prev_assign = assign;
357            count = 1;
358            first = false;
359            continue;
360        }
361        if assign == prev_assign {
362            count += 1;
363        } else {
364            let encoded = (prev_assign as u32) << 16 | count as u32;
365            ret.extend(&encoded.to_be_bytes());
366            // Reset for next run
367            prev_assign = assign;
368            count = 1;
369        }
370    }
371
372    // Handle the last run
373    if count > 0 {
374        let encoded = (prev_assign as u32) << 16 | count as u32;
375        ret.extend(&encoded.to_be_bytes());
376    }
377
378    ret.extend([0, 0, 0, 0]);
379    ret
380}
381
382/// This function takes a JSONL file and compresses it to the
383/// XBEN format.
384///
385/// The JSONL file is assumed to be formatted in the standard
386///
387/// ```json
388/// {"assignment": [...], "sample": #}
389/// ```
390///
391/// format. While the BEN format is
392/// a simple bit-packed (streamable!) run-length encoded assignment
393/// vector, the XBEN format uses LZMA2 dictionary compression on
394/// the byte level to achieve better compression ratios. In order
395/// to use XBEN files, the `decode_xben_to_ben` function must be
396/// used to decode the file back into a BEN format.
397///
398/// # Arguments
399///
400/// * `reader` - A buffered reader for the input file
401/// * `writer` - A writer for the output file
402/// * `variant` - The BEN variant to use (Standard or MkvChain)
403/// * `n_threads` - The number of threads to use for compression (optional)
404/// * `compression_level` - The compression level to use (0-9, optional)
405///
406/// # Returns
407///
408/// A Result type that contains the result of the operation
409pub fn encode_jsonl_to_xben<R: BufRead, W: Write>(
410    reader: R,
411    writer: W,
412    variant: BenVariant,
413    n_threads: Option<u32>,
414    compression_level: Option<u32>,
415) -> Result<()> {
416    let mut n_cpus: u32 = n_threads.unwrap_or(1);
417    n_cpus = n_cpus
418        .min(
419            std::thread::available_parallelism()
420                .map(|n| n.get())
421                .unwrap_or(1) as u32,
422        )
423        .max(1);
424
425    let level = compression_level.unwrap_or(9).min(9).max(0);
426
427    let mt = MtStreamBuilder::new()
428        .threads(n_cpus)
429        .preset(level)
430        .block_size(0)
431        .encoder()
432        .expect("init MT encoder");
433    let encoder = XzEncoder::new_stream(writer, mt);
434    let mut ben_encoder = XBenEncoder::new(encoder, variant);
435
436    let mut line_num = 1;
437
438    for line_result in reader.lines() {
439        log!("Encoding line: {}\r", line_num);
440        line_num += 1;
441        let line = line_result?;
442        let data: Value = serde_json::from_str(&line).expect("Error parsing JSON from line");
443
444        ben_encoder.write_json_value(data)?;
445    }
446
447    logln!();
448    logln!("Done!");
449
450    Ok(())
451}
452
453/// This is a convenience function that applies level 9 LZMA2 compression
454/// to a general file.
455///
456/// # Arguments
457///
458/// * `reader` - A buffered reader for the input file
459/// * `writer` - A writer for the output file
460///
461/// # Returns
462///
463/// A Result type that contains the result of the operation
464///
465/// # Example
466///
467/// ```
468/// use ben::encode::xz_compress;
469/// use lipsum::lipsum;
470/// use std::io::{BufReader, BufWriter};
471///
472/// let input = lipsum(100);
473/// let reader = BufReader::new(input.as_bytes());
474///
475/// let mut output_buffer = Vec::new();
476/// let writer = BufWriter::new(&mut output_buffer);
477///
478/// xz_compress(reader, writer, Some(1), Some(1)).unwrap();
479///
480/// println!("{:?}", output_buffer);
481/// ```
482pub fn xz_compress<R: BufRead, W: Write>(
483    mut reader: R,
484    writer: W,
485    n_threads: Option<u32>,
486    compression_level: Option<u32>,
487) -> Result<()> {
488    let mut buff = [0; 4096];
489    // let mut encoder = XzEncoder::new(writer, 1);
490
491    let mut n_cpus: u32 = n_threads.unwrap_or(1);
492    n_cpus = n_cpus
493        .min(
494            std::thread::available_parallelism()
495                .map(|n| n.get())
496                .unwrap_or(1) as u32,
497        )
498        .max(1);
499
500    let level = compression_level.unwrap_or(9).min(9).max(0);
501
502    let mt = MtStreamBuilder::new()
503        .threads(n_cpus)
504        .preset(level)
505        .block_size(0)
506        .encoder()
507        .expect("init MT encoder");
508    let mut encoder = XzEncoder::new_stream(writer, mt);
509
510    while let Ok(count) = reader.read(&mut buff) {
511        if count == 0 {
512            break;
513        }
514        encoder.write_all(&buff[..count])?;
515    }
516    drop(encoder); // Make sure to flush and finish compression
517    Ok(())
518}
519
520/// This function takes in a standard assignment vector and encodes
521/// it into a bit-packed ben version.
522///
523/// # Arguments
524///
525/// * `assign_vec` - A vector of u16 values representing the assignment vector
526///
527/// # Returns
528///
529/// A vector of bytes containing the bit-packed ben encoded assignment vector
530pub fn encode_ben_vec_from_assign(assign_vec: Vec<u16>) -> Vec<u8> {
531    let rle_vec: Vec<(u16, u16)> = assign_to_rle(assign_vec);
532    encode_ben_vec_from_rle(rle_vec)
533}
534
/// This function takes a run-length encoded assignment vector and
/// encodes it into a bit-packed BEN sample.
///
/// Output layout:
/// * 1 byte: bits used per value (`max_val_bits`, at least 1)
/// * 1 byte: bits used per run length (`max_len_bits`)
/// * 4 bytes: big-endian payload length in bytes
/// * payload: each run as `value` then `length`, packed MSB-first and
///   zero-padded to a whole byte
///
/// An empty input produces a header with a zero-length payload instead of
/// panicking.
///
/// # Arguments
///
/// * `rle_vec` - A vector of tuples containing the value and length of each run
///
/// # Returns
///
/// A vector of bytes containing the bit-packed BEN encoded assignment vector
///
/// # Panics
///
/// Panics if the payload would exceed `u32::MAX` bytes, because that length
/// cannot be represented in the 4-byte header.
pub fn encode_ben_vec_from_rle(rle_vec: Vec<(u16, u16)>) -> Vec<u8> {
    /// Append the low `n_bits` bits of `value` to the MSB-first bit stream
    /// held in `acc`/`acc_bits`, emitting every completed byte to `out`.
    fn push_bits(out: &mut Vec<u8>, acc: &mut u32, acc_bits: &mut u8, value: u16, n_bits: u8) {
        // `acc` holds fewer than 8 pending bits on entry, so at most
        // 7 + 16 = 23 bits are live after the shift; a u32 never overflows.
        *acc = (*acc << n_bits) | u32::from(value);
        *acc_bits += n_bits;
        while *acc_bits >= 8 {
            *acc_bits -= 8;
            out.push((*acc >> *acc_bits) as u8);
            *acc &= (1u32 << *acc_bits) - 1;
        }
    }

    let max_val: u16 = rle_vec.iter().map(|&(val, _)| val).max().unwrap_or(0);
    let max_len: u16 = rle_vec.iter().map(|&(_, len)| len).max().unwrap_or(0);
    let max_val_bits: u8 = (16 - max_val.leading_zeros() as u8).max(1);
    let max_len_bits: u8 = 16 - max_len.leading_zeros() as u8;

    // Payload size rounded up to whole bytes. Compute in u64 so long inputs
    // cannot silently wrap the 32-bit header field.
    let total_bits = (u64::from(max_val_bits) + u64::from(max_len_bits)) * rle_vec.len() as u64;
    let n_bytes_wide = (total_bits + 7) / 8;
    assert!(
        n_bytes_wide <= u64::from(u32::MAX),
        "BEN sample payload is too large for the 32-bit length header"
    );
    let n_bytes = n_bytes_wide as u32;

    let mut output_vec: Vec<u8> = Vec::with_capacity(6 + n_bytes as usize);
    output_vec.push(max_val_bits);
    output_vec.push(max_len_bits);
    output_vec.extend_from_slice(&n_bytes.to_be_bytes());

    let mut acc: u32 = 0;
    let mut acc_bits: u8 = 0;
    for (val, len) in rle_vec {
        push_bits(&mut output_vec, &mut acc, &mut acc_bits, val, max_val_bits);
        push_bits(&mut output_vec, &mut acc, &mut acc_bits, len, max_len_bits);
    }

    // Left-align any trailing bits in a final, zero-padded byte.
    if acc_bits > 0 {
        output_vec.push((acc << (8 - acc_bits)) as u8);
    }

    output_vec
}
601
602/// This function takes a JSONL file and compresses it into
603/// the BEN format.
604///
605/// The JSONL file is assumed to be formatted in the standard
606///
607/// ```json
608/// {"assignment": [...], "sample": #}
609/// ```
610///
611/// format.
612///
613/// # Arguments
614///
615/// * `reader` - A buffered reader for the input file
616/// * `writer` - A writer for the output file
617/// * `variant` - The BEN variant to use (Standard or MkvChain)
618///
619/// # Returns
620///
621/// A Result type that contains the result of the operation
622///
623/// # Example
624///
625/// ```
626/// use std::io::{BufReader, BufWriter};
627/// use serde_json::json;
628/// use ben::{encode::encode_jsonl_to_ben, BenVariant};
629///
630/// let input = r#"{"assignment": [1,1,1,2,2,2], "sample": 1}"#.to_string()
631///     + "\n"
632///     + r#"{"assignment": [1,1,2,2,1,2], "sample": 2}"#;
633///
634/// let reader = BufReader::new(input.as_bytes());
635/// let mut write_buffer = Vec::new();
636/// let mut writer = BufWriter::new(&mut write_buffer);
637///
638/// encode_jsonl_to_ben(reader, writer, BenVariant::Standard).unwrap();
639///
640/// println!("{:?}", write_buffer);
641/// // This will output
642/// // [83, 84, 65, 78, 68, 65, 82, 68, 32,
643/// //  66, 69, 78, 32, 70, 73, 76, 69, 2,
644/// //  2, 0, 0, 0, 1, 123, 2, 2, 0, 0, 0,
645/// //  2, 106, 89]
646/// ```
647///
648pub fn encode_jsonl_to_ben<R: BufRead, W: Write>(
649    reader: R,
650    writer: W,
651    variant: BenVariant,
652) -> Result<()> {
653    let mut line_num = 1;
654    let mut ben_encoder = BenEncoder::new(writer, variant);
655    for line_result in reader.lines() {
656        log!("Encoding line: {}\r", line_num);
657        line_num += 1;
658        let line = line_result?; // Handle potential I/O errors for each line
659        let data: Value = serde_json::from_str(&line).expect("Error parsing JSON from line");
660
661        ben_encoder.write_json_value(data)?;
662    }
663    logln!();
664    logln!("Done!"); // Print newline after progress bar
665    Ok(())
666}
667
668/// This function takes a BEN file and encodes it into an XBEN
669/// file using bit-to-byte decompression followed by LZMA2 compression.
670///
671/// # Arguments
672///
673/// * `reader` - A buffered reader for the input file
674/// * `writer` - A writer for the output file
675/// * `n_threads` - The number of threads to use for compression (optional)
676/// * `compression_level` - The compression level to use (0-9, optional)
677///
678/// # Returns
679///
680/// A Result type that contains the result of the operation
681pub fn encode_ben_to_xben<R: BufRead, W: Write>(
682    mut reader: R,
683    writer: W,
684    n_threads: Option<u32>,
685    compression_level: Option<u32>,
686) -> Result<()> {
687    let mut check_buffer = [0u8; 17];
688    reader.read_exact(&mut check_buffer)?;
689
690    let mut n_cpus: u32 = n_threads.unwrap_or(1);
691    n_cpus = n_cpus
692        .min(
693            std::thread::available_parallelism()
694                .map(|n| n.get())
695                .unwrap_or(1) as u32,
696        )
697        .max(1);
698
699    let level = compression_level.unwrap_or(9).min(9).max(0);
700
701    let mt = MtStreamBuilder::new()
702        .threads(n_cpus)
703        .preset(level)
704        .block_size(0)
705        .encoder()
706        .expect("init MT encoder");
707    let encoder = XzEncoder::new_stream(writer, mt);
708
709    let mut ben_encoder = match &check_buffer {
710        b"STANDARD BEN FILE" => XBenEncoder::new(encoder, BenVariant::Standard),
711        b"MKVCHAIN BEN FILE" => XBenEncoder::new(encoder, BenVariant::MkvChain),
712        _ => {
713            return Err(io::Error::new(
714                io::ErrorKind::InvalidData,
715                "Invalid file format",
716            ));
717        }
718    };
719
720    ben_encoder.write_ben_file(reader)?;
721
722    Ok(())
723}
724
725#[cfg(test)]
726#[path = "tests/encode_tests.rs"]
727mod tests;