// blkar_lib/sbx_container_content.rs

1use crate::data_block_buffer::{BlockArrangement, DataBlockBuffer, InputType, OutputType, Slot};
2use crate::file_reader::{FileReader, FileReaderParam};
3use crate::general_error::Error;
4use crate::hash_stats::HashStats;
5use crate::json_printer::JSONPrinter;
6use crate::multihash::*;
7use crate::progress_report::{PRVerbosityLevel, ProgressReporter};
8use crate::sbx_block;
9use crate::sbx_block::Block;
10use crate::sbx_specs::ver_to_data_size;
11use std::io::SeekFrom;
12use std::sync::atomic::AtomicBool;
13use std::sync::mpsc::channel;
14use std::sync::mpsc::sync_channel;
15use std::sync::Barrier;
16use std::sync::{Arc, Mutex};
17use std::thread;
18
// Number of `DataBlockBuffer`s kept circulating between the reader thread and
// the hasher thread; the channels below are sized to hold one more than this
// so neither side ever blocks on a full channel during normal rotation.
const PIPELINE_BUFFER_IN_ROTATION: usize = 2;
20
/// Hashes the data payload stored inside an SBX container file.
///
/// Spins up a two-stage pipeline:
///   - a **reader** thread that walks data blocks of `in_file` in ascending
///     sequence-number order (seeking to each block's computed write position,
///     skipping parity blocks), filling `DataBlockBuffer`s;
///   - a **hasher** thread that folds each filled buffer into `hash_ctx`,
///     resets the buffer, and sends it back to the reader.
///
/// Buffers rotate between the two threads over bounded `sync_channel`s; a
/// 2-party `Barrier` coordinates shutdown (via the `worker_shutdown!` macro —
/// presumably it sends a terminating `None` and waits; definition not visible
/// here — TODO confirm).
///
/// Parameters:
/// - `json_printer` — used only to query whether JSON output is enabled for
///   progress reporting.
/// - `pr_verbosity_level` — verbosity passed to the progress reporter.
/// - `data_par_burst` — optional (data, parity, burst) layout used to compute
///   block positions and to recognise parity sequence numbers.
/// - `ctrlc_stop_flag` — polled each block read so Ctrl-C can stop the run.
/// - `in_file` — path of the container to read.
/// - `orig_file_size` — size of the original (pre-encode) file; seeds
///   `HashStats::total_bytes` and bounds how many payload bytes are hashed.
/// - `ref_block` — reference block supplying the container version/UID that
///   every scanned block header must match.
/// - `hash_ctx` — hashing context consumed to produce the final digest.
///
/// Returns the accumulated `HashStats` together with the finished digest
/// bytes, or the first error forwarded by the reader thread.
pub fn hash(
    json_printer: &JSONPrinter,
    pr_verbosity_level: PRVerbosityLevel,
    data_par_burst: Option<(usize, usize, usize)>,
    ctrlc_stop_flag: &Arc<AtomicBool>,
    in_file: &str,
    orig_file_size: u64,
    ref_block: &Block,
    mut hash_ctx: hash::Ctx,
) -> Result<(HashStats, HashBytes), Error> {
    // Shared between this thread (final read), the reader thread (progress
    // updates) and the progress reporter.
    let stats = Arc::new(Mutex::new(HashStats::new(orig_file_size)));

    let version = ref_block.get_version();

    // Payload bytes per data block for this container version.
    let data_chunk_size = ver_to_data_size(version) as u64;

    let mut reader = FileReader::new(
        &in_file,
        FileReaderParam {
            write: false,
            buffered: false,
        },
    )?;

    let reporter = Arc::new(ProgressReporter::new(
        &stats,
        "Stored data hashing progress",
        "bytes",
        pr_verbosity_level,
        json_printer.json_enabled(),
    ));

    // Predicate requiring each scanned header to match the reference block's
    // version and UID (macro-generated; definition not visible here).
    let header_pred = header_pred_same_ver_uid!(ref_block);

    // Buffer rotation channels (reader -> hasher -> reader). `None` is the
    // end-of-stream sentinel. Capacity covers every buffer plus one sentinel.
    let (to_hasher, from_reader) = sync_channel(PIPELINE_BUFFER_IN_ROTATION + 1);
    let (to_reader, from_hasher) = sync_channel(PIPELINE_BUFFER_IN_ROTATION + 1);
    // Reader-side errors are funnelled here and inspected after join.
    let (error_tx_reader, error_rx) = channel::<Error>();
    // One-shot channel carrying the final digest out of the hasher thread.
    let (hash_bytes_tx, hash_bytes_rx) = channel();

    // Two parties: reader thread and hasher thread.
    let worker_shutdown_barrier = Arc::new(Barrier::new(2));

    // push buffers into pipeline
    let buffers = DataBlockBuffer::new_multi(
        version,
        None,
        InputType::Block(BlockArrangement::OrderedButSomeMayBeMissing),
        OutputType::Disabled,
        data_par_burst,
        true,
        false,
        PIPELINE_BUFFER_IN_ROTATION,
    );

    // Prime the pipeline: all buffers start on the reader's input channel.
    for buffer in buffers.into_iter() {
        to_reader.send(Some(buffer)).unwrap();
    }

    reporter.start();

    let reader_thread = {
        let ctrlc_stop_flag = Arc::clone(ctrlc_stop_flag);
        let shutdown_barrier = Arc::clone(&worker_shutdown_barrier);
        let stats = Arc::clone(&stats);

        thread::spawn(move || {
            // `run` is the loop-control flag; the stop_run_* macros
            // presumably clear it on error/Ctrl-C/last-seq — TODO confirm
            // against macro definitions.
            let mut run = true;
            // SBX data sequence numbers start at 1 (0 is the metadata block).
            let mut seq_num = 1;

            let mut bytes_processed: u64 = 0;
            let total_bytes = stats.lock().unwrap().total_bytes;

            // Receive a recycled buffer; `None` would mean upstream shutdown.
            while let Some(mut buffer) = from_hasher.recv().unwrap() {
                if !run {
                    break;
                }

                // Fill the buffer one block slot at a time.
                while !buffer.is_full() {
                    stop_run_if_atomic_bool!(run => ctrlc_stop_flag);

                    // Where block `seq_num` lives on disk given the layout.
                    let pos = sbx_block::calc_data_block_write_pos(
                        version,
                        seq_num,
                        None,
                        data_par_burst,
                    );

                    stop_run_if_error!(run => error_tx_reader => reader.seek(SeekFrom::Start(pos)));

                    let Slot {
                        block,
                        slot,
                        read_pos: _,
                        content_len_exc_header,
                    } = buffer.get_slot().unwrap();
                    match reader.read(slot) {
                        Ok(read_res) => {
                            // A block counts as decoded only if we did not hit
                            // EOF, the header parses under `header_pred`, and
                            // its sequence number is the one we sought to.
                            let decode_successful = !read_res.eof_seen
                                && match block.sync_from_buffer(slot, Some(&header_pred), None) {
                                    Ok(_) => block.get_seq_num() == seq_num,
                                    _ => false,
                                };

                            let bytes_remaining = total_bytes - bytes_processed;

                            if sbx_block::seq_num_is_parity_w_data_par_burst(
                                seq_num,
                                data_par_burst,
                            ) {
                                // Parity blocks carry no original-file data;
                                // release the slot so it is not hashed.
                                buffer.cancel_slot();
                            } else {
                                if decode_successful {
                                    // The final data block is usually only
                                    // partially filled with original data.
                                    let is_last_data_block = bytes_remaining <= data_chunk_size;

                                    let cur_block_bytes_processed = if is_last_data_block {
                                        bytes_remaining
                                    } else {
                                        data_chunk_size
                                    };

                                    bytes_processed += cur_block_bytes_processed as u64;

                                    if is_last_data_block {
                                        // Tell the hasher how many payload
                                        // bytes of this slot are real data.
                                        *content_len_exc_header = Some(bytes_remaining as usize);
                                        run = false;
                                        break;
                                    }
                                } else {
                                    stop_run_forward_error!(run => error_tx_reader => Error::with_msg("Failed to decode data block"));
                                }
                            }

                            incre_or_stop_run_if_last!(run => seq_num => seq_num);
                        }
                        Err(e) => stop_run_forward_error!(run => error_tx_reader => e),
                    }
                }

                // Publish progress so the reporter thread can display it.
                {
                    let mut stats = stats.lock().unwrap();

                    stats.bytes_processed = bytes_processed;
                }

                // Hand the (fully or finally partially) filled buffer over.
                to_hasher.send(Some(buffer)).unwrap();
            }

            worker_shutdown!(to_hasher, shutdown_barrier);
        })
    };

    let hasher_thread = {
        let shutdown_barrier = Arc::clone(&worker_shutdown_barrier);

        thread::spawn(move || {
            while let Some(mut buffer) = from_reader.recv().unwrap() {
                // Fold the buffer's data slots into the running hash.
                buffer.hash(&mut hash_ctx);

                buffer.reset();

                // Recycle the buffer back to the reader.
                to_reader.send(Some(buffer)).unwrap();
            }

            // Reader signalled end of stream — emit the finished digest.
            hash_bytes_tx
                .send(hash_ctx.finish_into_hash_bytes())
                .unwrap();

            worker_shutdown!(to_reader, shutdown_barrier);
        })
    };

    reader_thread.join().unwrap();
    hasher_thread.join().unwrap();

    // Only the first forwarded reader error (if any) is reported.
    if let Ok(err) = error_rx.try_recv() {
        return Err(err);
    }

    reporter.stop();

    let stats = stats.lock().unwrap().clone();

    // The hasher always sends the digest before shutting down, so this recv
    // cannot block indefinitely once both threads have joined.
    Ok((stats, hash_bytes_rx.recv().unwrap()))
}