extern crate rand;
use super::common::Encode;
use super::common::Service;
use super::common::Uids;
use super::common::NUMBER_OF_FILL_THREAD;
use data_encoding::{BASE32, BASE64, BASE64URL, HEXLOWER};
use log::*;
use rand::prelude::*;
use std::collections::hash_map::DefaultHasher;
use std::collections::VecDeque;
use std::fmt;
use std::hash::Hasher;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration, SystemTime};
const RAND_LENGTH: usize = 16;
pub struct IdsService {
ids_cache: Arc<Mutex<VecDeque<u64>>>,
cache_size: usize,
number_of_threads: usize,
threads_pool: Vec<Option<JoinHandle<()>>>,
stop_state: Arc<AtomicBool>,
}
impl Default for IdsService {
fn default() -> Self {
let cache_size = 100_000_usize;
IdsService {
ids_cache: Arc::new(Mutex::new(VecDeque::with_capacity(cache_size))),
cache_size,
number_of_threads: *NUMBER_OF_FILL_THREAD,
threads_pool: Vec::with_capacity(*NUMBER_OF_FILL_THREAD),
stop_state: Arc::new(AtomicBool::new(false)),
}
}
}
impl fmt::Debug for IdsService {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"IdsService {{ ids_cache nb. ids: {}, cache_size: {}, number_of_threads : {} }}",
self.ids_cache.lock().unwrap().len(),
self.cache_size,
self.number_of_threads
)
}
}
impl Service for IdsService {
fn start(&mut self) {
info!("Start ids_service!");
let mut rng = thread_rng();
self.stop_state.store(false, Ordering::Relaxed);
for i in 0..self.number_of_threads {
let builder = thread::Builder::new().name(format!("{}", i));
let ids_cache_clone = self.ids_cache.clone();
let cache_size = self.cache_size;
let delay_ms: u64 = rng.gen_range(50..200);
let stop_state_clone = self.stop_state.clone();
let handler = builder
.spawn(move || {
let thread_id: usize =
usize::from_str(thread::current().name().unwrap()).unwrap();
trace!("Thread id {}: is up", thread_id);
loop {
if ids_cache_clone.lock().unwrap().len() < cache_size {
let id_result = build_id();
if let Ok(id) = id_result {
ids_cache_clone.lock().unwrap().push_back(id);
} else {
warn!(
"Thread id {}: Error on random data generator. {}",
thread_id,
id_result.err().unwrap().to_string()
);
}
} else {
thread::sleep(Duration::from_millis(delay_ms));
}
if stop_state_clone.load(Ordering::Relaxed) {
trace!("Thread id {}: stopped", thread_id);
break;
}
}
})
.unwrap_or_else(|_| panic!("Expect no error from thread {}", i));
self.threads_pool.push(Some(handler));
}
}
fn stop(&mut self) {
info!("Stop ids_service!");
self.stop_state.store(true, Ordering::Relaxed);
for handle in &mut self.threads_pool {
if let Some(handle) = handle.take() {
handle.join().unwrap();
}
}
self.ids_cache.lock().unwrap().clear();
}
}
impl Encode for u64 {
fn as_hex(&self) -> String {
HEXLOWER.encode(&self.to_ne_bytes())
}
fn as_base64(&self) -> String {
BASE64.encode(&self.to_ne_bytes())
}
fn as_base64_url(&self) -> String {
BASE64URL.encode(&self.to_ne_bytes())
}
fn as_base32(&self) -> String {
BASE32.encode(&self.to_ne_bytes())
}
fn as_json(&self) -> String {
let mut message: String = String::new();
message.push_str(format!("{{\n\"u64\" : \"{}\",", self).as_str());
message.push_str(format!("\n\"base64\" : \"{}\",", self.as_base64()).as_str());
message.push_str(format!("\n\"base32\" : \"{}\",", self.as_base32()).as_str());
message.push_str(format!("\n\"hex\" : \"0x{}\"\n}}", self.as_hex()).as_str());
message
}
}
impl Iterator for IdsService {
type Item = u64;
fn next(&mut self) -> Option<Self::Item> {
Some(self.get_id())
}
}
impl Uids for IdsService {
type Id = u64;
fn get_id(&mut self) -> Self::Id {
let id = self.ids_cache.lock().unwrap().pop_front();
match id {
Some(i) => i,
None => {
info!("The ids_cache is empty, create on fly an id");
self.create_id().unwrap()
}
}
}
fn get_id_from_cache(&mut self) -> Option<Self::Id> {
self.ids_cache.lock().unwrap().pop_front()
}
}
impl IdsService {
pub fn new(cache_size: usize, number_of_threads: Option<usize>) -> IdsService {
let threads = match number_of_threads {
Some(n) => n,
None => *NUMBER_OF_FILL_THREAD,
};
IdsService {
ids_cache: Arc::new(Mutex::new(VecDeque::with_capacity(cache_size))),
cache_size,
number_of_threads: threads,
threads_pool: Vec::with_capacity(*NUMBER_OF_FILL_THREAD),
stop_state: Arc::new(AtomicBool::new(false)),
}
}
fn create_id(&mut self) -> Result<u64, rand::Error> {
build_id()
}
pub fn set_cache_size(&mut self, new_size: usize) {
self.cache_size = new_size;
info!("Restart ids_service!");
self.stop();
self.start();
}
pub fn is_filled(&self) -> bool {
let len = self.ids_cache.lock().unwrap().len();
len >= self.cache_size
}
pub fn filled_event(&mut self) -> mpsc::Receiver<bool> {
self.filled_at_percent_event(100)
}
pub fn filled_at_percent_event(&mut self, percentage : u8) -> mpsc::Receiver<bool> {
let percent = {if percentage>100 {100} else {percentage}};
let filled_chn: (mpsc::Sender<bool>, mpsc::Receiver<bool>) = mpsc::channel();
let sender = filled_chn.0.clone();
let name = format!("filled_event {}", self.threads_pool.len() + 1);
let builder = thread::Builder::new().name(name.clone());
let stop_state_clone = self.stop_state.clone();
let ids_cache_clone = self.ids_cache.clone();
let cache_limit = self.cache_size * percent as usize / 100;
let handler = builder
.spawn(move || {
trace!("Thread id {}: is up", thread::current().name().unwrap());
loop {
if ids_cache_clone.lock().unwrap().len() >= cache_limit {
sender.send(true).unwrap();
break;
}
if stop_state_clone.load(Ordering::Relaxed) {
sender.send(false).unwrap();
break;
}
thread::yield_now();
}
trace!("Thread id {}: stopped", thread::current().name().unwrap());
})
.unwrap_or_else(|_| panic!("Expect no error from thread {}", name));
self.threads_pool.push(Some(handler));
filled_chn.1
}
pub fn get_cache_len(&mut self) -> usize {
self.ids_cache.lock().unwrap().len()
}
}
pub fn create_id() -> Result<u64, rand::Error> {
build_id()
}
fn build_id() -> Result<u64, rand::Error> {
let mut rng = thread_rng();
let ts = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos();
let mut hasher = DefaultHasher::new();
let mut arr = [0u8; RAND_LENGTH];
rng.try_fill(&mut arr[..])?;
hasher.write(&arr);
hasher.write(&ts.to_ne_bytes());
Ok(hasher.finish())
}
#[cfg(test)]
mod tests {
use crate::rust_hash::*;
use simplelog::*;
use crate::common::{Service, Uids, Encode};
use std::collections::HashSet;
#[test]
fn aaaa_init() {
println!("Call test init");
let _ = SimpleLogger::init(LevelFilter::Info, Config::default());
assert!(true);
}
#[test]
fn test_create_id() {
let result = build_id();
assert!(result.is_ok());
assert!(result.unwrap() > 0);
}
#[test]
fn test_new() {
let mut service = IdsService::new(1000, None);
service.start();
let _ = service.filled_event().recv().is_ok();
assert!(service.get_cache_len() >= 1000);
}
#[test]
fn test_default() {
let mut service = IdsService::default();
service.start();
let _ = service.filled_event().recv().is_ok();
assert!(service.get_cache_len() >= 100_000);
}
#[test]
fn test_get_id() {
let mut service = IdsService::default();
service.start();
let _ = service.filled_at_percent_event(5).recv().is_ok();
assert_ne!(service.get_id(),0);
}
#[test]
fn test_get_id_from_cache() {
let mut service = IdsService::default();
service.start();
let _ = service.filled_at_percent_event(5).recv().is_ok();
let r1 = service.get_id_from_cache();
assert!(r1.is_some());
assert_ne!(r1.unwrap(),0);
service.stop();
let r2 = service.get_id_from_cache();
assert!(r2.is_none());
}
#[test]
fn test_filled() {
let mut service = IdsService::default();
service.start();
let _ = service.filled_event().recv().is_ok();
assert!(service.is_filled());
}
#[test]
fn test_filled_at_percent_event() {
let mut ids01 = IdsService::new(10000, None);
ids01.start();
let _ = ids01.filled_at_percent_event(5).recv().is_ok();
let cache_len = ids01.get_cache_len();
debug!("len: {}", cache_len);
assert!(cache_len > 200);
}
#[test]
fn test_iterator() {
let mut ids01 = IdsService::new(10_000, None);
ids01.start();
let _ = ids01.filled_event().recv().is_ok();
let number = 10_000;
let mut ids = HashSet::with_capacity(number);
for _ in 0..number {
ids.insert(ids01.get_id().as_hex());
}
assert_eq!(ids.len(), number);
ids.clear();
for x in ids01.take(number) {
ids.insert(x.as_hex());
}
assert_eq!(ids.len(), number);
}
}