bzip2_os/bitstream/bitwriter.rs
1//! BitWriter assembles the packed bitstreams produced by BitPacker into one continuous stream for the output.
2//!
3//! The original version of BZIP2, being single-threaded, was able to write the bitstream from start to finish.
//! This multi-threaded version is designed so that each block can pass Huffman-encoded (compressed) data to this
//! final aggregator, which assembles the blocks in the proper order and writes the continuous output stream.
6//!
7//! This module is the aggregator. It takes the blocks packed by BitPacker, removes the padding, and writes the
8//! blocks sequentially as a continuous stream of bits to the output device. It will write blocks as soon as they become
9//! available in the correct sequence.
10//!
11//! For example, if there are four threads producing blocks, and the blocks arrive in the order: 4, 2, 1, 3, then the
12//! aggregator will wait until block 1 arrives, write block 1, then block 2, then wait for block 3 and write that followed by 4.
13//!
14//! This module is called from a thread that is spawned before any block compression threads are spawned in order to ensure that it can catch
15//! the earliest blocks that are compressed.
16//!
17//! NOTE: Error handling when encountering bad IO is **not** yet well implemented.
18//!
19use crate::tools::crc::do_stream_crc;
20
21/// Writes a bitstream for output. Takes the blocks packed by BitPacker and assembles them with
22/// the stream header and footer, calculating the stream CRC as it processes the blocks.
23pub struct BitWriter {
24 /// Output buffer used to write the bitstream.
25 output: Vec<u8>,
26 /// Private queue to hold bits that are waiting to be put as bytes into the output buffer.
27 queue: u64,
28 /// Count of valid bits in the queue.
29 q_bits: u8,
30
31 /// Handle to the output stream
32 writer: Box<dyn std::io::Write + std::marker::Sync + std::marker::Send>,
33 /// Block size, needed to create the header.
34 block_size: u8,
35 /// Stream CRC, calculated from each block crc and added to the stream footer.
36 stream_crc: u32,
37}
38
39impl BitWriter {
40 /// Create a new Bitwriter with an output buffer of size specified. We need the block size.
41 /// to create the header. Use add_block() to add each block to the stream.
42 pub fn new(filepath: &str, mut block_size: u8) -> Self {
43 // Abort if the filepath is empty
44 if filepath.is_empty() {
45 panic!("Filepath cannot be empty when compressing a file");
46 }
47 // Ensure that the block size is valid
48 let result = std::fs::File::create(filepath);
49 if block_size > 9 {
50 block_size = 9;
51 }
52 // Open the output device for writing and initialize the struct
53 Self {
54 writer: match result {
55 Ok(file) => Box::new(file),
56 Err(_) => Box::new(std::io::stdout()),
57 },
58 output: Vec::with_capacity(block_size as usize * 100000),
59 queue: 0,
60 q_bits: 0,
61 block_size,
62 stream_crc: 0,
63 }
64 }
65
66 /// Push the stream header to output buffer.
67 fn push_header(&mut self) {
68 // First write file stream header onto the stream
69 let magic = "BZh".as_bytes();
70 magic.iter().for_each(|&x| self.out8(x));
71 self.out8(self.block_size + 0x30);
72 }
73
74 /// Add a block of data to the output. The block is assumed to be packed by BitPacker. "last"
75 /// indicates if the block is the last block in the file. "padding" indicates how many
76 /// trailing zeros were added to the last byte of the block to make it a multiple of 8 bits.
77 pub fn add_block(
78 &mut self,
79 last: bool,
80 data: &[u8],
81 padding: u8,
82 ) -> Result<usize, std::io::Error> {
83 // If this is the first block, write the header
84 if self.stream_crc == 0 {
85 self.push_header()
86 };
87
88 // Get the CRC from this block
89 let block_crc = u32::from_be_bytes(data[6..10].try_into().unwrap());
90 // Update the stream crc
91 self.stream_crc = do_stream_crc(self.stream_crc, block_crc);
92
93 // Write all the block data
94 data.iter().for_each(|&x| self.out8(x));
95
96 // Back up the queue to remove any padding on the last byte
97 if padding > 0 {
98 self.queue >>= padding as u64;
99 self.q_bits -= padding
100 }
101
102 // If this is the last block, add the footer to the queue.
103 if last {
104 // First the stream footer magic, then the block_crc
105 let magic = [0x17, 0x72, 0x45, 0x38, 0x50, 0x90];
106 magic.iter().for_each(|&x| self.out8(x));
107 // Write the stream crc
108 self.out8((self.stream_crc >> 24) as u8);
109 self.out8((self.stream_crc >> 16) as u8);
110 self.out8((self.stream_crc >> 8) as u8);
111 self.out8((self.stream_crc) as u8);
112
113 // Now flush the queue
114 self.flush();
115
116 // And write out the remaining (flushed) data in the bitstream buffer.
117 let result = self.writer.write(&self.output);
118
119 // Drain what we wrote from the buffer
120 if let Ok(written) = result {
121 self.output.drain(..written);
122 }
123 result
124 } else {
125 // Write out the data in the bitstream buffer. The queue will carry over to the next block.
126 let result = self.writer.write(&self.output);
127 if result.is_ok() {
128 self.output.drain(..result.as_ref().unwrap());
129 }
130 result
131 }
132 }
133
134 /// Internal bitstream write function common to all out.XX functions.
135 fn push_queue(&mut self) {
136 // If the queue has less than 8 bits left, write all full bytes to the output buffer.
137 if self.q_bits > 56 {
138 while self.q_bits > 7 {
139 let byte = (self.queue >> (self.q_bits - 8)) as u8;
140 self.output.push(byte); //push the packed byte out
141 self.q_bits -= 8; //adjust the count of bits left in the queue
142 }
143 }
144 }
145
146 /// Put a byte of pre-packed binary encoded data on the stream.
147 fn out8(&mut self, data: u8) {
148 // Make sure the queue is empty enough to hold the data
149 self.push_queue();
150 self.queue <<= 8; //shift queue by one byte
151 self.queue |= data as u64; //add the byte to queue
152 self.q_bits += 8; //update depth of queue bits
153 }
154
155 /// Flushes the remaining bits (1-7) from the buffer, padding with 0s in the least
156 /// signficant bits. Flush MUST be called before reading the output or data may be
157 /// left in the internal queue.
158 fn flush(&mut self) {
159 // First push out all the full bytes
160 while self.q_bits > 7 {
161 let byte = (self.queue >> (self.q_bits - 8)) as u8;
162 self.output.push(byte); //push the packed byte out
163 self.q_bits -= 8; //adjust the count of bits left in the queue
164 }
165 // Then push out the remaining bits
166 if self.q_bits > 0 {
167 let mut byte = (self.queue & (0xff >> (8 - self.q_bits)) as u64) as u8;
168 byte <<= 8 - self.q_bits;
169 self.output.push(byte); //push the packed byte out
170 self.q_bits = 0; //adjust the count of bits left in the queue
171 }
172 }
173}
174
175#[cfg(test)]
176mod test {
177 use super::BitWriter;
178
179 #[test]
180 fn out8_test() {
181 let mut bw = BitWriter::new("", 1);
182 let data = 'x' as u8;
183 bw.out8(data);
184 bw.flush();
185 let out = bw.output;
186 assert_eq!(out, "x".as_bytes());
187 }
188
189 #[test]
190 fn last_bits_test_1() {
191 let mut bw = BitWriter::new("", 1);
192 bw.out8(255);
193 bw.out8(1);
194 bw.out8(128);
195 bw.out8(255);
196 bw.out8(7 << 5);
197 bw.flush();
198 let out = bw.output;
199 assert_eq!(out, vec![255, 1, 128, 255, 224]);
200 }
201
202 #[test]
203 fn out24_short_test() {
204 let mut bw = BitWriter::new("", 100);
205 bw.out8(255);
206 bw.out8(6 << 5);
207 bw.flush();
208 let out2 = &bw.output;
209 assert_eq!(out2, &[0b1111_1111, 0b1100_0000]); // Note: '33' is data from previous call
210 }
211}