bzip2_os/compression/compress.rs
1//! This manages the compression process for BZIP2 encoding.
2//!
3//! The compression process is multi-threaded. A thread is spawned first to receive the blocks of 
4//! data that are compressed by the block compression routine. Then the input is broken into blocks and a thread 
5//! is spawned to compress each block.
6//! 
7//! When a block is compressed, the compressed data along with a sequence number is passed to the aggregator/receiver. 
8//! If the sequence number is the next block to be written out, the block is added to the output. If it arrived
9//! out of sequence, it is held until the previous blocks can be written.
10//! 
11//! Once all blocks are written, the stream footer is written and the process is completed.
12//! 
13//! NOTE 1: THE ROUTINES FOR FILE I/O MAY NOT PROPERLY RESOLVE ALL I/O ERRORS.
14//! 
15//! NOTE 2: BZIP2 should default to deleting the source file (if input comes from a file), and set the creation date
16//! of the compressed file to mirror the original file. This is NOT yet implemented.
17//! 
18//! 
19use super::compress_block::compress_block;
20use crate::bitstream::bitwriter::BitWriter;
21use crate::tools::{cli::BzOpts, rle1::RLE1Block};
22use rayon::prelude::*;
23use simplelog::info;
24use std::fs::File;
25use std::io;
26
27#[allow(clippy::unusual_byte_groupings)]
/*
    This is responsible for creating the bitstream writer, a struct that
    contains the block data passed to the block compression routine,
    and an indicator of whether we are done processing all the data.

    Each block will be passed to compress_block.rs for RLE1, BWTransform,
    MTF, RLE2, and huffman compression.

    Again, this will iterate multiple times to get through the input file.
*/
38
39/// Compress the input file defined in opts `<BzOpts>`. Modified for multi-core processing.
40pub fn compress(opts: &mut BzOpts) -> io::Result<()> {
41    /*
42      Since this can be parallel, we pass a reference to the u8 data as well as a sequence number.
43      We will receive back the compressed data and sequence number. We will then assemble the compressed
44      data segments in the correct order and write them out to the output file.
45
46      THE ROUTINES FOR FILE I/O ARE RUDEMENTARY, AND MAY NOT PROPERLY RESOLVE ALL I/O ERRORS.
47    */
48
49    // Prepare to read the data.
50    let fname = opts.files[0].clone();
51    let source_file = File::open(&fname)?;
52
53    // Initialize the RLE1 reader/iterator. This reads the input file and creates blocks of the
54    // proper size to then be compressed.
55    let block_size = (opts.block_size as usize * 100000) - 19;
56    let rle1_blocks = RLE1Block::new(source_file, block_size);
57
58    // Prepare to write the compressed data. 
59    let mut fname = opts.files[0].clone();
60    fname.push_str(".bz2");
61
62    /*
63    This works by compressing each block in parallel. Depending on the sequence of when those blocks finish,
64    this will hold compressed blocks in memory until it is their turn to be written.
65    */
66
67    // Initialize thread channel communication. Sends block data, sequence number, and indicator whether
68    //  this is the last block
69    let (tx, rx) = std::sync::mpsc::channel();
70    // Initialize a bitwriter.
71    let mut bw = BitWriter::new(&fname, opts.block_size as u8);
72
73    // Spawn the BitWriter thread and wait for blocks to write.
74    let handle = std::thread::spawn(move || {
75        // Set the current block (the block we are waiting to write) to 0.
76        let mut current_block = 0;
77        // Initialize a vec to hold out-of-sequence blocks we might receive
78        let mut results = vec![];
79
80        'outer: loop {
81            info!(
82                "RX: Waiting for block {}. The queue contains {} blocks.",
83                current_block,
84                results.len(),
85            );
86
87            // Wait for a block to be sent to this thread.
88            let result: ((Vec<u8>, u8), usize, bool) = rx.recv().unwrap();
89            // If the block is the one we are waiting for, process it.
90            if result.1 == current_block {
91                info!("RX: Found block {}. Writing it...", current_block,);
92                let data = &result.0 .0;
93                let padding = result.0 .1;
94                let last = result.2;
95                bw.add_block(last, data, padding).unwrap();
96                current_block += 1;
97                if last {
98                    break;
99                }
100            } else {
101                info!(
102                    "RX: Adding block {} to the queue. The queue will now contain {} blocks.",
103                    result.1,
104                    results.len() + 1,
105                );
106                // Otherwise, save it until we get the one we want.
107                results.push(result);
108            }
109            while let Some(idx) = results.iter().position(|x| x.1 == current_block) {
110                info!("RX: Found block {}. Writing it...", current_block,);
111                let data = &results[idx].0 .0;
112                let last_bits = results[idx].0 .1;
113                let last = results[idx].2;
114                bw.add_block(last, data, last_bits).unwrap();
115                results.swap_remove(idx);
116                current_block += 1;
117                if last {
118                    break 'outer;
119                }
120            }
121        }
122    });
123
124    // Build the RLE1 blocks and compress them
125    rle1_blocks
126        .into_iter()
127        .enumerate()
128        .par_bridge()
129        .for_each_with(tx, |tx, (i, (crc, block, last_block))| {
130            let result = compress_block(&block, crc);
131            tx.send((result, i, last_block)).unwrap();
132        });
133    let joined =  handle.join();
134    info!("RX: Thread returned {:?}", joined);
135    Ok(())
136}