blkar_lib/
sbx_container_content.rs1use crate::data_block_buffer::{BlockArrangement, DataBlockBuffer, InputType, OutputType, Slot};
2use crate::file_reader::{FileReader, FileReaderParam};
3use crate::general_error::Error;
4use crate::hash_stats::HashStats;
5use crate::json_printer::JSONPrinter;
6use crate::multihash::*;
7use crate::progress_report::{PRVerbosityLevel, ProgressReporter};
8use crate::sbx_block;
9use crate::sbx_block::Block;
10use crate::sbx_specs::ver_to_data_size;
11use std::io::SeekFrom;
12use std::sync::atomic::AtomicBool;
13use std::sync::mpsc::channel;
14use std::sync::mpsc::sync_channel;
15use std::sync::Barrier;
16use std::sync::{Arc, Mutex};
17use std::thread;
18
19const PIPELINE_BUFFER_IN_ROTATION: usize = 2;
20
21pub fn hash(
22 json_printer: &JSONPrinter,
23 pr_verbosity_level: PRVerbosityLevel,
24 data_par_burst: Option<(usize, usize, usize)>,
25 ctrlc_stop_flag: &Arc<AtomicBool>,
26 in_file: &str,
27 orig_file_size: u64,
28 ref_block: &Block,
29 mut hash_ctx: hash::Ctx,
30) -> Result<(HashStats, HashBytes), Error> {
31 let stats = Arc::new(Mutex::new(HashStats::new(orig_file_size)));
32
33 let version = ref_block.get_version();
34
35 let data_chunk_size = ver_to_data_size(version) as u64;
36
37 let mut reader = FileReader::new(
38 &in_file,
39 FileReaderParam {
40 write: false,
41 buffered: false,
42 },
43 )?;
44
45 let reporter = Arc::new(ProgressReporter::new(
46 &stats,
47 "Stored data hashing progress",
48 "bytes",
49 pr_verbosity_level,
50 json_printer.json_enabled(),
51 ));
52
53 let header_pred = header_pred_same_ver_uid!(ref_block);
54
55 let (to_hasher, from_reader) = sync_channel(PIPELINE_BUFFER_IN_ROTATION + 1);
56 let (to_reader, from_hasher) = sync_channel(PIPELINE_BUFFER_IN_ROTATION + 1);
57 let (error_tx_reader, error_rx) = channel::<Error>();
58 let (hash_bytes_tx, hash_bytes_rx) = channel();
59
60 let worker_shutdown_barrier = Arc::new(Barrier::new(2));
61
62 let buffers = DataBlockBuffer::new_multi(
64 version,
65 None,
66 InputType::Block(BlockArrangement::OrderedButSomeMayBeMissing),
67 OutputType::Disabled,
68 data_par_burst,
69 true,
70 false,
71 PIPELINE_BUFFER_IN_ROTATION,
72 );
73
74 for buffer in buffers.into_iter() {
75 to_reader.send(Some(buffer)).unwrap();
76 }
77
78 reporter.start();
79
80 let reader_thread = {
81 let ctrlc_stop_flag = Arc::clone(ctrlc_stop_flag);
82 let shutdown_barrier = Arc::clone(&worker_shutdown_barrier);
83 let stats = Arc::clone(&stats);
84
85 thread::spawn(move || {
86 let mut run = true;
87 let mut seq_num = 1;
88
89 let mut bytes_processed: u64 = 0;
90 let total_bytes = stats.lock().unwrap().total_bytes;
91
92 while let Some(mut buffer) = from_hasher.recv().unwrap() {
93 if !run {
94 break;
95 }
96
97 while !buffer.is_full() {
98 stop_run_if_atomic_bool!(run => ctrlc_stop_flag);
99
100 let pos = sbx_block::calc_data_block_write_pos(
101 version,
102 seq_num,
103 None,
104 data_par_burst,
105 );
106
107 stop_run_if_error!(run => error_tx_reader => reader.seek(SeekFrom::Start(pos)));
108
109 let Slot {
110 block,
111 slot,
112 read_pos: _,
113 content_len_exc_header,
114 } = buffer.get_slot().unwrap();
115 match reader.read(slot) {
116 Ok(read_res) => {
117 let decode_successful = !read_res.eof_seen
118 && match block.sync_from_buffer(slot, Some(&header_pred), None) {
119 Ok(_) => block.get_seq_num() == seq_num,
120 _ => false,
121 };
122
123 let bytes_remaining = total_bytes - bytes_processed;
124
125 if sbx_block::seq_num_is_parity_w_data_par_burst(
126 seq_num,
127 data_par_burst,
128 ) {
129 buffer.cancel_slot();
130 } else {
131 if decode_successful {
132 let is_last_data_block = bytes_remaining <= data_chunk_size;
133
134 let cur_block_bytes_processed = if is_last_data_block {
135 bytes_remaining
136 } else {
137 data_chunk_size
138 };
139
140 bytes_processed += cur_block_bytes_processed as u64;
141
142 if is_last_data_block {
143 *content_len_exc_header = Some(bytes_remaining as usize);
144 run = false;
145 break;
146 }
147 } else {
148 stop_run_forward_error!(run => error_tx_reader => Error::with_msg("Failed to decode data block"));
149 }
150 }
151
152 incre_or_stop_run_if_last!(run => seq_num => seq_num);
153 }
154 Err(e) => stop_run_forward_error!(run => error_tx_reader => e),
155 }
156 }
157
158 {
159 let mut stats = stats.lock().unwrap();
160
161 stats.bytes_processed = bytes_processed;
162 }
163
164 to_hasher.send(Some(buffer)).unwrap();
165 }
166
167 worker_shutdown!(to_hasher, shutdown_barrier);
168 })
169 };
170
171 let hasher_thread = {
172 let shutdown_barrier = Arc::clone(&worker_shutdown_barrier);
173
174 thread::spawn(move || {
175 while let Some(mut buffer) = from_reader.recv().unwrap() {
176 buffer.hash(&mut hash_ctx);
177
178 buffer.reset();
179
180 to_reader.send(Some(buffer)).unwrap();
181 }
182
183 hash_bytes_tx
184 .send(hash_ctx.finish_into_hash_bytes())
185 .unwrap();
186
187 worker_shutdown!(to_reader, shutdown_barrier);
188 })
189 };
190
191 reader_thread.join().unwrap();
192 hasher_thread.join().unwrap();
193
194 if let Ok(err) = error_rx.try_recv() {
195 return Err(err);
196 }
197
198 reporter.stop();
199
200 let stats = stats.lock().unwrap().clone();
201
202 Ok((stats, hash_bytes_rx.recv().unwrap()))
203}