use strum_macros::Display;
use super::{Interval, Tick, TICKS_PER_SECOND};
use std::fmt::{Debug, Display};
// RING_SIZE: number of slots held by every `RingArray`.
// PERIOD: length of the interval assigned to a freshly reset ring entry,
// one tenth of `TICKS_PER_SECOND` ticks.
const RING_SIZE: usize = 32; const PERIOD: Interval = Interval(TICKS_PER_SECOND.0 / 10);
/// Fixed-capacity circular buffer of `RING_SIZE` values with a movable cursor.
///
/// Every slot always holds a value; there is no notion of an "empty" slot.
#[derive(Debug, Default, Clone, Copy)]
pub(super) struct RingArray<T: Default + Debug + Copy + Display> {
/// Backing storage, addressed by absolute index `0..RING_SIZE`.
buffer: [T; RING_SIZE],
/// Absolute index of the current slot, i.e. the one the next `push` overwrites.
cursor: usize,
}
/// Human-readable dump of the ring: cursor position followed by every slot.
impl<T> Display for RingArray<T>
where
    T: Default + Debug + Copy + Display,
{
    /// Writes a header with the one-based cursor position and the slot count,
    /// then one `{[index]=value},` line per slot in absolute storage order
    /// (not cursor-relative order).
    ///
    /// Output goes straight to the formatter, so no temporary strings are
    /// allocated and a failing nested `Display` is reported as an error
    /// instead of panicking.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Cursor at #{} of {} entries,\n ",
            self.cursor + 1,
            self.len()
        )?;
        for (slot, value) in self.buffer.iter().enumerate() {
            write!(f, "{{[{}]={}}},\n", slot, value)?;
        }
        Ok(())
    }
}
impl<T: Default + Debug + Copy + Display> RingArray<T> {
    /// Builds a ring with every slot set to `t` and the cursor on slot 0, then
    /// hands the fresh ring to `f` for any further set-up before returning it.
    fn new<F>(t: T, f: F) -> Self
    where
        F: Fn(&mut Self),
    {
        let mut ring = Self {
            cursor: 0,
            buffer: [t; RING_SIZE],
        };
        f(&mut ring);
        ring
    }

    /// Number of slots in the ring; this is always `RING_SIZE`.
    fn len(&self) -> usize {
        RING_SIZE
    }

    /// Stores `v` in the slot under the cursor, advances the cursor by one and
    /// returns the value that used to occupy that slot.
    fn push(&mut self, v: T) -> T {
        let previous = std::mem::replace(&mut self.buffer[self.cursor], v);
        self.step_forward();
        previous
    }

    /// Converts an offset relative to the cursor into an absolute slot index.
    /// A sum that reaches `RING_SIZE` or beyond is folded back once.
    fn wrapped_index(&self, index: usize) -> usize {
        match self.cursor + index {
            past_end if past_end >= RING_SIZE => past_end - RING_SIZE,
            in_range => in_range,
        }
    }

    /// Shared access to the slot `index` positions after the cursor, wrapping
    /// around the end of the buffer. `None` when `index >= RING_SIZE`.
    fn get(&self, index: usize) -> Option<&T> {
        if index >= RING_SIZE {
            return None;
        }
        let at = self.wrapped_index(index);
        Some(&self.buffer[at])
    }

    /// Mutable counterpart of [`get`](Self::get).
    fn get_mut(&mut self, index: usize) -> Option<&mut T> {
        if index >= RING_SIZE {
            return None;
        }
        let at = self.wrapped_index(index);
        Some(&mut self.buffer[at])
    }

    /// Shared access by absolute storage index, ignoring the cursor.
    /// `None` when `index >= RING_SIZE`.
    fn get_abs_index(&self, index: usize) -> Option<&T> {
        self.buffer.get(index)
    }

    /// Mutable counterpart of [`get_abs_index`](Self::get_abs_index).
    fn get_abs_index_mut(&mut self, index: usize) -> Option<&mut T> {
        self.buffer.get_mut(index)
    }

    /// Shared access to the slot currently under the cursor.
    fn get_cursor(&self) -> &T {
        &self.buffer[self.cursor]
    }

    /// Mutable access to the slot currently under the cursor.
    fn get_cursor_mut(&mut self) -> &mut T {
        &mut self.buffer[self.cursor]
    }

    /// Moves the cursor back to slot 0 and sets every slot to `T::default()`.
    fn reset(&mut self) {
        self.cursor = 0;
        self.buffer
            .iter_mut()
            .for_each(|slot| *slot = T::default());
    }

    /// Advances the cursor by one slot, wrapping to 0 after the last slot.
    fn step_forward(&mut self) {
        self.cursor = (self.cursor + 1) % RING_SIZE;
    }

    /// Iterates over all slots in absolute storage order (slot 0 first).
    fn iter(&self) -> impl Iterator<Item = &T> {
        self.buffer.iter()
    }

    /// Mutable iteration over all slots in absolute storage order.
    fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> {
        self.buffer.iter_mut()
    }
}
/// Kinds of events tallied by the statistics in this file.
///
/// Variants are cast with `as usize` and used as indices into counter arrays.
#[derive(Debug, Clone, Copy, Display)]
pub(super) enum Counter {
Created,
Canceled,
Expired,
/// Not an event kind: its discriminant is the number of real counters and is
/// used as the length of every counter array.
NumOfCounters,
}
/// The real counter kinds listed in index order, so `COUNTERS[i]` names the
/// counter stored at index `i` of a counter array.
const COUNTERS: [Counter; Counter::NumOfCounters as usize] =
[Counter::Created, Counter::Canceled, Counter::Expired];
/// Event tallies attached to a tick interval.
#[derive(Debug, Default, Clone, Copy)]
pub(super) struct InAndOut {
/// Tick at which the interval begins.
pub(super) start: Tick,
/// Tick at which the interval ends.
pub(super) end: Tick,
/// One tally per counter kind, indexed by `Counter as usize`.
pub(super) counter: [u32; Counter::NumOfCounters as usize],
}
/// Single-line rendering of an interval and its tallies.
impl Display for InAndOut {
    /// Writes `{[Start:<start>, End:<end>],` followed by one `[<name>]=<count>,`
    /// item per counter kind and a closing `}`.
    ///
    /// Everything is written directly to the formatter, avoiding a temporary
    /// `String` per item and propagating formatter errors to the caller.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{{[Start:{:?}, End:{:?}],", self.start, self.end)?;
        // `COUNTERS` and `self.counter` have the same length and ordering.
        for (kind, count) in COUNTERS.iter().zip(self.counter.iter()) {
            write!(f, "[{}]={},", kind, count)?;
        }
        write!(f, "}}")
    }
}
impl InAndOut {
    /// Absorbs `rhs`: the interval now ends where `rhs` ends and every tally
    /// grows by the matching tally of `rhs`. Returns `self` for chaining.
    pub(super) fn merge(&mut self, rhs: &Self) -> &mut Self {
        self.end = rhs.end;
        for (mine, theirs) in self.counter.iter_mut().zip(rhs.counter.iter()) {
            *mine += *theirs;
        }
        self
    }

    /// Removes `rhs`: the interval now starts where `rhs` ends and every tally
    /// shrinks by the matching tally of `rhs`. Returns `self` for chaining.
    pub(super) fn separate(&mut self, rhs: &Self) -> &mut Self {
        self.start = rhs.end;
        for (mine, theirs) in self.counter.iter_mut().zip(rhs.counter.iter()) {
            *mine -= *theirs;
        }
        self
    }

    /// Re-anchors the interval at `now` without touching the tallies.
    ///
    /// With `zero_range` the interval is empty (`end == start`); otherwise it
    /// spans one `PERIOD`.
    pub(super) fn reset_time(&mut self, now: Tick, zero_range: bool) -> &mut Self {
        self.start = now;
        self.end = if zero_range {
            self.start
        } else {
            self.start.until(PERIOD)
        };
        self
    }

    /// Re-anchors the interval as [`reset_time`](Self::reset_time) does and
    /// clears every tally to zero.
    pub(super) fn reset(&mut self, now: Tick, zero_range: bool) -> &mut Self {
        self.reset_time(now, zero_range);
        self.counter = [0; Counter::NumOfCounters as usize];
        self
    }
}
/// Sliding-window event-rate tracker built from a ring of per-period tallies.
#[derive(Debug, Default, Clone, Copy)]
pub(super) struct RateMonitor {
/// Ring of per-period entries; the cursor entry receives new counts.
pub(super) collection: RingArray<InAndOut>,
/// Aggregate interval and tallies maintained by merging and separating ring entries.
pub(super) period: InAndOut,
/// Number of whole periods skipped, as computed by the most recent `on_tick` call.
pub(super) overloaded_periods_now: u32,
/// Running sum of skipped periods over all `on_tick` calls.
pub(super) overloaded_periods_total: u64,
}
/// Multi-line dump of the monitor enclosed in braces.
impl Display for RateMonitor {
    /// Writes the ring, the aggregate period and the two overload figures.
    ///
    /// The text is emitted directly to the formatter rather than assembled in
    /// a temporary `String`, so formatter errors reach the caller.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{{\nCollection:\n{}", self.collection)?;
        write!(f, "Period:\n{}", self.period)?;
        write!(
            f,
            "\nSecsOvldedNow:{}, SecsOvldedTotal:{}",
            self.overloaded_periods_now, self.overloaded_periods_total
        )?;
        write!(f, "\n}}\n")
    }
}
impl RateMonitor {
    /// Creates a monitor whose ring entries and aggregate period all start at `now`.
    ///
    /// Every entry begins as the empty interval `[now, now]` with zero tallies;
    /// only ring slot 0, the initial cursor slot, is stretched to one `PERIOD`.
    pub(super) fn new(now: Tick) -> Self {
        let empty = InAndOut {
            start: now,
            end: now,
            ..Default::default()
        };
        Self {
            collection: RingArray::new(empty, |ring| {
                ring.buffer[0].end = ring.buffer[0].start.until(PERIOD);
            }),
            period: empty,
            ..Default::default()
        }
    }

    /// Sums each counter over every ring entry, including the cursor entry.
    pub(super) fn collected_sum(&self) -> [u32; Counter::NumOfCounters as usize] {
        let mut sum = [0u32; Counter::NumOfCounters as usize];
        for entry in self.collection.iter() {
            for (acc, count) in sum.iter_mut().zip(entry.counter.iter()) {
                *acc += *count;
            }
        }
        sum
    }

    /// Clears all tallies and re-anchors every interval at `now`, leaving the
    /// overload figures untouched.
    ///
    /// Slot 0 becomes the cursor entry spanning one `PERIOD`; all other slots
    /// and the aggregate period become empty intervals.
    pub(super) fn reset_excl_ovld(&mut self, now: Tick) {
        for (slot, entry) in self.collection.iter_mut().enumerate() {
            entry.reset(now, slot != 0);
        }
        self.collection.cursor = 0;
        self.period.reset(now, true);
    }

    /// Returns `(overloaded_periods_now, overloaded_periods_total)`.
    pub(super) fn overload_stats(&self) -> (u32, u64) {
        (self.overloaded_periods_now, self.overloaded_periods_total)
    }

    /// Adds `val` to the `index` tally of the cursor entry.
    pub(super) fn push_counter(&mut self, val: u32, index: Counter) {
        self.collection.get_cursor_mut().counter[index as usize] += val;
    }

    /// Advances the ring to `now`.
    ///
    /// Nothing moves while `now` is before the end of the cursor entry. Once
    /// that end is reached the entry is merged into the aggregate period and
    /// the ring advances by `step` slots, where `step - 1` is the number of
    /// whole periods that elapsed beyond the finished entry; that number is
    /// recorded in the overload figures.
    pub(super) fn on_tick(&mut self, now: Tick) {
        let mut entry = self.collection.get_cursor_mut();
        // The "now" figure describes this call only.
        self.overloaded_periods_now = 0;
        if now >= entry.end {
            // The cursor entry is complete: fold it into the aggregate.
            self.period.merge(entry);
            let elapsed = now.since(entry.end).to_i64();
            let period_len = PERIOD.to_i64();
            let step = if elapsed < period_len {
                1
            } else {
                elapsed / period_len + 1
            };
            self.overloaded_periods_now += (step - 1) as u32;
            self.overloaded_periods_total += (step - 1) as u64;
            if step == 1 {
                // Recycle the next slot: take its old tallies out of the
                // aggregate, then restart it right after the aggregate's end.
                self.collection.step_forward();
                entry = self.collection.get_cursor_mut();
                self.period.separate(entry);
                entry.reset(self.period.end, false);
            } else if (2..RING_SIZE).contains(&(step as usize)) {
                // Several periods elapsed: recycle one slot per period and
                // merge each fresh (empty) slot so the aggregate's end keeps up.
                for _ in 1..step + 1 {
                    self.collection.step_forward();
                    entry = self.collection.get_cursor_mut();
                    self.period.separate(entry);
                    entry.reset(self.period.end, false);
                    self.period.merge(entry);
                }
            } else if step >= (RING_SIZE as i64) {
                // The whole ring is stale: start over from `now`.
                self.reset_excl_ovld(now);
            } else {
                unreachable!("step must be positive")
            }
        }
    }

    /// Per-second rate of every counter over the aggregate period.
    ///
    /// Returns all zeros while the aggregate period is empty. The rate is
    /// `count * TICKS_PER_SECOND / period_length`, computed in widened integer
    /// arithmetic so large counts stay exact (a 32-bit float cannot represent
    /// every count above 2^24), rounded towards zero and clamped to `u32`.
    ///
    /// # Panics
    /// Panics if the period length in ticks is not positive even though
    /// `start < end`.
    pub(super) fn rate(&self) -> [u32; Counter::NumOfCounters as usize] {
        let mut avg = [0u32; Counter::NumOfCounters as usize];
        if self.period.start >= self.period.end {
            return avg;
        }
        let delta = self.period.end.since(self.period.start);
        assert!(delta.to_i64() > 0);
        let ticks = i128::from(delta.to_i64());
        let ticks_per_second = i128::from(TICKS_PER_SECOND.0);
        for (out, count) in avg.iter_mut().zip(self.period.counter.iter()) {
            let per_second = i128::from(*count) * ticks_per_second / ticks;
            // Clamped to the `u32` range first, so the cast below is lossless.
            *out = per_second.max(0).min(i128::from(u32::MAX)) as u32;
        }
        avg
    }
}
/// Top-level statistics: lifetime totals plus a sliding-window rate monitor.
#[derive(Debug, Default, Clone, Copy)]
pub(super) struct Stats {
/// Running sum of the values passed to `push_inspect`.
pub(super) inspected_total: u64,
/// Sliding-window rate tracker fed by `push_counter`.
pub(super) rate: RateMonitor,
/// Lifetime tallies per counter kind, fed by `push_counter`.
pub(super) total: InAndOut,
}
/// Multi-line dump of the statistics enclosed in braces.
impl Display for Stats {
    /// Writes the inspected total, the rate monitor and the lifetime totals.
    ///
    /// The text is emitted directly to the formatter rather than assembled in
    /// a temporary `String`, so formatter errors reach the caller.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{{\n\tInspectedTotal:{}, ", self.inspected_total)?;
        write!(f, "Rate:\n\t\t{}", self.rate)?;
        write!(f, "Total:\n\t\t{}", self.total)?;
        write!(f, "\n}}\n")
    }
}
impl Stats {
    /// Creates statistics anchored at `now`: zero tallies, a fresh rate
    /// monitor, and a totals interval that starts at `now` and ends one
    /// interval unit later.
    pub(super) fn new(now: Tick) -> Self {
        let rate = RateMonitor::new(now);
        let total = InAndOut {
            start: now,
            end: now.until(1.into()),
            ..Default::default()
        };
        Self {
            inspected_total: 0,
            rate,
            total,
        }
    }

    /// Clears every tally and re-anchors all intervals at `now`; the overload
    /// figures of the rate monitor are kept. Returns `self` for chaining.
    pub(super) fn reset(&mut self, now: Tick) -> &mut Self {
        self.rate.reset_excl_ovld(now);
        self.total.reset(now, true);
        self.inspected_total = 0;
        self
    }

    /// Advances the rate monitor to `now` and returns its overload figures
    /// as `(now, total)`.
    pub(super) fn on_tick(&mut self, now: Tick) -> (u32, u64) {
        self.rate.on_tick(now);
        self.rate.overload_stats()
    }

    /// Adds `val` to the inspected total. Returns `self` for chaining.
    pub(super) fn push_inspect(&mut self, val: u32) -> &mut Self {
        self.inspected_total += u64::from(val);
        self
    }

    /// Records `val` events of kind `index` in both the rate monitor and the
    /// lifetime totals.
    pub(super) fn push_counter(&mut self, val: u32, index: Counter) {
        self.rate.push_counter(val, index);
        let slot = index as usize;
        self.total.counter[slot] += val;
    }

    /// Overload figures of the rate monitor as `(now, total)`.
    pub(super) fn overload_stats(&self) -> (u32, u64) {
        self.rate.overload_stats()
    }

    /// Copy of the lifetime tallies, indexed by `Counter as usize`.
    pub(super) fn total(&self) -> [u32; Counter::NumOfCounters as usize] {
        self.total.counter
    }

    /// Current per-counter rate as reported by the rate monitor.
    pub(super) fn rate(&self) -> [u32; Counter::NumOfCounters as usize] {
        self.rate.rate()
    }
}
// Unit tests for `RingArray`, `InAndOut`, `RateMonitor` and `Stats`.
#[cfg(test)]
mod tests {
use super::*;
extern crate env_logger;
use env_logger::{Builder, Env};
use log::{debug, info};
use std::sync::Once;
// Guards logger set-up so it runs at most once per test process.
static INIT: Once = Once::new();
// Installs `env_logger` with a default filter of "warn"; a failed init is ignored.
fn initialize() {
INIT.call_once(|| {
let _ = Builder::from_env(Env::default().default_filter_or("warn")).try_init();
});
}
// Pushed values land in consecutive absolute slots; untouched slots keep the fill value.
#[test]
fn test_ring_array_push_and_get() {
let mut ring_array: RingArray<u32> = RingArray::new(30, |_| {});
ring_array.push(1);
ring_array.push(2);
ring_array.push(3);
assert_eq!(ring_array.get_abs_index(0), Some(&1));
assert_eq!(ring_array.get_abs_index(1), Some(&2));
assert_eq!(ring_array.get_abs_index(2), Some(&3));
assert_eq!(ring_array.get_abs_index(3), Some(&30));
}
// After a full lap the next push overwrites slot 0 and the cursor sits on slot 1.
#[test]
fn test_ring_array_push_overflow() {
let mut ring_array: RingArray<usize> = RingArray::new(3, |_| {});
for i in 0..ring_array.len() {
ring_array.push(i);
}
ring_array.push(3);
assert_eq!(ring_array.get_abs_index(0), Some(&3));
assert_eq!(ring_array.get_cursor(), (&1));
for i in 1..ring_array.len() {
assert_eq!(ring_array.get_abs_index(i), Some(&i));
}
}
// A full lap returns the cursor to slot 0; two steps then put it on slot 2.
#[test]
fn test_step_forward() {
let mut ring_array: RingArray<usize> = RingArray::new(30, |_| {});
for i in 0..ring_array.len() {
ring_array.push(i);
}
ring_array.step_forward();
ring_array.step_forward();
assert_eq!(ring_array.get_cursor(), &2);
}
// `iter` walks absolute storage order starting at slot 0.
#[test]
fn test_ring_array_iter() {
let mut ring_array: RingArray<u32> = RingArray::new(0, |_| {});
ring_array.push(1);
ring_array.push(2);
ring_array.push(3);
let mut iter = ring_array.iter();
assert_eq!(iter.next(), Some(&1));
assert_eq!(iter.next(), Some(&2));
assert_eq!(iter.next(), Some(&3));
assert_eq!(iter.next(), Some(&0));
}
// `reset` clears the tallies; the interval spans one PERIOD or is empty depending on `zero_range`.
#[test]
fn test_in_and_out_reset() {
let now = Tick(100);
let mut in_and_out = InAndOut::default();
in_and_out.start = now;
in_and_out.end = now.until(1.into());
in_and_out.counter[0] = 10;
in_and_out.counter[1] = 20;
in_and_out.reset(now, false);
assert_eq!(in_and_out.start, now);
assert_eq!(in_and_out.end, now.until(PERIOD));
assert_eq!(in_and_out.counter[0], 0);
assert_eq!(in_and_out.counter[1], 0);
in_and_out.reset(now, true);
assert_eq!(in_and_out.start, now);
assert_eq!(in_and_out.end, now);
assert_eq!(in_and_out.counter[0], 0);
assert_eq!(in_and_out.counter[1], 0);
}
// `merge` adopts the other end and adds tallies; `separate` adopts the other end as start and subtracts them.
#[test]
fn test_in_and_out_merge_separate() {
let now = Tick(100);
let won = Tick(1000);
let mut io1 = InAndOut {
start: now,
end: now.until(1.into()),
counter: [10, 20, 30],
};
let io2 = InAndOut {
start: won,
end: won.until(1.into()),
counter: [100, 200, 300],
};
io1.merge(&io2);
assert_eq!(io1.start, now);
assert_eq!(io1.end, won.until(1.into()));
assert_eq!(io1.counter[0], 110);
assert_eq!(io1.counter[1], 220);
assert_eq!(io1.counter[2], 330);
io1.separate(&io2);
assert_eq!(io1.start, won.until(1.into()));
assert_eq!(io1.end, won.until(1.into()));
assert_eq!(io1.counter[0], 10);
assert_eq!(io1.counter[1], 20);
assert_eq!(io1.counter[2], 30);
}
// A new monitor has zero tallies everywhere; only ring slot 0 spans a PERIOD.
#[test]
fn test_rate_monitor_new() {
initialize();
let now = Tick(100);
let rate_monitor = RateMonitor::new(now);
assert_eq!(rate_monitor.period.start, now);
assert_eq!(rate_monitor.period.end, now);
for i in 0..Counter::NumOfCounters as usize {
for j in 0..rate_monitor.collection.len() {
assert_eq!(rate_monitor.collection.buffer[j].counter[i], 0);
assert_eq!(rate_monitor.collection.buffer[j].start, now);
if j == 0 {
assert_eq!(
rate_monitor.collection.buffer[j].end,
rate_monitor.collection.buffer[j].start.until(PERIOD)
);
} else {
assert_eq!(rate_monitor.collection.buffer[j].end, now);
}
}
assert_eq!(rate_monitor.collection.buffer[0].counter[i], 0);
assert_eq!(rate_monitor.period.counter[i], 0);
}
}
// `reset_excl_ovld` clears tallies but keeps the overload figures.
#[test]
fn test_rate_monitor_reset() {
let mut now = Tick(100);
let mut rate_monitor = RateMonitor::new(now);
rate_monitor.push_counter(10, Counter::Created);
now = now.until(PERIOD);
rate_monitor.on_tick(now);
let s = rate_monitor.collected_sum();
assert_eq!(s[Counter::Created as usize], 10);
assert_eq!(s[Counter::Canceled as usize], 0);
assert_eq!(s[Counter::Expired as usize], 0);
assert_eq!(rate_monitor.period.counter[Counter::Created as usize], 10);
assert_eq!(rate_monitor.period.counter[Counter::Canceled as usize], 0);
assert_eq!(rate_monitor.period.counter[Counter::Expired as usize], 0);
rate_monitor.reset_excl_ovld(now);
let s = rate_monitor.collected_sum();
assert_eq!(s[Counter::Created as usize], 0);
assert_eq!(s[Counter::Canceled as usize], 0);
assert_eq!(s[Counter::Expired as usize], 0);
assert_eq!(rate_monitor.period.counter[Counter::Created as usize], 0);
assert_eq!(rate_monitor.period.counter[Counter::Canceled as usize], 0);
assert_eq!(rate_monitor.period.counter[Counter::Expired as usize], 0);
assert_eq!(rate_monitor.overload_stats(), (0, 0));
// Jumping two periods at once skips one whole period, which is counted as overload.
now = now.until(Interval(PERIOD.0 * 2));
rate_monitor.on_tick(now);
assert_eq!(rate_monitor.overload_stats(), (1, 1));
rate_monitor.reset_excl_ovld(now);
assert_eq!(rate_monitor.overload_stats(), (1, 1));
}
// A tick 10 interval units after creation is expected to leave the ring and the aggregate untouched.
#[test]
fn test_rate_monitor_on_tick() {
let now = Tick(100);
let mut rate_monitor = RateMonitor::new(now);
rate_monitor.on_tick(now.until(10.into()));
assert_eq!(rate_monitor.period.end, now);
assert_eq!(rate_monitor.collection.buffer[0].end, now.until(PERIOD));
}
// Helper: records one fixed batch (10 created, 8 canceled, 2 expired) in the cursor entry.
fn push_counter(rm: &mut RateMonitor) {
let created = 10;
let canceled = 8;
let expired = 2;
rm.push_counter(created, Counter::Created);
rm.push_counter(canceled, Counter::Canceled);
rm.push_counter(expired, Counter::Expired);
}
// Runs the collection scenario for batch counts around 0, one ring lap, two laps and ten laps.
#[test]
fn test_rate_monitor_counter_collection() {
initialize();
let l = 0;
rate_monitor_counter_collection(l);
let l = 1;
rate_monitor_counter_collection(l);
let l = 2;
rate_monitor_counter_collection(l);
let l = 3;
rate_monitor_counter_collection(l);
let l = RING_SIZE / 2;
rate_monitor_counter_collection(l);
let l = RING_SIZE - 1;
rate_monitor_counter_collection(l);
let l = RING_SIZE;
rate_monitor_counter_collection(l);
let l = RING_SIZE + 1;
rate_monitor_counter_collection(l);
let l = RING_SIZE * 2 - 2;
rate_monitor_counter_collection(l);
let l = RING_SIZE * 2 - 1;
rate_monitor_counter_collection(l);
let l = RING_SIZE * 2;
rate_monitor_counter_collection(l);
let l = RING_SIZE * 2 + 1;
rate_monitor_counter_collection(l);
let l = RING_SIZE * 2 + 2;
rate_monitor_counter_collection(l);
let l = RING_SIZE * 10 - 4;
rate_monitor_counter_collection(l);
let l = RING_SIZE * 10 - 3;
rate_monitor_counter_collection(l);
let l = RING_SIZE * 10 - 2;
rate_monitor_counter_collection(l);
let l = RING_SIZE * 10 - 1;
rate_monitor_counter_collection(l);
let l = RING_SIZE * 10;
rate_monitor_counter_collection(l);
let l = RING_SIZE * 10 + 1;
rate_monitor_counter_collection(l);
let l = RING_SIZE * 10 + 2;
rate_monitor_counter_collection(l);
let l = RING_SIZE * 10 + 3;
rate_monitor_counter_collection(l);
let l = RING_SIZE * 10 + 4;
rate_monitor_counter_collection(l);
}
// Scenario helper: pushes `l` batches, ticking half a PERIOD after each (two batches per
// period), then checks the ring sum, the aggregate tallies, the overload figures and the rate.
fn rate_monitor_counter_collection(l: usize) -> RateMonitor {
debug!("Length:{}", l);
let mut now = Tick(100);
let mut rate_monitor = RateMonitor::new(now);
assert_eq!(rate_monitor.rate(), [0; Counter::NumOfCounters as usize]);
assert_eq!(rate_monitor.overload_stats(), (0, 0));
for _ in 0..l {
push_counter(&mut rate_monitor);
now = now.until(Interval(PERIOD.0 / 2));
rate_monitor.on_tick(now);
}
info!("RateMonitor:{}Rate:{:?}", rate_monitor, rate_monitor.rate());
let s = rate_monitor.collected_sum();
// Expected number of batches still held in the ring: all of them until the ring has
// wrapped, afterwards two laps' worth minus one or two depending on the parity of `l`.
let dl = {
if l >= rate_monitor.collection.len() * 2 {
rate_monitor.collection.len() * 2 - ((l + 1) % 2 + 1)
} else {
l
}
};
assert_eq!(s, [10 * dl as u32, 8 * dl as u32, 2 * dl as u32]);
assert_eq!(rate_monitor.overload_stats(), (0, 0));
// Expected number of completed periods in the aggregate (two batches each).
let dl = {
if l >= rate_monitor.collection.len() * 2 {
rate_monitor.collection.len() - 1
} else {
l / 2
}
};
assert_eq!(
rate_monitor.period.counter,
[
(10 * 2 * dl) as u32,
(8 * 2 * dl) as u32,
(2 * 2 * dl) as u32
]
);
if l < 2 {
assert_eq!(rate_monitor.rate(), [0 as u32, 0 as u32, 0 as u32]);
} else if l <= RING_SIZE * 2 {
assert_eq!(
rate_monitor.rate(),
[
10 * 2 * (TICKS_PER_SECOND.0 / PERIOD.0) as u32,
8 * 2 * (TICKS_PER_SECOND.0 / PERIOD.0) as u32,
2 * 2 * (TICKS_PER_SECOND.0 / PERIOD.0) as u32,
]
);
};
rate_monitor
}
// Overload accounting: ticking two periods at a time reports one skipped period per tick,
// and a single late tick on a fresh monitor reports the expected number of skipped periods.
#[test]
fn test_rate_monitor_overload() {
initialize();
let mut now = Tick(100);
let mut rate_monitor = RateMonitor::new(now);
now = now.until(Interval(PERIOD.0 * 1));
rate_monitor.on_tick(now);
assert_eq!(rate_monitor.overload_stats(), (0, 0));
let mut ovld = 0;
for _ in 1..rate_monitor.collection.len() * 100 {
now = now.until(Interval(PERIOD.0 * 2));
rate_monitor.on_tick(now);
ovld += 1;
assert_eq!(rate_monitor.overload_stats(), (1, ovld));
}
rate_monitor.reset_excl_ovld(now);
assert_eq!(rate_monitor.overload_stats(), (1, ovld));
// Fresh monitor per iteration, ticked once after `i` half-periods.
for i in 0..RING_SIZE * 100 {
let mut now = Tick(100);
let mut rm = RateMonitor::new(now);
now = now.until(Interval((PERIOD.0 * i as i64) / 2));
rm.push_counter(1000, Counter::Created);
rm.on_tick(now);
let ovld: u32 = match i as u32 {
0..=2 => 0 as u32,
n if n > 2 => (n) / 2 - 1,
_ => unreachable!(),
};
assert_eq!(rm.overload_stats(), (ovld, ovld as u64));
if i >= RING_SIZE * 2 || i < 2 {
assert_eq!(rm.rate(), [0 as u32, 0 as u32, 0 as u32]);
} else {
assert!(rm.rate()[Counter::Created as usize] > 0);
}
}
}
// New stats start with zero tallies and a totals interval one unit long.
#[test]
fn test_stats_new() {
let now = Tick(100);
let stats = Stats::new(now);
assert_eq!(stats.inspected_total, 0);
assert_eq!(stats.total.start, now);
assert_eq!(stats.total.end, now.until(1.into()));
for &counter in &stats.total.counter {
assert_eq!(counter, 0);
}
}
// `reset` clears the inspected total and the totals, leaving an empty totals interval.
#[test]
fn test_stats_reset() {
let now = Tick(100);
let mut stats = Stats::new(now);
stats.push_inspect(10);
stats.push_counter(5, Counter::Created);
stats.reset(now);
assert_eq!(stats.inspected_total, 0);
assert_eq!(stats.total.start, now);
assert_eq!(stats.total.end, now);
for &counter in &stats.total.counter {
assert_eq!(counter, 0);
}
}
// Ticking at the creation time reports no overload.
#[test]
fn test_stats_on_tick() {
let now = Tick(100);
let mut stats = Stats::new(now);
let (overloaded_now, overloaded_total) = stats.on_tick(now);
assert_eq!(overloaded_now, 0);
assert_eq!(overloaded_total, 0);
}
// `push_inspect` accumulates into `inspected_total`.
#[test]
fn test_stats_push_inspect() {
let now = Tick(100);
let mut stats = Stats::new(now);
stats.push_inspect(10);
assert_eq!(stats.inspected_total, 10);
}
// `push_counter` is reflected in the lifetime totals.
#[test]
fn test_stats_push_counter() {
let now = Tick(100);
let mut stats = Stats::new(now);
stats.push_counter(5, Counter::Created);
assert_eq!(stats.total.counter[Counter::Created as usize], 5);
}
// Fresh stats report zero overload.
#[test]
fn test_stats_overload_stats() {
let now = Tick(100);
let stats = Stats::new(now);
let (overloaded_now, overloaded_total) = stats.overload_stats();
assert_eq!(overloaded_now, 0);
assert_eq!(overloaded_total, 0);
}
// `total()` returns the lifetime tallies.
#[test]
fn test_stats_total() {
let now = Tick(100);
let mut stats = Stats::new(now);
stats.push_counter(5, Counter::Created);
let total = stats.total();
assert_eq!(total[Counter::Created as usize], 5);
}
}