use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
const MAX_SAMPLES_PER_KIND: usize = 10_000;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpKind {
Read,
Write,
}
#[derive(Debug, Default)]
pub struct DbRecorder {
read_samples: Mutex<Vec<i64>>,
write_samples: Mutex<Vec<i64>>,
dropped_since_drain: AtomicU64,
}
#[derive(Debug, Default)]
pub struct DrainedSamples {
pub read: Vec<i64>,
pub write: Vec<i64>,
pub dropped: u64,
}
impl DbRecorder {
pub fn record(&self, kind: OpKind, ms: i64) {
let bucket = match kind {
OpKind::Read => &self.read_samples,
OpKind::Write => &self.write_samples,
};
if let Ok(mut g) = bucket.lock() {
if g.len() < MAX_SAMPLES_PER_KIND {
g.push(ms);
} else {
self.dropped_since_drain.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn drain(&self) -> DrainedSamples {
let read = self
.read_samples
.lock()
.map(|mut g| std::mem::take(&mut *g))
.unwrap_or_default();
let write = self
.write_samples
.lock()
.map(|mut g| std::mem::take(&mut *g))
.unwrap_or_default();
let dropped = self.dropped_since_drain.swap(0, Ordering::Relaxed);
DrainedSamples {
read,
write,
dropped,
}
}
}
#[derive(Debug)]
pub struct OpTimer<'r> {
recorder: &'r DbRecorder,
kind: OpKind,
start: Instant,
}
impl<'r> OpTimer<'r> {
#[must_use]
pub fn read(recorder: &'r DbRecorder) -> Self {
Self::with_kind(recorder, OpKind::Read)
}
#[must_use]
pub fn write(recorder: &'r DbRecorder) -> Self {
Self::with_kind(recorder, OpKind::Write)
}
fn with_kind(recorder: &'r DbRecorder, kind: OpKind) -> Self {
Self {
recorder,
kind,
start: Instant::now(),
}
}
}
impl Drop for OpTimer<'_> {
fn drop(&mut self) {
let ms = i64::try_from(self.start.elapsed().as_millis()).unwrap_or(i64::MAX);
self.recorder.record(self.kind, ms);
}
}
#[cfg(test)]
mod tests {
#![allow(
clippy::unwrap_used,
reason = "unit tests crash loudly on setup failure"
)]
use super::*;
#[test]
fn recorder_buckets_by_kind_and_drains() {
let r = DbRecorder::default();
r.record(OpKind::Read, 5);
r.record(OpKind::Write, 10);
r.record(OpKind::Read, 7);
let d = r.drain();
assert_eq!(d.read, vec![5, 7]);
assert_eq!(d.write, vec![10]);
let empty = r.drain();
assert!(
empty.read.is_empty() && empty.write.is_empty(),
"drain leaves both buckets empty"
);
}
#[test]
fn record_caps_each_bucket_and_counts_drops() {
let r = DbRecorder::default();
for _ in 0..MAX_SAMPLES_PER_KIND {
r.record(OpKind::Read, 1);
}
for _ in 0..50 {
r.record(OpKind::Read, 999);
}
r.record(OpKind::Write, 7);
let d = r.drain();
assert_eq!(
d.read.len(),
MAX_SAMPLES_PER_KIND,
"read bucket capped at MAX_SAMPLES_PER_KIND"
);
assert!(
d.read.iter().all(|&n| n == 1),
"no past-cap (999) sample should have leaked into the kept buffer"
);
assert_eq!(d.write, vec![7]);
assert_eq!(
d.dropped, 50,
"exactly the 50 past-cap pushes count as drops"
);
r.record(OpKind::Read, 2);
let d2 = r.drain();
assert_eq!(d2.read, vec![2]);
assert_eq!(d2.dropped, 0, "drop counter resets on drain");
}
#[test]
fn op_timer_records_into_the_right_bucket() {
let r = DbRecorder::default();
{
let _t = OpTimer::write(&r);
std::thread::sleep(std::time::Duration::from_millis(5));
}
{
let _t = OpTimer::read(&r);
std::thread::sleep(std::time::Duration::from_millis(2));
}
let d = r.drain();
assert_eq!(d.read.len(), 1, "read bucket has the read sample");
assert_eq!(d.write.len(), 1, "write bucket has the write sample");
assert!(d.read[0] >= 2);
assert!(d.write[0] >= 5);
}
}