use data_encoding::{BASE32, BASE64, BASE64URL, HEXLOWER};
use log::*;
use rand::RngExt;
use std::collections::VecDeque;
use std::collections::hash_map::DefaultHasher;
use std::fmt;
use std::hash::Hasher;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex, mpsc};
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration, SystemTime};
use super::common::{Encode, NUMBER_OF_FILL_THREAD, Service, Uids};
const RAND_LENGTH: usize = 16;
pub struct IdsService {
ids_cache: Arc<Mutex<VecDeque<u64>>>,
cache_condvar: Arc<Condvar>,
cache_size: usize,
number_of_threads: usize,
threads_pool: Vec<Option<JoinHandle<()>>>,
stop_state: Arc<AtomicBool>,
}
impl Default for IdsService {
fn default() -> Self {
let cache_size = 100_000_usize;
IdsService {
ids_cache: Arc::new(Mutex::new(VecDeque::with_capacity(cache_size))),
cache_condvar: Arc::new(Condvar::new()),
cache_size,
number_of_threads: *NUMBER_OF_FILL_THREAD,
threads_pool: Vec::with_capacity(*NUMBER_OF_FILL_THREAD),
stop_state: Arc::new(AtomicBool::new(false)),
}
}
}
impl fmt::Debug for IdsService {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"IdsService {{ cached_ids: {}, cache_size: {}, threads: {} }}",
self.ids_cache
.lock()
.unwrap_or_else(|e| e.into_inner())
.len(),
self.cache_size,
self.number_of_threads,
)
}
}
impl Service for IdsService {
fn start(&mut self) {
info!("Start ids_service (rust_hash)!");
self.stop_state.store(false, Ordering::Relaxed);
for i in 0..self.number_of_threads {
let ids_cache_clone = Arc::clone(&self.ids_cache);
let condvar_clone = Arc::clone(&self.cache_condvar);
let cache_size = self.cache_size;
let stop_state_clone = Arc::clone(&self.stop_state);
let handler = thread::Builder::new()
.name(format!("ids-rh-{}", i))
.spawn(move || {
trace!("Worker thread ids-rh-{} started", i);
loop {
let needs_fill = ids_cache_clone
.lock()
.unwrap_or_else(|e| e.into_inner())
.len()
< cache_size;
if needs_fill {
let id = build_id();
ids_cache_clone
.lock()
.unwrap_or_else(|e| e.into_inner())
.push_back(id);
condvar_clone.notify_one();
} else {
thread::sleep(Duration::from_millis(10));
}
if stop_state_clone.load(Ordering::Relaxed) {
trace!("Worker thread ids-rh-{} stopped", i);
break;
}
}
})
.unwrap_or_else(|_| panic!("Failed to spawn worker thread {}", i));
self.threads_pool.push(Some(handler));
}
}
fn stop(&mut self) {
info!("Stop ids_service (rust_hash)!");
self.stop_state.store(true, Ordering::Relaxed);
self.cache_condvar.notify_all();
for handle in &mut self.threads_pool {
if let Some(h) = handle.take() {
h.join()
.unwrap_or_else(|_| error!("Worker thread panicked"));
}
}
self.threads_pool.clear();
self.ids_cache
.lock()
.unwrap_or_else(|e| e.into_inner())
.clear();
}
}
impl Uids for IdsService {
type Id = u64;
fn get_id(&mut self) -> Self::Id {
match self
.ids_cache
.lock()
.unwrap_or_else(|e| e.into_inner())
.pop_front()
{
Some(id) => id,
None => {
info!("Cache empty — generating id on-the-fly");
build_id()
}
}
}
fn get_id_from_cache(&mut self) -> Option<Self::Id> {
self.ids_cache
.lock()
.unwrap_or_else(|e| e.into_inner())
.pop_front()
}
}
impl Iterator for IdsService {
type Item = u64;
fn next(&mut self) -> Option<Self::Item> {
Some(self.get_id())
}
}
impl Encode for u64 {
fn as_hex(&self) -> String {
HEXLOWER.encode(&self.to_ne_bytes())
}
fn as_base64(&self) -> String {
BASE64.encode(&self.to_ne_bytes())
}
fn as_base64_url(&self) -> String {
BASE64URL.encode(&self.to_ne_bytes())
}
fn as_base32(&self) -> String {
BASE32.encode(&self.to_ne_bytes())
}
fn as_json(&self) -> String {
format!(
"{{\n\"u64\" : \"{}\",\n\"base64\" : \"{}\",\n\"base32\" : \"{}\",\n\"hex\" : \"0x{}\"\n}}",
self,
self.as_base64(),
self.as_base32(),
self.as_hex(),
)
}
}
impl IdsService {
pub fn new(cache_size: usize, number_of_threads: Option<usize>) -> IdsService {
let threads = number_of_threads.unwrap_or(*NUMBER_OF_FILL_THREAD);
IdsService {
ids_cache: Arc::new(Mutex::new(VecDeque::with_capacity(cache_size))),
cache_condvar: Arc::new(Condvar::new()),
cache_size,
number_of_threads: threads,
threads_pool: Vec::with_capacity(threads),
stop_state: Arc::new(AtomicBool::new(false)),
}
}
pub fn set_cache_size(&mut self, new_size: usize) {
self.cache_size = new_size;
info!(
"Restart ids_service (rust_hash) with new cache size {}!",
new_size
);
self.stop();
self.start();
}
pub fn is_filled(&self) -> bool {
self.ids_cache
.lock()
.unwrap_or_else(|e| e.into_inner())
.len()
>= self.cache_size
}
pub fn get_cache_len(&self) -> usize {
self.ids_cache
.lock()
.unwrap_or_else(|e| e.into_inner())
.len()
}
pub fn filled_event(&mut self) -> mpsc::Receiver<bool> {
self.filled_at_percent_event(100)
}
pub fn filled_at_percent_event(&mut self, percentage: u8) -> mpsc::Receiver<bool> {
let percent = percentage.min(100);
let cache_limit = self.cache_size * percent as usize / 100;
let (tx, rx) = mpsc::channel();
let ids_cache_clone = Arc::clone(&self.ids_cache);
let condvar_clone = Arc::clone(&self.cache_condvar);
let stop_state_clone = Arc::clone(&self.stop_state);
let name = format!("filled_event_{}", self.threads_pool.len() + 1);
let handler = thread::Builder::new()
.name(name)
.spawn(move || {
let mut guard = ids_cache_clone.lock().unwrap_or_else(|e| e.into_inner());
loop {
if guard.len() >= cache_limit {
let _ = tx.send(true);
break;
}
if stop_state_clone.load(Ordering::Relaxed) {
let _ = tx.send(false);
break;
}
let result = condvar_clone
.wait_timeout(guard, Duration::from_millis(50))
.unwrap_or_else(|e| e.into_inner());
guard = result.0;
}
})
.expect("failed to spawn filled_event thread");
self.threads_pool.push(Some(handler));
rx
}
}
pub fn create_id() -> u64 {
build_id()
}
fn build_id() -> u64 {
let mut rng = rand::rng();
let ts = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("system clock before UNIX epoch")
.as_nanos();
let mut hasher = DefaultHasher::new();
let mut arr = [0u8; RAND_LENGTH];
rng.fill(&mut arr[..]);
hasher.write(&arr);
hasher.write(&ts.to_ne_bytes());
hasher.finish()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::common::{Encode, Service, Uids};
use std::collections::HashSet;
fn init_log() {
let _ = simplelog::SimpleLogger::init(
simplelog::LevelFilter::Info,
simplelog::Config::default(),
);
}
#[test]
fn test_build_id() {
let id1 = build_id();
let id2 = build_id();
assert_ne!(id1, id2);
assert_ne!(id1, 0);
}
#[test]
fn test_new() {
init_log();
let mut svc = IdsService::new(1_000, None);
svc.start();
let _ = svc.filled_event().recv();
assert!(svc.get_cache_len() >= 1_000);
svc.stop();
assert_eq!(svc.get_cache_len(), 0);
}
#[test]
fn test_default() {
let mut svc = IdsService::default();
svc.start();
let _ = svc.filled_event().recv();
assert!(svc.get_cache_len() >= 100_000);
svc.stop();
}
#[test]
fn test_get_id() {
let mut svc = IdsService::default();
svc.start();
let _ = svc.filled_at_percent_event(5).recv();
assert_ne!(svc.get_id(), 0);
}
#[test]
fn test_get_id_from_cache() {
let mut svc = IdsService::default();
svc.start();
let _ = svc.filled_at_percent_event(5).recv();
let r1 = svc.get_id_from_cache();
assert!(r1.is_some());
assert_ne!(r1.unwrap(), 0);
svc.stop();
assert!(svc.get_id_from_cache().is_none());
}
#[test]
fn test_filled() {
let mut svc = IdsService::default();
svc.start();
let _ = svc.filled_event().recv();
assert!(svc.is_filled());
svc.stop();
}
#[test]
fn test_filled_at_percent_event() {
let mut svc = IdsService::new(10_000, None);
svc.start();
let _ = svc.filled_at_percent_event(5).recv();
assert!(svc.get_cache_len() > 200);
svc.stop();
}
#[test]
fn test_iterator_uniqueness() {
let mut svc = IdsService::new(10_000, None);
svc.start();
let _ = svc.filled_event().recv();
let n = 10_000;
let ids: HashSet<String> = svc.take(n).map(|id| id.as_hex()).collect();
assert_eq!(ids.len(), n);
}
#[test]
fn test_encode_hex() {
let id = build_id();
assert_eq!(id.as_hex().len(), 16); }
#[test]
fn test_encode_json() {
let id = build_id();
let json = id.as_json();
assert!(json.contains("\"u64\""));
assert!(json.contains("\"base64\""));
assert!(json.contains("\"base32\""));
assert!(json.contains("\"hex\""));
}
#[test]
fn test_set_cache_size() {
let mut svc = IdsService::new(1_000, None);
svc.start();
let _ = svc.filled_event().recv();
svc.set_cache_size(500);
let _ = svc.filled_event().recv();
assert!(svc.get_cache_len() >= 500);
svc.stop();
}
}