bzip2_os/bitstream/
bitwriter.rs

//! BitWriter assembles the packed bitstreams produced by BitPacker into one continuous output stream.
//!
//! The original version of BZIP2, being single-threaded, was able to write the bitstream from start to finish.
//! This multi-threaded version is designed so that each block can pass its Huffman-encoded (compressed) data to
//! this final aggregator, which assembles the blocks in the proper order and writes the continuous output stream.
//!
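//! This module is the aggregator. It takes the blocks packed by BitPacker, removes their padding, and writes the
//! blocks sequentially to the output device as one continuous stream of bits, writing each block as soon as it
//! becomes available in the correct sequence. Note that `BitWriter::add_block()` takes no block number: blocks are
//! written in the order they are passed in, so the caller must deliver them in sequence.
//!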
//! For example, if four threads are producing blocks and the blocks arrive in the order 4, 2, 1, 3, the
//! aggregator waits until block 1 arrives, writes block 1 and then block 2, then waits for block 3 and writes it
//! followed by block 4.
//!
//! This module runs on a thread that is spawned before any block-compression threads, so that it is ready to
//! receive the earliest compressed blocks.
//!
//! NOTE: Error handling for I/O failures is **not** yet well implemented.
//!
use std::io::{self, Write};

use crate::tools::crc::do_stream_crc;
21/// Writes a bitstream for output. Takes the blocks packed by BitPacker and assembles them with
22/// the stream header and footer, calculating the stream CRC as it processes the blocks.
23pub struct BitWriter {
24    /// Output buffer used to write the bitstream.
25    output: Vec<u8>,
26    /// Private queue to hold bits that are waiting to be put as bytes into the output buffer.
27    queue: u64,
28    /// Count of valid bits in the queue.
29    q_bits: u8,
30
31    /// Handle to the output stream
32    writer: Box<dyn std::io::Write + std::marker::Sync + std::marker::Send>,
33    /// Block size, needed to create the header.
34    block_size: u8,
35    /// Stream CRC, calculated from each block crc and added to the stream footer.
36    stream_crc: u32,
37}
38
39impl BitWriter {
40    /// Create a new Bitwriter with an output buffer of size specified. We need the block size.
41    /// to create the header. Use add_block() to add each block to the stream.
42    pub fn new(filepath: &str, mut block_size: u8) -> Self {
43        // Abort if the filepath is empty
44        if filepath.is_empty() {
45            panic!("Filepath cannot be empty when compressing a file");
46        }
47        // Ensure that the block size is valid
48        let result = std::fs::File::create(filepath);
49        if block_size > 9 {
50            block_size = 9;
51        }
52        // Open the output device for writing and initialize the struct
53        Self {
54            writer: match result {
55                Ok(file) => Box::new(file),
56                Err(_) => Box::new(std::io::stdout()),
57            },
58            output: Vec::with_capacity(block_size as usize * 100000),
59            queue: 0,
60            q_bits: 0,
61            block_size,
62            stream_crc: 0,
63        }
64    }
65
66    /// Push the stream header to output buffer.
67    fn push_header(&mut self) {
68        // First write file stream header onto the stream
69        let magic = "BZh".as_bytes();
70        magic.iter().for_each(|&x| self.out8(x));
71        self.out8(self.block_size + 0x30);
72    }
73
74    /// Add a block of data to the output. The block is assumed to be packed by BitPacker. "last"
75    /// indicates if the block is the last block in the file. "padding" indicates how many
76    /// trailing zeros were added to the last byte of the block to make it a multiple of 8 bits.
77    pub fn add_block(
78        &mut self,
79        last: bool,
80        data: &[u8],
81        padding: u8,
82    ) -> Result<usize, std::io::Error> {
83        // If this is the first block, write the header
84        if self.stream_crc == 0 {
85            self.push_header()
86        };
87
88        // Get the CRC from this block
89        let block_crc = u32::from_be_bytes(data[6..10].try_into().unwrap());
90        // Update the stream crc
91        self.stream_crc = do_stream_crc(self.stream_crc, block_crc);
92
93        // Write all the block data
94        data.iter().for_each(|&x| self.out8(x));
95
96        // Back up the queue to remove any padding on the last byte
97        if padding > 0 {
98            self.queue >>= padding as u64;
99            self.q_bits -= padding
100        }
101
102        // If this is the last block, add the footer to the queue.
103        if last {
104            // First the stream footer magic, then the block_crc
105            let magic = [0x17, 0x72, 0x45, 0x38, 0x50, 0x90];
106            magic.iter().for_each(|&x| self.out8(x));
107            // Write the stream crc
108            self.out8((self.stream_crc >> 24) as u8);
109            self.out8((self.stream_crc >> 16) as u8);
110            self.out8((self.stream_crc >> 8) as u8);
111            self.out8((self.stream_crc) as u8);
112
113            // Now flush the queue
114            self.flush();
115
116            // And write out the remaining (flushed) data in the bitstream buffer.
117            let result = self.writer.write(&self.output);
118
119            // Drain what we wrote from the buffer
120            if let Ok(written) = result {
121                self.output.drain(..written);
122            }
123            result
124        } else {
125            // Write out the data in the bitstream buffer. The queue will carry over to the next block.
126            let result = self.writer.write(&self.output);
127            if result.is_ok() {
128                self.output.drain(..result.as_ref().unwrap());
129            }
130            result
131        }
132    }
133
134    /// Internal bitstream write function common to all out.XX functions.
135    fn push_queue(&mut self) {
136        // If the queue has less than 8 bits left, write all full bytes to the output buffer.
137        if self.q_bits > 56 {
138            while self.q_bits > 7 {
139                let byte = (self.queue >> (self.q_bits - 8)) as u8;
140                self.output.push(byte); //push the packed byte out
141                self.q_bits -= 8; //adjust the count of bits left in the queue
142            }
143        }
144    }
145
146    /// Put a byte of pre-packed binary encoded data on the stream.
147    fn out8(&mut self, data: u8) {
148        // Make sure the queue is empty enough to hold the data
149        self.push_queue();
150        self.queue <<= 8; //shift queue by one byte
151        self.queue |= data as u64; //add the byte to queue
152        self.q_bits += 8; //update depth of queue bits
153    }
154
155    /// Flushes the remaining bits (1-7) from the buffer, padding with 0s in the least
156    /// signficant bits. Flush MUST be called before reading the output or data may be
157    /// left in the internal queue.
158    fn flush(&mut self) {
159        // First push out all the full bytes
160        while self.q_bits > 7 {
161            let byte = (self.queue >> (self.q_bits - 8)) as u8;
162            self.output.push(byte); //push the packed byte out
163            self.q_bits -= 8; //adjust the count of bits left in the queue
164        }
165        // Then push out the remaining bits
166        if self.q_bits > 0 {
167            let mut byte = (self.queue & (0xff >> (8 - self.q_bits)) as u64) as u8;
168            byte <<= 8 - self.q_bits;
169            self.output.push(byte); //push the packed byte out
170            self.q_bits = 0; //adjust the count of bits left in the queue
171        }
172    }
173}
#[cfg(test)]
mod test {
    use super::BitWriter;
    use std::path::PathBuf;

    /// Create a BitWriter backed by a scratch file in the system temp directory.
    ///
    /// `BitWriter::new` panics on an empty path, so tests need a real file path. `tag` keeps
    /// the file names of tests running in parallel distinct.
    fn scratch_writer(tag: &str, block_size: u8) -> (BitWriter, PathBuf) {
        let path = std::env::temp_dir().join(format!(
            "bzip2_os_bitwriter_{}_{}.tmp",
            tag,
            std::process::id()
        ));
        let bw = BitWriter::new(&path.to_string_lossy(), block_size);
        (bw, path)
    }

    /// Take what is left in the output buffer, close the scratch file and delete it.
    fn finish(mut bw: BitWriter, path: PathBuf) -> Vec<u8> {
        let out = std::mem::take(&mut bw.output);
        drop(bw);
        // Best-effort cleanup; a leftover temp file must not fail the test.
        let _ = std::fs::remove_file(path);
        out
    }

    #[test]
    fn out8_test() {
        let (mut bw, path) = scratch_writer("out8", 1);
        bw.out8(b'x');
        bw.flush();
        assert_eq!(finish(bw, path), b"x");
    }

    #[test]
    fn last_bits_test_1() {
        let (mut bw, path) = scratch_writer("last_bits_1", 1);
        for &byte in [255u8, 1, 128, 255, 7 << 5].iter() {
            bw.out8(byte);
        }
        bw.flush();
        assert_eq!(finish(bw, path), vec![255, 1, 128, 255, 224]);
    }

    #[test]
    fn out24_short_test() {
        let (mut bw, path) = scratch_writer("out24_short", 100);
        bw.out8(255);
        bw.out8(6 << 5);
        bw.flush();
        assert_eq!(finish(bw, path), vec![0b1111_1111, 0b1100_0000]);
    }

    #[test]
    fn block_size_is_clamped() {
        let (bw, path) = scratch_writer("clamp", 100);
        assert_eq!(bw.block_size, 9);
        finish(bw, path);
    }

    #[test]
    fn add_block_strips_padding_and_appends_footer() {
        let (mut bw, path) = scratch_writer("add_block", 1);
        // Block magic, 4-byte block CRC, then one data byte whose low 4 bits are padding.
        let block = [0x31, 0x41, 0x59, 0x26, 0x53, 0x59, 0, 0, 0, 1, 0b1010_0000];
        bw.add_block(true, &block, 4).unwrap();
        drop(bw);
        let written = std::fs::read(&path).expect("scratch file should be readable");
        let _ = std::fs::remove_file(&path);

        // Stream header, then the byte-aligned part of the block unchanged.
        assert_eq!(&written[..4], b"BZh1");
        assert_eq!(&written[4..14], &block[..10]);
        // The 4 kept data bits (1010) are directly followed by the end-of-stream magic 0x17 0x72.
        assert_eq!(written[14], 0b1010_0001);
        assert_eq!(written[15], 0x77);
        // 32 header bits + 84 block bits + 48 magic bits + 32 CRC bits = 196 bits -> 25 bytes.
        assert_eq!(written.len(), 25);
    }
}
209        assert_eq!(out2, &[0b1111_1111, 0b1100_0000]); // Note: '33' is data from previous call
210    }
211}