use std::cmp::Ordering;
use std::error::Error;
use std::fmt::Write;
use std::str::FromStr;
use std::{fmt, io, ops, str, time};
use nom::branch::alt;
use nom::bytes::complete::{tag, take, take_until, take_while1};
use nom::character::complete::char;
use nom::character::is_digit;
use nom::combinator::{complete, map_res, opt, recognize};
use nom::error::ErrorKind;
use nom::number::complete::double;
use nom::{Err, IResult};
use super::super::{Counter, Histogram};
use super::Serializer;
/// Collects the optional header content of an interval log (comments, start time, base time and
/// max value divisor) and then starts the log via `begin_log_with`.
pub struct IntervalLogWriterBuilder {
/// Comments emitted first in the header, in the order they were added.
comments: Vec<String>,
/// Start time as floating-point seconds relative to the Unix epoch, if one was set.
start_time: Option<f64>,
/// Base time as floating-point seconds relative to the Unix epoch, if one was set.
base_time: Option<f64>,
/// Divisor applied to each histogram's max value when an interval line is written.
max_value_divisor: f64,
}
impl Default for IntervalLogWriterBuilder {
fn default() -> Self {
Self::new()
}
}
impl IntervalLogWriterBuilder {
    /// Creates a builder with no comments, no start time, no base time, and a max value divisor
    /// of `1.0`.
    pub fn new() -> IntervalLogWriterBuilder {
        IntervalLogWriterBuilder {
            max_value_divisor: 1.0,
            base_time: None,
            start_time: None,
            comments: Vec::new(),
        }
    }

    /// Queues a comment for the log header.
    ///
    /// Comments are written in the order they were added when the log is begun. Each
    /// `\n`-separated piece of `s` becomes its own `#`-prefixed line.
    pub fn add_comment(&mut self, s: &str) -> &mut Self {
        self.comments.push(String::from(s));
        self
    }

    /// Sets the start time written in the `#[StartTime: ...]` header line.
    ///
    /// The time is stored as floating-point seconds relative to the Unix epoch (negative when
    /// `time` is before the epoch).
    pub fn with_start_time(&mut self, time: time::SystemTime) -> &mut Self {
        let secs = system_time_as_fp_seconds(time);
        self.start_time = Some(secs);
        self
    }

    /// Sets the base time written in the `#[BaseTime: ...]` header line.
    ///
    /// The time is stored as floating-point seconds relative to the Unix epoch (negative when
    /// `time` is before the epoch).
    pub fn with_base_time(&mut self, time: time::SystemTime) -> &mut Self {
        let secs = system_time_as_fp_seconds(time);
        self.base_time = Some(secs);
        self
    }

    /// Sets the divisor applied to each histogram's max value when an interval line is written.
    ///
    /// A `#[MaxValueDivisor: ...]` header line is written only when the divisor is not `1.0`.
    pub fn with_max_value_divisor(&mut self, max_value_divisor: f64) -> &mut Self {
        self.max_value_divisor = max_value_divisor;
        self
    }

    /// Writes the header (comments, then start time, base time and max value divisor where
    /// applicable) to `writer` and returns a writer for the rest of the log.
    ///
    /// # Errors
    ///
    /// Returns the first I/O error encountered while writing the header.
    // The divisor is compared against exactly 1.0 on purpose: only the untouched default should
    // suppress the header line.
    #[allow(clippy::float_cmp)]
    pub fn begin_log_with<'a, 'b, W: 'a + io::Write, S: 'b + Serializer>(
        &self,
        writer: &'a mut W,
        serializer: &'b mut S,
    ) -> Result<IntervalLogWriter<'a, 'b, W, S>, io::Error> {
        let mut log = InternalLogWriter {
            writer,
            serializer,
            text_buf: String::new(),
            serialize_buf: Vec::new(),
            max_value_divisor: self.max_value_divisor,
        };

        for comment in self.comments.iter() {
            log.write_comment(comment)?;
        }

        if let Some(start_secs) = self.start_time {
            log.write_fmt(format_args!(
                "#[StartTime: {:.3} (seconds since epoch)]\n",
                start_secs
            ))?;
        }

        if let Some(base_secs) = self.base_time {
            log.write_fmt(format_args!(
                "#[BaseTime: {:.3} (seconds since epoch)]\n",
                base_secs
            ))?;
        }

        let divisor = self.max_value_divisor;
        if divisor != 1.0_f64 {
            log.write_fmt(format_args!("#[MaxValueDivisor: {:.3}]\n", divisor))?;
        }

        Ok(IntervalLogWriter {
            internal_writer: log,
        })
    }
}
/// Writes comments and interval histograms to an interval log.
///
/// Constructed by `IntervalLogWriterBuilder::begin_log_with`, which writes the log header first.
pub struct IntervalLogWriter<'a, 'b, W: 'a + io::Write, S: 'b + Serializer> {
/// Performs the actual formatting, serialization and I/O.
internal_writer: InternalLogWriter<'a, 'b, W, S>,
}
impl<'a, 'b, W: 'a + io::Write, S: 'b + Serializer> IntervalLogWriter<'a, 'b, W, S> {
    /// Writes `s` to the log as a comment; each `\n`-separated piece becomes its own
    /// `#`-prefixed line.
    ///
    /// # Errors
    ///
    /// Returns any I/O error reported by the underlying writer.
    pub fn write_comment(&mut self, s: &str) -> io::Result<()> {
        let inner = &mut self.internal_writer;
        inner.write_comment(s)
    }

    /// Writes one interval line for histogram `h`.
    ///
    /// `start_timestamp` and `duration` are written as fractional seconds, and `tag`, when
    /// present, is written as a leading `Tag=<tag>,` field.
    ///
    /// # Errors
    ///
    /// Returns `IntervalLogWriterError::SerializeError` if the serializer fails, or
    /// `IntervalLogWriterError::IoError` if writing to the underlying writer fails.
    pub fn write_histogram<T: Counter>(
        &mut self,
        h: &Histogram<T>,
        start_timestamp: time::Duration,
        duration: time::Duration,
        tag: Option<Tag>,
    ) -> Result<(), IntervalLogWriterError<S::SerializeError>> {
        let inner = &mut self.internal_writer;
        inner.write_histogram(h, start_timestamp, duration, tag)
    }
}
/// Errors that can occur while writing a histogram to an interval log.
#[derive(Debug)]
pub enum IntervalLogWriterError<E> {
    /// The histogram could not be serialized; carries the serializer's error.
    SerializeError(E),
    /// Writing to the underlying writer failed.
    IoError(io::Error),
}

impl<E> From<io::Error> for IntervalLogWriterError<E> {
    /// Wraps an I/O error in the `IoError` variant so `?` can be used on I/O results.
    fn from(io_err: io::Error) -> Self {
        IntervalLogWriterError::IoError(io_err)
    }
}

impl<E: fmt::Display + fmt::Debug> fmt::Display for IntervalLogWriterError<E> {
    /// Formats a short description followed by the wrapped error's own message.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            IntervalLogWriterError::SerializeError(ref cause) => {
                write!(f, "Histogram serialization failed: {}", cause)
            }
            IntervalLogWriterError::IoError(ref cause) => {
                write!(f, "An i/o error occurred: {}", cause)
            }
        }
    }
}

impl<E: Error + 'static> Error for IntervalLogWriterError<E> {
    /// Returns the wrapped error; every variant carries one.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        let cause: &(dyn Error + 'static) = match self {
            IntervalLogWriterError::SerializeError(inner) => inner,
            IntervalLogWriterError::IoError(inner) => inner,
        };
        Some(cause)
    }
}
/// Holds the output writer, the histogram serializer and the reusable buffers used to produce
/// log lines.
struct InternalLogWriter<'a, 'b, W: 'a + io::Write, S: 'b + Serializer> {
/// Destination for all log output.
writer: &'a mut W,
/// Serializer used to turn each histogram into bytes.
serializer: &'b mut S,
/// Reusable text buffer; cleared at the start of each histogram write.
text_buf: String,
/// Reusable buffer for the serialized histogram bytes; cleared at the start of each histogram write.
serialize_buf: Vec<u8>,
/// Divisor applied to a histogram's max value before it is written.
max_value_divisor: f64,
}
impl<'a, 'b, W: 'a + io::Write, S: 'b + Serializer> InternalLogWriter<'a, 'b, W, S> {
    /// Forwards pre-built format arguments straight to the underlying writer.
    fn write_fmt(&mut self, args: fmt::Arguments) -> io::Result<()> {
        self.writer.write_fmt(args)
    }

    /// Writes `s` as comment lines: each `\n`-separated piece is written on its own line with a
    /// `#` prefix.
    ///
    /// Stops at, and returns, the first I/O error.
    fn write_comment(&mut self, s: &str) -> io::Result<()> {
        for l in s.split('\n') {
            writeln!(self.writer, "#{}", l)?;
        }
        Ok(())
    }

    /// Writes one interval line:
    /// `[Tag=<tag>,]<start>,<duration>,<max>,<base64 of the serialized histogram>\n`.
    ///
    /// `<start>` and `<duration>` are fractional seconds and `<max>` is `h.max()` divided by the
    /// configured max value divisor; all three are written with three decimal places.
    ///
    /// The histogram is serialized *before* anything is sent to the writer, and the complete
    /// line is emitted with a single `write_all`. A serialization failure therefore leaves the
    /// log untouched instead of leaving a truncated line behind.
    ///
    /// # Errors
    ///
    /// Returns `IntervalLogWriterError::SerializeError` if the serializer fails (nothing is
    /// written in that case), or `IntervalLogWriterError::IoError` if the write fails.
    fn write_histogram<T: Counter>(
        &mut self,
        h: &Histogram<T>,
        start_timestamp: time::Duration,
        duration: time::Duration,
        tag: Option<Tag>,
    ) -> Result<(), IntervalLogWriterError<S::SerializeError>> {
        // Do the fallible serialization first so that failure cannot corrupt the log.
        self.serialize_buf.clear();
        let _len = self
            .serializer
            .serialize(h, &mut self.serialize_buf)
            .map_err(IntervalLogWriterError::SerializeError)?;

        // Assemble the whole line in memory.
        self.text_buf.clear();
        if let Some(Tag(s)) = tag {
            write!(self.text_buf, "Tag={},", s).expect("Writes to a String can't fail");
        }
        write!(
            self.text_buf,
            "{:.3},{:.3},{:.3},",
            duration_as_fp_seconds(start_timestamp),
            duration_as_fp_seconds(duration),
            h.max() as f64 / self.max_value_divisor
        )
        .expect("Writes to a String can't fail");
        // `encode_config_buf` appends to the buffer, so the base64 text follows the prefix.
        base64::encode_config_buf(&self.serialize_buf, base64::STANDARD, &mut self.text_buf);
        self.text_buf.push('\n');

        self.writer.write_all(self.text_buf.as_bytes())?;
        Ok(())
    }
}
/// A tag for an interval histogram: a string guaranteed (when built via `Tag::new`) to contain
/// no comma, carriage return, newline or space.
#[derive(Debug, PartialEq, Clone, Copy)]
pub struct Tag<'a>(&'a str);

impl<'a> Tag<'a> {
    /// Wraps `s` as a tag.
    ///
    /// Returns `None` if `s` contains `,`, `\r`, `\n` or a space; any other string, including
    /// the empty string, is accepted.
    pub fn new(s: &'a str) -> Option<Tag<'a>> {
        const FORBIDDEN: [char; 4] = [',', '\r', '\n', ' '];
        if s.contains(&FORBIDDEN[..]) {
            return None;
        }
        Some(Tag(s))
    }

    /// Returns the tag's text with the original `'a` lifetime.
    pub fn as_str(&self) -> &'a str {
        self.0
    }
}

impl<'a> ops::Deref for Tag<'a> {
    type Target = str;

    /// Lets a `Tag` be used wherever a `&str` is expected.
    fn deref(&self) -> &str {
        self.0
    }
}
/// One interval line parsed from a log; the histogram itself is left in its encoded text form.
#[derive(PartialEq, Debug)]
pub struct IntervalLogHistogram<'a> {
/// Tag from the leading `Tag=...,` field, if the line had one.
tag: Option<Tag<'a>>,
/// First numeric field of the line, parsed as fractional seconds.
start_timestamp: time::Duration,
/// Second numeric field of the line, parsed as fractional seconds.
duration: time::Duration,
/// Third numeric field of the line, parsed as a floating-point number.
max: f64,
/// Remainder of the line (trailing `\r` removed), borrowed from the input.
encoded_histogram: &'a str,
}
/// Read-only accessors for the parsed fields.
impl<'a> IntervalLogHistogram<'a> {
/// Returns the tag, if the line had one.
pub fn tag(&self) -> Option<Tag<'a>> {
self.tag
}
/// Returns the start timestamp field of the line.
pub fn start_timestamp(&self) -> time::Duration {
self.start_timestamp
}
/// Returns the duration field of the line.
pub fn duration(&self) -> time::Duration {
self.duration
}
/// Returns the max value field exactly as it appeared in the log.
pub fn max(&self) -> f64 {
self.max
}
/// Returns the still-encoded histogram text, borrowed from the parsed input.
pub fn encoded_histogram(&self) -> &'a str {
self.encoded_histogram
}
}
/// One meaningful item parsed from an interval log.
#[derive(PartialEq, Debug)]
// NOTE(review): presumably allowed because `Interval` is much larger than the other variants — confirm.
#[allow(variant_size_differences)]
pub enum LogEntry<'a> {
/// Value parsed from a `#[StartTime: ...]` line.
StartTime(time::Duration),
/// Value parsed from a `#[BaseTime: ...]` line.
BaseTime(time::Duration),
/// A parsed interval histogram line.
Interval(IntervalLogHistogram<'a>),
}
/// Errors produced by `IntervalLogIterator`.
#[derive(Debug, PartialEq)]
pub enum LogIteratorError {
/// The input at `offset` was neither a log entry nor an ignorable line.
ParseError {
/// Byte offset into the original input at which parsing failed.
offset: usize,
},
}
/// Iterator over the entries of an interval log held in a byte slice.
pub struct IntervalLogIterator<'a> {
    /// Length of the input originally supplied; used to compute error offsets.
    orig_len: usize,
    /// The portion of the input that has not been consumed yet.
    input: &'a [u8],
    /// Set once the iterator has finished, either at end of input or after an error.
    ended: bool,
}

impl<'a> IntervalLogIterator<'a> {
    /// Creates an iterator positioned at the start of `input`.
    pub fn new(input: &'a [u8]) -> IntervalLogIterator<'a> {
        let orig_len = input.len();
        IntervalLogIterator {
            input,
            orig_len,
            ended: false,
        }
    }
}
impl<'a> Iterator for IntervalLogIterator<'a> {
    type Item = Result<LogEntry<'a>, LogIteratorError>;

    /// Returns the next log entry, skipping comment and legend lines.
    ///
    /// Log entries are tried before ignorable lines, so `#[StartTime: ...]` and
    /// `#[BaseTime: ...]` headers are yielded rather than skipped as comments. If the remaining
    /// input is neither an entry nor an ignorable line, a single `ParseError` carrying the byte
    /// offset of the failure is yielded and the iterator then ends.
    fn next(&mut self) -> Option<Self::Item> {
        while !self.ended {
            if self.input.is_empty() {
                self.ended = true;
                break;
            }

            if let Ok((remaining, entry)) = log_entry(self.input) {
                self.input = remaining;
                return Some(Ok(entry));
            }

            // Not an entry: it has to be a line we are allowed to skip.
            if let Ok((remaining, _)) = ignored_line(self.input) {
                self.input = remaining;
                continue;
            }

            self.ended = true;
            let offset = self.orig_len - self.input.len();
            return Some(Err(LogIteratorError::ParseError { offset }));
        }
        None
    }
}
/// Converts a duration to floating-point seconds: whole seconds plus nanoseconds scaled by 1e-9.
fn duration_as_fp_seconds(d: time::Duration) -> f64 {
    let whole_secs = d.as_secs() as f64;
    let fractional_secs = f64::from(d.subsec_nanos()) / 1_000_000_000_f64;
    whole_secs + fractional_secs
}
fn system_time_as_fp_seconds(time: time::SystemTime) -> f64 {
match time.duration_since(time::UNIX_EPOCH) {
Ok(dur_after_epoch) => duration_as_fp_seconds(dur_after_epoch),
Err(t) => duration_as_fp_seconds(t.duration()) * -1_f64,
}
}
/// Parses a `#[StartTime: <seconds>.<fraction> ...` header line, including its trailing newline.
///
/// Everything between the space after the number and the newline is ignored.
fn start_time(input: &[u8]) -> IResult<&[u8], LogEntry> {
    let (rest, _prefix) = tag("#[StartTime: ")(input)?;
    let (rest, parsed) = fract_sec_duration(rest)?;
    let (rest, _space) = char(' ')(rest)?;
    let (rest, _trailer) = take_until("\n")(rest)?;
    // Consume the newline itself.
    let (rest, _newline) = take(1_usize)(rest)?;
    Ok((rest, LogEntry::StartTime(parsed)))
}
/// Parses a `#[BaseTime: <seconds>.<fraction> ...` header line, including its trailing newline.
///
/// Everything between the space after the number and the newline is ignored.
fn base_time(input: &[u8]) -> IResult<&[u8], LogEntry> {
    let (rest, _prefix) = tag("#[BaseTime: ")(input)?;
    let (rest, parsed) = fract_sec_duration(rest)?;
    let (rest, _space) = char(' ')(rest)?;
    let (rest, _trailer) = take_until("\n")(rest)?;
    // Consume the newline itself.
    let (rest, _newline) = take(1_usize)(rest)?;
    Ok((rest, LogEntry::BaseTime(parsed)))
}
/// Parses a `Tag=<value>,` field and returns the raw bytes of `<value>`.
///
/// The value runs up to the next comma, which is consumed.
fn tag_bytes(input: &[u8]) -> IResult<&[u8], &[u8]> {
    let (rest, _key) = tag("Tag=")(input)?;
    let (rest, value) = take_until(",")(rest)?;
    // Consume the comma that terminated the value.
    let (rest, _comma) = take(1_usize)(rest)?;
    Ok((rest, value))
}
/// Parses a `Tag=<value>,` field into a `Tag`, failing if the value is not valid UTF-8.
fn tag_parser(input: &[u8]) -> IResult<&[u8], Tag> {
    map_res(tag_bytes, str::from_utf8)(input).map(|(rest, text)| (rest, Tag(text)))
}
/// Parses one interval line:
/// `[Tag=<tag>,]<start>,<duration>,<max>,<encoded histogram>\n`.
///
/// `<start>` and `<duration>` are fractional seconds, `<max>` is a floating-point number, and
/// the encoded histogram is the rest of the line, which must be valid UTF-8. Any trailing `\r`
/// is stripped from it, and the newline is consumed.
fn interval_hist(input: &[u8]) -> IResult<&[u8], LogEntry> {
    let (rest, tag) = opt(tag_parser)(input)?;

    let (rest, start_timestamp) = fract_sec_duration(rest)?;
    let (rest, _comma) = char(',')(rest)?;

    let (rest, duration) = fract_sec_duration(rest)?;
    let (rest, _comma) = char(',')(rest)?;

    let (rest, max) = double(rest)?;
    let (rest, _comma) = char(',')(rest)?;

    let (rest, raw_line) = map_res(take_until("\n"), str::from_utf8)(rest)?;
    // Consume the newline itself.
    let (rest, _newline) = take(1_usize)(rest)?;

    let histogram = IntervalLogHistogram {
        tag,
        start_timestamp,
        duration,
        max,
        encoded_histogram: raw_line.trim_end_matches('\r'),
    };
    Ok((rest, LogEntry::Interval(histogram)))
}
/// Parses a single log entry, trying a start time line, then a base time line, then an
/// interval histogram line.
fn log_entry(input: &[u8]) -> IResult<&[u8], LogEntry<'_>> {
complete(alt((start_time, base_time, interval_hist)))(input)
}
/// Consumes one comment line: a `#`, everything up to the next newline, and the newline.
fn comment_line(input: &[u8]) -> IResult<&[u8], ()> {
    let (rest, _hash) = tag("#")(input)?;
    let (rest, _text) = take_until("\n")(rest)?;
    // Consume the newline itself.
    let (rest, _newline) = take(1_usize)(rest)?;
    Ok((rest, ()))
}
/// Consumes one legend line: a line starting with `"StartTimestamp"` (quotes included), up to
/// and including its newline.
fn legend(input: &[u8]) -> IResult<&[u8], ()> {
    let (rest, _first_column) = tag("\"StartTimestamp\"")(input)?;
    let (rest, _other_columns) = take_until("\n")(rest)?;
    // Consume the newline itself.
    let (rest, _newline) = take(1_usize)(rest)?;
    Ok((rest, ()))
}
/// Consumes one line that carries no log entry: a comment line or, failing that, a legend line.
fn ignored_line(input: &[u8]) -> IResult<&[u8], ()> {
alt((comment_line, legend))(input)
}
fn fract_sec_duration(input: &[u8]) -> IResult<&[u8], time::Duration> {
let (rest, data) = fract_sec_tuple(input)?;
let (secs, nanos_str) = data;
let nanos_parse_res = match nanos_str.len().cmp(&9) {
Ordering::Greater => nanos_str[0..9].parse::<u32>(),
Ordering::Equal => nanos_str.parse::<u32>(),
Ordering::Less => nanos_str
.parse::<u32>()
.map(|n| n * 10_u32.pow(9 - nanos_str.len() as u32)),
};
if let Ok(nanos) = nanos_parse_res {
return Ok((rest, time::Duration::new(secs, nanos)));
}
Err(Err::Error(error_position!(input, ErrorKind::Alpha)))
}
/// Result of `fract_sec_tuple`: whole seconds plus the still-unparsed fraction digits.
type FResult<'a> = IResult<&'a [u8], (u64, &'a str)>;

/// Splits `<seconds>.<fraction>` into its two halves.
///
/// Everything before the first `.` must be UTF-8 that parses as a `u64`; the fraction is one or
/// more ASCII digits and is returned as text so the caller can decide on its precision.
fn fract_sec_tuple(input: &[u8]) -> FResult {
    let (after_secs, secs) = map_res(
        map_res(recognize(take_until(".")), str::from_utf8),
        u64::from_str,
    )(input)?;
    let (after_dot, _dot) = tag(".")(after_secs)?;
    let (rest, nanos_str) =
        map_res(complete(take_while1(is_digit)), str::from_utf8)(after_dot)?;
    Ok((rest, (secs, nanos_str)))
}
#[cfg(test)]
mod tests;