use data_encoding::{BASE32, BASE64, BASE64URL, HEXLOWER};
use log::*;
use rand::RngExt;
use sha3::{Digest, Sha3_224, Sha3_256, Sha3_384, Sha3_512};
use std::collections::VecDeque;
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex, mpsc};
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration, SystemTime};
use super::common::{Encode, NUMBER_OF_FILL_THREAD, Service, Uids};
pub struct IdsService {
ids_cache: Arc<Mutex<VecDeque<Vec<u8>>>>,
cache_condvar: Arc<Condvar>,
cache_size: usize,
rand_length: usize,
mode: Sha3Mode,
number_of_threads: usize,
threads_pool: Vec<Option<JoinHandle<()>>>,
stop_state: Arc<AtomicBool>,
}
impl Default for IdsService {
fn default() -> Self {
let cache_size = 100_000_usize;
IdsService {
ids_cache: Arc::new(Mutex::new(VecDeque::with_capacity(cache_size))),
cache_condvar: Arc::new(Condvar::new()),
rand_length: Sha3Length::from(Sha3Mode::Sha3_256) * 2,
cache_size,
mode: Sha3Mode::Sha3_256,
number_of_threads: *NUMBER_OF_FILL_THREAD,
threads_pool: Vec::with_capacity(*NUMBER_OF_FILL_THREAD),
stop_state: Arc::new(AtomicBool::new(false)),
}
}
}
impl IdsService {
fn make(cache_size: usize, mode: Sha3Mode, number_of_threads: usize) -> Self {
IdsService {
ids_cache: Arc::new(Mutex::new(VecDeque::with_capacity(cache_size))),
cache_condvar: Arc::new(Condvar::new()),
rand_length: Sha3Length::from(mode) * 2,
cache_size,
mode,
number_of_threads,
threads_pool: Vec::with_capacity(number_of_threads),
stop_state: Arc::new(AtomicBool::new(false)),
}
}
pub fn new(cache_size: usize, mode: Sha3Mode, number_of_threads: Option<usize>) -> IdsService {
let threads = number_of_threads.unwrap_or(*NUMBER_OF_FILL_THREAD);
Self::make(cache_size, mode, threads)
}
pub fn get_cache_len(&self) -> usize {
self.ids_cache
.lock()
.unwrap_or_else(|e| e.into_inner())
.len()
}
pub fn set_cache_size(&mut self, new_size: usize) {
self.cache_size = new_size;
info!("Restart ids_service with new cache size {}!", new_size);
self.stop();
self.start();
}
pub fn is_filled(&self) -> bool {
self.ids_cache
.lock()
.unwrap_or_else(|e| e.into_inner())
.len()
>= self.cache_size
}
pub fn filled_event(&mut self) -> mpsc::Receiver<bool> {
self.filled_at_percent_event(100)
}
pub fn filled_at_percent_event(&mut self, percentage: u8) -> mpsc::Receiver<bool> {
let percent = percentage.min(100);
let cache_limit = self.cache_size * percent as usize / 100;
let (tx, rx) = mpsc::channel();
let ids_cache_clone = Arc::clone(&self.ids_cache);
let condvar_clone = Arc::clone(&self.cache_condvar);
let stop_state_clone = Arc::clone(&self.stop_state);
let name = format!("filled_event_{}", self.threads_pool.len() + 1);
let handler = thread::Builder::new()
.name(name)
.spawn(move || {
let mut guard = ids_cache_clone.lock().unwrap_or_else(|e| e.into_inner());
loop {
if guard.len() >= cache_limit {
let _ = tx.send(true);
break;
}
if stop_state_clone.load(Ordering::Relaxed) {
let _ = tx.send(false);
break;
}
let result = condvar_clone
.wait_timeout(guard, Duration::from_millis(50))
.unwrap_or_else(|e| e.into_inner());
guard = result.0;
}
})
.expect("failed to spawn filled_event thread");
self.threads_pool.push(Some(handler));
rx
}
}
impl fmt::Debug for IdsService {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"IdsService {{ cached_ids: {}, cache_size: {}, rand_length: {}, mode: {:?}, threads: {} }}",
self.ids_cache
.lock()
.unwrap_or_else(|e| e.into_inner())
.len(),
self.cache_size,
self.rand_length,
self.mode,
self.number_of_threads,
)
}
}
impl Service for IdsService {
fn start(&mut self) {
info!("Start ids_service (crypto_hash, mode={:?})!", self.mode);
self.stop_state.store(false, Ordering::Relaxed);
for i in 0..self.number_of_threads {
let ids_cache_clone = Arc::clone(&self.ids_cache);
let condvar_clone = Arc::clone(&self.cache_condvar);
let cache_size = self.cache_size;
let mode = self.mode;
let stop_state_clone = Arc::clone(&self.stop_state);
let handler = thread::Builder::new()
.name(format!("ids-crypto-{}", i))
.spawn(move || {
trace!("Worker thread ids-crypto-{} started", i);
loop {
let needs_fill = {
let guard = ids_cache_clone.lock().unwrap_or_else(|e| e.into_inner());
guard.len() < cache_size
};
if needs_fill {
let id = make_id(mode);
let mut guard =
ids_cache_clone.lock().unwrap_or_else(|e| e.into_inner());
guard.push_back(id);
drop(guard);
condvar_clone.notify_one();
} else {
thread::sleep(Duration::from_millis(10));
}
if stop_state_clone.load(Ordering::Relaxed) {
trace!("Worker thread ids-crypto-{} stopped", i);
break;
}
}
})
.unwrap_or_else(|_| panic!("Failed to spawn worker thread {}", i));
self.threads_pool.push(Some(handler));
}
}
fn stop(&mut self) {
info!("Stop ids_service (crypto_hash)!");
self.stop_state.store(true, Ordering::Relaxed);
self.cache_condvar.notify_all();
for handle in &mut self.threads_pool {
if let Some(h) = handle.take() {
h.join()
.unwrap_or_else(|_| error!("Worker thread panicked"));
}
}
self.threads_pool.clear();
self.ids_cache
.lock()
.unwrap_or_else(|e| e.into_inner())
.clear();
}
}
impl Uids for IdsService {
type Id = Vec<u8>;
fn get_id(&mut self) -> Self::Id {
match self
.ids_cache
.lock()
.unwrap_or_else(|e| e.into_inner())
.pop_front()
{
Some(id) => id,
None => {
info!("Cache empty — generating id on-the-fly");
make_id(self.mode)
}
}
}
fn get_id_from_cache(&mut self) -> Option<Self::Id> {
self.ids_cache
.lock()
.unwrap_or_else(|e| e.into_inner())
.pop_front()
}
}
impl Iterator for IdsService {
type Item = Vec<u8>;
fn next(&mut self) -> Option<Self::Item> {
Some(self.get_id())
}
}
impl Encode for Vec<u8> {
fn as_hex(&self) -> String {
HEXLOWER.encode(self)
}
fn as_base64(&self) -> String {
BASE64.encode(self)
}
fn as_base64_url(&self) -> String {
BASE64URL.encode(self)
}
fn as_base32(&self) -> String {
BASE32.encode(self)
}
fn as_json(&self) -> String {
format!(
"{{\n\"bytes\" : \"{:?}\",\n\"base64\" : \"{}\",\n\"base32\" : \"{}\",\n\"hex\" : \"0x{}\"\n}}",
self,
self.as_base64(),
self.as_base32(),
self.as_hex(),
)
}
}
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum Sha3Mode {
Sha3_224,
Sha3_256,
Sha3_384,
Sha3_512,
}
#[derive(Clone)]
#[allow(clippy::enum_variant_names)]
enum Sha3Hasher {
Sha3_224(Sha3_224),
Sha3_256(Sha3_256),
Sha3_384(Sha3_384),
Sha3_512(Sha3_512),
}
impl From<Sha3Mode> for Sha3Hasher {
fn from(mode: Sha3Mode) -> Self {
match mode {
Sha3Mode::Sha3_224 => Sha3Hasher::Sha3_224(Sha3_224::new()),
Sha3Mode::Sha3_256 => Sha3Hasher::Sha3_256(Sha3_256::new()),
Sha3Mode::Sha3_384 => Sha3Hasher::Sha3_384(Sha3_384::new()),
Sha3Mode::Sha3_512 => Sha3Hasher::Sha3_512(Sha3_512::new()),
}
}
}
pub type Sha3Length = usize;
impl From<Sha3Mode> for Sha3Length {
fn from(mode: Sha3Mode) -> Self {
match mode {
Sha3Mode::Sha3_224 => 28,
Sha3Mode::Sha3_256 => 32,
Sha3Mode::Sha3_384 => 48,
Sha3Mode::Sha3_512 => 64,
}
}
}
fn make_id(mode: Sha3Mode) -> Vec<u8> {
let mut rng = rand::rng();
let len = Sha3Length::from(mode);
let ts = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("system clock before UNIX epoch")
.as_nanos();
let mut arr = vec![0u8; len * 2];
rng.fill(&mut arr[..]);
let hasher = Sha3Hasher::from(mode);
let mut out: Vec<u8> = Vec::with_capacity(len);
match hasher {
Sha3Hasher::Sha3_224(mut h) => {
h.update(ts.to_ne_bytes());
h.update(&arr);
out.extend(h.finalize());
}
Sha3Hasher::Sha3_256(mut h) => {
h.update(ts.to_ne_bytes());
h.update(&arr);
out.extend(h.finalize());
}
Sha3Hasher::Sha3_384(mut h) => {
h.update(ts.to_ne_bytes());
h.update(&arr);
out.extend(h.finalize());
}
Sha3Hasher::Sha3_512(mut h) => {
h.update(ts.to_ne_bytes());
h.update(&arr);
out.extend(h.finalize());
}
}
out
}
pub fn create_id_as_sha512() -> String {
HEXLOWER.encode(&make_id(Sha3Mode::Sha3_512))
}
pub fn create_id_as_sha256() -> String {
HEXLOWER.encode(&make_id(Sha3Mode::Sha3_256))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::common::{Encode, Service, Uids};
use std::collections::HashSet;
fn init_log() {
let _ = simplelog::SimpleLogger::init(
simplelog::LevelFilter::Info,
simplelog::Config::default(),
);
}
#[test]
fn test_start_stop() {
init_log();
let mut svc = IdsService::new(1_000, Sha3Mode::Sha3_512, None);
svc.start();
let _ = svc.filled_event().recv();
assert!(svc.get_cache_len() >= 1_000);
svc.stop();
assert_eq!(svc.get_cache_len(), 0);
}
#[test]
fn test_create_id_512() {
let id1 = create_id_as_sha512();
let id2 = create_id_as_sha512();
assert_ne!(id1, id2);
assert_eq!(id1.len(), 128); }
#[test]
fn test_create_id_256() {
let id1 = create_id_as_sha256();
let id2 = create_id_as_sha256();
assert_ne!(id1, id2);
assert_eq!(id1.len(), 64);
}
#[test]
fn test_get_id() {
let mut svc = IdsService::new(1_000, Sha3Mode::Sha3_256, None);
svc.start();
let r1 = svc.get_id();
let r2 = svc.get_id();
assert_eq!(r1.len(), Sha3Length::from(Sha3Mode::Sha3_256));
assert_ne!(r1, r2);
}
#[test]
fn test_filled() {
let mut svc = IdsService::new(1_000, Sha3Mode::Sha3_256, None);
svc.start();
let _ = svc.filled_event().recv();
assert!(svc.is_filled());
}
#[test]
fn test_filled_at_percent_event() {
let mut svc = IdsService::new(1_000, Sha3Mode::Sha3_256, None);
svc.start();
let _ = svc.filled_at_percent_event(20).recv();
assert!(svc.get_cache_len() >= 200);
}
#[test]
fn test_get_id_from_cache_empty() {
let mut svc = IdsService::new(1_000, Sha3Mode::Sha3_256, None);
assert!(svc.get_id_from_cache().is_none());
}
#[test]
fn test_get_id_from_cache() {
let mut svc = IdsService::new(1_000, Sha3Mode::Sha3_256, None);
svc.start();
let _ = svc.filled_at_percent_event(5).recv();
let r1 = svc.get_id_from_cache().expect("cache should have ids");
let r2 = svc.get_id_from_cache().expect("cache should have ids");
assert_eq!(r1.len(), Sha3Length::from(Sha3Mode::Sha3_256));
assert_ne!(r1, r2);
}
#[test]
fn test_all_modes() {
for mode in [
Sha3Mode::Sha3_224,
Sha3Mode::Sha3_256,
Sha3Mode::Sha3_384,
Sha3Mode::Sha3_512,
] {
let mut svc = IdsService::new(200, mode, None);
svc.start();
let _ = svc.filled_at_percent_event(2).recv();
let r1 = svc.get_id();
let r2 = svc.get_id();
assert_eq!(r1.len(), Sha3Length::from(mode));
assert_ne!(r1, r2);
svc.stop();
}
}
#[test]
fn test_encode_hex() {
let mut svc = IdsService::new(100, Sha3Mode::Sha3_256, None);
svc.start();
let hex = svc.get_id().as_hex();
assert_eq!(hex.len(), 64);
}
#[test]
fn test_encode_base64() {
let mut svc = IdsService::new(100, Sha3Mode::Sha3_256, None);
svc.start();
let b64 = svc.get_id().as_base64();
assert_eq!(b64.len(), 44);
}
#[test]
fn test_encode_base64_url() {
let mut svc = IdsService::new(100, Sha3Mode::Sha3_256, None);
svc.start();
let b64url = svc.get_id().as_base64_url();
assert_eq!(b64url.len(), 44);
}
#[test]
fn test_encode_base32() {
let mut svc = IdsService::new(100, Sha3Mode::Sha3_256, None);
svc.start();
let b32 = svc.get_id().as_base32();
assert_eq!(b32.len(), 56);
}
#[test]
fn test_encode_json() {
let mut svc = IdsService::new(100, Sha3Mode::Sha3_256, None);
svc.start();
let json = svc.get_id().as_json();
assert!(json.contains("\"bytes\""));
assert!(json.contains("\"base64\""));
assert!(json.contains("\"base32\""));
assert!(json.contains("\"hex\""));
}
#[test]
fn test_iterator_uniqueness() {
let mut svc = IdsService::new(10_000, Sha3Mode::Sha3_512, None);
svc.start();
let _ = svc.filled_event().recv();
let n = 10_000;
let ids: HashSet<String> = svc.take(n).map(|id| id.as_hex()).collect();
assert_eq!(ids.len(), n);
}
#[test]
fn test_set_cache_size() {
let mut svc = IdsService::default();
svc.start();
let _ = svc.filled_event().recv();
svc.set_cache_size(20_000);
let _ = svc.filled_event().recv();
assert!(svc.get_cache_len() >= 20_000);
svc.stop();
}
}