1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
use super::chunks::*;
use byteorder::{BigEndian, ByteOrder};
use std::collections::HashMap;
use std::convert::TryInto;
use std::fs::File;
use std::io::Error;
use std::io::{BufReader, BufWriter, ErrorKind, Read, Write};
use std::thread;
use crossbeam_channel::{bounded, Sender, Receiver};
use crossbeam_utils::sync::WaitGroup;

/// Default number of data entries stored in a single data chunk
/// (can be overridden via `BDFWriter::set_entries_per_chunk`).
const ENTRIES_PER_CHUNK: u32 = 100_000;

/// Channels and synchronization state shared with the worker threads that
/// serialize (and optionally compress) chunks in the background.
/// `T1` is the work-item type, `T2` the result type.
struct ThreadManager<T1, T2> {
    /// Work sender; wrapped in `Option` so it can be dropped to close the
    /// channel and let the worker loops terminate (see `drop_sender`).
    pub sender_work: Option<Sender<T1>>,
    pub receiver_work: Receiver<T1>,
    pub sender_result: Sender<T2>,
    pub receiver_result: Receiver<T2>,
    /// Cloned into every worker; `wait` blocks until all clones are dropped.
    pub wg: WaitGroup,
    /// Whether the worker threads have been spawned yet.
    pub threads_started: bool,
}

/// Reader for BDF files: validates the header, then yields the META chunk,
/// the lookup-table chunk and the data chunks in file order.
pub struct BDFReader {
    reader: BufReader<File>,
    /// Parsed META chunk; populated by `read_metadata`.
    pub metadata: Option<MetaChunk>,
    /// Parsed hash lookup table; populated by `read_lookup_table`.
    pub lookup_table: Option<HashLookupTable>,
    /// Set while reading metadata when the file declares a compression method.
    compressed: bool,
}

/// Writer for BDF files: buffers data entries, serializes them into chunks
/// (optionally lzma-compressed on worker threads) and writes them out.
pub struct BDFWriter {
    writer: BufWriter<File>,
    metadata: MetaChunk,
    lookup_table: HashLookupTable,
    /// Entries buffered until `entries_per_chunk` is reached, then flushed.
    data_entries: Vec<DataEntry>,
    /// True once the header, META chunk and lookup table have been written.
    head_written: bool,
    compressed: bool,
    /// lzma compression level; defaults to 1, see `set_compression_level`.
    compression_level: u32,
    thread_manager: ThreadManager<GenericChunk, Vec<u8>>,
}

impl<T1, T2> ThreadManager<T1, T2> {
    /// Builds a thread manager holding the bounded work/result channels
    /// and the wait group used to join the worker threads.
    pub fn new(cap: usize) -> Self {
        let (sender_work, receiver_work) = bounded(cap);
        let (sender_result, receiver_result) = bounded(cap);

        Self {
            sender_work: Some(sender_work),
            receiver_work,
            sender_result,
            receiver_result,
            wg: WaitGroup::new(),
            threads_started: false,
        }
    }

    /// Drops the work sender, closing the channel so that worker loops
    /// iterating over the receiver terminate.
    pub fn drop_sender(&mut self) {
        self.sender_work.take();
    }

    /// Blocks until every outstanding clone of the current wait group has
    /// been dropped, installing a fresh wait group for any later round.
    pub fn wait(&mut self) {
        std::mem::replace(&mut self.wg, WaitGroup::new()).wait();
    }
}

impl BDFWriter {
    /// Creates a new BDFWriter.
    ///
    /// `entry_count` should be the total number of entries that will be
    /// written. It is required up front because the META chunk containing
    /// this information is the first chunk written to the file; tools can
    /// use it to display a progress bar while reading.
    /// If `compress` is true, each data chunk is compressed using lzma
    /// with a default level of 1.
    pub fn new(inner: File, entry_count: u64, compress: bool) -> Self {
        let thread_manager = ThreadManager::new(num_cpus::get());
        Self {
            metadata: MetaChunk::new(entry_count, ENTRIES_PER_CHUNK, compress),
            lookup_table: HashLookupTable::new(HashMap::new()),
            data_entries: Vec::new(),
            writer: BufWriter::new(inner),
            head_written: false,
            compressed: compress,
            compression_level: 1,
            thread_manager,
        }
    }

    /// Spawns one worker thread per logical CPU for parallel chunk
    /// serialization (and compression, if enabled).
    /// Each worker drains the shared work channel until it is closed by
    /// `ThreadManager::drop_sender` and pushes the serialized bytes onto
    /// the result channel.
    pub fn start_threads(&self) {
        for _ in 0..num_cpus::get() {
            let compress = self.compressed;
            let compression_level = self.compression_level;
            thread::spawn({
                let r = self.thread_manager.receiver_work.clone();
                let s = self.thread_manager.sender_result.clone();
                let wg: WaitGroup = self.thread_manager.wg.clone();
                move || {
                    for mut chunk in r {
                        if compress {
                            chunk.compress(compression_level).expect("failed to compress chunk");
                        }
                        s.send(chunk.serialize()).expect("failed to send result");
                    }
                    // Dropping the wait-group clone signals completion to
                    // ThreadManager::wait.
                    drop(wg);
                }
            });
        }
    }

    /// Adds an entry to the hash lookup table and returns its assigned id.
    /// If the lookup table has already been written to the file, an error
    /// is returned.
    pub fn add_lookup_entry(&mut self, mut entry: HashEntry) -> Result<u32, Error> {
        if self.head_written {
            return Err(Error::new(
                ErrorKind::Other,
                "the head has already been written",
            ));
        }
        // Ids are assigned sequentially in insertion order.
        let id = self.lookup_table.entries.len() as u32;
        entry.id = id;
        self.lookup_table.entries.insert(id, entry);

        Ok(id)
    }

    /// Adds a data entry to the file.
    /// If the number of entries per chunk is reached,
    /// the buffered data will be flushed to the file.
    pub fn add_data_entry(&mut self, data_entry: DataEntry) -> Result<(), Error> {
        self.data_entries.push(data_entry);
        if self.data_entries.len() >= self.metadata.entries_per_chunk as usize {
            self.flush()?;
        }

        Ok(())
    }

    /// Writes the buffered entries to the file as a data chunk, writing the
    /// header, META chunk and lookup table first if that has not happened yet.
    fn flush(&mut self) -> Result<(), Error> {
        if !self.head_written {
            // write_all (not write) guarantees the whole buffer is written;
            // a partial write would corrupt the file format.
            self.writer.write_all(BDF_HDR)?;
            let mut generic_meta = GenericChunk::from(&self.metadata);
            self.writer.write_all(generic_meta.serialize().as_slice())?;
            let mut generic_lookup = GenericChunk::from(&self.lookup_table);
            self.writer.write_all(generic_lookup.serialize().as_slice())?;
            self.head_written = true;
        }
        if !self.thread_manager.threads_started {
            self.start_threads();
            self.thread_manager.threads_started = true;
        }
        let mut data_chunk =
            GenericChunk::from_data_entries(&self.data_entries, &self.lookup_table);
        if let Some(sender) = &self.thread_manager.sender_work {
            sender.send(data_chunk).expect("failed to send work to threads");
        } else {
            // Workers have been shut down; serialize on the current thread
            // and push straight onto the result channel.
            if self.compressed {
                data_chunk.compress(self.compression_level)?;
            }
            self.thread_manager.sender_result.send(data_chunk.serialize()).expect("failed to send serialization result");
        }
        self.write_serialized()?;
        // Keep the allocation for the next chunk instead of reallocating.
        self.data_entries.clear();

        Ok(())
    }

    /// Drains all currently available serialized chunks from the result
    /// channel and writes them to the file.
    /// NOTE(review): results arrive in worker-completion order, which may
    /// differ from submission order when several workers run — confirm the
    /// format tolerates out-of-order data chunks.
    fn write_serialized(&mut self) -> Result<(), Error> {
        while let Ok(data) = self.thread_manager.receiver_result.try_recv() {
            self.writer.write_all(data.as_slice())?;
        }

        Ok(())
    }

    /// Flushes the underlying writer.
    /// This should be called when no more data is being written.
    fn flush_writer(&mut self) -> Result<(), Error> {
        self.writer.flush()
    }

    /// Flushes the buffered chunk data and the writer
    /// to finish the file.
    pub fn finish(&mut self) -> Result<(), Error> {
        self.flush()?;
        // Close the work channel, join the workers, then drain any results
        // they produced before flushing the writer.
        self.thread_manager.drop_sender();
        self.thread_manager.wait();
        self.write_serialized()?;
        self.flush_writer()?;

        Ok(())
    }

    /// Sets the compression level for lzma compression.
    pub fn set_compression_level(&mut self, level: u32) {
        self.compression_level = level;
    }

    /// Changes the entries-per-chunk value and recomputes the chunk count.
    /// Returns an error if the metadata has already been written.
    pub fn set_entries_per_chunk(&mut self, number: u32) -> Result<(), Error> {
        if self.head_written {
            return Err(Error::new(
                ErrorKind::Other,
                "the head has already been written",
            ));
        }
        self.metadata.entries_per_chunk = number;
        self.metadata.chunk_count =
            (self.metadata.entry_count as f64 / number as f64).ceil() as u32;
        Ok(())
    }
}

impl BDFReader {
    /// Creates a new BDFReader over the given file.
    pub fn new(inner: File) -> Self {
        Self {
            metadata: None,
            lookup_table: None,
            reader: BufReader::new(inner),
            compressed: false,
        }
    }

    /// Reads the metadata and lookup table.
    pub fn read_start(&mut self) -> Result<(), Error> {
        self.read_metadata()?;
        self.read_lookup_table()?;

        Ok(())
    }

    /// Verifies the header of the file and reads and stores the metadata.
    /// Also records whether data chunks are compressed, rejecting any
    /// compression method other than lzma.
    pub fn read_metadata(&mut self) -> Result<&MetaChunk, Error> {
        if !self.validate_header() {
            return Err(Error::new(ErrorKind::InvalidData, "invalid BDF Header"));
        }
        let meta_chunk: MetaChunk = self.next_chunk()?.try_into()?;
        if let Some(method) = &meta_chunk.compression_method {
            if *method == LZMA.to_string() {
                self.compressed = true;
            } else {
                return Err(Error::new(
                    ErrorKind::Other,
                    "unsupported compression method",
                ));
            }
        }
        self.metadata = Some(meta_chunk);

        if let Some(chunk) = &self.metadata {
            Ok(&chunk)
        } else {
            Err(Error::new(
                ErrorKind::Other,
                "Failed to read self assigned metadata.",
            ))
        }
    }

    /// Reads the lookup table of the file.
    /// Reads the metadata first if it has not been read yet.
    pub fn read_lookup_table(&mut self) -> Result<&HashLookupTable, Error> {
        match &self.metadata {
            None => self.read_metadata()?,
            Some(t) => t,
        };
        let lookup_table: HashLookupTable = self.next_chunk()?.try_into()?;
        self.lookup_table = Some(lookup_table);

        if let Some(chunk) = &self.lookup_table {
            Ok(&chunk)
        } else {
            Err(Error::new(
                ErrorKind::Other,
                "failed to read self assigned chunk",
            ))
        }
    }

    /// Validates the header of the file.
    fn validate_header(&mut self) -> bool {
        let mut header = [0u8; 11];
        // read_exact instead of read: a short read must not leave stale
        // bytes in the buffer and accidentally pass (or fail) validation.
        if self.reader.read_exact(&mut header).is_err() {
            return false;
        }

        header == BDF_HDR.as_ref()
    }

    /// Returns the next chunk if one is available.
    /// Data-table chunks are decompressed transparently when the file's
    /// metadata declared compression.
    pub fn next_chunk(&mut self) -> Result<GenericChunk, Error> {
        // Chunk layout: u32 length | 4-byte name | data | u32 crc,
        // all integers big-endian.
        let mut length_raw = [0u8; 4];
        self.reader.read_exact(&mut length_raw)?;
        let length = BigEndian::read_u32(&length_raw);
        let mut name_raw = [0u8; 4];
        self.reader.read_exact(&mut name_raw)?;
        let name = String::from_utf8(name_raw.to_vec()).expect("Failed to parse name string.");
        let mut data = vec![0u8; length as usize];
        self.reader.read_exact(&mut data)?;
        let mut crc_raw = [0u8; 4];
        self.reader.read_exact(&mut crc_raw)?;
        let crc = BigEndian::read_u32(&crc_raw);
        let mut gen_chunk = GenericChunk {
            length,
            name,
            data,
            crc,
        };

        if gen_chunk.name == DTBL_CHUNK_NAME.to_string() && self.compressed {
            gen_chunk.decompress()?;
        }

        Ok(gen_chunk)
    }
}