use ::log::info;
use ::std::{collections::VecDeque, time::Instant};
use crate::mem::{
PhysicalMemory, PhysicalMemoryMapping, PhysicalMemoryMetadata, PhysicalReadMemOps,
PhysicalWriteMemOps,
};
use crate::{error::Result, mem::MemOps};
pub struct PhysicalMemoryMetrics<T> {
mem: T,
reads: MemOpsHistory,
last_read_info: Instant,
writes: MemOpsHistory,
last_write_info: Instant,
}
impl<T> Clone for PhysicalMemoryMetrics<T>
where
T: Clone,
{
fn clone(&self) -> Self {
Self {
mem: self.mem.clone(),
reads: self.reads.clone(),
last_read_info: Instant::now(),
writes: self.writes.clone(),
last_write_info: Instant::now(),
}
}
}
impl<T: PhysicalMemory> PhysicalMemoryMetrics<T> {
pub fn new(mem: T) -> Self {
Self {
mem,
reads: MemOpsHistory::new(0..100, 1.0),
last_read_info: Instant::now(),
writes: MemOpsHistory::new(0..100, 1.0),
last_write_info: Instant::now(),
}
}
pub fn into_inner(self) -> T {
self.mem
}
}
impl<T: PhysicalMemory> PhysicalMemory for PhysicalMemoryMetrics<T> {
#[inline]
fn phys_read_raw_iter(
&mut self,
MemOps { inp, out_fail, out }: PhysicalReadMemOps,
) -> Result<()> {
let mut number_of_bytes = 0;
let iter = inp.inspect(|e| number_of_bytes += e.2.len());
let start_time = Instant::now();
let mem = &mut self.mem;
let result = MemOps::with_raw(iter, out, out_fail, |data| mem.phys_read_raw_iter(data));
self.reads
.add(start_time.elapsed().as_secs_f64(), number_of_bytes);
if self.last_read_info.elapsed().as_secs_f64() >= 1f64 {
info!(
"Read Metrics: reads_per_second={} average_latency={:.4}ms; average_bytes={}; bytes_per_second={}",
self.reads.len(),
self.reads.average_latency().unwrap_or_default() * 1000f64,
self.reads.average_bytes().unwrap_or_default(),
self.reads.bandwidth().unwrap_or_default(),
);
self.last_read_info = Instant::now();
}
result
}
#[inline]
fn phys_write_raw_iter(
&mut self,
MemOps { inp, out_fail, out }: PhysicalWriteMemOps,
) -> Result<()> {
let mut number_of_bytes = 0;
let iter = inp.inspect(|e| number_of_bytes += e.2.len());
let start_time = Instant::now();
let mem = &mut self.mem;
let result = MemOps::with_raw(iter, out, out_fail, |data| mem.phys_write_raw_iter(data));
self.writes
.add(start_time.elapsed().as_secs_f64(), number_of_bytes);
if self.last_write_info.elapsed().as_secs_f64() >= 1f64 {
info!(
"Write Metrics: writes_per_second={} average_latency={:.4}ms; average_bytes={}; bytes_per_second={}",
self.writes.len(),
self.writes.average_latency().unwrap_or_default() * 1000f64,
self.writes.average_bytes().unwrap_or_default(),
self.writes.bandwidth().unwrap_or_default(),
);
self.last_write_info = Instant::now();
}
result
}
#[inline]
fn metadata(&self) -> PhysicalMemoryMetadata {
self.mem.metadata()
}
#[inline]
fn set_mem_map(&mut self, mem_map: &[PhysicalMemoryMapping]) {
self.mem.set_mem_map(mem_map)
}
}
#[cfg(feature = "plugins")]
::cglue::cglue_impl_group!(
PhysicalMemoryMetrics<T: PhysicalMemory>,
crate::plugins::ConnectorInstance,
{}
);
#[derive(Clone, Debug)]
struct MemOpsHistory {
start_time: Instant,
min_len: usize,
max_len: usize,
max_age: f32,
total_count: u64,
values: VecDeque<(f64, MemOpsHistoryEntry)>,
}
#[derive(Clone, Copy, Debug)]
struct MemOpsHistoryEntry {
pub latency: f64, pub bytes: usize, }
#[allow(unused)]
impl MemOpsHistory {
pub fn new(length_range: std::ops::Range<usize>, max_age: f32) -> Self {
Self {
start_time: Instant::now(),
min_len: length_range.start,
max_len: length_range.end,
max_age,
total_count: 0,
values: Default::default(),
}
}
#[inline]
pub fn max_len(&self) -> usize {
self.max_len
}
#[inline]
pub fn max_age(&self) -> f32 {
self.max_age
}
#[inline]
pub fn is_empty(&self) -> bool {
self.values.is_empty()
}
#[inline]
pub fn len(&self) -> usize {
self.values.len()
}
#[inline]
pub fn total_count(&self) -> u64 {
self.total_count
}
#[inline]
pub fn clear(&mut self) {
self.values.clear();
}
pub fn add(&mut self, latency: f64, bytes: usize) {
let now = self.start_time.elapsed().as_secs_f64();
if let Some((last_time, _)) = self.values.back() {
assert!(now >= *last_time, "Time shouldn't move backwards");
}
self.total_count += 1;
self.values
.push_back((now, MemOpsHistoryEntry { latency, bytes }));
self.flush();
}
pub fn mean_time_interval(&self) -> Option<f64> {
if let (Some(first), Some(last)) = (self.values.front(), self.values.back()) {
let n = self.len();
if n >= 2 {
Some((last.0 - first.0) / ((n - 1) as f64))
} else {
None
}
} else {
None
}
}
pub fn rate(&self) -> Option<f64> {
self.mean_time_interval().map(|time| 1.0 / time)
}
pub fn flush(&mut self) {
let now = self.start_time.elapsed().as_secs_f64();
while self.values.len() > self.max_len {
self.values.pop_front();
}
while self.values.len() > self.min_len {
if let Some((front_time, _)) = self.values.front() {
if *front_time < now - (self.max_age as f64) {
self.values.pop_front();
} else {
break;
}
} else {
break;
}
}
}
#[inline]
pub fn sum_latency(&self) -> f64 {
self.values.iter().map(|(_, value)| value.latency).sum()
}
pub fn average_latency(&self) -> Option<f64> {
let num = self.len();
if num > 0 {
Some(self.sum_latency() / (num as f64))
} else {
None
}
}
#[inline]
pub fn sum_bytes(&self) -> usize {
self.values.iter().map(|(_, value)| value.bytes).sum()
}
pub fn average_bytes(&self) -> Option<usize> {
let num = self.len();
if num > 0 {
Some((self.sum_bytes() as f64 / (num as f64)) as usize)
} else {
None
}
}
pub fn bandwidth(&self) -> Option<usize> {
Some((self.average_bytes()? as f64 * self.rate()?) as usize)
}
}