use crate::hasher::{HasherSHA512, HasherCRC, NextHash};
use crate::kdf::kdf;
use std::sync::Arc;
use std::sync::atomic::{AtomicIsize, AtomicBool, Ordering};
use std::sync::mpsc::{channel, Sender, Receiver};
use std::thread;
use std::time::Duration;
/// Selects the hash algorithm a `DtStream` generates its data with.
///
/// The variant decides which hasher the worker thread instantiates and,
/// through the hasher's output size, how large each generated chunk is.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum DtStreamType {
    /// Generate data with `HasherCRC`.
    CRC,
    /// Generate data with `HasherSHA512`.
    SHA512,
}
/// One block of generated data together with its position in the stream.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DtStreamChunk {
    /// Sequence number of this chunk. The worker thread numbers the chunks
    /// it produces consecutively, starting at zero.
    pub index: u64,
    /// The generated bytes.
    pub data: Vec<u8>,
}
/// Body of the background generator thread.
///
/// Derives a per-thread seed from `seed` and `thread_id` via `kdf`, builds
/// the hasher selected by `stype`, and then keeps sending consecutively
/// indexed chunks into `tx` until `abort` is observed as set.
///
/// `level` is incremented after every chunk that has been sent. While it is
/// at or above `DtStream::LEVEL_THRES` the worker produces nothing and polls
/// again after a 10 ms sleep.
///
/// # Panics
///
/// Panics if sending a chunk into `tx` fails.
fn thread_worker(
    stype: DtStreamType,
    seed: Vec<u8>,
    thread_id: u32,
    abort: Arc<AtomicBool>,
    level: Arc<AtomicIsize>,
    tx: Sender<DtStreamChunk>,
) {
    // The caller's seed is only needed to derive the per-thread seed.
    let derived_seed = kdf(&seed, thread_id);
    drop(seed);

    let mut generator: Box<dyn NextHash> = match stype {
        DtStreamType::CRC => Box::new(HasherCRC::new(&derived_seed)),
        DtStreamType::SHA512 => Box::new(HasherSHA512::new(&derived_seed)),
    };

    let mut next_index = 0;
    loop {
        if abort.load(Ordering::Relaxed) {
            break;
        }

        // Enough chunks are queued already: back off and poll again.
        if level.load(Ordering::Relaxed) >= DtStream::LEVEL_THRES {
            thread::sleep(Duration::from_millis(10));
            continue;
        }

        let capacity = generator.get_size() * DtStream::CHUNKFACTOR;
        let mut data: Vec<u8> = Vec::with_capacity(capacity);
        generator.next_chunk(&mut data, DtStream::CHUNKFACTOR);

        let chunk = DtStreamChunk {
            index: next_index,
            data,
        };
        next_index += 1;

        tx.send(chunk).expect("Worker thread: Send failed.");
        // Count the chunk only once it is actually in the channel.
        level.fetch_add(1, Ordering::Relaxed);
    }
}
/// Generator of hash-based data chunks that are produced on a worker thread.
///
/// The worker thread sends `DtStreamChunk`s through a channel; `get_chunk`
/// fetches them on the owning side without blocking.
pub struct DtStream {
/// Hash algorithm the worker thread generates data with.
stype: DtStreamType,
/// Copy of the seed passed to `new`; handed to the worker on every start.
seed: Vec<u8>,
/// Identifier the worker passes to `kdf` together with the seed.
thread_id: u32,
/// Receiving end of the chunk channel; `None` until the first start.
rx: Option<Receiver<DtStreamChunk>>,
/// Set by `start`, cleared by `stop`.
is_active: bool,
/// Join handle of the running worker thread, if any.
thread_join: Option<thread::JoinHandle<()>>,
/// Flag the worker polls; when set, the worker leaves its loop.
abort: Arc<AtomicBool>,
/// Incremented by the worker after each sent chunk and decremented by
/// `get_chunk` for each received chunk.
level: Arc<AtomicIsize>,
}
impl DtStream {
    /// Queue level at which the worker thread pauses chunk production.
    const LEVEL_THRES: isize = 8;
    /// Size of one chunk, expressed in multiples of the hash output size.
    pub const CHUNKFACTOR: usize = 1024 * 10;

    /// Creates a new, inactive stream.
    ///
    /// No worker thread is started until [`activate`](Self::activate) is
    /// called.
    ///
    /// * `stype` - hash algorithm to generate data with.
    /// * `seed` - base seed; its bytes are copied into the stream.
    /// * `thread_id` - identifier that the worker combines with the seed
    ///   via `kdf`.
    pub fn new(stype: DtStreamType, seed: &[u8], thread_id: u32) -> DtStream {
        DtStream {
            stype,
            seed: seed.to_vec(),
            thread_id,
            rx: None,
            is_active: false,
            thread_join: None,
            abort: Arc::new(AtomicBool::new(false)),
            level: Arc::new(AtomicIsize::new(0)),
        }
    }

    /// Marks the stream inactive, asks the worker to exit and waits for it.
    ///
    /// The abort flag is cleared again afterwards.
    ///
    /// # Panics
    ///
    /// Panics if the worker thread itself panicked.
    fn stop(&mut self) {
        self.is_active = false;
        self.abort.store(true, Ordering::Release);
        if let Some(worker) = self.thread_join.take() {
            worker.join().expect("DtStream worker thread panicked");
        }
        self.abort.store(false, Ordering::Release);
    }

    /// Spawns a fresh worker thread with a new channel and a zeroed level.
    ///
    /// # Panics
    ///
    /// Panics if the stream is still active or a worker handle is still
    /// present, i.e. if `stop` has not run first.
    fn start(&mut self) {
        assert!(!self.is_active);
        assert!(self.thread_join.is_none());

        self.abort.store(false, Ordering::Release);
        self.level.store(0, Ordering::Release);

        // Replacing the receiver discards any chunks of a previous run.
        let (tx, rx) = channel();
        self.rx = Some(rx);

        // Everything the worker needs is moved into its closure.
        let thread_stype = self.stype;
        let thread_seed = self.seed.to_vec();
        let thread_id = self.thread_id;
        let thread_abort = Arc::clone(&self.abort);
        let thread_level = Arc::clone(&self.level);
        self.thread_join = Some(thread::spawn(move || {
            thread_worker(
                thread_stype,
                thread_seed,
                thread_id,
                thread_abort,
                thread_level,
                tx,
            );
        }));

        self.is_active = true;
    }

    /// Starts the stream, or restarts it if it is already running.
    ///
    /// A running worker is stopped and joined first; the new worker begins
    /// numbering its chunks at zero again.
    pub fn activate(&mut self) {
        self.stop();
        self.start();
    }

    /// Returns `true` once the stream has been activated.
    #[inline]
    pub fn is_active(&self) -> bool {
        self.is_active
    }

    /// Output size in bytes of the selected hash algorithm.
    fn get_hash_size(&self) -> usize {
        match self.stype {
            DtStreamType::SHA512 => HasherSHA512::OUTSIZE,
            DtStreamType::CRC => HasherCRC::OUTSIZE,
        }
    }

    /// Size in bytes of one chunk: hash output size times `CHUNKFACTOR`.
    pub fn get_chunk_size(&self) -> usize {
        self.get_hash_size() * DtStream::CHUNKFACTOR
    }

    /// Fetches the next generated chunk without blocking.
    ///
    /// Returns `None` if the stream is not active, if no chunk is currently
    /// queued, or if the channel to the worker is disconnected.
    #[inline]
    pub fn get_chunk(&mut self) -> Option<DtStreamChunk> {
        if !self.is_active {
            return None;
        }
        let chunk = self.rx.as_ref()?.try_recv().ok()?;
        // One chunk less is queued; lets the worker resume below the threshold.
        self.level.fetch_sub(1, Ordering::Relaxed);
        Some(chunk)
    }
}
/// Stops the worker thread when the stream is dropped.
impl Drop for DtStream {
fn drop(&mut self) {
// `stop` sets the abort flag and joins the worker thread, if one is running.
self.stop();
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Activates a stream for `algorithm` and checks the reported chunk size
    /// as well as the first five chunks: their indices must count up from
    /// zero and their first data bytes must match the known values.
    fn run_test(algorithm: DtStreamType) {
        let mut stream = DtStream::new(algorithm, &vec![1,2,3], 0);
        stream.activate();
        assert!(stream.is_active());

        let hash_size = match algorithm {
            DtStreamType::SHA512 => HasherSHA512::OUTSIZE,
            DtStreamType::CRC => HasherCRC::OUTSIZE,
        };
        assert_eq!(stream.get_chunk_size(), hash_size * DtStream::CHUNKFACTOR);

        let mut first_bytes = vec![];
        let mut count = 0;
        while count < 5 {
            match stream.get_chunk() {
                Some(chunk) => {
                    let current_level = stream.level.load(Ordering::Relaxed);
                    println!("{}: index={} data[0]={} (current level = {})",
                             count, chunk.index, chunk.data[0], current_level);
                    first_bytes.push(chunk.data[0]);
                    assert_eq!(chunk.index, count);
                    count += 1;
                }
                // Nothing queued yet: give the worker some time.
                None => thread::sleep(Duration::from_millis(10)),
            }
        }

        let expected: Vec<u8> = match algorithm {
            DtStreamType::SHA512 => vec![226, 143, 221, 30, 59],
            DtStreamType::CRC => vec![132, 133, 170, 226, 104],
        };
        assert_eq!(first_bytes, expected);
    }

    #[test]
    fn test_sha512() {
        run_test(DtStreamType::SHA512);
    }

    #[test]
    fn test_crc() {
        run_test(DtStreamType::CRC);
    }
}