extern crate chrono;
extern crate crypto;
extern crate data_encoding;
extern crate log;
extern crate rand;
extern crate simplelog;
pub mod crypto_hash {
use chrono::prelude::*;
use crypto::digest::Digest;
use crypto::sha3::Sha3;
use crypto::sha3::Sha3Mode;
use data_encoding::{BASE32, BASE64, BASE64URL, HEXLOWER};
use log::*;
use rand::prelude::*;
use std::collections::VecDeque;
use std::fmt;
use std::str::FromStr;
use std::sync::{mpsc,Arc, Mutex};
use std::thread;
use std::time::Duration;
use std::thread::JoinHandle;
use std::sync::atomic::{AtomicBool,Ordering};
/// Number of filler threads used when the caller does not request a specific count.
const NUMBER_OF_FILL_THREAD: usize = 20;
pub struct IdsService {
ids_cache: Arc<Mutex<VecDeque<Vec<u8>>>>,
cache_size: usize,
rand_length: usize,
mode: Sha3Mode,
number_of_threads: usize,
threads_pool: Vec<Option<JoinHandle<()>>>,
stop_state: Arc<AtomicBool>
}
impl Default for IdsService {
fn default() -> Self {
let mode = Sha3Mode::Sha3_256;
let cache_size = 100_000 as usize;
IdsService {
ids_cache: Arc::new(Mutex::new(VecDeque::with_capacity(cache_size))),
rand_length: length_for(mode) * 2,
cache_size,
mode,
number_of_threads: NUMBER_OF_FILL_THREAD,
threads_pool: Vec::with_capacity(NUMBER_OF_FILL_THREAD),
stop_state: Arc::new(AtomicBool::new(false)),
}
}
}
/// Diagnostic formatting: reports how many ids are currently cached together
/// with the configuration, not the ids themselves.
impl fmt::Debug for IdsService {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Take a snapshot of the queue length so the lock is released before writing.
        let cached_ids = self.ids_cache.lock().unwrap().len();
        write!(
            f,
            "IdsService {{ ids_cache nb. ids: {}, cache_size: {}, rand_length : {}, mode : {:?}, number_of_threads : {} }}",
            cached_ids, self.cache_size, self.rand_length, self.mode, self.number_of_threads
        )
    }
}
/// Text renderings of a value.
///
/// Every method has a default body that panics with `unimplemented!()`, so
/// an implementor only needs to override the encodings it supports.
pub trait Encode: Sized {
    /// Hexadecimal rendering. Panics unless overridden.
    fn as_hex(&self) -> String {
        unimplemented!()
    }

    /// Base64 rendering. Panics unless overridden.
    fn as_base64(&self) -> String {
        unimplemented!()
    }

    /// URL-safe base64 rendering. Panics unless overridden.
    fn as_base64_url(&self) -> String {
        unimplemented!()
    }

    /// Base32 rendering. Panics unless overridden.
    fn as_base32(&self) -> String {
        unimplemented!()
    }

    /// JSON rendering. Panics unless overridden.
    fn as_json(&self) -> String {
        unimplemented!()
    }
}
/// Text encodings for raw identifier bytes.
impl Encode for Vec<u8> {
    /// Encodes the bytes with `HEXLOWER`.
    fn as_hex(&self) -> String {
        HEXLOWER.encode(self.as_slice())
    }

    /// Encodes the bytes with `BASE64`.
    fn as_base64(&self) -> String {
        BASE64.encode(self.as_slice())
    }

    /// Encodes the bytes with `BASE64URL`.
    fn as_base64_url(&self) -> String {
        BASE64URL.encode(self.as_slice())
    }

    /// Encodes the bytes with `BASE32`.
    fn as_base32(&self) -> String {
        BASE32.encode(self.as_slice())
    }

    /// Builds a small JSON object with the keys `bytes` (the `Debug`
    /// rendering of the vector), `base64`, `base32` and `hex` (prefixed
    /// with `0x`).
    fn as_json(&self) -> String {
        [
            format!("{{\n\"bytes\" : \"{:?}\",", self),
            format!("\n\"base64\" : \"{}\",", self.as_base64()),
            format!("\n\"base32\" : \"{}\",", self.as_base32()),
            format!("\n\"hex\" : \"0x{}\"\n}}", self.as_hex()),
        ]
        .concat()
    }
}
/// Endless stream of identifiers: each step yields whatever `get_id` returns.
impl Iterator for IdsService {
    type Item = Vec<u8>;

    /// Always returns `Some`; the iterator never terminates on its own.
    fn next(&mut self) -> Option<Self::Item> {
        let id = self.get_id();
        Some(id)
    }
}
impl IdsService {
/// Builds a service that is configured but not yet running; call `start`
/// to spawn the filler threads.
///
/// * `cache_size` - fill target of the id cache.
/// * `mode` - hash variant used to derive identifiers.
/// * `number_of_threads` - number of filler threads; `None` falls back to
///   `NUMBER_OF_FILL_THREAD`.
pub fn new(
    cache_size: usize,
    mode: Sha3Mode,
    number_of_threads: Option<usize>,
) -> IdsService {
    let number_of_threads = number_of_threads.unwrap_or(NUMBER_OF_FILL_THREAD);
    IdsService {
        ids_cache: Arc::new(Mutex::new(VecDeque::with_capacity(cache_size))),
        // Twice as many random bytes as the digest is long.
        rand_length: length_for(mode) * 2,
        cache_size,
        mode,
        number_of_threads,
        // Size the pool for the threads that will actually be spawned, not
        // for the default count.
        threads_pool: Vec::with_capacity(number_of_threads),
        stop_state: Arc::new(AtomicBool::new(false)),
    }
}
pub fn start(&mut self) {
info!("Start ids_service!");
let mut rng = rand::thread_rng();
self.stop_state.store(false,Ordering::Relaxed);
for i in 0..self.number_of_threads {
let builder = thread::Builder::new().name(format!("{}", i));
let ids_cache_clone = self.ids_cache.clone();
let rand_length = self.rand_length;
let cache_size = self.cache_size;
let mode = self.mode;
let delay_ms: u64 = rng.gen_range(50, 200);
let stop_state_clone = self.stop_state.clone();
let handler = builder
.spawn(move || {
let thread_id: usize =
usize::from_str(thread::current().name().unwrap()).unwrap();
trace!("Thread id {}: is up", thread_id);
loop {
if ids_cache_clone.lock().unwrap().len() < cache_size {
let mut id: Vec<u8> = vec![0u8; length_for(mode)];
create_id(mode, rand_length, id.as_mut());
ids_cache_clone.lock().unwrap().push_back(id);
} else {
trace!("Thread id {}: wait for {} ms", thread_id, delay_ms);
thread::sleep(Duration::from_millis(delay_ms));
}
if stop_state_clone.load(Ordering::Relaxed) {
trace!("Thread id {}: stopped", thread_id);
break;
}
}
})
.unwrap_or_else(|_| panic!("Expect no error from thread {}", i));
self.threads_pool.push(Some(handler));
}
}
pub fn stop(&mut self) {
info!("Stop ids_service!");
self.stop_state.store(true,Ordering::Relaxed);
for handle in &mut self.threads_pool {
if let Some(handle) = handle.take() {
handle.join().unwrap();
}
}
self.ids_cache.lock().unwrap().clear();
}
/// Computes one identifier into `out` using the service's mode and random
/// length.
///
/// If `out` is shorter than the digest length for the mode, a warning is
/// logged and `out` is reset to a zeroed buffer of the expected length
/// before hashing.
fn create_id(&mut self, out: &mut Vec<u8>) {
    let expected_len = length_for(self.mode);
    if out.len() < expected_len {
        warn!("Create an id with vec param out not initialized. out len = {}, out expected len {}", out.len(), expected_len);
        out.clear();
        out.resize(expected_len, 0u8);
    }
    create_id(self.mode, self.rand_length, out);
}
/// Returns an identifier, taken from the cache when one is available and
/// computed on the spot otherwise.
pub fn get_id(&mut self) -> Vec<u8> {
    // The lock guard is a temporary, so it is released at the end of this statement.
    let cached = self.ids_cache.lock().unwrap().pop_front();
    if let Some(id) = cached {
        return id;
    }
    info!("The ids_cache is empty, create on fly an id");
    let mut fresh: Vec<u8> = vec![0u8; length_for(self.mode)];
    self.create_id(&mut fresh);
    fresh
}
/// Pops the oldest identifier from the cache, or returns `None` when the
/// cache is empty. Never computes an id on the fly.
pub fn get_id_from_cache(&mut self) -> Option<Vec<u8>> {
    let mut cache = self.ids_cache.lock().unwrap();
    cache.pop_front()
}
/// Returns the number of identifiers currently held in the cache.
pub fn get_cache_len(&mut self) -> usize {
    let cache = self.ids_cache.lock().unwrap();
    cache.len()
}
/// Changes the cache fill target and restarts the service so the filler
/// threads pick up the new value.
///
/// The restart goes through `stop`, which empties the cache, so the cache
/// is refilled from scratch afterwards.
pub fn set_cache_size(&mut self, new_size: usize) {
    info!("Restart ids_service!");
    self.cache_size = new_size;
    self.stop();
    self.start();
}
/// Returns `true` when the cache currently holds at least `cache_size` ids.
pub fn is_filled(&self) -> bool {
    self.ids_cache.lock().unwrap().len() >= self.cache_size
}
pub fn filled_event(&mut self) -> mpsc::Receiver<bool> {
let mut rng = rand::thread_rng();
let filled_chn:(mpsc::Sender<bool>, mpsc::Receiver<bool>) = mpsc::channel();
let sender = filled_chn.0.clone();
let name = format!("filled_event {}", self.threads_pool.len() + 1);
let builder = thread::Builder::new().name(name.clone());
let stop_state_clone = self.stop_state.clone();
let ids_cache_clone = self.ids_cache.clone();
let cache_size = self.cache_size;
let delay_ms: u64 = rng.gen_range(20, 50);
let handler = builder
.spawn(move || {
trace!("Thread id {}: is up", thread::current().name().unwrap());
loop{
if ids_cache_clone.lock().unwrap().len() >= cache_size {
sender.send(true).unwrap();
break
}
if stop_state_clone.load(Ordering::Relaxed) {
sender.send(false).unwrap();
break;
}
thread::sleep(Duration::from_millis(delay_ms));
}
trace!("Thread id {}: stopped", thread::current().name().unwrap());
}).unwrap_or_else(|_| panic!("Expect no error from thread {}", name));
self.threads_pool.push(Some(handler));
filled_chn.1
}
}
/// Computes one identifier into `out`.
///
/// The hash input is the current UTC timestamp in nanoseconds (native byte
/// order) followed by `rand_length` random bytes; the digest for `mode` is
/// written into `out`.
///
/// `out` must already have the length expected for `mode`; callers in this
/// module allocate it beforehand.
fn create_id(mode: Sha3Mode, rand_length: usize, out: &mut Vec<u8>) {
    let ts_bytes = Utc::now().timestamp_nanos().to_ne_bytes();
    // Reserve room only: pre-filling the buffer with zeros would prepend
    // that many constant bytes to every hash input.
    let mut rand_data: Vec<u8> = Vec::with_capacity(rand_length + ts_bytes.len());
    rand_data.extend_from_slice(&ts_bytes);
    // Fetch the thread-local generator once instead of once per byte.
    let mut rng = thread_rng();
    rand_data.extend((0..rand_length).map(|_| rng.gen::<u8>()));
    let mut hasher = Sha3::new(mode);
    hasher.input(rand_data.as_slice());
    hasher.result(out.as_mut_slice());
}
/// Creates a one-off SHA3-512 identifier without going through a service
/// and returns it as a lower-case hexadecimal string.
pub fn create_id_as_sha512() -> String {
    let mode = Sha3Mode::Sha3_512;
    let digest_len = mode.digest_length();
    let mut digest = vec![0u8; digest_len];
    // The random part of the hash input is as long as the digest itself.
    create_id(mode, digest_len, &mut digest);
    HEXLOWER.encode(&digest)
}
/// Creates a one-off SHA3-256 identifier without going through a service
/// and returns it as a lower-case hexadecimal string.
pub fn create_id_as_sha256() -> String {
    let mode = Sha3Mode::Sha3_256;
    let digest_len = mode.digest_length();
    let mut digest = vec![0u8; digest_len];
    // The random part of the hash input is as long as the digest itself.
    create_id(mode, digest_len, &mut digest);
    HEXLOWER.encode(&digest)
}
/// Returns the identifier length in bytes for `mode`.
///
/// The two Shake variants get an explicit size (16 bytes for `Shake128`,
/// 32 bytes for `Shake256`); every other mode uses its own
/// `digest_length()`.
// NOTE(review): presumably the Shake modes are special-cased because they
// report no fixed digest length - confirm against the crypto crate.
fn length_for(mode: Sha3Mode) -> usize {
    match mode {
        Sha3Mode::Shake128 => 16,
        Sha3Mode::Shake256 => 32,
        fixed => fixed.digest_length(),
    }
}
/// Tests for the id service and the free id helpers.
///
/// Every test that starts a service also stops it, so background threads do
/// not pile up across the test run.
#[cfg(test)]
mod tests {
    use crate::crypto_hash::Encode;
    use crate::crypto_hash::*;
    use crypto::sha3::Sha3Mode;
    use simplelog::*;
    use std::collections::HashSet;
    use std::thread;
    use std::time::Duration;

    /// Blocks until the service reports a full cache and fails the test if
    /// the watcher reports anything else.
    fn wait_until_filled(service: &mut IdsService) {
        assert_eq!(service.filled_event().recv(), Ok(true));
    }

    #[test]
    fn test_start() {
        let _ = SimpleLogger::init(LevelFilter::Info, Config::default());
        let mut service = IdsService::new(1000, Sha3Mode::Sha3_512, None);
        service.start();
        while !service.is_filled() {
            thread::sleep(Duration::from_millis(10));
        }
        assert!(service.get_cache_len() >= 1000);
        service.stop();
    }

    #[test]
    fn test_stop() {
        let _ = SimpleLogger::init(LevelFilter::Trace, Config::default());
        let mut ids01 = IdsService::new(1000, Sha3Mode::Sha3_512, None);
        ids01.start();
        wait_until_filled(&mut ids01);
        debug!("Filled");
        ids01.stop();
        // `stop` joins every thread and then empties the cache.
        assert_eq!(ids01.get_cache_len(), 0);
    }

    #[test]
    fn test_create_id_512() {
        let _ = SimpleLogger::init(LevelFilter::Info, Config::default());
        let id1 = create_id_as_sha512();
        let id2 = create_id_as_sha512();
        assert_ne!(id1, id2);
    }

    #[test]
    fn test_create_id_256() {
        let _ = SimpleLogger::init(LevelFilter::Info, Config::default());
        let id1 = create_id_as_sha256();
        let id2 = create_id_as_sha256();
        assert_ne!(id1, id2);
    }

    #[test]
    fn test_get_id() {
        let _ = SimpleLogger::init(LevelFilter::Info, Config::default());
        let mut ids01 = IdsService::new(1000, Sha3Mode::Sha3_256, None);
        ids01.start();
        let r1 = ids01.get_id();
        debug!("r1: {:?}", r1);
        assert_eq!(r1.len(), Sha3Mode::Sha3_256.digest_length());
        let r2 = ids01.get_id();
        debug!("r2: {:?}", r2);
        assert_eq!(r2.len(), Sha3Mode::Sha3_256.digest_length());
        assert_ne!(r1, r2);
        ids01.stop();
    }

    #[test]
    fn test_filled() {
        let _ = SimpleLogger::init(LevelFilter::Info, Config::default());
        let mut ids01 = IdsService::new(1000, Sha3Mode::Sha3_256, None);
        ids01.start();
        wait_until_filled(&mut ids01);
        debug!("len: {}", ids01.get_cache_len());
        assert!(ids01.is_filled());
        ids01.stop();
    }

    #[test]
    fn test_get_id_from_cache() {
        let _ = SimpleLogger::init(LevelFilter::Info, Config::default());
        let mut ids01 = IdsService::new(1000, Sha3Mode::Sha3_256, None);
        assert!(ids01.get_id_from_cache().is_none());
        ids01.start();
        wait_until_filled(&mut ids01);
        debug!("len: {}", ids01.get_cache_len());
        let r1 = ids01.get_id_from_cache().expect("Expect an id");
        debug!("r1: {:?}", r1);
        assert_eq!(r1.len(), Sha3Mode::Sha3_256.digest_length());
        let r2 = ids01.get_id_from_cache().expect("Expect an id");
        debug!("r2: {:?}", r2);
        assert_eq!(r2.len(), Sha3Mode::Sha3_256.digest_length());
        assert_ne!(r1, r2);
        ids01.stop();
    }

    #[test]
    fn test_get_id_all() {
        let _ = SimpleLogger::init(LevelFilter::Info, Config::default());
        let do_test = |m: Sha3Mode| {
            debug!("mode: {:?}", m);
            let mut ids01 = IdsService::new(1000, m, None);
            ids01.start();
            wait_until_filled(&mut ids01);
            let r1 = ids01.get_id();
            debug!("r1: {:?}", r1);
            assert!(!r1.is_empty());
            let r2 = ids01.get_id();
            debug!("r2: {:?}", r2);
            assert!(!r2.is_empty());
            assert_ne!(r1, r2);
            ids01.stop();
        };
        do_test(Sha3Mode::Keccak224);
        do_test(Sha3Mode::Keccak256);
        do_test(Sha3Mode::Keccak384);
        do_test(Sha3Mode::Keccak512);
        do_test(Sha3Mode::Sha3_224);
        do_test(Sha3Mode::Sha3_256);
        do_test(Sha3Mode::Sha3_384);
        do_test(Sha3Mode::Sha3_512);
        do_test(Sha3Mode::Shake128);
        do_test(Sha3Mode::Shake256);
    }

    #[test]
    fn test_get_id_as_hex() {
        let _ = SimpleLogger::init(LevelFilter::Info, Config::default());
        let mut ids01 = IdsService::new(1000, Sha3Mode::Sha3_256, None);
        ids01.start();
        let result = ids01.get_id().as_hex();
        debug!("Id {} / len {}", result, result.len());
        assert_eq!(result.len(), 64);
        ids01.stop();
    }

    #[test]
    fn test_get_id_as_base64() {
        let _ = SimpleLogger::init(LevelFilter::Info, Config::default());
        let mut ids01 = IdsService::new(1000, Sha3Mode::Sha3_256, None);
        ids01.start();
        let result = ids01.get_id().as_base64();
        debug!("Id {} / len {}", result, result.len());
        assert_eq!(result.len(), 44);
        ids01.stop();
    }

    #[test]
    fn test_get_id_as_base64_url() {
        let _ = SimpleLogger::init(LevelFilter::Info, Config::default());
        let mut ids01 = IdsService::new(1000, Sha3Mode::Sha3_256, None);
        ids01.start();
        let result = ids01.get_id().as_base64_url();
        debug!("Id {} / len {}", result, result.len());
        assert_eq!(result.len(), 44);
        ids01.stop();
    }

    #[test]
    fn test_get_id_as_base32() {
        let _ = SimpleLogger::init(LevelFilter::Info, Config::default());
        let mut ids01 = IdsService::new(1000, Sha3Mode::Sha3_256, None);
        ids01.start();
        let result = ids01.get_id().as_base32();
        debug!("Id {} / len {}", result, result.len());
        assert_eq!(result.len(), 56);
        ids01.stop();
    }

    #[test]
    fn test_get_id_as_json() {
        let _ = SimpleLogger::init(LevelFilter::Info, Config::default());
        let mut ids01 = IdsService::new(1000, Sha3Mode::Sha3_256, None);
        ids01.start();
        let result = ids01.get_id().as_json();
        info!("{}", result);
        assert!(result.contains("\"bytes\""));
        assert!(result.contains("\"base64\""));
        assert!(result.contains("\"base32\""));
        assert!(result.contains("\"hex\""));
        ids01.stop();
    }

    #[test]
    fn test_iterator() {
        let _ = SimpleLogger::init(LevelFilter::Warn, Config::default());
        let mut ids01 = IdsService::new(1000, Sha3Mode::Sha3_512, Some(100));
        ids01.start();
        wait_until_filled(&mut ids01);
        let number = 500;
        let mut ids = HashSet::with_capacity(number);
        for _ in 0..number {
            ids.insert(ids01.get_id().as_hex());
        }
        assert_eq!(ids.len(), number);
        ids.clear();
        // Borrow the iterator so the service can still be stopped afterwards.
        for x in ids01.by_ref().take(number) {
            ids.insert(x.as_hex());
        }
        assert_eq!(ids.len(), number);
        ids01.stop();
    }
}
}