use super::MetricTrait;
use crate::base::TimePredicate;
use crate::utils::curr_time_millis;
use crate::{Error, Result};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
/// Start stamp of a bucket that has not been assigned to any time window yet.
///
/// `AtomicU64::default()` is `0`, so buckets built through `BucketWrap::default()`
/// start with this value; `LeapArray::get_bucket_of_time` compares against it to
/// detect a slot that is being used for the first time.
const DEFAULT_TIME: u64 = 0;
/// One slot of a sliding window: the start timestamp of the window the slot
/// currently represents, paired with the metric value collected for it.
///
/// Both parts are only ever modified through `&self` (the stamp is atomic and
/// the value is cleared via `MetricTrait::reset(&self)`).
#[derive(Debug, Default)]
pub struct BucketWrap<T: MetricTrait> {
/// Start of the window held by this bucket, in the same millisecond time base
/// as the `now` arguments it is compared against.
start_stamp: AtomicU64,
/// Metric payload of the window; `T::default()` for a fresh bucket.
value: T,
}
impl<T: MetricTrait> BucketWrap<T> {
    /// Creates a bucket whose window starts at `start_stamp` and whose value is
    /// `T::default()`.
    pub fn new(start_stamp: u64) -> Self {
        Self {
            start_stamp: AtomicU64::new(start_stamp),
            value: T::default(),
        }
    }

    /// Returns the start timestamp of the window this bucket currently holds.
    pub fn start_stamp(&self) -> u64 {
        self.start_stamp.load(Ordering::SeqCst)
    }

    /// Borrows the metric value stored in this bucket.
    pub fn value(&self) -> &T {
        &self.value
    }

    /// Overwrites the start timestamp; the stored value is left untouched.
    pub fn reset_start_stamp(&self, start_stamp: u64) {
        self.start_stamp.store(start_stamp, Ordering::SeqCst);
    }

    /// Clears the stored value by delegating to `MetricTrait::reset`; the start
    /// timestamp is left untouched.
    pub fn reset_value(&self) {
        self.value.reset();
    }

    /// Returns `true` when `now` lies in the half-open window
    /// `[start_stamp, start_stamp + bucket_len_ms)`.
    pub fn is_time_in_bucket(&self, now: u64, bucket_len_ms: u32) -> bool {
        let start = self.start_stamp();
        // Compare the elapsed time rather than computing `start + bucket_len_ms`:
        // the sum overflows for start stamps close to `u64::MAX`, whereas the
        // subtraction is safe once `now >= start` has been established.
        now >= start && now - start < u64::from(bucket_len_ms)
    }

    /// Returns `true` when the bucket's window started more than `interval`
    /// before `now`.
    ///
    /// A bucket whose start stamp is at or after `now` is never deprecated.
    pub fn is_deprecated(&self, now: u64, interval: u64) -> bool {
        let start = self.start_stamp();
        now > start && now - start > interval
    }
}
/// A fixed-size ring of `BucketWrap` slots covering a sliding time span.
///
/// A timestamp is mapped to a slot by `time2idx`; slots are reused for later
/// windows as time advances.
#[derive(Debug)]
pub struct LeapArray<T: MetricTrait> {
/// Length of a single bucket in milliseconds (`interval_ms / sample_count`).
bucket_len_ms: u32,
/// Number of buckets in the ring.
sample_count: u32,
/// Total time span covered by the ring, in milliseconds.
interval_ms: u32,
// `array` holds the buckets, one per slot. `mutex` holds one lock per slot, at
// the same index; `get_bucket_of_time` tries it before resetting that slot.
pub(crate) array: Vec<Arc<BucketWrap<T>>>, mutex: Vec<Mutex<bool>>,
}
impl<T: MetricTrait> LeapArray<T> {
    /// Builds a ring of `sample_count` buckets that together span `interval_ms`
    /// milliseconds.
    ///
    /// # Errors
    ///
    /// Fails when `sample_count` is zero, when `interval_ms` is zero, or when
    /// `interval_ms` is not a multiple of `sample_count`.
    pub fn new(sample_count: u32, interval_ms: u32) -> Result<Self> {
        // A zero interval would yield `bucket_len_ms == 0`, and every later
        // `time2idx` / `calculate_start_stamp` call would divide by zero.
        if sample_count == 0 || interval_ms == 0 || interval_ms % sample_count != 0 {
            return Err(Error::msg(
                "Invalid sample count or interval_ms. Time span needs to be evenly divided",
            ));
        }
        let slots = sample_count as usize;
        let array: Vec<Arc<BucketWrap<T>>> = (0..slots)
            .map(|_| Arc::new(BucketWrap::default()))
            .collect();
        let mutex: Vec<Mutex<bool>> = (0..slots).map(|_| Mutex::new(false)).collect();
        Ok(LeapArray {
            bucket_len_ms: interval_ms / sample_count,
            sample_count,
            interval_ms,
            array,
            mutex,
        })
    }

    /// Length of a single bucket in milliseconds.
    pub fn bucket_len_ms(&self) -> u32 {
        self.bucket_len_ms
    }

    /// Number of buckets in the ring.
    pub fn sample_count(&self) -> u32 {
        self.sample_count
    }

    /// Total time span covered by the ring, in milliseconds.
    pub fn interval_ms(&self) -> u32 {
        self.interval_ms
    }

    /// Clears the bucket at `idx` and assigns it the window starting at
    /// `start_stamp`.
    ///
    /// # Panics
    ///
    /// Panics when `idx` is not a valid slot index.
    pub fn reset_bucket(&self, idx: usize, start_stamp: u64) {
        let bucket = &self.array[idx];
        // Clear the value before publishing the new start stamp: a concurrent
        // caller that already observes the new stamp must not have its update
        // erased by a reset that is still in progress.
        bucket.reset_value();
        bucket.reset_start_stamp(start_stamp);
    }

    /// Returns the bucket for the current wall-clock time.
    ///
    /// # Errors
    ///
    /// See [`LeapArray::get_bucket_of_time`].
    pub fn current_bucket(&self) -> Result<Arc<BucketWrap<T>>> {
        self.get_bucket_of_time(curr_time_millis())
    }

    /// Returns the bucket whose window contains `now`, recycling the slot when
    /// it still holds an older window.
    ///
    /// # Errors
    ///
    /// Fails when the slot already holds a window that starts after the one
    /// `now` belongs to, i.e. `now` is too old to be recorded.
    pub fn get_bucket_of_time(&self, now: u64) -> Result<Arc<BucketWrap<T>>> {
        let idx = self.time2idx(now) as usize;
        let target_start = self.calculate_start_stamp(now);
        let bucket = &self.array[idx];
        loop {
            // Read the stamp once per iteration so every branch below judges
            // the same observation.
            let current_start = bucket.start_stamp();
            if current_start == DEFAULT_TIME {
                // Slot has never been used: claim it for the target window.
                bucket.reset_start_stamp(target_start);
                return Ok(Arc::clone(bucket));
            }
            if current_start == target_start {
                return Ok(Arc::clone(bucket));
            }
            if target_start < current_start {
                return Err(Error::msg("invalid time stamp, cannot find bucket"));
            }

            // The slot holds an older window and has to be recycled. The guard
            // must be bound to a variable: a bare `try_lock().is_ok()` drops it
            // at the end of the condition and the reset would run unlocked.
            // A poisoned lock is recovered, since the guarded `bool` carries no
            // state and spinning on it forever would hang every caller.
            let guard = match self.mutex[idx].try_lock() {
                Ok(guard) => Some(guard),
                Err(std::sync::TryLockError::Poisoned(poisoned)) => Some(poisoned.into_inner()),
                Err(std::sync::TryLockError::WouldBlock) => None,
            };
            match guard {
                Some(_guard) => {
                    // Another thread may have recycled the slot between the
                    // read above and taking the lock; only reset when the slot
                    // is still behind the target, otherwise re-evaluate.
                    if bucket.start_stamp() < target_start {
                        self.reset_bucket(idx, target_start);
                        return Ok(Arc::clone(bucket));
                    }
                }
                // Somebody else is resetting this slot; let them finish.
                None => std::thread::yield_now(),
            }
        }
    }

    /// Returns the bucket that covers the window one bucket length before the
    /// current wall-clock time, without recycling anything.
    ///
    /// # Errors
    ///
    /// Fails when that slot is deprecated relative to the current time, or when
    /// the window it holds ends before the requested instant.
    pub fn get_previous_bucket(&self) -> Result<Arc<BucketWrap<T>>> {
        // Read the clock once so the slot lookup and the staleness checks refer
        // to the same instant.
        let now = curr_time_millis();
        let bucket_len = u64::from(self.bucket_len_ms);
        let previous = now.saturating_sub(bucket_len);
        let bucket = &self.array[self.time2idx(previous) as usize];
        if bucket.is_deprecated(now, u64::from(self.interval_ms)) {
            return Err(Error::msg("previous bucket has been deprecated"));
        }
        if bucket.start_stamp().saturating_add(bucket_len) < previous {
            return Err(Error::msg("the timestamp of returnning bucket is wrong"));
        }
        Ok(Arc::clone(bucket))
    }

    /// Rounds `now` down to the start of the bucket window that contains it.
    pub(crate) fn calculate_start_stamp(&self, now: u64) -> u64 {
        now - now % u64::from(self.bucket_len_ms)
    }

    /// Maps `now` to the index of the slot responsible for its window.
    pub(crate) fn time2idx(&self, now: u64) -> u64 {
        let window_number = now / u64::from(self.bucket_len_ms);
        window_number % u64::from(self.sample_count)
    }

    /// Returns every bucket that is not deprecated relative to the current
    /// wall-clock time.
    pub fn valid_array(&self) -> Vec<Arc<BucketWrap<T>>> {
        // One clock read so that all buckets are judged against the same instant.
        let now = curr_time_millis();
        let interval = u64::from(self.interval_ms);
        self.array
            .iter()
            .filter(|bucket| !bucket.is_deprecated(now, interval))
            .cloned()
            .collect()
    }

    /// Borrows the value of the bucket responsible for `now`.
    ///
    /// # Errors
    ///
    /// Fails when the slot currently holds a window that does not contain `now`.
    pub fn get_bucket_value(&self, now: u64) -> Result<&T> {
        let bucket = &self.array[self.time2idx(now) as usize];
        if bucket.is_time_in_bucket(now, self.bucket_len_ms) {
            Ok(bucket.value())
        } else {
            Err(Error::msg("invalid time, cannot get value in the bucket"))
        }
    }

    /// Returns the buckets that are not deprecated relative to the current
    /// wall-clock time.
    pub fn get_current_values(&self) -> Vec<Arc<BucketWrap<T>>> {
        self.get_valid_values(curr_time_millis())
    }

    /// Returns the buckets that are not deprecated relative to `now`.
    pub fn get_valid_values(&self, now: u64) -> Vec<Arc<BucketWrap<T>>> {
        self.get_valid_values_conditional(now, &|_| true)
    }

    /// Returns the buckets that are not deprecated relative to `now` and whose
    /// start stamp satisfies `condition`.
    pub fn get_valid_values_conditional(
        &self,
        now: u64,
        condition: &TimePredicate,
    ) -> Vec<Arc<BucketWrap<T>>> {
        let interval = u64::from(self.interval_ms);
        self.array
            .iter()
            .filter(|bucket| !bucket.is_deprecated(now, interval) && condition(bucket.start_stamp()))
            .cloned()
            .collect()
    }

    /// Test helper: returns the bucket in the slot one bucket length ahead of
    /// the current time, or an error when that slot is deprecated.
    #[cfg(test)]
    pub(self) fn get_valid_head(&self) -> Result<Arc<BucketWrap<T>>> {
        let now = curr_time_millis();
        let idx = self.time2idx(now + u64::from(self.bucket_len_ms)) as usize;
        let bucket = &self.array[idx];
        if bucket.is_deprecated(now, u64::from(self.interval_ms)) {
            Err(Error::msg("Cannot get a valid head"))
        } else {
            Ok(Arc::clone(bucket))
        }
    }
}
// Unit tests for `BucketWrap` and `LeapArray`, using `AtomicU64` as the metric type.
#[cfg(test)]
mod test {
use crate::utils::sleep_for_ms;
use super::*;
use std::sync::atomic::AtomicU64;
// Ring dimensions shared by the tests: 20 buckets of 500 ms, i.e. a 10_000 ms interval.
const SAMPLE_COUNT: u32 = 20;
const BUCKET_LEN_MS: u32 = 500; const INTERVAL_MS: u32 = BUCKET_LEN_MS * SAMPLE_COUNT;
// Minimal metric for the tests: resetting stores 0.
impl MetricTrait for AtomicU64 {
fn reset(&self) {
self.store(0, Ordering::SeqCst);
}
}
type LeapArrayAtomicU64 = LeapArray<AtomicU64>;
// A fixed timestamp maps to slot 9 and is aligned down to a multiple of the bucket length.
#[test]
fn time_idx() {
let arr = LeapArrayAtomicU64::new(SAMPLE_COUNT, INTERVAL_MS).unwrap();
assert_eq!(arr.time2idx(1576296044907), 9);
assert_eq!(arr.calculate_start_stamp(1576296044907), 1576296044500);
}
// Looking up `now + 801` claims slot 1 and stamps it with the aligned start `now + 500`.
#[test]
fn start_time() {
let arr = LeapArrayAtomicU64::new(SAMPLE_COUNT, INTERVAL_MS).unwrap();
let now = 1596199310000;
let bucket = arr.get_bucket_of_time(now + 801).unwrap();
assert_eq!(bucket.start_stamp(), now + 500);
assert!(Arc::ptr_eq(&bucket, arr.array.get(1).unwrap()));
}
// A bucket that started 40_000 ms before `now` is older than the 10_000 ms interval.
#[test]
fn deprecated() {
let now = 1576296044907;
let bucket = BucketWrap::<AtomicU64>::new(1576296004907);
assert!(bucket.is_deprecated(now, INTERVAL_MS as u64));
}
// Ignored by default: this test sleeps on the real clock, so it is slow and
// timing-sensitive. It stores 1..=sample_count into the current bucket after
// each sleep, sleeps once more, and expects the head slot to hold 2.
// NOTE(review): the macOS/Windows branches use a longer interval and a
// different sleep length, presumably to cope with timer behaviour on those
// platforms — confirm.
#[test]
#[ignore]
fn valid_head() {
let sample_count = 10;
cfg_if::cfg_if! {
if #[cfg(any(target_os = "macos", target_os = "windows"))]{
let interval_ms = 10000;
}else{
let interval_ms = 1000;
}
}
let bucket_len_ms = (interval_ms / sample_count) as u64;
let arr = LeapArrayAtomicU64::new(sample_count, interval_ms).unwrap();
for i in 1..=(sample_count as u64) {
cfg_if::cfg_if! {
if #[cfg(any(target_os = "macos", target_os = "windows"))]{
sleep_for_ms(bucket_len_ms - (curr_time_millis() % bucket_len_ms )/2);
}else{
sleep_for_ms(bucket_len_ms);
}
}
arr.current_bucket()
.unwrap()
.value()
.store(i, Ordering::SeqCst);
}
cfg_if::cfg_if! {
if #[cfg(any(target_os = "macos", target_os = "windows"))]{
sleep_for_ms(bucket_len_ms - (curr_time_millis() % bucket_len_ms )/2);
}else{
sleep_for_ms(bucket_len_ms);
}
}
let head = arr.get_valid_head().unwrap();
assert_eq!(2, head.value().load(Ordering::SeqCst));
}
}