use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex};
use file_reader::{FileReader,
FileReaderParam};
use sbx_block::{Block,
BlockType};
use file_utils;
use smallvec::SmallVec;
use sbx_block;
use sbx_specs::{ver_to_usize,
ver_uses_rs,
ver_to_block_size,
SBX_LARGEST_BLOCK_SIZE,
SBX_SCAN_BLOCK_SIZE,
SBX_MAX_BURST_ERR_RESISTANCE};
use progress_report::*;
use general_error::Error;
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RefBlockChoice {
Any,
Prefer(BlockType),
MustBe(BlockType),
}
pub struct LazyReadResult {
pub len_read : usize,
pub usable : bool,
pub eof_seen : bool,
}
struct ScanStats {
pub bytes_processed : u64,
pub total_bytes : u64,
start_time : f64,
end_time : f64,
}
impl ScanStats {
pub fn new(file_size : u64) -> ScanStats {
ScanStats {
bytes_processed : 0,
total_bytes : file_size,
start_time : 0.,
end_time : 0.,
}
}
}
impl ProgressReport for ScanStats {
fn start_time_mut(&mut self) -> &mut f64 { &mut self.start_time }
fn end_time_mut(&mut self) -> &mut f64 { &mut self.end_time }
fn units_so_far(&self) -> u64 { self.bytes_processed }
fn total_units(&self) -> u64 { self.total_bytes }
}
pub fn read_block_lazily(block : &mut Block,
buffer : &mut [u8; SBX_LARGEST_BLOCK_SIZE],
reader : &mut FileReader)
-> Result<LazyReadResult, Error> {
let mut total_len_read = 0;
{ total_len_read += reader.read(&mut buffer[0..SBX_SCAN_BLOCK_SIZE])?.len_read;
if total_len_read < SBX_SCAN_BLOCK_SIZE {
return Ok(LazyReadResult { len_read : total_len_read,
usable : false,
eof_seen : true });
}
match block.sync_from_buffer_header_only(&buffer[0..SBX_SCAN_BLOCK_SIZE]) {
Ok(()) => {},
Err(_) => { return Ok(LazyReadResult { len_read : total_len_read,
usable : false,
eof_seen : false }); }
}
}
{ let block_size = ver_to_block_size(block.get_version());
total_len_read +=
reader.read(&mut buffer[SBX_SCAN_BLOCK_SIZE..block_size])?.len_read;
if total_len_read < block_size {
return Ok(LazyReadResult { len_read : total_len_read,
usable : false,
eof_seen : true });
}
match block.sync_from_buffer(&buffer[0..block_size], None) {
Ok(()) => {},
Err(_) => { return Ok(LazyReadResult { len_read : total_len_read,
usable : false,
eof_seen : false }); }
}
}
Ok(LazyReadResult { len_read : total_len_read,
usable : true,
eof_seen : false })
}
pub fn get_ref_block(in_file : &str,
ref_block_choice : RefBlockChoice,
pr_verbosity_level : PRVerbosityLevel,
json_enabled : bool,
stop_flag : &Arc<AtomicBool>)
-> Result<Option<(u64, Block)>, Error> {
let file_size = file_utils::get_file_size(in_file)?;
let stats = Arc::new(Mutex::new(ScanStats::new(file_size)));
let reporter = ProgressReporter::new(&stats,
"Reference block scanning progress",
"bytes",
pr_verbosity_level,
json_enabled);
let mut buffer : [u8; SBX_LARGEST_BLOCK_SIZE] =
[0; SBX_LARGEST_BLOCK_SIZE];
let mut block = Block::dummy();
let mut meta_block = None;
let mut data_block = None;
let mut reader = FileReader::new(in_file,
FileReaderParam { write : false,
buffered : true })?;
reporter.start();
let mut block_pos : u64;
let mut bytes_processed : u64 = 0;
loop {
break_if_atomic_bool!(stop_flag);
let lazy_read_res = read_block_lazily(&mut block,
&mut buffer,
&mut reader)?;
block_pos = bytes_processed;
bytes_processed += lazy_read_res.len_read as u64;
stats.lock().unwrap().bytes_processed = bytes_processed;
break_if_eof_seen!(lazy_read_res);
if !lazy_read_res.usable { continue; }
match block.block_type() {
BlockType::Meta => {
if let None = meta_block {
meta_block = Some((block_pos, block.clone()));
}
},
BlockType::Data => {
if let None = data_block {
data_block = Some((block_pos, block.clone()));
}
}
}
match ref_block_choice {
RefBlockChoice::Any => {
if let Some(_) = meta_block { break; }
if let Some(_) = data_block { break; }
},
RefBlockChoice::Prefer(bt) |
RefBlockChoice::MustBe(bt) => match bt {
BlockType::Meta => {
if let Some(_) = meta_block { break; }
},
BlockType::Data => {
if let Some(_) = data_block { break; }
}
},
}
}
reporter.stop();
Ok(match ref_block_choice {
RefBlockChoice::Any => match (meta_block, data_block) {
(Some(m), _ ) => Some(m),
(_ , Some(d)) => Some(d),
(None, None ) => None,
},
RefBlockChoice::Prefer(bt) => match bt {
BlockType::Meta => match (meta_block, data_block) {
(Some(m), _ ) => Some(m),
(_ , Some(d)) => Some(d),
(None, None ) => None,
},
BlockType::Data => match (meta_block, data_block) {
(_ , Some(d)) => Some(d),
(Some(m), _ ) => Some(m),
(None, None ) => None,
}
},
RefBlockChoice::MustBe(bt) => match bt {
BlockType::Meta => match (meta_block, data_block) {
(Some(m), _ ) => Some(m),
(_ , _ ) => None,
},
BlockType::Data => match (meta_block, data_block) {
(_ , Some(d)) => Some(d),
(_ , _ ) => None,
}
}
})
}
pub fn guess_burst_err_resistance_level(in_file : &str,
ref_block_pos : u64,
ref_block : &Block)
-> Result<Option<usize>, Error> {
let rs_enabled = ver_uses_rs(ref_block.get_version());
if !rs_enabled { return Ok(None); }
let data_shards = get_RSD_from_ref_block!(ref_block_pos, ref_block, "guess the burst error resistance level");
let parity_shards = get_RSP_from_ref_block!(ref_block_pos, ref_block, "guess the burst error resistance level");
let mut buffer : [u8; SBX_LARGEST_BLOCK_SIZE] =
[0; SBX_LARGEST_BLOCK_SIZE];
let mut block = Block::dummy();
let mut reader = FileReader::new(in_file,
FileReaderParam { write : false,
buffered : true })?;
const BLOCKS_TO_SAMPLE_BASE_NUM : usize = 1024;
let blocks_to_sample =
(1 + parity_shards) + SBX_MAX_BURST_ERR_RESISTANCE + 1;
let mut seq_nums : SmallVec<[Option<u32>; BLOCKS_TO_SAMPLE_BASE_NUM]> =
smallvec![None; blocks_to_sample];
let mut mismatches_for_level : [usize; SBX_MAX_BURST_ERR_RESISTANCE + 1] =
[0; SBX_MAX_BURST_ERR_RESISTANCE + 1];
let mut blocks_processed = 0;
let pred = block_pred_same_ver_uid!(ref_block);
loop {
let read_res = reader.read(sbx_block::slice_buf_mut(ref_block.get_version(),
&mut buffer))?;
break_if_eof_seen!(read_res);
if blocks_processed >= seq_nums.len() { break; }
seq_nums[blocks_processed] =
match block.sync_from_buffer(&buffer, Some(&pred)) {
Ok(()) => Some(block.get_seq_num()),
Err(_) => None,
};
blocks_processed += 1;
}
for level in 0..mismatches_for_level.len() {
for index in 0..seq_nums.len() {
let expected_seq_num =
sbx_block::calc_seq_num_at_index(index as u64,
None,
Some((data_shards,
parity_shards,
level)));
if let Some(seq_num) = seq_nums[index] {
if seq_num != expected_seq_num {
mismatches_for_level[level] += 1;
}
}
}
}
let mut best_guess = 0;
for level in 0..mismatches_for_level.len() {
if mismatches_for_level[level] < mismatches_for_level[best_guess] {
best_guess = level;
}
}
let mut same_as_best_guess = 0;
for level in 0..mismatches_for_level.len() {
if mismatches_for_level[level] == mismatches_for_level[best_guess] {
same_as_best_guess += 1;
}
}
if same_as_best_guess == mismatches_for_level.len() {
Ok(None)
} else {
Ok(Some(best_guess))
}
}