use core::fmt;
use std::{cmp::Ordering, collections::VecDeque, time::Duration};
use tokio::sync::Notify;
use tokio::time::{Instant, sleep};
/// Sliding-window tracker of event timestamps used to estimate a rate
/// (events per second) and the mean gap between events.
///
/// The window size is stored explicitly instead of being inferred from the
/// deque's allocated capacity: `VecDeque::with_capacity` only guarantees *at
/// least* the requested space, and cloning a deque does not preserve its
/// capacity, so capacity is not a reliable bound.
#[derive(Debug, Clone)]
pub struct Speed {
    /// Recorded timestamps, most recently recorded at the front.
    event_speeds: VecDeque<Instant>,
    /// Maximum number of timestamps retained in `event_speeds`.
    window: usize,
}

impl Speed {
    /// Smallest window accepted by [`Speed::new`]; smaller requests are raised to it.
    const MIN_WINDOW: usize = 5;
    /// Window used by [`Speed::default`].
    const DEFAULT_WINDOW: usize = 15;

    /// Creates an empty tracker that keeps the last `size_for_average` events,
    /// with a minimum of five.
    pub fn new(size_for_average: u16) -> Speed {
        Self::with_window(usize::from(size_for_average).max(Self::MIN_WINDOW))
    }

    /// Builds an empty tracker retaining at most `window` timestamps.
    fn with_window(window: usize) -> Speed {
        Speed {
            event_speeds: VecDeque::with_capacity(window),
            window,
        }
    }

    /// Records an event that happened at `instant`, evicting the oldest
    /// recorded timestamp once the window is full.
    pub fn time_event(&mut self, instant: Instant) {
        // Keep at most `window - 1` entries so the push below lands exactly on the limit.
        self.event_speeds.truncate(self.window.saturating_sub(1));
        self.event_speeds.push_front(instant);
    }

    /// Records an event happening now.
    pub fn time(&mut self) {
        self.time_event(Instant::now())
    }

    /// Returns the most recently recorded timestamp, or `None` if nothing was recorded.
    pub fn get_last_event(&self) -> Option<&Instant> {
        self.event_speeds.front()
    }

    /// Sums the gaps between "now" and the newest event, then between each
    /// pair of consecutive events, walking from newest to oldest.
    ///
    /// For timestamps recorded in chronological order this is the time elapsed
    /// since the oldest retained event. An empty tracker yields zero.
    fn accumulate_event_speeds(&self) -> Duration {
        let mut newer = Instant::now();
        let mut total = Duration::ZERO;
        for &older in &self.event_speeds {
            total += newer.duration_since(older);
            newer = older;
        }
        total
    }

    /// Returns the accumulated span divided by the number of retained events,
    /// or zero when no event has been recorded.
    pub fn get_mean_duration(&self) -> Duration {
        // The length never exceeds `window`, which comes from a `u16` (or the
        // default of 15), so this cast cannot truncate.
        let count = self.event_speeds.len() as u32;
        if count == 0 {
            Duration::ZERO
        } else {
            // Integer division keeps nanosecond precision (no f32 round trip).
            self.accumulate_event_speeds() / count
        }
    }

    /// Returns the measured rate in events per second, or `0.0` when the
    /// accumulated span is zero (in particular when nothing was recorded).
    pub fn get_speed(&self) -> f64 {
        let elapsed = self.accumulate_event_speeds();
        if elapsed.is_zero() {
            0.0
        } else {
            // Fractional seconds avoid dividing by a millisecond count that
            // truncates to 0 for sub-millisecond spans (which produced `inf`).
            self.event_speeds.len() as f64 / elapsed.as_secs_f64()
        }
    }

    /// Returns how long to wait so that the retained events fit a rate of
    /// `tps` events per second.
    ///
    /// The time budget for the retained events is `1000 * count / tps`
    /// milliseconds; `overhead`, when given, is added to it, and the span
    /// already accumulated is subtracted. The result saturates at zero.
    ///
    /// The float-to-integer cast saturates: a NaN or negative budget becomes
    /// 0 ms and an infinite one (e.g. `tps == 0.0` with events recorded)
    /// becomes `u64::MAX` ms.
    pub fn get_duration_overhead(&self, tps: f64, overhead: Option<Duration>) -> Duration {
        let budget_ms = (1000 * self.event_speeds.len()) as f64 / tps;
        let budget = Duration::from_millis(budget_ms as u64);
        budget
            .saturating_add(overhead.unwrap_or_default())
            .saturating_sub(self.accumulate_event_speeds())
    }

    /// Same as [`Speed::get_duration_overhead`] without any overhead.
    pub fn get_duration(&self, tps: f64) -> Duration {
        self.get_duration_overhead(tps, None)
    }
}

impl Default for Speed {
    /// Creates an empty tracker with a window of 15 events.
    fn default() -> Self {
        Self::with_window(Self::DEFAULT_WINDOW)
    }
}
impl PartialEq for Speed {
fn eq(&self, other: &Self) -> bool {
self.get_speed() == other.get_speed()
}
}
impl PartialOrd for Speed {
    /// Orders trackers by their currently measured rate (events per second).
    ///
    /// Returns `None` when the two floating-point rates are not comparable.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let lhs = self.get_speed();
        let rhs = other.get_speed();
        PartialOrd::partial_cmp(&lhs, &rhs)
    }
}
impl fmt::Display for Speed {
    /// Writes the measured rate and the mean duration in milliseconds on a
    /// single line, terminated by a newline.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let tps = self.get_speed();
        let mean_ms = self.get_mean_duration().as_millis();
        writeln!(f, "{} TPS (mean {} ms)", tps, mean_ms)
    }
}
/// State used to throttle the sending of transactions: a target rate, a cap
/// on transactions in flight, and an extra delay derived from slow responses.
pub struct Regulator {
/// Target rate, passed as `tps` to `Speed::get_duration_overhead` by `tick`.
max_speed: f64,
/// Response times above this threshold produce a `tick_overhead`
/// (see `notify_receive_transaction`).
timeout_threshold: Duration,
/// `tick` waits while `current_concurrents_send` is at or above this value.
max_concurrents_send: u32,
/// Timestamps of sent transactions, updated by `notify_send_transaction`.
speed: Speed,
/// Signalled by `notify_receive_transaction` and awaited by `tick`.
concurent_notify: Notify,
/// Incremented on each send notification, decremented on each receive notification.
current_concurrents_send: u32,
/// Optional extra delay added to the next computed wait in `tick`.
tick_overhead: Option<Duration>,
}
impl Regulator {
    /// Creates a regulator.
    ///
    /// * `max_speed` - target rate in transactions per second.
    /// * `timeout_threshold` - response time above which an overhead delay is recorded.
    /// * `max_concurrents_send` - in-flight count at which `tick` starts waiting.
    /// * `speed_interval` - number of events requested for the rate window
    ///   (forwarded to `Speed::new`).
    pub fn new(
        max_speed: f64,
        timeout_threshold: Duration,
        max_concurrents_send: u32,
        speed_interval: u16,
    ) -> Regulator {
        Regulator {
            max_speed,
            timeout_threshold,
            max_concurrents_send,
            speed: Speed::new(speed_interval),
            concurent_notify: Notify::new(),
            current_concurrents_send: 0,
            tick_overhead: None,
        }
    }

    /// Waits until another transaction may be sent.
    ///
    /// First waits while the in-flight count is at or above the limit, then
    /// sleeps for the delay computed from the measured rate, the target rate
    /// and any pending overhead. When no sleep is needed the pending overhead
    /// is cleared.
    ///
    /// NOTE(review): this future holds `&mut self` while it waits, and the
    /// counter is only decremented by `notify_receive_transaction`, which
    /// also needs `&mut self`. From the code visible here, a wait on the
    /// concurrency limit therefore only ends when the future is dropped
    /// (e.g. wrapped in a timeout, as the tests do). A limit of 0 always
    /// takes that path.
    pub async fn tick(&mut self) {
        // The loop condition cannot change inside the body (see the note above).
        #[allow(clippy::while_immutable_condition)]
        while self.current_concurrents_send >= self.max_concurrents_send {
            self.concurent_notify.notified().await;
        }
        let duration = self
            .speed
            .get_duration_overhead(self.max_speed, self.tick_overhead);
        if duration.is_zero() {
            self.tick_overhead = None;
        } else {
            sleep(duration).await;
        }
    }

    /// Records that a transaction was just sent: timestamps it and counts it
    /// as in flight.
    pub fn notify_send_transaction(&mut self) {
        self.speed.time();
        self.current_concurrents_send = self.current_concurrents_send.saturating_add(1);
    }

    /// Records that a response arrived after `response_time`.
    ///
    /// The pending overhead becomes the amount by which `response_time`
    /// exceeds the timeout threshold, or `None` when it does not exceed it.
    /// The in-flight count is decremented and one waiter is notified.
    pub fn notify_receive_transaction(&mut self, response_time: Duration) {
        self.tick_overhead = response_time
            .checked_sub(self.timeout_threshold)
            .filter(|excess| !excess.is_zero());
        // Saturate instead of underflowing when a receive arrives without a
        // matching send: a wrapped counter would sit at `u32::MAX` and make
        // `tick` wait forever.
        self.current_concurrents_send = self.current_concurrents_send.saturating_sub(1);
        self.concurent_notify.notify_one();
    }

    /// Raises the pending overhead to `overhead` if none is pending or the
    /// pending one is smaller; a larger pending overhead is kept.
    pub fn add_tick_overhead(&mut self, overhead: Duration) {
        let pending = self
            .tick_overhead
            .map_or(overhead, |current| current.max(overhead));
        self.tick_overhead = Some(pending);
    }

    /// Returns the currently measured rate in transactions per second.
    pub fn get_speed(&self) -> f64 {
        self.speed.get_speed()
    }
}
impl Default for Regulator {
fn default() -> Self {
Regulator {
max_speed: 5.0,
timeout_threshold: Duration::from_secs(5),
max_concurrents_send: 1,
speed: Speed::default(),
concurent_notify: Notify::new(),
current_concurrents_send: 0,
tick_overhead: None,
}
}
}
impl fmt::Debug for Regulator {
    /// Formats every field except the internal notifier as a debug struct.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Regulator");
        out.field("max_speed", &self.max_speed);
        out.field("timeout_threshold", &self.timeout_threshold);
        out.field("current_concurrents_send", &self.current_concurrents_send);
        out.field("max_concurrents_send", &self.max_concurrents_send);
        out.field("speed", &self.speed);
        out.field("tick_overhead", &self.tick_overhead);
        out.finish()
    }
}
impl fmt::Display for Regulator {
    /// Writes a three-line summary: measured vs. target rate, in-flight vs.
    /// maximum concurrent transactions, and the timeout threshold in
    /// milliseconds. Every line ends with a newline.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let measured_tps = self.speed.get_speed();
        writeln!(f, " - Tps : {} / {}", measured_tps, self.max_speed)?;

        let in_flight = self.current_concurrents_send;
        let limit = self.max_concurrents_send;
        writeln!(f, " - Concurents transactions: {} / {}", in_flight, limit)?;

        let threshold_ms = self.timeout_threshold.as_millis();
        writeln!(f, " - Timeout Threshold : {} ms", threshold_ms)
    }
}
#[cfg(test)]
mod tests {
// Timing-based tests for `Speed` and `Regulator`. Events are spaced with
// `tokio::time::sleep`, so the assertions accept a range of durations.
use super::*;
use tokio::time::timeout;
// Target rate shared by both tests: 25 events per second, i.e. one every 40 ms.
const TPS: f64 = 25.0;
// Checks an empty tracker, then records five events 40 ms apart and checks the
// derived wait time, mean duration and rate.
#[tokio::test]
async fn speed_test() {
const SLEEP_DURATION: Duration = Duration::from_millis(40);
// A requested size of 4 is below the minimum, so `Speed::new` uses 5.
let mut speed = Speed::new(4);
// Empty tracker: no last event, same (zero) rate as the default tracker.
assert_eq!(None, speed.get_last_event());
assert_eq!(Speed::default(), speed);
assert!(Speed::default() <= speed);
assert_eq!("0 TPS (mean 0 ms)\n", speed.to_string().as_str());
speed.time();
assert!(speed.get_last_event().is_some());
tokio::time::sleep(SLEEP_DURATION).await;
for _ in 1..=3 {
speed.time();
tokio::time::sleep(SLEEP_DURATION).await;
}
// Fifth event, recorded right before measuring the remaining wait.
speed.time();
// NOTE(review): the messages below print "< 44" but the ranges are inclusive (`..=44`).
let mut duration = speed.get_duration(TPS); assert!(
(25..=44).contains(&duration.as_millis()),
"Next duration: 25 <= {} < 44",
duration.as_millis()
);
tokio::time::sleep(SLEEP_DURATION).await;
let mean_duration = speed.get_mean_duration(); assert!(
(25..=44).contains(&mean_duration.as_millis()),
"Mean duration: 25 <= {} < 44",
mean_duration.as_millis()
);
// The rounded rate must fall within [TPS - 2, TPS].
let speed_tps = speed.get_speed().round(); assert!(
((TPS - 2.0)..=TPS).contains(&speed_tps),
"TPS: {} <= {} < {}",
TPS - 2.0,
speed_tps,
TPS
);
// After the extra sleep, at most 4 ms of waiting may remain.
duration = speed.get_duration(TPS); assert!(
duration.as_millis() <= 4,
"Remain duration: {} <= 4",
duration.as_millis()
);
}
// Checks the regulator's text output, that `tick` returns at once when the rate
// is on target, that it waits while the concurrency limit is reached, and that
// it sleeps when transactions were sent faster than the target rate.
#[allow(clippy::needless_return)]
#[tokio::test]
async fn regulator_test() {
let mut regulator = Regulator::new(TPS, Duration::from_secs(3), 1, 5);
assert_eq!(0f64, regulator.get_speed());
assert_eq!(
" - Tps : 0 / 5\n - Concurents transactions: 0 / 1\n - Timeout Threshold : 5000 ms\n",
Regulator::default().to_string().as_str()
);
assert_eq!(
" - Tps : 0 / 25\n - Concurents transactions: 0 / 1\n - Timeout Threshold : 3000 ms\n",
regulator.to_string().as_str()
);
// Five send/receive pairs spaced 40 ms apart, matching the 25 TPS target.
for _ in 1..=5 {
regulator.notify_send_transaction();
regulator.notify_receive_transaction(Duration::from_millis(10));
sleep(Duration::from_millis(40)).await;
}
let mut initial_time = Instant::now();
// On target and nothing in flight: `tick` is expected to return within 1 ms.
regulator.tick().await;
regulator.notify_send_transaction();
assert!(initial_time.elapsed() <= Duration::from_millis(1));
// One transaction in flight with a limit of 1: `tick` must still be pending
// when the 400 ms timeout fires.
assert!(
timeout(Duration::from_millis(400), regulator.tick())
.await
.is_err()
);
regulator.notify_receive_transaction(Duration::from_millis(10));
// Five pairs only 10 ms apart, faster than the target rate.
for _ in 1..=5 {
regulator.notify_send_transaction();
regulator.notify_receive_transaction(Duration::from_millis(10));
sleep(Duration::from_millis(10)).await;
}
initial_time = Instant::now();
// `tick` is expected to sleep at least 100 ms to bring the rate back to target.
regulator.tick().await;
assert!(initial_time.elapsed() >= Duration::from_millis(100));
}
}