use std::time::Instant;
use crate::channel::UnboundedChannelSender;
use crate::network::Coord;
use crate::profiler::metrics::{ProfilerBucket, ProfilerResult};
use crate::profiler::{get_sender, Profiler, TimePoint};
/// Width of a profiler bucket, in milliseconds.
///
/// `ProfilerBackend::bucket` opens a new bucket once at least this much time has passed since
/// the start of the latest one, and aligns the new bucket's start to a multiple of this value.
const BUCKET_RESOLUTION_MS: TimePoint = 20;
/// Profiler implementation that accumulates metrics into time buckets and sends them over a
/// channel when the value is dropped.
///
/// Note: the type is `Clone` and every instance sends its data on drop, so each clone produces
/// its own `ProfilerResult`.
#[derive(Clone, Debug)]
pub struct ProfilerBackend {
/// Name of the thread that constructed this backend (`"unnamed"` if the thread has no name).
thread_name: String,
/// Reference instant; all time points are milliseconds elapsed since this instant.
start: Instant,
/// Buckets collected so far; the last element is the one currently being filled.
buckets: Vec<ProfilerBucket>,
/// Channel endpoint used to deliver the collected data on drop.
sender: UnboundedChannelSender<ProfilerResult>,
}
impl ProfilerBackend {
    /// Creates a backend whose time points are measured relative to `start`.
    ///
    /// The backend is labelled with the name of the calling thread (or `"unnamed"` when the
    /// thread has none) and starts out with a single bucket beginning at time 0.
    pub fn new(start: Instant) -> Self {
        let current_thread = std::thread::current();
        let thread_name = current_thread.name().unwrap_or("unnamed").to_string();
        Self {
            thread_name,
            start,
            buckets: vec![ProfilerBucket::new(0)],
            sender: get_sender(),
        }
    }

    /// Milliseconds elapsed since `self.start`.
    fn now(&self) -> TimePoint {
        let elapsed = self.start.elapsed();
        elapsed.as_millis() as TimePoint
    }

    /// Returns the bucket that covers the current time, appending a new one when the latest
    /// bucket is at least `BUCKET_RESOLUTION_MS` old.
    ///
    /// # Panics
    ///
    /// Panics if `self.buckets` is empty.
    #[inline]
    fn bucket(&mut self) -> &mut ProfilerBucket {
        let now = self.now();
        let latest_end = self.buckets.last().unwrap().start_ms + BUCKET_RESOLUTION_MS;
        if latest_end <= now {
            // Round down so that bucket starts are always multiples of the resolution.
            let aligned_start = (now / BUCKET_RESOLUTION_MS) * BUCKET_RESOLUTION_MS;
            self.buckets.push(ProfilerBucket::new(aligned_start));
        }
        self.buckets.last_mut().unwrap()
    }
}
impl Drop for ProfilerBackend {
    /// Sends the thread name and every collected bucket through `self.sender`.
    ///
    /// Both fields are moved out with `std::mem::take`, leaving empty values behind.
    ///
    /// # Panics
    ///
    /// Panics if the send fails, unless the current thread is already unwinding from another
    /// panic, in which case the failure is ignored.
    fn drop(&mut self) {
        let outcome = self.sender.send(ProfilerResult {
            thread_name: std::mem::take(&mut self.thread_name),
            buckets: std::mem::take(&mut self.buckets),
        });
        // Panicking inside `drop` while the thread is already unwinding aborts the whole
        // process and hides the original panic, so only surface the failure when it is safe.
        if !std::thread::panicking() {
            outcome.unwrap();
        }
    }
}
impl Profiler for ProfilerBackend {
    /// Adds `amount` to the incoming-items counter of the `(from, to)` pair in the current
    /// bucket.
    #[inline]
    fn items_in(&mut self, from: Coord, to: Coord, amount: usize) {
        let counter = self.bucket().metrics.items_in.entry((from, to)).or_default();
        *counter += amount;
    }

    /// Adds `amount` to the outgoing-items counter of the `(from, to)` pair in the current
    /// bucket.
    #[inline]
    fn items_out(&mut self, from: Coord, to: Coord, amount: usize) {
        let counter = self.bucket().metrics.items_out.entry((from, to)).or_default();
        *counter += amount;
    }

    /// Records one incoming network message of `amount` bytes for the `(from, to)` pair:
    /// the first component of the entry is incremented by one, the second by `amount`.
    #[inline]
    fn net_bytes_in(&mut self, from: Coord, to: Coord, amount: usize) {
        let stats = self.bucket().metrics.net_messages_in.entry((from, to)).or_default();
        stats.0 += 1;
        stats.1 += amount;
    }

    /// Records one outgoing network message of `amount` bytes for the `(from, to)` pair:
    /// the first component of the entry is incremented by one, the second by `amount`.
    #[inline]
    fn net_bytes_out(&mut self, from: Coord, to: Coord, amount: usize) {
        let stats = self.bucket().metrics.net_messages_out.entry((from, to)).or_default();
        stats.0 += 1;
        stats.1 += amount;
    }

    /// Appends `(leader_block_id, current time)` to the iteration boundaries of the current
    /// bucket.
    #[inline]
    fn iteration_boundary(&mut self, leader_block_id: usize) {
        // Take the timestamp before selecting the bucket, as the bucket lookup reads the clock
        // again on its own.
        let timestamp = self.now();
        let boundaries = &mut self.bucket().metrics.iteration_boundaries;
        boundaries.push((leader_block_id, timestamp));
    }
}