extern crate sha3;
extern crate data_encoding;
extern crate log;
extern crate rand;
extern crate simplelog;
use data_encoding::{BASE32, BASE64, BASE64URL, HEXLOWER};
use log::*;
use rand::prelude::*;
use std::collections::VecDeque;
use std::fmt;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration,SystemTime};
use super::common::Encode;
use super::common::Uids;
use super::common::Service;
use super::common::NUMBER_OF_FILL_THREAD;
use sha3::{Sha3_224,Sha3_256,Sha3_384,Sha3_512,Digest};
pub struct IdsService {
ids_cache: Arc<Mutex<VecDeque<Vec<u8>>>>,
cache_size: usize,
rand_length: usize,
mode: Sha3Mode,
number_of_threads: usize,
threads_pool: Vec<Option<JoinHandle<()>>>,
stop_state: Arc<AtomicBool>,
}
impl Default for IdsService {
fn default() -> Self {
let cache_size = 100_000_usize;
IdsService {
ids_cache: Arc::new(Mutex::new(VecDeque::with_capacity(cache_size))),
rand_length: Sha3Length::from(Sha3Mode::Sha3_256) * 2,
cache_size,
mode: Sha3Mode::Sha3_256,
number_of_threads: *NUMBER_OF_FILL_THREAD,
threads_pool: Vec::with_capacity(*NUMBER_OF_FILL_THREAD),
stop_state: Arc::new(AtomicBool::new(false)),
}
}
}
impl fmt::Debug for IdsService {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "IdsService {{ ids_cache nb. ids: {}, cache_size: {}, rand_length : {}, mode : {:?}, number_of_threads : {} }}", self.ids_cache.lock().unwrap().len(), self.cache_size, self.rand_length,self.mode, self.number_of_threads)
}
}
#[derive(PartialEq, Clone, Copy)]
pub enum Sha3Mode {
Sha3_224,
Sha3_256,
Sha3_384,
Sha3_512,
}
impl fmt::Debug for Sha3Mode {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Sha3Mode::Sha3_224 => write!(f, "SHA3-224"),
Sha3Mode::Sha3_256 => write!(f, "SHA3-256"),
Sha3Mode::Sha3_512 => write!(f, "SHA3-512"),
Sha3Mode::Sha3_384 => write!(f, "SHA3-384"),
}
}
}
#[derive(Clone)]
#[allow(clippy::enum_variant_names)]
enum Sha3Hasher {
Sha3_224(Sha3_224),
Sha3_256(Sha3_256),
Sha3_384(Sha3_384),
Sha3_512(Sha3_512),
}
impl From<Sha3Mode> for Sha3Hasher {
fn from(mode: Sha3Mode) -> Self {
match mode {
Sha3Mode::Sha3_224 => Sha3Hasher::Sha3_224(Sha3_224::new()),
Sha3Mode::Sha3_256 => Sha3Hasher::Sha3_256(Sha3_256::new()),
Sha3Mode::Sha3_384 => Sha3Hasher::Sha3_384(Sha3_384::new()),
Sha3Mode::Sha3_512 => Sha3Hasher::Sha3_512(Sha3_512::new()),
}
}
}
type Sha3Length = usize;
impl From<Sha3Mode> for Sha3Length {
fn from(mode: Sha3Mode) -> Self {
match mode {
Sha3Mode::Sha3_224 => 28_usize,
Sha3Mode::Sha3_256 => 32_usize,
Sha3Mode::Sha3_384 => 48_usize,
Sha3Mode::Sha3_512 => 64_usize,
}
}
}
impl Encode for Vec<u8> {
fn as_hex(&self) -> String {
HEXLOWER.encode(self)
}
fn as_base64(&self) -> String {
BASE64.encode(self)
}
fn as_base64_url(&self) -> String {
BASE64URL.encode(self)
}
fn as_base32(&self) -> String {
BASE32.encode(self)
}
fn as_json(&self) -> String {
let mut message: String = String::new();
message.push_str(format!("{{\n\"bytes\" : \"{:?}\",", self).as_str());
message.push_str(format!("\n\"base64\" : \"{}\",", self.as_base64()).as_str());
message.push_str(format!("\n\"base32\" : \"{}\",", self.as_base32()).as_str());
message.push_str(format!("\n\"hex\" : \"0x{}\"\n}}", self.as_hex()).as_str());
message
}
}
impl Iterator for IdsService {
type Item = Vec<u8>;
fn next(&mut self) -> Option<Self::Item> {
Some(self.get_id())
}
}
impl Uids for IdsService {
type Id = Vec<u8>;
fn get_id(&mut self) -> Self::Id {
let id = self.ids_cache.lock().unwrap().pop_front();
match id {
Some(i) => i,
None => {
info!("The ids_cache is empty, create on fly an id");
let mut id: Vec<u8> = vec![0u8; Sha3Length::from(self.mode)];
self.create_id(id.as_mut());
id
}
}
}
fn get_id_from_cache(&mut self) -> Option<Self::Id> {
self.ids_cache.lock().unwrap().pop_front()
}
}
impl Service for IdsService {
fn start(&mut self) {
info!("Start ids_service!");
let mut rng = thread_rng();
self.stop_state.store(false, Ordering::Relaxed);
for i in 0..self.number_of_threads {
let builder = thread::Builder::new().name(format!("{}", i));
let ids_cache_clone = self.ids_cache.clone();
let cache_size = self.cache_size;
let mode = self.mode;
let delay_ms: u64 = rng.gen_range(50..200);
let stop_state_clone = self.stop_state.clone();
let handler = builder
.spawn(move || {
let thread_id: usize =
usize::from_str(thread::current().name().unwrap()).unwrap();
trace!("Thread id {}: is up", thread_id);
loop {
if ids_cache_clone.lock().unwrap().len() < cache_size {
let id = make_id(mode);
ids_cache_clone.lock().unwrap().push_back(id.to_vec());
} else {
thread::sleep(Duration::from_millis(delay_ms));
}
if stop_state_clone.load(Ordering::Relaxed) {
trace!("Thread id {}: stopped", thread_id);
break;
}
}
})
.unwrap_or_else(|_| panic!("Expect no error from thread {}", i));
self.threads_pool.push(Some(handler));
}
}
fn stop(&mut self) {
info!("Stop ids_service!");
self.stop_state.store(true, Ordering::Relaxed);
for handle in &mut self.threads_pool {
if let Some(handle) = handle.take() {
handle.join().unwrap();
}
}
self.ids_cache.lock().unwrap().clear();
}
}
impl IdsService {
pub fn new(cache_size: usize, mode: Sha3Mode, number_of_threads: Option<usize>) -> IdsService {
let rand_length = Sha3Length::from(mode) * 2;
let threads = match number_of_threads {
Some(n) => n,
None => *NUMBER_OF_FILL_THREAD,
};
IdsService {
ids_cache: Arc::new(Mutex::new(VecDeque::with_capacity(cache_size))),
rand_length,
cache_size,
mode,
number_of_threads: threads,
threads_pool: Vec::with_capacity(*NUMBER_OF_FILL_THREAD),
stop_state: Arc::new(AtomicBool::new(false)),
}
}
fn create_id(&mut self, out: &mut Vec<u8>) {
let length = Sha3Length::from(self.mode);
if out.len() < length {
warn!("Create an id with vec param out not initialized. out len = {}, out expected len {}",out.len(),length);
out.clear();
for _ in 0..length {
out.push(0u8);
}
}
create_id(self.mode, out);
}
pub fn get_cache_len(&mut self) -> usize {
self.ids_cache.lock().unwrap().len()
}
pub fn set_cache_size(&mut self, new_size: usize) {
self.cache_size = new_size;
info!("Restart ids_service!");
self.stop();
self.start();
}
pub fn is_filled(&self) -> bool {
let len = self.ids_cache.lock().unwrap().len();
len >= self.cache_size
}
pub fn filled_event(&mut self) -> mpsc::Receiver<bool> {
self.filled_at_percent_event(100)
}
pub fn filled_at_percent_event(&mut self, percentage : u8) -> mpsc::Receiver<bool> {
let percent = {if percentage>100 {100} else {percentage}};
let filled_chn: (mpsc::Sender<bool>, mpsc::Receiver<bool>) = mpsc::channel();
let sender = filled_chn.0.clone();
let name = format!("filled_event {}", self.threads_pool.len() + 1);
let builder = thread::Builder::new().name(name.clone());
let stop_state_clone = self.stop_state.clone();
let ids_cache_clone = self.ids_cache.clone();
let cache_limit = self.cache_size * percent as usize / 100;
let handler = builder
.spawn(move || {
trace!("Thread id {}: is up", thread::current().name().unwrap());
loop {
if ids_cache_clone.lock().unwrap().len() >= cache_limit {
sender.send(true).unwrap();
break;
}
if stop_state_clone.load(Ordering::Relaxed) {
sender.send(false).unwrap();
break;
}
thread::yield_now();
}
trace!("Thread id {}: stopped", thread::current().name().unwrap());
})
.unwrap_or_else(|_| panic!("Expect no error from thread {}", name));
self.threads_pool.push(Some(handler));
filled_chn.1
}
}
fn make_id(mode: Sha3Mode) -> Vec<u8> {
let mut rand = thread_rng();
let hasher = Sha3Hasher::from(mode);
let len = Sha3Length::from(mode);
let ts = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos();
let mut arr = vec!(0u8; len * 2);
let mut out : Vec<u8> = Vec::with_capacity(len);
rand.try_fill_bytes(&mut arr[..]).expect("Expect a random array");
match hasher {
Sha3Hasher::Sha3_224(mut x) => {
x.update(ts.to_ne_bytes());
x.update(arr.as_slice());
out.extend(x.finalize().iter());
},
Sha3Hasher::Sha3_256(mut x) => {
x.update(ts.to_ne_bytes());
x.update(arr.as_slice());
out.extend(x.finalize().iter());
},
Sha3Hasher::Sha3_384(mut x) => {
x.update(ts.to_ne_bytes());
x.update(arr.as_slice());
out.extend(x.finalize().iter());
},
Sha3Hasher::Sha3_512(mut x) => {
x.update(ts.to_ne_bytes());
x.update(arr.as_slice());
out.extend(x.finalize().iter());
},
};
out
}
fn create_id(mode: Sha3Mode, out: &mut [u8]) {
let result = make_id(mode);
out.copy_from_slice(result.as_slice());
}
pub fn create_id_as_sha512() -> String {
let result = make_id(Sha3Mode::Sha3_512);
HEXLOWER.encode(result.as_slice())
}
pub fn create_id_as_sha256() -> String {
let result = make_id(Sha3Mode::Sha3_256);
HEXLOWER.encode(result.as_slice())
}
#[cfg(test)]
mod tests {
use super::Uids;
use super::Encode;
use simplelog::*;
use log::*;
use std::collections::HashSet;
use crate::common::Service;
use crate::crypto_hash::{Sha3Mode,IdsService,create_id_as_sha512,create_id_as_sha256,Sha3Length};
#[test]
fn aaaa_init() {
println!("Call test init");
let _ = SimpleLogger::init(LevelFilter::Info, Config::default());
assert!(true);
info!("Logger initiallized for tests...");
}
#[test]
fn test_start() {
let mut service = IdsService::new(1000, Sha3Mode::Sha3_512, None);
service.start();
let _ = service.filled_event().recv().is_ok();
assert!(service.get_cache_len() >= 1000);
}
#[test]
fn test_stop() {
let mut ids01 = IdsService::new(1000, Sha3Mode::Sha3_512, None);
ids01.start();
let _ = ids01.filled_event().recv().is_ok();
debug!("Filled");
ids01.stop();
assert!(true);
}
#[test]
fn test_create_id_512() {
let id1 = create_id_as_sha512();
let id2 = create_id_as_sha512();
assert!(!id1.eq(&id2));
}
#[test]
fn test_create_id_256() {
let id1 = create_id_as_sha256();
let id2 = create_id_as_sha256();
assert!(!id1.eq(&id2));
}
#[test]
fn test_get_id() {
let mut ids01 = IdsService::new(1000, Sha3Mode::Sha3_256, None);
ids01.start();
let r1 = ids01.get_id();
debug!("r1: {:?}", r1);
assert_eq!(r1.len(), Sha3Length::from(Sha3Mode::Sha3_256));
let r2 = ids01.get_id();
debug!("r2: {:?}", r2);
assert_eq!(r2.len(), Sha3Length::from(Sha3Mode::Sha3_256));
assert!(!r1.eq(&r2));
}
#[test]
fn test_filled() {
let mut ids01 = IdsService::new(1000, Sha3Mode::Sha3_256, None);
ids01.start();
let _ = ids01.filled_event().recv().is_ok();
debug!("len: {}", ids01.get_cache_len());
assert!(ids01.is_filled());
}
#[test]
fn test_filled_at_percent_event() {
let mut ids01 = IdsService::new(1000, Sha3Mode::Sha3_256, None);
ids01.start();
let _ = ids01.filled_at_percent_event(20).recv().is_ok();
let cache_len = ids01.get_cache_len();
info!("len: {}", cache_len);
assert!(cache_len >= 200);
}
#[test]
fn test_get_id_from_cache() {
let mut ids01 = IdsService::new(1000, Sha3Mode::Sha3_256, None);
assert!(ids01.get_id_from_cache().is_none());
ids01.start();
let _ = ids01.filled_at_percent_event(5).recv().is_ok();
debug!("len: {}", ids01.get_cache_len());
let r1 = ids01.get_id_from_cache().expect("Expect an id");
debug!("r1: {:?}", r1);
assert_eq!(r1.len(), Sha3Length::from(Sha3Mode::Sha3_256));
let r2 = ids01.get_id_from_cache().expect("Expect an id");
debug!("r2: {:?}", r2);
assert_eq!(r2.len(), Sha3Length::from(Sha3Mode::Sha3_256));
assert!(!r1.eq(&r2));
}
#[test]
fn test_get_id_all() {
let do_test = |m: Sha3Mode| {
debug!("mode: {:?}", m);
let mut ids01 = IdsService::new(1000, m, None);
ids01.start();
let _ = ids01.filled_at_percent_event(2).recv().is_ok();
let r1 = ids01.get_id();
debug!("r1: {:?}", r1);
assert!(r1.len() > 0);
let r2 = ids01.get_id();
debug!("r2: {:?}", r2);
assert!(r2.len() > 0);
assert!(!r1.eq(&r2));
};
do_test(Sha3Mode::Sha3_224);
do_test(Sha3Mode::Sha3_256);
do_test(Sha3Mode::Sha3_384);
do_test(Sha3Mode::Sha3_512);
}
#[test]
fn test_get_id_as_hex() {
let mut ids01 = IdsService::new(1000, Sha3Mode::Sha3_256, None);
ids01.start();
let result = ids01.get_id().as_hex();
debug!("Id {} / len {}", result, result.len());
assert_eq!(result.len(), 64);
}
#[test]
fn test_get_id_as_base64() {
let mut ids01 = IdsService::new(1000, Sha3Mode::Sha3_256, None);
ids01.start();
let result = ids01.get_id().as_base64();
debug!("Id {} / len {}", result, result.len());
assert_eq!(result.len(), 44);
}
#[test]
fn test_get_id_as_base64_url() {
let mut ids01 = IdsService::new(1000, Sha3Mode::Sha3_256, None);
ids01.start();
let result = ids01.get_id().as_base64_url();
debug!("Id {} / len {}", result, result.len());
assert_eq!(result.len(), 44);
}
#[test]
fn test_get_id_as_base32() {
let mut ids01 = IdsService::new(1000, Sha3Mode::Sha3_256, None);
ids01.start();
let result = ids01.get_id().as_base32();
debug!("Id {} / len {}", result, result.len());
assert_eq!(result.len(), 56);
}
#[test]
fn test_get_id_as_json() {
let mut ids01 = IdsService::new(1000, Sha3Mode::Sha3_256, None);
ids01.start();
let result = ids01.get_id().as_json();
info!("{}", result);
assert!(result.contains("\"bytes\""));
assert!(result.contains("\"base64\""));
assert!(result.contains("\"base32\""));
assert!(result.contains("\"hex\""));
}
#[test]
fn test_iterator() {
let mut ids01 = IdsService::new(10_000, Sha3Mode::Sha3_512, None);
ids01.start();
let _ = ids01.filled_event().recv().is_ok();
let number = 10_000;
let mut ids = HashSet::with_capacity(number);
for _ in 0..number {
ids.insert(ids01.get_id().as_hex());
}
assert_eq!(ids.len(), number);
ids.clear();
for x in ids01.take(number) {
ids.insert(x.as_hex());
}
assert_eq!(ids.len(), number);
}
}