#![cfg_attr(not(feature = "smol_str"), allow(noop_method_call))]
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[cfg(not(feature = "minstant"))]
use std::time::Instant;
use std::path::{Path, PathBuf};
use std::thread::{self, JoinHandle};
use std::io;
use std::{mem, fs};
use hdrhistogram::{Histogram};
use hdrhistogram::serialization::V2DeflateSerializer;
use hdrhistogram::serialization::interval_log::{IntervalLogWriterBuilder};
use crossbeam_channel as channel;
use chrono::Utc;
#[cfg(feature = "minstant")]
use minstant::Instant;
#[cfg(feature = "smol_str")]
use smol_str::SmolStr;
/// Counter type used for the histograms in this module (`Histogram<C>`).
pub type C = u64;
/// Name of a series; it becomes part of the log file name and the writer
/// thread name. Without the `smol_str` feature it must be a `&'static str`.
#[cfg(not(feature = "smol_str"))]
pub type SeriesName = &'static str;
/// Name of a series; it becomes part of the log file name and the writer
/// thread name. With the `smol_str` feature it is an owned `SmolStr`.
#[cfg(feature = "smol_str")]
pub type SeriesName = SmolStr;
/// Tag attached to every interval a `HistLog` sends to the writer thread.
/// Without the `smol_str` feature it must be a `&'static str`.
#[cfg(not(feature = "smol_str"))]
pub type Tag = &'static str;
/// Tag attached to every interval a `HistLog` sends to the writer thread.
/// With the `smol_str` feature it is an owned `SmolStr`.
#[cfg(feature = "smol_str")]
pub type Tag = SmolStr;
/// Value passed to `Histogram::new` when a `HistLog` is created
/// (presumably the number of significant figures -- confirm against the hdrhistogram docs).
pub const SIG_FIG: u8 = 3;
/// Capacity of the bounded channel between `HistLog` handles and the writer thread.
pub const CHANNEL_SIZE: usize = 8;
/// Upper bound on how long dropping the last `HistLog` keeps trying to deliver
/// the shutdown signal to the writer thread.
pub const DROP_DEADLINE: Duration = Duration::from_millis(100);
/// Converts a [`Duration`] to a whole number of nanoseconds.
///
/// A `u64` can hold roughly 584 years of nanoseconds. Longer durations
/// saturate at `u64::MAX` instead of overflowing (which would panic in debug
/// builds and silently wrap in release builds).
pub fn nanos(d: Duration) -> u64 {
    d.as_secs()
        .saturating_mul(1_000_000_000_u64)
        .saturating_add(u64::from(d.subsec_nanos()))
}
/// Handle that records values into a local histogram and periodically ships
/// the accumulated histogram to a background writer thread over a channel.
///
/// Clones share the same channel and writer thread (see the `Clone` impl).
pub struct HistLog {
// Path of the log file the writer thread was started with.
filename: PathBuf,
// Series name; used to build the file name and the writer thread's name.
series: SeriesName,
// Tag sent along with every interval produced by this handle.
tag: Tag,
// Minimum time between sends, as checked by `check_send` / `check_try_send`.
freq: Duration,
// Instant of the last successful send (or of creation / `reset`).
last_sent: Instant,
// Sending half of the channel to the writer thread; `None` is the shutdown signal.
tx: channel::Sender<Option<Entry>>,
// Histogram accumulating values for the current interval.
hist: Histogram<C>,
// Shared join handle of the writer thread; taken (set to `None`) in `drop`.
thread: Option<Arc<thread::JoinHandle<Result<usize, Error>>>>,
}
/// One interval's worth of data, as sent from a `HistLog` to the writer thread.
struct Entry {
// Tag of the `HistLog` that produced this interval.
pub tag: Tag,
// Wall-clock start of the interval (derived as `end` minus the elapsed time).
pub start: SystemTime,
// Wall-clock time at which the interval was sent.
pub end: SystemTime,
// Values recorded during the interval.
pub hist: Histogram<C>,
}
/// Errors returned by `HistLog` operations.
#[derive(Debug)]
pub enum Error {
/// An I/O operation failed (e.g. creating the log file or spawning the writer thread).
Io(io::Error),
/// Recording a value into the histogram failed.
HdrRecord(hdrhistogram::errors::RecordError),
/// A non-blocking send to the writer thread failed (channel full or
/// disconnected); the rejected message is replaced by `()`.
TrySend(channel::TrySendError<()>),
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Error::Io(e) => write!(f, "Io({e})"),
Error::HdrRecord(e) => write!(f, "HdrRecord({e})"),
Error::TrySend(e) => write!(f, "TrySend({e})"),
}
}
}
// Empty impl: only the trait's default methods are used, so no underlying
// cause is exposed beyond what `Display` prints.
impl std::error::Error for Error {}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Self {
Error::Io(err)
}
}
impl Clone for HistLog {
fn clone(&self) -> Self {
let thread = self.thread.as_ref().map(Arc::clone);
Self {
filename: self.filename.clone(),
series: self.series.clone(),
tag: self.tag.clone(),
freq: self.freq,
last_sent: Instant::now(),
tx: self.tx.clone(),
hist: self.hist.clone(),
thread,
}
}
}
impl HistLog {
#[cfg(not(feature = "smol_str"))]
pub fn new<P>(save_dir: P, series: SeriesName, tag: Tag, freq: Duration) -> Result<Self, Error>
where P: AsRef<Path>
{
Self::inner_new(save_dir, series, tag, freq)
}
/// Creates a `HistLog` that writes to a newly created log file inside
/// `save_dir` and starts its writer thread.
///
/// `series` and `tag` accept anything string-like and are copied into
/// `SmolStr` values.
///
/// # Errors
/// Returns whatever error `inner_new` reports (e.g. the log file or the
/// writer thread could not be created).
#[cfg(feature = "smol_str")]
pub fn new<P, S, T>(save_dir: P, series: S, tag: T, freq: Duration) -> Result<Self, Error>
where
    P: AsRef<Path>,
    S: AsRef<str>,
    T: AsRef<str>,
{
    Self::inner_new(
        save_dir,
        SmolStr::new(series.as_ref()),
        SmolStr::new(tag.as_ref()),
        freq,
    )
}
/// Shared constructor behind both `new` variants.
///
/// Picks a fresh file name inside `save_dir`, opens the bounded channel,
/// starts the writer thread for that file, and returns a handle with an empty
/// histogram whose interval starts now.
///
/// # Errors
/// Propagates any error from `scribe` (directory/file creation, thread spawn).
#[allow(clippy::needless_borrows_for_generic_args)]
fn inner_new<P>(save_dir: P, series: SeriesName, tag: Tag, freq: Duration) -> Result<Self, Error>
where
    P: AsRef<Path>,
{
    let filename = Self::get_filename(save_dir.as_ref(), &series);
    let (tx, rx) = channel::bounded(CHANNEL_SIZE);
    let writer = Self::scribe(series.clone(), rx, filename.as_path())?;
    Ok(Self {
        filename,
        series,
        tag,
        freq,
        last_sent: Instant::now(),
        tx,
        hist: Histogram::new(SIG_FIG).expect("Histogram::new"),
        thread: Some(Arc::new(writer)),
    })
}
#[doc(hidden)]
pub fn new_with_tag(&self, tag: Tag) -> Result<Self, Error> {
let mut save_dir = self.filename.clone();
if !save_dir.pop() { return Err(Error::Io(io::Error::new(io::ErrorKind::Other,
"`filename.pop()` returned `false`! expected it to have a file name, return `true`.")))
}
Self::new(save_dir, self.series.clone(), tag, self.freq)
}
/// Returns the path of the log file this handle's writer thread was started with.
pub fn path(&self) -> &Path {
    &self.filename
}
#[cfg(not(feature = "smol_str"))]
pub fn clone_with_tag(&self, tag: Tag) -> Self {
self.inner_clone_with_tag(tag)
}
/// Returns a new handle onto the same writer thread that sends its intervals
/// under `tag` (copied into a `SmolStr`) instead of this handle's tag.
///
/// # Panics
/// Panics if this handle no longer holds its writer-thread handle.
#[cfg(feature = "smol_str")]
pub fn clone_with_tag<T: AsRef<str>>(&self, tag: T) -> Self {
    self.inner_clone_with_tag(SmolStr::new(tag.as_ref()))
}
/// Shared implementation of `clone_with_tag`.
///
/// Behaves like `clone`, except the new handle uses `tag`.
///
/// # Panics
/// Panics if `self.thread` is `None`.
fn inner_clone_with_tag(&self, tag: Tag) -> Self {
    let thread = match self.thread.as_ref() {
        Some(handle) => Arc::clone(handle),
        None => panic!("self.thread cannot be `None` unless `HistLog` was already dropped"),
    };
    Self {
        filename: self.filename.clone(),
        series: self.series.clone(),
        tag,
        freq: self.freq,
        last_sent: Instant::now(),
        tx: self.tx.clone(),
        hist: self.hist.clone(),
        thread: Some(thread),
    }
}
/// Like `clone_with_tag`, but the new handle also uses `freq` as its send
/// frequency instead of inheriting this handle's.
#[doc(hidden)]
pub fn clone_with_tag_and_freq(&self, tag: Tag, freq: Duration) -> Self {
    let mut retagged = self.clone_with_tag(tag);
    retagged.freq = freq;
    retagged
}
#[inline]
pub fn record(&mut self, value: u64) -> Result<(), Error> {
self.hist.record(value).map_err(Error::HdrRecord)
}
pub fn reset(&mut self) {
self.hist.clear();
self.last_sent = Instant::now();
}
fn send(&mut self, loop_time: Instant) {
let end = SystemTime::now();
let start = end - (loop_time - self.last_sent);
assert!(end > start, "end <= start!");
let mut next = Histogram::new_from(&self.hist);
mem::swap(&mut self.hist, &mut next);
self.tx.send(Some(Entry { tag: self.tag.clone(), start, end, hist: next })).ok(); self.last_sent = loop_time;
}
fn try_send(&mut self, loop_time: Instant) -> Result<(), Error>{
let end = SystemTime::now();
let start = end - (loop_time - self.last_sent);
assert!(end > start, "end <= start!");
let mut next = Histogram::new_from(&self.hist);
mem::swap(&mut self.hist, &mut next);
let entry = Entry { tag: self.tag.clone(), start, end, hist: next };
match self.tx.try_send(Some(entry)) {
Ok(_) => {
self.last_sent = loop_time;
Ok(())
}
Err(channel::TrySendError::Full(Some(Entry { mut hist, .. }))) => {
mem::swap(&mut self.hist, &mut hist);
Err(Error::TrySend(channel::TrySendError::Full(())))
}
Err(channel::TrySendError::Disconnected(_)) => {
Err(Error::TrySend(channel::TrySendError::Disconnected(())))
}
Err(channel::TrySendError::Full(None)) => {
Err(Error::TrySend(channel::TrySendError::Full(())))
}
}
}
/// Sends the current interval if at least `freq` has passed between
/// `last_sent` and `loop_time`.
///
/// Returns `true` if a send was performed, `false` otherwise (including when
/// `loop_time` is not strictly later than `last_sent`).
pub fn check_send(&mut self, loop_time: Instant) -> bool {
    if loop_time <= self.last_sent {
        return false;
    }
    if loop_time - self.last_sent < self.freq {
        return false;
    }
    self.send(loop_time);
    true
}
/// Non-blocking counterpart of `check_send`.
///
/// Returns `Ok(false)` if less than `freq` has elapsed since `last_sent`
/// (an earlier `loop_time` counts as zero elapsed), and `Ok(true)` if the
/// interval was due and was handed to the writer thread.
///
/// # Errors
/// Propagates the error from `try_send` when the interval was due but could
/// not be sent.
#[inline]
pub fn check_try_send(&mut self, loop_time: Instant) -> Result<bool, Error> {
    if loop_time.saturating_duration_since(self.last_sent) < self.freq {
        return Ok(false);
    }
    self.try_send(loop_time).map(|()| true)
}
/// Builds the log file path
/// `<save_dir>/<series>.<UTC timestamp>.<random 32-bit id in hex>.hlog`.
///
/// The random id is what keeps names distinct when several logs for the same
/// series are created within the same second.
fn get_filename<S: AsRef<str>>(save_dir: &Path, series: S) -> PathBuf {
    use rand::prelude::*;
    let now = Utc::now();
    let id: u32 = thread_rng().gen();
    let time = now.format("%Y-%m-%d-%H%M%SZ");
    let series: &str = series.as_ref();
    save_dir.join(format!("{series}.{time}.{id:x}.hlog"))
}
/// Makes sure the directory that will contain `path` exists, creating it and
/// any missing ancestors if necessary.
///
/// # Errors
/// - `io::ErrorKind::InvalidInput` if `path` has no parent component (for
///   example it is empty or a filesystem root).
/// - Any error from `fs::create_dir_all`, including the case where the parent
///   exists but is not a directory.
fn ensure_parent_dir_exists<P: AsRef<Path>>(path: P) -> Result<(), io::Error> {
    let path: &Path = path.as_ref();
    let parent = path.parent().ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("parent path (of {}) is not a directory", path.display()),
        )
    })?;
    // No `exists()` pre-check: `create_dir_all` already succeeds when the
    // directory is present, has no check-then-create race, and reports an
    // error when a non-directory occupies the parent path.
    fs::create_dir_all(parent)
}
fn scribe(
series: SeriesName,
rx: channel::Receiver<Option<Entry>>,
filename: &Path,
) -> Result<JoinHandle<Result<usize, Error>>, Error> {
let mut ser = V2DeflateSerializer::new();
let start_time = SystemTime::now();
Self::ensure_parent_dir_exists(filename)?;
let file = fs::File::create(filename).map_err(Error::Io)?;
thread::Builder::new().name(format!("histlog:{}", series)).spawn(move || {
let mut buf = io::LineWriter::new(file);
let mut wtr =
IntervalLogWriterBuilder::new()
.with_base_time(UNIX_EPOCH)
.with_start_time(start_time)
.begin_log_with(&mut buf, &mut ser)
.map_err(Error::Io)?; let mut n_rcvd = 0;
loop {
match rx.recv() {
Ok(Some(Entry { tag, start, end, hist })) => {
#[cfg(feature = "smol_str")]
let tag: &str = tag.as_ref();
wtr.write_histogram(&hist, start.duration_since(UNIX_EPOCH).unwrap(),
end.duration_since(start).unwrap(), hdrhistogram::serialization::interval_log::Tag::new(tag))
.ok();
n_rcvd += 1;
}
Ok(None) => break,
_ => thread::sleep(Duration::new(0, 1)), }
}
Ok(n_rcvd)
}).map_err(Error::Io)
}
}
impl Drop for HistLog {
    /// Flushes any unsent values and, if this is the last handle sharing the
    /// writer thread, asks the thread to stop and waits for it to finish.
    ///
    /// The shutdown signal is offered for at most `DROP_DEADLINE`. If the
    /// channel stays full for that long, or the receiver is already gone, the
    /// thread is not joined.
    fn drop(&mut self) {
        if !self.hist.is_empty() {
            self.send(Instant::now());
        }

        // `Arc::into_inner` hands the join handle to exactly one of the
        // handles being dropped, even when several are dropped concurrently.
        // With `Arc::try_unwrap` and a discarded `Err`, all of them can miss it.
        let sole_handle = self.thread.take().and_then(Arc::into_inner);

        if let Some(writer) = sole_handle {
            // Block (without spinning) until there is room for the shutdown
            // signal or the deadline passes.
            if self.tx.send_timeout(None, DROP_DEADLINE).is_ok() {
                // A panic inside the writer thread is deliberately ignored here.
                let _ = writer.join();
            }
        }
    }
}
// Unit tests.
// NOTE(review): the `HistLog` tests create real files under `/tmp/histlog`
// and depend on wall-clock timing; they do not clean up after themselves.
#[allow(unused)]
#[cfg(test)]
mod tests {
use super::*;
// Records values, checks that nothing is sent before `freq` (1ms) has
// elapsed, that a send happens after sleeping past it, and that the log file
// exists once the handle is dropped.
#[test]
fn create_histlog_record_one_and_drop() {
let mut hist = HistLog::new("/tmp/histlog", "test", "red", Duration::from_millis(1)).unwrap();
for i in 0..1000u64 {
hist.record(i).unwrap();
}
// Assumes less than 1ms has passed since construction -- timing-sensitive.
assert_eq!(hist.check_send(Instant::now()), false);
assert!(hist.check_try_send(Instant::now()).is_ok());
assert_eq!(hist.check_try_send(Instant::now()).unwrap(), false);
thread::sleep(Duration::from_millis(3));
assert_eq!(hist.check_send(Instant::now()), true);
let path = hist.filename.clone();
drop(hist);
assert!(path.exists());
}
// Two handles share one writer thread; after both are dropped, an extra
// sender kept by the test must observe the channel as disconnected, i.e. the
// receiving side is gone.
#[test]
fn clone_it() {
let mut hist = HistLog::new("/tmp/histlog", "test", "red", Duration::from_millis(1)).unwrap();
let tx = hist.tx.clone();
let mut a = hist.clone_with_tag("blue");
for i in 0..1000u64 {
hist.record(i).unwrap();
a.record(i * 2).unwrap();
}
drop(hist);
drop(a);
match tx.try_send(None) {
Err(channel::TrySendError::Disconnected(None)) => {},
other => panic!("unexpected variant: {:?}", other)
}
}
// Generates 1000 file names for the same series back to back and checks that
// no two are equal. No files are created.
#[test]
fn generated_hlog_filenames_are_unique() {
let save_dir = Path::new("a/b/c");
let series = "d-e-f";
let filenames: Vec<PathBuf> = (0..1000)
.map(|_| HistLog::get_filename(&save_dir, &series))
.collect();
let unique = filenames.iter().cloned().collect::<std::collections::HashSet<_>>();
assert_eq!(filenames.len(), unique.len());
}
}