// bdf/io.rs

use super::chunks::*;
use byteorder::{BigEndian, ByteOrder};
use crossbeam_channel::{bounded, Receiver, Sender};
use crossbeam_utils::sync::WaitGroup;
use std::collections::HashMap;
use std::convert::TryInto;
use std::fs::File;
use std::io::Error;
use std::io::{BufReader, BufWriter, ErrorKind, Read, Write};
use std::thread;

const ENTRIES_PER_CHUNK: u32 = 100_000;

#[derive(Debug)]
struct ThreadManager<T1, T2> {
    pub sender_work: Sender<T1>,
    pub receiver_work: Receiver<T1>,
    pub sender_result: Sender<T2>,
    pub receiver_result: Receiver<T2>,
    pub wg: WaitGroup,
    pub threads_started: bool,
}

#[derive(Debug)]
pub struct BDFReader {
    reader: BufReader<File>,
    pub metadata: Option<MetaChunk>,
    pub lookup_table: Option<HashLookupTable>,
    compressed: bool,
    thread_manager: ThreadManager<GenericChunk, GenericChunk>,
}

#[derive(Debug)]
pub struct BDFWriter {
    writer: BufWriter<File>,
    metadata: MetaChunk,
    lookup_table: HashLookupTable,
    data_entries: Vec<DataEntry>,
    head_written: bool,
    compressed: bool,
    compression_level: u32,
    thread_manager: ThreadManager<GenericChunk, Vec<u8>>,
}

impl<T1, T2> ThreadManager<T1, T2> {
    /// Creates a new thread manager that holds the work and result
    /// channels as well as the wait group used to control worker threads.
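    ///
    /// A minimal sketch of the intended flow (the `u32` payloads are purely
    /// illustrative):
    ///
    /// ```ignore
    /// let mut manager: ThreadManager<u32, u32> = ThreadManager::new(4);
    /// // worker threads consume `receiver_work` and send to `sender_result`;
    /// // the owning side queues work, closes the channel and waits:
    /// manager.sender_work.send(1).unwrap();
    /// manager.drop_sender();
    /// manager.wait();
    /// ```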
    pub fn new(cap: usize) -> Self {
        let (s1, r1) = bounded(cap);
        let (s2, r2) = bounded(cap);
        Self {
            sender_work: s1,
            receiver_work: r1,
            sender_result: s2,
            receiver_result: r2,
            wg: WaitGroup::new(),
            threads_started: false,
        }
    }

    /// Drops the sender for the work channel so that worker threads
    /// iterating over the receiver stop once the channel is drained.
    pub fn drop_sender(&mut self) {
        // replacing the sender drops the old one, which closes the channel
        let (s1, _) = bounded(0);
        self.sender_work = s1;
    }

    /// Drops the sender for the result channel so that the receiving side
    /// disconnects once all worker threads have finished.
    pub fn drop_sender_result(&mut self) {
        let (s2, _) = bounded(0);
        self.sender_result = s2;
    }

    /// Waits until every worker thread holding a clone of the wait group
    /// has dropped it.
    pub fn wait(&mut self) {
        let wg = self.wg.clone();
        self.wg = WaitGroup::new();
        wg.wait();
    }
}

impl BDFWriter {
    /// Creates a new BDFWriter.
    /// `entry_count` should be the total number of entries that will be
    /// written. It is required because the META chunk containing this
    /// information is the first chunk written to the file; tools can use
    /// the count, for example, to display a progress bar while reading.
    /// If `compress` is true, each data chunk is compressed using lzma
    /// with a default level of 1.
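    ///
    /// A hedged usage sketch; how `HashEntry` and `DataEntry` values are
    /// constructed depends on the `chunks` module and is only assumed here:
    ///
    /// ```ignore
    /// let file = File::create("dictionary.bdf")?;
    /// let mut writer = BDFWriter::new(file, entry_count, true);
    /// writer.add_lookup_entry(hash_entry)?; // before the head is written
    /// writer.add_data_entry(data_entry)?;   // flushes full chunks as needed
    /// writer.finish()?;                     // drains threads, flushes file
    /// ```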
    pub fn new(inner: File, entry_count: u64, compress: bool) -> Self {
        Self {
            metadata: MetaChunk::new(entry_count, ENTRIES_PER_CHUNK, compress),
            lookup_table: HashLookupTable::new(HashMap::new()),
            data_entries: Vec::new(),
            writer: BufWriter::new(inner),
            head_written: false,
            compressed: compress,
            compression_level: 1,
            thread_manager: ThreadManager::new(num_cpus::get()),
        }
    }

    /// Starts threads for parallel chunk compression
    fn start_threads(&self) {
        for _ in 0..num_cpus::get() {
            let compress = self.compressed;
            let compression_level = self.compression_level;
            thread::spawn({
                let r = self.thread_manager.receiver_work.clone();
                let s = self.thread_manager.sender_result.clone();
                let wg: WaitGroup = self.thread_manager.wg.clone();
                move || {
                    for mut chunk in r {
                        if compress {
                            chunk
                                .compress(compression_level)
                                .expect("failed to compress chunk");
                        }
                        s.send(chunk.serialize()).expect("failed to send result");
                    }
                    drop(wg);
                    drop(s);
                }
            });
        }
    }

    /// Adds an entry to the hash lookup table.
    /// If the lookup table has already been written to the file, an error is returned.
    pub fn add_lookup_entry(&mut self, mut entry: HashEntry) -> Result<u32, Error> {
        if self.head_written {
            return Err(Error::new(
                ErrorKind::Other,
                "the head has already been written",
            ));
        }
        let id = self.lookup_table.entries.len() as u32;
        entry.id = id;
        self.lookup_table.entries.insert(id, entry);

        Ok(id)
    }

    /// Adds a data entry to the file.
    /// If the number of entries per chunk is reached,
    /// the data will be written to the file.
    pub fn add_data_entry(&mut self, data_entry: DataEntry) -> Result<(), Error> {
        self.data_entries.push(data_entry);
        if self.data_entries.len() >= self.metadata.entries_per_chunk as usize {
            self.flush()?;
        }

        Ok(())
    }

    /// Serializes the buffered data entries into a chunk, sends it to the
    /// compression threads and writes finished chunks to the file.
    fn flush(&mut self) -> Result<(), Error> {
        if !self.head_written {
            self.writer.write_all(BDF_HDR)?;
            let mut generic_meta = GenericChunk::from(&self.metadata);
            self.writer.write_all(generic_meta.serialize().as_slice())?;
            let mut generic_lookup = GenericChunk::from(&self.lookup_table);
            self.writer.write_all(generic_lookup.serialize().as_slice())?;
            self.head_written = true;
        }
        if !self.thread_manager.threads_started {
            self.start_threads();
            self.thread_manager.threads_started = true;
        }
        let data_chunk = GenericChunk::from_data_entries(&self.data_entries, &self.lookup_table);
        self.thread_manager
            .sender_work
            .send(data_chunk)
            .expect("failed to send work to threads");
        self.write_serialized()?;
        self.data_entries = Vec::new();

        Ok(())
    }

    /// Writes every chunk that the worker threads have serialized so far
    /// to the file, without blocking on pending results.
    fn write_serialized(&mut self) -> Result<(), Error> {
        while let Ok(data) = self.thread_manager.receiver_result.try_recv() {
            self.writer.write_all(data.as_slice())?;
        }

        Ok(())
    }

    /// Flushes the writer.
    /// This should be called when no more data is being written.
    fn flush_writer(&mut self) -> Result<(), Error> {
        self.writer.flush()
    }

    /// Flushes the buffered chunk data and the writer
    /// to finish the file.
    pub fn finish(&mut self) -> Result<(), Error> {
        self.flush()?;
        // close the work channel so the compression threads terminate,
        // wait for them, then drain the remaining serialized chunks
        self.thread_manager.drop_sender();
        self.thread_manager.wait();
        self.write_serialized()?;
        self.flush_writer()?;

        Ok(())
    }

    /// Sets the compression level for lzma compression
    pub fn set_compression_level(&mut self, level: u32) {
        self.compression_level = level;
    }

    /// Changes the entries per chunk value.
    /// Returns an error if the metadata has already been written.
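    ///
    /// For example, an `entry_count` of 1_000_000 with `number = 100_000`
    /// results in a `chunk_count` of 10.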
    pub fn set_entries_per_chunk(&mut self, number: u32) -> Result<(), Error> {
        if self.head_written {
            return Err(Error::new(
                ErrorKind::Other,
                "the head has already been written",
            ));
        }
        self.metadata.entries_per_chunk = number;
        self.metadata.chunk_count =
            (self.metadata.entry_count as f64 / number as f64).ceil() as u32;

        Ok(())
    }
}

impl BDFReader {
    /// Creates a new BDFReader
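    ///
    /// A hedged usage sketch (error handling elided; what the caller does
    /// with a chunk's `data` is up to the `chunks` module):
    ///
    /// ```ignore
    /// let file = File::open("dictionary.bdf")?;
    /// let mut reader = BDFReader::new(file);
    /// reader.read_start()?; // header, metadata and lookup table
    /// while let Ok(chunk) = reader.next_chunk() {
    ///     // `chunk.data` holds the (decompressed) DTBL payload
    /// }
    /// ```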
    pub fn new(inner: File) -> Self {
        Self {
            metadata: None,
            lookup_table: None,
            reader: BufReader::new(inner),
            compressed: false,
            thread_manager: ThreadManager::new(num_cpus::get() * 2),
        }
    }

    /// Reads the metadata and lookup table
    pub fn read_start(&mut self) -> Result<(), Error> {
        self.read_metadata()?;
        self.read_lookup_table()?;

        Ok(())
    }

    /// Starts threads for decompressing chunks
    fn start_threads(&mut self) {
        for _ in 0..(num_cpus::get() as f32 / 2f32).max(1f32) as usize {
            thread::spawn({
                let r = self.thread_manager.receiver_work.clone();
                let s = self.thread_manager.sender_result.clone();
                let wg = self.thread_manager.wg.clone();
                move || {
                    for mut chunk in r {
                        chunk.decompress().expect("failed to decompress chunk");
                        s.send(chunk).expect("failed to send decompression result");
                    }
                    drop(wg);
                }
            });
        }
        // Queue some initial chunks for decompression: twice the CPU count,
        // i.e. about four times the number of worker threads spawned above.
        for _ in 0..num_cpus::get() * 2 {
            if self.add_compression_chunk().is_err() {
                self.thread_manager.drop_sender();
                break;
            }
        }
        // drop this thread's result sender so the receiver disconnects
        // once all worker threads are done
        self.thread_manager.drop_sender_result();
    }

    /// Adds a chunk to the decompression channel to be decompressed by a worker thread
    pub fn add_compression_chunk(&mut self) -> Result<(), Error> {
        let gen_chunk = self.next_chunk_raw()?;
        if gen_chunk.name == DTBL_CHUNK_NAME && self.compressed {
            if self.thread_manager.sender_work.send(gen_chunk).is_err() {
                return Err(Error::new(ErrorKind::Other, "failed to send chunk data"));
            }
        }

        Ok(())
    }

    /// Verifies the header of the file and reads and stores the metadata
    pub fn read_metadata(&mut self) -> Result<&MetaChunk, Error> {
        if !self.validate_header() {
            return Err(Error::new(ErrorKind::InvalidData, "invalid BDF header"));
        }
        let meta_chunk: MetaChunk = self.next_chunk_raw()?.try_into()?;
        if let Some(method) = &meta_chunk.compression_method {
            if *method == LZMA {
                self.compressed = true;
            } else {
                return Err(Error::new(
                    ErrorKind::Other,
                    "unsupported compression method",
                ));
            }
        }
        self.metadata = Some(meta_chunk);

        if let Some(chunk) = &self.metadata {
            Ok(chunk)
        } else {
            Err(Error::new(
                ErrorKind::Other,
                "failed to read self assigned metadata",
            ))
        }
    }

    /// Reads the lookup table of the file.
    /// This function should be called after `read_metadata` has been called.
    pub fn read_lookup_table(&mut self) -> Result<&HashLookupTable, Error> {
        if self.metadata.is_none() {
            self.read_metadata()?;
        }
        let lookup_table: HashLookupTable = self.next_chunk_raw()?.try_into()?;
        self.lookup_table = Some(lookup_table);

        if self.compressed {
            self.start_threads();
        }

        if let Some(chunk) = &self.lookup_table {
            Ok(chunk)
        } else {
            Err(Error::new(
                ErrorKind::Other,
                "failed to read self assigned chunk",
            ))
        }
    }

    /// Validates the header of the file
    fn validate_header(&mut self) -> bool {
        let mut header = [0u8; 11];
        // a short or failed read means the header cannot match
        if self.reader.read_exact(&mut header).is_err() {
            return false;
        }

        header == BDF_HDR.as_ref()
    }

    /// Returns the next chunk
    pub fn next_chunk(&mut self) -> Result<GenericChunk, Error> {
        if self.compressed {
            // keep the decompression pipeline filled; when the file is
            // exhausted, close the work channel so the threads can finish
            if self.add_compression_chunk().is_err() {
                self.thread_manager.drop_sender();
            }
            if let Ok(chunk) = self.thread_manager.receiver_result.recv() {
                Ok(chunk)
            } else {
                Err(Error::new(ErrorKind::Other, "failed to get chunk"))
            }
        } else {
            self.next_chunk_raw()
        }
    }

    /// Reads the next chunk directly from the file, without decompression.
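    /// Chunk wire format, as read below (integers are big endian):
    /// 4 byte length, 4 byte name, `length` bytes of data, 4 byte CRC.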
    fn next_chunk_raw(&mut self) -> Result<GenericChunk, Error> {
        let mut length_raw = [0u8; 4];
        self.reader.read_exact(&mut length_raw)?;
        let length = BigEndian::read_u32(&length_raw);
        let mut name_raw = [0u8; 4];
        self.reader.read_exact(&mut name_raw)?;
        let name = String::from_utf8(name_raw.to_vec()).expect("failed to parse name string");
        let mut data = vec![0u8; length as usize];
        self.reader.read_exact(&mut data)?;
        let mut crc_raw = [0u8; 4];
        self.reader.read_exact(&mut crc_raw)?;
        let crc = BigEndian::read_u32(&crc_raw);

        Ok(GenericChunk {
            length,
            name,
            data,
            crc,
        })
    }
}