use super::chunks::*;
use byteorder::{BigEndian, ByteOrder};
use crossbeam_channel::{bounded, Receiver, Sender};
use crossbeam_utils::sync::WaitGroup;
use std::collections::HashMap;
use std::convert::TryInto;
use std::fs::File;
use std::io::{BufReader, BufWriter, Error, ErrorKind, Read, Write};
use std::thread;

const ENTRIES_PER_CHUNK: u32 = 100_000;

/// Channels and a WaitGroup shared between a reader/writer and its worker threads.
/// `T1` is the type of the work items, `T2` the type of the results.
#[derive(Debug)]
struct ThreadManager<T1, T2> {
    pub sender_work: Sender<T1>,
    pub receiver_work: Receiver<T1>,
    pub sender_result: Sender<T2>,
    pub receiver_result: Receiver<T2>,
    pub wg: WaitGroup,
    pub threads_started: bool,
}

/// Reader for BDF files that decompresses data chunks on background threads.
#[derive(Debug)]
pub struct BDFReader {
    reader: BufReader<File>,
    pub metadata: Option<MetaChunk>,
    pub lookup_table: Option<HashLookupTable>,
    compressed: bool,
    thread_manager: ThreadManager<GenericChunk, GenericChunk>,
}

/// Writer for BDF files that compresses and serializes data chunks on background threads.
#[derive(Debug)]
pub struct BDFWriter {
    writer: BufWriter<File>,
    metadata: MetaChunk,
    lookup_table: HashLookupTable,
    data_entries: Vec<DataEntry>,
    head_written: bool,
    compressed: bool,
    compression_level: u32,
    thread_manager: ThreadManager<GenericChunk, Vec<u8>>,
}

impl<T1, T2> ThreadManager<T1, T2> {
    /// Creates a new ThreadManager with bounded work and result channels of the given capacity.
    pub fn new(cap: usize) -> Self {
        let (sender_work, receiver_work) = bounded(cap);
        let (sender_result, receiver_result) = bounded(cap);
        Self {
            sender_work,
            receiver_work,
            sender_result,
            receiver_result,
            wg: WaitGroup::new(),
            threads_started: false,
        }
    }

    /// Replaces the work sender with a detached dummy sender so that the work
    /// channel disconnects and the worker threads can exit their receive loops.
    pub fn drop_sender(&mut self) {
        let (sender, _) = bounded(0);
        self.sender_work = sender;
    }

    /// Replaces the result sender with a detached dummy sender so that the result
    /// channel disconnects once the worker threads have dropped their clones.
    pub fn drop_sender_result(&mut self) {
        let (sender, _) = bounded(0);
        self.sender_result = sender;
    }

    /// Waits until all worker threads have dropped their WaitGroup handles,
    /// then installs a fresh WaitGroup for the next batch of threads.
    pub fn wait(&mut self) {
        let wg = std::mem::replace(&mut self.wg, WaitGroup::new());
        wg.wait();
    }
}

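// A minimal sketch of the work/result pattern ThreadManager encapsulates, shown
// with u32 work items and String results (purely illustrative, not part of the
// file format logic):
//
//     let mut manager: ThreadManager<u32, String> = ThreadManager::new(4);
//     let receiver = manager.receiver_work.clone();
//     let sender = manager.sender_result.clone();
//     let wg = manager.wg.clone();
//     thread::spawn(move || {
//         for item in receiver {
//             sender.send(item.to_string()).unwrap();
//         }
//         drop(wg);                   // signals completion to `wait()`
//     });
//     manager.sender_work.send(42).unwrap();
//     manager.drop_sender();          // lets the worker's receive loop end
//     manager.wait();                 // blocks until the worker drops its WaitGroup handle
//     while let Ok(result) = manager.receiver_result.try_recv() {
//         println!("{}", result);
//     }
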
impl BDFWriter {
    /// Creates a new BDFWriter that writes to the given file.
    /// `entry_count` is the total number of entries that will be written and
    /// `compress` controls whether data chunks are compressed.
    pub fn new(inner: File, entry_count: u64, compress: bool) -> Self {
        Self {
            metadata: MetaChunk::new(entry_count, ENTRIES_PER_CHUNK, compress),
            lookup_table: HashLookupTable::new(HashMap::new()),
            data_entries: Vec::new(),
            writer: BufWriter::new(inner),
            head_written: false,
            compressed: compress,
            compression_level: 1,
            thread_manager: ThreadManager::new(num_cpus::get()),
        }
    }

    /// Spawns one compression/serialization worker per logical CPU.
    /// Each worker receives data chunks, optionally compresses them and
    /// sends the serialized bytes back over the result channel.
    fn start_threads(&self) {
        for _ in 0..num_cpus::get() {
            let compress = self.compressed;
            let compression_level = self.compression_level;
            thread::spawn({
                let receiver = self.thread_manager.receiver_work.clone();
                let sender = self.thread_manager.sender_result.clone();
                let wg = self.thread_manager.wg.clone();
                move || {
                    for mut chunk in receiver {
                        if compress {
                            chunk
                                .compress(compression_level)
                                .expect("failed to compress chunk");
                        }
                        sender.send(chunk.serialize()).expect("failed to send result");
                    }
                    // Signal completion by dropping the WaitGroup handle and result sender.
                    drop(wg);
                    drop(sender);
                }
            });
        }
    }

    /// Adds an entry to the hash lookup table and returns the id it was assigned.
    /// Fails if the head of the file has already been written.
    pub fn add_lookup_entry(&mut self, mut entry: HashEntry) -> Result<u32, Error> {
        if self.head_written {
            return Err(Error::new(
                ErrorKind::Other,
                "the head has already been written",
            ));
        }
        let id = self.lookup_table.entries.len() as u32;
        entry.id = id;
        self.lookup_table.entries.insert(id, entry);

        Ok(id)
    }

    /// Adds a data entry to the writer. If the number of buffered entries reaches
    /// the configured entries per chunk, the chunk is flushed to the file.
    pub fn add_data_entry(&mut self, data_entry: DataEntry) -> Result<(), Error> {
        self.data_entries.push(data_entry);
        if self.data_entries.len() >= self.metadata.entries_per_chunk as usize {
            self.flush()?;
        }

        Ok(())
    }

    /// Writes the buffered data entries to the file as a data chunk.
    /// On the first call this also writes the file header, the metadata chunk
    /// and the lookup table, and starts the worker threads.
    fn flush(&mut self) -> Result<(), Error> {
        if !self.head_written {
            self.writer.write_all(BDF_HDR)?;
            let mut generic_meta = GenericChunk::from(&self.metadata);
            self.writer.write_all(generic_meta.serialize().as_slice())?;
            let mut generic_lookup = GenericChunk::from(&self.lookup_table);
            self.writer.write_all(generic_lookup.serialize().as_slice())?;
            self.head_written = true;
        }
        if !self.thread_manager.threads_started {
            self.start_threads();
            self.thread_manager.threads_started = true;
        }
        let data_chunk = GenericChunk::from_data_entries(&self.data_entries, &self.lookup_table);
        self.thread_manager
            .sender_work
            .send(data_chunk)
            .expect("failed to send work to threads");
        self.write_serialized()?;
        self.data_entries = Vec::new();

        Ok(())
    }

    /// Writes all serialized chunks that the worker threads have finished so far.
    fn write_serialized(&mut self) -> Result<(), Error> {
        while let Ok(data) = self.thread_manager.receiver_result.try_recv() {
            self.writer.write_all(data.as_slice())?;
        }

        Ok(())
    }

    /// Flushes the underlying buffered writer.
    fn flush_writer(&mut self) -> Result<(), Error> {
        self.writer.flush()
    }

    /// Flushes the remaining data entries, waits for the worker threads to finish
    /// and writes all remaining serialized chunks to the file.
    pub fn finish(&mut self) -> Result<(), Error> {
        self.flush()?;
        self.thread_manager.drop_sender();
        self.thread_manager.wait();
        self.write_serialized()?;
        self.flush_writer()?;

        Ok(())
    }

    /// Sets the compression level used when compressing chunks.
    pub fn set_compression_level(&mut self, level: u32) {
        self.compression_level = level;
    }

    /// Sets the number of entries stored in each data chunk and updates the
    /// chunk count in the metadata accordingly.
    /// Fails if the head of the file has already been written.
    pub fn set_entries_per_chunk(&mut self, number: u32) -> Result<(), Error> {
        if self.head_written {
            return Err(Error::new(
                ErrorKind::Other,
                "the head has already been written",
            ));
        }
        self.metadata.entries_per_chunk = number;
        self.metadata.chunk_count =
            (self.metadata.entry_count as f64 / number as f64).ceil() as u32;

        Ok(())
    }
}
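
// A minimal usage sketch for BDFWriter (illustrative only; how `HashEntry` and
// `DataEntry` values are constructed depends on the chunks module and is not shown):
//
//     let file = File::create("dictionary.bdf")?;
//     let mut writer = BDFWriter::new(file, entry_count, true);
//     writer.set_compression_level(6);
//     writer.add_lookup_entry(hash_entry)?;   // must happen before the first flush
//     for entry in data_entries {
//         writer.add_data_entry(entry)?;      // flushes a full chunk automatically
//     }
//     writer.finish()?;                       // waits for the workers and flushes the file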

impl BDFReader {
    /// Creates a new BDFReader that reads from the given file.
    pub fn new(inner: File) -> Self {
        Self {
            metadata: None,
            lookup_table: None,
            reader: BufReader::new(inner),
            compressed: false,
            thread_manager: ThreadManager::new(num_cpus::get() * 2),
        }
    }

    /// Reads the metadata and lookup table chunks at the start of the file.
    /// This has to be done before data chunks can be read.
    pub fn read_start(&mut self) -> Result<(), Error> {
        self.read_metadata()?;
        self.read_lookup_table()?;

        Ok(())
    }

    /// Spawns decompression worker threads (half the logical CPUs, at least one)
    /// and pre-fills the work channel with compressed data chunks read from the file.
    fn start_threads(&mut self) {
        for _ in 0..(num_cpus::get() as f32 / 2f32).max(1f32) as usize {
            thread::spawn({
                let receiver = self.thread_manager.receiver_work.clone();
                let sender = self.thread_manager.sender_result.clone();
                let wg = self.thread_manager.wg.clone();
                move || {
                    for mut chunk in receiver {
                        chunk.decompress().expect("failed to decompress chunk");
                        sender.send(chunk).expect("failed to send decompression result");
                    }
                    drop(wg);
                }
            });
        }
        // Queue an initial batch of chunks so the workers have data to decompress.
        for _ in 0..num_cpus::get() * 2 {
            if self.add_compression_chunk().is_err() {
                self.thread_manager.drop_sender();
                break;
            }
        }
        self.thread_manager.drop_sender_result();
    }

    /// Reads the next raw chunk from the file and, if it is a compressed data chunk,
    /// passes it to the decompression threads.
    pub fn add_compression_chunk(&mut self) -> Result<(), Error> {
        let gen_chunk = self.next_chunk_raw()?;
        if gen_chunk.name == DTBL_CHUNK_NAME && self.compressed {
            if self.thread_manager.sender_work.send(gen_chunk).is_err() {
                return Err(Error::new(ErrorKind::Other, "failed to send chunk data"));
            }
        }

        Ok(())
    }

    /// Reads and parses the metadata chunk and detects whether the file is compressed.
    pub fn read_metadata(&mut self) -> Result<&MetaChunk, Error> {
        if !self.validate_header() {
            return Err(Error::new(ErrorKind::InvalidData, "invalid BDF header"));
        }
        let meta_chunk: MetaChunk = self.next_chunk_raw()?.try_into()?;
        if let Some(method) = &meta_chunk.compression_method {
            if *method == LZMA {
                self.compressed = true;
            } else {
                return Err(Error::new(
                    ErrorKind::Other,
                    "unsupported compression method",
                ));
            }
        }
        self.metadata = Some(meta_chunk);

        if let Some(chunk) = &self.metadata {
            Ok(chunk)
        } else {
            Err(Error::new(
                ErrorKind::Other,
                "failed to read self assigned metadata",
            ))
        }
    }

    /// Reads and parses the lookup table chunk. Reads the metadata first if that
    /// hasn't happened yet, and starts the decompression threads for compressed files.
    pub fn read_lookup_table(&mut self) -> Result<&HashLookupTable, Error> {
        if self.metadata.is_none() {
            self.read_metadata()?;
        }
        let lookup_table: HashLookupTable = self.next_chunk_raw()?.try_into()?;
        self.lookup_table = Some(lookup_table);

        if self.compressed {
            self.start_threads();
        }

        if let Some(chunk) = &self.lookup_table {
            Ok(chunk)
        } else {
            Err(Error::new(
                ErrorKind::Other,
                "failed to read self assigned chunk",
            ))
        }
    }

    /// Reads the file header and checks it against the expected BDF magic bytes.
    fn validate_header(&mut self) -> bool {
        let mut header = [0u8; 11];
        if self.reader.read_exact(&mut header).is_err() {
            return false;
        }

        header == BDF_HDR.as_ref()
    }

    /// Returns the next data chunk. For compressed files this queues new work for the
    /// decompression threads and returns the next decompressed chunk from the result channel.
    pub fn next_chunk(&mut self) -> Result<GenericChunk, Error> {
        if self.compressed {
            if self.add_compression_chunk().is_err() {
                self.thread_manager.drop_sender();
            }
            if let Ok(chunk) = self.thread_manager.receiver_result.recv() {
                Ok(chunk)
            } else {
                Err(Error::new(ErrorKind::Other, "failed to get chunk"))
            }
        } else {
            self.next_chunk_raw()
        }
    }

    /// Reads the next chunk from the file without decompressing it.
    /// A chunk consists of a big-endian u32 length, a 4-byte name, the data and a u32 CRC.
    fn next_chunk_raw(&mut self) -> Result<GenericChunk, Error> {
        let mut length_raw = [0u8; 4];
        self.reader.read_exact(&mut length_raw)?;
        let length = BigEndian::read_u32(&length_raw);
        let mut name_raw = [0u8; 4];
        self.reader.read_exact(&mut name_raw)?;
        let name = String::from_utf8(name_raw.to_vec())
            .map_err(|_| Error::new(ErrorKind::InvalidData, "chunk name is not valid UTF-8"))?;
        let mut data = vec![0u8; length as usize];
        self.reader.read_exact(&mut data)?;
        let mut crc_raw = [0u8; 4];
        self.reader.read_exact(&mut crc_raw)?;
        let crc = BigEndian::read_u32(&crc_raw);

        Ok(GenericChunk {
            length,
            name,
            data,
            crc,
        })
    }
}
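
// A minimal usage sketch for BDFReader (illustrative only; `next_chunk` returns raw
// `GenericChunk`s, and turning those into data entries depends on the chunk API in
// the chunks module, which is not shown here):
//
//     let file = File::open("dictionary.bdf")?;
//     let mut reader = BDFReader::new(file);
//     reader.read_start()?;                       // metadata + lookup table
//     while let Ok(chunk) = reader.next_chunk() {
//         // process the (already decompressed) data chunk
//     }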