use crate::{
graph_config::{
DataSource, EventDeltaSpec, FieldCaptureSpec, InputFilesContext, TimestampFormat, YAxis,
},
logging::APPV,
match_preview_cli_builder::{MatchPreviewConfig, SharedMatchPreviewContext},
resolved_graph_config::{ResolvedGraphConfig, ResolvedLine},
};
use chrono::{NaiveDate, NaiveDateTime, NaiveTime, ParseError, TimeDelta};
use regex::Regex;
use serde::Deserialize;
use statrs::statistics::{Data, OrderStatistics, Statistics};
use std::{
    cmp,
    collections::HashMap,
    fmt,
    fs::{self, File},
    io::{self, BufRead, BufReader, Write},
    path::{Path, PathBuf},
    time::UNIX_EPOCH,
};
use tracing::{Level, debug, info, trace, warn};
use tracing_subscriber::{EnvFilter, Layer, Registry, layer::SubscriberExt};
const LOG_TARGET: &str = "csv";
pub const MATCH_PREVIEW: &str = "match-preview";
const RECORD_DATE_FORMAT: &str = "%Y-%m-%d";
const RECORD_TIME_FORMAT: &str = "%H:%M:%S%.3f";
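/// Errors raised while extracting data points from log files and reading or writing
/// the CSV cache files.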
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Regex error: {0}")]
Regex(#[from] regex::Error),
#[error("I/O Error while accessing file: '{0}': '{1}'")]
FileIoError(PathBuf, io::Error),
#[error("Invalid input file '{0}': '{1}'")]
InvalidInputFile(PathBuf, String),
#[error("Field regex shall have 1 or 2 capture groups. Regex: {0}")]
RegexCapturesGroupsInvalidCount(String),
#[error("User provided time range parsing error: {0}")]
TimeRangeParsingError(#[from] ParseError),
#[error("Timestamp extraction failed: file:'{0}' format:'{1:?}', line:'{2}' ")]
TimestampExtractionFailure(PathBuf, TimestampFormat, String),
#[error("CSV parse error file:'{0}' error:'{1}' ")]
CsvParseError(PathBuf, csv::Error),
#[error("Cat command supports only one input file.")]
CatCmdManyInputFiles,
}
impl Error {
fn new_file_io_error(f: &Path, e: io::Error) -> Self {
Self::FileIoError(f.to_path_buf(), e)
}
}
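/// Running state kept while processing matches for one data source: the number of
/// records seen so far, the accumulated value sum, and the previous timestamp used
/// for delta computation.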
#[derive(Debug)]
struct ProcessingState {
count: u64,
value_sum: f64,
last_timestamp: Option<ExtractedNaiveDateTime>,
}
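/// A single extracted data point; each record corresponds to one row in the cache CSV.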
#[derive(Debug)]
struct LogRecord {
pub date: Option<String>,
pub time: String,
pub value: f64,
pub count: u64,
pub diff: Option<f64>,
pub value_sum: f64,
}
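/// Applies a single `DataSource` (guard, regex and timestamp format) to the lines of
/// one input file, collecting matches as `LogRecord`s and writing them out as CSV.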
#[derive(Debug)]
struct LineProcessor {
data_source: DataSource,
pub regex: Regex,
pub state: ProcessingState,
pub records: Vec<LogRecord>,
pub output_path: Option<PathBuf>,
pub timestamp_format: TimestampFormat,
timestamp_extraction_failure_count: usize,
input_file_name: PathBuf,
ignore_invalid_timestamps: bool,
pub global_guards: Vec<String>,
}
impl LineProcessor {
pub fn from_data_source_with_global_guards(
data_source: DataSource,
output_path: Option<PathBuf>,
timestamp_format: TimestampFormat,
input_file_name: PathBuf,
ignore_invalid_timestamps: bool,
global_guards: Vec<String>,
) -> Result<Self, Error> {
let regex = data_source.compile_regex()?;
Ok(Self {
data_source,
regex,
output_path,
timestamp_format,
state: ProcessingState::new(),
records: Vec::new(),
timestamp_extraction_failure_count: 0,
input_file_name,
ignore_invalid_timestamps,
global_guards,
})
}
pub fn from_data_source(
data_source: DataSource,
output_path: Option<PathBuf>,
timestamp_format: TimestampFormat,
input_file_name: PathBuf,
ignore_invalid_timestamps: bool,
) -> Result<Self, Error> {
Self::from_data_source_with_global_guards(
data_source,
output_path,
timestamp_format,
input_file_name,
ignore_invalid_timestamps,
vec![],
)
}
fn extract_timestamp<'a>(
&self,
line: &'a str,
) -> Result<(ExtractedNaiveDateTime, &'a str), ParseError> {
let result = self.timestamp_format.extract_timestamp(line);
trace!(target:MATCH_PREVIEW, timestamp_format=?self.timestamp_format, "extract_timestamp");
debug!(target:MATCH_PREVIEW, result=?result.map(|r|r.0), "extract_timestamp");
result
}
fn handle_timestamp_extraction_failure(&mut self, line: &str) -> Result<(), Error> {
self.timestamp_extraction_failure_count += 1;
if !self.ignore_invalid_timestamps && self.timestamp_extraction_failure_count > 3 {
warn!(target:APPV, log_line = line,
timestamp_format=?self.timestamp_format,
"Timestamp extraction failed for {} lines. Exiting.", self.timestamp_extraction_failure_count);
Err(Error::TimestampExtractionFailure(
self.input_file_name.clone(),
self.timestamp_format.clone(),
line.to_string(),
))
} else {
Ok(())
}
}
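    /// Returns `true` when the line contains the data source guard (if any) as well as
    /// every global guard.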
pub fn guard_matches(&self, log_line: &str) -> bool {
self.data_source
.guard()
.iter()
.chain(self.global_guards.iter())
.all(|g| log_line.contains(g))
}
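    /// Attempts to match a single log line. The returned boolean reports whether all
    /// guards matched; the payload holds the regex captures and extracted timestamp
    /// when the timestamp and regex both matched the line. Repeated timestamp
    /// extraction failures produce an error unless they are configured to be ignored.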
pub fn try_match<'a>(
&mut self,
line: &'a str,
) -> Result<(bool, Option<(regex::Captures<'a>, ExtractedNaiveDateTime)>), Error> {
if self.guard_matches(line) {
if tracing::event_enabled!(target:MATCH_PREVIEW, Level::TRACE) {
trace!(target:MATCH_PREVIEW, "try_match: line:\"{line}\"");
} else {
info!(target:MATCH_PREVIEW, "try_match: line:\"{line}\"");
}
if let Ok((timestamp, remainder)) = self.extract_timestamp(line) {
let captures = self.regex.captures(remainder).map(|capture| (capture, timestamp));
                if tracing::event_enabled!(target:MATCH_PREVIEW, Level::TRACE) {
trace!(target:MATCH_PREVIEW, "try_match remainder={remainder} regex={:#?} captures={captures:#?}", self.regex);
} else {
debug!(target:MATCH_PREVIEW, "try_match: line remainder: \"{remainder}\"");
if let Some((captures, _)) = &captures {
if let Some(c) = captures.get(1) {
debug!(target:MATCH_PREVIEW, "try_match: (value) captures[1]={c:?}");
};
if let Some(c) = captures.get(2) {
debug!(target:MATCH_PREVIEW, "try_match: (unit) captures[2]={c:?}");
};
} else {
debug!(target:MATCH_PREVIEW, "try_match: no matches...");
}
}
Ok((true, captures))
} else {
self.handle_timestamp_extraction_failure(line)?;
Ok((true, None))
}
} else {
Ok((false, None))
}
}
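    /// Turns a successful match into a `LogRecord`: for field sources the captured
    /// value is parsed and unit-normalized (records with unparsable values are
    /// dropped), for event sources the configured or implicit y-value is used, and
    /// the running count, timestamp delta and value sum are updated.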
pub fn process(&mut self, caps: regex::Captures, timestamp: ExtractedNaiveDateTime) {
let date = timestamp.date().map(|d| d.format(RECORD_DATE_FORMAT).to_string());
let time = timestamp.time().format(RECORD_TIME_FORMAT).to_string();
let count = self.state.next_count();
let diff = self.state.compute_delta(timestamp);
let mut value = 1.0;
match &self.data_source {
DataSource::EventValue { yvalue, .. } => value = *yvalue,
DataSource::EventCount { .. } | DataSource::EventDelta { .. } => (),
DataSource::FieldValue { .. } | DataSource::FieldValueSum { .. } => {
let raw_val = caps.get(1).map(|m| m.as_str()).unwrap_or("0");
let unit = caps.get(2).map(|m| m.as_str()).unwrap_or("");
value = match normalize_value(raw_val, unit) {
Some(v) => v,
None => {
return;
},
};
},
};
let value_sum = self.state.compute_sum(value);
self.records.push(LogRecord { date, time, value, count, diff, value_sum });
}
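    /// Writes all collected records to the output CSV. For time-only timestamp
    /// formats a fixed placeholder date (2025-01-01) is written in the date column.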
fn write_csv(&self) -> Result<(), Error> {
let filename = self.expect_output_path();
let mut file =
File::create(filename).map_err(|e| Error::FileIoError(filename.clone(), e))?;
match self.timestamp_format {
TimestampFormat::Time(_) => {
writeln!(file, "date,time,value,count,delta,value_sum")
.map_err(|e| Error::FileIoError(filename.clone(), e))?;
for r in &self.records {
writeln!(
file,
"2025-01-01,{},{},{},{},{}",
r.time,
r.value,
r.count,
r.diff.unwrap_or(0.0),
r.value_sum
)
.map_err(|e| Error::new_file_io_error(filename, e))?;
}
},
TimestampFormat::DateTime(_) => {
writeln!(file, "date,time,value,count,delta,value_sum")
.map_err(|e| Error::new_file_io_error(filename, e))?;
for r in &self.records {
writeln!(
file,
"{},{},{},{},{},{}",
r.date.as_ref().expect("date should be set"),
r.time,
r.value,
r.count,
r.diff.unwrap_or(0.0),
r.value_sum
)
.map_err(|e| Error::new_file_io_error(filename, e))?;
}
},
};
Ok(())
}
pub fn expect_output_path(&self) -> &PathBuf {
self.output_path
.as_ref()
.expect("output_path is expected to be set (this is bug")
}
}
impl ResolvedLine {
pub fn regex_filename_tag(&self) -> String {
self.line.data_source.regex_filename_tag()
}
pub fn raw_pattern(&self) -> String {
self.line.data_source.raw_pattern()
}
pub fn regex_pattern(&self) -> String {
self.line.data_source.regex_pattern()
}
pub fn title(&self, multi_input_files: bool) -> String {
let file_stem = self
.source
.file_name()
.file_stem()
.expect("filename is validated at this point")
.to_string_lossy();
let title = self.line.params.title.clone().unwrap_or(self.line.data_source.title());
let title = if multi_input_files { format!("{} ({})", title, file_stem) } else { title };
if self.line.params.yaxis == Some(YAxis::Y2) { format!("{} | y2", title) } else { title }
}
pub fn source_file_name(&self) -> &PathBuf {
self.source.file_name()
}
pub fn guard(&self) -> &Option<String> {
self.line.data_source.guard()
}
pub fn csv_data_column_for_plot(&self) -> &'static str {
self.line.data_source.csv_data_column_for_plot()
}
}
impl DataSource {
pub fn regex_filename_tag(&self) -> String {
urlencoding::encode(&self.regex_pattern()).to_string()
}
pub fn title(&self) -> String {
match &self {
DataSource::FieldValue(FieldCaptureSpec { guard: Some(guard), .. }) => {
format!("value of {} {}", guard, self.raw_pattern())
},
DataSource::FieldValueSum(FieldCaptureSpec { guard: Some(guard), .. }) => {
format!("sum of values of {} {}", guard, self.raw_pattern())
},
DataSource::EventValue { guard: Some(guard), .. } => {
format!("presence of {} {}", guard, self.raw_pattern())
},
DataSource::EventCount { guard: Some(guard), .. } => {
format!("count of {} {}", guard, self.raw_pattern())
},
DataSource::EventDelta(EventDeltaSpec { guard: Some(guard), .. }) => {
format!("delta {} {}", guard, self.raw_pattern())
},
DataSource::FieldValue(FieldCaptureSpec { guard: None, .. }) => {
format!("value of {}", self.raw_pattern())
},
DataSource::FieldValueSum(FieldCaptureSpec { guard: None, .. }) => {
format!("sum values of {}", self.raw_pattern())
},
DataSource::EventValue { guard: None, .. } => {
format!("presence of {}", self.raw_pattern())
},
DataSource::EventCount { guard: None, .. } => {
format!("count of {}", self.raw_pattern())
},
DataSource::EventDelta(EventDeltaSpec { guard: None, .. }) => {
format!("delta {}", self.raw_pattern())
},
}
}
fn raw_pattern(&self) -> String {
match &self {
DataSource::EventValue { pattern, .. }
| DataSource::EventCount { pattern, .. }
| DataSource::EventDelta(EventDeltaSpec { pattern, .. }) => pattern.clone(),
DataSource::FieldValue(FieldCaptureSpec { field, .. })
| DataSource::FieldValueSum(FieldCaptureSpec { field, .. }) => field.clone(),
}
}
fn validate_field_regex(&self) -> Result<bool, Error> {
if let DataSource::FieldValue(FieldCaptureSpec { field, .. }) = &self {
if let Ok(regex) = Regex::new(field) {
let captures_len = regex.captures_len() - 1;
if (1..=2).contains(&captures_len) {
return Ok(true);
}
if captures_len > 2 {
return Err(Error::RegexCapturesGroupsInvalidCount(field.clone()));
}
}
}
Ok(false)
}
fn is_field_valid_regex(&self) -> bool {
self.validate_field_regex().unwrap_or(false)
}
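    /// Returns the effective regex pattern: event sources use their configured pattern
    /// as-is, while field sources use the raw field as a regex when it validates as a
    /// field-capture regex (currently only checked for `FieldValue`), and otherwise
    /// fall back to a generated `\b<field>=([\d.]+)(\w+)?` pattern.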
fn regex_pattern(&self) -> String {
match &self {
DataSource::EventValue { pattern, .. }
| DataSource::EventCount { pattern, .. }
| DataSource::EventDelta(EventDeltaSpec { pattern, .. }) => pattern.clone(),
DataSource::FieldValue(FieldCaptureSpec { field, .. })
| DataSource::FieldValueSum(FieldCaptureSpec { field, .. }) => {
if self.is_field_valid_regex() {
field.clone()
} else {
format!(r"\b{}=([\d\.]+)(\w+)?", regex::escape(field))
}
},
}
}
pub fn compile_regex(&self) -> Result<Regex, Error> {
self.validate_field_regex()?;
Regex::new(&self.regex_pattern()).map_err(Into::into)
}
pub fn guard(&self) -> &Option<String> {
match &self {
DataSource::EventValue { guard, .. }
| DataSource::EventCount { guard, .. }
| DataSource::EventDelta(EventDeltaSpec { guard, .. })
            | DataSource::FieldValue(FieldCaptureSpec { guard, .. })
            | DataSource::FieldValueSum(FieldCaptureSpec { guard, .. }) => guard,
}
}
pub fn csv_data_column_for_plot(&self) -> &'static str {
match &self {
DataSource::FieldValue { .. } | DataSource::EventValue { .. } => "value",
DataSource::FieldValueSum { .. } => "value_sum",
DataSource::EventCount { .. } => "count",
DataSource::EventDelta { .. } => "delta",
}
}
}
impl ResolvedLine {
pub fn can_csv_file_be_shared(&self) -> bool {
matches!(
&self.line.data_source,
DataSource::EventCount { .. } | DataSource::EventDelta { .. }
)
}
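    /// Builds the cache CSV filename from the log file name, its modification time,
    /// the global and per-line guards, and a URL-encoded tag derived from the regex
    /// pattern.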
pub fn get_csv_filename(&self, global_guards: &[String]) -> PathBuf {
let tag = self.regex_filename_tag();
let core = match &self.line.data_source {
DataSource::EventValue { yvalue, .. } => format!("value_{yvalue}_{tag}"),
DataSource::EventCount { .. } => format!("count_{tag}"),
DataSource::EventDelta { .. } => format!("delta_{tag}"),
DataSource::FieldValue { .. } | DataSource::FieldValueSum { .. } => tag,
};
let log_name = self
.source_file_name()
.file_name()
.expect("file path shall be given")
.to_string_lossy();
let ts = fs::metadata(self.source_file_name())
.and_then(|m| m.modified())
.map_err(|_| ())
.and_then(|t| t.duration_since(UNIX_EPOCH).map_err(|_| ()))
.map(|d| d.as_secs().to_string())
.unwrap_or_else(|_| "nots".to_string());
let global_guards = global_guards.join("___");
PathBuf::from(if let Some(guard) = self.line.data_source.guard() {
format!("{log_name}_{ts}__{global_guards}__{guard}__{core}.csv")
} else {
format!("{log_name}_{ts}__{global_guards}__{core}.csv")
})
}
}
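/// Groups lines by (guard, raw pattern, input file) and assigns every line a CSV
/// output path. Within a group, count and delta lines reuse the CSV of a canonical
/// line (preferring a field-value line, then an event-value line); the returned map
/// holds one canonical `ResolvedLine` per distinct CSV file that needs to be
/// generated.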
fn propagate_shared_csv_files<F>(
config: &mut ResolvedGraphConfig,
    input_files_context: &InputFilesContext,
get_cache_dir: F,
) -> Result<HashMap<PathBuf, ResolvedLine>, Error>
where
F: Fn(&InputFilesContext, &PathBuf) -> Result<PathBuf, Error>,
{
type MatchKey = (Option<String>, String, PathBuf);
let mut grouped_lines: HashMap<MatchKey, Vec<&mut ResolvedLine>> = HashMap::new();
for panel in &mut config.panels {
for line in &mut panel.lines {
let guard = line.guard().clone();
let token = line.raw_pattern();
let input = line.source_file_name().clone();
grouped_lines.entry((guard, token, input)).or_default().push(line);
}
}
trace!(target: LOG_TARGET, "propagete_shared_csv_files {:#?}", grouped_lines);
let mut canonicals: HashMap<PathBuf, ResolvedLine> = Default::default();
for ((_, _, input_filename), mut lines) in grouped_lines {
for line in &mut lines {
            let output_dir = get_cache_dir(input_files_context, &input_filename)?;
            let csv_output_path =
                output_dir.join(line.get_csv_filename(input_files_context.guards()));
line.set_shared_csv_filename(&csv_output_path);
}
let canonical = lines
.iter()
.find(|l| matches!(l.line.data_source, DataSource::FieldValue { .. }))
.or(lines
.iter()
.find(|l| matches!(l.line.data_source, DataSource::EventValue { .. })))
.unwrap_or(&lines[0]);
canonicals.insert((*canonical).expect_shared_csv_filename(), (*canonical).clone());
let shared_path = canonical.expect_shared_csv_filename();
trace!(target: LOG_TARGET, "propagete_shared_csv_files canonical {:#?}", shared_path);
for line in lines {
if line.can_csv_file_be_shared() {
line.set_shared_csv_filename(&shared_path);
} else {
canonicals
.entry(line.expect_shared_csv_filename())
.or_insert_with(|| (*line).clone());
}
}
}
trace!(target: LOG_TARGET, "propagete_shared_csv_files cannonicals: {:#?}", canonicals);
Ok(canonicals)
}
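/// Resolves the shared CSV paths for all configured lines, skips lines whose cache
/// file already exists (unless regeneration is forced), scans every input file once
/// while feeding each line through all processors registered for that file, and
/// finally writes one CSV per canonical line.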
pub fn process_inputs(
config: &mut ResolvedGraphConfig,
input_context: &InputFilesContext,
) -> Result<(), Error> {
let mut canonical_lines =
propagate_shared_csv_files(config, input_context, |input_context, input_file_name| {
input_context.get_cache_dir(input_file_name)
})?;
trace!(target: LOG_TARGET, "after propagete_shared_csv_files {:#?}", config);
let mut processors: HashMap<PathBuf, HashMap<PathBuf, LineProcessor>> = Default::default();
for line in config.all_lines() {
let csv_output_path = line.expect_shared_csv_filename();
let output_dir: PathBuf = csv_output_path
.parent()
.expect("CSV file shall be resolved to path with at least one parent")
.into();
if !output_dir.exists() {
std::fs::create_dir_all(&output_dir)
.map_err(|e| Error::new_file_io_error(&output_dir, e))?;
}
if !input_context.force_csv_regen() && Path::new(&csv_output_path).exists() {
debug!(
target: APPV,
"Using cached file for regex: {} file: {}",
line.line.data_source.regex_pattern(),
csv_output_path.display(),
);
continue;
}
if let Some(canonical_line) = canonical_lines.remove(&csv_output_path) {
let processor = LineProcessor::from_data_source_with_global_guards(
canonical_line.line.data_source.clone(),
Some(csv_output_path),
input_context.timestamp_format().clone(),
canonical_line.source_file_name().clone(),
input_context.ignore_invalid_timestamps(),
input_context.guards().clone(),
)?;
processors
.entry(canonical_line.source_file_name().clone())
.or_default()
.entry(processor.expect_output_path().clone())
.or_insert(processor);
}
}
trace!(target: LOG_TARGET, "process_inputs readers: {:#?}", processors);
for (log_file_name, mut processors) in processors {
if !log_file_name.is_file() {
return Err(Error::InvalidInputFile(log_file_name, "Not a regular file".to_string()));
}
let input_file =
File::open(&log_file_name).map_err(|e| Error::new_file_io_error(&log_file_name, e))?;
let reader = BufReader::new(input_file);
for line in reader.lines().map_while(Result::ok) {
for processor in &mut processors.values_mut() {
if let (_, Some((captures, timestamp))) = processor.try_match(&line)? {
processor.process(captures, timestamp);
}
}
}
for (_, processor) in processors {
assert_eq!(log_file_name, processor.input_file_name);
if !processor.records.is_empty() {
debug!(
target:APPV,
"Processed input file: {}, regex: {}, matched {}, cache file: {}",
log_file_name.display(),
processor.data_source.regex_pattern(),
processor.records.len(),
processor.expect_output_path().display()
);
}
processor.write_csv()?;
}
}
config.resolve_data_points_count()?;
Ok(())
}
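/// Runs a match preview under a dedicated tracing subscriber so that guard, timestamp
/// and regex matching details are printed for each inspected line; verbosity level 2
/// enables trace-level output, any other level uses debug-level output.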
pub fn regex_match_preview(
config: MatchPreviewConfig,
context: SharedMatchPreviewContext,
verbose_level: u8,
) -> Result<(), Error> {
let env_filter = if verbose_level == 2 {
EnvFilter::new(format!("warn,{}=trace", MATCH_PREVIEW))
} else {
EnvFilter::new(format!("warn,{}=debug", MATCH_PREVIEW))
};
let preview_layer = tracing_subscriber::fmt::layer()
.without_time()
.with_target(false)
.with_level(true);
let preview_subscriber = Registry::default().with(preview_layer.with_filter(env_filter));
tracing::subscriber::with_default(preview_subscriber, || {
regex_match_preview_inner(config, context)
})
}
pub fn regex_match_preview_inner(
config: MatchPreviewConfig,
context: SharedMatchPreviewContext,
) -> Result<(), Error> {
let mut processor = LineProcessor::from_data_source(
config.data_source.clone(),
None,
context.timestamp_format().clone(),
context.input.clone(),
false,
)?;
let input_file =
File::open(&context.input).map_err(|e| Error::FileIoError(context.input.clone(), e))?;
let reader = BufReader::new(input_file);
let mut matched_count = 0;
info!(target:MATCH_PREVIEW, "input file: {}", context.input.display());
if let Some(guard) = config.data_source.guard().as_ref() {
info!(target:MATCH_PREVIEW, "guard: {guard}")
};
info!(target:MATCH_PREVIEW, "regex pattern: {}", config.data_source.regex_pattern());
info!(target:MATCH_PREVIEW, "timestamp pattern: {:?}", context.timestamp_format);
for line in reader.lines().map_while(Result::ok) {
let (guard_matched, captured) = processor.try_match(&line)?;
if guard_matched {
if let Some((captures, timestamp)) = captured {
processor.process(captures, timestamp);
info!(target:MATCH_PREVIEW, "matched: {:#?}", processor.records.last());
}
matched_count += 1;
}
if matched_count >= context.count {
break;
}
}
if matched_count == 0 {
if let Some(guard) = config.data_source.guard() {
warn!(target:MATCH_PREVIEW, "No lines matched against guard: '{:?}'", guard);
warn!(target:MATCH_PREVIEW, "Is it correctly configured?");
}
}
Ok(())
}
impl ResolvedGraphConfig {
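    /// Counts the data points (CSV rows excluding the header) in each line's cache file,
    /// stores the count on the line, and logs a warning when a line matched nothing.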
pub fn resolve_data_points_count(&mut self) -> Result<(), Error> {
for panel in &mut self.panels {
for line in &mut panel.lines {
let file_path = line.expect_shared_csv_filename();
let file =
File::open(&file_path).map_err(|e| Error::new_file_io_error(&file_path, e))?;
let reader = io::BufReader::new(file);
                let data_points_count = reader.lines().count().saturating_sub(1);
line.set_data_points_count(data_points_count);
let log_file_name = line.source_file_name();
if data_points_count == 0 {
warn!(
target:APPV,
input_file = ?log_file_name.display(),
guard = ?line.guard(),
regex = line.regex_pattern(),
"No matches."
);
} else {
debug!(
target:APPV,
"Matched {} entries: {}{}, user-pattern: {}, regex: {}, cache file: {}",
data_points_count,
log_file_name.display(),
line.guard().clone().map(|v| format!(", guard: {v}")).unwrap_or_default(),
line.raw_pattern(),
line.regex_pattern(),
line.expect_shared_csv_filename().display()
);
}
}
}
Ok(())
}
}
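/// Parses the captured value and normalizes it to milliseconds based on its unit
/// suffix ("s", "ms", "us"/"µs"/"microseconds", "ns"); values with an unknown or
/// missing unit pass through unchanged. For example, `normalize_value("2.5", "us")`
/// yields `Some(0.0025)`.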
fn normalize_value(value: &str, unit: &str) -> Option<f64> {
    let base: f64 = value.parse().ok()?;
    match unit {
        "s" => Some(base * 1000.0),
        "ms" => Some(base),
        "us" | "µs" | "microseconds" => Some(base / 1000.0),
        "ns" => Some(base / 1_000_000.0),
        _ => Some(base),
    }
}
impl ProcessingState {
fn new() -> Self {
Self { count: 0, last_timestamp: None, value_sum: 0.0 }
}
fn next_count(&mut self) -> u64 {
self.count += 1;
self.count
}
fn compute_delta(&mut self, current: ExtractedNaiveDateTime) -> Option<f64> {
let diff = self
.last_timestamp
.map(|prev| current.signed_duration_since(prev).num_milliseconds() as f64);
self.last_timestamp = Some(current);
diff
}
fn compute_sum(&mut self, value: f64) -> f64 {
self.value_sum += value;
self.value_sum
}
}
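/// A timestamp extracted from a log line: either a full date and time, or only a time
/// of day when the configured timestamp format carries no date information.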
#[derive(Clone, Copy, Debug)]
pub enum ExtractedNaiveDateTime {
DateTime(NaiveDateTime),
Time(NaiveTime),
}
impl ExtractedNaiveDateTime {
fn date(&self) -> Option<NaiveDate> {
match self {
Self::Time(_) => None,
Self::DateTime(v) => Some(v.date()),
}
}
fn time(&self) -> NaiveTime {
match self {
Self::Time(v) => *v,
Self::DateTime(v) => v.time(),
}
}
pub const fn signed_duration_since(self, rhs: ExtractedNaiveDateTime) -> TimeDelta {
match (self, rhs) {
(Self::Time(v), Self::Time(rhs)) => v.signed_duration_since(rhs),
(Self::DateTime(v), Self::DateTime(rhs)) => v.signed_duration_since(rhs),
_ => panic!("should not happen"),
}
}
}
impl TimestampFormat {
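    /// Parses a timestamp at the beginning of `line` and returns it together with the
    /// unparsed remainder of the line. For date-time formats that do not include a
    /// year, the year defaults to 2025.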
fn extract_timestamp<'a>(
&self,
line: &'a str,
) -> Result<(ExtractedNaiveDateTime, &'a str), ParseError> {
Ok(match self {
TimestampFormat::Time(fmt) => NaiveTime::parse_and_remainder(line, fmt)
.map(|v| (ExtractedNaiveDateTime::Time(v.0), v.1))?,
TimestampFormat::DateTime(fmt) => {
let mut parsed = chrono::format::Parsed::new();
let remainder = chrono::format::parse_and_remainder(
&mut parsed,
line,
chrono::format::StrftimeItems::new(fmt),
)?;
trace!(target:MATCH_PREVIEW, ?parsed, "extract_timestamp");
let dt = match parsed.to_naive_datetime_with_offset(0) {
Ok(dt) => dt,
_ => {
if parsed.year().is_none() {
parsed.set_year(2025)?;
}
parsed.to_naive_datetime_with_offset(0)?
},
};
(ExtractedNaiveDateTime::DateTime(dt), remainder)
},
})
}
}
impl InputFilesContext {
fn get_cache_root(&self) -> &Option<PathBuf> {
self.cache_dir()
}
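    /// Returns the directory holding cache CSV files for `log_file`: the configured
    /// cache root with the log's absolute path mirrored beneath it, or a `.plox`
    /// directory next to the log file when no cache root is set.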
pub fn get_cache_dir(&self, log_file: &Path) -> Result<PathBuf, Error> {
let log_file_path =
            log_file.canonicalize().map_err(|e| Error::new_file_io_error(log_file, e))?;
        self.get_cache_dir_inner(&log_file_path)
}
fn get_cache_dir_inner(&self, log_file_path: &Path) -> Result<PathBuf, Error> {
assert!(log_file_path.is_absolute());
if let Some(root) = self.get_cache_root() {
let relative = log_file_path.strip_prefix("/").unwrap_or(log_file_path);
Ok(root.join(relative).parent().unwrap_or(root).to_path_buf())
} else {
let log_dir = log_file_path.parent().unwrap_or_else(|| Path::new("."));
Ok(log_dir.join(".plox"))
}
}
}
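/// Wraps `histo_fp::Histogram` together with the width and precision used to render
/// bucket boundaries in the `Display` implementation.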
struct PloxHisto {
histogram: histo_fp::Histogram,
width: Option<usize>,
precision: Option<usize>,
}
impl PloxHisto {
pub fn with_buckets(
num_buckets: u64,
width: Option<usize>,
precision: Option<usize>,
) -> PloxHisto {
PloxHisto {
histogram: histo_fp::Histogram::with_buckets(num_buckets, None),
width,
precision,
}
}
}
impl fmt::Display for PloxHisto {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use std::fmt::Write;
let width = self.width.unwrap_or(10);
let precision = self.precision.unwrap_or(4);
if self.histogram.buckets().next().is_none() {
return Ok(());
}
let max_bucket_count = self.histogram.buckets().map(|b| b.count()).fold(0, cmp::max);
const WIDTH: u64 = 50;
let count_per_char = cmp::max(max_bucket_count / WIDTH, 1);
writeln!(f, "# Each ∎ is a count of {}", count_per_char)?;
writeln!(f, "#")?;
let mut count_str = String::new();
let widest_count = self.histogram.buckets().fold(0, |n, b| {
count_str.clear();
write!(&mut count_str, "{}", b.count()).unwrap();
cmp::max(n, count_str.len())
});
let mut end_str = String::new();
let widest_range = self.histogram.buckets().fold(0, |n, b| {
end_str.clear();
write!(
&mut end_str,
"{:width$.precision$}",
b.end(),
width = width,
precision = precision
)
.unwrap();
cmp::max(n, end_str.len())
});
let mut start_str = String::with_capacity(widest_range);
for bucket in self.histogram.buckets() {
start_str.clear();
write!(
&mut start_str,
"{:width$.precision$}",
bucket.start(),
width = width,
precision = precision
)
.unwrap();
for _ in 0..widest_range - start_str.len() {
start_str.insert(0, ' ');
}
end_str.clear();
write!(
&mut end_str,
"{:width$.precision$}",
bucket.end(),
width = width,
precision = precision,
)
.unwrap();
for _ in 0..widest_range - end_str.len() {
end_str.insert(0, ' ');
}
count_str.clear();
write!(&mut count_str, "{}", bucket.count()).unwrap();
for _ in 0..widest_count - count_str.len() {
count_str.insert(0, ' ');
}
write!(f, "{} - {} [ {} ]: ", start_str, end_str, count_str)?;
for _ in 0..bucket.count() / count_per_char {
write!(f, "∎")?;
}
writeln!(f)?;
}
Ok(())
}
}
impl ResolvedLine {
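    /// Returns `true` if the line's cache CSV contains at least one record whose
    /// timestamp falls in `[start, end)`; scanning stops at the first record past
    /// `end`, so records are expected to be in chronological order.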
pub fn has_data_points_in_time_range(
&self,
start: NaiveDateTime,
end: NaiveDateTime,
) -> Result<bool, Error> {
let filename = self.expect_shared_csv_filename();
let mut rdr = csv::Reader::from_path(&filename)
.map_err(|e| Error::CsvParseError(filename.clone(), e))?;
for result in rdr.deserialize() {
            let record: CsvLogRecord =
result.map_err(|e| Error::CsvParseError(filename.clone(), e))?;
let record_ts = NaiveDateTime::new(
NaiveDate::parse_from_str(
&record.date.expect("date is always written into csv"),
RECORD_DATE_FORMAT,
)?,
NaiveTime::parse_from_str(&record.time, RECORD_TIME_FORMAT)?,
);
if record_ts >= start && record_ts < end {
return Ok(true);
}
if record_ts > end {
return Ok(false);
}
}
Ok(false)
}
}
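/// A single row deserialized from a cache CSV file.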
#[derive(Debug, Deserialize)]
struct CsvLogRecord {
pub date: Option<String>,
pub time: String,
pub value: f64,
#[allow(dead_code)]
pub count: u64,
pub delta: f64,
#[allow(dead_code)]
pub value_sum: f64,
}
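/// Prints summary statistics (count, min, max, mean and percentiles) plus an ASCII
/// histogram for every configured line, using the value column for field sources and
/// the delta column for event-delta sources.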
pub fn display_stats(
config: &ResolvedGraphConfig,
buckets_count: u64,
width: Option<usize>,
precision: Option<usize>,
) -> Result<(), Error> {
let lines_count = config.all_lines_count();
for (i, line) in config.all_lines().enumerate() {
let filename = line.expect_shared_csv_filename();
let mut rdr = csv::Reader::from_path(&filename)
.map_err(|e| Error::CsvParseError(filename.clone(), e))?;
let mut values: Vec<f64> = vec![];
for result in rdr.deserialize() {
            let record: CsvLogRecord =
result.map_err(|e| Error::CsvParseError(filename.clone(), e))?;
match &line.line.data_source {
DataSource::FieldValue { .. } => values.push(record.value),
DataSource::EventDelta { .. } => {
values.push(record.delta);
},
_ => {
unreachable!("this is bug.");
},
};
}
let mut h = PloxHisto::with_buckets(buckets_count, width, precision);
values.iter().for_each(|x| {
h.histogram.add(*x);
});
if i > 0 {
println!("-------------------------");
}
if lines_count > 1 {
println!("file: {}", line.source.file_name().display());
}
println!(" count: {}", values.len());
if values.is_empty() {
continue;
}
println!(" min: {}", Statistics::min(&values));
println!(" max: {}", Statistics::max(&values));
println!(" mean: {}", Statistics::mean(&values));
let mut data = Data::new(values);
println!("median: {}", data.percentile(50));
println!(" q75: {}", data.percentile(75));
println!(" q90: {}", data.percentile(90));
println!(" q95: {}", data.percentile(95));
println!(" q99: {}", data.percentile(99));
println!("\n{h}");
}
Ok(())
}
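/// Prints the raw values of a single configured line (the cat command): field sources
/// print the value column, event-delta sources print the delta column.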
pub fn display_values(config: &ResolvedGraphConfig) -> Result<(), Error> {
if config.all_lines_count() > 1 {
return Err(Error::CatCmdManyInputFiles);
}
for line in config.all_lines() {
let filename = line.expect_shared_csv_filename();
let mut rdr = csv::Reader::from_path(&filename)
.map_err(|e| Error::CsvParseError(filename.clone(), e))?;
for result in rdr.deserialize() {
            let record: CsvLogRecord =
result.map_err(|e| Error::CsvParseError(filename.clone(), e))?;
match &line.line.data_source {
DataSource::FieldValue { .. } => println!("{:?}", record.value),
DataSource::EventDelta { .. } => println!("{:?}", record.delta),
_ => {
unreachable!("this is bug.");
},
};
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use chrono::{NaiveDate, NaiveTime};
use crate::{
graph_config::{DEFAULT_TIMESTAMP_FORMAT, Line},
logging::init_tracing_test,
resolved_graph_config::ResolvedPanel,
};
use super::*;
fn build_resolved_graph_config(lines: Vec<ResolvedLine>) -> ResolvedGraphConfig {
ResolvedGraphConfig { panels: vec![ResolvedPanel::new_with_lines(lines)] }
}
fn event_line(
input_file: &'static str,
guard: Option<&'static str>,
field: &'static str,
yvalue: f64,
) -> ResolvedLine {
ResolvedLine::from_explicit_name(
Line::new_with_data_source(DataSource::new_event_value(
guard.map(Into::into),
field.into(),
yvalue,
)),
PathBuf::from(input_file),
)
}
fn event_delta_line(
input_file: &'static str,
guard: Option<&'static str>,
field: &'static str,
) -> ResolvedLine {
ResolvedLine::from_explicit_name(
Line::new_with_data_source(DataSource::new_event_delta(
guard.map(Into::into),
field.into(),
)),
PathBuf::from(input_file),
)
}
fn event_count_line(
input_file: &'static str,
guard: Option<&'static str>,
field: &'static str,
) -> ResolvedLine {
ResolvedLine::from_explicit_name(
Line::new_with_data_source(DataSource::new_event_count(
guard.map(Into::into),
field.into(),
)),
PathBuf::from(input_file),
)
}
fn plot_line(
input_file: &'static str,
guard: Option<&'static str>,
field: &'static str,
) -> ResolvedLine {
ResolvedLine::from_explicit_name(
Line::new_with_data_source(DataSource::new_plot_field(
guard.map(Into::into),
field.into(),
)),
PathBuf::from(input_file),
)
}
fn field_sum_line(
input_file: &'static str,
guard: Option<&'static str>,
field: &'static str,
) -> ResolvedLine {
ResolvedLine::from_explicit_name(
Line::new_with_data_source(DataSource::new_field_sum(
guard.map(Into::into),
field.into(),
)),
PathBuf::from(input_file),
)
}
fn check_output_and_config(
config: ResolvedGraphConfig,
output: HashMap<PathBuf, ResolvedLine>,
expected_output_len: usize,
allow_shared_lines_in_output: bool,
) {
assert_eq!(
output.len(),
expected_output_len,
"Output len mismatch e:{}/a:{}",
expected_output_len,
output.len()
);
for value in config.all_lines() {
value.expect_shared_csv_filename();
}
for (output_file_name, canonical) in &output {
assert_eq!(*output_file_name, canonical.expect_shared_csv_filename());
if !allow_shared_lines_in_output {
assert!(!canonical.can_csv_file_be_shared(), "no shared lines allowed in output");
}
}
let mut seen_filenames = std::collections::HashSet::new();
for value in output.values() {
let filename = value.expect_shared_csv_filename();
assert!(
seen_filenames.insert(filename.clone()),
"Duplicate filename detected: {}",
filename.display()
);
}
for line in config.all_lines() {
let mut allowed_canonical_names = vec![];
for (output_file_name, canonical) in &output {
if line.raw_pattern() == canonical.raw_pattern()
&& line.guard() == canonical.guard()
&& line.source_file_name() == canonical.source_file_name()
{
allowed_canonical_names.push(output_file_name.clone());
}
}
assert!(allowed_canonical_names.contains(&line.expect_shared_csv_filename()));
}
for line in config.all_lines() {
let shared_csv_file = line.expect_shared_csv_filename();
assert!(
output.contains_key(&shared_csv_file),
"Output should contain shared_csv_file: {}",
shared_csv_file.display()
);
}
}
fn call_propagate_shared_csv_files(
config: &mut ResolvedGraphConfig,
) -> Result<HashMap<PathBuf, ResolvedLine>, Error> {
let input_context = InputFilesContext::new_with_input(vec![PathBuf::from("input.log")]);
propagate_shared_csv_files(config, &input_context, |_, _| {
Ok(PathBuf::from("/some/out/dir"))
})
}
#[test]
fn test_csv_resolution_00() {
init_tracing_test();
let mut config =
build_resolved_graph_config(vec![plot_line("input.log", Some("guard"), "duration")]);
let output = call_propagate_shared_csv_files(&mut config).unwrap();
check_output_and_config(config, output, 1, false);
}
#[test]
fn test_csv_resolution_00a() {
init_tracing_test();
let mut config = build_resolved_graph_config(vec![
plot_line("input.log", Some("guard"), "duration"),
event_count_line("input.log", Some("guard"), "duration"),
]);
let output = call_propagate_shared_csv_files(&mut config).unwrap();
check_output_and_config(config, output, 1, false);
}
#[test]
fn test_csv_resolution_00b() {
init_tracing_test();
let mut config = build_resolved_graph_config(vec![
plot_line("input.log", Some("guard"), "duration"),
event_count_line("input.log", Some("guard"), "duration"),
event_delta_line("input.log", Some("guard"), "duration"),
]);
let output = call_propagate_shared_csv_files(&mut config).unwrap();
check_output_and_config(config, output, 1, false);
}
#[test]
fn test_csv_resolution_00c() {
init_tracing_test();
let mut config = build_resolved_graph_config(vec![
plot_line("input.log", Some("guard"), "duration"),
event_line("input.log", Some("guard"), "duration", 100.0),
]);
let output = call_propagate_shared_csv_files(&mut config).unwrap();
check_output_and_config(config, output, 2, false);
}
#[test]
fn test_csv_resolution_00d() {
init_tracing_test();
let mut config = build_resolved_graph_config(vec![
event_line("input.log", Some("guard"), "duration", 100.0),
event_delta_line("input.log", Some("guard"), "duration"),
]);
let output = call_propagate_shared_csv_files(&mut config).unwrap();
check_output_and_config(config, output, 1, false);
}
#[test]
fn test_csv_resolution_01() {
init_tracing_test();
let mut config = build_resolved_graph_config(vec![
plot_line("input.log", Some("guard0"), "duration"),
plot_line("input.log", Some("guard1"), "duration"),
]);
let output = call_propagate_shared_csv_files(&mut config).unwrap();
check_output_and_config(config, output, 2, false);
}
#[test]
fn test_csv_resolution_03() {
init_tracing_test();
let mut config = build_resolved_graph_config(vec![
plot_line("input.log", Some("guard0"), "duration"),
event_line("input.log", Some("guard1"), "duration", 100.0),
]);
let output = call_propagate_shared_csv_files(&mut config).unwrap();
check_output_and_config(config, output, 2, false);
}
#[test]
fn test_csv_resolution_04() {
init_tracing_test();
let mut config = build_resolved_graph_config(vec![
plot_line("input.log", Some("guard0"), "duration"),
event_line("input.log", Some("guard1"), "duration", 100.0),
event_count_line("input.log", Some("guard1"), "duration"),
]);
let output = call_propagate_shared_csv_files(&mut config).unwrap();
check_output_and_config(config, output, 2, false);
}
#[test]
fn test_csv_resolution_05() {
init_tracing_test();
let mut config = build_resolved_graph_config(vec![event_count_line(
"input.log",
Some("guard1"),
"duration",
)]);
let output = call_propagate_shared_csv_files(&mut config).unwrap();
check_output_and_config(config, output, 1, true);
}
#[test]
fn test_csv_resolution_06() {
init_tracing_test();
let mut config = build_resolved_graph_config(vec![
event_count_line("input.log", Some("guard1"), "duration1"),
event_delta_line("input.log", Some("guard1"), "duration2"),
]);
let output = call_propagate_shared_csv_files(&mut config).unwrap();
check_output_and_config(config, output, 2, true);
}
#[test]
fn test_csv_resolution_07() {
init_tracing_test();
let mut config = build_resolved_graph_config(vec![
plot_line("input.log", Some("guard1"), "duration"),
event_line("input.log", Some("guard1"), "duration", 100.0),
event_count_line("input.log", Some("guard1"), "duration"),
event_delta_line("input.log", Some("guard1"), "duration"),
plot_line("input.log", Some("guard2"), "duration"),
event_line("input.log", Some("guard2"), "duration", 100.0),
event_count_line("input.log", Some("guard2"), "duration"),
event_delta_line("input.log", Some("guard2"), "duration"),
]);
let output = call_propagate_shared_csv_files(&mut config).unwrap();
check_output_and_config(config, output, 4, false);
}
#[test]
fn test_csv_resolution_08() {
init_tracing_test();
let mut config = build_resolved_graph_config(vec![
plot_line("input1.log", Some("guard"), "duration"),
event_line("input1.log", Some("guard"), "duration", 100.0),
event_count_line("input1.log", Some("guard"), "duration"),
event_delta_line("input1.log", Some("guard"), "duration"),
plot_line("input2.log", Some("guard"), "duration"),
event_line("input2.log", Some("guard"), "duration", 100.0),
event_count_line("input2.log", Some("guard"), "duration"),
event_delta_line("input2.log", Some("guard"), "duration"),
]);
let output = call_propagate_shared_csv_files(&mut config).unwrap();
check_output_and_config(config, output, 4, false);
}
#[test]
fn test_csv_resolution_09() {
init_tracing_test();
let mut config = build_resolved_graph_config(vec![
event_count_line("input1.log", Some("guard1"), "duration"),
event_delta_line("input1.log", Some("guard2"), "duration"),
event_count_line("input2.log", Some("guard1"), "duration"),
event_delta_line("input2.log", Some("guard2"), "duration"),
]);
let output = call_propagate_shared_csv_files(&mut config).unwrap();
check_output_and_config(config, output, 4, true);
}
#[test]
fn test_csv_resolution_10() {
init_tracing_test();
let mut config = build_resolved_graph_config(vec![
plot_line("input.log", Some("guard"), "duration"),
plot_line("input.log", Some("guard"), "duration"),
plot_line("input.log", Some("guard"), "duration"),
]);
let output = call_propagate_shared_csv_files(&mut config).unwrap();
check_output_and_config(config, output, 1, false);
}
#[test]
fn test_csv_resolution_11() {
init_tracing_test();
let mut config = build_resolved_graph_config(vec![
event_line("input.log", Some("guard"), "duration", 1.0),
event_line("input.log", Some("guard"), "duration", 2.0),
event_line("input.log", Some("guard"), "duration", 3.0),
]);
let output = call_propagate_shared_csv_files(&mut config).unwrap();
check_output_and_config(config, output, 3, false);
}
#[test]
fn test_csv_resolution_12() {
init_tracing_test();
let mut config = build_resolved_graph_config(vec![
event_count_line("input.log", Some("guard"), "duration"),
event_count_line("input.log", Some("guard"), "duration"),
event_count_line("input.log", Some("guard"), "duration"),
]);
let output = call_propagate_shared_csv_files(&mut config).unwrap();
check_output_and_config(config, output, 1, true);
}
#[test]
fn test_line_processing_00() {
init_tracing_test();
let log_line = "2025-04-03 11:32:48.027 INFO main: operation duration=12.5ms";
let resolved_line = plot_line("input.log", Some("operation"), "duration");
let mut processor = LineProcessor::from_data_source(
resolved_line.line.data_source,
Some(PathBuf::from("output.csv")),
DEFAULT_TIMESTAMP_FORMAT,
"input.log".into(),
false,
)
.unwrap();
assert!(processor.guard_matches(log_line));
let (g, matched) = processor.try_match(log_line).unwrap();
let (captures, timestamp) = matched.unwrap();
assert!(g);
processor.process(captures, timestamp);
assert_eq!(processor.records.len(), 1);
let record = &processor.records[0];
assert_eq!(record.value, 12.5);
assert_eq!(record.count, 1);
assert_eq!(record.diff, None);
}
#[test]
fn test_line_processing_single_line_check() {
init_tracing_test();
let log_line = "2025-04-03 11:32:48.027 INFO main: operation duration:12.5us, val:127.0ms";
let resolved_line = plot_line("input.log", Some("operation"), r"duration:([\d\.]+)(\w+)?");
let mut processor = LineProcessor::from_data_source(
resolved_line.line.data_source,
Some(PathBuf::from("output.csv")),
DEFAULT_TIMESTAMP_FORMAT,
"input.log".into(),
false,
)
.unwrap();
assert!(processor.guard_matches(log_line));
let (g, matched) = processor.try_match(log_line).unwrap();
let (captures, timestamp) = matched.unwrap();
assert!(g);
processor.process(captures, timestamp);
tracing::info!("{:#?}", timestamp);
let d = NaiveDate::from_ymd_opt(2025, 4, 3).unwrap();
let t = NaiveTime::from_hms_milli_opt(11, 32, 48, 27).unwrap();
assert_eq!(timestamp.date().unwrap(), d);
assert_eq!(timestamp.time(), t);
assert_eq!(processor.records.len(), 1);
let record = &processor.records[0];
assert_eq!(record.value, 0.0125);
assert_eq!(record.count, 1);
assert_eq!(record.diff, None);
}
#[test]
fn test_line_processing_date_format_no_year2() {
init_tracing_test();
let log_line = "Apr 20 08:26:13 AM 1000 25131 6737.00 3.17 817575604 3179060 2.41 polkadot-parach";
let resolved_line =
plot_line("input.log", Some("polkadot-parach"), r"^\s+(?:[\d\.]+\s+){3}([\d\.]+)");
let mut processor = LineProcessor::from_data_source(
resolved_line.line.data_source,
Some(PathBuf::from("output.csv")),
"%b %d %I:%M:%S %p".into(),
"input.log".into(),
false,
)
.unwrap();
assert!(processor.guard_matches(log_line));
let (g, matched) = processor.try_match(log_line).unwrap();
let (captures, timestamp) = matched.unwrap();
assert!(g);
let d = NaiveDate::from_ymd_opt(2025, 4, 20).unwrap();
let t = NaiveTime::from_hms_opt(8, 26, 13).unwrap();
assert_eq!(timestamp.date().unwrap(), d);
assert_eq!(timestamp.time(), t);
processor.process(captures, timestamp);
assert_eq!(processor.records.len(), 1);
let record = &processor.records[0];
assert_eq!(record.value, 3.17);
assert_eq!(record.count, 1);
assert_eq!(record.diff, None);
}
#[test]
fn test_line_processing_date_format_seconds_since_epoch() {
init_tracing_test();
let log_line = "[1577834199] 1000 25131 6737.00 3.17 817575604 3179060 2.41 polkadot-parach";
let resolved_line =
plot_line("input.log", Some("polkadot-parach"), r"^\s+(?:[\d\.]+\s+){3}([\d\.]+)");
let mut processor = LineProcessor::from_data_source(
resolved_line.line.data_source,
Some(PathBuf::from("output.csv")),
"[%s]".into(),
"input.log".into(),
false,
)
.unwrap();
assert!(processor.guard_matches(log_line));
let (g, matched) = processor.try_match(log_line).unwrap();
let (captures, timestamp) = matched.unwrap();
assert!(g);
let d = NaiveDate::from_ymd_opt(2019, 12, 31).unwrap();
let t = NaiveTime::from_hms_opt(23, 16, 39).unwrap();
assert_eq!(timestamp.date().unwrap(), d);
assert_eq!(timestamp.time(), t);
processor.process(captures, timestamp);
assert_eq!(processor.records.len(), 1);
let record = &processor.records[0];
assert_eq!(record.value, 3.17);
assert_eq!(record.count, 1);
assert_eq!(record.diff, None);
}
#[test]
#[ignore]
fn test_line_processing_date_format_seconds_since_epoch2() {
init_tracing_test();
let log_line = "[636152.333] 1000 25131 6737.00 3.17 817575604 3179060 2.41 polkadot-parach";
let resolved_line =
plot_line("input.log", Some("polkadot-parach"), r"^\s+(?:[\d\.]+\s+){3}([\d\.]+)");
let mut processor = LineProcessor::from_data_source(
resolved_line.line.data_source,
Some(PathBuf::from("output.csv")),
"[%s.3f]".into(),
"input.log".into(),
false,
)
.unwrap();
assert!(processor.guard_matches(log_line));
let (g, matched) = processor.try_match(log_line).unwrap();
let (captures, timestamp) = matched.unwrap();
assert!(g);
let d = NaiveDate::from_ymd_opt(2019, 12, 31).unwrap();
let t = NaiveTime::from_hms_opt(23, 16, 39).unwrap();
assert_eq!(timestamp.date().unwrap(), d);
assert_eq!(timestamp.time(), t);
processor.process(captures, timestamp);
assert_eq!(processor.records.len(), 1);
let record = &processor.records[0];
assert_eq!(record.value, 3.17);
assert_eq!(record.count, 1);
assert_eq!(record.diff, None);
}
#[test]
fn test_line_processing_date_format_no_year() {
init_tracing_test();
let log_line = "035 08:26:13 AM 1000 25131 6737.00 3.17 817575604 3179060 2.41 polkadot-parach";
let resolved_line =
plot_line("input.log", Some("polkadot-parach"), r"^\s+(?:[\d\.]+\s+){3}([\d\.]+)");
let mut processor = LineProcessor::from_data_source(
resolved_line.line.data_source,
Some(PathBuf::from("output.csv")),
"%j %I:%M:%S %p".into(),
"input.log".into(),
false,
)
.unwrap();
assert!(processor.guard_matches(log_line));
let (g, matched) = processor.try_match(log_line).unwrap();
let (captures, timestamp) = matched.unwrap();
assert!(g);
let d = NaiveDate::from_ymd_opt(2025, 2, 4).unwrap();
let t = NaiveTime::from_hms_opt(8, 26, 13).unwrap();
assert_eq!(timestamp.date().unwrap(), d);
assert_eq!(timestamp.time(), t);
processor.process(captures, timestamp);
assert_eq!(processor.records.len(), 1);
let record = &processor.records[0];
assert_eq!(record.value, 3.17);
assert_eq!(record.count, 1);
assert_eq!(record.diff, None);
}
#[test]
fn test_line_processing_date_format() {
init_tracing_test();
let log_line = "2025 035 08:26:13 AM 1000 25131 6737.00 3.17 817575604 3179060 2.41 polkadot-parach";
let resolved_line =
plot_line("input.log", Some("polkadot-parach"), r"^\s+(?:[\d\.]+\s+){3}([\d\.]+)");
let mut processor = LineProcessor::from_data_source(
resolved_line.line.data_source,
Some(PathBuf::from("output.csv")),
"%Y %j %I:%M:%S %p".into(),
"input.log".into(),
false,
)
.unwrap();
assert!(processor.guard_matches(log_line));
let (g, matched) = processor.try_match(log_line).unwrap();
let (captures, timestamp) = matched.unwrap();
assert!(g);
let d = NaiveDate::from_ymd_opt(2025, 2, 4).unwrap();
let t = NaiveTime::from_hms_opt(8, 26, 13).unwrap();
assert_eq!(timestamp.date().unwrap(), d);
assert_eq!(timestamp.time(), t);
processor.process(captures, timestamp);
assert_eq!(processor.records.len(), 1);
let record = &processor.records[0];
assert_eq!(record.value, 3.17);
assert_eq!(record.count, 1);
assert_eq!(record.diff, None);
}
#[test]
fn test_line_processing_time_only() {
init_tracing_test();
let log_line = "08:26:13 AM 1000 25131 6737.00 3.17 817575604 3179060 2.41 polkadot-parach";
let resolved_line =
plot_line("input.log", Some("polkadot-parach"), r"^\s+(?:[\d\.]+\s+){3}([\d\.]+)");
let mut processor = LineProcessor::from_data_source(
resolved_line.line.data_source,
Some(PathBuf::from("output.csv")),
"%I:%M:%S %p".into(),
"input.log".into(),
false,
)
.unwrap();
assert!(processor.guard_matches(log_line));
let (g, matched) = processor.try_match(log_line).unwrap();
let (captures, timestamp) = matched.unwrap();
assert!(g);
processor.process(captures, timestamp);
let t = NaiveTime::from_hms_opt(8, 26, 13).unwrap();
assert!(timestamp.date().is_none());
assert_eq!(timestamp.time(), t);
assert_eq!(processor.records.len(), 1);
let record = &processor.records[0];
assert_eq!(record.value, 3.17);
assert_eq!(record.count, 1);
assert_eq!(record.diff, None);
}
#[test]
fn test_line_processing_multi_line_field() {
init_tracing_test();
let log_lines = [
"2025-04-03 11:32:48.027 INFO main: operation duration=1.5",
"2025-04-03 11:32:48.054 INFO main: operation duration=2.5",
"2025-04-03 11:32:49.054 INFO main: operation duration=3.5",
"2025-04-03 11:33:49.154 INFO main: operation duration=4.5",
"2025-04-04 11:33:49.154 INFO main: operation duration=2.5",
];
let resolved_line = plot_line("input.log", Some("operation"), r"duration");
let mut processor = LineProcessor::from_data_source(
resolved_line.line.data_source,
Some(PathBuf::from("output.csv")),
DEFAULT_TIMESTAMP_FORMAT,
"input.log".into(),
false,
)
.unwrap();
for log_line in log_lines {
assert!(processor.guard_matches(log_line));
let (g, matched) = processor.try_match(log_line).unwrap();
let (captures, timestamp) = matched.unwrap();
assert!(g);
processor.process(captures, timestamp);
}
assert_eq!(processor.records.len(), 5);
let record = &processor.records[0];
assert_eq!(record.value, 1.5);
assert_eq!(record.count, 1);
assert_eq!(record.diff, None);
let record = &processor.records[1];
assert_eq!(record.value, 2.5);
assert_eq!(record.count, 2);
assert_eq!(record.diff.unwrap(), 27.0);
let record = &processor.records[2];
assert_eq!(record.value, 3.5);
assert_eq!(record.count, 3);
assert_eq!(record.diff.unwrap(), 1000.0);
let record = &processor.records[3];
assert_eq!(record.value, 4.5);
assert_eq!(record.count, 4);
assert_eq!(record.diff.unwrap(), 60100.0);
let record = &processor.records[4];
assert_eq!(record.value, 2.5);
assert_eq!(record.count, 5);
assert_eq!(record.diff.unwrap(), 86400000.0);
}
#[test]
fn test_line_processing_multi_line_field_sum() {
init_tracing_test();
let log_lines = [
"2025-04-03 11:32:48.027 INFO main: operation duration=1.5",
"2025-04-03 11:32:48.054 INFO main: operation duration=2.5",
"2025-04-03 11:32:49.054 INFO main: operation duration=3.5",
"2025-04-03 11:33:49.154 INFO main: operation duration=4.5",
"2025-04-04 11:33:49.154 INFO main: operation duration=2.5",
];
let resolved_line = field_sum_line("input.log", Some("operation"), r"duration");
let mut processor = LineProcessor::from_data_source(
resolved_line.line.data_source,
Some(PathBuf::from("output.csv")),
DEFAULT_TIMESTAMP_FORMAT,
"input.log".into(),
false,
)
.unwrap();
for log_line in log_lines {
assert!(processor.guard_matches(log_line));
let (g, matched) = processor.try_match(log_line).unwrap();
let (captures, timestamp) = matched.unwrap();
assert!(g);
processor.process(captures, timestamp);
}
assert_eq!(processor.records.len(), 5);
let record = &processor.records[0];
assert_eq!(record.value, 1.5);
assert_eq!(record.value_sum, 1.5);
assert_eq!(record.count, 1);
assert_eq!(record.diff, None);
let record = &processor.records[1];
assert_eq!(record.value, 2.5);
assert_eq!(record.value_sum, 4.0);
assert_eq!(record.count, 2);
assert_eq!(record.diff.unwrap(), 27.0);
let record = &processor.records[2];
assert_eq!(record.value, 3.5);
assert_eq!(record.value_sum, 7.5);
assert_eq!(record.count, 3);
assert_eq!(record.diff.unwrap(), 1000.0);
let record = &processor.records[3];
assert_eq!(record.value, 4.5);
assert_eq!(record.value_sum, 12.0);
assert_eq!(record.count, 4);
assert_eq!(record.diff.unwrap(), 60100.0);
let record = &processor.records[4];
assert_eq!(record.value, 2.5);
assert_eq!(record.value_sum, 14.5);
assert_eq!(record.count, 5);
assert_eq!(record.diff.unwrap(), 86400000.0);
}
#[test]
fn test_line_processing_multi_line_regex_global_guards() {
init_tracing_test();
let log_lines = [
(
true,
"2025-04-03 11:32:48.027 AAA BBB CCCC INFO main: operation duration:1.5ns, val:127.0",
),
(
true,
"2025-04-03 11:32:48.054 AAA BBB CCCC INFO main: operation duration:2.5us, val:127.0",
),
(
false,
"2025-04-03 11:32:49.054 AAA CCCC INFO main: operation duration:4.5ms, val:127.0",
),
(false, "2025-04-03 11:32:49.054 AAA INFO main: operation duration:4.5ms, val:127.0"),
(
true,
"2025-04-03 11:32:49.054 AAA BBB CCCC INFO main: operation duration:3.5ms, val:127.0",
),
(
true,
"2025-04-03 11:33:49.154 AAA BBB CCCC INFO main: operation duration:4.5s, val:127.0",
),
(
false,
"2025-04-04 11:33:49.154 AAX BBB CCCC INFO main: operation duration:2.5s, val:127.0",
),
(
true,
"2025-04-04 11:33:49.154 AAA BBB CCCC INFO main: operation duration:2.5s, val:127.0",
),
(false, "2025-04-04 11:33:49.154 INFO main: operation duration:2.2s, val:127.0"),
];
let resolved_line = plot_line("input.log", Some("operation"), r"duration:([\d\.]+)(\w+)?");
let mut processor = LineProcessor::from_data_source_with_global_guards(
resolved_line.line.data_source,
Some(PathBuf::from("output.csv")),
DEFAULT_TIMESTAMP_FORMAT,
"input.log".into(),
false,
vec!["AAA".to_string(), "BBB".to_string(), "CCCC".to_string()],
)
.unwrap();
for (guard_should_match, log_line) in log_lines {
assert_eq!(processor.guard_matches(log_line), guard_should_match);
let (guard_matched, matched) = processor.try_match(log_line).unwrap();
assert_eq!(guard_matched, guard_should_match);
if guard_should_match {
let (captures, timestamp) = matched.unwrap();
processor.process(captures, timestamp);
}
}
assert_eq!(processor.records.len(), 5);
let record = &processor.records[0];
assert_eq!(record.value, 1.5 / 1_000_000.0);
assert_eq!(record.count, 1);
assert_eq!(record.diff, None);
let record = &processor.records[1];
assert_eq!(record.value, 2.5 / 1000.0);
assert_eq!(record.count, 2);
assert_eq!(record.diff.unwrap(), 27.0);
let record = &processor.records[2];
assert_eq!(record.value, 3.5);
assert_eq!(record.count, 3);
assert_eq!(record.diff.unwrap(), 1000.0);
let record = &processor.records[3];
assert_eq!(record.value, 4500.0);
assert_eq!(record.count, 4);
assert_eq!(record.diff.unwrap(), 60100.0);
let record = &processor.records[4];
assert_eq!(record.value, 2500.0);
assert_eq!(record.count, 5);
assert_eq!(record.diff.unwrap(), 86400000.0);
}
#[test]
fn test_line_processing_multi_line_regex() {
init_tracing_test();
let log_lines = [
"2025-04-03 11:32:48.027 INFO main: operation duration:1.5ns, val:127.0",
"2025-04-03 11:32:48.054 INFO main: operation duration:2.5us, val:127.0",
"2025-04-03 11:32:49.054 INFO main: operation duration:3.5ms, val:127.0",
"2025-04-03 11:33:49.154 INFO main: operation duration:4.5s, val:127.0",
"2025-04-04 11:33:49.154 INFO main: operation duration:2.5s, val:127.0",
];
let resolved_line = plot_line("input.log", Some("operation"), r"duration:([\d\.]+)(\w+)?");
let mut processor = LineProcessor::from_data_source(
resolved_line.line.data_source,
Some(PathBuf::from("output.csv")),
DEFAULT_TIMESTAMP_FORMAT,
"input.log".into(),
false,
)
.unwrap();
for log_line in log_lines {
assert!(processor.guard_matches(log_line));
let (g, matched) = processor.try_match(log_line).unwrap();
let (captures, timestamp) = matched.unwrap();
assert!(g);
processor.process(captures, timestamp);
}
assert_eq!(processor.records.len(), 5);
let record = &processor.records[0];
assert_eq!(record.value, 1.5 / 1_000_000.0);
assert_eq!(record.count, 1);
assert_eq!(record.diff, None);
let record = &processor.records[1];
assert_eq!(record.value, 2.5 / 1000.0);
assert_eq!(record.count, 2);
assert_eq!(record.diff.unwrap(), 27.0);
let record = &processor.records[2];
assert_eq!(record.value, 3.5);
assert_eq!(record.count, 3);
assert_eq!(record.diff.unwrap(), 1000.0);
let record = &processor.records[3];
assert_eq!(record.value, 4500.0);
assert_eq!(record.count, 4);
assert_eq!(record.diff.unwrap(), 60100.0);
let record = &processor.records[4];
assert_eq!(record.value, 2500.0);
assert_eq!(record.count, 5);
assert_eq!(record.diff.unwrap(), 86400000.0);
}
#[test]
fn test_line_processing_bad_regex() {
let r = r"^\s+(?:[\d\.]+\s+){3}([\d\.]+)([\d\.]+)([\d\.]+)";
init_tracing_test();
let resolved_line = plot_line("input.log", Some("polkadot-parach"), r);
let err = LineProcessor::from_data_source(
resolved_line.line.data_source,
Some(PathBuf::from("output.csv")),
"%Y %j %I:%M:%S %p".into(),
"input.log".into(),
false,
)
.unwrap_err();
if let Error::RegexCapturesGroupsInvalidCount(x) = err {
assert_eq!(x, r);
} else {
panic!("incorrect error value");
}
}
}