use std::time;
use std::thread;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use lazy_static;
use fnv::FnvHashMap;
use parking_lot::RwLock;
use crossbeam_queue::ArrayQueue;
use atom::Atom;
/// Smallest allowed capacity for the dynamic-counter queue (enforced in `PrefCollect::new`).
const MIN_DYNAMIC_COUNTER_CAPACITY: usize = 10;
/// Capacity of the dynamic-counter queue used by the global collector.
const DEFAULT_DYNAMIC_COUNTER_CAPACITY: usize = 1000000;
/// Smallest allowed capacity for the static-counter queue (enforced in `PrefCollect::new`).
const MIN_STATIC_COUNTER_CAPACITY: usize = 1;
/// Capacity of the static-counter queue used by the global collector.
const DEFAULT_STATIC_COUNTER_CAPACITY: usize = 1000;
lazy_static! {
/// Process-wide counter collector, built with the default dynamic/static capacities.
pub static ref GLOBAL_PREF_COLLECT: PrefCollect = PrefCollect::new(DEFAULT_DYNAMIC_COUNTER_CAPACITY, DEFAULT_STATIC_COUNTER_CAPACITY);
}
/// Returns `true` when `cid` is the counter id derived from `name`
/// (the id is the string hash of the interned name).
pub fn check_counter(name: &str, cid: u64) -> bool {
    let hashed = Atom::from(name).str_hash() as u64;
    hashed == cid
}
/// A shared, thread-safe performance counter.
///
/// Cloning is cheap: all clones share the same underlying atomic value.
//
// NOTE: the previous `unsafe impl Send/Sync for PrefCounter` declarations
// were removed — `Arc<AtomicUsize>` is already `Send + Sync`, so the auto
// traits apply and the manual `unsafe` impls were redundant.
#[derive(Debug, Clone)]
pub struct PrefCounter(Arc<AtomicUsize>);

impl PrefCounter {
    /// Returns the current counter value.
    pub fn get(&self) -> usize {
        self.0.load(Ordering::SeqCst)
    }

    /// Overwrites the counter with `count`.
    pub fn set(&self, count: usize) {
        self.0.store(count, Ordering::SeqCst);
    }

    /// Adds `count` to the counter.
    pub fn sum(&self, count: usize) {
        self.0.fetch_add(count, Ordering::Relaxed);
    }
}
/// Timestamp marking the start of a measurement (see [`PrefTimer::start`]).
type StartTime = time::Instant;

/// Accumulates elapsed wall-clock time, in microseconds, into a shared atomic.
///
/// Cloning is cheap: all clones share the same underlying accumulator.
//
// NOTE: the previous `unsafe impl Send/Sync for PrefTimer` declarations were
// removed — `Arc<AtomicUsize>` is already `Send + Sync`, so the auto traits
// apply and the manual `unsafe` impls were redundant.
#[derive(Debug, Clone)]
pub struct PrefTimer(Arc<AtomicUsize>);

impl PrefTimer {
    /// Total accumulated time, in microseconds.
    pub fn get(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }

    /// Begins a measurement; pass the returned timestamp to [`PrefTimer::timing`].
    pub fn start(&self) -> StartTime {
        StartTime::now()
    }

    /// Adds the microseconds elapsed since `start` to the accumulated total.
    pub fn timing(&self, start: StartTime) {
        // `elapsed()` is equivalent to `Instant::now() - start` but cannot
        // panic and states the intent directly.
        self.0.fetch_add(start.elapsed().as_micros() as usize, Ordering::Relaxed);
    }
}
/// Iterator over the live dynamic counters.
///
/// Entries are popped from the shared queue as they are yielded; on drop,
/// every entry that was yielded (and cached) is pushed back so the next
/// iteration sees the full set again.
pub struct DynamicIterator {
    inner: Arc<InnerCollect>,
    cache: Vec<(u64, Arc<AtomicUsize>)>,
}

impl Drop for DynamicIterator {
    fn drop(&mut self) {
        // `drain(..)` returns the entries in order and is O(n); the previous
        // `remove(0)` per element shifted the whole vector each time — O(n²).
        for counter in self.cache.drain(..) {
            // The queue had room for these entries before they were popped,
            // so re-pushing should not fail; the result was (and still is)
            // deliberately ignored.
            let _ = self.inner.dynamic_collect.push(counter);
        }
    }
}
impl Iterator for DynamicIterator {
type Item = (u64, Arc<AtomicUsize>);
// Pops the next (id, counter) pair from the shared queue.
//
// Retirement: once the entry is popped, a strong count of exactly 2 means the
// only remaining owners are the popped entry itself and the clone held in
// `dynamic_table` — i.e. no external `PrefCounter`/`PrefTimer` handle is
// alive. In that case the id is removed from the table and the entry is
// yielded one last time WITHOUT being cached, so `Drop` will not push it
// back and the counter is dropped for good.
//
// Live counters are yielded as a clone and cached so `Drop` can return them
// to the queue.
fn next(&mut self) -> Option<Self::Item> {
if let Ok(counter) = self.inner.dynamic_collect.pop() {
if Arc::strong_count(&(counter.1)) == 2 {
self.inner.dynamic_table.write().remove(&(counter.0));
return Some(counter);
}
let r = Some((counter.0, counter.1.clone()));
self.cache.push(counter);
return r;
}
// Queue exhausted (or empty): iteration ends.
None
}
}
/// Iterator over the registered static counters.
///
/// Entries are popped from the shared queue as they are yielded; on drop,
/// every yielded entry is pushed back so the next iteration sees the full set.
pub struct StaticIterator {
    inner: Arc<InnerCollect>,
    cache: Vec<(u64, Arc<AtomicUsize>)>,
}

impl Drop for StaticIterator {
    fn drop(&mut self) {
        // `drain(..)` returns the entries in order and is O(n); the previous
        // `remove(0)` per element shifted the whole vector each time — O(n²).
        for counter in self.cache.drain(..) {
            // The queue had room for these entries before they were popped,
            // so re-pushing should not fail; the result was (and still is)
            // deliberately ignored.
            let _ = self.inner.static_collect.push(counter);
        }
    }
}
impl Iterator for StaticIterator {
    type Item = (u64, Arc<AtomicUsize>);

    /// Yields the next (id, counter) pair from the shared queue, remembering
    /// the entry so `Drop` can push it back afterwards.
    fn next(&mut self) -> Option<Self::Item> {
        match self.inner.static_collect.pop() {
            Ok((cid, counter)) => {
                let item = (cid, counter.clone());
                self.cache.push((cid, counter));
                Some(item)
            }
            Err(_) => None,
        }
    }
}
/// Thread-safe registry of dynamic and static performance counters.
/// Clones are cheap and share the same inner state.
#[derive(Clone)]
pub struct PrefCollect(Arc<InnerCollect>);
// NOTE(review): these manual impls are likely redundant — every field of
// `InnerCollect` (RwLock over a map of atomics, ArrayQueues, AtomicBool)
// appears to be `Send + Sync` already, which would make `Arc<InnerCollect>`
// auto-`Send + Sync`. Confirm against the crate versions in use, then remove.
unsafe impl Send for PrefCollect {}
unsafe impl Sync for PrefCollect {}
/// Shared state behind `PrefCollect`.
struct InnerCollect {
    /// id -> counter map for dynamic counters; used for lookup/deduplication.
    dynamic_table: RwLock<FnvHashMap<u64, Arc<AtomicUsize>>>,
    /// Bounded queue of live dynamic counters, consumed by `DynamicIterator`.
    dynamic_collect: ArrayQueue<(u64, Arc<AtomicUsize>)>,
    /// Flipped to `true` once static registration is closed (`static_init_ok`).
    static_init: AtomicBool,
    /// Bounded queue of static counters, consumed by `StaticIterator`.
    static_collect: ArrayQueue<(u64, Arc<AtomicUsize>)>,
}
impl PrefCollect {
pub fn new(dynamic_capacity: usize, static_capacity: usize) -> Self {
if dynamic_capacity < MIN_DYNAMIC_COUNTER_CAPACITY {
panic!("invalid dynamic capacity");
}
if static_capacity < MIN_STATIC_COUNTER_CAPACITY {
panic!("invalid static capacity");
}
PrefCollect(Arc::new(InnerCollect {
dynamic_table: RwLock::new(FnvHashMap::default()),
dynamic_collect: ArrayQueue::new(dynamic_capacity),
static_init: AtomicBool::new(false),
static_collect: ArrayQueue::new(static_capacity),
}))
}
pub fn dynamic_is_full(&self) -> bool {
self.0.dynamic_collect.is_full()
}
pub fn dynamic_size(&self) -> usize {
self.0.dynamic_collect.len()
}
pub fn new_dynamic_counter(&self, target: Atom, init: usize) -> Option<PrefCounter> {
if self.0.dynamic_collect.is_full() {
return None;
}
let cid = target.str_hash();
if let Some(counter) = self.0.dynamic_table.read().get(&(cid as u64)) {
return Some(PrefCounter(counter.clone()));
}
let counter = Arc::new(AtomicUsize::new(init));
self.0.dynamic_collect.push((cid as u64, counter.clone()));
self.0.dynamic_table.write().insert(cid as u64, counter.clone());
Some(PrefCounter(counter))
}
pub fn new_dynamic_timer(&self, target: Atom, init: usize) -> Option<PrefTimer> {
if self.0.dynamic_collect.is_full() {
return None;
}
let cid = target.str_hash();
if let Some(counter) = self.0.dynamic_table.read().get(&(cid as u64)) {
return Some(PrefTimer(counter.clone()));
}
let counter = Arc::new(AtomicUsize::new(init));
self.0.dynamic_collect.push((cid as u64, counter.clone()));
self.0.dynamic_table.write().insert(cid as u64, counter.clone());
Some(PrefTimer(counter))
}
pub fn dynamic_iter(&self) -> DynamicIterator {
DynamicIterator {
inner: self.0.clone(),
cache: Vec::with_capacity(self.0.dynamic_collect.len()),
}
}
pub fn static_is_full(&self) -> bool {
self.0.static_collect.is_full()
}
pub fn static_size(&self) -> usize {
self.0.static_collect.len()
}
pub fn new_static_counter(&self, target: Atom, init: usize) -> Option<PrefCounter> {
if self.0.static_init.load(Ordering::SeqCst) {
return None;
}
let cid = target.str_hash();
let counter = Arc::new(AtomicUsize::new(init));
self.0.static_collect.push((cid as u64, counter.clone()));
Some(PrefCounter(counter))
}
pub fn new_static_timer(&self, target: Atom, init: usize) -> Option<PrefTimer> {
if self.0.static_init.load(Ordering::SeqCst) {
return None;
}
let cid = target.str_hash();
let counter = Arc::new(AtomicUsize::new(init));
self.0.static_collect.push((cid as u64, counter.clone()));
Some(PrefTimer(counter))
}
pub fn static_init_ok(&self) -> usize {
self.0.static_init.compare_and_swap(false, true, Ordering::SeqCst);
self.0.static_collect.len()
}
pub fn static_iter(&self) -> StaticIterator {
StaticIterator {
inner: self.0.clone(),
cache: Vec::with_capacity(self.0.static_collect.len()),
}
}
}